1use std::{collections::HashMap, marker::PhantomData};
8
9use thiserror::Error;
10
11use crate::{
12 aggregate::Aggregate,
13 codec::{Codec, EventDecodeError, ProjectionEvent},
14 event::DomainEvent,
15 store::{EventFilter, EventStore, StoredEvent},
16};
17
18pub trait Projection: Default + Sized {
27 type Id;
29 type Metadata;
31}
32pub trait ApplyProjection<E>: Projection {
49 fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
50}
51#[derive(Debug, Error)]
55pub enum ProjectionError<StoreError, CodecError>
56where
57 StoreError: std::error::Error + 'static,
58 CodecError: std::error::Error + 'static,
59{
60 #[error("failed to load events: {0}")]
61 Store(#[source] StoreError),
62 #[error("failed to decode event kind `{event_kind}`: {error}")]
63 Codec {
64 event_kind: String,
65 #[source]
66 error: CodecError,
67 },
68 #[error("failed to decode event: {0}")]
69 EventDecode(#[source] crate::codec::EventDecodeError<CodecError>),
70 #[error("failed to deserialize snapshot: {0}")]
71 SnapshotDeserialize(#[source] CodecError),
72}
73
74#[derive(Debug)]
76enum HandlerError<CodecError> {
77 Codec(CodecError),
78 EventDecode(EventDecodeError<CodecError>),
79}
80
81impl<CodecError> From<CodecError> for HandlerError<CodecError> {
82 fn from(error: CodecError) -> Self {
83 Self::Codec(error)
84 }
85}
86
87type EventHandler<P, S> = Box<
88 dyn Fn(
89 &mut P,
90 &<S as EventStore>::Id,
91 &[u8],
92 &<S as EventStore>::Metadata,
93 &<S as EventStore>::Codec,
94 ) -> Result<(), HandlerError<<<S as EventStore>::Codec as Codec>::Error>>
95 + Send
96 + Sync,
97>;
98
99pub struct ProjectionBuilder<'a, S, P>
101where
102 S: EventStore,
103 P: Projection<Id = S::Id>,
104{
105 pub(super) store: &'a S,
106 handlers: HashMap<String, EventHandler<P, S>>,
108 filters: Vec<EventFilter<S::Id, S::Position>>,
110 pub(super) _phantom: PhantomData<fn() -> P>,
111}
112
113impl<'a, S, P> ProjectionBuilder<'a, S, P>
114where
115 S: EventStore,
116 P: Projection<Id = S::Id>,
117{
118 pub(super) fn new(store: &'a S) -> Self {
119 Self {
120 store,
121 handlers: HashMap::new(),
122 filters: Vec::new(),
123 _phantom: PhantomData,
124 }
125 }
126
127 #[must_use]
141 pub fn event<E>(mut self) -> Self
142 where
143 E: DomainEvent,
144 P: ApplyProjection<E>,
145 S::Metadata: Clone + Into<P::Metadata>,
146 {
147 self.filters.push(EventFilter::for_event(E::KIND));
148 self.handlers.insert(
149 E::KIND.to_string(),
150 Box::new(|proj, agg_id, data, metadata, codec| {
151 let event: E = codec.deserialize(data)?;
152 let metadata_converted: P::Metadata = metadata.clone().into();
153 ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
154 Ok(())
155 }),
156 );
157 self
158 }
159
160 #[must_use]
173 pub fn events<E>(mut self) -> Self
174 where
175 E: ProjectionEvent,
176 P: ApplyProjection<E>,
177 S::Metadata: Clone + Into<P::Metadata>,
178 {
179 for &kind in E::EVENT_KINDS {
180 self.filters.push(EventFilter::for_event(kind));
181 self.handlers.insert(
182 kind.to_string(),
183 Box::new(move |proj, agg_id, data, metadata, codec| {
184 let event =
185 E::from_stored(kind, data, codec).map_err(HandlerError::EventDecode)?;
186 let metadata_converted: P::Metadata = metadata.clone().into();
187 ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
188 Ok(())
189 }),
190 );
191 }
192 self
193 }
194
195 #[must_use]
207 pub fn event_for<A, E>(mut self, aggregate_id: &S::Id) -> Self
208 where
209 A: Aggregate<Id = S::Id>,
210 E: DomainEvent,
211 P: ApplyProjection<E>,
212 S::Metadata: Clone + Into<P::Metadata>,
213 {
214 self.filters.push(EventFilter::for_aggregate(
215 E::KIND,
216 A::KIND,
217 aggregate_id.clone(),
218 ));
219 self.handlers.insert(
220 E::KIND.to_string(),
221 Box::new(|proj, agg_id, data, metadata, codec| {
222 let event: E = codec.deserialize(data)?;
223 let metadata_converted: P::Metadata = metadata.clone().into();
224 ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
225 Ok(())
226 }),
227 );
228 self
229 }
230
231 #[must_use]
245 pub fn events_for<A>(mut self, aggregate_id: &S::Id) -> Self
246 where
247 A: Aggregate<Id = S::Id>,
248 A::Event: ProjectionEvent,
249 P: ApplyProjection<A::Event>,
250 S::Metadata: Clone + Into<P::Metadata>,
251 {
252 for &kind in <A::Event as ProjectionEvent>::EVENT_KINDS {
253 self.filters.push(EventFilter::for_aggregate(
254 kind,
255 A::KIND,
256 aggregate_id.clone(),
257 ));
258 self.handlers.insert(
259 kind.to_string(),
260 Box::new(move |proj, agg_id, data, metadata, codec| {
261 let event = <A::Event as ProjectionEvent>::from_stored(kind, data, codec)
262 .map_err(HandlerError::EventDecode)?;
263 let metadata_converted: P::Metadata = metadata.clone().into();
264 ApplyProjection::apply_projection(proj, agg_id, &event, &metadata_converted);
265 Ok(())
266 }),
267 );
268 }
269 self
270 }
271
272 #[tracing::instrument(
283 skip(self),
284 fields(
285 projection_type = std::any::type_name::<P>(),
286 filter_count = self.filters.len(),
287 handler_count = self.handlers.len()
288 )
289 )]
290 pub async fn load(self) -> Result<P, ProjectionError<S::Error, <S::Codec as Codec>::Error>> {
291 tracing::debug!("loading projection");
292
293 let events = self
294 .store
295 .load_events(&self.filters)
296 .await
297 .map_err(ProjectionError::Store)?;
298 let codec = self.store.codec();
299 let mut projection = P::default();
300
301 let event_count = events.len();
302 tracing::debug!(
303 events_to_replay = event_count,
304 "replaying events into projection"
305 );
306
307 for stored in events {
308 let StoredEvent {
309 aggregate_id,
310 kind,
311 data,
312 metadata,
313 ..
314 } = stored;
315
316 if let Some(handler) = self.handlers.get(&kind) {
318 (handler)(&mut projection, &aggregate_id, &data, &metadata, codec).map_err(
319 |error| match error {
320 HandlerError::Codec(error) => ProjectionError::Codec {
321 event_kind: kind.clone(),
322 error,
323 },
324 HandlerError::EventDecode(error) => ProjectionError::EventDecode(error),
325 },
326 )?;
327 }
328 }
332
333 tracing::info!(events_applied = event_count, "projection loaded");
334 Ok(projection)
335 }
336}
337
338#[cfg(test)]
339mod tests {
340 use std::{error::Error, io};
341
342 use super::*;
343
344 #[test]
345 fn projection_error_display_store_mentions_loading() {
346 let error: ProjectionError<io::Error, io::Error> =
347 ProjectionError::Store(io::Error::new(io::ErrorKind::NotFound, "not found"));
348 let msg = error.to_string();
349 assert!(msg.contains("failed to load events"));
350 assert!(error.source().is_some());
351 }
352
353 #[test]
354 fn projection_error_display_codec_includes_event_kind() {
355 let error: ProjectionError<io::Error, io::Error> = ProjectionError::Codec {
356 event_kind: "test-event".to_string(),
357 error: io::Error::new(io::ErrorKind::InvalidData, "bad data"),
358 };
359 let msg = error.to_string();
360 assert!(msg.contains("failed to decode event kind `test-event`"));
361 assert!(error.source().is_some());
362 }
363}