// modelvault_core/catalog/state.rs

//! In-memory catalog: maps collection names to ids, tracks each collection's schema version,
//! and applies replayed catalog records.
//!
//! This state must always match the sequence of `Schema` segments on disk.
4
5use std::collections::HashMap;
6
7use crate::catalog::codec::{CatalogRecordWire, MAX_COLLECTION_NAME_BYTES};
8use crate::error::{DbError, SchemaError};
9use crate::schema::{validate_field_defs, CollectionId, FieldDef, IndexDef, SchemaVersion};
10
11/// Snapshot of one registered collection (latest schema version).
12#[derive(Debug, Clone, PartialEq)]
13pub struct CollectionInfo {
14    pub id: CollectionId,
15    pub name: String,
16    pub current_version: SchemaVersion,
17    pub fields: Vec<FieldDef>,
18    pub indexes: Vec<IndexDef>,
19    /// Single top-level field name for the primary key (`None` for legacy catalog v1 segments).
20    pub primary_field: Option<String>,
21}
22
23/// Logical catalog: collection names, ids, and current schema version per collection.
24#[derive(Debug, Clone)]
25pub struct Catalog {
26    by_id: HashMap<u32, CollectionInfo>,
27    by_name: HashMap<String, CollectionId>,
28    /// Next `CollectionId` to assign on `create` (starts at `1` when empty).
29    next_id: u32,
30}
31
32impl Default for Catalog {
33    fn default() -> Self {
34        Self {
35            by_id: HashMap::new(),
36            by_name: HashMap::new(),
37            next_id: 1,
38        }
39    }
40}
41
42impl Catalog {
43    /// Next collection id that will be assigned (replay must produce sequential creates `1..n`).
44    pub fn next_collection_id(&self) -> CollectionId {
45        CollectionId(self.next_id)
46    }
47
48    pub fn is_empty(&self) -> bool {
49        self.by_id.is_empty()
50    }
51
52    pub fn len(&self) -> usize {
53        self.by_id.len()
54    }
55
56    pub fn get(&self, id: CollectionId) -> Option<&CollectionInfo> {
57        self.by_id.get(&id.0)
58    }
59
60    /// Resolve a registered collection by name (trimmed, matching [`Database::register_collection`](crate::db::Database::register_collection)).
61    pub fn lookup_name(&self, name: &str) -> Option<CollectionId> {
62        self.by_name.get(name.trim()).copied()
63    }
64
65    pub fn collection_names(&self) -> Vec<String> {
66        let mut names: Vec<String> = self.by_name.keys().cloned().collect();
67        names.sort();
68        names
69    }
70
71    pub fn collections(&self) -> Vec<CollectionInfo> {
72        let mut v: Vec<CollectionInfo> = self.by_id.values().cloned().collect();
73        v.sort_by_key(|c| c.id.0);
74        v
75    }
76
77    /// Maps `name` to `id` without inserting into `by_id` (test-only catalog inconsistency).
78    #[cfg(test)]
79    pub(crate) fn test_orphan_name_lookup(&mut self, name: &str, id: CollectionId) {
80        self.by_name.insert(name.trim().to_string(), id);
81    }
82
83    #[cfg(test)]
84    pub(crate) fn test_insert_collection_info(&mut self, info: CollectionInfo) {
85        // Test-only escape hatch for constructing deliberately inconsistent catalog states to
86        // exercise downstream error handling (checkpoint/replay hardening).
87        self.by_name.insert(info.name.clone(), info.id);
88        self.by_id.insert(info.id.0, info);
89        self.next_id = self.next_id.max(self.by_id.len() as u32 + 1);
90    }
91
92    /// `true` if `name` is a single-segment path on a top-level field.
93    pub fn has_top_level_field(fields: &[FieldDef], name: &str) -> bool {
94        fields
95            .iter()
96            .any(|f| f.path.0.len() == 1 && f.path.0[0] == name)
97    }
98
99    /// Apply one catalog record (from replay or after a local append).
100    pub fn apply_record(&mut self, record: CatalogRecordWire) -> Result<(), DbError> {
101        match record {
102            CatalogRecordWire::CreateCollection {
103                collection_id,
104                name,
105                schema_version,
106                fields,
107                indexes,
108                primary_field,
109            } => self.apply_create(
110                collection_id,
111                name,
112                schema_version,
113                fields,
114                indexes,
115                primary_field,
116            ),
117            CatalogRecordWire::NewSchemaVersion {
118                collection_id,
119                schema_version,
120                fields,
121                indexes,
122            } => self.apply_new_version(collection_id, schema_version, fields, indexes),
123        }
124    }
125
126    fn validate_name(name: &str) -> Result<(), DbError> {
127        if name.is_empty() {
128            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
129        }
130        if name.len() > MAX_COLLECTION_NAME_BYTES {
131            return Err(DbError::Schema(SchemaError::InvalidCollectionName));
132        }
133        Ok(())
134    }
135
136    fn apply_create(
137        &mut self,
138        collection_id: u32,
139        name: String,
140        schema_version: u32,
141        fields: Vec<FieldDef>,
142        indexes: Vec<IndexDef>,
143        primary_field: Option<String>,
144    ) -> Result<(), DbError> {
145        Self::validate_name(&name)?;
146        if schema_version != 1 {
147            return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
148                expected: 1,
149                got: schema_version,
150            }));
151        }
152        if collection_id != self.next_id {
153            return Err(DbError::Schema(SchemaError::UnexpectedCollectionId {
154                expected: self.next_id,
155                got: collection_id,
156            }));
157        }
158        if self.by_name.contains_key(&name) {
159            return Err(DbError::Schema(SchemaError::DuplicateCollectionName {
160                name: name.clone(),
161            }));
162        }
163        validate_field_defs(&fields)?;
164        if let Some(ref pk) = primary_field {
165            if !Catalog::has_top_level_field(&fields, pk) {
166                return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
167                    name: pk.clone(),
168                }));
169            }
170        }
171        let id = CollectionId(collection_id);
172        let info = CollectionInfo {
173            id,
174            name: name.clone(),
175            current_version: SchemaVersion(1),
176            fields,
177            indexes,
178            primary_field,
179        };
180        self.by_id.insert(collection_id, info);
181        self.by_name.insert(name, id);
182        self.next_id = collection_id.saturating_add(1);
183        Ok(())
184    }
185
186    fn apply_new_version(
187        &mut self,
188        collection_id: u32,
189        schema_version: u32,
190        fields: Vec<FieldDef>,
191        indexes: Vec<IndexDef>,
192    ) -> Result<(), DbError> {
193        let col = self.by_id.get_mut(&collection_id).ok_or(DbError::Schema(
194            SchemaError::UnknownCollection { id: collection_id },
195        ))?;
196        let expected = col.current_version.0.saturating_add(1);
197        if schema_version != expected {
198            return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
199                expected,
200                got: schema_version,
201            }));
202        }
203        validate_field_defs(&fields)?;
204        if let Some(ref pk) = col.primary_field {
205            if !Catalog::has_top_level_field(&fields, pk) {
206                return Err(DbError::Schema(SchemaError::PrimaryFieldMissingInSchema {
207                    name: pk.clone(),
208                }));
209            }
210        }
211        col.current_version = SchemaVersion(schema_version);
212        col.fields = fields;
213        col.indexes = indexes;
214        Ok(())
215    }
216}
217
/// Unit tests for the catalog state. The test source lives under `tests/unit/` and is
/// textually included here, so it compiles as a child module of this one.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_catalog_state_tests.rs"));
}