sim_lib_stream_combinators/cell.rs
1use std::sync::Mutex;
2
3use sim_kernel::{Error, Result};
4
/// A cloned [`StreamCell`] value together with the version it was read at.
///
/// The `version` field records the cell's counter as of the read. Passing it
/// back to [`StreamCell::set`] turns the write into a compare-and-set, which
/// is rejected when the cell's version no longer matches.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CellSnapshot<T> {
    /// Copy of the cell's value taken when the snapshot was read.
    pub value: T,
    /// Version counter of the cell at the moment of the read.
    pub version: u64,
}
17
18/// Versioned, thread-safe cell holding the latest value of a stream.
19///
20/// A `StreamCell` collapses a stream to its most recent value with optimistic
21/// concurrency: each successful write bumps a version counter, and writers must
22/// present the version they observed to commit. It is the shared "current
23/// value" handoff point between a producing stream and its consumers.
24///
25/// # Examples
26///
27/// ```
28/// use sim_lib_stream_combinators::stream_cell;
29///
30/// let cell = stream_cell("first".to_owned());
31/// let snapshot = cell.get().unwrap();
32/// assert_eq!(snapshot.version, 0);
33///
34/// let next = cell.set("second".to_owned(), snapshot.version).unwrap();
35/// assert_eq!(next, 1);
36///
37/// // A stale version is rejected.
38/// assert!(cell.set("third".to_owned(), snapshot.version).is_err());
39/// assert_eq!(cell.get().unwrap().value, "second");
40/// ```
41pub struct StreamCell<T> {
42 state: Mutex<CellState<T>>,
43}
44
/// Lock-protected contents of a [`StreamCell`]: the latest value plus the
/// version counter that identifies it.
struct CellState<T> {
    /// Most recently stored value.
    value: T,
    /// Counter advanced on each successful write; a new cell starts at `0`.
    version: u64,
}
49
50/// Builds a [`StreamCell`] holding `value` at version `0`.
51pub fn stream_cell<T>(value: T) -> StreamCell<T> {
52 StreamCell::new(value)
53}
54
55impl<T> StreamCell<T> {
56 /// Creates a cell holding `value` at version `0`.
57 pub fn new(value: T) -> Self {
58 Self {
59 state: Mutex::new(CellState { value, version: 0 }),
60 }
61 }
62}
63
64impl<T: Clone> StreamCell<T> {
65 /// Reads a versioned snapshot of the current cell value.
66 pub fn get(&self) -> Result<CellSnapshot<T>> {
67 let state = self
68 .state
69 .lock()
70 .map_err(|_| Error::PoisonedLock("stream cell"))?;
71 Ok(CellSnapshot {
72 value: state.value.clone(),
73 version: state.version,
74 })
75 }
76
77 /// Writes `value` only if `expected_version` matches the current version.
78 ///
79 /// On success the version is bumped and the new version is returned; a
80 /// mismatch leaves the cell untouched and returns an error.
81 pub fn set(&self, value: T, expected_version: u64) -> Result<u64> {
82 let mut state = self
83 .state
84 .lock()
85 .map_err(|_| Error::PoisonedLock("stream cell"))?;
86 if state.version != expected_version {
87 return Err(Error::Eval(format!(
88 "stale stream cell version: expected {expected_version}, current {}",
89 state.version
90 )));
91 }
92 state.value = value;
93 state.version = state.version.saturating_add(1);
94 Ok(state.version)
95 }
96}