nomt/
lib.rs

1#![warn(missing_docs)]
2
3//! A Nearly-Optimal Merkle Trie Database.
4
5use bitvec::prelude::*;
6use io::PagePool;
7use metrics::{Metric, Metrics};
8use std::{mem, sync::Arc};
9
10use merkle::{UpdatePool, Updater};
11use nomt_core::{
12    hasher::{NodeHasher, ValueHasher},
13    page_id::ROOT_PAGE_ID,
14    proof::PathProof,
15    trie::{InternalData, KeyPath, LeafData, Node, TERMINATOR},
16};
17use overlay::{LiveOverlay, OverlayMarker};
18use page_cache::PageCache;
19use parking_lot::{ArcRwLockReadGuard, Mutex, RwLock};
20use store::{Store, ValueTransaction};
21
22pub use io::IoUringPermission;
23pub use nomt_core::hasher;
24pub use nomt_core::proof;
25pub use nomt_core::trie;
26pub use nomt_core::witness::{
27    Witness, WitnessedOperations, WitnessedPath, WitnessedRead, WitnessedWrite,
28};
29pub use options::{Options, PanicOnSyncMode};
30pub use overlay::{InvalidAncestors, Overlay};
31pub use store::HashTableUtilization;
32
33// beatree module needs to be exposed to be benchmarked and fuzzed
34#[cfg(any(feature = "benchmarks", feature = "fuzz"))]
35#[allow(missing_docs)]
36pub mod beatree;
37#[cfg(not(any(feature = "benchmarks", feature = "fuzz")))]
38mod beatree;
39
40mod bitbox;
41mod merkle;
42mod metrics;
43mod options;
44mod overlay;
45mod page_cache;
46mod page_diff;
47mod page_region;
48mod rollback;
49mod rw_pass_cell;
50mod seglog;
51mod store;
52mod sys;
53mod task;
54
55mod io;
56
/// Upper bound on the commit concurrency: `Nomt::open` clamps any larger
/// `commit_concurrency` option down to this value.
const MAX_COMMIT_CONCURRENCY: usize = 64;
58
/// A full value stored within the trie, held as an owned byte vector.
pub type Value = Vec<u8>;
61
/// Mutable state kept behind the `Mutex` owned by [`Nomt`].
///
/// The commit paths in this file update both fields together while holding that lock.
struct Shared {
    /// The current root of the trie.
    root: Root,
    /// The marker of the last committed overlay. `None` if the last commit was not an overlay.
    last_commit_marker: Option<OverlayMarker>,
}
68
/// Whether a key was read, written, or both, along with old and new values.
///
/// A read value of `None` mirrors [`Session::read`], which returns `None` when no value is
/// stored under the key. NOTE(review): a written `None` presumably denotes deletion of the
/// key — confirm against the store's `write_value`.
#[derive(Debug, Clone)]
pub enum KeyReadWrite {
    /// The key was read. Contains the read value.
    Read(Option<Value>),
    /// The key was written. Contains the written value.
    Write(Option<Value>),
    /// The key was both read and written. Contains the previous value and the new value.
    ReadThenWrite(Option<Value>, Option<Value>),
}
79
80impl KeyReadWrite {
81    /// Returns the last recorded value for the slot.
82    pub fn last_value(&self) -> Option<&[u8]> {
83        match self {
84            KeyReadWrite::Read(v) | KeyReadWrite::Write(v) | KeyReadWrite::ReadThenWrite(_, v) => {
85                v.as_deref()
86            }
87        }
88    }
89
90    /// Returns true if the key was written to.
91    pub fn is_write(&self) -> bool {
92        matches!(
93            self,
94            KeyReadWrite::Write(_) | KeyReadWrite::ReadThenWrite(_, _)
95        )
96    }
97
98    /// Updates the state of the given slot.
99    ///
100    /// If the slot was read, it becomes read-then-write. If it was written, the value is updated.
101    pub fn write(&mut self, new_value: Option<Value>) {
102        match *self {
103            KeyReadWrite::Read(ref mut value) => {
104                *self = KeyReadWrite::ReadThenWrite(mem::take(value), new_value);
105            }
106            KeyReadWrite::Write(ref mut value) => {
107                *value = new_value;
108            }
109            KeyReadWrite::ReadThenWrite(_, ref mut value) => {
110                *value = new_value;
111            }
112        }
113    }
114
115    /// Updates the state of the given slot.
116    ///
117    /// If the slot was written, it becomes read-then-write.
118    pub fn read(&mut self, read_value: Option<Value>) {
119        match *self {
120            KeyReadWrite::Read(_) | KeyReadWrite::ReadThenWrite(_, _) => {}
121            KeyReadWrite::Write(ref mut value) => {
122                *self = KeyReadWrite::ReadThenWrite(read_value, mem::take(value));
123            }
124        }
125    }
126
127    fn to_compact<T: HashAlgorithm>(&self) -> crate::merkle::KeyReadWrite {
128        let hash = |v: &Value| T::hash_value(v);
129        match self {
130            KeyReadWrite::Read(_) => crate::merkle::KeyReadWrite::Read,
131            KeyReadWrite::Write(val) => crate::merkle::KeyReadWrite::Write(val.as_ref().map(hash)),
132            KeyReadWrite::ReadThenWrite(_, val) => {
133                crate::merkle::KeyReadWrite::ReadThenWrite(val.as_ref().map(hash))
134            }
135        }
136    }
137}
138
/// The root of the Merkle Trie.
///
/// A thin wrapper around the 32 bytes of the root node; it compares equal to
/// [`trie::TERMINATOR`] when the trie is empty.
#[derive(Clone, Copy, PartialEq, Eq)]
#[cfg_attr(
    feature = "borsh",
    derive(borsh::BorshSerialize, borsh::BorshDeserialize)
)]
pub struct Root([u8; 32]);
146
147impl Root {
148    /// Whether the root represents an empty trie.
149    pub fn is_empty(&self) -> bool {
150        self.0 == trie::TERMINATOR
151    }
152
153    /// Get the underlying bytes of the root.
154    pub fn into_inner(self) -> [u8; 32] {
155        self.0
156    }
157}
158
159impl std::fmt::Display for Root {
160    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
161        for byte in &self.0[0..4] {
162            write!(f, "{:02x}", byte)?;
163        }
164
165        write!(f, "...")?;
166
167        for byte in &self.0[28..32] {
168            write!(f, "{:02x}", byte)?;
169        }
170        Ok(())
171    }
172}
173
174impl std::fmt::Debug for Root {
175    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
176        write!(f, "Root(")?;
177        for byte in &self.0 {
178            write!(f, "{:02x}", byte)?;
179        }
180        write!(f, ")")?;
181        Ok(())
182    }
183}
184
185impl AsRef<[u8]> for Root {
186    fn as_ref(&self) -> &[u8] {
187        self.0.as_ref()
188    }
189}
190
191impl From<[u8; 32]> for Root {
192    fn from(value: [u8; 32]) -> Self {
193        Self(value)
194    }
195}
196
/// An instance of the Nearly-Optimal Merkle Trie Database.
///
/// `T` is the hash algorithm used for nodes and values; no value of `T` is stored.
pub struct Nomt<T> {
    /// Pool from which each session obtains its merkle `Updater`.
    merkle_update_pool: UpdatePool,
    /// The handle to the page cache.
    page_cache: PageCache,
    /// Page pool handed to the store on open and cloned into each session's updater.
    page_pool: PagePool,
    /// Handle to the underlying store.
    store: Store,
    /// The current root and the marker of the last committed overlay.
    shared: Arc<Mutex<Shared>>,
    /// Used to protect the multiple-readers-one-writer API
    access_lock: Arc<RwLock<()>>,
    /// Metrics handle, cloned into each session.
    metrics: Metrics,
    _marker: std::marker::PhantomData<T>,
}
210
211impl<T: HashAlgorithm> Nomt<T> {
212    /// Open the database with the given options.
213    ///
214    /// It is recommended to check io_uring permissions before calling this function by calling
215    /// [`check_iou_permissions`]
216    pub fn open(mut o: Options) -> anyhow::Result<Self> {
217        if o.commit_concurrency == 0 {
218            anyhow::bail!("commit concurrency must be greater than zero".to_string());
219        }
220
221        if o.commit_concurrency > MAX_COMMIT_CONCURRENCY {
222            o.commit_concurrency = MAX_COMMIT_CONCURRENCY;
223        }
224
225        let metrics = Metrics::new(o.metrics);
226
227        let page_pool = PagePool::new();
228        let store = Store::open(&o, page_pool.clone())?;
229        let root_page = store.load_page(ROOT_PAGE_ID)?;
230        let page_cache = PageCache::new(root_page, &o, metrics.clone());
231        let root = compute_root_node::<T>(&page_cache, &store);
232
233        if o.prepopulate_page_cache {
234            let io_handle = store.io_pool().make_handle();
235            merkle::prepopulate_cache(io_handle, &page_cache, &store, o.page_cache_upper_levels)?;
236        }
237
238        Ok(Self {
239            merkle_update_pool: UpdatePool::new(o.commit_concurrency, o.warm_up),
240            page_cache,
241            page_pool,
242            store,
243            shared: Arc::new(Mutex::new(Shared {
244                root: Root(root),
245                last_commit_marker: None,
246            })),
247            access_lock: Arc::new(RwLock::new(())),
248            metrics,
249            _marker: std::marker::PhantomData,
250        })
251    }
252
253    /// Returns a recent root of the trie.
254    pub fn root(&self) -> Root {
255        self.shared.lock().root.clone()
256    }
257
258    /// Returns true if the trie has no items in it.
259    pub fn is_empty(&self) -> bool {
260        self.root().is_empty()
261    }
262
    /// Returns the value stored under the given key.
    ///
    /// Returns `None` if the value is not stored under the given key. Fails only if I/O fails.
    ///
    /// The read lock on the access lock is held for the duration of the load, so this call waits
    /// for any in-progress commit or rollback holding the write lock.
    ///
    /// This is used for testing for now.
    #[doc(hidden)]
    pub fn read(&self, path: KeyPath) -> anyhow::Result<Option<Value>> {
        // Keep the guard alive until the load has returned.
        let _guard = self.access_lock.read();
        self.store.load_value(path)
    }
273
    /// Returns the current sync sequence number, as reported by the store.
    #[doc(hidden)]
    pub fn sync_seqn(&self) -> u32 {
        self.store.sync_seqn()
    }
279
    /// Whether the database is poisoned, as reported by the store.
    ///
    /// A database becomes poisoned when an error occurred during a commit operation.
    ///
    /// From this point on, the database is in an inconsistent state and should be considered
    /// read-only. Any further modifying operations will return an error.
    ///
    /// In order to recover from a poisoned database, the application should discard this instance
    /// and create a new one.
    pub fn is_poisoned(&self) -> bool {
        self.store.is_poisoned()
    }
292
    /// Create a new [`Session`] object with the given parameters.
    ///
    /// This will block if there are any ongoing commits or rollbacks. Multiple sessions may
    /// coexist.
    ///
    /// The [`Session`] is a read-only handle on the database and is used to create a changeset to
    /// be applied to the database. Sessions provide read interfaces and additionally coordinate
    /// work such as proving and rollback preimages which make committing more efficient.
    ///
    /// There may be multiple sessions live, though the existence of an outstanding session will
    /// prevent writes to the database. Sessions are the main way of reading from the database,
    /// and permit a changeset to be committed either directly to the database or into an
    /// in-memory [`Overlay`].
    ///
    /// The session is based on the parent root reported by the overlay in `params`, or on the
    /// current database root when the overlay reports none.
    pub fn begin_session(&self, params: SessionParams) -> Session<T> {
        let live_overlay = params.overlay;

        // We must take the access guard before instantiating the rollback delta,
        // because it creates a read transaction and any commits or rollbacks will block
        // indefinitely for us to finish.
        let access_guard = params
            .take_global_guard
            .then(|| RwLock::read_arc(&self.access_lock));

        let store = self.store.clone();
        // The delta builder exists only when recording is requested and the store has rollback
        // available.
        let rollback_delta = if params.record_rollback_delta {
            self.store
                .rollback()
                .map(|r| r.delta_builder(&store, &live_overlay))
        } else {
            None
        };

        let prev_root = live_overlay
            .parent_root()
            .unwrap_or_else(|| self.root().into_inner());

        Session {
            store,
            merkle_updater: self.merkle_update_pool.begin::<T>(
                self.page_cache.clone(),
                self.page_pool.clone(),
                self.store.clone(),
                live_overlay.clone(),
                prev_root,
            ),
            metrics: self.metrics.clone(),
            rollback_delta,
            overlay: live_overlay,
            witness_mode: params.witness,
            access_guard,
            prev_root: Root(prev_root),
            _marker: std::marker::PhantomData,
        }
    }
347
348    /// Perform a rollback of the last `n` commits.
349    ///
350    /// This function will block until all ongoing commits or [`Session`]s are finished.
351    ///
352    /// Fails if the DB is not configured for rollback or doesn't have enough commits logged to
353    /// rollback.
354    pub fn rollback(&self, n: usize) -> anyhow::Result<()> {
355        if n == 0 {
356            return Ok(());
357        }
358
359        let _write_guard = self.access_lock.write();
360
361        let Some(rollback) = self.store.rollback() else {
362            anyhow::bail!("rollback: not enabled");
363        };
364        let Some(traceback) = rollback.truncate(n)? else {
365            anyhow::bail!("rollback: not enough logged for rolling back");
366        };
367
368        // Begin a new session. We do not allow rollback for this operation because that would
369        // interfere with the rollback log: if another rollback were to be issued, it must rollback
370        // the changes in the rollback log and not the changes performed by the current rollback.
371        let mut session_params = SessionParams::default();
372        session_params.record_rollback_delta = false;
373
374        // We hold a write guard and don't need the session to take any other.
375        session_params.take_global_guard = false;
376        let sess = self.begin_session(session_params);
377
378        // Convert the traceback into a series of write commands.
379        let mut actuals = Vec::new();
380        for (key, value) in traceback {
381            sess.warm_up(key);
382            let value = KeyReadWrite::Write(value);
383            actuals.push((key, value));
384        }
385
386        sess.finish(actuals)?.commit(&self)?;
387
388        Ok(())
389    }
390
    /// Return a clone of Nomt's metrics handle.
    ///
    /// To collect them, they need to be activated at [`Nomt`] creation.
    #[doc(hidden)]
    pub fn metrics(&self) -> Metrics {
        self.metrics.clone()
    }
397
    /// Get the hash-table space utilization, as reported by the store.
    pub fn hash_table_utilization(&self) -> HashTableUtilization {
        self.store.hash_table_utilization()
    }
402}
403
/// A configuration type used to inform NOMT whether to generate witnesses of accessed data.
///
/// Wraps a single flag: `true` requests a witness, `false` disables it.
pub struct WitnessMode(bool);
406
407impl WitnessMode {
408    /// Witness all reads and writes to the trie.
409    pub fn read_write() -> Self {
410        WitnessMode(true)
411    }
412
413    /// Do not generate a witness.
414    pub fn disabled() -> Self {
415        WitnessMode(false)
416    }
417}
418
/// Parameters for instantiating a session.
///
/// Start from [`SessionParams::default`] and adjust with the builder methods.
pub struct SessionParams {
    // INTERNAL: only false during rollback. determines whether the rollback delta is built
    record_rollback_delta: bool,
    // INTERNAL: only false during rollback. determines whether a global read lock is taken
    take_global_guard: bool,

    // Whether the session should produce a witness.
    witness: WitnessMode,
    // The live overlay the session is layered on; empty by default.
    overlay: LiveOverlay,
}
429
430impl Default for SessionParams {
431    fn default() -> Self {
432        SessionParams {
433            record_rollback_delta: true,
434            take_global_guard: true,
435            witness: WitnessMode::disabled(),
436            // UNWRAP: empty live overlay always valid.
437            overlay: LiveOverlay::new(None).unwrap(),
438        }
439    }
440}
441
442impl SessionParams {
443    /// Whether to generate a witness of the read and written keys. Default: disabled
444    ///
445    /// If `WitnessMode::read_write()` is provided, then when this session has concluded it will be
446    /// possible to use [`FinishedSession::take_witness`] to get the recorded witness.
447    pub fn witness_mode(mut self, witness: WitnessMode) -> Self {
448        self.witness = witness;
449        self
450    }
451
452    /// Use a set of live overlays (ancestors, in descending order) as a parent. Default: None
453    ///
454    /// Errors are returned if the set of ancestor overlays provided are not _sound_ or _complete_.
455    ///
456    /// An error is returned if the overlays do not represent a chain, starting from the first.
457    /// i.e. the second overlay must be an ancestor of the first, and the third must be an ancestor
458    /// of the second, and so on. This can be thought of as soundness.
459    ///
460    /// An error is returned if the final overlay's parent has not yet been committed. The complete
461    /// set of all uncommitted ancestors must be provided.
462    pub fn overlay<'a>(
463        mut self,
464        ancestors: impl IntoIterator<Item = &'a Overlay>,
465    ) -> Result<Self, InvalidAncestors> {
466        self.overlay = LiveOverlay::new(ancestors)?;
467        Ok(self)
468    }
469}
470
/// A session presents a way of interaction with the trie.
///
/// The session enables the application to perform reads and prepare writes.
///
/// When the session is finished, the application can confirm the changes by calling
/// [`Session::finish`] or others and create a [`Witness`] that can be used to prove the
/// correctness of replaying the same operations.
///
/// Field declaration order is significant here because it determines drop order; see the note
/// on `access_guard`.
pub struct Session<T> {
    /// Store handle used for reads that miss the overlay and to open the value transaction.
    store: Store,
    /// Handles warm-ups, proofs and the final merkle update for this session.
    merkle_updater: Updater,
    /// Metrics handle cloned from the database.
    metrics: Metrics,
    /// Builder for the rollback delta. `None` when recording was turned off or the store has no
    /// rollback available.
    rollback_delta: Option<rollback::ReverseDeltaBuilder>,
    /// The live overlay consulted before the store when reading values.
    overlay: LiveOverlay,
    /// Whether a witness is requested when the session is finished.
    witness_mode: WitnessMode,
    // Note: this needs to be after rollback_delta and merkle_updater in declaration order,
    // so this is dropped after all read transactions are taken, even when the session is dropped.
    access_guard: Option<ArcRwLockReadGuard<parking_lot::RawRwLock, ()>>,
    /// The root this session is based on.
    prev_root: Root,
    _marker: std::marker::PhantomData<T>,
}
491
492impl<T> Session<T> {
    /// Signal to the backend to warm up the merkle paths and b-tree pages for a key, so they are
    /// ready by the time you commit the session.
    ///
    /// This should be called for every logical write within the session, as well as every logical
    /// read if you expect to generate a merkle proof for the session. If you do not expect to
    /// prove this session, you can skip calling this for reads, but still need to warm up logical
    /// writes.
    ///
    /// The purpose of warming up is to move I/O out of the critical path of committing a
    /// session to maximize throughput.
    /// There is no correctness issue with doing too many warm-ups, but there is a cost for I/O.
    ///
    /// The request is forwarded to this session's merkle updater.
    pub fn warm_up(&self, path: KeyPath) {
        self.merkle_updater.warm_up(path);
    }
507
508    /// Synchronously read the value stored under the given key.
509    ///
510    /// Returns `None` if the value is not stored under the given key. Fails only if I/O fails.
511    pub fn read(&self, path: KeyPath) -> anyhow::Result<Option<Value>> {
512        let _maybe_guard = self.metrics.record(Metric::ValueFetchTime);
513        if let Some(value_change) = self.overlay.value(&path) {
514            return Ok(value_change.as_option().map(|v| v.to_vec()));
515        }
516        self.store.load_value(path)
517    }
518
    /// Returns the [`Root`] on which this session is based.
    pub fn prev_root(&self) -> Root {
        self.prev_root
    }
523
524    /// Signals that the given key is going to be written to. Relevant only if rollback is enabled.
525    ///
526    /// This function initiates an I/O load operation to fetch and preserve the prior value of the key.
527    /// It serves as a hint to reduce I/O operations during the commit process by pre-loading values
528    /// that are likely to be needed.
529    ///
530    /// Important considerations:
531    /// 1. This function does not perform deduplication. Calling it multiple times for the same key
532    ///    will result in multiple load operations, which can be wasteful.
533    /// 2. The definitive set of values to be updated is determined by the update (e.g.
534    ///    [`Session::finish`]) call.
535    ///
536    ///    It's safe to call this function for keys that may not ultimately be written, and keys
537    ///    not marked here but included in the final set will still be preserved.
538    /// 3. While this function helps optimize I/O, it's not strictly necessary for correctness.
539    ///    The commit process will ensure all required prior values are preserved.
540    /// 4. If the path is given to [`Session::finish`] (and others) with the `ReadThenWrite`
541    ///    operation, calling this function is not needed as the prior value will be taken from
542    ///    there.
543    ///
544    /// For best performance, call this function once for each key you expect to write during the
545    /// session, except for those that will be part of a `ReadThenWrite` operation. The earlier
546    /// this call is issued, the better for efficiency.
547    pub fn preserve_prior_value(&self, path: KeyPath) {
548        if let Some(rollback) = &self.rollback_delta {
549            rollback.tentative_preserve_prior(path);
550        }
551    }
552}
553
554impl<T: HashAlgorithm> Session<T> {
555    /// Get a merkle proof for the given key path.
556    ///
557    /// This will block until the proof is fetched from the database.
558    /// Note that this may require multiple round trips to the database to complete.
559    ///
560    /// Fails only if I/O fails. Proves either the existence or non-existence of the key.
561    pub fn prove(&self, path: KeyPath) -> anyhow::Result<PathProof> {
562        Ok(self.merkle_updater.prove::<T>(path)?)
563    }
564
565    /// Finish the session. Provide the actual reads and writes (in sorted order) that are to be
566    /// considered within the finished session.
567    ///
568    /// This function blocks until the merkle root and changeset are computed.
569    pub fn finish(
570        mut self,
571        actuals: Vec<(KeyPath, KeyReadWrite)>,
572    ) -> anyhow::Result<FinishedSession> {
573        if cfg!(debug_assertions) {
574            // Check that the actuals are sorted by key path.
575            for i in 1..actuals.len() {
576                assert!(
577                    actuals[i].0 > actuals[i - 1].0,
578                    "actuals are not sorted at index {}",
579                    i
580                );
581            }
582        }
583        let rollback_delta = self
584            .rollback_delta
585            .take()
586            .map(|delta_builder| delta_builder.finalize(&actuals));
587
588        let mut compact_actuals = Vec::with_capacity(actuals.len());
589        for (path, read_write) in &actuals {
590            compact_actuals.push((path.clone(), read_write.to_compact::<T>()));
591        }
592
593        let merkle_update_handle = self
594            .merkle_updater
595            .update_and_prove::<T>(compact_actuals, self.witness_mode.0)?;
596
597        let mut tx = self.store.new_value_tx();
598        for (path, read_write) in actuals {
599            if let KeyReadWrite::Write(value) | KeyReadWrite::ReadThenWrite(_, value) = read_write {
600                tx.write_value::<T>(path, value);
601            }
602        }
603
604        let merkle_output = merkle_update_handle.join()?;
605        Ok(FinishedSession {
606            value_transaction: tx,
607            merkle_output,
608            rollback_delta,
609            parent_overlay: self.overlay,
610            prev_root: self.prev_root,
611            take_global_guard: self.access_guard.is_some(),
612        })
613    }
614}
615
/// A finished session.
///
/// This is the result of completing a session and computing the merkle root and merkle DB changes,
/// but which has not yet been applied to the underlying store.
///
/// It may either be committed directly to the database or transformed into an in-memory [`Overlay`]
/// to be used as a base for further in-memory sessions.
pub struct FinishedSession {
    /// The value writes staged by the session.
    value_transaction: ValueTransaction,
    /// Output of the merkle update: the new root, the updated pages and the optional witness.
    merkle_output: merkle::Output,
    /// The finalized rollback delta, if the session recorded one.
    rollback_delta: Option<rollback::Delta>,
    /// The live overlay the session was layered on.
    parent_overlay: LiveOverlay,
    /// The root the session was based on; checked against the database root on commit.
    prev_root: Root,
    // INTERNAL: whether to take a write guard while committing. always true except during rollback.
    take_global_guard: bool,
}
632
633impl FinishedSession {
    /// Get the root as-of this session, i.e. the root produced by the merkle update.
    pub fn root(&self) -> Root {
        Root(self.merkle_output.root)
    }
638
    /// Returns the [`Root`] on which this session is based.
    pub fn prev_root(&self) -> Root {
        self.prev_root
    }
643
    /// Take the witness, if any.
    ///
    /// If this session was configured with proving (see [`SessionParams::witness_mode`]),
    /// this will be `Some` on the first call and `None` thereafter.
    pub fn take_witness(&mut self) -> Option<Witness> {
        self.merkle_output.witness.take()
    }
651
652    /// Transform this into an overlay that can be queried in memory and used as the base for
653    /// further in-memory [`Session`]s.
654    pub fn into_overlay(self) -> Overlay {
655        let updated_pages = self
656            .merkle_output
657            .updated_pages
658            .into_frozen_iter(/* into_overlay */ true)
659            .collect();
660        let values = self.value_transaction.into_iter().collect();
661
662        self.parent_overlay.finish(
663            self.prev_root.into_inner(),
664            self.merkle_output.root,
665            updated_pages,
666            values,
667            self.rollback_delta,
668        )
669    }
670
    /// Commit this session to disk directly.
    ///
    /// This function will block until all ongoing sessions and commits have finished.
    ///
    /// This will return an error if I/O fails or if the changeset is no longer valid.
    /// The changeset may be invalidated if another competing session, overlay, or rollback was
    /// committed.
    pub fn commit<T: HashAlgorithm>(self, nomt: &Nomt<T>) -> Result<(), anyhow::Error> {
        // Skipped only for the internal rollback session, whose caller already holds the write
        // lock.
        let _write_guard = self.take_global_guard.then(|| nomt.access_lock.write());

        // Validate the changeset against the current root and advance the in-memory root while
        // holding the shared lock. This happens before anything is written.
        {
            let mut shared = nomt.shared.lock();
            if shared.root != self.prev_root {
                anyhow::bail!(
                    "Changeset no longer valid (expected previous root {:?}, got {:?})",
                    self.prev_root,
                    shared.root
                );
            }
            shared.root = Root(self.merkle_output.root);
            shared.last_commit_marker = None;
        }

        // The rollback delta is committed before the store commit.
        if let Some(rollback_delta) = self.rollback_delta {
            // UNWRAP: if rollback_delta is `Some`, then rollback must be also `Some`.
            let rollback = nomt.store.rollback().unwrap();
            rollback.commit(rollback_delta)?;
        }

        nomt.store.commit(
            self.value_transaction.into_iter(),
            nomt.page_cache.clone(),
            self.merkle_output
                .updated_pages
                .into_frozen_iter(/* into_overlay */ false),
        )
    }
708
709    /// Commit this session to disk directly without blocking.
710    ///
711    /// This function will return `Ok(Some(Self))` if there are any ongoing sessions or commits.
712    ///
713    /// This will return an error if I/O fails or if the changeset is no longer valid.
714    /// The changeset may be invalidated if another competing session, overlay, or rollback was
715    /// committed.
716    pub fn try_commit_nonblocking<T: HashAlgorithm>(
717        mut self,
718        nomt: &Nomt<T>,
719    ) -> Result<Option<Self>, anyhow::Error> {
720        let write_guard = self
721            .take_global_guard
722            .then(|| nomt.access_lock.try_write())
723            .flatten();
724        if write_guard.is_none() {
725            return Ok(Some(self));
726        }
727
728        if let Some(rollback_delta) = self.rollback_delta {
729            // UNWRAP: if rollback_delta is `Some`, then rollback must be also `Some`.
730            let rollback = nomt.store.rollback().unwrap();
731            if let Some(delta) = rollback.commit_nonblocking(rollback_delta)? {
732                self.rollback_delta = Some(delta);
733                return Ok(Some(self));
734            }
735        }
736
737        {
738            let mut shared = nomt.shared.lock();
739            if shared.root != self.prev_root {
740                anyhow::bail!(
741                    "Changeset no longer valid (expected previous root {:?}, got {:?})",
742                    self.prev_root,
743                    shared.root
744                );
745            }
746            shared.root = Root(self.merkle_output.root);
747            shared.last_commit_marker = None;
748        }
749
750        nomt.store.commit(
751            self.value_transaction.into_iter(),
752            nomt.page_cache.clone(),
753            self.merkle_output
754                .updated_pages
755                .into_frozen_iter(/* into_overlay */ false),
756        )?;
757
758        Ok(None)
759    }
760}
761
762impl Overlay {
    /// Commit the changes from this overlay to the underlying database.
    ///
    /// This function will block until all ongoing sessions and commits have finished.
    ///
    /// This will return an error if I/O fails or if the changeset is no longer valid, or if the
    /// overlay has an uncommitted parent. An overlay may be invalidated by a competing commit or
    /// rollback.
    pub fn commit<T: HashAlgorithm>(self, nomt: &Nomt<T>) -> anyhow::Result<()> {
        // NOTE(review): this parent check runs before the write lock is acquired — confirm that
        // a competing commit landing in between is caught by the root check below.
        if !self.parent_matches_marker(nomt.shared.lock().last_commit_marker.as_ref()) {
            anyhow::bail!("Overlay parent not committed");
        }

        // Copy everything needed out of the overlay before waiting on the write lock.
        let root = self.root();
        let page_changes: Vec<_> = self
            .page_changes()
            .into_iter()
            .map(|(page_id, dirty_page)| (page_id.clone(), dirty_page.clone()))
            .collect();
        let values: Vec<_> = self
            .value_changes()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        let rollback_delta = self.rollback_delta().map(|delta| delta.clone());

        let _write_guard = nomt.access_lock.write();

        let marker = self.mark_committed();

        // Validate against the current root and advance it while holding the shared lock.
        {
            let mut shared = nomt.shared.lock();
            if shared.root != self.prev_root() {
                anyhow::bail!(
                    "Changeset no longer valid (expected previous root {:?}, got {:?})",
                    self.prev_root(),
                    shared.root
                );
            }
            shared.root = root;
            shared.last_commit_marker = Some(marker);
        }

        if let Some(rollback_delta) = rollback_delta {
            // UNWRAP: if rollback_delta is `Some`, then rollback must be also `Some`.
            let rollback = nomt.store.rollback().unwrap();
            rollback.commit(rollback_delta)?;
        }

        nomt.store
            .commit(values, nomt.page_cache.clone(), page_changes)
    }
814
    /// Commit the changes from this overlay to the underlying database without blocking.
    ///
    /// This function will return `Ok(Some(Self))` if there are ongoing sessions or commits, and
    /// `Ok(None)` once the commit has been applied.
    ///
    /// This will return an error if I/O fails or if the changeset is no longer valid, or if the
    /// overlay has an uncommitted parent. An overlay may be invalidated by a competing commit or
    /// rollback.
    pub fn try_commit_nonblocking<T: HashAlgorithm>(
        self,
        nomt: &Nomt<T>,
    ) -> anyhow::Result<Option<Self>> {
        // NOTE(review): this parent check runs before the write lock is attempted — confirm that
        // a competing commit landing in between is caught by the root check below.
        if !self.parent_matches_marker(nomt.shared.lock().last_commit_marker.as_ref()) {
            anyhow::bail!("Overlay parent not committed");
        }

        // Copy everything needed out of the overlay before attempting the write lock.
        let root = self.root();
        let page_changes: Vec<_> = self
            .page_changes()
            .into_iter()
            .map(|(page_id, dirty_page)| (page_id.clone(), dirty_page.clone()))
            .collect();
        let values: Vec<_> = self
            .value_changes()
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();
        let rollback_delta = self.rollback_delta().map(|delta| delta.clone());

        // Hand the overlay back untouched when the write lock is contended.
        let write_guard = nomt.access_lock.try_write();
        if write_guard.is_none() {
            return Ok(Some(self));
        }

        let marker = self.mark_committed();

        // Validate against the current root and advance it while holding the shared lock.
        {
            let mut shared = nomt.shared.lock();
            if shared.root != self.prev_root() {
                anyhow::bail!(
                    "Changeset no longer valid (expected previous root {:?}, got {:?})",
                    self.prev_root(),
                    shared.root
                );
            }
            shared.root = root;
            shared.last_commit_marker = Some(marker);
        }

        if let Some(rollback_delta) = rollback_delta {
            // UNWRAP: if rollback_delta is `Some`, then rollback must be also `Some`.
            let rollback = nomt.store.rollback().unwrap();
            rollback.commit(rollback_delta)?;
        }

        nomt.store
            .commit(values, nomt.page_cache.clone(), page_changes)?;

        Ok(None)
    }
874}
875
876/// A marker trait for hash functions usable with NOMT. The type must support both hashing nodes as
877/// well as values.
878///
879/// This is automatically implemented for types implementing
880/// both [`NodeHasher`] and [`ValueHasher`].
881pub trait HashAlgorithm: ValueHasher + NodeHasher {}
882
883impl<T: ValueHasher + NodeHasher> HashAlgorithm for T {}
884
885fn compute_root_node<H: HashAlgorithm>(page_cache: &PageCache, store: &Store) -> Node {
886    // 3 cases.
887    // 1: root page is empty and beatree is empty. in this case, root is the TERMINATOR.
888    // 2: root page is empty and beatree has a single item. in this case, root is a leaf.
889    // 3: root is an internal node.
890
891    if let Some((root_page, _)) = page_cache.get(ROOT_PAGE_ID) {
892        let left = root_page.node(0);
893        let right = root_page.node(1);
894
895        if left != TERMINATOR || right != TERMINATOR {
896            // case 3
897            return H::hash_internal(&InternalData { left, right });
898        }
899    }
900
901    // cases 1/2
902    let read_tx = store.read_transaction();
903    let mut iterator = read_tx.iterator(beatree::Key::default(), None);
904
905    let io_handle = store.io_pool().make_handle();
906
907    loop {
908        match iterator.next() {
909            None => return TERMINATOR, // case 1
910            Some(beatree::iterator::IterOutput::Blocked) => {
911                // UNWRAP: when blocked, needed leaf always exists.
912                let leaf = match read_tx.load_leaf_async(
913                    iterator.needed_leaves().next().unwrap(),
914                    &io_handle,
915                    0,
916                ) {
917                    Ok(leaf_node) => leaf_node,
918                    Err(leaf_load) => {
919                        // UNWRAP: `Err` indicates a request was sent.
920                        let complete_io = io_handle.recv().unwrap();
921
922                        // UNWRAP: the I/O command submitted by `load_leaf_async` is always a `Read`
923                        leaf_load.finish(complete_io.command.kind.unwrap_buf())
924                    }
925                };
926
927                iterator.provide_leaf(leaf);
928            }
929            Some(beatree::iterator::IterOutput::Item(key_path, value)) => {
930                // case 2
931                return H::hash_leaf(&LeafData {
932                    key_path,
933                    value_hash: H::hash_value(value),
934                });
935            }
936            Some(beatree::iterator::IterOutput::OverflowItem(key_path, value_hash, _)) => {
937                // case 2
938                return H::hash_leaf(&LeafData {
939                    key_path,
940                    value_hash,
941                });
942            }
943        }
944    }
945}
946
947/// Check whether the current device has permission to use io_uring.
948///
949/// On non-Linux platforms, this will always return `NotSupported`.
950pub fn check_iou_permissions() -> IoUringPermission {
951    crate::io::check_iou_permissions()
952}
953
#[cfg(test)]
mod tests {
    use crate::{hasher::Blake3Hasher, Session};

    /// Compile-time check: this test only builds if `Session` is `Sync`.
    #[test]
    fn session_is_sync() {
        fn assert_sync<S>()
        where
            S: Sync,
        {
        }

        assert_sync::<Session<Blake3Hasher>>();
    }
}