1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
use super::*;
use crate::id::EntityIdType;
use crate::id::EventId;
use crate::store::index::IndexEntry;
use std::collections::BTreeMap;
impl<State> Store<State> {
/// READ: get a single event by ID.
///
/// # Errors
/// Returns `StoreError::NotFound` if no event with that ID exists.
/// Returns `StoreError::Io` or `StoreError::Serialization` if reading from disk fails.
pub fn get(&self, event_id: EventId) -> Result<StoredEvent<serde_json::Value>, StoreError> {
let raw = event_id.as_u128();
let entry = self.index.get_by_id(raw).ok_or(StoreError::NotFound(raw))?;
self.reader.read_entry(&entry.disk_pos)
}
/// READ: fetch a single event by ID with the payload left as raw
/// MessagePack bytes.
/// Mirrors [`get`](Self::get) but skips the JSON-decode step, suitable
/// for the `RawMsgpackInput` lane of a multi-event reactor.
///
/// # Errors
/// Returns `StoreError::NotFound` if no event with that ID exists.
/// Returns `StoreError::Io` or `StoreError::Serialization` if reading
/// from disk fails.
pub fn read_raw(&self, event_id: EventId) -> Result<StoredEvent<Vec<u8>>, StoreError> {
let raw = event_id.as_u128();
let entry = self.index.get_by_id(raw).ok_or(StoreError::NotFound(raw))?;
self.reader.read_entry_raw(&entry.disk_pos)
}
/// Verify an append receipt against the store's signing-key registry and
/// current index state, returning only a boolean.
///
/// Prefer [`Self::verify_append_receipt_detailed`] in new code when the
/// caller needs proof language or a stable rejection reason.
#[must_use]
pub fn verify_append_receipt(&self, receipt: &AppendReceipt) -> bool {
self.verify_append_receipt_detailed(receipt).is_valid()
}
/// Verify ack-shaped append receipt fields against the store's signing-key
/// registry and current index state.
///
/// Wire transports omit [`AppendReceipt::disk_pos`]; this helper hydrates
/// it from the committed index entry before delegating to
/// [`Self::verify_append_receipt_detailed`].
#[must_use]
pub fn verify_append_receipt_wire_detailed(
&self,
event_id: EventId,
sequence: u64,
content_hash: [u8; 32],
key_id: [u8; 32],
signature: Option<[u8; 64]>,
extensions: BTreeMap<ExtensionKey, EncodedBytes>,
) -> ReceiptVerification {
let Some(entry) = self.index.get_by_id(event_id.as_u128()) else {
return ReceiptVerification::Invalid(ReceiptVerificationError::MissingCommittedEvent);
};
let receipt = AppendReceipt {
event_id,
sequence,
disk_pos: entry.disk_pos,
content_hash,
key_id,
signature,
extensions,
};
self.verify_append_receipt_detailed(&receipt)
}
/// Verify a full persisted append receipt and return the exact acceptance
/// or rejection reason.
///
/// This API expects the native [`AppendReceipt`], including its committed
/// disk position. Wire transports that only carry ack-shaped fields should
/// use [`Self::verify_append_receipt_wire_detailed`] so the store can
/// hydrate the disk position from the committed index entry.
#[must_use]
pub fn verify_append_receipt_detailed(&self, receipt: &AppendReceipt) -> ReceiptVerification {
let Some(entry) = self.index.get_by_id(receipt.event_id.as_u128()) else {
return ReceiptVerification::Invalid(ReceiptVerificationError::MissingCommittedEvent);
};
if let Some(error) = append_receipt_index_mismatch(receipt, &entry) {
return ReceiptVerification::Invalid(error);
}
self.runtime.signing_registry.verify_append_receipt(
receipt,
&entry.coord,
entry.kind,
entry.hash_chain.prev_hash,
)
}
/// Verify a persisted denial receipt against the store's signing-key
/// registry and current index state.
#[must_use]
pub fn verify_denial_receipt(&self, receipt: &DenialReceipt) -> bool {
self.verify_denial_receipt_detailed(receipt).is_valid()
}
/// Verify a persisted denial receipt and return the exact acceptance or
/// rejection reason.
#[must_use]
pub fn verify_denial_receipt_detailed(&self, receipt: &DenialReceipt) -> ReceiptVerification {
let Some(entry) = self.index.get_by_id(receipt.event_id.as_u128()) else {
return ReceiptVerification::Invalid(ReceiptVerificationError::MissingCommittedEvent);
};
if let Some(error) = denial_receipt_index_mismatch(receipt, &entry) {
return ReceiptVerification::Invalid(error);
}
self.runtime.signing_registry.verify_denial_receipt(
receipt,
&entry.coord,
entry.kind,
entry.hash_chain.prev_hash,
)
}
/// READ: return every currently visible index entry matching a Region.
///
/// This is a convenience snapshot read for small, already-bounded regions.
/// For replay, audit, host parity, or user-facing pagination, prefer
/// [`Self::query_entries_after`], which pages strictly by
/// `global_sequence`.
#[must_use]
pub fn query(&self, region: &Region) -> Vec<IndexEntry> {
self.index.query(region)
}
/// READ: query a bounded page of visible events by Region in ascending
/// `global_sequence` order.
///
/// Pass `None` for the first page. Pass the last returned entry's
/// [`IndexEntry::global_sequence`] as `Some(after_global_sequence)` to
/// resume strictly after that entry. `limit == 0` returns an empty page.
///
/// This is commit-order pagination, not a live cursor or server-held
/// session. Durable delivery cursors live under the delivery APIs.
#[must_use]
pub fn query_entries_after(
&self,
region: &Region,
after_global_sequence: Option<u64>,
limit: usize,
) -> Vec<IndexEntry> {
let after_seq = after_global_sequence.unwrap_or(0);
let started = after_global_sequence.is_some();
self.index
.query_hits_after(region, after_seq, started, limit)
.into_iter()
.filter_map(|hit| self.index.upgrade_hit(hit))
.collect()
}
/// READ: walk bounded hash-chain ancestors from an event id.
///
/// This is substrate ancestry, not domain graph traversal.
pub fn walk_ancestors(
&self,
event_id: EventId,
limit: usize,
) -> Vec<StoredEvent<serde_json::Value>> {
ancestry::walk_ancestors(self, event_id.as_u128(), limit)
}
/// PROJECT: reconstruct typed state from events, with cache support.
///
/// # Errors
/// Returns any replay, deserialization, cache, or disk-read error surfaced
/// while reconstructing the projection state.
pub fn project<T>(&self, entity: &str, freshness: &Freshness) -> Result<Option<T>, StoreError>
where
T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
T::Input: projection::flow::ReplayInput,
{
projection::flow::project(self, entity, freshness)
}
/// Return the current per-entity generation if the entity exists.
///
/// Generations advance monotonically on every insert for that entity.
/// When entity-group overlays are disabled, this falls back to the entity
/// stream length so callers still get a stable monotonic skip token.
pub fn entity_generation(&self, entity: &str) -> Option<u64> {
self.index.entity_generation(entity)
}
/// Project only when the entity changed since `last_seen_generation`.
///
/// Returns `Ok(None)` when no change is observed. Otherwise returns the
/// generation at which the returned state was materialized together with
/// the freshly projected state. The returned generation is honest: a
/// cache-hit path returns the generation at which the cache was
/// stamped, a replay path returns the generation sampled before replay
/// started. Callers who persist this generation as a watermark (e.g.
/// [`ProjectionWatcher`]) will not silently consume a relevant append
/// against stale state (F5). To preserve that property, this API treats
/// [`Freshness::MaybeStale`] the same as [`Freshness::Consistent`].
///
/// # Errors
/// Returns any error surfaced by [`Store::project`] when the entity has
/// changed and the projection must be rebuilt.
pub fn project_if_changed<T>(
&self,
entity: &str,
last_seen_generation: u64,
freshness: &Freshness,
) -> Result<Option<(u64, Option<T>)>, StoreError>
where
T: EventSourced + serde::Serialize + serde::de::DeserializeOwned + 'static,
T::Input: projection::flow::ReplayInput,
{
projection::flow::project_if_changed(self, entity, last_seen_generation, freshness)
}
/// READ: query all events for an exact entity id.
#[must_use]
pub fn by_entity(&self, entity: &str) -> Vec<IndexEntry> {
self.index.stream(entity)
}
/// READ: query all events in the given scope.
#[must_use]
pub fn by_scope(&self, scope: &str) -> Vec<IndexEntry> {
self.query(&Region::scope(scope))
}
/// READ: query all events of the given event kind across all entities and scopes.
#[must_use]
pub fn by_fact(&self, kind: EventKind) -> Vec<IndexEntry> {
self.query(&Region::all().with_fact(KindFilter::Exact(kind)))
}
/// READ (typed): query all events whose kind matches `T::KIND`.
///
/// Available on both `Store<Open>` and `Store<ReadOnly>`.
#[must_use]
pub fn by_fact_typed<T: EventPayload>(&self) -> Vec<IndexEntry> {
self.by_fact(T::KIND)
}
/// CURSOR: pull-based, ordered delivery from the in-memory index.
///
/// Available on both `Store<Open>` and `Store<ReadOnly>`. This cursor is
/// process-local durable-delivery vocabulary, not query pagination. It
/// does not persist its position, so restart-time at-least-once semantics
/// require the checkpoint-bound cursor worker surface rather than this
/// constructor.
pub fn cursor_guaranteed(&self, region: &Region) -> Cursor {
Cursor::new(region.clone(), Arc::clone(&self.index))
}
}
fn append_receipt_index_mismatch(
receipt: &AppendReceipt,
entry: &IndexEntry,
) -> Option<ReceiptVerificationError> {
if receipt.event_id.as_u128() != entry.event_id {
return Some(ReceiptVerificationError::EventIdMismatch);
}
if receipt.sequence != entry.global_sequence {
return Some(ReceiptVerificationError::SequenceMismatch);
}
if receipt.disk_pos != entry.disk_pos {
return Some(ReceiptVerificationError::DiskPositionMismatch);
}
if receipt.content_hash != entry.hash_chain.event_hash {
return Some(ReceiptVerificationError::ContentHashMismatch);
}
if receipt.extensions != entry.receipt_extensions {
return Some(ReceiptVerificationError::ExtensionsMismatch);
}
None
}
fn denial_receipt_index_mismatch(
receipt: &DenialReceipt,
entry: &IndexEntry,
) -> Option<ReceiptVerificationError> {
if entry.kind != EventKind::SYSTEM_DENIAL {
return Some(ReceiptVerificationError::DenialKindMismatch);
}
if receipt.event_id.as_u128() != entry.event_id {
return Some(ReceiptVerificationError::EventIdMismatch);
}
if receipt.sequence != entry.global_sequence {
return Some(ReceiptVerificationError::SequenceMismatch);
}
if receipt.disk_pos != entry.disk_pos {
return Some(ReceiptVerificationError::DiskPositionMismatch);
}
if receipt.content_hash != entry.hash_chain.event_hash {
return Some(ReceiptVerificationError::ContentHashMismatch);
}
if receipt.extensions != entry.receipt_extensions {
return Some(ReceiptVerificationError::ExtensionsMismatch);
}
None
}
#[cfg(test)]
mod tests {
use crate::coordinate::Coordinate;
use crate::event::EventKind;
use crate::store::index::DiskPos;
use crate::store::{ReceiptVerification, ReceiptVerificationError, Store, StoreConfig};
use tempfile::TempDir;
#[test]
fn append_receipt_verification_rejects_disk_position_tampering() {
let dir = TempDir::new().expect("temp dir");
let store = Store::open(
StoreConfig::new(dir.path())
.with_enable_checkpoint(false)
.with_enable_mmap_index(false),
)
.expect("open store");
let coord = Coordinate::new("entity:receipt-disk-pos", "scope:test").expect("coord");
let mut receipt = store
.append(
&coord,
EventKind::custom(0xA, 20),
&serde_json::json!({"n": 1}),
)
.expect("append");
assert_eq!(
store.verify_append_receipt_detailed(&receipt),
ReceiptVerification::UnsignedAccepted
);
assert!(store.verify_append_receipt(&receipt));
receipt.disk_pos = DiskPos::new(
receipt.disk_pos.segment_id(),
receipt.disk_pos.offset() + 1,
receipt.disk_pos.length(),
);
assert!(
!store.verify_append_receipt(&receipt),
"disk position must match the committed index entry"
);
assert_eq!(
store.verify_append_receipt_detailed(&receipt),
ReceiptVerification::Invalid(ReceiptVerificationError::DiskPositionMismatch)
);
}
#[test]
fn wire_append_receipt_verification_hydrates_disk_pos_from_index() {
let dir = TempDir::new().expect("temp dir");
let store = Store::open(
StoreConfig::new(dir.path())
.with_enable_checkpoint(false)
.with_enable_mmap_index(false),
)
.expect("open store");
let coord = Coordinate::new("entity:wire-verify", "scope:test").expect("coord");
let receipt = store
.append(
&coord,
EventKind::custom(0xA, 22),
&serde_json::json!({"n": 1}),
)
.expect("append");
let verification = store.verify_append_receipt_wire_detailed(
receipt.event_id,
receipt.sequence,
receipt.content_hash,
receipt.key_id,
receipt.signature,
receipt.extensions.clone(),
);
assert_eq!(verification, ReceiptVerification::UnsignedAccepted);
}
}