1use crate::btree::{
31 ByteTree, ByteTreeStore, ByteTreeStoreSnapshot, JournalMode as BTreeJournalMode, KeyValueCodec,
32 PageId, PagerIntegrityReport, TreeSpaceStats, TypedTreeStore,
33};
34use crate::catalog::{DatabaseHeader, JournalMode, Table, TableId, Value};
35use crate::error::{HematiteError, Result};
36use std::collections::HashMap;
37use std::path::Path;
38
39use super::cursor::{IndexCursor, TableCursor};
40use super::{
41 engine_metadata, index_store, integrity, record::StoredRow, runtime_metadata, schema_store,
42 table_store, Schema,
43};
44
45#[derive(Debug, Clone)]
46pub struct TableRuntimeMetadata {
47 pub name: String,
48 pub root_page_id: PageId,
49 pub row_count: u64,
50 pub next_row_id: u64,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub struct CatalogStorageStats {
55 pub table_count: usize,
56 pub total_rows: u64,
57 pub file_bytes: u64,
58 pub allocated_page_count: usize,
59 pub free_page_count: usize,
60 pub fragmented_free_page_count: usize,
61 pub trailing_free_page_count: usize,
62 pub live_table_page_count: usize,
63 pub overflow_page_count: usize,
64 pub table_used_bytes: usize,
65 pub table_unused_bytes: usize,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct CatalogIntegrityReport {
70 pub table_count: usize,
71 pub live_page_count: usize,
72 pub index_page_count: usize,
73 pub overflow_page_count: usize,
74 pub free_page_count: usize,
75 pub total_rows: u64,
76 pub pager: PagerIntegrityReport,
77}
78
79#[derive(Debug, Clone)]
80pub struct CatalogEngineSnapshot {
81 table_metadata: HashMap<String, TableRuntimeMetadata>,
82 tree_store: ByteTreeStoreSnapshot,
83}
84
85#[derive(Debug)]
86pub struct CatalogEngine {
87 pub(crate) tree_store: ByteTreeStore,
88 pub(crate) table_metadata: HashMap<String, TableRuntimeMetadata>,
89}
90
91impl CatalogEngine {
92 pub(crate) const PAGE_SIZE: usize = ByteTreeStore::PAGE_SIZE;
93 pub(crate) const INVALID_PAGE_ID: PageId = ByteTreeStore::INVALID_PAGE_ID;
94 pub(crate) const STORAGE_METADATA_VERSION: u32 = 3;
95
96 pub fn new<P: AsRef<Path>>(path: P) -> Result<Self> {
97 Self::from_tree_store(ByteTreeStore::open_path(path, 100)?)
98 }
99
100 pub fn new_in_memory() -> Result<Self> {
101 Self::from_tree_store(ByteTreeStore::new_in_memory(100)?)
102 }
103
104 pub(crate) fn from_tree_store(tree_store: ByteTreeStore) -> Result<Self> {
105 let mut engine = Self {
106 tree_store,
107 table_metadata: HashMap::new(),
108 };
109 engine_metadata::load_table_metadata(&mut engine)?;
110 Ok(engine)
111 }
112
113 pub fn read_database_header(&self) -> Result<Option<DatabaseHeader>> {
114 self.tree_store()
115 .read_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID)?
116 .map(|page| DatabaseHeader::deserialize(&page))
117 .transpose()
118 }
119
120 pub fn initialize_database_header(&mut self, schema_root_page: u32) -> Result<DatabaseHeader> {
121 let header = DatabaseHeader::new(schema_root_page);
122 let mut page = vec![0; ByteTreeStore::PAGE_SIZE];
123 header.serialize(&mut page)?;
124 self.tree_store()
125 .write_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID, &page)?;
126 self.tree_store().flush()?;
127 Ok(header)
128 }
129
130 pub fn allocate_table_id(&mut self) -> Result<TableId> {
131 let header_page = self
132 .tree_store()
133 .read_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID)?
134 .ok_or_else(|| HematiteError::StorageError("Database header is missing".to_string()))?;
135 let mut header = DatabaseHeader::deserialize(&header_page)?;
136 let table_id = header.increment_table_id();
137
138 let mut updated_page = vec![0; ByteTreeStore::PAGE_SIZE];
139 header.serialize(&mut updated_page)?;
140 self.tree_store()
141 .write_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID, &updated_page)?;
142 Ok(table_id)
143 }
144
145 pub fn set_next_table_id(&mut self, next_table_id: u32) -> Result<()> {
146 self.update_database_header(|header| {
147 header.next_table_id = next_table_id;
148 })
149 }
150
151 pub fn peek_next_table_id(&self) -> Result<TableId> {
152 let header = self
153 .read_database_header()?
154 .ok_or_else(|| HematiteError::StorageError("Database header is missing".to_string()))?;
155 Ok(TableId::new(header.next_table_id))
156 }
157
158 pub fn update_database_header<F>(&mut self, update: F) -> Result<()>
159 where
160 F: FnOnce(&mut DatabaseHeader),
161 {
162 let header_page = self
163 .tree_store()
164 .read_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID)?
165 .ok_or_else(|| HematiteError::StorageError("Database header is missing".to_string()))?;
166 let mut header = DatabaseHeader::deserialize(&header_page)?;
167 update(&mut header);
168 header.checksum = header.calculate_checksum();
169
170 let mut updated_page = vec![0; ByteTreeStore::PAGE_SIZE];
171 header.serialize(&mut updated_page)?;
172 self.tree_store()
173 .write_reserved_blob(ByteTreeStore::DB_HEADER_PAGE_ID, &updated_page)
174 }
175
176 #[cfg(test)]
177 pub(crate) fn read_page(&self, page_id: PageId) -> Result<crate::storage::Page> {
178 let storage = self.tree_store().shared_storage();
179 let mut pager = storage.lock().map_err(|_| {
180 HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
181 })?;
182 pager.read_page(page_id)
183 }
184
185 #[cfg(test)]
186 pub(crate) fn write_page(&self, page: crate::storage::Page) -> Result<()> {
187 let storage = self.tree_store().shared_storage();
188 let mut pager = storage.lock().map_err(|_| {
189 HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
190 })?;
191 pager.write_page(page)
192 }
193
194 #[cfg(test)]
195 pub(crate) fn allocate_page(&self) -> Result<PageId> {
196 let storage = self.tree_store().shared_storage();
197 let page_id = storage
198 .lock()
199 .map_err(|_| {
200 HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
201 })?
202 .allocate_page()?;
203 if Self::is_reserved_page(page_id) {
204 return self.allocate_page();
205 }
206 Ok(page_id)
207 }
208
209 #[cfg(test)]
210 pub(crate) fn deallocate_page(&self, page_id: PageId) -> Result<()> {
211 if Self::is_reserved_page(page_id) {
212 return Err(HematiteError::StorageError(
213 "Cannot deallocate reserved page".to_string(),
214 ));
215 }
216 let storage = self.tree_store().shared_storage();
217 let mut pager = storage.lock().map_err(|_| {
218 HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
219 })?;
220 pager.deallocate_page(page_id)
221 }
222
223 #[cfg(test)]
224 pub(crate) fn with_pager<T>(
225 &self,
226 callback: impl FnOnce(&mut crate::storage::Pager) -> Result<T>,
227 ) -> Result<T> {
228 let storage = self.tree_store().shared_storage();
229 let mut pager = storage.lock().map_err(|_| {
230 HematiteError::InternalError("Catalog engine pager mutex is poisoned".to_string())
231 })?;
232 callback(&mut pager)
233 }
234
235 pub fn flush(&mut self) -> Result<()> {
236 engine_metadata::save_table_metadata(self)?;
237 self.tree_store().flush()
238 }
239
240 pub fn journal_mode(&self) -> Result<JournalMode> {
241 Ok(match self.tree_store().journal_mode()? {
242 BTreeJournalMode::Rollback => JournalMode::Rollback,
243 BTreeJournalMode::Wal => JournalMode::Wal,
244 })
245 }
246
247 pub fn set_journal_mode(&mut self, journal_mode: JournalMode) -> Result<()> {
248 let mode = match journal_mode {
249 JournalMode::Rollback => BTreeJournalMode::Rollback,
250 JournalMode::Wal => BTreeJournalMode::Wal,
251 };
252 self.tree_store().set_journal_mode(mode)
253 }
254
255 pub fn checkpoint_wal(&mut self) -> Result<()> {
256 self.tree_store().checkpoint_wal()
257 }
258
259 pub fn begin_transaction(&mut self) -> Result<()> {
260 self.tree_store().begin_transaction()
261 }
262
263 pub fn commit_transaction(&mut self) -> Result<()> {
264 engine_metadata::save_table_metadata(self)?;
265 self.tree_store().commit_transaction()
266 }
267
268 pub fn rollback_transaction(&mut self) -> Result<()> {
269 self.tree_store().rollback_transaction()
270 }
271
272 pub fn transaction_active(&self) -> Result<bool> {
273 self.tree_store().transaction_active()
274 }
275
276 pub(crate) fn begin_read(&mut self) -> Result<()> {
277 self.tree_store().begin_read()
278 }
279
280 pub(crate) fn end_read(&mut self) -> Result<()> {
281 self.tree_store().end_read()
282 }
283
284 pub fn snapshot(&self) -> Result<CatalogEngineSnapshot> {
285 Ok(CatalogEngineSnapshot {
286 table_metadata: self.table_metadata.clone(),
287 tree_store: self.tree_store.snapshot()?,
288 })
289 }
290
291 pub fn restore_snapshot(&mut self, snapshot: CatalogEngineSnapshot) -> Result<()> {
292 self.table_metadata = snapshot.table_metadata;
293 self.tree_store.restore_snapshot(snapshot.tree_store)
294 }
295
296 pub(crate) fn create_empty_btree(&self) -> Result<PageId> {
297 self.tree_store().create_tree()
298 }
299
300 pub(crate) fn get_table_metadata(&self) -> &HashMap<String, TableRuntimeMetadata> {
301 &self.table_metadata
302 }
303
304 pub(crate) fn load_schema(&self, schema_root: PageId) -> Result<Schema> {
305 schema_store::load_schema(self, schema_root)
306 }
307
308 pub(crate) fn save_schema(&mut self, schema: &Schema, current_root: PageId) -> Result<PageId> {
309 schema_store::save_schema(self, schema, current_root)
310 }
311
312 pub(crate) fn tree_store(&self) -> ByteTreeStore {
313 self.tree_store.clone()
314 }
315
316 pub(crate) fn typed_tree_store<C: KeyValueCodec>(&self) -> TypedTreeStore<C> {
317 TypedTreeStore::new(self.tree_store())
318 }
319
320 pub(crate) fn open_tree(&self, root_page_id: PageId) -> Result<ByteTree> {
321 self.tree_store().open_tree(root_page_id)
322 }
323
324 pub(crate) fn create_tree(&self) -> Result<PageId> {
325 self.tree_store().create_tree()
326 }
327
328 pub(crate) fn delete_tree(&self, root_page_id: PageId) -> Result<()> {
329 self.tree_store().delete_tree(root_page_id)
330 }
331
332 pub(crate) fn reset_tree(&self, root_page_id: PageId) -> Result<()> {
333 self.tree_store().reset_tree(root_page_id)
334 }
335
336 pub(crate) fn read_tree_entries(
337 &self,
338 root_page_id: PageId,
339 ) -> Result<Vec<(Vec<u8>, Vec<u8>)>> {
340 self.open_tree(root_page_id)?.entries()
341 }
342
343 pub(crate) fn visit_tree_entries<F>(&self, root_page_id: PageId, mut visit: F) -> Result<()>
344 where
345 F: FnMut(&[u8], &[u8]) -> Result<()>,
346 {
347 let tree = self.open_tree(root_page_id)?;
348 let mut cursor = tree.cursor()?;
349 cursor.first()?;
350 while let Some((key, value)) = cursor.current()? {
351 visit(&key, &value)?;
352 if cursor.next().is_err() {
353 break;
354 }
355 }
356 Ok(())
357 }
358
359 pub(crate) fn collect_tree_page_ids(&self, root_page_id: PageId) -> Result<Vec<PageId>> {
360 self.tree_store().collect_page_ids(root_page_id)
361 }
362
363 pub(crate) fn collect_tree_space_stats(&self, root_page_id: PageId) -> Result<TreeSpaceStats> {
364 self.tree_store().collect_space_stats(root_page_id)
365 }
366
367 pub(crate) fn pager_integrity_report(&mut self) -> Result<PagerIntegrityReport> {
368 self.tree_store().validate_storage()
369 }
370
371 pub(crate) fn free_page_ids(&self) -> Result<Vec<PageId>> {
372 self.tree_store().free_page_ids()
373 }
374
375 pub(crate) fn is_reserved_page(page_id: PageId) -> bool {
376 page_id == ByteTreeStore::DB_HEADER_PAGE_ID
377 || page_id == ByteTreeStore::RESERVED_METADATA_PAGE_ID
378 }
379
380 pub fn get_storage_stats(&self) -> Result<CatalogStorageStats> {
381 table_store::get_storage_stats(self)
382 }
383
384 pub(crate) fn create_runtime_table_metadata(
385 &mut self,
386 table_name: &str,
387 root_page_id: PageId,
388 ) -> Result<()> {
389 runtime_metadata::create_table_metadata(self, table_name, root_page_id)
390 }
391
392 pub(crate) fn table_runtime_metadata(&self, table_name: &str) -> Result<&TableRuntimeMetadata> {
393 runtime_metadata::lookup_table_metadata(self, table_name)
394 }
395
396 pub(crate) fn remove_runtime_table_metadata(
397 &mut self,
398 table_name: &str,
399 ) -> Result<TableRuntimeMetadata> {
400 runtime_metadata::remove_table_metadata(self, table_name)
401 }
402
403 pub(crate) fn rename_table_runtime_metadata(
404 &mut self,
405 old_name: &str,
406 new_name: &str,
407 ) -> Result<()> {
408 runtime_metadata::rename_table_metadata(self, old_name, new_name)
409 }
410
411 pub(crate) fn record_generated_row_insert(
412 &mut self,
413 table_name: &str,
414 new_root_page_id: PageId,
415 row_id: u64,
416 ) {
417 runtime_metadata::apply_insert(self, table_name, new_root_page_id, Some(row_id + 1));
418 }
419
420 pub(crate) fn record_explicit_row_insert(
421 &mut self,
422 table_name: &str,
423 new_root_page_id: PageId,
424 ) {
425 runtime_metadata::apply_insert(self, table_name, new_root_page_id, None);
426 }
427
428 pub(crate) fn record_row_delete(
429 &mut self,
430 table_name: &str,
431 new_root_page_id: PageId,
432 deleted: bool,
433 ) {
434 runtime_metadata::apply_delete(self, table_name, new_root_page_id, deleted);
435 }
436
437 pub(crate) fn prepare_table_replace(&mut self, table_name: &str, rows: &[StoredRow]) {
438 runtime_metadata::prepare_replace(self, table_name, rows);
439 }
440
441 pub fn create_table(&mut self, table_name: &str) -> Result<PageId> {
442 table_store::create_table(self, table_name)
443 }
444
445 pub fn insert_into_table(&mut self, table_name: &str, row: Vec<Value>) -> Result<u64> {
446 table_store::insert_into_table(self, table_name, row)
447 }
448
449 pub fn replace_table_rows(&mut self, table_name: &str, rows: Vec<StoredRow>) -> Result<()> {
450 table_store::replace_table_rows(self, table_name, rows)
451 }
452
453 pub fn insert_row_with_rowid(&mut self, table_name: &str, row: StoredRow) -> Result<()> {
454 table_store::insert_row_with_rowid(self, table_name, row)
455 }
456
457 pub fn delete_from_table_by_rowid(&mut self, table_name: &str, rowid: u64) -> Result<bool> {
458 table_store::delete_from_table_by_rowid(self, table_name, rowid)
459 }
460
461 pub fn drop_table(&mut self, table_name: &str) -> Result<()> {
462 table_store::drop_table(self, table_name)
463 }
464
465 pub fn drop_table_with_indexes(&mut self, table: &Table) -> Result<()> {
466 index_store::drop_table_with_indexes(self, table)
467 }
468
469 pub fn open_table_cursor(&mut self, table_name: &str) -> Result<TableCursor> {
470 table_store::open_table_cursor(self, table_name)
471 }
472
473 pub fn read_rows_with_ids(&mut self, table_name: &str) -> Result<Vec<StoredRow>> {
474 table_store::read_rows_with_ids(self, table_name)
475 }
476
477 pub fn read_from_table(&mut self, table_name: &str) -> Result<Vec<Vec<Value>>> {
478 table_store::read_from_table(self, table_name)
479 }
480
481 pub fn lookup_row_by_rowid(
482 &mut self,
483 table_name: &str,
484 rowid: u64,
485 ) -> Result<Option<StoredRow>> {
486 table_store::lookup_row_by_rowid(self, table_name, rowid)
487 }
488
489 pub fn lookup_row_by_primary_key(
490 &mut self,
491 table: &Table,
492 key_values: &[Value],
493 ) -> Result<Option<StoredRow>> {
494 index_store::lookup_row_by_primary_key(self, table, key_values)
495 }
496
497 pub fn lookup_primary_key_rowid(
498 &mut self,
499 table: &Table,
500 key_values: &[Value],
501 ) -> Result<Option<u64>> {
502 index_store::lookup_primary_key_rowid(self, table, key_values)
503 }
504
505 pub fn register_primary_key_row(&mut self, table: &Table, row: StoredRow) -> Result<()> {
506 index_store::register_primary_key_row(self, table, row)
507 }
508
509 pub fn lookup_rows_by_secondary_index(
510 &mut self,
511 table: &Table,
512 index_name: &str,
513 key_values: &[Value],
514 ) -> Result<Vec<StoredRow>> {
515 index_store::lookup_rows_by_secondary_index(self, table, index_name, key_values)
516 }
517
518 pub fn lookup_secondary_index_rowids(
519 &mut self,
520 table: &Table,
521 index_name: &str,
522 key_values: &[Value],
523 ) -> Result<Vec<u64>> {
524 index_store::lookup_secondary_index_rowids(self, table, index_name, key_values)
525 }
526
527 pub fn register_secondary_index_row(&mut self, table: &Table, row: StoredRow) -> Result<()> {
528 index_store::register_secondary_index_row(self, table, row)
529 }
530
531 pub fn rebuild_primary_key_index(&mut self, table: &Table, rows: &[StoredRow]) -> Result<()> {
532 index_store::rebuild_primary_key_index(self, table, rows)
533 }
534
535 pub fn rebuild_secondary_indexes(&mut self, table: &Table, rows: &[StoredRow]) -> Result<()> {
536 index_store::rebuild_secondary_indexes(self, table, rows)
537 }
538
539 pub fn delete_primary_key_row(&mut self, table: &Table, row: &StoredRow) -> Result<bool> {
540 index_store::delete_primary_key_row(self, table, row)
541 }
542
543 pub fn delete_secondary_index_row(&mut self, table: &Table, row: &StoredRow) -> Result<()> {
544 index_store::delete_secondary_index_row(self, table, row)
545 }
546
547 pub fn encode_primary_key(&self, key_values: &[Value]) -> Result<Vec<u8>> {
548 index_store::encode_primary_key(key_values)
549 }
550
551 pub fn encode_secondary_index_key(&self, key_values: &[Value]) -> Result<Vec<u8>> {
552 index_store::encode_secondary_index_key(key_values)
553 }
554
555 pub fn open_primary_key_cursor(&mut self, table: &Table) -> Result<IndexCursor> {
556 index_store::open_primary_key_cursor(self, table)
557 }
558
559 pub fn open_secondary_index_cursor(
560 &mut self,
561 table: &Table,
562 index_name: &str,
563 ) -> Result<IndexCursor> {
564 index_store::open_secondary_index_cursor(self, table, index_name)
565 }
566
567 pub fn validate_table_indexes(&mut self, table: &Table) -> Result<()> {
568 integrity::validate_table_indexes(self, table)
569 }
570
571 pub(crate) fn validate_catalog_layout(
572 &mut self,
573 tables: &[Table],
574 ) -> Result<integrity::CatalogTreeUsage> {
575 integrity::validate_catalog_layout(self, tables)
576 }
577
578 pub fn validate_integrity(&mut self) -> Result<CatalogIntegrityReport> {
579 integrity::validate_integrity(self)
580 }
581}