mako_engine/event_store.rs
1//! [`EventStore`] trait and the in-process [`InMemoryEventStore`] implementation.
2//!
3//! The engine defines only the trait. The production implementation is
4//! [`SlateDbStore`][crate::store_slatedb::SlateDbStore], enabled by the `slatedb`
5//! feature flag. [`InMemoryEventStore`] is included here for tests, spikes, and
6//! development without external dependencies.
7
8use std::sync::Arc;
9
10#[cfg(any(test, feature = "testing"))]
11use std::collections::HashMap;
12#[cfg(any(test, feature = "testing"))]
13use time::OffsetDateTime;
14#[cfg(any(test, feature = "testing"))]
15use tokio::sync::RwLock;
16
17use crate::{
18 envelope::{EventEnvelope, NewEvent},
19 error::EngineError,
20 ids::StreamId,
21};
22
23// ── ExpectedVersion ───────────────────────────────────────────────────────────
24
/// Optimistic-concurrency precondition passed to [`EventStore::append`].
///
/// A writer states the sequence number it believes the stream is currently
/// at. The store verifies that belief atomically with the write; when it does
/// not hold, a concurrent writer has modified the stream and the caller has
/// to reload and retry.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum ExpectedVersion {
    /// The stream has not been created yet, i.e. it is at sequence number 0.
    NoStream,
    /// The stream is at exactly the given sequence number.
    Exact(u64),
    /// Perform no concurrency check at all.
    ///
    /// **Never use this on production workflow append paths.** With `Any` a
    /// write is accepted no matter what other writers have done in the
    /// meantime, so a stream can end up with duplicated or interleaved
    /// events.
    ///
    /// Acceptable uses are [`MigrationRunner`] (bulk administrative
    /// rewrites), the internals of snapshot-accelerated stores, and test
    /// scaffolding in which the caller is the only writer by construction.
    ///
    /// Regular workflow appends use [`ExpectedVersion::NoStream`] for the
    /// first event and [`ExpectedVersion::Exact`] for every later one.
    ///
    /// [`MigrationRunner`]: crate::migration::MigrationRunner
    Any,
}
52
53// ── AppendResult ──────────────────────────────────────────────────────────────
54
55/// Metadata returned after a successful [`EventStore::append`].
56#[derive(Debug, Clone)]
57pub struct AppendResult {
58 /// The sequence number of the last event written in this batch.
59 pub last_sequence: u64,
60 /// The fully materialised envelopes as persisted by the store.
61 ///
62 /// Each envelope has its `event_id`, `sequence_number`, `stream_id`, and
63 /// `timestamp` stamped by the store. Callers use these for return values
64 /// and projection seeding without re-loading from storage.
65 pub events: Vec<EventEnvelope>,
66}
67
68// ── EventStore trait ──────────────────────────────────────────────────────────
69
70/// Append-only, ordered event stream storage contract.
71///
72/// ## Implementation requirements
73///
74/// - **Ordered**: events within a stream are always returned in append order.
75/// - **Atomic**: a multi-event append either fully succeeds or fully fails.
76/// - **Optimistic concurrency**: detect concurrent writers via
77/// [`ExpectedVersion`].
78/// - **Append-only**: events are never modified or deleted through this API.
79/// - **Sequence number ownership**: the store assigns `sequence_number`,
80/// `event_id`, `stream_id`, and `timestamp` on each appended envelope.
81/// Callers submit [`NewEvent`] values without these fields.
82///
83/// ## Blanket `Arc` implementation
84///
85/// `Arc<S>` implements `EventStore` whenever `S: EventStore`, so
86/// `Process<W, Arc<MyStore>>` works without any extra wrapper type.
87#[allow(async_fn_in_trait)] // RPIT-in-traits with Send bounds require AFIT; use #[allow] until
88// the ecosystem settles on a stable pattern for Rust 1.85 MSRV.
89pub trait EventStore: Send + Sync {
90 /// Atomically append `events` to `stream_id`.
91 ///
92 /// The store assigns `event_id`, `sequence_number`, `stream_id`, and
93 /// `timestamp` on each event. The fully materialised envelopes are
94 /// returned in [`AppendResult::events`].
95 ///
96 /// # Errors
97 ///
98 /// - [`EngineError::VersionConflict`] when `expected_version` is
99 /// [`ExpectedVersion::NoStream`] or [`ExpectedVersion::Exact`] and the
100 /// actual stream version does not match.
101 /// - [`EngineError::Store`] for underlying storage failures.
102 async fn append(
103 &self,
104 stream_id: &StreamId,
105 expected_version: ExpectedVersion,
106 events: &[NewEvent],
107 ) -> Result<AppendResult, EngineError>;
108
109 /// Load all events from `stream_id` in sequence order.
110 ///
111 /// Returns an empty `Vec` when the stream does not exist.
112 ///
113 /// # Errors
114 ///
115 /// Returns [`EngineError::Store`] for underlying storage failures.
116 async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError>;
117
118 /// Load events from `stream_id` starting after `from_sequence` (exclusive).
119 ///
120 /// Useful for incremental projection catch-up: pass the projection's last
121 /// processed sequence number to load only new events.
122 ///
123 /// Returns an empty `Vec` when no new events exist.
124 ///
125 /// # Errors
126 ///
127 /// Returns [`EngineError::Store`] for underlying storage failures.
128 async fn load_from(
129 &self,
130 stream_id: &StreamId,
131 from_sequence: u64,
132 ) -> Result<Vec<EventEnvelope>, EngineError>;
133
134 /// Return the current sequence number of `stream_id`.
135 ///
136 /// The sequence number equals the number of events in the stream (1-based
137 /// after the first append). Returns `0` when the stream does not exist.
138 ///
139 /// Use this instead of `load(…).await?.len()` when you only need the
140 /// count — backends can implement this as a cheap metadata query without
141 /// transferring event payloads.
142 ///
143 /// **Required.** There is no default implementation — a fallback that
144 /// loads all events defeats the O(1) metadata-query contract. Implementors
145 /// must read the stored sequence counter directly (e.g. a `sv/{stream_id}`
146 /// key in SlateDB) to avoid O(n) event-payload transfers.
147 ///
148 /// # Errors
149 ///
150 /// Returns [`EngineError::Store`] for underlying storage failures.
151 async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError>;
152
153 /// Return all known stream identifiers in this store, optionally filtered
154 /// by `prefix`.
155 ///
156 /// When `prefix` is `Some("process/")`, only streams whose identifiers
157 /// start with `"process/"` are returned (e.g. all process-instance
158 /// streams). When `prefix` is `None`, all streams are returned.
159 ///
160 /// This is the primary enumeration API for multi-stream projections: the
161 /// caller discovers all relevant streams, then passes the list to
162 /// [`crate::projection::ProjectionRunner::run_all_streams`] /
163 /// [`crate::projection::ProjectionRunner::catch_up_all_streams`].
164 ///
165 /// The returned order is unspecified. Stable ordering can be achieved by
166 /// sorting the `Vec` before use if deterministic replay is required.
167 ///
168 /// **Required.** There is no default implementation — a missing override
169 /// silently returns no streams, causing multi-stream projections (e.g.
170 /// MABIS billing aggregations) to process zero streams and return empty
171 /// read models with no error signal.
172 ///
173 /// # Errors
174 ///
175 /// Returns [`EngineError::Store`] for underlying storage failures.
176 async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError>;
177
178 /// Paginated stream enumeration — equivalent to `list_streams` but returns
179 /// at most `limit` entries starting after `cursor` (exclusive, UTF-8
180 /// stream-ID order).
181 ///
182 /// # Parameters
183 ///
184 /// - `prefix` — optional key prefix to restrict the scan (same semantics as
185 /// `list_streams`).
186 /// - `cursor` — if `Some(s)`, resume after stream ID `s`; `None` starts
187 /// from the beginning.
188 /// - `limit` — maximum number of stream IDs to return per page. A return
189 /// count strictly less than `limit` indicates the last page.
190 ///
191 /// # Page iteration pattern
192 ///
193 /// ```rust,ignore
194 /// let mut cursor: Option<StreamId> = None;
195 /// loop {
196 /// let page = store.list_streams_page(Some("process/"), cursor.as_ref(), 100).await?;
197 /// let done = page.len() < 100;
198 /// for id in &page { /* process */ }
199 /// cursor = page.into_iter().last();
200 /// if done { break; }
201 /// }
202 /// ```
203 ///
204 /// # Default implementation
205 ///
206 /// Falls back to `list_streams` + in-memory slicing for stores that do not
207 /// provide a native cursor scan.
208 ///
209 /// # ⚠️ Override required for production stores
210 ///
211 /// This default loads **all** matching stream IDs into memory on every call,
212 /// making `list_streams_page` loops O(n²) in total stream count. Any
213 /// production `EventStore` implementation (e.g. PostgreSQL, CockroachDB)
214 /// **must** override this method with an efficient cursor-based scan. The
215 /// SlateDB store already provides such an override. Failure to override
216 /// this method will cause projection catch-up to degrade silently under
217 /// deployments with > 10,000 active process streams.
218 ///
219 /// # Errors
220 ///
221 /// Returns [`EngineError::Store`] for underlying storage failures.
222 async fn list_streams_page(
223 &self,
224 prefix: Option<&str>,
225 cursor: Option<&StreamId>,
226 limit: usize,
227 ) -> Result<Vec<StreamId>, EngineError> {
228 // Default: enumerate all + skip-after-cursor + take(limit).
229 let all = self.list_streams(prefix).await?;
230 let iter: Box<dyn Iterator<Item = StreamId>> = match cursor {
231 None => Box::new(all.into_iter()),
232 Some(c) => Box::new(all.into_iter().skip_while(move |id| id != c).skip(1)),
233 };
234 Ok(iter.take(limit).collect())
235 }
236
237 /// Fold over events in `stream_id` starting after `from_sequence`
238 /// (exclusive), accumulating state without materialising the full
239 /// `Vec<EventEnvelope>`.
240 ///
241 /// This is the memory-efficient alternative to `load_from` for large
242 /// streams. Instead of returning all events as a Vec, it applies `f` to
243 /// each event in order and returns the final accumulated value.
244 ///
245 /// `from_sequence = 0` folds from the beginning of the stream.
246 ///
247 /// ```rust,ignore
248 /// // Reconstruct process state event-by-event without a Vec allocation:
249 /// let state = store.fold_stream(
250 /// &stream_id, 0, W::State::default(),
251 /// |acc, env| Ok(acc.apply(env.event()))
252 /// ).await?;
253 /// ```
254 ///
255 /// **Required.** There is no default implementation — a fallback that
256 /// materialises `load_from(...)` into a `Vec` defeats the purpose of this
257 /// method for large MABIS billing streams (potentially thousands of events
258 /// per billing period). Implementors must provide a cursor-based scan for
259 /// constant-memory behaviour.
260 ///
261 /// # Errors
262 ///
263 /// Returns [`EngineError::Store`] for underlying storage failures.
264 /// Returns any error produced by `f`.
265 async fn fold_stream<T, F>(
266 &self,
267 stream_id: &StreamId,
268 from_sequence: u64,
269 initial: T,
270 f: F,
271 ) -> Result<T, EngineError>
272 where
273 T: Send,
274 F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send;
275}
276
277// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
278
279impl<S: EventStore> EventStore for Arc<S> {
280 async fn append(
281 &self,
282 stream_id: &StreamId,
283 expected_version: ExpectedVersion,
284 events: &[NewEvent],
285 ) -> Result<AppendResult, EngineError> {
286 self.as_ref()
287 .append(stream_id, expected_version, events)
288 .await
289 }
290
291 async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
292 self.as_ref().load(stream_id).await
293 }
294
295 async fn load_from(
296 &self,
297 stream_id: &StreamId,
298 from_sequence: u64,
299 ) -> Result<Vec<EventEnvelope>, EngineError> {
300 self.as_ref().load_from(stream_id, from_sequence).await
301 }
302
303 async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
304 self.as_ref().stream_version(stream_id).await
305 }
306
307 async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
308 self.as_ref().list_streams(prefix).await
309 }
310
311 async fn list_streams_page(
312 &self,
313 prefix: Option<&str>,
314 cursor: Option<&StreamId>,
315 limit: usize,
316 ) -> Result<Vec<StreamId>, EngineError> {
317 self.as_ref().list_streams_page(prefix, cursor, limit).await
318 }
319
320 async fn fold_stream<T, F>(
321 &self,
322 stream_id: &StreamId,
323 from_sequence: u64,
324 initial: T,
325 f: F,
326 ) -> Result<T, EngineError>
327 where
328 T: Send,
329 F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
330 {
331 self.as_ref()
332 .fold_stream(stream_id, from_sequence, initial, f)
333 .await
334 }
335}
336
337// ── Arc<S>: AtomicAppend blanket impl ────────────────────────────────────────
338
339/// Blanket delegation so `Arc<S>` inherits the `AtomicAppend` contract from
340/// `S`.
341///
342/// This enables `Process<W, Arc<SlateDbStore>>` to call
343/// `execute_and_enqueue` and its retry/snapshot variants without requiring
344/// callers to unwrap the `Arc`.
345impl<S: AtomicAppend> AtomicAppend for Arc<S> {
346 async fn append_with_outbox(
347 &self,
348 stream_id: &StreamId,
349 expected_version: ExpectedVersion,
350 events: &[NewEvent],
351 outbox: &[crate::outbox::PendingOutbox],
352 ) -> Result<AppendResult, EngineError> {
353 self.as_ref()
354 .append_with_outbox(stream_id, expected_version, events, outbox)
355 .await
356 }
357}
358
359// ── AtomicAppend trait ────────────────────────────────────────────────────────
360
361/// Extension of [`EventStore`] that atomically appends events **and** enqueues
362/// outbox messages in a single write operation.
363///
364/// Implementations must guarantee that either both the events and the outbox
365/// messages are persisted, or neither is — even across process crashes. For
366/// SlateDB this is achieved via a single [`WriteBatch`].
367///
368/// # Why a separate trait?
369///
370/// Not every [`EventStore`] backend supports atomic dual-writes (e.g. an
371/// in-memory test store). Keeping atomicity in a separate trait allows
372/// `Process::execute` to work against any `EventStore`, while
373/// `Process::execute_and_enqueue` requires the stronger `AtomicAppend` bound.
374///
375/// # Safety
376///
377/// Only call `append_with_outbox` from the engine's `execute_and_enqueue`
378/// path. Never write events first and outbox messages second — a crash
379/// between the two produces a silent lost APERAK.
380///
381/// [`WriteBatch`]: slatedb::WriteBatch
382#[allow(async_fn_in_trait)]
383pub trait AtomicAppend: EventStore {
384 /// Atomically append `events` to `stream_id` and schedule `outbox` messages.
385 ///
386 /// The `outbox` slice carries lightweight [`crate::outbox::PendingOutbox`]
387 /// values produced by [`Workflow::handle`]. The implementation is
388 /// responsible for materialising them into fully-typed
389 /// [`crate::outbox::OutboxMessage`] values using the store-assigned fields
390 /// of the stamped envelopes (e.g. `event_id` as `causation_event_id`).
391 ///
392 /// When `outbox` is empty, this degenerates to a plain `EventStore::append`.
393 ///
394 /// # Errors
395 ///
396 /// - [`EngineError::VersionConflict`] — optimistic concurrency check
397 /// failed; reload state and retry.
398 /// - [`EngineError::Store`] or [`EngineError::Outbox`] — storage failure.
399 ///
400 /// [`Workflow::handle`]: crate::workflow::Workflow::handle
401 async fn append_with_outbox(
402 &self,
403 stream_id: &StreamId,
404 expected_version: ExpectedVersion,
405 events: &[NewEvent],
406 outbox: &[crate::outbox::PendingOutbox],
407 ) -> Result<AppendResult, EngineError>;
408}
409
410// ── InMemoryEventStore ────────────────────────────────────────────────────────
411
/// Data shared by all clones of [`InMemoryEventStore`], kept behind its
/// `Arc<RwLock<…>>`.
#[cfg(any(test, feature = "testing"))]
#[derive(Default, Debug)]
struct InMemoryState {
    /// One ordered event log per stream; an event's position in the `Vec`
    /// follows its stream-local sequence number.
    streams: HashMap<StreamId, Vec<EventEnvelope>>,
    /// Every event from every stream, in the order it was appended.
    ///
    /// [`InMemoryEventStore::all_events`] reads from this list. Sequence
    /// numbers are local to a stream, so rebuilding a cross-stream order by
    /// collecting and sorting the per-stream logs would be arbitrary once
    /// more than one stream is active; recording the order at append time
    /// avoids that.
    global: Vec<EventEnvelope>,
}
426
/// An [`EventStore`] that keeps everything in process memory, intended for
/// tests and development.
///
/// A single `RwLock` guards two logs:
/// - a per-stream `HashMap` used for sequence-ordered reads, and
/// - a flat global `Vec` that records insertion order across streams.
///
/// Callers submit [`NewEvent`] values; the store fills in `event_id`,
/// `sequence_number`, `stream_id`, and `timestamp` when appending.
///
/// A clone shares the same underlying data through the `Arc`, so every clone
/// observes the same events.
///
/// **Not suitable for production.** Intended for:
/// - Unit and integration tests
/// - Spikes and local development
/// - CI environments that must not depend on external services
///
/// Compiled only under `#[cfg(test)]` or when the `testing` feature is
/// enabled.
#[cfg(any(test, feature = "testing"))]
#[derive(Clone, Debug, Default)]
pub struct InMemoryEventStore {
    inner: Arc<RwLock<InMemoryState>>,
}
450
// The whole impl is gated on `test` / the `testing` feature, so the methods
// do not repeat the attribute individually.
#[cfg(any(test, feature = "testing"))]
impl InMemoryEventStore {
    /// Build a store that holds no streams and no events.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Snapshot of every event in the store, across all streams, in the
    /// order the events were appended.
    ///
    /// Sequence numbers only order events within one stream, so the result
    /// comes from the global insertion-order log instead of being sorted by
    /// sequence number.
    ///
    /// **Test/development use only.** Production code reads specific streams
    /// through [`EventStore::load`] or [`EventStore::load_from`]; pulling all
    /// events of a production store into memory at once can OOM the process.
    ///
    /// Available only when the `testing` feature is enabled or in `cfg(test)`.
    #[must_use]
    pub async fn all_events(&self) -> Vec<EventEnvelope> {
        let state = self.inner.read().await;
        state.global.clone()
    }

    /// Snapshot of one stream's events in sequence order; empty when the
    /// stream is unknown.
    ///
    /// **Test/development use only.** Production code should call
    /// [`EventStore::load`], which is part of the trait contract.
    ///
    /// Available only when the `testing` feature is enabled or in `cfg(test)`.
    #[must_use]
    pub async fn events_for(&self, stream_id: &StreamId) -> Vec<EventEnvelope> {
        let state = self.inner.read().await;
        match state.streams.get(stream_id) {
            Some(events) => events.clone(),
            None => Vec::new(),
        }
    }
}
494
#[cfg(any(test, feature = "testing"))]
impl EventStore for InMemoryEventStore {
    /// Append `new_events` to `stream_id` under the write lock.
    ///
    /// The stream's current version is its event count. After the
    /// [`ExpectedVersion`] check passes, every event is stamped with the
    /// stream ID, the next consecutive sequence number, and one shared UTC
    /// timestamp for the whole batch, then recorded in both the per-stream
    /// log and the global insertion-order log.
    ///
    /// An empty batch is a no-op once the version check has passed: it
    /// returns the unchanged version and does not register the stream, so a
    /// stream with zero events never shows up in `list_streams`.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::VersionConflict`] when `expected_version` is
    /// `NoStream` or `Exact` and does not match the current version.
    async fn append(
        &self,
        stream_id: &StreamId,
        expected_version: ExpectedVersion,
        new_events: &[NewEvent],
    ) -> Result<AppendResult, EngineError> {
        let mut inner = self.inner.write().await;

        let current = inner.streams.get(stream_id).map_or(0, |s| s.len() as u64);

        // Optimistic concurrency check; `Any` opts out of it.
        let required = match expected_version {
            ExpectedVersion::NoStream => Some(0),
            ExpectedVersion::Exact(v) => Some(v),
            ExpectedVersion::Any => None,
        };
        match required {
            Some(expected) if expected != current => {
                return Err(EngineError::VersionConflict {
                    expected,
                    actual: current,
                });
            }
            _ => {}
        }

        // Nothing to write: return before `entry(..).or_default()` below can
        // create an empty, phantom stream.
        if new_events.is_empty() {
            return Ok(AppendResult {
                last_sequence: current,
                events: Vec::new(),
            });
        }

        // Stamp each NewEvent with store-assigned fields. Sequence numbers
        // continue from the current version, starting at `current + 1`.
        let now = OffsetDateTime::now_utc();
        let envelopes: Vec<EventEnvelope> = (current + 1..)
            .zip(new_events)
            .map(|(sequence, new)| {
                EventEnvelope::from_new(new.clone(), stream_id.clone(), sequence, now)
            })
            .collect();

        // Append to the per-stream log.
        inner
            .streams
            .entry(stream_id.clone())
            .or_default()
            .extend_from_slice(&envelopes);

        // Append to the global insertion-order log.
        inner.global.extend_from_slice(&envelopes);

        Ok(AppendResult {
            last_sequence: current + new_events.len() as u64,
            events: envelopes,
        })
    }

    /// Clone and return every event of `stream_id`; empty when the stream is
    /// unknown.
    async fn load(&self, stream_id: &StreamId) -> Result<Vec<EventEnvelope>, EngineError> {
        let inner = self.inner.read().await;
        Ok(inner.streams.get(stream_id).cloned().unwrap_or_default())
    }

    /// Clone and return the events of `stream_id` whose sequence number is
    /// strictly greater than `from_sequence`; empty when the stream is
    /// unknown or has no such events.
    async fn load_from(
        &self,
        stream_id: &StreamId,
        from_sequence: u64,
    ) -> Result<Vec<EventEnvelope>, EngineError> {
        let inner = self.inner.read().await;
        Ok(inner
            .streams
            .get(stream_id)
            .map(|events| {
                events
                    .iter()
                    .filter(|e| e.sequence_number > from_sequence)
                    .cloned()
                    .collect()
            })
            .unwrap_or_default())
    }

    /// Version check without cloning any event payloads: a single `HashMap`
    /// lookup followed by `Vec::len`. Returns `0` for an unknown stream.
    async fn stream_version(&self, stream_id: &StreamId) -> Result<u64, EngineError> {
        let inner = self.inner.read().await;
        Ok(inner.streams.get(stream_id).map_or(0, |s| s.len() as u64))
    }

    /// Returns all known stream identifiers, optionally filtered by `prefix`.
    ///
    /// O(n) in the number of streams — scans the HashMap keys once. The
    /// result order is whatever the `HashMap` iteration yields.
    async fn list_streams(&self, prefix: Option<&str>) -> Result<Vec<StreamId>, EngineError> {
        let inner = self.inner.read().await;
        let ids = inner
            .streams
            .keys()
            .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
            .cloned()
            .collect();
        Ok(ids)
    }

    /// Paginated stream enumeration for `InMemoryEventStore`.
    ///
    /// Collects all matching keys, sorts them by `as_str()` for a
    /// deterministic order, and returns up to `limit` IDs that sort strictly
    /// after `cursor`. The cursor does not have to name an existing stream.
    /// O(n log n) per call because of the sort — acceptable for test/dev
    /// stores.
    async fn list_streams_page(
        &self,
        prefix: Option<&str>,
        cursor: Option<&StreamId>,
        limit: usize,
    ) -> Result<Vec<StreamId>, EngineError> {
        if limit == 0 {
            return Ok(Vec::new());
        }
        let inner = self.inner.read().await;
        let mut ids: Vec<StreamId> = inner
            .streams
            .keys()
            .filter(|id| prefix.is_none_or(|p| id.as_str().starts_with(p)))
            .cloned()
            .collect();
        // Sort for deterministic pagination order (HashMap is unordered).
        ids.sort_unstable_by(|a, b| a.as_str().cmp(b.as_str()));
        // Resume by ordering instead of exact match, so an unknown cursor
        // continues at its sorted position rather than ending the listing.
        Ok(ids
            .into_iter()
            .skip_while(|id| cursor.is_some_and(|c| id.as_str() <= c.as_str()))
            .take(limit)
            .collect())
    }

    /// Fold over events in `stream_id` starting after `from_sequence`
    /// (exclusive) without building an intermediate `Vec<EventEnvelope>`.
    ///
    /// In-memory implementation: walks the stored `Vec` under the read lock
    /// and hands `f` a clone of one envelope at a time. The lock is held for
    /// the whole fold, including while `f` runs. Stops at the first error
    /// returned by `f` and propagates it. An unknown stream returns `initial`
    /// unchanged.
    async fn fold_stream<T, F>(
        &self,
        stream_id: &StreamId,
        from_sequence: u64,
        initial: T,
        mut f: F,
    ) -> Result<T, EngineError>
    where
        T: Send,
        F: FnMut(T, EventEnvelope) -> Result<T, EngineError> + Send,
    {
        let inner = self.inner.read().await;
        let mut acc = initial;
        if let Some(events) = inner.streams.get(stream_id) {
            for env in events.iter().filter(|e| e.sequence_number > from_sequence) {
                acc = f(acc, env.clone())?;
            }
        }
        Ok(acc)
    }
}
655
/// Unit tests for [`InMemoryEventStore`] and the `Arc<S>` blanket impl.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ids::{ConversationId, CorrelationId, ProcessId, TenantId};
    use crate::version::WorkflowId;

    /// Build a minimal event with fresh IDs and a fixed JSON payload.
    fn make_new_event() -> NewEvent {
        NewEvent {
            correlation_id: CorrelationId::new(),
            causation_id: None,
            conversation_id: ConversationId::new(),
            process_id: ProcessId::new(),
            tenant_id: TenantId::new(),
            workflow_id: WorkflowId::new("test", "FV2024-10-01"),
            event_type: "TestEvent".into(),
            schema_version: 1,
            payload: serde_json::json!({"test": true}),
        }
    }

    /// Build `count` independent events.
    fn make_new_events(count: usize) -> Vec<NewEvent> {
        (0..count).map(|_| make_new_event()).collect()
    }

    /// Render stream IDs as sorted strings for order-independent comparison.
    fn sorted_names(ids: &[StreamId]) -> Vec<String> {
        let mut names: Vec<String> = ids.iter().map(|id| id.as_str().to_owned()).collect();
        names.sort_unstable();
        names
    }

    #[tokio::test]
    async fn append_and_load_roundtrip() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/s1");

        let result = store
            .append(&stream, ExpectedVersion::NoStream, &make_new_events(2))
            .await
            .unwrap();

        assert_eq!(result.events.len(), 2);
        assert_eq!(result.events[0].sequence_number, 1);
        assert_eq!(result.events[1].sequence_number, 2);
        assert_eq!(result.last_sequence, 2);

        let loaded = store.load(&stream).await.unwrap();
        assert_eq!(loaded.len(), 2);
    }

    #[tokio::test]
    async fn store_stamps_stream_id_and_sequence() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/stamp");

        let result = store
            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap();

        let env = &result.events[0];
        assert_eq!(env.stream_id, stream);
        assert_eq!(env.sequence_number, 1);
    }

    #[tokio::test]
    async fn version_conflict_is_detected() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/s2");

        store
            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap();

        let err = store
            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap_err();

        assert!(matches!(err, EngineError::VersionConflict { .. }));
    }

    #[tokio::test]
    async fn exact_version_conflict_reports_expected_and_actual() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/exact-conflict");

        store
            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap();

        let err = store
            .append(&stream, ExpectedVersion::Exact(5), &[make_new_event()])
            .await
            .unwrap_err();

        assert!(matches!(
            err,
            EngineError::VersionConflict {
                expected: 5,
                actual: 1
            }
        ));
        // The rejected append must not have written anything.
        assert_eq!(store.stream_version(&stream).await.unwrap(), 1);
    }

    #[tokio::test]
    async fn missing_stream_reads_as_empty() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/missing");

        assert!(store.load(&stream).await.unwrap().is_empty());
        assert!(store.load_from(&stream, 0).await.unwrap().is_empty());
        assert_eq!(store.stream_version(&stream).await.unwrap(), 0);
    }

    #[tokio::test]
    async fn stream_version_tracks_event_count() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/version");

        store
            .append(&stream, ExpectedVersion::NoStream, &make_new_events(3))
            .await
            .unwrap();
        assert_eq!(store.stream_version(&stream).await.unwrap(), 3);

        store
            .append(&stream, ExpectedVersion::Exact(3), &[make_new_event()])
            .await
            .unwrap();
        assert_eq!(store.stream_version(&stream).await.unwrap(), 4);
    }

    #[tokio::test]
    async fn load_from_returns_tail_only() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/s3");

        store
            .append(&stream, ExpectedVersion::NoStream, &make_new_events(5))
            .await
            .unwrap();

        let tail = store.load_from(&stream, 3).await.unwrap();
        assert_eq!(tail.len(), 2, "expected events 4 and 5");
        assert_eq!(tail[0].sequence_number, 4);
        assert_eq!(tail[1].sequence_number, 5);
    }

    #[tokio::test]
    async fn all_events_preserves_insertion_order_across_streams() {
        let store = InMemoryEventStore::new();
        let s1 = StreamId::new("test/order-s1");
        let s2 = StreamId::new("test/order-s2");

        store
            .append(&s1, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap();
        store
            .append(&s2, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap();
        store
            .append(&s1, ExpectedVersion::Exact(1), &[make_new_event()])
            .await
            .unwrap();

        let all = store.all_events().await;
        assert_eq!(all.len(), 3);
        assert_eq!(all[0].stream_id, s1);
        assert_eq!(all[1].stream_id, s2);
        assert_eq!(all[2].stream_id, s1);
    }

    #[tokio::test]
    async fn list_streams_filters_by_prefix() {
        let store = InMemoryEventStore::new();
        for name in ["process/a", "process/b", "other/c"] {
            store
                .append(
                    &StreamId::new(name),
                    ExpectedVersion::NoStream,
                    &[make_new_event()],
                )
                .await
                .unwrap();
        }

        let filtered = store.list_streams(Some("process/")).await.unwrap();
        assert_eq!(sorted_names(&filtered), ["process/a", "process/b"]);

        let everything = store.list_streams(None).await.unwrap();
        assert_eq!(
            sorted_names(&everything),
            ["other/c", "process/a", "process/b"]
        );
    }

    #[tokio::test]
    async fn list_streams_page_walks_all_streams_in_order() {
        let store = InMemoryEventStore::new();
        // Insert out of order to prove the pages come back sorted.
        for name in ["page/d", "page/b", "page/e", "page/a", "page/c", "skip/x"] {
            store
                .append(
                    &StreamId::new(name),
                    ExpectedVersion::NoStream,
                    &[make_new_event()],
                )
                .await
                .unwrap();
        }

        let mut seen: Vec<String> = Vec::new();
        let mut cursor: Option<StreamId> = None;
        loop {
            let page = store
                .list_streams_page(Some("page/"), cursor.as_ref(), 2)
                .await
                .unwrap();
            let done = page.len() < 2;
            seen.extend(page.iter().map(|id| id.as_str().to_owned()));
            if let Some(last) = page.into_iter().last() {
                cursor = Some(last);
            }
            if done {
                break;
            }
        }

        assert_eq!(seen, ["page/a", "page/b", "page/c", "page/d", "page/e"]);
    }

    #[tokio::test]
    async fn list_streams_page_with_zero_limit_is_empty() {
        let store = InMemoryEventStore::new();
        store
            .append(
                &StreamId::new("page/zero"),
                ExpectedVersion::NoStream,
                &[make_new_event()],
            )
            .await
            .unwrap();

        let page = store.list_streams_page(None, None, 0).await.unwrap();
        assert!(page.is_empty());
    }

    #[tokio::test]
    async fn arc_wrapper_delegates_correctly() {
        let store = Arc::new(InMemoryEventStore::new());
        let stream = StreamId::new("test/arc-s1");

        store
            .append(&stream, ExpectedVersion::NoStream, &[make_new_event()])
            .await
            .unwrap();

        let loaded = store.load(&stream).await.unwrap();
        assert_eq!(loaded.len(), 1);
    }

    #[tokio::test]
    async fn fold_stream_accumulates_without_full_vec() {
        let store = InMemoryEventStore::new();
        let stream = StreamId::new("test/fold-s1");

        store
            .append(&stream, ExpectedVersion::NoStream, &make_new_events(4))
            .await
            .unwrap();

        // Fold from the beginning: count events.
        let count = store
            .fold_stream(&stream, 0, 0usize, |acc, _| Ok(acc + 1))
            .await
            .unwrap();
        assert_eq!(count, 4);

        // Fold from sequence 2: count only tail events.
        let tail_count = store
            .fold_stream(&stream, 2, 0usize, |acc, _| Ok(acc + 1))
            .await
            .unwrap();
        assert_eq!(tail_count, 2);
    }
}