Skip to main content

fjall/tx/optimistic/
write_tx.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use crate::{
6    snapshot_nonce::SnapshotNonce,
7    tx::{
8        optimistic::{
9            conflict_manager::ConflictManager,
10            oracle::{CommitOutcome, Oracle},
11        },
12        write_tx::BaseTransaction,
13    },
14    Database, Guard, Iter, Keyspace, PersistMode, Readable,
15};
16use lsm_tree::{Slice, UserKey, UserValue};
17use std::{
18    fmt,
19    ops::{Bound, RangeBounds, RangeFull},
20    sync::Arc,
21};
22
/// Error value signalling a transaction conflict.
///
/// SSI transactions can conflict with each other, which requires them to be rerun.
#[derive(Debug)]
pub struct Conflict;

impl std::error::Error for Conflict {}

impl fmt::Display for Conflict {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // `pad` is what `str`'s own `Display` impl uses, so width, fill and
        // precision flags of the caller's format string are still honored.
        f.pad("Transaction conflict")
    }
}
36
/// A cross-keyspace transaction using optimistic concurrency control
///
/// Use [`WriteTransaction::commit`] to commit changes to the keyspace(s).
///
/// Transactions keep a consistent view of the database at the time,
/// meaning old data will not be dropped until it is not referenced by any active transaction.
///
/// For that reason, you should try to keep transactions short-lived, and make sure they
/// are not held somewhere forever.
///
/// Reads performed through the [`Readable`] implementation and writes performed
/// through the methods of this type are recorded in a conflict manager, which is
/// handed to the oracle when the transaction is committed.
///
/// # Caution
///
/// The transaction may fail and have to be rerun if it conflicts.
#[clippy::has_significant_drop]
pub struct WriteTransaction {
    // Underlying transaction: performs the actual reads and holds the pending
    // writes (`memtables`) as well as the snapshot nonce used on commit.
    inner: BaseTransaction,
    // Bookkeeping of the keys/ranges read (`mark_read`, `mark_range`) and the
    // keys written (`mark_conflict`) by this transaction.
    cm: ConflictManager,
    // Shared oracle that decides on commit whether the transaction conflicted.
    oracle: Arc<Oracle>,
}
56
57impl Readable for WriteTransaction {
58    fn get<K: AsRef<[u8]>>(
59        &self,
60        keyspace: impl AsRef<Keyspace>,
61        key: K,
62    ) -> crate::Result<Option<UserValue>> {
63        let keyspace = keyspace.as_ref();
64
65        let res = self.inner.get(keyspace, key.as_ref())?;
66
67        self.cm.mark_read(keyspace.id, key.as_ref().into());
68
69        Ok(res)
70    }
71
72    fn contains_key<K: AsRef<[u8]>>(
73        &self,
74        keyspace: impl AsRef<Keyspace>,
75        key: K,
76    ) -> crate::Result<bool> {
77        let keyspace = keyspace.as_ref();
78
79        let contains = self.inner.contains_key(keyspace, key.as_ref())?;
80
81        self.cm.mark_read(keyspace.id, key.as_ref().into());
82
83        Ok(contains)
84    }
85
86    fn first_key_value(&self, keyspace: impl AsRef<Keyspace>) -> Option<Guard> {
87        self.iter(&keyspace).next()
88    }
89
90    fn last_key_value(&self, keyspace: impl AsRef<Keyspace>) -> Option<Guard> {
91        self.iter(&keyspace).next_back()
92    }
93
94    fn size_of<K: AsRef<[u8]>>(
95        &self,
96        keyspace: impl AsRef<Keyspace>,
97        key: K,
98    ) -> crate::Result<Option<u32>> {
99        let keyspace = keyspace.as_ref();
100        self.inner.size_of(keyspace, key)
101    }
102
103    fn iter(&self, keyspace: impl AsRef<Keyspace>) -> Iter {
104        self.cm.mark_range(keyspace.as_ref().id, RangeFull);
105        self.inner.iter(keyspace)
106    }
107
108    fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
109        &self,
110        keyspace: impl AsRef<Keyspace>,
111        range: R,
112    ) -> Iter {
113        let start: Bound<Slice> = range.start_bound().map(|k| k.as_ref().into());
114        let end: Bound<Slice> = range.end_bound().map(|k| k.as_ref().into());
115
116        self.cm.mark_range(keyspace.as_ref().id, (start, end));
117
118        self.inner.range(keyspace, range)
119    }
120
121    fn prefix<K: AsRef<[u8]>>(&self, keyspace: impl AsRef<Keyspace>, prefix: K) -> Iter {
122        self.range(keyspace, lsm_tree::range::prefix_to_range(prefix.as_ref()))
123    }
124}
125
126impl WriteTransaction {
127    pub(crate) fn new(db: Database, nonce: SnapshotNonce, oracle: Arc<Oracle>) -> Self {
128        Self {
129            inner: BaseTransaction::new(db, nonce),
130            cm: ConflictManager::default(),
131            oracle,
132        }
133    }
134
135    /// Sets the durability level.
136    #[must_use]
137    pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
138        self.inner = self.inner.durability(mode);
139        self
140    }
141
142    /// Removes an item and returns its value if it existed.
143    ///
144    /// ```
145    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable};
146    /// # use std::sync::Arc;
147    /// #
148    /// # let folder = tempfile::tempdir()?;
149    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
150    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
151    /// tree.insert("a", "abc")?;
152    ///
153    /// let mut tx = db.write_tx()?;
154    ///
155    /// let taken = tx.take(&tree, "a")?.unwrap();
156    /// assert_eq!(b"abc", &*taken);
157    /// tx.commit()?;
158    ///
159    /// let item = tree.get("a")?;
160    /// assert!(item.is_none());
161    /// #
162    /// # Ok::<(), fjall::Error>(())
163    /// ```
164    ///
165    /// # Errors
166    ///
167    /// Will return `Err` if an IO error occurs.
168    pub fn take<K: Into<UserKey>>(
169        &mut self,
170        keyspace: impl AsRef<Keyspace>,
171        key: K,
172    ) -> crate::Result<Option<UserValue>> {
173        self.fetch_update(keyspace, key, |_| None)
174    }
175
176    /// Atomically updates an item and returns the new value.
177    ///
178    /// Returning `None` removes the item if it existed before.
179    ///
180    /// # Examples
181    ///
182    /// ```
183    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable, Slice};
184    /// #
185    /// # let folder = tempfile::tempdir()?;
186    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
187    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
188    /// tree.insert("a", "abc")?;
189    ///
190    /// let mut tx = db.write_tx()?;
191    ///
192    /// let updated = tx.update_fetch(&tree, "a", |_| Some(Slice::from(*b"def")))?.unwrap();
193    /// assert_eq!(b"def", &*updated);
194    /// tx.commit()?;
195    ///
196    /// let item = tree.get("a")?;
197    /// assert_eq!(Some("def".as_bytes().into()), item);
198    /// #
199    /// # Ok::<(), fjall::Error>(())
200    /// ```
201    ///
202    /// ```
203    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable};
204    /// # use std::sync::Arc;
205    /// #
206    /// # let folder = tempfile::tempdir()?;
207    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
208    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
209    /// tree.insert("a", "abc")?;
210    ///
211    /// let mut tx = db.write_tx()?;
212    ///
213    /// let updated = tx.update_fetch(&tree, "a", |_| None)?;
214    /// assert!(updated.is_none());
215    /// tx.commit()?;
216    ///
217    /// let item = tree.get("a")?;
218    /// assert!(item.is_none());
219    /// #
220    /// # Ok::<(), fjall::Error>(())
221    /// ```
222    ///
223    /// # Errors
224    ///
225    /// Will return `Err` if an IO error occurs.
226    pub fn update_fetch<K: Into<UserKey>, F: FnOnce(Option<&UserValue>) -> Option<UserValue>>(
227        &mut self,
228        keyspace: impl AsRef<Keyspace>,
229        key: K,
230        f: F,
231    ) -> crate::Result<Option<UserValue>> {
232        let keyspace = keyspace.as_ref();
233        let key: UserKey = key.into();
234
235        let updated = self.inner.update_fetch(keyspace, key.clone(), f)?;
236
237        self.cm.mark_read(keyspace.id, key.clone());
238        self.cm.mark_conflict(keyspace.id, key);
239
240        Ok(updated)
241    }
242
243    /// Atomically updates an item and returns the previous value.
244    ///
245    /// Returning `None` removes the item if it existed before.
246    ///
247    /// # Examples
248    ///
249    /// ```
250    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable, Slice};
251    /// #
252    /// # let folder = tempfile::tempdir()?;
253    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
254    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
255    /// tree.insert("a", "abc")?;
256    ///
257    /// let mut tx = db.write_tx()?;
258    ///
259    /// let prev = tx.fetch_update(&tree, "a", |_| Some(Slice::from(*b"def")))?.unwrap();
260    /// assert_eq!(b"abc", &*prev);
261    /// tx.commit()?;
262    ///
263    /// let item = tree.get("a")?;
264    /// assert_eq!(Some("def".as_bytes().into()), item);
265    /// #
266    /// # Ok::<(), fjall::Error>(())
267    /// ```
268    ///
269    /// ```
270    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable};
271    /// # use std::sync::Arc;
272    /// #
273    /// # let folder = tempfile::tempdir()?;
274    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
275    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
276    /// tree.insert("a", "abc")?;
277    ///
278    /// let mut tx = db.write_tx()?;
279    ///
280    /// let prev = tx.fetch_update(&tree, "a", |_| None)?.unwrap();
281    /// assert_eq!(b"abc", &*prev);
282    /// tx.commit()?;
283    ///
284    /// let item = tree.get("a")?;
285    /// assert!(item.is_none());
286    /// #
287    /// # Ok::<(), fjall::Error>(())
288    /// ```
289    ///
290    /// # Errors
291    ///
292    /// Will return `Err` if an IO error occurs.
293    pub fn fetch_update<K: Into<UserKey>, F: FnOnce(Option<&UserValue>) -> Option<UserValue>>(
294        &mut self,
295        keyspace: impl AsRef<Keyspace>,
296        key: K,
297        f: F,
298    ) -> crate::Result<Option<UserValue>> {
299        let keyspace = keyspace.as_ref();
300        let key = key.into();
301
302        let prev = self.inner.fetch_update(keyspace, key.clone(), f)?;
303
304        self.cm.mark_read(keyspace.id, key.clone());
305        self.cm.mark_conflict(keyspace.id, key);
306
307        Ok(prev)
308    }
309
310    /// Inserts a key-value pair into the keyspace.
311    ///
312    /// Keys may be up to 65536 bytes long, values up to 2^32 bytes.
313    /// Shorter keys and values result in better performance.
314    ///
315    /// If the key already exists, the item will be overwritten.
316    ///
317    /// # Examples
318    ///
319    /// ```
320    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable};
321    /// #
322    /// # let folder = tempfile::tempdir()?;
323    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
324    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
325    /// tree.insert("a", "previous_value")?;
326    /// assert_eq!(b"previous_value", &*tree.get("a")?.unwrap());
327    ///
328    /// let mut tx = db.write_tx()?;
329    /// tx.insert(&tree, "a", "new_value");
330    ///
331    /// drop(tx);
332    ///
333    /// // Write was not committed
334    /// assert_eq!(b"previous_value", &*tree.get("a")?.unwrap());
335    /// #
336    /// # Ok::<(), fjall::Error>(())
337    /// ```
338    pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
339        &mut self,
340        keyspace: impl AsRef<Keyspace>,
341        key: K,
342        value: V,
343    ) {
344        let keyspace = keyspace.as_ref();
345        let key: UserKey = key.into();
346
347        self.inner.insert(keyspace, key.clone(), value);
348        self.cm.mark_conflict(keyspace.id, key);
349    }
350
351    /// Removes an item from the keyspace.
352    ///
353    /// The key may be up to 65536 bytes long.
354    /// Shorter keys result in better performance.
355    ///
356    /// # Examples
357    ///
358    /// ```
359    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable};
360    /// #
361    /// # let folder = tempfile::tempdir()?;
362    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
363    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
364    /// tree.insert("a", "previous_value")?;
365    /// assert_eq!(b"previous_value", &*tree.get("a")?.unwrap());
366    ///
367    /// let mut tx = db.write_tx()?;
368    /// tx.remove(&tree, "a");
369    ///
370    /// // Read-your-own-write
371    /// let item = tx.get(&tree, "a")?;
372    /// assert_eq!(None, item);
373    ///
374    /// drop(tx);
375    ///
376    /// // Deletion was not committed
377    /// assert_eq!(b"previous_value", &*tree.get("a")?.unwrap());
378    /// #
379    /// # Ok::<(), fjall::Error>(())
380    /// ```
381    pub fn remove<K: Into<UserKey>>(&mut self, keyspace: impl AsRef<Keyspace>, key: K) {
382        let keyspace = keyspace.as_ref();
383        let key: UserKey = key.into();
384
385        self.inner.remove(keyspace, key.clone());
386        self.cm.mark_conflict(keyspace.id, key);
387    }
388
389    /// Removes an item from the keyspace, leaving behind a weak tombstone.
390    ///
391    /// The tombstone marker of this delete operation will vanish when it
392    /// collides with its corresponding insertion.
393    /// This may cause older versions of the value to be resurrected, so it should
394    /// only be used and preferred in scenarios where a key is only ever written once.
395    ///
396    /// # Experimental
397    ///
398    /// This function is currently experimental.
399    ///
400    /// # Examples
401    ///
402    /// ```
403    /// # use fjall::{OptimisticTxDatabase, KeyspaceCreateOptions, Readable};
404    /// #
405    /// # let folder = tempfile::tempdir()?;
406    /// # let db = OptimisticTxDatabase::builder(folder).open()?;
407    /// # let tree = db.keyspace("default", KeyspaceCreateOptions::default)?;
408    /// tree.insert("a", "previous_value")?;
409    /// assert_eq!(b"previous_value", &*tree.get("a")?.unwrap());
410    ///
411    /// let mut tx = db.write_tx()?;
412    /// tx.remove_weak(&tree, "a");
413    ///
414    /// // Read-your-own-write
415    /// let item = tx.get(&tree, "a")?;
416    /// assert_eq!(None, item);
417    ///
418    /// drop(tx);
419    ///
420    /// // Deletion was not committed
421    /// assert_eq!(b"previous_value", &*tree.get("a")?.unwrap());
422    /// #
423    /// # Ok::<(), fjall::Error>(())
424    /// ```
425    #[doc(hidden)]
426    pub fn remove_weak<K: Into<UserKey>>(&mut self, keyspace: impl AsRef<Keyspace>, key: K) {
427        let keyspace = keyspace.as_ref();
428        let key: UserKey = key.into();
429
430        self.inner.remove_weak(keyspace, key.clone());
431        self.cm.mark_conflict(keyspace.id, key);
432    }
433
434    /// Commits the transaction.
435    ///
436    /// # Errors
437    ///
438    /// Will return `Err` if an IO error occurs.
439    pub fn commit(self) -> crate::Result<Result<(), Conflict>> {
440        // NOTE: We have no write set, so we are basically
441        // a read-only transaction, so nothing to do here
442        if self.inner.memtables.is_empty() {
443            return Ok(Ok(()));
444        }
445
446        let oracle = self.oracle.clone();
447
448        match oracle.with_commit(self.inner.nonce.instant, self.cm, move || {
449            self.inner.commit()
450        })? {
451            CommitOutcome::Ok => Ok(Ok(())),
452            CommitOutcome::Aborted(e) => Err(e),
453            CommitOutcome::Conflicted => Ok(Err(Conflict)),
454        }
455    }
456
457    /// More explicit alternative to dropping the transaction
458    /// to roll it back.
459    pub fn rollback(self) {
460        self.inner.rollback();
461    }
462}
463
464#[cfg(test)]
465mod tests {
466    use crate::{
467        Conflict, KeyspaceCreateOptions, OptimisticTxDatabase, OptimisticTxKeyspace, Readable,
468    };
469    use tempfile::TempDir;
470    use test_log::test;
471
    // Bundle of everything a test needs: the database, one keyspace in it and
    // the temporary directory backing both.
    struct TestEnv {
        // Database opened inside `tmpdir`.
        db: OptimisticTxDatabase,
        // Keyspace ("foo") that the tests read from and write to.
        tree: OptimisticTxKeyspace,

        // Never read; presumably held so the temporary directory lives as long
        // as the database (`TempDir` removes its directory when dropped).
        #[expect(unused)]
        tmpdir: TempDir,
    }
479
480    impl TestEnv {
481        fn seed_hermitage_data(&self) -> crate::Result<()> {
482            self.tree.insert([1u8], [10u8])?;
483            self.tree.insert([2u8], [20u8])?;
484            Ok(())
485        }
486    }
487
488    fn setup() -> Result<TestEnv, Box<dyn std::error::Error>> {
489        let tmpdir = tempfile::tempdir()?;
490        let db = OptimisticTxDatabase::builder(tmpdir.path()).open()?;
491
492        let tree = db.keyspace("foo", KeyspaceCreateOptions::default)?;
493
494        Ok(TestEnv { db, tree, tmpdir })
495    }
496
497    // Adapted from https://github.com/al8n/skipdb/issues/10
498    #[test]
499    #[expect(clippy::unwrap_used)]
500    fn tx_ssi_arthur_1() -> Result<(), Box<dyn std::error::Error>> {
501        let env = setup()?;
502
503        let mut tx = env.db.write_tx()?;
504        tx.insert(env.tree.inner(), "a1", 10u64.to_be_bytes());
505        tx.insert(env.tree.inner(), "b1", 100u64.to_be_bytes());
506        tx.insert(env.tree.inner(), "b2", 200u64.to_be_bytes());
507        tx.commit()??;
508
509        let mut tx1 = env.db.write_tx()?;
510        let val = tx1
511            .range(&env.tree, "a".."b")
512            .map(|kv| {
513                let v = kv.value().unwrap();
514
515                let mut buf = [0u8; 8];
516                buf.copy_from_slice(&v);
517                u64::from_be_bytes(buf)
518            })
519            .sum::<u64>();
520        tx1.insert(env.tree.inner(), "b3", 10u64.to_be_bytes());
521        assert_eq!(10, val);
522
523        let mut tx2 = env.db.write_tx()?;
524        let val = tx2
525            .range(&env.tree, "b".."c")
526            .map(|kv| {
527                let v = kv.value().unwrap();
528
529                let mut buf = [0u8; 8];
530                buf.copy_from_slice(&v);
531                u64::from_be_bytes(buf)
532            })
533            .sum::<u64>();
534        tx2.insert(env.tree.inner(), "a3", 300u64.to_be_bytes());
535        assert_eq!(300, val);
536        tx2.commit()??;
537        assert!(matches!(tx1.commit()?, Err(Conflict)));
538
539        let tx3 = env.db.write_tx()?;
540        let val = tx3
541            .iter(&env.tree)
542            .filter_map(|kv| {
543                let (k, v) = kv.into_inner().unwrap();
544
545                if k.starts_with(b"a") {
546                    let mut buf = [0u8; 8];
547                    buf.copy_from_slice(&v);
548                    Some(u64::from_be_bytes(buf))
549                } else {
550                    None
551                }
552            })
553            .sum::<u64>();
554        assert_eq!(310, val);
555
556        Ok(())
557    }
558
559    // Adapted from https://github.com/al8n/skipdb/issues/10
560    #[test]
561    #[expect(clippy::unwrap_used)]
562    fn tx_ssi_arthur_2() -> Result<(), Box<dyn std::error::Error>> {
563        let env = setup()?;
564
565        let mut tx = env.db.write_tx()?;
566        tx.insert(env.tree.inner(), "b1", 100u64.to_be_bytes());
567        tx.insert(env.tree.inner(), "b2", 200u64.to_be_bytes());
568        tx.commit()??;
569
570        let mut tx1 = env.db.write_tx()?;
571        let val = tx1
572            .range(&env.tree, "a".."b")
573            .map(|kv| {
574                let v = kv.value().unwrap();
575
576                let mut buf = [0u8; 8];
577                buf.copy_from_slice(&v);
578                u64::from_be_bytes(buf)
579            })
580            .sum::<u64>();
581        tx1.insert(env.tree.inner(), "b3", 0u64.to_be_bytes());
582        assert_eq!(0, val);
583
584        let mut tx2 = env.db.write_tx()?;
585        let val = tx2
586            .range(&env.tree, "b".."c")
587            .map(|kv| {
588                let v = kv.value().unwrap();
589
590                let mut buf = [0u8; 8];
591                buf.copy_from_slice(&v);
592                u64::from_be_bytes(buf)
593            })
594            .sum::<u64>();
595        tx2.insert(env.tree.inner(), "a3", 300u64.to_be_bytes());
596        assert_eq!(300, val);
597        tx2.commit()??;
598        assert!(matches!(tx1.commit()?, Err(Conflict)));
599
600        let tx3 = env.db.write_tx()?;
601        let val = tx3
602            .iter(&env.tree)
603            .filter_map(|kv| {
604                let (k, v) = kv.into_inner().unwrap();
605
606                if k.starts_with(b"a") {
607                    let mut buf = [0u8; 8];
608                    buf.copy_from_slice(&v);
609                    Some(u64::from_be_bytes(buf))
610                } else {
611                    None
612                }
613            })
614            .sum::<u64>();
615        assert_eq!(300, val);
616
617        Ok(())
618    }
619
620    #[test]
621    fn tx_ssi_basic() -> Result<(), Box<dyn std::error::Error>> {
622        let env = setup()?;
623
624        let mut tx1 = env.db.write_tx()?;
625        let mut tx2 = env.db.write_tx()?;
626
627        tx1.insert(env.tree.inner(), "hello", "world");
628
629        tx1.commit()??;
630        assert!(env.tree.contains_key("hello")?);
631
632        assert_eq!(tx2.get(env.tree.inner(), "hello")?, None);
633
634        tx2.insert(env.tree.inner(), "hello", "world2");
635        assert!(matches!(tx2.commit()?, Err(Conflict)));
636
637        let mut tx1 = env.db.write_tx()?;
638        let mut tx2 = env.db.write_tx()?;
639
640        tx1.iter(&env.tree).next();
641        tx2.insert(env.tree.inner(), "hello", "world2");
642
643        tx1.insert(env.tree.inner(), "hello2", "world1");
644        tx1.commit()??;
645
646        tx2.commit()??;
647
648        Ok(())
649    }
650
651    #[test]
652    #[expect(clippy::unwrap_used)]
653    fn tx_ssi_ww() -> Result<(), Box<dyn std::error::Error>> {
654        // https://en.wikipedia.org/wiki/Write%E2%80%93write_conflict
655        let env = setup()?;
656
657        let mut tx1 = env.db.write_tx()?;
658        let mut tx2 = env.db.write_tx()?;
659
660        tx1.insert(env.tree.inner(), "a", "a");
661        tx2.insert(env.tree.inner(), "b", "c");
662        tx1.insert(env.tree.inner(), "b", "b");
663        tx1.commit()??;
664
665        tx2.insert(env.tree.inner(), "a", "c");
666
667        tx2.commit()??;
668        assert_eq!(b"c", &*env.tree.get("a")?.unwrap());
669        assert_eq!(b"c", &*env.tree.get("b")?.unwrap());
670
671        Ok(())
672    }
673
674    #[test]
675    #[expect(clippy::unwrap_used)]
676    fn tx_ssi_swap() -> Result<(), Box<dyn std::error::Error>> {
677        let env = setup()?;
678
679        env.tree.insert("x", "x")?;
680        env.tree.insert("y", "y")?;
681
682        let mut tx1 = env.db.write_tx()?;
683        let mut tx2 = env.db.write_tx()?;
684
685        {
686            let x = tx1.get(env.tree.inner(), "x")?.unwrap();
687            tx1.insert(env.tree.inner(), "y", x);
688        }
689
690        {
691            let y = tx2.get(env.tree.inner(), "y")?.unwrap();
692            tx2.insert(env.tree.inner(), "x", y);
693        }
694
695        tx1.commit()??;
696        assert!(matches!(tx2.commit()?, Err(Conflict)));
697
698        Ok(())
699    }
700
701    #[test]
702    fn tx_ssi_write_cycles() -> Result<(), Box<dyn std::error::Error>> {
703        let env = setup()?;
704        env.seed_hermitage_data()?;
705
706        let mut t1 = env.db.write_tx()?;
707        let mut t2 = env.db.write_tx()?;
708
709        t1.insert(env.tree.inner(), [1u8], [11u8]);
710        t2.insert(env.tree.inner(), [1u8], [12u8]);
711        t1.insert(env.tree.inner(), [2u8], [21u8]);
712        t1.commit()??;
713
714        assert_eq!(env.tree.get([1u8])?, Some([11u8].into()));
715
716        t2.insert(env.tree.inner(), [2u8], [22u8]);
717        t2.commit()??;
718
719        assert_eq!(env.tree.get([1u8])?, Some([12u8].into()));
720        assert_eq!(env.tree.get([2u8])?, Some([22u8].into()));
721
722        Ok(())
723    }
724
725    #[test]
726    fn tx_ssi_aborted_reads() -> Result<(), Box<dyn std::error::Error>> {
727        let env = setup()?;
728        env.seed_hermitage_data()?;
729
730        let mut t1 = env.db.write_tx()?;
731        let t2 = env.db.write_tx()?;
732
733        t1.insert(env.tree.inner(), [1u8], [101u8]);
734
735        assert_eq!(t2.get(env.tree.inner(), [1u8])?, Some([10u8].into()));
736
737        t1.rollback();
738
739        assert_eq!(t2.get(env.tree.inner(), [1u8])?, Some([10u8].into()));
740
741        t2.commit()??;
742
743        Ok(())
744    }
745
746    #[expect(clippy::unwrap_used)]
747    #[test]
748    fn tx_ssi_anti_dependency_cycles() -> Result<(), Box<dyn std::error::Error>> {
749        let env = setup()?;
750        env.seed_hermitage_data()?;
751
752        let mut t1 = env.db.write_tx()?;
753        {
754            let mut iter = t1.iter(&env.tree);
755            assert_eq!(
756                iter.next().unwrap().into_inner()?,
757                ([1u8].into(), [10u8].into()),
758            );
759            assert_eq!(
760                iter.next().unwrap().into_inner()?,
761                ([2u8].into(), [20u8].into()),
762            );
763            assert!(iter.next().is_none());
764        }
765
766        let mut t2 = env.db.write_tx()?;
767        let new = t2.update_fetch(&env.tree, [2u8], |v| {
768            v.and_then(|v| v.first().copied()).map(|v| [v + 5].into())
769        })?;
770        assert_eq!(new, Some([25u8].into()));
771        t2.commit()??;
772
773        let t3 = env.db.write_tx()?;
774        {
775            let mut iter = t3.iter(&env.tree);
776            assert_eq!(
777                iter.next().unwrap().into_inner()?,
778                ([1u8].into(), [10u8].into()),
779            );
780            assert_eq!(
781                iter.next().unwrap().into_inner()?,
782                ([2u8].into(), [25u8].into()),
783            ); // changed here
784            assert!(iter.next().is_none());
785        }
786
787        t3.commit()??;
788
789        t1.insert(env.tree.inner(), [1u8], [0u8]);
790
791        assert!(matches!(t1.commit()?, Err(Conflict)));
792
793        Ok(())
794    }
795
796    #[test]
797    fn tx_ssi_update_fetch_update() -> Result<(), Box<dyn std::error::Error>> {
798        let env = setup()?;
799
800        let mut t1 = env.db.write_tx()?;
801        let mut t2 = env.db.write_tx()?;
802
803        let new = t1.update_fetch(env.tree.inner(), "hello", |_| Some("world".into()))?;
804        assert_eq!(new, Some("world".into()));
805        let old = t2.fetch_update(env.tree.inner(), "hello", |_| Some("world2".into()))?;
806        assert_eq!(old, None);
807
808        t1.commit()??;
809        assert!(matches!(t2.commit()?, Err(Conflict)));
810
811        assert_eq!(env.tree.get("hello")?, Some("world".into()));
812
813        let mut t1 = env.db.write_tx()?;
814        let mut t2 = env.db.write_tx()?;
815
816        let old = t1.fetch_update(env.tree.inner(), "hello", |_| Some("world3".into()))?;
817        assert_eq!(old, Some("world".into()));
818        let new = t2.update_fetch(env.tree.inner(), "hello2", |_| Some("world2".into()))?;
819        assert_eq!(new, Some("world2".into()));
820
821        t1.commit()??;
822        t2.commit()??;
823
824        assert_eq!(env.tree.get("hello")?, Some("world3".into()));
825        assert_eq!(env.tree.get("hello2")?, Some("world2".into()));
826
827        Ok(())
828    }
829
830    #[test]
831    fn tx_ssi_range() -> Result<(), Box<dyn std::error::Error>> {
832        let env = setup()?;
833
834        let mut t1 = env.db.write_tx()?;
835        let mut t2 = env.db.write_tx()?;
836
837        _ = t1.range(&env.tree, "h"..="hello");
838        t1.insert(env.tree.inner(), "foo", "bar");
839
840        // insert a key INSIDE the range read by t1
841        t2.insert(env.tree.inner(), "hello", "world");
842
843        t2.commit()??;
844        assert!(matches!(t1.commit()?, Err(Conflict)));
845
846        let mut t1 = env.db.write_tx()?;
847        let mut t2 = env.db.write_tx()?;
848
849        _ = t1.range(&env.tree, "h"..="hello");
850        t1.insert(env.tree.inner(), "foo", "bar");
851
852        // insert a key OUTSIDE the range read by t1
853        t2.insert(env.tree.inner(), "hello2", "world");
854
855        t2.commit()??;
856        t1.commit()??;
857
858        Ok(())
859    }
860
861    #[test]
862    fn tx_ssi_prefix() -> Result<(), Box<dyn std::error::Error>> {
863        let env = setup()?;
864
865        let mut t1 = env.db.write_tx()?;
866        let mut t2 = env.db.write_tx()?;
867
868        _ = t1.prefix(&env.tree, "hello");
869        t1.insert(env.tree.inner(), "foo", "bar");
870
871        // insert a key MATCHING the prefix read by t1
872        t2.insert(env.tree.inner(), "hello", "world");
873
874        t2.commit()??;
875        assert!(matches!(t1.commit()?, Err(Conflict)));
876
877        let mut t1 = env.db.write_tx()?;
878        let mut t2 = env.db.write_tx()?;
879
880        _ = t1.prefix(&env.tree, "hello");
881        t1.insert(env.tree.inner(), "foo", "bar");
882
883        // insert a key NOT MATCHING the range read by t1
884        t2.insert(env.tree.inner(), "foobar", "world");
885
886        t2.commit()??;
887        t1.commit()??;
888
889        Ok(())
890    }
891}