Skip to main content

statum_core/
projection.rs

1use core::fmt;
2use std::collections::{hash_map::Entry, HashMap};
3use std::hash::Hash;
4
/// Folds a stream of events into a projection value.
///
/// The resulting projection can later be rehydrated with `#[validators]`.
pub trait ProjectionReducer<Event> {
    /// Projection value produced by folding events.
    type Projection;
    /// Error this reducer reports when it rejects an event.
    type Error;

    /// Builds the initial projection from the first event of a stream.
    fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error>;

    /// Folds a subsequent event into an already-seeded projection.
    fn apply(
        &self,
        projection: &mut Self::Projection,
        event: &Event,
    ) -> Result<(), Self::Error>;
}
18
/// Failure modes of the projection helpers.
#[derive(Debug, PartialEq, Eq)]
pub enum ProjectionError<E> {
    /// No events were supplied, so there was nothing to seed a projection from.
    EmptyInput,
    /// An event was rejected by the reducer; carries the reducer's own error.
    Reducer(E),
}
27
28impl<E> fmt::Display for ProjectionError<E>
29where
30    E: fmt::Display,
31{
32    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33        match self {
34            Self::EmptyInput => write!(f, "projection input was empty"),
35            Self::Reducer(error) => write!(f, "{error}"),
36        }
37    }
38}
39
40impl<E> std::error::Error for ProjectionError<E> where E: std::error::Error + 'static {}
41
42/// Reduce one ordered event stream into one projection value.
43pub fn reduce_one<Event, I, R>(
44    events: I,
45    reducer: &R,
46) -> Result<R::Projection, ProjectionError<R::Error>>
47where
48    I: IntoIterator<Item = Event>,
49    R: ProjectionReducer<Event>,
50{
51    let mut iter = events.into_iter();
52    let first = iter.next().ok_or(ProjectionError::EmptyInput)?;
53    let mut projection = reducer.seed(&first).map_err(ProjectionError::Reducer)?;
54
55    for event in iter {
56        reducer
57            .apply(&mut projection, &event)
58            .map_err(ProjectionError::Reducer)?;
59    }
60
61    Ok(projection)
62}
63
64/// Reduce many ordered event streams into projection values grouped by key.
65///
66/// Events are applied in encounter order for each key, and the output preserves
67/// the first-seen order of keys in the input stream.
68pub fn reduce_grouped<Event, I, K, KF, R>(
69    events: I,
70    key_fn: KF,
71    reducer: &R,
72) -> Result<Vec<R::Projection>, ProjectionError<R::Error>>
73where
74    I: IntoIterator<Item = Event>,
75    KF: Fn(&Event) -> K,
76    K: Eq + Hash + Clone,
77    R: ProjectionReducer<Event>,
78{
79    let mut order = Vec::new();
80    let mut projections = HashMap::new();
81
82    for event in events {
83        let key = key_fn(&event);
84        match projections.entry(key.clone()) {
85            Entry::Occupied(mut entry) => {
86                reducer
87                    .apply(entry.get_mut(), &event)
88                    .map_err(ProjectionError::Reducer)?;
89            }
90            Entry::Vacant(entry) => {
91                order.push(key);
92                let projection = reducer.seed(&event).map_err(ProjectionError::Reducer)?;
93                entry.insert(projection);
94            }
95        }
96    }
97
98    let mut results = Vec::with_capacity(order.len());
99    for key in order {
100        if let Some(projection) = projections.remove(&key) {
101            results.push(projection);
102        }
103    }
104
105    Ok(results)
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal event: the stream it belongs to plus a value to accumulate.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Event {
        stream: &'static str,
        value: i32,
    }

    /// Shorthand constructor that keeps the fixtures below readable.
    fn ev(stream: &'static str, value: i32) -> Event {
        Event { stream, value }
    }

    /// Sums event values and rejects any negative value.
    struct SumReducer;

    impl ProjectionReducer<Event> for SumReducer {
        type Projection = i32;
        type Error = &'static str;

        fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error> {
            match event.value {
                value if value < 0 => Err("negative seed"),
                value => Ok(value),
            }
        }

        fn apply(
            &self,
            projection: &mut Self::Projection,
            event: &Event,
        ) -> Result<(), Self::Error> {
            if event.value >= 0 {
                *projection += event.value;
                Ok(())
            } else {
                Err("negative apply")
            }
        }
    }

    #[test]
    fn reduce_one_requires_input() {
        let events: Vec<Event> = Vec::new();
        let outcome = reduce_one(events, &SumReducer);
        assert_eq!(outcome, Err(ProjectionError::EmptyInput));
    }

    #[test]
    fn reduce_one_folds_one_stream() {
        let events = vec![ev("a", 1), ev("a", 2)];
        let total = reduce_one(events, &SumReducer).unwrap();
        assert_eq!(total, 3);
    }

    #[test]
    fn reduce_grouped_preserves_first_seen_order() {
        // "b" appears first, so its total must come first in the output.
        let events = vec![ev("b", 1), ev("a", 2), ev("b", 3)];
        let totals = reduce_grouped(events, |event| event.stream, &SumReducer).unwrap();
        assert_eq!(totals, vec![4, 2]);
    }

    #[test]
    fn reduce_grouped_propagates_reducer_errors() {
        let events = vec![ev("a", 1), ev("a", -1)];
        let outcome = reduce_grouped(events, |event| event.stream, &SumReducer);
        assert_eq!(outcome, Err(ProjectionError::Reducer("negative apply")));
    }
}
216}