Skip to main content

hematite/catalog/
engine.rs

//! Catalog storage engine.
//!
//! This module is the narrow bridge between relational concepts (tables, rows, indexes) and the
//! generic lower layers (B-tree and pager).
//!
//! ```text
//! SQL / planner / executor
//!          |
//!          v
//!      CatalogEngine
//!          |
//!   +------+------+
//!   |             |
//! rows/indexes  schema metadata
//!   |             |
//!   +-------> generic B-tree
//!                    |
//!                    v
//!                  pager
//! ```
//!
//! Responsibilities:
//! - hold runtime metadata such as row counts and the next rowid;
//! - persist schema roots and table metadata;
//! - expose relational operations in terms of rows, keys, and cursors;
//! - prevent page- and node-level details from leaking into query/catalog code.
//!
//! This file should coordinate access methods, not define relational byte formats. Row codecs and
//! index-key codecs live beside the catalog model so the generic lower layers remain reusable.

30use crate::btree::{
31    ByteTree, ByteTreeStore, ByteTreeStoreSnapshot, JournalMode as BTreeJournalMode, KeyValueCodec,
32    PageId, PagerIntegrityReport, TreeSpaceStats, TypedTreeStore,
33};
34use crate::catalog::{DatabaseHeader, JournalMode, Table, TableId, Value};
35use crate::error::{HematiteError, Result};
36use std::collections::HashMap;
37use std::path::Path;
38
39use super::cursor::{IndexCursor, TableCursor};
40use super::{
41    engine_metadata, index_store, integrity, record::StoredRow, runtime_metadata, schema_store,
42    table_store, Schema,
43};
44
45#[derive(Debug, Clone)]
46pub struct TableRuntimeMetadata {
47    pub name: String,
48    pub root_page_id: PageId,
49    pub row_count: u64,
50    pub next_row_id: u64,
51}
52
/// Space-usage summary returned by `CatalogEngine::get_storage_stats`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CatalogStorageStats {
    /// Number of tables tracked by the engine.
    pub table_count: usize,
    /// Sum of row counts across all tables.
    pub total_rows: u64,
    /// Size of the backing database storage, in bytes.
    pub file_bytes: u64,
    /// Pages currently allocated by the pager.
    pub allocated_page_count: usize,
    /// Pages on the free list.
    pub free_page_count: usize,
    /// Free pages that are not at the end of the file.
    pub fragmented_free_page_count: usize,
    /// Free pages at the end of the file.
    pub trailing_free_page_count: usize,
    /// Pages reachable from live table trees.
    pub live_table_page_count: usize,
    /// Overflow pages referenced by table trees.
    pub overflow_page_count: usize,
    /// Bytes holding table data inside table pages.
    pub table_used_bytes: usize,
    /// Bytes inside table pages that are not holding data.
    pub table_unused_bytes: usize,
}

68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct CatalogIntegrityReport {
70    pub table_count: usize,
71    pub live_page_count: usize,
72    pub index_page_count: usize,
73    pub overflow_page_count: usize,
74    pub free_page_count: usize,
75    pub total_rows: u64,
76    pub pager: PagerIntegrityReport,
77}
78
79#[derive(Debug, Clone)]
80pub struct CatalogEngineSnapshot {
81    table_metadata: HashMap<String, TableRuntimeMetadata>,
82    tree_store: ByteTreeStoreSnapshot,
83}
84
85#[derive(Debug)]
86pub struct CatalogEngine {
87    pub(crate) tree_store: ByteTreeStore,
88    pub(crate) table_metadata: HashMap<String, TableRuntimeMetadata>,
89}
90
91impl CatalogEngine {
92    pub(crate) const PAGE_SIZE: usize = ByteTreeStore::PAGE_SIZE;
93    pub(crate) const INVALID_PAGE_ID: PageId = ByteTreeStore::INVALID_PAGE_ID;
94    pub(crate) const STORAGE_METADATA_VERSION: u32 = 3;
95
96    pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
97        Self::from_tree_store(ByteTreeStore::open_path(path, 100)?)
98    }
99
100    pub fn new_in_memory() -> Result<Self> {
101        Self::from_tree_store(ByteTreeStore::new_in_memory(100)?)
102    }
103
104    pub(crate) fn from_tree_store(tree_store: ByteTreeStore) -> Result<Self> {
105        let mut engine = Self {
106            tree_store,
107            table_metadata: HashMap::new(),
108        };
109        engine_metadata::load_table_metadata(&mut engine)?;
110        Ok(engine)
111    }
112
113    pub fn read_database_header(&self) -> Result<Option<DatabaseHeader>> {
114        self.tree_store()
115            .read_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID)?
116            .map(|page| DatabaseHeader::deserialize(&page))
117            .transpose()
118    }
119
120    pub fn initialize_database_header(&mut self, schema_root_page: u32) -> Result<DatabaseHeader> {
121        let header = DatabaseHeader::new(schema_root_page);
122        let mut page = vec![0; ByteTreeStore::PAGE_SIZE];
123        header.serialize(&mut page)?;
124        self.tree_store()
125            .write_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID, &page)?;
126        self.tree_store().flush()?;
127        Ok(header)
128    }
129
130    pub fn allocate_table_id(&mut self) -> Result<TableId> {
131        let header_page = self
132            .tree_store()
133            .read_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID)?
134            .ok_or_else(|| HematiteError::StorageError("Database header is missing".to_string()))?;
135        let mut header = DatabaseHeader::deserialize(&header_page)?;
136        let table_id = header.increment_table_id();
137
138        let mut updated_page = vec![0; ByteTreeStore::PAGE_SIZE];
139        header.serialize(&mut updated_page)?;
140        self.tree_store()
141            .write_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID, &updated_page)?;
142        Ok(table_id)
143    }
144
145    pub fn set_next_table_id(&mut self, next_table_id: u32) -> Result<()> {
146        self.update_database_header(|header| {
147            header.next_table_id = next_table_id;
148        })
149    }
150
151    pub fn peek_next_table_id(&self) -> Result<TableId> {
152        let header = self
153            .read_database_header()?
154            .ok_or_else(|| HematiteError::StorageError("Database header is missing".to_string()))?;
155        Ok(TableId::new(header.next_table_id))
156    }
157
158    pub fn update_database_header<F>(&mut self, update: F) -> Result<()>
159    where
160        F: FnOnce(&mut DatabaseHeader),
161    {
162        let header_page = self
163            .tree_store()
164            .read_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID)?
165            .ok_or_else(|| HematiteError::StorageError("Database header is missing".to_string()))?;
166        let mut header = DatabaseHeader::deserialize(&header_page)?;
167        update(&mut header);
168        header.checksum = header.calculate_checksum();
169
170        let mut updated_page = vec![0; ByteTreeStore::PAGE_SIZE];
171        header.serialize(&mut updated_page)?;
172        self.tree_store()
173            .write_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID, &updated_page)
174    }
175
176    #[cfg(test)]
177    pub(crate) fn read_page(&self, page_id: PageId) -> Result<crate::storage::Page> {
178        let storage = self.tree_store().shared_storage();
179        let mut pager = storage.lock().map_err(|_| {
180            HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
181        })?;
182        pager.read_page(page_id)
183    }
184
185    #[cfg(test)]
186    pub(crate) fn write_page(&self, page: crate::storage::Page) -> Result<()> {
187        let storage = self.tree_store().shared_storage();
188        let mut pager = storage.lock().map_err(|_| {
189            HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
190        })?;
191        pager.write_page(page)
192    }
193
194    #[cfg(test)]
195    pub(crate) fn allocate_page(&self) -> Result<PageId> {
196        let storage = self.tree_store().shared_storage();
197        let page_id = storage
198            .lock()
199            .map_err(|_| {
200                HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
201            })?
202            .allocate_page()?;
203        if Self::is_reserved_page(page_id) {
204            return self.allocate_page();
205        }
206        Ok(page_id)
207    }
208
209    #[cfg(test)]
210    pub(crate) fn deallocate_page(&self, page_id: PageId) -> Result<()> {
211        if Self::is_reserved_page(page_id) {
212            return Err(HematiteError::StorageError(
213                "Cannot deallocate reserved page".to_string(),
214            ));
215        }
216        let storage = self.tree_store().shared_storage();
217        let mut pager = storage.lock().map_err(|_| {
218            HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
219        })?;
220        pager.deallocate_page(page_id)
221    }
222
223    #[cfg(test)]
224    pub(crate) fn with_pager<T>(
225        &self,
226        callback: impl FnOnce(&mut crate::storage::Pager) -> Result<T>,
227    ) -> Result<T> {
228        let storage = self.tree_store().shared_storage();
229        let mut pager = storage.lock().map_err(|_| {
230            HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
231        })?;
232        callback(&mut pager)
233    }
234
235    pub fn flush(&mut self) -> Result<()> {
236        engine_metadata::save_table_metadata(self)?;
237        self.tree_store().flush()
238    }
239
240    pub fn journal_mode(&self) -> Result<JournalMode> {
241        Ok(match self.tree_store().journal_mode()? {
242            BTreeJournalMode::Rollback => JournalMode::Rollback,
243            BTreeJournalMode::Wal => JournalMode::Wal,
244        })
245    }
246
247    pub fn set_journal_mode(&mut self, journal_mode: JournalMode) -> Result<()> {
248        let mode = match journal_mode {
249            JournalMode::Rollback => BTreeJournalMode::Rollback,
250            JournalMode::Wal => BTreeJournalMode::Wal,
251        };
252        self.tree_store().set_journal_mode(mode)
253    }
254
255    pub fn checkpoint_wal(&mut self) -> Result<()> {
256        self.tree_store().checkpoint_wal()
257    }
258
259    pub fn begin_transaction(&mut self) -> Result<()> {
260        self.tree_store().begin_transaction()
261    }
262
263    pub fn commit_transaction(&mut self) -> Result<()> {
264        engine_metadata::save_table_metadata(self)?;
265        self.tree_store().commit_transaction()
266    }
267
268    pub fn rollback_transaction(&mut self) -> Result<()> {
269        self.tree_store().rollback_transaction()
270    }
271
272    pub fn transaction_active(&self) -> Result<bool> {
273        self.tree_store().transaction_active()
274    }
275
276    pub(crate) fn begin_read(&mut self) -> Result<()> {
277        self.tree_store().begin_read()
278    }
279
280    pub(crate) fn end_read(&mut self) -> Result<()> {
281        self.tree_store().end_read()
282    }
283
284    pub fn snapshot(&self) -> Result<CatalogEngineSnapshot> {
285        Ok(CatalogEngineSnapshot {
286            table_metadata: self.table_metadata.clone(),
287            tree_store: self.tree_store.snapshot()?,
288        })
289    }
290
291    pub fn restore_snapshot(&mut self, snapshot: CatalogEngineSnapshot) -> Result<()> {
292        self.table_metadata = snapshot.table_metadata;
293        self.tree_store.restore_snapshot(snapshot.tree_store)
294    }
295
296    pub(crate) fn create_empty_btree(&self) -> Result<PageId> {
297        self.tree_store().create_tree()
298    }
299
300    pub(crate) fn get_table_metadata(&self) -> &HashMap<String, TableRuntimeMetadata> {
301        &self.table_metadata
302    }
303
304    pub(crate) fn load_schema(&self, schema_root: PageId) -> Result<Schema> {
305        schema_store::load_schema(self, schema_root)
306    }
307
308    pub(crate) fn save_schema(&mut self, schema: &Schema, current_root: PageId) -> Result<PageId> {
309        schema_store::save_schema(self, schema, current_root)
310    }
311
312    pub(crate) fn tree_store(&self) -> ByteTreeStore {
313        self.tree_store.clone()
314    }
315
316    pub(crate) fn typed_tree_store<C: KeyValueCodec>(&self) -> TypedTreeStore<C> {
317        TypedTreeStore::new(self.tree_store())
318    }
319
320    pub(crate) fn open_tree(&self, root_page_id: PageId) -> Result<ByteTree> {
321        self.tree_store().open_tree(root_page_id)
322    }
323
324    pub(crate) fn create_tree(&self) -> Result<PageId> {
325        self.tree_store().create_tree()
326    }
327
328    pub(crate) fn delete_tree(&self, root_page_id: PageId) -> Result<()> {
329        self.tree_store().delete_tree(root_page_id)
330    }
331
332    pub(crate) fn reset_tree(&self, root_page_id: PageId) -> Result<()> {
333        self.tree_store().reset_tree(root_page_id)
334    }
335
336    pub(crate) fn read_tree_entries(
337        &self,
338        root_page_id: PageId,
339    ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
340        self.open_tree(root_page_id)?.entries()
341    }
342
343    pub(crate) fn visit_tree_entries<F>(&self, root_page_id: PageId, mut visit: F) -> Result<()>
344    where
345        F: FnMut(&[u8], &[u8]) -> Result<()>,
346    {
347        let tree = self.open_tree(root_page_id)?;
348        let mut cursor = tree.cursor()?;
349        cursor.first()?;
350        while let Some((key, value)) = cursor.current()? {
351            visit(&key, &value)?;
352            if cursor.next().is_err() {
353                break;
354            }
355        }
356        Ok(())
357    }
358
359    pub(crate) fn collect_tree_page_ids(&self, root_page_id: PageId) -> Result<Vec<PageId>> {
360        self.tree_store().collect_page_ids(root_page_id)
361    }
362
363    pub(crate) fn collect_tree_space_stats(&self, root_page_id: PageId) -> Result<TreeSpaceStats> {
364        self.tree_store().collect_space_stats(root_page_id)
365    }
366
367    pub(crate) fn pager_integrity_report(&mut self) -> Result<PagerIntegrityReport> {
368        self.tree_store().validate_storage()
369    }
370
371    pub(crate) fn free_page_ids(&self) -> Result<Vec<PageId>> {
372        self.tree_store().free_page_ids()
373    }
374
375    pub(crate) fn is_reserved_page(page_id: PageId) -> bool {
376        page_id == ByteTreeStore::DB_HEADER_PAGE_ID
377            || page_id == ByteTreeStore::RESERVED_METADATA_PAGE_ID
378    }
379
380    pub fn get_storage_stats(&self) -> Result<CatalogStorageStats> {
381        table_store::get_storage_stats(self)
382    }
383
384    pub(crate) fn create_runtime_table_metadata(
385        &mut self,
386        table_name: &str,
387        root_page_id: PageId,
388    ) -> Result<()> {
389        runtime_metadata::create_table_metadata(self, table_name, root_page_id)
390    }
391
392    pub(crate) fn table_runtime_metadata(&self, table_name: &str) -> Result<&TableRuntimeMetadata> {
393        runtime_metadata::lookup_table_metadata(self, table_name)
394    }
395
396    pub(crate) fn remove_runtime_table_metadata(
397        &mut self,
398        table_name: &str,
399    ) -> Result<TableRuntimeMetadata> {
400        runtime_metadata::remove_table_metadata(self, table_name)
401    }
402
403    pub(crate) fn rename_table_runtime_metadata(
404        &mut self,
405        old_name: &str,
406        new_name: &str,
407    ) -> Result<()> {
408        runtime_metadata::rename_table_metadata(self, old_name, new_name)
409    }
410
411    pub(crate) fn record_generated_row_insert(
412        &mut self,
413        table_name: &str,
414        new_root_page_id: PageId,
415        row_id: u64,
416    ) {
417        runtime_metadata::apply_insert(self, table_name, new_root_page_id, Some(row_id + 1));
418    }
419
420    pub(crate) fn record_explicit_row_insert(
421        &mut self,
422        table_name: &str,
423        new_root_page_id: PageId,
424    ) {
425        runtime_metadata::apply_insert(self, table_name, new_root_page_id, None);
426    }
427
428    pub(crate) fn record_row_delete(
429        &mut self,
430        table_name: &str,
431        new_root_page_id: PageId,
432        deleted: bool,
433    ) {
434        runtime_metadata::apply_delete(self, table_name, new_root_page_id, deleted);
435    }
436
437    pub(crate) fn prepare_table_replace(&mut self, table_name: &str, rows: &[StoredRow]) {
438        runtime_metadata::prepare_replace(self, table_name, rows);
439    }
440
441    pub fn create_table(&mut self, table_name: &str) -> Result<PageId> {
442        table_store::create_table(self, table_name)
443    }
444
445    pub fn insert_into_table(&mut self, table_name: &str, row: Vec<Value>) -> Result<u64> {
446        table_store::insert_into_table(self, table_name, row)
447    }
448
449    pub fn replace_table_rows(&mut self, table_name: &str, rows: Vec<StoredRow>) -> Result<()> {
450        table_store::replace_table_rows(self, table_name, rows)
451    }
452
453    pub fn insert_row_with_rowid(&mut self, table_name: &str, row: StoredRow) -> Result<()> {
454        table_store::insert_row_with_rowid(self, table_name, row)
455    }
456
457    pub fn delete_from_table_by_rowid(&mut self, table_name: &str, rowid: u64) -> Result<bool> {
458        table_store::delete_from_table_by_rowid(self, table_name, rowid)
459    }
460
461    pub fn drop_table(&mut self, table_name: &str) -> Result<()> {
462        table_store::drop_table(self, table_name)
463    }
464
465    pub fn drop_table_with_indexes(&mut self, table: &Table) -> Result<()> {
466        index_store::drop_table_with_indexes(self, table)
467    }
468
469    pub fn open_table_cursor(&mut self, table_name: &str) -> Result<TableCursor> {
470        table_store::open_table_cursor(self, table_name)
471    }
472
473    pub fn read_rows_with_ids(&mut self, table_name: &str) -> Result<Vec<StoredRow>> {
474        table_store::read_rows_with_ids(self, table_name)
475    }
476
477    pub fn read_from_table(&mut self, table_name: &str) -> Result<Vec<Vec<Value>>> {
478        table_store::read_from_table(self, table_name)
479    }
480
481    pub fn lookup_row_by_rowid(
482        &mut self,
483        table_name: &str,
484        rowid: u64,
485    ) -> Result<Option<StoredRow>> {
486        table_store::lookup_row_by_rowid(self, table_name, rowid)
487    }
488
489    pub fn lookup_row_by_primary_key(
490        &mut self,
491        table: &Table,
492        key_values: &[Value],
493    ) -> Result<Option<StoredRow>> {
494        index_store::lookup_row_by_primary_key(self, table, key_values)
495    }
496
497    pub fn lookup_primary_key_rowid(
498        &mut self,
499        table: &Table,
500        key_values: &[Value],
501    ) -> Result<Option<u64>> {
502        index_store::lookup_primary_key_rowid(self, table, key_values)
503    }
504
505    pub fn register_primary_key_row(&mut self, table: &Table, row: StoredRow) -> Result<()> {
506        index_store::register_primary_key_row(self, table, row)
507    }
508
509    pub fn lookup_rows_by_secondary_index(
510        &mut self,
511        table: &Table,
512        index_name: &str,
513        key_values: &[Value],
514    ) -> Result<Vec<StoredRow>> {
515        index_store::lookup_rows_by_secondary_index(self, table, index_name, key_values)
516    }
517
518    pub fn lookup_secondary_index_rowids(
519        &mut self,
520        table: &Table,
521        index_name: &str,
522        key_values: &[Value],
523    ) -> Result<Vec<u64>> {
524        index_store::lookup_secondary_index_rowids(self, table, index_name, key_values)
525    }
526
527    pub fn register_secondary_index_row(&mut self, table: &Table, row: StoredRow) -> Result<()> {
528        index_store::register_secondary_index_row(self, table, row)
529    }
530
531    pub fn rebuild_primary_key_index(&mut self, table: &Table, rows: &[StoredRow]) -> Result<()> {
532        index_store::rebuild_primary_key_index(self, table, rows)
533    }
534
535    pub fn rebuild_secondary_indexes(&mut self, table: &Table, rows: &[StoredRow]) -> Result<()> {
536        index_store::rebuild_secondary_indexes(self, table, rows)
537    }
538
539    pub fn delete_primary_key_row(&mut self, table: &Table, row: &StoredRow) -> Result<bool> {
540        index_store::delete_primary_key_row(self, table, row)
541    }
542
543    pub fn delete_secondary_index_row(&mut self, table: &Table, row: &StoredRow) -> Result<()> {
544        index_store::delete_secondary_index_row(self, table, row)
545    }
546
547    pub fn encode_primary_key(&self, key_values: &[Value]) -> Result<Vec<u8>> {
548        index_store::encode_primary_key(key_values)
549    }
550
551    pub fn encode_secondary_index_key(&self, key_values: &[Value]) -> Result<Vec<u8>> {
552        index_store::encode_secondary_index_key(key_values)
553    }
554
555    pub fn open_primary_key_cursor(&mut self, table: &Table) -> Result<IndexCursor> {
556        index_store::open_primary_key_cursor(self, table)
557    }
558
559    pub fn open_secondary_index_cursor(
560        &mut self,
561        table: &Table,
562        index_name: &str,
563    ) -> Result<IndexCursor> {
564        index_store::open_secondary_index_cursor(self, table, index_name)
565    }
566
567    pub fn validate_table_indexes(&mut self, table: &Table) -> Result<()> {
568        integrity::validate_table_indexes(self, table)
569    }
570
571    pub(crate) fn validate_catalog_layout(
572        &mut self,
573        tables: &[Table],
574    ) -> Result<integrity::CatalogTreeUsage> {
575        integrity::validate_catalog_layout(self, tables)
576    }
577
578    pub fn validate_integrity(&mut self) -> Result<CatalogIntegrityReport> {
579        integrity::validate_integrity(self)
580    }
581}