1use crate::transaction_tracker::{TransactionId, TransactionTracker};
2use crate::tree_store::{
3 BtreeHeader, InternalTableDefinition, PageHint, PageNumber, ReadOnlyBackend, ShrinkPolicy,
4 TableTree, TableType, TransactionalMemory, PAGE_SIZE,
5};
6use crate::types::{Key, Value};
7use crate::{
8 CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError, StorageError, TableError,
9};
10use crate::{ReadTransaction, Result, WriteTransaction};
11use std::fmt::{Debug, Display, Formatter};
12
13use std::fs::{File, OpenOptions};
14use std::marker::PhantomData;
15use std::path::Path;
16use std::sync::Arc;
17use std::{io, thread};
18
19use crate::error::TransactionError;
20use crate::sealed::Sealed;
21use crate::transactions::{
22 AllocatorStateKey, AllocatorStateTree, PageList, SystemTableDefinition,
23 TransactionIdWithPagination, ALLOCATOR_STATE_TABLE_NAME, DATA_ALLOCATED_TABLE,
24 DATA_FREED_TABLE, SYSTEM_FREED_TABLE,
25};
26use crate::tree_store::file_backend::FileBackend;
27#[cfg(feature = "logging")]
28use log::{debug, info, warn};
29
30#[allow(clippy::len_without_is_empty)]
31pub trait StorageBackend: 'static + Debug + Send + Sync {
33 fn len(&self) -> std::result::Result<u64, io::Error>;
35
36 fn read(&self, offset: u64, out: &mut [u8]) -> std::result::Result<(), io::Error>;
40
41 fn set_len(&self, len: u64) -> std::result::Result<(), io::Error>;
45
46 fn sync_data(&self) -> std::result::Result<(), io::Error>;
48
49 fn write(&self, offset: u64, data: &[u8]) -> std::result::Result<(), io::Error>;
51
52 fn close(&self) -> std::result::Result<(), io::Error> {
57 Ok(())
58 }
59}
60
61pub trait TableHandle: Sealed {
62 fn name(&self) -> &str;
64}
65
66#[derive(Clone)]
67pub struct UntypedTableHandle {
68 name: String,
69}
70
71impl UntypedTableHandle {
72 pub(crate) fn new(name: String) -> Self {
73 Self { name }
74 }
75}
76
77impl TableHandle for UntypedTableHandle {
78 fn name(&self) -> &str {
79 &self.name
80 }
81}
82
83impl Sealed for UntypedTableHandle {}
84
85pub trait MultimapTableHandle: Sealed {
86 fn name(&self) -> &str;
88}
89
90#[derive(Clone)]
91pub struct UntypedMultimapTableHandle {
92 name: String,
93}
94
95impl UntypedMultimapTableHandle {
96 pub(crate) fn new(name: String) -> Self {
97 Self { name }
98 }
99}
100
101impl MultimapTableHandle for UntypedMultimapTableHandle {
102 fn name(&self) -> &str {
103 &self.name
104 }
105}
106
107impl Sealed for UntypedMultimapTableHandle {}
108
109pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
116 name: &'a str,
117 _key_type: PhantomData<K>,
118 _value_type: PhantomData<V>,
119}
120
121impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
122 pub const fn new(name: &'a str) -> Self {
128 assert!(!name.is_empty());
129 Self {
130 name,
131 _key_type: PhantomData,
132 _value_type: PhantomData,
133 }
134 }
135}
136
137impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
138 fn name(&self) -> &str {
139 self.name
140 }
141}
142
143impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}
144
145impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
146 fn clone(&self) -> Self {
147 *self
148 }
149}
150
151impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
152
153impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
154 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
155 write!(
156 f,
157 "{}<{}, {}>",
158 self.name,
159 K::type_name().name(),
160 V::type_name().name()
161 )
162 }
163}
164
165pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
174 name: &'a str,
175 _key_type: PhantomData<K>,
176 _value_type: PhantomData<V>,
177}
178
179impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
180 pub const fn new(name: &'a str) -> Self {
181 assert!(!name.is_empty());
182 Self {
183 name,
184 _key_type: PhantomData,
185 _value_type: PhantomData,
186 }
187 }
188}
189
190impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
191 fn name(&self) -> &str {
192 self.name
193 }
194}
195
196impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}
197
198impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
199 fn clone(&self) -> Self {
200 *self
201 }
202}
203
204impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
205
206impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
207 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208 write!(
209 f,
210 "{}<{}, {}>",
211 self.name,
212 K::type_name().name(),
213 V::type_name().name()
214 )
215 }
216}
217
218#[derive(Debug)]
222pub struct CacheStats {
223 pub(crate) evictions: u64,
224 pub(crate) read_hits: u64,
225 pub(crate) read_misses: u64,
226 pub(crate) write_hits: u64,
227 pub(crate) write_misses: u64,
228 pub(crate) used_bytes: usize,
229}
230
231impl CacheStats {
232 pub fn evictions(&self) -> u64 {
236 self.evictions
237 }
238
239 pub fn read_hits(&self) -> u64 {
241 self.read_hits
242 }
243
244 pub fn read_misses(&self) -> u64 {
246 self.read_misses
247 }
248
249 pub fn write_hits(&self) -> u64 {
251 self.write_hits
252 }
253
254 pub fn write_misses(&self) -> u64 {
256 self.write_misses
257 }
258
259 pub fn used_bytes(&self) -> usize {
261 self.used_bytes
262 }
263}
264
265pub(crate) struct TransactionGuard {
266 transaction_tracker: Option<Arc<TransactionTracker>>,
267 transaction_id: Option<TransactionId>,
268 write_transaction: bool,
269}
270
271impl TransactionGuard {
272 pub(crate) fn new_read(
273 transaction_id: TransactionId,
274 tracker: Arc<TransactionTracker>,
275 ) -> Self {
276 Self {
277 transaction_tracker: Some(tracker),
278 transaction_id: Some(transaction_id),
279 write_transaction: false,
280 }
281 }
282
283 pub(crate) fn new_write(
284 transaction_id: TransactionId,
285 tracker: Arc<TransactionTracker>,
286 ) -> Self {
287 Self {
288 transaction_tracker: Some(tracker),
289 transaction_id: Some(transaction_id),
290 write_transaction: true,
291 }
292 }
293
294 pub(crate) fn fake() -> Self {
296 Self {
297 transaction_tracker: None,
298 transaction_id: None,
299 write_transaction: false,
300 }
301 }
302
303 pub(crate) fn id(&self) -> TransactionId {
304 self.transaction_id.unwrap()
305 }
306
307 pub(crate) fn leak(mut self) -> TransactionId {
308 self.transaction_id.take().unwrap()
309 }
310}
311
312impl Drop for TransactionGuard {
313 fn drop(&mut self) {
314 if self.transaction_tracker.is_none() {
315 return;
316 }
317 if let Some(transaction_id) = self.transaction_id {
318 if self.write_transaction {
319 self.transaction_tracker
320 .as_ref()
321 .unwrap()
322 .end_write_transaction(transaction_id);
323 } else {
324 self.transaction_tracker
325 .as_ref()
326 .unwrap()
327 .deallocate_read_transaction(transaction_id);
328 }
329 }
330 }
331}
332
333pub trait ReadableDatabase {
334 fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;
342
343 fn cache_stats(&self) -> CacheStats;
347}
348
349pub struct ReadOnlyDatabase {
390 mem: Arc<TransactionalMemory>,
391 transaction_tracker: Arc<TransactionTracker>,
392}
393
394impl ReadableDatabase for ReadOnlyDatabase {
395 fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
396 let id = self
397 .transaction_tracker
398 .register_read_transaction(&self.mem)?;
399 #[cfg(feature = "logging")]
400 debug!("Beginning read transaction id={id:?}");
401
402 let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());
403
404 ReadTransaction::new(self.mem.clone(), guard)
405 }
406
407 fn cache_stats(&self) -> CacheStats {
408 self.mem.cache_stats()
409 }
410}
411
412impl ReadOnlyDatabase {
413 pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
415 Builder::new().open_read_only(path)
416 }
417
418 fn new(
419 file: Box<dyn StorageBackend>,
420 page_size: usize,
421 region_size: Option<u64>,
422 read_cache_size_bytes: usize,
423 ) -> Result<Self, DatabaseError> {
424 #[cfg(feature = "logging")]
425 let file_path = format!("{:?}", &file);
426 #[cfg(feature = "logging")]
427 info!("Opening database in read-only {:?}", &file_path);
428 let mem = TransactionalMemory::new(
429 Box::new(ReadOnlyBackend::new(file)),
430 false,
431 page_size,
432 region_size,
433 read_cache_size_bytes,
434 0,
435 true,
436 )?;
437 let mem = Arc::new(mem);
438 if let Some(tree) = Database::get_allocator_state_table(&mem)? {
441 mem.load_allocator_state(&tree)?;
442 } else {
443 #[cfg(feature = "logging")]
444 warn!(
445 "Database {:?} not shutdown cleanly. Repair required",
446 &file_path
447 );
448 return Err(DatabaseError::RepairAborted);
449 }
450
451 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
452 let db = Self {
453 mem,
454 transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
455 };
456
457 Ok(db)
458 }
459}
460
461pub struct Database {
495 mem: Arc<TransactionalMemory>,
496 transaction_tracker: Arc<TransactionTracker>,
497}
498
499impl ReadableDatabase for Database {
500 fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
501 let guard = self.allocate_read_transaction()?;
502 #[cfg(feature = "logging")]
503 debug!("Beginning read transaction id={:?}", guard.id());
504 ReadTransaction::new(self.get_memory(), guard)
505 }
506
507 fn cache_stats(&self) -> CacheStats {
508 self.mem.cache_stats()
509 }
510}
511
512impl Database {
513 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
518 Self::builder().create(path)
519 }
520
521 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
523 Self::builder().open(path)
524 }
525
526 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
527 self.mem.clone()
528 }
529
530 pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
531 let table_tree = TableTree::new(
532 mem.get_data_root(),
533 PageHint::None,
534 Arc::new(TransactionGuard::fake()),
535 mem.clone(),
536 )?;
537 if !table_tree.verify_checksums()? {
538 return Ok(false);
539 }
540 let system_table_tree = TableTree::new(
541 mem.get_system_root(),
542 PageHint::None,
543 Arc::new(TransactionGuard::fake()),
544 mem.clone(),
545 )?;
546 if !system_table_tree.verify_checksums()? {
547 return Ok(false);
548 }
549
550 Ok(true)
551 }
552
553 pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
563 let allocator_hash = self.mem.allocator_hash();
564 let mut was_clean = Arc::get_mut(&mut self.mem)
565 .unwrap()
566 .clear_cache_and_reload()?;
567
568 let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];
569
570 let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
571 DatabaseError::Storage(storage_err) => storage_err,
572 _ => unreachable!(),
573 })?;
574
575 if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
576 was_clean = false;
577 }
578
579 if !was_clean {
580 let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next();
581 let [data_root, system_root] = new_roots;
582 self.mem.commit(
583 data_root,
584 system_root,
585 next_transaction_id,
586 true,
587 ShrinkPolicy::Never,
588 )?;
589 }
590
591 self.mem.begin_writable()?;
592
593 Ok(was_clean)
594 }
595
596 pub fn compact(&mut self) -> Result<bool, CompactionError> {
600 if self
601 .transaction_tracker
602 .oldest_live_read_transaction()
603 .is_some()
604 {
605 return Err(CompactionError::TransactionInProgress);
606 }
607 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
612 if txn.list_persistent_savepoints()?.next().is_some() {
613 return Err(CompactionError::PersistentSavepointExists);
614 }
615 if self.transaction_tracker.any_savepoint_exists() {
616 return Err(CompactionError::EphemeralSavepointExists);
617 }
618 txn.set_two_phase_commit(true);
619 txn.commit().map_err(|e| e.into_storage_error())?;
620 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
622 txn.set_two_phase_commit(true);
623 txn.commit().map_err(|e| e.into_storage_error())?;
624 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
627 assert!(!txn.pending_free_pages()?);
628 txn.abort()?;
629
630 let mut compacted = false;
631 loop {
633 let mut progress = false;
634
635 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
636 if txn.compact_pages()? {
637 progress = true;
638 txn.commit().map_err(|e| e.into_storage_error())?;
639 } else {
640 txn.abort()?;
641 }
642
643 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
645 txn.set_two_phase_commit(true);
646 txn.set_shrink_policy(ShrinkPolicy::Maximum);
648 txn.commit().map_err(|e| e.into_storage_error())?;
649 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
653 txn.set_two_phase_commit(true);
654 txn.set_shrink_policy(ShrinkPolicy::Maximum);
656 txn.commit().map_err(|e| e.into_storage_error())?;
657 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
658 assert!(!txn.pending_free_pages()?);
659 txn.abort()?;
660
661 if !progress {
662 break;
663 }
664
665 compacted = true;
666 }
667
668 Ok(compacted)
669 }
670
671 #[cfg_attr(not(debug_assertions), expect(dead_code))]
672 fn check_repaired_allocated_pages_table(
673 system_root: Option<BtreeHeader>,
674 mem: Arc<TransactionalMemory>,
675 ) -> Result {
676 let table_tree = TableTree::new(
677 system_root,
678 PageHint::None,
679 Arc::new(TransactionGuard::fake()),
680 mem.clone(),
681 )?;
682 if let Some(table_def) = table_tree
683 .get_table::<TransactionIdWithPagination, PageList>(
684 DATA_ALLOCATED_TABLE.name(),
685 TableType::Normal,
686 )
687 .map_err(|e| e.into_storage_error_or_corrupted("Allocated pages table corrupted"))?
688 {
689 let InternalTableDefinition::Normal { table_root, .. } = table_def else {
690 unreachable!()
691 };
692 let table: ReadOnlyTable<TransactionIdWithPagination, PageList> = ReadOnlyTable::new(
693 DATA_ALLOCATED_TABLE.name().to_string(),
694 table_root,
695 PageHint::None,
696 Arc::new(TransactionGuard::fake()),
697 mem.clone(),
698 )?;
699 for result in table.range::<TransactionIdWithPagination>(..)? {
700 let (_, pages) = result?;
701 for i in 0..pages.value().len() {
702 assert!(mem.is_allocated(pages.value().get(i)));
703 }
704 }
705 }
706
707 Ok(())
708 }
709
710 fn visit_freed_tree<K: Key, V: Value, F>(
711 system_root: Option<BtreeHeader>,
712 table_def: SystemTableDefinition<K, V>,
713 mem: Arc<TransactionalMemory>,
714 mut visitor: F,
715 ) -> Result
716 where
717 F: FnMut(PageNumber) -> Result,
718 {
719 let fake_guard = Arc::new(TransactionGuard::fake());
720 let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
721 let table_name = table_def.name();
722 let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
723 Ok(result) => result,
724 Err(TableError::Storage(err)) => {
725 return Err(err);
726 }
727 Err(TableError::TableDoesNotExist(_)) => {
728 return Ok(());
729 }
730 Err(_) => {
731 return Err(StorageError::Corrupted(format!(
732 "Unable to open {table_name}"
733 )));
734 }
735 };
736
737 if let Some(definition) = result {
738 let table_root = match definition {
739 InternalTableDefinition::Normal { table_root, .. } => table_root,
740 InternalTableDefinition::Multimap { .. } => unreachable!(),
741 };
742 let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
743 ReadOnlyTable::new(
744 table_name.to_string(),
745 table_root,
746 PageHint::None,
747 Arc::new(TransactionGuard::fake()),
748 mem.clone(),
749 )?;
750 for result in table.range::<TransactionIdWithPagination>(..)? {
751 let (_, page_list) = result?;
752 for i in 0..page_list.value().len() {
753 visitor(page_list.value().get(i))?;
754 }
755 }
756 }
757
758 Ok(())
759 }
760
761 #[cfg(debug_assertions)]
762 fn mark_allocated_page_for_debug(
763 mem: &mut Arc<TransactionalMemory>, ) -> Result {
765 let data_root = mem.get_data_root();
766 {
767 eprintln!("[MARK_DEBUG] Visiting data tree, root: {data_root:?}");
768 let fake = Arc::new(TransactionGuard::fake());
769 let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
770 tables.visit_all_pages(|path| {
771 eprintln!(
772 "[MARK_DEBUG] Data tree: marking page {:?}",
773 path.page_number()
774 );
775 mem.mark_debug_allocated_page(path.page_number());
776 Ok(())
777 })?;
778 }
779
780 let system_root = mem.get_system_root();
781 {
782 eprintln!("[MARK_DEBUG] Visiting system tree, root: {system_root:?}");
783 let fake = Arc::new(TransactionGuard::fake());
784 let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
785 system_tables.visit_all_pages(|path| {
786 eprintln!(
787 "[MARK_DEBUG] System tree: marking page {:?}",
788 path.page_number()
789 );
790 mem.mark_debug_allocated_page(path.page_number());
791 Ok(())
792 })?;
793 }
794
795 eprintln!("[MARK_DEBUG] Visiting DATA_FREED_TABLE");
796 Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
797 eprintln!("[MARK_DEBUG] DATA_FREED_TABLE: marking page {page:?}");
798 mem.mark_debug_allocated_page(page);
799 Ok(())
800 })?;
801 eprintln!("[MARK_DEBUG] Visiting SYSTEM_FREED_TABLE");
802 Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
803 eprintln!("[MARK_DEBUG] SYSTEM_FREED_TABLE: marking page {page:?}");
804 mem.mark_debug_allocated_page(page);
805 Ok(())
806 })?;
807
808 Ok(())
809 }
810
811 fn do_repair(
812 mem: &mut Arc<TransactionalMemory>, repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
814 ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
815 if !Self::verify_primary_checksums(mem.clone())? {
816 if mem.used_two_phase_commit() {
817 return Err(DatabaseError::Storage(StorageError::Corrupted(
818 "Primary is corrupted despite 2-phase commit".to_string(),
819 )));
820 }
821
822 let mut handle = RepairSession::new(0.3);
824 repair_callback(&mut handle);
825 if handle.aborted() {
826 return Err(DatabaseError::RepairAborted);
827 }
828
829 mem.repair_primary_corrupted();
830 mem.clear_read_cache();
834 if !Self::verify_primary_checksums(mem.clone())? {
835 return Err(DatabaseError::Storage(StorageError::Corrupted(
836 "Failed to repair database. All roots are corrupted".to_string(),
837 )));
838 }
839 }
840 let mut handle = RepairSession::new(0.6);
842 repair_callback(&mut handle);
843 if handle.aborted() {
844 return Err(DatabaseError::RepairAborted);
845 }
846
847 mem.begin_repair()?;
848
849 let data_root = mem.get_data_root();
850 {
851 let fake = Arc::new(TransactionGuard::fake());
852 let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
853 tables.visit_all_pages(|path| {
854 mem.mark_page_allocated(path.page_number());
855 Ok(())
856 })?;
857 }
858
859 let mut handle = RepairSession::new(0.9);
861 repair_callback(&mut handle);
862 if handle.aborted() {
863 return Err(DatabaseError::RepairAborted);
864 }
865
866 let system_root = mem.get_system_root();
867 {
868 let fake = Arc::new(TransactionGuard::fake());
869 let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
870 system_tables.visit_all_pages(|path| {
871 mem.mark_page_allocated(path.page_number());
872 Ok(())
873 })?;
874 }
875
876 Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
877 mem.mark_page_allocated(page);
878 Ok(())
879 })?;
880 Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
881 mem.mark_page_allocated(page);
882 Ok(())
883 })?;
884 #[cfg(debug_assertions)]
885 {
886 Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
887 }
888
889 mem.end_repair()?;
890
891 mem.clear_read_cache();
894
895 Ok([data_root, system_root])
896 }
897
898 #[allow(clippy::too_many_arguments)]
899 fn new(
900 file: Box<dyn StorageBackend>,
901 allow_initialize: bool,
902 page_size: usize,
903 region_size: Option<u64>,
904 read_cache_size_bytes: usize,
905 write_cache_size_bytes: usize,
906 repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
907 ) -> Result<Self, DatabaseError> {
908 #[cfg(feature = "logging")]
909 let file_path = format!("{:?}", &file);
910 #[cfg(feature = "logging")]
911 info!("Opening database {:?}", &file_path);
912 let mem = TransactionalMemory::new(
913 file,
914 allow_initialize,
915 page_size,
916 region_size,
917 read_cache_size_bytes,
918 write_cache_size_bytes,
919 false,
920 )?;
921 let mut mem = Arc::new(mem);
922 if let Some(tree) = Self::get_allocator_state_table(&mem)? {
925 #[cfg(feature = "logging")]
926 info!("Found valid allocator state, full repair not needed");
927 mem.load_allocator_state(&tree)?;
928 #[cfg(debug_assertions)]
929 Self::mark_allocated_page_for_debug(&mut mem)?;
930 } else {
931 #[cfg(feature = "logging")]
932 warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
933 let mut handle = RepairSession::new(0.0);
934 repair_callback(&mut handle);
935 if handle.aborted() {
936 return Err(DatabaseError::RepairAborted);
937 }
938 let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
939 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
940 mem.commit(
941 data_root,
942 system_root,
943 next_transaction_id,
944 true,
945 ShrinkPolicy::Never,
946 )?;
947 }
948
949 mem.begin_writable()?;
950 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
951
952 let db = Database {
953 mem,
954 transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
955 };
956
957 let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
959 if let Some(next_id) = txn.next_persistent_savepoint_id()? {
960 db.transaction_tracker
961 .restore_savepoint_counter_state(next_id);
962 }
963 for id in txn.list_persistent_savepoints()? {
964 let savepoint = match txn.get_persistent_savepoint(id) {
965 Ok(savepoint) => savepoint,
966 Err(err) => match err {
967 SavepointError::InvalidSavepoint => unreachable!(),
968 SavepointError::Storage(storage) => {
969 return Err(storage.into());
970 }
971 },
972 };
973 db.transaction_tracker
974 .register_persistent_savepoint(&savepoint);
975 }
976 txn.abort()?;
977
978 Ok(db)
979 }
980
981 pub(crate) fn get_allocator_state_table(
982 mem: &Arc<TransactionalMemory>,
983 ) -> Result<Option<AllocatorStateTree>> {
984 if !mem.used_two_phase_commit() {
986 return Ok(None);
987 }
988
989 let system_table_tree = TableTree::new(
991 mem.get_system_root(),
992 PageHint::None,
993 Arc::new(TransactionGuard::fake()),
994 mem.clone(),
995 )?;
996 let Some(allocator_state_table) = system_table_tree
997 .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
998 .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?
999 else {
1000 return Ok(None);
1001 };
1002
1003 let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
1005 unreachable!();
1006 };
1007 let tree = AllocatorStateTree::new(
1008 table_root,
1009 PageHint::None,
1010 Arc::new(TransactionGuard::fake()),
1011 mem.clone(),
1012 )?;
1013
1014 if !mem.is_valid_allocator_state(&tree)? {
1016 return Ok(None);
1017 }
1018
1019 Ok(Some(tree))
1020 }
1021
1022 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1023 let id = self
1024 .transaction_tracker
1025 .register_read_transaction(&self.mem)?;
1026
1027 Ok(TransactionGuard::new_read(
1028 id,
1029 self.transaction_tracker.clone(),
1030 ))
1031 }
1032
1033 pub fn builder() -> Builder {
1035 Builder::new()
1036 }
1037
1038 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
1044 self.mem.check_io_errors()?;
1046 let guard = TransactionGuard::new_write(
1047 self.transaction_tracker.start_write_transaction(),
1048 self.transaction_tracker.clone(),
1049 );
1050 WriteTransaction::new(guard, self.transaction_tracker.clone(), self.mem.clone())
1051 .map_err(|e| e.into())
1052 }
1053
1054 fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
1055 #[cfg(feature = "logging")]
1057 debug!("Writing allocator state table");
1058 eprintln!("[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: begin_write");
1059 let mut tx = self.begin_write()?;
1060 eprintln!(
1061 "[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: setting quick_repair"
1062 );
1063 tx.set_quick_repair(true);
1064 tx.set_shrink_policy(ShrinkPolicy::Maximum);
1065 eprintln!("[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: about to commit");
1066 tx.commit()?;
1067 eprintln!("[DATABASE_DROP_DEBUG] ensure_allocator_state_table_and_trim: commit done");
1068
1069 Ok(())
1070 }
1071}
1072
1073impl Drop for Database {
1074 fn drop(&mut self) {
1075 eprintln!("[DATABASE_DROP_DEBUG] Database::drop called for instance {self:p}");
1076
1077 if !thread::panicking() && self.ensure_allocator_state_table_and_trim().is_err() {
1078 #[cfg(feature = "logging")]
1079 warn!("Failed to write allocator state table. Repair may be required at restart.");
1080 }
1081
1082 eprintln!("[DATABASE_DROP_DEBUG] About to call mem.close()");
1083 if self.mem.close().is_err() {
1084 #[cfg(feature = "logging")]
1085 warn!("Failed to flush database file. Repair may be required at restart.");
1086 }
1087 eprintln!("[DATABASE_DROP_DEBUG] Database::drop completed");
1088 }
1089}
1090
1091pub struct RepairSession {
1092 progress: f64,
1093 aborted: bool,
1094}
1095
1096impl RepairSession {
1097 pub(crate) fn new(progress: f64) -> Self {
1098 Self {
1099 progress,
1100 aborted: false,
1101 }
1102 }
1103
1104 pub(crate) fn aborted(&self) -> bool {
1105 self.aborted
1106 }
1107
1108 pub fn abort(&mut self) {
1110 self.aborted = true;
1111 }
1112
1113 pub fn progress(&self) -> f64 {
1115 self.progress
1116 }
1117}
1118
1119pub struct Builder {
1121 page_size: usize,
1122 region_size: Option<u64>,
1123 read_cache_size_bytes: usize,
1124 write_cache_size_bytes: usize,
1125 repair_callback: Box<dyn Fn(&mut RepairSession)>,
1126}
1127
1128impl Builder {
1129 #[allow(clippy::new_without_default)]
1135 pub fn new() -> Self {
1136 let mut result = Self {
1137 page_size: PAGE_SIZE,
1141 region_size: None,
1142 read_cache_size_bytes: 0,
1144 write_cache_size_bytes: 0,
1146 repair_callback: Box::new(|_| {}),
1147 };
1148
1149 result.set_cache_size(1024 * 1024 * 1024);
1150 result
1151 }
1152
1153 pub fn set_repair_callback(
1161 &mut self,
1162 callback: impl Fn(&mut RepairSession) + 'static,
1163 ) -> &mut Self {
1164 self.repair_callback = Box::new(callback);
1165 self
1166 }
1167
1168 #[cfg(any(fuzzing, test))]
1176 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
1177 assert!(size.is_power_of_two());
1178 self.page_size = std::cmp::max(size, 512);
1179 self
1180 }
1181
1182 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
1184 self.read_cache_size_bytes = bytes / 10 * 9;
1186 self.write_cache_size_bytes = bytes / 10;
1187 self
1188 }
1189
1190 #[cfg(any(test, fuzzing))]
1191 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
1192 assert!(size.is_power_of_two());
1193 self.region_size = Some(size);
1194 self
1195 }
1196
1197 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1202 let file = OpenOptions::new()
1203 .read(true)
1204 .write(true)
1205 .create(true)
1206 .truncate(false)
1207 .open(path)?;
1208
1209 Database::new(
1210 Box::new(FileBackend::new(file)?),
1211 true,
1212 self.page_size,
1213 self.region_size,
1214 self.read_cache_size_bytes,
1215 self.write_cache_size_bytes,
1216 &self.repair_callback,
1217 )
1218 }
1219
1220 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
1222 let file = OpenOptions::new().read(true).write(true).open(path)?;
1223
1224 Database::new(
1225 Box::new(FileBackend::new(file)?),
1226 false,
1227 self.page_size,
1228 None,
1229 self.read_cache_size_bytes,
1230 self.write_cache_size_bytes,
1231 &self.repair_callback,
1232 )
1233 }
1234
1235 pub fn open_read_only(
1241 &self,
1242 path: impl AsRef<Path>,
1243 ) -> Result<ReadOnlyDatabase, DatabaseError> {
1244 let file = OpenOptions::new().read(true).open(path)?;
1245
1246 ReadOnlyDatabase::new(
1247 Box::new(FileBackend::new_internal(file, true)?),
1248 self.page_size,
1249 None,
1250 self.read_cache_size_bytes,
1251 )
1252 }
1253
1254 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
1258 Database::new(
1259 Box::new(FileBackend::new(file)?),
1260 true,
1261 self.page_size,
1262 self.region_size,
1263 self.read_cache_size_bytes,
1264 self.write_cache_size_bytes,
1265 &self.repair_callback,
1266 )
1267 }
1268
1269 pub fn create_with_backend(
1271 &self,
1272 backend: impl StorageBackend,
1273 ) -> Result<Database, DatabaseError> {
1274 Database::new(
1275 Box::new(backend),
1276 true,
1277 self.page_size,
1278 self.region_size,
1279 self.read_cache_size_bytes,
1280 self.write_cache_size_bytes,
1281 &self.repair_callback,
1282 )
1283 }
1284}
1285
1286impl std::fmt::Debug for Database {
1287 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1288 f.debug_struct("Database").finish()
1289 }
1290}
1291
1292#[cfg(test)]
1293mod test {
1294 use crate::backends::FileBackend;
1295 use crate::{
1296 CommitError, Database, DatabaseError, Durability, ReadableTable, StorageBackend,
1297 StorageError, TableDefinition, TransactionError,
1298 };
1299 use std::fs::File;
1300 use std::io::{ErrorKind, Read, Seek, SeekFrom};
1301 use std::sync::atomic::{AtomicU64, Ordering};
1302 use std::sync::Arc;
1303
1304 #[derive(Debug)]
1305 struct FailingBackend {
1306 inner: FileBackend,
1307 countdown: Arc<AtomicU64>,
1308 }
1309
1310 impl FailingBackend {
1311 fn new(backend: FileBackend, countdown: u64) -> Self {
1312 Self {
1313 inner: backend,
1314 countdown: Arc::new(AtomicU64::new(countdown)),
1315 }
1316 }
1317
1318 fn check_countdown(&self) -> Result<(), std::io::Error> {
1319 if self.countdown.load(Ordering::SeqCst) == 0 {
1320 return Err(std::io::Error::from(ErrorKind::Other));
1321 }
1322
1323 Ok(())
1324 }
1325
1326 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1327 if self
1328 .countdown
1329 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1330 if x > 0 {
1331 Some(x - 1)
1332 } else {
1333 None
1334 }
1335 })
1336 .is_err()
1337 {
1338 return Err(std::io::Error::from(ErrorKind::Other));
1339 }
1340
1341 Ok(())
1342 }
1343 }
1344
1345 impl StorageBackend for FailingBackend {
1346 fn len(&self) -> Result<u64, std::io::Error> {
1347 self.inner.len()
1348 }
1349
1350 fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), std::io::Error> {
1351 self.check_countdown()?;
1352 self.inner.read(offset, out)
1353 }
1354
1355 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1356 self.inner.set_len(len)
1357 }
1358
1359 fn sync_data(&self) -> Result<(), std::io::Error> {
1360 self.check_countdown()?;
1361 self.inner.sync_data()
1362 }
1363
1364 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1365 self.decrement_countdown()?;
1366 self.inner.write(offset, data)
1367 }
1368 }
1369
1370 #[test]
1371 fn crash_regression4() {
1372 let tmpfile = crate::create_tempfile();
1373 let (file, path) = tmpfile.into_parts();
1374
1375 let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
1376 let db = Database::builder()
1377 .set_cache_size(12686)
1378 .set_page_size(8 * 1024)
1379 .set_region_size(32 * 4096)
1380 .create_with_backend(backend)
1381 .unwrap();
1382
1383 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1384
1385 let tx = db.begin_write().unwrap();
1386 let _savepoint = tx.ephemeral_savepoint().unwrap();
1387 let _persistent_savepoint = tx.persistent_savepoint().unwrap();
1388 tx.commit().unwrap();
1389 let tx = db.begin_write().unwrap();
1390 {
1391 let mut table = tx.open_table(table_def).unwrap();
1392 let _ = table.insert_reserve(118821, 360).unwrap();
1393 }
1394 let result = tx.commit();
1395 assert!(result.is_err());
1396
1397 drop(db);
1398 Database::builder()
1399 .set_cache_size(1024 * 1024)
1400 .set_page_size(8 * 1024)
1401 .set_region_size(32 * 4096)
1402 .create(&path)
1403 .unwrap();
1404 }
1405
1406 #[test]
1407 fn transient_io_error() {
1408 let tmpfile = crate::create_tempfile();
1409 let (file, path) = tmpfile.into_parts();
1410
1411 let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
1412 let countdown = backend.countdown.clone();
1413 let db = Database::builder()
1414 .set_cache_size(0)
1415 .create_with_backend(backend)
1416 .unwrap();
1417
1418 let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");
1419
1420 let tx = db.begin_write().unwrap();
1422 {
1423 let mut table = tx.open_table(table_def).unwrap();
1424 table.insert(0, 0).unwrap();
1425 }
1426 tx.commit().unwrap();
1427 let tx = db.begin_write().unwrap();
1428 {
1429 let mut table = tx.open_table(table_def).unwrap();
1430 table.insert(0, 1).unwrap();
1431 }
1432 tx.commit().unwrap();
1433
1434 let tx = db.begin_write().unwrap();
1435 countdown.store(0, Ordering::SeqCst);
1437 let result = tx.commit().err().unwrap();
1438 assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
1439 let result = db.begin_write().err().unwrap();
1440 assert!(matches!(
1441 result,
1442 TransactionError::Storage(StorageError::PreviousIo)
1443 ));
1444 countdown.store(u64::MAX, Ordering::SeqCst);
1446 drop(db);
1447
1448 let mut file = File::open(&path).unwrap();
1450 file.seek(SeekFrom::Start(9)).unwrap();
1451 let mut god_byte = vec![0u8];
1452 assert_eq!(file.read(&mut god_byte).unwrap(), 1);
1453 assert_ne!(god_byte[0] & 2, 0);
1454 }
1455
1456 #[test]
1457 fn small_pages() {
1458 let tmpfile = crate::create_tempfile();
1459
1460 let db = Database::builder()
1461 .set_page_size(512)
1462 .create(tmpfile.path())
1463 .unwrap();
1464
1465 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1466 let txn = db.begin_write().unwrap();
1467 {
1468 txn.open_table(table_definition).unwrap();
1469 }
1470 txn.commit().unwrap();
1471 }
1472
1473 #[test]
1474 fn small_pages2() {
1475 let tmpfile = crate::create_tempfile();
1476
1477 let db = Database::builder()
1478 .set_page_size(512)
1479 .create(tmpfile.path())
1480 .unwrap();
1481
1482 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1483
1484 let mut tx = db.begin_write().unwrap();
1485 tx.set_two_phase_commit(true);
1486 let savepoint0 = tx.ephemeral_savepoint().unwrap();
1487 {
1488 tx.open_table(table_def).unwrap();
1489 }
1490 tx.commit().unwrap();
1491
1492 let mut tx = db.begin_write().unwrap();
1493 tx.set_two_phase_commit(true);
1494 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1495 tx.restore_savepoint(&savepoint0).unwrap();
1496 tx.set_durability(Durability::None).unwrap();
1497 {
1498 let mut t = tx.open_table(table_def).unwrap();
1499 t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
1500 assert!(t.remove(&291295).unwrap().is_none());
1501 }
1502 tx.commit().unwrap();
1503
1504 let mut tx = db.begin_write().unwrap();
1505 tx.set_two_phase_commit(true);
1506 tx.restore_savepoint(&savepoint0).unwrap();
1507 {
1508 tx.open_table(table_def).unwrap();
1509 }
1510 tx.commit().unwrap();
1511
1512 let mut tx = db.begin_write().unwrap();
1513 tx.set_two_phase_commit(true);
1514 let savepoint2 = tx.ephemeral_savepoint().unwrap();
1515 drop(savepoint0);
1516 tx.restore_savepoint(&savepoint2).unwrap();
1517 {
1518 let mut t = tx.open_table(table_def).unwrap();
1519 assert!(t.get(&2059).unwrap().is_none());
1520 assert!(t.remove(&145227).unwrap().is_none());
1521 assert!(t.remove(&145227).unwrap().is_none());
1522 }
1523 tx.commit().unwrap();
1524
1525 let mut tx = db.begin_write().unwrap();
1526 tx.set_two_phase_commit(true);
1527 let savepoint3 = tx.ephemeral_savepoint().unwrap();
1528 drop(savepoint1);
1529 tx.restore_savepoint(&savepoint3).unwrap();
1530 {
1531 tx.open_table(table_def).unwrap();
1532 }
1533 tx.commit().unwrap();
1534
1535 let mut tx = db.begin_write().unwrap();
1536 tx.set_two_phase_commit(true);
1537 let savepoint4 = tx.ephemeral_savepoint().unwrap();
1538 drop(savepoint2);
1539 tx.restore_savepoint(&savepoint3).unwrap();
1540 tx.set_durability(Durability::None).unwrap();
1541 {
1542 let mut t = tx.open_table(table_def).unwrap();
1543 assert!(t.remove(&207936).unwrap().is_none());
1544 }
1545 tx.abort().unwrap();
1546
1547 let mut tx = db.begin_write().unwrap();
1548 tx.set_two_phase_commit(true);
1549 let savepoint5 = tx.ephemeral_savepoint().unwrap();
1550 drop(savepoint3);
1551 assert!(tx.restore_savepoint(&savepoint4).is_err());
1552 {
1553 tx.open_table(table_def).unwrap();
1554 }
1555 tx.commit().unwrap();
1556
1557 let mut tx = db.begin_write().unwrap();
1558 tx.set_two_phase_commit(true);
1559 tx.restore_savepoint(&savepoint5).unwrap();
1560 tx.set_durability(Durability::None).unwrap();
1561 {
1562 tx.open_table(table_def).unwrap();
1563 }
1564 tx.commit().unwrap();
1565 }
1566
1567 #[test]
1568 fn small_pages3() {
1569 let tmpfile = crate::create_tempfile();
1570
1571 let db = Database::builder()
1572 .set_page_size(1024)
1573 .create(tmpfile.path())
1574 .unwrap();
1575
1576 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1577
1578 let mut tx = db.begin_write().unwrap();
1579 let _savepoint0 = tx.ephemeral_savepoint().unwrap();
1580 tx.set_durability(Durability::None).unwrap();
1581 {
1582 let mut t = tx.open_table(table_def).unwrap();
1583 let value = vec![0; 306];
1584 t.insert(&539717, value.as_slice()).unwrap();
1585 }
1586 tx.abort().unwrap();
1587
1588 let mut tx = db.begin_write().unwrap();
1589 let savepoint1 = tx.ephemeral_savepoint().unwrap();
1590 tx.restore_savepoint(&savepoint1).unwrap();
1591 tx.set_durability(Durability::None).unwrap();
1592 {
1593 let mut t = tx.open_table(table_def).unwrap();
1594 let value = vec![0; 2008];
1595 t.insert(&784384, value.as_slice()).unwrap();
1596 }
1597 tx.abort().unwrap();
1598 }
1599
1600 #[test]
1601 fn small_pages4() {
1602 let tmpfile = crate::create_tempfile();
1603
1604 let db = Database::builder()
1605 .set_cache_size(1024 * 1024)
1606 .set_page_size(1024)
1607 .create(tmpfile.path())
1608 .unwrap();
1609
1610 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1611
1612 let tx = db.begin_write().unwrap();
1613 {
1614 tx.open_table(table_def).unwrap();
1615 }
1616 tx.commit().unwrap();
1617
1618 let tx = db.begin_write().unwrap();
1619 {
1620 let mut t = tx.open_table(table_def).unwrap();
1621 assert!(t.get(&131072).unwrap().is_none());
1622 let value = vec![0xFF; 1130];
1623 t.insert(&42394, value.as_slice()).unwrap();
1624 t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
1625 assert!(t.get(&0).unwrap().is_none());
1626 }
1627 tx.abort().unwrap();
1628
1629 let tx = db.begin_write().unwrap();
1630 {
1631 let mut t = tx.open_table(table_def).unwrap();
1632 t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
1633 }
1634 tx.abort().unwrap();
1635 }
1636
1637 #[test]
1638 fn dynamic_shrink() {
1639 let tmpfile = crate::create_tempfile();
1640 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1641 let big_value = vec![0u8; 1024];
1642
1643 let db = Database::builder()
1644 .set_region_size(1024 * 1024)
1645 .create(tmpfile.path())
1646 .unwrap();
1647
1648 let txn = db.begin_write().unwrap();
1649 {
1650 let mut table = txn.open_table(table_definition).unwrap();
1651 for i in 0..2048 {
1652 table.insert(&i, big_value.as_slice()).unwrap();
1653 }
1654 }
1655 txn.commit().unwrap();
1656
1657 let file_size = tmpfile.as_file().metadata().unwrap().len();
1658
1659 let txn = db.begin_write().unwrap();
1660 {
1661 let mut table = txn.open_table(table_definition).unwrap();
1662 for i in 0..2048 {
1663 table.remove(&i).unwrap();
1664 }
1665 }
1666 txn.commit().unwrap();
1667
1668 let txn = db.begin_write().unwrap();
1670 {
1671 let mut table = txn.open_table(table_definition).unwrap();
1672 table.insert(0, [].as_slice()).unwrap();
1673 }
1674 txn.commit().unwrap();
1675 let txn = db.begin_write().unwrap();
1676 {
1677 let mut table = txn.open_table(table_definition).unwrap();
1678 table.remove(0).unwrap();
1679 }
1680 txn.commit().unwrap();
1681 let txn = db.begin_write().unwrap();
1682 txn.commit().unwrap();
1683
1684 let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1685 assert!(final_file_size < file_size);
1686 }
1687
1688 #[test]
1689 fn create_new_db_in_empty_file() {
1690 let tmpfile = crate::create_tempfile();
1691
1692 let _db = Database::builder()
1693 .create_file(tmpfile.into_file())
1694 .unwrap();
1695 }
1696
1697 #[test]
1698 fn open_missing_file() {
1699 let tmpfile = crate::create_tempfile();
1700
1701 let err = Database::builder()
1702 .open(tmpfile.path().with_extension("missing"))
1703 .unwrap_err();
1704
1705 match err {
1706 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1707 err => panic!("Unexpected error for empty file: {err}"),
1708 }
1709 }
1710
1711 #[test]
1712 fn open_empty_file() {
1713 let tmpfile = crate::create_tempfile();
1714
1715 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1716
1717 match err {
1718 DatabaseError::Storage(StorageError::Io(err))
1719 if err.kind() == ErrorKind::InvalidData => {}
1720 err => panic!("Unexpected error for empty file: {err}"),
1721 }
1722 }
1723}