statum_core/
projection.rs1use core::fmt;
60use std::collections::{hash_map::Entry, HashMap};
61use std::hash::Hash;
62
/// Strategy for folding a sequence of events into a projection value.
///
/// The reducing functions in this module call [`seed`](Self::seed) on the first
/// event of a sequence to create the projection, then call
/// [`apply`](Self::apply) once for each following event.
pub trait ProjectionReducer<Event> {
    /// The value built up from the events.
    type Projection;

    /// The error reported when an event cannot be seeded or applied.
    type Error;

    /// Builds the initial projection from the first event of a sequence.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when no projection can be started from `event`.
    fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error>;

    /// Folds one further event into an existing projection, mutating it in place.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when `event` cannot be applied to `projection`.
    fn apply(&self, projection: &mut Self::Projection, event: &Event) -> Result<(), Self::Error>;
}
76
/// Failure produced while reducing events into a projection.
#[derive(Debug, PartialEq, Eq)]
pub enum ProjectionError<E> {
    /// The event sequence contained no events, so there was nothing to seed from.
    EmptyInput,
    /// The reducer rejected an event while seeding or applying.
    Reducer(E),
}

impl<E> fmt::Display for ProjectionError<E>
where
    E: fmt::Display,
{
    /// Writes a fixed message for [`ProjectionError::EmptyInput`] and forwards the
    /// wrapped error's own message for [`ProjectionError::Reducer`].
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::EmptyInput => write!(f, "projection input was empty"),
            Self::Reducer(error) => write!(f, "{error}"),
        }
    }
}

impl<E> std::error::Error for ProjectionError<E>
where
    E: std::error::Error + 'static,
{
    /// Returns the underlying cause of this error, if any.
    ///
    /// `Reducer` is displayed transparently (its message *is* the wrapped
    /// error's message), so the wrapped error's own source is reported here
    /// rather than the wrapped error itself. That keeps the cause chain intact
    /// without printing the same message twice when a reporter walks it.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::EmptyInput => None,
            Self::Reducer(error) => error.source(),
        }
    }
}
99
100pub fn reduce_one<Event, I, R>(
102 events: I,
103 reducer: &R,
104) -> Result<R::Projection, ProjectionError<R::Error>>
105where
106 I: IntoIterator<Item = Event>,
107 R: ProjectionReducer<Event>,
108{
109 let mut iter = events.into_iter();
110 let first = iter.next().ok_or(ProjectionError::EmptyInput)?;
111 let mut projection = reducer.seed(&first).map_err(ProjectionError::Reducer)?;
112
113 for event in iter {
114 reducer
115 .apply(&mut projection, &event)
116 .map_err(ProjectionError::Reducer)?;
117 }
118
119 Ok(projection)
120}
121
122pub fn reduce_grouped<Event, I, K, KF, R>(
127 events: I,
128 key_fn: KF,
129 reducer: &R,
130) -> Result<Vec<R::Projection>, ProjectionError<R::Error>>
131where
132 I: IntoIterator<Item = Event>,
133 KF: Fn(&Event) -> K,
134 K: Eq + Hash + Clone,
135 R: ProjectionReducer<Event>,
136{
137 let mut order = Vec::new();
138 let mut projections = HashMap::new();
139
140 for event in events {
141 let key = key_fn(&event);
142 match projections.entry(key.clone()) {
143 Entry::Occupied(mut entry) => {
144 reducer
145 .apply(entry.get_mut(), &event)
146 .map_err(ProjectionError::Reducer)?;
147 }
148 Entry::Vacant(entry) => {
149 order.push(key);
150 let projection = reducer.seed(&event).map_err(ProjectionError::Reducer)?;
151 entry.insert(projection);
152 }
153 }
154 }
155
156 let mut results = Vec::with_capacity(order.len());
157 for key in order {
158 if let Some(projection) = projections.remove(&key) {
159 results.push(projection);
160 }
161 }
162
163 Ok(results)
164}
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal event carrying the stream it belongs to and a value to sum.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Event {
        stream: &'static str,
        value: i32,
    }

    /// Reducer that sums event values and rejects negative ones, with distinct
    /// messages for the seed and apply paths so tests can tell them apart.
    struct SumReducer;

    impl ProjectionReducer<Event> for SumReducer {
        type Projection = i32;
        type Error = &'static str;

        fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error> {
            if event.value < 0 {
                Err("negative seed")
            } else {
                Ok(event.value)
            }
        }

        fn apply(
            &self,
            projection: &mut Self::Projection,
            event: &Event,
        ) -> Result<(), Self::Error> {
            if event.value < 0 {
                return Err("negative apply");
            }

            *projection += event.value;
            Ok(())
        }
    }

    #[test]
    fn reduce_one_requires_input() {
        let result = reduce_one(Vec::<Event>::new(), &SumReducer);
        assert_eq!(result, Err(ProjectionError::EmptyInput));
    }

    #[test]
    fn reduce_one_folds_one_stream() {
        let result = reduce_one(
            vec![
                Event {
                    stream: "a",
                    value: 1,
                },
                Event {
                    stream: "a",
                    value: 2,
                },
            ],
            &SumReducer,
        )
        .unwrap();

        assert_eq!(result, 3);
    }

    #[test]
    fn reduce_one_propagates_seed_errors() {
        let result = reduce_one(
            vec![Event {
                stream: "a",
                value: -1,
            }],
            &SumReducer,
        );

        assert_eq!(result, Err(ProjectionError::Reducer("negative seed")));
    }

    #[test]
    fn reduce_one_propagates_apply_errors() {
        let result = reduce_one(
            vec![
                Event {
                    stream: "a",
                    value: 1,
                },
                Event {
                    stream: "a",
                    value: -1,
                },
            ],
            &SumReducer,
        );

        assert_eq!(result, Err(ProjectionError::Reducer("negative apply")));
    }

    #[test]
    fn reduce_grouped_preserves_first_seen_order() {
        let result = reduce_grouped(
            vec![
                Event {
                    stream: "b",
                    value: 1,
                },
                Event {
                    stream: "a",
                    value: 2,
                },
                Event {
                    stream: "b",
                    value: 3,
                },
            ],
            |event| event.stream,
            &SumReducer,
        )
        .unwrap();

        assert_eq!(result, vec![4, 2]);
    }

    #[test]
    fn reduce_grouped_accepts_empty_input() {
        let result = reduce_grouped(Vec::<Event>::new(), |event| event.stream, &SumReducer);

        assert_eq!(result, Ok(Vec::new()));
    }

    #[test]
    fn reduce_grouped_propagates_reducer_errors() {
        let result = reduce_grouped(
            vec![
                Event {
                    stream: "a",
                    value: 1,
                },
                Event {
                    stream: "a",
                    value: -1,
                },
            ],
            |event| event.stream,
            &SumReducer,
        );

        assert_eq!(result, Err(ProjectionError::Reducer("negative apply")));
    }

    #[test]
    fn reduce_grouped_propagates_seed_errors() {
        let result = reduce_grouped(
            vec![
                Event {
                    stream: "a",
                    value: 1,
                },
                Event {
                    stream: "b",
                    value: -1,
                },
            ],
            |event| event.stream,
            &SumReducer,
        );

        assert_eq!(result, Err(ProjectionError::Reducer("negative seed")));
    }
}