Skip to main content

statum_core/
projection.rs

1//! Event-stream projection helpers for Statum rebuild flows.
2//!
3//! `#[validators]` works on one persisted shape at a time. If your storage model
4//! is append-only, reduce events into a row-like projection first and then feed
5//! that projection into `into_machine()` or `.into_machines()`.
6//!
7//! ```
8//! use statum_core::projection::{reduce_grouped, ProjectionReducer};
9//!
10//! #[derive(Clone)]
11//! struct OrderEvent {
12//!     order_id: u64,
13//!     amount_cents: u64,
14//! }
15//!
16//! struct OrderTotals;
17//!
18//! impl ProjectionReducer<OrderEvent> for OrderTotals {
19//!     type Projection = (u64, u64);
20//!     type Error = core::convert::Infallible;
21//!
22//!     fn seed(&self, event: &OrderEvent) -> Result<Self::Projection, Self::Error> {
23//!         Ok((event.order_id, event.amount_cents))
24//!     }
25//!
26//!     fn apply(
27//!         &self,
28//!         projection: &mut Self::Projection,
29//!         event: &OrderEvent,
30//!     ) -> Result<(), Self::Error> {
31//!         projection.1 += event.amount_cents;
32//!         Ok(())
33//!     }
34//! }
35//!
36//! let projections = reduce_grouped(
37//!     vec![
38//!         OrderEvent {
39//!             order_id: 2,
40//!             amount_cents: 100,
41//!         },
42//!         OrderEvent {
43//!             order_id: 1,
44//!             amount_cents: 50,
45//!         },
46//!         OrderEvent {
47//!             order_id: 2,
48//!             amount_cents: 25,
49//!         },
50//!     ],
51//!     |event| event.order_id,
52//!     &OrderTotals,
53//! )?;
54//!
55//! assert_eq!(projections, vec![(2, 125), (1, 50)]);
56//! # Ok::<(), statum_core::projection::ProjectionError<core::convert::Infallible>>(())
57//! ```
58
59use core::fmt;
60use std::collections::{hash_map::Entry, HashMap};
61use std::hash::Hash;
62
/// Strategy for folding events into a projection that can later be rehydrated
/// with `#[validators]`.
///
/// The helpers in this module hand the first event of a stream to
/// [`seed`](ProjectionReducer::seed) and every later event of that stream to
/// [`apply`](ProjectionReducer::apply).
pub trait ProjectionReducer<Event> {
    /// Value produced by folding a stream of events.
    type Projection;
    /// Failure reported by this reducer when an event cannot be folded.
    type Error;

    /// Build the initial projection from the first event of a stream.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the event cannot start a projection.
    fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error>;

    /// Fold a later event into a projection that has already been seeded.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when the event cannot be applied to `projection`.
    fn apply(
        &self,
        projection: &mut Self::Projection,
        event: &Event,
    ) -> Result<(), Self::Error>;
}
76
/// Errors returned by the projection helpers in this module.
///
/// `E` is the reducer-specific error type, i.e. `ProjectionReducer::Error` of
/// the reducer that was passed to the helper.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProjectionError<E> {
    /// The input stream yielded no events, so there was nothing to seed a
    /// projection from.
    EmptyInput,
    /// The reducer rejected one of the events in the stream; carries the
    /// reducer's own error value.
    Reducer(E),
}
85
86impl<E> fmt::Display for ProjectionError<E>
87where
88    E: fmt::Display,
89{
90    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
91        match self {
92            Self::EmptyInput => write!(f, "projection input was empty"),
93            Self::Reducer(error) => write!(f, "{error}"),
94        }
95    }
96}
97
98impl<E> std::error::Error for ProjectionError<E> where E: std::error::Error + 'static {}
99
100/// Reduce one ordered event stream into one projection value.
101pub fn reduce_one<Event, I, R>(
102    events: I,
103    reducer: &R,
104) -> Result<R::Projection, ProjectionError<R::Error>>
105where
106    I: IntoIterator<Item = Event>,
107    R: ProjectionReducer<Event>,
108{
109    let mut iter = events.into_iter();
110    let first = iter.next().ok_or(ProjectionError::EmptyInput)?;
111    let mut projection = reducer.seed(&first).map_err(ProjectionError::Reducer)?;
112
113    for event in iter {
114        reducer
115            .apply(&mut projection, &event)
116            .map_err(ProjectionError::Reducer)?;
117    }
118
119    Ok(projection)
120}
121
122/// Reduce many ordered event streams into projection values grouped by key.
123///
124/// Events are applied in encounter order for each key, and the output preserves
125/// the first-seen order of keys in the input stream.
126pub fn reduce_grouped<Event, I, K, KF, R>(
127    events: I,
128    key_fn: KF,
129    reducer: &R,
130) -> Result<Vec<R::Projection>, ProjectionError<R::Error>>
131where
132    I: IntoIterator<Item = Event>,
133    KF: Fn(&Event) -> K,
134    K: Eq + Hash + Clone,
135    R: ProjectionReducer<Event>,
136{
137    let mut order = Vec::new();
138    let mut projections = HashMap::new();
139
140    for event in events {
141        let key = key_fn(&event);
142        match projections.entry(key.clone()) {
143            Entry::Occupied(mut entry) => {
144                reducer
145                    .apply(entry.get_mut(), &event)
146                    .map_err(ProjectionError::Reducer)?;
147            }
148            Entry::Vacant(entry) => {
149                order.push(key);
150                let projection = reducer.seed(&event).map_err(ProjectionError::Reducer)?;
151                entry.insert(projection);
152            }
153        }
154    }
155
156    let mut results = Vec::with_capacity(order.len());
157    for key in order {
158        if let Some(projection) = projections.remove(&key) {
159            results.push(projection);
160        }
161    }
162
163    Ok(results)
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal test event: `stream` is the grouping key and `value` is the
    /// amount folded into the projection.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Event {
        stream: &'static str,
        value: i32,
    }

    /// Shorthand constructor that keeps the fixtures below readable.
    fn event(stream: &'static str, value: i32) -> Event {
        Event { stream, value }
    }

    /// Sums event values; rejects negative values so both the seed and the
    /// apply error paths can be triggered.
    struct SumReducer;

    impl ProjectionReducer<Event> for SumReducer {
        type Projection = i32;
        type Error = &'static str;

        fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error> {
            if event.value < 0 {
                Err("negative seed")
            } else {
                Ok(event.value)
            }
        }

        fn apply(
            &self,
            projection: &mut Self::Projection,
            event: &Event,
        ) -> Result<(), Self::Error> {
            if event.value < 0 {
                return Err("negative apply");
            }

            *projection += event.value;
            Ok(())
        }
    }

    #[test]
    fn reduce_one_requires_input() {
        let result = reduce_one(Vec::<Event>::new(), &SumReducer);
        assert_eq!(result, Err(ProjectionError::EmptyInput));
    }

    #[test]
    fn reduce_one_folds_one_stream() {
        let result = reduce_one(vec![event("a", 1), event("a", 2)], &SumReducer).unwrap();

        assert_eq!(result, 3);
    }

    #[test]
    fn reduce_one_propagates_seed_errors() {
        let result = reduce_one(vec![event("a", -1), event("a", 2)], &SumReducer);

        assert_eq!(result, Err(ProjectionError::Reducer("negative seed")));
    }

    #[test]
    fn reduce_one_propagates_apply_errors() {
        let result = reduce_one(vec![event("a", 1), event("a", -2)], &SumReducer);

        assert_eq!(result, Err(ProjectionError::Reducer("negative apply")));
    }

    #[test]
    fn reduce_grouped_accepts_empty_input() {
        let result = reduce_grouped(
            Vec::<Event>::new(),
            |event: &Event| event.stream,
            &SumReducer,
        );

        assert_eq!(result, Ok(Vec::new()));
    }

    #[test]
    fn reduce_grouped_preserves_first_seen_order() {
        let result = reduce_grouped(
            vec![event("b", 1), event("a", 2), event("b", 3)],
            |event| event.stream,
            &SumReducer,
        )
        .unwrap();

        assert_eq!(result, vec![4, 2]);
    }

    #[test]
    fn reduce_grouped_propagates_reducer_errors() {
        let result = reduce_grouped(
            vec![event("a", 1), event("a", -1)],
            |event| event.stream,
            &SumReducer,
        );

        assert_eq!(result, Err(ProjectionError::Reducer("negative apply")));
    }

    #[test]
    fn reduce_grouped_propagates_seed_errors() {
        let result = reduce_grouped(
            vec![event("a", 1), event("b", -1)],
            |event| event.stream,
            &SumReducer,
        );

        assert_eq!(result, Err(ProjectionError::Reducer("negative seed")));
    }
}