1use crate::transaction_tracker::{TransactionId, TransactionTracker};
2use crate::tree_store::{
3 BtreeHeader, InternalTableDefinition, PAGE_SIZE, PageHint, PageNumber, ReadOnlyBackend,
4 ShrinkPolicy, TableTree, TableType, TransactionalMemory,
5};
6use crate::types::{Key, Value};
7use crate::{
8 CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
9};
10use crate::{ReadTransaction, Result, WriteTransaction};
11use std::fmt::{Debug, Display, Formatter};
12
13use std::fs::{File, OpenOptions};
14use std::marker::PhantomData;
15use std::path::Path;
16use std::sync::Arc;
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22 ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
23 DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
24 TransactionIdWithPagination,
25};
26use crate::tree_store::file_backend::FileBackend;
27#[cfg(feature = "logging")]
28use log::{debug, info, warn};
29
/// A pluggable byte store that the database reads from and writes to.
///
/// Implementors must be `'static + Debug + Send + Sync`, as required by the supertrait bounds.
/// All methods take `&self`, so any mutation has to go through interior mutability in the
/// implementation.
// There is deliberately no `is_empty` method on this trait, so the clippy lint that pairs
// `len` with `is_empty` is silenced.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Returns the current length of the storage.
    fn len(&self) -> std::result::Result<u64, io::Error>;

    /// Reads `out.len()` bytes starting at byte `offset` into `out`.
    fn read(&self, offset: u64, out: &mut [u8]) -> std::result::Result<(), io::Error>;

    /// Sets the length of the storage to `len`.
    fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;

    /// Syncs previously written data to the underlying storage.
    fn sync_data(&self) -> std::result::Result<(), io::Error>;

    /// Writes `data` to the storage starting at byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;

    /// Closes the backend.
    ///
    /// The default implementation does nothing and returns `Ok(())`.
    fn close(&self) -> std::result::Result<(), io::Error> {
        Ok(())
    }
}
60
/// A handle that identifies a (non-multimap) table by name.
///
/// Requires the crate-internal `Sealed` supertrait, which presumably restricts implementations
/// to this crate.
pub trait TableHandle: Sealed {
    /// Returns the name of the table.
    fn name(&self) -> &str;
}
65
66#[derive(Clone)]
67pub struct UntypedTableHandle {
68 name: String,
69}
70
71impl UntypedTableHandle {
72 pub(crate) fn new(name: String) -> Self {
73 Self { name }
74 }
75}
76
77impl TableHandle for UntypedTableHandle {
78 fn name(&self) -> &str {
79 &self.name
80 }
81}
82
83impl Sealed for UntypedTableHandle {}
84
/// A handle that identifies a multimap table by name.
///
/// Requires the crate-internal `Sealed` supertrait, which presumably restricts implementations
/// to this crate.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table.
    fn name(&self) -> &str;
}
89
90#[derive(Clone)]
91pub struct UntypedMultimapTableHandle {
92 name: String,
93}
94
95impl UntypedMultimapTableHandle {
96 pub(crate) fn new(name: String) -> Self {
97 Self { name }
98 }
99}
100
101impl MultimapTableHandle for UntypedMultimapTableHandle {
102 fn name(&self) -> &str {
103 &self.name
104 }
105}
106
107impl Sealed for UntypedMultimapTableHandle {}
108
109pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
116 name: &'a str,
117 _key_type: PhantomData<K>,
118 _value_type: PhantomData<V>,
119}
120
121impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
122 pub const fn new(name: &'a str) -> Self {
128 assert!(!name.is_empty());
129 Self {
130 name,
131 _key_type: PhantomData,
132 _value_type: PhantomData,
133 }
134 }
135}
136
137impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
138 fn name(&self) -> &str {
139 self.name
140 }
141}
142
143impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
144
145impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
146 fn clone(&self) -> Self {
147 *self
148 }
149}
150
151impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
152
153impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
154 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155 write!(
156 f,
157 "{}<{}, {}>",
158 self.name,
159 K::type_name().name(),
160 V::type_name().name()
161 )
162 }
163}
164
165pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
174 name: &'a str,
175 _key_type: PhantomData<K>,
176 _value_type: PhantomData<V>,
177}
178
179impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
180 pub const fn new(name: &'a str) -> Self {
181 assert!(!name.is_empty());
182 Self {
183 name,
184 _key_type: PhantomData,
185 _value_type: PhantomData,
186 }
187 }
188}
189
190impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
191 fn name(&self) -> &str {
192 self.name
193 }
194}
195
196impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
197
198impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
199 fn clone(&self) -> Self {
200 *self
201 }
202}
203
204impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
205
206impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
207 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208 write!(
209 f,
210 "{}<{}, {}>",
211 self.name,
212 K::type_name().name(),
213 V::type_name().name()
214 )
215 }
216}
217
/// A snapshot of cache counters, as returned by `ReadableDatabase::cache_stats`.
#[derive(Debug)]
pub struct CacheStats {
    pub(crate) evictions: u64,
    pub(crate) read_hits: u64,
    pub(crate) read_misses: u64,
    pub(crate) write_hits: u64,
    pub(crate) write_misses: u64,
    pub(crate) used_bytes: usize,
}

impl CacheStats {
    /// Number of cache evictions recorded in this snapshot.
    pub fn evictions(&self) -> u64 {
        let Self { evictions, .. } = *self;
        evictions
    }

    /// Number of read cache hits recorded in this snapshot.
    pub fn read_hits(&self) -> u64 {
        let Self { read_hits, .. } = *self;
        read_hits
    }

    /// Number of read cache misses recorded in this snapshot.
    pub fn read_misses(&self) -> u64 {
        let Self { read_misses, .. } = *self;
        read_misses
    }

    /// Number of write cache hits recorded in this snapshot.
    pub fn write_hits(&self) -> u64 {
        let Self { write_hits, .. } = *self;
        write_hits
    }

    /// Number of write cache misses recorded in this snapshot.
    pub fn write_misses(&self) -> u64 {
        let Self { write_misses, .. } = *self;
        write_misses
    }

    /// Number of bytes reported as used by the cache in this snapshot.
    pub fn used_bytes(&self) -> usize {
        let Self { used_bytes, .. } = *self;
        used_bytes
    }
}
264
265pub(crate) struct TransactionGuard {
266 transaction_tracker: Option<Arc<TransactionTracker>>,
267 transaction_id: Option<TransactionId>,
268 write_transaction: bool,
269}
270
271impl TransactionGuard {
272 pub(crate) fn new_read(
273 transaction_id: TransactionId,
274 tracker: Arc<TransactionTracker>,
275 ) -> Self {
276 Self {
277 transaction_tracker: Some(tracker),
278 transaction_id: Some(transaction_id),
279 write_transaction: false,
280 }
281 }
282
283 pub(crate) fn new_write(
284 transaction_id: TransactionId,
285 tracker: Arc<TransactionTracker>,
286 ) -> Self {
287 Self {
288 transaction_tracker: Some(tracker),
289 transaction_id: Some(transaction_id),
290 write_transaction: true,
291 }
292 }
293
294 pub(crate) fn fake() -> Self {
296 Self {
297 transaction_tracker: None,
298 transaction_id: None,
299 write_transaction: false,
300 }
301 }
302
303 pub(crate) fn id(&self) -> TransactionId {
304 self.transaction_id.unwrap()
305 }
306
307 pub(crate) fn leak(mut self) -> TransactionId {
308 self.transaction_id.take().unwrap()
309 }
310}
311
312impl Drop for TransactionGuard {
313 fn drop(&mut self) {
314 if self.transaction_tracker.is_none() {
315 return;
316 }
317 if let Some(transaction_id) = self.transaction_id {
318 if self.write_transaction {
319 self.transaction_tracker
320 .as_ref()
321 .unwrap()
322 .end_write_transaction(transaction_id);
323 } else {
324 self.transaction_tracker
325 .as_ref()
326 .unwrap()
327 .deallocate_read_transaction(transaction_id);
328 }
329 }
330 }
331}
332
/// Operations available on every database handle; implemented by both `ReadOnlyDatabase` and
/// `Database`.
pub trait ReadableDatabase {
    /// Begins a new read transaction.
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    /// Returns a snapshot of the database's cache statistics.
    fn cache_stats(&self) -> CacheStats;
}
348
349pub struct ReadOnlyDatabase {
390 mem: Arc<TransactionalMemory>,
391 transaction_tracker: Arc<TransactionTracker>,
392}
393
394impl ReadableDatabase for ReadOnlyDatabase {
395 fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
396 let id = self
397 .transaction_tracker
398 .register_read_transaction(&self.mem)?;
399 #[cfg(feature = "logging")]
400 debug!("Beginning read transaction id={id:?}");
401
402 let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());
403
404 ReadTransaction::new(self.mem.clone(), guard)
405 }
406
407 fn cache_stats(&self) -> CacheStats {
408 self.mem.cache_stats()
409 }
410}
411
412impl ReadOnlyDatabase {
413 pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
415 Builder::new().open_read_only(path)
416 }
417
418 fn new(
419 file: Box<dyn StorageBackend>,
420 page_size: usize,
421 region_size: Option<u64>,
422 read_cache_size_bytes: usize,
423 ) -> Result<Self, DatabaseError> {
424 #[cfg(feature = "logging")]
425 let file_path = format!("{:?}", &file);
426 #[cfg(feature = "logging")]
427 info!("Opening database in read-only {:?}", &file_path);
428 let mem = TransactionalMemory::new(
429 Box::new(ReadOnlyBackend::new(file)),
430 false,
431 page_size,
432 region_size,
433 read_cache_size_bytes,
434 0,
435 true,
436 )?;
437 let mem = Arc::new(mem);
438 if let Some(tree) = Database::get_allocator_state_table(&mem)? {
441 mem.load_allocator_state(&tree)?;
442 } else {
443 #[cfg(feature = "logging")]
444 warn!(
445 "Database {:?} not shutdown cleanly. Repair required",
446 &file_path
447 );
448 return Err(DatabaseError::RepairAborted);
449 }
450
451 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
452 let db = Self {
453 mem,
454 transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
455 };
456
457 Ok(db)
458 }
459}
460
461pub struct Database {
495 mem: Arc<TransactionalMemory>,
496 transaction_tracker: Arc<TransactionTracker>,
497}
498
499impl ReadableDatabase for Database {
500 fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
501 let guard = self.allocate_read_transaction()?;
502 #[cfg(feature = "logging")]
503 debug!("Beginning read transaction id={:?}", guard.id());
504 ReadTransaction::new(self.get_memory(), guard)
505 }
506
507 fn cache_stats(&self) -> CacheStats {
508 self.mem.cache_stats()
509 }
510}
511
512impl Database {
513 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
518 Self::builder().create(path)
519 }
520
521 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
523 Self::builder().open(path)
524 }
525
526 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
527 self.mem.clone()
528 }
529
530 pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
531 let table_tree = TableTree::new(
532 mem.get_data_root(),
533 PageHint::None,
534 Arc::new(TransactionGuard::fake()),
535 mem.clone(),
536 )?;
537 if !table_tree.verify_checksums()? {
538 return Ok(false);
539 }
540 let system_table_tree = TableTree::new(
541 mem.get_system_root(),
542 PageHint::None,
543 Arc::new(TransactionGuard::fake()),
544 mem.clone(),
545 )?;
546 if !system_table_tree.verify_checksums()? {
547 return Ok(false);
548 }
549
550 Ok(true)
551 }
552
553 pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
563 let allocator_hash = self.mem.allocator_hash();
564 let mut was_clean = Arc::get_mut(&mut self.mem)
565 .unwrap()
566 .clear_cache_and_reload()?;
567
568 let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];
569
570 let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
571 DatabaseError::Storage(storage_err) => storage_err,
572 _ => unreachable!(),
573 })?;
574
575 if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
576 was_clean = false;
577 }
578
579 if !was_clean {
580 let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
581 let [data_root, system_root] = new_roots;
582 self.mem.commit(
583 data_root,
584 system_root,
585 next_transaction_id,
586 true,
587 ShrinkPolicy::Never,
588 )?;
589 }
590
591 self.mem.begin_writable()?;
592
593 Ok(was_clean)
594 }
595
    /// Compacts the database file by relocating pages and shrinking the file where possible.
    ///
    /// Returns `Ok(true)` if at least one compaction pass made progress, `Ok(false)` otherwise.
    ///
    /// # Errors
    ///
    /// - `CompactionError::TransactionInProgress` if a read transaction is live.
    /// - `CompactionError::PersistentSavepointExists` or
    ///   `CompactionError::EphemeralSavepointExists` if any savepoint exists.
    /// - A storage error if any of the internal write transactions fail.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        // Every commit in this method uses two-phase commit.
        // First empty commit. On early return the write transaction is simply dropped.
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self.transaction_tracker.any_savepoint_exists() {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // Second empty commit. NOTE(review): presumably releases pages freed by the first commit
        // (or by opening the savepoint table) — confirm.
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        // The commits above must have left no pending free pages.
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Repeat compaction passes until one makes no progress.
        loop {
            let mut progress = false;

            // Relocate pages; commit only if `compact_pages` reported that it did something.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Two further empty commits with `ShrinkPolicy::Maximum`. NOTE(review): presumably
            // these free the relocated pages (the second covering pages freed by the first) and
            // shrink the file — confirm.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            // After each pass nothing may remain pending.
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
670
671 #[cfg_attr(not(debug_assertions), expect(dead_code))]
672 fn check_repaired_allocated_pages_table(
673 system_root: Option<BtreeHeader>,
674 mem: Arc<TransactionalMemory>,
675 ) -> Result {
676 let table_tree = TableTree::new(
677 system_root,
678 PageHint::None,
679 Arc::new(TransactionGuard::fake()),
680 mem.clone(),
681 )?;
682 if let Some(table_def) = table_tree
683 .get_table::<TransactionIdWithPagination, PageList>(
684 DATA_ALLOCATED_TABLE.name(),
685 TableType::Normal,
686 )
687 .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
688 {
689 let InternalTableDefinition::Normal { table_root, .. } = table_def else {
690 unreachable!()
691 };
692 let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
693 DATA_ALLOCATED_TABLE.name().to_string(),
694 table_root,
695 PageHint::None,
696 Arc::new(TransactionGuard::fake()),
697 mem.clone(),
698 )?;
699 for result in table.range::<TransactionIdWithPagination>(..)? {
700 let (_, pages) = result?;
701 for i in 0..pages.value().len() {
702 assert!(mem.is_allocated(pages.value().get(i)));
703 }
704 }
705 }
706
707 Ok(())
708 }
709
710 fn visit_freed_tree<K: Key, V: Value, F>(
711 system_root: Option<BtreeHeader>,
712 table_def: SystemTableDefinition<K, V>,
713 mem: Arc<TransactionalMemory>,
714 mut visitor: F,
715 ) -> Result
716 where
717 F: FnMut(PageNumber) -> Result,
718 {
719 let fake_guard = Arc::new(TransactionGuard::fake());
720 let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
721 let table_name = table_def.name();
722 let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
723 Ok(result) => result,
724 Err(TableError::Storage(err)) => {
725 return Err(err);
726 }
727 Err(TableError::TableDoesNotExist(_)) => {
728 return Ok(());
729 }
730 Err(_) => {
731 return Err(StorageError::Corrupted(format!(
732 "Unable to open {table_name}"
733 )));
734 }
735 };
736
737 if let Some(definition) = result {
738 let table_root = match definition {
739 InternalTableDefinition::Normal { table_root, .. } => table_root,
740 InternalTableDefinition::Multimap { .. } => unreachable!(),
741 };
742 let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
743 ReadOnlyTable::new(
744 table_name.to_string(),
745 table_root,
746 PageHint::None,
747 Arc::new(TransactionGuard::fake()),
748 mem.clone(),
749 )?;
750 for result in table.range::<TransactionIdWithPagination>(..)? {
751 let (_, page_list) = result?;
752 for i in 0..page_list.value().len() {
753 visitor(page_list.value().get(i))?;
754 }
755 }
756 }
757
758 Ok(())
759 }
760
    /// Debug builds only: passes every page reachable from the data tree and the system tree to
    /// `mark_debug_allocated_page`. The same is done for every page listed in the
    /// `DATA_FREED_TABLE` and `SYSTEM_FREED_TABLE` system tables.
    ///
    /// Called from `Database::new` after the persisted allocator state is loaded.
    /// NOTE(review): presumably this lets debug assertions elsewhere detect use of pages that
    /// were never allocated — confirm.
    #[cfg(debug_assertions)]
    fn mark_allocated_page_for_debug(
        // Only shared access is used in the body; the `&mut` presumably just enforces
        // exclusivity at the call site.
        mem: &mut Arc<TransactionalMemory>,
    ) -> Result {
        // Pages of the user data tree.
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        // Pages of the system tree.
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_debug_allocated_page(path.page_number());
                Ok(())
            })?;
        }

        // Pages listed in the freed-page tables.
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_debug_allocated_page(page);
            Ok(())
        })?;

        Ok(())
    }
796
    /// Rebuilds the page-allocation state from the current roots and returns them as
    /// `[data_root, system_root]`.
    ///
    /// Steps:
    /// 1. If the primary's checksums fail, roll back via `repair_primary_corrupted`. This is
    ///    refused if the primary used two-phase commit.
    /// 2. Inside `begin_repair` / `end_repair`, mark every page reachable from the data tree,
    ///    the system tree and both freed-page tables as allocated.
    ///
    /// `repair_callback` is invoked with progress 0.3 (only if a rollback is needed), then 0.6
    /// and 0.9. If it aborts, this returns `DatabaseError::RepairAborted`.
    ///
    /// # Errors
    ///
    /// Returns `StorageError::Corrupted` if the primary is corrupted despite two-phase commit, or
    /// if it is still corrupted after the rollback. Other storage errors are propagated.
    fn do_repair(
        // Only shared access is used in the body; the `&mut` presumably enforces exclusivity.
        mem: &mut Arc<TransactionalMemory>,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            // Progress checkpoint; the callback may abort here.
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // NOTE(review): presumably clears pages cached while verifying checksums above, which
            // the rollback may have invalidated — confirm.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        // Progress checkpoint; the callback may abort here.
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Mark every page of the user data tree as allocated.
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Progress checkpoint; the callback may abort here.
        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Mark every page of the system tree as allocated.
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::fake());
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Pages recorded in the freed-page tables are also treated as allocated.
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        // NOTE(review): presumably invalidates cached pages before the caller commits the
        // repaired roots — confirm.
        mem.clear_read_cache();

        Ok([data_root, system_root])
    }
883
    /// Opens (or, if `allow_initialize` is set, may initialize) a database on `file`.
    ///
    /// If `get_allocator_state_table` finds a valid persisted allocator state, that state is
    /// loaded and no repair is needed. Otherwise `repair_callback` is invoked (progress 0.0),
    /// `do_repair` rebuilds the allocation state, and the repaired roots are committed.
    /// Afterwards the database becomes writable, a fresh `TransactionTracker` is created, and
    /// the tracker state of every persistent savepoint is restored.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::RepairAborted` if the callback aborts. Storage errors during
    /// loading, repair, commit or savepoint restoration are propagated.
    #[allow(clippy::too_many_arguments)]
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            false,
        )?;
        let mut mem = Arc::new(mem);
        // Fast path: a valid persisted allocator state makes a full repair unnecessary.
        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
            #[cfg(feature = "logging")]
            info!("Found valid allocator state, full repair not needed");
            mem.load_allocator_state(&tree)?;
            #[cfg(debug_assertions)]
            Self::mark_allocated_page_for_debug(&mut mem)?;
        } else {
            // Slow path: rebuild the allocation state, then commit the repaired roots.
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
            let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
            mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        // Restore the tracker's savepoint counter and register every persistent savepoint. The
        // write transaction is only used for reading and is aborted afterwards.
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id);
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // Ids come from `list_persistent_savepoints`, so they are expected to be valid.
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }
966
967 fn get_allocator_state_table(
968 mem: &Arc<TransactionalMemory>,
969 ) -> Result<Option<AllocatorStateTree>> {
970 if !mem.used_two_phase_commit() {
972 return Ok(None);
973 }
974
975 let system_table_tree = TableTree::new(
977 mem.get_system_root(),
978 PageHint::None,
979 Arc::new(TransactionGuard::fake()),
980 mem.clone(),
981 )?;
982 let Some(allocator_state_table) = system_table_tree
983 .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
984 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
985 else {
986 return Ok(None);
987 };
988
989 let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
991 unreachable!();
992 };
993 let tree = AllocatorStateTree::new(
994 table_root,
995 PageHint::None,
996 Arc::new(TransactionGuard::fake()),
997 mem.clone(),
998 )?;
999
1000 if !mem.is_valid_allocator_state(&tree)? {
1002 return Ok(None);
1003 }
1004
1005 Ok(Some(tree))
1006 }
1007
1008 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1009 let id = self
1010 .transaction_tracker
1011 .register_read_transaction(&self.mem)?;
1012
1013 Ok(TransactionGuard::new_read(
1014 id,
1015 self.transaction_tracker.clone(),
1016 ))
1017 }
1018
    /// Returns a `Builder` with default settings, for configuring how a database is created or
    /// opened.
    pub fn builder() -> Builder {
        Builder::new()
    }
1023
1024 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1030 self.mem.check_io_errors()?;
1032 let guard = TransactionGuard::new_write(
1033 self.transaction_tracker.start_write_transaction(),
1034 self.transaction_tracker.clone(),
1035 );
1036 WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1037 .map_err(|e| e.into())
1038 }
1039
1040 fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
1041 #[cfg(feature = "logging")]
1043 debug!("Writing allocator state table");
1044 let mut tx = self.begin_write()?;
1045 tx.set_quick_repair(true);
1046 tx.set_shrink_policy(ShrinkPolicy::Maximum);
1047 tx.commit()?;
1048
1049 Ok(())
1050 }
1051}
1052
1053impl Drop for Database {
1054 fn drop(&mut self) {
1055 if !thread::panicking() && self.ensure_allocator_state_table_and_trim().is_err() {
1056 #[cfg(feature = "logging")]
1057 warn!("Failed to write allocator state table. Repair may be required at restart.");
1058 }
1059
1060 if self.mem.close().is_err() {
1061 #[cfg(feature = "logging")]
1062 warn!("Failed to flush database file. Repair may be required at restart.");
1063 }
1064 }
1065}
1066
/// Handle passed to the repair callback (see `Builder::set_repair_callback`).
///
/// The callback can inspect `progress()` and call `abort()`. An aborted session makes the
/// repair stop with `DatabaseError::RepairAborted`.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    /// Creates a session reporting `progress`, not yet aborted.
    pub(crate) fn new(progress: f64) -> Self {
        Self {
            aborted: false,
            progress,
        }
    }

    /// Whether `abort()` has been called on this session.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Requests that the repair be aborted.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// The repair progress reported by this session. The values used in this file are 0.0,
    /// 0.3, 0.6 and 0.9.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
1094
1095pub struct Builder {
1097 page_size: usize,
1098 region_size: Option<u64>,
1099 read_cache_size_bytes: usize,
1100 write_cache_size_bytes: usize,
1101 repair_callback: Box<dyn Fn(&mut RepairSession)>,
1102}
1103
1104impl Builder {
1105 #[allow(clippy::new_without_default)]
1111 pub fn new() -> Self {
1112 let mut result = Self {
1113 page_size: PAGE_SIZE,
1117 region_size: None,
1118 read_cache_size_bytes: 0,
1120 write_cache_size_bytes: 0,
1122 repair_callback: Box::new(|_| {}),
1123 };
1124
1125 result.set_cache_size(1024 * 1024 * 1024);
1126 result
1127 }
1128
1129 pub fn set_repair_callback(
1137 &mut self,
1138 callback: impl Fn(&mut RepairSession) + 'static,
1139 ) -> &mut Self {
1140 self.repair_callback = Box::new(callback);
1141 self
1142 }
1143
1144 #[cfg(any(fuzzing, test))]
1152 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1153 assert!(size.is_power_of_two());
1154 self.page_size = std::cmp::max(size, 512);
1155 self
1156 }
1157
1158 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1160 self.read_cache_size_bytes = bytes / 10 * 9;
1162 self.write_cache_size_bytes = bytes / 10;
1163 self
1164 }
1165
1166 #[cfg(any(test, fuzzing))]
1167 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1168 assert!(size.is_power_of_two());
1169 self.region_size = Some(size);
1170 self
1171 }
1172
1173 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1178 let file = OpenOptions::new()
1179 .read(true)
1180 .write(true)
1181 .create(true)
1182 .truncate(false)
1183 .open(path)?;
1184
1185 Database::new(
1186 Box::new(FileBackend::new(file)?),
1187 true,
1188 self.page_size,
1189 self.region_size,
1190 self.read_cache_size_bytes,
1191 self.write_cache_size_bytes,
1192 &self.repair_callback,
1193 )
1194 }
1195
1196 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1198 let file = OpenOptions::new().read(true).write(true).open(path)?;
1199
1200 Database::new(
1201 Box::new(FileBackend::new(file)?),
1202 false,
1203 self.page_size,
1204 None,
1205 self.read_cache_size_bytes,
1206 self.write_cache_size_bytes,
1207 &self.repair_callback,
1208 )
1209 }
1210
1211 pub fn open_read_only(
1217 &self,
1218 path: impl AsRef<Path>,
1219 ) -> Result<ReadOnlyDatabase, DatabaseError> {
1220 let file = OpenOptions::new().read(true).open(path)?;
1221
1222 ReadOnlyDatabase::new(
1223 Box::new(FileBackend::new_internal(file, true)?),
1224 self.page_size,
1225 None,
1226 self.read_cache_size_bytes,
1227 )
1228 }
1229
1230 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1234 Database::new(
1235 Box::new(FileBackend::new(file)?),
1236 true,
1237 self.page_size,
1238 self.region_size,
1239 self.read_cache_size_bytes,
1240 self.write_cache_size_bytes,
1241 &self.repair_callback,
1242 )
1243 }
1244
1245 pub fn create_with_backend(
1247 &self,
1248 backend: impl StorageBackend,
1249 ) -> Result<Database, DatabaseError> {
1250 Database::new(
1251 Box::new(backend),
1252 true,
1253 self.page_size,
1254 self.region_size,
1255 self.read_cache_size_bytes,
1256 self.write_cache_size_bytes,
1257 &self.repair_callback,
1258 )
1259 }
1260}
1261
1262impl std::fmt::Debug for Database {
1263 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1264 f.debug_struct("Database").finish()
1265 }
1266}
1267
1268#[cfg(test)]
1269mod test {
1270 use crate::backends::FileBackend;
1271 use crate::{
1272 CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1273 StorageError, TableDefinition, TransactionError,
1274 };
1275 use std::fs::File;
1276 use std::io::{ErrorKind, Read, Seek, SeekFrom};
1277 use std::sync::Arc;
1278 use std::sync::atomic::{AtomicU64, Ordering};
1279
1280 #[derive(Debug)]
1281 struct FailingBackend {
1282 inner: FileBackend,
1283 countdown: Arc<AtomicU64>,
1284 }
1285
1286 impl FailingBackend {
1287 fn new(backend: FileBackend, countdown: u64) -> Self {
1288 Self {
1289 inner: backend,
1290 countdown: Arc::new(AtomicU64::new(countdown)),
1291 }
1292 }
1293
1294 fn check_countdown(&self) -> Result<(), std::io::Error> {
1295 if self.countdown.load(Ordering::SeqCst) == 0 {
1296 return Err(std::io::Error::from(ErrorKind::Other));
1297 }
1298
1299 Ok(())
1300 }
1301
1302 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1303 if self
1304 .countdown
1305 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1306 if x > 0 { Some(x - 1) } else { None }
1307 })
1308 .is_err()
1309 {
1310 return Err(std::io::Error::from(ErrorKind::Other));
1311 }
1312
1313 Ok(())
1314 }
1315 }
1316
1317 impl StorageBackend for FailingBackend {
1318 fn len(&self) -> Result<u64, std::io::Error> {
1319 self.inner.len()
1320 }
1321
1322 fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), std::io::Error> {
1323 self.check_countdown()?;
1324 self.inner.read(offset, out)
1325 }
1326
1327 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1328 self.inner.set_len(len)
1329 }
1330
1331 fn sync_data(&self) -> Result<(), std::io::Error> {
1332 self.check_countdown()?;
1333 self.inner.sync_data()
1334 }
1335
1336 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1337 self.decrement_countdown()?;
1338 self.inner.write(offset, data)
1339 }
1340 }
1341
1342 #[test]
1343 fn crash_regression4() {
1344 let tmpfile = crate::create_tempfile();
1345 let (file, path) = tmpfile.into_parts();
1346
1347 let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
1348 let db = Database::builder()
1349 .set_cache_size(12686)
1350 .set_page_size(8 * 1024)
1351 .set_region_size(32 * 4096)
1352 .create_with_backend(backend)
1353 .unwrap();
1354
1355 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1356
1357 let tx = db.begin_write().unwrap();
1358 let _savepoint = tx.ephemeral_savepoint().unwrap();
1359 let _persistent_savepoint = tx.persistent_savepoint().unwrap();
1360 tx.commit().unwrap();
1361 let tx = db.begin_write().unwrap();
1362 {
1363 let mut table = tx.open_table(table_def).unwrap();
1364 let _ = table.insert_reserve(118821, 360).unwrap();
1365 }
1366 let result = tx.commit();
1367 assert!(result.is_err());
1368
1369 drop(db);
1370 Database::builder()
1371 .set_cache_size(1024 * 1024)
1372 .set_page_size(8 * 1024)
1373 .set_region_size(32 * 4096)
1374 .create(&path)
1375 .unwrap();
1376 }
1377
1378 #[test]
1379 fn transient_io_error() {
1380 let tmpfile = crate::create_tempfile();
1381 let (file, path) = tmpfile.into_parts();
1382
1383 let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
1384 let countdown = backend.countdown.clone();
1385 let db = Database::builder()
1386 .set_cache_size(0)
1387 .create_with_backend(backend)
1388 .unwrap();
1389
1390 let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
1391
1392 let tx = db.begin_write().unwrap();
1394 {
1395 let mut table = tx.open_table(table_def).unwrap();
1396 table.insert(0, 0).unwrap();
1397 }
1398 tx.commit().unwrap();
1399 let tx = db.begin_write().unwrap();
1400 {
1401 let mut table = tx.open_table(table_def).unwrap();
1402 table.insert(0, 1).unwrap();
1403 }
1404 tx.commit().unwrap();
1405
1406 let tx = db.begin_write().unwrap();
1407 countdown.store(0, Ordering::SeqCst);
1409 let result = tx.commit().err().unwrap();
1410 assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
1411 let result = db.begin_write().err().unwrap();
1412 assert!(matches!(
1413 result,
1414 TransactionError::Storage(StorageError::PreviousIo)
1415 ));
1416 countdown.store(u64::MAX, Ordering::SeqCst);
1418 drop(db);
1419
1420 let mut file = File::open(&path).unwrap();
1422 file.seek(SeekFrom::Start(9)).unwrap();
1423 let mut god_byte = vec![0u8];
1424 assert_eq!(file.read(&mut god_byte).unwrap(), 1);
1425 assert_ne!(god_byte[0] & 2, 0);
1426 }
1427
1428 #[test]
1429 fn small_pages() {
1430 let tmpfile = crate::create_tempfile();
1431
1432 let db = Database::builder()
1433 .set_page_size(512)
1434 .create(tmpfile.path())
1435 .unwrap();
1436
1437 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1438 let txn = db.begin_write().unwrap();
1439 {
1440 txn.open_table(table_definition).unwrap();
1441 }
1442 txn.commit().unwrap();
1443 }
1444
    #[test]
    fn small_pages2() {
        // Fuzz-style regression on 512-byte pages: every transaction enables two-phase
        // commit and interleaves ephemeral savepoint creation, restoration and dropping.
        // The exact statement order (including when savepoints are dropped) is the point
        // of the test.
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        // Txn 0: take savepoint0 before the table exists, then create the table.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Txn 1: take savepoint1, roll back to savepoint0, then store a 489-byte value
        // (close to the 512-byte page size) and commit non-durably.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        // Txn 2: roll back to savepoint0 again.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Txn 3: take savepoint2, release savepoint0, then restore savepoint2 (taken just
        // above, with no changes in between). Lookups/removals of absent keys return None.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        // Txn 4: take savepoint3, release savepoint1, restore savepoint3.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Txn 5: take savepoint4, release savepoint2, restore savepoint3, then ABORT.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        // Txn 6: take savepoint5, release savepoint3. Restoring savepoint4 must fail;
        // presumably because it was created in the transaction aborted above (confirm).
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // Txn 7: roll back to savepoint5 and commit non-durably.
        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1538
1539 #[test]
1540 fn small_pages3() {
1541 let tmpfile = crate::create_tempfile();
1542
1543 let db = Database::builder()
1544 .set_page_size(1024)
1545 .create(tmpfile.path())
1546 .unwrap();
1547
1548 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1549
1550 let mut tx = db.begin_write().unwrap();
1551 let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1552 tx.set_durability(Durability::None).unwrap();
1553 {
1554 let mut t = tx.open_table(table_def).unwrap();
1555 let value = vec![0; 306];
1556 t.insert(&539717, value.as_slice()).unwrap();
1557 }
1558 tx.abort().unwrap();
1559
1560 let mut tx = db.begin_write().unwrap();
1561 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1562 tx.restore_savepoint(&savepoint1).unwrap();
1563 tx.set_durability(Durability::None).unwrap();
1564 {
1565 let mut t = tx.open_table(table_def).unwrap();
1566 let value = vec![0; 2008];
1567 t.insert(&784384, value.as_slice()).unwrap();
1568 }
1569 tx.abort().unwrap();
1570 }
1571
1572 #[test]
1573 fn small_pages4() {
1574 let tmpfile = crate::create_tempfile();
1575
1576 let db = Database::builder()
1577 .set_cache_size(1024 * 1024)
1578 .set_page_size(1024)
1579 .create(tmpfile.path())
1580 .unwrap();
1581
1582 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1583
1584 let tx = db.begin_write().unwrap();
1585 {
1586 tx.open_table(table_def).unwrap();
1587 }
1588 tx.commit().unwrap();
1589
1590 let tx = db.begin_write().unwrap();
1591 {
1592 let mut t = tx.open_table(table_def).unwrap();
1593 assert!(t.get(&131072).unwrap().is_none());
1594 let value = vec![0xFF; 1130];
1595 t.insert(&42394, value.as_slice()).unwrap();
1596 t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1597 assert!(t.get(&0).unwrap().is_none());
1598 }
1599 tx.abort().unwrap();
1600
1601 let tx = db.begin_write().unwrap();
1602 {
1603 let mut t = tx.open_table(table_def).unwrap();
1604 t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1605 }
1606 tx.abort().unwrap();
1607 }
1608
1609 #[test]
1610 fn dynamic_shrink() {
1611 let tmpfile = crate::create_tempfile();
1612 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1613 let big_value = vec![0u8; 1024];
1614
1615 let db = Database::builder()
1616 .set_region_size(1024 * 1024)
1617 .create(tmpfile.path())
1618 .unwrap();
1619
1620 let txn = db.begin_write().unwrap();
1621 {
1622 let mut table = txn.open_table(table_definition).unwrap();
1623 for i in 0..2048 {
1624 table.insert(&i, big_value.as_slice()).unwrap();
1625 }
1626 }
1627 txn.commit().unwrap();
1628
1629 let file_size = tmpfile.as_file().metadata().unwrap().len();
1630
1631 let txn = db.begin_write().unwrap();
1632 {
1633 let mut table = txn.open_table(table_definition).unwrap();
1634 for i in 0..2048 {
1635 table.remove(&i).unwrap();
1636 }
1637 }
1638 txn.commit().unwrap();
1639
1640 let txn = db.begin_write().unwrap();
1642 {
1643 let mut table = txn.open_table(table_definition).unwrap();
1644 table.insert(0, [].as_slice()).unwrap();
1645 }
1646 txn.commit().unwrap();
1647 let txn = db.begin_write().unwrap();
1648 {
1649 let mut table = txn.open_table(table_definition).unwrap();
1650 table.remove(0).unwrap();
1651 }
1652 txn.commit().unwrap();
1653 let txn = db.begin_write().unwrap();
1654 txn.commit().unwrap();
1655
1656 let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1657 assert!(final_file_size < file_size);
1658 }
1659
1660 #[test]
1661 fn create_new_db_in_empty_file() {
1662 let tmpfile = crate::create_tempfile();
1663
1664 let _db = Database::builder()
1665 .create_file(tmpfile.into_file())
1666 .unwrap();
1667 }
1668
1669 #[test]
1670 fn open_missing_file() {
1671 let tmpfile = crate::create_tempfile();
1672
1673 let err = Database::builder()
1674 .open(tmpfile.path().with_extension("missing"))
1675 .unwrap_err();
1676
1677 match err {
1678 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1679 err => panic!("Unexpected error for empty file: {err}"),
1680 }
1681 }
1682
1683 #[test]
1684 fn open_empty_file() {
1685 let tmpfile = crate::create_tempfile();
1686
1687 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1688
1689 match err {
1690 DatabaseError::Storage(StorageError::Io(err))
1691 if err.kind() == ErrorKind::InvalidData => {}
1692 err => panic!("Unexpected error for empty file: {err}"),
1693 }
1694 }
1695}