Skip to main content

atomr_patterns/cqrs/
reader.rs

//! [`Reader`] — read-side fold from journal events into a projection.
//!
//! Users implement this trait once per read model. `decode` deserializes
//! the journal payload into the event type; `apply` folds events into
//! the projection. The framework runs an async loop that polls the
//! configured [`atomr_persistence_query::ReadJournal`] and drives this
//! trait.
8
9use async_trait::async_trait;
10
/// Which stream of journal events a reader subscribes to.
///
/// Returned by [`Reader::filter`]. The default implementation of that
/// method yields [`ReaderFilter::Tag`] when the legacy [`Reader::tag`]
/// returns a tag and [`ReaderFilter::All`] otherwise.
///
/// The enum is `#[non_exhaustive]`, so downstream `match`es need a
/// wildcard arm. It implements `PartialEq`/`Eq` so filters can be
/// compared directly (for example in tests asserting a reader's
/// configuration).
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum ReaderFilter {
    /// Every event from every persistence id.
    All,
    /// Only events whose [`crate::DomainEvent::tags`] contains this tag.
    Tag(String),
    /// Only events from the given persistence id.
    PersistenceId(String),
    /// Only events whose persistence id is in the given set.
    PersistenceIds(Vec<String>),
}
25
26/// Fold journal events into a projection.
27///
28/// The runner polls the configured read journal, decodes each
29/// [`atomr_persistence_query::EventEnvelope`]'s payload into
30/// `Self::Event` via [`Reader::decode`], optionally filters by
31/// [`Reader::tag`], and calls [`Reader::apply`] per event. Per-pid
32/// offsets are tracked internally so each event is applied exactly
33/// once per process lifetime.
34#[async_trait]
35pub trait Reader: Send + 'static {
36    /// The event type this reader projects. Must match the aggregate's
37    /// event type when wired into a [`super::CqrsPattern`].
38    type Event: Send + Clone + 'static;
39
40    /// The read-model state this reader builds.
41    type Projection: Default + Send + Sync + 'static;
42
43    /// Domain error type for projection failures. Failures are logged
44    /// at `warn` level; the runner advances past the offending event so
45    /// it doesn't get stuck.
46    type Error: std::error::Error + Send + 'static;
47
48    /// Stable name of this reader. Used for tracing spans and
49    /// dashboard child-actor naming. Must be unique per CQRS instance.
50    fn name(&self) -> &str;
51
52    /// Legacy tag filter. Default `None`. Implemented in terms of
53    /// [`Self::filter`] so existing v1 readers keep working unchanged.
54    /// Prefer [`Self::filter`] in new code.
55    fn tag(&self) -> Option<String> {
56        None
57    }
58
59    /// What stream of events this reader follows. Default returns
60    /// [`ReaderFilter::Tag`] when [`Self::tag`] is `Some`, else
61    /// [`ReaderFilter::All`].
62    fn filter(&self) -> ReaderFilter {
63        match self.tag() {
64            Some(t) => ReaderFilter::Tag(t),
65            None => ReaderFilter::All,
66        }
67    }
68
69    /// Decode a journal payload back into the event type. The codec
70    /// must be the inverse of the aggregate's `encode_event`. Used
71    /// as the fallback when no [`crate::cqrs::EventCodecRegistry`] is
72    /// configured for the relevant `manifest`.
73    fn decode(bytes: &[u8]) -> Result<Self::Event, String>;
74
75    /// Apply one event to the projection.
76    async fn apply(
77        &mut self,
78        projection: &mut Self::Projection,
79        event: Self::Event,
80    ) -> Result<(), Self::Error>;
81}