zerodds-dlrl 1.0.0-rc.3.1

DDS Data-Local-Reconstruction-Layer (DLRL) — DDS 1.4 §2.2 + §B
Documentation
// SPDX-License-Identifier: Apache-2.0
// Copyright 2026 ZeroDDS Contributors

//! Transaction semantics — DDS 1.4 §B.7.4.
//!
//! Spec §B.7.4: DLRL supports transactional updates over the object
//! cache:
//!
//! * `begin()` starts a transaction, snapshotting the versions of all
//!   objects.
//! * `commit()` writes all changes as committed; on conflict (an object
//!   captured in the snapshot has changed since `begin()`) it returns
//!   `OptimisticConflict`.
//! * `rollback()` discards all changes.

use alloc::collections::BTreeMap;
use core::sync::atomic::{AtomicU64, Ordering};

use crate::object_cache::{ObjectCache, ObjectId};

/// Unique transaction ID.
///
/// Handed out by `Transaction::begin` from a module-level atomic counter
/// that starts at 1.
pub type TransactionId = u64;

/// Consistency level (Spec §B.7.4.2).
///
/// The level is chosen when a transaction begins and decides whether
/// `commit()` runs the snapshot version check. The default level is
/// [`ConsistencyLevel::Optimistic`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum ConsistencyLevel {
    /// `OPTIMISTIC` — conflict check at commit time. This is the default.
    #[default]
    Optimistic,
    /// `READ_COMMITTED` — like OPTIMISTIC, but reads only see
    /// `Committed` objects.
    ReadCommitted,
    /// `SERIALIZABLE` — strictly serialized (the caller serializes
    /// transactions, not the cache).
    Serializable,
}

/// Transaction state.
///
/// A transaction is created in `Active`. `commit()` and `rollback()` are
/// only accepted while it is `Active`; each of them moves the transaction
/// into one of the other states.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionState {
    /// Active: the state set by `begin()`.
    Active,
    /// `commit()` succeeded.
    Committed,
    /// `rollback()` was called.
    RolledBack,
    /// `commit()` failed with a conflict.
    Aborted,
}

/// Transaction error.
///
/// Returned by `Transaction::commit` and `Transaction::rollback`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TransactionError {
    /// Optimistic conflict: a read object was modified by another
    /// transaction. Reported by `commit()` when the cached version of a
    /// snapshotted object is greater than the version recorded at begin.
    OptimisticConflict {
        /// Object ID that triggered the conflict.
        id: ObjectId,
    },
    /// Operation on a non-active transaction. Carries the state the
    /// transaction was in when the operation was attempted.
    NotActive(TransactionState),
}

impl core::fmt::Display for TransactionError {
    /// Writes a one-line, human-readable description of the error.
    ///
    /// The payload of each variant (transaction state or object ID) is
    /// rendered with its `Debug` representation.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match *self {
            Self::NotActive(ref s) => {
                write!(f, "transaction not active (state: {s:?})")
            }
            Self::OptimisticConflict { ref id } => {
                write!(f, "optimistic conflict on object `{id:?}`")
            }
        }
    }
}

// The `std::error::Error` impl is only compiled when the `std` feature is
// enabled; the empty body relies on the trait's default methods.
#[cfg(feature = "std")]
impl std::error::Error for TransactionError {}

/// Next transaction ID to hand out. Starts at 1 and is advanced by one on
/// every `Transaction::begin` call.
static NEXT_TX: AtomicU64 = AtomicU64::new(1);

/// Transaction — snapshots the object versions at begin and
/// commits/rolls back the cache at the end.
///
/// The transaction itself only stores bookkeeping data; `commit()` and
/// `rollback()` are applied to the `ObjectCache` passed to them.
#[derive(Debug)]
pub struct Transaction {
    /// ID assigned at begin.
    id: TransactionId,
    /// Consistency level requested at begin.
    level: ConsistencyLevel,
    /// Current lifecycle state.
    state: TransactionState,
    /// Snapshot object ID → expected version at begin.
    snapshot: BTreeMap<ObjectId, u64>,
}

impl Transaction {
    /// `begin` — Spec §B.7.4.1.
    #[must_use]
    pub fn begin(cache: &ObjectCache, level: ConsistencyLevel) -> Self {
        let snapshot = cache.iter().map(|o| (o.id.clone(), o.version)).collect();
        Self {
            id: NEXT_TX.fetch_add(1, Ordering::Relaxed),
            level,
            state: TransactionState::Active,
            snapshot,
        }
    }

    /// Transaction ID.
    #[must_use]
    pub fn id(&self) -> TransactionId {
        self.id
    }

    /// Consistency level.
    #[must_use]
    pub fn level(&self) -> ConsistencyLevel {
        self.level
    }

    /// Current state.
    #[must_use]
    pub fn state(&self) -> TransactionState {
        self.state
    }

    /// Number of objects in the snapshot.
    #[must_use]
    pub fn snapshot_size(&self) -> usize {
        self.snapshot.len()
    }

    /// Expected version of an object in the snapshot.
    #[must_use]
    pub fn expected_version(&self, id: &ObjectId) -> Option<u64> {
        self.snapshot.get(id).copied()
    }

    /// `commit` — Spec §B.7.4.4. Checks for all snapshotted objects
    /// whether the version still matches; on conflict returns
    /// `OptimisticConflict` and sets state → `Aborted`.
    ///
    /// # Errors
    /// Siehe [`TransactionError`].
    pub fn commit(&mut self, cache: &mut ObjectCache) -> Result<(), TransactionError> {
        if self.state != TransactionState::Active {
            return Err(TransactionError::NotActive(self.state));
        }
        for (id, expected) in &self.snapshot {
            if let Some(o) = cache.get(id) {
                if o.version > *expected
                    && (matches!(
                        self.level,
                        ConsistencyLevel::Optimistic | ConsistencyLevel::ReadCommitted
                    ))
                {
                    self.state = TransactionState::Aborted;
                    return Err(TransactionError::OptimisticConflict { id: id.clone() });
                }
            }
        }
        cache.commit_all();
        self.state = TransactionState::Committed;
        Ok(())
    }

    /// `rollback` — Spec §B.7.4.5.
    ///
    /// # Errors
    /// `NotActive` if the transaction is already finished.
    pub fn rollback(&mut self, cache: &mut ObjectCache) -> Result<(), TransactionError> {
        if self.state != TransactionState::Active {
            return Err(TransactionError::NotActive(self.state));
        }
        cache.rollback_all();
        self.state = TransactionState::RolledBack;
        Ok(())
    }
}

#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
    //! Unit tests for the transaction lifecycle: begin, commit (clean and
    //! conflicting), rollback, and operations on finished transactions.

    use super::*;
    use crate::object_cache::ObjectId;

    /// Builds an object ID from a type name and a key.
    fn id(t: &str, k: &[u8]) -> ObjectId {
        ObjectId::new(t.into(), k.to_vec())
    }

    #[test]
    fn begin_with_empty_cache_has_zero_snapshot() {
        let cache = ObjectCache::new();
        let tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_eq!(tx.snapshot_size(), 0);
        assert_eq!(tx.state(), TransactionState::Active);
    }

    #[test]
    fn commit_without_concurrent_modify_succeeds() {
        let mut cache = ObjectCache::new();
        // Register first, snapshot afterwards: nothing touches the object
        // between `begin` and `commit`, so the commit must be clean.
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_eq!(tx.snapshot_size(), 1);
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_path_no_conflict() {
        let mut cache = ObjectCache::new();
        // Cache is empty when begin() runs → no snapshot entries → no
        // possible conflict. Commit should succeed.
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        cache.register(id("T", b"new"), alloc::vec![]);
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn commit_detects_optimistic_conflict() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        // Concurrent modify (still under same cache, but simulates
        // another tx that bumped the version after `begin`).
        cache.register(id("T", b"a"), alloc::vec![2]);
        let err = tx.commit(&mut cache).unwrap_err();
        assert!(matches!(err, TransactionError::OptimisticConflict { .. }));
        assert_eq!(tx.state(), TransactionState::Aborted);
    }

    #[test]
    fn serializable_commit_skips_version_check() {
        let mut cache = ObjectCache::new();
        cache.register(id("T", b"a"), alloc::vec![1]);
        cache.commit_all();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Serializable);
        // Same modification as in the conflict test, but `Serializable`
        // leaves serialization to the caller, so commit does not check
        // versions and must succeed.
        cache.register(id("T", b"a"), alloc::vec![2]);
        tx.commit(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::Committed);
    }

    #[test]
    fn rollback_resets_state() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        cache.register(id("T", b"x"), alloc::vec![]);
        tx.rollback(&mut cache).unwrap();
        assert_eq!(tx.state(), TransactionState::RolledBack);
        assert!(cache.is_empty());
    }

    #[test]
    fn double_commit_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.commit(&mut cache).unwrap();
        let err = tx.commit(&mut cache).unwrap_err();
        assert!(matches!(err, TransactionError::NotActive(_)));
    }

    #[test]
    fn commit_after_rollback_fails() {
        let mut cache = ObjectCache::new();
        let mut tx = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        tx.rollback(&mut cache).unwrap();
        assert!(tx.commit(&mut cache).is_err());
    }

    #[test]
    fn each_transaction_gets_unique_id() {
        let cache = ObjectCache::new();
        let t1 = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        let t2 = Transaction::begin(&cache, ConsistencyLevel::Optimistic);
        assert_ne!(t1.id(), t2.id());
    }

    #[test]
    fn default_consistency_is_optimistic() {
        let level: ConsistencyLevel = ConsistencyLevel::default();
        assert_eq!(level, ConsistencyLevel::Optimistic);
    }
}