1use crate::error::CommitError;
2use crate::multimap_table::ReadOnlyUntypedMultimapTable;
3use crate::sealed::Sealed;
4use crate::table::ReadOnlyUntypedTable;
5use crate::transaction_tracker::{SavepointId, TransactionId, TransactionTracker};
6use crate::tree_store::{
7 Btree, BtreeMut, Checksum, FreedPageList, FreedTableKey, InternalTableDefinition, PageHint,
8 PageNumber, SerializedSavepoint, TableTree, TableType, TransactionalMemory, MAX_VALUE_LENGTH,
9};
10use crate::types::{RedbKey, RedbValue};
11use crate::{
12 AccessGuard, Database, MultimapTable, MultimapTableDefinition, MultimapTableHandle, Range,
13 ReadOnlyMultimapTable, ReadOnlyTable, Result, Savepoint, SavepointError, StorageError, Table,
14 TableDefinition, TableError, TableHandle, UntypedMultimapTableHandle, UntypedTableHandle,
15};
16#[cfg(feature = "logging")]
17use log::{info, warn};
18use std::borrow::Borrow;
19use std::cmp::min;
20use std::collections::{HashMap, HashSet};
21use std::fmt::{Display, Formatter};
22use std::marker::PhantomData;
23use std::ops::{RangeBounds, RangeFull};
24#[cfg(not(target_has_atomic = "64"))]
25use portable_atomic::{AtomicBool, Ordering};
26#[cfg(target_has_atomic = "64")]
27use std::sync::atomic::{AtomicBool, Ordering};
28use std::sync::{Arc, Mutex};
29use std::{panic, thread};
30
31const NEXT_SAVEPOINT_TABLE: SystemTableDefinition<(), SavepointId> =
32 SystemTableDefinition::new("next_savepoint_id");
33pub(crate) const SAVEPOINT_TABLE: SystemTableDefinition<SavepointId, SerializedSavepoint> =
34 SystemTableDefinition::new("persistent_savepoints");
35
36pub struct SystemTableDefinition<'a, K: RedbKey + 'static, V: RedbValue + 'static> {
37 name: &'a str,
38 _key_type: PhantomData<K>,
39 _value_type: PhantomData<V>,
40}
41
42impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> SystemTableDefinition<'a, K, V> {
43 pub const fn new(name: &'a str) -> Self {
44 assert!(!name.is_empty());
45 Self {
46 name,
47 _key_type: PhantomData,
48 _value_type: PhantomData,
49 }
50 }
51}
52
53impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> TableHandle
54 for SystemTableDefinition<'a, K, V>
55{
56 fn name(&self) -> &str {
57 self.name
58 }
59}
60
61impl<K: RedbKey, V: RedbValue> Sealed for SystemTableDefinition<'_, K, V> {}
62
63impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Clone for SystemTableDefinition<'a, K, V> {
64 fn clone(&self) -> Self {
65 *self
66 }
67}
68
69impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Copy for SystemTableDefinition<'a, K, V> {}
70
71impl<'a, K: RedbKey + 'static, V: RedbValue + 'static> Display for SystemTableDefinition<'a, K, V> {
72 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
73 write!(
74 f,
75 "{}<{}, {}>",
76 self.name,
77 K::type_name().name(),
78 V::type_name().name()
79 )
80 }
81}
82
/// Size and shape statistics for a database.
///
/// As populated by `WriteTransaction::stats`, the tree figures describe the user data tree, while
/// the metadata and fragmentation figures also include the freed-page tree.
#[derive(Debug)]
pub struct DatabaseStats {
    pub(crate) tree_height: u32,
    pub(crate) allocated_pages: u64,
    pub(crate) leaf_pages: u64,
    pub(crate) branch_pages: u64,
    pub(crate) stored_leaf_bytes: u64,
    pub(crate) metadata_bytes: u64,
    pub(crate) fragmented_bytes: u64,
    pub(crate) page_size: usize,
}

impl DatabaseStats {
    /// Height of the user data tree.
    pub fn tree_height(&self) -> u32 {
        let Self { tree_height, .. } = *self;
        tree_height
    }

    /// Number of pages reported as allocated by the storage layer.
    pub fn allocated_pages(&self) -> u64 {
        let Self {
            allocated_pages, ..
        } = *self;
        allocated_pages
    }

    /// Number of leaf pages in the user data tree.
    pub fn leaf_pages(&self) -> u64 {
        let Self { leaf_pages, .. } = *self;
        leaf_pages
    }

    /// Number of branch pages in the user data tree.
    pub fn branch_pages(&self) -> u64 {
        let Self { branch_pages, .. } = *self;
        branch_pages
    }

    /// Bytes stored in the leaves of the user data tree.
    pub fn stored_bytes(&self) -> u64 {
        let Self {
            stored_leaf_bytes, ..
        } = *self;
        stored_leaf_bytes
    }

    /// Bytes of metadata, including the contents of the freed-page tree.
    pub fn metadata_bytes(&self) -> u64 {
        let Self { metadata_bytes, .. } = *self;
        metadata_bytes
    }

    /// Bytes lost to fragmentation across the data and freed-page trees.
    pub fn fragmented_bytes(&self) -> u64 {
        let Self {
            fragmented_bytes, ..
        } = *self;
        fragmented_bytes
    }

    /// Page size of the underlying storage, in bytes.
    pub fn page_size(&self) -> usize {
        let Self { page_size, .. } = *self;
        page_size
    }
}
138
/// How a write transaction's commit is carried out.
///
/// New write transactions start with `Immediate`. Persistent savepoints can only be created or
/// deleted under `Immediate` or `Paranoid`.
#[derive(Copy, Clone, Debug)]
#[non_exhaustive]
pub enum Durability {
    /// Commit through the non-durable path. Freed pages, including post-commit frees, are stored
    /// for a later durable commit instead of being released, and the commit is registered with
    /// the transaction tracker as a pending non-durable commit.
    None,
    /// Durable commit with the `eventual` flag set. Presumably this relaxes when data reaches
    /// stable storage; confirm against `TransactionalMemory::commit`.
    Eventual,
    /// Durable commit with neither the `eventual` nor the `two_phase` flag set. This is the
    /// default.
    Immediate,
    /// Durable commit with the `two_phase` flag set. Confirm the exact protocol against
    /// `TransactionalMemory::commit`.
    Paranoid,
}
193
194pub struct SystemTable<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> {
196 name: String,
197 namespace: &'s mut SystemNamespace<'db>,
198 tree: BtreeMut<'s, K, V>,
199}
200
201impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> SystemTable<'db, 's, K, V> {
202 fn new(
203 name: &str,
204 table_root: Option<(PageNumber, Checksum)>,
205 freed_pages: Arc<Mutex<Vec<PageNumber>>>,
206 mem: &'db TransactionalMemory,
207 namespace: &'s mut SystemNamespace<'db>,
208 ) -> SystemTable<'db, 's, K, V> {
209 SystemTable {
210 name: name.to_string(),
211 namespace,
212 tree: BtreeMut::new(table_root, mem, freed_pages),
213 }
214 }
215
216 fn get<'a>(&self, key: impl Borrow<K::SelfType<'a>>) -> Result<Option<AccessGuard<V>>>
217 where
218 K: 'a,
219 {
220 self.tree.get(key.borrow())
221 }
222
223 fn range<'a, KR>(&self, range: impl RangeBounds<KR> + 'a) -> Result<Range<K, V>>
224 where
225 K: 'a,
226 KR: Borrow<K::SelfType<'a>> + 'a,
227 {
228 self.tree.range(&range).map(Range::new)
229 }
230
231 pub fn insert<'k, 'v>(
232 &mut self,
233 key: impl Borrow<K::SelfType<'k>>,
234 value: impl Borrow<V::SelfType<'v>>,
235 ) -> Result<Option<AccessGuard<V>>> {
236 let value_len = V::as_bytes(value.borrow()).as_ref().len();
237 if value_len > MAX_VALUE_LENGTH {
238 return Err(StorageError::ValueTooLarge(value_len));
239 }
240 let key_len = K::as_bytes(key.borrow()).as_ref().len();
241 if key_len > MAX_VALUE_LENGTH {
242 return Err(StorageError::ValueTooLarge(key_len));
243 }
244 self.tree.insert(key.borrow(), value.borrow())
245 }
246
247 pub fn remove<'a>(
248 &mut self,
249 key: impl Borrow<K::SelfType<'a>>,
250 ) -> Result<Option<AccessGuard<V>>>
251 where
252 K: 'a,
253 {
254 self.tree.remove(key.borrow())
255 }
256}
257
258impl<'db, 's, K: RedbKey + 'static, V: RedbValue + 'static> Drop for SystemTable<'db, 's, K, V> {
259 fn drop(&mut self) {
260 self.namespace.close_table(&self.name, &self.tree);
261 }
262}
263
264struct SystemNamespace<'db> {
265 table_tree: TableTree<'db>,
266}
267
268impl<'db> SystemNamespace<'db> {
269 fn open_system_table<'txn, 's, K: RedbKey + 'static, V: RedbValue + 'static>(
270 &'s mut self,
271 transaction: &'txn WriteTransaction<'db>,
272 definition: SystemTableDefinition<K, V>,
273 ) -> Result<SystemTable<'db, 's, K, V>> {
274 #[cfg(feature = "logging")]
275 info!("Opening system table: {}", definition);
276 let root = self
277 .table_tree
278 .get_or_create_table::<K, V>(definition.name(), TableType::Normal)
279 .map_err(|e| {
280 e.into_storage_error_or_corrupted("Internal error. System table is corrupted")
281 })?;
282 transaction.dirty.store(true, Ordering::Release);
283
284 Ok(SystemTable::new(
285 definition.name(),
286 root.get_root(),
287 transaction.freed_pages.clone(),
288 transaction.mem,
289 self,
290 ))
291 }
292
293 fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
294 &mut self,
295 name: &str,
296 table: &BtreeMut<K, V>,
297 ) {
298 self.table_tree
299 .stage_update_table_root(name, table.get_root());
300 }
301}
302
303struct TableNamespace<'db> {
304 open_tables: HashMap<String, &'static panic::Location<'static>>,
305 table_tree: TableTree<'db>,
306}
307
308impl<'db> TableNamespace<'db> {
309 #[track_caller]
310 fn inner_open<K: RedbKey + 'static, V: RedbValue + 'static>(
311 &mut self,
312 name: &str,
313 table_type: TableType,
314 ) -> Result<Option<(PageNumber, Checksum)>, TableError> {
315 if let Some(location) = self.open_tables.get(name) {
316 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
317 }
318
319 let internal_table = self
320 .table_tree
321 .get_or_create_table::<K, V>(name, table_type)?;
322 self.open_tables
323 .insert(name.to_string(), panic::Location::caller());
324
325 Ok(internal_table.get_root())
326 }
327
328 #[track_caller]
329 pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>(
330 &mut self,
331 transaction: &'txn WriteTransaction<'db>,
332 definition: MultimapTableDefinition<K, V>,
333 ) -> Result<MultimapTable<'db, 'txn, K, V>, TableError> {
334 #[cfg(feature = "logging")]
335 info!("Opening multimap table: {}", definition);
336 let root = self.inner_open::<K, V>(definition.name(), TableType::Multimap)?;
337 transaction.dirty.store(true, Ordering::Release);
338
339 Ok(MultimapTable::new(
340 definition.name(),
341 root,
342 transaction.freed_pages.clone(),
343 transaction.mem,
344 transaction,
345 ))
346 }
347
348 #[track_caller]
349 pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>(
350 &mut self,
351 transaction: &'txn WriteTransaction<'db>,
352 definition: TableDefinition<K, V>,
353 ) -> Result<Table<'db, 'txn, K, V>, TableError> {
354 #[cfg(feature = "logging")]
355 info!("Opening table: {}", definition);
356 let root = self.inner_open::<K, V>(definition.name(), TableType::Normal)?;
357 transaction.dirty.store(true, Ordering::Release);
358
359 Ok(Table::new(
360 definition.name(),
361 root,
362 transaction.freed_pages.clone(),
363 transaction.mem,
364 transaction,
365 ))
366 }
367
368 #[track_caller]
369 fn inner_delete(&mut self, name: &str, table_type: TableType) -> Result<bool, TableError> {
370 if let Some(location) = self.open_tables.get(name) {
371 return Err(TableError::TableAlreadyOpen(name.to_string(), location));
372 }
373
374 self.table_tree.delete_table(name, table_type)
375 }
376
377 #[track_caller]
378 fn delete_table<'txn>(
379 &mut self,
380 transaction: &'txn WriteTransaction<'db>,
381 name: &str,
382 ) -> Result<bool, TableError> {
383 #[cfg(feature = "logging")]
384 info!("Deleting table: {}", name);
385 transaction.dirty.store(true, Ordering::Release);
386 self.inner_delete(name, TableType::Normal)
387 }
388
389 #[track_caller]
390 fn delete_multimap_table<'txn>(
391 &mut self,
392 transaction: &'txn WriteTransaction<'db>,
393 name: &str,
394 ) -> Result<bool, TableError> {
395 #[cfg(feature = "logging")]
396 info!("Deleting multimap table: {}", name);
397 transaction.dirty.store(true, Ordering::Release);
398 self.inner_delete(name, TableType::Multimap)
399 }
400
401 pub(crate) fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
402 &mut self,
403 name: &str,
404 table: &BtreeMut<K, V>,
405 ) {
406 self.open_tables.remove(name).unwrap();
407 self.table_tree
408 .stage_update_table_root(name, table.get_root());
409 }
410}
411
/// A read-write transaction.
///
/// Finished explicitly with `commit` or `abort`. If dropped without either (and not while the
/// thread is panicking or after a storage failure), the `Drop` impl aborts it automatically.
pub struct WriteTransaction<'db> {
    db: &'db Database,
    transaction_tracker: Arc<Mutex<TransactionTracker>>,
    mem: &'db TransactionalMemory,
    transaction_id: TransactionId,
    // Tree of pages freed by transactions, keyed by FreedTableKey (transaction_id, pagination_id).
    freed_tree: Mutex<BtreeMut<'db, FreedTableKey, FreedPageList<'static>>>,
    // Pages released by the user and system table trees during this transaction.
    freed_pages: Arc<Mutex<Vec<PageNumber>>>,
    // Pages released by the freed tree itself during this transaction.
    post_commit_frees: Arc<Mutex<Vec<PageNumber>>>,
    tables: Mutex<TableNamespace<'db>>,
    system_tables: Mutex<SystemNamespace<'db>>,
    // Set by `commit`/`abort`; suppresses the automatic abort in `Drop`.
    completed: bool,
    // Set once tables are opened/deleted, a system table is opened, or a savepoint is restored;
    // ephemeral savepoints cannot be created afterwards.
    dirty: AtomicBool,
    durability: Durability,
    created_persistent_savepoints: Mutex<HashSet<SavepointId>>,
    // (savepoint id, savepoint transaction id) pairs released from the tracker on commit.
    deleted_persistent_savepoints: Mutex<Vec<(SavepointId, TransactionId)>>,
}
436
impl<'db> WriteTransaction<'db> {
    /// Starts a new write transaction on `db`.
    ///
    /// Registers the transaction via `start_write_transaction` and builds in-memory handles for
    /// the user table tree, the system table tree and the freed tree from the roots currently
    /// recorded in the database's `TransactionalMemory`. The transaction starts clean (not
    /// dirty), not completed, with `Durability::Immediate`. The visible body never returns `Err`.
    pub(crate) fn new(
        db: &'db Database,
        transaction_tracker: Arc<Mutex<TransactionTracker>>,
    ) -> Result<Self> {
        let transaction_id = db.start_write_transaction();

        let root_page = db.get_memory().get_data_root();
        let system_page = db.get_memory().get_system_root();
        let freed_root = db.get_memory().get_freed_root();
        // Shared by the user and system table trees: pages they release end up here.
        let freed_pages = Arc::new(Mutex::new(vec![]));
        // Given only to the freed tree: pages released by the freed tree itself end up here.
        let post_commit_frees = Arc::new(Mutex::new(vec![]));

        let tables = TableNamespace {
            open_tables: Default::default(),
            table_tree: TableTree::new(root_page, db.get_memory(), freed_pages.clone()),
        };
        let system_tables = SystemNamespace {
            table_tree: TableTree::new(system_page, db.get_memory(), freed_pages.clone()),
        };

        Ok(Self {
            db,
            transaction_tracker,
            mem: db.get_memory(),
            transaction_id,
            tables: Mutex::new(tables),
            system_tables: Mutex::new(system_tables),
            freed_tree: Mutex::new(BtreeMut::new(
                freed_root,
                db.get_memory(),
                post_commit_frees.clone(),
            )),
            freed_pages,
            post_commit_frees,
            completed: false,
            dirty: AtomicBool::new(false),
            durability: Durability::Immediate,
            created_persistent_savepoints: Mutex::new(Default::default()),
            deleted_persistent_savepoints: Mutex::new(vec![]),
        })
    }

    /// Creates a persistent savepoint and returns its raw id.
    ///
    /// The savepoint is recorded in the `persistent_savepoints` system table, and
    /// `next_savepoint_id` is set to the following id.
    ///
    /// # Errors
    ///
    /// `SavepointError::InvalidSavepoint` if the durability is not `Immediate` or `Paranoid`, or
    /// (from `ephemeral_savepoint`) if the transaction is already dirty.
    pub fn persistent_savepoint(&self) -> Result<u64, SavepointError> {
        if !matches!(
            self.durability,
            Durability::Immediate | Durability::Paranoid
        ) {
            return Err(SavepointError::InvalidSavepoint);
        }

        let mut savepoint = self.ephemeral_savepoint()?;

        let mut system_tables = self.system_tables.lock().unwrap();

        let mut next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        next_table.insert((), savepoint.get_id().next())?;
        // The open table mutably borrows `system_tables`; it must be dropped (which stages its
        // root) before the next system table can be opened.
        drop(next_table);

        let mut savepoint_table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        savepoint_table.insert(
            savepoint.get_id(),
            SerializedSavepoint::from_savepoint(&savepoint),
        )?;

        savepoint.set_persistent();

        // Remembered so `abort_inner` can remove it again, and so `set_durability` can refuse
        // to change durability afterwards.
        self.created_persistent_savepoints
            .lock()
            .unwrap()
            .insert(savepoint.get_id());

        Ok(savepoint.get_id().0)
    }

    /// Reads the id stored in the `next_savepoint_id` system table, if any.
    ///
    /// Opening the system table marks this transaction dirty.
    pub(crate) fn next_persistent_savepoint_id(&self) -> Result<Option<SavepointId>> {
        let mut system_tables = self.system_tables.lock().unwrap();
        let next_table = system_tables.open_system_table(self, NEXT_SAVEPOINT_TABLE)?;
        let value = next_table.get(())?;
        if let Some(next_id) = value {
            Ok(Some(next_id.value()))
        } else {
            Ok(None)
        }
    }

    /// Loads persistent savepoint `id` from the `persistent_savepoints` system table.
    ///
    /// Opening the system table marks this transaction dirty.
    ///
    /// # Errors
    ///
    /// `SavepointError::InvalidSavepoint` if no savepoint with that id is stored.
    pub fn get_persistent_savepoint(&self, id: u64) -> Result<Savepoint, SavepointError> {
        let mut system_tables = self.system_tables.lock().unwrap();
        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let value = table.get(SavepointId(id))?;

        value
            .map(|x| x.value().to_savepoint(self.transaction_tracker.clone()))
            .ok_or(SavepointError::InvalidSavepoint)
    }

    /// Removes persistent savepoint `id`, returning whether it existed.
    ///
    /// The removed savepoint's `(id, transaction id)` is queued in
    /// `deleted_persistent_savepoints`. It is released from the transaction tracker only in
    /// `commit_inner`.
    ///
    /// # Errors
    ///
    /// `SavepointError::InvalidSavepoint` if the durability is not `Immediate` or `Paranoid`.
    pub fn delete_persistent_savepoint(&self, id: u64) -> Result<bool, SavepointError> {
        if !matches!(
            self.durability,
            Durability::Immediate | Durability::Paranoid
        ) {
            return Err(SavepointError::InvalidSavepoint);
        }
        let mut system_tables = self.system_tables.lock().unwrap();
        let mut table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let savepoint = table.remove(SavepointId(id))?;
        if let Some(serialized) = savepoint {
            let savepoint = serialized
                .value()
                .to_savepoint(self.transaction_tracker.clone());
            self.deleted_persistent_savepoints
                .lock()
                .unwrap()
                .push((savepoint.get_id(), savepoint.get_transaction_id()));
            Ok(true)
        } else {
            Ok(false)
        }
    }

    /// Returns the raw ids of all stored persistent savepoints, in key order.
    ///
    /// Opening the system table marks this transaction dirty.
    pub fn list_persistent_savepoints(&self) -> Result<impl Iterator<Item = u64>> {
        let mut system_tables = self.system_tables.lock().unwrap();
        let table = system_tables.open_system_table(self, SAVEPOINT_TABLE)?;
        let mut savepoints = vec![];
        for savepoint in table.range::<SavepointId>(..)? {
            savepoints.push(savepoint?.0.value().0);
        }
        Ok(savepoints.into_iter())
    }

    /// Creates an ephemeral savepoint capturing the roots and allocator state in `self.mem`.
    ///
    /// # Errors
    ///
    /// `SavepointError::InvalidSavepoint` if this transaction is already dirty. The roots are
    /// read from `self.mem` (the same place `new` read them from), not from this transaction's
    /// in-memory trees, so any changes already made would not be captured.
    pub fn ephemeral_savepoint(&self) -> Result<Savepoint, SavepointError> {
        if self.dirty.load(Ordering::Acquire) {
            return Err(SavepointError::InvalidSavepoint);
        }

        let (id, transaction_id) = self.db.allocate_savepoint()?;
        #[cfg(feature = "logging")]
        info!(
            "Creating savepoint id={:?}, txn_id={:?}",
            id, transaction_id
        );

        let regional_allocators = self.mem.get_raw_allocator_states();
        let root = self.mem.get_data_root();
        let system_root = self.mem.get_system_root();
        let freed_root = self.mem.get_freed_root();
        let savepoint = Savepoint::new_ephemeral(
            self.db.get_memory(),
            self.transaction_tracker.clone(),
            id,
            transaction_id,
            root,
            system_root,
            freed_root,
            regional_allocators,
        );

        Ok(savepoint)
    }

    /// Rolls the user data and freed tree back to `savepoint`; the system tree is kept.
    ///
    /// Afterwards, savepoints newer than `savepoint` are invalidated in the tracker, and
    /// persistent savepoints with larger ids are deleted.
    ///
    /// # Panics
    ///
    /// Panics if `savepoint` belongs to a different database (tracker address mismatch) or was
    /// taken under a different file-format version.
    ///
    /// # Errors
    ///
    /// `SavepointError::InvalidSavepoint` if the tracker no longer considers the savepoint valid.
    pub fn restore_savepoint(&mut self, savepoint: &Savepoint) -> Result<(), SavepointError> {
        // The savepoint must come from this database's transaction tracker.
        assert_eq!(
            self.transaction_tracker.as_ref() as *const _,
            savepoint.db_address()
        );

        if !self
            .transaction_tracker
            .lock()
            .unwrap()
            .is_valid_savepoint(savepoint.get_id())
        {
            return Err(SavepointError::InvalidSavepoint);
        }
        #[cfg(feature = "logging")]
        info!(
            "Beginning savepoint restore (id={:?}) in transaction id={:?}",
            savepoint.get_id(),
            self.transaction_id
        );
        assert_eq!(self.db.get_memory().get_version(), savepoint.get_version());
        self.dirty.store(true, Ordering::Release);

        let allocated_since_savepoint = self
            .mem
            .pages_allocated_since_raw_state(savepoint.get_regional_allocator_states());

        // The system tree is not rolled back, so pages it still references must be kept.
        let referenced_by_system_tree = self
            .system_tables
            .lock()
            .unwrap()
            .table_tree
            .all_referenced_pages()?;

        // Release everything allocated since the savepoint. Pages that are still uncommitted are
        // freed immediately; already-committed ones go through the normal freed-page path.
        let mut freed_pages = vec![];
        for page in allocated_since_savepoint {
            if referenced_by_system_tree.contains(&page) {
                continue;
            }
            if self.mem.uncommitted(page) {
                self.mem.free(page);
            } else {
                freed_pages.push(page);
            }
        }
        *self.freed_pages.lock().unwrap() = freed_pages;
        self.tables.lock().unwrap().table_tree = TableTree::new(
            savepoint.get_user_root(),
            self.mem,
            self.freed_pages.clone(),
        );

        // Find the oldest transaction id still present in the current freed tree. Entries older
        // than that were already processed since the savepoint was taken; they are removed from
        // the restored freed tree below so they are not freed a second time. The freed_tree
        // guard created in this `if let` scrutinee is held for the whole `if let`/`else`.
        let oldest_unprocessed_transaction = if let Some(entry) = self
            .freed_tree
            .lock()
            .unwrap()
            .range::<RangeFull, FreedTableKey>(&(..))?
            .next()
        {
            entry?.key().transaction_id
        } else {
            self.transaction_id.raw_id()
        };

        let mut freed_tree = BtreeMut::new(
            savepoint.get_freed_root(),
            self.mem,
            self.post_commit_frees.clone(),
        );
        let lookup_key = FreedTableKey {
            transaction_id: oldest_unprocessed_transaction,
            pagination_id: 0,
        };
        let mut to_remove = vec![];
        for entry in freed_tree.range(&(..lookup_key))? {
            to_remove.push(entry?.key());
        }
        for key in to_remove {
            freed_tree.remove(&key)?;
        }

        *self.freed_tree.lock().unwrap() = freed_tree;

        // Savepoints taken after this one describe a timeline that no longer exists.
        self.transaction_tracker
            .lock()
            .unwrap()
            .invalidate_savepoints_after(savepoint.get_id());

        // NOTE(review): delete_persistent_savepoint returns InvalidSavepoint unless durability is
        // Immediate/Paranoid, so with another durability this can fail after the rollback above
        // has already been applied - confirm this is acceptable.
        for persistent_savepoint in self.list_persistent_savepoints()? {
            if persistent_savepoint > savepoint.get_id().0 {
                self.delete_persistent_savepoint(persistent_savepoint)?;
            }
        }

        Ok(())
    }

    /// Sets how this transaction will be committed.
    ///
    /// # Panics
    ///
    /// Panics if any persistent savepoint has been created or deleted in this transaction.
    pub fn set_durability(&mut self, durability: Durability) {
        let no_created = self
            .created_persistent_savepoints
            .lock()
            .unwrap()
            .is_empty();
        let no_deleted = self
            .deleted_persistent_savepoints
            .lock()
            .unwrap()
            .is_empty();
        assert!(no_created && no_deleted);
        self.durability = durability;
    }

    /// Opens (creating if needed) table `definition` for writing.
    ///
    /// # Errors
    ///
    /// `TableError::TableAlreadyOpen` if the table is already open in this transaction, plus
    /// any table-tree error.
    #[track_caller]
    pub fn open_table<'txn, K: RedbKey + 'static, V: RedbValue + 'static>(
        &'txn self,
        definition: TableDefinition<K, V>,
    ) -> Result<Table<'db, 'txn, K, V>, TableError> {
        self.tables.lock().unwrap().open_table(self, definition)
    }

    /// Opens (creating if needed) multimap table `definition` for writing.
    ///
    /// # Errors
    ///
    /// `TableError::TableAlreadyOpen` if the table is already open in this transaction, plus
    /// any table-tree error.
    #[track_caller]
    pub fn open_multimap_table<'txn, K: RedbKey + 'static, V: RedbKey + 'static>(
        &'txn self,
        definition: MultimapTableDefinition<K, V>,
    ) -> Result<MultimapTable<'db, 'txn, K, V>, TableError> {
        self.tables
            .lock()
            .unwrap()
            .open_multimap_table(self, definition)
    }

    /// Marks table `name` closed and stages `table`'s root in the user table tree.
    pub(crate) fn close_table<K: RedbKey + 'static, V: RedbValue + 'static>(
        &self,
        name: &str,
        table: &BtreeMut<K, V>,
    ) {
        self.tables.lock().unwrap().close_table(name, table);
    }

    /// Deletes the normal table named by `definition`; returns the table tree's `bool` result.
    pub fn delete_table(&self, definition: impl TableHandle) -> Result<bool, TableError> {
        let name = definition.name().to_string();
        // NOTE(review): presumably dropped first so a caller can pass an open table handle whose
        // drop closes it (via close_table) and avoid TableAlreadyOpen - confirm against Table's Drop.
        drop(definition);
        self.tables.lock().unwrap().delete_table(self, &name)
    }

    /// Deletes the multimap table named by `definition`; returns the table tree's `bool` result.
    pub fn delete_multimap_table(
        &self,
        definition: impl MultimapTableHandle,
    ) -> Result<bool, TableError> {
        let name = definition.name().to_string();
        // Dropped before deleting for the same reason as in `delete_table`.
        drop(definition);
        self.tables
            .lock()
            .unwrap()
            .delete_multimap_table(self, &name)
    }

    /// Lists the normal tables visible in this transaction's user table tree.
    pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle> + '_> {
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .list_tables(TableType::Normal)
            .map(|x| x.into_iter().map(UntypedTableHandle::new))
    }

    /// Lists the multimap tables visible in this transaction's user table tree.
    pub fn list_multimap_tables(
        &self,
    ) -> Result<impl Iterator<Item = UntypedMultimapTableHandle> + '_> {
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .list_tables(TableType::Multimap)
            .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
    }

    /// Commits the transaction using its configured `Durability`.
    ///
    /// `completed` is set before committing, so a failed commit does not trigger the automatic
    /// abort in `Drop`.
    pub fn commit(mut self) -> Result<(), CommitError> {
        self.completed = true;
        self.commit_inner()
    }

    /// Dispatches to the commit path for `self.durability`, then releases persistent savepoints
    /// deleted in this transaction from the transaction tracker.
    fn commit_inner(&mut self) -> Result<(), CommitError> {
        #[cfg(feature = "logging")]
        info!(
            "Committing transaction id={:?} with durability={:?}",
            self.transaction_id, self.durability
        );
        match self.durability {
            Durability::None => self.non_durable_commit()?,
            Durability::Eventual => self.durable_commit(true, false)?,
            Durability::Immediate => self.durable_commit(false, false)?,
            Durability::Paranoid => self.durable_commit(false, true)?,
        }

        // Lock order here: deleted_persistent_savepoints, then transaction_tracker.
        for (savepoint, transaction) in self.deleted_persistent_savepoints.lock().unwrap().iter() {
            self.transaction_tracker
                .lock()
                .unwrap()
                .deallocate_savepoint(*savepoint, *transaction);
        }

        #[cfg(feature = "logging")]
        info!(
            "Finished commit of transaction id={:?}",
            self.transaction_id
        );

        Ok(())
    }

    /// Aborts the transaction, discarding its changes.
    pub fn abort(mut self) -> Result {
        self.completed = true;
        self.abort_inner()
    }

    /// Removes persistent savepoints created in this transaction, drops staged user-table root
    /// updates, and rolls back uncommitted writes in `TransactionalMemory`.
    fn abort_inner(&mut self) -> Result {
        #[cfg(feature = "logging")]
        info!("Aborting transaction id={:?}", self.transaction_id);
        // NOTE(review): delete_persistent_savepoint queues (id, txn) in
        // deleted_persistent_savepoints, but only commit_inner passes those to
        // deallocate_savepoint; nothing visible here releases them from the tracker on abort -
        // confirm this is intended.
        for savepoint in self.created_persistent_savepoints.lock().unwrap().iter() {
            match self.delete_persistent_savepoint(savepoint.0) {
                Ok(_) => {}
                Err(err) => match err {
                    // Created savepoints only exist under Immediate/Paranoid durability, and
                    // set_durability refuses to change it afterwards.
                    SavepointError::InvalidSavepoint => {
                        unreachable!();
                    }
                    SavepointError::Storage(storage_err) => {
                        return Err(storage_err);
                    }
                },
            }
        }
        self.tables
            .lock()
            .unwrap()
            .table_tree
            .clear_table_root_updates();
        self.mem.rollback_uncommitted_writes()?;
        #[cfg(feature = "logging")]
        info!("Finished abort of transaction id={:?}", self.transaction_id);
        Ok(())
    }

    /// Durable commit path used by `Eventual`, `Immediate` and `Paranoid`.
    ///
    /// `eventual` and `two_phase` are passed straight through to `TransactionalMemory::commit`.
    pub(crate) fn durable_commit(&mut self, eventual: bool, two_phase: bool) -> Result {
        // Pages freed by transactions older than this can no longer be observed by any reader.
        let oldest_live_read = self
            .transaction_tracker
            .lock()
            .unwrap()
            .oldest_live_read_transaction()
            .unwrap_or(self.transaction_id);

        let user_root = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?;

        let system_root = self
            .system_tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?;

        self.process_freed_pages(oldest_live_read)?;
        // Savepoints record a freed-tree root (see ephemeral_savepoint), so while any exist, pages
        // freed from the freed tree must be stored rather than released after the commit.
        let savepoint_exists = self
            .transaction_tracker
            .lock()
            .unwrap()
            .any_savepoint_exists();
        self.store_freed_pages(savepoint_exists)?;

        // The user and system trees were finalized by the flushes above; only the freed tree
        // still needs its checksums finalized before the commit.
        self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        let freed_root = self.freed_tree.lock().unwrap().get_root();

        self.mem.commit(
            user_root,
            system_root,
            freed_root,
            self.transaction_id,
            eventual,
            two_phase,
        )?;

        self.transaction_tracker
            .lock()
            .unwrap()
            .clear_pending_non_durable_commits();

        // Whatever the freed tree released and was not stored above can be freed now.
        for page in self.post_commit_frees.lock().unwrap().drain(..) {
            self.mem.free(page);
        }

        Ok(())
    }

    /// Commit path for `Durability::None`.
    ///
    /// No pages are released. All freed pages, including post-commit frees, are stored in the
    /// freed tree, and the commit is registered with the tracker as non-durable.
    pub(crate) fn non_durable_commit(&mut self) -> Result {
        let user_root = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?;

        let system_root = self
            .system_tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()?;

        self.store_freed_pages(true)?;

        self.freed_tree.lock().unwrap().finalize_dirty_checksums()?;

        let freed_root = self.freed_tree.lock().unwrap().get_root();

        self.mem
            .non_durable_commit(user_root, system_root, freed_root, self.transaction_id)?;
        self.transaction_tracker
            .lock()
            .unwrap()
            .register_non_durable_commit(self.transaction_id);
        Ok(())
    }

    /// Compacts storage; returns whether any relocation happened.
    ///
    /// Relocates the region tracker, then compacts the user tables.
    pub(crate) fn compact_pages(&mut self) -> Result<bool> {
        let mut progress = false;
        if self.mem.relocate_region_tracker()? {
            progress = true;
        }

        let mut tables = self.tables.lock().unwrap();
        let table_tree = &mut tables.table_tree;
        if table_tree.compact_tables()? {
            progress = true;
        }

        Ok(progress)
    }

    /// Frees every page recorded in the freed tree under keys below
    /// `(oldest_live_read, 0)`, then deletes those entries.
    ///
    /// Assumes `FreedTableKey` orders by `transaction_id` first - TODO confirm.
    fn process_freed_pages(&mut self, oldest_live_read: TransactionId) -> Result {
        // FreedPageList's layout relies on 8-byte page numbers.
        assert_eq!(PageNumber::serialized_size(), 8);
        let lookup_key = FreedTableKey {
            transaction_id: oldest_live_read.raw_id(),
            pagination_id: 0,
        };

        // Keys are collected first and removed after the iteration, because the range iterator
        // borrows `freed_tree`.
        let mut to_remove = vec![];
        let mut freed_tree = self.freed_tree.lock().unwrap();
        for entry in freed_tree.range(&(..lookup_key))? {
            let entry = entry?;
            to_remove.push(entry.key());
            let value = entry.value();
            for i in 0..value.len() {
                self.mem.free(value.get(i));
            }
        }

        for key in to_remove {
            freed_tree.remove(&key)?;
        }

        Ok(())
    }

    /// Moves `freed_pages` into freed-tree entries keyed by
    /// `(self.transaction_id, 0), (self.transaction_id, 1), ...`, with up to 100 pages per entry.
    ///
    /// Pages are taken from the end of the list. When `include_post_commit_free` is set,
    /// `post_commit_frees` is moved in first and again after every insert. The freed tree was
    /// built over `post_commit_frees`, so inserting into it may add more pages there.
    fn store_freed_pages(&mut self, include_post_commit_free: bool) -> Result {
        // FreedPageList's layout relies on 8-byte page numbers.
        assert_eq!(PageNumber::serialized_size(), 8);
        let mut pagination_counter = 0u64;
        let mut freed_tree = self.freed_tree.lock().unwrap();
        if include_post_commit_free {
            self.freed_pages
                .lock()
                .unwrap()
                .extend(self.post_commit_frees.lock().unwrap().drain(..));
        }
        while !self.freed_pages.lock().unwrap().is_empty() {
            let chunk_size = 100;
            let buffer_size = FreedPageList::required_bytes(chunk_size);
            let key = FreedTableKey {
                transaction_id: self.transaction_id.raw_id(),
                pagination_id: pagination_counter,
            };
            // Reserve the entry before locking freed_pages. The insert may change
            // post_commit_frees, but it does not touch freed_pages.
            let mut access_guard =
                freed_tree.insert_reserve(&key, buffer_size.try_into().unwrap())?;

            let mut freed_pages = self.freed_pages.lock().unwrap();
            let len = freed_pages.len();
            access_guard.as_mut().clear();
            for page in freed_pages.drain(len - min(len, chunk_size)..) {
                access_guard.as_mut().push_back(page);
            }
            drop(access_guard);

            pagination_counter += 1;

            if include_post_commit_free {
                freed_pages.extend(self.post_commit_frees.lock().unwrap().drain(..));
            }
        }

        Ok(())
    }

    /// Collects statistics on the user data tree and the freed tree.
    ///
    /// Metadata bytes include the freed tree's metadata and stored leaf bytes. Fragmented bytes
    /// sum both trees.
    pub fn stats(&self) -> Result<DatabaseStats> {
        let tables = self.tables.lock().unwrap();
        let table_tree = &tables.table_tree;
        let data_tree_stats = table_tree.stats()?;
        let freed_tree_stats = self.freed_tree.lock().unwrap().stats()?;
        let total_metadata_bytes = data_tree_stats.metadata_bytes()
            + freed_tree_stats.metadata_bytes
            + freed_tree_stats.stored_leaf_bytes;
        let total_fragmented =
            data_tree_stats.fragmented_bytes() + freed_tree_stats.fragmented_bytes;

        Ok(DatabaseStats {
            tree_height: data_tree_stats.tree_height(),
            allocated_pages: self.mem.count_allocated_pages()?,
            leaf_pages: data_tree_stats.leaf_pages(),
            branch_pages: data_tree_stats.branch_pages(),
            stored_leaf_bytes: data_tree_stats.stored_bytes(),
            metadata_bytes: total_metadata_bytes,
            fragmented_bytes: total_fragmented,
            page_size: self.mem.get_page_size(),
        })
    }

    /// Debug helper that prints the user ("master") table tree to stderr.
    ///
    /// Side effect: flushes the pending user-table root updates to obtain the current root, and
    /// panics if that flush fails.
    #[allow(dead_code)]
    pub(crate) fn print_debug(&self) -> Result {
        if let Some(page) = self
            .tables
            .lock()
            .unwrap()
            .table_tree
            .flush_table_root_updates()
            .unwrap()
        {
            eprintln!("Master tree:");
            let master_tree: Btree<&str, InternalTableDefinition> =
                Btree::new(Some(page), PageHint::None, self.mem)?;
            master_tree.print_debug(true)?;
        }

        Ok(())
    }
}
1136
1137impl<'a> Drop for WriteTransaction<'a> {
1138 fn drop(&mut self) {
1139 self.db.end_write_transaction(self.transaction_id);
1140 if !self.completed && !thread::panicking() && !self.mem.storage_failure() {
1141 #[allow(unused_variables)]
1142 if let Err(error) = self.abort_inner() {
1143 #[cfg(feature = "logging")]
1144 warn!("Failure automatically aborting transaction: {}", error);
1145 }
1146 }
1147 }
1148}
1149
1150pub struct ReadTransaction<'a> {
1154 transaction_tracker: Arc<Mutex<TransactionTracker>>,
1155 mem: &'a TransactionalMemory,
1156 tree: TableTree<'a>,
1157 transaction_id: TransactionId,
1158}
1159
1160impl<'db> ReadTransaction<'db> {
1161 pub(crate) fn new(
1162 mem: &'db TransactionalMemory,
1163 transaction_tracker: Arc<Mutex<TransactionTracker>>,
1164 transaction_id: TransactionId,
1165 ) -> Self {
1166 let root_page = mem.get_data_root();
1167 Self {
1168 transaction_tracker,
1169 mem,
1170 tree: TableTree::new(root_page, mem, Default::default()),
1171 transaction_id,
1172 }
1173 }
1174
1175 pub fn open_table<K: RedbKey + 'static, V: RedbValue + 'static>(
1177 &self,
1178 definition: TableDefinition<K, V>,
1179 ) -> Result<ReadOnlyTable<K, V>, TableError> {
1180 let header = self
1181 .tree
1182 .get_table::<K, V>(definition.name(), TableType::Normal)?
1183 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1184
1185 Ok(ReadOnlyTable::new(
1186 definition.name().to_string(),
1187 header.get_root(),
1188 PageHint::Clean,
1189 self.mem,
1190 )?)
1191 }
1192
1193 pub fn open_untyped_table(
1195 &self,
1196 handle: impl TableHandle,
1197 ) -> Result<ReadOnlyUntypedTable, TableError> {
1198 let header = self
1199 .tree
1200 .get_table_untyped(handle.name(), TableType::Normal)?
1201 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1202
1203 Ok(ReadOnlyUntypedTable::new(
1204 header.get_root(),
1205 header.get_fixed_key_size(),
1206 header.get_fixed_value_size(),
1207 self.mem,
1208 ))
1209 }
1210
1211 pub fn open_multimap_table<K: RedbKey + 'static, V: RedbKey + 'static>(
1213 &self,
1214 definition: MultimapTableDefinition<K, V>,
1215 ) -> Result<ReadOnlyMultimapTable<K, V>, TableError> {
1216 let header = self
1217 .tree
1218 .get_table::<K, V>(definition.name(), TableType::Multimap)?
1219 .ok_or_else(|| TableError::TableDoesNotExist(definition.name().to_string()))?;
1220
1221 Ok(ReadOnlyMultimapTable::new(
1222 header.get_root(),
1223 PageHint::Clean,
1224 self.mem,
1225 )?)
1226 }
1227
1228 pub fn open_untyped_multimap_table(
1230 &self,
1231 handle: impl MultimapTableHandle,
1232 ) -> Result<ReadOnlyUntypedMultimapTable, TableError> {
1233 let header = self
1234 .tree
1235 .get_table_untyped(handle.name(), TableType::Multimap)?
1236 .ok_or_else(|| TableError::TableDoesNotExist(handle.name().to_string()))?;
1237
1238 Ok(ReadOnlyUntypedMultimapTable::new(
1239 header.get_root(),
1240 header.get_fixed_key_size(),
1241 header.get_fixed_value_size(),
1242 self.mem,
1243 ))
1244 }
1245
1246 pub fn list_tables(&self) -> Result<impl Iterator<Item = UntypedTableHandle>> {
1248 self.tree
1249 .list_tables(TableType::Normal)
1250 .map(|x| x.into_iter().map(UntypedTableHandle::new))
1251 }
1252
1253 pub fn list_multimap_tables(&self) -> Result<impl Iterator<Item = UntypedMultimapTableHandle>> {
1255 self.tree
1256 .list_tables(TableType::Multimap)
1257 .map(|x| x.into_iter().map(UntypedMultimapTableHandle::new))
1258 }
1259}
1260
1261impl<'a> Drop for ReadTransaction<'a> {
1262 fn drop(&mut self) {
1263 self.transaction_tracker
1264 .lock()
1265 .unwrap()
1266 .deallocate_read_transaction(self.transaction_id);
1267 }
1268}
1269
#[cfg(test)]
mod test {
    use crate::{Database, TableDefinition};

    const X: TableDefinition<&str, &str> = TableDefinition::new("x");

    /// Transaction ids keep increasing after the database file is closed and reopened.
    #[test]
    fn transaction_id_persistence() {
        let tmpfile = crate::create_tempfile();

        let first_txn_id = {
            let db = Database::create(tmpfile.path()).unwrap();
            let txn = db.begin_write().unwrap();
            {
                let mut table = txn.open_table(X).unwrap();
                table.insert("hello", "world").unwrap();
            }
            let committed_id = txn.transaction_id;
            txn.commit().unwrap();
            committed_id
            // `db` is dropped here, closing the file before it is reopened below.
        };

        let reopened = Database::create(tmpfile.path()).unwrap();
        let next_txn = reopened.begin_write().unwrap();
        assert!(next_txn.transaction_id > first_txn_id);
    }
}
1293}