mako_engine/projection.rs
1//! [`Projection`] trait and [`ProjectionRunner`].
2//!
3//! Projections build read models from the event stream. They are:
4//!
5//! - **Asynchronous** — fed events independently of the write path
6//! - **Disposable** — the read model can be dropped and rebuilt at any time
7//! - **Eventually consistent** — they may lag behind the write head
8//!
9//! Projection failures must never affect event persistence.
10//!
11//! # Incremental catch-up
12//!
13//! Projections that track their cursor position can implement
14//! [`Projection::last_sequence`] so [`ProjectionRunner::catch_up`] and
15//! [`ProjectionRunner::catch_up_from_store`] feed only new events.
16//!
17//! # Store-backed streaming
18//!
19//! [`ProjectionRunner::run_from_store`] and
20//! [`ProjectionRunner::catch_up_from_store`] load events directly from an
21//! [`EventStore`] without requiring the caller to pre-load the entire
22//! event slice into memory.
23//!
24//! # Multi-stream projections
25//!
26//! [`ProjectionRunner::run_all_streams`] and
27//! [`ProjectionRunner::catch_up_all_streams`] drive a projection across
28//! multiple streams simultaneously. This is required for process families
29//! where a read model aggregates across many process instances — for
30//! example, MABIS Bilanzkreisabrechnung aggregating events across thousands
31//! of MaLo-level process streams for a single billing period.
32//!
33//! The [`GlobalProjectionCheckpoint`] records per-stream cursors so
34//! incremental catch-up only feeds events newer than the last replay.
35//!
36//! ```rust,ignore
37//! // Initial full replay across all process streams:
38//! let checkpoint = ProjectionRunner::run_all_streams(
39//! &mut billing_proj,
40//! &store,
41//! &stream_ids,
42//! ).await?;
43//!
44//! // Later: incremental update after new events arrive:
45//! let checkpoint = ProjectionRunner::catch_up_all_streams(
46//! &mut billing_proj,
47//! &store,
48//! &stream_ids,
49//! &checkpoint,
50//! ).await?;
51//! ```
52//!
53//! To enumerate all process streams automatically, use
54//! [`EventStore::list_streams`] with a prefix. Pass `"process/"` to scan
55//! all tenants, or `&format!("process/{tenant_id}/")` to scope to one tenant:
56//!
57//! ```rust,ignore
58//! // All tenants:
59//! let streams = store.list_streams(Some("process/")).await?;
60//! // Single tenant:
61//! let streams = store.list_streams(Some(&format!("process/{tenant_id}/"))).await?;
62//! let checkpoint = ProjectionRunner::run_all_streams(
63//! &mut billing_proj, &store, &streams,
64//! ).await?;
65//! ```
66
67use std::collections::BTreeMap;
68
69use crate::{envelope::EventEnvelope, error::EngineError, event_store::EventStore, ids::StreamId};
70
71// ── Projection trait ──────────────────────────────────────────────────────────
72
/// A read-model builder that consumes events and maintains queryable state.
///
/// # Contract
///
/// - `handle_event` is called for every event in stream order.
/// - `handle_event` must not panic on events it doesn't recognise (forward
///   compatibility: new event types appear when new domain features are
///   deployed before all projections are updated).
/// - The projection is rebuilt from scratch by replaying all events through
///   [`ProjectionRunner::run`]; implementations must tolerate this.
/// - `handle_event` has no return value, so the runner cannot observe a
///   per-event failure; an implementation has to record or log problems
///   itself.
pub trait Projection {
    /// A stable human-readable name for this projection (used in logs/metrics).
    fn name(&self) -> &'static str;

    /// Process a single event, updating internal read-model state.
    ///
    /// The envelope is only borrowed for the duration of the call.
    fn handle_event(&mut self, envelope: &EventEnvelope);

    /// The sequence number of the last event this projection processed.
    ///
    /// Return `None` when the projection has not processed any events yet
    /// (i.e. it needs a full replay). [`ProjectionRunner::catch_up`] and
    /// [`ProjectionRunner::catch_up_from_store`] treat `Some(0)` exactly like
    /// `None`.
    ///
    /// Implement this method if your projection stores the cursor alongside
    /// the read model so [`ProjectionRunner::catch_up`] can perform
    /// incremental updates.
    ///
    /// This is a single cursor; the multi-stream entry points do not consult
    /// it and track one cursor per stream in a
    /// [`GlobalProjectionCheckpoint`] instead.
    ///
    /// Defaults to `None`.
    fn last_sequence(&self) -> Option<u64> {
        None
    }
}
104
105// ── GlobalProjectionCheckpoint ────────────────────────────────────────────────
106
/// Per-stream sequence number cursors for multi-stream projections.
///
/// Returned by [`ProjectionRunner::run_all_streams`] and
/// [`ProjectionRunner::catch_up_all_streams`]. Pass an existing checkpoint
/// to `catch_up_all_streams` so only events newer than the last replay are
/// fed to the projection.
///
/// Persist this value (e.g. alongside the read model in a snapshot store) to
/// survive process restarts and avoid full replays on restart. The type
/// derives `serde::Serialize` / `serde::Deserialize` for that purpose.
///
/// A cursor value of `0` for a stream means "never seen" (equivalent to
/// "replay from the beginning").
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct GlobalProjectionCheckpoint {
    /// Last-processed sequence number per stream identifier.
    ///
    /// Streams not present in this map have an implicit cursor of `0`.
    /// Stored in a `BTreeMap`, so iteration follows `StreamId`'s ordering.
    pub cursors: BTreeMap<StreamId, u64>,
}
126
127impl GlobalProjectionCheckpoint {
128 /// Create an empty checkpoint (all streams will be fully replayed).
129 #[must_use]
130 pub fn new() -> Self {
131 Self::default()
132 }
133
134 /// The last-processed sequence number for `stream_id`.
135 ///
136 /// Returns `0` when the stream has never been processed (signals a full
137 /// replay is needed for that stream).
138 #[must_use]
139 pub fn cursor_for(&self, stream_id: &StreamId) -> u64 {
140 self.cursors.get(stream_id).copied().unwrap_or(0)
141 }
142
143 /// Update the cursor for `stream_id` to `sequence` (if `sequence` is
144 /// greater than the current cursor).
145 pub fn advance(&mut self, stream_id: &StreamId, sequence: u64) {
146 let entry = self.cursors.entry(stream_id.clone()).or_insert(0);
147 if sequence > *entry {
148 *entry = sequence;
149 }
150 }
151}
152
// ── ProjectionCheckpointStore ─────────────────────────────────────────────────
154
/// Persist and load named [`GlobalProjectionCheckpoint`] values.
///
/// Implement this trait on your event store to enable
/// [`ProjectionRunner::catch_up_persistent`], which avoids full replays on
/// restart by persisting cursor progress after each catch-up cycle.
///
/// The SlateDB implementation stores one key per (projection, stream) pair
/// under the `cp/{name}/{stream_id}` key space (raw u64 LE — no JSON). This
/// bounds each `catch_up_persistent` cycle to O(changed_streams) writes
/// instead of O(total_streams), which matters for MABIS deployments tracking
/// tens of thousands of streams. Other backing stores may choose any suitable
/// serialisation.
// NOTE(review): the `async_fn_in_trait` lint warns that callers cannot add
// `Send` bounds to the returned futures. It is presumably silenced because
// this trait is only used through generic `S: ...` parameters — confirm.
#[allow(async_fn_in_trait)]
pub trait ProjectionCheckpointStore {
    /// Load a previously saved checkpoint by name.
    ///
    /// Returns an empty [`GlobalProjectionCheckpoint`] (all cursors zero) when
    /// no checkpoint has been persisted for `name` yet — this triggers a full
    /// replay from the beginning.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Store`] on storage failure.
    async fn load_projection_checkpoint(
        &self,
        name: &str,
    ) -> Result<GlobalProjectionCheckpoint, EngineError>;

    /// Persist `checkpoint` under `name`, overwriting any previously stored
    /// value.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Store`] on storage failure.
    async fn save_projection_checkpoint(
        &self,
        name: &str,
        checkpoint: &GlobalProjectionCheckpoint,
    ) -> Result<(), EngineError>;

    /// Persist only the cursors that advanced since `previous`.
    ///
    /// The default implementation ignores `previous` and saves the full
    /// `current` checkpoint. Override in storage backends that support
    /// per-key atomic writes (e.g. SlateDB `WriteBatch`) for O(changed)
    /// write cost instead of O(total streams).
    ///
    /// [`ProjectionRunner::catch_up_persistent`] calls this once per cycle,
    /// including cycles in which no cursor moved.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Store`] on storage failure.
    async fn advance_projection_cursors(
        &self,
        name: &str,
        _previous: &GlobalProjectionCheckpoint,
        current: &GlobalProjectionCheckpoint,
    ) -> Result<(), EngineError> {
        // Fallback: rewrite the whole checkpoint; `_previous` is unused here.
        self.save_projection_checkpoint(name, current).await
    }
}
214
215// ── ProjectionRunner ──────────────────────────────────────────────────────────
216
/// Drives one or more projections over events, supplied either as an
/// in-memory slice or read from an [`EventStore`].
///
/// The runner is stateless — every entry point is an associated function
/// that iterates over events and calls [`Projection::handle_event`] for
/// each. Being a field-less unit type, it derives the usual trivial traits
/// so it can be logged, copied, or default-constructed.
#[derive(Debug, Clone, Copy, Default)]
pub struct ProjectionRunner;
222
223impl ProjectionRunner {
224 /// Feed all `events` into `projection` in order (full replay).
225 ///
226 /// # Performance
227 ///
228 /// This method requires the caller to have already loaded all `events` into
229 /// a `Vec`. For large event streams, prefer [`run_from_store`] /
230 /// [`run_all_streams`] which use `fold_stream` internally and avoid
231 /// allocating the full event slice.
232 ///
233 /// [`run_from_store`]: ProjectionRunner::run_from_store
234 /// [`run_all_streams`]: ProjectionRunner::run_all_streams
235 pub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
236 for event in events {
237 projection.handle_event(event);
238 }
239 }
240
241 /// Feed all `events` into multiple projections simultaneously (single pass,
242 /// full replay).
243 ///
244 /// # Performance
245 ///
246 /// Same caveat as [`run`]: the caller must supply a pre-loaded slice.
247 /// For large streams, prefer [`run_all_streams`] which streams events
248 /// directly from the store with O(1) working memory.
249 ///
250 /// [`run`]: ProjectionRunner::run
251 /// [`run_all_streams`]: ProjectionRunner::run_all_streams
252 pub fn run_all(projections: &mut [&mut dyn Projection], events: &[EventEnvelope]) {
253 for event in events {
254 for projection in projections.iter_mut() {
255 projection.handle_event(event);
256 }
257 }
258 }
259
260 /// Feed only events newer than the projection's cursor into `projection`.
261 ///
262 /// Queries [`Projection::last_sequence`] to determine the starting point.
263 /// If the projection returns `None`, all `events` are fed (same as [`run`]).
264 ///
265 /// `events` must be sorted by `sequence_number` in ascending order (which
266 /// is the contract for all slices returned by [`EventStore::load`] /
267 /// [`EventStore::load_from`]).
268 ///
269 /// This is a binary-search–accelerated variant: it finds the first event
270 /// past the cursor in O(log n) then feeds the tail in O(k) where k is the
271 /// number of new events.
272 ///
273 /// [`run`]: ProjectionRunner::run
274 /// [`EventStore::load`]: crate::event_store::EventStore::load
275 /// [`EventStore::load_from`]: crate::event_store::EventStore::load_from
276 pub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
277 let from = projection.last_sequence().unwrap_or(0);
278 if from == 0 {
279 Self::run(projection, events);
280 return;
281 }
282 // Binary search for the first event with sequence_number > from.
283 let start = events.partition_point(|e| e.sequence_number <= from);
284 for event in &events[start..] {
285 projection.handle_event(event);
286 }
287 }
288
289 /// Full replay of `stream_id` into `projection` without pre-loading the
290 /// event slice into a `Vec`.
291 ///
292 /// Uses [`EventStore::fold_stream`] internally so production backends can
293 /// stream events with cursor-based pagination rather than loading all
294 /// events at once.
295 ///
296 /// # Errors
297 ///
298 /// Returns [`EngineError::Store`] on storage failure.
299 /// Returns [`EngineError::Deserialization`] when the fold closure returns
300 /// an error (propagated from the store).
301 pub async fn run_from_store<P, S>(
302 projection: &mut P,
303 store: &S,
304 stream_id: &StreamId,
305 ) -> Result<(), EngineError>
306 where
307 P: Projection + Send,
308 S: EventStore,
309 {
310 store
311 .fold_stream(stream_id, 0, (), |(), env| {
312 projection.handle_event(&env);
313 Ok(())
314 })
315 .await
316 }
317
318 /// Incremental catch-up of `stream_id` into `projection` without
319 /// pre-loading the event slice into a `Vec`.
320 ///
321 /// Queries [`Projection::last_sequence`] to determine the starting point.
322 /// If the projection returns `None`, performs a full replay (same as
323 /// [`run_from_store`]).
324 ///
325 /// # Errors
326 ///
327 /// Returns [`EngineError::Store`] on storage failure.
328 ///
329 /// [`run_from_store`]: ProjectionRunner::run_from_store
330 pub async fn catch_up_from_store<P, S>(
331 projection: &mut P,
332 store: &S,
333 stream_id: &StreamId,
334 ) -> Result<(), EngineError>
335 where
336 P: Projection + Send,
337 S: EventStore,
338 {
339 let from = projection.last_sequence().unwrap_or(0);
340 store
341 .fold_stream(stream_id, from, (), |(), env| {
342 projection.handle_event(&env);
343 Ok(())
344 })
345 .await
346 }
347
348 // ── Multi-stream ─────────────────────────────────────────────────────────
349
350 /// Full replay of multiple `stream_ids` into `projection`.
351 ///
352 /// Events from each stream are fed in sequence order within that stream.
353 /// Streams are processed in the order given by `stream_ids` — if
354 /// cross-stream event ordering matters, sort `stream_ids` accordingly
355 /// or use a single global-sequence backend.
356 ///
357 /// Returns a [`GlobalProjectionCheckpoint`] recording the last-processed
358 /// sequence number for every stream. Pass this to
359 /// [`catch_up_all_streams`] for subsequent incremental updates.
360 ///
361 /// # Production workers: use `catch_up_persistent` instead
362 ///
363 /// `run_all_streams` performs a **full replay from sequence 0** every
364 /// time it is called. In a long-running background worker this becomes
365 /// prohibitively expensive as the event log grows. Use
366 /// [`catch_up_persistent`] instead — it loads and saves a durable
367 /// checkpoint so only events appended since the last run are fed to the
368 /// projection.
369 ///
370 /// This method is appropriate for one-shot diagnostic tools, tests, or
371 /// the very first population of a new projection.
372 ///
373 /// # Errors
374 ///
375 /// Returns [`EngineError::Store`] on storage failure for any stream.
376 ///
377 /// [`catch_up_all_streams`]: ProjectionRunner::catch_up_all_streams
378 /// [`catch_up_persistent`]: ProjectionRunner::catch_up_persistent
379 #[must_use = "pass the returned checkpoint to subsequent catch_up_all_streams calls; \
380 dropping it silently restarts replay from the beginning"]
381 pub async fn run_all_streams<P, S>(
382 projection: &mut P,
383 store: &S,
384 stream_ids: &[StreamId],
385 ) -> Result<GlobalProjectionCheckpoint, EngineError>
386 where
387 P: Projection + Send,
388 S: EventStore,
389 {
390 let mut checkpoint = GlobalProjectionCheckpoint::new();
391 for stream_id in stream_ids {
392 let last_seq = store
393 .fold_stream(stream_id, 0, 0u64, |_, env| {
394 let seq = env.sequence_number;
395 projection.handle_event(&env);
396 Ok(seq)
397 })
398 .await?;
399 if last_seq > 0 {
400 checkpoint.advance(stream_id, last_seq);
401 }
402 }
403 Ok(checkpoint)
404 }
405
406 /// Incremental catch-up of multiple `stream_ids` into `projection`.
407 ///
408 /// For each stream, queries `checkpoint` for the last-processed sequence
409 /// number and feeds only events newer than that cursor.
410 ///
411 /// Returns an updated [`GlobalProjectionCheckpoint`] reflecting the new
412 /// cursors after this catch-up pass. Pass the returned checkpoint to the
413 /// next `catch_up_all_streams` call — do not reuse the input checkpoint.
414 ///
415 /// # Errors
416 ///
417 /// Returns [`EngineError::Store`] on storage failure for any stream.
418 #[must_use = "pass the returned checkpoint to the next catch_up_all_streams call; \
419 dropping it silently discards incremental progress"]
420 pub async fn catch_up_all_streams<P, S>(
421 projection: &mut P,
422 store: &S,
423 stream_ids: &[StreamId],
424 checkpoint: &GlobalProjectionCheckpoint,
425 ) -> Result<GlobalProjectionCheckpoint, EngineError>
426 where
427 P: Projection + Send,
428 S: EventStore,
429 {
430 let mut updated = checkpoint.clone();
431 for stream_id in stream_ids {
432 let from = checkpoint.cursor_for(stream_id);
433 let last_seq = store
434 .fold_stream(stream_id, from, from, |_, env| {
435 let seq = env.sequence_number;
436 projection.handle_event(&env);
437 Ok(seq)
438 })
439 .await?;
440 if last_seq > from {
441 updated.advance(stream_id, last_seq);
442 }
443 }
444 Ok(updated)
445 }
446
447 /// Discover all streams matching `prefix` and replay them into `projection`.
448 ///
449 /// Convenience wrapper around [`EventStore::list_streams`] +
450 /// [`run_all_streams`]. Useful when the full set of streams is not known
451 /// at compile time.
452 ///
453 /// # Production workers: use `catch_up_persistent` instead
454 ///
455 /// This function performs a **full replay from sequence 0** every call.
456 /// For persistent background workers, use [`catch_up_persistent`] so only
457 /// events appended since the last checkpoint are processed.
458 ///
459 /// # Errors
460 ///
461 /// Returns [`EngineError::Store`] on storage failures.
462 ///
463 /// [`run_all_streams`]: ProjectionRunner::run_all_streams
464 /// [`catch_up_persistent`]: ProjectionRunner::catch_up_persistent
465 pub async fn run_matching_streams<P, S>(
466 projection: &mut P,
467 store: &S,
468 prefix: Option<&str>,
469 ) -> Result<GlobalProjectionCheckpoint, EngineError>
470 where
471 P: Projection + Send,
472 S: EventStore,
473 {
474 let streams = store.list_streams(prefix).await?;
475 Self::run_all_streams(projection, store, &streams).await
476 }
477
478 /// Incremental catch-up of all streams matching `prefix`.
479 ///
480 /// Convenience wrapper for the common pattern of discovering streams and
481 /// then calling `catch_up_all_streams`.
482 ///
483 /// # Errors
484 ///
485 /// Returns [`EngineError::Store`] on storage failures.
486 pub async fn catch_up_matching_streams<P, S>(
487 projection: &mut P,
488 store: &S,
489 prefix: Option<&str>,
490 checkpoint: &GlobalProjectionCheckpoint,
491 ) -> Result<GlobalProjectionCheckpoint, EngineError>
492 where
493 P: Projection + Send,
494 S: EventStore,
495 {
496 let streams = store.list_streams(prefix).await?;
497 Self::catch_up_all_streams(projection, store, &streams, checkpoint).await
498 }
499
500 /// Incremental, persistent catch-up for all streams matching `prefix`.
501 ///
502 /// Loads the named checkpoint from `store`, performs an incremental
503 /// catch-up of every matching stream, then saves the updated checkpoint
504 /// back atomically. On the next call, only events appended since the last
505 /// run are processed — avoiding full replays across restarts.
506 ///
507 /// This is the preferred entry point for background projection workers
508 /// that must survive process restarts.
509 ///
510 /// # Key space
511 ///
512 /// The SlateDB implementation stores `cp/{checkpoint_name}/{stream_id}` →
513 /// `u64 LE` (8 bytes) per stream. Each cycle only writes the streams
514 /// whose cursors advanced, giving O(changed_streams) write cost instead
515 /// of O(total_streams).
516 ///
517 /// # Errors
518 ///
519 /// Returns [`EngineError::Store`] on any storage failure (checkpoint load,
520 /// event scan, or checkpoint save).
521 pub async fn catch_up_persistent<P, S>(
522 projection: &mut P,
523 store: &S,
524 prefix: Option<&str>,
525 checkpoint_name: &str,
526 ) -> Result<GlobalProjectionCheckpoint, EngineError>
527 where
528 P: Projection + Send,
529 S: EventStore + ProjectionCheckpointStore,
530 {
531 let checkpoint = store.load_projection_checkpoint(checkpoint_name).await?;
532 let streams = store.list_streams(prefix).await?;
533 let updated = Self::catch_up_all_streams(projection, store, &streams, &checkpoint).await?;
534 store
535 .advance_projection_cursors(checkpoint_name, &checkpoint, &updated)
536 .await?;
537 Ok(updated)
538 }
539}
540
541#[cfg(test)]
542mod tests {
543 use super::*;
544 use crate::{
545 envelope::NewEvent,
546 event_store::{ExpectedVersion, InMemoryEventStore},
547 ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
548 version::WorkflowId,
549 };
550 use serde_json::json;
551
552 /// A simple counter projection that counts events and tracks its cursor.
553 struct Counter {
554 count: usize,
555 last: Option<u64>,
556 }
557
558 impl Counter {
559 fn new() -> Self {
560 Self {
561 count: 0,
562 last: None,
563 }
564 }
565 }
566
567 impl Projection for Counter {
568 fn name(&self) -> &'static str {
569 "counter"
570 }
571
572 fn handle_event(&mut self, envelope: &EventEnvelope) {
573 self.count += 1;
574 self.last = Some(envelope.sequence_number);
575 }
576
577 fn last_sequence(&self) -> Option<u64> {
578 self.last
579 }
580 }
581
/// Build a throwaway [`NewEvent`] for the tests in this module.
///
/// Every call asks the id types for a new value via their `new()`
/// constructors; the event type, workflow, schema version and (empty JSON
/// object) payload are fixed. The tests only count events and read their
/// sequence numbers, so none of these field values is asserted on.
fn make_event() -> NewEvent {
    NewEvent {
        correlation_id: CorrelationId::new(),
        causation_id: None,
        conversation_id: ConversationId::new(),
        process_id: ProcessId::new(),
        tenant_id: TenantId::new(),
        workflow_id: WorkflowId::new("test", "FV2024-10-01"),
        event_type: "TestEvent".into(),
        schema_version: 1,
        payload: json!({}),
    }
}
595
596 #[tokio::test]
597 async fn run_from_store_full_replay() {
598 let store = InMemoryEventStore::new();
599 let stream = StreamId::new("proj/s1");
600
601 store
602 .append(
603 &stream,
604 ExpectedVersion::NoStream,
605 &[make_event(), make_event(), make_event()],
606 )
607 .await
608 .unwrap();
609
610 let mut proj = Counter::new();
611 ProjectionRunner::run_from_store(&mut proj, &store, &stream)
612 .await
613 .unwrap();
614
615 assert_eq!(proj.count, 3);
616 assert_eq!(proj.last, Some(3));
617 }
618
619 #[tokio::test]
620 async fn catch_up_from_store_incremental() {
621 let store = InMemoryEventStore::new();
622 let stream = StreamId::new("proj/s2");
623
624 store
625 .append(
626 &stream,
627 ExpectedVersion::NoStream,
628 &[make_event(), make_event()],
629 )
630 .await
631 .unwrap();
632
633 let mut proj = Counter::new();
634 // Full replay first.
635 ProjectionRunner::run_from_store(&mut proj, &store, &stream)
636 .await
637 .unwrap();
638 assert_eq!(proj.count, 2);
639
640 // Append two more events.
641 store
642 .append(
643 &stream,
644 ExpectedVersion::Exact(2),
645 &[make_event(), make_event()],
646 )
647 .await
648 .unwrap();
649
650 // Incremental catch-up should feed only the two new events.
651 ProjectionRunner::catch_up_from_store(&mut proj, &store, &stream)
652 .await
653 .unwrap();
654 assert_eq!(proj.count, 4);
655 assert_eq!(proj.last, Some(4));
656 }
657
658 // ── Multi-stream tests ────────────────────────────────────────────────────
659
660 #[tokio::test]
661 async fn run_all_streams_aggregates_across_multiple_streams() {
662 let store = InMemoryEventStore::new();
663 let s1 = StreamId::new("process/ms-s1");
664 let s2 = StreamId::new("process/ms-s2");
665 let s3 = StreamId::new("process/ms-s3");
666
667 // 2 events in s1, 3 in s2, 1 in s3.
668 store
669 .append(
670 &s1,
671 ExpectedVersion::NoStream,
672 &[make_event(), make_event()],
673 )
674 .await
675 .unwrap();
676 store
677 .append(
678 &s2,
679 ExpectedVersion::NoStream,
680 &[make_event(), make_event(), make_event()],
681 )
682 .await
683 .unwrap();
684 store
685 .append(&s3, ExpectedVersion::NoStream, &[make_event()])
686 .await
687 .unwrap();
688
689 let mut proj = Counter::new();
690 let cp = ProjectionRunner::run_all_streams(
691 &mut proj,
692 &store,
693 &[s1.clone(), s2.clone(), s3.clone()],
694 )
695 .await
696 .unwrap();
697
698 assert_eq!(proj.count, 6, "all 6 events across 3 streams must be fed");
699 assert_eq!(cp.cursor_for(&s1), 2);
700 assert_eq!(cp.cursor_for(&s2), 3);
701 assert_eq!(cp.cursor_for(&s3), 1);
702 }
703
704 #[tokio::test]
705 async fn catch_up_all_streams_feeds_only_new_events() {
706 let store = InMemoryEventStore::new();
707 let s1 = StreamId::new("process/cu-s1");
708 let s2 = StreamId::new("process/cu-s2");
709
710 store
711 .append(
712 &s1,
713 ExpectedVersion::NoStream,
714 &[make_event(), make_event()],
715 )
716 .await
717 .unwrap();
718 store
719 .append(&s2, ExpectedVersion::NoStream, &[make_event()])
720 .await
721 .unwrap();
722
723 let mut proj = Counter::new();
724 let cp = ProjectionRunner::run_all_streams(&mut proj, &store, &[s1.clone(), s2.clone()])
725 .await
726 .unwrap();
727 assert_eq!(proj.count, 3);
728 assert_eq!(cp.cursor_for(&s1), 2);
729 assert_eq!(cp.cursor_for(&s2), 1);
730
731 // Add one event to each stream.
732 store
733 .append(&s1, ExpectedVersion::Exact(2), &[make_event()])
734 .await
735 .unwrap();
736 store
737 .append(
738 &s2,
739 ExpectedVersion::Exact(1),
740 &[make_event(), make_event()],
741 )
742 .await
743 .unwrap();
744
745 let cp2 = ProjectionRunner::catch_up_all_streams(
746 &mut proj,
747 &store,
748 &[s1.clone(), s2.clone()],
749 &cp,
750 )
751 .await
752 .unwrap();
753
754 assert_eq!(proj.count, 6, "3 new events added across both streams");
755 assert_eq!(cp2.cursor_for(&s1), 3, "s1 advanced from 2 to 3");
756 assert_eq!(cp2.cursor_for(&s2), 3, "s2 advanced from 1 to 3");
757 }
758
759 #[tokio::test]
760 async fn run_matching_streams_uses_prefix_filter() {
761 let store = InMemoryEventStore::new();
762 let proc1 = StreamId::new("process/match-p1");
763 let proc2 = StreamId::new("process/match-p2");
764 let partner = StreamId::new("partner/match-pp1"); // should NOT be included
765
766 store
767 .append(&proc1, ExpectedVersion::NoStream, &[make_event()])
768 .await
769 .unwrap();
770 store
771 .append(
772 &proc2,
773 ExpectedVersion::NoStream,
774 &[make_event(), make_event()],
775 )
776 .await
777 .unwrap();
778 store
779 .append(&partner, ExpectedVersion::NoStream, &[make_event()])
780 .await
781 .unwrap();
782
783 let mut proj = Counter::new();
784 let _ = ProjectionRunner::run_matching_streams(&mut proj, &store, Some("process/match-"))
785 .await
786 .unwrap();
787
788 // Only the 3 events from proc1 + proc2 should have been fed.
789 assert_eq!(
790 proj.count, 3,
791 "partner stream must be excluded by prefix filter"
792 );
793 }
794
/// A checkpoint survives a JSON round trip with its cursors intact, and
/// unknown streams still read as cursor `0` afterwards.
///
/// Nothing here awaits, so this is a plain synchronous `#[test]` and does
/// not start an async runtime.
#[test]
fn global_projection_checkpoint_serde_roundtrip() {
    let mut cp = GlobalProjectionCheckpoint::new();
    cp.advance(&StreamId::new("p/1"), 5);
    cp.advance(&StreamId::new("p/2"), 3);

    let json = serde_json::to_string(&cp).unwrap();
    let cp2: GlobalProjectionCheckpoint = serde_json::from_str(&json).unwrap();

    assert_eq!(cp2.cursor_for(&StreamId::new("p/1")), 5);
    assert_eq!(cp2.cursor_for(&StreamId::new("p/2")), 3);
    assert_eq!(cp2.cursor_for(&StreamId::new("p/never")), 0);
}
808
809 #[tokio::test]
810 async fn list_streams_with_prefix() {
811 let store = InMemoryEventStore::new();
812 let s1 = StreamId::new("process/ls-a");
813 let s2 = StreamId::new("process/ls-b");
814 let other = StreamId::new("partner/ls-c");
815
816 store
817 .append(&s1, ExpectedVersion::NoStream, &[make_event()])
818 .await
819 .unwrap();
820 store
821 .append(&s2, ExpectedVersion::NoStream, &[make_event()])
822 .await
823 .unwrap();
824 store
825 .append(&other, ExpectedVersion::NoStream, &[make_event()])
826 .await
827 .unwrap();
828
829 let mut streams = store.list_streams(Some("process/")).await.unwrap();
830 streams.sort_by_key(|s| s.as_str().to_owned()); // deterministic order
831 assert_eq!(streams.len(), 2);
832 assert!(streams.iter().any(|s| s.as_str() == "process/ls-a"));
833 assert!(streams.iter().any(|s| s.as_str() == "process/ls-b"));
834
835 let all = store.list_streams(None).await.unwrap();
836 assert_eq!(all.len(), 3);
837 }
838}