Skip to main content

mako_engine/
event_store.rs

1//! [`EventStore`] trait and the in-process [`InMemoryEventStore`] implementation.
2//!
3//! The engine defines only the trait. The production implementation is
4//! [`SlateDbStore`][crate::store_slatedb::SlateDbStore], enabled by the `slatedb`
5//! feature flag. [`InMemoryEventStore`] is included here for tests, spikes, and
6//! development without external dependencies.
7
8use std::sync::Arc;
9
10#[cfg(any(test, feature = "testing"))]
11use std::collections::HashMap;
12#[cfg(any(test, feature = "testing"))]
13use time::OffsetDateTime;
14#[cfg(any(test, feature = "testing"))]
15use tokio::sync::RwLock;
16
17use crate::{
18    envelope::{EventEnvelope, NewEvent},
19    error::EngineError,
20    ids::StreamId,
21};
22
23// ── ExpectedVersion ───────────────────────────────────────────────────────────
24
25/// Optimistic concurrency control contract for [`EventStore::append`].
26///
27/// The caller declares which sequence number they expect the stream to be at.
28/// The store atomically checks this before writing; a mismatch means a
29/// concurrent writer modified the stream and the caller must reload and retry.
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
31pub enum ExpectedVersion {
32    /// The stream must not exist yet (sequence number 0).
33    NoStream,
34    /// The stream must be at exactly this sequence number.
35    Exact(u64),
36    /// Skip the concurrency check entirely.
37    ///
38    /// **Do not use in production workflow append paths.** `Any` silently
39    /// accepts any write regardless of concurrent modifications, which can
40    /// produce duplicate or interleaved events in a stream.
41    ///
42    /// Legitimate uses: [`MigrationRunner`] (bulk admin rewrites),
43    /// snapshot-accelerated store internals, and test scaffolding where the
44    /// caller owns all write access by construction.
45    ///
46    /// For normal workflow event appends always use [`ExpectedVersion::NoStream`]
47    /// (first event) or [`ExpectedVersion::Exact`] (subsequent events).
48    ///
49    /// [`MigrationRunner`]: crate::migration::MigrationRunner
50    Any,
51}
52
53// ── AppendResult ──────────────────────────────────────────────────────────────
54
55/// Metadata returned after a successful [`EventStore::append`].
56#[derive(Debug, Clone)]
57pub struct AppendResult {
58    /// The sequence number of the last event written in this batch.
59    pub last_sequence: u64,
60    /// The fully materialised envelopes as persisted by the store.
61    ///
62    /// Each envelope has its `event_id`, `sequence_number`, `stream_id`, and
63    /// `timestamp` stamped by the store. Callers use these for return values
64    /// and projection seeding without re-loading from storage.
65    pub events: Vec<EventEnvelope>,
66}
67
68// ── EventStore trait ──────────────────────────────────────────────────────────
69
70/// Append-only, ordered event stream storage contract.
71///
72/// ## Implementation requirements
73///
74/// - **Ordered**: events within a stream are always returned in append order.
75/// - **Atomic**: a multi-event append either fully succeeds or fully fails.
76/// - **Optimistic concurrency**: detect concurrent writers via
77///   [`ExpectedVersion`].
78/// - **Append-only**: events are never modified or deleted through this API.
79/// - **Sequence number ownership**: the store assigns `sequence_number`,
80///   `event_id`, `stream_id`, and `timestamp` on each appended envelope.
81///   Callers submit [`NewEvent`] values without these fields.
82///
83/// ## Blanket `Arc` implementation
84///
85/// `Arc<S>` implements `EventStore` whenever `S: EventStore`, so
86/// `Process<W, Arc<MyStore>>` works without any extra wrapper type.
87#[allow(async_fn_in_trait)] // RPIT-in-traits with Send bounds require AFIT; use #[allow] until
88// the ecosystem settles on a stable pattern for Rust 1.85 MSRV.
89pub trait EventStore: Send + Sync {
90    /// Atomically append `events` to `stream_id`.
91    ///
92    /// The store assigns `event_id`, `sequence_number`, `stream_id`, and
93    /// `timestamp` on each event. The fully materialised envelopes are
94    /// returned in [`AppendResult::events`].
95    ///
96    /// # Errors
97    ///
98    /// - [`EngineError::VersionConflict`] when `expected_version` is
99    ///   [`ExpectedVersion::NoStream`] or [`ExpectedVersion::Exact`] and the
100    ///   actual stream version does not match.
101    /// - [`EngineError::Store`] for underlying storage failures.
102    async fn append(
103        &self,
104        stream_id: &StreamId,
105        expected_version: ExpectedVersion,
106        events: &[NewEvent],
107    ) -> Result<AppendResult, EngineError>;
108
109    /// Load all events from `stream_id` in sequence order.
110    ///
111    /// Returns an empty `Vec` when the stream does not exist.
112    ///
113    /// # Errors
114    ///
115    /// Returns [`EngineError::Store`] for underlying storage failures.
116    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError>;
117
118    /// Load events from `stream_id` starting after `from_sequence` (exclusive).
119    ///
120    /// Useful for incremental projection catch-up: pass the projection's last
121    /// processed sequence number to load only new events.
122    ///
123    /// Returns an empty `Vec` when no new events exist.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`EngineError::Store`] for underlying storage failures.
128    async fn load_from(
129        &self,
130        stream_id: &StreamId,
131        from_sequence: u64,
132    ) -> Result<Vec<EventEnvelope>, EngineError>;
133
134    /// Return the current sequence number of `stream_id`.
135    ///
136    /// The sequence number equals the number of events in the stream (1-based
137    /// after the first append). Returns `0` when the stream does not exist.
138    ///
139    /// Use this instead of `load(…).await?.len()` when you only need the
140    /// count — backends can implement this as a cheap metadata query without
141    /// transferring event payloads.
142    ///
143    /// **Required.** There is no default implementation — a fallback that
144    /// loads all events defeats the O(1) metadata-query contract. Implementors
145    /// must read the stored sequence counter directly (e.g. a `sv/{stream_id}`
146    /// key in SlateDB) to avoid O(n) event-payload transfers.
147    ///
148    /// # Errors
149    ///
150    /// Returns [`EngineError::Store`] for underlying storage failures.
151    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>;
152
153    /// Return all known stream identifiers in this store, optionally filtered
154    /// by `prefix`.
155    ///
156    /// When `prefix` is `Some("process/")`, only streams whose identifiers
157    /// start with `"process/"` are returned (e.g. all process-instance
158    /// streams). When `prefix` is `None`, all streams are returned.
159    ///
160    /// This is the primary enumeration API for multi-stream projections: the
161    /// caller discovers all relevant streams, then passes the list to
162    /// [`crate::projection::ProjectionRunner::run_all_streams`] /
163    /// [`crate::projection::ProjectionRunner::catch_up_all_streams`].
164    ///
165    /// The returned order is unspecified. Stable ordering can be achieved by
166    /// sorting the `Vec` before use if deterministic replay is required.
167    ///
168    /// **Required.** There is no default implementation — a missing override
169    /// silently returns no streams, causing multi-stream projections (e.g.
170    /// MABIS billing aggregations) to process zero streams and return empty
171    /// read models with no error signal.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`EngineError::Store`] for underlying storage failures.
176    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError>;
177
178    /// Paginated stream enumeration — equivalent to `list_streams` but returns
179    /// at most `limit` entries starting after `cursor` (exclusive, UTF-8
180    /// stream-ID order).
181    ///
182    /// # Parameters
183    ///
184    /// - `prefix` — optional key prefix to restrict the scan (same semantics as
185    ///   `list_streams`).
186    /// - `cursor` — if `Some(s)`, resume after stream ID `s`; `None` starts
187    ///   from the beginning.
188    /// - `limit` — maximum number of stream IDs to return per page.  A return
189    ///   count strictly less than `limit` indicates the last page.
190    ///
191    /// # Page iteration pattern
192    ///
193    /// ```rust,ignore
194    /// let mut cursor: Option<StreamId> = None;
195    /// loop {
196    ///     let page = store.list_streams_page(Some("process/"), cursor.as_ref(), 100).await?;
197    ///     let done = page.len() < 100;
198    ///     for id in &page { /* process */ }
199    ///     cursor = page.into_iter().last();
200    ///     if done { break; }
201    /// }
202    /// ```
203    ///
204    /// # Default implementation
205    ///
206    /// Falls back to `list_streams` + in-memory slicing for stores that do not
207    /// provide a native cursor scan.
208    ///
209    /// # ⚠️ Override required for production stores
210    ///
211    /// This default loads **all** matching stream IDs into memory on every call,
212    /// making `list_streams_page` loops O(n²) in total stream count.  Any
213    /// production `EventStore` implementation (e.g. PostgreSQL, CockroachDB)
214    /// **must** override this method with an efficient cursor-based scan.  The
215    /// SlateDB store already provides such an override.  Failure to override
216    /// this method will cause projection catch-up to degrade silently under
217    /// deployments with > 10,000 active process streams.
218    ///
219    /// # Errors
220    ///
221    /// Returns [`EngineError::Store`] for underlying storage failures.
222    async fn list_streams_page(
223        &self,
224        prefix: Option<&str>,
225        cursor: Option<&StreamId>,
226        limit: usize,
227    ) -> Result<Vec<StreamId>, EngineError> {
228        // Default: enumerate all + skip-after-cursor + take(limit).
229        let all = self.list_streams(prefix).await?;
230        let iter: Box<dyn Iterator<Item = StreamId>> = match cursor {
231            None => Box::new(all.into_iter()),
232            Some(c) => Box::new(all.into_iter().skip_while(move |id| id != c).skip(1)),
233        };
234        Ok(iter.take(limit).collect())
235    }
236
237    /// Fold over events in `stream_id` starting after `from_sequence`
238    /// (exclusive), accumulating state without materialising the full
239    /// `Vec<EventEnvelope>`.
240    ///
241    /// This is the memory-efficient alternative to `load_from` for large
242    /// streams. Instead of returning all events as a Vec, it applies `f` to
243    /// each event in order and returns the final accumulated value.
244    ///
245    /// `from_sequence = 0` folds from the beginning of the stream.
246    ///
247    /// ```rust,ignore
248    /// // Reconstruct process state event-by-event without a Vec allocation:
249    /// let state = store.fold_stream(
250    ///     &stream_id, 0, W::State::default(),
251    ///     |acc, env| Ok(acc.apply(env.event()))
252    /// ).await?;
253    /// ```
254    ///
255    /// **Required.** There is no default implementation — a fallback that
256    /// materialises `load_from(...)` into a `Vec` defeats the purpose of this
257    /// method for large MABIS billing streams (potentially thousands of events
258    /// per billing period). Implementors must provide a cursor-based scan for
259    /// constant-memory behaviour.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`EngineError::Store`] for underlying storage failures.
264    /// Returns any error produced by `f`.
265    async fn fold_stream<T, F>(
266        &self,
267        stream_id: &StreamId,
268        from_sequence: u64,
269        initial: T,
270        f: F,
271    ) -> Result<T, EngineError>
272    where
273        T: Send,
274        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send;
275}
276
277// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
278
279impl<S: EventStore> EventStore for Arc<S> {
280    async fn append(
281        &self,
282        stream_id: &StreamId,
283        expected_version: ExpectedVersion,
284        events: &[NewEvent],
285    ) -> Result<AppendResult, EngineError> {
286        self.as_ref()
287            .append(stream_id, expected_version, events)
288            .await
289    }
290
291    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
292        self.as_ref().load(stream_id).await
293    }
294
295    async fn load_from(
296        &self,
297        stream_id: &StreamId,
298        from_sequence: u64,
299    ) -> Result<Vec<EventEnvelope>, EngineError> {
300        self.as_ref().load_from(stream_id, from_sequence).await
301    }
302
303    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
304        self.as_ref().stream_version(stream_id).await
305    }
306
307    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
308        self.as_ref().list_streams(prefix).await
309    }
310
311    async fn list_streams_page(
312        &self,
313        prefix: Option<&str>,
314        cursor: Option<&StreamId>,
315        limit: usize,
316    ) -> Result<Vec<StreamId>, EngineError> {
317        self.as_ref().list_streams_page(prefix, cursor, limit).await
318    }
319
320    async fn fold_stream<T, F>(
321        &self,
322        stream_id: &StreamId,
323        from_sequence: u64,
324        initial: T,
325        f: F,
326    ) -> Result<T, EngineError>
327    where
328        T: Send,
329        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
330    {
331        self.as_ref()
332            .fold_stream(stream_id, from_sequence, initial, f)
333            .await
334    }
335}
336
337// ── Arc<S>: AtomicAppend blanket impl ────────────────────────────────────────
338
339/// Blanket delegation so `Arc<S>` inherits the `AtomicAppend` contract from
340/// `S`.
341///
342/// This enables `Process<W, Arc<SlateDbStore>>` to call
343/// `execute_and_enqueue` and its retry/snapshot variants without requiring
344/// callers to unwrap the `Arc`.
345impl<S: AtomicAppend> AtomicAppend for Arc<S> {
346    async fn append_with_outbox(
347        &self,
348        stream_id: &StreamId,
349        expected_version: ExpectedVersion,
350        events: &[NewEvent],
351        outbox: &[crate::outbox::PendingOutbox],
352    ) -> Result<AppendResult, EngineError> {
353        self.as_ref()
354            .append_with_outbox(stream_id, expected_version, events, outbox)
355            .await
356    }
357
358    async fn append_with_outbox_and_deadlines(
359        &self,
360        stream_id: &StreamId,
361        expected_version: ExpectedVersion,
362        events: &[NewEvent],
363        outbox: &[crate::outbox::PendingOutbox],
364        deadlines: &[crate::deadline::Deadline],
365    ) -> Result<AppendResult, EngineError> {
366        self.as_ref()
367            .append_with_outbox_and_deadlines(
368                stream_id,
369                expected_version,
370                events,
371                outbox,
372                deadlines,
373            )
374            .await
375    }
376}
377
378// ── AtomicAppend trait ────────────────────────────────────────────────────────
379
380/// Extension of [`EventStore`] that atomically appends events **and** enqueues
381/// outbox messages in a single write operation.
382///
383/// Implementations must guarantee that either both the events and the outbox
384/// messages are persisted, or neither is — even across process crashes. For
385/// SlateDB this is achieved via a single [`WriteBatch`].
386///
387/// # Why a separate trait?
388///
389/// Not every [`EventStore`] backend supports atomic dual-writes (e.g. an
390/// in-memory test store). Keeping atomicity in a separate trait allows
391/// `Process::execute` to work against any `EventStore`, while
392/// `Process::execute_and_enqueue` requires the stronger `AtomicAppend` bound.
393///
394/// # Safety
395///
396/// Only call `append_with_outbox` from the engine's `execute_and_enqueue`
397/// path. Never write events first and outbox messages second — a crash
398/// between the two produces a silent lost APERAK.
399///
400/// [`WriteBatch`]: slatedb::WriteBatch
401#[allow(async_fn_in_trait)]
402pub trait AtomicAppend: EventStore {
403    /// Atomically append `events` to `stream_id` and schedule `outbox` messages.
404    ///
405    /// The `outbox` slice carries lightweight [`crate::outbox::PendingOutbox`]
406    /// values produced by [`Workflow::handle`]. The implementation is
407    /// responsible for materialising them into fully-typed
408    /// [`crate::outbox::OutboxMessage`] values using the store-assigned fields
409    /// of the stamped envelopes (e.g. `event_id` as `causation_event_id`).
410    ///
411    /// When `outbox` is empty, this degenerates to a plain `EventStore::append`.
412    ///
413    /// # Errors
414    ///
415    /// - [`EngineError::VersionConflict`] — optimistic concurrency check
416    ///   failed; reload state and retry.
417    /// - [`EngineError::Store`] or [`EngineError::Outbox`] — storage failure.
418    ///
419    /// [`Workflow::handle`]: crate::workflow::Workflow::handle
420    async fn append_with_outbox(
421        &self,
422        stream_id: &StreamId,
423        expected_version: ExpectedVersion,
424        events: &[NewEvent],
425        outbox: &[crate::outbox::PendingOutbox],
426    ) -> Result<AppendResult, EngineError>;
427
428    /// Atomically append `events`, schedule `outbox` messages, **and** register
429    /// `deadlines` in a single write operation.
430    ///
431    /// Stronger guarantee than calling [`append_with_outbox`] followed by
432    /// [`DeadlineStore::register`]: either all three sets of writes land or
433    /// none do. This eliminates the non-atomic window where a process event is
434    /// persisted but its regulatory deadline is lost (e.g. on a crash between
435    /// the two calls).
436    ///
437    /// # Default implementation
438    ///
439    /// The default falls back to [`append_with_outbox`] only — **deadlines are
440    /// not persisted**. Override this in [`AtomicAppend`] implementations that
441    /// include a deadline store in the same underlying database (e.g.
442    /// [`SlateDbStore`]) to achieve full atomicity.
443    ///
444    /// Callers using the default must register deadlines separately via
445    /// [`DeadlineStore::register`] after this returns.
446    ///
447    /// # Errors
448    ///
449    /// Same as [`append_with_outbox`].
450    ///
451    /// [`append_with_outbox`]: AtomicAppend::append_with_outbox
452    /// [`DeadlineStore::register`]: crate::deadline::DeadlineStore::register
453    /// [`SlateDbStore`]: crate::store_slatedb::SlateDbStore
454    async fn append_with_outbox_and_deadlines(
455        &self,
456        stream_id: &StreamId,
457        expected_version: ExpectedVersion,
458        events: &[NewEvent],
459        outbox: &[crate::outbox::PendingOutbox],
460        _deadlines: &[crate::deadline::Deadline],
461    ) -> Result<AppendResult, EngineError> {
462        // Default: non-atomic fallback — deadlines must be registered separately.
463        self.append_with_outbox(stream_id, expected_version, events, outbox)
464            .await
465    }
466}
467
468// ── InMemoryEventStore ────────────────────────────────────────────────────────
469
470/// Internal state held behind the `Arc<Mutex<…>>`.
471#[cfg(any(test, feature = "testing"))]
472#[derive(Debug, Default)]
473struct InMemoryState {
474    /// Per-stream ordered event log (sequence-number indexed).
475    streams: HashMap<StreamId, Vec<EventEnvelope>>,
476    /// Global insertion-order log across all streams.
477    ///
478    /// This is the source for [`InMemoryEventStore::all_events`]. Keeping a
479    /// separate flat list avoids collecting + sorting across stream-local
480    /// sequence namespaces, which would produce an arbitrary ordering when
481    /// multiple streams are active.
482    global: Vec<EventEnvelope>,
483}
484
485/// A fully in-memory [`EventStore`] for testing and development.
486///
487/// Backed by two logs protected by a `RwLock`:
488/// - A per-stream `HashMap` for sequence-ordered access.
489/// - A global flat `Vec` that preserves cross-stream insertion order.
490///
491/// The store assigns `event_id`, `sequence_number`, `stream_id`, and
492/// `timestamp` to each appended event — callers submit [`NewEvent`] values.
493///
494/// Cloning the store shares the underlying data via `Arc` — all clones see
495/// the same events.
496///
497/// **Not suitable for production.** Use this for:
498/// - Unit and integration tests
499/// - Spikes and local development
500/// - CI environments that must not depend on external services
501///
502/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
503#[cfg(any(test, feature = "testing"))]
504#[derive(Debug, Default, Clone)]
505pub struct InMemoryEventStore {
506    inner: Arc<RwLock<InMemoryState>>,
507}
508
509#[cfg(any(test, feature = "testing"))]
510impl InMemoryEventStore {
511    /// Create an empty store.
512    #[must_use]
513    pub fn new() -> Self {
514        Self::default()
515    }
516
517    /// Return all events across all streams in insertion order.
518    ///
519    /// Because sequence numbers are stream-local, this method uses the
520    /// insertion-order global log rather than sorting by sequence number.
521    ///
522    /// **Test/development use only.** Production code should use
523    /// [`EventStore::load`] or [`EventStore::load_from`] to read specific
524    /// streams. Loading all events at once from a production store can OOM
525    /// the process.
526    ///
527    /// Available only when the `testing` feature is enabled or in `cfg(test)`.
528    #[cfg(any(test, feature = "testing"))]
529    #[must_use]
530    pub async fn all_events(&self) -> Vec<EventEnvelope> {
531        self.inner.read().await.global.clone()
532    }
533
534    /// Return all events for a specific stream in sequence order.
535    ///
536    /// **Test/development use only.** In production code, prefer
537    /// [`EventStore::load`] which is part of the trait contract.
538    ///
539    /// Available only when the `testing` feature is enabled or in `cfg(test)`.
540    #[cfg(any(test, feature = "testing"))]
541    #[must_use]
542    pub async fn events_for(&self, stream_id: &StreamId) -> Vec<EventEnvelope> {
543        self.inner
544            .read()
545            .await
546            .streams
547            .get(stream_id)
548            .cloned()
549            .unwrap_or_default()
550    }
551}
552
553#[cfg(any(test, feature = "testing"))]
554impl EventStore for InMemoryEventStore {
555    async fn append(
556        &self,
557        stream_id: &StreamId,
558        expected_version: ExpectedVersion,
559        new_events: &[NewEvent],
560    ) -> Result<AppendResult, EngineError> {
561        let mut inner = self.inner.write().await;
562
563        let current = inner.streams.get(stream_id).map_or(0, |s| s.len() as u64);
564
565        // Optimistic concurrency check.
566        match expected_version {
567            ExpectedVersion::NoStream => {
568                if current != 0 {
569                    return Err(EngineError::VersionConflict {
570                        expected: 0,
571                        actual: current,
572                    });
573                }
574            }
575            ExpectedVersion::Exact(v) => {
576                if current != v {
577                    return Err(EngineError::VersionConflict {
578                        expected: v,
579                        actual: current,
580                    });
581                }
582            }
583            ExpectedVersion::Any => {}
584        }
585
586        // Stamp each NewEvent with store-assigned fields.
587        let now = OffsetDateTime::now_utc();
588        let envelopes: Vec<EventEnvelope> = new_events
589            .iter()
590            .enumerate()
591            .map(|(i, new)| {
592                EventEnvelope::from_new(new.clone(), stream_id.clone(), current + i as u64 + 1, now)
593            })
594            .collect();
595
596        // Append to the per-stream log.
597        inner
598            .streams
599            .entry(stream_id.clone())
600            .or_default()
601            .extend_from_slice(&envelopes);
602
603        // Append to the global insertion-order log.
604        inner.global.extend_from_slice(&envelopes);
605
606        Ok(AppendResult {
607            last_sequence: current + new_events.len() as u64,
608            events: envelopes,
609        })
610    }
611
612    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
613        let inner = self.inner.read().await;
614        Ok(inner.streams.get(stream_id).cloned().unwrap_or_default())
615    }
616
617    async fn load_from(
618        &self,
619        stream_id: &StreamId,
620        from_sequence: u64,
621    ) -> Result<Vec<EventEnvelope>, EngineError> {
622        let inner = self.inner.read().await;
623        Ok(inner
624            .streams
625            .get(stream_id)
626            .map(|events| {
627                events
628                    .iter()
629                    .filter(|e| e.sequence_number > from_sequence)
630                    .cloned()
631                    .collect()
632            })
633            .unwrap_or_default())
634    }
635
636    /// O(1) version check — reads the stream length from the HashMap without
637    /// cloning any event payloads.
638    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
639        let inner = self.inner.read().await;
640        Ok(inner.streams.get(stream_id).map_or(0, |s| s.len() as u64))
641    }
642
643    /// Returns all known stream identifiers, optionally filtered by `prefix`.
644    ///
645    /// O(n) in the number of streams — scans the HashMap keys once.
646    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
647        let inner = self.inner.read().await;
648        let ids = inner
649            .streams
650            .keys()
651            .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
652            .cloned()
653            .collect();
654        Ok(ids)
655    }
656
657    /// Paginated stream enumeration for `InMemoryEventStore`.
658    ///
659    /// Collects all matching keys, sorts them for deterministic order, then
660    /// applies cursor + limit slicing.  O(n) — acceptable for test/dev stores.
661    async fn list_streams_page(
662        &self,
663        prefix: Option<&str>,
664        cursor: Option<&StreamId>,
665        limit: usize,
666    ) -> Result<Vec<StreamId>, EngineError> {
667        if limit == 0 {
668            return Ok(Vec::new());
669        }
670        let inner = self.inner.read().await;
671        let mut ids: Vec<StreamId> = inner
672            .streams
673            .keys()
674            .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
675            .cloned()
676            .collect();
677        // Sort for deterministic pagination order (HashMap is unordered).
678        ids.sort_unstable_by(|a, b| a.as_str().cmp(b.as_str()));
679        let iter: Box<dyn Iterator<Item = StreamId>> = match cursor {
680            None => Box::new(ids.into_iter()),
681            Some(c) => Box::new(ids.into_iter().skip_while(move |id| id != c).skip(1)),
682        };
683        Ok(iter.take(limit).collect())
684    }
685
686    /// Fold over events in `stream_id` starting after `from_sequence`
687    /// (exclusive) without materialising the full `Vec<EventEnvelope>`.
688    ///
689    /// In-memory implementation: iterates the in-memory Vec slice.  This is
690    /// O(N) memory but that is acceptable for the in-memory test store
691    /// (production code uses `SlateDbStore::fold_stream` which is cursor-based).
692    async fn fold_stream<T, F>(
693        &self,
694        stream_id: &StreamId,
695        from_sequence: u64,
696        initial: T,
697        mut f: F,
698    ) -> Result<T, EngineError>
699    where
700        T: Send,
701        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
702    {
703        let inner = self.inner.read().await;
704        let mut acc = initial;
705        if let Some(events) = inner.streams.get(stream_id) {
706            for env in events.iter().filter(|e| e.sequence_number > from_sequence) {
707                acc = f(acc, env.clone())?;
708            }
709        }
710        Ok(acc)
711    }
712}
713
714#[cfg(test)]
715mod tests {
716    use super::*;
717    use crate::ids::{ConversationId, CorrelationId, ProcessId, TenantId};
718    use crate::version::WorkflowId;
719
720    fn make_new_event() -> NewEvent {
721        NewEvent {
722            correlation_id: CorrelationId::new(),
723            causation_id: None,
724            conversation_id: ConversationId::new(),
725            process_id: ProcessId::new(),
726            tenant_id: TenantId::new(),
727            workflow_id: WorkflowId::new("test", "FV2024-10-01"),
728            event_type: "TestEvent".into(),
729            schema_version: 1,
730            payload: serde_json::json!({"test": true}),
731        }
732    }
733
734    #[tokio::test]
735    async fn append_and_load_roundtrip() {
736        let store = InMemoryEventStore::new();
737        let stream = StreamId::new("test/s1");
738
739        let result = store
740            .append(
741                &stream,
742                ExpectedVersion::NoStream,
743                &[make_new_event(), make_new_event()],
744            )
745            .await
746            .unwrap();
747
748        assert_eq!(result.events.len(), 2);
749        assert_eq!(result.events[0].sequence_number, 1);
750        assert_eq!(result.events[1].sequence_number, 2);
751        assert_eq!(result.last_sequence, 2);
752
753        let loaded = store.load(&stream).await.unwrap();
754        assert_eq!(loaded.len(), 2);
755    }
756
757    #[tokio::test]
758    async fn store_stamps_stream_id_and_sequence() {
759        let store = InMemoryEventStore::new();
760        let stream = StreamId::new("test/stamp");
761
762        let result = store
763            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
764            .await
765            .unwrap();
766
767        let env = &result.events[0];
768        assert_eq!(env.stream_id, stream);
769        assert_eq!(env.sequence_number, 1);
770    }
771
772    #[tokio::test]
773    async fn version_conflict_is_detected() {
774        let store = InMemoryEventStore::new();
775        let stream = StreamId::new("test/s2");
776
777        store
778            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
779            .await
780            .unwrap();
781
782        let err = store
783            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
784            .await
785            .unwrap_err();
786
787        assert!(matches!(err, EngineError::VersionConflict { .. }));
788    }
789
790    #[tokio::test]
791    async fn load_from_returns_tail_only() {
792        let store = InMemoryEventStore::new();
793        let stream = StreamId::new("test/s3");
794        let events: Vec<_> = (0..5).map(|_| make_new_event()).collect();
795
796        store
797            .append(&stream, ExpectedVersion::NoStream, &events)
798            .await
799            .unwrap();
800
801        let tail = store.load_from(&stream, 3).await.unwrap();
802        assert_eq!(tail.len(), 2, "expected events 4 and 5");
803        assert_eq!(tail[0].sequence_number, 4);
804        assert_eq!(tail[1].sequence_number, 5);
805    }
806
807    #[tokio::test]
808    async fn all_events_preserves_insertion_order_across_streams() {
809        let store = InMemoryEventStore::new();
810        let s1 = StreamId::new("test/order-s1");
811        let s2 = StreamId::new("test/order-s2");
812
813        store
814            .append(&s1, ExpectedVersion::NoStream, &[make_new_event()])
815            .await
816            .unwrap();
817        store
818            .append(&s2, ExpectedVersion::NoStream, &[make_new_event()])
819            .await
820            .unwrap();
821        store
822            .append(&s1, ExpectedVersion::Exact(1), &[make_new_event()])
823            .await
824            .unwrap();
825
826        let all = store.all_events().await;
827        assert_eq!(all.len(), 3);
828        assert_eq!(all[0].stream_id, s1);
829        assert_eq!(all[1].stream_id, s2);
830        assert_eq!(all[2].stream_id, s1);
831    }
832
833    #[tokio::test]
834    async fn arc_wrapper_delegates_correctly() {
835        let store = Arc::new(InMemoryEventStore::new());
836        let stream = StreamId::new("test/arc-s1");
837
838        store
839            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
840            .await
841            .unwrap();
842
843        let loaded = store.load(&stream).await.unwrap();
844        assert_eq!(loaded.len(), 1);
845    }
846
847    #[tokio::test]
848    async fn fold_stream_accumulates_without_full_vec() {
849        let store = InMemoryEventStore::new();
850        let stream = StreamId::new("test/fold-s1");
851        let events: Vec<_> = (0..4).map(|_| make_new_event()).collect();
852
853        store
854            .append(&stream, ExpectedVersion::NoStream, &events)
855            .await
856            .unwrap();
857
858        // Fold from the beginning: count events.
859        let count = store
860            .fold_stream(&stream, 0, 0usize, |acc, _| Ok(acc + 1))
861            .await
862            .unwrap();
863        assert_eq!(count, 4);
864
865        // Fold from sequence 2: count only tail events.
866        let tail_count = store
867            .fold_stream(&stream, 2, 0usize, |acc, _| Ok(acc + 1))
868            .await
869            .unwrap();
870        assert_eq!(tail_count, 2);
871    }
872}