1use std::collections::HashMap;
8
9use thiserror::Error;
10
11use crate::{
12 aggregate::Aggregate,
13 event::{DomainEvent, EventDecodeError, ProjectionEvent},
14 store::{EventFilter, EventStore, StoredEvent},
15};
16
17pub trait ProjectionFilters: Sized {
32 type Id;
34 type InstanceId;
38 type Metadata;
40
41 fn init(instance_id: &Self::InstanceId) -> Self;
47
48 fn filters<S>(instance_id: &Self::InstanceId) -> Filters<S, Self>
50 where
51 S: EventStore<Id = Self::Id, Metadata = Self::Metadata>;
52}
53pub trait Projection {
65 const KIND: &'static str;
67}
68pub trait ApplyProjection<E>: ProjectionFilters {
85 fn apply_projection(&mut self, aggregate_id: &Self::Id, event: &E, metadata: &Self::Metadata);
86}
87#[derive(Debug, Error)]
91pub enum ProjectionError<StoreError>
92where
93 StoreError: std::error::Error + 'static,
94{
95 #[error("failed to load events: {0}")]
96 Store(#[source] StoreError),
97 #[error("failed to decode event: {0}")]
98 EventDecode(#[source] EventDecodeError<StoreError>),
99}
100
101#[derive(Debug)]
103pub(crate) enum HandlerError<StoreError> {
104 EventDecode(EventDecodeError<StoreError>),
105 Store(StoreError),
106}
107
108impl<StoreError> From<StoreError> for HandlerError<StoreError> {
109 fn from(error: StoreError) -> Self {
110 Self::Store(error)
111 }
112}
113
114pub(crate) type EventHandler<P, S> = Box<
116 dyn Fn(
117 &mut P,
118 &<S as EventStore>::Id,
119 &StoredEvent<
120 <S as EventStore>::Id,
121 <S as EventStore>::Position,
122 <S as EventStore>::Data,
123 <S as EventStore>::Metadata,
124 >,
125 &<S as EventStore>::Metadata,
126 &S,
127 ) -> Result<(), HandlerError<<S as EventStore>::Error>>
128 + Send
129 + Sync,
130>;
131
132pub(crate) type PositionedFilters<S, P> = (
134 Vec<EventFilter<<S as EventStore>::Id, <S as EventStore>::Position>>,
135 HashMap<&'static str, EventHandler<P, S>>,
136);
137
138pub struct Filters<S, P>
147where
148 S: EventStore,
149{
150 pub(crate) specs: Vec<EventFilter<S::Id, ()>>,
151 pub(crate) handlers: HashMap<&'static str, EventHandler<P, S>>,
152}
153
154impl<S, P> Default for Filters<S, P>
155where
156 S: EventStore,
157 P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
158{
159 fn default() -> Self {
160 Self::new()
161 }
162}
163
164impl<S, P> Filters<S, P>
165where
166 S: EventStore,
167 P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
168{
169 #[must_use]
171 pub fn new() -> Self {
172 Self {
173 specs: Vec::new(),
174 handlers: HashMap::new(),
175 }
176 }
177
178 #[must_use]
180 pub fn event<E>(mut self) -> Self
181 where
182 E: DomainEvent + serde::de::DeserializeOwned,
183 P: ApplyProjection<E>,
184 {
185 self.specs.push(EventFilter::for_event(E::KIND));
186 self.handlers.insert(
187 E::KIND,
188 Box::new(|proj, agg_id, stored, metadata, store| {
189 let event: E = store.decode_event(stored)?;
190 ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
191 Ok(())
192 }),
193 );
194 self
195 }
196
197 #[must_use]
200 pub fn events<E>(mut self) -> Self
201 where
202 E: ProjectionEvent,
203 P: ApplyProjection<E>,
204 {
205 for &kind in E::EVENT_KINDS {
206 self.specs.push(EventFilter::for_event(kind));
207 self.handlers.insert(
208 kind,
209 Box::new(move |proj, agg_id, stored, metadata, store| {
210 let event = E::from_stored(stored, store).map_err(HandlerError::EventDecode)?;
211 ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
212 Ok(())
213 }),
214 );
215 }
216 self
217 }
218
219 #[must_use]
221 pub fn event_for<A, E>(mut self, aggregate_id: &S::Id) -> Self
222 where
223 A: Aggregate<Id = S::Id>,
224 E: DomainEvent + serde::de::DeserializeOwned,
225 P: ApplyProjection<E>,
226 {
227 self.specs.push(EventFilter::for_aggregate(
228 E::KIND,
229 A::KIND,
230 aggregate_id.clone(),
231 ));
232 self.handlers.insert(
233 E::KIND,
234 Box::new(|proj, agg_id, stored, metadata, store| {
235 let event: E = store.decode_event(stored)?;
236 ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
237 Ok(())
238 }),
239 );
240 self
241 }
242
243 #[must_use]
245 pub fn events_for<A>(mut self, aggregate_id: &S::Id) -> Self
246 where
247 A: Aggregate<Id = S::Id>,
248 A::Event: ProjectionEvent,
249 P: ApplyProjection<A::Event>,
250 {
251 for &kind in <A::Event as ProjectionEvent>::EVENT_KINDS {
252 self.specs.push(EventFilter::for_aggregate(
253 kind,
254 A::KIND,
255 aggregate_id.clone(),
256 ));
257 self.handlers.insert(
258 kind,
259 Box::new(move |proj, agg_id, stored, metadata, store| {
260 let event = <A::Event as ProjectionEvent>::from_stored(stored, store)
261 .map_err(HandlerError::EventDecode)?;
262 ApplyProjection::apply_projection(proj, agg_id, &event, metadata);
263 Ok(())
264 }),
265 );
266 }
267 self
268 }
269
270 pub(crate) fn into_event_filters(self, after: Option<&S::Position>) -> PositionedFilters<S, P> {
272 let filters = self
273 .specs
274 .into_iter()
275 .map(|spec| {
276 let mut filter = EventFilter {
277 event_kind: spec.event_kind,
278 aggregate_kind: spec.aggregate_kind,
279 aggregate_id: spec.aggregate_id,
280 after_position: None,
281 };
282 if let Some(pos) = after {
283 filter = filter.after(pos.clone());
284 }
285 filter
286 })
287 .collect();
288 (filters, self.handlers)
289 }
290}
291
292#[cfg(test)]
293mod tests {
294 use std::{error::Error, io};
295
296 use super::*;
297
298 #[test]
299 fn error_display_store_mentions_loading() {
300 let error: ProjectionError<io::Error> =
301 ProjectionError::Store(io::Error::new(io::ErrorKind::NotFound, "not found"));
302 let msg = error.to_string();
303 assert!(msg.contains("failed to load events"));
304 assert!(error.source().is_some());
305 }
306}