use std::collections::BTreeMap;
use crate::{envelope::EventEnvelope, error::EngineError, event_store::EventStore, ids::StreamId};
pub trait Projection {
fn name(&self) -> &'static str;
fn handle_event(&mut self, envelope: &EventEnvelope);
fn last_sequence(&self) -> Option<u64> {
None
}
}
#[derive(Debug, Default, Clone, serde::Serialize, serde::Deserialize)]
pub struct GlobalProjectionCheckpoint {
pub cursors: BTreeMap<StreamId, u64>,
}
impl GlobalProjectionCheckpoint {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn cursor_for(&self, stream_id: &StreamId) -> u64 {
self.cursors.get(stream_id).copied().unwrap_or(0)
}
pub fn advance(&mut self, stream_id: &StreamId, sequence: u64) {
let entry = self.cursors.entry(stream_id.clone()).or_insert(0);
if sequence > *entry {
*entry = sequence;
}
}
}
#[allow(async_fn_in_trait)]
pub trait ProjectionCheckpointStore {
async fn load_projection_checkpoint(
&self,
name: &str,
) -> Result<GlobalProjectionCheckpoint, EngineError>;
async fn save_projection_checkpoint(
&self,
name: &str,
checkpoint: &GlobalProjectionCheckpoint,
) -> Result<(), EngineError>;
async fn advance_projection_cursors(
&self,
name: &str,
_previous: &GlobalProjectionCheckpoint,
current: &GlobalProjectionCheckpoint,
) -> Result<(), EngineError> {
self.save_projection_checkpoint(name, current).await
}
}
pub struct ProjectionRunner;
impl ProjectionRunner {
pub fn run<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
for event in events {
projection.handle_event(event);
}
}
pub fn run_all(projections: &mut [&mut dyn Projection], events: &[EventEnvelope]) {
for event in events {
for projection in projections.iter_mut() {
projection.handle_event(event);
}
}
}
pub fn catch_up<P: Projection>(projection: &mut P, events: &[EventEnvelope]) {
let from = projection.last_sequence().unwrap_or(0);
if from == 0 {
Self::run(projection, events);
return;
}
let start = events.partition_point(|e| e.sequence_number <= from);
for event in &events[start..] {
projection.handle_event(event);
}
}
pub async fn run_from_store<P, S>(
projection: &mut P,
store: &S,
stream_id: &StreamId,
) -> Result<(), EngineError>
where
P: Projection + Send,
S: EventStore,
{
store
.fold_stream(stream_id, 0, (), |(), env| {
projection.handle_event(&env);
Ok(())
})
.await
}
pub async fn catch_up_from_store<P, S>(
projection: &mut P,
store: &S,
stream_id: &StreamId,
) -> Result<(), EngineError>
where
P: Projection + Send,
S: EventStore,
{
let from = projection.last_sequence().unwrap_or(0);
store
.fold_stream(stream_id, from, (), |(), env| {
projection.handle_event(&env);
Ok(())
})
.await
}
#[must_use = "pass the returned checkpoint to subsequent catch_up_all_streams calls; \
dropping it silently restarts replay from the beginning"]
pub async fn run_all_streams<P, S>(
projection: &mut P,
store: &S,
stream_ids: &[StreamId],
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
P: Projection + Send,
S: EventStore,
{
let mut checkpoint = GlobalProjectionCheckpoint::new();
for stream_id in stream_ids {
let last_seq = store
.fold_stream(stream_id, 0, 0u64, |_, env| {
let seq = env.sequence_number;
projection.handle_event(&env);
Ok(seq)
})
.await?;
if last_seq > 0 {
checkpoint.advance(stream_id, last_seq);
}
}
Ok(checkpoint)
}
#[must_use = "pass the returned checkpoint to the next catch_up_all_streams call; \
dropping it silently discards incremental progress"]
pub async fn catch_up_all_streams<P, S>(
projection: &mut P,
store: &S,
stream_ids: &[StreamId],
checkpoint: &GlobalProjectionCheckpoint,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
P: Projection + Send,
S: EventStore,
{
let mut updated = checkpoint.clone();
for stream_id in stream_ids {
let from = checkpoint.cursor_for(stream_id);
let last_seq = store
.fold_stream(stream_id, from, from, |_, env| {
let seq = env.sequence_number;
projection.handle_event(&env);
Ok(seq)
})
.await?;
if last_seq > from {
updated.advance(stream_id, last_seq);
}
}
Ok(updated)
}
pub async fn run_matching_streams<P, S>(
projection: &mut P,
store: &S,
prefix: Option<&str>,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
P: Projection + Send,
S: EventStore,
{
let streams = store.list_streams(prefix).await?;
Self::run_all_streams(projection, store, &streams).await
}
pub async fn catch_up_matching_streams<P, S>(
projection: &mut P,
store: &S,
prefix: Option<&str>,
checkpoint: &GlobalProjectionCheckpoint,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
P: Projection + Send,
S: EventStore,
{
let streams = store.list_streams(prefix).await?;
Self::catch_up_all_streams(projection, store, &streams, checkpoint).await
}
pub async fn catch_up_persistent<P, S>(
projection: &mut P,
store: &S,
prefix: Option<&str>,
checkpoint_name: &str,
) -> Result<GlobalProjectionCheckpoint, EngineError>
where
P: Projection + Send,
S: EventStore + ProjectionCheckpointStore,
{
let checkpoint = store.load_projection_checkpoint(checkpoint_name).await?;
let streams = store.list_streams(prefix).await?;
let updated = Self::catch_up_all_streams(projection, store, &streams, &checkpoint).await?;
store
.advance_projection_cursors(checkpoint_name, &checkpoint, &updated)
.await?;
Ok(updated)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::{
envelope::NewEvent,
event_store::{ExpectedVersion, InMemoryEventStore},
ids::{ConversationId, CorrelationId, ProcessId, StreamId, TenantId},
version::WorkflowId,
};
use serde_json::json;
struct Counter {
count: usize,
last: Option<u64>,
}
impl Counter {
fn new() -> Self {
Self {
count: 0,
last: None,
}
}
}
impl Projection for Counter {
fn name(&self) -> &'static str {
"counter"
}
fn handle_event(&mut self, envelope: &EventEnvelope) {
self.count += 1;
self.last = Some(envelope.sequence_number);
}
fn last_sequence(&self) -> Option<u64> {
self.last
}
}
fn make_event() -> NewEvent {
NewEvent {
correlation_id: CorrelationId::new(),
causation_id: None,
conversation_id: ConversationId::new(),
process_id: ProcessId::new(),
tenant_id: TenantId::new(),
workflow_id: WorkflowId::new("test", "FV2024-10-01"),
event_type: "TestEvent".into(),
schema_version: 1,
payload: json!({}),
}
}
#[tokio::test]
async fn run_from_store_full_replay() {
let store = InMemoryEventStore::new();
let stream = StreamId::new("proj/s1");
store
.append(
&stream,
ExpectedVersion::NoStream,
&[make_event(), make_event(), make_event()],
)
.await
.unwrap();
let mut proj = Counter::new();
ProjectionRunner::run_from_store(&mut proj, &store, &stream)
.await
.unwrap();
assert_eq!(proj.count, 3);
assert_eq!(proj.last, Some(3));
}
#[tokio::test]
async fn catch_up_from_store_incremental() {
let store = InMemoryEventStore::new();
let stream = StreamId::new("proj/s2");
store
.append(
&stream,
ExpectedVersion::NoStream,
&[make_event(), make_event()],
)
.await
.unwrap();
let mut proj = Counter::new();
ProjectionRunner::run_from_store(&mut proj, &store, &stream)
.await
.unwrap();
assert_eq!(proj.count, 2);
store
.append(
&stream,
ExpectedVersion::Exact(2),
&[make_event(), make_event()],
)
.await
.unwrap();
ProjectionRunner::catch_up_from_store(&mut proj, &store, &stream)
.await
.unwrap();
assert_eq!(proj.count, 4);
assert_eq!(proj.last, Some(4));
}
#[tokio::test]
async fn run_all_streams_aggregates_across_multiple_streams() {
let store = InMemoryEventStore::new();
let s1 = StreamId::new("process/ms-s1");
let s2 = StreamId::new("process/ms-s2");
let s3 = StreamId::new("process/ms-s3");
store
.append(
&s1,
ExpectedVersion::NoStream,
&[make_event(), make_event()],
)
.await
.unwrap();
store
.append(
&s2,
ExpectedVersion::NoStream,
&[make_event(), make_event(), make_event()],
)
.await
.unwrap();
store
.append(&s3, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
let mut proj = Counter::new();
let cp = ProjectionRunner::run_all_streams(
&mut proj,
&store,
&[s1.clone(), s2.clone(), s3.clone()],
)
.await
.unwrap();
assert_eq!(proj.count, 6, "all 6 events across 3 streams must be fed");
assert_eq!(cp.cursor_for(&s1), 2);
assert_eq!(cp.cursor_for(&s2), 3);
assert_eq!(cp.cursor_for(&s3), 1);
}
#[tokio::test]
async fn catch_up_all_streams_feeds_only_new_events() {
let store = InMemoryEventStore::new();
let s1 = StreamId::new("process/cu-s1");
let s2 = StreamId::new("process/cu-s2");
store
.append(
&s1,
ExpectedVersion::NoStream,
&[make_event(), make_event()],
)
.await
.unwrap();
store
.append(&s2, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
let mut proj = Counter::new();
let cp = ProjectionRunner::run_all_streams(&mut proj, &store, &[s1.clone(), s2.clone()])
.await
.unwrap();
assert_eq!(proj.count, 3);
assert_eq!(cp.cursor_for(&s1), 2);
assert_eq!(cp.cursor_for(&s2), 1);
store
.append(&s1, ExpectedVersion::Exact(2), &[make_event()])
.await
.unwrap();
store
.append(
&s2,
ExpectedVersion::Exact(1),
&[make_event(), make_event()],
)
.await
.unwrap();
let cp2 = ProjectionRunner::catch_up_all_streams(
&mut proj,
&store,
&[s1.clone(), s2.clone()],
&cp,
)
.await
.unwrap();
assert_eq!(proj.count, 6, "3 new events added across both streams");
assert_eq!(cp2.cursor_for(&s1), 3, "s1 advanced from 2 to 3");
assert_eq!(cp2.cursor_for(&s2), 3, "s2 advanced from 1 to 3");
}
#[tokio::test]
async fn run_matching_streams_uses_prefix_filter() {
let store = InMemoryEventStore::new();
let proc1 = StreamId::new("process/match-p1");
let proc2 = StreamId::new("process/match-p2");
let partner = StreamId::new("partner/match-pp1");
store
.append(&proc1, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
store
.append(
&proc2,
ExpectedVersion::NoStream,
&[make_event(), make_event()],
)
.await
.unwrap();
store
.append(&partner, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
let mut proj = Counter::new();
let _ = ProjectionRunner::run_matching_streams(&mut proj, &store, Some("process/match-"))
.await
.unwrap();
assert_eq!(
proj.count, 3,
"partner stream must be excluded by prefix filter"
);
}
#[tokio::test]
async fn global_projection_checkpoint_serde_roundtrip() {
let mut cp = GlobalProjectionCheckpoint::new();
cp.advance(&StreamId::new("p/1"), 5);
cp.advance(&StreamId::new("p/2"), 3);
let json = serde_json::to_string(&cp).unwrap();
let cp2: GlobalProjectionCheckpoint = serde_json::from_str(&json).unwrap();
assert_eq!(cp2.cursor_for(&StreamId::new("p/1")), 5);
assert_eq!(cp2.cursor_for(&StreamId::new("p/2")), 3);
assert_eq!(cp2.cursor_for(&StreamId::new("p/never")), 0);
}
#[tokio::test]
async fn list_streams_with_prefix() {
let store = InMemoryEventStore::new();
let s1 = StreamId::new("process/ls-a");
let s2 = StreamId::new("process/ls-b");
let other = StreamId::new("partner/ls-c");
store
.append(&s1, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
store
.append(&s2, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
store
.append(&other, ExpectedVersion::NoStream, &[make_event()])
.await
.unwrap();
let mut streams = store.list_streams(Some("process/")).await.unwrap();
streams.sort_by_key(|s| s.as_str().to_owned()); assert_eq!(streams.len(), 2);
assert!(streams.iter().any(|s| s.as_str() == "process/ls-a"));
assert!(streams.iter().any(|s| s.as_str() == "process/ls-b"));
let all = store.list_streams(None).await.unwrap();
assert_eq!(all.len(), 3);
}
}