Skip to main content

mako_engine/
projection.rs

1//! [`Projection`] trait and [`ProjectionRunner`].
2//!
3//! Projections build read models from the event stream. They are:
4//!
5//! - **Asynchronous** — fed events independently of the write path
6//! - **Disposable** — the read model can be dropped and rebuilt at any time
7//! - **Eventually consistent** — they may lag behind the write head
8//!
9//! Projection failures must never affect event persistence.
10//!
11//! # Incremental catch-up
12//!
13//! Projections that track their cursor position can implement
14//! [`Projection::last_sequence`] so [`ProjectionRunner::catch_up`] and
15//! [`ProjectionRunner::catch_up_from_store`] feed only new events.
16//!
17//! # Store-backed streaming
18//!
19//! [`ProjectionRunner::run_from_store`] and
20//! [`ProjectionRunner::catch_up_from_store`] load events directly from an
21//! [`EventStore`] without requiring the caller to pre-load the entire
22//! event slice into memory.
23//!
24//! # Multi-stream projections
25//!
26//! [`ProjectionRunner::run_all_streams`] and
27//! [`ProjectionRunner::catch_up_all_streams`] drive a projection across
28//! multiple streams simultaneously. This is required for process families
29//! where a read model aggregates across many process instances — for
30//! example, MABIS Bilanzkreisabrechnung aggregating events across thousands
31//! of MaLo-level process streams for a single billing period.
32//!
33//! The [`GlobalProjectionCheckpoint`] records per-stream cursors so
34//! incremental catch-up only feeds events newer than the last replay.
35//!
36//! ```rust,ignore
37//! // Initial full replay across all process streams:
38//! let checkpoint = ProjectionRunner::run_all_streams(
39//!     &mut billing_proj,
40//!     &store,
41//!     &stream_ids,
42//! ).await?;
43//!
44//! // Later: incremental update after new events arrive:
45//! let checkpoint = ProjectionRunner::catch_up_all_streams(
46//!     &mut billing_proj,
47//!     &store,
48//!     &stream_ids,
49//!     &checkpoint,
50//! ).await?;
51//! ```
52//!
53//! To enumerate all process streams automatically, use
54//! [`EventStore::list_streams`] with a prefix.  Pass `"process/"` to scan
55//! all tenants, or `&format!("process/{tenant_id}/")` to scope to one tenant:
56//!
57//! ```rust,ignore
58//! // All tenants:
59//! let streams = store.list_streams(Some("process/")).await?;
60//! // Single tenant:
61//! let streams = store.list_streams(Some(&format!("process/{tenant_id}/"))).await?;
62//! let checkpoint = ProjectionRunner::run_all_streams(
63//!     &mut billing_proj, &store, &streams,
64//! ).await?;
65//! ```
66
67use std::collections::BTreeMap;
68
69use crate::{envelope::EventEnvelope, error::EngineError, event_store::EventStore, ids::StreamId};
70
71// ── Projection trait ──────────────────────────────────────────────────────────
72
73/// A read-model builder that consumes events and maintains queryable state.
74///
75/// # Contract
76///
77/// - `handle_event` is called for every event in stream order.
78/// - `handle_event` must not panic on events it doesn't recognise (forward
79///   compatibility: new event types appear when new domain features are
80///   deployed before all projections are updated).
81/// - The projection is rebuilt from scratch by replaying all events through
82///   [`ProjectionRunner::run`]; implementations must tolerate this.
83pub trait Projection {
84    /// A stable human-readable name for this projection (used in logs/metrics).
85    fn name(&self) -> &'static str;
86
87    /// Process a single event, updating internal read-model state.
88    fn handle_event(&mut self, envelope: &EventEnvelope);
89
90    /// The sequence number of the last event this projection processed.
91    ///
92    /// Return `None` when the projection has not processed any events yet
93    /// (i.e. it needs a full replay).
94    ///
95    /// Implement this method if your projection stores the cursor alongside
96    /// the read model so [`ProjectionRunner::catch_up`] can perform
97    /// incremental updates.
98    ///
99    /// Defaults to `None`.
100    fn last_sequence(&self) -> Option<u64> {
101        None
102    }
103}
104
105// ── GlobalProjectionCheckpoint ────────────────────────────────────────────────
106
107/// Per-stream sequence number cursors for multi-stream projections.
108///
109/// Returned by [`ProjectionRunner::run_all_streams`] and
110/// [`ProjectionRunner::catch_up_all_streams`]. Pass an existing checkpoint
111/// to `catch_up_all_streams` so only events newer than the last replay are
112/// fed to the projection.
113///
114/// Persist this value (e.g. alongside the read model in a snapshot store) to
115/// survive process restarts and avoid full replays on restart.
116///
117/// A cursor value of `0` for a stream means "never seen" (equivalent to
118/// "replay from the beginning").
119#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
120pub struct GlobalProjectionCheckpoint {
121    /// Last-processed sequence number per stream identifier.
122    ///
123    /// Streams not present in this map have an implicit cursor of `0`.
124    pub cursors: BTreeMap<StreamId, u64>,
125}
126
127impl GlobalProjectionCheckpoint {
128    /// Create an empty checkpoint (all streams will be fully replayed).
129    #[must_use]
130    pub fn new() -> Self {
131        Self::default()
132    }
133
134    /// The last-processed sequence number for `stream_id`.
135    ///
136    /// Returns `0` when the stream has never been processed (signals a full
137    /// replay is needed for that stream).
138    #[must_use]
139    pub fn cursor_for(&self, stream_id: &StreamId) -> u64 {
140        self.cursors.get(stream_id).copied().unwrap_or(0)
141    }
142
143    /// Update the cursor for `stream_id` to `sequence` (if `sequence` is
144    /// greater than the current cursor).
145    pub fn advance(&mut self, stream_id: &StreamId, sequence: u64) {
146        let entry = self.cursors.entry(stream_id.clone()).or_insert(0);
147        if sequence > *entry {
148            *entry = sequence;
149        }
150    }
151}
152
// ── ProjectionCheckpointStore ─────────────────────────────────────────────────
154
155/// Persist and load named [`GlobalProjectionCheckpoint`] values.
156///
157/// Implement this trait on your event store to enable
158/// [`ProjectionRunner::catch_up_persistent`], which avoids full replays on
159/// restart by persisting cursor progress after each catch-up cycle.
160///
161/// The SlateDB implementation stores one key per (projection, stream) pair
162/// under the `cp/{name}/{stream_id}` key space (raw u64 LE — no JSON). This
163/// bounds each `catch_up_persistent` cycle to O(changed_streams) writes
164/// instead of O(total_streams), which matters for MABIS deployments tracking
165/// tens of thousands of streams. Other backing stores may choose any suitable
166/// serialisation.
167#[allow(async_fn_in_trait)]
168pub trait ProjectionCheckpointStore {
169    /// Load a previously saved checkpoint by name.
170    ///
171    /// Returns an empty [`GlobalProjectionCheckpoint`] (all cursors zero) when
172    /// no checkpoint has been persisted for `name` yet — this triggers a full
173    /// replay from the beginning.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`EngineError::Store`] on storage failure.
178    async fn load_projection_checkpoint(
179        &self,
180        name: &str,
181    ) -> Result<GlobalProjectionCheckpoint, EngineError>;
182
183    /// Persist `checkpoint` under `name`, overwriting any previously stored
184    /// value.
185    ///
186    /// # Errors
187    ///
188    /// Returns [`EngineError::Store`] on storage failure.
189    async fn save_projection_checkpoint(
190        &self,
191        name: &str,
192        checkpoint: &GlobalProjectionCheckpoint,
193    ) -> Result<(), EngineError>;
194
195    /// Persist only the cursors that advanced since `previous`.
196    ///
197    /// The default implementation ignores `previous` and saves the full
198    /// `current` checkpoint. Override in storage backends that support
199    /// per-key atomic writes (e.g. SlateDB `WriteBatch`) for O(changed)
200    /// write cost instead of O(total streams).
201    ///
202    /// # Errors
203    ///
204    /// Returns [`EngineError::Store`] on storage failure.
205    async fn advance_projection_cursors(
206        &self,
207        name: &str,
208        _previous: &GlobalProjectionCheckpoint,
209        current: &GlobalProjectionCheckpoint,
210    ) -> Result<(), EngineError> {
211        self.save_projection_checkpoint(name, current).await
212    }
213}
214
215// ── ProjectionRunner ──────────────────────────────────────────────────────────
216
/// Drives one or more projections over events, supplied either as an
/// in-memory slice or read from an [`EventStore`].
///
/// The runner is stateless — it simply iterates over events and calls
/// [`Projection::handle_event`] for each. All functionality is exposed as
/// associated functions; the type itself is a zero-sized marker.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProjectionRunner;
222
223impl ProjectionRunner {
224    /// Feed all `events` into `projection` in order (full replay).
225    ///
226    /// # Performance
227    ///
228    /// This method requires the caller to have already loaded all `events` into
229    /// a `Vec`. For large event streams, prefer [`run_from_store`] /
230    /// [`run_all_streams`] which use `fold_stream` internally and avoid
231    /// allocating the full event slice.
232    ///
233    /// [`run_from_store`]: ProjectionRunner::run_from_store
234    /// [`run_all_streams`]: ProjectionRunner::run_all_streams
235    pub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
236        for event in events {
237            projection.handle_event(event);
238        }
239    }
240
241    /// Feed all `events` into multiple projections simultaneously (single pass,
242    /// full replay).
243    ///
244    /// # Performance
245    ///
246    /// Same caveat as [`run`]: the caller must supply a pre-loaded slice.
247    /// For large streams, prefer [`run_all_streams`] which streams events
248    /// directly from the store with O(1) working memory.
249    ///
250    /// [`run`]: ProjectionRunner::run
251    /// [`run_all_streams`]: ProjectionRunner::run_all_streams
252    pub fn run_all(projections: &mut [&mut dyn Projection], events: &[EventEnvelope]) {
253        for event in events {
254            for projection in projections.iter_mut() {
255                projection.handle_event(event);
256            }
257        }
258    }
259
260    /// Feed only events newer than the projection's cursor into `projection`.
261    ///
262    /// Queries [`Projection::last_sequence`] to determine the starting point.
263    /// If the projection returns `None`, all `events` are fed (same as [`run`]).
264    ///
265    /// `events` must be sorted by `sequence_number` in ascending order (which
266    /// is the contract for all slices returned by [`EventStore::load`] /
267    /// [`EventStore::load_from`]).
268    ///
269    /// This is a binary-search–accelerated variant: it finds the first event
270    /// past the cursor in O(log n) then feeds the tail in O(k) where k is the
271    /// number of new events.
272    ///
273    /// [`run`]: ProjectionRunner::run
274    /// [`EventStore::load`]: crate::event_store::EventStore::load
275    /// [`EventStore::load_from`]: crate::event_store::EventStore::load_from
276    pub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
277        let from = projection.last_sequence().unwrap_or(0);
278        if from == 0 {
279            Self::run(projection, events);
280            return;
281        }
282        // Binary search for the first event with sequence_number > from.
283        let start = events.partition_point(|e| e.sequence_number <= from);
284        for event in &events[start..] {
285            projection.handle_event(event);
286        }
287    }
288
289    /// Full replay of `stream_id` into `projection` without pre-loading the
290    /// event slice into a `Vec`.
291    ///
292    /// Uses [`EventStore::fold_stream`] internally so production backends can
293    /// stream events with cursor-based pagination rather than loading all
294    /// events at once.
295    ///
296    /// # Errors
297    ///
298    /// Returns [`EngineError::Store`] on storage failure.
299    /// Returns [`EngineError::Deserialization`] when the fold closure returns
300    /// an error (propagated from the store).
301    pub async fn run_from_store<P, S>(
302        projection: &mut P,
303        store: &S,
304        stream_id: &StreamId,
305    ) -> Result<(), EngineError>
306    where
307        P: Projection + Send,
308        S: EventStore,
309    {
310        store
311            .fold_stream(stream_id, 0, (), |(), env| {
312                projection.handle_event(&env);
313                Ok(())
314            })
315            .await
316    }
317
318    /// Incremental catch-up of `stream_id` into `projection` without
319    /// pre-loading the event slice into a `Vec`.
320    ///
321    /// Queries [`Projection::last_sequence`] to determine the starting point.
322    /// If the projection returns `None`, performs a full replay (same as
323    /// [`run_from_store`]).
324    ///
325    /// # Errors
326    ///
327    /// Returns [`EngineError::Store`] on storage failure.
328    ///
329    /// [`run_from_store`]: ProjectionRunner::run_from_store
330    pub async fn catch_up_from_store<P, S>(
331        projection: &mut P,
332        store: &S,
333        stream_id: &StreamId,
334    ) -> Result<(), EngineError>
335    where
336        P: Projection + Send,
337        S: EventStore,
338    {
339        let from = projection.last_sequence().unwrap_or(0);
340        store
341            .fold_stream(stream_id, from, (), |(), env| {
342                projection.handle_event(&env);
343                Ok(())
344            })
345            .await
346    }
347
348    // ── Multi-stream ─────────────────────────────────────────────────────────
349
350    /// Full replay of multiple `stream_ids` into `projection`.
351    ///
352    /// Events from each stream are fed in sequence order within that stream.
353    /// Streams are processed in the order given by `stream_ids` — if
354    /// cross-stream event ordering matters, sort `stream_ids` accordingly
355    /// or use a single global-sequence backend.
356    ///
357    /// Returns a [`GlobalProjectionCheckpoint`] recording the last-processed
358    /// sequence number for every stream. Pass this to
359    /// [`catch_up_all_streams`] for subsequent incremental updates.
360    ///
361    /// # Production workers: use `catch_up_persistent` instead
362    ///
363    /// `run_all_streams` performs a **full replay from sequence 0** every
364    /// time it is called. In a long-running background worker this becomes
365    /// prohibitively expensive as the event log grows. Use
366    /// [`catch_up_persistent`] instead — it loads and saves a durable
367    /// checkpoint so only events appended since the last run are fed to the
368    /// projection.
369    ///
370    /// This method is appropriate for one-shot diagnostic tools, tests, or
371    /// the very first population of a new projection.
372    ///
373    /// # Errors
374    ///
375    /// Returns [`EngineError::Store`] on storage failure for any stream.
376    ///
377    /// [`catch_up_all_streams`]: ProjectionRunner::catch_up_all_streams
378    /// [`catch_up_persistent`]:  ProjectionRunner::catch_up_persistent
379    #[must_use = "pass the returned checkpoint to subsequent catch_up_all_streams calls; \
380                  dropping it silently restarts replay from the beginning"]
381    pub async fn run_all_streams<P, S>(
382        projection: &mut P,
383        store: &S,
384        stream_ids: &[StreamId],
385    ) -> Result<GlobalProjectionCheckpoint, EngineError>
386    where
387        P: Projection + Send,
388        S: EventStore,
389    {
390        let mut checkpoint = GlobalProjectionCheckpoint::new();
391        for stream_id in stream_ids {
392            let last_seq = store
393                .fold_stream(stream_id, 0, 0u64, |_, env| {
394                    let seq = env.sequence_number;
395                    projection.handle_event(&env);
396                    Ok(seq)
397                })
398                .await?;
399            if last_seq > 0 {
400                checkpoint.advance(stream_id, last_seq);
401            }
402        }
403        Ok(checkpoint)
404    }
405
406    /// Incremental catch-up of multiple `stream_ids` into `projection`.
407    ///
408    /// For each stream, queries `checkpoint` for the last-processed sequence
409    /// number and feeds only events newer than that cursor.
410    ///
411    /// Returns an updated [`GlobalProjectionCheckpoint`] reflecting the new
412    /// cursors after this catch-up pass. Pass the returned checkpoint to the
413    /// next `catch_up_all_streams` call — do not reuse the input checkpoint.
414    ///
415    /// # Errors
416    ///
417    /// Returns [`EngineError::Store`] on storage failure for any stream.
418    #[must_use = "pass the returned checkpoint to the next catch_up_all_streams call; \
419                  dropping it silently discards incremental progress"]
420    pub async fn catch_up_all_streams<P, S>(
421        projection: &mut P,
422        store: &S,
423        stream_ids: &[StreamId],
424        checkpoint: &GlobalProjectionCheckpoint,
425    ) -> Result<GlobalProjectionCheckpoint, EngineError>
426    where
427        P: Projection + Send,
428        S: EventStore,
429    {
430        let mut updated = checkpoint.clone();
431        for stream_id in stream_ids {
432            let from = checkpoint.cursor_for(stream_id);
433            let last_seq = store
434                .fold_stream(stream_id, from, from, |_, env| {
435                    let seq = env.sequence_number;
436                    projection.handle_event(&env);
437                    Ok(seq)
438                })
439                .await?;
440            if last_seq > from {
441                updated.advance(stream_id, last_seq);
442            }
443        }
444        Ok(updated)
445    }
446
447    /// Discover all streams matching `prefix` and replay them into `projection`.
448    ///
449    /// Convenience wrapper around [`EventStore::list_streams`] +
450    /// [`run_all_streams`]. Useful when the full set of streams is not known
451    /// at compile time.
452    ///
453    /// # Production workers: use `catch_up_persistent` instead
454    ///
455    /// This function performs a **full replay from sequence 0** every call.
456    /// For persistent background workers, use [`catch_up_persistent`] so only
457    /// events appended since the last checkpoint are processed.
458    ///
459    /// # Errors
460    ///
461    /// Returns [`EngineError::Store`] on storage failures.
462    ///
463    /// [`run_all_streams`]:     ProjectionRunner::run_all_streams
464    /// [`catch_up_persistent`]: ProjectionRunner::catch_up_persistent
465    pub async fn run_matching_streams<P, S>(
466        projection: &mut P,
467        store: &S,
468        prefix: Option<&str>,
469    ) -> Result<GlobalProjectionCheckpoint, EngineError>
470    where
471        P: Projection + Send,
472        S: EventStore,
473    {
474        let streams = store.list_streams(prefix).await?;
475        Self::run_all_streams(projection, store, &streams).await
476    }
477
478    /// Incremental catch-up of all streams matching `prefix`.
479    ///
480    /// Convenience wrapper for the common pattern of discovering streams and
481    /// then calling `catch_up_all_streams`.
482    ///
483    /// # Errors
484    ///
485    /// Returns [`EngineError::Store`] on storage failures.
486    pub async fn catch_up_matching_streams<P, S>(
487        projection: &mut P,
488        store: &S,
489        prefix: Option<&str>,
490        checkpoint: &GlobalProjectionCheckpoint,
491    ) -> Result<GlobalProjectionCheckpoint, EngineError>
492    where
493        P: Projection + Send,
494        S: EventStore,
495    {
496        let streams = store.list_streams(prefix).await?;
497        Self::catch_up_all_streams(projection, store, &streams, checkpoint).await
498    }
499
500    /// Incremental, persistent catch-up for all streams matching `prefix`.
501    ///
502    /// Loads the named checkpoint from `store`, performs an incremental
503    /// catch-up of every matching stream, then saves the updated checkpoint
504    /// back atomically.  On the next call, only events appended since the last
505    /// run are processed — avoiding full replays across restarts.
506    ///
507    /// This is the preferred entry point for background projection workers
508    /// that must survive process restarts.
509    ///
510    /// # Key space
511    ///
512    /// The SlateDB implementation stores `cp/{checkpoint_name}/{stream_id}` →
513    /// `u64 LE` (8 bytes) per stream. Each cycle only writes the streams
514    /// whose cursors advanced, giving O(changed_streams) write cost instead
515    /// of O(total_streams).
516    ///
517    /// # Errors
518    ///
519    /// Returns [`EngineError::Store`] on any storage failure (checkpoint load,
520    /// event scan, or checkpoint save).
521    pub async fn catch_up_persistent<P, S>(
522        projection: &mut P,
523        store: &S,
524        prefix: Option<&str>,
525        checkpoint_name: &str,
526    ) -> Result<GlobalProjectionCheckpoint, EngineError>
527    where
528        P: Projection + Send,
529        S: EventStore + ProjectionCheckpointStore,
530    {
531        let checkpoint = store.load_projection_checkpoint(checkpoint_name).await?;
532        let streams = store.list_streams(prefix).await?;
533        let updated = Self::catch_up_all_streams(projection, store, &streams, &checkpoint).await?;
534        store
535            .advance_projection_cursors(checkpoint_name, &checkpoint, &updated)
536            .await?;
537        Ok(updated)
538    }
539}
540
541#[cfg(test)]
542mod tests {
543    use super::*;
544    use crate::{
545        envelope::NewEvent,
546        event_store::{ExpectedVersion, InMemoryEventStore},
547        ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
548        version::WorkflowId,
549    };
550    use serde_json::json;
551
552    /// A simple counter projection that counts events and tracks its cursor.
553    struct Counter {
554        count: usize,
555        last: Option<u64>,
556    }
557
558    impl Counter {
559        fn new() -> Self {
560            Self {
561                count: 0,
562                last: None,
563            }
564        }
565    }
566
567    impl Projection for Counter {
568        fn name(&self) -> &'static str {
569            "counter"
570        }
571
572        fn handle_event(&mut self, envelope: &EventEnvelope) {
573            self.count += 1;
574            self.last = Some(envelope.sequence_number);
575        }
576
577        fn last_sequence(&self) -> Option<u64> {
578            self.last
579        }
580    }
581
582    fn make_event() -> NewEvent {
583        NewEvent {
584            correlation_id: CorrelationId::new(),
585            causation_id: None,
586            conversation_id: ConversationId::new(),
587            process_id: ProcessId::new(),
588            tenant_id: TenantId::new(),
589            workflow_id: WorkflowId::new("test", "FV2024-10-01"),
590            event_type: "TestEvent".into(),
591            schema_version: 1,
592            payload: json!({}),
593        }
594    }
595
596    #[tokio::test]
597    async fn run_from_store_full_replay() {
598        let store = InMemoryEventStore::new();
599        let stream = StreamId::new("proj/s1");
600
601        store
602            .append(
603                &stream,
604                ExpectedVersion::NoStream,
605                &[make_event(), make_event(), make_event()],
606            )
607            .await
608            .unwrap();
609
610        let mut proj = Counter::new();
611        ProjectionRunner::run_from_store(&mut proj, &store, &stream)
612            .await
613            .unwrap();
614
615        assert_eq!(proj.count, 3);
616        assert_eq!(proj.last, Some(3));
617    }
618
619    #[tokio::test]
620    async fn catch_up_from_store_incremental() {
621        let store = InMemoryEventStore::new();
622        let stream = StreamId::new("proj/s2");
623
624        store
625            .append(
626                &stream,
627                ExpectedVersion::NoStream,
628                &[make_event(), make_event()],
629            )
630            .await
631            .unwrap();
632
633        let mut proj = Counter::new();
634        // Full replay first.
635        ProjectionRunner::run_from_store(&mut proj, &store, &stream)
636            .await
637            .unwrap();
638        assert_eq!(proj.count, 2);
639
640        // Append two more events.
641        store
642            .append(
643                &stream,
644                ExpectedVersion::Exact(2),
645                &[make_event(), make_event()],
646            )
647            .await
648            .unwrap();
649
650        // Incremental catch-up should feed only the two new events.
651        ProjectionRunner::catch_up_from_store(&mut proj, &store, &stream)
652            .await
653            .unwrap();
654        assert_eq!(proj.count, 4);
655        assert_eq!(proj.last, Some(4));
656    }
657
658    // ── Multi-stream tests ────────────────────────────────────────────────────
659
660    #[tokio::test]
661    async fn run_all_streams_aggregates_across_multiple_streams() {
662        let store = InMemoryEventStore::new();
663        let s1 = StreamId::new("process/ms-s1");
664        let s2 = StreamId::new("process/ms-s2");
665        let s3 = StreamId::new("process/ms-s3");
666
667        // 2 events in s1, 3 in s2, 1 in s3.
668        store
669            .append(
670                &s1,
671                ExpectedVersion::NoStream,
672                &[make_event(), make_event()],
673            )
674            .await
675            .unwrap();
676        store
677            .append(
678                &s2,
679                ExpectedVersion::NoStream,
680                &[make_event(), make_event(), make_event()],
681            )
682            .await
683            .unwrap();
684        store
685            .append(&s3, ExpectedVersion::NoStream, &[make_event()])
686            .await
687            .unwrap();
688
689        let mut proj = Counter::new();
690        let cp = ProjectionRunner::run_all_streams(
691            &mut proj,
692            &store,
693            &[s1.clone(), s2.clone(), s3.clone()],
694        )
695        .await
696        .unwrap();
697
698        assert_eq!(proj.count, 6, "all 6 events across 3 streams must be fed");
699        assert_eq!(cp.cursor_for(&s1), 2);
700        assert_eq!(cp.cursor_for(&s2), 3);
701        assert_eq!(cp.cursor_for(&s3), 1);
702    }
703
704    #[tokio::test]
705    async fn catch_up_all_streams_feeds_only_new_events() {
706        let store = InMemoryEventStore::new();
707        let s1 = StreamId::new("process/cu-s1");
708        let s2 = StreamId::new("process/cu-s2");
709
710        store
711            .append(
712                &s1,
713                ExpectedVersion::NoStream,
714                &[make_event(), make_event()],
715            )
716            .await
717            .unwrap();
718        store
719            .append(&s2, ExpectedVersion::NoStream, &[make_event()])
720            .await
721            .unwrap();
722
723        let mut proj = Counter::new();
724        let cp = ProjectionRunner::run_all_streams(&mut proj, &store, &[s1.clone(), s2.clone()])
725            .await
726            .unwrap();
727        assert_eq!(proj.count, 3);
728        assert_eq!(cp.cursor_for(&s1), 2);
729        assert_eq!(cp.cursor_for(&s2), 1);
730
731        // Add one event to each stream.
732        store
733            .append(&s1, ExpectedVersion::Exact(2), &[make_event()])
734            .await
735            .unwrap();
736        store
737            .append(
738                &s2,
739                ExpectedVersion::Exact(1),
740                &[make_event(), make_event()],
741            )
742            .await
743            .unwrap();
744
745        let cp2 = ProjectionRunner::catch_up_all_streams(
746            &mut proj,
747            &store,
748            &[s1.clone(), s2.clone()],
749            &cp,
750        )
751        .await
752        .unwrap();
753
754        assert_eq!(proj.count, 6, "3 new events added across both streams");
755        assert_eq!(cp2.cursor_for(&s1), 3, "s1 advanced from 2 to 3");
756        assert_eq!(cp2.cursor_for(&s2), 3, "s2 advanced from 1 to 3");
757    }
758
759    #[tokio::test]
760    async fn run_matching_streams_uses_prefix_filter() {
761        let store = InMemoryEventStore::new();
762        let proc1 = StreamId::new("process/match-p1");
763        let proc2 = StreamId::new("process/match-p2");
764        let partner = StreamId::new("partner/match-pp1"); // should NOT be included
765
766        store
767            .append(&proc1, ExpectedVersion::NoStream, &[make_event()])
768            .await
769            .unwrap();
770        store
771            .append(
772                &proc2,
773                ExpectedVersion::NoStream,
774                &[make_event(), make_event()],
775            )
776            .await
777            .unwrap();
778        store
779            .append(&partner, ExpectedVersion::NoStream, &[make_event()])
780            .await
781            .unwrap();
782
783        let mut proj = Counter::new();
784        let _ = ProjectionRunner::run_matching_streams(&mut proj, &store, Some("process/match-"))
785            .await
786            .unwrap();
787
788        // Only the 3 events from proc1 + proc2 should have been fed.
789        assert_eq!(
790            proj.count, 3,
791            "partner stream must be excluded by prefix filter"
792        );
793    }
794
795    #[tokio::test]
796    async fn global_projection_checkpoint_serde_roundtrip() {
797        let mut cp = GlobalProjectionCheckpoint::new();
798        cp.advance(&StreamId::new("p/1"), 5);
799        cp.advance(&StreamId::new("p/2"), 3);
800
801        let json = serde_json::to_string(&cp).unwrap();
802        let cp2: GlobalProjectionCheckpoint = serde_json::from_str(&json).unwrap();
803
804        assert_eq!(cp2.cursor_for(&StreamId::new("p/1")), 5);
805        assert_eq!(cp2.cursor_for(&StreamId::new("p/2")), 3);
806        assert_eq!(cp2.cursor_for(&StreamId::new("p/never")), 0);
807    }
808
809    #[tokio::test]
810    async fn list_streams_with_prefix() {
811        let store = InMemoryEventStore::new();
812        let s1 = StreamId::new("process/ls-a");
813        let s2 = StreamId::new("process/ls-b");
814        let other = StreamId::new("partner/ls-c");
815
816        store
817            .append(&s1, ExpectedVersion::NoStream, &[make_event()])
818            .await
819            .unwrap();
820        store
821            .append(&s2, ExpectedVersion::NoStream, &[make_event()])
822            .await
823            .unwrap();
824        store
825            .append(&other, ExpectedVersion::NoStream, &[make_event()])
826            .await
827            .unwrap();
828
829        let mut streams = store.list_streams(Some("process/")).await.unwrap();
830        streams.sort_by_key(|s| s.as_str().to_owned()); // deterministic order
831        assert_eq!(streams.len(), 2);
832        assert!(streams.iter().any(|s| s.as_str() == "process/ls-a"));
833        assert!(streams.iter().any(|s| s.as_str() == "process/ls-b"));
834
835        let all = store.list_streams(None).await.unwrap();
836        assert_eq!(all.len(), 3);
837    }
838}