mako_engine/event_store.rs
1//! [`EventStore`] trait and the in-process [`InMemoryEventStore`] implementation.
2//!
3//! The engine defines only the trait. The production implementation is
4//! [`SlateDbStore`][crate::store_slatedb::SlateDbStore], enabled by the `slatedb`
5//! feature flag. [`InMemoryEventStore`] is included here for tests, spikes, and
6//! development without external dependencies.
7
8use std::sync::Arc;
9
10#[cfg(any(test, feature = "testing"))]
11use std::collections::HashMap;
12#[cfg(any(test, feature = "testing"))]
13use time::OffsetDateTime;
14#[cfg(any(test, feature = "testing"))]
15use tokio::sync::RwLock;
16
17use crate::{
18 envelope::{EventEnvelope, NewEvent},
19 error::EngineError,
20 ids::StreamId,
21};
22
23// ── ExpectedVersion ───────────────────────────────────────────────────────────
24
25/// Optimistic concurrency control contract for [`EventStore::append`].
26///
27/// The caller declares which sequence number they expect the stream to be at.
28/// The store atomically checks this before writing; a mismatch means a
29/// concurrent writer modified the stream and the caller must reload and retry.
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub enum ExpectedVersion {
32 /// The stream must not exist yet (sequence number 0).
33 NoStream,
34 /// The stream must be at exactly this sequence number.
35 Exact(u64),
36 /// Skip the concurrency check entirely.
37 ///
38 /// **Do not use in production workflow append paths.** `Any` silently
39 /// accepts any write regardless of concurrent modifications, which can
40 /// produce duplicate or interleaved events in a stream.
41 ///
42 /// Legitimate uses: [`MigrationRunner`] (bulk admin rewrites),
43 /// snapshot-accelerated store internals, and test scaffolding where the
44 /// caller owns all write access by construction.
45 ///
46 /// For normal workflow event appends always use [`ExpectedVersion::NoStream`]
47 /// (first event) or [`ExpectedVersion::Exact`] (subsequent events).
48 ///
49 /// [`MigrationRunner`]: crate::migration::MigrationRunner
50 Any,
51}
52
53// ── AppendResult ──────────────────────────────────────────────────────────────
54
55/// Metadata returned after a successful [`EventStore::append`].
56#[derive(Debug, Clone)]
57pub struct AppendResult {
58 /// The sequence number of the last event written in this batch.
59 pub last_sequence: u64,
60 /// The fully materialised envelopes as persisted by the store.
61 ///
62 /// Each envelope has its `event_id`, `sequence_number`, `stream_id`, and
63 /// `timestamp` stamped by the store. Callers use these for return values
64 /// and projection seeding without re-loading from storage.
65 pub events: Vec<EventEnvelope>,
66}
67
68// ── EventStore trait ──────────────────────────────────────────────────────────
69
70/// Append-only, ordered event stream storage contract.
71///
72/// ## Implementation requirements
73///
74/// - **Ordered**: events within a stream are always returned in append order.
75/// - **Atomic**: a multi-event append either fully succeeds or fully fails.
76/// - **Optimistic concurrency**: detect concurrent writers via
77/// [`ExpectedVersion`].
78/// - **Append-only**: events are never modified or deleted through this API.
79/// - **Sequence number ownership**: the store assigns `sequence_number`,
80/// `event_id`, `stream_id`, and `timestamp` on each appended envelope.
81/// Callers submit [`NewEvent`] values without these fields.
82///
83/// ## Blanket `Arc` implementation
84///
85/// `Arc<S>` implements `EventStore` whenever `S: EventStore`, so
86/// `Process<W, Arc<MyStore>>` works without any extra wrapper type.
87#[allow(async_fn_in_trait)] // RPIT-in-traits with Send bounds require AFIT; use #[allow] until
88// the ecosystem settles on a stable pattern for Rust 1.85 MSRV.
89pub trait EventStore: Send + Sync {
90 /// Atomically append `events` to `stream_id`.
91 ///
92 /// The store assigns `event_id`, `sequence_number`, `stream_id`, and
93 /// `timestamp` on each event. The fully materialised envelopes are
94 /// returned in [`AppendResult::events`].
95 ///
96 /// # Errors
97 ///
98 /// - [`EngineError::VersionConflict`] when `expected_version` is
99 /// [`ExpectedVersion::NoStream`] or [`ExpectedVersion::Exact`] and the
100 /// actual stream version does not match.
101 /// - [`EngineError::Store`] for underlying storage failures.
102 async fn append(
103 &self,
104 stream_id: &StreamId,
105 expected_version: ExpectedVersion,
106 events: &[NewEvent],
107 ) -> Result<AppendResult, EngineError>;
108
109 /// Load all events from `stream_id` in sequence order.
110 ///
111 /// Returns an empty `Vec` when the stream does not exist.
112 ///
113 /// # Errors
114 ///
115 /// Returns [`EngineError::Store`] for underlying storage failures.
116 async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError>;
117
118 /// Load events from `stream_id` starting after `from_sequence` (exclusive).
119 ///
120 /// Useful for incremental projection catch-up: pass the projection's last
121 /// processed sequence number to load only new events.
122 ///
123 /// Returns an empty `Vec` when no new events exist.
124 ///
125 /// # Errors
126 ///
127 /// Returns [`EngineError::Store`] for underlying storage failures.
128 async fn load_from(
129 &self,
130 stream_id: &StreamId,
131 from_sequence: u64,
132 ) -> Result<Vec<EventEnvelope>, EngineError>;
133
134 /// Return the current sequence number of `stream_id`.
135 ///
136 /// The sequence number equals the number of events in the stream (1-based
137 /// after the first append). Returns `0` when the stream does not exist.
138 ///
139 /// Use this instead of `load(…).await?.len()` when you only need the
140 /// count — backends can implement this as a cheap metadata query without
141 /// transferring event payloads.
142 ///
143 /// **Required.** There is no default implementation — a fallback that
144 /// loads all events defeats the O(1) metadata-query contract. Implementors
145 /// must read the stored sequence counter directly (e.g. a `sv/{stream_id}`
146 /// key in SlateDB) to avoid O(n) event-payload transfers.
147 ///
148 /// # Errors
149 ///
150 /// Returns [`EngineError::Store`] for underlying storage failures.
151 async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>;
152
153 /// Return all known stream identifiers in this store, optionally filtered
154 /// by `prefix`.
155 ///
156 /// When `prefix` is `Some("process/")`, only streams whose identifiers
157 /// start with `"process/"` are returned (e.g. all process-instance
158 /// streams). When `prefix` is `None`, all streams are returned.
159 ///
160 /// This is the primary enumeration API for multi-stream projections: the
161 /// caller discovers all relevant streams, then passes the list to
162 /// [`crate::projection::ProjectionRunner::run_all_streams`] /
163 /// [`crate::projection::ProjectionRunner::catch_up_all_streams`].
164 ///
165 /// The returned order is unspecified. Stable ordering can be achieved by
166 /// sorting the `Vec` before use if deterministic replay is required.
167 ///
168 /// **Required.** There is no default implementation — a missing override
169 /// silently returns no streams, causing multi-stream projections (e.g.
170 /// MABIS billing aggregations) to process zero streams and return empty
171 /// read models with no error signal.
172 ///
173 /// # Errors
174 ///
175 /// Returns [`EngineError::Store`] for underlying storage failures.
176 async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError>;
177
178 /// Paginated stream enumeration — equivalent to `list_streams` but returns
179 /// at most `limit` entries starting after `cursor` (exclusive, UTF-8
180 /// stream-ID order).
181 ///
182 /// # Parameters
183 ///
184 /// - `prefix` — optional key prefix to restrict the scan (same semantics as
185 /// `list_streams`).
186 /// - `cursor` — if `Some(s)`, resume after stream ID `s`; `None` starts
187 /// from the beginning.
188 /// - `limit` — maximum number of stream IDs to return per page. A return
189 /// count strictly less than `limit` indicates the last page.
190 ///
191 /// # Page iteration pattern
192 ///
193 /// ```rust,ignore
194 /// let mut cursor: Option<StreamId> = None;
195 /// loop {
196 /// let page = store.list_streams_page(Some("process/"), cursor.as_ref(), 100).await?;
197 /// let done = page.len() < 100;
198 /// for id in &page { /* process */ }
199 /// cursor = page.into_iter().last();
200 /// if done { break; }
201 /// }
202 /// ```
203 ///
204 /// # Default implementation
205 ///
206 /// Falls back to `list_streams` + in-memory slicing for stores that do not
207 /// provide a native cursor scan.
208 ///
209 /// # ⚠️ Override required for production stores
210 ///
211 /// This default loads **all** matching stream IDs into memory on every call,
212 /// making `list_streams_page` loops O(n²) in total stream count. Any
213 /// production `EventStore` implementation (e.g. PostgreSQL, CockroachDB)
214 /// **must** override this method with an efficient cursor-based scan. The
215 /// SlateDB store already provides such an override. Failure to override
216 /// this method will cause projection catch-up to degrade silently under
217 /// deployments with > 10,000 active process streams.
218 ///
219 /// # Errors
220 ///
221 /// Returns [`EngineError::Store`] for underlying storage failures.
222 async fn list_streams_page(
223 &self,
224 prefix: Option<&str>,
225 cursor: Option<&StreamId>,
226 limit: usize,
227 ) -> Result<Vec<StreamId>, EngineError> {
228 // Default: enumerate all + skip-after-cursor + take(limit).
229 let all = self.list_streams(prefix).await?;
230 let iter: Box<dyn Iterator<Item = StreamId>> = match cursor {
231 None => Box::new(all.into_iter()),
232 Some(c) => Box::new(all.into_iter().skip_while(move |id| id != c).skip(1)),
233 };
234 Ok(iter.take(limit).collect())
235 }
236
237 /// Fold over events in `stream_id` starting after `from_sequence`
238 /// (exclusive), accumulating state without materialising the full
239 /// `Vec<EventEnvelope>`.
240 ///
241 /// This is the memory-efficient alternative to `load_from` for large
242 /// streams. Instead of returning all events as a Vec, it applies `f` to
243 /// each event in order and returns the final accumulated value.
244 ///
245 /// `from_sequence = 0` folds from the beginning of the stream.
246 ///
247 /// ```rust,ignore
248 /// // Reconstruct process state event-by-event without a Vec allocation:
249 /// let state = store.fold_stream(
250 /// &stream_id, 0, W::State::default(),
251 /// |acc, env| Ok(acc.apply(env.event()))
252 /// ).await?;
253 /// ```
254 ///
255 /// **Required.** There is no default implementation — a fallback that
256 /// materialises `load_from(...)` into a `Vec` defeats the purpose of this
257 /// method for large MABIS billing streams (potentially thousands of events
258 /// per billing period). Implementors must provide a cursor-based scan for
259 /// constant-memory behaviour.
260 ///
261 /// # Errors
262 ///
263 /// Returns [`EngineError::Store`] for underlying storage failures.
264 /// Returns any error produced by `f`.
265 async fn fold_stream<T, F>(
266 &self,
267 stream_id: &StreamId,
268 from_sequence: u64,
269 initial: T,
270 f: F,
271 ) -> Result<T, EngineError>
272 where
273 T: Send,
274 F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send;
275}
276
277// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
278
279impl<S: EventStore> EventStore for Arc<S> {
280 async fn append(
281 &self,
282 stream_id: &StreamId,
283 expected_version: ExpectedVersion,
284 events: &[NewEvent],
285 ) -> Result<AppendResult, EngineError> {
286 self.as_ref()
287 .append(stream_id, expected_version, events)
288 .await
289 }
290
291 async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
292 self.as_ref().load(stream_id).await
293 }
294
295 async fn load_from(
296 &self,
297 stream_id: &StreamId,
298 from_sequence: u64,
299 ) -> Result<Vec<EventEnvelope>, EngineError> {
300 self.as_ref().load_from(stream_id, from_sequence).await
301 }
302
303 async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
304 self.as_ref().stream_version(stream_id).await
305 }
306
307 async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
308 self.as_ref().list_streams(prefix).await
309 }
310
311 async fn list_streams_page(
312 &self,
313 prefix: Option<&str>,
314 cursor: Option<&StreamId>,
315 limit: usize,
316 ) -> Result<Vec<StreamId>, EngineError> {
317 self.as_ref().list_streams_page(prefix, cursor, limit).await
318 }
319
320 async fn fold_stream<T, F>(
321 &self,
322 stream_id: &StreamId,
323 from_sequence: u64,
324 initial: T,
325 f: F,
326 ) -> Result<T, EngineError>
327 where
328 T: Send,
329 F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
330 {
331 self.as_ref()
332 .fold_stream(stream_id, from_sequence, initial, f)
333 .await
334 }
335}
336
337// ── Arc<S>: AtomicAppend blanket impl ────────────────────────────────────────
338
339/// Blanket delegation so `Arc<S>` inherits the `AtomicAppend` contract from
340/// `S`.
341///
342/// This enables `Process<W, Arc<SlateDbStore>>` to call
343/// `execute_and_enqueue` and its retry/snapshot variants without requiring
344/// callers to unwrap the `Arc`.
345impl<S: AtomicAppend> AtomicAppend for Arc<S> {
346 async fn append_with_outbox(
347 &self,
348 stream_id: &StreamId,
349 expected_version: ExpectedVersion,
350 events: &[NewEvent],
351 outbox: &[crate::outbox::PendingOutbox],
352 ) -> Result<AppendResult, EngineError> {
353 self.as_ref()
354 .append_with_outbox(stream_id, expected_version, events, outbox)
355 .await
356 }
357
358 async fn append_with_outbox_and_deadlines(
359 &self,
360 stream_id: &StreamId,
361 expected_version: ExpectedVersion,
362 events: &[NewEvent],
363 outbox: &[crate::outbox::PendingOutbox],
364 deadlines: &[crate::deadline::Deadline],
365 ) -> Result<AppendResult, EngineError> {
366 self.as_ref()
367 .append_with_outbox_and_deadlines(
368 stream_id,
369 expected_version,
370 events,
371 outbox,
372 deadlines,
373 )
374 .await
375 }
376}
377
378// ── AtomicAppend trait ────────────────────────────────────────────────────────
379
380/// Extension of [`EventStore`] that atomically appends events **and** enqueues
381/// outbox messages in a single write operation.
382///
383/// Implementations must guarantee that either both the events and the outbox
384/// messages are persisted, or neither is — even across process crashes. For
385/// SlateDB this is achieved via a single [`WriteBatch`].
386///
387/// # Why a separate trait?
388///
389/// Not every [`EventStore`] backend supports atomic dual-writes (e.g. an
390/// in-memory test store). Keeping atomicity in a separate trait allows
391/// `Process::execute` to work against any `EventStore`, while
392/// `Process::execute_and_enqueue` requires the stronger `AtomicAppend` bound.
393///
394/// # Safety
395///
396/// Only call `append_with_outbox` from the engine's `execute_and_enqueue`
397/// path. Never write events first and outbox messages second — a crash
398/// between the two produces a silent lost APERAK.
399///
400/// [`WriteBatch`]: slatedb::WriteBatch
401#[allow(async_fn_in_trait)]
402pub trait AtomicAppend: EventStore {
403 /// Atomically append `events` to `stream_id` and schedule `outbox` messages.
404 ///
405 /// The `outbox` slice carries lightweight [`crate::outbox::PendingOutbox`]
406 /// values produced by [`Workflow::handle`]. The implementation is
407 /// responsible for materialising them into fully-typed
408 /// [`crate::outbox::OutboxMessage`] values using the store-assigned fields
409 /// of the stamped envelopes (e.g. `event_id` as `causation_event_id`).
410 ///
411 /// When `outbox` is empty, this degenerates to a plain `EventStore::append`.
412 ///
413 /// # Errors
414 ///
415 /// - [`EngineError::VersionConflict`] — optimistic concurrency check
416 /// failed; reload state and retry.
417 /// - [`EngineError::Store`] or [`EngineError::Outbox`] — storage failure.
418 ///
419 /// [`Workflow::handle`]: crate::workflow::Workflow::handle
420 async fn append_with_outbox(
421 &self,
422 stream_id: &StreamId,
423 expected_version: ExpectedVersion,
424 events: &[NewEvent],
425 outbox: &[crate::outbox::PendingOutbox],
426 ) -> Result<AppendResult, EngineError>;
427
428 /// Atomically append `events`, schedule `outbox` messages, **and** register
429 /// `deadlines` in a single write operation.
430 ///
431 /// Stronger guarantee than calling [`append_with_outbox`] followed by
432 /// [`DeadlineStore::register`]: either all three sets of writes land or
433 /// none do. This eliminates the non-atomic window where a process event is
434 /// persisted but its regulatory deadline is lost (e.g. on a crash between
435 /// the two calls).
436 ///
437 /// # Default implementation
438 ///
439 /// The default falls back to [`append_with_outbox`] only — **deadlines are
440 /// not persisted**. Override this in [`AtomicAppend`] implementations that
441 /// include a deadline store in the same underlying database (e.g.
442 /// [`SlateDbStore`]) to achieve full atomicity.
443 ///
444 /// Callers using the default must register deadlines separately via
445 /// [`DeadlineStore::register`] after this returns.
446 ///
447 /// # Errors
448 ///
449 /// Same as [`append_with_outbox`].
450 ///
451 /// [`append_with_outbox`]: AtomicAppend::append_with_outbox
452 /// [`DeadlineStore::register`]: crate::deadline::DeadlineStore::register
453 /// [`SlateDbStore`]: crate::store_slatedb::SlateDbStore
454 async fn append_with_outbox_and_deadlines(
455 &self,
456 stream_id: &StreamId,
457 expected_version: ExpectedVersion,
458 events: &[NewEvent],
459 outbox: &[crate::outbox::PendingOutbox],
460 _deadlines: &[crate::deadline::Deadline],
461 ) -> Result<AppendResult, EngineError> {
462 // Default: non-atomic fallback — deadlines must be registered separately.
463 self.append_with_outbox(stream_id, expected_version, events, outbox)
464 .await
465 }
466}
467
468// ── InMemoryEventStore ────────────────────────────────────────────────────────
469
470/// Internal state held behind the `Arc<Mutex<…>>`.
471#[cfg(any(test, feature = "testing"))]
472#[derive(Debug, Default)]
473struct InMemoryState {
474 /// Per-stream ordered event log (sequence-number indexed).
475 streams: HashMap<StreamId, Vec<EventEnvelope>>,
476 /// Global insertion-order log across all streams.
477 ///
478 /// This is the source for [`InMemoryEventStore::all_events`]. Keeping a
479 /// separate flat list avoids collecting + sorting across stream-local
480 /// sequence namespaces, which would produce an arbitrary ordering when
481 /// multiple streams are active.
482 global: Vec<EventEnvelope>,
483}
484
485/// A fully in-memory [`EventStore`] for testing and development.
486///
487/// Backed by two logs protected by a `RwLock`:
488/// - A per-stream `HashMap` for sequence-ordered access.
489/// - A global flat `Vec` that preserves cross-stream insertion order.
490///
491/// The store assigns `event_id`, `sequence_number`, `stream_id`, and
492/// `timestamp` to each appended event — callers submit [`NewEvent`] values.
493///
494/// Cloning the store shares the underlying data via `Arc` — all clones see
495/// the same events.
496///
497/// **Not suitable for production.** Use this for:
498/// - Unit and integration tests
499/// - Spikes and local development
500/// - CI environments that must not depend on external services
501///
502/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
503#[cfg(any(test, feature = "testing"))]
504#[derive(Debug, Default, Clone)]
505pub struct InMemoryEventStore {
506 inner: Arc<RwLock<InMemoryState>>,
507}
508
509#[cfg(any(test, feature = "testing"))]
510impl InMemoryEventStore {
511 /// Create an empty store.
512 #[must_use]
513 pub fn new() -> Self {
514 Self::default()
515 }
516
517 /// Return all events across all streams in insertion order.
518 ///
519 /// Because sequence numbers are stream-local, this method uses the
520 /// insertion-order global log rather than sorting by sequence number.
521 ///
522 /// **Test/development use only.** Production code should use
523 /// [`EventStore::load`] or [`EventStore::load_from`] to read specific
524 /// streams. Loading all events at once from a production store can OOM
525 /// the process.
526 ///
527 /// Available only when the `testing` feature is enabled or in `cfg(test)`.
528 #[cfg(any(test, feature = "testing"))]
529 #[must_use]
530 pub async fn all_events(&self) -> Vec<EventEnvelope> {
531 self.inner.read().await.global.clone()
532 }
533
534 /// Return all events for a specific stream in sequence order.
535 ///
536 /// **Test/development use only.** In production code, prefer
537 /// [`EventStore::load`] which is part of the trait contract.
538 ///
539 /// Available only when the `testing` feature is enabled or in `cfg(test)`.
540 #[cfg(any(test, feature = "testing"))]
541 #[must_use]
542 pub async fn events_for(&self, stream_id: &StreamId) -> Vec<EventEnvelope> {
543 self.inner
544 .read()
545 .await
546 .streams
547 .get(stream_id)
548 .cloned()
549 .unwrap_or_default()
550 }
551}
552
553#[cfg(any(test, feature = "testing"))]
554impl EventStore for InMemoryEventStore {
555 async fn append(
556 &self,
557 stream_id: &StreamId,
558 expected_version: ExpectedVersion,
559 new_events: &[NewEvent],
560 ) -> Result<AppendResult, EngineError> {
561 let mut inner = self.inner.write().await;
562
563 let current = inner.streams.get(stream_id).map_or(0, |s| s.len() as u64);
564
565 // Optimistic concurrency check.
566 match expected_version {
567 ExpectedVersion::NoStream => {
568 if current != 0 {
569 return Err(EngineError::VersionConflict {
570 expected: 0,
571 actual: current,
572 });
573 }
574 }
575 ExpectedVersion::Exact(v) => {
576 if current != v {
577 return Err(EngineError::VersionConflict {
578 expected: v,
579 actual: current,
580 });
581 }
582 }
583 ExpectedVersion::Any => {}
584 }
585
586 // Stamp each NewEvent with store-assigned fields.
587 let now = OffsetDateTime::now_utc();
588 let envelopes: Vec<EventEnvelope> = new_events
589 .iter()
590 .enumerate()
591 .map(|(i, new)| {
592 EventEnvelope::from_new(new.clone(), stream_id.clone(), current + i as u64 + 1, now)
593 })
594 .collect();
595
596 // Append to the per-stream log.
597 inner
598 .streams
599 .entry(stream_id.clone())
600 .or_default()
601 .extend_from_slice(&envelopes);
602
603 // Append to the global insertion-order log.
604 inner.global.extend_from_slice(&envelopes);
605
606 Ok(AppendResult {
607 last_sequence: current + new_events.len() as u64,
608 events: envelopes,
609 })
610 }
611
612 async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
613 let inner = self.inner.read().await;
614 Ok(inner.streams.get(stream_id).cloned().unwrap_or_default())
615 }
616
617 async fn load_from(
618 &self,
619 stream_id: &StreamId,
620 from_sequence: u64,
621 ) -> Result<Vec<EventEnvelope>, EngineError> {
622 let inner = self.inner.read().await;
623 Ok(inner
624 .streams
625 .get(stream_id)
626 .map(|events| {
627 events
628 .iter()
629 .filter(|e| e.sequence_number > from_sequence)
630 .cloned()
631 .collect()
632 })
633 .unwrap_or_default())
634 }
635
636 /// O(1) version check — reads the stream length from the HashMap without
637 /// cloning any event payloads.
638 async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
639 let inner = self.inner.read().await;
640 Ok(inner.streams.get(stream_id).map_or(0, |s| s.len() as u64))
641 }
642
643 /// Returns all known stream identifiers, optionally filtered by `prefix`.
644 ///
645 /// O(n) in the number of streams — scans the HashMap keys once.
646 async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
647 let inner = self.inner.read().await;
648 let ids = inner
649 .streams
650 .keys()
651 .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
652 .cloned()
653 .collect();
654 Ok(ids)
655 }
656
657 /// Paginated stream enumeration for `InMemoryEventStore`.
658 ///
659 /// Collects all matching keys, sorts them for deterministic order, then
660 /// applies cursor + limit slicing. O(n) — acceptable for test/dev stores.
661 async fn list_streams_page(
662 &self,
663 prefix: Option<&str>,
664 cursor: Option<&StreamId>,
665 limit: usize,
666 ) -> Result<Vec<StreamId>, EngineError> {
667 if limit == 0 {
668 return Ok(Vec::new());
669 }
670 let inner = self.inner.read().await;
671 let mut ids: Vec<StreamId> = inner
672 .streams
673 .keys()
674 .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
675 .cloned()
676 .collect();
677 // Sort for deterministic pagination order (HashMap is unordered).
678 ids.sort_unstable_by(|a, b| a.as_str().cmp(b.as_str()));
679 let iter: Box<dyn Iterator<Item = StreamId>> = match cursor {
680 None => Box::new(ids.into_iter()),
681 Some(c) => Box::new(ids.into_iter().skip_while(move |id| id != c).skip(1)),
682 };
683 Ok(iter.take(limit).collect())
684 }
685
686 /// Fold over events in `stream_id` starting after `from_sequence`
687 /// (exclusive) without materialising the full `Vec<EventEnvelope>`.
688 ///
689 /// In-memory implementation: iterates the in-memory Vec slice. This is
690 /// O(N) memory but that is acceptable for the in-memory test store
691 /// (production code uses `SlateDbStore::fold_stream` which is cursor-based).
692 async fn fold_stream<T, F>(
693 &self,
694 stream_id: &StreamId,
695 from_sequence: u64,
696 initial: T,
697 mut f: F,
698 ) -> Result<T, EngineError>
699 where
700 T: Send,
701 F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
702 {
703 let inner = self.inner.read().await;
704 let mut acc = initial;
705 if let Some(events) = inner.streams.get(stream_id) {
706 for env in events.iter().filter(|e| e.sequence_number > from_sequence) {
707 acc = f(acc, env.clone())?;
708 }
709 }
710 Ok(acc)
711 }
712}
713
714#[cfg(test)]
715mod tests {
716 use super::*;
717 use crate::ids::{ConversationId, CorrelationId, ProcessId, TenantId};
718 use crate::version::WorkflowId;
719
720 fn make_new_event() -> NewEvent {
721 NewEvent {
722 correlation_id: CorrelationId::new(),
723 causation_id: None,
724 conversation_id: ConversationId::new(),
725 process_id: ProcessId::new(),
726 tenant_id: TenantId::new(),
727 workflow_id: WorkflowId::new("test", "FV2024-10-01"),
728 event_type: "TestEvent".into(),
729 schema_version: 1,
730 payload: serde_json::json!({"test": true}),
731 }
732 }
733
734 #[tokio::test]
735 async fn append_and_load_roundtrip() {
736 let store = InMemoryEventStore::new();
737 let stream = StreamId::new("test/s1");
738
739 let result = store
740 .append(
741 &stream,
742 ExpectedVersion::NoStream,
743 &[make_new_event(), make_new_event()],
744 )
745 .await
746 .unwrap();
747
748 assert_eq!(result.events.len(), 2);
749 assert_eq!(result.events[0].sequence_number, 1);
750 assert_eq!(result.events[1].sequence_number, 2);
751 assert_eq!(result.last_sequence, 2);
752
753 let loaded = store.load(&stream).await.unwrap();
754 assert_eq!(loaded.len(), 2);
755 }
756
757 #[tokio::test]
758 async fn store_stamps_stream_id_and_sequence() {
759 let store = InMemoryEventStore::new();
760 let stream = StreamId::new("test/stamp");
761
762 let result = store
763 .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
764 .await
765 .unwrap();
766
767 let env = &result.events[0];
768 assert_eq!(env.stream_id, stream);
769 assert_eq!(env.sequence_number, 1);
770 }
771
772 #[tokio::test]
773 async fn version_conflict_is_detected() {
774 let store = InMemoryEventStore::new();
775 let stream = StreamId::new("test/s2");
776
777 store
778 .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
779 .await
780 .unwrap();
781
782 let err = store
783 .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
784 .await
785 .unwrap_err();
786
787 assert!(matches!(err, EngineError::VersionConflict { .. }));
788 }
789
790 #[tokio::test]
791 async fn load_from_returns_tail_only() {
792 let store = InMemoryEventStore::new();
793 let stream = StreamId::new("test/s3");
794 let events: Vec<_> = (0..5).map(|_| make_new_event()).collect();
795
796 store
797 .append(&stream, ExpectedVersion::NoStream, &events)
798 .await
799 .unwrap();
800
801 let tail = store.load_from(&stream, 3).await.unwrap();
802 assert_eq!(tail.len(), 2, "expected events 4 and 5");
803 assert_eq!(tail[0].sequence_number, 4);
804 assert_eq!(tail[1].sequence_number, 5);
805 }
806
807 #[tokio::test]
808 async fn all_events_preserves_insertion_order_across_streams() {
809 let store = InMemoryEventStore::new();
810 let s1 = StreamId::new("test/order-s1");
811 let s2 = StreamId::new("test/order-s2");
812
813 store
814 .append(&s1, ExpectedVersion::NoStream, &[make_new_event()])
815 .await
816 .unwrap();
817 store
818 .append(&s2, ExpectedVersion::NoStream, &[make_new_event()])
819 .await
820 .unwrap();
821 store
822 .append(&s1, ExpectedVersion::Exact(1), &[make_new_event()])
823 .await
824 .unwrap();
825
826 let all = store.all_events().await;
827 assert_eq!(all.len(), 3);
828 assert_eq!(all[0].stream_id, s1);
829 assert_eq!(all[1].stream_id, s2);
830 assert_eq!(all[2].stream_id, s1);
831 }
832
833 #[tokio::test]
834 async fn arc_wrapper_delegates_correctly() {
835 let store = Arc::new(InMemoryEventStore::new());
836 let stream = StreamId::new("test/arc-s1");
837
838 store
839 .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
840 .await
841 .unwrap();
842
843 let loaded = store.load(&stream).await.unwrap();
844 assert_eq!(loaded.len(), 1);
845 }
846
847 #[tokio::test]
848 async fn fold_stream_accumulates_without_full_vec() {
849 let store = InMemoryEventStore::new();
850 let stream = StreamId::new("test/fold-s1");
851 let events: Vec<_> = (0..4).map(|_| make_new_event()).collect();
852
853 store
854 .append(&stream, ExpectedVersion::NoStream, &events)
855 .await
856 .unwrap();
857
858 // Fold from the beginning: count events.
859 let count = store
860 .fold_stream(&stream, 0, 0usize, |acc, _| Ok(acc + 1))
861 .await
862 .unwrap();
863 assert_eq!(count, 4);
864
865 // Fold from sequence 2: count only tail events.
866 let tail_count = store
867 .fold_stream(&stream, 2, 0usize, |acc, _| Ok(acc + 1))
868 .await
869 .unwrap();
870 assert_eq!(tail_count, 2);
871 }
872}