1use std::collections::HashMap;
6
7use crate::catalog::codec::{CatalogRecordWire, MAX_COLLECTION_NAME_BYTES};
8use crate::error::{DbError, SchemaError};
9use crate::schema::{validate_field_defs, CollectionId, FieldDef, IndexDef, SchemaVersion};
10
11#[derive(Debug, Clone, PartialEq)]
13pub struct CollectionInfo {
14 pub id: CollectionId,
15 pub name: String,
16 pub current_version: SchemaVersion,
17 pub fields: Vec<FieldDef>,
18 pub indexes: Vec<IndexDef>,
19 pub primary_field: Option<String>,
21}
22
23#[derive(Debug, Clone)]
25pub struct Catalog {
26 by_id: HashMap<u32, CollectionInfo>,
27 by_name: HashMap<String, CollectionId>,
28 next_id: u32,
30}
31
32impl Default for Catalog {
33 fn default() -> Self {
34 Self {
35 by_id: HashMap::new(),
36 by_name: HashMap::new(),
37 next_id: 1,
38 }
39 }
40}
41
42impl Catalog {
43 pub fn next_collection_id(&self) -> CollectionId {
45 CollectionId(self.next_id)
46 }
47
48 pub fn is_empty(&self) -> bool {
49 self.by_id.is_empty()
50 }
51
52 pub fn len(&self) -> usize {
53 self.by_id.len()
54 }
55
56 pub fn get(&self, id: CollectionId) -> Option<&CollectionInfo> {
57 self.by_id.get(&id.0)
58 }
59
60 pub fn lookup_name(&self, name: &str) -> Option<CollectionId> {
62 self.by_name.get(name.trim()).copied()
63 }
64
65 pub fn collection_names(&self) -> Vec<String> {
66 let mut names: Vec<String> = self.by_name.keys().cloned().collect();
67 names.sort();
68 names
69 }
70
71 pub fn collections(&self) -> Vec<CollectionInfo> {
72 let mut v: Vec<CollectionInfo> = self.by_id.values().cloned().collect();
73 v.sort_by_key(|c| c.id.0);
74 v
75 }
76
77 #[cfg(test)]
78 pub(crate) fn test_insert_collection_info(&mut self, info: CollectionInfo) {
79 self.by_name.insert(info.name.clone(), info.id);
82 self.by_id.insert(info.id.0, info);
83 self.next_id = self.next_id.max(self.by_id.len() as u32 + 1);
84 }
85
86 pub fn has_top_level_field(fields: &[FieldDef], name: &str) -> bool {
88 fields
89 .iter()
90 .any(|f| f.path.0.len() == 1 && f.path.0[0] == name)
91 }
92
93 pub fn apply_record(&mut self, record: CatalogRecordWire) -> Result<(), DbError> {
95 match record {
96 CatalogRecordWire::CreateCollection {
97 collection_id,
98 name,
99 schema_version,
100 fields,
101 indexes,
102 primary_field,
103 } => self.apply_create(
104 collection_id,
105 name,
106 schema_version,
107 fields,
108 indexes,
109 primary_field,
110 ),
111 CatalogRecordWire::NewSchemaVersion {
112 collection_id,
113 schema_version,
114 fields,
115 indexes,
116 } => self.apply_new_version(collection_id, schema_version, fields, indexes),
117 }
118 }
119
120 fn validate_name(name: &str) -> Result<(), DbError> {
121 if name.is_empty() {
122 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
123 }
124 if name.len() > MAX_COLLECTION_NAME_BYTES {
125 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
126 }
127 Ok(())
128 }
129
130 fn apply_create(
131 &mut self,
132 collection_id: u32,
133 name: String,
134 schema_version: u32,
135 fields: Vec<FieldDef>,
136 indexes: Vec<IndexDef>,
137 primary_field: Option<String>,
138 ) -> Result<(), DbError> {
139 Self::validate_name(&name)?;
140 if schema_version != 1 {
141 return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
142 expected: 1,
143 got: schema_version,
144 }));
145 }
146 if collection_id != self.next_id {
147 return Err(DbError::Schema(SchemaError::UnexpectedCollectionId {
148 expected: self.next_id,
149 got: collection_id,
150 }));
151 }
152 if self.by_name.contains_key(&name) {
153 return Err(DbError::Schema(SchemaError::DuplicateCollectionName {
154 name: name.clone(),
155 }));
156 }
157 validate_field_defs(&fields)?;
158 if let Some(ref pk) = primary_field {
159 if !Catalog::has_top_level_field(&fields, pk) {
160 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
161 name: pk.clone(),
162 }));
163 }
164 }
165 let id = CollectionId(collection_id);
166 let info = CollectionInfo {
167 id,
168 name: name.clone(),
169 current_version: SchemaVersion(1),
170 fields,
171 indexes,
172 primary_field,
173 };
174 self.by_id.insert(collection_id, info);
175 self.by_name.insert(name, id);
176 self.next_id = collection_id.saturating_add(1);
177 Ok(())
178 }
179
180 fn apply_new_version(
181 &mut self,
182 collection_id: u32,
183 schema_version: u32,
184 fields: Vec<FieldDef>,
185 indexes: Vec<IndexDef>,
186 ) -> Result<(), DbError> {
187 let col = self.by_id.get_mut(&collection_id).ok_or(DbError::Schema(
188 SchemaError::UnknownCollection { id: collection_id },
189 ))?;
190 let expected = col.current_version.0.saturating_add(1);
191 if schema_version != expected {
192 return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
193 expected,
194 got: schema_version,
195 }));
196 }
197 validate_field_defs(&fields)?;
198 if let Some(ref pk) = col.primary_field {
199 if !Catalog::has_top_level_field(&fields, pk) {
200 return Err(DbError::Schema(SchemaError::PrimaryFieldMissingInSchema {
201 name: pk.clone(),
202 }));
203 }
204 }
205 col.current_version = SchemaVersion(schema_version);
206 col.fields = fields;
207 col.indexes = indexes;
208 Ok(())
209 }
210}
211
212#[cfg(test)]
213mod tests {
214 include!(concat!(
215 env!("CARGO_MANIFEST_DIR"),
216 "/tests/unit/src_catalog_state_tests.rs"
217 ));
218}