1use std::collections::HashMap;
6
7use crate::catalog::codec::{CatalogRecordWire, MAX_COLLECTION_NAME_BYTES};
8use crate::error::{DbError, SchemaError};
9use crate::schema::{validate_field_defs, CollectionId, FieldDef, IndexDef, SchemaVersion};
10
/// In-memory description of one collection as tracked by the catalog.
///
/// An entry is created when a `CreateCollection` record is applied and is updated in place
/// each time a `NewSchemaVersion` record for the same collection is applied.
#[derive(Debug, Clone, PartialEq)]
pub struct CollectionInfo {
    /// Identifier carried by the record that created the collection.
    pub id: CollectionId,
    /// Collection name exactly as it appeared in the creating record.
    pub name: String,
    /// Latest applied schema version; `1` at creation, then advanced by each new-version record.
    pub current_version: SchemaVersion,
    /// Field definitions of the current schema version (replaced wholesale on a new version).
    pub fields: Vec<FieldDef>,
    /// Index definitions of the current schema version (replaced wholesale on a new version).
    pub indexes: Vec<IndexDef>,
    /// Name of the primary field chosen at creation, if any. When set, every schema version
    /// must contain a single-segment (top-level) field with this name.
    pub primary_field: Option<String>,
}
22
/// In-memory catalog of collections, built up by applying catalog records in order.
#[derive(Debug, Clone)]
pub struct Catalog {
    /// Collection entries keyed by the raw `u32` collection id.
    by_id: HashMap<u32, CollectionInfo>,
    /// Name-to-id index used by name lookups.
    by_name: HashMap<String, CollectionId>,
    /// Id that the next `CreateCollection` record is required to carry.
    next_id: u32,
}
31
32impl Default for Catalog {
33 fn default() -> Self {
34 Self {
35 by_id: HashMap::new(),
36 by_name: HashMap::new(),
37 next_id: 1,
38 }
39 }
40}
41
42impl Catalog {
43 pub fn next_collection_id(&self) -> CollectionId {
45 CollectionId(self.next_id)
46 }
47
48 pub fn is_empty(&self) -> bool {
49 self.by_id.is_empty()
50 }
51
52 pub fn len(&self) -> usize {
53 self.by_id.len()
54 }
55
56 pub fn get(&self, id: CollectionId) -> Option<&CollectionInfo> {
57 self.by_id.get(&id.0)
58 }
59
60 pub fn lookup_name(&self, name: &str) -> Option<CollectionId> {
62 self.by_name.get(name.trim()).copied()
63 }
64
65 pub fn collection_names(&self) -> Vec<String> {
66 let mut names: Vec<String> = self.by_name.keys().cloned().collect();
67 names.sort();
68 names
69 }
70
71 pub fn collections(&self) -> Vec<CollectionInfo> {
72 let mut v: Vec<CollectionInfo> = self.by_id.values().cloned().collect();
73 v.sort_by_key(|c| c.id.0);
74 v
75 }
76
77 #[cfg(test)]
79 pub(crate) fn test_orphan_name_lookup(&mut self, name: &str, id: CollectionId) {
80 self.by_name.insert(name.trim().to_string(), id);
81 }
82
83 #[cfg(test)]
84 pub(crate) fn test_insert_collection_info(&mut self, info: CollectionInfo) {
85 self.by_name.insert(info.name.clone(), info.id);
88 self.by_id.insert(info.id.0, info);
89 self.next_id = self.next_id.max(self.by_id.len() as u32 + 1);
90 }
91
92 pub fn has_top_level_field(fields: &[FieldDef], name: &str) -> bool {
94 fields
95 .iter()
96 .any(|f| f.path.0.len() == 1 && f.path.0[0] == name)
97 }
98
99 pub fn apply_record(&mut self, record: CatalogRecordWire) -> Result<(), DbError> {
101 match record {
102 CatalogRecordWire::CreateCollection {
103 collection_id,
104 name,
105 schema_version,
106 fields,
107 indexes,
108 primary_field,
109 } => self.apply_create(
110 collection_id,
111 name,
112 schema_version,
113 fields,
114 indexes,
115 primary_field,
116 ),
117 CatalogRecordWire::NewSchemaVersion {
118 collection_id,
119 schema_version,
120 fields,
121 indexes,
122 } => self.apply_new_version(collection_id, schema_version, fields, indexes),
123 }
124 }
125
126 fn validate_name(name: &str) -> Result<(), DbError> {
127 if name.is_empty() {
128 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
129 }
130 if name.len() > MAX_COLLECTION_NAME_BYTES {
131 return Err(DbError::Schema(SchemaError::InvalidCollectionName));
132 }
133 Ok(())
134 }
135
136 fn apply_create(
137 &mut self,
138 collection_id: u32,
139 name: String,
140 schema_version: u32,
141 fields: Vec<FieldDef>,
142 indexes: Vec<IndexDef>,
143 primary_field: Option<String>,
144 ) -> Result<(), DbError> {
145 Self::validate_name(&name)?;
146 if schema_version != 1 {
147 return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
148 expected: 1,
149 got: schema_version,
150 }));
151 }
152 if collection_id != self.next_id {
153 return Err(DbError::Schema(SchemaError::UnexpectedCollectionId {
154 expected: self.next_id,
155 got: collection_id,
156 }));
157 }
158 if self.by_name.contains_key(&name) {
159 return Err(DbError::Schema(SchemaError::DuplicateCollectionName {
160 name: name.clone(),
161 }));
162 }
163 validate_field_defs(&fields)?;
164 if let Some(ref pk) = primary_field {
165 if !Catalog::has_top_level_field(&fields, pk) {
166 return Err(DbError::Schema(SchemaError::PrimaryFieldNotFound {
167 name: pk.clone(),
168 }));
169 }
170 }
171 let id = CollectionId(collection_id);
172 let info = CollectionInfo {
173 id,
174 name: name.clone(),
175 current_version: SchemaVersion(1),
176 fields,
177 indexes,
178 primary_field,
179 };
180 self.by_id.insert(collection_id, info);
181 self.by_name.insert(name, id);
182 self.next_id = collection_id.saturating_add(1);
183 Ok(())
184 }
185
186 fn apply_new_version(
187 &mut self,
188 collection_id: u32,
189 schema_version: u32,
190 fields: Vec<FieldDef>,
191 indexes: Vec<IndexDef>,
192 ) -> Result<(), DbError> {
193 let col = self.by_id.get_mut(&collection_id).ok_or(DbError::Schema(
194 SchemaError::UnknownCollection { id: collection_id },
195 ))?;
196 let expected = col.current_version.0.saturating_add(1);
197 if schema_version != expected {
198 return Err(DbError::Schema(SchemaError::InvalidSchemaVersion {
199 expected,
200 got: schema_version,
201 }));
202 }
203 validate_field_defs(&fields)?;
204 if let Some(ref pk) = col.primary_field {
205 if !Catalog::has_top_level_field(&fields, pk) {
206 return Err(DbError::Schema(SchemaError::PrimaryFieldMissingInSchema {
207 name: pk.clone(),
208 }));
209 }
210 }
211 col.current_version = SchemaVersion(schema_version);
212 col.fields = fields;
213 col.indexes = indexes;
214 Ok(())
215 }
216}
217
// Unit tests are kept in a separate file under the crate's `tests/unit/` directory and are
// textually included here, so they are compiled as a child module of this file (test builds
// only).
#[cfg(test)]
mod tests {
    include!(concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/unit/src_catalog_state_tests.rs"
    ));
}