Skip to main content

zerodds_dlrl/
transaction.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2026 ZeroDDS Contributors
3
4//! Transaction semantics — DDS 1.4 §B.7.4.
5//!
6//! Spec §B.7.4: DLRL supports transactional updates over the object
7//! cache:
8//!
9//! * `begin()` starts a transaction, snapshotting the versions of all
10//!   objects.
11//! * `commit()` writes all changes as committed; on conflict (another
12//!   object has changed since begin) it returns `OptimisticConflict`.
13//! * `rollback()` discards all changes.
14
15use alloc::collections::BTreeMap;
16use core::sync::atomic::{AtomicU64, Ordering};
17
18use crate::object_cache::{ObjectCache, ObjectId};
19
/// Numeric identifier carried by every [`Transaction`].
///
/// [`Transaction::begin`] draws each value from a process-wide counter,
/// so two transactions started in the same process get different IDs.
pub type TransactionId = u64;
22
/// Consistency level of a transaction (Spec §B.7.4.2).
///
/// The level is chosen when the transaction is started and decides
/// whether `commit` validates the snapshotted object versions:
/// `Optimistic` and `ReadCommitted` are validated, `Serializable` is not.
///
/// `Default` is derived; the default level is [`ConsistencyLevel::Optimistic`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ConsistencyLevel {
    /// `OPTIMISTIC` — conflict check at commit time. This is the default.
    #[default]
    Optimistic,
    /// `READ_COMMITTED` — like OPTIMISTIC, but reads only see
    /// `Committed` objects. At commit time it receives the same version
    /// check as `Optimistic`.
    ReadCommitted,
    /// `SERIALIZABLE` — strictly serialized (the caller serializes
    /// transactions, not the cache). No version check is performed at
    /// commit time for this level.
    Serializable,
}
41
/// Lifecycle state of a [`Transaction`].
///
/// A transaction starts out `Active`; `commit` and `rollback` are only
/// accepted in that state and move it to one of the three final states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// The transaction is open: neither `commit()` nor `rollback()` has
    /// finished it yet.
    Active,
    /// `commit()` completed without detecting a conflict.
    Committed,
    /// `rollback()` was called on the active transaction.
    RolledBack,
    /// `commit()` detected a conflict and gave up.
    Aborted,
}
54
55/// Transaction error.
56#[derive(Debug, Clone, PartialEq, Eq)]
57pub enum TransactionError {
58    /// Optimistic conflict: a read object was modified by another
59    /// transaction.
60    OptimisticConflict {
61        /// Object ID that triggered the conflict.
62        id: ObjectId,
63    },
64    /// Operation on a non-active transaction.
65    NotActive(TransactionState),
66}
67
68impl core::fmt::Display for TransactionError {
69    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
70        match self {
71            Self::OptimisticConflict { id } => {
72                write!(f, "optimistic conflict on object `{id:?}`")
73            }
74            Self::NotActive(s) => write!(f, "transaction not active (state: {s:?})"),
75        }
76    }
77}
78
// Only compiled with the `std` feature. The impl is empty, so every
// `Error` method keeps its default implementation; `TransactionError`
// wraps no underlying error that `source()` could report.
#[cfg(feature = "std")]
impl std::error::Error for TransactionError {}
81
/// Process-wide source of transaction IDs. Starts at 1; each
/// `Transaction::begin` call takes the current value and increments it.
static NEXT_TX: AtomicU64 = AtomicU64::new(1);
83
84/// Transaction — snapshots the versions, performs modify operations,
85/// and commits/rolls back at the end.
86#[derive(Debug)]
87pub struct Transaction {
88    id: TransactionId,
89    level: ConsistencyLevel,
90    state: TransactionState,
91    /// Snapshot object ID → expected version at begin.
92    snapshot: BTreeMap<ObjectId, u64>,
93}
94
95impl Transaction {
96    /// `begin` — Spec §B.7.4.1.
97    #[must_use]
98    pub fn begin(cache: &ObjectCache, level: ConsistencyLevel) -> Self {
99        let snapshot = cache.iter().map(|o| (o.id.clone(), o.version)).collect();
100        Self {
101            id: NEXT_TX.fetch_add(1, Ordering::Relaxed),
102            level,
103            state: TransactionState::Active,
104            snapshot,
105        }
106    }
107
108    /// Transaction ID.
109    #[must_use]
110    pub fn id(&self) -> TransactionId {
111        self.id
112    }
113
114    /// Consistency level.
115    #[must_use]
116    pub fn level(&self) -> ConsistencyLevel {
117        self.level
118    }
119
120    /// Current state.
121    #[must_use]
122    pub fn state(&self) -> TransactionState {
123        self.state
124    }
125
126    /// Number of objects in the snapshot.
127    #[must_use]
128    pub fn snapshot_size(&self) -> usize {
129        self.snapshot.len()
130    }
131
132    /// Expected version of an object in the snapshot.
133    #[must_use]
134    pub fn expected_version(&self, id: &ObjectId) -> Option<u64> {
135        self.snapshot.get(id).copied()
136    }
137
138    /// `commit` — Spec §B.7.4.4. Checks for all snapshotted objects
139    /// whether the version still matches; on conflict returns
140    /// `OptimisticConflict` and sets state → `Aborted`.
141    ///
142    /// # Errors
143    /// Siehe [`TransactionError`].
144    pub fn commit(&mut self, cache: &mut ObjectCache) -> Result<(), TransactionError> {
145        if self.state != TransactionState::Active {
146            return Err(TransactionError::NotActive(self.state));
147        }
148        for (id, expected) in &self.snapshot {
149            if let Some(o) = cache.get(id) {
150                if o.version > *expected
151                    && (matches!(
152                        self.level,
153                        ConsistencyLevel::Optimistic | ConsistencyLevel::ReadCommitted
154                    ))
155                {
156                    self.state = TransactionState::Aborted;
157                    return Err(TransactionError::OptimisticConflict { id: id.clone() });
158                }
159            }
160        }
161        cache.commit_all();
162        self.state = TransactionState::Committed;
163        Ok(())
164    }
165
166    /// `rollback` — Spec §B.7.4.5.
167    ///
168    /// # Errors
169    /// `NotActive` if the transaction is already finished.
170    pub fn rollback(&mut self, cache: &mut ObjectCache) -> Result<(), TransactionError> {
171        if self.state != TransactionState::Active {
172            return Err(TransactionError::NotActive(self.state));
173        }
174        cache.rollback_all();
175        self.state = TransactionState::RolledBack;
176        Ok(())
177    }
178}
179
// Unit tests for the transaction lifecycle. `unwrap`/`expect`/`panic` are
// allowed here because a failed expectation should abort the test.
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    use super::*;
    use crate::object_cache::ObjectId;

    /// Builds an object ID from a type name and a key.
    fn id(t: &str, k: &[u8]) -> ObjectId {
        ObjectId::new(t.into(), k.to_vec())
    }

    #[test]
    fn begin_with_empty_cache_has_zero_snapshot() {
        let cache = ObjectCache::new();
        let tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_eq!(tx.snapshot_size(), 0);
        assert_eq!(tx.state(), TransactionState::Active);
    }

    #[test]
    fn commit_without_concurrent_modify_succeeds() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        // The snapshot is taken after the object exists and nothing
        // touches it afterwards, so the versions still match at commit.
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert!(tx.expected_version(&id("T", b"a")).is_some());
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_path_no_conflict() {
        let mut cache = ObjectCache::new();
        // Cache is empty when begin() runs → no snapshot entries → no
        // possible conflict. Commit should succeed.
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        cache.register(id("T", b"new"), alloc::vec![]);
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_detects_optimistic_conflict() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        // Concurrent modify (still under same cache, but simulates
        // another tx that bumped the version after `begin`).
        cache.register(id("T", b"a"), alloc::vec![2]);
        let err = tx.commit(&mut cache).unwrap_err();
        assert!(matches!(err, TransactionError::OptimisticConflict { .. }));
        assert_eq!(tx.state(), TransactionState::Aborted);
    }

    #[test]
    fn serializable_commit_skips_version_check() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Serializable);
        // Same modification that aborts an optimistic transaction; the
        // serializable level leaves ordering to the caller.
        cache.register(id("T", b"a"), alloc::vec![2]);
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn rollback_resets_state() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        cache.register(id("T", b"x"), alloc::vec![]);
        tx.rollback(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::RolledBack);
        assert!(cache.is_empty());
    }

    #[test]
    fn double_commit_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.commit(&mut cache).unwrap();
        let err = tx.commit(&mut cache).unwrap_err();
        assert_eq!(err, TransactionError::NotActive(TransactionState::Committed));
        // A rejected call must not change the final state.
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_after_rollback_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.rollback(&mut cache).unwrap();
        assert_eq!(
            tx.commit(&mut cache),
            Err(TransactionError::NotActive(TransactionState::RolledBack))
        );
        assert_eq!(tx.state(), TransactionState::RolledBack);
    }

    #[test]
    fn rollback_after_commit_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.commit(&mut cache).unwrap();
        assert_eq!(
            tx.rollback(&mut cache),
            Err(TransactionError::NotActive(TransactionState::Committed))
        );
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn not_active_error_display_names_the_state() {
        let err = TransactionError::NotActive(TransactionState::Aborted);
        assert_eq!(
            alloc::format!("{err}"),
            "transaction not active (state: Aborted)"
        );
    }

    #[test]
    fn each_transaction_gets_unique_id() {
        let cache = ObjectCache::new();
        let t1 = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        let t2 = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_ne!(t1.id(), t2.id());
    }

    #[test]
    fn default_consistency_is_optimistic() {
        let level: ConsistencyLevel = ConsistencyLevel::default();
        assert_eq!(level, ConsistencyLevel::Optimistic);
    }
}