Skip to main content

liminal/durability/
store.rs

1use std::sync::Arc;
2
3use haematite::{ApiError, Event, EventStore};
4
5use super::DurabilityError;
6
7/// Entry read from a durable haematite stream.
8#[derive(Clone, Debug, PartialEq, Eq)]
9pub struct StoredEntry {
10    /// Opaque stored payload bytes.
11    pub payload: Vec<u8>,
12    /// Sequence number assigned by the stream.
13    pub sequence: u64,
14    /// Store timestamp associated with the entry.
15    pub timestamp: u64,
16}
17
18/// Direct durability surface matching haematite's append/read/cas/scan API.
19#[async_trait::async_trait]
20pub trait DurableStore: std::fmt::Debug + Send + Sync {
21    /// Appends `payload` to `stream_key` if `expected_seq` matches the stream head.
22    async fn append(
23        &self,
24        stream_key: &str,
25        payload: Vec<u8>,
26        expected_seq: u64,
27    ) -> Result<u64, DurabilityError>;
28
29    /// Reads entries from `stream_key` beginning at `offset`, up to `limit` entries.
30    async fn read_from(
31        &self,
32        stream_key: &str,
33        offset: u64,
34        limit: usize,
35    ) -> Result<Vec<StoredEntry>, DurabilityError>;
36
37    /// Atomically replaces a stored numeric value if it equals `old_value`.
38    ///
39    /// An `old_value` of `0` matches a key that is currently *absent* as well as
40    /// one explicitly stored as `0`: a fresh cursor is created on its first
41    /// checkpoint without a prior write. See [`HaematiteStore::cas`] for how this
42    /// "absent == 0" contract is preserved atomically over the real engine.
43    async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError>;
44
45    /// Reads a numeric value previously updated through compare-and-swap.
46    async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError>;
47
48    /// Scans entries by store prefix.
49    async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError>;
50
51    /// Flushes buffered writes so completed durable operations are persisted.
52    ///
53    /// # Errors
54    /// Returns [`DurabilityError`] when the underlying store cannot complete the flush.
55    async fn flush(&self) -> Result<(), DurabilityError>;
56}
57
58/// `DurableStore` implementation that delegates directly to haematite's `EventStore`.
59///
60/// The real [`EventStore`] is synchronous (every call blocks on the owning
61/// shard actor's reply), so each `async` method below completes on its first
62/// poll. The synchronous bridge in [`super::bridge`] relies on exactly that.
63#[derive(Clone, Debug)]
64pub struct HaematiteStore {
65    event_store: Arc<EventStore>,
66}
67
68impl HaematiteStore {
69    /// Wraps a haematite `EventStore` handle.
70    #[must_use]
71    pub const fn new(event_store: Arc<EventStore>) -> Self {
72        Self { event_store }
73    }
74}
75
76#[async_trait::async_trait]
77impl DurableStore for HaematiteStore {
78    async fn append(
79        &self,
80        stream_key: &str,
81        payload: Vec<u8>,
82        expected_seq: u64,
83    ) -> Result<u64, DurabilityError> {
84        // Contract bridge: liminal's `DurableStore::append` returns the *assigned
85        // event sequence* (0-based position of the just-appended event), which is
86        // exactly `expected_seq` for a single append. The real `EventStore::append`
87        // instead returns the stream's new next-sequence (`expected_seq + 1`), so
88        // subtract one to recover the assigned seq. A `0` next-seq is impossible
89        // after a successful single append, so the `checked_sub` cannot saturate
90        // silently; if it ever did the engine returned a contract-violating value.
91        let next_seq = self
92            .event_store
93            .append(stream_key.as_bytes(), &payload, expected_seq)
94            .map_err(DurabilityError::from)?;
95        next_seq.checked_sub(1).ok_or_else(|| {
96            DurabilityError::StoreError(ApiError::CorruptEvent(format!(
97                "append returned next-seq 0 for stream {stream_key}"
98            )))
99        })
100    }
101
102    async fn read_from(
103        &self,
104        stream_key: &str,
105        offset: u64,
106        limit: usize,
107    ) -> Result<Vec<StoredEntry>, DurabilityError> {
108        // The real `read_from` returns every event with seq >= offset and applies
109        // no limit; truncate to `limit` entries to honour the trait contract.
110        let mut events = self
111            .event_store
112            .read_from(stream_key.as_bytes(), offset)
113            .map_err(DurabilityError::from)?;
114        events.truncate(limit);
115        Ok(events.into_iter().map(StoredEntry::from).collect())
116    }
117
118    async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError> {
119        // Preserve liminal's "absent == 0" cursor contract faithfully over an
120        // engine that distinguishes `None` (absent) from `Some(0)` (a stored
121        // zero). The invariant that makes the mapping below correct: we NEVER
122        // persist a physical zero, so a logical value of 0 and physical absence
123        // always coincide.
124        //
125        // A `cas` whose target `new_value` is 0 must therefore write nothing — it
126        // only asserts the precondition. This is reachable as `cas(0, 0)` (a
127        // cursor checkpoint at offset 0; offsets are monotonic so they never CAS
128        // down to 0 from a higher value). Were we instead to let it store a
129        // physical zero, the *next* `cas(0, n)` — mapped to expect-absent `None`
130        // — would wrongly fail against the now-present key and permanently stall
131        // the cursor. Asserting via a read is race-free here precisely because no
132        // value is written, so there is no lost-update window.
133        if new_value == 0 {
134            return self
135                .event_store
136                .read_value(key.as_bytes())
137                .map_err(DurabilityError::from)?
138                .map_or(Ok(()), |stored| {
139                    Err(DurabilityError::CursorRegression {
140                        stored,
141                        attempted: old_value,
142                    })
143                });
144        }
145        // With a physical zero never stored, `old_value == 0` is exactly the
146        // expect-absent expectation. Any other `old_value` maps to `Some(_)`.
147        // This is a single CAS routed to the owning shard actor, where read,
148        // compare, and write run with no interleaving point (haematite's
149        // `ShardActor::cas`) — the engine's atomicity is preserved end to end.
150        let expected = if old_value == 0 {
151            None
152        } else {
153            Some(old_value)
154        };
155        self.event_store
156            .cas(key.as_bytes(), expected, new_value)
157            .map_err(DurabilityError::from)
158    }
159
160    async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError> {
161        self.event_store
162            .read_value(key.as_bytes())
163            .map_err(DurabilityError::from)
164    }
165
166    async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
167        // The real `scan` predicate yields stream *metadata* (key + next_seq),
168        // not events. Liminal's contract is to return the events of every stream
169        // whose key matches `prefix`, so collect the matching stream keys, then
170        // read each stream's full event list and flatten the results.
171        let prefix_bytes = prefix.as_bytes().to_vec();
172        let matches = self
173            .event_store
174            .scan(|meta| meta.stream_key.starts_with(&prefix_bytes))
175            .map_err(DurabilityError::from)?;
176        let mut entries = Vec::new();
177        for stream in matches {
178            let events = self
179                .event_store
180                .read(&stream.stream_key)
181                .map_err(DurabilityError::from)?;
182            entries.extend(events.into_iter().map(StoredEntry::from));
183        }
184        Ok(entries)
185    }
186
187    async fn flush(&self) -> Result<(), DurabilityError> {
188        self.event_store.flush().map_err(DurabilityError::from)
189    }
190}
191
192impl From<Event> for StoredEntry {
193    fn from(event: Event) -> Self {
194        Self {
195            payload: event.payload,
196            sequence: event.seq,
197            timestamp: event.timestamp,
198        }
199    }
200}
201
202/// Maps a real-engine [`ApiError`] onto liminal's [`DurabilityError`].
203///
204/// The optimistic-concurrency variants route to their dedicated `DurabilityError`
205/// cases (`SequenceConflict`, `CursorRegression`); everything else is a
206/// store-level failure carried verbatim.
207impl From<ApiError> for DurabilityError {
208    fn from(error: ApiError) -> Self {
209        match error {
210            ApiError::SequenceConflict(conflict) => conflict.into(),
211            ApiError::CasMismatch(mismatch) => mismatch.into(),
212            other @ (ApiError::CorruptEvent(_)
213            | ApiError::Storage(_)
214            | ApiError::HistoryCompacted(_)) => Self::StoreError(other),
215        }
216    }
217}