1use crate::catalog::column::Column;
22use crate::catalog::engine::{CatalogEngine, CatalogEngineSnapshot, CatalogIntegrityReport};
23use crate::catalog::ids::TableId;
24use crate::catalog::object::{NamedConstraintKind, Trigger, View};
25use crate::catalog::schema::Schema;
26use crate::catalog::table::{CheckConstraint, ForeignKeyConstraint, SecondaryIndex, Table};
27use crate::catalog::JournalMode;
28use crate::error::Result;
29use std::collections::HashMap;
30#[derive(Debug)]
31pub struct Catalog {
32 engine: CatalogEngine,
33 schema: Schema,
34 schema_root: u32,
35 schema_dirty: bool,
36}
37
38#[derive(Debug, Clone)]
39pub(crate) struct CatalogSnapshot {
40 schema: Schema,
41 schema_root: u32,
42 schema_dirty: bool,
43 engine: CatalogEngineSnapshot,
44}
45
46impl Catalog {
47 pub fn open_or_create<P: AsRef<std::path::Path>>(path: P) -> Result<Self> {
49 Self::open_with_engine(CatalogEngine::new(path)?)
50 }
51
52 pub fn open_in_memory() -> Result<Self> {
53 Self::open_with_engine(CatalogEngine::new_in_memory()?)
54 }
55
56 fn open_with_engine(mut engine: CatalogEngine) -> Result<Self> {
57 let existing_header = engine.read_database_header()?;
58
59 let header = match existing_header {
60 Some(header) => header,
61 None => {
62 let schema_root = engine.create_tree()?;
64 engine.initialize_database_header(schema_root)?
65 }
66 };
67
68 let schema = engine.load_schema(header.schema_root_page)?;
70
71 Ok(Self {
72 engine,
73 schema,
74 schema_root: header.schema_root_page,
75 schema_dirty: false,
76 })
77 }
78
79 fn save_schema_to_btree(&mut self) -> Result<()> {
81 if !self.schema_dirty {
82 return Ok(());
83 }
84
85 let current_schema_root = self.engine.save_schema(&self.schema, self.schema_root)?;
86
87 let transaction_active = self.engine.transaction_active()?;
88 self.engine.update_database_header(|header| {
89 header.schema_root_page = current_schema_root;
90 })?;
91 if !transaction_active {
92 self.engine.flush()?;
93 }
94
95 self.schema_root = current_schema_root;
96 self.schema_dirty = false;
97 Ok(())
98 }
99
100 fn get_next_table_id(&mut self) -> Result<TableId> {
101 self.engine.allocate_table_id()
102 }
103
104 pub fn create_table(&mut self, name: &str, columns: Vec<Column>) -> Result<TableId> {
105 if self.schema.get_table_by_name(name).is_some() {
106 return Err(crate::error::HematiteError::StorageError(format!(
107 "Table '{}' already exists",
108 name
109 )));
110 }
111
112 let table_id = self.get_next_table_id()?;
113 let table = Table::new(table_id, name.to_string(), columns, 0u32)?;
114
115 self.schema.insert_table(table.clone())?;
116 self.schema_dirty = true;
117 self.save_schema_to_btree()?;
118
119 Ok(table_id)
120 }
121
122 pub fn create_table_with_roots(
123 &mut self,
124 name: &str,
125 columns: Vec<Column>,
126 table_root_page_id: u32,
127 primary_key_root_page_id: u32,
128 ) -> Result<TableId> {
129 if self.schema.get_table_by_name(name).is_some() {
130 return Err(crate::error::HematiteError::StorageError(format!(
131 "Table '{}' already exists",
132 name
133 )));
134 }
135
136 let table_id = self.get_next_table_id()?;
137 let mut table = Table::new(table_id, name.to_string(), columns, table_root_page_id)?;
138 table.primary_key_index_root_page_id = primary_key_root_page_id;
139
140 self.schema.insert_table(table)?;
141 self.schema_dirty = true;
142 self.save_schema_to_btree()?;
143
144 Ok(table_id)
145 }
146
147 pub fn get_table(&self, table_id: TableId) -> Result<Option<Table>> {
148 Ok(self.schema.get_table(table_id).cloned())
149 }
150
151 pub fn get_table_by_name(&self, name: &str) -> Result<Option<Table>> {
152 Ok(self.schema.get_table_by_name(name).cloned())
153 }
154
155 pub fn drop_table(&mut self, table_id: TableId) -> Result<()> {
156 let table = self.schema.get_table(table_id).cloned();
157 if table.is_none() {
158 return Err(crate::error::HematiteError::StorageError(
159 "Table not found".to_string(),
160 ));
161 }
162 let table = table.unwrap();
163 if let Some(view_name) = self.first_view_dependency_on(&table.name, None) {
164 return Err(crate::error::HematiteError::ParseError(format!(
165 "Cannot drop table '{}' because view '{}' depends on it",
166 table.name, view_name
167 )));
168 }
169 self.schema.drop_table(table_id)?;
170 self.schema_dirty = true;
171 self.save_schema_to_btree()?;
172
173 Ok(())
174 }
175
176 pub fn rename_table(&mut self, old_name: &str, new_name: &str) -> Result<()> {
177 let table = self.schema.get_table_by_name(old_name).ok_or_else(|| {
178 crate::error::HematiteError::StorageError(format!("Table '{}' not found", old_name))
179 })?;
180
181 self.schema.rename_table(table.id, new_name.to_string())?;
182 self.schema_dirty = true;
183 self.save_schema_to_btree()?;
184 Ok(())
185 }
186
187 pub fn add_column(&mut self, table_id: TableId, column: Column) -> Result<()> {
188 self.schema.add_column(table_id, column)?;
189 self.schema_dirty = true;
190 self.save_schema_to_btree()?;
191 Ok(())
192 }
193
194 pub fn rename_column(
195 &mut self,
196 table_id: TableId,
197 old_name: &str,
198 new_name: String,
199 ) -> Result<()> {
200 self.schema.rename_column(table_id, old_name, new_name)?;
201 self.schema_dirty = true;
202 self.save_schema_to_btree()?;
203 Ok(())
204 }
205
206 pub fn drop_column(&mut self, table_id: TableId, column_name: &str) -> Result<usize> {
207 let dropped_index = self.schema.drop_column(table_id, column_name)?;
208 self.schema_dirty = true;
209 self.save_schema_to_btree()?;
210 Ok(dropped_index)
211 }
212
213 pub fn set_column_default(
214 &mut self,
215 table_id: TableId,
216 column_name: &str,
217 default_value: Option<crate::catalog::Value>,
218 ) -> Result<()> {
219 self.schema
220 .set_column_default(table_id, column_name, default_value)?;
221 self.schema_dirty = true;
222 self.save_schema_to_btree()?;
223 Ok(())
224 }
225
226 pub fn set_column_nullable(
227 &mut self,
228 table_id: TableId,
229 column_name: &str,
230 nullable: bool,
231 ) -> Result<()> {
232 self.schema
233 .set_column_nullable(table_id, column_name, nullable)?;
234 self.schema_dirty = true;
235 self.save_schema_to_btree()?;
236 Ok(())
237 }
238
239 pub fn add_check_constraint(
240 &mut self,
241 table_id: TableId,
242 constraint: CheckConstraint,
243 ) -> Result<()> {
244 self.schema.add_check_constraint(table_id, constraint)?;
245 self.schema_dirty = true;
246 self.save_schema_to_btree()?;
247 Ok(())
248 }
249
250 pub fn add_foreign_key(
251 &mut self,
252 table_id: TableId,
253 constraint: ForeignKeyConstraint,
254 ) -> Result<()> {
255 self.schema.add_foreign_key(table_id, constraint)?;
256 self.schema_dirty = true;
257 self.save_schema_to_btree()?;
258 Ok(())
259 }
260
261 pub fn list_tables(&self) -> Result<Vec<(TableId, String)>> {
262 Ok(self.schema.list_tables())
263 }
264
265 pub fn create_view(&mut self, view: View) -> Result<()> {
266 if view
267 .dependencies
268 .iter()
269 .any(|dependency| dependency.eq_ignore_ascii_case(&view.name))
270 {
271 return Err(crate::error::HematiteError::ParseError(format!(
272 "View '{}' cannot depend on itself",
273 view.name
274 )));
275 }
276 for dependency in &view.dependencies {
277 if self.view_depends_on(dependency, &view.name) {
278 return Err(crate::error::HematiteError::ParseError(format!(
279 "Creating view '{}' would introduce a recursive view cycle through '{}'",
280 view.name, dependency
281 )));
282 }
283 }
284 self.schema.create_view(view)?;
285 self.schema_dirty = true;
286 self.save_schema_to_btree()
287 }
288
289 pub fn drop_view(&mut self, name: &str) -> Result<View> {
290 if let Some(view_name) = self.first_view_dependency_on(name, Some(name)) {
291 return Err(crate::error::HematiteError::ParseError(format!(
292 "Cannot drop view '{}' because view '{}' depends on it",
293 name, view_name
294 )));
295 }
296 let view = self.schema.drop_view(name)?;
297 self.schema_dirty = true;
298 self.save_schema_to_btree()?;
299 Ok(view)
300 }
301
302 pub fn get_view(&self, name: &str) -> Result<Option<View>> {
303 Ok(self.schema.view(name).cloned())
304 }
305
306 pub fn list_views(&self) -> Result<Vec<String>> {
307 Ok(self.schema.list_views())
308 }
309
310 pub fn create_trigger(&mut self, trigger: Trigger) -> Result<()> {
311 self.schema.create_trigger(trigger)?;
312 self.schema_dirty = true;
313 self.save_schema_to_btree()
314 }
315
316 pub fn drop_trigger(&mut self, name: &str) -> Result<Trigger> {
317 let trigger = self.schema.drop_trigger(name)?;
318 self.schema_dirty = true;
319 self.save_schema_to_btree()?;
320 Ok(trigger)
321 }
322
323 pub fn get_trigger(&self, name: &str) -> Result<Option<Trigger>> {
324 Ok(self.schema.trigger(name).cloned())
325 }
326
327 pub fn list_triggers(&self) -> Result<Vec<String>> {
328 Ok(self.schema.list_triggers())
329 }
330
331 pub fn drop_named_constraint(
332 &mut self,
333 table_id: TableId,
334 constraint_name: &str,
335 ) -> Result<NamedConstraintKind> {
336 let kind = self
337 .schema
338 .drop_named_constraint(table_id, constraint_name)?;
339 self.schema_dirty = true;
340 self.save_schema_to_btree()?;
341 Ok(kind)
342 }
343
344 pub fn get_schema(&self) -> &Schema {
345 &self.schema
346 }
347
348 fn first_view_dependency_on(
349 &self,
350 object_name: &str,
351 skip_view: Option<&str>,
352 ) -> Option<String> {
353 self.schema
354 .list_views()
355 .into_iter()
356 .filter(|view_name| !skip_view.is_some_and(|skip| view_name.eq_ignore_ascii_case(skip)))
357 .find(|view_name| {
358 self.schema.view(view_name).is_some_and(|view| {
359 view.dependencies
360 .iter()
361 .any(|dependency| dependency.eq_ignore_ascii_case(object_name))
362 })
363 })
364 }
365
366 fn view_depends_on(&self, view_name: &str, target_name: &str) -> bool {
367 let Some(view) = self.schema.view(view_name) else {
368 return false;
369 };
370 view.dependencies.iter().any(|dependency| {
371 dependency.eq_ignore_ascii_case(target_name)
372 || self.view_depends_on(dependency, target_name)
373 })
374 }
375
376 pub fn clone_schema(&self) -> Schema {
377 self.schema.clone()
378 }
379
380 pub fn with_engine<F, T>(&mut self, f: F) -> Result<T>
381 where
382 F: FnOnce(&mut CatalogEngine) -> Result<T>,
383 {
384 f(&mut self.engine)
385 }
386
387 pub(crate) fn with_read_engine<F, T>(&mut self, f: F) -> Result<T>
388 where
389 F: FnOnce(&mut CatalogEngine) -> Result<T>,
390 {
391 self.engine.begin_read()?;
392 let result = f(&mut self.engine);
393 let release = self.engine.end_read();
394 match (result, release) {
395 (Ok(value), Ok(())) => Ok(value),
396 (Err(err), _) => Err(err),
397 (Ok(_), Err(err)) => Err(err),
398 }
399 }
400
401 pub(crate) fn snapshot(&self) -> Result<CatalogSnapshot> {
402 Ok(CatalogSnapshot {
403 schema: self.schema.clone(),
404 schema_root: self.schema_root,
405 schema_dirty: self.schema_dirty,
406 engine: self.engine.snapshot()?,
407 })
408 }
409
410 pub(crate) fn restore_snapshot(&mut self, snapshot: CatalogSnapshot) -> Result<()> {
411 self.schema = snapshot.schema;
412 self.schema_root = snapshot.schema_root;
413 self.schema_dirty = snapshot.schema_dirty;
414 self.engine.restore_snapshot(snapshot.engine)
415 }
416
417 pub(crate) fn begin_transaction(&mut self) -> Result<()> {
418 self.engine.begin_transaction()
419 }
420
421 pub(crate) fn commit_transaction(&mut self) -> Result<()> {
422 self.save_schema_to_btree()?;
423 self.engine.commit_transaction()
424 }
425
426 pub(crate) fn rollback_transaction(&mut self) -> Result<()> {
427 self.engine.rollback_transaction()
428 }
429
430 pub fn flush_schema(&mut self) -> Result<()> {
431 self.save_schema_to_btree()
432 }
433
434 pub fn flush(&mut self) -> Result<()> {
435 self.save_schema_to_btree()?;
436 self.engine.flush()
437 }
438
439 pub fn journal_mode(&self) -> Result<JournalMode> {
440 self.engine.journal_mode()
441 }
442
443 pub fn set_journal_mode(&mut self, journal_mode: JournalMode) -> Result<()> {
444 self.save_schema_to_btree()?;
445 self.engine.set_journal_mode(journal_mode)
446 }
447
448 pub fn checkpoint_wal(&mut self) -> Result<()> {
449 self.save_schema_to_btree()?;
450 self.engine.checkpoint_wal()
451 }
452
453 pub fn replace_schema(&mut self, schema: Schema) -> Result<()> {
454 self.schema = schema;
455 self.schema_dirty = true;
456 self.save_schema_to_btree()?;
457 self.engine.set_next_table_id(self.schema.next_table_id())
458 }
459
460 pub fn set_table_root_page(&mut self, table_id: TableId, root_page: u32) -> Result<()> {
461 if self.schema.get_table(table_id).is_none() {
462 return Err(crate::error::HematiteError::StorageError(format!(
463 "Table ID {} not found",
464 table_id.as_u32()
465 )));
466 }
467
468 if root_page == 0 {
469 return Err(crate::error::HematiteError::StorageError(
470 "Root page 0 is reserved for database header".to_string(),
471 ));
472 }
473
474 self.schema.set_table_root_page(table_id, root_page)?;
475 self.schema_dirty = true;
476 self.save_schema_to_btree()?;
477
478 Ok(())
479 }
480
481 pub fn get_table_root_page(&self, table_id: TableId) -> Result<Option<u32>> {
482 if let Some(table) = self.schema.get_table(table_id) {
483 if table.root_page_id == 0 {
484 Ok(None)
485 } else {
486 Ok(Some(table.root_page_id))
487 }
488 } else {
489 Ok(None)
490 }
491 }
492
493 pub fn add_secondary_index(&mut self, table_id: TableId, index: SecondaryIndex) -> Result<()> {
494 self.schema.add_secondary_index(table_id, index)?;
495 self.schema_dirty = true;
496 self.save_schema_to_btree()?;
497
498 Ok(())
499 }
500
501 pub fn set_table_primary_key_root_page(
502 &mut self,
503 table_id: TableId,
504 root_page_id: u32,
505 ) -> Result<()> {
506 if root_page_id == 0 {
507 return Err(crate::error::HematiteError::StorageError(
508 "Root page 0 is reserved for database header".to_string(),
509 ));
510 }
511
512 self.schema
513 .set_table_primary_key_root_page(table_id, root_page_id)?;
514 self.schema_dirty = true;
515 self.save_schema_to_btree()?;
516
517 Ok(())
518 }
519
520 pub fn set_table_storage_roots(
521 &mut self,
522 table_id: TableId,
523 table_root_page_id: u32,
524 primary_key_root_page_id: u32,
525 ) -> Result<()> {
526 if table_root_page_id == 0 || primary_key_root_page_id == 0 {
527 return Err(crate::error::HematiteError::StorageError(
528 "Root page 0 is reserved for database header".to_string(),
529 ));
530 }
531
532 self.schema.set_table_storage_roots(
533 table_id,
534 table_root_page_id,
535 primary_key_root_page_id,
536 )?;
537 self.schema_dirty = true;
538 self.save_schema_to_btree()?;
539
540 Ok(())
541 }
542
543 pub fn validate_schema(&self) -> Result<()> {
544 let schema_result = self.schema.validate();
545
546 for (table_id, table_name) in self.list_tables()? {
547 let table = self.schema.get_table(table_id).ok_or_else(|| {
548 crate::error::HematiteError::StorageError(format!(
549 "Table {} found in list but not in schema",
550 table_name
551 ))
552 })?;
553
554 if table.root_page_id == 0 {
555 continue;
556 }
557 }
558
559 schema_result
560 }
561
562 pub fn validate_integrity(&mut self) -> Result<CatalogIntegrityReport> {
563 self.validate_schema()?;
564
565 let schema_tables = self
566 .schema
567 .list_tables()
568 .into_iter()
569 .filter_map(|(table_id, table_name)| {
570 self.schema
571 .get_table(table_id)
572 .map(|table| (table_name, table.root_page_id))
573 })
574 .collect::<HashMap<_, _>>();
575
576 let storage_tables = self
577 .engine
578 .get_table_metadata()
579 .iter()
580 .map(|(name, metadata)| (name.clone(), metadata.root_page_id))
581 .collect::<HashMap<_, _>>();
582
583 for (table_name, root_page_id) in &schema_tables {
584 let storage_root = storage_tables.get(table_name).ok_or_else(|| {
585 crate::error::HematiteError::CorruptedData(format!(
586 "Catalog table '{}' is missing from storage metadata",
587 table_name
588 ))
589 })?;
590
591 if storage_root != root_page_id {
592 return Err(crate::error::HematiteError::CorruptedData(format!(
593 "Catalog/storage root mismatch for table '{}': catalog={}, storage={}",
594 table_name, root_page_id, storage_root
595 )));
596 }
597 }
598
599 for table_name in storage_tables.keys() {
600 if !schema_tables.contains_key(table_name) {
601 return Err(crate::error::HematiteError::CorruptedData(format!(
602 "Storage metadata contains table '{}' missing from catalog schema",
603 table_name
604 )));
605 }
606 }
607
608 let tables = self
609 .schema
610 .list_tables()
611 .into_iter()
612 .filter_map(|(table_id, _)| self.schema.get_table(table_id).cloned())
613 .collect::<Vec<_>>();
614 let mut report = self.engine.validate_integrity()?;
615 let usage = self.engine.validate_catalog_layout(&tables)?;
616 report.live_page_count = usage.live_table_pages;
617 report.index_page_count = usage.live_index_pages;
618 Ok(report)
619 }
620
621 pub fn get_total_column_count(&self) -> usize {
622 self.schema.get_total_column_count()
623 }
624
625 pub fn get_table_stats(&self, table_id: TableId) -> Result<Option<TableStats>> {
626 if let Some(table) = self.schema.get_table(table_id) {
627 Ok(Some(TableStats {
628 id: table.id,
629 name: table.name.clone(),
630 column_count: table.column_count(),
631 primary_key_count: table.primary_key_count(),
632 root_page_id: table.root_page_id,
633 row_size: table.row_size(),
634 }))
635 } else {
636 Ok(None)
637 }
638 }
639
640 pub fn get_all_table_stats(&self) -> Result<Vec<TableStats>> {
641 let tables = self.list_tables()?;
642 let mut stats = Vec::new();
643
644 for (table_id, _name) in tables {
645 if let Some(table_stat) = self.get_table_stats(table_id)? {
646 stats.push(table_stat);
647 }
648 }
649
650 Ok(stats)
651 }
652
653 pub fn table_exists(&self, name: &str) -> bool {
654 self.schema.get_table_by_name(name).is_some()
655 }
656
657 pub fn table_exists_by_id(&self, table_id: TableId) -> bool {
658 self.schema.get_table(table_id).is_some()
659 }
660
661 pub fn peek_next_table_id(&self) -> Result<TableId> {
662 self.engine.peek_next_table_id()
663 }
664
665 pub fn create_table_with_root(
666 &mut self,
667 name: &str,
668 columns: Vec<Column>,
669 root_page: u32,
670 ) -> Result<TableId> {
671 if self.schema.get_table_by_name(name).is_some() {
672 return Err(crate::error::HematiteError::StorageError(format!(
673 "Table '{}' already exists",
674 name
675 )));
676 }
677
678 let table_id = self.get_next_table_id()?;
679
680 let table = Table::new(table_id, name.to_string(), columns, root_page)?;
681
682 self.schema.insert_table(table.clone())?;
683 self.schema_dirty = true;
684 self.save_schema_to_btree()?;
685
686 Ok(table_id)
687 }
688
689 pub fn get_table_columns(&self, table_id: TableId) -> Result<Option<Vec<Column>>> {
690 if let Some(table) = self.schema.get_table(table_id) {
691 Ok(Some(table.columns.clone()))
692 } else {
693 Ok(None)
694 }
695 }
696
697 pub fn get_table_columns_by_name(&self, name: &str) -> Result<Option<Vec<Column>>> {
698 if let Some(table) = self.schema.get_table_by_name(name) {
699 Ok(Some(table.columns.clone()))
700 } else {
701 Ok(None)
702 }
703 }
704
705 pub fn get_primary_key_columns(&self, table_id: TableId) -> Result<Option<Vec<Column>>> {
706 if let Some(table) = self.schema.get_table(table_id) {
707 let pk_columns = table
708 .primary_key_columns
709 .iter()
710 .map(|&index| table.columns[index].clone())
711 .collect();
712 Ok(Some(pk_columns))
713 } else {
714 Ok(None)
715 }
716 }
717}
718
719#[derive(Debug, Clone)]
721pub struct TableStats {
722 pub id: TableId,
723 pub name: String,
724 pub column_count: usize,
725 pub primary_key_count: usize,
726 pub root_page_id: u32,
727 pub row_size: usize,
728}