1use std::{
2 any::Any,
3 borrow::{Borrow, Cow},
4 collections::HashMap,
5 convert::Infallible,
6 fmt::{Debug, Display},
7 fs,
8 ops::{Deref, DerefMut, RangeBounds},
9 path::{Path, PathBuf},
10 sync::{
11 atomic::{AtomicU16, Ordering},
12 Arc,
13 },
14};
15
16use flume::Sender;
17use once_cell::sync::Lazy;
18use parking_lot::{MappedMutexGuard, Mutex, MutexGuard};
19
20use crate::{
21 context::Context,
22 error::Error,
23 io::{fs::StdFileManager, FileManager, ManagedFile},
24 transaction::{LogEntry, ManagedTransaction, TransactionManager},
25 tree::{
26 self, root::AnyTreeRoot, state::AnyTreeState, EmbeddedIndex, KeySequence, Modification,
27 Operation, Reducer, ScanEvaluation, State, TransactableCompaction, TreeFile, TreeRoot,
28 VersionedTreeRoot,
29 },
30 vault::AnyVault,
31 ArcBytes, ChunkCache, ErrorKind,
32};
33
34#[derive(Debug)]
36pub struct Roots<File: ManagedFile> {
37 data: Arc<Data<File>>,
38}
39
40#[derive(Debug)]
41struct Data<File: ManagedFile> {
42 context: Context<File::Manager>,
43 transactions: TransactionManager<File::Manager>,
44 thread_pool: ThreadPool<File>,
45 path: PathBuf,
46 tree_states: Mutex<HashMap<String, Box<dyn AnyTreeState>>>,
47}
48
49impl<File: ManagedFile> Roots<File> {
50 fn open<P: Into<PathBuf> + Send>(
51 path: P,
52 context: Context<File::Manager>,
53 thread_pool: ThreadPool<File>,
54 ) -> Result<Self, Error> {
55 let path = path.into();
56 if !path.exists() {
57 fs::create_dir_all(&path)?;
58 } else if !path.is_dir() {
59 return Err(Error::from(format!(
60 "'{:?}' already exists, but is not a directory.",
61 path
62 )));
63 }
64
65 let transactions = TransactionManager::spawn(&path, context.clone())?;
66 Ok(Self {
67 data: Arc::new(Data {
68 context,
69 path,
70 transactions,
71 thread_pool,
72 tree_states: Mutex::default(),
73 }),
74 })
75 }
76
77 #[must_use]
79 pub fn path(&self) -> &Path {
80 &self.data.path
81 }
82
83 pub fn context(&self) -> &Context<File::Manager> {
85 &self.data.context
86 }
87
88 #[must_use]
90 pub fn transactions(&self) -> &TransactionManager<File::Manager> {
91 &self.data.transactions
92 }
93
94 pub fn tree<Root: tree::Root>(
102 &self,
103 root: TreeRoot<Root, File>,
104 ) -> Result<Tree<Root, File>, Error> {
105 check_name(&root.name)?;
106 let path = self.tree_path(&root.name);
107 if !path.exists() {
108 self.context().file_manager.append(&path)?;
109 }
110 let state = self.tree_state(root.name.clone());
111 Ok(Tree {
112 roots: self.clone(),
113 state,
114 vault: root.vault,
115 name: root.name,
116 })
117 }
118
119 fn tree_path(&self, name: &str) -> PathBuf {
120 self.path().join(format!("{}.nebari", name))
121 }
122
123 pub fn delete_tree(&self, name: impl Into<Cow<'static, str>>) -> Result<bool, Error> {
125 let name = name.into();
126 let mut tree_states = self.data.tree_states.lock();
127 self.context()
128 .file_manager
129 .delete(self.tree_path(name.as_ref()))?;
130 Ok(tree_states.remove(name.as_ref()).is_some())
131 }
132
133 pub fn tree_names(&self) -> Result<Vec<String>, Error> {
135 let mut names = Vec::new();
136 for entry in std::fs::read_dir(self.path())? {
137 let entry = entry?;
138 if let Some(name) = entry.file_name().to_str() {
139 if let Some(without_extension) = name.strip_suffix(".nebari") {
140 names.push(without_extension.to_string());
141 }
142 }
143 }
144 Ok(names)
145 }
146
147 fn tree_state<Root: tree::Root>(&self, name: impl Into<Cow<'static, str>>) -> State<Root> {
148 self.tree_states(&[Root::tree(name)])
149 .into_iter()
150 .next()
151 .unwrap()
152 .as_ref()
153 .as_any()
154 .downcast_ref::<State<Root>>()
155 .unwrap()
156 .clone()
157 }
158
159 fn tree_states<R: Borrow<T>, T: AnyTreeRoot<File> + ?Sized>(
160 &self,
161 trees: &[R],
162 ) -> Vec<Box<dyn AnyTreeState>> {
163 let mut tree_states = self.data.tree_states.lock();
164 let mut output = Vec::with_capacity(trees.len());
165 for tree in trees {
166 let state = tree_states
167 .entry(tree.borrow().name().to_string())
168 .or_insert_with(|| tree.borrow().default_state())
169 .cloned();
170 output.push(state);
171 }
172 output
173 }
174
175 pub fn transaction<R: Borrow<T>, T: AnyTreeRoot<File> + ?Sized>(
185 &self,
186 trees: &[R],
187 ) -> Result<ExecutingTransaction<File>, Error> {
188 for tree in trees {
189 check_name(tree.borrow().name()).map(|_| tree.borrow().name().as_bytes())?;
190 }
191 let transaction = self
192 .data
193 .transactions
194 .new_transaction(trees.iter().map(|t| t.borrow().name().as_bytes()));
195 let states = self.tree_states(trees);
196 let trees = trees
197 .iter()
198 .zip(states.into_iter())
199 .map(|(tree, state)| {
200 tree.borrow()
201 .begin_transaction(
202 transaction.id,
203 &self.tree_path(tree.borrow().name()),
204 state.as_ref(),
205 self.context(),
206 Some(&self.data.transactions),
207 )
208 .map(UnlockedTransactionTree::new)
209 })
210 .collect::<Result<Vec<_>, Error>>()?;
211 Ok(ExecutingTransaction {
212 roots: self.clone(),
213 transaction: Some(transaction),
214 trees,
215 })
216 }
217}
218
219fn check_name(name: &str) -> Result<(), Error> {
220 if name != "_transactions"
221 && name
222 .bytes()
223 .all(|c| matches!(c as char, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '.' | '_'))
224 {
225 Ok(())
226 } else {
227 Err(Error::from(ErrorKind::InvalidTreeName))
228 }
229}
230
231impl<File: ManagedFile> Clone for Roots<File> {
232 fn clone(&self) -> Self {
233 Self {
234 data: self.data.clone(),
235 }
236 }
237}
238
239#[must_use]
242pub struct ExecutingTransaction<File: ManagedFile> {
243 roots: Roots<File>,
244 trees: Vec<UnlockedTransactionTree<File>>,
245 transaction: Option<ManagedTransaction<File::Manager>>,
246}
247
248#[must_use]
250pub struct UnlockedTransactionTree<File: ManagedFile>(Mutex<Box<dyn AnyTransactionTree<File>>>);
251
252impl<File: ManagedFile> UnlockedTransactionTree<File> {
253 fn new(file: Box<dyn AnyTransactionTree<File>>) -> Self {
254 Self(Mutex::new(file))
255 }
256
257 pub fn lock<Root: tree::Root>(&self) -> LockedTransactionTree<'_, Root, File> {
264 LockedTransactionTree(MutexGuard::map(self.0.lock(), |tree| {
265 tree.as_mut().as_any_mut().downcast_mut().unwrap()
266 }))
267 }
268}
269
270#[must_use]
273pub struct LockedTransactionTree<'transaction, Root: tree::Root, File: ManagedFile>(
274 MappedMutexGuard<'transaction, TransactionTree<Root, File>>,
275);
276
277impl<'transaction, Root: tree::Root, File: ManagedFile> Deref
278 for LockedTransactionTree<'transaction, Root, File>
279{
280 type Target = TransactionTree<Root, File>;
281
282 fn deref(&self) -> &Self::Target {
283 &*self.0
284 }
285}
286
287impl<'transaction, Root: tree::Root, File: ManagedFile> DerefMut
288 for LockedTransactionTree<'transaction, Root, File>
289{
290 fn deref_mut(&mut self) -> &mut Self::Target {
291 &mut *self.0
292 }
293}
294
295impl<File: ManagedFile> ExecutingTransaction<File> {
296 #[must_use]
298 #[allow(clippy::missing_panics_doc)]
299 pub fn entry(&self) -> &LogEntry<'static> {
300 self.transaction
301 .as_ref()
302 .and_then(|tx| tx.transaction.as_ref())
303 .unwrap()
304 }
305
306 #[must_use]
308 #[allow(clippy::missing_panics_doc)]
309 pub fn entry_mut(&mut self) -> &mut LogEntry<'static> {
310 self.transaction
311 .as_mut()
312 .and_then(|tx| tx.transaction.as_mut())
313 .unwrap()
314 }
315
316 #[allow(clippy::missing_panics_doc)]
320 pub fn commit(mut self) -> Result<(), Error> {
321 let trees = std::mem::take(&mut self.trees);
322 let trees = self.roots.data.thread_pool.commit_trees(trees)?;
324
325 let transaction = self.transaction.take().unwrap();
327 let tree_locks = transaction.commit()?;
328
329 for tree in trees {
331 tree.state().publish();
332 }
333
334 drop(tree_locks);
336
337 Ok(())
338 }
339
340 pub fn rollback(self) {
344 drop(self);
345 }
346
347 pub fn tree<Root: tree::Root>(
349 &self,
350 index: usize,
351 ) -> Option<LockedTransactionTree<'_, Root, File>> {
352 self.unlocked_tree(index).map(UnlockedTransactionTree::lock)
353 }
354
355 pub fn unlocked_tree(&self, index: usize) -> Option<&UnlockedTransactionTree<File>> {
359 self.trees.get(index)
360 }
361
362 fn rollback_tree_states(&mut self) {
363 for tree in self.trees.drain(..) {
364 let tree = tree.0.lock();
365 tree.rollback();
366 }
367 }
368}
369
370impl<File: ManagedFile> Drop for ExecutingTransaction<File> {
371 fn drop(&mut self) {
372 if let Some(transaction) = self.transaction.take() {
373 self.rollback_tree_states();
374 drop(transaction);
376 }
377 }
378}
379
380pub struct TransactionTree<Root: tree::Root, File: ManagedFile> {
382 pub(crate) transaction_id: u64,
383 pub(crate) tree: TreeFile<Root, File>,
384}
385
386pub trait AnyTransactionTree<File: ManagedFile>: Any + Send + Sync {
387 fn as_any(&self) -> &dyn Any;
388 fn as_any_mut(&mut self) -> &mut dyn Any;
389
390 fn state(&self) -> Box<dyn AnyTreeState>;
391
392 fn commit(&mut self) -> Result<(), Error>;
393 fn rollback(&self);
394}
395
396impl<Root: tree::Root, File: ManagedFile> AnyTransactionTree<File> for TransactionTree<Root, File> {
397 fn as_any(&self) -> &dyn Any {
398 self
399 }
400 fn as_any_mut(&mut self) -> &mut dyn Any {
401 self
402 }
403
404 fn state(&self) -> Box<dyn AnyTreeState> {
405 Box::new(self.tree.state.clone())
406 }
407
408 fn commit(&mut self) -> Result<(), Error> {
409 self.tree.commit()
410 }
411
412 fn rollback(&self) {
413 let mut state = self.tree.state.lock();
414 state.rollback(&self.tree.state);
415 }
416}
417
418impl<File: ManagedFile, Index> TransactionTree<VersionedTreeRoot<Index>, File>
419where
420 Index: Clone + Reducer<Index> + EmbeddedIndex + Debug + 'static,
421{
422 pub fn current_sequence_id(&self) -> u64 {
424 let state = self.tree.state.lock();
425 state.root.sequence
426 }
427}
428
429impl<Root: tree::Root, File: ManagedFile> TransactionTree<Root, File> {
430 pub fn set(
432 &mut self,
433 key: impl Into<ArcBytes<'static>>,
434 value: impl Into<ArcBytes<'static>>,
435 ) -> Result<(), Error> {
436 self.modify(vec![key.into()], Operation::Set(value.into()))
437 }
438
439 pub fn modify<'a>(
441 &mut self,
442 keys: Vec<ArcBytes<'a>>,
443 operation: Operation<'a, ArcBytes<'static>>,
444 ) -> Result<(), Error> {
445 self.tree.modify(Modification {
446 keys,
447 transaction_id: Some(self.transaction_id),
448 operation,
449 })
450 }
451
452 pub fn replace(
454 &mut self,
455 key: impl Into<ArcBytes<'static>>,
456 value: impl Into<ArcBytes<'static>>,
457 ) -> Result<Option<ArcBytes<'static>>, Error> {
458 self.tree.replace(key, value, Some(self.transaction_id))
459 }
460
461 pub fn get(&mut self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
464 self.tree.get(key, true)
465 }
466
467 pub fn remove(&mut self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
469 self.tree.remove(key, Some(self.transaction_id))
470 }
471
472 pub fn compare_and_swap(
476 &mut self,
477 key: &[u8],
478 old: Option<&[u8]>,
479 new: Option<ArcBytes<'_>>,
480 ) -> Result<(), CompareAndSwapError> {
481 self.tree
482 .compare_and_swap(key, old, new, Some(self.transaction_id))
483 }
484
485 pub fn get_multiple<'keys, KeysIntoIter, KeysIter>(
488 &mut self,
489 keys: KeysIntoIter,
490 ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
491 where
492 KeysIntoIter: IntoIterator<Item = &'keys [u8], IntoIter = KeysIter>,
493 KeysIter: Iterator<Item = &'keys [u8]> + ExactSizeIterator,
494 {
495 self.tree.get_multiple(keys, true)
496 }
497
498 pub fn get_range<'keys, KeyRangeBounds>(
500 &mut self,
501 range: &'keys KeyRangeBounds,
502 ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
503 where
504 KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
505 {
506 self.tree.get_range(range, true)
507 }
508
509 #[cfg_attr(
531 feature = "tracing",
532 tracing::instrument(skip(self, node_evaluator, key_evaluator, callback))
533 )]
534 pub fn scan<'b, 'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
535 &mut self,
536 range: &'keys KeyRangeBounds,
537 forwards: bool,
538 mut node_evaluator: NodeEvaluator,
539 mut key_evaluator: KeyEvaluator,
540 mut callback: DataCallback,
541 ) -> Result<(), AbortError<CallerError>>
542 where
543 KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + ?Sized,
544 NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
545 KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
546 DataCallback: FnMut(
547 ArcBytes<'static>,
548 &Root::Index,
549 ArcBytes<'static>,
550 ) -> Result<(), AbortError<CallerError>>,
551 CallerError: Display + Debug,
552 {
553 self.tree.scan(
554 range,
555 forwards,
556 true,
557 &mut node_evaluator,
558 &mut key_evaluator,
559 &mut callback,
560 )
561 }
562
563 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
572 pub fn reduce<'keys, KeyRangeBounds>(
573 &mut self,
574 range: &'keys KeyRangeBounds,
575 ) -> Result<Root::ReducedIndex, Error>
576 where
577 KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
578 {
579 self.tree.reduce(range, true)
580 }
581
582 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
584 pub fn first_key(&mut self) -> Result<Option<ArcBytes<'static>>, Error> {
585 self.tree.first_key(true)
586 }
587
588 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
590 pub fn first(&mut self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
591 self.tree.first(true)
592 }
593
594 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
596 pub fn last_key(&mut self) -> Result<Option<ArcBytes<'static>>, Error> {
597 self.tree.last_key(true)
598 }
599
600 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
602 pub fn last(&mut self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
603 self.tree.last(true)
604 }
605}
606
607#[derive(Debug, thiserror::Error)]
609pub enum CompareAndSwapError {
610 #[error("value did not match. existing value: {0:?}")]
612 Conflict(Option<ArcBytes<'static>>),
613 #[error("error during compare_and_swap: {0}")]
615 Error(#[from] Error),
616}
617
618#[derive(Debug)]
620#[must_use]
621pub struct Config<M: FileManager = StdFileManager> {
622 path: PathBuf,
623 vault: Option<Arc<dyn AnyVault>>,
624 cache: Option<ChunkCache>,
625 file_manager: Option<M>,
626 thread_pool: Option<ThreadPool<M::File>>,
627}
628
629impl<M: FileManager> Clone for Config<M> {
630 fn clone(&self) -> Self {
631 Self {
632 path: self.path.clone(),
633 vault: self.vault.clone(),
634 cache: self.cache.clone(),
635 file_manager: self.file_manager.clone(),
636 thread_pool: self.thread_pool.clone(),
637 }
638 }
639}
640
641impl Config<StdFileManager> {
642 pub fn new<P: AsRef<Path>>(path: P) -> Self {
644 Self {
645 path: path.as_ref().to_path_buf(),
646 vault: None,
647 cache: None,
648 thread_pool: None,
649 file_manager: None,
650 }
651 }
652
653 pub fn default_for<P: AsRef<Path>>(path: P) -> Self {
655 Self {
656 path: path.as_ref().to_path_buf(),
657 vault: None,
658 cache: Some(ChunkCache::new(2000, 65536)),
659 thread_pool: Some(ThreadPool::default()),
660 file_manager: None,
661 }
662 }
663
664 pub fn file_manager<M: FileManager>(self, file_manager: M) -> Config<M> {
670 assert!(self.thread_pool.is_none());
671 Config {
672 path: self.path,
673 vault: self.vault,
674 cache: self.cache,
675 file_manager: Some(file_manager),
676 thread_pool: None,
677 }
678 }
679}
680
681impl<M: FileManager> Config<M> {
682 pub fn vault<V: AnyVault>(mut self, vault: V) -> Self {
684 self.vault = Some(Arc::new(vault));
685 self
686 }
687
688 pub fn cache(mut self, cache: ChunkCache) -> Self {
690 self.cache = Some(cache);
691 self
692 }
693
694 pub fn shared_thread_pool(mut self, thread_pool: &ThreadPool<M::File>) -> Self {
698 self.thread_pool = Some(thread_pool.clone());
699 self
700 }
701
702 pub fn open(self) -> Result<Roots<M::File>, Error> {
704 Roots::open(
705 self.path,
706 Context {
707 file_manager: self.file_manager.unwrap_or_default(),
708 vault: self.vault,
709 cache: self.cache,
710 },
711 self.thread_pool.unwrap_or_default(),
712 )
713 }
714}
715
716pub struct Tree<Root: tree::Root, File: ManagedFile> {
718 roots: Roots<File>,
719 state: State<Root>,
720 vault: Option<Arc<dyn AnyVault>>,
721 name: Cow<'static, str>,
722}
723
724impl<Root: tree::Root, File: ManagedFile> Clone for Tree<Root, File> {
725 fn clone(&self) -> Self {
726 Self {
727 roots: self.roots.clone(),
728 state: self.state.clone(),
729 vault: self.vault.clone(),
730 name: self.name.clone(),
731 }
732 }
733}
734
735impl<Root: tree::Root, File: ManagedFile> Tree<Root, File> {
736 #[must_use]
738 pub fn name(&self) -> &str {
739 &self.name
740 }
741
742 #[must_use]
744 pub fn path(&self) -> PathBuf {
745 self.roots.tree_path(self.name())
746 }
747
748 #[must_use]
750 pub fn count(&self) -> u64 {
751 let state = self.state.lock();
752 state.root.count()
753 }
754
755 #[allow(clippy::missing_panics_doc)]
757 pub fn set(
758 &self,
759 key: impl Into<ArcBytes<'static>>,
760 value: impl Into<ArcBytes<'static>>,
761 ) -> Result<(), Error> {
762 let transaction = self.begin_transaction()?;
763 transaction.tree::<Root>(0).unwrap().set(key, value)?;
764 transaction.commit()
765 }
766
767 fn begin_transaction(&self) -> Result<ExecutingTransaction<File>, Error> {
768 let mut root = Root::tree(self.name.clone());
769 if let Some(vault) = &self.vault {
770 root.vault = Some(vault.clone());
771 }
772 self.roots.transaction(&[root])
773 }
774
775 fn open_for_read(&self) -> Result<TreeFile<Root, File>, Error> {
776 let context = self.vault.as_ref().map_or_else(
777 || Cow::Borrowed(self.roots.context()),
778 |vault| Cow::Owned(self.roots.context().clone().with_any_vault(vault.clone())),
779 );
780
781 TreeFile::<Root, File>::read(
782 self.path(),
783 self.state.clone(),
784 &context,
785 Some(self.roots.transactions()),
786 )
787 }
788
789 pub fn get(&self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
792 catch_compaction_and_retry(|| {
793 let mut tree = self.open_for_read()?;
794
795 tree.get(key, false)
796 })
797 }
798
799 #[allow(clippy::missing_panics_doc)]
801 pub fn replace(
802 &mut self,
803 key: impl Into<ArcBytes<'static>>,
804 value: impl Into<ArcBytes<'static>>,
805 ) -> Result<Option<ArcBytes<'static>>, Error> {
806 let transaction = self.begin_transaction()?;
807 let existing_value = transaction.tree::<Root>(0).unwrap().replace(key, value)?;
808 transaction.commit()?;
809 Ok(existing_value)
810 }
811
812 #[allow(clippy::missing_panics_doc)]
814 pub fn modify<'a>(
815 &mut self,
816 keys: Vec<ArcBytes<'a>>,
817 operation: Operation<'a, ArcBytes<'static>>,
818 ) -> Result<(), Error> {
819 let transaction = self.begin_transaction()?;
820 transaction
821 .tree::<Root>(0)
822 .unwrap()
823 .modify(keys, operation)?;
824 transaction.commit()?;
825 Ok(())
826 }
827
828 #[allow(clippy::missing_panics_doc)]
830 pub fn remove(&self, key: &[u8]) -> Result<Option<ArcBytes<'static>>, Error> {
831 let transaction = self.begin_transaction()?;
832 let existing_value = transaction.tree::<Root>(0).unwrap().remove(key)?;
833 transaction.commit()?;
834 Ok(existing_value)
835 }
836
837 #[allow(clippy::missing_panics_doc)]
841 pub fn compare_and_swap(
842 &self,
843 key: &[u8],
844 old: Option<&[u8]>,
845 new: Option<ArcBytes<'_>>,
846 ) -> Result<(), CompareAndSwapError> {
847 let transaction = self.begin_transaction()?;
848 transaction
849 .tree::<Root>(0)
850 .unwrap()
851 .compare_and_swap(key, old, new)?;
852 transaction.commit()?;
853 Ok(())
854 }
855
856 #[allow(clippy::needless_pass_by_value)]
859 pub fn get_multiple<'keys, Keys>(
860 &self,
861 keys: Keys,
862 ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
863 where
864 Keys: Iterator<Item = &'keys [u8]> + ExactSizeIterator + Clone,
865 {
866 catch_compaction_and_retry(|| {
867 let mut tree = self.open_for_read()?;
868
869 tree.get_multiple(keys.clone(), false)
870 })
871 }
872
873 pub fn get_range<'keys, KeyRangeBounds>(
875 &self,
876 range: &'keys KeyRangeBounds,
877 ) -> Result<Vec<(ArcBytes<'static>, ArcBytes<'static>)>, Error>
878 where
879 KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
880 {
881 catch_compaction_and_retry(|| {
882 let mut tree = self.open_for_read()?;
883
884 tree.get_range(range, false)
885 })
886 }
887
888 #[cfg_attr(
909 feature = "tracing",
910 tracing::instrument(skip(self, node_evaluator, key_evaluator, callback))
911 )]
912 pub fn scan<'keys, CallerError, KeyRangeBounds, NodeEvaluator, KeyEvaluator, DataCallback>(
913 &self,
914 range: &'keys KeyRangeBounds,
915 forwards: bool,
916 mut node_evaluator: NodeEvaluator,
917 mut key_evaluator: KeyEvaluator,
918 mut callback: DataCallback,
919 ) -> Result<(), AbortError<CallerError>>
920 where
921 KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
922 NodeEvaluator: FnMut(&ArcBytes<'static>, &Root::ReducedIndex, usize) -> ScanEvaluation,
923 KeyEvaluator: FnMut(&ArcBytes<'static>, &Root::Index) -> ScanEvaluation,
924 DataCallback: FnMut(
925 ArcBytes<'static>,
926 &Root::Index,
927 ArcBytes<'static>,
928 ) -> Result<(), AbortError<CallerError>>,
929 CallerError: Display + Debug,
930 {
931 catch_compaction_and_retry_abortable(move || {
932 let mut tree = self.open_for_read()?;
933
934 tree.scan(
935 range,
936 forwards,
937 false,
938 &mut node_evaluator,
939 &mut key_evaluator,
940 &mut callback,
941 )
942 })
943 }
944
945 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
954 pub fn reduce<'keys, KeyRangeBounds>(
955 &self,
956 range: &'keys KeyRangeBounds,
957 ) -> Result<Root::ReducedIndex, Error>
958 where
959 KeyRangeBounds: RangeBounds<&'keys [u8]> + Debug + Clone + ?Sized,
960 {
961 catch_compaction_and_retry(move || {
962 let mut tree = self.open_for_read()?;
963
964 tree.reduce(range, false)
965 })
966 }
967
968 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
970 pub fn first_key(&self) -> Result<Option<ArcBytes<'static>>, Error> {
971 catch_compaction_and_retry(|| {
972 let mut tree = self.open_for_read()?;
973
974 tree.first_key(false)
975 })
976 }
977
978 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
980 pub fn first(&self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
981 catch_compaction_and_retry(|| {
982 let mut tree = self.open_for_read()?;
983
984 tree.first(false)
985 })
986 }
987
988 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
990 pub fn last_key(&self) -> Result<Option<ArcBytes<'static>>, Error> {
991 catch_compaction_and_retry(|| {
992 let mut tree = self.open_for_read()?;
993
994 tree.last_key(false)
995 })
996 }
997
998 #[cfg_attr(feature = "tracing", tracing::instrument(skip(self)))]
1000 pub fn last(&self) -> Result<Option<(ArcBytes<'static>, ArcBytes<'static>)>, Error> {
1001 catch_compaction_and_retry(|| {
1002 let mut tree = self.open_for_read()?;
1003
1004 tree.last(false)
1005 })
1006 }
1007
1008 pub fn compact(&self) -> Result<(), Error> {
1015 let tree = self.open_for_read()?;
1016 tree.compact(
1017 &self.roots.context().file_manager,
1018 Some(TransactableCompaction {
1019 name: self.name.as_ref(),
1020 manager: self.roots.transactions(),
1021 }),
1022 )?;
1023 Ok(())
1024 }
1025}
1026
1027impl<Root: tree::Root, File: ManagedFile> AnyTreeRoot<File> for Tree<Root, File> {
1028 fn name(&self) -> &str {
1029 &self.name
1030 }
1031
1032 fn default_state(&self) -> Box<dyn AnyTreeState> {
1033 Box::new(State::<Root>::default())
1034 }
1035
1036 fn begin_transaction(
1037 &self,
1038 transaction_id: u64,
1039 file_path: &Path,
1040 state: &dyn AnyTreeState,
1041 context: &Context<File::Manager>,
1042 transactions: Option<&TransactionManager<File::Manager>>,
1043 ) -> Result<Box<dyn AnyTransactionTree<File>>, Error> {
1044 let context = self.vault.as_ref().map_or_else(
1045 || Cow::Borrowed(context),
1046 |vault| Cow::Owned(context.clone().with_any_vault(vault.clone())),
1047 );
1048 let tree = TreeFile::write(
1049 file_path,
1050 state
1051 .as_any()
1052 .downcast_ref::<State<Root>>()
1053 .unwrap()
1054 .clone(),
1055 &context,
1056 transactions,
1057 )?;
1058
1059 Ok(Box::new(TransactionTree {
1060 transaction_id,
1061 tree,
1062 }))
1063 }
1064}
1065
1066impl<File: ManagedFile, Index> Tree<VersionedTreeRoot<Index>, File>
1067where
1068 Index: EmbeddedIndex + Reducer<Index> + Clone + Debug + 'static,
1069{
1070 #[cfg_attr(
1078 feature = "tracing",
1079 tracing::instrument(skip(self, key_evaluator, data_callback))
1080 )]
1081 pub fn scan_sequences<CallerError, Range, KeyEvaluator, DataCallback>(
1082 &mut self,
1083 range: Range,
1084 forwards: bool,
1085 key_evaluator: &mut KeyEvaluator,
1086 data_callback: &mut DataCallback,
1087 ) -> Result<(), AbortError<CallerError>>
1088 where
1089 Range: Clone + RangeBounds<u64> + Debug + 'static,
1090 KeyEvaluator: FnMut(KeySequence) -> ScanEvaluation,
1091 DataCallback: FnMut(KeySequence, ArcBytes<'static>) -> Result<(), AbortError<CallerError>>,
1092 CallerError: Display + Debug,
1093 {
1094 catch_compaction_and_retry_abortable(|| {
1095 let mut tree = TreeFile::<VersionedTreeRoot<Index>, File>::read(
1096 self.path(),
1097 self.state.clone(),
1098 self.roots.context(),
1099 Some(self.roots.transactions()),
1100 )?;
1101
1102 tree.scan_sequences(range.clone(), forwards, false, key_evaluator, data_callback)
1103 })
1104 }
1105}
1106
1107#[derive(thiserror::Error, Debug)]
1109pub enum AbortError<CallerError: Display + Debug = Infallible> {
1110 #[error("other error: {0}")]
1112 Other(CallerError),
1113 #[error("database error: {0}")]
1115 Nebari(#[from] Error),
1116}
1117
1118impl AbortError<Infallible> {
1119 #[must_use]
1121 pub fn infallible(self) -> Error {
1122 match self {
1123 AbortError::Other(_) => unreachable!(),
1124 AbortError::Nebari(error) => error,
1125 }
1126 }
1127}
1128
1129#[derive(Debug)]
1131pub struct ThreadPool<File>
1132where
1133 File: ManagedFile,
1134{
1135 sender: flume::Sender<ThreadCommit<File>>,
1136 receiver: flume::Receiver<ThreadCommit<File>>,
1137 thread_count: Arc<AtomicU16>,
1138 maximum_threads: usize,
1139}
1140
1141impl<File: ManagedFile> ThreadPool<File> {
1142 #[must_use]
1145 pub fn new(maximum_threads: usize) -> Self {
1146 let (sender, receiver) = flume::unbounded();
1147 Self {
1148 sender,
1149 receiver,
1150 thread_count: Arc::new(AtomicU16::new(0)),
1151 maximum_threads,
1152 }
1153 }
1154
1155 fn commit_trees(
1156 &self,
1157 trees: Vec<UnlockedTransactionTree<File>>,
1158 ) -> Result<Vec<Box<dyn AnyTransactionTree<File>>>, Error> {
1159 if trees.len() == 1 {
1163 let mut tree = trees.into_iter().next().unwrap().0.into_inner();
1164 tree.commit()?;
1165 Ok(vec![tree])
1166 } else {
1167 let (completion_sender, completion_receiver) = flume::unbounded();
1169 let tree_count = trees.len();
1170 for tree in trees {
1171 self.sender.send(ThreadCommit {
1172 tree: tree.0.into_inner(),
1173 completion_sender: completion_sender.clone(),
1174 })?;
1175 }
1176
1177 let desired_threads = tree_count.min(self.maximum_threads);
1179 loop {
1180 let thread_count = self.thread_count.load(Ordering::SeqCst);
1181 if (thread_count as usize) >= desired_threads {
1182 break;
1183 }
1184
1185 if self
1187 .thread_count
1188 .compare_exchange(
1189 thread_count,
1190 thread_count + 1,
1191 Ordering::SeqCst,
1192 Ordering::SeqCst,
1193 )
1194 .is_ok()
1195 {
1196 let commit_receiver = self.receiver.clone();
1197 std::thread::Builder::new()
1198 .name(String::from("roots-txwriter"))
1199 .spawn(move || transaction_commit_thread(commit_receiver))
1200 .unwrap();
1201 }
1202 }
1203
1204 let mut results = Vec::with_capacity(tree_count);
1206 for _ in 0..tree_count {
1207 results.push(completion_receiver.recv()??);
1208 }
1209
1210 Ok(results)
1211 }
1212 }
1213}
1214
1215impl<File: ManagedFile> Clone for ThreadPool<File> {
1216 fn clone(&self) -> Self {
1217 Self {
1218 sender: self.sender.clone(),
1219 receiver: self.receiver.clone(),
1220 thread_count: self.thread_count.clone(),
1221 maximum_threads: self.maximum_threads,
1222 }
1223 }
1224}
1225
1226impl<File: ManagedFile> Default for ThreadPool<File> {
1227 fn default() -> Self {
1228 static CPU_COUNT: Lazy<usize> = Lazy::new(num_cpus::get);
1229 Self::new(*CPU_COUNT)
1230 }
1231}
1232
1233#[allow(clippy::needless_pass_by_value)]
1234fn transaction_commit_thread<File: ManagedFile>(receiver: flume::Receiver<ThreadCommit<File>>) {
1235 while let Ok(ThreadCommit {
1236 mut tree,
1237 completion_sender,
1238 }) = receiver.recv()
1239 {
1240 let result = tree.commit();
1241 let result = result.map(move |_| tree);
1242 drop(completion_sender.send(result));
1243 }
1244}
1245
1246struct ThreadCommit<File>
1247where
1248 File: ManagedFile,
1249{
1250 tree: Box<dyn AnyTransactionTree<File>>,
1251 completion_sender: Sender<Result<Box<dyn AnyTransactionTree<File>>, Error>>,
1252}
1253
1254fn catch_compaction_and_retry<R, F: Fn() -> Result<R, Error>>(func: F) -> Result<R, Error> {
1255 loop {
1256 match func() {
1257 Ok(result) => return Ok(result),
1258 Err(error) => {
1259 if matches!(error.kind, ErrorKind::TreeCompacted) {
1260 continue;
1261 }
1262
1263 return Err(error);
1264 }
1265 }
1266 }
1267}
1268
1269fn catch_compaction_and_retry_abortable<
1270 R,
1271 E: Display + Debug,
1272 F: FnMut() -> Result<R, AbortError<E>>,
1273>(
1274 mut func: F,
1275) -> Result<R, AbortError<E>> {
1276 loop {
1277 match func() {
1278 Ok(result) => return Ok(result),
1279 Err(AbortError::Nebari(error)) => {
1280 if matches!(error.kind, ErrorKind::TreeCompacted) {
1281 continue;
1282 }
1283
1284 return Err(AbortError::Nebari(error));
1285 }
1286 Err(other) => return Err(other),
1287 }
1288 }
1289}
1290
1291#[cfg(test)]
1292mod tests {
1293 use byteorder::{BigEndian, ByteOrder};
1294 use tempfile::tempdir;
1295
1296 use super::*;
1297 use crate::{
1298 io::{any::AnyFileManager, fs::StdFileManager, memory::MemoryFileManager},
1299 test_util::RotatorVault,
1300 tree::{Root, Unversioned, Versioned},
1301 };
1302
1303 fn basic_get_set<M: FileManager>(file_manager: M) {
1304 let tempdir = tempdir().unwrap();
1305 let roots = Config::new(tempdir.path())
1306 .file_manager(file_manager)
1307 .open()
1308 .unwrap();
1309
1310 let tree = roots.tree(Versioned::tree("test")).unwrap();
1311 tree.set(b"test", b"value").unwrap();
1312 let result = tree.get(b"test").unwrap().expect("key not found");
1313
1314 assert_eq!(result, b"value");
1315 }
1316
#[test]
fn memory_basic_get_set() {
    let manager = MemoryFileManager::default();
    basic_get_set(manager);
}
1321
#[test]
fn std_basic_get_set() {
    let manager = StdFileManager::default();
    basic_get_set(manager);
}
1326
#[test]
fn basic_transaction_isolation_test() {
    let directory = tempdir().unwrap();

    let roots = Config::<StdFileManager>::new(directory.path())
        .open()
        .unwrap();
    let tree = roots.tree(Versioned::tree("test")).unwrap();
    tree.set(b"test", b"value").unwrap();

    let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
    {
        let mut locked = transaction.tree::<Versioned>(0).unwrap();
        locked.set(b"test", b"updated value").unwrap();

        // The transaction sees its own uncommitted write.
        let pending = locked.get(b"test").unwrap().expect("key not found");
        assert_eq!(pending, b"updated value");
    }

    // Readers outside the transaction still see the committed value.
    let before_commit = tree.get(b"test").unwrap().expect("key not found");
    assert_eq!(before_commit, b"value");

    transaction.commit().unwrap();

    let after_commit = tree.get(b"test").unwrap().expect("key not found");
    assert_eq!(after_commit, b"updated value");
}
1367
#[test]
fn basic_transaction_rollback_test() {
    let directory = tempdir().unwrap();

    let roots = Config::<StdFileManager>::new(directory.path())
        .open()
        .unwrap();
    let tree = roots.tree(Versioned::tree("test")).unwrap();
    tree.set(b"test", b"value").unwrap();

    let discarded = roots.transaction(&[Versioned::tree("test")]).unwrap();
    discarded
        .tree::<Versioned>(0)
        .unwrap()
        .set(b"test", b"updated value")
        .unwrap();
    discarded.rollback();

    // The rolled-back write is invisible outside a transaction...
    let outside = tree.get(b"test").unwrap().expect("key not found");
    assert_eq!(outside, b"value");

    // ...and to a fresh transaction.
    let fresh = roots.transaction(&[Versioned::tree("test")]).unwrap();
    let inside = fresh
        .tree::<Versioned>(0)
        .unwrap()
        .get(b"test")
        .unwrap()
        .expect("key not found");
    assert_eq!(inside, b"value");
}
1406
/// Concurrent write/compact stress test: `Versioned` root, `StdFileManager`.
#[test]
fn std_compact_test_versioned() {
    let manager = StdFileManager::default();
    compact_test::<Versioned, StdFileManager>(manager);
}
1411
/// Concurrent write/compact stress test: `Unversioned` root, `StdFileManager`.
#[test]
fn std_compact_test_unversioned() {
    let manager = StdFileManager::default();
    compact_test::<Unversioned, StdFileManager>(manager);
}
1416
/// Concurrent write/compact stress test: `Versioned` root, `MemoryFileManager`.
#[test]
fn memory_compact_test_versioned() {
    let manager = MemoryFileManager::default();
    compact_test::<Versioned, MemoryFileManager>(manager);
}
1421
/// Concurrent write/compact stress test: `Unversioned` root, `MemoryFileManager`.
#[test]
fn memory_compact_test_unversioned() {
    let manager = MemoryFileManager::default();
    compact_test::<Unversioned, MemoryFileManager>(manager);
}
1426
/// Runs the `Versioned` compaction stress test through `AnyFileManager`,
/// first with its `std()` backend and then with its `memory()` backend.
#[test]
fn any_compact_test_versioned() {
    let std_backed = AnyFileManager::std();
    compact_test::<Versioned, _>(std_backed);

    let memory_backed = AnyFileManager::memory();
    compact_test::<Versioned, _>(memory_backed);
}
1432
/// Runs the `Unversioned` compaction stress test through `AnyFileManager`,
/// first with its `std()` backend and then with its `memory()` backend.
#[test]
fn any_compact_test_unversioned() {
    let std_backed = AnyFileManager::std();
    compact_test::<Unversioned, _>(std_backed);

    let memory_backed = AnyFileManager::memory();
    compact_test::<Unversioned, _>(memory_backed);
}
1438
1439 fn compact_test<R: Root, M: FileManager>(file_manager: M) {
1440 const OPERATION_COUNT: usize = 256;
1441 const WORKER_COUNT: usize = 4;
1442 let tempdir = tempdir().unwrap();
1443
1444 let roots = Config::new(tempdir.path())
1445 .file_manager(file_manager)
1446 .open()
1447 .unwrap();
1448 let tree = roots.tree(R::tree("test")).unwrap();
1449 tree.set("foo", b"bar").unwrap();
1450
1451 let mut threads = Vec::new();
1453 for worker in 0..WORKER_COUNT {
1454 let tree = tree.clone();
1455 threads.push(std::thread::spawn(move || {
1456 for relative_id in 0..OPERATION_COUNT {
1457 let absolute_id = (worker * OPERATION_COUNT + relative_id) as u64;
1458 tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes())
1459 .unwrap();
1460 let value = tree
1461 .remove(&absolute_id.to_be_bytes())
1462 .unwrap()
1463 .ok_or_else(|| panic!("value not found: {:?}", absolute_id))
1464 .unwrap();
1465 assert_eq!(BigEndian::read_u64(&value), absolute_id);
1466 tree.set(absolute_id.to_be_bytes(), absolute_id.to_be_bytes())
1467 .unwrap();
1468 let newer_value = tree
1469 .get(&absolute_id.to_be_bytes())
1470 .unwrap()
1471 .expect("couldn't find found");
1472 assert_eq!(value, newer_value);
1473 }
1474 }));
1475 }
1476
1477 threads.push(std::thread::spawn(move || {
1478 while tree.count() < (OPERATION_COUNT * WORKER_COUNT) as u64 {
1481 tree.compact().unwrap();
1482 }
1483 }));
1484
1485 for thread in threads {
1486 thread.join().unwrap();
1487 }
1488 }
1489
/// Exercises `check_name` with one accepted name and two rejected ones.
#[test]
fn name_tests() {
    // ASCII letters of both cases plus `_`, `-` and `.` are expected to pass.
    let permitted = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ_-.";
    assert!(check_name(permitted).is_ok());

    // `=` must be refused, as must `_transactions`
    // (NOTE(review): presumably reserved for the transaction log — confirm).
    for &rejected in &["=", "_transactions"] {
        assert!(check_name(rejected).is_err());
    }
}
1496
/// Writes data with a database-wide vault and a per-tree vault, then
/// reopens the same directory with different vault configurations and
/// checks which reads succeed.
#[test]
fn context_encryption_tests() {
    let tempdir = tempdir().unwrap();
    let path = tempdir.path();

    // Write phase: the database-wide vault is `RotatorVault::new(13)`, and
    // "test-otherkey" overrides it with `RotatorVault::new(42)`.
    {
        let roots = Config::<StdFileManager>::new(path)
            .vault(RotatorVault::new(13))
            .open()
            .unwrap();
        let shared_vault_tree = roots.tree(Versioned::tree("test")).unwrap();
        shared_vault_tree.set(b"test", b"value").unwrap();
        let own_vault_tree = roots
            .tree(Versioned::tree("test-otherkey").with_vault(RotatorVault::new(42)))
            .unwrap();
        own_vault_tree.set(b"test", b"other").unwrap();
    }

    // Reopen with the same database-wide vault.
    {
        let roots = Config::<StdFileManager>::new(path)
            .vault(RotatorVault::new(13))
            .open()
            .unwrap();
        let shared_vault_tree = roots.tree(Versioned::tree("test")).unwrap();
        let stored = shared_vault_tree.get(b"test").unwrap();
        assert_eq!(stored.as_deref(), Some(&b"value"[..]));

        // Opened without its per-tree vault, reading "test-otherkey" must fail.
        let wrong_vault_tree = roots.tree(Versioned::tree("test-otherkey")).unwrap();
        assert!(wrong_vault_tree.get(b"test").is_err());

        let own_vault_tree = roots
            .tree(Versioned::tree("test-otherkey").with_vault(RotatorVault::new(42)))
            .unwrap();
        let stored = own_vault_tree.get(b"test").unwrap();
        assert_eq!(stored.as_deref(), Some(&b"other"[..]));
    }

    // Reopen without any database-wide vault.
    {
        let roots = Config::<StdFileManager>::new(path).open().unwrap();
        let unvaulted_tree = roots.tree(Versioned::tree("test")).unwrap();
        assert!(unvaulted_tree.get(b"test").is_err());

        // Supplying the vault at the tree level instead is expected to yield
        // no value for the key.
        let tree_level_vault = roots
            .tree(Versioned::tree("test").with_vault(RotatorVault::new(13)))
            .unwrap();
        assert_eq!(tree_level_vault.get(b"test").unwrap(), None);
    }
}
1550
/// Checks that an oversized transaction log-entry payload is rejected with
/// `ErrorKind::ValueTooLarge`, and that after rolling back a later
/// transaction on the same tree still commits.
#[test]
fn too_large_transaction() {
    // 16 MiB, the base size of both large payloads below.
    const SIXTEEN_MIB: usize = 16 * 1024 * 1024;

    let tempdir = tempdir().unwrap();
    let config = Config::<StdFileManager>::new(tempdir.path());

    // First session: stage a 16 MiB value, then try to attach a nearly
    // 16 MiB payload to the transaction's log entry, which must be refused.
    {
        let roots = config.clone().open().unwrap();
        let mut transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();

        transaction
            .tree::<Versioned>(0)
            .unwrap()
            .set(b"test", vec![0; SIXTEEN_MIB])
            .unwrap();

        // NOTE(review): `- 7` presumably keeps the payload just past the
        // entry size limit once overhead is included — confirm.
        let rejected_as_too_large = matches!(
            transaction
                .entry_mut()
                .set_data(vec![0; SIXTEEN_MIB - 7])
                .unwrap_err()
                .kind,
            ErrorKind::ValueTooLarge
        );
        assert!(rejected_as_too_large);
        transaction.rollback();
    }

    // Second session: a normal-sized write to the same tree commits fine.
    {
        let roots = config.open().unwrap();
        let transaction = roots.transaction(&[Versioned::tree("test")]).unwrap();
        transaction
            .tree::<Versioned>(0)
            .unwrap()
            .set(b"test", b"updated value")
            .unwrap();
        transaction.commit().unwrap();
    }
}
1596}