Skip to main content

sourcery_core/
projection.rs

1//! Read-side primitives.
2//!
3//! Projections rebuild query models from streams of stored events. This module
4//! provides the projection trait, event application hooks via
5//! [`ApplyProjection`], and the [`Filters`] builder that wires everything
6//! together.
7use std::collections::HashMap;
8
9use thiserror::Error;
10
11use crate::{
12    aggregate::Aggregate,
13    event::{DomainEvent, EventDecodeError, ProjectionEvent},
14    store::{EventFilter, EventStore, StoredEvent},
15};
16
17/// Base trait for types that subscribe to events.
18///
19/// The `filters()` method returns both the event filters AND the handlers
20/// needed to process those events. It is generic over the store type `S`,
21/// allowing the same projection to work with any store that shares the
22/// same `Id` and `Metadata` types.
23///
24/// `filters()` must be **pure and deterministic**: given the same
25/// `instance_id`, it must always return the same filter set.
26///
27/// `init()` constructs a fresh instance from the instance identifier.
28/// This replaces the `Default` constraint, allowing instance-aware
29/// projections to capture their identity at construction time.
30// ANCHOR: projection_filters_trait
31pub trait ProjectionFilters: Sized {
32    /// Aggregate identifier type this subscriber is compatible with.
33    type Id;
34    /// Instance identifier for this subscriber.
35    ///
36    /// For singleton subscribers use `()`.
37    type InstanceId;
38    /// Metadata type expected by this subscriber's handlers.
39    type Metadata;
40
41    /// Construct a fresh instance from the instance identifier.
42    ///
43    /// For singleton projections (`InstanceId = ()`), this typically
44    /// delegates to `Self::default()`. For instance projections, this
45    /// captures the instance identifier at construction time.
46    fn init(instance_id: &Self::InstanceId) -> Self;
47
48    /// Build the filter set and handler map for this subscriber.
49    fn filters<S>(instance_id: &Self::InstanceId) -> Filters<S, Self>
50    where
51        S: EventStore<Id = Self::Id, Metadata = Self::Metadata>;
52}
53// ANCHOR_END: projection_filters_trait
54
/// Marker trait for read models that can be built from an event stream.
///
/// Its only member is `KIND`, a stable identifier used for snapshot storage.
///
/// `Projection` deliberately does not depend on [`ProjectionFilters`]:
/// `#[derive(Projection)]` is therefore always usable, while filter
/// behaviour stays an explicit opt-in through `ProjectionFilters`.
// ANCHOR: projection_trait
pub trait Projection {
    /// Stable name identifying this projection type.
    const KIND: &'static str;
}
// ANCHOR_END: projection_trait
69
70/// Apply an event to a projection with access to envelope context.
71///
72/// Implementations receive the aggregate identifier, the pure domain event,
73/// and metadata supplied by the backing store.
74///
75/// ```ignore
76/// impl ApplyProjection<InventoryAdjusted> for InventoryReport {
77///     fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &InventoryAdjusted, _metadata: &Self::Metadata) {
78///         let stats = self.products.entry(aggregate_id.clone()).or_default();
79///         stats.quantity += event.delta;
80///     }
81/// }
82/// ```
83// ANCHOR: apply_projection_trait
84pub trait ApplyProjection<E>: ProjectionFilters {
85    fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
86}
87// ANCHOR_END: apply_projection_trait
88
89/// Errors that can occur when rebuilding a projection.
90#[derive(Debug, Error)]
91pub enum ProjectionError<StoreError>
92where
93    StoreError: std::error::Error + 'static,
94{
95    #[error("failed to load events: {0}")]
96    Store(#[source] StoreError),
97    #[error("failed to decode event: {0}")]
98    EventDecode(#[source] EventDecodeError<StoreError>),
99}
100
101/// Internal error type for event handler closures.
102#[derive(Debug)]
103pub(crate) enum HandlerError<StoreError> {
104    EventDecode(EventDecodeError<StoreError>),
105    Store(StoreError),
106}
107
108impl<StoreError> From<StoreError> for HandlerError<StoreError> {
109    fn from(error: StoreError) -> Self {
110        Self::Store(error)
111    }
112}
113
114/// Type alias for event handler closures used by [`Filters`].
115pub(crate) type EventHandler<P, S> = Box<
116    dyn Fn(
117            &mut P,
118            &<S as EventStore>::Id,
119            &StoredEvent<
120                <S as EventStore>::Id,
121                <S as EventStore>::Position,
122                <S as EventStore>::Data,
123                <S as EventStore>::Metadata,
124            >,
125            &<S as EventStore>::Metadata,
126            &S,
127        ) -> Result<(), HandlerError<<S as EventStore>::Error>>
128        + Send
129        + Sync,
130>;
131
132/// Positioned filters and handler map, ready for event loading.
133pub(crate) type PositionedFilters<S, P> = (
134    Vec<EventFilter<<S as EventStore>::Id, <S as EventStore>::Position>>,
135    HashMap<&'static str, EventHandler<P, S>>,
136);
137
138/// Combined filter configuration and handler map for a subscriber.
139///
140/// `Filters` captures both the event filters (which events to load) and the
141/// handler closures (how to apply them). It is parameterised by the store
142/// type `S` to enable store-mediated decoding.
143///
144/// Filters are constructed without positions. Positions are applied at
145/// load/subscribe time.
146pub struct Filters<S, P>
147where
148    S: EventStore,
149{
150    pub(crate) specs: Vec<EventFilter<S::Id, ()>>,
151    pub(crate) handlers: HashMap<&'static str, EventHandler<P, S>>,
152}
153
154impl<S, P> Default for Filters<S, P>
155where
156    S: EventStore,
157    P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
158{
159    fn default() -> Self {
160        Self::new()
161    }
162}
163
164impl<S, P> Filters<S, P>
165where
166    S: EventStore,
167    P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
168{
169    /// Create an empty filter set.
170    #[must_use]
171    pub fn new() -> Self {
172        Self {
173            specs: Vec::new(),
174            handlers: HashMap::new(),
175        }
176    }
177
178    /// Subscribe to a specific event type globally (all aggregates).
179    #[must_use]
180    pub fn event<E>(mut self) -> Self
181    where
182        E: DomainEvent + serde::de::DeserializeOwned,
183        P: ApplyProjection<E>,
184    {
185        self.specs.push(EventFilter::for_event(E::KIND));
186        self.handlers.insert(
187            E::KIND,
188            Box::new(|proj, agg_id, stored, metadata, store| {
189                let event: E = store.decode_event(stored)?;
190                ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
191                Ok(())
192            }),
193        );
194        self
195    }
196
197    /// Subscribe to all event kinds supported by a [`ProjectionEvent`] sum
198    /// type across all aggregates.
199    #[must_use]
200    pub fn events<E>(mut self) -> Self
201    where
202        E: ProjectionEvent,
203        P: ApplyProjection<E>,
204    {
205        for &kind in E::EVENT_KINDS {
206            self.specs.push(EventFilter::for_event(kind));
207            self.handlers.insert(
208                kind,
209                Box::new(move |proj, agg_id, stored, metadata, store| {
210                    let event = E::from_stored(stored, store).map_err(HandlerError::EventDecode)?;
211                    ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
212                    Ok(())
213                }),
214            );
215        }
216        self
217    }
218
219    /// Subscribe to a specific event type from a specific aggregate instance.
220    #[must_use]
221    pub fn event_for<A, E>(mut self, aggregate_id: &S::Id) -> Self
222    where
223        A: Aggregate<Id = S::Id>,
224        E: DomainEvent + serde::de::DeserializeOwned,
225        P: ApplyProjection<E>,
226    {
227        self.specs.push(EventFilter::for_aggregate(
228            E::KIND,
229            A::KIND,
230            aggregate_id.clone(),
231        ));
232        self.handlers.insert(
233            E::KIND,
234            Box::new(|proj, agg_id, stored, metadata, store| {
235                let event: E = store.decode_event(stored)?;
236                ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
237                Ok(())
238            }),
239        );
240        self
241    }
242
243    /// Subscribe to all events from a specific aggregate instance.
244    #[must_use]
245    pub fn events_for<A>(mut self, aggregate_id: &S::Id) -> Self
246    where
247        A: Aggregate<Id = S::Id>,
248        A::Event: ProjectionEvent,
249        P: ApplyProjection<A::Event>,
250    {
251        for &kind in <A::Event as ProjectionEvent>::EVENT_KINDS {
252            self.specs.push(EventFilter::for_aggregate(
253                kind,
254                A::KIND,
255                aggregate_id.clone(),
256            ));
257            self.handlers.insert(
258                kind,
259                Box::new(move |proj, agg_id, stored, metadata, store| {
260                    let event = <A::Event as ProjectionEvent>::from_stored(stored, store)
261                        .map_err(HandlerError::EventDecode)?;
262                    ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
263                    Ok(())
264                }),
265            );
266        }
267        self
268    }
269
270    /// Convert positionless filter specs into positioned [`EventFilter`]s.
271    pub(crate) fn into_event_filters(self, after: Option<&S::Position>) -> PositionedFilters<S, P> {
272        let filters = self
273            .specs
274            .into_iter()
275            .map(|spec| {
276                let mut filter = EventFilter {
277                    event_kind: spec.event_kind,
278                    aggregate_kind: spec.aggregate_kind,
279                    aggregate_id: spec.aggregate_id,
280                    after_position: None,
281                };
282                if let Some(pos) = after {
283                    filter = filter.after(pos.clone());
284                }
285                filter
286            })
287            .collect();
288        (filters, self.handlers)
289    }
290}
291
#[cfg(test)]
mod tests {
    use std::{error::Error, io};

    use super::*;

    /// A `Store` error must mention event loading in its message and expose
    /// the wrapped store error as its source.
    #[test]
    fn error_display_store_mentions_loading() {
        let cause = io::Error::new(io::ErrorKind::NotFound, "not found");
        let failure = ProjectionError::<io::Error>::Store(cause);

        let rendered = failure.to_string();

        assert!(rendered.contains("failed to load events"));
        assert!(failure.source().is_some());
    }
}