sourcery_core/
projection.rs

1//! Read-side primitives.
2//!
3//! Projections rebuild query models from streams of stored events. This module
4//! provides the projection trait, event application hooks via
5//! [`ApplyProjection`], and the [`ProjectionBuilder`] that wires everything
6//! together.
7use std::{collections::HashMap, marker::PhantomData};
8
9use thiserror::Error;
10
11use crate::{
12    aggregate::Aggregate,
13    codec::{Codec, EventDecodeError, ProjectionEvent},
14    event::DomainEvent,
15    store::{EventFilter, EventStore, StoredEvent},
16};
17
18/// Trait implemented by read models that can be constructed from an event
19/// stream.
20///
21/// Implementors specify the identifier and metadata types their
22/// [`ApplyProjection`] handlers expect. Projections are typically rebuilt by
23/// calling [`Repository::build_projection`] and configuring the desired event
24/// streams before invoking [`ProjectionBuilder::load`].
25// ANCHOR: projection_trait
26pub trait Projection: Default + Sized {
27    /// Aggregate identifier type this projection is compatible with.
28    type Id;
29    /// Metadata type expected by this projection
30    type Metadata;
31}
32// ANCHOR_END: projection_trait
33
34/// Apply an event to a projection with access to envelope context.
35///
36/// Implementations receive the aggregate identifier, the pure domain event,
37/// and metadata supplied by the backing store.
38///
39/// ```ignore
40/// impl ApplyProjection<InventoryAdjusted> for InventoryReport {
41///     fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &InventoryAdjusted, _metadata: &Self::Metadata) {
42///         let stats = self.products.entry(aggregate_id.clone()).or_default();
43///         stats.quantity += event.delta;
44///     }
45/// }
46/// ```
47// ANCHOR: apply_projection_trait
48pub trait ApplyProjection<E>: Projection {
49    fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
50}
51// ANCHOR_END: apply_projection_trait
52
53/// Errors that can occur when rebuilding a projection.
54#[derive(Debug, Error)]
55pub enum ProjectionError<StoreError, CodecError>
56where
57    StoreError: std::error::Error + 'static,
58    CodecError: std::error::Error + 'static,
59{
60    #[error("failed to load events: {0}")]
61    Store(#[source] StoreError),
62    #[error("failed to decode event kind `{event_kind}`: {error}")]
63    Codec {
64        event_kind: String,
65        #[source]
66        error: CodecError,
67    },
68    #[error("failed to decode event: {0}")]
69    EventDecode(#[source] crate::codec::EventDecodeError<CodecError>),
70    #[error("failed to deserialize snapshot: {0}")]
71    SnapshotDeserialize(#[source] CodecError),
72}
73
74/// Type alias for event handler closures.
75#[derive(Debug)]
76enum HandlerError<CodecError> {
77    Codec(CodecError),
78    EventDecode(EventDecodeError<CodecError>),
79}
80
81impl<CodecError> From<CodecError> for HandlerError<CodecError> {
82    fn from(error: CodecError) -> Self {
83        Self::Codec(error)
84    }
85}
86
87type EventHandler<P, S> = Box<
88    dyn Fn(
89            &mut P,
90            &<S as EventStore>::Id,
91            &[u8],
92            &<S as EventStore>::Metadata,
93            &<S as EventStore>::Codec,
94        ) -> Result<(), HandlerError<<<S as EventStore>::Codec as Codec>::Error>>
95        + Send
96        + Sync,
97>;
98
99/// Builder used to configure which events should be loaded for a projection.
100pub struct ProjectionBuilder<'a, S, P>
101where
102    S: EventStore,
103    P: Projection<Id = S::Id>,
104{
105    pub(super) store: &'a S,
106    /// Event kind -> handler mapping for O(1) dispatch
107    handlers: HashMap<String, EventHandler<P, S>>,
108    /// Filters for loading events from the store
109    filters: Vec<EventFilter<S::Id, S::Position>>,
110    pub(super) _phantom: PhantomData<fn() -> P>,
111}
112
113impl<'a, S, P> ProjectionBuilder<'a, S, P>
114where
115    S: EventStore,
116    P: Projection<Id = S::Id>,
117{
118    pub(super) fn new(store: &'a S) -> Self {
119        Self {
120            store,
121            handlers: HashMap::new(),
122            filters: Vec::new(),
123            _phantom: PhantomData,
124        }
125    }
126
127    /// Register a specific event type to load from all aggregates.
128    ///
129    /// # Type Constraints
130    ///
131    /// The store's metadata type must be convertible to the projection's
132    /// metadata type. `Clone` is required because event handlers receive
133    /// metadata by reference, but `Into::into()` requires ownership. The
134    /// metadata is cloned once per event.
135    ///
136    /// # Example
137    /// ```ignore
138    /// builder.event::<ProductRestocked>()  // All products
139    /// ```
140    #[must_use]
141    pub fn event<E>(mut self) -> Self
142    where
143        E: DomainEvent,
144        P: ApplyProjection<E>,
145        S::Metadata: Clone + Into<P::Metadata>,
146    {
147        self.filters.push(EventFilter::for_event(E::KIND));
148        self.handlers.insert(
149            E::KIND.to_string(),
150            Box::new(|proj, agg_id, data, metadata, codec| {
151                let event: E = codec.deserialize(data)?;
152                let metadata_converted: P::Metadata = metadata.clone().into();
153                ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
154                Ok(())
155            }),
156        );
157        self
158    }
159
160    /// Register all event kinds supported by a `ProjectionEvent` sum type
161    /// across all aggregates.
162    ///
163    /// This is primarily intended for subscribing to an aggregate's generated
164    /// event enum (`A::Event` from `#[derive(Aggregate)]`) as a single
165    /// "unit", rather than registering each `DomainEvent` type
166    /// individually.
167    ///
168    /// # Example
169    /// ```ignore
170    /// builder.events::<AccountEvent>() // All accounts, all account event variants
171    /// ```
172    #[must_use]
173    pub fn events<E>(mut self) -> Self
174    where
175        E: ProjectionEvent,
176        P: ApplyProjection<E>,
177        S::Metadata: Clone + Into<P::Metadata>,
178    {
179        for &kind in E::EVENT_KINDS {
180            self.filters.push(EventFilter::for_event(kind));
181            self.handlers.insert(
182                kind.to_string(),
183                Box::new(move |proj, agg_id, data, metadata, codec| {
184                    let event =
185                        E::from_stored(kind, data, codec).map_err(HandlerError::EventDecode)?;
186                    let metadata_converted: P::Metadata = metadata.clone().into();
187                    ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
188                    Ok(())
189                }),
190            );
191        }
192        self
193    }
194
195    /// Register a specific event type to load from a specific aggregate
196    /// instance.
197    ///
198    /// Use this when you only care about a single event kind. If you want to
199    /// subscribe to an aggregate's full event enum (`A::Event`), prefer
200    /// [`ProjectionBuilder::events_for`].
201    ///
202    /// # Example
203    /// ```ignore
204    /// builder.event_for::<Account, FundsDeposited>(&account_id); // One account stream
205    /// ```
206    #[must_use]
207    pub fn event_for<A, E>(mut self, aggregate_id: &S::Id) -> Self
208    where
209        A: Aggregate<Id = S::Id>,
210        E: DomainEvent,
211        P: ApplyProjection<E>,
212        S::Metadata: Clone + Into<P::Metadata>,
213    {
214        self.filters.push(EventFilter::for_aggregate(
215            E::KIND,
216            A::KIND,
217            aggregate_id.clone(),
218        ));
219        self.handlers.insert(
220            E::KIND.to_string(),
221            Box::new(|proj, agg_id, data, metadata, codec| {
222                let event: E = codec.deserialize(data)?;
223                let metadata_converted: P::Metadata = metadata.clone().into();
224                ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
225                Ok(())
226            }),
227        );
228        self
229    }
230
231    /// Register all event kinds for a specific aggregate instance.
232    ///
233    /// This subscribes the projection to the aggregate's event sum type
234    /// (`A::Event`) and loads all events in that stream that correspond to
235    /// `A::Event::EVENT_KINDS`.
236    ///
237    /// # Example
238    /// ```ignore
239    /// let history = repository
240    ///     .build_projection::<AccountHistory>()
241    ///     .events_for::<Account>(&account_id)
242    ///     .load()?;
243    /// ```
244    #[must_use]
245    pub fn events_for<A>(mut self, aggregate_id: &S::Id) -> Self
246    where
247        A: Aggregate<Id = S::Id>,
248        A::Event: ProjectionEvent,
249        P: ApplyProjection<A::Event>,
250        S::Metadata: Clone + Into<P::Metadata>,
251    {
252        for &kind in <A::Event as ProjectionEvent>::EVENT_KINDS {
253            self.filters.push(EventFilter::for_aggregate(
254                kind,
255                A::KIND,
256                aggregate_id.clone(),
257            ));
258            self.handlers.insert(
259                kind.to_string(),
260                Box::new(move |proj, agg_id, data, metadata, codec| {
261                    let event = <A::Event as ProjectionEvent>::from_stored(kind, data, codec)
262                        .map_err(HandlerError::EventDecode)?;
263                    let metadata_converted: P::Metadata = metadata.clone().into();
264                    ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
265                    Ok(())
266                }),
267            );
268        }
269        self
270    }
271
272    /// Replays the configured events and materializes the projection.
273    ///
274    /// Events are dispatched to the projection via the registered handlers,
275    /// which deserialize and apply each event through the appropriate
276    /// `ApplyProjection` implementation.
277    ///
278    /// # Errors
279    ///
280    /// Returns [`ProjectionError`] when the store fails to load events or when
281    /// an event cannot be deserialized.
282    #[tracing::instrument(
283        skip(self),
284        fields(
285            projection_type = std::any::type_name::<P>(),
286            filter_count = self.filters.len(),
287            handler_count = self.handlers.len()
288        )
289    )]
290    pub async fn load(self) -> Result<P, ProjectionError<S::Error, <S::Codec as Codec>::Error>> {
291        tracing::debug!("loading projection");
292
293        let events = self
294            .store
295            .load_events(&self.filters)
296            .await
297            .map_err(ProjectionError::Store)?;
298        let codec = self.store.codec();
299        let mut projection = P::default();
300
301        let event_count = events.len();
302        tracing::debug!(
303            events_to_replay = event_count,
304            "replaying events into projection"
305        );
306
307        for stored in events {
308            let StoredEvent {
309                aggregate_id,
310                kind,
311                data,
312                metadata,
313                ..
314            } = stored;
315
316            // O(1) handler lookup instead of O(n) linear scan
317            if let Some(handler) = self.handlers.get(&kind) {
318                (handler)(&mut projection, &aggregate_id, &data, &metadata, codec).map_err(
319                    |error| match error {
320                        HandlerError::Codec(error) => ProjectionError::Codec {
321                            event_kind: kind.clone(),
322                            error,
323                        },
324                        HandlerError::EventDecode(error) => ProjectionError::EventDecode(error),
325                    },
326                )?;
327            }
328            // Unknown kinds are intentionally skipped - projections only care
329            // about the events they explicitly registered handlers
330            // for
331        }
332
333        tracing::info!(events_applied = event_count, "projection loaded");
334        Ok(projection)
335    }
336}
337
338#[cfg(test)]
339mod tests {
340    use std::{error::Error, io};
341
342    use super::*;
343
344    #[test]
345    fn projection_error_display_store_mentions_loading() {
346        let error: ProjectionError<io::Error, io::Error> =
347            ProjectionError::Store(io::Error::new(io::ErrorKind::NotFound, "not found"));
348        let msg = error.to_string();
349        assert!(msg.contains("failed to load events"));
350        assert!(error.source().is_some());
351    }
352
353    #[test]
354    fn projection_error_display_codec_includes_event_kind() {
355        let error: ProjectionError<io::Error, io::Error> = ProjectionError::Codec {
356            event_kind: "test-event".to_string(),
357            error: io::Error::new(io::ErrorKind::InvalidData, "bad data"),
358        };
359        let msg = error.to_string();
360        assert!(msg.contains("failed to decode event kind `test-event`"));
361        assert!(error.source().is_some());
362    }
363}