1use crate::blob_store::{BlobCompactionPolicy, BlobCompactionReport, BlobDedupConfig, BlobStats};
2use crate::cdc::CdcConfig;
3use crate::error::{BackendError, TransactionError};
4#[cfg(feature = "std")]
5use crate::group_commit::{GroupCommitError, GroupCommitter, WriteBatch};
6#[cfg(feature = "metrics")]
7use crate::observer::DbMetrics;
8use crate::observer::{DatabaseObserver, default_observer};
9use crate::sealed::Sealed;
10use crate::transaction_tracker::{TransactionId, TransactionTracker};
11use crate::transactions::{
12 ALLOCATOR_STATE_TABLE_NAME, AllocatorStateKey, AllocatorStateTree, DATA_ALLOCATED_TABLE,
13 DATA_FREED_TABLE, PageList, SYSTEM_FREED_TABLE, SystemTableDefinition,
14 TransactionIdWithPagination,
15};
16#[cfg(feature = "std")]
17use crate::tree_store::ReadOnlyBackend;
18#[cfg(feature = "std")]
19use crate::tree_store::salvage_tree_leaves;
20use crate::tree_store::{
21 Btree, BtreeHeader, CompressionConfig, InternalTableDefinition, PAGE_SIZE, PageHint,
22 PageNumber, ShrinkPolicy, TableTree, TableType, TransactionalMemory,
23};
24use crate::types::{Key, Value};
25use crate::{
26 CommitError, CompactionError, DatabaseError, Error, ReadOnlyTable, SavepointError,
27 StorageError, TableError,
28};
29use crate::{ReadTransaction, Result, WriteTransaction};
30use alloc::boxed::Box;
31use alloc::format;
32use alloc::string::{String, ToString};
33use alloc::sync::Arc;
34#[cfg(feature = "std")]
35use alloc::vec;
36use alloc::vec::Vec;
37use core::fmt::{Debug, Display, Formatter};
38use core::marker::PhantomData;
39
40#[cfg(feature = "std")]
41use std::fs::{File, OpenOptions};
42#[cfg(feature = "std")]
43use std::path::Path;
44#[cfg(feature = "std")]
45use std::time::{Duration, Instant};
46
47#[cfg(feature = "std")]
48use crate::tree_store::file_backend::FileBackend;
49#[cfg(feature = "logging")]
50use log::{debug, info, warn};
51
#[allow(clippy::len_without_is_empty)]
/// Low-level persistent storage abstraction that the database reads and
/// writes through. Implementations must be thread-safe, since multiple
/// transactions may access storage concurrently.
pub trait StorageBackend: 'static + Debug + Send + Sync {
    /// Returns the current length of the storage, in bytes.
    fn len(&self) -> core::result::Result<u64, BackendError>;

    /// Reads bytes starting at `offset` into `out`.
    fn read(&self, offset: u64, out: &mut [u8]) -> core::result::Result<(), BackendError>;

    /// Grows or truncates the storage to exactly `len` bytes.
    fn set_len(&self, len: u64) -> core::result::Result<(), BackendError>;

    /// Durably flushes previously written data to the underlying medium.
    fn sync_data(&self) -> core::result::Result<(), BackendError>;

    /// Writes `data` starting at `offset`.
    fn write(&self, offset: u64, data: &[u8]) -> core::result::Result<(), BackendError>;

    /// Releases resources held by the backend. The default implementation is
    /// a no-op for backends with nothing to release.
    fn close(&self) -> core::result::Result<(), BackendError> {
        Ok(())
    }
}
82
/// Handle that identifies a (non-multimap) table by name.
///
/// Sealed: only types defined in this crate may implement it.
pub trait TableHandle: Sealed {
    /// Returns the name of the table.
    fn name(&self) -> &str;
}
87
88#[derive(Clone)]
89pub struct UntypedTableHandle {
90 name: String,
91}
92
93impl UntypedTableHandle {
94 pub(crate) fn new(name: String) -> Self {
95 Self { name }
96 }
97}
98
99impl TableHandle for UntypedTableHandle {
100 fn name(&self) -> &str {
101 &self.name
102 }
103}
104
105impl Sealed for UntypedTableHandle {}
106
/// Handle that identifies a multimap table by name.
///
/// Sealed: only types defined in this crate may implement it.
pub trait MultimapTableHandle: Sealed {
    /// Returns the name of the multimap table.
    fn name(&self) -> &str;
}
111
112#[derive(Clone)]
113pub struct UntypedMultimapTableHandle {
114 name: String,
115}
116
117impl UntypedMultimapTableHandle {
118 pub(crate) fn new(name: String) -> Self {
119 Self { name }
120 }
121}
122
123impl MultimapTableHandle for UntypedMultimapTableHandle {
124 fn name(&self) -> &str {
125 &self.name
126 }
127}
128
129impl Sealed for UntypedMultimapTableHandle {}
130
/// Const-evaluable equivalent of `haystack.starts_with(needle)`.
///
/// Written with a manual index loop because slice iterators are not usable
/// in `const fn`; used by `TableDefinition::new` to reject reserved table
/// name prefixes at compile time.
const fn const_starts_with(haystack: &[u8], needle: &[u8]) -> bool {
    let n = needle.len();
    if haystack.len() < n {
        return false;
    }
    let mut idx = 0;
    while idx < n {
        if haystack[idx] != needle[idx] {
            return false;
        }
        idx += 1;
    }
    true
}
145
/// Definition of a table holding `K -> V` mappings.
///
/// Stores only the table name; key and value types are carried at the type
/// level through `PhantomData`, so a definition is cheap to copy around.
pub struct TableDefinition<'a, K: Key + 'static, V: Value + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
157
impl<'a, K: Key + 'static, V: Value + 'static> TableDefinition<'a, K, V> {
    /// Creates a definition for the table called `name`.
    ///
    /// # Panics
    /// Panics if `name` is empty or starts with the reserved `"__ivfpq:"`
    /// prefix. Because this is a `const fn`, definitions declared in `const`
    /// context fail at compile time instead of at runtime.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        assert!(
            !const_starts_with(name.as_bytes(), b"__ivfpq:"),
            "table names starting with \"__ivfpq:\" are reserved for internal use"
        );
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }

    /// Like [`TableDefinition::new`], but without the reserved-prefix check,
    /// so internal tables may live in the `"__ivfpq:"` namespace.
    pub(crate) const fn new_internal(name: &'a str) -> Self {
        assert!(!name.is_empty());
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
188
impl<K: Key + 'static, V: Value + 'static> TableHandle for TableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Marker impl: seals `TableHandle` for typed table definitions.
impl<K: Key, V: Value> Sealed for TableDefinition<'_, K, V> {}

// `Clone`/`Copy` are implemented manually because a derive would add
// `K: Clone` / `V: Clone` bounds, which are unnecessary: only the name and
// `PhantomData` are stored.
impl<K: Key + 'static, V: Value + 'static> Clone for TableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Value + 'static> Copy for TableDefinition<'_, K, V> {}
204
205impl<K: Key + 'static, V: Value + 'static> Display for TableDefinition<'_, K, V> {
206 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
207 write!(
208 f,
209 "{}<{}, {}>",
210 self.name,
211 K::type_name().name(),
212 V::type_name().name()
213 )
214 }
215}
216
/// Definition of a multimap table, where each key may map to multiple values.
///
/// Note that `V` must itself be a [`Key`], since values are stored in sorted
/// sets. Like [`TableDefinition`], only the name is stored.
pub struct MultimapTableDefinition<'a, K: Key + 'static, V: Key + 'static> {
    name: &'a str,
    _key_type: PhantomData<K>,
    _value_type: PhantomData<V>,
}
230
impl<'a, K: Key + 'static, V: Key + 'static> MultimapTableDefinition<'a, K, V> {
    /// Creates a definition for the multimap table called `name`.
    ///
    /// # Panics
    /// Panics if `name` is empty or starts with the reserved `"__ivfpq:"`
    /// prefix; in `const` context this fails at compile time.
    pub const fn new(name: &'a str) -> Self {
        assert!(!name.is_empty());
        assert!(
            !const_starts_with(name.as_bytes(), b"__ivfpq:"),
            "table names starting with \"__ivfpq:\" are reserved for internal use"
        );
        Self {
            name,
            _key_type: PhantomData,
            _value_type: PhantomData,
        }
    }
}
245
impl<K: Key + 'static, V: Key + 'static> MultimapTableHandle for MultimapTableDefinition<'_, K, V> {
    fn name(&self) -> &str {
        self.name
    }
}

// Marker impl: seals `MultimapTableHandle` for typed definitions.
impl<K: Key, V: Key> Sealed for MultimapTableDefinition<'_, K, V> {}

// Manual `Clone`/`Copy` to avoid spurious `K: Clone` / `V: Clone` bounds from
// a derive; only the name and `PhantomData` are stored.
impl<K: Key + 'static, V: Key + 'static> Clone for MultimapTableDefinition<'_, K, V> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<K: Key + 'static, V: Key + 'static> Copy for MultimapTableDefinition<'_, K, V> {}
261
262impl<K: Key + 'static, V: Key + 'static> Display for MultimapTableDefinition<'_, K, V> {
263 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
264 write!(
265 f,
266 "{}<{}, {}>",
267 self.name,
268 K::type_name().name(),
269 V::type_name().name()
270 )
271 }
272}
273
/// How thoroughly an integrity check should inspect the database.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VerifyLevel {
    /// Validate only the database header.
    Header,
    /// Additionally verify per-page checksums of the data and system trees.
    Pages,
    /// Additionally run structural validation of the b-trees.
    Full,
}
285
/// Details about a single corrupted page found during verification/salvage.
#[derive(Debug, Clone)]
pub struct CorruptPageInfo {
    /// Raw page number of the corrupt page (0 when the corruption is not
    /// tied to a specific page, e.g. a bad table definition).
    pub page_number: u64,
    /// Table the page belongs to, when that could be determined.
    pub table_name: Option<String>,
    /// Human-readable description of what was wrong.
    pub description: String,
}
296
#[cfg(feature = "std")]
/// Result of an integrity check (`verify_integrity` / `verify_backup`).
#[derive(Debug, Clone)]
pub struct VerifyReport {
    /// Overall verdict: header valid, no corrupt pages, and (when checked)
    /// structurally sound.
    pub valid: bool,
    /// Whether the database header passed validation.
    pub header_valid: bool,
    /// Number of pages whose checksums were verified.
    pub pages_checked: u64,
    /// Number of pages that failed checksum verification.
    pub pages_corrupt: u64,
    /// Structural validation verdict; `None` unless `VerifyLevel::Full` was
    /// requested.
    pub structural_valid: Option<bool>,
    /// Per-page details for every corruption found.
    pub corrupt_details: Vec<CorruptPageInfo>,
    /// Wall-clock time the check took.
    pub duration: Duration,
}
316
#[cfg(feature = "std")]
/// Result of a [`Database::salvage`] run over a corrupted database file.
#[derive(Debug, Clone)]
pub struct SalvageReport {
    /// Number of table definitions discovered in the corrupted file.
    pub tables_found: u64,
    /// Number of tables for which at least one row was recovered.
    pub tables_recovered: u64,
    /// Total rows written into the output database.
    pub rows_recovered: u64,
    /// Count of corruption incidents encountered while reading table data
    /// (an approximation of lost rows; exact loss cannot be known).
    pub rows_lost: u64,
    /// Details of every corruption encountered.
    pub corrupt_details: Vec<CorruptPageInfo>,
    /// Wall-clock time the salvage took.
    pub duration: Duration,
}
334
/// Progress snapshot reported during compaction.
#[derive(Debug, Clone, Copy)]
pub struct CompactionProgress {
    /// Number of pages moved so far.
    pub pages_relocated: u64,
    /// True once compaction has finished.
    pub complete: bool,
}
343
/// Snapshot of page-cache statistics, obtained via
/// `ReadableDatabase::cache_stats`.
#[derive(Debug, Clone, Copy)]
pub struct CacheStats {
    // Pages evicted from the cache.
    pub(crate) evictions: u64,
    // Read-path cache hits / misses.
    pub(crate) read_hits: u64,
    pub(crate) read_misses: u64,
    // Write-path cache hits / misses.
    pub(crate) write_hits: u64,
    pub(crate) write_misses: u64,
    // Bytes currently held by the cache.
    pub(crate) used_bytes: usize,
    // Configured cache budget, if one was set.
    pub(crate) budget_bytes: Option<usize>,
}
358
impl CacheStats {
    /// Number of pages evicted from the cache.
    pub fn evictions(&self) -> u64 {
        self.evictions
    }

    /// Number of read-path cache hits.
    pub fn read_hits(&self) -> u64 {
        self.read_hits
    }

    /// Number of read-path cache misses.
    pub fn read_misses(&self) -> u64 {
        self.read_misses
    }

    /// Number of write-path cache hits.
    pub fn write_hits(&self) -> u64 {
        self.write_hits
    }

    /// Number of write-path cache misses.
    pub fn write_misses(&self) -> u64 {
        self.write_misses
    }

    /// Bytes currently held by the cache.
    pub fn used_bytes(&self) -> usize {
        self.used_bytes
    }

    /// Configured cache budget in bytes, if any.
    pub fn budget_bytes(&self) -> Option<usize> {
        self.budget_bytes
    }
}
400
/// RAII guard tying a transaction's lifetime to its registration in the
/// [`TransactionTracker`]; dropping an `Active` guard deregisters the
/// transaction.
pub(crate) enum TransactionGuard {
    Active {
        transaction_tracker: Arc<TransactionTracker>,
        // Becomes `None` after `leak()` transfers ownership of the id, which
        // suppresses deregistration on drop.
        transaction_id: Option<TransactionId>,
        // Write transactions are ended differently than reads on drop.
        write_transaction: bool,
    },
    /// Placeholder guard used by offline verification paths; carries no
    /// transaction and deregisters nothing.
    Verification,
}
410
411impl TransactionGuard {
412 pub(crate) fn new_read(
413 transaction_id: TransactionId,
414 tracker: Arc<TransactionTracker>,
415 ) -> Self {
416 Self::Active {
417 transaction_tracker: tracker,
418 transaction_id: Some(transaction_id),
419 write_transaction: false,
420 }
421 }
422
423 pub(crate) fn new_write(
424 transaction_id: TransactionId,
425 tracker: Arc<TransactionTracker>,
426 ) -> Self {
427 Self::Active {
428 transaction_tracker: tracker,
429 transaction_id: Some(transaction_id),
430 write_transaction: true,
431 }
432 }
433
434 pub(crate) fn id(&self) -> Result<TransactionId, StorageError> {
435 match self {
436 Self::Active { transaction_id, .. } => transaction_id.ok_or_else(|| {
437 StorageError::Internal(String::from("TransactionGuard::id() called after leak()"))
438 }),
439 Self::Verification => Err(StorageError::Internal(String::from(
440 "TransactionGuard::id() called on Verification guard",
441 ))),
442 }
443 }
444
445 pub(crate) fn leak(&mut self) -> Result<TransactionId, StorageError> {
446 match self {
447 Self::Active { transaction_id, .. } => transaction_id.take().ok_or_else(|| {
448 StorageError::Internal(String::from(
449 "TransactionGuard::leak() called after prior leak()",
450 ))
451 }),
452 Self::Verification => Err(StorageError::Internal(String::from(
453 "TransactionGuard::leak() called on Verification guard",
454 ))),
455 }
456 }
457}
458
459impl Drop for TransactionGuard {
460 fn drop(&mut self) {
461 if let Self::Active {
462 transaction_tracker,
463 transaction_id: Some(transaction_id),
464 write_transaction,
465 } = self
466 {
467 if *write_transaction {
468 let _ = transaction_tracker.end_write_transaction(*transaction_id);
469 } else {
470 let _ = transaction_tracker.deallocate_read_transaction(*transaction_id);
471 }
472 }
473 }
474}
475
/// Operations common to read-write ([`Database`]) and read-only
/// ([`ReadOnlyDatabase`]) databases.
pub trait ReadableDatabase {
    /// Begins a read transaction against the most recently committed state.
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError>;

    /// Returns a snapshot of page-cache statistics.
    fn cache_stats(&self) -> CacheStats;
}
491
/// A database opened read-only: read transactions only, no observer,
/// metrics, or group commit machinery.
pub struct ReadOnlyDatabase {
    // Shared page storage, wrapped in a read-only backend at construction.
    mem: Arc<TransactionalMemory>,
    // Tracks live read transactions.
    transaction_tracker: Arc<TransactionTracker>,
}
536
impl ReadableDatabase for ReadOnlyDatabase {
    /// Begins a read transaction against the last committed state.
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let id = self
            .transaction_tracker
            .register_read_transaction(&self.mem)?;
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={id:?}");

        // The guard deregisters the transaction when the ReadTransaction drops.
        let guard = TransactionGuard::new_read(id, self.transaction_tracker.clone());

        // Read-only databases have no user-configured observer or metrics,
        // so defaults are supplied here.
        ReadTransaction::new(
            self.mem.clone(),
            guard,
            default_observer(),
            #[cfg(feature = "metrics")]
            Arc::new(DbMetrics::new()),
        )
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}
560
impl ReadOnlyDatabase {
    /// Opens an existing database at `path` read-only, with default builder
    /// settings.
    #[cfg(feature = "std")]
    pub fn open(path: impl AsRef<Path>) -> Result<ReadOnlyDatabase, DatabaseError> {
        Builder::new().open_read_only(path)
    }

    /// Constructs a read-only database over the given backend.
    ///
    /// # Errors
    /// Returns [`DatabaseError::RepairAborted`] if the database was not shut
    /// down cleanly (no persisted allocator state) — repair would require
    /// write access. Also fails if b-tree checksum verification fails.
    #[allow(clippy::too_many_arguments)]
    #[cfg(feature = "std")]
    fn new(
        file: Box<dyn StorageBackend>,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        compression: CompressionConfig,
        memory_budget: Option<usize>,
        read_verification: ReadVerification,
        read_verification_callback: Option<Arc<ReadVerificationCallback>>,
    ) -> Result<Self, DatabaseError> {
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database in read-only {:?}", &file_path);
        // Wrap the backend so any write attempt fails; write cache size is 0.
        let mem = TransactionalMemory::new(
            Box::new(ReadOnlyBackend::new(file)),
            false,
            page_size,
            region_size,
            read_cache_size_bytes,
            0,
            true,
            compression,
            memory_budget,
            read_verification,
            read_verification_callback,
        )?;
        let mem = Arc::new(mem);
        // A persisted allocator state means the database was closed cleanly;
        // without it a (write-access) repair would be required.
        if let Some(tree) = Database::get_allocator_state_table(&mem)? {
            mem.load_allocator_state(&tree)?;
        } else {
            #[cfg(feature = "logging")]
            warn!(
                "Database {:?} not shutdown cleanly. Repair required",
                &file_path
            );
            return Err(DatabaseError::RepairAborted);
        }

        if !Database::verify_primary_checksums(mem.clone())? {
            return Err(DatabaseError::Storage(StorageError::Corrupted(
                "B-tree checksum verification failed".to_string(),
            )));
        }

        let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
        let db = Self {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
        };

        Ok(db)
    }
}
627
/// Metadata about a committed transaction.
#[derive(Debug, Clone)]
pub struct TransactionInfo {
    /// Raw transaction id.
    pub transaction_id: u64,
    /// Commit timestamp in milliseconds — NOTE(review): epoch/clock source
    /// not visible in this file; confirm against the writer.
    pub timestamp_ms: u64,
}
671
/// A read-write embedded database.
pub struct Database {
    // Shared page/transaction storage engine.
    mem: Arc<TransactionalMemory>,
    // Tracks live read/write transactions and savepoints.
    transaction_tracker: Arc<TransactionTracker>,
    // Blob deduplication configuration.
    blob_dedup_config: BlobDedupConfig,
    // Change-data-capture configuration.
    cdc_config: CdcConfig,
    // History retention setting (units not visible here — see Builder).
    history_retention: u64,
    // Policy consulted by `should_compact_blobs`.
    blob_compaction_policy: BlobCompactionPolicy,
    // User-supplied observer for transaction lifecycle callbacks.
    observer: Arc<dyn DatabaseObserver>,
    #[cfg(feature = "metrics")]
    db_metrics: Arc<DbMetrics>,
    // Batches concurrent commits into group fsyncs (std-only).
    #[cfg(feature = "std")]
    group_committer: GroupCommitter,
}
685
impl ReadableDatabase for Database {
    /// Begins a read transaction against the last committed state, notifying
    /// the observer (and metrics, when enabled) on success.
    fn begin_read(&self) -> Result<ReadTransaction, TransactionError> {
        let guard = self.allocate_read_transaction()?;
        // Capture the id before the guard is moved into the transaction.
        let txn_id = guard.id().ok();
        #[cfg(feature = "logging")]
        debug!("Beginning read transaction id={txn_id:?}");
        let txn = ReadTransaction::new(
            self.get_memory(),
            guard,
            Arc::clone(&self.observer),
            #[cfg(feature = "metrics")]
            Arc::clone(&self.db_metrics),
        )?;
        if let Some(id) = txn_id {
            self.observer.on_read_begin(id.raw_id());
            #[cfg(feature = "metrics")]
            self.db_metrics
                .read_txn_opened
                .fetch_add(1, portable_atomic::Ordering::Relaxed);
        }
        Ok(txn)
    }

    fn cache_stats(&self) -> CacheStats {
        self.mem.cache_stats()
    }
}
713
714impl Database {
    /// Creates a database at `path` using default builder settings
    /// (equivalent to `Database::builder().create(path)`).
    #[cfg(feature = "std")]
    pub fn create(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().create(path)
    }
723
    /// Opens an existing database at `path` using default builder settings
    /// (equivalent to `Database::builder().open(path)`).
    #[cfg(feature = "std")]
    pub fn open(path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
        Self::builder().open(path)
    }
729
730 pub(crate) fn get_memory(&self) -> Arc<TransactionalMemory> {
731 self.mem.clone()
732 }
733
734 pub(crate) fn verify_primary_checksums(mem: Arc<TransactionalMemory>) -> Result<bool> {
735 let table_tree = TableTree::new(
736 mem.get_data_root(),
737 PageHint::None,
738 Arc::new(TransactionGuard::Verification),
739 mem.clone(),
740 )?;
741 if !table_tree.verify_checksums()? {
742 return Ok(false);
743 }
744 let system_table_tree = TableTree::new(
745 mem.get_system_root(),
746 PageHint::None,
747 Arc::new(TransactionGuard::Verification),
748 mem.clone(),
749 )?;
750 if !system_table_tree.verify_checksums()? {
751 return Ok(false);
752 }
753
754 Ok(true)
755 }
756
757 #[cfg(feature = "std")]
759 pub(crate) fn verify_primary_checksums_detailed(
760 mem: Arc<TransactionalMemory>,
761 ) -> Result<(u64, Vec<CorruptPageInfo>)> {
762 let mut total_pages = 0u64;
763 let mut all_corruptions = Vec::new();
764
765 let table_tree = TableTree::new(
766 mem.get_data_root(),
767 PageHint::None,
768 Arc::new(TransactionGuard::Verification),
769 mem.clone(),
770 )?;
771 let (pages, corruptions) = table_tree.verify_checksums_detailed()?;
772 total_pages += pages;
773 all_corruptions.extend(corruptions);
774
775 let system_table_tree = TableTree::new(
776 mem.get_system_root(),
777 PageHint::None,
778 Arc::new(TransactionGuard::Verification),
779 mem.clone(),
780 )?;
781 let (pages, corruptions) = system_table_tree.verify_checksums_detailed()?;
782 total_pages += pages;
783 all_corruptions.extend(corruptions);
784
785 Ok((total_pages, all_corruptions))
786 }
787
788 #[cfg(feature = "std")]
790 pub(crate) fn verify_primary_structure(
791 mem: Arc<TransactionalMemory>,
792 ) -> Result<Vec<CorruptPageInfo>> {
793 let mut all_corruptions = Vec::new();
794
795 let table_tree = TableTree::new(
796 mem.get_data_root(),
797 PageHint::None,
798 Arc::new(TransactionGuard::Verification),
799 mem.clone(),
800 )?;
801 all_corruptions.extend(table_tree.verify_structure_detailed()?);
802
803 let system_table_tree = TableTree::new(
804 mem.get_system_root(),
805 PageHint::None,
806 Arc::new(TransactionGuard::Verification),
807 mem.clone(),
808 )?;
809 all_corruptions.extend(system_table_tree.verify_structure_detailed()?);
810
811 Ok(all_corruptions)
812 }
813
814 #[cfg(feature = "std")]
823 #[allow(clippy::cast_possible_truncation)]
824 pub fn backup(&self, path: impl AsRef<Path>) -> Result<(), StorageError> {
825 use std::io::Write;
826
827 const CHUNK_SIZE: usize = 1024 * 1024; let _read_txn = self.begin_read().map_err(|e| e.into_storage_error())?;
831
832 self.mem.flush_data()?;
834
835 let file_len = self.mem.raw_len()?;
836 let mut dest =
837 File::create(path.as_ref()).map_err(|e| StorageError::Io(BackendError::Io(e)))?;
838 let mut buf = vec![0u8; CHUNK_SIZE];
839 let mut offset = 0u64;
840
841 while offset < file_len {
842 let remaining = (file_len - offset) as usize;
843 let to_read = remaining.min(CHUNK_SIZE);
844 let chunk = &mut buf[..to_read];
845 self.mem.read_raw(offset, chunk)?;
846 dest.write_all(chunk)
847 .map_err(|e| StorageError::Io(BackendError::Io(e)))?;
848 offset += to_read as u64;
849 }
850
851 dest.sync_all()
852 .map_err(|e| StorageError::Io(BackendError::Io(e)))?;
853
854 Ok(())
855 }
856
    /// Verifies a database file on disk without opening it for normal use.
    ///
    /// `level` controls depth: `Header` validates only the header, `Pages`
    /// additionally verifies page checksums, and `Full` also runs structural
    /// validation.
    ///
    /// # Errors
    /// Returns an error if the file cannot be opened or read.
    #[cfg(feature = "std")]
    pub fn verify_backup(
        path: impl AsRef<Path>,
        level: VerifyLevel,
    ) -> core::result::Result<VerifyReport, DatabaseError> {
        let start = Instant::now();
        // Open read-only; verification never mutates the file.
        let file = OpenOptions::new().read(true).open(path.as_ref())?;
        let backend: Box<dyn StorageBackend> = Box::new(
            crate::tree_store::file_backend::FileBackend::new_internal(file, true)?,
        );

        let (mem, header_valid) =
            TransactionalMemory::new_for_verify(backend, PAGE_SIZE, None, CompressionConfig::None)?;

        // Header-only check: skip page and structural validation entirely.
        if level == VerifyLevel::Header {
            return Ok(VerifyReport {
                valid: header_valid,
                header_valid,
                pages_checked: 0,
                pages_corrupt: 0,
                structural_valid: None,
                corrupt_details: Vec::new(),
                duration: start.elapsed(),
            });
        }

        let mem = Arc::new(mem);
        let (pages_checked, mut corrupt_details) =
            Self::verify_primary_checksums_detailed(mem.clone())?;
        let pages_corrupt = corrupt_details.len() as u64;

        // Structural validation only at the Full level.
        let structural_valid = if level == VerifyLevel::Full {
            let structural_corruptions = Self::verify_primary_structure(mem)?;
            if !structural_corruptions.is_empty() {
                corrupt_details.extend(structural_corruptions);
                Some(false)
            } else {
                Some(true)
            }
        } else {
            None
        };

        let valid = header_valid && pages_corrupt == 0 && structural_valid.unwrap_or(true);

        Ok(VerifyReport {
            valid,
            header_valid,
            pages_checked,
            pages_corrupt,
            structural_valid,
            corrupt_details,
            duration: start.elapsed(),
        })
    }
923
    /// Best-effort recovery of data from a corrupted database file at
    /// `corrupted_path` into a fresh database created at `output_path`.
    ///
    /// Table definitions are discovered by scanning surviving leaf pages;
    /// each table's recoverable key/value pairs are then copied as raw bytes
    /// into the output database.
    ///
    /// # Errors
    /// Fails if the corrupted file cannot be opened, or the output database
    /// cannot be created or written.
    #[cfg(feature = "std")]
    pub fn salvage(
        corrupted_path: impl AsRef<Path>,
        output_path: impl AsRef<Path>,
    ) -> core::result::Result<SalvageReport, DatabaseError> {
        let start = Instant::now();
        let mut corrupt_details: Vec<CorruptPageInfo> = Vec::new();
        let mut tables_recovered = 0u64;
        let mut rows_recovered = 0u64;
        let mut rows_lost = 0u64;

        // Open read-only: never touch the corrupted source file.
        let file = OpenOptions::new()
            .read(true)
            .open(corrupted_path.as_ref())?;
        let backend: Box<dyn crate::StorageBackend> = Box::new(
            crate::tree_store::file_backend::FileBackend::new_internal(file, true)?,
        );

        let (mem, _header_valid) =
            TransactionalMemory::new_for_verify(backend, PAGE_SIZE, None, CompressionConfig::None)?;
        let mem = Arc::new(mem);

        // Table definitions are discovered from the data root's leaves.
        let data_root = mem.get_data_root();
        let table_entries =
            Self::salvage_discover_tables(mem.clone(), data_root, &mut corrupt_details);
        let tables_found = table_entries.len() as u64;

        let output_db = Database::builder().create(output_path.as_ref())?;

        for (table_name, definition) in &table_entries {
            // Both table kinds carry the same root/fixed-size metadata.
            let (table_root, fixed_key_size, fixed_value_size) = match definition {
                InternalTableDefinition::Normal {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                }
                | InternalTableDefinition::Multimap {
                    table_root,
                    fixed_key_size,
                    fixed_value_size,
                    ..
                } => (*table_root, *fixed_key_size, *fixed_value_size),
            };

            // An empty table has no root — nothing to recover.
            let Some(root) = table_root else {
                continue;
            };

            // With compression enabled, stored values are variable length
            // even for fixed-size value types.
            let effective_value_size = if mem.compression().is_enabled() {
                None
            } else {
                fixed_value_size
            };

            let mut pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
            let mut table_corruptions: Vec<CorruptPageInfo> = Vec::new();

            let table_rows = salvage_tree_leaves(
                root,
                &mem,
                fixed_key_size,
                effective_value_size,
                &mut pairs,
                &mut table_corruptions,
            );

            // Attribute each corruption to this table for the report.
            for c in &mut table_corruptions {
                c.table_name = Some(table_name.clone());
            }

            if !pairs.is_empty() {
                // NOTE(review): the table name is deliberately leaked to get
                // the 'static lifetime TableDefinition::new requires; this
                // leaks once per recovered table for the life of the process.
                let leaked_name: &'static str = Box::leak(table_name.clone().into_boxed_str());
                let raw_def: TableDefinition<&[u8], &[u8]> = TableDefinition::new(leaked_name);
                let write_txn = output_db
                    .begin_write()
                    .map_err(|e| DatabaseError::Storage(e.into_storage_error()))?;
                {
                    let mut table = write_txn.open_table(raw_def).map_err(|e| {
                        DatabaseError::Storage(
                            e.into_storage_error_or_internal("salvage: open_table"),
                        )
                    })?;
                    // Insert errors are ignored: salvage is best-effort.
                    for (key, value) in &pairs {
                        let _ = table.insert(key.as_slice(), value.as_slice());
                    }
                }
                write_txn
                    .commit()
                    .map_err(|e| DatabaseError::Storage(e.into_storage_error()))?;
                tables_recovered += 1;
            }

            rows_recovered += table_rows;
            // Approximation: one corruption incident counted as one lost row.
            rows_lost += table_corruptions.len() as u64;
            corrupt_details.extend(table_corruptions);
        }

        Ok(SalvageReport {
            tables_found,
            tables_recovered,
            rows_recovered,
            rows_lost,
            corrupt_details,
            duration: start.elapsed(),
        })
    }
1052
    /// Scans the surviving leaves under `system_root` for table-name →
    /// definition pairs, recording any corruption in `corruptions`.
    ///
    /// Entries whose name is not valid UTF-8, or that belong to internal
    /// namespaces (leading NUL), are skipped silently.
    #[cfg(feature = "std")]
    fn salvage_discover_tables(
        mem: Arc<TransactionalMemory>,
        system_root: Option<BtreeHeader>,
        corruptions: &mut Vec<CorruptPageInfo>,
    ) -> Vec<(String, InternalTableDefinition)> {
        // No root means no tables survived (or the database was empty).
        let Some(root) = system_root else {
            return Vec::new();
        };

        let mut raw_pairs: Vec<(Vec<u8>, Vec<u8>)> = Vec::new();
        salvage_tree_leaves(root, &mem, None, None, &mut raw_pairs, corruptions);

        let mut tables = Vec::new();
        for (key_bytes, value_bytes) in &raw_pairs {
            let name = match core::str::from_utf8(key_bytes) {
                Ok(s) => s.to_string(),
                Err(_) => continue,
            };
            // Skip internal (NUL-prefixed) table namespaces.
            if name.starts_with('\0') {
                continue;
            }
            let vb = value_bytes.clone();
            // from_bytes may panic on corrupt serialized definitions, so the
            // parse is wrapped in catch_unwind and failures are reported as
            // corruption instead of aborting the salvage.
            let parsed = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                <InternalTableDefinition as crate::types::Value>::from_bytes(&vb)
            }));
            match parsed {
                Ok(definition) => tables.push((name, definition)),
                Err(_) => {
                    corruptions.push(CorruptPageInfo {
                        page_number: 0,
                        table_name: Some(name),
                        description: "corrupt table definition".to_string(),
                    });
                }
            }
        }

        tables
    }
1098
    /// Verifies this (open) database at the requested `level`, returning a
    /// detailed report. See [`VerifyLevel`] for what each level checks.
    #[cfg(feature = "std")]
    pub fn verify_integrity(&self, level: VerifyLevel) -> Result<VerifyReport> {
        let start = Instant::now();

        // The header was already validated when this database was opened,
        // so it is treated as valid here.
        let header_valid = true;

        if level == VerifyLevel::Header {
            return Ok(VerifyReport {
                valid: true,
                header_valid,
                pages_checked: 0,
                pages_corrupt: 0,
                structural_valid: None,
                corrupt_details: Vec::new(),
                duration: start.elapsed(),
            });
        }

        let (pages_checked, mut corrupt_details) =
            Self::verify_primary_checksums_detailed(self.mem.clone())?;
        let pages_corrupt = corrupt_details.len() as u64;

        // Structural validation only at the Full level.
        let structural_valid = if level == VerifyLevel::Full {
            let structural_corruptions = Self::verify_primary_structure(self.mem.clone())?;
            if !structural_corruptions.is_empty() {
                corrupt_details.extend(structural_corruptions);
                Some(false)
            } else {
                Some(true)
            }
        } else {
            None
        };

        let valid = pages_corrupt == 0 && structural_valid.unwrap_or(true);

        Ok(VerifyReport {
            valid,
            header_valid,
            pages_checked,
            pages_corrupt,
            structural_valid,
            corrupt_details,
            duration: start.elapsed(),
        })
    }
1156
    /// Repairs the database if necessary and reports whether it was already
    /// clean (`Ok(true)` = no repair was needed).
    ///
    /// Requires exclusive access: fails if any other `Arc` handle to the
    /// underlying memory exists (e.g. a live transaction).
    pub fn check_integrity(&mut self) -> Result<bool, DatabaseError> {
        // Snapshot the allocator state so we can detect repair-induced changes.
        let allocator_hash = self.mem.allocator_hash();
        let mut was_clean = Arc::get_mut(&mut self.mem)
            .ok_or_else(|| {
                DatabaseError::Storage(StorageError::invalid_config(
                    "check_integrity() requires exclusive database access, but other references to the memory exist",
                ))
            })?
            .clear_cache_and_reload()?;

        let old_roots = [self.mem.get_data_root(), self.mem.get_system_root()];

        let new_roots = Self::do_repair(&mut self.mem, &|_| {}).map_err(|err| match err {
            DatabaseError::Storage(storage_err) => storage_err,
            _ => StorageError::Internal(
                "unexpected non-storage error during integrity check repair".to_string(),
            ),
        })?;

        // Any change to the roots or the allocator means the file was dirty.
        if old_roots != new_roots || allocator_hash != self.mem.allocator_hash() {
            was_clean = false;
        }

        // Persist the repaired state with a durable (two-phase) commit.
        if !was_clean {
            let next_transaction_id = self.mem.get_last_committed_transaction_id()?.next()?;
            let [data_root, system_root] = new_roots;
            self.mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        self.mem.begin_writable()?;

        Ok(was_clean)
    }
1205
    /// Compacts the database file, relocating pages toward the start of the
    /// file and shrinking it. Returns `Ok(true)` if any compaction was
    /// performed.
    ///
    /// # Errors
    /// Fails if a read transaction is live, or any persistent/ephemeral
    /// savepoint exists, since both pin old pages that compaction would move.
    pub fn compact(&mut self) -> Result<bool, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_err(CompactionError::Storage)?
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        if txn.list_persistent_savepoints()?.next().is_some() {
            return Err(CompactionError::PersistentSavepointExists);
        }
        if self
            .transaction_tracker
            .any_savepoint_exists()
            .map_err(CompactionError::Storage)?
        {
            return Err(CompactionError::EphemeralSavepointExists);
        }
        // Two empty two-phase commits drain the pending free-page queues
        // before compaction begins; the assert confirms nothing is pending.
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;
        let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
        assert!(!txn.pending_free_pages()?);
        txn.abort()?;

        let mut compacted = false;
        // Repeat until a full pass relocates nothing.
        loop {
            let mut progress = false;

            // One relocation pass; only commit if it actually moved pages.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.compact_pages()? {
                progress = true;
                txn.commit().map_err(|e| e.into_storage_error())?;
            } else {
                txn.abort()?;
            }

            // Two empty two-phase commits with maximum shrink: the first
            // frees the relocated pages' old locations, the second lets the
            // file be truncated; the assert re-checks the free queue drained.
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            txn.set_two_phase_commit(true);
            txn.set_shrink_policy(ShrinkPolicy::Maximum);
            txn.commit().map_err(|e| e.into_storage_error())?;
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            assert!(!txn.pending_free_pages()?);
            txn.abort()?;

            if !progress {
                break;
            }

            compacted = true;
        }

        Ok(compacted)
    }
1285
    /// Compacts the blob region, relocating live blobs to reclaim space left
    /// by deleted ones, then truncates the file to the new region end.
    ///
    /// Returns a no-op report immediately if there are no dead blob bytes.
    ///
    /// # Errors
    /// Fails if a read transaction is live or any savepoint exists, since
    /// those may still reference old blob locations.
    pub fn compact_blobs(&mut self) -> core::result::Result<BlobCompactionReport, CompactionError> {
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_err(CompactionError::Storage)?
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }

        // Check savepoint preconditions inside a throwaway write transaction.
        {
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.list_persistent_savepoints()?.next().is_some() {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::PersistentSavepointExists);
            }
            if self
                .transaction_tracker
                .any_savepoint_exists()
                .map_err(CompactionError::Storage)?
            {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::EphemeralSavepointExists);
            }
            txn.abort().map_err(CompactionError::Storage)?;
        }

        // Blob statistics are only available from a write transaction.
        let stats = {
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            let s = txn.blob_stats().map_err(CompactionError::Storage)?;
            txn.abort().map_err(CompactionError::Storage)?;
            s
        };

        // Nothing to reclaim: report a no-op without touching the file.
        if stats.dead_bytes == 0 {
            return Ok(BlobCompactionReport {
                blobs_relocated: 0,
                live_bytes: stats.live_bytes,
                bytes_reclaimed: 0,
                was_noop: true,
            });
        }

        let old_region_length = stats.region_bytes;

        // First pass: relocate live blobs; committed durably (two-phase).
        let (blobs_relocated, total_live_size) = {
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            let result = txn.compact_blobs_pass(false);
            match result {
                Ok(r) => {
                    txn.set_two_phase_commit(true);
                    txn.commit().map_err(|e| e.into_storage_error())?;
                    r
                }
                Err(e) => {
                    txn.abort().map_err(CompactionError::Storage)?;
                    return Err(CompactionError::Storage(e));
                }
            }
        };

        // Second (finalizing) pass — NOTE(review): the `true` flag presumably
        // finalizes/shrinks the region; confirm in compact_blobs_pass.
        {
            let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            let result = txn.compact_blobs_pass(true);
            match result {
                Ok(_) => {
                    txn.set_two_phase_commit(true);
                    txn.commit().map_err(|e| e.into_storage_error())?;
                }
                Err(e) => {
                    txn.abort().map_err(CompactionError::Storage)?;
                    return Err(CompactionError::Storage(e));
                }
            }
        }

        // Truncate the file to the end of the compacted blob region.
        let blob_state = self.mem.get_committed_blob_state();
        let target_len = blob_state.region_offset + blob_state.region_length;
        if target_len > 0 {
            self.mem
                .truncate_to(target_len)
                .map_err(CompactionError::Storage)?;
        }

        Ok(BlobCompactionReport {
            blobs_relocated,
            live_bytes: total_live_size,
            // NOTE(review): assumes the live size never exceeds the old region
            // length; would underflow otherwise — confirm invariant.
            bytes_reclaimed: old_region_length - total_live_size,
            was_noop: false,
        })
    }
1395
1396 pub fn should_compact_blobs(&self) -> Result<Option<BlobStats>, TransactionError> {
1402 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1403 let stats = txn.blob_stats().map_err(TransactionError::Storage)?;
1404 txn.abort().map_err(TransactionError::Storage)?;
1405
1406 let policy = &self.blob_compaction_policy;
1407 if stats.dead_bytes >= policy.min_dead_bytes
1408 && stats.fragmentation_ratio >= policy.fragmentation_threshold
1409 {
1410 Ok(Some(stats))
1411 } else {
1412 Ok(None)
1413 }
1414 }
1415
1416 pub fn compact_blobs_with_progress(
1423 &mut self,
1424 mut callback: impl FnMut(u64, u64, u64, u64) -> bool,
1425 ) -> core::result::Result<BlobCompactionReport, CompactionError> {
1426 if self
1427 .transaction_tracker
1428 .oldest_live_read_transaction()
1429 .map_err(CompactionError::Storage)?
1430 .is_some()
1431 {
1432 return Err(CompactionError::TransactionInProgress);
1433 }
1434
1435 {
1437 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1438 if txn.list_persistent_savepoints()?.next().is_some() {
1439 txn.abort().map_err(CompactionError::Storage)?;
1440 return Err(CompactionError::PersistentSavepointExists);
1441 }
1442 if self
1443 .transaction_tracker
1444 .any_savepoint_exists()
1445 .map_err(CompactionError::Storage)?
1446 {
1447 txn.abort().map_err(CompactionError::Storage)?;
1448 return Err(CompactionError::EphemeralSavepointExists);
1449 }
1450 txn.abort().map_err(CompactionError::Storage)?;
1451 }
1452
1453 let stats = {
1455 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1456 let s = txn.blob_stats().map_err(CompactionError::Storage)?;
1457 txn.abort().map_err(CompactionError::Storage)?;
1458 s
1459 };
1460
1461 if stats.dead_bytes == 0 {
1462 return Ok(BlobCompactionReport {
1463 blobs_relocated: 0,
1464 live_bytes: stats.live_bytes,
1465 bytes_reclaimed: 0,
1466 was_noop: true,
1467 });
1468 }
1469
1470 let old_region_length = stats.region_bytes;
1471 let total_blobs = stats.blob_count;
1472 let total_bytes = stats.live_bytes;
1473
1474 if !callback(0, total_blobs, 0, total_bytes) {
1476 return Err(CompactionError::Cancelled);
1477 }
1478
1479 let (blobs_relocated, total_live_size) = {
1481 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1482 let result = txn.compact_blobs_pass(false);
1483 match result {
1484 Ok(r) => {
1485 txn.set_two_phase_commit(true);
1486 txn.commit().map_err(|e| e.into_storage_error())?;
1487 r
1488 }
1489 Err(e) => {
1490 txn.abort().map_err(CompactionError::Storage)?;
1491 return Err(CompactionError::Storage(e));
1492 }
1493 }
1494 };
1495
1496 if !callback(blobs_relocated, total_blobs, total_live_size, total_bytes) {
1498 return Err(CompactionError::Cancelled);
1501 }
1502
1503 {
1505 let mut txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1506 let result = txn.compact_blobs_pass(true);
1507 match result {
1508 Ok(_) => {
1509 txn.set_two_phase_commit(true);
1510 txn.commit().map_err(|e| e.into_storage_error())?;
1511 }
1512 Err(e) => {
1513 txn.abort().map_err(CompactionError::Storage)?;
1514 return Err(CompactionError::Storage(e));
1515 }
1516 }
1517 }
1518
1519 let blob_state = self.mem.get_committed_blob_state();
1521 let target_len = blob_state.region_offset + blob_state.region_length;
1522 if target_len > 0 {
1523 self.mem
1524 .truncate_to(target_len)
1525 .map_err(CompactionError::Storage)?;
1526 }
1527
1528 let _ = callback(blobs_relocated, total_blobs, total_live_size, total_bytes);
1530
1531 Ok(BlobCompactionReport {
1532 blobs_relocated,
1533 live_bytes: total_live_size,
1534 bytes_reclaimed: old_region_length - total_live_size,
1535 was_noop: false,
1536 })
1537 }
1538
1539 pub fn start_compaction(&self) -> core::result::Result<CompactionHandle<'_>, CompactionError> {
1559 let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
1561 if txn.list_persistent_savepoints()?.next().is_some() {
1562 txn.abort().map_err(CompactionError::Storage)?;
1563 return Err(CompactionError::PersistentSavepointExists);
1564 }
1565 if self
1566 .transaction_tracker
1567 .any_savepoint_exists()
1568 .map_err(CompactionError::Storage)?
1569 {
1570 txn.abort().map_err(CompactionError::Storage)?;
1571 return Err(CompactionError::EphemeralSavepointExists);
1572 }
1573 txn.abort().map_err(CompactionError::Storage)?;
1574
1575 Ok(CompactionHandle { db: self })
1576 }
1577
    /// Begins an incremental blob compaction, returning a handle driven via
    /// `step`/`run`.
    ///
    /// Refused while any read transaction is live or any savepoint exists.
    /// When there are no dead bytes, the returned handle is created already
    /// complete (phase 2) so stepping it does no work.
    pub fn start_blob_compaction(
        &self,
    ) -> core::result::Result<BlobCompactionHandle<'_>, CompactionError> {
        // Refused while any read transaction is live.
        if self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_err(CompactionError::Storage)?
            .is_some()
        {
            return Err(CompactionError::TransactionInProgress);
        }

        // Savepoints (persistent or ephemeral) also block compaction.
        {
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            if txn.list_persistent_savepoints()?.next().is_some() {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::PersistentSavepointExists);
            }
            if self
                .transaction_tracker
                .any_savepoint_exists()
                .map_err(CompactionError::Storage)?
            {
                txn.abort().map_err(CompactionError::Storage)?;
                return Err(CompactionError::EphemeralSavepointExists);
            }
            txn.abort().map_err(CompactionError::Storage)?;
        }

        // Snapshot statistics to decide whether any work is needed at all.
        let stats = {
            let txn = self.begin_write().map_err(|e| e.into_storage_error())?;
            let s = txn.blob_stats().map_err(CompactionError::Storage)?;
            txn.abort().map_err(CompactionError::Storage)?;
            s
        };

        if stats.dead_bytes == 0 {
            // Nothing to reclaim: hand back a handle already in the terminal
            // phase so step()/run() are no-ops.
            return Ok(BlobCompactionHandle {
                db: self,
                stats,
                phase: 2,
                blobs_relocated: 0,
                live_bytes: stats.live_bytes,
            });
        }

        Ok(BlobCompactionHandle {
            db: self,
            stats,
            phase: 0,
            blobs_relocated: 0,
            live_bytes: 0,
        })
    }
1644
1645 #[cfg(feature = "std")]
1653 pub fn start_integrity_scanner(
1654 &self,
1655 config: crate::integrity_scanner::IntegrityScannerConfig,
1656 ) -> Result<crate::integrity_scanner::IntegrityScannerHandle, DatabaseError> {
1657 crate::integrity_scanner::IntegrityScannerHandle::start(self.mem.clone(), config)
1658 .map_err(DatabaseError::from)
1659 }
1660
    /// Exports an incremental snapshot of changes since transaction
    /// `since_txn`; delegates to [`crate::incremental::export_incremental`]
    /// (see that module for the exact snapshot semantics).
    #[cfg(feature = "std")]
    pub fn export_incremental(
        &self,
        since_txn: u64,
    ) -> core::result::Result<crate::incremental::IncrementalSnapshot, StorageError> {
        crate::incremental::export_incremental(self, since_txn)
    }
1673
    /// Applies an in-memory incremental snapshot to this database; delegates
    /// to [`crate::incremental::import_incremental`].
    #[cfg(feature = "std")]
    pub fn import_incremental(
        &self,
        snapshot: &crate::incremental::IncrementalSnapshot,
    ) -> core::result::Result<crate::incremental::IncrementalImportReport, StorageError> {
        crate::incremental::import_incremental(self, snapshot)
    }
1686
    /// Writes an incremental backup (changes since `since_txn`) to `dest`;
    /// delegates to [`crate::incremental::backup_incremental`].
    #[cfg(feature = "std")]
    pub fn backup_incremental(
        &self,
        dest: impl AsRef<std::path::Path>,
        since_txn: u64,
    ) -> core::result::Result<crate::incremental::IncrementalBackupReport, StorageError> {
        crate::incremental::backup_incremental(self, dest.as_ref(), since_txn)
    }
1699
    /// Applies an incremental backup file at `path` to this database;
    /// delegates to [`crate::incremental::apply_incremental_backup`].
    #[cfg(feature = "std")]
    pub fn apply_incremental_backup(
        &self,
        path: impl AsRef<std::path::Path>,
    ) -> core::result::Result<crate::incremental::IncrementalImportReport, StorageError> {
        crate::incremental::apply_incremental_backup(self, path.as_ref())
    }
1711
1712 #[cfg_attr(not(debug_assertions), expect(dead_code))]
1713 fn check_repaired_allocated_pages_table(
1714 system_root: Option<BtreeHeader>,
1715 mem: Arc<TransactionalMemory>,
1716 ) -> Result {
1717 let table_tree = TableTree::new(
1718 system_root,
1719 PageHint::None,
1720 Arc::new(TransactionGuard::Verification),
1721 mem.clone(),
1722 )?;
1723 if let Some(table_def) = table_tree
1724 .get_table::<TransactionIdWithPagination, PageList>(
1725 DATA_ALLOCATED_TABLE.name(),
1726 TableType::Normal,
1727 )
1728 .map_err(|e| e.into_storage_error_or_internal("Allocated pages table corrupted"))?
1729 {
1730 let InternalTableDefinition::Normal { table_root, .. } = table_def else {
1731 return Err(StorageError::Internal(
1732 "unexpected non-normal table type for allocated pages table".to_string(),
1733 ));
1734 };
1735 let table: ReadOnlyTable<TransactionIdWithPagination, PageList> =
1736 ReadOnlyTable::new_uncompressed(
1737 DATA_ALLOCATED_TABLE.name().to_string(),
1738 table_root,
1739 PageHint::None,
1740 Arc::new(TransactionGuard::Verification),
1741 mem.clone(),
1742 )?;
1743 for result in table.range::<TransactionIdWithPagination>(..)? {
1744 let (_, pages) = result?;
1745 for i in 0..pages.value().len() {
1746 assert!(mem.is_allocated(pages.value().get(i)));
1747 }
1748 }
1749 }
1750
1751 Ok(())
1752 }
1753
1754 fn visit_freed_tree<K: Key, V: Value, F>(
1755 system_root: Option<BtreeHeader>,
1756 table_def: SystemTableDefinition<K, V>,
1757 mem: Arc<TransactionalMemory>,
1758 mut visitor: F,
1759 ) -> Result
1760 where
1761 F: FnMut(PageNumber) -> Result,
1762 {
1763 let fake_guard = Arc::new(TransactionGuard::Verification);
1764 let system_tree = TableTree::new(system_root, PageHint::None, fake_guard, mem.clone())?;
1765 let table_name = table_def.name();
1766 let result = match system_tree.get_table::<K, V>(table_name, TableType::Normal) {
1767 Ok(result) => result,
1768 Err(TableError::Storage(err)) => {
1769 return Err(err);
1770 }
1771 Err(TableError::TableDoesNotExist(_)) => {
1772 return Ok(());
1773 }
1774 Err(_) => {
1775 return Err(StorageError::Corrupted(format!(
1776 "Unable to open {table_name}"
1777 )));
1778 }
1779 };
1780
1781 if let Some(definition) = result {
1782 let table_root = match definition {
1783 InternalTableDefinition::Normal { table_root, .. } => table_root,
1784 InternalTableDefinition::Multimap { .. } => {
1785 return Err(StorageError::Corrupted(
1786 "unexpected multimap table type in freed tree lookup".to_string(),
1787 ));
1788 }
1789 };
1790 let table: ReadOnlyTable<TransactionIdWithPagination, PageList<'static>> =
1791 ReadOnlyTable::new_uncompressed(
1792 table_name.to_string(),
1793 table_root,
1794 PageHint::None,
1795 Arc::new(TransactionGuard::Verification),
1796 mem.clone(),
1797 )?;
1798 for result in table.range::<TransactionIdWithPagination>(..)? {
1799 let (_, page_list) = result?;
1800 for i in 0..page_list.value().len() {
1801 visitor(page_list.value().get(i))?;
1802 }
1803 }
1804 }
1805
1806 Ok(())
1807 }
1808
1809 #[cfg(debug_assertions)]
1810 fn mark_allocated_page_for_debug(
1811 mem: &mut Arc<TransactionalMemory>, ) -> Result {
1813 let data_root = mem.get_data_root();
1814 {
1815 let fake = Arc::new(TransactionGuard::Verification);
1816 let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
1817 tables.visit_all_pages(|path| {
1818 mem.mark_debug_allocated_page(path.page_number());
1819 Ok(())
1820 })?;
1821 }
1822
1823 let system_root = mem.get_system_root();
1824 {
1825 let fake = Arc::new(TransactionGuard::Verification);
1826 let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
1827 system_tables.visit_all_pages(|path| {
1828 mem.mark_debug_allocated_page(path.page_number());
1829 Ok(())
1830 })?;
1831 }
1832
1833 Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
1834 mem.mark_debug_allocated_page(page);
1835 Ok(())
1836 })?;
1837 Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
1838 mem.mark_debug_allocated_page(page);
1839 Ok(())
1840 })?;
1841
1842 Ok(())
1843 }
1844
    /// Rebuilds the allocator state after an unclean shutdown.
    ///
    /// First ensures the primary roots pass checksum verification (falling
    /// back to the secondary via `repair_primary_corrupted` when possible),
    /// then re-marks every page reachable from the data tree, system tree,
    /// and freed tables as allocated. Progress is reported at 0.3/0.6/0.9
    /// through `repair_callback`, which may abort the repair.
    ///
    /// Returns `[data_root, system_root]` for the caller to commit.
    fn do_repair(
        mem: &mut Arc<TransactionalMemory>, repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
    ) -> Result<[Option<BtreeHeader>; 2], DatabaseError> {
        if !Self::verify_primary_checksums(mem.clone())? {
            // With two-phase commit the primary must always be intact, so a
            // checksum failure is unrecoverable corruption.
            if mem.used_two_phase_commit() {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Primary is corrupted despite 2-phase commit".to_string(),
                )));
            }

            let mut handle = RepairSession::new(0.3);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }

            // Promote the other commit slot and drop any cached pages that
            // were read from the corrupted primary.
            mem.repair_primary_corrupted();
            mem.clear_read_cache();
            if !Self::verify_primary_checksums(mem.clone())? {
                return Err(DatabaseError::Storage(StorageError::Corrupted(
                    "Failed to repair database. All roots are corrupted".to_string(),
                )));
            }
        }
        let mut handle = RepairSession::new(0.6);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        mem.begin_repair()?;

        // Re-mark every page reachable from the data tree as allocated.
        let data_root = mem.get_data_root();
        {
            let fake = Arc::new(TransactionGuard::Verification);
            let tables = TableTree::new(data_root, PageHint::None, fake, mem.clone())?;
            tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        let mut handle = RepairSession::new(0.9);
        repair_callback(&mut handle);
        if handle.aborted() {
            return Err(DatabaseError::RepairAborted);
        }

        // Same for the system tree.
        let system_root = mem.get_system_root();
        {
            let fake = Arc::new(TransactionGuard::Verification);
            let system_tables = TableTree::new(system_root, PageHint::None, fake, mem.clone())?;
            system_tables.visit_all_pages(|path| {
                mem.mark_page_allocated(path.page_number());
                Ok(())
            })?;
        }

        // Pages pending in the freed tables are still allocated.
        Self::visit_freed_tree(system_root, DATA_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        Self::visit_freed_tree(system_root, SYSTEM_FREED_TABLE, mem.clone(), |page| {
            mem.mark_page_allocated(page);
            Ok(())
        })?;
        #[cfg(debug_assertions)]
        {
            Self::check_repaired_allocated_pages_table(system_root, mem.clone())?;
        }

        mem.end_repair()?;

        mem.clear_read_cache();

        Ok([data_root, system_root])
    }
1931
    /// Opens (or, when `allow_initialize` permits, initializes) a database on
    /// the given storage backend.
    ///
    /// When a valid persisted allocator state exists (clean shutdown with
    /// two-phase commit) it is loaded directly; otherwise a full repair is
    /// run, reporting progress through `repair_callback`. Afterwards,
    /// savepoint and history bookkeeping is rehydrated into the transaction
    /// tracker, and stale history snapshots are purged when retention is 0.
    #[allow(clippy::too_many_arguments)]
    fn new(
        file: Box<dyn StorageBackend>,
        allow_initialize: bool,
        page_size: usize,
        region_size: Option<u64>,
        read_cache_size_bytes: usize,
        write_cache_size_bytes: usize,
        repair_callback: &(dyn Fn(&mut RepairSession) + 'static),
        compression: CompressionConfig,
        blob_dedup_config: BlobDedupConfig,
        memory_budget: Option<usize>,
        cdc_config: CdcConfig,
        history_retention: u64,
        read_verification: ReadVerification,
        read_verification_callback: Option<Arc<ReadVerificationCallback>>,
        blob_compaction_policy: BlobCompactionPolicy,
        observer: Arc<dyn DatabaseObserver>,
        #[cfg(feature = "metrics")] db_metrics: Arc<DbMetrics>,
    ) -> Result<Self, DatabaseError> {
        // Capture a description of the backend before it is moved below.
        #[cfg(feature = "logging")]
        let file_path = format!("{:?}", &file);
        #[cfg(feature = "logging")]
        info!("Opening database {:?}", &file_path);
        let mem = TransactionalMemory::new(
            file,
            allow_initialize,
            page_size,
            region_size,
            read_cache_size_bytes,
            write_cache_size_bytes,
            false,
            compression,
            memory_budget,
            read_verification,
            read_verification_callback,
        )?;
        let mut mem = Arc::new(mem);
        if let Some(tree) = Self::get_allocator_state_table(&mem)? {
            // Clean shutdown: reuse the persisted allocator state.
            #[cfg(feature = "logging")]
            info!("Found valid allocator state, full repair not needed");
            mem.load_allocator_state(&tree)?;
            #[cfg(debug_assertions)]
            Self::mark_allocated_page_for_debug(&mut mem)?;
        } else {
            // Unclean shutdown: rebuild allocator state from the trees.
            #[cfg(feature = "logging")]
            warn!("Database {:?} not shutdown cleanly. Repairing", &file_path);
            let mut handle = RepairSession::new(0.0);
            repair_callback(&mut handle);
            if handle.aborted() {
                return Err(DatabaseError::RepairAborted);
            }
            let [data_root, system_root] = Self::do_repair(&mut mem, repair_callback)?;
            let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;
            // Persist the repaired roots under a fresh transaction id.
            mem.commit(
                data_root,
                system_root,
                next_transaction_id,
                true,
                ShrinkPolicy::Never,
            )?;
        }

        mem.begin_writable()?;
        let next_transaction_id = mem.get_last_committed_transaction_id()?.next()?;

        let db = Database {
            mem,
            transaction_tracker: Arc::new(TransactionTracker::new(next_transaction_id)),
            blob_dedup_config: blob_dedup_config.clone(),
            cdc_config,
            history_retention,
            blob_compaction_policy,
            observer,
            #[cfg(feature = "metrics")]
            db_metrics,
            #[cfg(feature = "std")]
            group_committer: GroupCommitter::new(),
        };

        // Rehydrate savepoint bookkeeping from the persisted system tables.
        let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
        if let Some(next_id) = txn.next_persistent_savepoint_id()? {
            db.transaction_tracker
                .restore_savepoint_counter_state(next_id)?;
        }
        for id in txn.list_persistent_savepoints()? {
            let savepoint = match txn.get_persistent_savepoint(id) {
                Ok(savepoint) => savepoint,
                Err(err) => match err {
                    SavepointError::InvalidSavepoint => {
                        return Err(StorageError::Corrupted(
                            "invalid savepoint encountered during database initialization"
                                .to_string(),
                        )
                        .into());
                    }
                    SavepointError::Storage(storage) => {
                        return Err(storage.into());
                    }
                },
            };
            db.transaction_tracker
                .register_persistent_savepoint(&savepoint)?;
        }
        // Re-register holds for retained history snapshots so their pages
        // are not reclaimed.
        let history_ids = txn.list_history_snapshot_ids()?;
        if history_retention > 0 {
            for id in &history_ids {
                db.transaction_tracker
                    .register_history_hold(TransactionId::new(*id))?;
            }
        }
        txn.abort()?;

        // Retention was disabled but snapshots remain from a previous run:
        // purge them now.
        if history_retention == 0 && !history_ids.is_empty() {
            let txn = db.begin_write().map_err(|e| e.into_storage_error())?;
            txn.purge_all_history_snapshots()?;
            txn.commit().map_err(|e| match e {
                CommitError::Storage(s) => DatabaseError::Storage(s),
            })?;
        }

        Ok(db)
    }
2061
    /// Returns the persisted allocator state tree if present and valid, or
    /// `None` when a full repair is required instead.
    fn get_allocator_state_table(
        mem: &Arc<TransactionalMemory>,
    ) -> Result<Option<AllocatorStateTree>> {
        // The persisted allocator state is only trusted when the last commit
        // used two-phase commit.
        if !mem.used_two_phase_commit() {
            return Ok(None);
        }

        // Look up the allocator state table in the system tree.
        let system_table_tree = TableTree::new(
            mem.get_system_root(),
            PageHint::None,
            Arc::new(TransactionGuard::Verification),
            mem.clone(),
        )?;
        let Some(allocator_state_table) = system_table_tree
            .get_table::<AllocatorStateKey, &[u8]>(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_internal("Unexpected TableError"))?
        else {
            return Ok(None);
        };

        let InternalTableDefinition::Normal { table_root, .. } = allocator_state_table else {
            return Err(StorageError::Corrupted(
                "unexpected non-normal table type for allocator state table".to_string(),
            ));
        };
        let tree = Btree::new_uncompressed(
            table_root,
            PageHint::None,
            Arc::new(TransactionGuard::Verification),
            mem.clone(),
        )?;

        // A stale or inconsistent snapshot must not be loaded.
        if !mem.is_valid_allocator_state(&tree)? {
            return Ok(None);
        }

        Ok(Some(tree))
    }
2104
2105 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
2106 let id = self
2107 .transaction_tracker
2108 .register_read_transaction(&self.mem)?;
2109
2110 Ok(TransactionGuard::new_read(
2111 id,
2112 self.transaction_tracker.clone(),
2113 ))
2114 }
2115
    /// Returns a [`Builder`] for configuring and opening a database.
    pub fn builder() -> Builder {
        Builder::new()
    }
2120
2121 pub fn begin_write(&self) -> Result<WriteTransaction, TransactionError> {
2127 self.mem.check_io_errors()?;
2129 let guard = TransactionGuard::new_write(
2130 self.transaction_tracker.start_write_transaction()?,
2131 self.transaction_tracker.clone(),
2132 );
2133 WriteTransaction::new(
2134 guard,
2135 self.transaction_tracker.clone(),
2136 self.mem.clone(),
2137 self.blob_dedup_config.clone(),
2138 self.cdc_config.clone(),
2139 self.history_retention,
2140 Arc::clone(&self.observer),
2141 #[cfg(feature = "metrics")]
2142 Arc::clone(&self.db_metrics),
2143 )
2144 .map_err(|e| e.into())
2145 }
2146
    /// Returns the observer registered for this database.
    pub fn observer(&self) -> &Arc<dyn DatabaseObserver> {
        &self.observer
    }
2151
    /// Returns this database's metrics collector.
    #[cfg(feature = "metrics")]
    pub fn metrics(&self) -> &DbMetrics {
        &self.db_metrics
    }
2157
    /// Begins a read transaction pinned to the history snapshot recorded for
    /// `transaction_id`.
    ///
    /// # Errors
    /// `StorageError::HistorySnapshotNotFound` when no snapshot is retained
    /// for that transaction id.
    pub fn begin_read_at(&self, transaction_id: u64) -> Result<ReadTransaction, TransactionError> {
        let lookup_txn = self.begin_read()?;
        let snapshot = lookup_txn
            .get_history_snapshot_ro(transaction_id)
            .map_err(TransactionError::Storage)?
            .ok_or(TransactionError::Storage(
                StorageError::HistorySnapshotNotFound(transaction_id),
            ))?;
        let user_root = snapshot.user_root();
        // NOTE(review): the new read guard is allocated before lookup_txn is
        // dropped — presumably so the snapshot's pages stay pinned across the
        // handover; confirm before reordering.
        let guard = self.allocate_read_transaction()?;
        drop(lookup_txn);
        ReadTransaction::new_historical(
            self.mem.clone(),
            guard,
            user_root,
            Arc::clone(&self.observer),
            #[cfg(feature = "metrics")]
            Arc::clone(&self.db_metrics),
        )
    }
2185
2186 #[cfg(feature = "std")]
2191 pub fn begin_read_at_time(
2192 &self,
2193 timestamp_ms: u64,
2194 ) -> Result<ReadTransaction, TransactionError> {
2195 let lookup_txn = self.begin_read()?;
2196 let ids = lookup_txn
2197 .list_history_snapshot_ids_ro()
2198 .map_err(TransactionError::Storage)?;
2199 let mut best: Option<Option<BtreeHeader>> = None;
2200 for id in ids {
2201 if let Some(snap) = lookup_txn
2202 .get_history_snapshot_ro(id)
2203 .map_err(TransactionError::Storage)?
2204 && snap.timestamp_ms() <= timestamp_ms
2205 {
2206 best = Some(snap.user_root());
2207 }
2208 }
2209 let best_root = best.ok_or(TransactionError::Storage(
2210 StorageError::HistorySnapshotNotFound(timestamp_ms),
2211 ))?;
2212 let guard = self.allocate_read_transaction()?;
2213 drop(lookup_txn);
2214 ReadTransaction::new_historical(
2215 self.mem.clone(),
2216 guard,
2217 best_root,
2218 Arc::clone(&self.observer),
2219 #[cfg(feature = "metrics")]
2220 Arc::clone(&self.db_metrics),
2221 )
2222 }
2223
2224 pub fn transaction_history(&self) -> Result<Vec<TransactionInfo>, TransactionError> {
2228 let lookup_txn = self.begin_read()?;
2229 let ids = lookup_txn
2230 .list_history_snapshot_ids_ro()
2231 .map_err(TransactionError::Storage)?;
2232 let mut result = Vec::with_capacity(ids.len());
2233 for id in ids {
2234 if let Some(snap) = lookup_txn
2235 .get_history_snapshot_ro(id)
2236 .map_err(TransactionError::Storage)?
2237 {
2238 result.push(TransactionInfo {
2239 transaction_id: id,
2240 timestamp_ms: snap.timestamp_ms(),
2241 });
2242 }
2243 }
2244 Ok(result)
2245 }
2246
    /// Submits a batch to the group committer and blocks until its result is
    /// available. Exactly one concurrent submitter is elected leader and
    /// commits all queued batches via `run_group_commit`.
    #[cfg(feature = "std")]
    pub fn submit_write_batch(&self, batch: WriteBatch) -> Result<(), GroupCommitError> {
        let (should_lead, result_rx) = self.group_committer.enqueue(batch)?;

        if should_lead {
            self.run_group_commit();
        }

        // A closed channel means the committer shut down before replying.
        result_rx.recv().unwrap_or(Err(GroupCommitError::Shutdown))
    }
2271
    /// Leader loop for group commit: drains queued batches, applies each to a
    /// single write transaction, and fans the results back to submitters.
    ///
    /// Failure semantics: if any batch fails to apply, the transaction is
    /// aborted; the failing batch receives `BatchFailed` and every other
    /// batch in the group receives `PeerFailed`.
    #[cfg(feature = "std")]
    fn run_group_commit(&self) {
        let Ok(mut batches) = self.group_committer.drain_pending() else {
            // Committer is shutting down; relinquish leadership and bail.
            let _ = self.group_committer.finish_leader();
            return;
        };

        loop {
            if batches.is_empty() {
                // finish_leader may hand back batches that raced in while we
                // were finishing; keep leading until the queue is truly empty.
                match self.group_committer.finish_leader() {
                    Ok(remaining) if remaining.is_empty() => return,
                    Ok(remaining) => {
                        batches = remaining;
                        continue;
                    }
                    Err(_) => return,
                }
            }

            let txn = match self.begin_write() {
                Ok(txn) => txn,
                Err(e) => {
                    // Could not start a transaction: fail every queued batch.
                    let msg = e.into_storage_error().to_string();
                    for b in batches {
                        let _ = b.result_tx.send(Err(GroupCommitError::TransactionFailed(
                            StorageError::Corrupted(msg.clone()),
                        )));
                    }
                    let _ = self.group_committer.finish_leader();
                    return;
                }
            };

            let mut senders = Vec::with_capacity(batches.len());
            let mut failed = false;

            for pending in batches {
                if failed {
                    // A previous batch already poisoned this transaction.
                    let _ = pending.result_tx.send(Err(GroupCommitError::PeerFailed));
                    continue;
                }

                match pending.batch.apply(&txn) {
                    Ok(()) => {
                        senders.push(pending.result_tx);
                    }
                    Err(e) => {
                        failed = true;
                        let _ = pending
                            .result_tx
                            .send(Err(GroupCommitError::BatchFailed(e)));
                        // Batches already applied share the aborted
                        // transaction's fate.
                        for tx in senders.drain(..) {
                            let _ = tx.send(Err(GroupCommitError::PeerFailed));
                        }
                    }
                }
            }

            if failed {
                let _ = txn.abort();
                let Ok(b) = self.group_committer.drain_pending() else {
                    let _ = self.group_committer.finish_leader();
                    return;
                };
                batches = b;
                continue;
            }

            match txn.commit() {
                Ok(()) => {
                    for tx in senders {
                        let _ = tx.send(Ok(()));
                    }
                }
                Err(e) => {
                    let msg = e.into_storage_error().to_string();
                    for tx in senders {
                        let _ = tx.send(Err(GroupCommitError::CommitFailed(
                            StorageError::Corrupted(msg.clone()),
                        )));
                    }
                }
            }

            // Pick up any batches that were queued during the commit.
            let Ok(b) = self.group_committer.drain_pending() else {
                let _ = self.group_committer.finish_leader();
                return;
            };
            batches = b;
        }
    }
2371
2372 fn ensure_allocator_state_table_and_trim(&self) -> Result<(), Error> {
2373 #[cfg(feature = "logging")]
2375 debug!("Writing allocator state table");
2376 let mut tx = self.begin_write()?;
2377 tx.set_quick_repair(true);
2378 tx.set_shrink_policy(ShrinkPolicy::Maximum);
2379 tx.commit()?;
2380
2381 Ok(())
2382 }
2383}
2384
impl Drop for Database {
    /// Shuts down the group committer, persists the allocator state (unless
    /// the thread is panicking), and closes the backing storage. Failures
    /// are logged but not propagated; repair may be needed at next open.
    fn drop(&mut self) {
        // Stop the group committer before the final allocator-state commit.
        #[cfg(feature = "std")]
        self.group_committer.shutdown();

        // Skip the final commit while panicking: in-memory state may be
        // inconsistent. On no_std there is no panic detection, so assume
        // a normal drop.
        let is_panicking = {
            #[cfg(feature = "std")]
            {
                std::thread::panicking()
            }
            #[cfg(not(feature = "std"))]
            {
                false
            }
        };

        if !is_panicking && self.ensure_allocator_state_table_and_trim().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to write allocator state table. Repair may be required at restart.");
        }

        if self.mem.close().is_err() {
            #[cfg(feature = "logging")]
            warn!("Failed to flush database file. Repair may be required at restart.");
        }
    }
}
2412
/// Handle for an in-progress incremental page compaction, created by
/// [`Database::start_compaction`]. Borrows the database for its lifetime.
pub struct CompactionHandle<'db> {
    db: &'db Database,
}
2424
impl CompactionHandle<'_> {
    /// Performs one compaction step.
    ///
    /// Commits an empty two-phase transaction, relocates pages with
    /// `compact_pages`, then commits two maximum-shrink two-phase
    /// transactions. Returns `complete: true` when no pages were relocated.
    pub fn step(&self) -> core::result::Result<CompactionProgress, CompactionError> {
        // Empty two-phase commit before relocating.
        // NOTE(review): the purpose of this preliminary commit is not visible
        // in this file — presumably it releases pages freed by earlier
        // transactions; confirm against compact_pages' requirements.
        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.commit().map_err(|e| e.into_storage_error())?;

        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        let relocated = txn.compact_pages()?;
        if relocated {
            txn.commit().map_err(|e| e.into_storage_error())?;
        } else {
            // Nothing moved: compaction has converged.
            txn.abort()?;
            return Ok(CompactionProgress {
                pages_relocated: 0,
                complete: true,
            });
        }

        // Two consecutive maximum-shrink commits follow the relocation.
        // NOTE(review): the duplication appears deliberate (the second commit
        // can shrink past pages freed by the first), but confirm before
        // collapsing them into one.
        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.set_shrink_policy(ShrinkPolicy::Maximum);
        txn.commit().map_err(|e| e.into_storage_error())?;

        let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
        txn.set_two_phase_commit(true);
        txn.set_shrink_policy(ShrinkPolicy::Maximum);
        txn.commit().map_err(|e| e.into_storage_error())?;

        // NOTE(review): pages_relocated is hard-coded to 1 per step — it
        // effectively counts steps, not pages; confirm whether callers expect
        // an actual page count.
        let progress = CompactionProgress {
            pages_relocated: 1, complete: false,
        };
        self.db.observer.on_compaction_step(&progress);
        #[cfg(feature = "metrics")]
        self.db
            .db_metrics
            .compaction_pages_relocated
            .fetch_add(1, portable_atomic::Ordering::Relaxed);
        Ok(progress)
    }

    /// Repeats `step` until compaction completes; returns the number of
    /// steps that relocated pages and notifies the observer on completion.
    pub fn run(&self) -> core::result::Result<u64, CompactionError> {
        let mut steps = 0u64;
        loop {
            let progress = self.step()?;
            if progress.complete {
                break;
            }
            steps += 1;
        }
        self.db.observer.on_compaction_complete(steps);
        Ok(steps)
    }
}
2490
/// Progress report produced by [`BlobCompactionHandle::step`].
#[derive(Debug, Clone, Copy)]
pub struct BlobCompactionProgress {
    /// Number of blobs relocated so far.
    pub blobs_relocated: u64,
    /// Live blob bytes as measured by the relocation pass.
    pub live_bytes: u64,
    /// Current phase: 0 = relocation pending, 1 = finalization pending,
    /// 2 = complete.
    pub phase: u8,
    /// `true` once compaction has finished.
    pub complete: bool,
}
2503
/// Handle for an in-progress incremental blob compaction, created by
/// [`Database::start_blob_compaction`].
pub struct BlobCompactionHandle<'db> {
    db: &'db Database,
    // Statistics snapshot taken when the handle was created.
    stats: BlobStats,
    // 0 = relocation pending, 1 = finalization pending, 2 = complete.
    phase: u8,
    blobs_relocated: u64,
    live_bytes: u64,
}
2521
impl BlobCompactionHandle<'_> {
    /// Runs one phase of blob compaction.
    ///
    /// Phase 0 relocates live blobs; phase 1 finalizes the relocation and
    /// truncates the file; phase 2 means done — further calls return a
    /// completed progress report without doing any work.
    pub fn step(&mut self) -> core::result::Result<BlobCompactionProgress, CompactionError> {
        if self.phase == 0 {
            // Phase 0: relocation pass.
            let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
            let result = txn.compact_blobs_pass(false);
            match result {
                Ok((relocated, live_size)) => {
                    txn.set_two_phase_commit(true);
                    txn.commit().map_err(|e| e.into_storage_error())?;
                    self.blobs_relocated = relocated;
                    self.live_bytes = live_size;
                    self.phase = 1;
                    Ok(BlobCompactionProgress {
                        blobs_relocated: relocated,
                        live_bytes: live_size,
                        phase: 1,
                        complete: false,
                    })
                }
                Err(e) => {
                    txn.abort().map_err(CompactionError::Storage)?;
                    Err(CompactionError::Storage(e))
                }
            }
        } else if self.phase == 1 {
            // Phase 1: finalization pass.
            let mut txn = self.db.begin_write().map_err(|e| e.into_storage_error())?;
            let result = txn.compact_blobs_pass(true);
            match result {
                Ok(_) => {
                    txn.set_two_phase_commit(true);
                    txn.commit().map_err(|e| e.into_storage_error())?;
                }
                Err(e) => {
                    txn.abort().map_err(CompactionError::Storage)?;
                    return Err(CompactionError::Storage(e));
                }
            }

            // Shrink the file to the end of the compacted blob region.
            let blob_state = self.db.mem.get_committed_blob_state();
            let target_len = blob_state.region_offset + blob_state.region_length;
            if target_len > 0 {
                self.db
                    .mem
                    .truncate_to(target_len)
                    .map_err(CompactionError::Storage)?;
            }

            self.phase = 2;
            Ok(BlobCompactionProgress {
                blobs_relocated: self.blobs_relocated,
                live_bytes: self.live_bytes,
                phase: 2,
                complete: true,
            })
        } else {
            // Already complete; stepping again is an idempotent no-op.
            Ok(BlobCompactionProgress {
                blobs_relocated: self.blobs_relocated,
                live_bytes: self.live_bytes,
                phase: 2,
                complete: true,
            })
        }
    }

    /// Drives `step` to completion and returns the final report.
    pub fn run(&mut self) -> core::result::Result<BlobCompactionReport, CompactionError> {
        loop {
            let progress = self.step()?;
            if progress.complete {
                let bytes_reclaimed = self.stats.region_bytes.saturating_sub(self.live_bytes);
                // NOTE(review): this always reports was_noop: false, even for
                // handles created already-complete by start_blob_compaction's
                // dead_bytes == 0 path — confirm whether that is intended.
                return Ok(BlobCompactionReport {
                    blobs_relocated: self.blobs_relocated,
                    live_bytes: self.live_bytes,
                    bytes_reclaimed,
                    was_noop: false,
                });
            }
        }
    }
}
2610
/// Passed to the repair callback to report progress and allow cancellation.
pub struct RepairSession {
    // Progress estimate reported to the callback (fraction complete).
    progress: f64,
    // Set when the callback requests that the repair be abandoned.
    aborted: bool,
}
2615
impl RepairSession {
    /// Creates a session at the given progress estimate, not yet aborted.
    pub(crate) fn new(progress: f64) -> Self {
        Self {
            progress,
            aborted: false,
        }
    }

    /// Whether the repair callback requested an abort.
    pub(crate) fn aborted(&self) -> bool {
        self.aborted
    }

    /// Requests that the in-progress repair be abandoned.
    pub fn abort(&mut self) {
        self.aborted = true;
    }

    /// The current repair progress estimate (fraction complete).
    pub fn progress(&self) -> f64 {
        self.progress
    }
}
2638
/// Controls whether page checksums are verified on read.
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum ReadVerification {
    /// Never verify reads.
    None,
    /// Verify a sampled fraction of reads.
    /// NOTE(review): the expected range of `rate` (presumably 0.0..=1.0) is
    /// not visible in this file — confirm against TransactionalMemory.
    Sampled { rate: f32 },
    /// Verify every read.
    Full,
}
2662
impl Default for ReadVerification {
    /// Defaults to no read verification.
    fn default() -> Self {
        Self::None
    }
}
2668
/// What the read-verification callback tells the database to do after a
/// verification failure.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadVerificationAction {
    /// Surface the failure to the caller as an error.
    ReturnError,
    /// Ignore the failure and continue serving the read.
    Continue,
}
2677
/// Callback invoked when read verification fails; decides whether to surface
/// an error or continue. NOTE(review): the meaning of the `u64` argument is
/// not visible in this file — presumably it identifies the failing page or
/// offset; confirm against the verification call site.
pub type ReadVerificationCallback = dyn Fn(u64) -> ReadVerificationAction + Send + Sync + 'static;
2683
/// Configures and opens [`Database`] instances; construct via
/// [`Database::builder`] or [`Builder::new`].
pub struct Builder {
    page_size: usize,
    region_size: Option<u64>,
    read_cache_size_bytes: usize,
    write_cache_size_bytes: usize,
    compression: CompressionConfig,
    repair_callback: Box<dyn Fn(&mut RepairSession)>,
    blob_dedup_config: BlobDedupConfig,
    // Optional overall budget; when set, it also derives the cache sizes.
    memory_budget: Option<usize>,
    cdc_config: CdcConfig,
    // Number of history snapshots to retain; 0 disables history.
    history_retention: u64,
    read_verification: ReadVerification,
    read_verification_callback: Option<Arc<ReadVerificationCallback>>,
    observer: Option<Arc<dyn DatabaseObserver>>,
    blob_compaction_policy: BlobCompactionPolicy,
}
2701
2702impl Builder {
    /// Creates a builder with defaults: `PAGE_SIZE`, no compression, no
    /// dedup/CDC/history, no read verification, and a 1 GiB cache budget.
    #[allow(clippy::new_without_default)]
    pub fn new() -> Self {
        let mut result = Self {
            page_size: PAGE_SIZE,
            region_size: None,
            // Placeholder values; replaced by set_cache_size below.
            read_cache_size_bytes: 0,
            write_cache_size_bytes: 0,
            compression: CompressionConfig::None,
            repair_callback: Box::new(|_| {}),
            blob_dedup_config: BlobDedupConfig::default(),
            memory_budget: None,
            cdc_config: CdcConfig::default(),
            history_retention: 0,
            read_verification: ReadVerification::None,
            read_verification_callback: None,
            observer: None,
            blob_compaction_policy: BlobCompactionPolicy::default(),
        };

        // Install the default 1 GiB cache budget (90/10 read/write split).
        result.set_cache_size(1024 * 1024 * 1024);
        result
    }
2735
    /// Sets a callback that receives progress updates during repair and may
    /// cancel it via [`RepairSession::abort`].
    pub fn set_repair_callback(
        &mut self,
        callback: impl Fn(&mut RepairSession) + 'static,
    ) -> &mut Self {
        self.repair_callback = Box::new(callback);
        self
    }
2750
    /// Installs a [`DatabaseObserver`] to receive database event callbacks.
    pub fn set_observer(&mut self, observer: impl DatabaseObserver) -> &mut Self {
        self.observer = Some(Arc::new(observer));
        self
    }
2761
    /// Sets the thresholds consulted by [`Database::should_compact_blobs`].
    pub fn set_blob_compaction_policy(&mut self, policy: BlobCompactionPolicy) -> &mut Self {
        self.blob_compaction_policy = policy;
        self
    }
2771
2772 #[cfg(any(fuzzing, test))]
2780 pub fn set_page_size(&mut self, size: usize) -> &mut Self {
2781 assert!(size.is_power_of_two());
2782 self.page_size = std::cmp::max(size, 512);
2783 self
2784 }
2785
2786 pub fn set_cache_size(&mut self, bytes: usize) -> &mut Self {
2788 self.read_cache_size_bytes = bytes / 10 * 9;
2790 self.write_cache_size_bytes = bytes / 10;
2791 self
2792 }
2793
2794 pub fn set_memory_budget(&mut self, bytes: usize) -> &mut Self {
2816 assert!(
2817 bytes >= 16384,
2818 "Memory budget must be at least 16 KiB (got {bytes} bytes). \
2819 Budgets below 4 page sizes cannot cache any data."
2820 );
2821 self.memory_budget = Some(bytes);
2822 self.read_cache_size_bytes = bytes / 100 * 70;
2823 self.write_cache_size_bytes = bytes / 100 * 20;
2824 self
2825 }
2826
2827 #[cfg(any(test, fuzzing))]
2828 pub fn set_region_size(&mut self, size: u64) -> &mut Self {
2829 assert!(size.is_power_of_two());
2830 self.region_size = Some(size);
2831 self
2832 }
2833
2834 pub fn set_compression(&mut self, compression: CompressionConfig) -> &mut Self {
2842 self.compression = compression;
2843 self
2844 }
2845
2846 pub fn set_blob_dedup(&mut self, enabled: bool) -> &mut Self {
2851 self.blob_dedup_config.enabled = enabled;
2852 self
2853 }
2854
2855 pub fn set_blob_dedup_min_size(&mut self, min_size: usize) -> &mut Self {
2860 self.blob_dedup_config.min_size = min_size;
2861 self
2862 }
2863
2864 pub fn set_cdc(&mut self, config: CdcConfig) -> &mut Self {
2871 self.cdc_config = config;
2872 self
2873 }
2874
2875 pub fn set_history_retention(&mut self, max_snapshots: u64) -> &mut Self {
2884 self.history_retention = max_snapshots;
2885 self
2886 }
2887
2888 pub fn set_read_verification(&mut self, mode: ReadVerification) -> &mut Self {
2895 self.read_verification = mode;
2896 self
2897 }
2898
2899 pub fn set_read_verification_callback(
2908 &mut self,
2909 callback: impl Fn(u64) -> ReadVerificationAction + Send + Sync + 'static,
2910 ) -> &mut Self {
2911 self.read_verification_callback = Some(Arc::new(callback));
2912 self
2913 }
2914
2915 #[cfg(feature = "std")]
2920 pub fn create(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
2921 let file = OpenOptions::new()
2922 .read(true)
2923 .write(true)
2924 .create(true)
2925 .truncate(false)
2926 .open(path)?;
2927
2928 Database::new(
2929 Box::new(FileBackend::new(file)?),
2930 true,
2931 self.page_size,
2932 self.region_size,
2933 self.read_cache_size_bytes,
2934 self.write_cache_size_bytes,
2935 &self.repair_callback,
2936 self.compression,
2937 self.blob_dedup_config.clone(),
2938 self.memory_budget,
2939 self.cdc_config.clone(),
2940 self.history_retention,
2941 self.read_verification,
2942 self.read_verification_callback.clone(),
2943 self.blob_compaction_policy,
2944 self.resolve_observer(),
2945 #[cfg(feature = "metrics")]
2946 Self::resolve_metrics(),
2947 )
2948 }
2949
2950 #[cfg(feature = "std")]
2952 pub fn open(&self, path: impl AsRef<Path>) -> Result<Database, DatabaseError> {
2953 let file = OpenOptions::new().read(true).write(true).open(path)?;
2954
2955 Database::new(
2956 Box::new(FileBackend::new(file)?),
2957 false,
2958 self.page_size,
2959 None,
2960 self.read_cache_size_bytes,
2961 self.write_cache_size_bytes,
2962 &self.repair_callback,
2963 self.compression,
2964 self.blob_dedup_config.clone(),
2965 self.memory_budget,
2966 self.cdc_config.clone(),
2967 self.history_retention,
2968 self.read_verification,
2969 self.read_verification_callback.clone(),
2970 self.blob_compaction_policy,
2971 self.resolve_observer(),
2972 #[cfg(feature = "metrics")]
2973 Self::resolve_metrics(),
2974 )
2975 }
2976
2977 #[cfg(feature = "std")]
2983 pub fn open_read_only(
2984 &self,
2985 path: impl AsRef<Path>,
2986 ) -> Result<ReadOnlyDatabase, DatabaseError> {
2987 let file = OpenOptions::new().read(true).open(path)?;
2988
2989 ReadOnlyDatabase::new(
2990 Box::new(FileBackend::new_internal(file, true)?),
2991 self.page_size,
2992 None,
2993 self.read_cache_size_bytes,
2994 self.compression,
2995 self.memory_budget,
2996 self.read_verification,
2997 self.read_verification_callback.clone(),
2998 )
2999 }
3000
3001 #[cfg(feature = "std")]
3005 pub fn create_file(&self, file: File) -> Result<Database, DatabaseError> {
3006 Database::new(
3007 Box::new(FileBackend::new(file)?),
3008 true,
3009 self.page_size,
3010 self.region_size,
3011 self.read_cache_size_bytes,
3012 self.write_cache_size_bytes,
3013 &self.repair_callback,
3014 self.compression,
3015 self.blob_dedup_config.clone(),
3016 self.memory_budget,
3017 self.cdc_config.clone(),
3018 self.history_retention,
3019 self.read_verification,
3020 self.read_verification_callback.clone(),
3021 self.blob_compaction_policy,
3022 self.resolve_observer(),
3023 #[cfg(feature = "metrics")]
3024 Self::resolve_metrics(),
3025 )
3026 }
3027
3028 pub fn create_with_backend(
3030 &self,
3031 backend: impl StorageBackend,
3032 ) -> Result<Database, DatabaseError> {
3033 Database::new(
3034 Box::new(backend),
3035 true,
3036 self.page_size,
3037 self.region_size,
3038 self.read_cache_size_bytes,
3039 self.write_cache_size_bytes,
3040 &self.repair_callback,
3041 self.compression,
3042 self.blob_dedup_config.clone(),
3043 self.memory_budget,
3044 self.cdc_config.clone(),
3045 self.history_retention,
3046 self.read_verification,
3047 self.read_verification_callback.clone(),
3048 self.blob_compaction_policy,
3049 self.resolve_observer(),
3050 #[cfg(feature = "metrics")]
3051 Self::resolve_metrics(),
3052 )
3053 }
3054
3055 fn resolve_observer(&self) -> Arc<dyn DatabaseObserver> {
3056 self.observer
3057 .as_ref()
3058 .map_or_else(default_observer, Arc::clone)
3059 }
3060
3061 #[cfg(feature = "metrics")]
3062 fn resolve_metrics() -> Arc<DbMetrics> {
3063 Arc::new(DbMetrics::new())
3064 }
3065}
3066
3067impl Debug for Database {
3068 fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
3069 f.debug_struct("Database").finish()
3070 }
3071}
3072
3073#[cfg(test)]
3074mod test {
3075 use crate::backends::FileBackend;
3076 use crate::error::BackendError;
3077 use crate::{
3078 CommitError, Database, DatabaseError, Durability, ReadableDatabase, ReadableTable,
3079 ReadableTableMetadata, StorageBackend, StorageError, TableDefinition, TransactionError,
3080 };
3081 use core::sync::atomic::Ordering;
3082 use portable_atomic::AtomicU64;
3083 use std::fs::File;
3084 use std::io::{ErrorKind, Read, Seek, SeekFrom};
3085 use std::sync::Arc;
3086
    /// A [`StorageBackend`] wrapper that simulates disk faults: after a
    /// configurable number of writes, all writes, reads, and syncs fail.
    #[derive(Debug)]
    struct FailingBackend {
        inner: FileBackend,
        // Remaining writes before injected failures begin. Shared via Arc so
        // tests can toggle the backend between failing and healthy at runtime.
        countdown: Arc<AtomicU64>,
    }
3092
3093 impl FailingBackend {
3094 fn new(backend: FileBackend, countdown: u64) -> Self {
3095 Self {
3096 inner: backend,
3097 countdown: Arc::new(AtomicU64::new(countdown)),
3098 }
3099 }
3100
3101 fn check_countdown(&self) -> Result<(), BackendError> {
3102 if self.countdown.load(Ordering::SeqCst) == 0 {
3103 return Err(BackendError::from(std::io::Error::from(ErrorKind::Other)));
3104 }
3105
3106 Ok(())
3107 }
3108
3109 fn decrement_countdown(&self) -> Result<(), BackendError> {
3110 if self
3111 .countdown
3112 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| {
3113 if x > 0 { Some(x - 1) } else { None }
3114 })
3115 .is_err()
3116 {
3117 return Err(BackendError::from(std::io::Error::from(ErrorKind::Other)));
3118 }
3119
3120 Ok(())
3121 }
3122 }
3123
3124 impl StorageBackend for FailingBackend {
3125 fn len(&self) -> Result<u64, BackendError> {
3126 self.inner.len()
3127 }
3128
3129 fn read(&self, offset: u64, out: &mut [u8]) -> Result<(), BackendError> {
3130 self.check_countdown()?;
3131 self.inner.read(offset, out)
3132 }
3133
3134 fn set_len(&self, len: u64) -> Result<(), BackendError> {
3135 self.inner.set_len(len)
3136 }
3137
3138 fn sync_data(&self) -> Result<(), BackendError> {
3139 self.check_countdown()?;
3140 self.inner.sync_data()
3141 }
3142
3143 fn write(&self, offset: u64, data: &[u8]) -> Result<(), BackendError> {
3144 self.decrement_countdown()?;
3145 self.inner.write(offset, data)
3146 }
3147 }
3148
    #[test]
    fn crash_regression4() {
        // Regression test: when the backend starts failing mid-commit, the
        // commit must return an error and the file must remain in a state
        // that a subsequent create() can open.
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Allow exactly 20 writes before every write starts failing.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), 20);
        let db = Database::builder()
            .set_cache_size(12686)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        let _savepoint = tx.ephemeral_savepoint().unwrap();
        let _persistent_savepoint = tx.persistent_savepoint().unwrap();
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            let _ = table.insert_reserve(118821, 360).unwrap();
        }
        // By now the write budget is exhausted, so this commit must fail.
        let result = tx.commit();
        assert!(result.is_err());

        drop(db);
        // Despite the failed commit, the file must still be usable.
        Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(8 * 1024)
            .set_region_size(32 * 4096)
            .create(&path)
            .unwrap();
    }
3184
    #[test]
    fn transient_io_error() {
        // After a commit fails with an I/O error, later write transactions
        // must fail with PreviousIo until the database is re-opened, and the
        // on-disk header must record that recovery is needed.
        let tmpfile = crate::create_tempfile();
        let (file, path) = tmpfile.into_parts();

        // Countdown starts at MAX, so the backend behaves normally until the
        // test explicitly drops it to zero.
        let backend = FailingBackend::new(FileBackend::new(file).unwrap(), u64::MAX);
        let countdown = backend.countdown.clone();
        let db = Database::builder()
            .set_cache_size(0)
            .create_with_backend(backend)
            .unwrap();

        let table_def: TableDefinition<u64, u64> = TableDefinition::new("x");

        // Two successful commits establish a valid on-disk state.
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 0).unwrap();
        }
        tx.commit().unwrap();
        let tx = db.begin_write().unwrap();
        {
            let mut table = tx.open_table(table_def).unwrap();
            table.insert(0, 1).unwrap();
        }
        tx.commit().unwrap();

        // Make all further I/O fail, then attempt to commit.
        let tx = db.begin_write().unwrap();
        countdown.store(0, Ordering::SeqCst);
        let result = tx.commit().err().unwrap();
        assert!(matches!(result, CommitError::Storage(StorageError::Io(_))));
        // The failure must poison subsequent write transactions.
        let result = db.begin_write().err().unwrap();
        assert!(matches!(
            result,
            TransactionError::Storage(StorageError::PreviousIo)
        ));
        countdown.store(u64::MAX, Ordering::SeqCst);
        drop(db);

        // Inspect the header directly: bit 0x2 of the byte at offset 9 must
        // be set — presumably flagging that recovery is required on the next
        // open; confirm against the file-format documentation.
        let mut file = File::open(&path).unwrap();
        file.seek(SeekFrom::Start(9)).unwrap();
        let mut god_byte = vec![0u8];
        assert_eq!(file.read(&mut god_byte).unwrap(), 1);
        assert_ne!(god_byte[0] & 2, 0);
    }
3234
3235 #[test]
3236 fn small_pages() {
3237 let tmpfile = crate::create_tempfile();
3238
3239 let db = Database::builder()
3240 .set_page_size(512)
3241 .create(tmpfile.path())
3242 .unwrap();
3243
3244 let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
3245 let txn = db.begin_write().unwrap();
3246 {
3247 txn.open_table(table_definition).unwrap();
3248 }
3249 txn.commit().unwrap();
3250 }
3251
    #[test]
    fn small_pages2() {
        // Stress test for savepoint bookkeeping with 512-byte pages and
        // two-phase commit: savepoints are created, restored, and dropped in
        // an interleaved order, and restoring an invalidated savepoint must
        // fail rather than corrupt state. The exact statement order matters.
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_page_size(512)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint0 = tx.ephemeral_savepoint().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint1 = tx.ephemeral_savepoint().unwrap();
        // Restore a savepoint from an earlier (committed) transaction, then
        // write non-durably on top of it.
        tx.restore_savepoint(&savepoint0).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&660503, 489).unwrap().as_mut().fill(0xFF);
            assert!(t.remove(&291295).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint0).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint2 = tx.ephemeral_savepoint().unwrap();
        // Dropping savepoint0 while restoring the freshly taken savepoint2.
        drop(savepoint0);
        tx.restore_savepoint(&savepoint2).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&2059).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
            assert!(t.remove(&145227).unwrap().is_none());
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint3 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint1);
        tx.restore_savepoint(&savepoint3).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint4 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint2);
        tx.restore_savepoint(&savepoint3).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.remove(&207936).unwrap().is_none());
        }
        // Abort instead of commit: savepoint4 belongs to this aborted txn.
        tx.abort().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        let savepoint5 = tx.ephemeral_savepoint().unwrap();
        drop(savepoint3);
        // Restoring savepoint4 is expected to fail here — presumably it was
        // invalidated by the earlier restore/abort sequence; confirm against
        // the savepoint invalidation rules.
        assert!(tx.restore_savepoint(&savepoint4).is_err());
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let mut tx = db.begin_write().unwrap();
        tx.set_two_phase_commit(true);
        tx.restore_savepoint(&savepoint5).unwrap();
        tx.set_durability(Durability::None).unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();
    }
3345
3346 #[test]
3347 fn small_pages3() {
3348 let tmpfile = crate::create_tempfile();
3349
3350 let db = Database::builder()
3351 .set_page_size(1024)
3352 .create(tmpfile.path())
3353 .unwrap();
3354
3355 let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
3356
3357 let mut tx = db.begin_write().unwrap();
3358 let _savepoint0 = tx.ephemeral_savepoint().unwrap();
3359 tx.set_durability(Durability::None).unwrap();
3360 {
3361 let mut t = tx.open_table(table_def).unwrap();
3362 let value = vec![0; 306];
3363 t.insert(&539717, value.as_slice()).unwrap();
3364 }
3365 tx.abort().unwrap();
3366
3367 let mut tx = db.begin_write().unwrap();
3368 let savepoint1 = tx.ephemeral_savepoint().unwrap();
3369 tx.restore_savepoint(&savepoint1).unwrap();
3370 tx.set_durability(Durability::None).unwrap();
3371 {
3372 let mut t = tx.open_table(table_def).unwrap();
3373 let value = vec![0; 2008];
3374 t.insert(&784384, value.as_slice()).unwrap();
3375 }
3376 tx.abort().unwrap();
3377 }
3378
    #[test]
    fn small_pages4() {
        // Exercises aborted transactions with 1 KiB pages and a bounded
        // cache: values inserted or reserved in aborted transactions must
        // not leak into later transactions.
        let tmpfile = crate::create_tempfile();

        let db = Database::builder()
            .set_cache_size(1024 * 1024)
            .set_page_size(1024)
            .create(tmpfile.path())
            .unwrap();

        let table_def: TableDefinition<u64, &[u8]> = TableDefinition::new("x");

        let tx = db.begin_write().unwrap();
        {
            tx.open_table(table_def).unwrap();
        }
        tx.commit().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            assert!(t.get(&131072).unwrap().is_none());
            let value = vec![0xFF; 1130];
            t.insert(&42394, value.as_slice()).unwrap();
            t.insert_reserve(&744037, 3645).unwrap().as_mut().fill(0xFF);
            assert!(t.get(&0).unwrap().is_none());
        }
        // Abort: none of the writes above should take effect.
        tx.abort().unwrap();

        let tx = db.begin_write().unwrap();
        {
            let mut t = tx.open_table(table_def).unwrap();
            t.insert_reserve(&118749, 734).unwrap().as_mut().fill(0xFF);
        }
        tx.abort().unwrap();
    }
3415
    #[test]
    fn dynamic_shrink() {
        // After inserting ~2 MiB of data and deleting all of it, subsequent
        // commits should shrink the file below its high-water mark.
        let tmpfile = crate::create_tempfile();
        let table_definition: TableDefinition<u64, &[u8]> = TableDefinition::new("x");
        let big_value = vec![0u8; 1024];

        let db = Database::builder()
            .set_region_size(1024 * 1024)
            .create(tmpfile.path())
            .unwrap();

        // Grow the file: 2048 values of 1 KiB each.
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.insert(&i, big_value.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();

        let file_size = tmpfile.as_file().metadata().unwrap().len();

        // Delete every row again.
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            for i in 0..2048 {
                table.remove(&i).unwrap();
            }
        }
        txn.commit().unwrap();

        // A few additional small commits give the shrink logic a chance to
        // reclaim the now-free space.
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.insert(0, [].as_slice()).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        {
            let mut table = txn.open_table(table_definition).unwrap();
            table.remove(0).unwrap();
        }
        txn.commit().unwrap();
        let txn = db.begin_write().unwrap();
        txn.commit().unwrap();

        let final_file_size = tmpfile.as_file().metadata().unwrap().len();
        assert!(final_file_size < file_size);
    }
3466
3467 #[test]
3468 fn create_new_db_in_empty_file() {
3469 let tmpfile = crate::create_tempfile();
3470
3471 let _db = Database::builder()
3472 .create_file(tmpfile.into_file())
3473 .unwrap();
3474 }
3475
3476 #[test]
3477 fn open_missing_file() {
3478 let tmpfile = crate::create_tempfile();
3479
3480 let err = Database::builder()
3481 .open(tmpfile.path().with_extension("missing"))
3482 .unwrap_err();
3483
3484 match err {
3485 DatabaseError::Storage(StorageError::Io(err)) if err.kind() == ErrorKind::NotFound => {}
3486 err => panic!("Unexpected error for empty file: {err}"),
3487 }
3488 }
3489
3490 #[test]
3491 fn open_empty_file() {
3492 let tmpfile = crate::create_tempfile();
3493
3494 let err = Database::builder().open(tmpfile.path()).unwrap_err();
3495
3496 match err {
3497 DatabaseError::Storage(StorageError::FormatError { .. }) => {}
3498 err => panic!("Unexpected error for empty file: {err}"),
3499 }
3500 }
3501
    #[test]
    fn salvage_valid_database() {
        // Salvaging an uncorrupted database must recover every table and
        // row, and the output must be a readable database.
        const T1: TableDefinition<&str, u64> = TableDefinition::new("users");
        const T2: TableDefinition<u64, &[u8]> = TableDefinition::new("blobs");

        let src = crate::create_tempfile();
        let dst = crate::create_tempfile();

        // Build a small source database: 3 rows in "users", 2 in "blobs".
        {
            let db = Database::create(src.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut t = txn.open_table(T1).unwrap();
                t.insert("alice", &1u64).unwrap();
                t.insert("bob", &2u64).unwrap();
                t.insert("charlie", &3u64).unwrap();
            }
            {
                let mut t = txn.open_table(T2).unwrap();
                t.insert(100u64, b"hello".as_slice()).unwrap();
                t.insert(200u64, b"world".as_slice()).unwrap();
            }
            txn.commit().unwrap();
        }

        let report = Database::salvage(src.path(), dst.path()).unwrap();

        assert_eq!(report.tables_found, 2);
        assert_eq!(report.tables_recovered, 2);
        assert!(
            report.rows_recovered >= 5,
            "expected >= 5 rows, got {rows}",
            rows = report.rows_recovered
        );
        assert_eq!(report.rows_lost, 0);
        assert!(report.corrupt_details.is_empty());

        // Read back with raw byte-typed definitions — presumably salvage
        // does not restore the original typed metadata; confirm against the
        // salvage implementation.
        let db = Database::open(dst.path()).unwrap();
        let txn = db.begin_read().unwrap();
        {
            let raw: TableDefinition<&[u8], &[u8]> = TableDefinition::new("users");
            let t = txn.open_table(raw).unwrap();
            assert_eq!(t.len().unwrap(), 3);
        }
        {
            let raw: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blobs");
            let t = txn.open_table(raw).unwrap();
            assert_eq!(t.len().unwrap(), 2);
        }
    }
3554
3555 #[test]
3556 fn salvage_empty_file_returns_error() {
3557 let src = crate::create_tempfile();
3558 let dst = crate::create_tempfile();
3559
3560 let result = Database::salvage(src.path(), dst.path());
3562 assert!(result.is_err());
3563 }
3564
    #[test]
    fn salvage_with_data_corruption() {
        // Overwrites 4 KiB in the middle of a valid database and checks that
        // salvage still recovers at least part of the data instead of
        // failing outright.
        const TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("data");

        let src = crate::create_tempfile();
        let dst = crate::create_tempfile();

        // Build a source database with 500 rows of 200 bytes each.
        {
            let db = Database::create(src.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut t = txn.open_table(TABLE).unwrap();
                let payload = [0xABu8; 200];
                for i in 0..500u64 {
                    t.insert(i, payload.as_slice()).unwrap();
                }
            }
            txn.commit().unwrap();
        }

        // Corrupt 4 KiB starting a third of the way into the file.
        {
            use std::io::{Seek, SeekFrom, Write};
            let mut f = std::fs::OpenOptions::new()
                .write(true)
                .open(src.path())
                .unwrap();
            let file_len = f.metadata().unwrap().len();
            let corrupt_offset = file_len / 3;
            f.seek(SeekFrom::Start(corrupt_offset)).unwrap();
            f.write_all(&[0xFF; 4096]).unwrap();
            f.sync_all().unwrap();
        }

        // Partial recovery is acceptable; total failure is not.
        let report = Database::salvage(src.path(), dst.path()).unwrap();
        assert!(
            report.rows_recovered > 0 || report.tables_found > 0,
            "expected some recovery, got: {report:?}"
        );
    }
3610
    #[test]
    fn online_compaction_reduces_file_size() {
        // Running online compaction to completion after deleting 90% of the
        // rows must shrink the file while preserving the surviving rows.
        const TABLE: TableDefinition<u64, &[u8]> = TableDefinition::new("data");

        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();

        // Insert 500 rows of 512 bytes each...
        let payload = [0xCDu8; 512];
        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..500u64 {
                t.insert(i, payload.as_slice()).unwrap();
            }
        }
        txn.commit().unwrap();

        // ...then delete the first 450, leaving the file mostly free space.
        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..450u64 {
                t.remove(i).unwrap();
            }
        }
        txn.commit().unwrap();

        let size_before = std::fs::metadata(tmpfile.path()).unwrap().len();

        // Drive the compaction to completion.
        let handle = db.start_compaction().unwrap();
        let steps = handle.run().unwrap();
        assert!(steps > 0, "expected at least one compaction step");

        let size_after = std::fs::metadata(tmpfile.path()).unwrap().len();
        assert!(
            size_after < size_before,
            "file should shrink: before={size_before}, after={size_after}"
        );

        // The remaining 50 rows must survive compaction intact.
        let txn = db.begin_read().unwrap();
        let t = txn.open_table(TABLE).unwrap();
        assert_eq!(t.len().unwrap(), 50);
        for i in 450..500u64 {
            let val = t.get(i).unwrap().unwrap();
            assert_eq!(val.value(), payload.as_slice());
        }
    }
3661
    #[test]
    fn online_compaction_allows_concurrent_reads() {
        // A read transaction opened before compaction starts must continue
        // to see a consistent snapshot while a compaction step runs.
        const TABLE: TableDefinition<u64, u64> = TableDefinition::new("nums");

        let tmpfile = crate::create_tempfile();
        let db = Database::create(tmpfile.path()).unwrap();

        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..100u64 {
                t.insert(i, &(i * 10)).unwrap();
            }
        }
        txn.commit().unwrap();

        // Delete half the rows so compaction has something to reclaim.
        let txn = db.begin_write().unwrap();
        {
            let mut t = txn.open_table(TABLE).unwrap();
            for i in 0..50u64 {
                t.remove(i).unwrap();
            }
        }
        txn.commit().unwrap();

        // Open the reader first, then step the compactor underneath it.
        let read_txn = db.begin_read().unwrap();
        let read_table = read_txn.open_table(TABLE).unwrap();

        let handle = db.start_compaction().unwrap();
        let progress = handle.step().unwrap();
        let _ = progress;

        // The pre-existing reader must still see all 50 surviving rows.
        assert_eq!(read_table.len().unwrap(), 50);
        for i in 50..100u64 {
            let val = read_table.get(i).unwrap().unwrap();
            assert_eq!(val.value(), i * 10);
        }
    }
3707
3708 #[test]
3709 fn online_compaction_rejects_persistent_savepoint() {
3710 const TABLE: TableDefinition<u64, u64> = TableDefinition::new("sp_test");
3711
3712 let tmpfile = crate::create_tempfile();
3713 let db = Database::create(tmpfile.path()).unwrap();
3714
3715 let txn = db.begin_write().unwrap();
3717 {
3718 let mut t = txn.open_table(TABLE).unwrap();
3719 t.insert(1u64, &1u64).unwrap();
3720 }
3721 txn.commit().unwrap();
3722
3723 let txn = db.begin_write().unwrap();
3725 let _sp = txn.persistent_savepoint().unwrap();
3726 txn.commit().unwrap();
3727
3728 let result = db.start_compaction();
3730 assert!(result.is_err());
3731 }
3732}