1use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
2use crate::tree_store::{
3 AllPageNumbersBtreeIter, BtreeRangeIter, Checksum, FreedPageList, FreedTableKey,
4 InternalTableDefinition, PageHint, PageNumber, RawBtree, SerializedSavepoint, TableTree,
5 TableType, TransactionalMemory, PAGE_SIZE,
6};
7use crate::types::{RedbKey, RedbValue};
8use crate::{
9 CompactionError, DatabaseError, Durability, ReadOnlyTable, ReadableTable, SavepointError,
10 StorageError,
11};
12use crate::{ReadTransaction, Result, WriteTransaction};
13use std::fmt::{Debug, Display, Formatter};
14
15use std::fs::{File, OpenOptions};
16use std::io;
17use std::io::ErrorKind;
18use std::marker::PhantomData;
19use std::ops::RangeFull;
20use std::path::Path;
21use std::sync::atomic::{AtomicU64, Ordering};
22use std::sync::{Arc, Condvar, Mutex};
23
24use crate::error::TransactionError;
25use crate::multimap_table::{parse_subtree_roots, DynamicCollection};
26use crate::sealed::Sealed;
27use crate::transactions::SAVEPOINT_TABLE;
28use crate::tree_store::file_backend::FileBackend;
29#[cfg(feature = "logging")]
30use log::{info, warn};
31
/// Persistent storage that a [`Database`] reads from and writes to.
///
/// Every method takes `&self`, and the trait requires `Send + Sync`, so implementations must
/// provide their own interior synchronization.
#[allow(clippy::len_without_is_empty)]
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Returns the current length of the storage, in bytes.
    fn len(&self) -> io::Result<u64>;

    /// Reads exactly `len` bytes starting at byte `offset`.
    fn read(&self, offset: u64, len: usize) -> io::Result<Vec<u8>>;

    /// Resizes the storage to `len` bytes.
    fn set_len(&self, len: u64) -> io::Result<()>;

    /// Flushes previously written data to the underlying medium.
    ///
    /// NOTE(review): `eventual` presumably permits a weaker, eventually-durable sync — confirm
    /// against the concrete backends.
    fn sync_data(&self, eventual: bool) -> io::Result<()>;

    /// Writes `data` starting at byte `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> io::Result<()>;
}
58
59struct AtomicTransactionId {
60 inner: AtomicU64,
61}
62
63impl AtomicTransactionId {
64 fn new(last_id: TransactionId) -> Self {
65 Self {
66 inner: AtomicU64::new(last_id.raw_id()),
67 }
68 }
69
70 fn next(&self) -> TransactionId {
71 let id = self.inner.fetch_add(1, Ordering::AcqRel);
72 TransactionId::new(id)
73 }
74}
75
76pub trait TableHandle: Sealed {
77 fn name(&self) -> &str;
79}
80
81#[derive(Clone)]
82pub struct UntypedTableHandle {
83 name: String,
84}
85
86impl UntypedTableHandle {
87 pub(crate) fn new(name: String) -> Self {
88 Self { name }
89 }
90}
91
92impl TableHandle for UntypedTableHandle {
93 fn name(&self) -> &str {
94 &self.name
95 }
96}
97
98impl Sealed for UntypedTableHandle {}
99
100pub trait MultimapTableHandle: Sealed {
101 fn name(&self) -> &str;
103}
104
105#[derive(Clone)]
106pub struct UntypedMultimapTableHandle {
107 name: String,
108}
109
110impl UntypedMultimapTableHandle {
111 pub(crate) fn new(name: String) -> Self {
112 Self { name }
113 }
114}
115
116impl MultimapTableHandle for UntypedMultimapTableHandle {
117 fn name(&self) -> &str {
118 &self.name
119 }
120}
121
122impl Sealed for UntypedMultimapTableHandle {}
123
124pub struct TableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
131 name: &'a str,
132 _key_type: PhantomData<K>,
133 _value_type: PhantomData<V>,
134}
135
136impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableDefinition<'a, K, V> {
137 pub const fn new(name: &'a str) -> Self {
143 assert!(!name.is_empty());
144 Self {
145 name,
146 _key_type: PhantomData,
147 _value_type: PhantomData,
148 }
149 }
150}
151
152impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle for TableDefinition<'a, K, V> {
153 fn name(&self) -> &str {
154 self.name
155 }
156}
157
158impl<K: RedbKey, V: RedbValue> Sealed for TableDefinition<'_, K, V> {}
159
160impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for TableDefinition<'a, K, V> {
161 fn clone(&self) -> Self {
162 *self
163 }
164}
165
166impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for TableDefinition<'a, K, V> {}
167
168impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for TableDefinition<'a, K, V> {
169 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
170 write!(
171 f,
172 "{}<{}, {}>",
173 self.name,
174 K::type_name().name(),
175 V::type_name().name()
176 )
177 }
178}
179
180pub struct MultimapTableDefinition<'a, K: RedbKey + 'static, V: RedbKey + 'static> {
189 name: &'a str,
190 _key_type: PhantomData<K>,
191 _value_type: PhantomData<V>,
192}
193
194impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableDefinition<'a, K, V> {
195 pub const fn new(name: &'a str) -> Self {
196 assert!(!name.is_empty());
197 Self {
198 name,
199 _key_type: PhantomData,
200 _value_type: PhantomData,
201 }
202 }
203}
204
205impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> MultimapTableHandle
206 for MultimapTableDefinition<'a, K, V>
207{
208 fn name(&self) -> &str {
209 self.name
210 }
211}
212
213impl<K: RedbKey, V: RedbKey> Sealed for MultimapTableDefinition<'_, K, V> {}
214
215impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Clone for MultimapTableDefinition<'a, K, V> {
216 fn clone(&self) -> Self {
217 *self
218 }
219}
220
221impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Copy for MultimapTableDefinition<'a, K, V> {}
222
223impl<'a, K: RedbKey + 'static, V: RedbKey + 'static> Display for MultimapTableDefinition<'a, K, V> {
224 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
225 write!(
226 f,
227 "{}<{}, {}>",
228 self.name,
229 K::type_name().name(),
230 V::type_name().name()
231 )
232 }
233}
234
/// An opened redb database.
///
/// Create or open one with [`Database::create`], [`Database::open`], or via
/// [`Database::builder`] for custom settings.
pub struct Database {
    /// Page-level storage shared by all transactions.
    mem: TransactionalMemory,
    /// Source of ids for new write transactions.
    next_transaction_id: AtomicTransactionId,
    /// Tracks live read transactions and savepoints; cloned into every transaction.
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    /// Id of the currently running write transaction, if any (at most one at a time).
    live_write_transaction: Mutex<Option<TransactionId>>,
    /// Signalled when `live_write_transaction` is reset to `None`.
    live_write_transaction_available: Condvar,
}
272
273impl Database {
274 pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
279 Self::builder().create(path)
280 }
281
282 pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
284 Self::builder().open(path)
285 }
286
287 pub(crate) fn start_write_transaction(&self) -> TransactionId {
288 let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
289 while live_write_transaction.is_some() {
290 live_write_transaction = self
291 .live_write_transaction_available
292 .wait(live_write_transaction)
293 .unwrap();
294 }
295 assert!(live_write_transaction.is_none());
296 let transaction_id = self.next_transaction_id.next();
297 #[cfg(feature = "logging")]
298 info!("Beginning write transaction id={:?}", transaction_id);
299 *live_write_transaction = Some(transaction_id);
300
301 transaction_id
302 }
303
304 pub(crate) fn end_write_transaction(&self, id: TransactionId) {
305 let mut live_write_transaction = self.live_write_transaction.lock().unwrap();
306 assert_eq!(live_write_transaction.unwrap(), id);
307 *live_write_transaction = None;
308 self.live_write_transaction_available.notify_one();
309 }
310
311 pub(crate) fn get_memory(&self) -> &TransactionalMemory {
312 &self.mem
313 }
314
315 pub(crate) fn verify_primary_checksums(mem: &TransactionalMemory) -> Result<bool> {
316 let fake_freed_pages = Arc::new(Mutex::new(vec![]));
317 let table_tree = TableTree::new(mem.get_data_root(), mem, fake_freed_pages.clone());
318 if !table_tree.verify_checksums()? {
319 return Ok(false);
320 }
321 let system_table_tree =
322 TableTree::new(mem.get_system_root(), mem, fake_freed_pages.clone());
323 if !system_table_tree.verify_checksums()? {
324 return Ok(false);
325 }
326 assert!(fake_freed_pages.lock().unwrap().is_empty());
327
328 if let Some((freed_root, freed_checksum)) = mem.get_freed_root() {
329 if !RawBtree::new(
330 Some((freed_root, freed_checksum)),
331 FreedTableKey::fixed_width(),
332 FreedPageList::fixed_width(),
333 mem,
334 )
335 .verify_checksum()?
336 {
337 return Ok(false);
338 }
339 }
340
341 Ok(true)
342 }
343
344 pub fn check_integrity(&mut self) -> Result<bool> {
354 let allocator_hash = self.mem.allocator_hash();
355 let mut was_clean = self.mem.clear_cache_and_reload()?;
356
357 if !Self::verify_primary_checksums(&self.mem)? {
358 was_clean = false;
359 }
360
361 Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
362 DatabaseError::Storage(storage_err) => storage_err,
363 _ => unreachable!(),
364 })?;
365 if allocator_hash != self.mem.allocator_hash() {
366 was_clean = false;
367 }
368 self.mem.begin_writable()?;
369
370 Ok(was_clean)
371 }
372
373 pub fn compact(&mut self) -> Result<bool, CompactionError> {
377 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
380 if txn.list_persistent_savepoints()?.next().is_some() {
381 return Err(CompactionError::PersistentSavepointExists);
382 }
383 if self
384 .transaction_tracker
385 .lock()
386 .unwrap()
387 .any_savepoint_exists()
388 {
389 return Err(CompactionError::EphemeralSavepointExists);
390 }
391 txn.set_durability(Durability::Paranoid);
392 txn.commit().map_err(|e| e.into_storage_error())?;
393 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
395 txn.set_durability(Durability::Paranoid);
396 txn.commit().map_err(|e| e.into_storage_error())?;
397 assert!(self.mem.get_freed_root().is_none());
400
401 let mut compacted = false;
402 loop {
404 let mut progress = false;
405
406 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
407 if txn.compact_pages()? {
408 progress = true;
409 txn.commit().map_err(|e| e.into_storage_error())?;
410 } else {
411 txn.abort()?;
412 }
413
414 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
416 txn.set_durability(Durability::Paranoid);
417 txn.commit().map_err(|e| e.into_storage_error())?;
418 assert!(self.mem.get_freed_root().is_none());
419
420 if !progress {
421 break;
422 } else {
423 compacted = true;
424 }
425 }
426
427 Ok(compacted)
428 }
429
430 fn mark_persistent_savepoints(
431 system_root: Option<(PageNumber, Checksum)>,
432 mem: &TransactionalMemory,
433 oldest_unprocessed_free_transaction: TransactionId,
434 ) -> Result {
435 let freed_list = Arc::new(Mutex::new(vec![]));
436 let table_tree = TableTree::new(system_root, mem, freed_list);
437 let fake_transaction_tracker = Arc::new(Mutex::new(TransactionTracker::new()));
438 if let Some(savepoint_table_def) = table_tree
439 .get_table::<SavepointId, SerializedSavepoint>(
440 SAVEPOINT_TABLE.name(),
441 TableType::Normal,
442 )
443 .map_err(|e| {
444 e.into_storage_error_or_corrupted("Persistent savepoint table corrupted")
445 })?
446 {
447 let savepoint_table: ReadOnlyTable<SavepointId, SerializedSavepoint> =
448 ReadOnlyTable::new(
449 "internal savepoint table".to_string(),
450 savepoint_table_def.get_root(),
451 PageHint::None,
452 mem,
453 )?;
454 for result in savepoint_table.range::<SavepointId>(..)? {
455 let (_, savepoint_data) = result?;
456 let savepoint = savepoint_data
457 .value()
458 .to_savepoint(fake_transaction_tracker.clone());
459 if let Some((root, _)) = savepoint.get_user_root() {
460 Self::mark_tables_recursive(root, mem, true)?;
461 }
462 Self::mark_freed_tree(
463 savepoint.get_freed_root(),
464 mem,
465 oldest_unprocessed_free_transaction,
466 )?;
467 }
468 }
469
470 Ok(())
471 }
472
473 fn mark_freed_tree(
474 freed_root: Option<(PageNumber, Checksum)>,
475 mem: &TransactionalMemory,
476 oldest_unprocessed_free_transaction: TransactionId,
477 ) -> Result {
478 if let Some((root, _)) = freed_root {
479 let freed_pages_iter = AllPageNumbersBtreeIter::new(
480 root,
481 FreedTableKey::fixed_width(),
482 FreedPageList::fixed_width(),
483 mem,
484 )?;
485 mem.mark_pages_allocated(freed_pages_iter, true)?;
486 }
487
488 let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
489 "internal freed table".to_string(),
490 freed_root,
491 PageHint::None,
492 mem,
493 )?;
494 let lookup_key = FreedTableKey {
495 transaction_id: oldest_unprocessed_free_transaction.raw_id(),
496 pagination_id: 0,
497 };
498 for result in freed_table.range::<FreedTableKey>(lookup_key..)? {
499 let (_, freed_page_list) = result?;
500 let mut freed_page_list_as_vec = vec![];
501 for i in 0..freed_page_list.value().len() {
502 freed_page_list_as_vec.push(Ok(freed_page_list.value().get(i)));
503 }
504 mem.mark_pages_allocated(freed_page_list_as_vec.into_iter(), true)?;
505 }
506
507 Ok(())
508 }
509
510 fn mark_tables_recursive(
511 root: PageNumber,
512 mem: &TransactionalMemory,
513 allow_duplicates: bool,
514 ) -> Result {
515 let master_pages_iter = AllPageNumbersBtreeIter::new(root, None, None, mem)?;
518 mem.mark_pages_allocated(master_pages_iter, allow_duplicates)?;
519
520 let iter: BtreeRangeIter<&str, InternalTableDefinition> =
522 BtreeRangeIter::new::<RangeFull, &str>(&(..), Some(root), mem)?;
523
524 for entry in iter {
526 let definition = entry?.value();
527 if let Some((table_root, _)) = definition.get_root() {
528 match definition.get_type() {
529 TableType::Normal => {
530 let table_pages_iter = AllPageNumbersBtreeIter::new(
531 table_root,
532 definition.get_fixed_key_size(),
533 definition.get_fixed_value_size(),
534 mem,
535 )?;
536 mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
537 }
538 TableType::Multimap => {
539 let table_pages_iter = AllPageNumbersBtreeIter::new(
540 table_root,
541 definition.get_fixed_key_size(),
542 DynamicCollection::<()>::fixed_width_with(
543 definition.get_fixed_value_size(),
544 ),
545 mem,
546 )?;
547 mem.mark_pages_allocated(table_pages_iter, allow_duplicates)?;
548
549 let table_pages_iter = AllPageNumbersBtreeIter::new(
550 table_root,
551 definition.get_fixed_key_size(),
552 DynamicCollection::<()>::fixed_width_with(
553 definition.get_fixed_value_size(),
554 ),
555 mem,
556 )?;
557 for table_page in table_pages_iter {
558 let page = mem.get_page(table_page?)?;
559 let subtree_roots = parse_subtree_roots(
560 &page,
561 definition.get_fixed_key_size(),
562 definition.get_fixed_value_size(),
563 );
564 for (sub_root, _) in subtree_roots {
565 let sub_root_iter = AllPageNumbersBtreeIter::new(
566 sub_root,
567 definition.get_fixed_value_size(),
568 <()>::fixed_width(),
569 mem,
570 )?;
571 mem.mark_pages_allocated(sub_root_iter, allow_duplicates)?;
572 }
573 }
574 }
575 }
576 }
577 }
578
579 Ok(())
580 }
581
    /// Rebuilds the page allocator state from everything reachable from the current roots,
    /// then commits those same roots under a new transaction id.
    ///
    /// `repair_callback` is invoked with a [`RepairSession`] at progress 0.3 (only when the
    /// primary roots fail verification), 0.6 and 0.9. If the callback calls
    /// [`RepairSession::abort`], the function returns [`DatabaseError::RepairAborted`].
    ///
    /// # Errors
    ///
    /// Returns `DatabaseError::Storage(StorageError::Corrupted(..))` if checksums still fail
    /// after `repair_primary_corrupted()`. Storage errors hit while walking the trees or
    /// committing are propagated.
    fn do_repair(
        mem: &mut TransactionalMemory,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<(), DatabaseError> {
        // Step 1: make sure the roots are readable; if not, let `repair_primary_corrupted`
        // replace them and verify again.
        if !Self::verify_primary_checksums(mem)? {
            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            mem.repair_primary_corrupted();
            // NOTE(review): presumably discards pages cached by the verification walk above,
            // which may belong to the roots that were just replaced.
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem)? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Step 2: rebuild the allocator by marking every reachable page as allocated.
        mem.begin_repair()?;

        let data_root = mem.get_data_root();
        if let Some((root, _)) = data_root {
            // Duplicates are not allowed for the live trees; the savepoint walk below uses `true`.
            Self::mark_tables_recursive(root, mem, false)?;
        }

        let freed_root = mem.get_freed_root();
        // Starting at transaction 0 marks the pages of every entry in the main freed tree.
        Self::mark_freed_tree(freed_root, mem, TransactionId::new(0))?;
        let freed_table: ReadOnlyTable<FreedTableKey, FreedPageList<'static>> = ReadOnlyTable::new(
            "internal freed table".to_string(),
            freed_root,
            PageHint::None,
            mem,
        )?;
        // The oldest transaction still present in the main freed tree (or the last committed
        // id if the tree is empty). Savepoint freed trees are only marked from this transaction
        // onwards; NOTE(review): presumably older entries were already processed and marking
        // them again would double-count frees.
        let oldest_unprocessed_transaction =
            if let Some(entry) = freed_table.range::<FreedTableKey>(..)?.next() {
                TransactionId::new(entry?.0.value().transaction_id)
            } else {
                mem.get_last_committed_transaction_id()?
            };
        drop(freed_table);

        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Step 3: system tables and everything the persistent savepoints still reference.
        let system_root = mem.get_system_root();
        if let Some((root, _)) = system_root {
            Self::mark_tables_recursive(root, mem, false)?;
        }
        Self::mark_persistent_savepoints(system_root, mem, oldest_unprocessed_transaction)?;

        mem.end_repair()?;

        // NOTE(review): presumably invalidates cached pages before the commit below rewrites
        // the roots — confirm against `TransactionalMemory::commit`.
        mem.clear_read_cache();

        // Step 4: re-commit the existing roots under a fresh transaction id.
        // NOTE(review): the meaning of the two boolean flags is defined by
        // `TransactionalMemory::commit`.
        let transaction_id = mem.get_last_committed_transaction_id()?.next();
        mem.commit(
            data_root,
            system_root,
            freed_root,
            transaction_id,
            false,
            true,
        )?;

        Ok(())
    }
669
670 fn new(
671 file: Box<dyn StorageBackend>,
672 page_size: usize,
673 region_size: Option<u64>,
674 read_cache_size_bytes: usize,
675 write_cache_size_bytes: usize,
676 repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
677 ) -> Result<Self, DatabaseError> {
678 #[cfg(feature = "logging")]
679 let file_path = format!("{:?}", &file);
680 #[cfg(feature = "logging")]
681 info!("Opening database {:?}", &file_path);
682 let mut mem = TransactionalMemory::new(
683 file,
684 page_size,
685 region_size,
686 read_cache_size_bytes,
687 write_cache_size_bytes,
688 )?;
689 if mem.needs_repair()? {
690 #[cfg(feature = "logging")]
691 warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
692 let mut handle = RepairSession::new(0.0);
693 repair_callback(&mut handle);
694 if handle.aborted() {
695 return Err(DatabaseError::RepairAborted);
696 }
697 Self::do_repair(&mut mem, repair_callback)?;
698 }
699
700 mem.begin_writable()?;
701 let next_transaction_id = mem.get_last_committed_transaction_id()?.next();
702
703 let db = Database {
704 mem,
705 next_transaction_id: AtomicTransactionId::new(next_transaction_id),
706 transaction_tracker: Arc::new(Mutex::new(TransactionTracker::new())),
707 live_write_transaction: Mutex::new(None),
708 live_write_transaction_available: Condvar::new(),
709 };
710
711 let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
713 if let Some(next_id) = txn.next_persistent_savepoint_id()? {
714 db.transaction_tracker
715 .lock()
716 .unwrap()
717 .restore_savepoint_counter_state(next_id);
718 }
719 for id in txn.list_persistent_savepoints()? {
720 let savepoint = match txn.get_persistent_savepoint(id) {
721 Ok(savepoint) => savepoint,
722 Err(err) => match err {
723 SavepointError::InvalidSavepoint => unreachable!(),
724 SavepointError::Storage(storage) => {
725 return Err(storage.into());
726 }
727 },
728 };
729 db.transaction_tracker
730 .lock()
731 .unwrap()
732 .register_persistent_savepoint(&savepoint);
733 }
734 txn.abort()?;
735
736 Ok(db)
737 }
738
739 fn allocate_read_transaction(&self) -> Result<TransactionId> {
740 let mut guard = self.transaction_tracker.lock().unwrap();
741 let id = self.mem.get_last_committed_transaction_id()?;
742 guard.register_read_transaction(id);
743
744 Ok(id)
745 }
746
747 pub(crate) fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
748 let id = self
749 .transaction_tracker
750 .lock()
751 .unwrap()
752 .allocate_savepoint();
753 Ok((id, self.allocate_read_transaction()?))
754 }
755
756 pub fn builder() -> Builder {
758 Builder::new()
759 }
760
761 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
767 WriteTransaction::new(self, self.transaction_tracker.clone()).map_err(|e| e.into())
768 }
769
770 pub fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
778 let id = self.allocate_read_transaction()?;
779 #[cfg(feature = "logging")]
780 info!("Beginning read transaction id={:?}", id);
781 Ok(ReadTransaction::new(
782 self.get_memory(),
783 self.transaction_tracker.clone(),
784 id,
785 ))
786 }
787}
788
/// Handle passed to the repair callback while a database is being repaired.
pub struct RepairSession {
    progress: f64,
    aborted: bool,
}

impl RepairSession {
    pub(crate) fn new(progress: f64) -> Self {
        RepairSession {
            aborted: false,
            progress,
        }
    }

    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Requests that the repair stop. Opening the database then fails with
    /// `DatabaseError::RepairAborted`.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// Returns the fraction of the repair completed so far (the repair code in this module
    /// reports 0.0, 0.3, 0.6 and 0.9).
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
816
817pub struct Builder {
819 page_size: usize,
820 region_size: Option<u64>,
821 read_cache_size_bytes: usize,
822 write_cache_size_bytes: usize,
823 repair_callback: Box<dyn Fn(&mut RepairSession)>,
824}
825
826impl Builder {
827 #[allow(clippy::new_without_default)]
833 pub fn new() -> Self {
834 let mut result = Self {
835 page_size: PAGE_SIZE,
839 region_size: None,
840 read_cache_size_bytes: 0,
842 write_cache_size_bytes: 0,
844 repair_callback: Box::new(|_| {}),
845 };
846
847 result.set_cache_size(1024 * 1024 * 1024);
848 result
849 }
850
851 pub fn set_repair_callback(
859 &mut self,
860 callback: impl Fn(&mut RepairSession) + 'static,
861 ) -> &mut Self {
862 self.repair_callback = Box::new(callback);
863 self
864 }
865
866 #[cfg(any(fuzzing, test))]
874 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
875 assert!(size.is_power_of_two());
876 self.page_size = std::cmp::max(size, 512);
877 self
878 }
879
880 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
882 self.read_cache_size_bytes = bytes / 10 * 9;
884 self.write_cache_size_bytes = bytes / 10;
885 self
886 }
887
888 #[cfg(any(test, fuzzing))]
889 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
890 assert!(size.is_power_of_two());
891 self.region_size = Some(size);
892 self
893 }
894
895 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
900 let file = OpenOptions::new()
901 .read(true)
902 .write(true)
903 .create(true)
904 .open(path)?;
905
906 Database::new(
907 Box::new(FileBackend::new(file)?),
908 self.page_size,
909 self.region_size,
910 self.read_cache_size_bytes,
911 self.write_cache_size_bytes,
912 &self.repair_callback,
913 )
914 }
915
916 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
918 let file = OpenOptions::new().read(true).write(true).open(path)?;
919
920 if file.metadata()?.len() == 0 {
921 return Err(StorageError::Io(ErrorKind::InvalidData.into()).into());
922 }
923
924 Database::new(
925 Box::new(FileBackend::new(file)?),
926 self.page_size,
927 None,
928 self.read_cache_size_bytes,
929 self.write_cache_size_bytes,
930 &self.repair_callback,
931 )
932 }
933
934 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
938 Database::new(
939 Box::new(FileBackend::new(file)?),
940 self.page_size,
941 self.region_size,
942 self.read_cache_size_bytes,
943 self.write_cache_size_bytes,
944 &self.repair_callback,
945 )
946 }
947
948 pub fn create_with_backend(
950 &self,
951 backend: impl StorageBackend,
952 ) -> Result<Database, DatabaseError> {
953 Database::new(
954 Box::new(backend),
955 self.page_size,
956 self.region_size,
957 self.read_cache_size_bytes,
958 self.write_cache_size_bytes,
959 &self.repair_callback,
960 )
961 }
962}
963
964impl std::fmt::Debug for Database {
965 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
966 f.debug_struct("Database").finish()
967 }
968}
969
970#[cfg(test)]
971mod test {
972 use crate::backends::FileBackend;
973 use crate::{
974 Database, DatabaseError, Durability, ReadableTable, StorageBackend, StorageError,
975 TableDefinition,
976 };
977 use std::io::ErrorKind;
978 use std::sync::atomic::{AtomicU64, Ordering};
979
980 #[derive(Debug)]
981 struct FailingBackend {
982 inner: FileBackend,
983 countdown: AtomicU64,
984 }
985
986 impl FailingBackend {
987 fn new(backend: FileBackend, countdown: u64) -> Self {
988 Self {
989 inner: backend,
990 countdown: AtomicU64::new(countdown),
991 }
992 }
993
994 fn check_countdown(&self) -> Result<(), std::io::Error> {
995 if self.countdown.load(Ordering::SeqCst) == 0 {
996 return Err(std::io::Error::from(ErrorKind::Other));
997 }
998
999 Ok(())
1000 }
1001
1002 fn decrement_countdown(&self) -> Result<(), std::io::Error> {
1003 if self
1004 .countdown
1005 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
1006 if x > 0 {
1007 Some(x - 1)
1008 } else {
1009 None
1010 }
1011 })
1012 .is_err()
1013 {
1014 return Err(std::io::Error::from(ErrorKind::Other));
1015 }
1016
1017 Ok(())
1018 }
1019 }
1020
1021 impl StorageBackend for FailingBackend {
1022 fn len(&self) -> Result<u64, std::io::Error> {
1023 self.inner.len()
1024 }
1025
1026 fn read(&self, offset: u64, len: usize) -> Result<Vec<u8>, std::io::Error> {
1027 self.check_countdown()?;
1028 self.inner.read(offset, len)
1029 }
1030
1031 fn set_len(&self, len: u64) -> Result<(), std::io::Error> {
1032 self.inner.set_len(len)
1033 }
1034
1035 fn sync_data(&self, eventual: bool) -> Result<(), std::io::Error> {
1036 self.check_countdown()?;
1037 self.inner.sync_data(eventual)
1038 }
1039
1040 fn write(&self, offset: u64, data: &[u8]) -> Result<(), std::io::Error> {
1041 self.decrement_countdown()?;
1042 self.inner.write(offset, data)
1043 }
1044 }
1045
    /// Regression test: after a commit fails because the backend stopped accepting writes,
    /// the file must still be reopenable.
    #[test]
    fn crash_regression4() {
        let tmpfile = crate::create_tempfile();

        // Allow 23 writes; after that every write, read and sync on this backend fails.
        let backend = FailingBackend::new(
            FileBackend::new(tmpfile.as_file().try_clone().unwrap()).unwrap(),
            23,
        );
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // Expected to fail: the backend's write budget runs out during this commit.
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Reopening the same file through a regular file backend must succeed
        // (NOTE(review): presumably exercising the repair path).
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(tmpfile.path())
            .unwrap();
    }
1083
1084 #[test]
1085 fn small_pages() {
1086 let tmpfile = crate::create_tempfile();
1087
1088 let db = Database::builder()
1089 .set_page_size(512)
1090 .create(tmpfile.path())
1091 .unwrap();
1092
1093 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1094 let txn = db.begin_write().unwrap();
1095 {
1096 txn.open_table(table_definition).unwrap();
1097 }
1098 txn.commit().unwrap();
1099 }
1100
    /// Regression test (512-byte pages): a fixed sequence of ephemeral savepoint
    /// create/restore/drop operations, interleaved with commits at mixed durability and one
    /// abort. The exact order of operations is what is being tested.
    #[test]
    fn small_pages2() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        // savepoint4 is taken in a transaction that is aborted below.
        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // Restoring the savepoint from the aborted transaction is expected to fail.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_durability(Durability::Paranoid);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None);
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
1194
    /// Regression test (1 KiB pages): aborted transactions with `Durability::None`, one of
    /// which restores a savepoint taken in the same transaction, and a value (2008 bytes)
    /// larger than a page.
    #[test]
    fn small_pages3() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        let _savepoint0 = tx.ephemeral_savepoint().unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 306];
            t.insert(&539717, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restore the savepoint that was just taken in this same transaction.
        tx.restore_savepoint(&savepoint1).unwrap();
        tx.set_durability(Durability::None);
        {
            let mut t = tx.open_table(table_def).unwrap();
            let value = vec![0; 2008];
            t.insert(&784384, value.as_slice()).unwrap();
        }
        tx.abort().unwrap();
    }
1227
    /// Regression test (1 KiB pages, 1 MiB cache): values larger than a page (1130 and 3645
    /// bytes) written in aborted transactions after the table was created and committed.
    #[test]
    fn small_pages4() {
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
1264
1265 #[test]
1266 fn dynamic_shrink() {
1267 let tmpfile = crate::create_tempfile();
1268 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
1269 let big_value = vec![0u8; 1024];
1270
1271 let db = Database::builder()
1272 .set_region_size(1024 * 1024)
1273 .create(tmpfile.path())
1274 .unwrap();
1275
1276 let txn = db.begin_write().unwrap();
1277 {
1278 let mut table = txn.open_table(table_definition).unwrap();
1279 for i in 0..2048 {
1280 table.insert(&i, big_value.as_slice()).unwrap();
1281 }
1282 }
1283 txn.commit().unwrap();
1284
1285 let file_size = tmpfile.as_file().metadata().unwrap().len();
1286
1287 let txn = db.begin_write().unwrap();
1288 {
1289 let mut table = txn.open_table(table_definition).unwrap();
1290 for i in 0..2048 {
1291 table.remove(&i).unwrap();
1292 }
1293 }
1294 txn.commit().unwrap();
1295
1296 let txn = db.begin_write().unwrap();
1298 {
1299 let mut table = txn.open_table(table_definition).unwrap();
1300 table.insert(0, [].as_slice()).unwrap();
1301 }
1302 txn.commit().unwrap();
1303 let txn = db.begin_write().unwrap();
1304 {
1305 let mut table = txn.open_table(table_definition).unwrap();
1306 table.remove(0).unwrap();
1307 }
1308 txn.commit().unwrap();
1309 let txn = db.begin_write().unwrap();
1310 txn.commit().unwrap();
1311
1312 let final_file_size = tmpfile.as_file().metadata().unwrap().len();
1313 assert!(final_file_size < file_size);
1314 }
1315
1316 #[test]
1317 fn create_new_db_in_empty_file() {
1318 let tmpfile = crate::create_tempfile();
1319
1320 let _db = Database::builder()
1321 .create_file(tmpfile.into_file())
1322 .unwrap();
1323 }
1324
1325 #[test]
1326 fn open_missing_file() {
1327 let tmpfile = crate::create_tempfile();
1328
1329 let err = Database::builder()
1330 .open(tmpfile.path().with_extension("missing"))
1331 .unwrap_err();
1332
1333 match err {
1334 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
1335 err => panic!("Unexpected error for empty file: {err}"),
1336 }
1337 }
1338
1339 #[test]
1340 fn open_empty_file() {
1341 let tmpfile = crate::create_tempfile();
1342
1343 let err = Database::builder().open(tmpfile.path()).unwrap_err();
1344
1345 match err {
1346 DatabaseError::Storage(StorageError::Io(err))
1347 if err.kind() == ErrorKind::InvalidData => {}
1348 err => panic!("Unexpected error for empty file: {err}"),
1349 }
1350 }
1351}