1use std::collections::{BTreeMap, HashMap};
6
7use crate::catalog::codec::{CatalogRecordWire, MAX_COLLECTION_NAME_BYTES};
8use crate::error::{DbError, SchemaError};
9use crate::schema::{validate_field_defs, CollectionId, FieldDef, IndexDef, SchemaVersion};
10
11#[derive(Debug, Clone, PartialEq)]
13pub struct CollectionInfo {
14 pub id: CollectionId,
15 pub name: String,
16 pub current_version: SchemaVersion,
17 pub fields: Vec<FieldDef>,
18 pub indexes: Vec<IndexDef>,
19 pub primary_field: Option<String>,
21 pub version_history: BTreeMap<u32, Vec<FieldDef>>,
23}
24
25impl CollectionInfo {
26 pub fn fields_at_version(&self, version: u32) -> Option<&[FieldDef]> {
28 self.version_history.get(&version).map(|f| f.as_slice())
29 }
30}
31
32#[derive(Debug, Clone)]
34pub struct Catalog {
35 by_id: HashMap<u32, CollectionInfo>,
36 by_name: HashMap<String, CollectionId>,
37 next_id: u32,
39}
40
41impl Default for Catalog {
42 fn default() -> Self {
43 Self {
44 by_id: HashMap::new(),
45 by_name: HashMap::new(),
46 next_id: 1,
47 }
48 }
49}
50
51impl Catalog {
52 pub fn next_collection_id(&self) -> CollectionId {
54 CollectionId(self.next_id)
55 }
56
57 pub fn is_empty(&self) -> bool {
58 self.by_id.is_empty()
59 }
60
61 pub fn len(&self) -> usize {
62 self.by_id.len()
63 }
64
65 pub fn get(&self, id: CollectionId) -> Option<&CollectionInfo> {
66 self.by_id.get(&id.0)
67 }
68
69 pub fn lookup_name(&self, name: &str) -> Option<CollectionId> {
71 self.by_name.get(name.trim()).copied()
72 }
73
74 pub fn collection_names(&self) -> Vec<String> {
75 let mut names: Vec<String> = self.by_name.keys().cloned().collect();
76 names.sort();
77 names
78 }
79
80 pub fn collections(&self) -> Vec<CollectionInfo> {
81 let mut v: Vec<CollectionInfo> = self.by_id.values().cloned().collect();
82 v.sort_by_key(|c| c.id.0);
83 v
84 }
85
86 #[cfg(test)]
88 pub(crate) fn test_orphan_name_lookup(&mut self, name: &str, id: CollectionId) {
89 self.by_name.insert(name.trim().to_string(), id);
90 }
91
92 #[cfg(test)]
93 pub(crate) fn test_insert_collection_info(&mut self, info: CollectionInfo) {
94 self.by_name.insert(info.name.clone(), info.id);
97 self.by_id.insert(info.id.0, info);
98 self.next_id = self.next_id.max(self.by_id.len() as u32 + 1);
99 }
100
101 pub fn has_top_level_field(fields: &[FieldDef], name: &str) -> bool {
103 fields
104 .iter()
105 .any(|f| f.path.0.len() == 1 && f.path.0[0] == name)
106 }
107
108 pub fn apply_record(&mut self, record: CatalogRecordWire) -> Result<(), DbError> {
110 match record {
111 CatalogRecordWire::CreateCollection {
112 collection_id,
113 name,
114 schema_version,
115 fields,
116 indexes,
117 primary_field,
118 } => self.apply_create(
119 collection_id,
120 name,
121 schema_version,
122 fields,
123 indexes,
124 primary_field,
125 ),
126 CatalogRecordWire::NewSchemaVersion {
127 collection_id,
128 schema_version,
129 fields,
130 indexes,
131 } => self.apply_new_version(collection_id, schema_version, fields, indexes),
132 }
133 }
134
135 fn validate_name(name: &str) -> Result<(), DbError> {
136 if name.is_empty() {
137 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
138 }
139 if name.len() > MAX_COLLECTION_NAME_BYTES {
140 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
141 }
142 Ok(())
143 }
144
145 fn apply_create(
146 &mut self,
147 collection_id: u32,
148 name: String,
149 schema_version: u32,
150 fields: Vec<FieldDef>,
151 indexes: Vec<IndexDef>,
152 primary_field: Option<String>,
153 ) -> Result<(), DbError> {
154 Self::validate_name(&name)?;
155 if schema_version != 1 {
156 return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
157 expected: 1,
158 got: schema_version,
159 }));
160 }
161 if collection_id != self.next_id {
162 return Err(DbError::Schema(SchemaError::UnexpectedCollectionId {
163 expected: self.next_id,
164 got: collection_id,
165 }));
166 }
167 if self.by_name.contains_key(&name) {
168 return Err(DbError::Schema(SchemaError::DuplicateCollectionName {
169 name: name.clone(),
170 }));
171 }
172 validate_field_defs(&fields)?;
173 for f in &fields {
174 crate::validation::validate_constraints_at_registration(&f.constraints)?;
175 }
176 if let Some(ref pk) = primary_field {
177 if !Catalog::has_top_level_field(&fields, pk) {
178 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
179 name: pk.clone(),
180 }));
181 }
182 }
183 let id = CollectionId(collection_id);
184 let mut version_history = BTreeMap::new();
185 version_history.insert(1, fields.clone());
186 let info = CollectionInfo {
187 id,
188 name: name.clone(),
189 current_version: SchemaVersion(1),
190 fields,
191 indexes,
192 primary_field,
193 version_history,
194 };
195 self.by_id.insert(collection_id, info);
196 self.by_name.insert(name, id);
197 self.next_id = collection_id.saturating_add(1);
198 Ok(())
199 }
200
201 fn apply_new_version(
202 &mut self,
203 collection_id: u32,
204 schema_version: u32,
205 fields: Vec<FieldDef>,
206 indexes: Vec<IndexDef>,
207 ) -> Result<(), DbError> {
208 let col = self.by_id.get_mut(&collection_id).ok_or(DbError::Schema(
209 SchemaError::UnknownCollection { id: collection_id },
210 ))?;
211 let expected = col.current_version.0.saturating_add(1);
212 if schema_version != expected {
213 return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
214 expected,
215 got: schema_version,
216 }));
217 }
218 validate_field_defs(&fields)?;
219 for f in &fields {
220 crate::validation::validate_constraints_at_registration(&f.constraints)?;
221 }
222 if let Some(ref pk) = col.primary_field {
223 if !Catalog::has_top_level_field(&fields, pk) {
224 return Err(DbError::Schema(SchemaError::PrimaryFieldMissingInSchema {
225 name: pk.clone(),
226 }));
227 }
228 }
229 col.current_version = SchemaVersion(schema_version);
230 col.version_history.insert(schema_version, fields.clone());
231 col.fields = fields;
232 col.indexes = indexes;
233 Ok(())
234 }
235}
236
// The unit tests are kept in a separate file under the crate root and are
// textually included into this `tests` submodule when building for tests.
#[cfg(test)]
mod tests {
    include!(concat!(env!("CARGO_MANIFEST_DIR"), "/tests/unit/src_catalog_state_tests.rs"));
}