Skip to main content

modelvault_core/catalog/
state.rs

//! In-memory catalog: maps names to ids, tracks schema versions, and applies replayed records.
//!
//! State must match the sequence of `Schema` segments on disk.
4
5use std::collections::HashMap;
6
7use crate::catalog::codec::{CatalogRecordWire, MAX_COLLECTION_NAME_BYTES};
8use crate::error::{DbError, SchemaError};
9use crate::schema::{validate_field_defs, CollectionId, FieldDef, IndexDef, SchemaVersion};
10
/// Snapshot of one registered collection (latest schema version).
///
/// Only the most recent schema version is kept: applying a new schema version replaces
/// `fields` and `indexes` in place and bumps `current_version`.
#[derive(Debug, Clone, PartialEq)]
pub struct CollectionInfo {
    /// Id carried by the create record that registered this collection.
    pub id: CollectionId,
    /// Collection name exactly as it appeared in the create record.
    pub name: String,
    /// Latest applied schema version; `1` right after creation.
    pub current_version: SchemaVersion,
    /// Field definitions of `current_version`.
    pub fields: Vec<FieldDef>,
    /// Index definitions of `current_version`.
    pub indexes: Vec<IndexDef>,
    /// Single top-level field name for the primary key (`None` for legacy catalog v1 segments).
    ///
    /// Set when the collection is created and left untouched by later schema versions.
    pub primary_field: Option<String>,
}
22
/// Logical catalog: collection names, ids, and current schema version per collection.
///
/// Both maps are updated together when a create record is applied, so every name in
/// `by_name` refers to an entry in `by_id`.
#[derive(Debug, Clone)]
pub struct Catalog {
    /// Collection snapshots keyed by the raw `u32` inside their `CollectionId`.
    by_id: HashMap<u32, CollectionInfo>,
    /// Collection name -> id, keyed by the name as stored in the create record.
    by_name: HashMap<String, CollectionId>,
    /// Next `CollectionId` to assign on `create` (starts at `1` when empty).
    next_id: u32,
}
31
32impl Default for Catalog {
33    fn default() -> Self {
34        Self {
35            by_id: HashMap::new(),
36            by_name: HashMap::new(),
37            next_id: 1,
38        }
39    }
40}
41
42impl Catalog {
43    /// Next collection id that will be assigned (replay must produce sequential creates `1..n`).
44    pub fn next_collection_id(&self) -> CollectionId {
45        CollectionId(self.next_id)
46    }
47
48    pub fn is_empty(&self) -> bool {
49        self.by_id.is_empty()
50    }
51
52    pub fn len(&self) -> usize {
53        self.by_id.len()
54    }
55
56    pub fn get(&self, id: CollectionId) -> Option<&CollectionInfo> {
57        self.by_id.get(&id.0)
58    }
59
60    /// Resolve a registered collection by name (trimmed, matching [`Database::register_collection`](crate::db::Database::register_collection)).
61    pub fn lookup_name(&self, name: &str) -> Option<CollectionId> {
62        self.by_name.get(name.trim()).copied()
63    }
64
65    pub fn collection_names(&self) -> Vec<String> {
66        let mut names: Vec<String> = self.by_name.keys().cloned().collect();
67        names.sort();
68        names
69    }
70
71    pub fn collections(&self) -> Vec<CollectionInfo> {
72        let mut v: Vec<CollectionInfo> = self.by_id.values().cloned().collect();
73        v.sort_by_key(|c| c.id.0);
74        v
75    }
76
77    #[cfg(test)]
78    pub(crate) fn test_insert_collection_info(&mut self, info: CollectionInfo) {
79        // Test-only escape hatch for constructing deliberately inconsistent catalog states to
80        // exercise downstream error handling (checkpoint/replay hardening).
81        self.by_name.insert(info.name.clone(), info.id);
82        self.by_id.insert(info.id.0, info);
83        self.next_id = self.next_id.max(self.by_id.len() as u32 + 1);
84    }
85
86    /// `true` if `name` is a single-segment path on a top-level field.
87    pub fn has_top_level_field(fields: &[FieldDef], name: &str) -> bool {
88        fields
89            .iter()
90            .any(|f| f.path.0.len() == 1 && f.path.0[0] == name)
91    }
92
93    /// Apply one catalog record (from replay or after a local append).
94    pub fn apply_record(&mut self, record: CatalogRecordWire) -> Result<(), DbError> {
95        match record {
96            CatalogRecordWire::CreateCollection {
97                collection_id,
98                name,
99                schema_version,
100                fields,
101                indexes,
102                primary_field,
103            } => self.apply_create(
104                collection_id,
105                name,
106                schema_version,
107                fields,
108                indexes,
109                primary_field,
110            ),
111            CatalogRecordWire::NewSchemaVersion {
112                collection_id,
113                schema_version,
114                fields,
115                indexes,
116            } => self.apply_new_version(collection_id, schema_version, fields, indexes),
117        }
118    }
119
120    fn validate_name(name: &str) -> Result<(), DbError> {
121        if name.is_empty() {
122            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
123        }
124        if name.len() > MAX_COLLECTION_NAME_BYTES {
125            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
126        }
127        Ok(())
128    }
129
130    fn apply_create(
131        &mut self,
132        collection_id: u32,
133        name: String,
134        schema_version: u32,
135        fields: Vec<FieldDef>,
136        indexes: Vec<IndexDef>,
137        primary_field: Option<String>,
138    ) -> Result<(), DbError> {
139        Self::validate_name(&name)?;
140        if schema_version != 1 {
141            return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
142                expected: 1,
143                got: schema_version,
144            }));
145        }
146        if collection_id != self.next_id {
147            return Err(DbError::Schema(SchemaError::UnexpectedCollectionId {
148                expected: self.next_id,
149                got: collection_id,
150            }));
151        }
152        if self.by_name.contains_key(&name) {
153            return Err(DbError::Schema(SchemaError::DuplicateCollectionName {
154                name: name.clone(),
155            }));
156        }
157        validate_field_defs(&fields)?;
158        if let Some(ref pk) = primary_field {
159            if !Catalog::has_top_level_field(&fields, pk) {
160                return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
161                    name: pk.clone(),
162                }));
163            }
164        }
165        let id = CollectionId(collection_id);
166        let info = CollectionInfo {
167            id,
168            name: name.clone(),
169            current_version: SchemaVersion(1),
170            fields,
171            indexes,
172            primary_field,
173        };
174        self.by_id.insert(collection_id, info);
175        self.by_name.insert(name, id);
176        self.next_id = collection_id.saturating_add(1);
177        Ok(())
178    }
179
180    fn apply_new_version(
181        &mut self,
182        collection_id: u32,
183        schema_version: u32,
184        fields: Vec<FieldDef>,
185        indexes: Vec<IndexDef>,
186    ) -> Result<(), DbError> {
187        let col = self.by_id.get_mut(&collection_id).ok_or(DbError::Schema(
188            SchemaError::UnknownCollection { id: collection_id },
189        ))?;
190        let expected = col.current_version.0.saturating_add(1);
191        if schema_version != expected {
192            return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
193                expected,
194                got: schema_version,
195            }));
196        }
197        validate_field_defs(&fields)?;
198        if let Some(ref pk) = col.primary_field {
199            if !Catalog::has_top_level_field(&fields, pk) {
200                return Err(DbError::Schema(SchemaError::PrimaryFieldMissingInSchema {
201                    name: pk.clone(),
202                }));
203            }
204        }
205        col.current_version = SchemaVersion(schema_version);
206        col.fields = fields;
207        col.indexes = indexes;
208        Ok(())
209    }
210}
211
// Unit tests are kept in a separate file under the crate's `tests/unit/` directory and spliced
// in here with `include!`, so they compile as a child module of this file.
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_catalog_state_tests.rs"
    ));
}