Skip to main content

sim_lib_stream_combinators/
cell.rs

1use std::sync::Mutex;
2
3use sim_kernel::{Error, Result};
4
5/// Immutable snapshot of a [`StreamCell`] value at a known version.
6///
7/// A snapshot pairs the cloned cell value with the version counter that was
8/// current when it was read, so callers can later attempt a compare-and-set
9/// against that version.
10#[derive(Clone, Debug, PartialEq, Eq)]
11pub struct CellSnapshot<T> {
12    /// The cell value as observed at this snapshot.
13    pub value: T,
14    /// The cell version counter at the time of the read.
15    pub version: u64,
16}
17
18/// Versioned, thread-safe cell holding the latest value of a stream.
19///
20/// A `StreamCell` collapses a stream to its most recent value with optimistic
21/// concurrency: each successful write bumps a version counter, and writers must
22/// present the version they observed to commit. It is the shared "current
23/// value" handoff point between a producing stream and its consumers.
24///
25/// # Examples
26///
27/// ```
28/// use sim_lib_stream_combinators::stream_cell;
29///
30/// let cell = stream_cell("first".to_owned());
31/// let snapshot = cell.get().unwrap();
32/// assert_eq!(snapshot.version, 0);
33///
34/// let next = cell.set("second".to_owned(), snapshot.version).unwrap();
35/// assert_eq!(next, 1);
36///
37/// // A stale version is rejected.
38/// assert!(cell.set("third".to_owned(), snapshot.version).is_err());
39/// assert_eq!(cell.get().unwrap().value, "second");
40/// ```
41pub struct StreamCell<T> {
42    state: Mutex<CellState<T>>,
43}
44
45struct CellState<T> {
46    value: T,
47    version: u64,
48}
49
50/// Builds a [`StreamCell`] holding `value` at version `0`.
51pub fn stream_cell<T>(value: T) -> StreamCell<T> {
52    StreamCell::new(value)
53}
54
55impl<T> StreamCell<T> {
56    /// Creates a cell holding `value` at version `0`.
57    pub fn new(value: T) -> Self {
58        Self {
59            state: Mutex::new(CellState { value, version: 0 }),
60        }
61    }
62}
63
64impl<T: Clone> StreamCell<T> {
65    /// Reads a versioned snapshot of the current cell value.
66    pub fn get(&self) -> Result<CellSnapshot<T>> {
67        let state = self
68            .state
69            .lock()
70            .map_err(|_| Error::PoisonedLock("stream cell"))?;
71        Ok(CellSnapshot {
72            value: state.value.clone(),
73            version: state.version,
74        })
75    }
76
77    /// Writes `value` only if `expected_version` matches the current version.
78    ///
79    /// On success the version is bumped and the new version is returned; a
80    /// mismatch leaves the cell untouched and returns an error.
81    pub fn set(&self, value: T, expected_version: u64) -> Result<u64> {
82        let mut state = self
83            .state
84            .lock()
85            .map_err(|_| Error::PoisonedLock("stream cell"))?;
86        if state.version != expected_version {
87            return Err(Error::Eval(format!(
88                "stale stream cell version: expected {expected_version}, current {}",
89                state.version
90            )));
91        }
92        state.value = value;
93        state.version = state.version.saturating_add(1);
94        Ok(state.version)
95    }
96}