1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
use std::sync::Arc;
use haematite::{ApiError, Event, EventStore};
use super::DurabilityError;
/// One entry as returned from a durable haematite stream.
///
/// Pairs the raw payload with the ordering and time metadata the store
/// attached to it when the entry was written.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredEntry {
    /// Raw payload bytes; this layer passes them through without interpreting them.
    pub payload: Vec<u8>,
    /// Position of the entry within its stream, as assigned by the store.
    pub sequence: u64,
    /// Timestamp the store recorded for the entry.
    pub timestamp: u64,
}
/// Direct durability surface matching haematite's append/read/cas/scan API.
///
/// The haematite-backed implementation in this file ([`HaematiteStore`])
/// never awaits inside these methods; see its type docs for why.
#[async_trait::async_trait]
pub trait DurableStore: std::fmt::Debug + Send + Sync {
    /// Appends `payload` to `stream_key` if `expected_seq` matches the stream head.
    ///
    /// Returns the sequence assigned to the just-appended entry; for
    /// [`HaematiteStore`] that is exactly `expected_seq`.
    ///
    /// # Errors
    /// Returns [`DurabilityError`] when `expected_seq` does not match the stream
    /// head (reported as a sequence conflict) or when the store fails.
    async fn append(
        &self,
        stream_key: &str,
        payload: Vec<u8>,
        expected_seq: u64,
    ) -> Result<u64, DurabilityError>;
    /// Reads entries from `stream_key` beginning at `offset`, up to `limit` entries.
    ///
    /// `offset` is compared against entry sequence numbers: [`HaematiteStore`]
    /// returns entries whose sequence is `>= offset`, truncated to `limit`.
    ///
    /// # Errors
    /// Returns [`DurabilityError`] when the store cannot read the stream.
    async fn read_from(
        &self,
        stream_key: &str,
        offset: u64,
        limit: usize,
    ) -> Result<Vec<StoredEntry>, DurabilityError>;
    /// Atomically replaces a stored numeric value if it equals `old_value`.
    ///
    /// An `old_value` of `0` matches a key that is currently *absent* as well as
    /// one explicitly stored as `0`: a fresh cursor is created on its first
    /// checkpoint without a prior write. See [`HaematiteStore::cas`] for how this
    /// "absent == 0" contract is preserved atomically over the real engine.
    ///
    /// # Errors
    /// Returns [`DurabilityError`] when the current value does not match
    /// `old_value` (`HaematiteStore` reports this as `CursorRegression`) or
    /// when the store fails.
    async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError>;
    /// Reads a numeric value previously updated through compare-and-swap.
    ///
    /// `Ok(None)` means the key is absent.
    ///
    /// # Errors
    /// Returns [`DurabilityError`] when the store cannot read the key.
    async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError>;
    /// Scans entries by store prefix.
    ///
    /// Returns the entries of every stream whose key starts with `prefix`,
    /// flattened into one list. NOTE(review): the ordering across streams
    /// follows the backing store's scan order and is not specified here.
    ///
    /// # Errors
    /// Returns [`DurabilityError`] when the scan or any per-stream read fails.
    async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError>;
    /// Flushes buffered writes so completed durable operations are persisted.
    ///
    /// # Errors
    /// Returns [`DurabilityError`] when the underlying store cannot complete the flush.
    async fn flush(&self) -> Result<(), DurabilityError>;
}
/// Adapter exposing a haematite [`EventStore`] through the `DurableStore` trait.
///
/// Every call is forwarded straight to the wrapped engine handle. The real
/// [`EventStore`] API is blocking: each request waits for the owning shard
/// actor's reply. None of this type's `async` methods therefore ever await,
/// and each returned future is ready on its first poll. The synchronous
/// bridge in [`super::bridge`] depends on that property.
#[derive(Debug, Clone)]
pub struct HaematiteStore {
    /// Shared engine handle; cloning the adapter only clones this `Arc`.
    event_store: Arc<EventStore>,
}
impl HaematiteStore {
/// Wraps a haematite `EventStore` handle.
#[must_use]
pub const fn new(event_store: Arc<EventStore>) -> Self {
Self { event_store }
}
}
#[async_trait::async_trait]
impl DurableStore for HaematiteStore {
    async fn append(
        &self,
        stream_key: &str,
        payload: Vec<u8>,
        expected_seq: u64,
    ) -> Result<u64, DurabilityError> {
        // Contract bridge: liminal's `DurableStore::append` returns the *assigned
        // event sequence* (0-based position of the just-appended event), which is
        // exactly `expected_seq` for a single append. The real `EventStore::append`
        // instead returns the stream's new next-sequence (`expected_seq + 1`), so
        // subtract one to recover the assigned seq. A `0` next-seq is impossible
        // after a successful single append, so the `checked_sub` cannot saturate
        // silently; if it ever did the engine returned a contract-violating value.
        // `?` converts the engine error through `From<ApiError> for DurabilityError`.
        let next_seq = self
            .event_store
            .append(stream_key.as_bytes(), &payload, expected_seq)?;
        next_seq.checked_sub(1).ok_or_else(|| {
            DurabilityError::StoreError(ApiError::CorruptEvent(format!(
                "append returned next-seq 0 for stream {stream_key}"
            )))
        })
    }

    async fn read_from(
        &self,
        stream_key: &str,
        offset: u64,
        limit: usize,
    ) -> Result<Vec<StoredEntry>, DurabilityError> {
        // The real `read_from` returns every event with seq >= offset and applies
        // no limit; truncate to `limit` entries to honour the trait contract.
        let mut events = self.event_store.read_from(stream_key.as_bytes(), offset)?;
        events.truncate(limit);
        Ok(events.into_iter().map(StoredEntry::from).collect())
    }

    async fn cas(&self, key: &str, old_value: u64, new_value: u64) -> Result<(), DurabilityError> {
        // Preserve liminal's "absent == 0" cursor contract faithfully over an
        // engine that distinguishes `None` (absent) from `Some(0)` (a stored
        // zero). The invariant that makes the mapping below correct: we NEVER
        // persist a physical zero, so a logical value of 0 and physical absence
        // always coincide.
        //
        // A `cas` whose target `new_value` is 0 must therefore write nothing; it
        // only asserts the precondition. This is reachable as `cas(0, 0)` (a
        // cursor checkpoint at offset 0; offsets are monotonic so they never CAS
        // down to 0 from a higher value). Were we instead to let it store a
        // physical zero, the *next* `cas(0, n)` (mapped to expect-absent `None`)
        // would wrongly fail against the now-present key and permanently stall
        // the cursor. Asserting via a read is race-free here precisely because no
        // value is written, so there is no lost-update window.
        if new_value == 0 {
            let stored = self.event_store.read_value(key.as_bytes())?;
            return match (stored, old_value) {
                // Absent reads as logical 0 and 0 was expected: the precondition
                // holds and, with a target of 0, there is nothing to write.
                (None, 0) => Ok(()),
                // Absent reads as logical 0, which does NOT equal a non-zero
                // `old_value`. The precondition fails, so this must not report
                // success.
                (None, _) => Err(DurabilityError::CursorRegression {
                    stored: 0,
                    attempted: old_value,
                }),
                // Present key: either the expectation mismatches, or the swap
                // would move a non-zero cursor back to 0, which cannot be
                // represented without persisting a physical zero.
                (Some(stored), _) => Err(DurabilityError::CursorRegression {
                    stored,
                    attempted: old_value,
                }),
            };
        }
        // With a physical zero never stored, `old_value == 0` is exactly the
        // expect-absent expectation. Any other `old_value` maps to `Some(_)`.
        // This is a single CAS routed to the owning shard actor, where read,
        // compare, and write run with no interleaving point (haematite's
        // `ShardActor::cas`); the engine's atomicity is preserved end to end.
        let expected = if old_value == 0 {
            None
        } else {
            Some(old_value)
        };
        self.event_store
            .cas(key.as_bytes(), expected, new_value)
            .map_err(DurabilityError::from)
    }

    async fn read_value(&self, key: &str) -> Result<Option<u64>, DurabilityError> {
        self.event_store
            .read_value(key.as_bytes())
            .map_err(DurabilityError::from)
    }

    async fn scan(&self, prefix: &str) -> Result<Vec<StoredEntry>, DurabilityError> {
        // The real `scan` predicate yields stream *metadata* (key + next_seq),
        // not events. Liminal's contract is to return the events of every stream
        // whose key matches `prefix`, so collect the matching stream keys, then
        // read each stream's full event list and flatten the results.
        let prefix_bytes = prefix.as_bytes().to_vec();
        let matches = self
            .event_store
            .scan(|meta| meta.stream_key.starts_with(&prefix_bytes))?;
        let mut entries = Vec::new();
        for stream in matches {
            let events = self.event_store.read(&stream.stream_key)?;
            entries.extend(events.into_iter().map(StoredEntry::from));
        }
        Ok(entries)
    }

    async fn flush(&self) -> Result<(), DurabilityError> {
        self.event_store.flush().map_err(DurabilityError::from)
    }
}
impl From<Event> for StoredEntry {
fn from(event: Event) -> Self {
Self {
payload: event.payload,
sequence: event.seq,
timestamp: event.timestamp,
}
}
}
/// Converts a real-engine [`ApiError`] into liminal's [`DurabilityError`].
///
/// Engine failures (corrupt events, storage errors, compacted history) are
/// wrapped unchanged in `StoreError`. The two optimistic-concurrency failures
/// are converted into their dedicated cases (`SequenceConflict`,
/// `CursorRegression`) through their own `Into` conversions. The match is
/// deliberately exhaustive, so a new engine variant forces a decision here.
impl From<ApiError> for DurabilityError {
    fn from(error: ApiError) -> Self {
        match error {
            ApiError::CorruptEvent(_) | ApiError::Storage(_) | ApiError::HistoryCompacted(_) => {
                Self::StoreError(error)
            }
            ApiError::CasMismatch(mismatch) => mismatch.into(),
            ApiError::SequenceConflict(conflict) => conflict.into(),
        }
    }
}