use core::fmt;
use std::collections::{hash_map::Entry, HashMap};
use std::hash::Hash;
pub trait ProjectionReducer<Event> {
type Projection;
type Error;
fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error>;
fn apply(&self, projection: &mut Self::Projection, event: &Event) -> Result<(), Self::Error>;
}
#[derive(Debug, PartialEq, Eq)]
pub enum ProjectionError<E> {
EmptyInput,
Reducer(E),
}
impl<E> fmt::Display for ProjectionError<E>
where
E: fmt::Display,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::EmptyInput => write!(f, "projection input was empty"),
Self::Reducer(error) => write!(f, "{error}"),
}
}
}
impl<E> std::error::Error for ProjectionError<E> where E: std::error::Error + 'static {}
pub fn reduce_one<Event, I, R>(
events: I,
reducer: &R,
) -> Result<R::Projection, ProjectionError<R::Error>>
where
I: IntoIterator<Item = Event>,
R: ProjectionReducer<Event>,
{
let mut iter = events.into_iter();
let first = iter.next().ok_or(ProjectionError::EmptyInput)?;
let mut projection = reducer.seed(&first).map_err(ProjectionError::Reducer)?;
for event in iter {
reducer
.apply(&mut projection, &event)
.map_err(ProjectionError::Reducer)?;
}
Ok(projection)
}
pub fn reduce_grouped<Event, I, K, KF, R>(
events: I,
key_fn: KF,
reducer: &R,
) -> Result<Vec<R::Projection>, ProjectionError<R::Error>>
where
I: IntoIterator<Item = Event>,
KF: Fn(&Event) -> K,
K: Eq + Hash + Clone,
R: ProjectionReducer<Event>,
{
let mut order = Vec::new();
let mut projections = HashMap::new();
for event in events {
let key = key_fn(&event);
match projections.entry(key.clone()) {
Entry::Occupied(mut entry) => {
reducer
.apply(entry.get_mut(), &event)
.map_err(ProjectionError::Reducer)?;
}
Entry::Vacant(entry) => {
order.push(key);
let projection = reducer.seed(&event).map_err(ProjectionError::Reducer)?;
entry.insert(projection);
}
}
}
let mut results = Vec::with_capacity(order.len());
for key in order {
if let Some(projection) = projections.remove(&key) {
results.push(projection);
}
}
Ok(results)
}
#[cfg(test)]
mod tests {
use super::*;
#[derive(Clone, Debug, PartialEq, Eq)]
struct Event {
stream: &'static str,
value: i32,
}
struct SumReducer;
impl ProjectionReducer<Event> for SumReducer {
type Projection = i32;
type Error = &'static str;
fn seed(&self, event: &Event) -> Result<Self::Projection, Self::Error> {
if event.value < 0 {
Err("negative seed")
} else {
Ok(event.value)
}
}
fn apply(
&self,
projection: &mut Self::Projection,
event: &Event,
) -> Result<(), Self::Error> {
if event.value < 0 {
return Err("negative apply");
}
*projection += event.value;
Ok(())
}
}
#[test]
fn reduce_one_requires_input() {
let result = reduce_one(Vec::<Event>::new(), &SumReducer);
assert_eq!(result, Err(ProjectionError::EmptyInput));
}
#[test]
fn reduce_one_folds_one_stream() {
let result = reduce_one(
vec![
Event {
stream: "a",
value: 1,
},
Event {
stream: "a",
value: 2,
},
],
&SumReducer,
)
.unwrap();
assert_eq!(result, 3);
}
#[test]
fn reduce_grouped_preserves_first_seen_order() {
let result = reduce_grouped(
vec![
Event {
stream: "b",
value: 1,
},
Event {
stream: "a",
value: 2,
},
Event {
stream: "b",
value: 3,
},
],
|event| event.stream,
&SumReducer,
)
.unwrap();
assert_eq!(result, vec![4, 2]);
}
#[test]
fn reduce_grouped_propagates_reducer_errors() {
let result = reduce_grouped(
vec![
Event {
stream: "a",
value: 1,
},
Event {
stream: "a",
value: -1,
},
],
|event| event.stream,
&SumReducer,
);
assert_eq!(result, Err(ProjectionError::Reducer("negative apply")));
}
}