atomr_patterns/cqrs/reader.rs
1//! [`Reader`] — read-side fold from journal events into a projection.
2//!
3//! Users implement this trait once per read model. `decode` deserializes
4//! the journal payload into the event type; `apply` folds events into
5//! the projection. The framework runs an async loop that polls the
6//! configured [`atomr_persistence_query::ReadJournal`] and drives this
7//! trait.
8
9use async_trait::async_trait;
10
/// Selects which stream of journal events a reader follows.
///
/// A reader reports its choice through [`Reader::filter`], whose default
/// implementation yields [`ReaderFilter::All`] unless the legacy
/// [`Reader::tag`] hook returns a tag.
///
/// Equality and hashing are derived so that filters can be compared
/// directly (for example in tests asserting on `Reader::filter`) and
/// stored in hashed collections. The enum is `#[non_exhaustive]`:
/// `match`es written outside this crate need a wildcard arm.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum ReaderFilter {
    /// Every event from every persistence id.
    All,
    /// Only events whose [`crate::DomainEvent::tags`] contains this tag.
    Tag(String),
    /// Only events from the given persistence id.
    PersistenceId(String),
    /// Only events whose persistence id is in the given set.
    PersistenceIds(Vec<String>),
}
25
26/// Fold journal events into a projection.
27///
28/// The runner polls the configured read journal, decodes each
29/// [`atomr_persistence_query::EventEnvelope`]'s payload into
30/// `Self::Event` via [`Reader::decode`], optionally filters by
31/// [`Reader::tag`], and calls [`Reader::apply`] per event. Per-pid
32/// offsets are tracked internally so each event is applied exactly
33/// once per process lifetime.
34#[async_trait]
35pub trait Reader: Send + 'static {
36 /// The event type this reader projects. Must match the aggregate's
37 /// event type when wired into a [`super::CqrsPattern`].
38 type Event: Send + Clone + 'static;
39
40 /// The read-model state this reader builds.
41 type Projection: Default + Send + Sync + 'static;
42
43 /// Domain error type for projection failures. Failures are logged
44 /// at `warn` level; the runner advances past the offending event so
45 /// it doesn't get stuck.
46 type Error: std::error::Error + Send + 'static;
47
48 /// Stable name of this reader. Used for tracing spans and
49 /// dashboard child-actor naming. Must be unique per CQRS instance.
50 fn name(&self) -> &str;
51
52 /// Legacy tag filter. Default `None`. Implemented in terms of
53 /// [`Self::filter`] so existing v1 readers keep working unchanged.
54 /// Prefer [`Self::filter`] in new code.
55 fn tag(&self) -> Option<String> {
56 None
57 }
58
59 /// What stream of events this reader follows. Default returns
60 /// [`ReaderFilter::Tag`] when [`Self::tag`] is `Some`, else
61 /// [`ReaderFilter::All`].
62 fn filter(&self) -> ReaderFilter {
63 match self.tag() {
64 Some(t) => ReaderFilter::Tag(t),
65 None => ReaderFilter::All,
66 }
67 }
68
69 /// Decode a journal payload back into the event type. The codec
70 /// must be the inverse of the aggregate's `encode_event`. Used
71 /// as the fallback when no [`crate::cqrs::EventCodecRegistry`] is
72 /// configured for the relevant `manifest`.
73 fn decode(bytes: &[u8]) -> Result<Self::Event, String>;
74
75 /// Apply one event to the projection.
76 async fn apply(
77 &mut self,
78 projection: &mut Self::Projection,
79 event: Self::Event,
80 ) -> Result<(), Self::Error>;
81}