liminal/durability/store.rs
1use std::sync::Arc;
2
3use haematite::{ApiError, Event, EventStore};
4
5use super::DurabilityError;
6
7/// Entry read from a durable haematite stream.
8#[derive(Clone, Debug, PartialEq, Eq)]
9pub struct StoredEntry {
10 /// Opaque stored payload bytes.
11 pub payload: Vec<u8>,
12 /// Sequence number assigned by the stream.
13 pub sequence: u64,
14 /// Store timestamp associated with the entry.
15 pub timestamp: u64,
16}
17
18/// Direct durability surface matching haematite's append/read/cas/scan API.
19#[async_trait::async_trait]
20pub trait DurableStore: std::fmt::Debug + Send + Sync {
21 /// Appends `payload` to `stream_key` if `expected_seq` matches the stream head.
22 async fn append(
23 &self,
24 stream_key: &str,
25 payload: Vec<u8>,
26 expected_seq: u64,
27 ) -> Result<u64, DurabilityError>;
28
29 /// Reads entries from `stream_key` beginning at `offset`, up to `limit` entries.
30 async fn read_from(
31 &self,
32 stream_key: &str,
33 offset: u64,
34 limit: usize,
35 ) -> Result<Vec<StoredEntry>, DurabilityError>;
36
37 /// Atomically replaces a stored numeric value if it equals `old_value`.
38 ///
39 /// An `old_value` of `0` matches a key that is currently *absent* as well as
40 /// one explicitly stored as `0`: a fresh cursor is created on its first
41 /// checkpoint without a prior write. See [`HaematiteStore::cas`] for how this
42 /// "absent == 0" contract is preserved atomically over the real engine.
43 async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError>;
44
45 /// Reads a numeric value previously updated through compare-and-swap.
46 async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError>;
47
48 /// Scans entries by store prefix.
49 async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError>;
50
51 /// Flushes buffered writes so completed durable operations are persisted.
52 ///
53 /// # Errors
54 /// Returns [`DurabilityError`] when the underlying store cannot complete the flush.
55 async fn flush(&self) -> Result<(), DurabilityError>;
56}
57
58/// `DurableStore` implementation that delegates directly to haematite's `EventStore`.
59///
60/// The real [`EventStore`] is synchronous (every call blocks on the owning
61/// shard actor's reply), so each `async` method below completes on its first
62/// poll. The synchronous bridge in [`super::bridge`] relies on exactly that.
63#[derive(Clone, Debug)]
64pub struct HaematiteStore {
65 event_store: Arc<EventStore>,
66}
67
68impl HaematiteStore {
69 /// Wraps a haematite `EventStore` handle.
70 #[must_use]
71 pub const fn new(event_store: Arc<EventStore>) -> Self {
72 Self { event_store }
73 }
74}
75
76#[async_trait::async_trait]
77impl DurableStore for HaematiteStore {
78 async fn append(
79 &self,
80 stream_key: &str,
81 payload: Vec<u8>,
82 expected_seq: u64,
83 ) -> Result<u64, DurabilityError> {
84 // Contract bridge: liminal's `DurableStore::append` returns the *assigned
85 // event sequence* (0-based position of the just-appended event), which is
86 // exactly `expected_seq` for a single append. The real `EventStore::append`
87 // instead returns the stream's new next-sequence (`expected_seq + 1`), so
88 // subtract one to recover the assigned seq. A `0` next-seq is impossible
89 // after a successful single append, so the `checked_sub` cannot saturate
90 // silently; if it ever did the engine returned a contract-violating value.
91 let next_seq = self
92 .event_store
93 .append(stream_key.as_bytes(), &payload, expected_seq)
94 .map_err(DurabilityError::from)?;
95 next_seq.checked_sub(1).ok_or_else(|| {
96 DurabilityError::StoreError(ApiError::CorruptEvent(format!(
97 "append returned next-seq 0 for stream {stream_key}"
98 )))
99 })
100 }
101
102 async fn read_from(
103 &self,
104 stream_key: &str,
105 offset: u64,
106 limit: usize,
107 ) -> Result<Vec<StoredEntry>, DurabilityError> {
108 // The real `read_from` returns every event with seq >= offset and applies
109 // no limit; truncate to `limit` entries to honour the trait contract.
110 let mut events = self
111 .event_store
112 .read_from(stream_key.as_bytes(), offset)
113 .map_err(DurabilityError::from)?;
114 events.truncate(limit);
115 Ok(events.into_iter().map(StoredEntry::from).collect())
116 }
117
118 async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError> {
119 // Preserve liminal's "absent == 0" cursor contract faithfully over an
120 // engine that distinguishes `None` (absent) from `Some(0)` (a stored
121 // zero). The invariant that makes the mapping below correct: we NEVER
122 // persist a physical zero, so a logical value of 0 and physical absence
123 // always coincide.
124 //
125 // A `cas` whose target `new_value` is 0 must therefore write nothing — it
126 // only asserts the precondition. This is reachable as `cas(0, 0)` (a
127 // cursor checkpoint at offset 0; offsets are monotonic so they never CAS
128 // down to 0 from a higher value). Were we instead to let it store a
129 // physical zero, the *next* `cas(0, n)` — mapped to expect-absent `None`
130 // — would wrongly fail against the now-present key and permanently stall
131 // the cursor. Asserting via a read is race-free here precisely because no
132 // value is written, so there is no lost-update window.
133 if new_value == 0 {
134 return self
135 .event_store
136 .read_value(key.as_bytes())
137 .map_err(DurabilityError::from)?
138 .map_or(Ok(()), |stored| {
139 Err(DurabilityError::CursorRegression {
140 stored,
141 attempted: old_value,
142 })
143 });
144 }
145 // With a physical zero never stored, `old_value == 0` is exactly the
146 // expect-absent expectation. Any other `old_value` maps to `Some(_)`.
147 // This is a single CAS routed to the owning shard actor, where read,
148 // compare, and write run with no interleaving point (haematite's
149 // `ShardActor::cas`) — the engine's atomicity is preserved end to end.
150 let expected = if old_value == 0 {
151 None
152 } else {
153 Some(old_value)
154 };
155 self.event_store
156 .cas(key.as_bytes(), expected, new_value)
157 .map_err(DurabilityError::from)
158 }
159
160 async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError> {
161 self.event_store
162 .read_value(key.as_bytes())
163 .map_err(DurabilityError::from)
164 }
165
166 async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
167 // The real `scan` predicate yields stream *metadata* (key + next_seq),
168 // not events. Liminal's contract is to return the events of every stream
169 // whose key matches `prefix`, so collect the matching stream keys, then
170 // read each stream's full event list and flatten the results.
171 let prefix_bytes = prefix.as_bytes().to_vec();
172 let matches = self
173 .event_store
174 .scan(|meta| meta.stream_key.starts_with(&prefix_bytes))
175 .map_err(DurabilityError::from)?;
176 let mut entries = Vec::new();
177 for stream in matches {
178 let events = self
179 .event_store
180 .read(&stream.stream_key)
181 .map_err(DurabilityError::from)?;
182 entries.extend(events.into_iter().map(StoredEntry::from));
183 }
184 Ok(entries)
185 }
186
187 async fn flush(&self) -> Result<(), DurabilityError> {
188 self.event_store.flush().map_err(DurabilityError::from)
189 }
190}
191
192impl From<Event> for StoredEntry {
193 fn from(event: Event) -> Self {
194 Self {
195 payload: event.payload,
196 sequence: event.seq,
197 timestamp: event.timestamp,
198 }
199 }
200}
201
202/// Maps a real-engine [`ApiError`] onto liminal's [`DurabilityError`].
203///
204/// The optimistic-concurrency variants route to their dedicated `DurabilityError`
205/// cases (`SequenceConflict`, `CursorRegression`); everything else is a
206/// store-level failure carried verbatim.
207impl From<ApiError> for DurabilityError {
208 fn from(error: ApiError) -> Self {
209 match error {
210 ApiError::SequenceConflict(conflict) => conflict.into(),
211 ApiError::CasMismatch(mismatch) => mismatch.into(),
212 other @ (ApiError::CorruptEvent(_)
213 | ApiError::Storage(_)
214 | ApiError::HistoryCompacted(_)) => Self::StoreError(other),
215 }
216 }
217}