zerodds_dlrl/
transaction.rs1use alloc::collections::BTreeMap;
16use core::sync::atomic::{AtomicU64, Ordering};
17
18use crate::object_cache::{ObjectCache, ObjectId};
19
/// Numeric identifier assigned to a [`Transaction`] when it begins.
pub type TransactionId = u64;
22
/// Consistency level a [`Transaction`] is started with.
///
/// The level is stored on the transaction and can be read back through
/// `Transaction::level`. [`ConsistencyLevel::Optimistic`] is the default.
// NOTE(review): the exact guarantees of each level are whatever
// `Transaction::commit` enforces for it — confirm against the crate docs.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ConsistencyLevel {
    /// Optimistic level; returned by [`Default::default`].
    Optimistic,
    /// Read-committed level.
    ReadCommitted,
    /// Serializable level.
    Serializable,
}

impl Default for ConsistencyLevel {
    /// Returns [`ConsistencyLevel::Optimistic`].
    fn default() -> Self {
        ConsistencyLevel::Optimistic
    }
}
41
/// Lifecycle state of a [`Transaction`].
///
/// Only `Active` transactions can be committed or rolled back; the other
/// three states are terminal as far as this module is concerned.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TransactionState {
    /// The transaction has begun and has not been finished yet.
    Active,
    /// `commit` completed successfully.
    Committed,
    /// `rollback` completed successfully.
    RolledBack,
    /// `commit` detected a version conflict and gave up.
    Aborted,
}
54
55#[derive(Debug, Clone, PartialEq, Eq)]
57pub enum TransactionError {
58 OptimisticConflict {
61 id: ObjectId,
63 },
64 NotActive(TransactionState),
66}
67
68impl core::fmt::Display for TransactionError {
69 fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
70 match self {
71 Self::OptimisticConflict { id } => {
72 write!(f, "optimistic conflict on object `{id:?}`")
73 }
74 Self::NotActive(s) => write!(f, "transaction not active (state: {s:?})"),
75 }
76 }
77}
78
79#[cfg(feature = "std")]
80impl std::error::Error for TransactionError {}
81
/// Counter that hands out the next [`TransactionId`]; the first id issued is 1.
static NEXT_TX: AtomicU64 = AtomicU64::new(1);
83
84#[derive(Debug)]
87pub struct Transaction {
88 id: TransactionId,
89 level: ConsistencyLevel,
90 state: TransactionState,
91 snapshot: BTreeMap<ObjectId, u64>,
93}
94
95impl Transaction {
96 #[must_use]
98 pub fn begin(cache: &ObjectCache, level: ConsistencyLevel) -> Self {
99 let snapshot = cache.iter().map(|o| (o.id.clone(), o.version)).collect();
100 Self {
101 id: NEXT_TX.fetch_add(1, Ordering::Relaxed),
102 level,
103 state: TransactionState::Active,
104 snapshot,
105 }
106 }
107
108 #[must_use]
110 pub fn id(&self) -> TransactionId {
111 self.id
112 }
113
114 #[must_use]
116 pub fn level(&self) -> ConsistencyLevel {
117 self.level
118 }
119
120 #[must_use]
122 pub fn state(&self) -> TransactionState {
123 self.state
124 }
125
126 #[must_use]
128 pub fn snapshot_size(&self) -> usize {
129 self.snapshot.len()
130 }
131
132 #[must_use]
134 pub fn expected_version(&self, id: &ObjectId) -> Option<u64> {
135 self.snapshot.get(id).copied()
136 }
137
138 pub fn commit(&mut self, cache: &mut ObjectCache) -> Result<(), TransactionError> {
145 if self.state != TransactionState::Active {
146 return Err(TransactionError::NotActive(self.state));
147 }
148 for (id, expected) in &self.snapshot {
149 if let Some(o) = cache.get(id) {
150 if o.version > *expected
151 && (matches!(
152 self.level,
153 ConsistencyLevel::Optimistic | ConsistencyLevel::ReadCommitted
154 ))
155 {
156 self.state = TransactionState::Aborted;
157 return Err(TransactionError::OptimisticConflict { id: id.clone() });
158 }
159 }
160 }
161 cache.commit_all();
162 self.state = TransactionState::Committed;
163 Ok(())
164 }
165
166 pub fn rollback(&mut self, cache: &mut ObjectCache) -> Result<(), TransactionError> {
171 if self.state != TransactionState::Active {
172 return Err(TransactionError::NotActive(self.state));
173 }
174 cache.rollback_all();
175 self.state = TransactionState::RolledBack;
176 Ok(())
177 }
178}
179
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::object_cache::ObjectId;

    /// Builds an `ObjectId` from a type name and a key.
    fn id(t: &str, k: &[u8]) -> ObjectId {
        ObjectId::new(t.into(), k.to_vec())
    }

    #[test]
    fn begin_with_empty_cache_has_zero_snapshot() {
        let cache = ObjectCache::new();
        let tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_eq!(tx.snapshot_size(), 0);
        assert_eq!(tx.state(), TransactionState::Active);
    }

    #[test]
    fn commit_without_concurrent_modify_succeeds() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_eq!(tx.snapshot_size(), 1);
        // Nothing touches the cache between begin and commit, so the
        // snapshot still matches and the commit must go through.
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_path_no_conflict() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        // An object registered after begin is not part of the snapshot and
        // therefore cannot conflict.
        cache.register(id("T", b"new"), alloc::vec![]);
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_detects_optimistic_conflict() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        // Re-registering an object that is in the snapshot is the concurrent
        // modification the commit is expected to reject.
        cache.register(id("T", b"a"), alloc::vec![2]);
        let err = tx.commit(&mut cache).unwrap_err();
        assert!(matches!(err, TransactionError::OptimisticConflict { .. }));
        assert_eq!(tx.state(), TransactionState::Aborted);
    }

    #[test]
    fn rollback_resets_state() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        cache.register(id("T", b"x"), alloc::vec![]);
        tx.rollback(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::RolledBack);
        assert!(cache.is_empty());
    }

    #[test]
    fn double_commit_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.commit(&mut cache).unwrap();
        let err = tx.commit(&mut cache).unwrap_err();
        assert!(matches!(err, TransactionError::NotActive(_)));
    }

    #[test]
    fn commit_after_rollback_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.rollback(&mut cache).unwrap();
        assert!(tx.commit(&mut cache).is_err());
    }

    #[test]
    fn each_transaction_gets_unique_id() {
        let cache = ObjectCache::new();
        let t1 = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        let t2 = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_ne!(t1.id(), t2.id());
    }

    #[test]
    fn default_consistency_is_optimistic() {
        let level: ConsistencyLevel = ConsistencyLevel::default();
        assert_eq!(level, ConsistencyLevel::Optimistic);
    }
}