1#![warn(missing_docs)]
2
3use bitvec::prelude::*;
6use io::PagePool;
7use metrics::{Metric, Metrics};
8use std::{mem, sync::Arc};
9
10use merkle::{UpdatePool, Updater};
11use nomt_core::{
12 hasher::{NodeHasher, ValueHasher},
13 page_id::ROOT_PAGE_ID,
14 proof::PathProof,
15 trie::{InternalData, KeyPath, LeafData, Node, TERMINATOR},
16};
17use overlay::{LiveOverlay, OverlayMarker};
18use page_cache::PageCache;
19use parking_lot::{ArcRwLockReadGuard, Mutex, RwLock};
20use store::{Store, ValueTransaction};
21
22pub use io::IoUringPermission;
23pub use nomt_core::hasher;
24pub use nomt_core::proof;
25pub use nomt_core::trie;
26pub use nomt_core::witness::{
27 Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite,
28};
29pub use options::{Options, PanicOnSyncMode};
30pub use overlay::{InvalidAncestors, Overlay};
31pub use store::HashTableUtilization;
32
33#[cfg(any(feature = "benchmarks", feature = "fuzz"))]
35#[allow(missing_docs)]
36pub mod beatree;
37#[cfg(not(any(feature = "benchmarks", feature = "fuzz")))]
38mod beatree;
39
40mod bitbox;
41mod merkle;
42mod metrics;
43mod options;
44mod overlay;
45mod page_cache;
46mod page_diff;
47mod page_region;
48mod rollback;
49mod rw_pass_cell;
50mod seglog;
51mod store;
52mod sys;
53mod task;
54
55mod io;
56
/// Upper limit for `Options::commit_concurrency`; larger values are clamped to this when a
/// database is opened.
const MAX_COMMIT_CONCURRENCY: usize = 64;

/// A value stored under a key: an owned, arbitrary-length byte vector.
pub type Value = Vec<u8>;
61
62struct Shared {
63 root: Root,
65 last_commit_marker: Option<OverlayMarker>,
67}
68
69#[derive(Debug, Clone)]
71pub enum KeyReadWrite {
72 Read(Option<Value>),
74 Write(Option<Value>),
76 ReadThenWrite(Option<Value>, Option<Value>),
78}
79
80impl KeyReadWrite {
81 pub fn last_value(&self) -> Option<&[u8]> {
83 match self {
84 KeyReadWrite::Read(v) | KeyReadWrite::Write(v) | KeyReadWrite::ReadThenWrite(_, v) => {
85 v.as_deref()
86 }
87 }
88 }
89
90 pub fn is_write(&self) -> bool {
92 matches!(
93 self,
94 KeyReadWrite::Write(_) | KeyReadWrite::ReadThenWrite(_, _)
95 )
96 }
97
98 pub fn write(&mut self, new_value: Option<Value>) {
102 match *self {
103 KeyReadWrite::Read(ref mut value) => {
104 *self = KeyReadWrite::ReadThenWrite(mem::take(value), new_value);
105 }
106 KeyReadWrite::Write(ref mut value) => {
107 *value = new_value;
108 }
109 KeyReadWrite::ReadThenWrite(_, ref mut value) => {
110 *value = new_value;
111 }
112 }
113 }
114
115 pub fn read(&mut self, read_value: Option<Value>) {
119 match *self {
120 KeyReadWrite::Read(_) | KeyReadWrite::ReadThenWrite(_, _) => {}
121 KeyReadWrite::Write(ref mut value) => {
122 *self = KeyReadWrite::ReadThenWrite(read_value, mem::take(value));
123 }
124 }
125 }
126
127 fn to_compact<T: HashAlgorithm>(&self) -> crate::merkle::KeyReadWrite {
128 let hash = |v: &Value| T::hash_value(v);
129 match self {
130 KeyReadWrite::Read(_) => crate::merkle::KeyReadWrite::Read,
131 KeyReadWrite::Write(val) => crate::merkle::KeyReadWrite::Write(val.as_ref().map(hash)),
132 KeyReadWrite::ReadThenWrite(_, val) => {
133 crate::merkle::KeyReadWrite::ReadThenWrite(val.as_ref().map(hash))
134 }
135 }
136 }
137}
138
/// The root node of the trie, stored as 32 raw bytes.
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg_attr(
    feature = "borsh",
    derive(borsh::BorshSerialize, borsh::BorshDeserialize)
)]
pub struct Root([u8; 32]);
146
147impl Root {
148 pub fn is_empty(&self) -> bool {
150 self.0 == trie::TERMINATOR
151 }
152
153 pub fn into_inner(self) -> [u8; 32] {
155 self.0
156 }
157}
158
159impl std::fmt::Display for Root {
160 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
161 for byte in &self.0[0..4] {
162 write!(f, "{:02x}", byte)?;
163 }
164
165 write!(f, "...")?;
166
167 for byte in &self.0[28..32] {
168 write!(f, "{:02x}", byte)?;
169 }
170 Ok(())
171 }
172}
173
174impl std::fmt::Debug for Root {
175 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
176 write!(f, "Root(")?;
177 for byte in &self.0 {
178 write!(f, "{:02x}", byte)?;
179 }
180 write!(f, ")")?;
181 Ok(())
182 }
183}
184
185impl AsRef<[u8]> for Root {
186 fn as_ref(&self) -> &[u8] {
187 self.0.as_ref()
188 }
189}
190
191impl From<[u8; 32]> for Root {
192 fn from(value: [u8; 32]) -> Self {
193 Self(value)
194 }
195}
196
197pub struct Nomt<T> {
199 merkle_update_pool: UpdatePool,
200 page_cache: PageCache,
202 page_pool: PagePool,
203 store: Store,
204 shared: Arc<Mutex<Shared>>,
205 access_lock: Arc<RwLock<()>>,
207 metrics: Metrics,
208 _marker: std::marker::PhantomData<T>,
209}
210
211impl<T: HashAlgorithm> Nomt<T> {
212 pub fn open(mut o: Options) -> anyhow::Result<Self> {
217 if o.commit_concurrency == 0 {
218 anyhow::bail!("commit concurrency must be greater than zero".to_string());
219 }
220
221 if o.commit_concurrency > MAX_COMMIT_CONCURRENCY {
222 o.commit_concurrency = MAX_COMMIT_CONCURRENCY;
223 }
224
225 let metrics = Metrics::new(o.metrics);
226
227 let page_pool = PagePool::new();
228 let store = Store::open(&o, page_pool.clone())?;
229 let root_page = store.load_page(ROOT_PAGE_ID)?;
230 let page_cache = PageCache::new(root_page, &o, metrics.clone());
231 let root = compute_root_node::<T>(&page_cache, &store);
232
233 if o.prepopulate_page_cache {
234 let io_handle = store.io_pool().make_handle();
235 merkle::prepopulate_cache(io_handle, &page_cache, &store, o.page_cache_upper_levels)?;
236 }
237
238 Ok(Self {
239 merkle_update_pool: UpdatePool::new(o.commit_concurrency, o.warm_up),
240 page_cache,
241 page_pool,
242 store,
243 shared: Arc::new(Mutex::new(Shared {
244 root: Root(root),
245 last_commit_marker: None,
246 })),
247 access_lock: Arc::new(RwLock::new(())),
248 metrics,
249 _marker: std::marker::PhantomData,
250 })
251 }
252
253 pub fn root(&self) -> Root {
255 self.shared.lock().root.clone()
256 }
257
258 pub fn is_empty(&self) -> bool {
260 self.root().is_empty()
261 }
262
263 #[doc(hidden)]
269 pub fn read(&self, path: KeyPath) -> anyhow::Result<Option<Value>> {
270 let _guard = self.access_lock.read();
271 self.store.load_value(path)
272 }
273
274 #[doc(hidden)]
276 pub fn sync_seqn(&self) -> u32 {
277 self.store.sync_seqn()
278 }
279
280 pub fn is_poisoned(&self) -> bool {
290 self.store.is_poisoned()
291 }
292
293 pub fn begin_session(&self, params: SessionParams) -> Session<T> {
307 let live_overlay = params.overlay;
308
309 let access_guard = params
313 .take_global_guard
314 .then(|| RwLock::read_arc(&self.access_lock));
315
316 let store = self.store.clone();
317 let rollback_delta = if params.record_rollback_delta {
318 self.store
319 .rollback()
320 .map(|r| r.delta_builder(&store, &live_overlay))
321 } else {
322 None
323 };
324
325 let prev_root = live_overlay
326 .parent_root()
327 .unwrap_or_else(|| self.root().into_inner());
328
329 Session {
330 store,
331 merkle_updater: self.merkle_update_pool.begin::<T>(
332 self.page_cache.clone(),
333 self.page_pool.clone(),
334 self.store.clone(),
335 live_overlay.clone(),
336 prev_root,
337 ),
338 metrics: self.metrics.clone(),
339 rollback_delta,
340 overlay: live_overlay,
341 witness_mode: params.witness,
342 access_guard,
343 prev_root: Root(prev_root),
344 _marker: std::marker::PhantomData,
345 }
346 }
347
348 pub fn rollback(&self, n: usize) -> anyhow::Result<()> {
355 if n == 0 {
356 return Ok(());
357 }
358
359 let _write_guard = self.access_lock.write();
360
361 let Some(rollback) = self.store.rollback() else {
362 anyhow::bail!("rollback: not enabled");
363 };
364 let Some(traceback) = rollback.truncate(n)? else {
365 anyhow::bail!("rollback: not enough logged for rolling back");
366 };
367
368 let mut session_params = SessionParams::default();
372 session_params.record_rollback_delta = false;
373
374 session_params.take_global_guard = false;
376 let sess = self.begin_session(session_params);
377
378 let mut actuals = Vec::new();
380 for (key, value) in traceback {
381 sess.warm_up(key);
382 let value = KeyReadWrite::Write(value);
383 actuals.push((key, value));
384 }
385
386 sess.finish(actuals)?.commit(&self)?;
387
388 Ok(())
389 }
390
391 #[doc(hidden)]
394 pub fn metrics(&self) -> Metrics {
395 self.metrics.clone()
396 }
397
398 pub fn hash_table_utilization(&self) -> HashTableUtilization {
400 self.store.hash_table_utilization()
401 }
402}
403
/// Whether a session should produce a witness. The wrapped flag is forwarded to the merkle
/// updater when the session is finished.
pub struct WitnessMode(bool);

impl WitnessMode {
    /// Witnessing enabled (flag set to `true`).
    pub fn read_write() -> Self {
        Self(true)
    }

    /// Witnessing disabled (flag set to `false`).
    pub fn disabled() -> Self {
        Self(false)
    }
}
418
419pub struct SessionParams {
421 record_rollback_delta: bool,
423 take_global_guard: bool,
425
426 witness: WitnessMode,
427 overlay: LiveOverlay,
428}
429
430impl Default for SessionParams {
431 fn default() -> Self {
432 SessionParams {
433 record_rollback_delta: true,
434 take_global_guard: true,
435 witness: WitnessMode::disabled(),
436 overlay: LiveOverlay::new(None).unwrap(),
438 }
439 }
440}
441
442impl SessionParams {
443 pub fn witness_mode(mut self, witness: WitnessMode) -> Self {
448 self.witness = witness;
449 self
450 }
451
452 pub fn overlay<'a>(
463 mut self,
464 ancestors: impl IntoIterator<Item = &'a Overlay>,
465 ) -> Result<Self, InvalidAncestors> {
466 self.overlay = LiveOverlay::new(ancestors)?;
467 Ok(self)
468 }
469}
470
471pub struct Session<T> {
479 store: Store,
480 merkle_updater: Updater,
481 metrics: Metrics,
482 rollback_delta: Option<rollback::ReverseDeltaBuilder>,
483 overlay: LiveOverlay,
484 witness_mode: WitnessMode,
485 access_guard: Option<ArcRwLockReadGuard<parking_lot::RawRwLock, ()>>,
488 prev_root: Root,
489 _marker: std::marker::PhantomData<T>,
490}
491
492impl<T> Session<T> {
493 pub fn warm_up(&self, path: KeyPath) {
505 self.merkle_updater.warm_up(path);
506 }
507
508 pub fn read(&self, path: KeyPath) -> anyhow::Result<Option<Value>> {
512 let _maybe_guard = self.metrics.record(Metric::ValueFetchTime);
513 if let Some(value_change) = self.overlay.value(&path) {
514 return Ok(value_change.as_option().map(|v| v.to_vec()));
515 }
516 self.store.load_value(path)
517 }
518
519 pub fn prev_root(&self) -> Root {
521 self.prev_root
522 }
523
524 pub fn preserve_prior_value(&self, path: KeyPath) {
548 if let Some(rollback) = &self.rollback_delta {
549 rollback.tentative_preserve_prior(path);
550 }
551 }
552}
553
554impl<T: HashAlgorithm> Session<T> {
555 pub fn prove(&self, path: KeyPath) -> anyhow::Result<PathProof> {
562 Ok(self.merkle_updater.prove::<T>(path)?)
563 }
564
565 pub fn finish(
570 mut self,
571 actuals: Vec<(KeyPath, KeyReadWrite)>,
572 ) -> anyhow::Result<FinishedSession> {
573 if cfg!(debug_assertions) {
574 for i in 1..actuals.len() {
576 assert!(
577 actuals[i].0 > actuals[i - 1].0,
578 "actuals are not sorted at index {}",
579 i
580 );
581 }
582 }
583 let rollback_delta = self
584 .rollback_delta
585 .take()
586 .map(|delta_builder| delta_builder.finalize(&actuals));
587
588 let mut compact_actuals = Vec::with_capacity(actuals.len());
589 for (path, read_write) in &actuals {
590 compact_actuals.push((path.clone(), read_write.to_compact::<T>()));
591 }
592
593 let merkle_update_handle = self
594 .merkle_updater
595 .update_and_prove::<T>(compact_actuals, self.witness_mode.0)?;
596
597 let mut tx = self.store.new_value_tx();
598 for (path, read_write) in actuals {
599 if let KeyReadWrite::Write(value) | KeyReadWrite::ReadThenWrite(_, value) = read_write {
600 tx.write_value::<T>(path, value);
601 }
602 }
603
604 let merkle_output = merkle_update_handle.join()?;
605 Ok(FinishedSession {
606 value_transaction: tx,
607 merkle_output,
608 rollback_delta,
609 parent_overlay: self.overlay,
610 prev_root: self.prev_root,
611 take_global_guard: self.access_guard.is_some(),
612 })
613 }
614}
615
616pub struct FinishedSession {
624 value_transaction: ValueTransaction,
625 merkle_output: merkle::Output,
626 rollback_delta: Option<rollback::Delta>,
627 parent_overlay: LiveOverlay,
628 prev_root: Root,
629 take_global_guard: bool,
631}
632
633impl FinishedSession {
634 pub fn root(&self) -> Root {
636 Root(self.merkle_output.root)
637 }
638
639 pub fn prev_root(&self) -> Root {
641 self.prev_root
642 }
643
644 pub fn take_witness(&mut self) -> Option<Witness> {
649 self.merkle_output.witness.take()
650 }
651
652 pub fn into_overlay(self) -> Overlay {
655 let updated_pages = self
656 .merkle_output
657 .updated_pages
658 .into_frozen_iter(true)
659 .collect();
660 let values = self.value_transaction.into_iter().collect();
661
662 self.parent_overlay.finish(
663 self.prev_root.into_inner(),
664 self.merkle_output.root,
665 updated_pages,
666 values,
667 self.rollback_delta,
668 )
669 }
670
671 pub fn commit<T: HashAlgorithm>(self, nomt: &Nomt<T>) -> Result<(), anyhow::Error> {
679 let _write_guard = self.take_global_guard.then(|| nomt.access_lock.write());
680
681 {
682 let mut shared = nomt.shared.lock();
683 if shared.root != self.prev_root {
684 anyhow::bail!(
685 "Changeset no longer valid (expected previous root {:?}, got {:?})",
686 self.prev_root,
687 shared.root
688 );
689 }
690 shared.root = Root(self.merkle_output.root);
691 shared.last_commit_marker = None;
692 }
693
694 if let Some(rollback_delta) = self.rollback_delta {
695 let rollback = nomt.store.rollback().unwrap();
697 rollback.commit(rollback_delta)?;
698 }
699
700 nomt.store.commit(
701 self.value_transaction.into_iter(),
702 nomt.page_cache.clone(),
703 self.merkle_output
704 .updated_pages
705 .into_frozen_iter(false),
706 )
707 }
708
709 pub fn try_commit_nonblocking<T: HashAlgorithm>(
717 mut self,
718 nomt: &Nomt<T>,
719 ) -> Result<Option<Self>, anyhow::Error> {
720 let write_guard = self
721 .take_global_guard
722 .then(|| nomt.access_lock.try_write())
723 .flatten();
724 if write_guard.is_none() {
725 return Ok(Some(self));
726 }
727
728 if let Some(rollback_delta) = self.rollback_delta {
729 let rollback = nomt.store.rollback().unwrap();
731 if let Some(delta) = rollback.commit_nonblocking(rollback_delta)? {
732 self.rollback_delta = Some(delta);
733 return Ok(Some(self));
734 }
735 }
736
737 {
738 let mut shared = nomt.shared.lock();
739 if shared.root != self.prev_root {
740 anyhow::bail!(
741 "Changeset no longer valid (expected previous root {:?}, got {:?})",
742 self.prev_root,
743 shared.root
744 );
745 }
746 shared.root = Root(self.merkle_output.root);
747 shared.last_commit_marker = None;
748 }
749
750 nomt.store.commit(
751 self.value_transaction.into_iter(),
752 nomt.page_cache.clone(),
753 self.merkle_output
754 .updated_pages
755 .into_frozen_iter(false),
756 )?;
757
758 Ok(None)
759 }
760}
761
762impl Overlay {
763 pub fn commit<T: HashAlgorithm>(self, nomt: &Nomt<T>) -> anyhow::Result<()> {
771 if !self.parent_matches_marker(nomt.shared.lock().last_commit_marker.as_ref()) {
772 anyhow::bail!("Overlay parent not committed");
773 }
774
775 let root = self.root();
776 let page_changes: Vec<_> = self
777 .page_changes()
778 .into_iter()
779 .map(|(page_id, dirty_page)| (page_id.clone(), dirty_page.clone()))
780 .collect();
781 let values: Vec<_> = self
782 .value_changes()
783 .iter()
784 .map(|(k, v)| (k.clone(), v.clone()))
785 .collect();
786 let rollback_delta = self.rollback_delta().map(|delta| delta.clone());
787
788 let _write_guard = nomt.access_lock.write();
789
790 let marker = self.mark_committed();
791
792 {
793 let mut shared = nomt.shared.lock();
794 if shared.root != self.prev_root() {
795 anyhow::bail!(
796 "Changeset no longer valid (expected previous root {:?}, got {:?})",
797 self.prev_root(),
798 shared.root
799 );
800 }
801 shared.root = root;
802 shared.last_commit_marker = Some(marker);
803 }
804
805 if let Some(rollback_delta) = rollback_delta {
806 let rollback = nomt.store.rollback().unwrap();
808 rollback.commit(rollback_delta)?;
809 }
810
811 nomt.store
812 .commit(values, nomt.page_cache.clone(), page_changes)
813 }
814
815 pub fn try_commit_nonblocking<T: HashAlgorithm>(
823 self,
824 nomt: &Nomt<T>,
825 ) -> anyhow::Result<Option<Self>> {
826 if !self.parent_matches_marker(nomt.shared.lock().last_commit_marker.as_ref()) {
827 anyhow::bail!("Overlay parent not committed");
828 }
829
830 let root = self.root();
831 let page_changes: Vec<_> = self
832 .page_changes()
833 .into_iter()
834 .map(|(page_id, dirty_page)| (page_id.clone(), dirty_page.clone()))
835 .collect();
836 let values: Vec<_> = self
837 .value_changes()
838 .iter()
839 .map(|(k, v)| (k.clone(), v.clone()))
840 .collect();
841 let rollback_delta = self.rollback_delta().map(|delta| delta.clone());
842
843 let write_guard = nomt.access_lock.try_write();
844 if write_guard.is_none() {
845 return Ok(Some(self));
846 }
847
848 let marker = self.mark_committed();
849
850 {
851 let mut shared = nomt.shared.lock();
852 if shared.root != self.prev_root() {
853 anyhow::bail!(
854 "Changeset no longer valid (expected previous root {:?}, got {:?})",
855 self.prev_root(),
856 shared.root
857 );
858 }
859 shared.root = root;
860 shared.last_commit_marker = Some(marker);
861 }
862
863 if let Some(rollback_delta) = rollback_delta {
864 let rollback = nomt.store.rollback().unwrap();
866 rollback.commit(rollback_delta)?;
867 }
868
869 nomt.store
870 .commit(values, nomt.page_cache.clone(), page_changes)?;
871
872 Ok(None)
873 }
874}
875
876pub trait HashAlgorithm: ValueHasher + NodeHasher {}
882
883impl<T: ValueHasher + NodeHasher> HashAlgorithm for T {}
884
885fn compute_root_node<H: HashAlgorithm>(page_cache: &PageCache, store: &Store) -> Node {
886 if let Some((root_page, _)) = page_cache.get(ROOT_PAGE_ID) {
892 let left = root_page.node(0);
893 let right = root_page.node(1);
894
895 if left != TERMINATOR || right != TERMINATOR {
896 return H::hash_internal(&InternalData { left, right });
898 }
899 }
900
901 let read_tx = store.read_transaction();
903 let mut iterator = read_tx.iterator(beatree::Key::default(), None);
904
905 let io_handle = store.io_pool().make_handle();
906
907 loop {
908 match iterator.next() {
909 None => return TERMINATOR, Some(beatree::iterator::IterOutput::Blocked) => {
911 let leaf = match read_tx.load_leaf_async(
913 iterator.needed_leaves().next().unwrap(),
914 &io_handle,
915 0,
916 ) {
917 Ok(leaf_node) => leaf_node,
918 Err(leaf_load) => {
919 let complete_io = io_handle.recv().unwrap();
921
922 leaf_load.finish(complete_io.command.kind.unwrap_buf())
924 }
925 };
926
927 iterator.provide_leaf(leaf);
928 }
929 Some(beatree::iterator::IterOutput::Item(key_path, value)) => {
930 return H::hash_leaf(&LeafData {
932 key_path,
933 value_hash: H::hash_value(value),
934 });
935 }
936 Some(beatree::iterator::IterOutput::OverflowItem(key_path, value_hash, _)) => {
937 return H::hash_leaf(&LeafData {
939 key_path,
940 value_hash,
941 });
942 }
943 }
944 }
945}
946
947pub fn check_iou_permissions() -> IoUringPermission {
951 crate::io::check_iou_permissions()
952}
953
#[cfg(test)]
mod tests {
    use crate::{hasher::Blake3Hasher, Session};

    /// Compile-time check: this test only builds if `Session` is `Sync`.
    #[test]
    fn session_is_sync() {
        fn assert_sync<S: Sync>() {}

        assert_sync::<Session<Blake3Hasher>>();
    }
}