1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3 AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey,
4 InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree,
5 TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{RedbKey, RedbValue};
8use crate::{
9 CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, SavepointError,
10 StorageError,
11};
12use crate::{ReadTransaction, Result, WriteTransaction};
13use std::fmt::{Debug, Display, Formatter};
14
15use std::fs::{File, OpenOptions};
16use std::io;
17use std::io::ErrorKind;
18use std::marker::PhantomData;
19use std::ops::RangeFull;
20use std::path::Path;
21#[cfg(not(target_has_atomic = "64"))]
22use portable_atomic::{AtomicU64, Ordering};
23#[cfg(target_has_atomic = "64")]
24use std::sync::atomic::{AtomicU64, Ordering};
25use std::sync::{Arc, Condvar, Mutex};
26
27use crate::error::TransactionError;
28use crate::multimap_table::{parse_subtree_roots, DynamicCollection};
29use crate::sealed::Sealed;
30use crate::transactions::SAVEPOINT_TABLE;
31use crate::tree_store::file_backend::FileBackend;
32#[cfg(feature = "logging")]
33use log::{info, warn};
34
/// Byte-addressed storage that a [`Database`] is built on top of.
///
/// A boxed implementation is handed to the database at construction time; implementations
/// must be `'static + Debug + Send + Sync`.
// The trait deliberately exposes `len` as the size of the storage; an `is_empty` method
// would have no use here, so the clippy lint is silenced.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Reads `len` bytes starting at byte offset `offset`.
    fn read(&self, offset: u64, len: usize) -> io::Result<Vec<u8>>;

    /// Writes all of `data` starting at byte offset `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()>;

    /// Returns the current size of the storage in bytes.
    fn len(&self) -> io::Result<u64>;

    /// Resizes the storage to exactly `len` bytes.
    fn set_len(&self, len: u64) -> io::Result<()>;

    /// Makes previously written data durable.
    ///
    /// NOTE(review): `eventual` presumably allows a weaker, eventually-durable sync;
    /// confirm against the concrete backends.
    fn sync_data(&self, eventual: bool) -> io::Result<()>;
}
61
62struct AtomicTransactionId {
63 inner: AtomicU64,
64}
65
66impl AtomicTransactionId {
67 fn new(last_id: TransactionId) -> Self {
68 Self {
69 inner: AtomicU64::new(last_id.raw_id()),
70 }
71 }
72
73 fn next(&self) -> TransactionId {
74 let id = self.inner.fetch_add(1, Ordering::AcqRel);
75 TransactionId::new(id)
76 }
77}
78
79pub trait TableHandle: Sealed {
80 fn name(&self) -> &str;
82}
83
84#[derive(Clone)]
85pub struct UntypedTableHandle {
86 name: String,
87}
88
89impl UntypedTableHandle {
90 pub(crate) fn new(name: String) -> Self {
91 Self { name }
92 }
93}
94
95impl TableHandle for UntypedTableHandle {
96 fn name(&self) -> &str {
97 &self.name
98 }
99}
100
101impl Sealed for UntypedTableHandle {}
102
103pub trait MultimapTableHandle: Sealed {
104 fn name(&self) -> &str;
106}
107
108#[derive(Clone)]
109pub struct UntypedMultimapTableHandle {
110 name: String,
111}
112
113impl UntypedMultimapTableHandle {
114 pub(crate) fn new(name: String) -> Self {
115 Self { name }
116 }
117}
118
119impl MultimapTableHandle for UntypedMultimapTableHandle {
120 fn name(&self) -> &str {
121 &self.name
122 }
123}
124
125impl Sealed for UntypedMultimapTableHandle {}
126
127pub struct TableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
134 name: &'a str,
135 _key_type: PhantomData<K>,
136 _value_type: PhantomData<V>,
137}
138
139impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableDefinition<'a, K, V> {
140 pub const fn new(name: &'a str) -> Self {
146 assert!(!name.is_empty());
147 Self {
148 name,
149 _key_type: PhantomData,
150 _value_type: PhantomData,
151 }
152 }
153}
154
155impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle for TableDefinition<'a, K, V> {
156 fn name(&self) -> &str {
157 self.name
158 }
159}
160
161impl<K: RedbKey, V: RedbValue> Sealed for TableDefinition<'_, K, V> {}
162
163impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for TableDefinition<'a, K, V> {
164 fn clone(&self) -> Self {
165 *self
166 }
167}
168
169impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for TableDefinition<'a, K, V> {}
170
171impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for TableDefinition<'a, K, V> {
172 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
173 write!(
174 f,
175 "{}<{}, {}>",
176 self.name,
177 K::type_name().name(),
178 V::type_name().name()
179 )
180 }
181}
182
183pub struct MultimapTableDefinition<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
192 name: &'a str,
193 _key_type: PhantomData<K>,
194 _value_type: PhantomData<V>,
195}
196
197impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableDefinition<'a, K, V> {
198 pub const fn new(name: &'a str) -> Self {
199 assert!(!name.is_empty());
200 Self {
201 name,
202 _key_type: PhantomData,
203 _value_type: PhantomData,
204 }
205 }
206}
207
208impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
209 for MultimapTableDefinition<'a, K, V>
210{
211 fn name(&self) -> &str {
212 self.name
213 }
214}
215
216impl<K: RedbKey, V: RedbKey> Sealed for MultimapTableDefinition<'_, K, V> {}
217
218impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Clone for MultimapTableDefinition<'a, K, V> {
219 fn clone(&self) -> Self {
220 *self
221 }
222}
223
224impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
225
226impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Display for MultimapTableDefinition<'a, K, V> {
227 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
228 write!(
229 f,
230 "{}<{}, {}>",
231 self.name,
232 K::type_name().name(),
233 V::type_name().name()
234 )
235 }
236}
237
/// An open database.
///
/// At most one write transaction is live at a time: its id is stored in
/// `live_write_transaction`, and `live_write_transaction_available` is signalled when it ends.
pub struct Database {
    // Storage layer that holds the data/system/freed tree roots and the pages.
    mem: TransactionalMemory,
    // Source of ids for newly started write transactions.
    next_transaction_id: AtomicTransactionId,
    // Shared with transactions; records registered read transactions and savepoints.
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    // Id of the write transaction currently running, or `None` if there is none.
    live_write_transaction: Mutex<Option<TransactionId>>,
    // Notified whenever `live_write_transaction` is cleared.
    live_write_transaction_available: Condvar,
}
275
276impl Database {
277 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
282 Self::builder().create(path)
283 }
284
285 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
287 Self::builder().open(path)
288 }
289
290 pub(crate) fn start_write_transaction(&self) -> TransactionId {
291 let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
292 while live_write_transaction.is_some() {
293 live_write_transaction = self
294 .live_write_transaction_available
295 .wait(live_write_transaction)
296 .unwrap();
297 }
298 assert!(live_write_transaction.is_none());
299 let transaction_id = self.next_transaction_id.next();
300 #[cfg(feature = "logging")]
301 info!("Beginning write transaction id={:?}", transaction_id);
302 *live_write_transaction = Some(transaction_id);
303
304 transaction_id
305 }
306
307 pub(crate) fn end_write_transaction(&self, id: TransactionId) {
308 let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
309 assert_eq!(live_write_transaction.unwrap(), id);
310 *live_write_transaction = None;
311 self.live_write_transaction_available.notify_one();
312 }
313
314 pub(crate) fn get_memory(&self) -> &TransactionalMemory {
315 &self.mem
316 }
317
318 pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
319 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
320 let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone());
321 if !table_tree.verify_checksums()? {
322 return Ok(false);
323 }
324 let system_table_tree =
325 TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone());
326 if !system_table_tree.verify_checksums()? {
327 return Ok(false);
328 }
329 assert!(fake_freed_pages.lock().unwrap().is_empty());
330
331 if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
332 if !RawBtree::new(
333 Some((freed_root, freed_checksum)),
334 FreedTableKey::fixed_width(),
335 FreedPageList::fixed_width(),
336 mem,
337 )
338 .verify_checksum()?
339 {
340 return Ok(false);
341 }
342 }
343
344 Ok(true)
345 }
346
347 pub fn check_integrity(&mut self) -> Result<bool> {
357 let allocator_hash = self.mem.allocator_hash();
358 let mut was_clean = self.mem.clear_cache_and_reload()?;
359
360 if !Self::verify_primary_checksums(&self.mem)? {
361 was_clean = false;
362 }
363
364 Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
365 DatabaseError::Storage(storage_err) => storage_err,
366 _ => unreachable!(),
367 })?;
368 if allocator_hash != self.mem.allocator_hash() {
369 was_clean = false;
370 }
371 self.mem.begin_writable()?;
372
373 Ok(was_clean)
374 }
375
376 pub fn compact(&mut self) -> Result<bool, CompactionError> {
380 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
383 if txn.list_persistent_savepoints()?.next().is_some() {
384 return Err(CompactionError::PersistentSavepointExists);
385 }
386 if self
387 .transaction_tracker
388 .lock()
389 .unwrap()
390 .any_savepoint_exists()
391 {
392 return Err(CompactionError::EphemeralSavepointExists);
393 }
394 txn.set_durability(Durability::Paranoid);
395 txn.commit().map_err(|e| e.into_storage_error())?;
396 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
398 txn.set_durability(Durability::Paranoid);
399 txn.commit().map_err(|e| e.into_storage_error())?;
400 assert!(self.mem.get_freed_root().is_none());
403
404 let mut compacted = false;
405 loop {
407 let mut progress = false;
408
409 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
410 if txn.compact_pages()? {
411 progress = true;
412 txn.commit().map_err(|e| e.into_storage_error())?;
413 } else {
414 txn.abort()?;
415 }
416
417 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
419 txn.set_durability(Durability::Paranoid);
420 txn.commit().map_err(|e| e.into_storage_error())?;
421 assert!(self.mem.get_freed_root().is_none());
422
423 if !progress {
424 break;
425 } else {
426 compacted = true;
427 }
428 }
429
430 Ok(compacted)
431 }
432
433 fn mark_persistent_savepoints(
434 system_root: Option<(PageNumber, Checksum)>,
435 mem: &TransactionalMemory,
436 oldest_unprocessed_free_transaction: TransactionId,
437 ) -> Result {
438 let freed_list = Arc::new(Mutex::new(vec![]));
439 let table_tree = TableTree::new(system_root, mem, freed_list);
440 let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new()));
441 if let Some(savepoint_table_def) = table_tree
442 .get_table::<SavepointId, SerializedSavepoint>(
443 SAVEPOINT_TABLE.name(),
444 TableType::Normal,
445 )
446 .map_err(|e| {
447 e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
448 })?
449 {
450 let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
451 ReadOnlyTable::new(
452 "internal savepoint table".to_string(),
453 savepoint_table_def.get_root(),
454 PageHint::None,
455 mem,
456 )?;
457 for result in savepoint_table.range::<SavepointId>(..)? {
458 let (_, savepoint_data) = result?;
459 let savepoint = savepoint_data
460 .value()
461 .to_savepoint(fake_transaction_tracker.clone());
462 if let Some((root, _)) = savepoint.get_user_root() {
463 Self::mark_tables_recursive(root, mem, true)?;
464 }
465 Self::mark_freed_tree(
466 savepoint.get_freed_root(),
467 mem,
468 oldest_unprocessed_free_transaction,
469 )?;
470 }
471 }
472
473 Ok(())
474 }
475
476 fn mark_freed_tree(
477 freed_root: Option<(PageNumber, Checksum)>,
478 mem: &TransactionalMemory,
479 oldest_unprocessed_free_transaction: TransactionId,
480 ) -> Result {
481 if let Some((root, _)) = freed_root {
482 let freed_pages_iter = AllPageNumbersBtreeIter::new(
483 root,
484 FreedTableKey::fixed_width(),
485 FreedPageList::fixed_width(),
486 mem,
487 )?;
488 mem.mark_pages_allocated(freed_pages_iter, true)?;
489 }
490
491 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
492 "internal freed table".to_string(),
493 freed_root,
494 PageHint::None,
495 mem,
496 )?;
497 let lookup_key = FreedTableKey {
498 transaction_id: oldest_unprocessed_free_transaction.raw_id(),
499 pagination_id: 0,
500 };
501 for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
502 let (_, freed_page_list) = result?;
503 let mut freed_page_list_as_vec = vec![];
504 for i in 0..freed_page_list.value().len() {
505 freed_page_list_as_vec.push(Ok(freed_page_list.value().get(i)));
506 }
507 mem.mark_pages_allocated(freed_page_list_as_vec.into_iter(), true)?;
508 }
509
510 Ok(())
511 }
512
    /// Marks as allocated every page of the table tree rooted at `root`, and every page of
    /// each table whose definition is stored in that tree.
    ///
    /// For multimap tables this also covers each value subtree, whose roots are parsed out of
    /// the table's pages. `allow_duplicates` is forwarded to `mark_pages_allocated`.
    /// NOTE(review): presumably it controls whether marking an already-allocated page is
    /// accepted; confirm against `TransactionalMemory::mark_pages_allocated`.
    fn mark_tables_recursive(
        root: PageNumber,
        mem: &TransactionalMemory,
        allow_duplicates: bool,
    ) -> Result {
        // Pages of the table tree itself (keys are table names, values are table definitions).
        let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem)?;
        mem.mark_pages_allocated(master_pages_iter, allow_duplicates)?;

        // Walk every table definition stored in the table tree.
        let iter: BtreeRangeIter<&str, InternalTableDefinition> =
            BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;

        for entry in iter {
            let definition = entry?.value();
            // A table without a root has no pages to mark.
            if let Some((table_root, _)) = definition.get_root() {
                match definition.get_type() {
                    TableType::Normal => {
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            definition.get_fixed_value_size(),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
                    }
                    TableType::Multimap => {
                        // First pass: pages of the multimap's top-level tree.
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;

                        // Second pass over the same tree (the first iterator was consumed):
                        // pages may reference value subtrees, whose pages are marked too.
                        let table_pages_iter = AllPageNumbersBtreeIter::new(
                            table_root,
                            definition.get_fixed_key_size(),
                            DynamicCollection::<()>::fixed_width_with(
                                definition.get_fixed_value_size(),
                            ),
                            mem,
                        )?;
                        for table_page in table_pages_iter {
                            let page = mem.get_page(table_page?)?;
                            let subtree_roots = parse_subtree_roots(
                                &page,
                                definition.get_fixed_key_size(),
                                definition.get_fixed_value_size(),
                            );
                            // Subtrees are keyed by the multimap's values and store unit values.
                            for (sub_root, _) in subtree_roots {
                                let sub_root_iter = AllPageNumbersBtreeIter::new(
                                    sub_root,
                                    definition.get_fixed_value_size(),
                                    <()>::fixed_width(),
                                    mem,
                                )?;
                                mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
                            }
                        }
                    }
                }
            }
        }

        Ok(())
    }
584
    /// Rebuilds the allocator state of `mem` by walking every reachable page, then commits
    /// the existing roots under a new transaction id.
    ///
    /// If the primary roots fail checksum verification, `repair_primary_corrupted` is tried
    /// first. `repair_callback` is invoked with progress 0.3 (only when the roots are
    /// corrupted), then 0.6 and 0.9.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::RepairAborted` if the callback aborts, and a `Corrupted`
    /// storage error if the roots still fail verification after repair.
    fn do_repair(
        mem: &mut TransactionalMemory,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<(), DatabaseError> {
        if !Self::verify_primary_checksums(mem)? {
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // Drop cached pages so the re-verification reads the repaired state.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem)? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Mark everything reachable from the user data tree.
        let data_root = mem.get_data_root();
        if let Some((root, _)) = data_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }

        // Mark the freed tree and every page listed in it (starting from transaction 0, i.e.
        // all entries).
        let freed_root = mem.get_freed_root();
        Self::mark_freed_tree(freed_root, mem, TransactionId::new(0))?;
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // Transaction id of the first freed-table entry; if the table is empty, fall back to
        // the last committed transaction id.
        let oldest_unprocessed_transaction =
            if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
                TransactionId::new(entry?.0.value().transaction_id)
            } else {
                mem.get_last_committed_transaction_id()?
            };
        drop(freed_table);

        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Mark the system tree and everything the persistent savepoints still reference.
        let system_root = mem.get_system_root();
        if let Some((root, _)) = system_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }
        Self::mark_persistent_savepoints(system_root, mem, oldest_unprocessed_transaction)?;

        mem.end_repair()?;

        mem.clear_read_cache();

        // Re-commit the unchanged roots under a fresh transaction id.
        // NOTE(review): the meaning of the two bool flags is defined by
        // `TransactionalMemory::commit`; confirm before changing them.
        let transaction_id = mem.get_last_committed_transaction_id()?.next();
        mem.commit(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            false,
            true,
        )?;

        Ok(())
    }
672
    /// Opens a database on `file`, repairs it first if it was not shut down cleanly, and then
    /// restores persistent-savepoint state into the transaction tracker.
    ///
    /// `repair_callback` is invoked with progress 0.0 before any repair, and again from
    /// `do_repair`.
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::RepairAborted` if the callback aborts, or any storage error
    /// raised while opening, repairing or reading the savepoint table.
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mut mem = TransactionalMemory::new(
            file,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
        )?;
        if mem.needs_repair()? {
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            Self::do_repair(&mut mem, repair_callback)?;
        }

        mem.begin_writable()?;
        // The first write transaction gets the id following the last committed one.
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next();

        let db = Database {
            mem,
            next_transaction_id: AtomicTransactionId::new(next_transaction_id),
            transaction_tracker: Arc::new(Mutex::new(TransactionTracker::new())),
            live_write_transaction: Mutex::new(None),
            live_write_transaction_available: Condvar::new(),
        };

        // A write transaction is used only to read the savepoint table; it is aborted below.
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        // Presumably continues savepoint id allocation after the persisted ones so new ids
        // do not collide; confirm in TransactionTracker.
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .lock()
                .unwrap()
                .restore_savepoint_counter_state(next_id);
        }
        // Register every persistent savepoint with the tracker.
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    // `id` came from the savepoint list itself, so it must be valid.
                    SavepointError::InvalidSavepoint => unreachable!(),
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .lock()
                .unwrap()
                .register_persistent_savepoint(&savepoint);
        }
        txn.abort()?;

        Ok(db)
    }
741
742 fn allocate_read_transaction(&self) -> Result<TransactionId> {
743 let mut guard = self.transaction_tracker.lock().unwrap();
744 let id = self.mem.get_last_committed_transaction_id()?;
745 guard.register_read_transaction(id);
746
747 Ok(id)
748 }
749
750 pub(crate) fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
751 let id = self
752 .transaction_tracker
753 .lock()
754 .unwrap()
755 .allocate_savepoint();
756 Ok((id, self.allocate_read_transaction()?))
757 }
758
759 pub fn builder() -> Builder {
761 Builder::new()
762 }
763
764 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
770 WriteTransaction::new(self, self.transaction_tracker.clone()).map_err(|e| e.into())
771 }
772
773 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
781 let id = self.allocate_read_transaction()?;
782 #[cfg(feature = "logging")]
783 info!("Beginning read transaction id={:?}", id);
784 Ok(ReadTransaction::new(
785 self.get_memory(),
786 self.transaction_tracker.clone(),
787 id,
788 ))
789 }
790}
791
/// Handle passed to the repair callback while a database is being repaired.
///
/// The callback can inspect [`progress`](Self::progress) and call [`abort`](Self::abort);
/// an aborted repair makes the open fail with `DatabaseError::RepairAborted`.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    /// Creates a session reporting `progress`, not yet aborted.
    pub(crate) fn new(progress: f64) -> Self {
        let aborted = false;
        Self { progress, aborted }
    }

    /// Whether the callback requested an abort.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Requests that the repair be aborted.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Estimated fraction of the repair completed so far, between 0.0 and 1.0.
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
819
820pub struct Builder {
822 page_size: usize,
823 region_size: Option<u64>,
824 read_cache_size_bytes: usize,
825 write_cache_size_bytes: usize,
826 repair_callback: Box<dyn Fn(&mut RepairSession)>,
827}
828
829impl Builder {
830 #[allow(clippy::new_without_default)]
836 pub fn new() -> Self {
837 let mut result = Self {
838 page_size: PAGE_SIZE,
842 region_size: None,
843 read_cache_size_bytes: 0,
845 write_cache_size_bytes: 0,
847 repair_callback: Box::new(|_| {}),
848 };
849
850 result.set_cache_size(1024 * 1024 * 1024);
851 result
852 }
853
854 pub fn set_repair_callback(
862 &mut self,
863 callback: impl Fn(&mut RepairSession) + 'static,
864 ) -> &mut Self {
865 self.repair_callback = Box::new(callback);
866 self
867 }
868
869 #[cfg(any(fuzzing, test))]
877 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
878 assert!(size.is_power_of_two());
879 self.page_size = std::cmp::max(size, 512);
880 self
881 }
882
883 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
885 self.read_cache_size_bytes = bytes / 10 * 9;
887 self.write_cache_size_bytes = bytes / 10;
888 self
889 }
890
891 #[cfg(any(test, fuzzing))]
892 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
893 assert!(size.is_power_of_two());
894 self.region_size = Some(size);
895 self
896 }
897
898 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
903 let file = OpenOptions::new()
904 .read(true)
905 .write(true)
906 .create(true)
907 .open(path)?;
908
909 Database::new(
910 Box::new(FileBackend::new(file)?),
911 self.page_size,
912 self.region_size,
913 self.read_cache_size_bytes,
914 self.write_cache_size_bytes,
915 &self.repair_callback,
916 )
917 }
918
919 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
921 let file = OpenOptions::new().read(true).write(true).open(path)?;
922
923 if file.metadata()?.len() == 0 {
924 return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
925 }
926
927 Database::new(
928 Box::new(FileBackend::new(file)?),
929 self.page_size,
930 None,
931 self.read_cache_size_bytes,
932 self.write_cache_size_bytes,
933 &self.repair_callback,
934 )
935 }
936
937 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
941 Database::new(
942 Box::new(FileBackend::new(file)?),
943 self.page_size,
944 self.region_size,
945 self.read_cache_size_bytes,
946 self.write_cache_size_bytes,
947 &self.repair_callback,
948 )
949 }
950
951 pub fn create_with_backend(
953 &self,
954 backend: impl StorageBackend,
955 ) -> Result<Database, DatabaseError> {
956 Database::new(
957 Box::new(backend),
958 self.page_size,
959 self.region_size,
960 self.read_cache_size_bytes,
961 self.write_cache_size_bytes,
962 &self.repair_callback,
963 )
964 }
965}
966
967impl std::fmt::Debug for Database {
968 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
969 f.debug_struct("Database").finish()
970 }
971}
972
973#[cfg(test)]
974mod test {
975 use crate::backends::FileBackend;
976 use crate::{
977 Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
978 TableDefinition,
979 };
980 use std::io::ErrorKind;
981 #[cfg(not(target_has_atomic = "64"))]
982 use portable_atomic::{AtomicU64, Ordering};
983 #[cfg(target_has_atomic = "64")]
984 use std::sync::atomic::{AtomicU64, Ordering};
985
986 #[derive(Debug)]
987 struct FailingBackend {
988 inner: FileBackend,
989 countdown: AtomicU64,
990 }
991
992 impl FailingBackend {
993 fn new(backend: FileBackend, countdown: u64) -> Self {
994 Self {
995 inner: backend,
996 countdown: AtomicU64::new(countdown),
997 }
998 }
999
1000 fn check_countdown(&self) -> Result<(), std::io::Error> {
1001 if self.countdown.load(Ordering::SeqCst) == 0 {
1002 return Err(std::io::Error::from(ErrorKind::Other));
1003 }
1004
1005 Ok(())
1006 }
1007
1008 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1009 if self
1010 .countdown
1011 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1012 if x > 0 {
1013 Some(x - 1)
1014 } else {
1015 None
1016 }
1017 })
1018 .is_err()
1019 {
1020 return Err(std::io::Error::from(ErrorKind::Other));
1021 }
1022
1023 Ok(())
1024 }
1025 }
1026
1027 impl StorageBackend for FailingBackend {
1028 fn len(&self) -> Result<u64, std::io::Error> {
1029 self.inner.len()
1030 }
1031
1032 fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1033 self.check_countdown()?;
1034 self.inner.read(offset, len)
1035 }
1036
1037 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1038 self.inner.set_len(len)
1039 }
1040
1041 fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1042 self.check_countdown()?;
1043 self.inner.sync_data(eventual)
1044 }
1045
1046 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1047 self.decrement_countdown()?;
1048 self.inner.write(offset, data)
1049 }
1050 }
1051
    /// Regression test: after a commit fails because the backend stopped accepting writes,
    /// the same file must still open successfully.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();

        // Allow 23 writes; after that, writes, reads and syncs on this backend fail.
        let backend = FailingBackend::new(
            FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
            23,
        );
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // Expected to fail, presumably because the write budget runs out during this commit.
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Reopening the same file through the normal file backend must succeed.
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(tmpfile.path())
            .unwrap();
    }
1089
1090 #[test]
1091 fn small_pages() {
1092 let tmpfile = crate::create_tempfile();
1093
1094 let db = Database::builder()
1095 .set_page_size(512)
1096 .create(tmpfile.path())
1097 .unwrap();
1098
1099 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1100 let txn = db.begin_write().unwrap();
1101 {
1102 txn.open_table(table_definition).unwrap();
1103 }
1104 txn.commit().unwrap();
1105 }
1106
    /// Regression sequence with 512-byte pages that interleaves creating, restoring and
    /// dropping ephemeral savepoints across committed and aborted transactions.
    /// NOTE(review): the specific keys and sizes look fuzzer-derived; keep them as they are.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // savepoint4 is taken in a transaction that is aborted below.
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // Restoring savepoint4 is expected to fail, presumably because its transaction was
        // aborted.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1200
    /// Regression test with 1 KiB pages: inserts inside aborted transactions, the second one
    /// after restoring a savepoint taken in the same transaction. Passes if nothing panics.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            // A value larger than the 1 KiB page size.
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1233
    /// Regression test with 1 KiB pages and a 1 MiB cache: values larger than a page are
    /// inserted in transactions that are then aborted. Passes if nothing panics.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1270
    /// The database file must end up smaller than its peak size after all data is removed.
    #[test]
    fn dynamic_shrink() {
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let big_value = vec![0u8; 1024];

        let db = Database::builder()
            .set_region_size(1024 * 1024)
            .create(tmpfile.path())
            .unwrap();

        // Grow the file: 2048 values of 1 KiB each.
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();

        let file_size = tmpfile.as_file().metadata().unwrap().len();

        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.remove(&i).unwrap();
            }
        }
        txn.commit().unwrap();

        // NOTE(review): these extra small commits presumably let the pages freed above be
        // released so the file can shrink; confirm against the freed-page processing.
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.insert(0, [].as_slice()).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.remove(0).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        txn.commit().unwrap();

        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
        assert!(final_file_size < file_size);
    }
1321
1322 #[test]
1323 fn create_new_db_in_empty_file() {
1324 let tmpfile = crate::create_tempfile();
1325
1326 let _db = Database::builder()
1327 .create_file(tmpfile.into_file())
1328 .unwrap();
1329 }
1330
1331 #[test]
1332 fn open_missing_file() {
1333 let tmpfile = crate::create_tempfile();
1334
1335 let err = Database::builder()
1336 .open(tmpfile.path().with_extension("missing"))
1337 .unwrap_err();
1338
1339 match err {
1340 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1341 err => panic!("Unexpected error for empty file: {err}"),
1342 }
1343 }
1344
1345 #[test]
1346 fn open_empty_file() {
1347 let tmpfile = crate::create_tempfile();
1348
1349 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1350
1351 match err {
1352 DatabaseError::Storage(StorageError::Io(err))
1353 if err.kind() == ErrorKind::InvalidData => {}
1354 err => panic!("Unexpected error for empty file: {err}"),
1355 }
1356 }
1357}