Skip to main content

modelvault_core/catalog/
state.rs

//! In-memory catalog: maps collection names to ids, tracks schema versions, and applies replayed
//! records.
//!
//! The in-memory state must match the sequence of `Schema` segments on disk.
4
5use std::collections::{BTreeMap, HashMap};
6
7use crate::catalog::codec::{CatalogRecordWire, MAX_COLLECTION_NAME_BYTES};
8use crate::error::{DbError, SchemaError};
9use crate::schema::{validate_field_defs, CollectionId, FieldDef, IndexDef, SchemaVersion};
10
11/// Snapshot of one registered collection (latest schema version).
12#[derive(Debug, Clone, PartialEq)]
13pub struct CollectionInfo {
14    pub id: CollectionId,
15    pub name: String,
16    pub current_version: SchemaVersion,
17    pub fields: Vec<FieldDef>,
18    pub indexes: Vec<IndexDef>,
19    /// Single top-level field name for the primary key (`None` for legacy catalog v1 segments).
20    pub primary_field: Option<String>,
21    /// Field layouts keyed by schema version (populated on create and each schema bump).
22    pub version_history: BTreeMap<u32, Vec<FieldDef>>,
23}
24
25impl CollectionInfo {
26    /// Field layout recorded for `version`, if replay populated history for that version.
27    pub fn fields_at_version(&self, version: u32) -> Option<&[FieldDef]> {
28        self.version_history.get(&version).map(|f| f.as_slice())
29    }
30}
31
32/// Logical catalog: collection names, ids, and current schema version per collection.
33#[derive(Debug, Clone)]
34pub struct Catalog {
35    by_id: HashMap<u32, CollectionInfo>,
36    by_name: HashMap<String, CollectionId>,
37    /// Next `CollectionId` to assign on `create` (starts at `1` when empty).
38    next_id: u32,
39}
40
41impl Default for Catalog {
42    fn default() -> Self {
43        Self {
44            by_id: HashMap::new(),
45            by_name: HashMap::new(),
46            next_id: 1,
47        }
48    }
49}
50
51impl Catalog {
52    /// Next collection id that will be assigned (replay must produce sequential creates `1..n`).
53    pub fn next_collection_id(&self) -> CollectionId {
54        CollectionId(self.next_id)
55    }
56
57    pub fn is_empty(&self) -> bool {
58        self.by_id.is_empty()
59    }
60
61    pub fn len(&self) -> usize {
62        self.by_id.len()
63    }
64
65    pub fn get(&self, id: CollectionId) -> Option<&CollectionInfo> {
66        self.by_id.get(&id.0)
67    }
68
69    /// Resolve a registered collection by name (trimmed, matching [`Database::register_collection`](crate::db::Database::register_collection)).
70    pub fn lookup_name(&self, name: &str) -> Option<CollectionId> {
71        self.by_name.get(name.trim()).copied()
72    }
73
74    pub fn collection_names(&self) -> Vec<String> {
75        let mut names: Vec<String> = self.by_name.keys().cloned().collect();
76        names.sort();
77        names
78    }
79
80    pub fn collections(&self) -> Vec<CollectionInfo> {
81        let mut v: Vec<CollectionInfo> = self.by_id.values().cloned().collect();
82        v.sort_by_key(|c| c.id.0);
83        v
84    }
85
86    /// Maps `name` to `id` without inserting into `by_id` (test-only catalog inconsistency).
87    #[cfg(test)]
88    pub(crate) fn test_orphan_name_lookup(&mut self, name: &str, id: CollectionId) {
89        self.by_name.insert(name.trim().to_string(), id);
90    }
91
92    #[cfg(test)]
93    pub(crate) fn test_insert_collection_info(&mut self, info: CollectionInfo) {
94        // Test-only escape hatch for constructing deliberately inconsistent catalog states to
95        // exercise downstream error handling (checkpoint/replay hardening).
96        self.by_name.insert(info.name.clone(), info.id);
97        self.by_id.insert(info.id.0, info);
98        self.next_id = self.next_id.max(self.by_id.len() as u32 + 1);
99    }
100
101    /// `true` if `name` is a single-segment path on a top-level field.
102    pub fn has_top_level_field(fields: &[FieldDef], name: &str) -> bool {
103        fields
104            .iter()
105            .any(|f| f.path.0.len() == 1 && f.path.0[0] == name)
106    }
107
108    /// Apply one catalog record (from replay or after a local append).
109    pub fn apply_record(&mut self, record: CatalogRecordWire) -> Result<(), DbError> {
110        match record {
111            CatalogRecordWire::CreateCollection {
112                collection_id,
113                name,
114                schema_version,
115                fields,
116                indexes,
117                primary_field,
118            } => self.apply_create(
119                collection_id,
120                name,
121                schema_version,
122                fields,
123                indexes,
124                primary_field,
125            ),
126            CatalogRecordWire::NewSchemaVersion {
127                collection_id,
128                schema_version,
129                fields,
130                indexes,
131            } => self.apply_new_version(collection_id, schema_version, fields, indexes),
132        }
133    }
134
135    fn validate_name(name: &str) -> Result<(), DbError> {
136        if name.is_empty() {
137            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
138        }
139        if name.len() > MAX_COLLECTION_NAME_BYTES {
140            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
141        }
142        Ok(())
143    }
144
145    fn apply_create(
146        &mut self,
147        collection_id: u32,
148        name: String,
149        schema_version: u32,
150        fields: Vec<FieldDef>,
151        indexes: Vec<IndexDef>,
152        primary_field: Option<String>,
153    ) -> Result<(), DbError> {
154        Self::validate_name(&name)?;
155        if schema_version != 1 {
156            return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
157                expected: 1,
158                got: schema_version,
159            }));
160        }
161        if collection_id != self.next_id {
162            return Err(DbError::Schema(SchemaError::UnexpectedCollectionId {
163                expected: self.next_id,
164                got: collection_id,
165            }));
166        }
167        if self.by_name.contains_key(&name) {
168            return Err(DbError::Schema(SchemaError::DuplicateCollectionName {
169                name: name.clone(),
170            }));
171        }
172        validate_field_defs(&fields)?;
173        for f in &fields {
174            crate::validation::validate_constraints_at_registration(&f.constraints)?;
175        }
176        if let Some(ref pk) = primary_field {
177            if !Catalog::has_top_level_field(&fields, pk) {
178                return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
179                    name: pk.clone(),
180                }));
181            }
182        }
183        let id = CollectionId(collection_id);
184        let mut version_history = BTreeMap::new();
185        version_history.insert(1, fields.clone());
186        let info = CollectionInfo {
187            id,
188            name: name.clone(),
189            current_version: SchemaVersion(1),
190            fields,
191            indexes,
192            primary_field,
193            version_history,
194        };
195        self.by_id.insert(collection_id, info);
196        self.by_name.insert(name, id);
197        self.next_id = collection_id.saturating_add(1);
198        Ok(())
199    }
200
201    fn apply_new_version(
202        &mut self,
203        collection_id: u32,
204        schema_version: u32,
205        fields: Vec<FieldDef>,
206        indexes: Vec<IndexDef>,
207    ) -> Result<(), DbError> {
208        let col = self.by_id.get_mut(&collection_id).ok_or(DbError::Schema(
209            SchemaError::UnknownCollection { id: collection_id },
210        ))?;
211        let expected = col.current_version.0.saturating_add(1);
212        if schema_version != expected {
213            return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
214                expected,
215                got: schema_version,
216            }));
217        }
218        validate_field_defs(&fields)?;
219        for f in &fields {
220            crate::validation::validate_constraints_at_registration(&f.constraints)?;
221        }
222        if let Some(ref pk) = col.primary_field {
223            if !Catalog::has_top_level_field(&fields, pk) {
224                return Err(DbError::Schema(SchemaError::PrimaryFieldMissingInSchema {
225                    name: pk.clone(),
226                }));
227            }
228        }
229        col.current_version = SchemaVersion(schema_version);
230        col.version_history.insert(schema_version, fields.clone());
231        col.fields = fields;
232        col.indexes = indexes;
233        Ok(())
234    }
235}
236
// The unit tests are kept in a separate file under the crate root and are included textually,
// so they compile as the body of this child module.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_catalog_state_tests.rs"));
}