sled/
transaction.rs

1//! Fully serializable (ACID) multi-`Tree` transactions
2//!
3//! # Examples
4//! ```
5//! # use sled::{transaction::TransactionResult, Config};
6//! # fn main() -> TransactionResult<()> {
7//!
8//! let config = Config::new().temporary(true);
9//! let db1 = config.open().unwrap();
10//! let db = db1.open_tree(b"a").unwrap();
11//!
12//! // Use write-only transactions as a writebatch:
13//! db.transaction(|db| {
14//!     db.insert(b"k1", b"cats")?;
15//!     db.insert(b"k2", b"dogs")?;
16//!     Ok(())
17//! })?;
18//!
19//! // Atomically swap two items:
20//! db.transaction(|db| {
21//!     let v1_option = db.remove(b"k1")?;
22//!     let v1 = v1_option.unwrap();
23//!     let v2_option = db.remove(b"k2")?;
24//!     let v2 = v2_option.unwrap();
25//!
26//!     db.insert(b"k1", v2)?;
27//!     db.insert(b"k2", v1)?;
28//!
29//!     Ok(())
30//! })?;
31//!
32//! assert_eq!(&db.get(b"k1")?.unwrap(), b"dogs");
33//! assert_eq!(&db.get(b"k2")?.unwrap(), b"cats");
34//! # Ok(())
35//! # }
36//! ```
37//!
38//! Transactions also work on tuples of `Tree`s,
39//! preserving serializable ACID semantics!
40//! In this example, we treat two trees like a
41//! work queue, atomically apply updates to
42//! data and move them from the unprocessed `Tree`
43//! to the processed `Tree`.
44//!
45//! ```
46//! # use sled::{transaction::{TransactionResult, Transactional}, Config};
47//! # fn main() -> TransactionResult<()> {
48//!
49//! let config = Config::new().temporary(true);
50//! let db = config.open().unwrap();
51//!
52//! let unprocessed = db.open_tree(b"unprocessed items").unwrap();
53//! let processed = db.open_tree(b"processed items").unwrap();
54//!
55//! // An update somehow gets into the tree, which we
56//! // later trigger the atomic processing of.
57//! unprocessed.insert(b"k3", b"ligers").unwrap();
58//!
59//! // Atomically process the new item and move it
60//! // between `Tree`s.
61//! (&unprocessed, &processed)
62//!     .transaction(|(unprocessed, processed)| {
63//!         let unprocessed_item = unprocessed.remove(b"k3")?.unwrap();
64//!         let mut processed_item = b"yappin' ".to_vec();
65//!         processed_item.extend_from_slice(&unprocessed_item);
66//!         processed.insert(b"k3", processed_item)?;
67//!         Ok(())
68//!     })?;
69//!
70//! assert_eq!(unprocessed.get(b"k3").unwrap(), None);
71//! assert_eq!(&processed.get(b"k3").unwrap().unwrap(), b"yappin' ligers");
72//! # Ok(())
73//! # }
74//! ```
75#![allow(clippy::module_name_repetitions)]
76use std::{cell::RefCell, fmt, rc::Rc};
77
78#[cfg(not(feature = "testing"))]
79use std::collections::HashMap as Map;
80
81// we avoid HashMap while testing because
82// it makes tests non-deterministic
83#[cfg(feature = "testing")]
84use std::collections::BTreeMap as Map;
85
86use crate::{
87    concurrency_control, pin, Batch, Error, Guard, IVec, Protector, Result,
88    Tree,
89};
90
91/// A transaction that will
92/// be applied atomically to the
93/// Tree.
94#[derive(Clone)]
95pub struct TransactionalTree {
96    pub(super) tree: Tree,
97    pub(super) writes: Rc<RefCell<Map<IVec, Option<IVec>>>>,
98    pub(super) read_cache: Rc<RefCell<Map<IVec, Option<IVec>>>>,
99    pub(super) flush_on_commit: Rc<RefCell<bool>>,
100}
101
102/// An error type that is returned from the closure
103/// passed to the `transaction` method.
104#[derive(Debug, Clone, PartialEq)]
105pub enum UnabortableTransactionError {
106    /// An internal conflict has occurred and the `transaction` method will
107    /// retry the passed-in closure until it succeeds. This should never be
108    /// returned directly from the user's closure, as it will create an
109    /// infinite loop that never returns. This is why it is hidden.
110    Conflict,
111    /// A serious underlying storage issue has occurred that requires
112    /// attention from an operator or a remediating system, such as
113    /// corruption.
114    Storage(Error),
115}
116
117impl fmt::Display for UnabortableTransactionError {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        use UnabortableTransactionError::*;
120        match self {
121            Conflict => write!(f, "Conflict during transaction"),
122            Storage(e) => e.fmt(f),
123        }
124    }
125}
126
127impl std::error::Error for UnabortableTransactionError {
128    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
129        match self {
130            UnabortableTransactionError::Storage(ref e) => Some(e),
131            _ => None,
132        }
133    }
134}
135
136pub(crate) type UnabortableTransactionResult<T> =
137    std::result::Result<T, UnabortableTransactionError>;
138
139impl From<Error> for UnabortableTransactionError {
140    fn from(error: Error) -> Self {
141        UnabortableTransactionError::Storage(error)
142    }
143}
144
145impl<E> From<UnabortableTransactionError> for ConflictableTransactionError<E> {
146    fn from(error: UnabortableTransactionError) -> Self {
147        match error {
148            UnabortableTransactionError::Conflict => {
149                ConflictableTransactionError::Conflict
150            }
151            UnabortableTransactionError::Storage(error) => {
152                ConflictableTransactionError::Storage(error)
153            }
154        }
155    }
156}
157
158/// An error type that is returned from the closure
159/// passed to the `transaction` method.
160#[derive(Debug, Clone, PartialEq)]
161pub enum ConflictableTransactionError<T = Error> {
162    /// A user-provided error type that indicates the transaction should abort.
163    /// This is passed into the return value of `transaction` as a direct Err
164    /// instance, rather than forcing users to interact with this enum
165    /// directly.
166    Abort(T),
167    #[doc(hidden)]
168    /// An internal conflict has occurred and the `transaction` method will
169    /// retry the passed-in closure until it succeeds. This should never be
170    /// returned directly from the user's closure, as it will create an
171    /// infinite loop that never returns. This is why it is hidden.
172    Conflict,
173    /// A serious underlying storage issue has occurred that requires
174    /// attention from an operator or a remediating system, such as
175    /// corruption.
176    Storage(Error),
177}
178
179impl<E: fmt::Display> fmt::Display for ConflictableTransactionError<E> {
180    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
181        use ConflictableTransactionError::*;
182        match self {
183            Abort(e) => e.fmt(f),
184            Conflict => write!(f, "Conflict during transaction"),
185            Storage(e) => e.fmt(f),
186        }
187    }
188}
189
190impl<E: std::error::Error> std::error::Error
191    for ConflictableTransactionError<E>
192{
193    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
194        match self {
195            ConflictableTransactionError::Storage(ref e) => Some(e),
196            _ => None,
197        }
198    }
199}
200
201/// An error type that is returned from the closure
202/// passed to the `transaction` method.
203#[derive(Debug, Clone, PartialEq)]
204pub enum TransactionError<T = Error> {
205    /// A user-provided error type that indicates the transaction should abort.
206    /// This is passed into the return value of `transaction` as a direct Err
207    /// instance, rather than forcing users to interact with this enum
208    /// directly.
209    Abort(T),
210    /// A serious underlying storage issue has occurred that requires
211    /// attention from an operator or a remediating system, such as
212    /// corruption.
213    Storage(Error),
214}
215
216impl<E: fmt::Display> fmt::Display for TransactionError<E> {
217    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
218        use TransactionError::*;
219        match self {
220            Abort(e) => e.fmt(f),
221            Storage(e) => e.fmt(f),
222        }
223    }
224}
225
226impl<E: std::error::Error> std::error::Error for TransactionError<E> {
227    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
228        match self {
229            TransactionError::Storage(ref e) => Some(e),
230            _ => None,
231        }
232    }
233}
234
235/// A transaction-related `Result` which is used for transparently handling
236/// concurrency-related conflicts when running transaction closures.
237pub type ConflictableTransactionResult<T, E = ()> =
238    std::result::Result<T, ConflictableTransactionError<E>>;
239
240impl<T> From<Error> for ConflictableTransactionError<T> {
241    fn from(error: Error) -> Self {
242        ConflictableTransactionError::Storage(error)
243    }
244}
245
246/// A transaction-related `Result` which is used for returning the
247/// final result of a transaction after potentially running the provided
248/// closure several times due to underlying conflicts.
249pub type TransactionResult<T, E = ()> =
250    std::result::Result<T, TransactionError<E>>;
251
252impl<T> From<Error> for TransactionError<T> {
253    fn from(error: Error) -> Self {
254        TransactionError::Storage(error)
255    }
256}
257
258impl TransactionalTree {
259    /// Set a key to a new value
260    pub fn insert<K, V>(
261        &self,
262        key: K,
263        value: V,
264    ) -> UnabortableTransactionResult<Option<IVec>>
265    where
266        K: AsRef<[u8]> + Into<IVec>,
267        V: Into<IVec>,
268    {
269        let old = self.get(key.as_ref())?;
270        let mut writes = self.writes.borrow_mut();
271        let _last_write =
272            writes.insert(key.into(), Some(value.into()));
273        Ok(old)
274    }
275
276    /// Remove a key
277    pub fn remove<K>(
278        &self,
279        key: K,
280    ) -> UnabortableTransactionResult<Option<IVec>>
281    where
282        K: AsRef<[u8]> + Into<IVec>,
283    {
284        let old = self.get(key.as_ref());
285        let mut writes = self.writes.borrow_mut();
286        let _last_write = writes.insert(key.into(), None);
287        old
288    }
289
290    /// Get the value associated with a key
291    pub fn get<K: AsRef<[u8]>>(
292        &self,
293        key: K,
294    ) -> UnabortableTransactionResult<Option<IVec>> {
295        let writes = self.writes.borrow();
296        if let Some(first_try) = writes.get(key.as_ref()) {
297            return Ok(first_try.clone());
298        }
299        let mut reads = self.read_cache.borrow_mut();
300        if let Some(second_try) = reads.get(key.as_ref()) {
301            return Ok(second_try.clone());
302        }
303
304        // not found in a cache, need to hit the backing db
305        let mut guard = pin();
306        let get = loop {
307            if let Ok(get) = self.tree.get_inner(key.as_ref(), &mut guard)? {
308                break get;
309            }
310        };
311        let last = reads.insert(key.as_ref().into(), get.clone());
312        assert!(last.is_none());
313
314        Ok(get)
315    }
316
317    /// Atomically apply multiple inserts and removals.
318    pub fn apply_batch(
319        &self,
320        batch: &Batch,
321    ) -> UnabortableTransactionResult<()> {
322        for (k, v_opt) in &batch.writes {
323            if let Some(v) = v_opt {
324                let _old = self.insert(k, v)?;
325            } else {
326                let _old = self.remove(k)?;
327            }
328        }
329        Ok(())
330    }
331
332    /// Flush the database before returning from the transaction.
333    pub fn flush(&self) {
334        *self.flush_on_commit.borrow_mut() = true;
335    }
336
337    /// Generate a monotonic ID. Not guaranteed to be
338    /// contiguous or idempotent, can produce different values in the
339    /// same transaction in case of conflicts.
340    /// Written to disk every `idgen_persist_interval`
341    /// operations, followed by a blocking flush. During recovery, we
342    /// take the last recovered generated ID and add 2x
343    /// the `idgen_persist_interval` to it. While persisting, if the
344    /// previous persisted counter wasn't synced to disk yet, we will do
345    /// a blocking flush to fsync the latest counter, ensuring
346    /// that we will never give out the same counter twice.
347    pub fn generate_id(&self) -> Result<u64> {
348        self.tree.context.pagecache.generate_id_inner()
349    }
350
351    fn unstage(&self) {
352        unimplemented!()
353    }
354
355    const fn validate(&self) -> bool {
356        true
357    }
358
359    fn commit(&self) -> Result<()> {
360        let writes = self.writes.borrow();
361        let mut guard = pin();
362        for (k, v_opt) in &*writes {
363            while self.tree.insert_inner(k, v_opt.clone(), &mut guard)?.is_err()
364            {
365            }
366        }
367        Ok(())
368    }
369    fn from_tree(tree: &Tree) -> Self {
370        Self {
371            tree: tree.clone(),
372            writes: Default::default(),
373            read_cache: Default::default(),
374            flush_on_commit: Default::default(),
375        }
376    }
377}
378
379/// A type which allows for pluggable transactional capabilities
380pub struct TransactionalTrees {
381    inner: Vec<TransactionalTree>,
382}
383
384impl TransactionalTrees {
385    fn stage(&self) -> UnabortableTransactionResult<Protector<'_>> {
386        Ok(concurrency_control::write())
387    }
388
389    fn unstage(&self) {
390        for tree in &self.inner {
391            tree.unstage();
392        }
393    }
394
395    fn validate(&self) -> bool {
396        for tree in &self.inner {
397            if !tree.validate() {
398                return false;
399            }
400        }
401        true
402    }
403
404    fn commit(&self, guard: &Guard) -> Result<()> {
405        let peg = self.inner[0].tree.context.pin_log(guard)?;
406        for tree in &self.inner {
407            tree.commit()?;
408        }
409
410        // when the peg drops, it ensures all updates
411        // written to the log since its creation are
412        // recovered atomically
413        peg.seal_batch()
414    }
415
416    fn flush_if_configured(&self) -> Result<()> {
417        let mut should_flush = None;
418
419        for tree in &self.inner {
420            if *tree.flush_on_commit.borrow() {
421                should_flush = Some(tree);
422                break;
423            }
424        }
425
426        if let Some(tree) = should_flush {
427            tree.tree.flush()?;
428        }
429        Ok(())
430    }
431}
432
433/// A simple constructor for `Err(TransactionError::Abort(_))`
434pub fn abort<A, T>(t: T) -> ConflictableTransactionResult<A, T> {
435    Err(ConflictableTransactionError::Abort(t))
436}
437
438/// A type that may be transacted on in sled transactions.
439pub trait Transactional<E = ()> {
440    /// An internal reference to an internal proxy type that
441    /// mediates transactional reads and writes.
442    type View;
443
444    /// An internal function for creating a top-level
445    /// transactional structure.
446    fn make_overlay(&self) -> Result<TransactionalTrees>;
447
448    /// An internal function for viewing the transactional
449    /// subcomponents based on the top-level transactional
450    /// structure.
451    fn view_overlay(overlay: &TransactionalTrees) -> Self::View;
452
453    /// Runs a transaction, possibly retrying the passed-in closure if
454    /// a concurrent conflict is detected that would cause a violation
455    /// of serializability. This is the only trait method that
456    /// you're most likely to use directly.
457    fn transaction<F, A>(&self, f: F) -> TransactionResult<A, E>
458    where
459        F: Fn(&Self::View) -> ConflictableTransactionResult<A, E>,
460    {
461        loop {
462            let tt = self.make_overlay()?;
463            let view = Self::view_overlay(&tt);
464
465            // NB locks must exist until this function returns.
466            let locks = if let Ok(l) = tt.stage() {
467                l
468            } else {
469                tt.unstage();
470                continue;
471            };
472            let ret = f(&view);
473            if !tt.validate() {
474                tt.unstage();
475                continue;
476            }
477            match ret {
478                Ok(r) => {
479                    let guard = pin();
480                    tt.commit(&guard)?;
481                    drop(locks);
482                    tt.flush_if_configured()?;
483                    return Ok(r);
484                }
485                Err(ConflictableTransactionError::Abort(e)) => {
486                    return Err(TransactionError::Abort(e));
487                }
488                Err(ConflictableTransactionError::Conflict) => continue,
489                Err(ConflictableTransactionError::Storage(other)) => {
490                    return Err(TransactionError::Storage(other));
491                }
492            }
493        }
494    }
495}
496
497impl<E> Transactional<E> for &Tree {
498    type View = TransactionalTree;
499
500    fn make_overlay(&self) -> Result<TransactionalTrees> {
501        Ok(TransactionalTrees {
502            inner: vec![TransactionalTree::from_tree(self)],
503        })
504    }
505
506    fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
507        overlay.inner[0].clone()
508    }
509}
510
511impl<E> Transactional<E> for &&Tree {
512    type View = TransactionalTree;
513
514    fn make_overlay(&self) -> Result<TransactionalTrees> {
515        Ok(TransactionalTrees {
516            inner: vec![TransactionalTree::from_tree(*self)],
517        })
518    }
519
520    fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
521        overlay.inner[0].clone()
522    }
523}
524
525impl<E> Transactional<E> for Tree {
526    type View = TransactionalTree;
527
528    fn make_overlay(&self) -> Result<TransactionalTrees> {
529        Ok(TransactionalTrees {
530            inner: vec![TransactionalTree::from_tree(self)],
531        })
532    }
533
534    fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
535        overlay.inner[0].clone()
536    }
537}
538
539impl<E> Transactional<E> for [Tree] {
540    type View = Vec<TransactionalTree>;
541
542    fn make_overlay(&self) -> Result<TransactionalTrees> {
543        let same_db = self.windows(2).all(|w| {
544            let path_1 = w[0].context.get_path();
545            let path_2 = w[1].context.get_path();
546            path_1 == path_2
547        });
548        if !same_db {
549            return Err(Error::Unsupported(
550                "cannot use trees from multiple databases in the same transaction".into(),
551            ));
552        }
553
554        Ok(TransactionalTrees {
555            inner: self
556                .iter()
557                .map(|t| TransactionalTree::from_tree(t))
558                .collect(),
559        })
560    }
561
562    fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
563        overlay.inner.clone()
564    }
565}
566
567impl<E> Transactional<E> for [&Tree] {
568    type View = Vec<TransactionalTree>;
569
570    fn make_overlay(&self) -> Result<TransactionalTrees> {
571        let same_db = self.windows(2).all(|w| {
572            let path_1 = w[0].context.get_path();
573            let path_2 = w[1].context.get_path();
574            path_1 == path_2
575        });
576        if !same_db {
577            return Err(Error::Unsupported(
578                "cannot use trees from multiple databases in the same transaction".into(),
579            ));
580        }
581
582        Ok(TransactionalTrees {
583            inner: self
584                .iter()
585                .map(|&t| TransactionalTree::from_tree(t))
586                .collect(),
587        })
588    }
589
590    fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
591        overlay.inner.clone()
592    }
593}
594
/// Expands to a tuple type holding one copy of `$t` per literal in the
/// parenthesised list, e.g. `repeat_type!(&Tree, (0, 1, 2))` becomes
/// `(&Tree, &Tree, &Tree,)` and `repeat_type!(&Tree, (0))` becomes
/// `(&Tree,)`.
macro_rules! repeat_type {
    // Internal helper: drop the counter literal and emit the element type.
    (@each $_counter:literal => $t:ty) => {
        $t
    };
    ($t:ty, ($($counter:literal),+)) => {
        ($(repeat_type!(@each $counter => $t),)+)
    };
}
612
/// Implements [`Transactional`] for a tuple of `&Tree`s with one element
/// per index given; the closure's view is a matching tuple of
/// `TransactionalTree`s.
macro_rules! impl_transactional_tuple_trees {
    ($($indices:tt),+) => {
        impl<E> Transactional<E> for repeat_type!(&Tree, ($($indices),+)) {
            type View = repeat_type!(TransactionalTree, ($($indices),+));

            fn make_overlay(&self) -> Result<TransactionalTrees> {
                // Every tree must come from the same database: compare each
                // tree's path against the first one's.
                let paths = [$(self.$indices.context.get_path()),+];
                let same_db = paths.iter().all(|path| path == &paths[0]);
                if !same_db {
                    return Err(Error::Unsupported(
                        "cannot use trees from multiple databases in the same transaction".into(),
                    ));
                }

                let inner = vec![
                    $(TransactionalTree::from_tree(self.$indices)),+
                ];
                Ok(TransactionalTrees { inner })
            }

            fn view_overlay(overlay: &TransactionalTrees) -> Self::View {
                ($(overlay.inner[$indices].clone(),)+)
            }
        }
    };
}
650
651impl_transactional_tuple_trees!(0);
652impl_transactional_tuple_trees!(0, 1);
653impl_transactional_tuple_trees!(0, 1, 2);
654impl_transactional_tuple_trees!(0, 1, 2, 3);
655impl_transactional_tuple_trees!(0, 1, 2, 3, 4);
656impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5);
657impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6);
658impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7);
659impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8);
660impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
661impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
662impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11);
663impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12);
664impl_transactional_tuple_trees!(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13);