Skip to main content

hematite/catalog/
catalog.rs

1//! Relational catalog manager.
2//!
3//! The catalog owns schema-level state and coordinates it with the catalog engine.
4//!
5//! ```text
6//! in-memory schema --------------------+
7//!                                      |
8//! create/drop/alter style operations   |
9//!                                      v
10//!                               schema B-tree
11//!                                      |
12//!                                database header
13//! ```
14//!
15//! Core invariants:
16//! - the in-memory schema is authoritative while a catalog operation is running;
17//! - `schema_root` always names the durable schema tree recorded in page 0;
18//! - schema contents are written before the header is repointed at a new schema root;
19//! - transaction rollback restores both the schema snapshot and the engine snapshot.
20
21use crate::catalog::column::Column;
22use crate::catalog::engine::{CatalogEngine, CatalogEngineSnapshot, CatalogIntegrityReport};
23use crate::catalog::ids::TableId;
24use crate::catalog::object::{NamedConstraintKind, Trigger, View};
25use crate::catalog::schema::Schema;
26use crate::catalog::table::{CheckConstraint, ForeignKeyConstraint, SecondaryIndex, Table};
27use crate::catalog::JournalMode;
28use crate::error::Result;
29use std::collections::HashMap;
30#[derive(Debug)]
31pub struct Catalog {
32    engine: CatalogEngine,
33    schema: Schema,
34    schema_root: u32,
35    schema_dirty: bool,
36}
37
38#[derive(Debug, Clone)]
39pub(crate) struct CatalogSnapshot {
40    schema: Schema,
41    schema_root: u32,
42    schema_dirty: bool,
43    engine: CatalogEngineSnapshot,
44}
45
46impl Catalog {
47    /// Open or create a database with SQLite-style schema management
48    pub fn open_or_create<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
49        Self::open_with_engine(CatalogEngine::new(path)?)
50    }
51
52    pub fn open_in_memory() -> Result<Self> {
53        Self::open_with_engine(CatalogEngine::new_in_memory()?)
54    }
55
56    fn open_with_engine(mut engine: CatalogEngine) -> Result<Self> {
57        let existing_header = engine.read_database_header()?;
58
59        let header = match existing_header {
60            Some(header) => header,
61            None => {
62                // New database - create header and schema B-tree
63                let schema_root = engine.create_tree()?;
64                engine.initialize_database_header(schema_root)?
65            }
66        };
67
68        // Load schema from B-tree
69        let schema = engine.load_schema(header.schema_root_page)?;
70
71        Ok(Self {
72            engine,
73            schema,
74            schema_root: header.schema_root_page,
75            schema_dirty: false,
76        })
77    }
78
79    /// Save schema to the B-tree (transactional)
80    fn save_schema_to_btree(&mut self) -> Result<()> {
81        if !self.schema_dirty {
82            return Ok(());
83        }
84
85        let current_schema_root = self.engine.save_schema(&self.schema, self.schema_root)?;
86
87        let transaction_active = self.engine.transaction_active()?;
88        self.engine.update_database_header(|header| {
89            header.schema_root_page = current_schema_root;
90        })?;
91        if !transaction_active {
92            self.engine.flush()?;
93        }
94
95        self.schema_root = current_schema_root;
96        self.schema_dirty = false;
97        Ok(())
98    }
99
100    fn get_next_table_id(&mut self) -> Result<TableId> {
101        self.engine.allocate_table_id()
102    }
103
104    pub fn create_table(&mut self, name: &str, columns: Vec<Column>) -> Result<TableId> {
105        if self.schema.get_table_by_name(name).is_some() {
106            return Err(crate::error::HematiteError::StorageError(format!(
107                "Table '{}' already exists",
108                name
109            )));
110        }
111
112        let table_id = self.get_next_table_id()?;
113        let table = Table::new(table_id, name.to_string(), columns, 0u32)?;
114
115        self.schema.insert_table(table.clone())?;
116        self.schema_dirty = true;
117        self.save_schema_to_btree()?;
118
119        Ok(table_id)
120    }
121
122    pub fn create_table_with_roots(
123        &mut self,
124        name: &str,
125        columns: Vec<Column>,
126        table_root_page_id: u32,
127        primary_key_root_page_id: u32,
128    ) -> Result<TableId> {
129        if self.schema.get_table_by_name(name).is_some() {
130            return Err(crate::error::HematiteError::StorageError(format!(
131                "Table '{}' already exists",
132                name
133            )));
134        }
135
136        let table_id = self.get_next_table_id()?;
137        let mut table = Table::new(table_id, name.to_string(), columns, table_root_page_id)?;
138        table.primary_key_index_root_page_id = primary_key_root_page_id;
139
140        self.schema.insert_table(table)?;
141        self.schema_dirty = true;
142        self.save_schema_to_btree()?;
143
144        Ok(table_id)
145    }
146
147    pub fn get_table(&self, table_id: TableId) -> Result<Option<Table>> {
148        Ok(self.schema.get_table(table_id).cloned())
149    }
150
151    pub fn get_table_by_name(&self, name: &str) -> Result<Option<Table>> {
152        Ok(self.schema.get_table_by_name(name).cloned())
153    }
154
155    pub fn drop_table(&mut self, table_id: TableId) -> Result<()> {
156        let table = self.schema.get_table(table_id).cloned();
157        if table.is_none() {
158            return Err(crate::error::HematiteError::StorageError(
159                "Table not found".to_string(),
160            ));
161        }
162        let table = table.unwrap();
163        if let Some(view_name) = self.first_view_dependency_on(&table.name, None) {
164            return Err(crate::error::HematiteError::ParseError(format!(
165                "Cannot drop table '{}' because view '{}' depends on it",
166                table.name, view_name
167            )));
168        }
169        self.schema.drop_table(table_id)?;
170        self.schema_dirty = true;
171        self.save_schema_to_btree()?;
172
173        Ok(())
174    }
175
176    pub fn rename_table(&mut self, old_name: &str, new_name: &str) -> Result<()> {
177        let table = self.schema.get_table_by_name(old_name).ok_or_else(|| {
178            crate::error::HematiteError::StorageError(format!("Table '{}' not found", old_name))
179        })?;
180
181        self.schema.rename_table(table.id, new_name.to_string())?;
182        self.schema_dirty = true;
183        self.save_schema_to_btree()?;
184        Ok(())
185    }
186
187    pub fn add_column(&mut self, table_id: TableId, column: Column) -> Result<()> {
188        self.schema.add_column(table_id, column)?;
189        self.schema_dirty = true;
190        self.save_schema_to_btree()?;
191        Ok(())
192    }
193
194    pub fn rename_column(
195        &mut self,
196        table_id: TableId,
197        old_name: &str,
198        new_name: String,
199    ) -> Result<()> {
200        self.schema.rename_column(table_id, old_name, new_name)?;
201        self.schema_dirty = true;
202        self.save_schema_to_btree()?;
203        Ok(())
204    }
205
206    pub fn drop_column(&mut self, table_id: TableId, column_name: &str) -> Result<usize> {
207        let dropped_index = self.schema.drop_column(table_id, column_name)?;
208        self.schema_dirty = true;
209        self.save_schema_to_btree()?;
210        Ok(dropped_index)
211    }
212
213    pub fn set_column_default(
214        &mut self,
215        table_id: TableId,
216        column_name: &str,
217        default_value: Option<crate::catalog::Value>,
218    ) -> Result<()> {
219        self.schema
220            .set_column_default(table_id, column_name, default_value)?;
221        self.schema_dirty = true;
222        self.save_schema_to_btree()?;
223        Ok(())
224    }
225
226    pub fn set_column_nullable(
227        &mut self,
228        table_id: TableId,
229        column_name: &str,
230        nullable: bool,
231    ) -> Result<()> {
232        self.schema
233            .set_column_nullable(table_id, column_name, nullable)?;
234        self.schema_dirty = true;
235        self.save_schema_to_btree()?;
236        Ok(())
237    }
238
239    pub fn add_check_constraint(
240        &mut self,
241        table_id: TableId,
242        constraint: CheckConstraint,
243    ) -> Result<()> {
244        self.schema.add_check_constraint(table_id, constraint)?;
245        self.schema_dirty = true;
246        self.save_schema_to_btree()?;
247        Ok(())
248    }
249
250    pub fn add_foreign_key(
251        &mut self,
252        table_id: TableId,
253        constraint: ForeignKeyConstraint,
254    ) -> Result<()> {
255        self.schema.add_foreign_key(table_id, constraint)?;
256        self.schema_dirty = true;
257        self.save_schema_to_btree()?;
258        Ok(())
259    }
260
261    pub fn list_tables(&self) -> Result<Vec<(TableId, String)>> {
262        Ok(self.schema.list_tables())
263    }
264
265    pub fn create_view(&mut self, view: View) -> Result<()> {
266        if view
267            .dependencies
268            .iter()
269            .any(|dependency| dependency.eq_ignore_ascii_case(&view.name))
270        {
271            return Err(crate::error::HematiteError::ParseError(format!(
272                "View '{}' cannot depend on itself",
273                view.name
274            )));
275        }
276        for dependency in &view.dependencies {
277            if self.view_depends_on(dependency, &view.name) {
278                return Err(crate::error::HematiteError::ParseError(format!(
279                    "Creating view '{}' would introduce a recursive view cycle through '{}'",
280                    view.name, dependency
281                )));
282            }
283        }
284        self.schema.create_view(view)?;
285        self.schema_dirty = true;
286        self.save_schema_to_btree()
287    }
288
289    pub fn drop_view(&mut self, name: &str) -> Result<View> {
290        if let Some(view_name) = self.first_view_dependency_on(name, Some(name)) {
291            return Err(crate::error::HematiteError::ParseError(format!(
292                "Cannot drop view '{}' because view '{}' depends on it",
293                name, view_name
294            )));
295        }
296        let view = self.schema.drop_view(name)?;
297        self.schema_dirty = true;
298        self.save_schema_to_btree()?;
299        Ok(view)
300    }
301
302    pub fn get_view(&self, name: &str) -> Result<Option<View>> {
303        Ok(self.schema.view(name).cloned())
304    }
305
306    pub fn list_views(&self) -> Result<Vec<String>> {
307        Ok(self.schema.list_views())
308    }
309
310    pub fn create_trigger(&mut self, trigger: Trigger) -> Result<()> {
311        self.schema.create_trigger(trigger)?;
312        self.schema_dirty = true;
313        self.save_schema_to_btree()
314    }
315
316    pub fn drop_trigger(&mut self, name: &str) -> Result<Trigger> {
317        let trigger = self.schema.drop_trigger(name)?;
318        self.schema_dirty = true;
319        self.save_schema_to_btree()?;
320        Ok(trigger)
321    }
322
323    pub fn get_trigger(&self, name: &str) -> Result<Option<Trigger>> {
324        Ok(self.schema.trigger(name).cloned())
325    }
326
327    pub fn list_triggers(&self) -> Result<Vec<String>> {
328        Ok(self.schema.list_triggers())
329    }
330
331    pub fn drop_named_constraint(
332        &mut self,
333        table_id: TableId,
334        constraint_name: &str,
335    ) -> Result<NamedConstraintKind> {
336        let kind = self
337            .schema
338            .drop_named_constraint(table_id, constraint_name)?;
339        self.schema_dirty = true;
340        self.save_schema_to_btree()?;
341        Ok(kind)
342    }
343
344    pub fn get_schema(&self) -> &Schema {
345        &self.schema
346    }
347
348    fn first_view_dependency_on(
349        &self,
350        object_name: &str,
351        skip_view: Option<&str>,
352    ) -> Option<String> {
353        self.schema
354            .list_views()
355            .into_iter()
356            .filter(|view_name| !skip_view.is_some_and(|skip| view_name.eq_ignore_ascii_case(skip)))
357            .find(|view_name| {
358                self.schema.view(view_name).is_some_and(|view| {
359                    view.dependencies
360                        .iter()
361                        .any(|dependency| dependency.eq_ignore_ascii_case(object_name))
362                })
363            })
364    }
365
366    fn view_depends_on(&self, view_name: &str, target_name: &str) -> bool {
367        let Some(view) = self.schema.view(view_name) else {
368            return false;
369        };
370        view.dependencies.iter().any(|dependency| {
371            dependency.eq_ignore_ascii_case(target_name)
372                || self.view_depends_on(dependency, target_name)
373        })
374    }
375
376    pub fn clone_schema(&self) -> Schema {
377        self.schema.clone()
378    }
379
380    pub fn with_engine<F, T>(&mut self, f: F) -> Result<T>
381    where
382        F: FnOnce(&mut CatalogEngine) -> Result<T>,
383    {
384        f(&mut self.engine)
385    }
386
387    pub(crate) fn with_read_engine<F, T>(&mut self, f: F) -> Result<T>
388    where
389        F: FnOnce(&mut CatalogEngine) -> Result<T>,
390    {
391        self.engine.begin_read()?;
392        let result = f(&mut self.engine);
393        let release = self.engine.end_read();
394        match (result, release) {
395            (Ok(value), Ok(())) => Ok(value),
396            (Err(err), _) => Err(err),
397            (Ok(_), Err(err)) => Err(err),
398        }
399    }
400
401    pub(crate) fn snapshot(&self) -> Result<CatalogSnapshot> {
402        Ok(CatalogSnapshot {
403            schema: self.schema.clone(),
404            schema_root: self.schema_root,
405            schema_dirty: self.schema_dirty,
406            engine: self.engine.snapshot()?,
407        })
408    }
409
410    pub(crate) fn restore_snapshot(&mut self, snapshot: CatalogSnapshot) -> Result<()> {
411        self.schema = snapshot.schema;
412        self.schema_root = snapshot.schema_root;
413        self.schema_dirty = snapshot.schema_dirty;
414        self.engine.restore_snapshot(snapshot.engine)
415    }
416
417    pub(crate) fn begin_transaction(&mut self) -> Result<()> {
418        self.engine.begin_transaction()
419    }
420
421    pub(crate) fn commit_transaction(&mut self) -> Result<()> {
422        self.save_schema_to_btree()?;
423        self.engine.commit_transaction()
424    }
425
426    pub(crate) fn rollback_transaction(&mut self) -> Result<()> {
427        self.engine.rollback_transaction()
428    }
429
430    pub fn flush_schema(&mut self) -> Result<()> {
431        self.save_schema_to_btree()
432    }
433
434    pub fn flush(&mut self) -> Result<()> {
435        self.save_schema_to_btree()?;
436        self.engine.flush()
437    }
438
439    pub fn journal_mode(&self) -> Result<JournalMode> {
440        self.engine.journal_mode()
441    }
442
443    pub fn set_journal_mode(&mut self, journal_mode: JournalMode) -> Result<()> {
444        self.save_schema_to_btree()?;
445        self.engine.set_journal_mode(journal_mode)
446    }
447
448    pub fn checkpoint_wal(&mut self) -> Result<()> {
449        self.save_schema_to_btree()?;
450        self.engine.checkpoint_wal()
451    }
452
453    pub fn replace_schema(&mut self, schema: Schema) -> Result<()> {
454        self.schema = schema;
455        self.schema_dirty = true;
456        self.save_schema_to_btree()?;
457        self.engine.set_next_table_id(self.schema.next_table_id())
458    }
459
460    pub fn set_table_root_page(&mut self, table_id: TableId, root_page: u32) -> Result<()> {
461        if self.schema.get_table(table_id).is_none() {
462            return Err(crate::error::HematiteError::StorageError(format!(
463                "Table ID {} not found",
464                table_id.as_u32()
465            )));
466        }
467
468        if root_page == 0 {
469            return Err(crate::error::HematiteError::StorageError(
470                "Root page 0 is reserved for database header".to_string(),
471            ));
472        }
473
474        self.schema.set_table_root_page(table_id, root_page)?;
475        self.schema_dirty = true;
476        self.save_schema_to_btree()?;
477
478        Ok(())
479    }
480
481    pub fn get_table_root_page(&self, table_id: TableId) -> Result<Option<u32>> {
482        if let Some(table) = self.schema.get_table(table_id) {
483            if table.root_page_id == 0 {
484                Ok(None)
485            } else {
486                Ok(Some(table.root_page_id))
487            }
488        } else {
489            Ok(None)
490        }
491    }
492
493    pub fn add_secondary_index(&mut self, table_id: TableId, index: SecondaryIndex) -> Result<()> {
494        self.schema.add_secondary_index(table_id, index)?;
495        self.schema_dirty = true;
496        self.save_schema_to_btree()?;
497
498        Ok(())
499    }
500
501    pub fn set_table_primary_key_root_page(
502        &mut self,
503        table_id: TableId,
504        root_page_id: u32,
505    ) -> Result<()> {
506        if root_page_id == 0 {
507            return Err(crate::error::HematiteError::StorageError(
508                "Root page 0 is reserved for database header".to_string(),
509            ));
510        }
511
512        self.schema
513            .set_table_primary_key_root_page(table_id, root_page_id)?;
514        self.schema_dirty = true;
515        self.save_schema_to_btree()?;
516
517        Ok(())
518    }
519
520    pub fn set_table_storage_roots(
521        &mut self,
522        table_id: TableId,
523        table_root_page_id: u32,
524        primary_key_root_page_id: u32,
525    ) -> Result<()> {
526        if table_root_page_id == 0 || primary_key_root_page_id == 0 {
527            return Err(crate::error::HematiteError::StorageError(
528                "Root page 0 is reserved for database header".to_string(),
529            ));
530        }
531
532        self.schema.set_table_storage_roots(
533            table_id,
534            table_root_page_id,
535            primary_key_root_page_id,
536        )?;
537        self.schema_dirty = true;
538        self.save_schema_to_btree()?;
539
540        Ok(())
541    }
542
543    pub fn validate_schema(&self) -> Result<()> {
544        let schema_result = self.schema.validate();
545
546        for (table_id, table_name) in self.list_tables()? {
547            let table = self.schema.get_table(table_id).ok_or_else(|| {
548                crate::error::HematiteError::StorageError(format!(
549                    "Table {} found in list but not in schema",
550                    table_name
551                ))
552            })?;
553
554            if table.root_page_id == 0 {
555                continue;
556            }
557        }
558
559        schema_result
560    }
561
562    pub fn validate_integrity(&mut self) -> Result<CatalogIntegrityReport> {
563        self.validate_schema()?;
564
565        let schema_tables = self
566            .schema
567            .list_tables()
568            .into_iter()
569            .filter_map(|(table_id, table_name)| {
570                self.schema
571                    .get_table(table_id)
572                    .map(|table| (table_name, table.root_page_id))
573            })
574            .collect::<HashMap<_, _>>();
575
576        let storage_tables = self
577            .engine
578            .get_table_metadata()
579            .iter()
580            .map(|(name, metadata)| (name.clone(), metadata.root_page_id))
581            .collect::<HashMap<_, _>>();
582
583        for (table_name, root_page_id) in &schema_tables {
584            let storage_root = storage_tables.get(table_name).ok_or_else(|| {
585                crate::error::HematiteError::CorruptedData(format!(
586                    "Catalog table '{}' is missing from storage metadata",
587                    table_name
588                ))
589            })?;
590
591            if storage_root != root_page_id {
592                return Err(crate::error::HematiteError::CorruptedData(format!(
593                    "Catalog/storage root mismatch for table '{}': catalog={}, storage={}",
594                    table_name, root_page_id, storage_root
595                )));
596            }
597        }
598
599        for table_name in storage_tables.keys() {
600            if !schema_tables.contains_key(table_name) {
601                return Err(crate::error::HematiteError::CorruptedData(format!(
602                    "Storage metadata contains table '{}' missing from catalog schema",
603                    table_name
604                )));
605            }
606        }
607
608        let tables = self
609            .schema
610            .list_tables()
611            .into_iter()
612            .filter_map(|(table_id, _)| self.schema.get_table(table_id).cloned())
613            .collect::<Vec<_>>();
614        let mut report = self.engine.validate_integrity()?;
615        let usage = self.engine.validate_catalog_layout(&tables)?;
616        report.live_page_count = usage.live_table_pages;
617        report.index_page_count = usage.live_index_pages;
618        Ok(report)
619    }
620
621    pub fn get_total_column_count(&self) -> usize {
622        self.schema.get_total_column_count()
623    }
624
625    pub fn get_table_stats(&self, table_id: TableId) -> Result<Option<TableStats>> {
626        if let Some(table) = self.schema.get_table(table_id) {
627            Ok(Some(TableStats {
628                id: table.id,
629                name: table.name.clone(),
630                column_count: table.column_count(),
631                primary_key_count: table.primary_key_count(),
632                root_page_id: table.root_page_id,
633                row_size: table.row_size(),
634            }))
635        } else {
636            Ok(None)
637        }
638    }
639
640    pub fn get_all_table_stats(&self) -> Result<Vec<TableStats>> {
641        let tables = self.list_tables()?;
642        let mut stats = Vec::new();
643
644        for (table_id, _name) in tables {
645            if let Some(table_stat) = self.get_table_stats(table_id)? {
646                stats.push(table_stat);
647            }
648        }
649
650        Ok(stats)
651    }
652
653    pub fn table_exists(&self, name: &str) -> bool {
654        self.schema.get_table_by_name(name).is_some()
655    }
656
657    pub fn table_exists_by_id(&self, table_id: TableId) -> bool {
658        self.schema.get_table(table_id).is_some()
659    }
660
661    pub fn peek_next_table_id(&self) -> Result<TableId> {
662        self.engine.peek_next_table_id()
663    }
664
665    pub fn create_table_with_root(
666        &mut self,
667        name: &str,
668        columns: Vec<Column>,
669        root_page: u32,
670    ) -> Result<TableId> {
671        if self.schema.get_table_by_name(name).is_some() {
672            return Err(crate::error::HematiteError::StorageError(format!(
673                "Table '{}' already exists",
674                name
675            )));
676        }
677
678        let table_id = self.get_next_table_id()?;
679
680        let table = Table::new(table_id, name.to_string(), columns, root_page)?;
681
682        self.schema.insert_table(table.clone())?;
683        self.schema_dirty = true;
684        self.save_schema_to_btree()?;
685
686        Ok(table_id)
687    }
688
689    pub fn get_table_columns(&self, table_id: TableId) -> Result<Option<Vec<Column>>> {
690        if let Some(table) = self.schema.get_table(table_id) {
691            Ok(Some(table.columns.clone()))
692        } else {
693            Ok(None)
694        }
695    }
696
697    pub fn get_table_columns_by_name(&self, name: &str) -> Result<Option<Vec<Column>>> {
698        if let Some(table) = self.schema.get_table_by_name(name) {
699            Ok(Some(table.columns.clone()))
700        } else {
701            Ok(None)
702        }
703    }
704
705    pub fn get_primary_key_columns(&self, table_id: TableId) -> Result<Option<Vec<Column>>> {
706        if let Some(table) = self.schema.get_table(table_id) {
707            let pk_columns = table
708                .primary_key_columns
709                .iter()
710                .map(|&index| table.columns[index].clone())
711                .collect();
712            Ok(Some(pk_columns))
713        } else {
714            Ok(None)
715        }
716    }
717}
718
719/// Statistics for a table
720#[derive(Debug, Clone)]
721pub struct TableStats {
722    pub id: TableId,
723    pub name: String,
724    pub column_count: usize,
725    pub primary_key_count: usize,
726    pub root_page_id: u32,
727    pub row_size: usize,
728}