statum_core/
projection.rs1use core::fmt;
2use std::collections::{hash_map::Entry, HashMap};
3use std::hash::Hash;
4
/// Strategy for folding events of type `Event` into a projection value.
///
/// The helpers in this module (`reduce_one`, `reduce_grouped`) call
/// [`seed`](Self::seed) with the first event of a stream or group and
/// [`apply`](Self::apply) with every later event.
pub trait ProjectionReducer<Event> {
    /// The value that is built up from the events.
    type Projection;

    /// The failure type returned by [`seed`](Self::seed) and
    /// [`apply`](Self::apply).
    type Error;

    /// Creates a fresh projection from `event`.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when no projection can be created from `event`.
    fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error>;

    /// Folds `event` into an existing `projection`, mutating it in place.
    ///
    /// # Errors
    ///
    /// Returns `Self::Error` when `event` cannot be applied.
    fn apply(
        &self,
        projection: &mut Self::Projection,
        event: &Event,
    ) -> Result<(), Self::Error>;
}
18
/// Failure returned by the projection helpers in this module.
///
/// `E` is the error type of the reducer that was driving the projection.
#[derive(Debug, Eq, PartialEq)]
pub enum ProjectionError<E> {
    /// The event input yielded no items, so there was nothing to seed a
    /// projection from.
    EmptyInput,

    /// The reducer itself failed; the payload is the reducer's error,
    /// carried unchanged.
    Reducer(E),
}
27
28impl<E> fmt::Display for ProjectionError<E>
29where
30 E: fmt::Display,
31{
32 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
33 match self {
34 Self::EmptyInput => write!(f, "projection input was empty"),
35 Self::Reducer(error) => write!(f, "{error}"),
36 }
37 }
38}
39
40impl<E> std::error::Error for ProjectionError<E> where E: std::error::Error + 'static {}
41
42pub fn reduce_one<Event, I, R>(
44 events: I,
45 reducer: &R,
46) -> Result<R::Projection, ProjectionError<R::Error>>
47where
48 I: IntoIterator<Item = Event>,
49 R: ProjectionReducer<Event>,
50{
51 let mut iter = events.into_iter();
52 let first = iter.next().ok_or(ProjectionError::EmptyInput)?;
53 let mut projection = reducer.seed(&first).map_err(ProjectionError::Reducer)?;
54
55 for event in iter {
56 reducer
57 .apply(&mut projection, &event)
58 .map_err(ProjectionError::Reducer)?;
59 }
60
61 Ok(projection)
62}
63
64pub fn reduce_grouped<Event, I, K, KF, R>(
69 events: I,
70 key_fn: KF,
71 reducer: &R,
72) -> Result<Vec<R::Projection>, ProjectionError<R::Error>>
73where
74 I: IntoIterator<Item = Event>,
75 KF: Fn(&Event) -> K,
76 K: Eq + Hash + Clone,
77 R: ProjectionReducer<Event>,
78{
79 let mut order = Vec::new();
80 let mut projections = HashMap::new();
81
82 for event in events {
83 let key = key_fn(&event);
84 match projections.entry(key.clone()) {
85 Entry::Occupied(mut entry) => {
86 reducer
87 .apply(entry.get_mut(), &event)
88 .map_err(ProjectionError::Reducer)?;
89 }
90 Entry::Vacant(entry) => {
91 order.push(key);
92 let projection = reducer.seed(&event).map_err(ProjectionError::Reducer)?;
93 entry.insert(projection);
94 }
95 }
96 }
97
98 let mut results = Vec::with_capacity(order.len());
99 for key in order {
100 if let Some(projection) = projections.remove(&key) {
101 results.push(projection);
102 }
103 }
104
105 Ok(results)
106}
107
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal fixture event: `stream` is used as the grouping key and
    /// `value` is what the reducer sums.
    #[derive(Clone, Debug, PartialEq, Eq)]
    struct Event {
        stream: &'static str,
        value: i32,
    }

    /// Shorthand constructor that keeps the fixtures below compact.
    fn ev(stream: &'static str, value: i32) -> Event {
        Event { stream, value }
    }

    /// Reducer that sums event values and rejects negative ones, with a
    /// distinct message for the seed and apply paths.
    struct SumReducer;

    impl ProjectionReducer<Event> for SumReducer {
        type Projection = i32;
        type Error = &'static str;

        fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error> {
            match event.value {
                value if value < 0 => Err("negative seed"),
                value => Ok(value),
            }
        }

        fn apply(
            &self,
            projection: &mut Self::Projection,
            event: &Event,
        ) -> Result<(), Self::Error> {
            if event.value >= 0 {
                *projection += event.value;
                Ok(())
            } else {
                Err("negative apply")
            }
        }
    }

    #[test]
    fn reduce_one_requires_input() {
        let no_events: Vec<Event> = Vec::new();

        assert_eq!(
            reduce_one(no_events, &SumReducer),
            Err(ProjectionError::EmptyInput)
        );
    }

    #[test]
    fn reduce_one_folds_one_stream() {
        let events = vec![ev("a", 1), ev("a", 2)];

        let total = reduce_one(events, &SumReducer).unwrap();

        assert_eq!(total, 3);
    }

    #[test]
    fn reduce_grouped_preserves_first_seen_order() {
        let events = vec![ev("b", 1), ev("a", 2), ev("b", 3)];

        let totals = reduce_grouped(events, |event| event.stream, &SumReducer).unwrap();

        // "b" was seen first, so its total comes first.
        assert_eq!(totals, vec![4, 2]);
    }

    #[test]
    fn reduce_grouped_propagates_reducer_errors() {
        let events = vec![ev("a", 1), ev("a", -1)];

        let outcome = reduce_grouped(events, |event| event.stream, &SumReducer);

        assert_eq!(outcome, Err(ProjectionError::Reducer("negative apply")));
    }
}