use core::fmt;
use std::marker::PhantomData;
use crate::types::Outcome;
use crate::types::cancel::CancelReason;
use crate::types::outcome::PanicPayload;
use crate::types::policy::AggregateDecision;
/// Zero-sized marker type naming a map-reduce over values of type `T`.
///
/// The only field is a `PhantomData<T>`, so a value carries no runtime data.
/// `Debug`, `Default`, `Clone` and `Copy` are all implemented by hand (rather
/// than derived) so that none of them places a bound on `T`.
pub struct MapReduce<T> {
    _t: PhantomData<T>,
}

impl<T> MapReduce<T> {
    /// Creates the marker value.
    #[must_use]
    pub const fn new() -> Self {
        Self { _t: PhantomData }
    }
}

impl<T> fmt::Debug for MapReduce<T> {
    /// Formats the marker exactly like a derived `Debug` would, but without
    /// requiring `T: Debug` (`PhantomData<T>` is `Debug` for every `T`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("MapReduce").field("_t", &self._t).finish()
    }
}

impl<T> Default for MapReduce<T> {
    /// Equivalent to [`MapReduce::new`].
    fn default() -> Self {
        Self::new()
    }
}

impl<T> Clone for MapReduce<T> {
    /// The marker is `Copy`, so cloning is a plain bitwise copy.
    fn clone(&self) -> Self {
        *self
    }
}

impl<T> Copy for MapReduce<T> {}
/// Aggregated view of a batch of task outcomes after a map-reduce pass.
///
/// All fields are public, so a value can also be assembled by hand; the
/// accessor methods do not verify that the fields agree with each other.
pub struct MapReduceResult<T, E> {
/// Overall decision for the batch.
pub decision: AggregateDecision<E>,
/// Reduced value; `map_reduce_outcomes` in this file yields `None` here when
/// no outcome succeeded.
pub reduced: Option<T>,
/// Successful values, each paired with the index of its input.
pub successes: Vec<(usize, T)>,
/// Number of inputs in the batch (`make_map_reduce_result` sets this to the
/// length of the outcome list).
pub total_count: usize,
}
impl<T, E> MapReduceResult<T, E> {
    /// Bundles the four components into a result without validating them
    /// against each other.
    #[must_use]
    pub fn new(
        decision: AggregateDecision<E>,
        reduced: Option<T>,
        successes: Vec<(usize, T)>,
        total_count: usize,
    ) -> Self {
        Self {
            decision,
            reduced,
            successes,
            total_count,
        }
    }

    /// Returns `true` only for a non-empty batch whose decision is `AllOk`
    /// and whose success list has exactly `total_count` entries.
    #[must_use]
    pub fn all_succeeded(&self) -> bool {
        self.total_count > 0
            && matches!(self.decision, AggregateDecision::AllOk)
            && self.successes.len() == self.total_count
    }

    /// Number of recorded successes.
    #[must_use]
    pub fn success_count(&self) -> usize {
        self.successes.len()
    }

    /// Number of inputs that did not succeed.
    ///
    /// The fields are public and may be inconsistent, so the subtraction
    /// saturates at zero instead of underflowing when more successes are
    /// recorded than `total_count`.
    #[must_use]
    pub fn failure_count(&self) -> usize {
        self.total_count.saturating_sub(self.successes.len())
    }

    /// Returns `true` when a reduced value is present.
    #[must_use]
    pub fn has_reduced(&self) -> bool {
        self.reduced.is_some()
    }
}
impl<T, E> fmt::Debug for MapReduceResult<T, E>
where
    T: fmt::Debug,
    E: fmt::Debug,
{
    /// Writes the struct name followed by all four fields in declaration order.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Exhaustive destructuring: adding a field without printing it
        // becomes a compile error.
        let Self {
            decision,
            reduced,
            successes,
            total_count,
        } = self;
        f.debug_struct("MapReduceResult")
            .field("decision", decision)
            .field("reduced", reduced)
            .field("successes", successes)
            .field("total_count", total_count)
            .finish()
    }
}
/// Failure modes reported when a map-reduce result is converted into a
/// `Result` (see `map_reduce_to_result` in this file).
#[derive(Debug, Clone)]
pub enum MapReduceError<E> {
/// At least one task returned an error and the batch decision was
/// `FirstError`.
Error {
/// The error value carried by the batch decision.
error: E,
/// Index reported for the failing task.
index: usize,
/// Number of inputs that did not succeed.
total_failures: usize,
/// Number of inputs that succeeded.
success_count: usize,
},
/// The batch was cancelled; carries the aggregated cancel reason.
Cancelled(CancelReason),
/// A task panicked.
Panicked {
/// Payload of the recorded panic.
payload: PanicPayload,
/// Index of the task whose panic was recorded.
index: usize,
},
/// There was nothing to reduce (Display: "map-reduce requires at least one input").
Empty,
}
impl<E> MapReduceError<E> {
    /// Task index carried by the `Error` variant, `None` for every other variant.
    #[must_use]
    pub const fn error_index(&self) -> Option<usize> {
        if let Self::Error { index, .. } = self {
            Some(*index)
        } else {
            None
        }
    }

    /// Task index carried by the `Panicked` variant, `None` for every other variant.
    #[must_use]
    pub const fn panic_index(&self) -> Option<usize> {
        if let Self::Panicked { index, .. } = self {
            Some(*index)
        } else {
            None
        }
    }

    /// `true` exactly for the `Error` variant.
    #[must_use]
    pub const fn is_error(&self) -> bool {
        // Only `Error` yields an error index, so its presence identifies the variant.
        self.error_index().is_some()
    }

    /// `true` exactly for the `Cancelled` variant.
    #[must_use]
    pub const fn is_cancelled(&self) -> bool {
        matches!(self, Self::Cancelled(_))
    }

    /// `true` exactly for the `Panicked` variant.
    #[must_use]
    pub const fn is_panicked(&self) -> bool {
        // Only `Panicked` yields a panic index, so its presence identifies the variant.
        self.panic_index().is_some()
    }

    /// `true` exactly for the `Empty` variant.
    #[must_use]
    pub const fn is_empty(&self) -> bool {
        matches!(self, Self::Empty)
    }
}
impl<E> fmt::Display for MapReduceError<E>
where
    E: fmt::Display,
{
    /// Renders a one-line, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Fixed text with nothing to interpolate.
            Self::Empty => f.write_str("map-reduce requires at least one input"),
            Self::Cancelled(r) => write!(f, "map-reduce cancelled: {r}"),
            Self::Panicked { payload, index } => {
                write!(f, "map-reduce task {index} panicked: {payload}")
            }
            Self::Error {
                error,
                index,
                total_failures,
                success_count,
            } => write!(
                f,
                "map-reduce task {index} failed: {error} ({total_failures} failures, {success_count} successes)"
            ),
        }
    }
}
impl<E: fmt::Debug + fmt::Display> std::error::Error for MapReduceError<E> {}
pub fn map_reduce_outcomes<T, E, F>(
outcomes: Vec<Outcome<T, E>>,
reduce: F,
) -> (AggregateDecision<E>, Option<T>, Vec<(usize, T)>)
where
F: Fn(T, T) -> T,
T: Clone,
{
let total = outcomes.len();
let mut successes: Vec<(usize, T)> = Vec::with_capacity(total);
let mut first_error: Option<E> = None;
let mut strongest_cancel: Option<CancelReason> = None;
let mut panic_payload: Option<PanicPayload> = None;
let mut panic_index: Option<usize> = None;
for (i, outcome) in outcomes.into_iter().enumerate() {
match outcome {
Outcome::Panicked(p) => {
if panic_payload.is_none() {
panic_payload = Some(p);
panic_index = Some(i);
}
}
Outcome::Cancelled(r) => match &mut strongest_cancel {
None => strongest_cancel = Some(r),
Some(existing) => {
existing.strengthen(&r);
}
},
Outcome::Err(e) => {
if first_error.is_none() {
first_error = Some(e);
}
}
Outcome::Ok(v) => {
successes.push((i, v));
}
}
}
let decision = panic_payload.map_or_else(
|| {
strongest_cancel.map_or_else(
|| first_error.map_or(AggregateDecision::AllOk, AggregateDecision::FirstError),
AggregateDecision::Cancelled,
)
},
|p| AggregateDecision::Panicked {
payload: p,
first_panic_index: panic_index.expect("panic index missing"),
},
);
let reduced = if successes.is_empty() {
None
} else {
let mut iter = successes.iter();
let (_, first) = iter.next().expect("already checked non-empty");
let result = iter.fold(first.clone(), |acc, (_, v)| reduce(acc, v.clone()));
Some(result)
};
(decision, reduced, successes)
}
#[must_use]
pub fn make_map_reduce_result<T, E, F>(
outcomes: Vec<Outcome<T, E>>,
reduce: F,
) -> MapReduceResult<T, E>
where
F: Fn(T, T) -> T,
T: Clone,
{
let total_count = outcomes.len();
let (decision, reduced, successes) = map_reduce_outcomes(outcomes, reduce);
MapReduceResult::new(decision, reduced, successes, total_count)
}
pub fn map_reduce_to_result<T, E>(result: MapReduceResult<T, E>) -> Result<T, MapReduceError<E>> {
if result.total_count == 0 {
return Err(MapReduceError::Empty);
}
match result.decision {
AggregateDecision::AllOk => {
result.reduced.ok_or_else(|| MapReduceError::Empty)
}
AggregateDecision::FirstError(e) => {
let success_indices: std::collections::HashSet<usize> =
result.successes.iter().map(|(i, _)| *i).collect();
let first_error_index = (0..result.total_count)
.find(|i| !success_indices.contains(i))
.unwrap_or(0);
let total_failures = result.total_count - result.successes.len();
Err(MapReduceError::Error {
error: e,
index: first_error_index,
total_failures,
success_count: result.successes.len(),
})
}
AggregateDecision::Cancelled(r) => Err(MapReduceError::Cancelled(r)),
AggregateDecision::Panicked {
payload,
first_panic_index,
} => Err(MapReduceError::Panicked {
payload,
index: first_panic_index,
}),
}
}
/// Returns a clone of the reduced value already stored in `result`, or `None`
/// when there is none. No reduction is performed here.
#[must_use]
pub fn reduce_successes<T: Clone, E>(result: &MapReduceResult<T, E>) -> Option<T> {
    result.reduced.as_ref().cloned()
}
// Unit tests for the map-reduce aggregation helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// The marker can be built via `new`/`default` and copied freely.
#[test]
fn map_reduce_marker_type() {
let _mr: MapReduce<i32> = MapReduce::new();
let _mr_default: MapReduce<String> = MapReduce::default();
let m1: MapReduce<i32> = MapReduce::new();
// Using `m1` after these two moves only compiles because the marker is `Copy`.
let m2 = m1;
let m3 = m1;
assert!(std::mem::size_of_val(&m1) == std::mem::size_of_val(&m2));
assert!(std::mem::size_of_val(&m1) == std::mem::size_of_val(&m3));
}
// A hand-built all-OK result reports full success through every accessor.
#[test]
fn map_reduce_result_all_succeeded() {
let result: MapReduceResult<i32, &str> = MapReduceResult::new(
AggregateDecision::AllOk,
Some(6),
vec![(0, 1), (1, 2), (2, 3)],
3,
);
assert!(result.all_succeeded());
assert_eq!(result.success_count(), 3);
assert_eq!(result.failure_count(), 0);
assert!(result.has_reduced());
}
// Two successes out of three inputs: not all succeeded, one failure counted.
#[test]
fn map_reduce_result_partial_failure() {
let result: MapReduceResult<i32, &str> = MapReduceResult::new(
AggregateDecision::FirstError("oops"),
Some(4), vec![(0, 1), (2, 3)],
3,
);
assert!(!result.all_succeeded());
assert_eq!(result.success_count(), 2);
assert_eq!(result.failure_count(), 1);
assert!(result.has_reduced());
}
// Each variant answers its own predicate and index accessor, and no other.
#[test]
fn map_reduce_error_predicates() {
let err: MapReduceError<&str> = MapReduceError::Error {
error: "test",
index: 2,
total_failures: 1,
success_count: 2,
};
assert!(err.is_error());
assert!(!err.is_cancelled());
assert!(!err.is_panicked());
assert!(!err.is_empty());
assert_eq!(err.error_index(), Some(2));
let err: MapReduceError<&str> = MapReduceError::Cancelled(CancelReason::timeout());
assert!(!err.is_error());
assert!(err.is_cancelled());
assert_eq!(err.error_index(), None);
let err: MapReduceError<&str> = MapReduceError::Panicked {
payload: PanicPayload::new("boom"),
index: 3,
};
assert!(!err.is_error());
assert!(err.is_panicked());
assert_eq!(err.panic_index(), Some(3));
let err: MapReduceError<&str> = MapReduceError::Empty;
assert!(err.is_empty());
}
// Display output mentions the task index, the inner error and the counts.
#[test]
fn map_reduce_error_display() {
let err: MapReduceError<&str> = MapReduceError::Error {
error: "test error",
index: 3,
total_failures: 2,
success_count: 5,
};
let msg = err.to_string();
assert!(msg.contains("task 3"));
assert!(msg.contains("test error"));
assert!(msg.contains("2 failures"));
assert!(msg.contains("5 successes"));
let err: MapReduceError<&str> = MapReduceError::Panicked {
payload: PanicPayload::new("boom"),
index: 1,
};
assert!(err.to_string().contains("task 1 panicked"));
assert!(err.to_string().contains("boom"));
let err: MapReduceError<&str> = MapReduceError::Empty;
assert!(err.to_string().contains("at least one input"));
}
// Summing three OK outcomes: 1 + 2 + 3 = 6.
#[test]
fn map_reduce_outcomes_all_ok_sum() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let (decision, reduced, successes) = map_reduce_outcomes(outcomes, |a, b| a + b);
assert!(matches!(decision, AggregateDecision::AllOk));
assert_eq!(reduced, Some(6)); assert_eq!(successes.len(), 3);
}
// Multiplying three OK outcomes: 2 * 3 * 4 = 24.
#[test]
fn map_reduce_outcomes_all_ok_product() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(2), Outcome::Ok(3), Outcome::Ok(4)];
let (decision, reduced, _) = map_reduce_outcomes(outcomes, |a, b| a * b);
assert!(matches!(decision, AggregateDecision::AllOk));
assert_eq!(reduced, Some(24)); }
// An error yields `FirstError`, yet the successes are still reduced (1 + 3 = 4).
#[test]
fn map_reduce_outcomes_partial_failure() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("failed"), Outcome::Ok(3)];
let (decision, reduced, successes) = map_reduce_outcomes(outcomes, |a, b| a + b);
assert!(matches!(decision, AggregateDecision::FirstError("failed")));
assert_eq!(reduced, Some(4)); assert_eq!(successes.len(), 2);
}
// A cancellation yields `Cancelled`; the successes are still reduced.
#[test]
fn map_reduce_outcomes_cancelled() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Cancelled(CancelReason::timeout()),
Outcome::Ok(3),
];
let (decision, reduced, _) = map_reduce_outcomes(outcomes, |a, b| a + b);
assert!(matches!(decision, AggregateDecision::Cancelled(_)));
assert_eq!(reduced, Some(4)); }
// A panic yields `Panicked` with the index of the panicking task.
#[test]
fn map_reduce_outcomes_panicked() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Panicked(PanicPayload::new("boom")),
Outcome::Ok(3),
];
let (decision, reduced, successes) = map_reduce_outcomes(outcomes, |a, b| a + b);
match decision {
AggregateDecision::Panicked {
payload: _,
first_panic_index,
} => assert_eq!(first_panic_index, 1),
_ => panic!("Expected Panicked decision"),
}
assert_eq!(successes.len(), 2);
assert_eq!(reduced, Some(4)); }
// Subtraction is order-sensitive: (1 - 10) - 100 = -109 proves a left fold in input order.
#[test]
fn map_reduce_outcomes_preserves_input_order() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(10), Outcome::Ok(100)];
let (_, reduced, _) = map_reduce_outcomes(outcomes, |a, b| a - b);
assert_eq!(reduced, Some(-109));
}
// A single success is returned as-is; the reducer has nothing to combine.
#[test]
fn map_reduce_outcomes_single_value() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(42)];
let (decision, reduced, successes) = map_reduce_outcomes(outcomes, |a, b| a + b);
assert!(matches!(decision, AggregateDecision::AllOk));
assert_eq!(reduced, Some(42)); assert_eq!(successes.len(), 1);
}
// Empty input: decision is `AllOk`, but there is no reduced value.
#[test]
fn map_reduce_outcomes_empty() {
let outcomes: Vec<Outcome<i32, &str>> = vec![];
let (decision, reduced, successes) = map_reduce_outcomes(outcomes, |a, b| a + b);
assert!(matches!(decision, AggregateDecision::AllOk));
assert_eq!(reduced, None); assert!(successes.is_empty());
}
// `all_succeeded` is false for an empty batch even though the decision is `AllOk`.
#[test]
fn map_reduce_result_empty_not_all_succeeded() {
let result: MapReduceResult<i32, &str> =
MapReduceResult::new(AggregateDecision::AllOk, None, vec![], 0);
assert!(!result.all_succeeded());
assert!(!result.has_reduced());
}
// `make_map_reduce_result` records the reduction and the input count.
#[test]
fn make_map_reduce_result_success() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
assert!(result.all_succeeded());
assert_eq!(result.reduced, Some(6));
assert_eq!(result.total_count, 3);
}
// All-OK batch converts to `Ok` with the reduced value.
#[test]
fn map_reduce_to_result_all_ok() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let value = map_reduce_to_result(result);
assert_eq!(value.unwrap(), 6);
}
// An error converts to `MapReduceError::Error` with index and counts filled in.
#[test]
fn map_reduce_to_result_error() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("failed"), Outcome::Ok(3)];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let value = map_reduce_to_result(result);
match value {
Err(MapReduceError::Error {
error,
index,
total_failures,
success_count,
}) => {
assert_eq!(error, "failed");
assert_eq!(index, 1);
assert_eq!(total_failures, 1);
assert_eq!(success_count, 2);
}
_ => panic!("expected MapReduceError::Error"),
}
}
// A cancellation converts to `MapReduceError::Cancelled`.
#[test]
fn map_reduce_to_result_cancelled() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Cancelled(CancelReason::timeout())];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let value = map_reduce_to_result(result);
assert!(matches!(value, Err(MapReduceError::Cancelled(_))));
}
// A panic converts to `MapReduceError::Panicked` carrying the task index.
#[test]
fn map_reduce_to_result_panicked() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Panicked(PanicPayload::new("crash"))];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let value = map_reduce_to_result(result);
match value {
Err(MapReduceError::Panicked { payload: _, index }) => assert_eq!(index, 0),
_ => panic!("Expected Panicked error"),
}
}
// An empty batch converts to `MapReduceError::Empty`.
#[test]
fn map_reduce_to_result_empty() {
let outcomes: Vec<Outcome<i32, &str>> = vec![];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let value = map_reduce_to_result(result);
assert!(matches!(value, Err(MapReduceError::Empty)));
}
// `reduce_successes` exposes the partial reduction of a failed batch (1 + 3 = 4).
#[test]
fn reduce_successes_partial() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("failed"), Outcome::Ok(3)];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let partial = reduce_successes(&result);
assert_eq!(partial, Some(4)); }
// With no successes there is nothing to return.
#[test]
fn reduce_successes_none_succeeded() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Err("failed1"), Outcome::Err("failed2")];
let result = make_map_reduce_result(outcomes, |a, b| a + b);
let partial = reduce_successes(&result);
assert_eq!(partial, None);
}
// The reduction works for non-`Copy` values such as `String`.
#[test]
fn map_reduce_string_concat() {
let outcomes: Vec<Outcome<String, &str>> = vec![
Outcome::Ok("Hello".to_string()),
Outcome::Ok(" ".to_string()),
Outcome::Ok("World".to_string()),
];
let result = make_map_reduce_result(outcomes, |a, b| a + &b);
assert_eq!(result.reduced, Some("Hello World".to_string()));
}
// Same inputs, two reducers: addition gives 6, left-folded subtraction gives (1 - 2) - 3 = -4.
#[test]
fn map_reduce_associative_vs_non_associative() {
let outcomes_a: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let outcomes_b = outcomes_a.clone();
let sum_result = make_map_reduce_result(outcomes_a, |a, b| a + b);
assert_eq!(sum_result.reduced, Some(6));
let difference_result = make_map_reduce_result(outcomes_b, |a, b| a - b);
assert_eq!(difference_result.reduced, Some(-4));
}
// Permuting the inputs leaves a commutative reduction unchanged, while the
// per-index success traces differ.
#[test]
fn metamorphic_commutative_reducer_is_permutation_invariant() {
let outcomes_a: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(3),
Outcome::Ok(1),
Outcome::Ok(4),
Outcome::Ok(2),
];
let outcomes_b: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(2),
Outcome::Ok(4),
Outcome::Ok(1),
Outcome::Ok(3),
];
let (decision_a, reduced_a, successes_a) = map_reduce_outcomes(outcomes_a, |a, b| a + b);
let (decision_b, reduced_b, successes_b) = map_reduce_outcomes(outcomes_b, |a, b| a + b);
assert!(matches!(decision_a, AggregateDecision::AllOk));
assert!(matches!(decision_b, AggregateDecision::AllOk));
assert_eq!(
reduced_a, reduced_b,
"commutative reduction should be invariant under permutation of successful inputs"
);
assert_eq!(reduced_a, Some(10));
assert_eq!(successes_a.len(), successes_b.len());
assert_eq!(successes_a.len(), 4);
assert_ne!(
successes_a, successes_b,
"permuted inputs should still preserve their own input-order success traces"
);
}
// `MapReduceError` supports `Clone` and `Debug`; the variant name shows up in the debug text.
#[test]
fn map_reduce_error_debug_clone() {
let e: MapReduceError<&str> = MapReduceError::Error {
error: "bad",
index: 2,
total_failures: 1,
success_count: 3,
};
let e2 = e.clone();
let dbg = format!("{e:?}");
assert!(dbg.contains("Error"));
let dbg2 = format!("{e2:?}");
assert!(dbg2.contains("Error"));
let empty: MapReduceError<&str> = MapReduceError::Empty;
let empty2 = empty.clone();
let dbg3 = format!("{empty:?}");
assert!(dbg3.contains("Empty"));
let dbg4 = format!("{empty2:?}");
assert!(dbg4.contains("Empty"));
}
}