Skip to main content

mako_engine/
event_store.rs

1//! [`EventStore`] trait and the in-process [`InMemoryEventStore`] implementation.
2//!
3//! The engine defines only the trait. The production implementation is
4//! [`SlateDbStore`][crate::store_slatedb::SlateDbStore], enabled by the `slatedb`
5//! feature flag. [`InMemoryEventStore`] is included here for tests, spikes, and
6//! development without external dependencies.
7
8use std::sync::Arc;
9
10#[cfg(any(test, feature = "testing"))]
11use std::collections::HashMap;
12#[cfg(any(test, feature = "testing"))]
13use time::OffsetDateTime;
14#[cfg(any(test, feature = "testing"))]
15use tokio::sync::RwLock;
16
17use crate::{
18    envelope::{EventEnvelope, NewEvent},
19    error::EngineError,
20    ids::StreamId,
21};
22
23// ── ExpectedVersion ───────────────────────────────────────────────────────────
24
/// Optimistic concurrency control contract for [`EventStore::append`].
///
/// The caller declares which sequence number they expect the stream to be at.
/// The store atomically checks this before writing; a mismatch means a
/// concurrent writer modified the stream, so the caller must reload the stream
/// and retry the append.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExpectedVersion {
    /// The stream must not exist yet, i.e. its current sequence number is 0.
    NoStream,
    /// The stream must be at exactly this sequence number.
    Exact(u64),
    /// Skip the concurrency check entirely.
    ///
    /// **Do not use in production workflow append paths.** `Any` silently
    /// accepts any write regardless of concurrent modifications, which can
    /// produce duplicate or interleaved events in a stream.
    ///
    /// Legitimate uses: [`MigrationRunner`] (bulk admin rewrites),
    /// snapshot-accelerated store internals, and test scaffolding where the
    /// caller owns all write access by construction.
    ///
    /// For normal workflow event appends always use [`ExpectedVersion::NoStream`]
    /// (first event) or [`ExpectedVersion::Exact`] (subsequent events).
    ///
    /// [`MigrationRunner`]: crate::migration::MigrationRunner
    Any,
}
52
53// ── AppendResult ──────────────────────────────────────────────────────────────
54
/// Metadata returned after a successful [`EventStore::append`].
#[derive(Debug, Clone)]
pub struct AppendResult {
    /// The sequence number of the last event written in this batch.
    pub last_sequence: u64,
    /// The fully materialised envelopes as persisted by the store, in append
    /// order.
    ///
    /// Each envelope has its `event_id`, `sequence_number`, `stream_id`, and
    /// `timestamp` stamped by the store. Callers use these for return values
    /// and projection seeding without re-loading from storage.
    pub events: Vec<EventEnvelope>,
}
67
68// ── EventStore trait ──────────────────────────────────────────────────────────
69
70/// Append-only, ordered event stream storage contract.
71///
72/// ## Implementation requirements
73///
74/// - **Ordered**: events within a stream are always returned in append order.
75/// - **Atomic**: a multi-event append either fully succeeds or fully fails.
76/// - **Optimistic concurrency**: detect concurrent writers via
77///   [`ExpectedVersion`].
78/// - **Append-only**: events are never modified or deleted through this API.
79/// - **Sequence number ownership**: the store assigns `sequence_number`,
80///   `event_id`, `stream_id`, and `timestamp` on each appended envelope.
81///   Callers submit [`NewEvent`] values without these fields.
82///
83/// ## Blanket `Arc` implementation
84///
85/// `Arc<S>` implements `EventStore` whenever `S: EventStore`, so
86/// `Process<W, Arc<MyStore>>` works without any extra wrapper type.
87#[allow(async_fn_in_trait)] // RPIT-in-traits with Send bounds require AFIT; use #[allow] until
88// the ecosystem settles on a stable pattern for Rust 1.85 MSRV.
89pub trait EventStore: Send + Sync {
90    /// Atomically append `events` to `stream_id`.
91    ///
92    /// The store assigns `event_id`, `sequence_number`, `stream_id`, and
93    /// `timestamp` on each event. The fully materialised envelopes are
94    /// returned in [`AppendResult::events`].
95    ///
96    /// # Errors
97    ///
98    /// - [`EngineError::VersionConflict`] when `expected_version` is
99    ///   [`ExpectedVersion::NoStream`] or [`ExpectedVersion::Exact`] and the
100    ///   actual stream version does not match.
101    /// - [`EngineError::Store`] for underlying storage failures.
102    async fn append(
103        &self,
104        stream_id: &StreamId,
105        expected_version: ExpectedVersion,
106        events: &[NewEvent],
107    ) -> Result<AppendResult, EngineError>;
108
109    /// Load all events from `stream_id` in sequence order.
110    ///
111    /// Returns an empty `Vec` when the stream does not exist.
112    ///
113    /// # Errors
114    ///
115    /// Returns [`EngineError::Store`] for underlying storage failures.
116    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError>;
117
118    /// Load events from `stream_id` starting after `from_sequence` (exclusive).
119    ///
120    /// Useful for incremental projection catch-up: pass the projection's last
121    /// processed sequence number to load only new events.
122    ///
123    /// Returns an empty `Vec` when no new events exist.
124    ///
125    /// # Errors
126    ///
127    /// Returns [`EngineError::Store`] for underlying storage failures.
128    async fn load_from(
129        &self,
130        stream_id: &StreamId,
131        from_sequence: u64,
132    ) -> Result<Vec<EventEnvelope>, EngineError>;
133
134    /// Return the current sequence number of `stream_id`.
135    ///
136    /// The sequence number equals the number of events in the stream (1-based
137    /// after the first append). Returns `0` when the stream does not exist.
138    ///
139    /// Use this instead of `load(…).await?.len()` when you only need the
140    /// count — backends can implement this as a cheap metadata query without
141    /// transferring event payloads.
142    ///
143    /// **Required.** There is no default implementation — a fallback that
144    /// loads all events defeats the O(1) metadata-query contract. Implementors
145    /// must read the stored sequence counter directly (e.g. a `sv/{stream_id}`
146    /// key in SlateDB) to avoid O(n) event-payload transfers.
147    ///
148    /// # Errors
149    ///
150    /// Returns [`EngineError::Store`] for underlying storage failures.
151    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>;
152
153    /// Return all known stream identifiers in this store, optionally filtered
154    /// by `prefix`.
155    ///
156    /// When `prefix` is `Some("process/")`, only streams whose identifiers
157    /// start with `"process/"` are returned (e.g. all process-instance
158    /// streams). When `prefix` is `None`, all streams are returned.
159    ///
160    /// This is the primary enumeration API for multi-stream projections: the
161    /// caller discovers all relevant streams, then passes the list to
162    /// [`crate::projection::ProjectionRunner::run_all_streams`] /
163    /// [`crate::projection::ProjectionRunner::catch_up_all_streams`].
164    ///
165    /// The returned order is unspecified. Stable ordering can be achieved by
166    /// sorting the `Vec` before use if deterministic replay is required.
167    ///
168    /// **Required.** There is no default implementation — a missing override
169    /// silently returns no streams, causing multi-stream projections (e.g.
170    /// MABIS billing aggregations) to process zero streams and return empty
171    /// read models with no error signal.
172    ///
173    /// # Errors
174    ///
175    /// Returns [`EngineError::Store`] for underlying storage failures.
176    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError>;
177
178    /// Paginated stream enumeration — equivalent to `list_streams` but returns
179    /// at most `limit` entries starting after `cursor` (exclusive, UTF-8
180    /// stream-ID order).
181    ///
182    /// # Parameters
183    ///
184    /// - `prefix` — optional key prefix to restrict the scan (same semantics as
185    ///   `list_streams`).
186    /// - `cursor` — if `Some(s)`, resume after stream ID `s`; `None` starts
187    ///   from the beginning.
188    /// - `limit` — maximum number of stream IDs to return per page.  A return
189    ///   count strictly less than `limit` indicates the last page.
190    ///
191    /// # Page iteration pattern
192    ///
193    /// ```rust,ignore
194    /// let mut cursor: Option<StreamId> = None;
195    /// loop {
196    ///     let page = store.list_streams_page(Some("process/"), cursor.as_ref(), 100).await?;
197    ///     let done = page.len() < 100;
198    ///     for id in &page { /* process */ }
199    ///     cursor = page.into_iter().last();
200    ///     if done { break; }
201    /// }
202    /// ```
203    ///
204    /// # Default implementation
205    ///
206    /// Falls back to `list_streams` + in-memory slicing for stores that do not
207    /// provide a native cursor scan.
208    ///
209    /// # ⚠️ Override required for production stores
210    ///
211    /// This default loads **all** matching stream IDs into memory on every call,
212    /// making `list_streams_page` loops O(n²) in total stream count.  Any
213    /// production `EventStore` implementation (e.g. PostgreSQL, CockroachDB)
214    /// **must** override this method with an efficient cursor-based scan.  The
215    /// SlateDB store already provides such an override.  Failure to override
216    /// this method will cause projection catch-up to degrade silently under
217    /// deployments with > 10,000 active process streams.
218    ///
219    /// # Errors
220    ///
221    /// Returns [`EngineError::Store`] for underlying storage failures.
222    async fn list_streams_page(
223        &self,
224        prefix: Option<&str>,
225        cursor: Option<&StreamId>,
226        limit: usize,
227    ) -> Result<Vec<StreamId>, EngineError> {
228        // Default: enumerate all + skip-after-cursor + take(limit).
229        let all = self.list_streams(prefix).await?;
230        let iter: Box<dyn Iterator<Item = StreamId>> = match cursor {
231            None => Box::new(all.into_iter()),
232            Some(c) => Box::new(all.into_iter().skip_while(move |id| id != c).skip(1)),
233        };
234        Ok(iter.take(limit).collect())
235    }
236
237    /// Fold over events in `stream_id` starting after `from_sequence`
238    /// (exclusive), accumulating state without materialising the full
239    /// `Vec<EventEnvelope>`.
240    ///
241    /// This is the memory-efficient alternative to `load_from` for large
242    /// streams. Instead of returning all events as a Vec, it applies `f` to
243    /// each event in order and returns the final accumulated value.
244    ///
245    /// `from_sequence = 0` folds from the beginning of the stream.
246    ///
247    /// ```rust,ignore
248    /// // Reconstruct process state event-by-event without a Vec allocation:
249    /// let state = store.fold_stream(
250    ///     &stream_id, 0, W::State::default(),
251    ///     |acc, env| Ok(acc.apply(env.event()))
252    /// ).await?;
253    /// ```
254    ///
255    /// **Required.** There is no default implementation — a fallback that
256    /// materialises `load_from(...)` into a `Vec` defeats the purpose of this
257    /// method for large MABIS billing streams (potentially thousands of events
258    /// per billing period). Implementors must provide a cursor-based scan for
259    /// constant-memory behaviour.
260    ///
261    /// # Errors
262    ///
263    /// Returns [`EngineError::Store`] for underlying storage failures.
264    /// Returns any error produced by `f`.
265    async fn fold_stream<T, F>(
266        &self,
267        stream_id: &StreamId,
268        from_sequence: u64,
269        initial: T,
270        f: F,
271    ) -> Result<T, EngineError>
272    where
273        T: Send,
274        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send;
275}
276
277// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
278
279impl<S: EventStore> EventStore for Arc<S> {
280    async fn append(
281        &self,
282        stream_id: &StreamId,
283        expected_version: ExpectedVersion,
284        events: &[NewEvent],
285    ) -> Result<AppendResult, EngineError> {
286        self.as_ref()
287            .append(stream_id, expected_version, events)
288            .await
289    }
290
291    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
292        self.as_ref().load(stream_id).await
293    }
294
295    async fn load_from(
296        &self,
297        stream_id: &StreamId,
298        from_sequence: u64,
299    ) -> Result<Vec<EventEnvelope>, EngineError> {
300        self.as_ref().load_from(stream_id, from_sequence).await
301    }
302
303    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
304        self.as_ref().stream_version(stream_id).await
305    }
306
307    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
308        self.as_ref().list_streams(prefix).await
309    }
310
311    async fn list_streams_page(
312        &self,
313        prefix: Option<&str>,
314        cursor: Option<&StreamId>,
315        limit: usize,
316    ) -> Result<Vec<StreamId>, EngineError> {
317        self.as_ref().list_streams_page(prefix, cursor, limit).await
318    }
319
320    async fn fold_stream<T, F>(
321        &self,
322        stream_id: &StreamId,
323        from_sequence: u64,
324        initial: T,
325        f: F,
326    ) -> Result<T, EngineError>
327    where
328        T: Send,
329        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
330    {
331        self.as_ref()
332            .fold_stream(stream_id, from_sequence, initial, f)
333            .await
334    }
335}
336
337// ── Arc<S>: AtomicAppend blanket impl ────────────────────────────────────────
338
339/// Blanket delegation so `Arc<S>` inherits the `AtomicAppend` contract from
340/// `S`.
341///
342/// This enables `Process<W, Arc<SlateDbStore>>` to call
343/// `execute_and_enqueue` and its retry/snapshot variants without requiring
344/// callers to unwrap the `Arc`.
345impl<S: AtomicAppend> AtomicAppend for Arc<S> {
346    async fn append_with_outbox(
347        &self,
348        stream_id: &StreamId,
349        expected_version: ExpectedVersion,
350        events: &[NewEvent],
351        outbox: &[crate::outbox::PendingOutbox],
352    ) -> Result<AppendResult, EngineError> {
353        self.as_ref()
354            .append_with_outbox(stream_id, expected_version, events, outbox)
355            .await
356    }
357}
358
359// ── AtomicAppend trait ────────────────────────────────────────────────────────
360
/// Extension of [`EventStore`] that atomically appends events **and** enqueues
/// outbox messages in a single write operation.
///
/// Implementations must guarantee that either both the events and the outbox
/// messages are persisted, or neither is — even across process crashes. For
/// SlateDB this is achieved via a single [`WriteBatch`].
///
/// # Why a separate trait?
///
/// Not every [`EventStore`] backend supports atomic dual-writes (e.g. an
/// in-memory test store). Keeping atomicity in a separate trait allows
/// `Process::execute` to work against any `EventStore`, while
/// `Process::execute_and_enqueue` requires the stronger `AtomicAppend` bound.
///
/// # Usage contract
///
/// This is a correctness rule, not a memory-safety (`unsafe`) requirement.
///
/// Only call `append_with_outbox` from the engine's `execute_and_enqueue`
/// path. Never write events first and outbox messages second — a crash
/// between the two produces a silent lost APERAK.
///
/// [`WriteBatch`]: slatedb::WriteBatch
// `async fn` in a public trait cannot declare `Send` bounds on the returned
// future, which is what this lint warns about; it is silenced deliberately.
#[allow(async_fn_in_trait)]
pub trait AtomicAppend: EventStore {
    /// Atomically append `events` to `stream_id` and schedule `outbox` messages.
    ///
    /// The `outbox` slice carries lightweight [`crate::outbox::PendingOutbox`]
    /// values produced by [`Workflow::handle`]. The implementation is
    /// responsible for materialising them into fully-typed
    /// [`crate::outbox::OutboxMessage`] values using the store-assigned fields
    /// of the stamped envelopes (e.g. `event_id` as `causation_event_id`).
    ///
    /// When `outbox` is empty, this degenerates to a plain `EventStore::append`.
    ///
    /// # Errors
    ///
    /// - [`EngineError::VersionConflict`] — optimistic concurrency check
    ///   failed; reload state and retry.
    /// - [`EngineError::Store`] or [`EngineError::Outbox`] — storage failure.
    ///
    /// [`Workflow::handle`]: crate::workflow::Workflow::handle
    async fn append_with_outbox(
        &self,
        stream_id: &StreamId,
        expected_version: ExpectedVersion,
        events: &[NewEvent],
        outbox: &[crate::outbox::PendingOutbox],
    ) -> Result<AppendResult, EngineError>;
}
409
410// ── InMemoryEventStore ────────────────────────────────────────────────────────
411
/// Internal state of [`InMemoryEventStore`], held behind its
/// `Arc<RwLock<…>>`.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default)]
struct InMemoryState {
    /// Per-stream ordered event log (sequence-number indexed).
    streams: HashMap<StreamId, Vec<EventEnvelope>>,
    /// Global insertion-order log across all streams.
    ///
    /// This is the source for [`InMemoryEventStore::all_events`]. Keeping a
    /// separate flat list avoids collecting + sorting across stream-local
    /// sequence namespaces, which would produce an arbitrary ordering when
    /// multiple streams are active.
    global: Vec<EventEnvelope>,
}
426
/// A fully in-memory [`EventStore`] for testing and development.
///
/// Backed by two logs protected by a single `RwLock`:
/// - A per-stream `HashMap` for sequence-ordered access.
/// - A global flat `Vec` that preserves cross-stream insertion order.
///
/// The store assigns `event_id`, `sequence_number`, `stream_id`, and
/// `timestamp` to each appended event — callers submit [`NewEvent`] values.
///
/// Cloning the store shares the underlying data via `Arc` — all clones see
/// the same events.
///
/// **Not suitable for production.** Use this for:
/// - Unit and integration tests
/// - Spikes and local development
/// - CI environments that must not depend on external services
///
/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemoryEventStore {
    /// Shared, lock-protected state; every clone of the store points at the
    /// same allocation.
    inner: Arc<RwLock<InMemoryState>>,
}
450
#[cfg(any(test, feature = "testing"))]
impl InMemoryEventStore {
    /// Construct a store that contains no streams and no events.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Snapshot every event in the store, across all streams, in the order
    /// the events were appended.
    ///
    /// Sequence numbers are only meaningful within one stream, so the result
    /// comes from the global insertion-order log instead of being sorted by
    /// sequence number.
    ///
    /// **Test/development use only.** Production code should use
    /// [`EventStore::load`] or [`EventStore::load_from`] to read specific
    /// streams. Loading all events at once from a production store can OOM
    /// the process.
    ///
    /// Available only when the `testing` feature is enabled or in `cfg(test)`.
    #[cfg(any(test, feature = "testing"))]
    #[must_use]
    pub async fn all_events(&self) -> Vec<EventEnvelope> {
        let state = self.inner.read().await;
        state.global.clone()
    }

    /// Snapshot the events of one stream in sequence order; an unknown
    /// stream yields an empty `Vec`.
    ///
    /// **Test/development use only.** In production code, prefer
    /// [`EventStore::load`] which is part of the trait contract.
    ///
    /// Available only when the `testing` feature is enabled or in `cfg(test)`.
    #[cfg(any(test, feature = "testing"))]
    #[must_use]
    pub async fn events_for(&self, stream_id: &StreamId) -> Vec<EventEnvelope> {
        let state = self.inner.read().await;
        match state.streams.get(stream_id) {
            Some(log) => log.clone(),
            None => Vec::new(),
        }
    }
}
494
#[cfg(any(test, feature = "testing"))]
impl EventStore for InMemoryEventStore {
    /// Append `new_events` to `stream_id` under the store's write lock.
    ///
    /// The version check, the per-stream append and the global-log append all
    /// happen while the same write guard is held, so the batch is atomic with
    /// respect to other users of this store.
    ///
    /// An empty batch is a no-op after the version check: it returns the
    /// current version and does **not** register the stream, so
    /// `list_streams` never reports a stream that holds zero events.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::VersionConflict`] when `expected_version` is
    /// `NoStream` or `Exact` and does not match the current stream length.
    async fn append(
        &self,
        stream_id: &StreamId,
        expected_version: ExpectedVersion,
        new_events: &[NewEvent],
    ) -> Result<AppendResult, EngineError> {
        let mut inner = self.inner.write().await;

        // The stream version is simply the number of events stored so far.
        let current = inner.streams.get(stream_id).map_or(0, |s| s.len() as u64);

        // Optimistic concurrency check; `Any` opts out of it.
        let required = match expected_version {
            ExpectedVersion::NoStream => Some(0),
            ExpectedVersion::Exact(v) => Some(v),
            ExpectedVersion::Any => None,
        };
        if let Some(expected) = required.filter(|&v| v != current) {
            return Err(EngineError::VersionConflict {
                expected,
                actual: current,
            });
        }

        // Nothing to write: bail out before `entry(..).or_default()` below
        // would create an empty, phantom stream entry.
        if new_events.is_empty() {
            return Ok(AppendResult {
                last_sequence: current,
                events: Vec::new(),
            });
        }

        // Stamp each NewEvent with store-assigned fields. Sequence numbers
        // continue from the current version; the whole batch shares one
        // timestamp.
        let now = OffsetDateTime::now_utc();
        let envelopes: Vec<EventEnvelope> = (current + 1..)
            .zip(new_events)
            .map(|(sequence, new)| {
                EventEnvelope::from_new(new.clone(), stream_id.clone(), sequence, now)
            })
            .collect();

        // Append to the per-stream log.
        inner
            .streams
            .entry(stream_id.clone())
            .or_default()
            .extend_from_slice(&envelopes);

        // Append to the global insertion-order log.
        inner.global.extend_from_slice(&envelopes);

        Ok(AppendResult {
            last_sequence: current + new_events.len() as u64,
            events: envelopes,
        })
    }

    /// Clone the full event log of `stream_id`; empty when the stream is
    /// unknown.
    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
        let inner = self.inner.read().await;
        Ok(inner.streams.get(stream_id).cloned().unwrap_or_default())
    }

    /// Clone the events of `stream_id` whose sequence number is strictly
    /// greater than `from_sequence`; empty when the stream is unknown.
    async fn load_from(
        &self,
        stream_id: &StreamId,
        from_sequence: u64,
    ) -> Result<Vec<EventEnvelope>, EngineError> {
        let inner = self.inner.read().await;
        Ok(inner
            .streams
            .get(stream_id)
            .map(|events| {
                events
                    .iter()
                    .filter(|e| e.sequence_number > from_sequence)
                    .cloned()
                    .collect()
            })
            .unwrap_or_default())
    }

    /// O(1) version check — reads the stream length from the HashMap without
    /// cloning any event payloads.
    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
        let inner = self.inner.read().await;
        Ok(inner.streams.get(stream_id).map_or(0, |s| s.len() as u64))
    }

    /// Returns all known stream identifiers, optionally filtered by `prefix`.
    ///
    /// O(n) in the number of streams — scans the HashMap keys once. The
    /// order of the result follows HashMap iteration and is therefore
    /// unspecified.
    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
        let inner = self.inner.read().await;
        let ids = inner
            .streams
            .keys()
            .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
            .cloned()
            .collect();
        Ok(ids)
    }

    /// Paginated stream enumeration for `InMemoryEventStore`.
    ///
    /// Collects all matching keys, sorts them by stream-ID string for a
    /// deterministic order, then returns up to `limit` IDs that sort strictly
    /// after `cursor`. The cursor does not need to name an existing or
    /// matching stream. O(n log n) per call — acceptable for test/dev stores.
    async fn list_streams_page(
        &self,
        prefix: Option<&str>,
        cursor: Option<&StreamId>,
        limit: usize,
    ) -> Result<Vec<StreamId>, EngineError> {
        if limit == 0 {
            return Ok(Vec::new());
        }

        // Sort for deterministic pagination order (HashMap is unordered).
        let mut ids = self.list_streams(prefix).await?;
        ids.sort_unstable_by(|a, b| a.as_str().cmp(b.as_str()));

        // Index of the first ID strictly greater than the cursor. Seeking by
        // order (not by equality) keeps pagination working when the cursor is
        // absent from the filtered listing.
        let start = cursor.map_or(0, |c| {
            ids.partition_point(|id| id.as_str() <= c.as_str())
        });

        Ok(ids.into_iter().skip(start).take(limit).collect())
    }

    /// Fold over events in `stream_id` starting after `from_sequence`
    /// (exclusive) without building an intermediate `Vec<EventEnvelope>`.
    ///
    /// In-memory implementation: walks the stored per-stream `Vec`, cloning
    /// one envelope at a time and handing it to `f`. The read lock is held
    /// for the whole fold. An unknown stream returns `initial` unchanged.
    /// (Production code uses `SlateDbStore::fold_stream`, which is
    /// cursor-based.)
    ///
    /// # Errors
    ///
    /// Stops at, and returns, the first error produced by `f`.
    async fn fold_stream<T, F>(
        &self,
        stream_id: &StreamId,
        from_sequence: u64,
        initial: T,
        mut f: F,
    ) -> Result<T, EngineError>
    where
        T: Send,
        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
    {
        let inner = self.inner.read().await;
        let mut acc = initial;
        if let Some(events) = inner.streams.get(stream_id) {
            for env in events.iter().filter(|e| e.sequence_number > from_sequence) {
                acc = f(acc, env.clone())?;
            }
        }
        Ok(acc)
    }
}
655
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{ConversationId, CorrelationId, ProcessId, TenantId};
    use crate::version::WorkflowId;

    /// Build a minimal event with fresh identifiers and a fixed JSON payload.
    fn make_new_event() -> NewEvent {
        NewEvent {
            correlation_id: CorrelationId::new(),
            causation_id: None,
            conversation_id: ConversationId::new(),
            process_id: ProcessId::new(),
            tenant_id: TenantId::new(),
            workflow_id: WorkflowId::new("test", "FV2024-10-01"),
            event_type: "TestEvent".into(),
            schema_version: 1,
            payload: serde_json::json!({"test": true}),
        }
    }

    /// Build `n` independent events.
    fn make_batch(n: usize) -> Vec<NewEvent> {
        std::iter::repeat_with(make_new_event).take(n).collect()
    }

    /// Project a slice of envelopes onto their sequence numbers.
    fn sequences(envelopes: &[EventEnvelope]) -> Vec<u64> {
        envelopes.iter().map(|e| e.sequence_number).collect()
    }

    #[tokio::test]
    async fn append_and_load_roundtrip() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::new("test/s1");
        let batch = make_batch(2);

        let appended = store
            .append(&stream_id, ExpectedVersion::NoStream, &batch)
            .await
            .unwrap();

        // Two events, numbered 1 and 2, and the batch ends at sequence 2.
        assert_eq!(sequences(&appended.events), [1, 2]);
        assert_eq!(appended.last_sequence, 2);

        let reloaded = store.load(&stream_id).await.unwrap();
        assert_eq!(reloaded.len(), 2);
    }

    #[tokio::test]
    async fn store_stamps_stream_id_and_sequence() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::new("test/stamp");
        let batch = make_batch(1);

        let appended = store
            .append(&stream_id, ExpectedVersion::NoStream, &batch)
            .await
            .unwrap();

        let first = &appended.events[0];
        assert_eq!(first.stream_id, stream_id);
        assert_eq!(first.sequence_number, 1);
    }

    #[tokio::test]
    async fn version_conflict_is_detected() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::new("test/s2");

        // The first write creates the stream.
        store
            .append(&stream_id, ExpectedVersion::NoStream, &make_batch(1))
            .await
            .unwrap();

        // A second `NoStream` write must be rejected: the stream now exists.
        let second = store
            .append(&stream_id, ExpectedVersion::NoStream, &make_batch(1))
            .await;

        assert!(matches!(second, Err(EngineError::VersionConflict { .. })));
    }

    #[tokio::test]
    async fn load_from_returns_tail_only() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::new("test/s3");

        store
            .append(&stream_id, ExpectedVersion::NoStream, &make_batch(5))
            .await
            .unwrap();

        let tail = store.load_from(&stream_id, 3).await.unwrap();
        assert_eq!(sequences(&tail), [4, 5], "expected events 4 and 5");
    }

    #[tokio::test]
    async fn all_events_preserves_insertion_order_across_streams() {
        let store = InMemoryEventStore::new();
        let first = StreamId::new("test/order-s1");
        let second = StreamId::new("test/order-s2");

        // Interleave writes: first, second, then first again.
        let writes = [
            (&first, ExpectedVersion::NoStream),
            (&second, ExpectedVersion::NoStream),
            (&first, ExpectedVersion::Exact(1)),
        ];
        for (target, expected_version) in writes {
            store
                .append(target, expected_version, &make_batch(1))
                .await
                .unwrap();
        }

        let everything = store.all_events().await;
        let order: Vec<&StreamId> = everything.iter().map(|e| &e.stream_id).collect();
        assert_eq!(order, [&first, &second, &first]);
    }

    #[tokio::test]
    async fn arc_wrapper_delegates_correctly() {
        let shared = Arc::new(InMemoryEventStore::new());
        let stream_id = StreamId::new("test/arc-s1");

        shared
            .append(&stream_id, ExpectedVersion::NoStream, &make_batch(1))
            .await
            .unwrap();

        let reloaded = shared.load(&stream_id).await.unwrap();
        assert_eq!(reloaded.len(), 1);
    }

    #[tokio::test]
    async fn fold_stream_accumulates_without_full_vec() {
        let store = InMemoryEventStore::new();
        let stream_id = StreamId::new("test/fold-s1");

        store
            .append(&stream_id, ExpectedVersion::NoStream, &make_batch(4))
            .await
            .unwrap();

        // (start-after sequence, number of events the fold should visit):
        // from the beginning all four, from sequence 2 only the two tail events.
        for (from_sequence, expected) in [(0u64, 4usize), (2, 2)] {
            let visited = store
                .fold_stream(&stream_id, from_sequence, 0usize, |n, _| Ok(n + 1))
                .await
                .unwrap();
            assert_eq!(visited, expected);
        }
    }
}
814}