Skip to main content

hematite/catalog/
schema.rs

1//! Schema management for database metadata
2
3use super::column::Column;
4use super::ids::TableId;
5use super::object::{NamedConstraintKind, Trigger, View};
6use super::table::{CheckConstraint, ForeignKeyConstraint, SecondaryIndex, Table};
7use crate::error::HematiteError;
8use crate::Result;
9use std::collections::HashMap;
10
11#[derive(Debug, Clone)]
12pub struct Schema {
13    tables: HashMap<TableId, Table>,
14    table_names: HashMap<String, TableId>,
15    views: HashMap<String, View>,
16    triggers: HashMap<String, Trigger>,
17    next_table_id: u32,
18    next_column_id: u32,
19}
20
21impl Schema {
22    pub fn new() -> Self {
23        Self {
24            tables: HashMap::new(),
25            table_names: HashMap::new(),
26            views: HashMap::new(),
27            triggers: HashMap::new(),
28            next_table_id: 1,
29            next_column_id: 1,
30        }
31    }
32
33    pub fn create_table(&mut self, name: String, columns: Vec<Column>) -> Result<TableId> {
34        // Check for duplicate table name
35        if self.table_names.contains_key(&name) {
36            return Err(HematiteError::ParseError(format!(
37                "Table '{}' already exists",
38                name
39            )));
40        }
41
42        // Validate column names are unique
43        let mut column_names = std::collections::HashSet::new();
44        for col in &columns {
45            if column_names.contains(&col.name) {
46                return Err(HematiteError::ParseError(format!(
47                    "Duplicate column name '{}'",
48                    col.name
49                )));
50            }
51            column_names.insert(col.name.clone());
52        }
53
54        let table_id = TableId::new(self.next_table_id);
55        self.next_table_id += 1;
56
57        // For now, we'll use a placeholder root page ID
58        // This will be assigned when the table is actually created in storage
59        let root_page_id = 0u32;
60
61        let table = Table::new(table_id, name.clone(), columns, root_page_id)?;
62
63        self.tables.insert(table_id, table);
64        self.table_names.insert(name, table_id);
65
66        Ok(table_id)
67    }
68
69    pub fn create_table_with_roots(
70        &mut self,
71        name: String,
72        columns: Vec<Column>,
73        table_root_page_id: u32,
74        primary_key_root_page_id: u32,
75    ) -> Result<TableId> {
76        if self.table_names.contains_key(&name) {
77            return Err(HematiteError::ParseError(format!(
78                "Table '{}' already exists",
79                name
80            )));
81        }
82
83        let mut column_names = std::collections::HashSet::new();
84        for col in &columns {
85            if column_names.contains(&col.name) {
86                return Err(HematiteError::ParseError(format!(
87                    "Duplicate column name '{}'",
88                    col.name
89                )));
90            }
91            column_names.insert(col.name.clone());
92        }
93
94        let table_id = TableId::new(self.next_table_id);
95        self.next_table_id += 1;
96
97        let mut table = Table::new(table_id, name.clone(), columns, table_root_page_id)?;
98        table.primary_key_index_root_page_id = primary_key_root_page_id;
99
100        self.tables.insert(table_id, table);
101        self.table_names.insert(name, table_id);
102
103        Ok(table_id)
104    }
105
106    pub fn get_table(&self, table_id: TableId) -> Option<&Table> {
107        self.tables.get(&table_id)
108    }
109
110    pub fn get_table_by_name(&self, name: &str) -> Option<&Table> {
111        self.table_names
112            .get(name)
113            .and_then(|&id| self.tables.get(&id))
114    }
115
116    pub fn view(&self, name: &str) -> Option<&View> {
117        self.views.get(name)
118    }
119
120    pub fn trigger(&self, name: &str) -> Option<&Trigger> {
121        self.triggers.get(name)
122    }
123
124    pub fn create_view(&mut self, view: View) -> Result<()> {
125        view.validate()?;
126        if self.table_names.contains_key(&view.name) || self.views.contains_key(&view.name) {
127            return Err(HematiteError::ParseError(format!(
128                "Schema object '{}' already exists",
129                view.name
130            )));
131        }
132        self.views.insert(view.name.clone(), view);
133        Ok(())
134    }
135
136    pub fn drop_view(&mut self, name: &str) -> Result<View> {
137        self.views
138            .remove(name)
139            .ok_or_else(|| HematiteError::StorageError(format!("View '{}' does not exist", name)))
140    }
141
142    pub fn create_trigger(&mut self, trigger: Trigger) -> Result<()> {
143        trigger.validate()?;
144        if self.table_names.contains_key(&trigger.name)
145            || self.views.contains_key(&trigger.name)
146            || self.triggers.contains_key(&trigger.name)
147        {
148            return Err(HematiteError::ParseError(format!(
149                "Schema object '{}' already exists",
150                trigger.name
151            )));
152        }
153        self.triggers.insert(trigger.name.clone(), trigger);
154        Ok(())
155    }
156
157    pub fn drop_trigger(&mut self, name: &str) -> Result<Trigger> {
158        self.triggers.remove(name).ok_or_else(|| {
159            HematiteError::StorageError(format!("Trigger '{}' does not exist", name))
160        })
161    }
162
163    pub(crate) fn tables(&self) -> &HashMap<TableId, Table> {
164        &self.tables
165    }
166
167    pub fn drop_table(&mut self, table_id: TableId) -> Result<()> {
168        let table = self
169            .tables
170            .get(&table_id)
171            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
172
173        let name = table.name.clone();
174        self.tables.remove(&table_id);
175        self.table_names.remove(&name);
176
177        Ok(())
178    }
179
180    pub fn rename_table(&mut self, table_id: TableId, new_name: String) -> Result<()> {
181        if self.table_names.contains_key(&new_name) {
182            return Err(HematiteError::ParseError(format!(
183                "Table '{}' already exists",
184                new_name
185            )));
186        }
187
188        let table = self
189            .tables
190            .get_mut(&table_id)
191            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
192        let old_name = std::mem::replace(&mut table.name, new_name.clone());
193        self.table_names.remove(&old_name);
194        self.table_names.insert(new_name, table_id);
195        Ok(())
196    }
197
198    pub fn add_secondary_index(&mut self, table_id: TableId, index: SecondaryIndex) -> Result<()> {
199        let table = self
200            .tables
201            .get_mut(&table_id)
202            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
203        table.add_secondary_index(index)
204    }
205
206    pub fn add_column(&mut self, table_id: TableId, column: Column) -> Result<()> {
207        let table = self
208            .tables
209            .get_mut(&table_id)
210            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
211        self.next_column_id = self
212            .next_column_id
213            .max(column.id.as_u32().saturating_add(1));
214        table.add_column(column)
215    }
216
217    pub fn rename_column(
218        &mut self,
219        table_id: TableId,
220        old_name: &str,
221        new_name: String,
222    ) -> Result<()> {
223        let table_name = self
224            .tables
225            .get(&table_id)
226            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?
227            .name
228            .clone();
229        let table = self
230            .tables
231            .get_mut(&table_id)
232            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
233        table.rename_column(old_name, new_name.clone())?;
234
235        for other_table in self.tables.values_mut() {
236            other_table.rewrite_inbound_referenced_column(&table_name, old_name, &new_name);
237        }
238
239        Ok(())
240    }
241
242    pub fn drop_column(&mut self, table_id: TableId, column_name: &str) -> Result<usize> {
243        let table_name = self
244            .tables
245            .get(&table_id)
246            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?
247            .name
248            .clone();
249        if self.tables.values().any(|other_table| {
250            other_table.id != table_id
251                && other_table.foreign_keys.iter().any(|foreign_key| {
252                    foreign_key.referenced_table == table_name
253                        && foreign_key
254                            .referenced_columns
255                            .iter()
256                            .any(|referenced_column| referenced_column == column_name)
257                })
258        }) {
259            return Err(HematiteError::StorageError(format!(
260                "Column '{}' is referenced by a foreign key",
261                column_name
262            )));
263        }
264        let table = self
265            .tables
266            .get_mut(&table_id)
267            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
268        table.drop_column(column_name)
269    }
270
271    pub fn set_column_default(
272        &mut self,
273        table_id: TableId,
274        column_name: &str,
275        default_value: Option<super::types::Value>,
276    ) -> Result<()> {
277        let table = self
278            .tables
279            .get_mut(&table_id)
280            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
281        table.set_column_default(column_name, default_value)
282    }
283
284    pub fn set_column_nullable(
285        &mut self,
286        table_id: TableId,
287        column_name: &str,
288        nullable: bool,
289    ) -> Result<()> {
290        let table = self
291            .tables
292            .get_mut(&table_id)
293            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
294        table.set_column_nullable(column_name, nullable)
295    }
296
297    pub fn add_check_constraint(
298        &mut self,
299        table_id: TableId,
300        constraint: CheckConstraint,
301    ) -> Result<()> {
302        let table = self
303            .tables
304            .get_mut(&table_id)
305            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
306        table.add_check_constraint(constraint)
307    }
308
309    pub fn add_foreign_key(
310        &mut self,
311        table_id: TableId,
312        constraint: ForeignKeyConstraint,
313    ) -> Result<()> {
314        let table = self
315            .tables
316            .get_mut(&table_id)
317            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
318        table.add_foreign_key(constraint)
319    }
320
321    pub fn drop_secondary_index(&mut self, table_id: TableId, index_name: &str) -> Result<()> {
322        let table = self
323            .tables
324            .get_mut(&table_id)
325            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
326        let _ = table.drop_secondary_index(index_name)?;
327        Ok(())
328    }
329
330    pub fn drop_named_constraint(
331        &mut self,
332        table_id: TableId,
333        constraint_name: &str,
334    ) -> Result<NamedConstraintKind> {
335        let table = self
336            .tables
337            .get_mut(&table_id)
338            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
339        table.drop_named_constraint(constraint_name)
340    }
341
342    pub fn set_table_primary_key_root_page(
343        &mut self,
344        table_id: TableId,
345        root_page_id: u32,
346    ) -> Result<()> {
347        let table = self
348            .tables
349            .get_mut(&table_id)
350            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
351        table.primary_key_index_root_page_id = root_page_id;
352        Ok(())
353    }
354
355    pub fn set_table_storage_roots(
356        &mut self,
357        table_id: TableId,
358        table_root_page_id: u32,
359        primary_key_root_page_id: u32,
360    ) -> Result<()> {
361        let table = self
362            .tables
363            .get_mut(&table_id)
364            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
365        table.root_page_id = table_root_page_id;
366        table.primary_key_index_root_page_id = primary_key_root_page_id;
367        Ok(())
368    }
369
370    pub fn list_tables(&self) -> Vec<(TableId, String)> {
371        self.tables
372            .iter()
373            .map(|(&id, table)| (id, table.name.clone()))
374            .collect()
375    }
376
377    pub fn list_views(&self) -> Vec<String> {
378        self.views.keys().cloned().collect()
379    }
380
381    pub fn list_triggers(&self) -> Vec<String> {
382        self.triggers.keys().cloned().collect()
383    }
384
385    pub fn get_table_count(&self) -> usize {
386        self.tables.len()
387    }
388
389    pub fn get_total_column_count(&self) -> usize {
390        self.tables.values().map(|table| table.column_count()).sum()
391    }
392
393    pub fn next_table_id(&self) -> u32 {
394        self.next_table_id
395    }
396
397    pub fn next_column_id(&self) -> u32 {
398        self.next_column_id
399    }
400
401    pub fn validate(&self) -> Result<()> {
402        // Check for orphaned table names
403        for (name, &table_id) in &self.table_names {
404            if !self.tables.contains_key(&table_id) {
405                return Err(HematiteError::CorruptedData(format!(
406                    "Orphaned table name '{}' references non-existent table",
407                    name
408                )));
409            }
410        }
411
412        // Check for tables without names
413        for (&table_id, table) in &self.tables {
414            if !self.table_names.contains_key(&table.name) {
415                return Err(HematiteError::CorruptedData(format!(
416                    "Table '{}' ({}) has no name entry",
417                    table.name,
418                    table_id.as_u32()
419                )));
420            }
421        }
422
423        Ok(())
424    }
425
426    pub fn serialize(&self, buffer: &mut Vec<u8>) -> Result<()> {
427        // Next table ID (4 bytes)
428        buffer.extend_from_slice(&self.next_table_id.to_le_bytes());
429
430        // Next column ID (4 bytes)
431        buffer.extend_from_slice(&self.next_column_id.to_le_bytes());
432
433        // Table count (4 bytes)
434        buffer.extend_from_slice(&(self.tables.len() as u32).to_le_bytes());
435
436        // Tables
437        for table in self.tables.values() {
438            table.serialize(buffer)?;
439        }
440
441        buffer.extend_from_slice(&(self.views.len() as u32).to_le_bytes());
442        for view in self.views.values() {
443            view.serialize(buffer);
444        }
445
446        buffer.extend_from_slice(&(self.triggers.len() as u32).to_le_bytes());
447        for trigger in self.triggers.values() {
448            trigger.serialize(buffer);
449        }
450
451        Ok(())
452    }
453
454    pub fn deserialize(buffer: &[u8]) -> Result<Self> {
455        let mut offset = 0;
456
457        if buffer.len() < 12 {
458            return Err(HematiteError::CorruptedData(
459                "Invalid schema header".to_string(),
460            ));
461        }
462
463        // Next table ID
464        let next_table_id = u32::from_le_bytes([
465            buffer[offset],
466            buffer[offset + 1],
467            buffer[offset + 2],
468            buffer[offset + 3],
469        ]);
470        offset += 4;
471
472        // Next column ID
473        let next_column_id = u32::from_le_bytes([
474            buffer[offset],
475            buffer[offset + 1],
476            buffer[offset + 2],
477            buffer[offset + 3],
478        ]);
479        offset += 4;
480
481        // Table count
482        let table_count = u32::from_le_bytes([
483            buffer[offset],
484            buffer[offset + 1],
485            buffer[offset + 2],
486            buffer[offset + 3],
487        ]) as usize;
488        offset += 4;
489
490        let mut schema = Self {
491            tables: HashMap::new(),
492            table_names: HashMap::new(),
493            views: HashMap::new(),
494            triggers: HashMap::new(),
495            next_table_id,
496            next_column_id,
497        };
498
499        // Tables
500        for _ in 0..table_count {
501            let table = Table::deserialize(buffer, &mut offset)?;
502            let name = table.name.clone();
503            let id = table.id;
504
505            schema.table_names.insert(name, id);
506            schema.tables.insert(id, table);
507        }
508
509        if offset == buffer.len() {
510            return Ok(schema);
511        }
512
513        if offset + 4 > buffer.len() {
514            return Err(HematiteError::CorruptedData(
515                "Invalid view count".to_string(),
516            ));
517        }
518        let view_count = u32::from_le_bytes([
519            buffer[offset],
520            buffer[offset + 1],
521            buffer[offset + 2],
522            buffer[offset + 3],
523        ]) as usize;
524        offset += 4;
525
526        for _ in 0..view_count {
527            let view = View::deserialize(buffer, &mut offset)?;
528            schema.views.insert(view.name.clone(), view);
529        }
530
531        if offset == buffer.len() {
532            return Ok(schema);
533        }
534
535        if offset + 4 > buffer.len() {
536            return Err(HematiteError::CorruptedData(
537                "Invalid trigger count".to_string(),
538            ));
539        }
540        let trigger_count = u32::from_le_bytes([
541            buffer[offset],
542            buffer[offset + 1],
543            buffer[offset + 2],
544            buffer[offset + 3],
545        ]) as usize;
546        offset += 4;
547
548        for _ in 0..trigger_count {
549            let trigger = Trigger::deserialize(buffer, &mut offset)?;
550            schema.triggers.insert(trigger.name.clone(), trigger);
551        }
552
553        Ok(schema)
554    }
555
556    pub fn set_table_root_page(&mut self, table_id: TableId, root_page_id: u32) -> Result<()> {
557        let table = self
558            .tables
559            .get_mut(&table_id)
560            .ok_or_else(|| HematiteError::StorageError("Table not found".to_string()))?;
561
562        table.root_page_id = root_page_id;
563        Ok(())
564    }
565
566    /// Insert a fully-defined table (used when loading persisted schema).
567    pub fn insert_table(&mut self, table: Table) -> Result<()> {
568        // Check for duplicate table name
569        if self.table_names.contains_key(&table.name) {
570            return Err(HematiteError::CorruptedData(format!(
571                "Duplicate table name '{}' while loading schema",
572                table.name
573            )));
574        }
575        // Check for duplicate table id
576        if self.tables.contains_key(&table.id) {
577            return Err(HematiteError::CorruptedData(format!(
578                "Duplicate table id {} while loading schema",
579                table.id.as_u32()
580            )));
581        }
582
583        // Advance ID generators to avoid collisions on subsequent creates.
584        self.next_table_id = self.next_table_id.max(table.id.as_u32().saturating_add(1));
585        for col in &table.columns {
586            self.next_column_id = self.next_column_id.max(col.id.as_u32().saturating_add(1));
587        }
588
589        self.table_names.insert(table.name.clone(), table.id);
590        self.tables.insert(table.id, table);
591        Ok(())
592    }
593
594    pub fn insert_view(&mut self, view: View) -> Result<()> {
595        if self.table_names.contains_key(&view.name) || self.views.contains_key(&view.name) {
596            return Err(HematiteError::CorruptedData(format!(
597                "Duplicate schema object name '{}' while loading schema",
598                view.name
599            )));
600        }
601        self.views.insert(view.name.clone(), view);
602        Ok(())
603    }
604
605    pub fn insert_trigger(&mut self, trigger: Trigger) -> Result<()> {
606        if self.triggers.contains_key(&trigger.name) {
607            return Err(HematiteError::CorruptedData(format!(
608                "Duplicate trigger name '{}' while loading schema",
609                trigger.name
610            )));
611        }
612        self.triggers.insert(trigger.name.clone(), trigger);
613        Ok(())
614    }
615}