nebari/
roots.rs

1use std::{
2    any::Any,
3    borrow::{Borrow, Cow},
4    collections::HashMap,
5    convert::Infallible,
6    fmt::{Debug, Display},
7    fs,
8    ops::{Deref, DerefMut, RangeBounds},
9    path::{Path, PathBuf},
10    sync::{
11        atomic::{AtomicU16, Ordering},
12        Arc,
13    },
14};
15
16use flume::Sender;
17use once_cell::sync::Lazy;
18use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
19
20use crate::{
21    context::Context,
22    error::Error,
23    io::{fs::StdFileManager, FileManager, ManagedFile},
24    transaction::{LogEntry, ManagedTransaction, TransactionManager},
25    tree::{
26        self, root::AnyTreeRoot, state::AnyTreeState, EmbeddedIndex, KeySequence, Modification,
27        Operation, Reducer, ScanEvaluation, State, TransactableCompaction, TreeFile, TreeRoot,
28        VersionedTreeRoot,
29    },
30    vault::AnyVault,
31    ArcBytes, ChunkCache, ErrorKind,
32};
33
/// A multi-tree transactional B-Tree database.
///
/// The handle keeps its shared state behind an [`Arc`], so cloning a `Roots`
/// produces another handle to the same database.
#[derive(Debug)]
pub struct Roots<File: ManagedFile> {
    /// State shared by every clone of this handle.
    data: Arc<Data<File>>,
}
39
/// The state shared by all clones of a [`Roots`] handle.
#[derive(Debug)]
struct Data<File: ManagedFile> {
    /// File manager, vault and cache used when accessing tree files.
    context: Context<File::Manager>,
    /// The transaction manager spawned for the database directory.
    transactions: TransactionManager<File::Manager>,
    /// Thread pool used to commit the trees of a transaction.
    thread_pool: ThreadPool<File>,
    /// The directory that contains the database files.
    path: PathBuf,
    /// The state of every tree requested so far, keyed by tree name.
    tree_states: Mutex<HashMap<String, Box<dyn AnyTreeState>>>,
}
48
49impl<File: ManagedFile> Roots<File> {
50    fn open<P: Into<PathBuf> + Send>(
51        path: P,
52        context: Context<File::Manager>,
53        thread_pool: ThreadPool<File>,
54    ) -> Result<Self, Error> {
55        let path = path.into();
56        if !path.exists() {
57            fs::create_dir_all(&path)?;
58        } else if !path.is_dir() {
59            return Err(Error::from(format!(
60                "'{:?}' already exists, but is not a directory.",
61                path
62            )));
63        }
64
65        let transactions = TransactionManager::spawn(&path, context.clone())?;
66        Ok(Self {
67            data: Arc::new(Data {
68                context,
69                path,
70                transactions,
71                thread_pool,
72                tree_states: Mutex::default(),
73            }),
74        })
75    }
76
77    /// Returns the path to the database directory.
78    #[must_use]
79    pub fn path(&self) -> &Path {
80        &self.data.path
81    }
82
83    /// Returns the vault used to encrypt this database.
84    pub fn context(&self) -> &Context<File::Manager> {
85        &self.data.context
86    }
87
88    /// Returns the transaction manager for this database.
89    #[must_use]
90    pub fn transactions(&self) -> &TransactionManager<File::Manager> {
91        &self.data.transactions
92    }
93
94    /// Opens a tree named `name`.
95    ///
96    /// ## Errors
97    ///
98    /// - [`InvalidTreeName`](ErrorKind::InvalidTreeName): The name contained an
99    ///   invalid character. For a full list of valid characters, see the
100    ///   documentation on [`InvalidTreeName`](ErrorKind::InvalidTreeName).
101    pub fn tree<Root: tree::Root>(
102        &self,
103        root: TreeRoot<Root, File>,
104    ) -> Result<Tree<Root, File>, Error> {
105        check_name(&root.name)?;
106        let path = self.tree_path(&root.name);
107        if !path.exists() {
108            self.context().file_manager.append(&path)?;
109        }
110        let state = self.tree_state(root.name.clone());
111        Ok(Tree {
112            roots: self.clone(),
113            state,
114            vault: root.vault,
115            name: root.name,
116        })
117    }
118
119    fn tree_path(&self, name: &str) -> PathBuf {
120        self.path().join(format!("{}.nebari", name))
121    }
122
123    /// Removes a tree. Returns true if a tree was deleted.
124    pub fn delete_tree(&self, name: impl Into<Cow<'static, str>>) -> Result<bool, Error> {
125        let name = name.into();
126        let mut tree_states = self.data.tree_states.lock();
127        self.context()
128            .file_manager
129            .delete(self.tree_path(name.as_ref()))?;
130        Ok(tree_states.remove(name.as_ref()).is_some())
131    }
132
133    /// Returns a list of all the names of trees contained in this database.
134    pub fn tree_names(&self) -> Result<Vec<String>, Error> {
135        let mut names = Vec::new();
136        for entry in std::fs::read_dir(self.path())? {
137            let entry = entry?;
138            if let Some(name) = entry.file_name().to_str() {
139                if let Some(without_extension) = name.strip_suffix(".nebari") {
140                    names.push(without_extension.to_string());
141                }
142            }
143        }
144        Ok(names)
145    }
146
147    fn tree_state<Root: tree::Root>(&self, name: impl Into<Cow<'static, str>>) -> State<Root> {
148        self.tree_states(&[Root::tree(name)])
149            .into_iter()
150            .next()
151            .unwrap()
152            .as_ref()
153            .as_any()
154            .downcast_ref::<State<Root>>()
155            .unwrap()
156            .clone()
157    }
158
159    fn tree_states<R: Borrow<T>, T: AnyTreeRoot<File> + ?Sized>(
160        &self,
161        trees: &[R],
162    ) -> Vec<Box<dyn AnyTreeState>> {
163        let mut tree_states = self.data.tree_states.lock();
164        let mut output = Vec::with_capacity(trees.len());
165        for tree in trees {
166            let state = tree_states
167                .entry(tree.borrow().name().to_string())
168                .or_insert_with(|| tree.borrow().default_state())
169                .cloned();
170            output.push(state);
171        }
172        output
173    }
174
175    /// Begins a transaction over `trees`. All trees will be exclusively
176    /// accessible by the transaction. Dropping the executing transaction will
177    /// roll the transaction back.
178    ///
179    /// ## Errors
180    ///
181    /// - [`InvalidTreeName`](ErrorKind::InvalidTreeName): A tree name contained
182    ///   an invalid character. For a full list of valid characters, see the
183    ///   documentation on [`InvalidTreeName`](ErrorKind::InvalidTreeName).
184    pub fn transaction<R: Borrow<T>, T: AnyTreeRoot<File> + ?Sized>(
185        &self,
186        trees: &[R],
187    ) -> Result<ExecutingTransaction<File>, Error> {
188        for tree in trees {
189            check_name(tree.borrow().name()).map(|_| tree.borrow().name().as_bytes())?;
190        }
191        let transaction = self
192            .data
193            .transactions
194            .new_transaction(trees.iter().map(|t| t.borrow().name().as_bytes()));
195        let states = self.tree_states(trees);
196        let trees = trees
197            .iter()
198            .zip(states.into_iter())
199            .map(|(tree, state)| {
200                tree.borrow()
201                    .begin_transaction(
202                        transaction.id,
203                        &self.tree_path(tree.borrow().name()),
204                        state.as_ref(),
205                        self.context(),
206                        Some(&self.data.transactions),
207                    )
208                    .map(UnlockedTransactionTree::new)
209            })
210            .collect::<Result<Vec<_>, Error>>()?;
211        Ok(ExecutingTransaction {
212            roots: self.clone(),
213            transaction: Some(transaction),
214            trees,
215        })
216    }
217}
218
219fn check_name(name: &str) -> Result<(), Error> {
220    if name != "_transactions"
221        && name
222            .bytes()
223            .all(|c| matches!(c as char, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '.' | '_'))
224    {
225        Ok(())
226    } else {
227        Err(Error::from(ErrorKind::InvalidTreeName))
228    }
229}
230
231impl<File: ManagedFile> Clone for Roots<File> {
232    fn clone(&self) -> Self {
233        Self {
234            data: self.data.clone(),
235        }
236    }
237}
238
/// An executing transaction. While this exists, no other transactions can
/// execute across the same trees as this transaction holds.
#[must_use]
pub struct ExecutingTransaction<File: ManagedFile> {
    /// The database this transaction belongs to.
    roots: Roots<File>,
    /// The trees taking part in the transaction, in the order requested.
    trees: Vec<UnlockedTransactionTree<File>>,
    /// The managed transaction. It is taken out (leaving `None`) when the
    /// transaction is committed or dropped.
    transaction: Option<ManagedTransaction<File::Manager>>,
}
247
/// A tree that belongs to an [`ExecutingTransaction`].
///
/// The type-erased tree is kept behind a mutex; call
/// [`lock()`](Self::lock) to obtain typed access to it.
#[must_use]
pub struct UnlockedTransactionTree<File: ManagedFile>(Mutex<Box<dyn AnyTransactionTree<File>>>);
251
252impl<File: ManagedFile> UnlockedTransactionTree<File> {
253    fn new(file: Box<dyn AnyTransactionTree<File>>) -> Self {
254        Self(Mutex::new(file))
255    }
256
257    /// Locks this tree so that operations can be performed against it.
258    ///
259    /// # Panics
260    ///
261    /// This function panics if `Root` does not match the type specified when
262    /// starting the transaction.
263    pub fn lock<Root: tree::Root>(&self) -> LockedTransactionTree<'_, Root, File> {
264        LockedTransactionTree(MutexGuard::map(self.0.lock(), |tree| {
265            tree.as_mut().as_any_mut().downcast_mut().unwrap()
266        }))
267    }
268}
269
/// A locked transaction tree. This transactional tree is exclusively available
/// for writing and reading to the thread that locks it.
///
/// The wrapper dereferences to [`TransactionTree`]. It wraps a mutex guard, so
/// the tree stays locked until this value is dropped.
#[must_use]
pub struct LockedTransactionTree<'transaction, Root: tree::Root, File: ManagedFile>(
    MappedMutexGuard<'transaction, TransactionTree<Root, File>>,
);
276
277impl<'transaction, Root: tree::Root, File: ManagedFile> Deref
278    for LockedTransactionTree<'transaction, Root, File>
279{
280    type Target = TransactionTree<Root, File>;
281
282    fn deref(&self) -> &Self::Target {
283        &*self.0
284    }
285}
286
287impl<'transaction, Root: tree::Root, File: ManagedFile> DerefMut
288    for LockedTransactionTree<'transaction, Root, File>
289{
290    fn deref_mut(&mut self) -> &mut Self::Target {
291        &mut *self.0
292    }
293}
294
295impl<File: ManagedFile> ExecutingTransaction<File> {
296    /// Returns the [`LogEntry`] for this transaction.
297    #[must_use]
298    #[allow(clippy::missing_panics_doc)]
299    pub fn entry(&self) -> &LogEntry<'static> {
300        self.transaction
301            .as_ref()
302            .and_then(|tx| tx.transaction.as_ref())
303            .unwrap()
304    }
305
306    /// Returns a mutable reference to the [`LogEntry`] for this transaction.
307    #[must_use]
308    #[allow(clippy::missing_panics_doc)]
309    pub fn entry_mut(&mut self) -> &mut LogEntry<'static> {
310        self.transaction
311            .as_mut()
312            .and_then(|tx| tx.transaction.as_mut())
313            .unwrap()
314    }
315
    /// Commits the transaction. Once this function has returned, all data
    /// updates are guaranteed to be able to be accessed by all other readers as
    /// well as impervious to sudden failures such as a power outage.
    ///
    /// The steps run in a fixed order: the trees are committed through the
    /// thread pool, the transaction is committed to the transaction manager,
    /// the tree states are published, and finally the tree locks are released.
    ///
    /// ## Errors
    ///
    /// Returns any error raised while committing the trees or the
    /// transaction itself.
    #[allow(clippy::missing_panics_doc)]
    pub fn commit(mut self) -> Result<(), Error> {
        // Move the trees out so that `Drop` no longer sees them.
        let trees = std::mem::take(&mut self.trees);
        // Write the trees to disk
        let trees = self.roots.data.thread_pool.commit_trees(trees)?;

        // Push the transaction to the log.
        // Taking the transaction leaves `None`, which makes `Drop` skip the rollback.
        let transaction = self.transaction.take().unwrap();
        let tree_locks = transaction.commit()?;

        // Publish the tree states, now that the transaction has been fully recorded
        for tree in trees {
            tree.state().publish();
        }

        // Release the locks for the trees, allowing a new transaction to begin.
        drop(tree_locks);

        Ok(())
    }
339
340    /// Rolls the transaction back. It is not necessary to call this function --
341    /// transactions will automatically be rolled back when the transaction is
342    /// dropped, if `commit()` isn't called first.
343    pub fn rollback(self) {
344        drop(self);
345    }
346
347    /// Accesses a locked tree.
348    pub fn tree<Root: tree::Root>(
349        &self,
350        index: usize,
351    ) -> Option<LockedTransactionTree<'_, Root, File>> {
352        self.unlocked_tree(index).map(UnlockedTransactionTree::lock)
353    }
354
355    /// Accesses an unlocked tree. Note: If you clone an
356    /// [`UnlockedTransactionTree`], you must make sure to drop all instances
357    /// before calling commit.
358    pub fn unlocked_tree(&self, index: usize) -> Option<&UnlockedTransactionTree<File>> {
359        self.trees.get(index)
360    }
361
362    fn rollback_tree_states(&mut self) {
363        for tree in self.trees.drain(..) {
364            let tree = tree.0.lock();
365            tree.rollback();
366        }
367    }
368}
369
370impl<File: ManagedFile> Drop for ExecutingTransaction<File> {
371    fn drop(&mut self) {
372        if let Some(transaction) = self.transaction.take() {
373            self.rollback_tree_states();
374            // Now the transaction can be dropped safely, freeing up access to the trees.
375            drop(transaction);
376        }
377    }
378}
379
/// A tree that is modifiable during a transaction.
pub struct TransactionTree<Root: tree::Root, File: ManagedFile> {
    /// The id of the transaction; passed along with every modification made
    /// through this tree.
    pub(crate) transaction_id: u64,
    /// The underlying tree file that operations are forwarded to.
    pub(crate) tree: TreeFile<Root, File>,
}
385
/// A type-erased [`TransactionTree`], allowing trees with different root
/// types to be stored together in one [`ExecutingTransaction`].
pub trait AnyTransactionTree<File: ManagedFile>: Any + Send + Sync {
    /// Returns this tree as [`Any`] so that it can be downcast to its
    /// concrete type.
    fn as_any(&self) -> &dyn Any;
    /// Returns this tree as mutable [`Any`] so that it can be downcast to its
    /// concrete type.
    fn as_any_mut(&mut self) -> &mut dyn Any;

    /// Returns a boxed, type-erased handle to the tree's state.
    fn state(&self) -> Box<dyn AnyTreeState>;

    /// Commits the changes made to the tree.
    fn commit(&mut self) -> Result<(), Error>;
    /// Rolls back the tree's state.
    fn rollback(&self);
}
395
396impl<Root: tree::Root, File: ManagedFile> AnyTransactionTree<File> for TransactionTree<Root, File> {
397    fn as_any(&self) -> &dyn Any {
398        self
399    }
400    fn as_any_mut(&mut self) -> &mut dyn Any {
401        self
402    }
403
404    fn state(&self) -> Box<dyn AnyTreeState> {
405        Box::new(self.tree.state.clone())
406    }
407
408    fn commit(&mut self) -> Result<(), Error> {
409        self.tree.commit()
410    }
411
412    fn rollback(&self) {
413        let mut state = self.tree.state.lock();
414        state.rollback(&self.tree.state);
415    }
416}
417
418impl<File: ManagedFile, Index> TransactionTree<VersionedTreeRoot<Index>, File>
419where
420    Index: Clone + Reducer<Index> + EmbeddedIndex + Debug + 'static,
421{
422    /// Returns the latest sequence id.
423    pub fn current_sequence_id(&self) -> u64 {
424        let state = self.tree.state.lock();
425        state.root.sequence
426    }
427}
428
429impl<Root: tree::Root, File: ManagedFile> TransactionTree<Root, File> {
430    /// Sets `key` to `value`.
431    pub fn set(
432        &mut self,
433        key: impl Into<ArcBytes<'static>>,
434        value: impl Into<ArcBytes<'static>>,
435    ) -> Result<(), Error> {
436        self.modify(vec![key.into()], Operation::Set(value.into()))
437    }
438
439    /// Executes a modification.
440    pub fn modify<'a>(
441        &mut self,
442        keys: Vec<ArcBytes<'a>>,
443        operation: Operation<'a, ArcBytes<'static>>,
444    ) -> Result<(), Error> {
445        self.tree.modify(Modification {
446            keys,
447            transaction_id: Some(self.transaction_id),
448            operation,
449        })
450    }
451
452    /// Sets `key` to `value`. If a value already exists, it will be returned.
453    pub fn replace(
454        &mut self,
455        key: impl Into<ArcBytes<'static>>,
456        value: impl Into<ArcBytes<'static>>,
457    ) -> Result<Option<ArcBytes<'static>>, Error> {
458        self.tree.replace(key, value, Some(self.transaction_id))
459    }
460
461    /// Returns the current value of `key`. This will return updated information
462    /// if it has been previously updated within this transaction.
463    pub fn get(&mut self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
464        self.tree.get(key, true)
465    }
466
467    /// Removes `key` and returns the existing value, if present.
468    pub fn remove(&mut self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
469        self.tree.remove(key, Some(self.transaction_id))
470    }
471
472    /// Compares the value of `key` against `old`. If the values match, key will
473    /// be set to the new value if `new` is `Some` or removed if `new` is
474    /// `None`.
475    pub fn compare_and_swap(
476        &mut self,
477        key: &[u8],
478        old: Option<&[u8]>,
479        new: Option<ArcBytes<'_>>,
480    ) -> Result<(), CompareAndSwapError> {
481        self.tree
482            .compare_and_swap(key, old, new, Some(self.transaction_id))
483    }
484
485    /// Retrieves the values of `keys`. If any keys are not found, they will be
486    /// omitted from the results. Keys are required to be pre-sorted.
487    pub fn get_multiple<'keys, KeysIntoIter, KeysIter>(
488        &mut self,
489        keys: KeysIntoIter,
490    ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
491    where
492        KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
493        KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
494    {
495        self.tree.get_multiple(keys, true)
496    }
497
498    /// Retrieves all of the values of keys within `range`.
499    pub fn get_range<'keys, KeyRangeBounds>(
500        &mut self,
501        range: &'keys KeyRangeBounds,
502    ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
503    where
504        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
505    {
506        self.tree.get_range(range, true)
507    }
508
509    /// Scans the tree across all nodes that might contain nodes within `range`.
510    ///
511    /// If `forwards` is true, the tree is scanned in ascending order.
512    /// Otherwise, the tree is scanned in descending order.
513    ///
514    /// `node_evaluator` is invoked for each [`Interior`](crate::tree::Interior)
515    /// node to determine if the node should be traversed. The parameters to the
516    /// callback are:
517    ///
518    /// - `&ArcBytes<'static>`: The maximum key stored within the all children
519    ///   nodes.
520    /// - `&Root::ReducedIndex`: The reduced index value stored within the node.
521    /// - `usize`: The depth of the node. The root nodes are depth 0.
522    ///
523    /// The result of the callback is a [`ScanEvaluation`]. To read children
524    /// nodes, return [`ScanEvaluation::ReadData`].
525    ///
526    /// `key_evaluator` is invoked for each key encountered that is contained
527    /// within `range`. For all [`ScanEvaluation::ReadData`] results returned,
528    /// `callback` will be invoked with the key and values. `callback` may not
529    /// be invoked in the same order as the keys are scanned.
530    #[cfg_attr(
531        feature = "tracing",
532        tracing::instrument(skip(self, node_evaluator, key_evaluator, callback))
533    )]
534    pub fn scan<'b, 'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
535        &mut self,
536        range: &'keys KeyRangeBounds,
537        forwards: bool,
538        mut node_evaluator: NodeEvaluator,
539        mut key_evaluator: KeyEvaluator,
540        mut callback: DataCallback,
541    ) -> Result<(), AbortError<CallerError>>
542    where
543        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
544        NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
545        KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
546        DataCallback: FnMut(
547            ArcBytes<'static>,
548            &Root::Index,
549            ArcBytes<'static>,
550        ) -> Result<(), AbortError<CallerError>>,
551        CallerError: Display + Debug,
552    {
553        self.tree.scan(
554            range,
555            forwards,
556            true,
557            &mut node_evaluator,
558            &mut key_evaluator,
559            &mut callback,
560        )
561    }
562
563    /// Returns the reduced index over the provided range. This is an
564    /// aggregation function that builds atop the `scan()` operation which calls
565    /// [`Reducer::reduce()`] and [`Reducer::rereduce()`] on all matching
566    /// indexes stored within the nodes of this tree, producing a single
567    /// aggregated [`Root::ReducedIndex`](tree::Root::ReducedIndex) value.
568    ///
569    /// If no keys match, the returned result is what [`Reducer::rereduce()`]
570    /// returns when an empty slice is provided.
571    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
572    pub fn reduce<'keys, KeyRangeBounds>(
573        &mut self,
574        range: &'keys KeyRangeBounds,
575    ) -> Result<Root::ReducedIndex, Error>
576    where
577        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
578    {
579        self.tree.reduce(range, true)
580    }
581
582    /// Returns the first key of the tree.
583    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
584    pub fn first_key(&mut self) -> Result<Option<ArcBytes<'static>>, Error> {
585        self.tree.first_key(true)
586    }
587
588    /// Returns the first key and value of the tree.
589    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
590    pub fn first(&mut self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
591        self.tree.first(true)
592    }
593
594    /// Returns the last key of the tree.
595    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
596    pub fn last_key(&mut self) -> Result<Option<ArcBytes<'static>>, Error> {
597        self.tree.last_key(true)
598    }
599
600    /// Returns the last key and value of the tree.
601    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
602    pub fn last(&mut self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
603        self.tree.last(true)
604    }
605}
606
/// An error returned from `compare_and_swap()`.
///
/// Any [`Error`] converts into the [`Error`](Self::Error) variant, so `?` can
/// be used on operations returning [`Error`] inside functions that return
/// this type.
#[derive(Debug, thiserror::Error)]
pub enum CompareAndSwapError {
    /// The stored value did not match the conditional value. The payload is
    /// the existing value that was found instead.
    #[error("value did not match. existing value: {0:?}")]
    Conflict(Option<ArcBytes<'static>>),
    /// Another error occurred while executing the operation.
    #[error("error during compare_and_swap: {0}")]
    Error(#[from] Error),
}
617
/// A database configuration used to open a database.
///
/// Settings left as `None` fall back to their `Default` value (file manager,
/// thread pool) or stay unset (vault, cache) when the database is opened.
#[derive(Debug)]
#[must_use]
pub struct Config<M: FileManager = StdFileManager> {
    /// The directory in which the database is located.
    path: PathBuf,
    /// The vault to use for the database, if any.
    vault: Option<Arc<dyn AnyVault>>,
    /// The chunk cache to use for the database, if any.
    cache: Option<ChunkCache>,
    /// The file manager to use instead of a default-constructed one.
    file_manager: Option<M>,
    /// The thread pool to use instead of a default-constructed one.
    thread_pool: Option<ThreadPool<M::File>>,
}
628
629impl<M: FileManager> Clone for Config<M> {
630    fn clone(&self) -> Self {
631        Self {
632            path: self.path.clone(),
633            vault: self.vault.clone(),
634            cache: self.cache.clone(),
635            file_manager: self.file_manager.clone(),
636            thread_pool: self.thread_pool.clone(),
637        }
638    }
639}
640
641impl Config<StdFileManager> {
642    /// Creates a new config to open a database located at `path`.
643    pub fn new<P: AsRef<Path>>(path: P) -> Self {
644        Self {
645            path: path.as_ref().to_path_buf(),
646            vault: None,
647            cache: None,
648            thread_pool: None,
649            file_manager: None,
650        }
651    }
652
653    /// Returns a default configuration to open a database located at `path`.
654    pub fn default_for<P: AsRef<Path>>(path: P) -> Self {
655        Self {
656            path: path.as_ref().to_path_buf(),
657            vault: None,
658            cache: Some(ChunkCache::new(2000, 65536)),
659            thread_pool: Some(ThreadPool::default()),
660            file_manager: None,
661        }
662    }
663
664    /// Sets the file manager.
665    ///
666    /// ## Panics
667    ///
668    /// Panics if called after a shared thread pool has been set.
669    pub fn file_manager<M: FileManager>(self, file_manager: M) -> Config<M> {
670        assert!(self.thread_pool.is_none());
671        Config {
672            path: self.path,
673            vault: self.vault,
674            cache: self.cache,
675            file_manager: Some(file_manager),
676            thread_pool: None,
677        }
678    }
679}
680
681impl<M: FileManager> Config<M> {
682    /// Sets the vault to use for this database.
683    pub fn vault<V: AnyVault>(mut self, vault: V) -> Self {
684        self.vault = Some(Arc::new(vault));
685        self
686    }
687
688    /// Sets the chunk cache to use for this database.
689    pub fn cache(mut self, cache: ChunkCache) -> Self {
690        self.cache = Some(cache);
691        self
692    }
693
694    /// Uses the `thread_pool` provided instead of creating its own. This will
695    /// allow a single thread pool to manage multiple [`Roots`] instances'
696    /// transactions.
697    pub fn shared_thread_pool(mut self, thread_pool: &ThreadPool<M::File>) -> Self {
698        self.thread_pool = Some(thread_pool.clone());
699        self
700    }
701
702    /// Opens the database, or creates one if the target path doesn't exist.
703    pub fn open(self) -> Result<Roots<M::File>, Error> {
704        Roots::open(
705            self.path,
706            Context {
707                file_manager: self.file_manager.unwrap_or_default(),
708                vault: self.vault,
709                cache: self.cache,
710            },
711            self.thread_pool.unwrap_or_default(),
712        )
713    }
714}
715
/// A named collection of keys and values.
pub struct Tree<Root: tree::Root, File: ManagedFile> {
    /// The database this tree belongs to.
    roots: Roots<File>,
    /// The state of the tree.
    state: State<Root>,
    /// A vault specific to this tree. When set, it is used in place of the
    /// database's own context vault for this tree's operations.
    vault: Option<Arc<dyn AnyVault>>,
    /// The name of the tree.
    name: Cow<'static, str>,
}
723
724impl<Root: tree::Root, File: ManagedFile> Clone for Tree<Root, File> {
725    fn clone(&self) -> Self {
726        Self {
727            roots: self.roots.clone(),
728            state: self.state.clone(),
729            vault: self.vault.clone(),
730            name: self.name.clone(),
731        }
732    }
733}
734
735impl<Root: tree::Root, File: ManagedFile> Tree<Root, File> {
736    /// Returns the name of the tree.
737    #[must_use]
738    pub fn name(&self) -> &str {
739        &self.name
740    }
741
742    /// Returns the path to the file for this tree.
743    #[must_use]
744    pub fn path(&self) -> PathBuf {
745        self.roots.tree_path(self.name())
746    }
747
748    /// Returns the number of keys stored in the tree. Does not include deleted keys.
749    #[must_use]
750    pub fn count(&self) -> u64 {
751        let state = self.state.lock();
752        state.root.count()
753    }
754
755    /// Sets `key` to `value`. This is executed within its own transaction.
756    #[allow(clippy::missing_panics_doc)]
757    pub fn set(
758        &self,
759        key: impl Into<ArcBytes<'static>>,
760        value: impl Into<ArcBytes<'static>>,
761    ) -> Result<(), Error> {
762        let transaction = self.begin_transaction()?;
763        transaction.tree::<Root>(0).unwrap().set(key, value)?;
764        transaction.commit()
765    }
766
767    fn begin_transaction(&self) -> Result<ExecutingTransaction<File>, Error> {
768        let mut root = Root::tree(self.name.clone());
769        if let Some(vault) = &self.vault {
770            root.vault = Some(vault.clone());
771        }
772        self.roots.transaction(&[root])
773    }
774
775    fn open_for_read(&self) -> Result<TreeFile<Root, File>, Error> {
776        let context = self.vault.as_ref().map_or_else(
777            || Cow::Borrowed(self.roots.context()),
778            |vault| Cow::Owned(self.roots.context().clone().with_any_vault(vault.clone())),
779        );
780
781        TreeFile::<Root, File>::read(
782            self.path(),
783            self.state.clone(),
784            &context,
785            Some(self.roots.transactions()),
786        )
787    }
788
789    /// Retrieves the current value of `key`, if present. Does not reflect any
790    /// changes in pending transactions.
791    pub fn get(&self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
792        catch_compaction_and_retry(|| {
793            let mut tree = self.open_for_read()?;
794
795            tree.get(key, false)
796        })
797    }
798
799    /// Sets `key` to `value`. If a value already exists, it will be returned.
800    #[allow(clippy::missing_panics_doc)]
801    pub fn replace(
802        &mut self,
803        key: impl Into<ArcBytes<'static>>,
804        value: impl Into<ArcBytes<'static>>,
805    ) -> Result<Option<ArcBytes<'static>>, Error> {
806        let transaction = self.begin_transaction()?;
807        let existing_value = transaction.tree::<Root>(0).unwrap().replace(key, value)?;
808        transaction.commit()?;
809        Ok(existing_value)
810    }
811
812    /// Executes a modification.
813    #[allow(clippy::missing_panics_doc)]
814    pub fn modify<'a>(
815        &mut self,
816        keys: Vec<ArcBytes<'a>>,
817        operation: Operation<'a, ArcBytes<'static>>,
818    ) -> Result<(), Error> {
819        let transaction = self.begin_transaction()?;
820        transaction
821            .tree::<Root>(0)
822            .unwrap()
823            .modify(keys, operation)?;
824        transaction.commit()?;
825        Ok(())
826    }
827
828    /// Removes `key` and returns the existing value, if present. This is executed within its own transaction.
829    #[allow(clippy::missing_panics_doc)]
830    pub fn remove(&self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
831        let transaction = self.begin_transaction()?;
832        let existing_value = transaction.tree::<Root>(0).unwrap().remove(key)?;
833        transaction.commit()?;
834        Ok(existing_value)
835    }
836
837    /// Compares the value of `key` against `old`. If the values match, key will
838    /// be set to the new value if `new` is `Some` or removed if `new` is
839    /// `None`. This is executed within its own transaction.
840    #[allow(clippy::missing_panics_doc)]
841    pub fn compare_and_swap(
842        &self,
843        key: &[u8],
844        old: Option<&[u8]>,
845        new: Option<ArcBytes<'_>>,
846    ) -> Result<(), CompareAndSwapError> {
847        let transaction = self.begin_transaction()?;
848        transaction
849            .tree::<Root>(0)
850            .unwrap()
851            .compare_and_swap(key, old, new)?;
852        transaction.commit()?;
853        Ok(())
854    }
855
856    /// Retrieves the values of `keys`. If any keys are not found, they will be
857    /// omitted from the results. Keys are required to be pre-sorted.
858    #[allow(clippy::needless_pass_by_value)]
859    pub fn get_multiple<'keys, Keys>(
860        &self,
861        keys: Keys,
862    ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
863    where
864        Keys: Iterator<Item = &'keys [u8]> + ExactSizeIterator + Clone,
865    {
866        catch_compaction_and_retry(|| {
867            let mut tree = self.open_for_read()?;
868
869            tree.get_multiple(keys.clone(), false)
870        })
871    }
872
873    /// Retrieves all of the values of keys within `range`.
874    pub fn get_range<'keys, KeyRangeBounds>(
875        &self,
876        range: &'keys KeyRangeBounds,
877    ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
878    where
879        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
880    {
881        catch_compaction_and_retry(|| {
882            let mut tree = self.open_for_read()?;
883
884            tree.get_range(range, false)
885        })
886    }
887
888    /// Scans the tree across all nodes that might contain nodes within `range`.
889    ///
890    /// If `forwards` is true, the tree is scanned in ascending order.
891    /// Otherwise, the tree is scanned in descending order.
892    ///
893    /// `node_evaluator` is invoked for each [`Interior`](crate::tree::Interior) node to determine if
894    /// the node should be traversed. The parameters to the callback are:
895    ///
896    /// - `&ArcBytes<'static>`: The maximum key stored within the all children
897    ///   nodes.
898    /// - `&Root::ReducedIndex`: The reduced index value stored within the node.
899    /// - `usize`: The depth of the node. The root nodes are depth 0.
900    ///
901    /// The result of the callback is a [`ScanEvaluation`]. To read children
902    /// nodes, return [`ScanEvaluation::ReadData`].
903    ///
904    /// `key_evaluator` is invoked for each key encountered that is contained
905    /// within `range`. For all [`ScanEvaluation::ReadData`] results returned,
906    /// `callback` will be invoked with the key and values. `callback` may not
907    /// be invoked in the same order as the keys are scanned.
908    #[cfg_attr(
909        feature = "tracing",
910        tracing::instrument(skip(self, node_evaluator, key_evaluator, callback))
911    )]
912    pub fn scan<'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
913        &self,
914        range: &'keys KeyRangeBounds,
915        forwards: bool,
916        mut node_evaluator: NodeEvaluator,
917        mut key_evaluator: KeyEvaluator,
918        mut callback: DataCallback,
919    ) -> Result<(), AbortError<CallerError>>
920    where
921        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
922        NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
923        KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
924        DataCallback: FnMut(
925            ArcBytes<'static>,
926            &Root::Index,
927            ArcBytes<'static>,
928        ) -> Result<(), AbortError<CallerError>>,
929        CallerError: Display + Debug,
930    {
931        catch_compaction_and_retry_abortable(move || {
932            let mut tree = self.open_for_read()?;
933
934            tree.scan(
935                range,
936                forwards,
937                false,
938                &mut node_evaluator,
939                &mut key_evaluator,
940                &mut callback,
941            )
942        })
943    }
944
945    /// Returns the reduced index over the provided range. This is an
946    /// aggregation function that builds atop the `scan()` operation which calls
947    /// [`Reducer::reduce()`] and [`Reducer::rereduce()`] on all matching
948    /// indexes stored within the nodes of this tree, producing a single
949    /// aggregated [`Root::ReducedIndex`](tree::Root::ReducedIndex) value.
950    ///
951    /// If no keys match, the returned result is what [`Reducer::rereduce()`]
952    /// returns when an empty slice is provided.
953    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
954    pub fn reduce<'keys, KeyRangeBounds>(
955        &self,
956        range: &'keys KeyRangeBounds,
957    ) -> Result<Root::ReducedIndex, Error>
958    where
959        KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
960    {
961        catch_compaction_and_retry(move || {
962            let mut tree = self.open_for_read()?;
963
964            tree.reduce(range, false)
965        })
966    }
967
968    /// Returns the first key of the tree.
969    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
970    pub fn first_key(&self) -> Result<Option<ArcBytes<'static>>, Error> {
971        catch_compaction_and_retry(|| {
972            let mut tree = self.open_for_read()?;
973
974            tree.first_key(false)
975        })
976    }
977
978    /// Returns the first key and value of the tree.
979    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
980    pub fn first(&self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
981        catch_compaction_and_retry(|| {
982            let mut tree = self.open_for_read()?;
983
984            tree.first(false)
985        })
986    }
987
988    /// Returns the last key of the tree.
989    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
990    pub fn last_key(&self) -> Result<Option<ArcBytes<'static>>, Error> {
991        catch_compaction_and_retry(|| {
992            let mut tree = self.open_for_read()?;
993
994            tree.last_key(false)
995        })
996    }
997
998    /// Returns the last key and value of the tree.
999    #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
1000    pub fn last(&self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
1001        catch_compaction_and_retry(|| {
1002            let mut tree = self.open_for_read()?;
1003
1004            tree.last(false)
1005        })
1006    }
1007
1008    /// Rewrites the database to remove data that is no longer current. Because
1009    /// Nebari uses an append-only format, this is helpful in reducing disk
1010    /// usage.
1011    ///
1012    /// See [`TreeFile::compact()`](crate::tree::TreeFile::compact) for more
1013    /// information.
1014    pub fn compact(&self) -> Result<(), Error> {
1015        let tree = self.open_for_read()?;
1016        tree.compact(
1017            &self.roots.context().file_manager,
1018            Some(TransactableCompaction {
1019                name: self.name.as_ref(),
1020                manager: self.roots.transactions(),
1021            }),
1022        )?;
1023        Ok(())
1024    }
1025}
1026
1027impl<Root: tree::Root, File: ManagedFile> AnyTreeRoot<File> for Tree<Root, File> {
1028    fn name(&self) -> &str {
1029        &self.name
1030    }
1031
1032    fn default_state(&self) -> Box<dyn AnyTreeState> {
1033        Box::new(State::<Root>::default())
1034    }
1035
1036    fn begin_transaction(
1037        &self,
1038        transaction_id: u64,
1039        file_path: &Path,
1040        state: &dyn AnyTreeState,
1041        context: &Context<File::Manager>,
1042        transactions: Option<&TransactionManager<File::Manager>>,
1043    ) -> Result<Box<dyn AnyTransactionTree<File>>, Error> {
1044        let context = self.vault.as_ref().map_or_else(
1045            || Cow::Borrowed(context),
1046            |vault| Cow::Owned(context.clone().with_any_vault(vault.clone())),
1047        );
1048        let tree = TreeFile::write(
1049            file_path,
1050            state
1051                .as_any()
1052                .downcast_ref::<State<Root>>()
1053                .unwrap()
1054                .clone(),
1055            &context,
1056            transactions,
1057        )?;
1058
1059        Ok(Box::new(TransactionTree {
1060            transaction_id,
1061            tree,
1062        }))
1063    }
1064}
1065
1066impl<File: ManagedFile, Index> Tree<VersionedTreeRoot<Index>, File>
1067where
1068    Index: EmbeddedIndex + Reducer<Index> + Clone + Debug + 'static,
1069{
1070    /// Scans the tree for keys that are contained within `range`. If `forwards`
1071    /// is true, scanning starts at the lowest sort-order key and scans forward.
1072    /// Otherwise, scanning starts at the highest sort-order key and scans
1073    /// backwards. `key_evaluator` is invoked for each key as it is encountered.
1074    /// For all [`ScanEvaluation::ReadData`] results returned, `callback` will be
1075    /// invoked with the key and values. The callback may not be invoked in the
1076    /// same order as the keys are scanned.
1077    #[cfg_attr(
1078        feature = "tracing",
1079        tracing::instrument(skip(self, key_evaluator, data_callback))
1080    )]
1081    pub fn scan_sequences<CallerError, Range, KeyEvaluator, DataCallback>(
1082        &mut self,
1083        range: Range,
1084        forwards: bool,
1085        key_evaluator: &mut KeyEvaluator,
1086        data_callback: &mut DataCallback,
1087    ) -> Result<(), AbortError<CallerError>>
1088    where
1089        Range: Clone + RangeBounds<u64> + Debug + 'static,
1090        KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation,
1091        DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
1092        CallerError: Display + Debug,
1093    {
1094        catch_compaction_and_retry_abortable(|| {
1095            let mut tree = TreeFile::<VersionedTreeRoot<Index>, File>::read(
1096                self.path(),
1097                self.state.clone(),
1098                self.roots.context(),
1099                Some(self.roots.transactions()),
1100            )?;
1101
1102            tree.scan_sequences(range.clone(), forwards, false, key_evaluator, data_callback)
1103        })
1104    }
1105}
1106
/// An error that could come from user code or Nebari.
///
/// `CallerError` is the error type produced by user-supplied callbacks. It
/// defaults to [`Infallible`] for callers whose callbacks cannot fail.
#[derive(thiserror::Error, Debug)]
pub enum AbortError<CallerError: Display + Debug = Infallible> {
    /// An error unrelated to Nebari occurred, carrying the caller's own error
    /// value.
    #[error("other error: {0}")]
    Other(CallerError),
    /// An error from Roots occurred. An [`Error`] converts into this variant
    /// automatically, which lets `?` be used on Nebari results.
    #[error("database error: {0}")]
    Nebari(#[from] Error),
}
1117
1118impl AbortError<Infallible> {
1119    /// Unwraps the error contained within an infallible abort error.
1120    #[must_use]
1121    pub fn infallible(self) -> Error {
1122        match self {
1123            AbortError::Other(_) => unreachable!(),
1124            AbortError::Nebari(error) => error,
1125        }
1126    }
1127}
1128
1129/// A thread pool that commits transactions to disk in parallel.
1130#[derive(Debug)]
1131pub struct ThreadPool<File>
1132where
1133    File: ManagedFile,
1134{
1135    sender: flume::Sender<ThreadCommit<File>>,
1136    receiver: flume::Receiver<ThreadCommit<File>>,
1137    thread_count: Arc<AtomicU16>,
1138    maximum_threads: usize,
1139}
1140
1141impl<File: ManagedFile> ThreadPool<File> {
1142    /// Returns a thread pool that will spawn up to `maximum_threads` to process
1143    /// file operations.
1144    #[must_use]
1145    pub fn new(maximum_threads: usize) -> Self {
1146        let (sender, receiver) = flume::unbounded();
1147        Self {
1148            sender,
1149            receiver,
1150            thread_count: Arc::new(AtomicU16::new(0)),
1151            maximum_threads,
1152        }
1153    }
1154
1155    fn commit_trees(
1156        &self,
1157        trees: Vec<UnlockedTransactionTree<File>>,
1158    ) -> Result<Vec<Box<dyn AnyTransactionTree<File>>>, Error> {
1159        // If we only have one tree, there's no reason to split IO across
1160        // threads. If we have multiple trees, we should split even with one
1161        // cpu: if one thread blocks, the other can continue executing.
1162        if trees.len() == 1 {
1163            let mut tree = trees.into_iter().next().unwrap().0.into_inner();
1164            tree.commit()?;
1165            Ok(vec![tree])
1166        } else {
1167            // Push the trees so that any existing threads can begin processing the queue.
1168            let (completion_sender, completion_receiver) = flume::unbounded();
1169            let tree_count = trees.len();
1170            for tree in trees {
1171                self.sender.send(ThreadCommit {
1172                    tree: tree.0.into_inner(),
1173                    completion_sender: completion_sender.clone(),
1174                })?;
1175            }
1176
1177            // Scale the queue if needed.
1178            let desired_threads = tree_count.min(self.maximum_threads);
1179            loop {
1180                let thread_count = self.thread_count.load(Ordering::SeqCst);
1181                if (thread_count as usize) >= desired_threads {
1182                    break;
1183                }
1184
1185                // Spawn a thread, but ensure that we don't spin up too many threads if another thread is committing at the same time.
1186                if self
1187                    .thread_count
1188                    .compare_exchange(
1189                        thread_count,
1190                        thread_count + 1,
1191                        Ordering::SeqCst,
1192                        Ordering::SeqCst,
1193                    )
1194                    .is_ok()
1195                {
1196                    let commit_receiver = self.receiver.clone();
1197                    std::thread::Builder::new()
1198                        .name(String::from("roots-txwriter"))
1199                        .spawn(move || transaction_commit_thread(commit_receiver))
1200                        .unwrap();
1201                }
1202            }
1203
1204            // Wait for our results
1205            let mut results = Vec::with_capacity(tree_count);
1206            for _ in 0..tree_count {
1207                results.push(completion_receiver.recv()??);
1208            }
1209
1210            Ok(results)
1211        }
1212    }
1213}
1214
1215impl<File: ManagedFile> Clone for ThreadPool<File> {
1216    fn clone(&self) -> Self {
1217        Self {
1218            sender: self.sender.clone(),
1219            receiver: self.receiver.clone(),
1220            thread_count: self.thread_count.clone(),
1221            maximum_threads: self.maximum_threads,
1222        }
1223    }
1224}
1225
1226impl<File: ManagedFile> Default for ThreadPool<File> {
1227    fn default() -> Self {
1228        static CPU_COUNT: Lazy<usize> = Lazy::new(num_cpus::get);
1229        Self::new(*CPU_COUNT)
1230    }
1231}
1232
1233#[allow(clippy::needless_pass_by_value)]
1234fn transaction_commit_thread<File: ManagedFile>(receiver: flume::Receiver<ThreadCommit<File>>) {
1235    while let Ok(ThreadCommit {
1236        mut tree,
1237        completion_sender,
1238    }) = receiver.recv()
1239    {
1240        let result = tree.commit();
1241        let result = result.map(move |_| tree);
1242        drop(completion_sender.send(result));
1243    }
1244}
1245
1246struct ThreadCommit<File>
1247where
1248    File: ManagedFile,
1249{
1250    tree: Box<dyn AnyTransactionTree<File>>,
1251    completion_sender: Sender<Result<Box<dyn AnyTransactionTree<File>>, Error>>,
1252}
1253
1254fn catch_compaction_and_retry<R, F: Fn() -> Result<R, Error>>(func: F) -> Result<R, Error> {
1255    loop {
1256        match func() {
1257            Ok(result) => return Ok(result),
1258            Err(error) => {
1259                if matches!(error.kind, ErrorKind::TreeCompacted) {
1260                    continue;
1261                }
1262
1263                return Err(error);
1264            }
1265        }
1266    }
1267}
1268
1269fn catch_compaction_and_retry_abortable<
1270    R,
1271    E: Display + Debug,
1272    F: FnMut() -> Result<R, AbortError<E>>,
1273>(
1274    mut func: F,
1275) -> Result<R, AbortError<E>> {
1276    loop {
1277        match func() {
1278            Ok(result) => return Ok(result),
1279            Err(AbortError::Nebari(error)) => {
1280                if matches!(error.kind, ErrorKind::TreeCompacted) {
1281                    continue;
1282                }
1283
1284                return Err(AbortError::Nebari(error));
1285            }
1286            Err(other) => return Err(other),
1287        }
1288    }
1289}
1290
1291#[cfg(test)]
1292mod tests {
1293    use byteorder::{BigEndian, ByteOrder};
1294    use tempfile::tempdir;
1295
1296    use super::*;
1297    use crate::{
1298        io::{any::AnyFileManager, fs::StdFileManager, memory::MemoryFileManager},
1299        test_util::RotatorVault,
1300        tree::{Root, Unversioned, Versioned},
1301    };
1302
1303    fn basic_get_set<M: FileManager>(file_manager: M) {
1304        let tempdir = tempdir().unwrap();
1305        let roots = Config::new(tempdir.path())
1306            .file_manager(file_manager)
1307            .open()
1308            .unwrap();
1309
1310        let tree = roots.tree(Versioned::tree("test")).unwrap();
1311        tree.set(b"test", b"value").unwrap();
1312        let result = tree.get(b"test").unwrap().expect("key not found");
1313
1314        assert_eq!(result, b"value");
1315    }
1316
1317    #[test]
1318    fn memory_basic_get_set() {
1319        basic_get_set(MemoryFileManager::default());
1320    }
1321
1322    #[test]
1323    fn std_basic_get_set() {
1324        basic_get_set(StdFileManager::default());
1325    }
1326
1327    #[test]
1328    fn basic_transaction_isolation_test() {
1329        let tempdir = tempdir().unwrap();
1330
1331        let roots = Config::<StdFileManager>::new(tempdir.path())
1332            .open()
1333            .unwrap();
1334        let tree = roots.tree(Versioned::tree("test")).unwrap();
1335        tree.set(b"test", b"value").unwrap();
1336
1337        // Begin a transaction
1338        let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
1339
1340        // Replace the key with a new value.
1341        transaction
1342            .tree::<Versioned>(0)
1343            .unwrap()
1344            .set(b"test", b"updated value")
1345            .unwrap();
1346
1347        // Check that the transaction can read the new value
1348        let result = transaction
1349            .tree::<Versioned>(0)
1350            .unwrap()
1351            .get(b"test")
1352            .unwrap()
1353            .expect("key not found");
1354        assert_eq!(result, b"updated value");
1355
1356        // Ensure that existing read-access doesn't see the new value
1357        let result = tree.get(b"test").unwrap().expect("key not found");
1358        assert_eq!(result, b"value");
1359
1360        // Commit the transaction
1361        transaction.commit().unwrap();
1362
1363        // Ensure that the reader now sees the new value
1364        let result = tree.get(b"test").unwrap().expect("key not found");
1365        assert_eq!(result, b"updated value");
1366    }
1367
1368    #[test]
1369    fn basic_transaction_rollback_test() {
1370        let tempdir = tempdir().unwrap();
1371
1372        let roots = Config::<StdFileManager>::new(tempdir.path())
1373            .open()
1374            .unwrap();
1375        let tree = roots.tree(Versioned::tree("test")).unwrap();
1376        tree.set(b"test", b"value").unwrap();
1377
1378        // Begin a transaction
1379        let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
1380
1381        // Replace the key with a new value.
1382        transaction
1383            .tree::<Versioned>(0)
1384            .unwrap()
1385            .set(b"test", b"updated value")
1386            .unwrap();
1387
1388        // Roll the transaction back
1389        transaction.rollback();
1390
1391        // Ensure that the reader still sees the old value
1392        let result = tree.get(b"test").unwrap().expect("key not found");
1393        assert_eq!(result, b"value");
1394
1395        // Begin a new transaction
1396        let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
1397        // Check that the transaction has the original value
1398        let result = transaction
1399            .tree::<Versioned>(0)
1400            .unwrap()
1401            .get(b"test")
1402            .unwrap()
1403            .expect("key not found");
1404        assert_eq!(result, b"value");
1405    }
1406
1407    #[test]
1408    fn std_compact_test_versioned() {
1409        compact_test::<Versioned, _>(StdFileManager::default());
1410    }
1411
1412    #[test]
1413    fn std_compact_test_unversioned() {
1414        compact_test::<Unversioned, _>(StdFileManager::default());
1415    }
1416
1417    #[test]
1418    fn memory_compact_test_versioned() {
1419        compact_test::<Versioned, _>(MemoryFileManager::default());
1420    }
1421
1422    #[test]
1423    fn memory_compact_test_unversioned() {
1424        compact_test::<Unversioned, _>(MemoryFileManager::default());
1425    }
1426
1427    #[test]
1428    fn any_compact_test_versioned() {
1429        compact_test::<Versioned, _>(AnyFileManager::std());
1430        compact_test::<Versioned, _>(AnyFileManager::memory());
1431    }
1432
1433    #[test]
1434    fn any_compact_test_unversioned() {
1435        compact_test::<Unversioned, _>(AnyFileManager::std());
1436        compact_test::<Unversioned, _>(AnyFileManager::memory());
1437    }
1438
1439    fn compact_test<R: Root, M: FileManager>(file_manager: M) {
1440        const OPERATION_COUNT: usize = 256;
1441        const WORKER_COUNT: usize = 4;
1442        let tempdir = tempdir().unwrap();
1443
1444        let roots = Config::new(tempdir.path())
1445            .file_manager(file_manager)
1446            .open()
1447            .unwrap();
1448        let tree = roots.tree(R::tree("test")).unwrap();
1449        tree.set("foo", b"bar").unwrap();
1450
1451        // Spawn a pool of threads that will perform a series of operations
1452        let mut threads = Vec::new();
1453        for worker in 0..WORKER_COUNT {
1454            let tree = tree.clone();
1455            threads.push(std::thread::spawn(move || {
1456                for relative_id in 0..OPERATION_COUNT {
1457                    let absolute_id = (worker * OPERATION_COUNT + relative_id) as u64;
1458                    tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes())
1459                        .unwrap();
1460                    let value = tree
1461                        .remove(&absolute_id.to_be_bytes())
1462                        .unwrap()
1463                        .ok_or_else(|| panic!("value not found: {:?}", absolute_id))
1464                        .unwrap();
1465                    assert_eq!(BigEndian::read_u64(&value), absolute_id);
1466                    tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes())
1467                        .unwrap();
1468                    let newer_value = tree
1469                        .get(&absolute_id.to_be_bytes())
1470                        .unwrap()
1471                        .expect("couldn't find found");
1472                    assert_eq!(value, newer_value);
1473                }
1474            }));
1475        }
1476
1477        threads.push(std::thread::spawn(move || {
1478            // While those workers are running, this thread is going to continually
1479            // execute compaction.
1480            while tree.count() < (OPERATION_COUNT * WORKER_COUNT) as u64 {
1481                tree.compact().unwrap();
1482            }
1483        }));
1484
1485        for thread in threads {
1486            thread.join().unwrap();
1487        }
1488    }
1489
    /// Exercises `check_name` with one name that must be accepted and two
    /// that must be rejected.
    #[test]
    fn name_tests() {
        // Letters, `_`, `-` and `.` are accepted.
        assert!(check_name("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-.").is_ok());
        // `=` is rejected.
        assert!(check_name("=").is_err());
        // NOTE(review): presumably rejected because the name is reserved for
        // the transaction log; confirm against `check_name`.
        assert!(check_name("_transactions").is_err());
    }
1496
    /// Exercises vault handling across three separate openings of the same
    /// directory: writing with vaults configured, reading back with the
    /// matching vaults, and attempting to read without the database-level
    /// vault.
    #[test]
    fn context_encryption_tests() {
        let tempdir = tempdir().unwrap();

        // Encrypt a tree using default encryption via the context
        {
            let roots = Config::<StdFileManager>::new(tempdir.path())
                .vault(RotatorVault::new(13))
                .open()
                .unwrap();
            let tree = roots.tree(Versioned::tree("test")).unwrap();
            tree.set(b"test", b"value").unwrap();
            // A second tree is given its own vault, overriding the one from the config.
            let other_tree = roots
                .tree(Versioned::tree("test-otherkey").with_vault(RotatorVault::new(42)))
                .unwrap();
            other_tree.set(b"test", b"other").unwrap();
        }
        // Try to access the tree with the vault again.
        {
            let roots = Config::<StdFileManager>::new(tempdir.path())
                .vault(RotatorVault::new(13))
                .open()
                .unwrap();
            let tree = roots.tree(Versioned::tree("test")).unwrap();
            let value = tree.get(b"test").unwrap();
            assert_eq!(value.as_deref(), Some(&b"value"[..]));

            // Verify we can't read the other tree without the right vault
            let bad_tree = roots.tree(Versioned::tree("test-otherkey")).unwrap();
            assert!(bad_tree.get(b"test").is_err());

            // And test retrieving the other key with the correct vault
            let tree = roots
                .tree(Versioned::tree("test-otherkey").with_vault(RotatorVault::new(42)))
                .unwrap();
            let value = tree.get(b"test").unwrap();
            assert_eq!(value.as_deref(), Some(&b"other"[..]));
        }
        // Finally, open the database with no vault configured at all.
        {
            let roots = Config::<StdFileManager>::new(tempdir.path())
                .open()
                .unwrap();
            // Try to access roots without the vault.
            let bad_tree = roots.tree(Versioned::tree("test")).unwrap();
            assert!(bad_tree.get(b"test").is_err());

            // Try to access roots with the vault specified on the tree only. In this
            // situation the transaction log is unreadable, so no transactions are
            // considered valid and the key appears to be absent.
            let bad_tree = roots
                .tree(Versioned::tree("test").with_vault(RotatorVault::new(13)))
                .unwrap();
            assert_eq!(bad_tree.get(b"test").unwrap(), None);
        }
    }
1550
1551    #[test]
1552    fn too_large_transaction() {
1553        let tempdir = tempdir().unwrap();
1554
1555        let config = Config::<StdFileManager>::new(tempdir.path());
1556        {
1557            let roots = config.clone().open().unwrap();
1558
1559            let mut transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
1560
1561            // Write some data to the tree.
1562            transaction
1563                .tree::<Versioned>(0)
1564                .unwrap()
1565                .set(b"test", vec![0; 16 * 1024 * 1024])
1566                .unwrap();
1567
1568            // Issue a transaction that's too large.
1569            assert!(matches!(
1570                transaction
1571                    .entry_mut()
1572                    .set_data(vec![0; 16 * 1024 * 1024 - 7])
1573                    .unwrap_err()
1574                    .kind,
1575                ErrorKind::ValueTooLarge
1576            ));
1577            // Roll the transaction back
1578            transaction.rollback();
1579        }
1580        // Ensure that we can still write to the tree.
1581        {
1582            let roots = config.open().unwrap();
1583
1584            let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
1585
1586            // Write some data to the tree
1587            transaction
1588                .tree::<Versioned>(0)
1589                .unwrap()
1590                .set(b"test", b"updated value")
1591                .unwrap();
1592
1593            transaction.commit().unwrap();
1594        }
1595    }
1596}