1use crate::{
6 snapshot_nonce::SnapshotNonce,
7 tx::{
8 optimistic::{
9 conflict_manager::ConflictManager,
10 oracle::{CommitOutcome, Oracle},
11 },
12 write_tx::BaseTransaction,
13 },
14 Database, Guard, Iter, Keyspace, PersistMode, Readable,
15};
16use lsm_tree::{Slice, UserKey, UserValue};
17use std::{
18 fmt,
19 ops::{Bound, RangeBounds, RangeFull},
20 sync::Arc,
21};
22
/// Marker error for a transaction that could not be committed because it
/// conflicted with another transaction.
///
/// It carries no payload; the only information is that a conflict occurred.
#[derive(Debug)]
pub struct Conflict;

impl std::error::Error for Conflict {}

impl fmt::Display for Conflict {
    /// Writes the fixed, human-readable conflict message.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate to `str`'s `Display` impl via a fully-qualified path so the
        // call does not depend on which formatting traits are in scope.
        fmt::Display::fmt("Transaction conflict", f)
    }
}
36
37#[clippy::has_significant_drop]
51pub struct WriteTransaction {
52 inner: BaseTransaction,
53 cm: ConflictManager,
54 oracle: Arc<Oracle>,
55}
56
57impl Readable for WriteTransaction {
58 fn get<K: AsRef<[u8]>>(
59 &self,
60 keyspace: impl AsRef<Keyspace>,
61 key: K,
62 ) -> crate::Result<Option<UserValue>> {
63 let keyspace = keyspace.as_ref();
64
65 let res = self.inner.get(keyspace, key.as_ref())?;
66
67 self.cm.mark_read(keyspace.id, key.as_ref().into());
68
69 Ok(res)
70 }
71
72 fn contains_key<K: AsRef<[u8]>>(
73 &self,
74 keyspace: impl AsRef<Keyspace>,
75 key: K,
76 ) -> crate::Result<bool> {
77 let keyspace = keyspace.as_ref();
78
79 let contains = self.inner.contains_key(keyspace, key.as_ref())?;
80
81 self.cm.mark_read(keyspace.id, key.as_ref().into());
82
83 Ok(contains)
84 }
85
86 fn first_key_value(&self, keyspace: impl AsRef<Keyspace>) -> Option<Guard> {
87 self.iter(&keyspace).next()
88 }
89
90 fn last_key_value(&self, keyspace: impl AsRef<Keyspace>) -> Option<Guard> {
91 self.iter(&keyspace).next_back()
92 }
93
94 fn size_of<K: AsRef<[u8]>>(
95 &self,
96 keyspace: impl AsRef<Keyspace>,
97 key: K,
98 ) -> crate::Result<Option<u32>> {
99 let keyspace = keyspace.as_ref();
100 self.inner.size_of(keyspace, key)
101 }
102
103 fn iter(&self, keyspace: impl AsRef<Keyspace>) -> Iter {
104 self.cm.mark_range(keyspace.as_ref().id, RangeFull);
105 self.inner.iter(keyspace)
106 }
107
108 fn range<K: AsRef<[u8]>, R: RangeBounds<K>>(
109 &self,
110 keyspace: impl AsRef<Keyspace>,
111 range: R,
112 ) -> Iter {
113 let start: Bound<Slice> = range.start_bound().map(|k| k.as_ref().into());
114 let end: Bound<Slice> = range.end_bound().map(|k| k.as_ref().into());
115
116 self.cm.mark_range(keyspace.as_ref().id, (start, end));
117
118 self.inner.range(keyspace, range)
119 }
120
121 fn prefix<K: AsRef<[u8]>>(&self, keyspace: impl AsRef<Keyspace>, prefix: K) -> Iter {
122 self.range(keyspace, lsm_tree::range::prefix_to_range(prefix.as_ref()))
123 }
124}
125
126impl WriteTransaction {
127 pub(crate) fn new(db: Database, nonce: SnapshotNonce, oracle: Arc<Oracle>) -> Self {
128 Self {
129 inner: BaseTransaction::new(db, nonce),
130 cm: ConflictManager::default(),
131 oracle,
132 }
133 }
134
135 #[must_use]
137 pub fn durability(mut self, mode: Option<PersistMode>) -> Self {
138 self.inner = self.inner.durability(mode);
139 self
140 }
141
142 pub fn take<K: Into<UserKey>>(
169 &mut self,
170 keyspace: impl AsRef<Keyspace>,
171 key: K,
172 ) -> crate::Result<Option<UserValue>> {
173 self.fetch_update(keyspace, key, |_| None)
174 }
175
176 pub fn update_fetch<K: Into<UserKey>, F: FnOnce(Option<&UserValue>) -> Option<UserValue>>(
227 &mut self,
228 keyspace: impl AsRef<Keyspace>,
229 key: K,
230 f: F,
231 ) -> crate::Result<Option<UserValue>> {
232 let keyspace = keyspace.as_ref();
233 let key: UserKey = key.into();
234
235 let updated = self.inner.update_fetch(keyspace, key.clone(), f)?;
236
237 self.cm.mark_read(keyspace.id, key.clone());
238 self.cm.mark_conflict(keyspace.id, key);
239
240 Ok(updated)
241 }
242
243 pub fn fetch_update<K: Into<UserKey>, F: FnOnce(Option<&UserValue>) -> Option<UserValue>>(
294 &mut self,
295 keyspace: impl AsRef<Keyspace>,
296 key: K,
297 f: F,
298 ) -> crate::Result<Option<UserValue>> {
299 let keyspace = keyspace.as_ref();
300 let key = key.into();
301
302 let prev = self.inner.fetch_update(keyspace, key.clone(), f)?;
303
304 self.cm.mark_read(keyspace.id, key.clone());
305 self.cm.mark_conflict(keyspace.id, key);
306
307 Ok(prev)
308 }
309
310 pub fn insert<K: Into<UserKey>, V: Into<UserValue>>(
339 &mut self,
340 keyspace: impl AsRef<Keyspace>,
341 key: K,
342 value: V,
343 ) {
344 let keyspace = keyspace.as_ref();
345 let key: UserKey = key.into();
346
347 self.inner.insert(keyspace, key.clone(), value);
348 self.cm.mark_conflict(keyspace.id, key);
349 }
350
351 pub fn remove<K: Into<UserKey>>(&mut self, keyspace: impl AsRef<Keyspace>, key: K) {
382 let keyspace = keyspace.as_ref();
383 let key: UserKey = key.into();
384
385 self.inner.remove(keyspace, key.clone());
386 self.cm.mark_conflict(keyspace.id, key);
387 }
388
389 #[doc(hidden)]
426 pub fn remove_weak<K: Into<UserKey>>(&mut self, keyspace: impl AsRef<Keyspace>, key: K) {
427 let keyspace = keyspace.as_ref();
428 let key: UserKey = key.into();
429
430 self.inner.remove_weak(keyspace, key.clone());
431 self.cm.mark_conflict(keyspace.id, key);
432 }
433
434 pub fn commit(self) -> crate::Result<Result<(), Conflict>> {
440 if self.inner.memtables.is_empty() {
443 return Ok(Ok(()));
444 }
445
446 let oracle = self.oracle.clone();
447
448 match oracle.with_commit(self.inner.nonce.instant, self.cm, move || {
449 self.inner.commit()
450 })? {
451 CommitOutcome::Ok => Ok(Ok(())),
452 CommitOutcome::Aborted(e) => Err(e),
453 CommitOutcome::Conflicted => Ok(Err(Conflict)),
454 }
455 }
456
457 pub fn rollback(self) {
460 self.inner.rollback();
461 }
462}
463
#[cfg(test)]
mod tests {
    use crate::{
        Conflict, KeyspaceCreateOptions, OptimisticTxDatabase, OptimisticTxKeyspace, Readable,
    };
    use tempfile::TempDir;
    use test_log::test;

    /// Database, keyspace and backing temporary directory shared by a test.
    struct TestEnv {
        db: OptimisticTxDatabase,
        tree: OptimisticTxKeyspace,

        // Never read, but stored so the directory stays alive as long as the
        // environment does.
        #[expect(unused)]
        tmpdir: TempDir,
    }

    impl TestEnv {
        /// Seeds the keyspace with `1 => 10` and `2 => 20`.
        fn seed_hermitage_data(&self) -> crate::Result<()> {
            self.tree.insert([1u8], [10u8])?;
            self.tree.insert([2u8], [20u8])?;
            Ok(())
        }
    }

    /// Opens a fresh database in a temporary directory with one keyspace
    /// named `foo`.
    fn setup() -> Result<TestEnv, Box<dyn std::error::Error>> {
        let tmpdir = tempfile::tempdir()?;
        let db = OptimisticTxDatabase::builder(tmpdir.path()).open()?;

        let tree = db.keyspace("foo", KeyspaceCreateOptions::default)?;

        Ok(TestEnv { db, tree, tmpdir })
    }

    /// Decodes a big-endian `u64` from `bytes`.
    ///
    /// Panics if `bytes` is not exactly 8 bytes long.
    fn decode_u64(bytes: &[u8]) -> u64 {
        let mut buf = [0u8; 8];
        buf.copy_from_slice(bytes);
        u64::from_be_bytes(buf)
    }

    // Two transactions each read one key range and then write into the range
    // the other one read; the second committer must be rejected.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn tx_ssi_arthur_1() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        let mut tx = env.db.write_tx()?;
        tx.insert(env.tree.inner(), "a1", 10u64.to_be_bytes());
        tx.insert(env.tree.inner(), "b1", 100u64.to_be_bytes());
        tx.insert(env.tree.inner(), "b2", 200u64.to_be_bytes());
        tx.commit()??;

        let mut tx1 = env.db.write_tx()?;
        let sum_a = tx1
            .range(&env.tree, "a".."b")
            .map(|kv| decode_u64(&kv.value().unwrap()))
            .sum::<u64>();
        tx1.insert(env.tree.inner(), "b3", 10u64.to_be_bytes());
        assert_eq!(10, sum_a);

        let mut tx2 = env.db.write_tx()?;
        let sum_b = tx2
            .range(&env.tree, "b".."c")
            .map(|kv| decode_u64(&kv.value().unwrap()))
            .sum::<u64>();
        tx2.insert(env.tree.inner(), "a3", 300u64.to_be_bytes());
        assert_eq!(300, sum_b);
        tx2.commit()??;
        assert!(matches!(tx1.commit()?, Err(Conflict)));

        let tx3 = env.db.write_tx()?;
        let sum_a_after = tx3
            .iter(&env.tree)
            .filter_map(|kv| {
                let (key, value) = kv.into_inner().unwrap();
                key.starts_with(b"a").then(|| decode_u64(&value))
            })
            .sum::<u64>();
        assert_eq!(310, sum_a_after);

        Ok(())
    }

    // Same shape as `tx_ssi_arthur_1`, but the first transaction's range is
    // empty when it is read.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn tx_ssi_arthur_2() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        let mut tx = env.db.write_tx()?;
        tx.insert(env.tree.inner(), "b1", 100u64.to_be_bytes());
        tx.insert(env.tree.inner(), "b2", 200u64.to_be_bytes());
        tx.commit()??;

        let mut tx1 = env.db.write_tx()?;
        let sum_a = tx1
            .range(&env.tree, "a".."b")
            .map(|kv| decode_u64(&kv.value().unwrap()))
            .sum::<u64>();
        tx1.insert(env.tree.inner(), "b3", 0u64.to_be_bytes());
        assert_eq!(0, sum_a);

        let mut tx2 = env.db.write_tx()?;
        let sum_b = tx2
            .range(&env.tree, "b".."c")
            .map(|kv| decode_u64(&kv.value().unwrap()))
            .sum::<u64>();
        tx2.insert(env.tree.inner(), "a3", 300u64.to_be_bytes());
        assert_eq!(300, sum_b);
        tx2.commit()??;
        assert!(matches!(tx1.commit()?, Err(Conflict)));

        let tx3 = env.db.write_tx()?;
        let sum_a_after = tx3
            .iter(&env.tree)
            .filter_map(|kv| {
                let (key, value) = kv.into_inner().unwrap();
                key.starts_with(b"a").then(|| decode_u64(&value))
            })
            .sum::<u64>();
        assert_eq!(300, sum_a_after);

        Ok(())
    }

    #[test]
    fn tx_ssi_basic() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        let mut tx1 = env.db.write_tx()?;
        let mut tx2 = env.db.write_tx()?;

        tx1.insert(env.tree.inner(), "hello", "world");

        tx1.commit()??;
        assert!(env.tree.contains_key("hello")?);

        // tx2 still sees its own snapshot, in which the key does not exist.
        assert_eq!(tx2.get(env.tree.inner(), "hello")?, None);

        tx2.insert(env.tree.inner(), "hello", "world2");
        assert!(matches!(tx2.commit()?, Err(Conflict)));

        let mut tx1 = env.db.write_tx()?;
        let mut tx2 = env.db.write_tx()?;

        tx1.iter(&env.tree).next();
        tx2.insert(env.tree.inner(), "hello", "world2");

        tx1.insert(env.tree.inner(), "hello2", "world1");
        tx1.commit()??;

        tx2.commit()??;

        Ok(())
    }

    // Overlapping blind writes: both transactions commit and the later
    // committer's values are the ones that remain.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn tx_ssi_ww() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        let mut tx1 = env.db.write_tx()?;
        let mut tx2 = env.db.write_tx()?;

        tx1.insert(env.tree.inner(), "a", "a");
        tx2.insert(env.tree.inner(), "b", "c");
        tx1.insert(env.tree.inner(), "b", "b");
        tx1.commit()??;

        tx2.insert(env.tree.inner(), "a", "c");

        tx2.commit()??;
        assert_eq!(b"c", &*env.tree.get("a")?.unwrap());
        assert_eq!(b"c", &*env.tree.get("b")?.unwrap());

        Ok(())
    }

    // Each transaction reads one key and writes its value to the other key;
    // the second committer must be rejected.
    #[test]
    #[expect(clippy::unwrap_used)]
    fn tx_ssi_swap() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        env.tree.insert("x", "x")?;
        env.tree.insert("y", "y")?;

        let mut tx1 = env.db.write_tx()?;
        let mut tx2 = env.db.write_tx()?;

        {
            let x = tx1.get(env.tree.inner(), "x")?.unwrap();
            tx1.insert(env.tree.inner(), "y", x);
        }

        {
            let y = tx2.get(env.tree.inner(), "y")?.unwrap();
            tx2.insert(env.tree.inner(), "x", y);
        }

        tx1.commit()??;
        assert!(matches!(tx2.commit()?, Err(Conflict)));

        Ok(())
    }

    #[test]
    fn tx_ssi_write_cycles() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;
        env.seed_hermitage_data()?;

        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        t1.insert(env.tree.inner(), [1u8], [11u8]);
        t2.insert(env.tree.inner(), [1u8], [12u8]);
        t1.insert(env.tree.inner(), [2u8], [21u8]);
        t1.commit()??;

        assert_eq!(env.tree.get([1u8])?, Some([11u8].into()));

        t2.insert(env.tree.inner(), [2u8], [22u8]);
        t2.commit()??;

        assert_eq!(env.tree.get([1u8])?, Some([12u8].into()));
        assert_eq!(env.tree.get([2u8])?, Some([22u8].into()));

        Ok(())
    }

    #[test]
    fn tx_ssi_aborted_reads() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;
        env.seed_hermitage_data()?;

        let mut t1 = env.db.write_tx()?;
        let t2 = env.db.write_tx()?;

        t1.insert(env.tree.inner(), [1u8], [101u8]);

        // The uncommitted write of t1 is not visible to t2 ...
        assert_eq!(t2.get(env.tree.inner(), [1u8])?, Some([10u8].into()));

        t1.rollback();

        // ... and neither is it after t1 has been rolled back.
        assert_eq!(t2.get(env.tree.inner(), [1u8])?, Some([10u8].into()));

        t2.commit()??;

        Ok(())
    }

    #[expect(clippy::unwrap_used)]
    #[test]
    fn tx_ssi_anti_dependency_cycles() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;
        env.seed_hermitage_data()?;

        let mut t1 = env.db.write_tx()?;
        {
            let mut iter = t1.iter(&env.tree);
            assert_eq!(
                iter.next().unwrap().into_inner()?,
                ([1u8].into(), [10u8].into()),
            );
            assert_eq!(
                iter.next().unwrap().into_inner()?,
                ([2u8].into(), [20u8].into()),
            );
            assert!(iter.next().is_none());
        }

        let mut t2 = env.db.write_tx()?;
        let new = t2.update_fetch(&env.tree, [2u8], |v| {
            v.and_then(|v| v.first().copied()).map(|v| [v + 5].into())
        })?;
        assert_eq!(new, Some([25u8].into()));
        t2.commit()??;

        let t3 = env.db.write_tx()?;
        {
            let mut iter = t3.iter(&env.tree);
            assert_eq!(
                iter.next().unwrap().into_inner()?,
                ([1u8].into(), [10u8].into()),
            );
            assert_eq!(
                iter.next().unwrap().into_inner()?,
                ([2u8].into(), [25u8].into()),
            );
            assert!(iter.next().is_none());
        }

        t3.commit()??;

        // t1 iterated the whole keyspace before t2 changed key 2, so its
        // write must be rejected.
        t1.insert(env.tree.inner(), [1u8], [0u8]);

        assert!(matches!(t1.commit()?, Err(Conflict)));

        Ok(())
    }

    #[test]
    fn tx_ssi_update_fetch_update() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        // Same key in both transactions: the second committer conflicts.
        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        let new = t1.update_fetch(env.tree.inner(), "hello", |_| Some("world".into()))?;
        assert_eq!(new, Some("world".into()));
        let old = t2.fetch_update(env.tree.inner(), "hello", |_| Some("world2".into()))?;
        assert_eq!(old, None);

        t1.commit()??;
        assert!(matches!(t2.commit()?, Err(Conflict)));

        assert_eq!(env.tree.get("hello")?, Some("world".into()));

        // Different keys: both transactions commit.
        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        let old = t1.fetch_update(env.tree.inner(), "hello", |_| Some("world3".into()))?;
        assert_eq!(old, Some("world".into()));
        let new = t2.update_fetch(env.tree.inner(), "hello2", |_| Some("world2".into()))?;
        assert_eq!(new, Some("world2".into()));

        t1.commit()??;
        t2.commit()??;

        assert_eq!(env.tree.get("hello")?, Some("world3".into()));
        assert_eq!(env.tree.get("hello2")?, Some("world2".into()));

        Ok(())
    }

    #[test]
    fn tx_ssi_range() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        // A write inside the range read by t1 makes t1 conflict.
        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        _ = t1.range(&env.tree, "h"..="hello");
        t1.insert(env.tree.inner(), "foo", "bar");

        t2.insert(env.tree.inner(), "hello", "world");

        t2.commit()??;
        assert!(matches!(t1.commit()?, Err(Conflict)));

        // A write outside the range read by t1 does not.
        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        _ = t1.range(&env.tree, "h"..="hello");
        t1.insert(env.tree.inner(), "foo", "bar");

        t2.insert(env.tree.inner(), "hello2", "world");

        t2.commit()??;
        t1.commit()??;

        Ok(())
    }

    #[test]
    fn tx_ssi_prefix() -> Result<(), Box<dyn std::error::Error>> {
        let env = setup()?;

        // A write under the prefix read by t1 makes t1 conflict.
        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        _ = t1.prefix(&env.tree, "hello");
        t1.insert(env.tree.inner(), "foo", "bar");

        t2.insert(env.tree.inner(), "hello", "world");

        t2.commit()??;
        assert!(matches!(t1.commit()?, Err(Conflict)));

        // A write outside the prefix read by t1 does not.
        let mut t1 = env.db.write_tx()?;
        let mut t2 = env.db.write_tx()?;

        _ = t1.prefix(&env.tree, "hello");
        t1.insert(env.tree.inner(), "foo", "bar");

        t2.insert(env.tree.inner(), "foobar", "world");

        t2.commit()??;
        t1.commit()??;

        Ok(())
    }
}