1use crate::db::TransactionGuard;
2use crate::error::CommitError;
3use crate::multimap_table::ReadOnlyUntypedMultimapTable;
4use crate::sealed::Sealed;
5use crate::table::ReadOnlyUntypedTable;
6use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
7use crate::tree_store::{
8 Btree, BtreeHeader, BtreeMut, InternalTableDefinition, MAX_PAIR_LENGTH, MAX_VALUE_LENGTH, Page,
9 PageHint, PageListMut, PageNumber, PageTrackerPolicy, SerializedSavepoint, ShrinkPolicy,
10 TableTree, TableTreeMut, TableType, TransactionalMemory,
11};
12use crate::types::{Key, Value};
13use crate::{
14 AccessGuard, AccessGuardMutInPlace, ExtractIf, MultimapTable, MultimapTableDefinition,
15 MultimapTableHandle, MutInPlaceValue, Range, ReadOnlyMultimapTable, ReadOnlyTable, Result,
16 Savepoint, SavepointError, SetDurabilityError, StorageError, Table, TableDefinition,
17 TableError, TableHandle, TransactionError, TypeName, UntypedMultimapTableHandle,
18 UntypedTableHandle,
19};
20#[cfg(feature = "logging")]
21use log::{debug, warn};
22use std::borrow::Borrow;
23use std::cmp::min;
24use std::collections::{BTreeMap, HashMap, HashSet};
25use std::fmt::{Debug, Display, Formatter};
26use std::marker::PhantomData;
27use std::mem::size_of;
28use std::ops::RangeBounds;
29use std::sync::atomic::{AtomicBool, Ordering};
30use std::sync::{Arc, Mutex};
31use std::{panic, thread};
32
// Cap on pages handled per compaction step; its call sites are outside this excerpt.
const MAX_PAGES_PER_COMPACTION: usize = 1_000_000;
// Single-row table (key `()`) holding the id the next persistent savepoint should use.
// `persistent_savepoint` writes `id.next()` here and `next_persistent_savepoint_id` reads it.
//
// NOTE: every name string below identifies a table inside the persisted system table tree
// (it is passed to `get_or_create_table`), so renaming one would orphan existing data.
const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
    SystemTableDefinition::new("next_savepoint_id");
// Persistent savepoints by id; written by `persistent_savepoint`, read by
// `get_persistent_savepoint` / `list_persistent_savepoints`, removed by
// `delete_persistent_savepoint`.
pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
    SystemTableDefinition::new("persistent_savepoints");
// Pages allocated for user data, keyed by (transaction id, pagination id). `restore_savepoint`
// scans entries newer than the savepoint and queues their pages to be freed.
pub(crate) const DATA_ALLOCATED_TABLE: SystemTableDefinition<
    TransactionIdWithPagination,
    PageList,
> = SystemTableDefinition::new("data_pages_allocated");
// User-data pages that became unreachable and are pending free. `restore_savepoint` purges
// entries newer than the savepoint.
pub(crate) const DATA_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("data_pages_unreachable");
// System-tree pages that became unreachable and are pending free.
pub(crate) const SYSTEM_FREED_TABLE: SystemTableDefinition<TransactionIdWithPagination, PageList> =
    SystemTableDefinition::new("system_pages_unreachable");
// Name of the table storing allocator state, keyed by `AllocatorStateKey`.
pub(crate) const ALLOCATOR_STATE_TABLE_NAME: &str = "allocator_state";
// Read-only / mutable B-tree views over the allocator state (raw byte values).
pub(crate) type AllocatorStateTree = Btree<AllocatorStateKey, &'static [u8]>;
pub(crate) type AllocatorStateTreeMut<'a> = BtreeMut<'a, AllocatorStateKey, &'static [u8]>;
// Mutable B-tree with the same key/value types as `SYSTEM_FREED_TABLE`.
pub(crate) type SystemFreedTree<'a> = BtreeMut<'a, TransactionIdWithPagination, PageList<'static>>;
58
59#[derive(Debug)]
63pub(crate) struct PageList<'a> {
64 data: &'a [u8],
65}
66
67impl PageList<'_> {
68 fn required_bytes(len: usize) -> usize {
69 2 + PageNumber::serialized_size() * len
70 }
71
72 pub(crate) fn len(&self) -> usize {
73 u16::from_le_bytes(self.data[..size_of::<u16>()].try_into().unwrap()).into()
74 }
75
76 pub(crate) fn get(&self, index: usize) -> PageNumber {
77 let start = size_of::<u16>() + PageNumber::serialized_size() * index;
78 PageNumber::from_le_bytes(
79 self.data[start..(start + PageNumber::serialized_size())]
80 .try_into()
81 .unwrap(),
82 )
83 }
84}
85
86impl Value for PageList<'_> {
87 type SelfType<'a>
88 = PageList<'a>
89 where
90 Self: 'a;
91 type AsBytes<'a>
92 = &'a [u8]
93 where
94 Self: 'a;
95
96 fn fixed_width() -> Option<usize> {
97 None
98 }
99
100 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
101 where
102 Self: 'a,
103 {
104 PageList { data }
105 }
106
107 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> &'b [u8]
108 where
109 Self: 'b,
110 {
111 value.data
112 }
113
114 fn type_name() -> TypeName {
115 TypeName::internal("redb::PageList")
116 }
117}
118
119impl MutInPlaceValue for PageList<'_> {
120 type BaseRefType = PageListMut;
121
122 fn initialize(data: &mut [u8]) {
123 assert!(data.len() >= 8);
124 data[..8].fill(0);
126 }
127
128 fn from_bytes_mut(data: &mut [u8]) -> &mut Self::BaseRefType {
129 unsafe { &mut *(std::ptr::from_mut::<[u8]>(data) as *mut PageListMut) }
130 }
131}
132
133#[derive(Debug)]
134pub(crate) struct TransactionIdWithPagination {
135 pub(crate) transaction_id: u64,
136 pub(crate) pagination_id: u64,
137}
138
139impl Value for TransactionIdWithPagination {
140 type SelfType<'a>
141 = TransactionIdWithPagination
142 where
143 Self: 'a;
144 type AsBytes<'a>
145 = [u8; 2 * size_of::<u64>()]
146 where
147 Self: 'a;
148
149 fn fixed_width() -> Option<usize> {
150 Some(2 * size_of::<u64>())
151 }
152
153 fn from_bytes<'a>(data: &'a [u8]) -> Self
154 where
155 Self: 'a,
156 {
157 let transaction_id = u64::from_le_bytes(data[..size_of::<u64>()].try_into().unwrap());
158 let pagination_id = u64::from_le_bytes(data[size_of::<u64>()..].try_into().unwrap());
159 Self {
160 transaction_id,
161 pagination_id,
162 }
163 }
164
165 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> [u8; 2 * size_of::<u64>()]
166 where
167 Self: 'b,
168 {
169 let mut result = [0u8; 2 * size_of::<u64>()];
170 result[..size_of::<u64>()].copy_from_slice(&value.transaction_id.to_le_bytes());
171 result[size_of::<u64>()..].copy_from_slice(&value.pagination_id.to_le_bytes());
172 result
173 }
174
175 fn type_name() -> TypeName {
176 TypeName::internal("redb::TransactionIdWithPagination")
177 }
178}
179
180impl Key for TransactionIdWithPagination {
181 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
182 let value1 = Self::from_bytes(data1);
183 let value2 = Self::from_bytes(data2);
184
185 match value1.transaction_id.cmp(&value2.transaction_id) {
186 std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
187 std::cmp::Ordering::Equal => value1.pagination_id.cmp(&value2.pagination_id),
188 std::cmp::Ordering::Less => std::cmp::Ordering::Less,
189 }
190 }
191}
192
193#[derive(Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, Debug)]
194pub(crate) enum AllocatorStateKey {
195 Deprecated,
196 Region(u32),
197 RegionTracker,
198 TransactionId,
199}
200
201impl Value for AllocatorStateKey {
202 type SelfType<'a> = Self;
203 type AsBytes<'a> = [u8; 1 + size_of::<u32>()];
204
205 fn fixed_width() -> Option<usize> {
206 Some(1 + size_of::<u32>())
207 }
208
209 fn from_bytes<'a>(data: &'a [u8]) -> Self::SelfType<'a>
210 where
211 Self: 'a,
212 {
213 match data[0] {
214 0..=2 => Self::Deprecated,
216 3 => Self::Region(u32::from_le_bytes(data[1..].try_into().unwrap())),
217 4 => Self::RegionTracker,
218 5 => Self::TransactionId,
219 _ => unreachable!(),
220 }
221 }
222
223 fn as_bytes<'a, 'b: 'a>(value: &'a Self::SelfType<'b>) -> Self::AsBytes<'a>
224 where
225 Self: 'a,
226 Self: 'b,
227 {
228 let mut result = Self::AsBytes::default();
229 match value {
230 Self::Region(region) => {
231 result[0] = 3;
232 result[1..].copy_from_slice(&u32::to_le_bytes(*region));
233 }
234 Self::RegionTracker => {
235 result[0] = 4;
236 }
237 Self::TransactionId => {
238 result[0] = 5;
239 }
240 AllocatorStateKey::Deprecated => {
241 result[0] = 0;
242 }
243 }
244
245 result
246 }
247
248 fn type_name() -> TypeName {
249 TypeName::internal("redb::AllocatorStateKey")
250 }
251}
252
253impl Key for AllocatorStateKey {
254 fn compare(data1: &[u8], data2: &[u8]) -> std::cmp::Ordering {
255 Self::from_bytes(data1).cmp(&Self::from_bytes(data2))
256 }
257}
258
259pub struct SystemTableDefinition<'a, K: Key + 'static, V: Value + 'static> {
260 name: &'a str,
261 _key_type: PhantomData<K>,
262 _value_type: PhantomData<V>,
263}
264
265impl<'a, K: Key + 'static, V: Value + 'static> SystemTableDefinition<'a, K, V> {
266 pub const fn new(name: &'a str) -> Self {
267 assert!(!name.is_empty());
268 Self {
269 name,
270 _key_type: PhantomData,
271 _value_type: PhantomData,
272 }
273 }
274}
275
276impl<K: Key + 'static, V: Value + 'static> TableHandle for SystemTableDefinition<'_, K, V> {
277 fn name(&self) -> &str {
278 self.name
279 }
280}
281
282impl<K: Key, V: Value> Sealed for SystemTableDefinition<'_, K, V> {}
283
284impl<K: Key + 'static, V: Value + 'static> Clone for SystemTableDefinition<'_, K, V> {
285 fn clone(&self) -> Self {
286 *self
287 }
288}
289
290impl<K: Key + 'static, V: Value + 'static> Copy for SystemTableDefinition<'_, K, V> {}
291
292impl<K: Key + 'static, V: Value + 'static> Display for SystemTableDefinition<'_, K, V> {
293 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
294 write!(
295 f,
296 "{}<{}, {}>",
297 self.name,
298 K::type_name().name(),
299 V::type_name().name()
300 )
301 }
302}
303
/// Snapshot of storage statistics for a database.
///
/// All values are plain numbers captured when the snapshot was built; the struct holds no
/// references back to the database.
#[derive(Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Height of the B-tree(s) as recorded in this snapshot.
    pub fn tree_height(&self) -> u32 {
        self.tree_height
    }

    /// Number of allocated pages.
    pub fn allocated_pages(&self) -> u64 {
        self.allocated_pages
    }

    /// Number of leaf pages counted in this snapshot.
    pub fn leaf_pages(&self) -> u64 {
        self.leaf_pages
    }

    /// Number of branch (interior) pages counted in this snapshot.
    pub fn branch_pages(&self) -> u64 {
        self.branch_pages
    }

    /// Bytes stored in leaf pages (the recorded `stored_leaf_bytes`).
    pub fn stored_bytes(&self) -> u64 {
        self.stored_leaf_bytes
    }

    /// Bytes attributed to metadata rather than stored data.
    pub fn metadata_bytes(&self) -> u64 {
        self.metadata_bytes
    }

    /// Bytes lost to fragmentation.
    pub fn fragmented_bytes(&self) -> u64 {
        self.fragmented_bytes
    }

    /// Page size of the database (presumably in bytes).
    pub fn page_size(&self) -> usize {
        self.page_size
    }
}
358}
359
/// Durability requested for a write transaction's commit.
///
/// Persistent savepoints can only be created or deleted while the transaction uses
/// `Immediate` (see `persistent_savepoint`, `delete_persistent_savepoint`, `set_durability`).
///
/// NOTE: variant order is observable through the serde derive (and any index-based format),
/// so do not reorder variants.
#[derive(Copy, Clone, Debug, PartialEq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum Durability {
    /// Commit without the guarantees of `Immediate`. Presumably the commit is not persisted
    /// until a later durable commit — confirm against the commit path.
    None,
    /// Commit durably. Required for persistent-savepoint changes.
    Immediate,
}

// Crate-private mirror of `Durability` stored on `WriteTransaction`. It has no
// `#[non_exhaustive]`, so internal matches stay exhaustive.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
enum InternalDurability {
    None,
    Immediate,
}
378
379pub struct SystemTable<'db, 's, K: Key + 'static, V: Value + 'static> {
381 name: String,
382 namespace: &'s mut SystemNamespace<'db>,
383 tree: BtreeMut<'s, K, V>,
384 transaction_guard: Arc<TransactionGuard>,
385}
386
387impl<'db, 's, K: Key + 'static, V: Value + 'static> SystemTable<'db, 's, K, V> {
388 fn new(
389 name: &str,
390 table_root: Option<BtreeHeader>,
391 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
392 guard: Arc<TransactionGuard>,
393 mem: Arc<TransactionalMemory>,
394 namespace: &'s mut SystemNamespace<'db>,
395 ) -> SystemTable<'db, 's, K, V> {
396 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
399 SystemTable {
400 name: name.to_string(),
401 namespace,
402 tree: BtreeMut::new(table_root, guard.clone(), mem, freed_pages, ignore),
403 transaction_guard: guard,
404 }
405 }
406
407 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<'_, V>>>
408 where
409 K: 'a,
410 {
411 self.tree.get(key.borrow())
412 }
413
414 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<'_, K, V>>
415 where
416 K: 'a,
417 KR: Borrow<K::SelfType<'a>> + 'a,
418 {
419 self.tree
420 .range(&range)
421 .map(|x| Range::new(x, self.transaction_guard.clone()))
422 }
423
424 pub fn extract_from_if<'a, KR, F: for<'f> FnMut(K::SelfType<'f>, V::SelfType<'f>) -> bool>(
425 &mut self,
426 range: impl RangeBounds<KR> + 'a,
427 predicate: F,
428 ) -> Result<ExtractIf<'_, K, V, F>>
429 where
430 KR: Borrow<K::SelfType<'a>> + 'a,
431 {
432 self.tree
433 .extract_from_if(&range, predicate)
434 .map(ExtractIf::new)
435 }
436
437 pub fn insert<'k, 'v>(
438 &mut self,
439 key: impl Borrow<K::SelfType<'k>>,
440 value: impl Borrow<V::SelfType<'v>>,
441 ) -> Result<Option<AccessGuard<'_, V>>> {
442 let value_len = V::as_bytes(value.borrow()).as_ref().len();
443 if value_len > MAX_VALUE_LENGTH {
444 return Err(StorageError::ValueTooLarge(value_len));
445 }
446 let key_len = K::as_bytes(key.borrow()).as_ref().len();
447 if key_len > MAX_VALUE_LENGTH {
448 return Err(StorageError::ValueTooLarge(key_len));
449 }
450 if value_len + key_len > MAX_PAIR_LENGTH {
451 return Err(StorageError::ValueTooLarge(value_len + key_len));
452 }
453 self.tree.insert(key.borrow(), value.borrow())
454 }
455
456 pub fn remove<'a>(
457 &mut self,
458 key: impl Borrow<K::SelfType<'a>>,
459 ) -> Result<Option<AccessGuard<'_, V>>>
460 where
461 K: 'a,
462 {
463 self.tree.remove(key.borrow())
464 }
465}
466
467impl<K: Key + 'static, V: MutInPlaceValue + 'static> SystemTable<'_, '_, K, V> {
468 pub fn insert_reserve<'a>(
469 &mut self,
470 key: impl Borrow<K::SelfType<'a>>,
471 value_length: usize,
472 ) -> Result<AccessGuardMutInPlace<'_, V>> {
473 if value_length > MAX_VALUE_LENGTH {
474 return Err(StorageError::ValueTooLarge(value_length));
475 }
476 let key_len = K::as_bytes(key.borrow()).as_ref().len();
477 if key_len > MAX_VALUE_LENGTH {
478 return Err(StorageError::ValueTooLarge(key_len));
479 }
480 if value_length + key_len > MAX_PAIR_LENGTH {
481 return Err(StorageError::ValueTooLarge(value_length + key_len));
482 }
483 self.tree.insert_reserve(key.borrow(), value_length)
484 }
485}
486
487impl<K: Key + 'static, V: Value + 'static> Drop for SystemTable<'_, '_, K, V> {
488 fn drop(&mut self) {
489 self.namespace.close_table(
490 &self.name,
491 &self.tree,
492 self.tree.get_root().map(|x| x.length).unwrap_or_default(),
493 );
494 }
495}
496
497struct SystemNamespace<'db> {
498 table_tree: TableTreeMut<'db>,
499 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
500 transaction_guard: Arc<TransactionGuard>,
501}
502
503impl<'db> SystemNamespace<'db> {
504 fn new(
505 root_page: Option<BtreeHeader>,
506 guard: Arc<TransactionGuard>,
507 mem: Arc<TransactionalMemory>,
508 ) -> Self {
509 let ignore = Arc::new(Mutex::new(PageTrackerPolicy::Ignore));
512 let freed_pages = Arc::new(Mutex::new(vec![]));
513 Self {
514 table_tree: TableTreeMut::new(
515 root_page,
516 guard.clone(),
517 mem,
518 freed_pages.clone(),
519 ignore,
520 ),
521 freed_pages,
522 transaction_guard: guard.clone(),
523 }
524 }
525
526 fn system_freed_pages(&self) -> Arc<Mutex<Vec<PageNumber>>> {
527 self.freed_pages.clone()
528 }
529
530 fn open_system_table<'txn, 's, K: Key + 'static, V: Value + 'static>(
531 &'s mut self,
532 transaction: &'txn WriteTransaction,
533 definition: SystemTableDefinition<K, V>,
534 ) -> Result<SystemTable<'db, 's, K, V>> {
535 let (root, _) = self
536 .table_tree
537 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
538 .map_err(|e| {
539 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
540 })?;
541 transaction.dirty.store(true, Ordering::Release);
542
543 Ok(SystemTable::new(
544 definition.name(),
545 root,
546 self.freed_pages.clone(),
547 self.transaction_guard.clone(),
548 transaction.mem.clone(),
549 self,
550 ))
551 }
552
553 fn close_table<K: Key + 'static, V: Value + 'static>(
554 &mut self,
555 name: &str,
556 table: &BtreeMut<K, V>,
557 length: u64,
558 ) {
559 self.table_tree
560 .stage_update_table_root(name, table.get_root(), length);
561 }
562}
563
564struct TableNamespace<'db> {
565 open_tables: HashMap<String, &'static panic::Location<'static>>,
566 allocated_pages: Arc<Mutex<PageTrackerPolicy>>,
567 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
568 table_tree: TableTreeMut<'db>,
569}
570
571impl TableNamespace<'_> {
572 fn new(
573 root_page: Option<BtreeHeader>,
574 guard: Arc<TransactionGuard>,
575 mem: Arc<TransactionalMemory>,
576 ) -> Self {
577 let allocated = Arc::new(Mutex::new(PageTrackerPolicy::new_tracking()));
578 let freed_pages = Arc::new(Mutex::new(vec![]));
579 let table_tree = TableTreeMut::new(
580 root_page,
581 guard,
582 mem,
583 freed_pages.clone(),
586 allocated.clone(),
587 );
588 Self {
589 open_tables: Default::default(),
590 table_tree,
591 freed_pages,
592 allocated_pages: allocated,
593 }
594 }
595
596 fn set_dirty(&mut self, transaction: &WriteTransaction) {
597 transaction.dirty.store(true, Ordering::Release);
598 if !transaction.transaction_tracker.any_savepoint_exists() {
599 *self.allocated_pages.lock().unwrap() = PageTrackerPolicy::Ignore;
602 }
603 }
604
605 fn set_root(&mut self, root: Option<BtreeHeader>) {
606 assert!(self.open_tables.is_empty());
607 self.table_tree.set_root(root);
608 }
609
610 #[track_caller]
611 fn inner_open<K: Key + 'static, V: Value + 'static>(
612 &mut self,
613 name: &str,
614 table_type: TableType,
615 ) -> Result<(Option<BtreeHeader>, u64), TableError> {
616 if let Some(location) = self.open_tables.get(name) {
617 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
618 }
619
620 let root = self
621 .table_tree
622 .get_or_create_table::<K, V>(name, table_type)?;
623 self.open_tables
624 .insert(name.to_string(), panic::Location::caller());
625
626 Ok(root)
627 }
628
629 #[track_caller]
630 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
631 &mut self,
632 transaction: &'txn WriteTransaction,
633 definition: MultimapTableDefinition<K, V>,
634 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
635 #[cfg(feature = "logging")]
636 debug!("Opening multimap table: {definition}");
637 let (root, length) = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
638 self.set_dirty(transaction);
639
640 Ok(MultimapTable::new(
641 definition.name(),
642 root,
643 length,
644 self.freed_pages.clone(),
645 self.allocated_pages.clone(),
646 transaction.mem.clone(),
647 transaction,
648 ))
649 }
650
651 #[track_caller]
652 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
653 &mut self,
654 transaction: &'txn WriteTransaction,
655 definition: TableDefinition<K, V>,
656 ) -> Result<Table<'txn, K, V>, TableError> {
657 #[cfg(feature = "logging")]
658 debug!("Opening table: {definition}");
659 let (root, _) = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
660 self.set_dirty(transaction);
661
662 Ok(Table::new(
663 definition.name(),
664 root,
665 self.freed_pages.clone(),
666 self.allocated_pages.clone(),
667 transaction.mem.clone(),
668 transaction,
669 ))
670 }
671
672 #[track_caller]
673 fn inner_rename(
674 &mut self,
675 name: &str,
676 new_name: &str,
677 table_type: TableType,
678 ) -> Result<(), TableError> {
679 if let Some(location) = self.open_tables.get(name) {
680 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
681 }
682
683 self.table_tree.rename_table(name, new_name, table_type)
684 }
685
686 #[track_caller]
687 fn rename_table(
688 &mut self,
689 transaction: &WriteTransaction,
690 name: &str,
691 new_name: &str,
692 ) -> Result<(), TableError> {
693 #[cfg(feature = "logging")]
694 debug!("Renaming table: {name} to {new_name}");
695 self.set_dirty(transaction);
696 self.inner_rename(name, new_name, TableType::Normal)
697 }
698
699 #[track_caller]
700 fn rename_multimap_table(
701 &mut self,
702 transaction: &WriteTransaction,
703 name: &str,
704 new_name: &str,
705 ) -> Result<(), TableError> {
706 #[cfg(feature = "logging")]
707 debug!("Renaming multimap table: {name} to {new_name}");
708 self.set_dirty(transaction);
709 self.inner_rename(name, new_name, TableType::Multimap)
710 }
711
712 #[track_caller]
713 fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
714 if let Some(location) = self.open_tables.get(name) {
715 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
716 }
717
718 self.table_tree.delete_table(name, table_type)
719 }
720
721 #[track_caller]
722 fn delete_table(
723 &mut self,
724 transaction: &WriteTransaction,
725 name: &str,
726 ) -> Result<bool, TableError> {
727 #[cfg(feature = "logging")]
728 debug!("Deleting table: {name}");
729 self.set_dirty(transaction);
730 self.inner_delete(name, TableType::Normal)
731 }
732
733 #[track_caller]
734 fn delete_multimap_table(
735 &mut self,
736 transaction: &WriteTransaction,
737 name: &str,
738 ) -> Result<bool, TableError> {
739 #[cfg(feature = "logging")]
740 debug!("Deleting multimap table: {name}");
741 self.set_dirty(transaction);
742 self.inner_delete(name, TableType::Multimap)
743 }
744
745 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
746 &mut self,
747 name: &str,
748 table: &BtreeMut<K, V>,
749 length: u64,
750 ) {
751 self.open_tables.remove(name).unwrap();
752 self.table_tree
753 .stage_update_table_root(name, table.get_root(), length);
754 }
755}
756
/// A read-write transaction over user tables and the internal system tables.
///
/// NOTE: fields are dropped in declaration order, so keep the order stable.
pub struct WriteTransaction {
    // Shared tracker for live transactions and savepoints.
    transaction_tracker: Arc<TransactionTracker>,
    mem: Arc<TransactionalMemory>,
    // Guard for this transaction's id; cloned into every opened table.
    transaction_guard: Arc<TransactionGuard>,
    transaction_id: TransactionId,
    // User tables.
    tables: Mutex<TableNamespace<'static>>,
    // Internal system tables (savepoints, freed/allocated page lists).
    system_tables: Mutex<SystemNamespace<'static>>,
    // Starts false; the code that sets it is outside this excerpt.
    completed: bool,
    // Set once any table is opened, renamed or deleted, or a savepoint is restored.
    // A dirty transaction cannot create new savepoints.
    dirty: AtomicBool,
    durability: InternalDurability,
    two_phase_commit: bool,
    shrink_policy: ShrinkPolicy,
    quick_repair: bool,
    // Ids of persistent savepoints created in this transaction.
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // (savepoint id, transaction id) of persistent savepoints deleted in this transaction.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
    // WAL journal attached via `set_wal_context`; cleared by `disable_wal`.
    wal_journal: Option<Arc<crate::column_family::wal::journal::WALJournal>>,
    // Column-family name attached via `set_wal_context` (not cleared by `disable_wal`).
    cf_name: Option<String>,
    // Optional checkpoint manager attached via `set_wal_context`; cleared by `disable_wal`.
    checkpoint_manager: Option<Arc<crate::column_family::wal::checkpoint::CheckpointManager>>,
}
781
782impl WriteTransaction {
783 pub(crate) fn new(
784 guard: TransactionGuard,
785 transaction_tracker: Arc<TransactionTracker>,
786 mem: Arc<TransactionalMemory>,
787 ) -> Result<Self> {
788 let transaction_id = guard.id();
789 let guard = Arc::new(guard);
790
791 let root_page = mem.get_data_root();
792 let system_page = mem.get_system_root();
793
794 let tables = TableNamespace::new(root_page, guard.clone(), mem.clone());
795 let system_tables = SystemNamespace::new(system_page, guard.clone(), mem.clone());
796
797 Ok(Self {
798 transaction_tracker,
799 mem: mem.clone(),
800 transaction_guard: guard.clone(),
801 transaction_id,
802 tables: Mutex::new(tables),
803 system_tables: Mutex::new(system_tables),
804 completed: false,
805 dirty: AtomicBool::new(false),
806 durability: InternalDurability::Immediate,
807 two_phase_commit: false,
808 quick_repair: false,
809 shrink_policy: ShrinkPolicy::Default,
810 created_persistent_savepoints: Mutex::new(Default::default()),
811 deleted_persistent_savepoints: Mutex::new(vec![]),
812 wal_journal: None,
813 cf_name: None,
814 checkpoint_manager: None,
815 })
816 }
817
818 pub(crate) fn set_shrink_policy(&mut self, shrink_policy: ShrinkPolicy) {
819 self.shrink_policy = shrink_policy;
820 }
821
822 pub(crate) fn set_wal_context(
824 &mut self,
825 cf_name: String,
826 wal_journal: Arc<crate::column_family::wal::journal::WALJournal>,
827 checkpoint_manager: Option<Arc<crate::column_family::wal::checkpoint::CheckpointManager>>,
828 ) {
829 self.cf_name = Some(cf_name);
830 self.wal_journal = Some(wal_journal);
831 self.checkpoint_manager = checkpoint_manager;
832 }
833
834 pub fn disable_wal(&mut self) {
842 self.wal_journal = None;
843 self.checkpoint_manager = None;
844 }
845
846 pub(crate) fn pending_free_pages(&self) -> Result<bool> {
847 let mut system_tables = self.system_tables.lock().unwrap();
848 if system_tables
849 .open_system_table(self, DATA_FREED_TABLE)?
850 .tree
851 .get_root()
852 .is_some()
853 {
854 return Ok(true);
855 }
856 if system_tables
857 .open_system_table(self, SYSTEM_FREED_TABLE)?
858 .tree
859 .get_root()
860 .is_some()
861 {
862 return Ok(true);
863 }
864
865 Ok(false)
866 }
867
    /// Debug-only audit: prints every allocated page and the structure that accounts for it.
    ///
    /// Pages are checked in this order: user table tree, system table tree, data-freed table,
    /// system-freed table, then the in-memory freed lists. Anything left over is printed as
    /// "Leaked pages".
    ///
    /// # Panics
    ///
    /// Panics if a page is claimed by more than one of these sources, or by a source but not
    /// in the allocator's set (the `assert!(all_allocated.remove(..))` calls).
    ///
    /// NOTE(review): opening the freed tables goes through `open_system_table`, which sets
    /// `dirty` and creates missing tables, so calling this has side effects on the transaction.
    #[cfg(debug_assertions)]
    pub fn print_allocated_page_debug(&self) {
        let mut all_allocated: HashSet<PageNumber> =
            HashSet::from_iter(self.mem.all_allocated_pages());

        self.mem.debug_check_allocator_consistency();

        // Pages reachable from the user table tree.
        let mut table_pages = vec![];
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("Tables");
        for p in table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Pages reachable from the system table tree.
        let mut system_table_pages = vec![];
        self.system_tables
            .lock()
            .unwrap()
            .table_tree
            .visit_all_pages(|path| {
                system_table_pages.push(path.page_number());
                Ok(())
            })
            .unwrap();
        println!("System tables");
        for p in system_table_pages {
            assert!(all_allocated.remove(&p));
            println!("{p:?}");
        }

        // Pages listed in the persisted data-freed table.
        {
            println!("Pending free (in data freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_freed = system_tables
                .open_system_table(self, DATA_FREED_TABLE)
                .unwrap();
            for entry in data_freed.range::<TransactionIdWithPagination>(..).unwrap() {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        // Pages listed in the persisted system-freed table.
        {
            println!("Pending free (in system freed table)");
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed = system_tables
                .open_system_table(self, SYSTEM_FREED_TABLE)
                .unwrap();
            for entry in system_freed
                .range::<TransactionIdWithPagination>(..)
                .unwrap()
            {
                let (_, entry) = entry.unwrap();
                let value = entry.value();
                for i in 0..value.len() {
                    let p = value.get(i);
                    assert!(all_allocated.remove(&p));
                    println!("{p:?}");
                }
            }
        }
        // Pages freed from user tables in this transaction but not yet persisted.
        {
            let tables = self.tables.lock().unwrap();
            let pages = tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory data freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        // Pages freed from system tables in this transaction but not yet persisted.
        {
            let system_tables = self.system_tables.lock().unwrap();
            let pages = system_tables.freed_pages.lock().unwrap();
            if !pages.is_empty() {
                println!("Pages in in-memory system freed_pages");
                for p in pages.iter() {
                    println!("{p:?}");
                    assert!(all_allocated.remove(p));
                }
            }
        }
        // Anything still unaccounted for is allocated but unreachable.
        if !all_allocated.is_empty() {
            println!("Leaked pages");
            for p in all_allocated {
                println!("{p:?}");
            }
        }
    }
971
972 pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
981 if self.durability != InternalDurability::Immediate {
982 return Err(SavepointError::InvalidSavepoint);
983 }
984
985 let mut savepoint = self.ephemeral_savepoint()?;
986
987 let mut system_tables = self.system_tables.lock().unwrap();
988
989 let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
990 next_table.insert((), savepoint.get_id().next())?;
991 drop(next_table);
992
993 let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
994 savepoint_table.insert(
995 savepoint.get_id(),
996 SerializedSavepoint::from_savepoint(&savepoint),
997 )?;
998
999 savepoint.set_persistent();
1000
1001 self.created_persistent_savepoints
1002 .lock()
1003 .unwrap()
1004 .insert(savepoint.get_id());
1005
1006 Ok(savepoint.get_id().0)
1007 }
1008
1009 pub(crate) fn transaction_guard(&self) -> Arc<TransactionGuard> {
1010 self.transaction_guard.clone()
1011 }
1012
1013 pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
1014 let mut system_tables = self.system_tables.lock().unwrap();
1015 let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
1016 let value = next_table.get(())?;
1017 if let Some(next_id) = value {
1018 Ok(Some(next_id.value()))
1019 } else {
1020 Ok(None)
1021 }
1022 }
1023
1024 pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
1026 let mut system_tables = self.system_tables.lock().unwrap();
1027 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1028 let value = table.get(SavepointId(id))?;
1029
1030 value
1031 .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
1032 .ok_or(SavepointError::InvalidSavepoint)
1033 }
1034
1035 pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
1042 if self.durability != InternalDurability::Immediate {
1043 return Err(SavepointError::InvalidSavepoint);
1044 }
1045 let mut system_tables = self.system_tables.lock().unwrap();
1046 let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1047 let savepoint = table.remove(SavepointId(id))?;
1048 if let Some(serialized) = savepoint {
1049 let savepoint = serialized
1050 .value()
1051 .to_savepoint(self.transaction_tracker.clone());
1052 self.deleted_persistent_savepoints
1053 .lock()
1054 .unwrap()
1055 .push((savepoint.get_id(), savepoint.get_transaction_id()));
1056 Ok(true)
1057 } else {
1058 Ok(false)
1059 }
1060 }
1061
1062 pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
1064 let mut system_tables = self.system_tables.lock().unwrap();
1065 let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
1066 let mut savepoints = vec![];
1067 for savepoint in table.range::<SavepointId>(..)? {
1068 savepoints.push(savepoint?.0.value().0);
1069 }
1070 Ok(savepoints.into_iter())
1071 }
1072
1073 fn allocate_read_transaction(&self) -> Result<TransactionGuard> {
1075 let id = self
1076 .transaction_tracker
1077 .register_read_transaction(&self.mem)?;
1078
1079 Ok(TransactionGuard::new_read(
1080 id,
1081 self.transaction_tracker.clone(),
1082 ))
1083 }
1084
1085 fn allocate_savepoint(&self) -> Result<(SavepointId, TransactionId)> {
1086 let transaction_id = self.allocate_read_transaction()?.leak();
1087 let id = self.transaction_tracker.allocate_savepoint(transaction_id);
1088 Ok((id, transaction_id))
1089 }
1090
1091 pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
1097 if self.dirty.load(Ordering::Acquire) {
1098 return Err(SavepointError::InvalidSavepoint);
1099 }
1100
1101 let (id, transaction_id) = self.allocate_savepoint()?;
1102 #[cfg(feature = "logging")]
1103 debug!("Creating savepoint id={id:?}, txn_id={transaction_id:?}");
1104
1105 let root = self.mem.get_data_root();
1106 let savepoint = Savepoint::new_ephemeral(
1107 &self.mem,
1108 self.transaction_tracker.clone(),
1109 id,
1110 transaction_id,
1111 root,
1112 );
1113
1114 Ok(savepoint)
1115 }
1116
    /// Rolls this transaction's user tables back to `savepoint`.
    ///
    /// Steps, in order:
    /// 1. Reset the user table-tree root to the savepoint's root.
    /// 2. Purge data-freed entries from transactions after the savepoint.
    /// 3. Queue every page allocated by those transactions for freeing.
    /// 4. Invalidate newer savepoints, and delete newer persistent ones.
    ///
    /// # Errors
    ///
    /// Returns `SavepointError::InvalidSavepoint` if the tracker no longer considers the
    /// savepoint valid. Storage errors are propagated.
    ///
    /// # Panics
    ///
    /// Panics if `savepoint` belongs to a different database, if its version differs from the
    /// current one, or if any table is still open (via `TableNamespace::set_root`).
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // The savepoint must come from this database: compare tracker addresses.
        assert_eq!(
            std::ptr::from_ref(self.transaction_tracker.as_ref()),
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        debug!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        // Refuse to restore across a version change.
        assert_eq!(self.mem.get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        // 1) Restore the user table tree to the savepoint's root.
        {
            self.tables
                .lock()
                .unwrap()
                .set_root(savepoint.get_user_root());
        }

        // First transaction id after the savepoint. Keys sort by (transaction_id,
        // pagination_id), so `lower..` covers every later transaction.
        let txn_id = savepoint.get_transaction_id().next().raw_id();
        // 2) Drop data-freed entries recorded after the savepoint.
        {
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            let mut system_tables = self.system_tables.lock().unwrap();
            let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
            // Drain the iterator; extraction happens as entries are yielded.
            for entry in data_freed.extract_from_if(lower.., |_, _| true)? {
                entry?;
            }
            // The system-freed table is not touched here.
        }

        // 3) Pages allocated after the savepoint are queued on the user namespace's freed list.
        // Lock order: tables, then its freed_pages, then system_tables.
        {
            let tables = self.tables.lock().unwrap();
            let mut data_freed_pages = tables.freed_pages.lock().unwrap();
            let mut system_tables = self.system_tables.lock().unwrap();
            let data_allocated = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
            let lower = TransactionIdWithPagination {
                transaction_id: txn_id,
                pagination_id: 0,
            };
            for entry in data_allocated.range(lower..)? {
                let (_, value) = entry?;
                for i in 0..value.value().len() {
                    data_freed_pages.push(value.value().get(i));
                }
            }
        }

        // 4) Invalidate newer savepoints so they cannot be restored later.
        // NOTE(review): `delete_persistent_savepoint` errors unless durability is `Immediate`;
        // in that case the steps above have already run when this returns Err.
        self.transaction_tracker
            .invalidate_savepoints_after(savepoint.get_id());
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }
1205
1206 pub fn set_durability(&mut self, durability: Durability) -> Result<(), SetDurabilityError> {
1212 let created = !self
1213 .created_persistent_savepoints
1214 .lock()
1215 .unwrap()
1216 .is_empty();
1217 let deleted = !self
1218 .deleted_persistent_savepoints
1219 .lock()
1220 .unwrap()
1221 .is_empty();
1222 if (created || deleted) && !matches!(durability, Durability::Immediate) {
1223 return Err(SetDurabilityError::PersistentSavepointModified);
1224 }
1225
1226 self.durability = match durability {
1227 Durability::None => InternalDurability::None,
1228 Durability::Immediate => InternalDurability::Immediate,
1229 };
1230
1231 Ok(())
1232 }
1233
    /// Chooses whether a durable commit of this transaction uses two-phase commit.
    ///
    /// The flag is handed to the storage layer's `commit` on the durable (non-WAL) commit
    /// path. When quick repair is enabled the flag is forced to `true` at commit time, so
    /// disabling it has no effect in that case.
    pub fn set_two_phase_commit(&mut self, enabled: bool) {
        self.two_phase_commit = enabled;
    }
1276
    /// Enables or disables quick repair for this transaction's commit.
    ///
    /// When enabled, the commit also forces two-phase commit, and a durable commit writes a
    /// snapshot of the allocator state into the system tree's allocator-state table.
    pub fn set_quick_repair(&mut self, enabled: bool) {
        self.quick_repair = enabled;
    }
1290
1291 #[track_caller]
1295 pub fn open_table<'txn, K: Key + 'static, V: Value + 'static>(
1296 &'txn self,
1297 definition: TableDefinition<K, V>,
1298 ) -> Result<Table<'txn, K, V>, TableError> {
1299 self.tables.lock().unwrap().open_table(self, definition)
1300 }
1301
1302 #[track_caller]
1306 pub fn open_multimap_table<'txn, K: Key + 'static, V: Key + 'static>(
1307 &'txn self,
1308 definition: MultimapTableDefinition<K, V>,
1309 ) -> Result<MultimapTable<'txn, K, V>, TableError> {
1310 self.tables
1311 .lock()
1312 .unwrap()
1313 .open_multimap_table(self, definition)
1314 }
1315
1316 pub(crate) fn close_table<K: Key + 'static, V: Value + 'static>(
1317 &self,
1318 name: &str,
1319 table: &BtreeMut<K, V>,
1320 length: u64,
1321 ) {
1322 self.tables.lock().unwrap().close_table(name, table, length);
1323 }
1324
1325 pub fn rename_table(
1327 &self,
1328 definition: impl TableHandle,
1329 new_name: impl TableHandle,
1330 ) -> Result<(), TableError> {
1331 let name = definition.name().to_string();
1332 drop(definition);
1334 self.tables
1335 .lock()
1336 .unwrap()
1337 .rename_table(self, &name, new_name.name())
1338 }
1339
1340 pub fn rename_multimap_table(
1342 &self,
1343 definition: impl MultimapTableHandle,
1344 new_name: impl MultimapTableHandle,
1345 ) -> Result<(), TableError> {
1346 let name = definition.name().to_string();
1347 drop(definition);
1349 self.tables
1350 .lock()
1351 .unwrap()
1352 .rename_multimap_table(self, &name, new_name.name())
1353 }
1354
1355 pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
1359 let name = definition.name().to_string();
1360 drop(definition);
1362 self.tables.lock().unwrap().delete_table(self, &name)
1363 }
1364
1365 pub fn delete_multimap_table(
1369 &self,
1370 definition: impl MultimapTableHandle,
1371 ) -> Result<bool, TableError> {
1372 let name = definition.name().to_string();
1373 drop(definition);
1375 self.tables
1376 .lock()
1377 .unwrap()
1378 .delete_multimap_table(self, &name)
1379 }
1380
1381 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
1383 self.tables
1384 .lock()
1385 .unwrap()
1386 .table_tree
1387 .list_tables(TableType::Normal)
1388 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1389 }
1390
1391 pub fn list_multimap_tables(
1393 &self,
1394 ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
1395 self.tables
1396 .lock()
1397 .unwrap()
1398 .table_tree
1399 .list_tables(TableType::Multimap)
1400 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1401 }
1402
    /// Commits this transaction, consuming it.
    ///
    /// # Errors
    ///
    /// Returns any [`CommitError`] produced while committing. The transaction is still
    /// considered completed in that case, so dropping it does not trigger an automatic abort.
    pub fn commit(mut self) -> Result<(), CommitError> {
        // Mark as completed before doing any work, so that `Drop` skips its abort path
        // even if the commit below fails.
        self.completed = true;
        self.commit_inner()
    }
1412
1413 fn commit_inner(&mut self) -> Result<(), CommitError> {
1414 if self.quick_repair {
1416 self.two_phase_commit = true;
1417 }
1418
1419 let (user_root, allocated_pages, data_freed) =
1420 self.tables.lock().unwrap().table_tree.flush_and_close()?;
1421
1422 let data_freed_clone = data_freed.clone();
1424 let allocated_pages_vec: Vec<_> = allocated_pages.iter().copied().collect();
1425
1426 self.store_data_freed_pages(data_freed)?;
1427 self.store_allocated_pages(allocated_pages.into_iter().collect())?;
1428
1429 let (system_root, post_commit_frees) = match self.durability {
1432 InternalDurability::None => {
1433 let (sys_root, frees) = self.prepare_system_root_for_non_durable_commit()?;
1434 (sys_root, Some(frees))
1435 }
1436 InternalDurability::Immediate => {
1437 if self.wal_journal.is_some() {
1438 let (sys_root, frees) = self.prepare_system_root_for_non_durable_commit()?;
1440 (sys_root, Some(frees))
1441 } else {
1442 let sys_root = self.prepare_system_root_for_durable_commit()?;
1444 (sys_root, None)
1445 }
1446 }
1447 };
1448
1449 if let (Some(wal_journal), Some(cf_name)) = (&self.wal_journal, &self.cf_name) {
1451 use crate::column_family::wal::entry::{WALEntry, WALTransactionPayload};
1452
1453 let payload = WALTransactionPayload {
1454 user_root: user_root.map(|h| (h.root, h.checksum, h.length)),
1455 system_root: system_root.map(|h| (h.root, h.checksum, h.length)),
1456 freed_pages: data_freed_clone,
1457 allocated_pages: allocated_pages_vec,
1458 durability: match self.durability {
1459 InternalDurability::None => Durability::None,
1460 InternalDurability::Immediate => Durability::Immediate,
1461 },
1462 };
1463
1464 let mut entry = WALEntry::new(cf_name.clone(), self.transaction_id.raw_id(), payload);
1465
1466 let sequence = wal_journal
1468 .append(&mut entry)
1469 .map_err(|e| CommitError::Storage(StorageError::from(e)))?;
1470
1471 wal_journal
1473 .wait_for_sync(sequence)
1474 .map_err(|e| CommitError::Storage(StorageError::from(e)))?;
1475
1476 if let Some(checkpoint_mgr) = &self.checkpoint_manager {
1478 checkpoint_mgr.register_pending(sequence);
1479 }
1480 }
1481
1482 #[cfg(feature = "logging")]
1483 debug!(
1484 "Committing transaction id={:?} with durability={:?} two_phase={} quick_repair={}",
1485 self.transaction_id, self.durability, self.two_phase_commit, self.quick_repair
1486 );
1487
1488 match self.durability {
1491 InternalDurability::None => {
1492 self.mem
1493 .non_durable_commit(user_root, system_root, self.transaction_id)?;
1494 self.transaction_tracker.register_non_durable_commit(
1495 self.transaction_id,
1496 self.mem.get_last_durable_transaction_id()?,
1497 );
1498 if let Some(frees) = post_commit_frees {
1499 for page in frees {
1500 self.mem.free(page, &mut PageTrackerPolicy::Ignore);
1501 }
1502 }
1503 }
1504 InternalDurability::Immediate => {
1505 if self.wal_journal.is_some() {
1506 self.mem
1508 .non_durable_commit(user_root, system_root, self.transaction_id)?;
1509 self.transaction_tracker.register_non_durable_commit(
1510 self.transaction_id,
1511 self.mem.get_last_durable_transaction_id()?,
1512 );
1513 if let Some(frees) = post_commit_frees {
1514 for page in frees {
1515 self.mem.free(page, &mut PageTrackerPolicy::Ignore);
1516 }
1517 }
1518 } else {
1519 self.mem.commit(
1522 user_root,
1523 system_root,
1524 self.transaction_id,
1525 self.two_phase_commit,
1526 self.shrink_policy,
1527 )?;
1528 self.transaction_tracker.clear_pending_non_durable_commits();
1529 let system_freed_pages =
1531 self.system_tables.lock().unwrap().system_freed_pages();
1532 for page in system_freed_pages.lock().unwrap().drain(..) {
1533 self.mem.free(page, &mut PageTrackerPolicy::Ignore);
1534 }
1535 }
1536 }
1537 }
1538
1539 for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
1540 self.transaction_tracker
1541 .deallocate_savepoint(*savepoint, *transaction);
1542 }
1543
1544 assert!(
1545 self.system_tables
1546 .lock()
1547 .unwrap()
1548 .system_freed_pages()
1549 .lock()
1550 .unwrap()
1551 .is_empty()
1552 );
1553 assert!(
1554 self.tables
1555 .lock()
1556 .unwrap()
1557 .freed_pages
1558 .lock()
1559 .unwrap()
1560 .is_empty()
1561 );
1562
1563 #[cfg(feature = "logging")]
1564 debug!(
1565 "Finished commit of transaction id={:?}",
1566 self.transaction_id
1567 );
1568
1569 Ok(())
1570 }
1571
1572 fn store_data_freed_pages(&self, mut freed_pages: Vec<PageNumber>) -> Result {
1573 let mut system_tables = self.system_tables.lock().unwrap();
1574 let mut freed_table = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1575 let mut pagination_counter = 0;
1576 while !freed_pages.is_empty() {
1577 let chunk_size = 400;
1578 let buffer_size = PageList::required_bytes(chunk_size);
1579 let key = TransactionIdWithPagination {
1580 transaction_id: self.transaction_id.raw_id(),
1581 pagination_id: pagination_counter,
1582 };
1583 let mut access_guard = freed_table.insert_reserve(&key, buffer_size)?;
1584
1585 let len = freed_pages.len();
1586 access_guard.as_mut().clear();
1587 for page in freed_pages.drain(len - min(len, chunk_size)..) {
1588 debug_assert!(
1590 self.mem.is_allocated(page),
1591 "Page is not allocated: {page:?}"
1592 );
1593 debug_assert!(!self.mem.uncommitted(page), "Page is uncommitted: {page:?}");
1594 access_guard.as_mut().push_back(page);
1595 }
1596
1597 pagination_counter += 1;
1598 }
1599
1600 Ok(())
1601 }
1602
1603 fn store_allocated_pages(&self, mut data_allocated_pages: Vec<PageNumber>) -> Result {
1604 let mut system_tables = self.system_tables.lock().unwrap();
1605 let mut allocated_table = system_tables.open_system_table(self, DATA_ALLOCATED_TABLE)?;
1606 let mut pagination_counter = 0;
1607 while !data_allocated_pages.is_empty() {
1608 let chunk_size = 400;
1609 let buffer_size = PageList::required_bytes(chunk_size);
1610 let key = TransactionIdWithPagination {
1611 transaction_id: self.transaction_id.raw_id(),
1612 pagination_id: pagination_counter,
1613 };
1614 let mut access_guard = allocated_table.insert_reserve(&key, buffer_size)?;
1615
1616 let len = data_allocated_pages.len();
1617 access_guard.as_mut().clear();
1618 for page in data_allocated_pages.drain(len - min(len, chunk_size)..) {
1619 debug_assert!(
1623 self.mem.is_allocated(page),
1624 "Page is not allocated: {page:?}"
1625 );
1626 debug_assert!(self.mem.uncommitted(page), "Page is committed: {page:?}");
1627 access_guard.as_mut().push_back(page);
1628 }
1629
1630 pagination_counter += 1;
1631 }
1632
1633 let oldest = self
1635 .transaction_tracker
1636 .oldest_savepoint()
1637 .map_or(u64::MAX, |(_, x)| x.raw_id());
1638 let key = TransactionIdWithPagination {
1639 transaction_id: oldest,
1640 pagination_id: 0,
1641 };
1642 for entry in allocated_table.extract_from_if(..key, |_, _| true)? {
1643 entry?;
1644 }
1645
1646 Ok(())
1647 }
1648
    /// Aborts this transaction, consuming it and discarding all of its changes.
    ///
    /// # Errors
    ///
    /// Returns any storage error produced while rolling back. The transaction is still
    /// considered completed in that case, so dropping it does not attempt a second abort.
    pub fn abort(mut self) -> Result {
        // Mark as completed first so that `Drop` does not run the abort path again.
        self.completed = true;
        self.abort_inner()
    }
1657
1658 fn abort_inner(&mut self) -> Result {
1659 #[cfg(feature = "logging")]
1660 debug!("Aborting transaction id={:?}", self.transaction_id);
1661 self.tables
1662 .lock()
1663 .unwrap()
1664 .table_tree
1665 .clear_root_updates_and_close();
1666 for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
1667 match self.delete_persistent_savepoint(savepoint.0) {
1668 Ok(_) => {}
1669 Err(err) => match err {
1670 SavepointError::InvalidSavepoint => {
1671 unreachable!();
1672 }
1673 SavepointError::Storage(storage_err) => {
1674 return Err(storage_err);
1675 }
1676 },
1677 }
1678 }
1679 self.mem.rollback_uncommitted_writes()?;
1680 #[cfg(feature = "logging")]
1681 debug!("Finished abort of transaction id={:?}", self.transaction_id);
1682 Ok(())
1683 }
1684
    /// Prepares the system tree for a durable commit and returns its new root.
    ///
    /// Steps, in order:
    /// 1. Releases freed-page entries recorded by transactions up to and including the oldest
    ///    live read transaction (or every transaction before this one when no reader is live).
    /// 2. Flushes pending system-table root updates and deletes the allocator-state table.
    /// 3. If quick repair is enabled, recreates that table with the current allocator state,
    ///    recording the system tree's freed pages in the system freed-page table in the same
    ///    pass.
    /// 4. Finalizes dirty checksums and returns the system-tree root.
    fn prepare_system_root_for_durable_commit(&mut self) -> Result<Option<BtreeHeader>> {
        // Entries keyed strictly below `free_until_transaction` are released below.
        let free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_transaction()
            .map_or(self.transaction_id, |x| x.next());
        self.process_freed_pages(free_until_transaction)?;

        let mut system_tables = self.system_tables.lock().unwrap();
        let system_freed_pages = system_tables.system_freed_pages();
        let system_tree = system_tables.table_tree.flush_table_root_updates()?;
        // Any previously saved allocator state is dropped; it is only rewritten below when
        // quick repair is enabled.
        system_tree
            .delete_table(ALLOCATOR_STATE_TABLE_NAME, TableType::Normal)
            .map_err(|e| e.into_storage_error_or_corrupted("Unexpected TableError"))?;

        if self.quick_repair {
            system_tree.create_table_and_flush_table_root(
                ALLOCATOR_STATE_TABLE_NAME,
                |system_tree_ref, tree: &mut AllocatorStateTreeMut| {
                    // Shared across retries, so each attempt writes new pagination ids.
                    let mut pagination_counter = 0;

                    loop {
                        let num_regions = self
                            .mem
                            .reserve_allocator_state(tree, self.transaction_id)?;

                        // With quick repair the system tree's freed pages are recorded in the
                        // freed-page table rather than freed after the commit (as the non-quick-
                        // repair durable path does). Presumably this keeps the saved allocator
                        // state accurate -- TODO confirm.
                        self.store_system_freed_pages(
                            system_tree_ref,
                            system_freed_pages.clone(),
                            None,
                            &mut pagination_counter,
                        )?;

                        if self.mem.try_save_allocator_state(tree, num_regions)? {
                            return Ok(());
                        }

                        // Saving failed: empty the allocator-state table entry by entry and
                        // retry from the reservation step.
                        while let Some(guards) = tree.last()? {
                            let key = guards.0.value();
                            drop(guards);
                            tree.remove(&key)?;
                        }
                    }
                },
            )?;
        }

        let system_root = system_tree.finalize_dirty_checksums()?;
        Ok(system_root)
    }
1744
    /// Prepares the system tree for a non-durable commit.
    ///
    /// Returns the new system-tree root together with pages the caller should free once the
    /// commit has been published.
    ///
    /// Freed-page processing is bounded by the older of two points:
    /// - the transaction after the oldest live read transaction (as reported by
    ///   `oldest_live_read_nondurable_transaction`), or this transaction when none is live;
    /// - the oldest savepoint.
    ///
    /// The system tree's freed pages are then split:
    /// - pages that `mem.unpersisted` reports are returned for freeing after the commit;
    /// - the rest are recorded in the system freed-page table.
    fn prepare_system_root_for_non_durable_commit(
        &mut self,
    ) -> Result<(Option<BtreeHeader>, Vec<PageNumber>)> {
        let mut free_until_transaction = self
            .transaction_tracker
            .oldest_live_read_nondurable_transaction()
            .map_or(self.transaction_id, |x| x.next());
        // Never process past the oldest savepoint.
        if let Some((_, oldest_savepoint)) = self.transaction_tracker.oldest_savepoint() {
            free_until_transaction = TransactionId::min(free_until_transaction, oldest_savepoint);
        }
        self.process_freed_pages_nondurable(free_until_transaction)?;

        let mut post_commit_frees = vec![];

        let system_root = {
            let mut system_tables = self.system_tables.lock().unwrap();
            let system_freed_pages = system_tables.system_freed_pages();
            system_tables.table_tree.flush_table_root_updates()?;
            // Pages that were never persisted are returned to the caller to free after the
            // commit instead of being recorded.
            for page in system_freed_pages
                .lock()
                .unwrap()
                .extract_if(.., |p| self.mem.unpersisted(*p))
            {
                post_commit_frees.push(page);
            }
            // Record the remaining freed pages. `store_system_freed_pages` also diverts any
            // unpersisted page it encounters into `post_commit_frees`.
            self.store_system_freed_pages(
                &mut system_tables.table_tree,
                system_freed_pages,
                Some(&mut post_commit_frees),
                &mut 0,
            )?;

            // Flush again so the freed-table writes above are included in the returned root.
            system_tables
                .table_tree
                .flush_table_root_updates()?
                .finalize_dirty_checksums()?
        };

        Ok((system_root, post_commit_frees))
    }
1797
    /// Runs one round of compaction by relocating high pages to lower page numbers.
    ///
    /// Collects up to `MAX_PAGES_PER_COMPACTION` highest-index pages from both the user and
    /// the system table trees, then walks them from the top of the map downwards. For each
    /// page, a replacement is obtained from `allocate_lowest`. If that replacement is lower,
    /// the page and every not-yet-relocated parent on its path are added to the relocation
    /// map. Otherwise the replacement is freed and the scan stops. Both trees are finally
    /// rewritten via `relocate_tables`.
    ///
    /// Returns `true` if at least one page was scheduled for relocation.
    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
        let mut progress = false;

        // NOTE(review): presumably keyed by page number, so `.rev()` below visits the highest
        // pages first -- confirm against `highest_index_pages`.
        let mut highest_pages = BTreeMap::new();
        let mut tables = self.tables.lock().unwrap();
        let table_tree = &mut tables.table_tree;
        table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;
        let mut system_tables = self.system_tables.lock().unwrap();
        let system_table_tree = &mut system_tables.table_tree;
        system_table_tree.highest_index_pages(MAX_PAGES_PER_COMPACTION, &mut highest_pages)?;

        // old page number -> new page number
        let mut relocation_map = HashMap::new();
        for path in highest_pages.into_values().rev() {
            if relocation_map.contains_key(&path.page_number()) {
                continue;
            }
            let old_page = self.mem.get_page(path.page_number())?;
            let mut new_page = self.mem.allocate_lowest(old_page.memory().len())?;
            let new_page_number = new_page.get_page_number();
            // Only the first byte is copied here (presumably the page-type tag). The full
            // contents are presumably written by `relocate_tables` -- TODO confirm.
            new_page.memory_mut()[0] = old_page.memory()[0];
            drop(new_page);
            if new_page_number < path.page_number() {
                relocation_map.insert(path.page_number(), new_page_number);
                // Parents must be rewritten too, since they reference the moved page.
                for parent in path.parents() {
                    if relocation_map.contains_key(parent) {
                        continue;
                    }
                    let old_parent = self.mem.get_page(*parent)?;
                    let mut new_page = self.mem.allocate_lowest(old_parent.memory().len())?;
                    let new_page_number = new_page.get_page_number();
                    new_page.memory_mut()[0] = old_parent.memory()[0];
                    drop(new_page);
                    relocation_map.insert(*parent, new_page_number);
                }
            } else {
                // No lower slot is available: give the page back and stop this round.
                self.mem
                    .free(new_page_number, &mut PageTrackerPolicy::Ignore);
                break;
            }
        }

        if !relocation_map.is_empty() {
            progress = true;
        }

        table_tree.relocate_tables(&relocation_map)?;
        system_table_tree.relocate_tables(&relocation_map)?;

        Ok(progress)
    }
1857
1858 fn process_freed_pages(&mut self, free_until: TransactionId) -> Result {
1861 assert_eq!(PageNumber::serialized_size(), 8);
1863
1864 let mut system_tables = self.system_tables.lock().unwrap();
1866 {
1867 let mut data_freed = system_tables.open_system_table(self, DATA_FREED_TABLE)?;
1868 let key = TransactionIdWithPagination {
1869 transaction_id: free_until.raw_id(),
1870 pagination_id: 0,
1871 };
1872 for entry in data_freed.extract_from_if(..key, |_, _| true)? {
1873 let (_, page_list) = entry?;
1874 for i in 0..page_list.value().len() {
1875 self.mem
1876 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1877 }
1878 }
1879 }
1880
1881 {
1883 let mut system_freed = system_tables.open_system_table(self, SYSTEM_FREED_TABLE)?;
1884 let key = TransactionIdWithPagination {
1885 transaction_id: free_until.raw_id(),
1886 pagination_id: 0,
1887 };
1888 for entry in system_freed.extract_from_if(..key, |_, _| true)? {
1889 let (_, page_list) = entry?;
1890 for i in 0..page_list.value().len() {
1891 self.mem
1892 .free(page_list.value().get(i), &mut PageTrackerPolicy::Ignore);
1893 }
1894 }
1895 }
1896
1897 Ok(())
1898 }
1899
1900 fn process_freed_pages_nondurable_helper(
1901 &mut self,
1902 free_until: TransactionId,
1903 definition: SystemTableDefinition<TransactionIdWithPagination, PageList>,
1904 ) -> Result<Vec<TransactionId>> {
1905 let mut processed = vec![];
1906 let mut system_tables = self.system_tables.lock().unwrap();
1907
1908 let last_key = TransactionIdWithPagination {
1909 transaction_id: free_until.raw_id(),
1910 pagination_id: 0,
1911 };
1912 let oldest_unprocessed = self
1913 .transaction_tracker
1914 .oldest_unprocessed_non_durable_commit()
1915 .map_or(free_until.raw_id(), |x| x.raw_id());
1916 let first_key = TransactionIdWithPagination {
1917 transaction_id: oldest_unprocessed,
1918 pagination_id: 0,
1919 };
1920 let mut data_freed = system_tables.open_system_table(self, definition)?;
1921
1922 let mut candidate_transactions = vec![];
1923 for entry in data_freed.range(first_key..last_key)? {
1924 let (key, _) = entry?;
1925 let transaction_id = TransactionId::new(key.value().transaction_id);
1926 if self
1927 .transaction_tracker
1928 .is_unprocessed_non_durable_commit(transaction_id)
1929 {
1930 candidate_transactions.push(transaction_id);
1931 }
1932 }
1933 for transaction_id in candidate_transactions {
1934 let mut key = TransactionIdWithPagination {
1935 transaction_id: transaction_id.raw_id(),
1936 pagination_id: 0,
1937 };
1938 loop {
1939 let Some(entry) = data_freed.get(&key)? else {
1940 break;
1941 };
1942 let pages = entry.value();
1943 let mut new_pages = vec![];
1944 for i in 0..pages.len() {
1945 let page = pages.get(i);
1946 if !self
1947 .mem
1948 .free_if_unpersisted(page, &mut PageTrackerPolicy::Ignore)
1949 {
1950 new_pages.push(page);
1951 }
1952 }
1953 if new_pages.len() != pages.len() {
1954 drop(entry);
1955 if new_pages.is_empty() {
1956 data_freed.remove(&key)?;
1957 } else {
1958 let required = PageList::required_bytes(new_pages.len());
1959 let mut page_list_mut = data_freed.insert_reserve(&key, required)?;
1960 for page in new_pages {
1961 page_list_mut.as_mut().push_back(page);
1962 }
1963 }
1964 }
1965 key.pagination_id += 1;
1966 }
1967 processed.push(transaction_id);
1968 }
1969
1970 Ok(processed)
1971 }
1972
1973 fn process_freed_pages_nondurable(&mut self, free_until: TransactionId) -> Result {
1980 assert_eq!(PageNumber::serialized_size(), 8);
1982
1983 let mut processed =
1985 self.process_freed_pages_nondurable_helper(free_until, DATA_FREED_TABLE)?;
1986
1987 processed
1989 .extend(self.process_freed_pages_nondurable_helper(free_until, SYSTEM_FREED_TABLE)?);
1990
1991 for transaction_id in processed {
1992 self.transaction_tracker
1993 .mark_unprocessed_non_durable_commit(transaction_id);
1994 }
1995
1996 Ok(())
1997 }
1998
1999 fn store_system_freed_pages(
2000 &self,
2001 system_tree: &mut TableTreeMut,
2002 system_freed_pages: Arc<Mutex<Vec<PageNumber>>>,
2003 mut unpersisted_pages: Option<&mut Vec<PageNumber>>,
2004 pagination_counter: &mut u64,
2005 ) -> Result {
2006 assert_eq!(PageNumber::serialized_size(), 8); system_tree.open_table_and_flush_table_root(
2009 SYSTEM_FREED_TABLE.name(),
2010 |system_freed_tree: &mut SystemFreedTree| {
2011 while !system_freed_pages.lock().unwrap().is_empty() {
2012 let chunk_size = 200;
2013 let buffer_size = PageList::required_bytes(chunk_size);
2014 let key = TransactionIdWithPagination {
2015 transaction_id: self.transaction_id.raw_id(),
2016 pagination_id: *pagination_counter,
2017 };
2018 let mut access_guard = system_freed_tree.insert_reserve(&key, buffer_size)?;
2019
2020 let mut freed_pages = system_freed_pages.lock().unwrap();
2021 let len = freed_pages.len();
2022 access_guard.as_mut().clear();
2023 for page in freed_pages.drain(len - min(len, chunk_size)..) {
2024 if let Some(ref mut unpersisted_pages) = unpersisted_pages
2025 && self.mem.unpersisted(page)
2026 {
2027 unpersisted_pages.push(page);
2028 } else {
2029 access_guard.as_mut().push_back(page);
2030 }
2031 }
2032 drop(access_guard);
2033
2034 *pagination_counter += 1;
2035 }
2036 Ok(())
2037 },
2038 )?;
2039
2040 Ok(())
2041 }
2042
2043 pub fn stats(&self) -> Result<DatabaseStats> {
2045 let tables = self.tables.lock().unwrap();
2046 let table_tree = &tables.table_tree;
2047 let data_tree_stats = table_tree.stats()?;
2048
2049 let system_tables = self.system_tables.lock().unwrap();
2050 let system_table_tree = &system_tables.table_tree;
2051 let system_tree_stats = system_table_tree.stats()?;
2052
2053 let total_metadata_bytes = data_tree_stats.metadata_bytes()
2054 + system_tree_stats.metadata_bytes
2055 + system_tree_stats.stored_leaf_bytes;
2056 let total_fragmented = data_tree_stats.fragmented_bytes()
2057 + system_tree_stats.fragmented_bytes
2058 + self.mem.count_free_pages()? * (self.mem.get_page_size() as u64);
2059
2060 Ok(DatabaseStats {
2061 tree_height: data_tree_stats.tree_height(),
2062 allocated_pages: self.mem.count_allocated_pages()?,
2063 leaf_pages: data_tree_stats.leaf_pages(),
2064 branch_pages: data_tree_stats.branch_pages(),
2065 stored_leaf_bytes: data_tree_stats.stored_bytes(),
2066 metadata_bytes: total_metadata_bytes,
2067 fragmented_bytes: total_fragmented,
2068 page_size: self.mem.get_page_size(),
2069 })
2070 }
2071
2072 #[allow(dead_code)]
2073 pub(crate) fn print_debug(&self) -> Result {
2074 let mut tables = self.tables.lock().unwrap();
2076 if let Some(page) = tables
2077 .table_tree
2078 .flush_table_root_updates()
2079 .unwrap()
2080 .finalize_dirty_checksums()
2081 .unwrap()
2082 {
2083 eprintln!("Master tree:");
2084 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
2085 Some(page),
2086 PageHint::None,
2087 self.transaction_guard.clone(),
2088 self.mem.clone(),
2089 )?;
2090 master_tree.print_debug(true)?;
2091 }
2092
2093 let mut system_tables = self.system_tables.lock().unwrap();
2095 if let Some(page) = system_tables
2096 .table_tree
2097 .flush_table_root_updates()
2098 .unwrap()
2099 .finalize_dirty_checksums()
2100 .unwrap()
2101 {
2102 eprintln!("System tree:");
2103 let master_tree: Btree<&str, InternalTableDefinition> = Btree::new(
2104 Some(page),
2105 PageHint::None,
2106 self.transaction_guard.clone(),
2107 self.mem.clone(),
2108 )?;
2109 master_tree.print_debug(true)?;
2110 }
2111
2112 Ok(())
2113 }
2114}
2115
2116impl Drop for WriteTransaction {
2117 fn drop(&mut self) {
2118 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
2119 #[allow(unused_variables)]
2120 if let Err(error) = self.abort_inner() {
2121 #[cfg(feature = "logging")]
2122 warn!("Failure automatically aborting transaction: {error}");
2123 }
2124 } else if !self.completed && self.mem.storage_failure() {
2125 self.tables
2126 .lock()
2127 .unwrap()
2128 .table_tree
2129 .clear_root_updates_and_close();
2130 }
2131 }
2132}
2133
/// A read-only transaction that opens tables from the data root read when it was created.
pub struct ReadTransaction {
    // Shared handle to the page store; a clone is passed to every table opened from here.
    mem: Arc<TransactionalMemory>,
    // Table tree rooted at `mem.get_data_root()` as of construction; it also owns the
    // `Arc`-wrapped transaction guard that typed tables clone.
    tree: TableTree,
}
2141
2142impl ReadTransaction {
2143 pub(crate) fn new(
2144 mem: Arc<TransactionalMemory>,
2145 guard: TransactionGuard,
2146 ) -> Result<Self, TransactionError> {
2147 let root_page = mem.get_data_root();
2148 let guard = Arc::new(guard);
2149 Ok(Self {
2150 mem: mem.clone(),
2151 tree: TableTree::new(root_page, PageHint::Clean, guard, mem)
2152 .map_err(TransactionError::Storage)?,
2153 })
2154 }
2155
2156 pub fn open_table<K: Key + 'static, V: Value + 'static>(
2158 &self,
2159 definition: TableDefinition<K, V>,
2160 ) -> Result<ReadOnlyTable<K, V>, TableError> {
2161 let header = self
2162 .tree
2163 .get_table::<K, V>(definition.name(), TableType::Normal)?
2164 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2165
2166 match header {
2167 InternalTableDefinition::Normal { table_root, .. } => Ok(ReadOnlyTable::new(
2168 definition.name().to_string(),
2169 table_root,
2170 PageHint::Clean,
2171 self.tree.transaction_guard().clone(),
2172 self.mem.clone(),
2173 )?),
2174 InternalTableDefinition::Multimap { .. } => unreachable!(),
2175 }
2176 }
2177
2178 pub fn open_untyped_table(
2180 &self,
2181 handle: impl TableHandle,
2182 ) -> Result<ReadOnlyUntypedTable, TableError> {
2183 let header = self
2184 .tree
2185 .get_table_untyped(handle.name(), TableType::Normal)?
2186 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2187
2188 match header {
2189 InternalTableDefinition::Normal {
2190 table_root,
2191 fixed_key_size,
2192 fixed_value_size,
2193 ..
2194 } => Ok(ReadOnlyUntypedTable::new(
2195 table_root,
2196 fixed_key_size,
2197 fixed_value_size,
2198 self.mem.clone(),
2199 )),
2200 InternalTableDefinition::Multimap { .. } => unreachable!(),
2201 }
2202 }
2203
2204 pub fn open_multimap_table<K: Key + 'static, V: Key + 'static>(
2206 &self,
2207 definition: MultimapTableDefinition<K, V>,
2208 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
2209 let header = self
2210 .tree
2211 .get_table::<K, V>(definition.name(), TableType::Multimap)?
2212 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
2213
2214 match header {
2215 InternalTableDefinition::Normal { .. } => unreachable!(),
2216 InternalTableDefinition::Multimap {
2217 table_root,
2218 table_length,
2219 ..
2220 } => Ok(ReadOnlyMultimapTable::new(
2221 table_root,
2222 table_length,
2223 PageHint::Clean,
2224 self.tree.transaction_guard().clone(),
2225 self.mem.clone(),
2226 )?),
2227 }
2228 }
2229
2230 pub fn open_untyped_multimap_table(
2232 &self,
2233 handle: impl MultimapTableHandle,
2234 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
2235 let header = self
2236 .tree
2237 .get_table_untyped(handle.name(), TableType::Multimap)?
2238 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
2239
2240 match header {
2241 InternalTableDefinition::Normal { .. } => unreachable!(),
2242 InternalTableDefinition::Multimap {
2243 table_root,
2244 table_length,
2245 fixed_key_size,
2246 fixed_value_size,
2247 ..
2248 } => Ok(ReadOnlyUntypedMultimapTable::new(
2249 table_root,
2250 table_length,
2251 fixed_key_size,
2252 fixed_value_size,
2253 self.mem.clone(),
2254 )),
2255 }
2256 }
2257
2258 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
2260 self.tree
2261 .list_tables(TableType::Normal)
2262 .map(|x| x.into_iter().map(UntypedTableHandle::new))
2263 }
2264
2265 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
2267 self.tree
2268 .list_tables(TableType::Multimap)
2269 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
2270 }
2271
2272 pub fn close(self) -> Result<(), TransactionError> {
2280 if Arc::strong_count(self.tree.transaction_guard()) > 1 {
2281 return Err(TransactionError::ReadTransactionStillInUse(Box::new(self)));
2282 }
2283 Ok(())
2285 }
2286}
2287
2288impl Debug for ReadTransaction {
2289 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
2290 f.write_str("ReadTransaction")
2291 }
2292}
2293
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// Transaction ids must keep increasing after the database file is closed and reopened.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        let committed_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut table = txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let id = txn.transaction_id;
            txn.commit().unwrap();
            id
        };

        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > committed_id);
    }
}