use core::fmt;
use std::marker::PhantomData;
use crate::types::Outcome;
use crate::types::cancel::CancelReason;
use crate::types::outcome::PanicPayload;
/// Zero-sized, type-level marker parameterised by two types `T` and `E`.
///
/// The struct stores no data: both fields are [`PhantomData`], so a value of
/// this type only carries the type parameters around.
// NOTE(review): presumably `T` is the success type and `E` the error type of
// the quorum being described — confirm against the callers of this marker.
#[derive(Debug)]
pub struct Quorum<T, E> {
    _t: PhantomData<T>,
    _e: PhantomData<E>,
}

impl<T, E> Quorum<T, E> {
    /// Builds the marker value; usable in `const` contexts.
    #[inline]
    #[must_use]
    pub const fn new() -> Self {
        Quorum {
            _t: PhantomData,
            _e: PhantomData,
        }
    }
}

impl<T, E> Default for Quorum<T, E> {
    /// Equivalent to [`Quorum::new`]; places no `Default` bound on `T` or `E`.
    fn default() -> Self {
        Quorum::new()
    }
}
/// Error produced when a quorum evaluation is collapsed into a `Result`
/// (see `quorum_to_result`).
#[derive(Debug, Clone)]
pub enum QuorumError<E> {
/// Fewer branches succeeded than the quorum required.
InsufficientSuccesses {
/// Number of successes the quorum needed.
required: usize,
/// Total number of branches that were evaluated.
total: usize,
/// Number of branches that actually succeeded.
achieved: usize,
/// Errors collected from the branches that failed with an ordinary error
/// (cancelled and panicked branches contribute nothing here).
errors: Vec<E>,
},
/// The quorum was not met and a branch reported this cancellation reason.
Cancelled(CancelReason),
/// A branch panicked; carries that branch's panic payload.
Panicked(PanicPayload),
/// The requirement cannot be satisfied because `required` exceeds `total`.
InvalidQuorum {
/// Number of successes that was requested.
required: usize,
/// Number of branches that were available.
total: usize,
},
}
/// Human-readable rendering of a [`QuorumError`].
///
/// Individual branch errors are never printed; the insufficient-successes
/// message only reports how many there were.
impl<E: fmt::Display> fmt::Display for QuorumError<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::InvalidQuorum { required, total } => write!(
                f,
                "invalid quorum: required {required} exceeds total {total}"
            ),
            Self::Panicked(p) => write!(f, "quorum panicked: {p}"),
            Self::Cancelled(r) => write!(f, "quorum cancelled: {r}"),
            Self::InsufficientSuccesses {
                required,
                total,
                achieved,
                errors,
            } => write!(
                f,
                "quorum not met: needed {required}/{total}, got {achieved} successes with {} errors",
                errors.len()
            ),
        }
    }
}
// Empty impl: every provided method of `std::error::Error` (such as `source`)
// keeps its default body. Both `Debug` and `Display` are required on `E`
// because the trait's supertraits must hold for `QuorumError<E>`.
impl<E: fmt::Debug + fmt::Display> std::error::Error for QuorumError<E> {}
/// Full record of a quorum evaluation: the verdict plus every branch outcome,
/// split into successes and failures and tagged with the branch index.
#[derive(Debug)]
pub struct QuorumResult<T, E> {
/// Whether the quorum requirement was satisfied.
pub quorum_met: bool,
/// Number of successes that was required.
pub required: usize,
/// `(branch index, value)` for every branch counted as a success.
pub successes: Vec<(usize, T)>,
/// `(branch index, failure)` for every branch not counted as a success.
pub failures: Vec<(usize, QuorumFailure<E>)>,
/// `true` when at least one entry of `failures` is `QuorumFailure::Cancelled`
/// (computed by `QuorumResult::new`).
pub has_cancellation: bool,
/// `true` when at least one entry of `failures` is `QuorumFailure::Panicked`
/// (computed by `QuorumResult::new`).
pub has_panic: bool,
}
/// Why a single branch did not count as a success.
#[derive(Debug, Clone)]
pub enum QuorumFailure<E> {
/// The branch finished with an ordinary error.
Error(E),
/// The branch was cancelled for the given reason.
Cancelled(CancelReason),
/// The branch panicked with the given payload.
Panicked(PanicPayload),
}
/// Renders a failure as `<kind>: <detail>`, where the detail is the `Display`
/// output of the wrapped error, cancel reason or panic payload.
impl<E: fmt::Display> fmt::Display for QuorumFailure<E> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::Panicked(p) => f.write_fmt(format_args!("panicked: {p}")),
            Self::Cancelled(r) => f.write_fmt(format_args!("cancelled: {r}")),
            Self::Error(e) => f.write_fmt(format_args!("error: {e}")),
        }
    }
}
impl<T, E> QuorumResult<T, E> {
#[must_use]
pub fn new(
quorum_met: bool,
required: usize,
successes: Vec<(usize, T)>,
failures: Vec<(usize, QuorumFailure<E>)>,
) -> Self {
let has_cancellation = failures
.iter()
.any(|(_, f)| matches!(f, QuorumFailure::Cancelled(_)));
let has_panic = failures
.iter()
.any(|(_, f)| matches!(f, QuorumFailure::Panicked(_)));
Self {
quorum_met,
required,
successes,
failures,
has_cancellation,
has_panic,
}
}
#[must_use]
pub const fn is_success(&self) -> bool {
self.quorum_met
}
#[must_use]
pub fn success_count(&self) -> usize {
self.successes.len()
}
#[must_use]
pub fn failure_count(&self) -> usize {
self.failures.len()
}
#[must_use]
pub fn total(&self) -> usize {
self.successes.len() + self.failures.len()
}
}
#[must_use]
pub fn quorum_outcomes<T, E>(required: usize, outcomes: Vec<Outcome<T, E>>) -> QuorumResult<T, E> {
if required == 0 {
let failures: Vec<_> = outcomes
.into_iter()
.enumerate()
.map(|(i, o)| match o {
Outcome::Ok(_) => (i, QuorumFailure::Cancelled(CancelReason::quorum_met())),
Outcome::Err(e) => (i, QuorumFailure::Error(e)),
Outcome::Cancelled(r) => (i, QuorumFailure::Cancelled(r)),
Outcome::Panicked(p) => (i, QuorumFailure::Panicked(p)),
})
.collect();
return QuorumResult::new(true, required, Vec::new(), failures);
}
let total = outcomes.len();
let mut successes = Vec::with_capacity(total);
let mut failures = Vec::with_capacity(total);
for (i, outcome) in outcomes.into_iter().enumerate() {
match outcome {
Outcome::Ok(v) => {
successes.push((i, v));
}
Outcome::Err(e) => {
failures.push((i, QuorumFailure::Error(e)));
}
Outcome::Cancelled(r) => {
failures.push((i, QuorumFailure::Cancelled(r)));
}
Outcome::Panicked(p) => {
failures.push((i, QuorumFailure::Panicked(p)));
}
}
}
let quorum_met = successes.len() >= required;
QuorumResult::new(quorum_met, required, successes, failures)
}
/// Reports whether `required` successes can still be reached.
///
/// Branches that have neither succeeded nor failed are assumed to be able to
/// succeed. The check passes when the successes still needed do not exceed
/// the number of such pending branches. All arithmetic saturates, so counts
/// that exceed `total` behave as "no branches pending" instead of wrapping.
#[must_use]
pub const fn quorum_still_possible(
    required: usize,
    total: usize,
    successes: usize,
    failures: usize,
) -> bool {
    let settled = successes.saturating_add(failures);
    let pending = total.saturating_sub(settled);
    let still_needed = required.saturating_sub(successes);
    still_needed <= pending
}
/// Reports whether `successes` already satisfies a quorum of `required`.
///
/// A requirement of zero is always satisfied.
#[must_use]
pub const fn quorum_achieved(required: usize, successes: usize) -> bool {
    required <= successes
}
/// Collapses a [`QuorumResult`] into a plain `Result`.
///
/// The checks run in this order, and the first one that applies decides the
/// return value:
///
/// 1. `required` greater than the number of recorded branches →
///    [`QuorumError::InvalidQuorum`].
/// 2. `required == 0` → `Ok` with an empty vector, whatever the branches did.
/// 3. Any panicked branch → [`QuorumError::Panicked`] with the first panic
///    payload in `failures`, even when the quorum was met.
/// 4. Quorum met → `Ok` with the success values (branch indices dropped).
/// 5. Otherwise the first cancellation whose kind is not `RaceLost` →
///    [`QuorumError::Cancelled`].
/// 6. Otherwise → [`QuorumError::InsufficientSuccesses`], carrying only the
///    ordinary errors from `failures`.
///
/// # Errors
///
/// Returns a [`QuorumError`] as described in steps 1, 3, 5 and 6.
pub fn quorum_to_result<T, E>(result: QuorumResult<T, E>) -> Result<Vec<T>, QuorumError<E>> {
    let total = result.total();

    if result.required > total {
        return Err(QuorumError::InvalidQuorum {
            required: result.required,
            total,
        });
    }
    if result.required == 0 {
        return Ok(Vec::new());
    }

    // A panic outranks every other verdict, including a met quorum.
    let first_panic = result.failures.iter().find_map(|(_, failure)| match failure {
        QuorumFailure::Panicked(p) => Some(p.clone()),
        _ => None,
    });
    if let Some(payload) = first_panic {
        return Err(QuorumError::Panicked(payload));
    }

    if result.quorum_met {
        return Ok(result.successes.into_iter().map(|(_, v)| v).collect());
    }

    // Cancellations of kind `RaceLost` are skipped when looking for a
    // cancellation to report.
    let first_cancel = result.failures.iter().find_map(|(_, failure)| match failure {
        QuorumFailure::Cancelled(r)
            if !matches!(r.kind(), crate::types::cancel::CancelKind::RaceLost) =>
        {
            Some(r.clone())
        }
        _ => None,
    });
    if let Some(reason) = first_cancel {
        return Err(QuorumError::Cancelled(reason));
    }

    let achieved = result.success_count();
    let required = result.required;
    let errors: Vec<E> = result
        .failures
        .into_iter()
        .filter_map(|(_, failure)| match failure {
            QuorumFailure::Error(e) => Some(e),
            _ => None,
        })
        .collect();

    Err(QuorumError::InsufficientSuccesses {
        required,
        total,
        achieved,
        errors,
    })
}
impl CancelReason {
#[must_use]
pub fn quorum_met() -> Self {
Self::race_loser()
}
}
// Unit and property tests for the quorum helpers defined above.
#[cfg(test)]
mod tests {
use super::*;
use proptest::prelude::*;
/// Cloneable description of one branch outcome, used as proptest input and
/// turned into a real `Outcome` by `into_outcome`.
#[derive(Debug, Clone)]
enum QuorumCase {
Ok(i32),
ErrAlpha,
ErrBeta,
CancelUser,
CancelTimeout,
CancelShutdown,
Panic,
}
impl QuorumCase {
/// Converts the case into the `Outcome` it describes.
fn into_outcome(self) -> Outcome<i32, &'static str> {
match self {
Self::Ok(value) => Outcome::Ok(value),
Self::ErrAlpha => Outcome::Err("err-alpha"),
Self::ErrBeta => Outcome::Err("err-beta"),
Self::CancelUser => Outcome::Cancelled(CancelReason::user("user")),
Self::CancelTimeout => Outcome::Cancelled(CancelReason::timeout()),
Self::CancelShutdown => Outcome::Cancelled(CancelReason::shutdown()),
Self::Panic => Outcome::Panicked(PanicPayload::new("boom")),
}
}
}
/// Strategy producing any `QuorumCase`; `Ok` values are drawn from the `i16`
/// range and widened to `i32`.
fn quorum_case_strategy() -> impl Strategy<Value = QuorumCase> {
prop_oneof![
any::<i16>().prop_map(|value| QuorumCase::Ok(i32::from(value))),
Just(QuorumCase::ErrAlpha),
Just(QuorumCase::ErrBeta),
Just(QuorumCase::CancelUser),
Just(QuorumCase::CancelTimeout),
Just(QuorumCase::CancelShutdown),
Just(QuorumCase::Panic),
]
}
/// Sortable, comparable key for a failure: variant tag, cancellation severity
/// (cancellations only) and error text (errors only).
fn failure_signature(
failure: &QuorumFailure<&'static str>,
) -> (&'static str, Option<u8>, Option<&'static str>) {
match failure {
QuorumFailure::Error(error) => ("err", None, Some(*error)),
QuorumFailure::Cancelled(reason) => ("cancelled", Some(reason.severity()), None),
QuorumFailure::Panicked(_) => ("panic", None, None),
}
}
/// Rebuilds a per-branch view of a result, ordered by branch index.
///
/// Starts from `total()` placeholder slots tagged "unassigned" and overwrites
/// the slot at each recorded index with a summary of that branch.
fn ordered_projection(
result: &QuorumResult<i32, &'static str>,
) -> Vec<(&'static str, Option<i32>, Option<u8>, Option<&'static str>)> {
let mut projection = vec![("unassigned", None, None, None); result.total()];
for (index, value) in &result.successes {
projection[*index] = ("ok", Some(*value), None, None);
}
for (index, failure) in &result.failures {
projection[*index] = match failure {
QuorumFailure::Error(error) => ("err", None, None, Some(*error)),
QuorumFailure::Cancelled(reason) => {
("cancelled", None, Some(reason.severity()), None)
}
QuorumFailure::Panicked(_) => ("panic", None, None, None),
};
}
projection
}
/// Reduces the output of `quorum_to_result` to an order-insensitive tuple:
/// verdict tag, a count, sorted success values and sorted errors.
///
/// For `InvalidQuorum` the count slot holds `required + total`; payloads of
/// `Panicked` and `Cancelled` are ignored.
fn quorum_to_result_signature(
result: QuorumResult<i32, &'static str>,
) -> (&'static str, usize, Vec<i32>, Vec<&'static str>) {
match quorum_to_result(result) {
Ok(mut values) => {
values.sort_unstable();
("ok", values.len(), values, Vec::new())
}
Err(QuorumError::InvalidQuorum { required, total }) => {
("invalid", required + total, Vec::new(), Vec::new())
}
Err(QuorumError::Panicked(_)) => ("panic", 0, Vec::new(), Vec::new()),
Err(QuorumError::Cancelled(_)) => ("cancelled", 0, Vec::new(), Vec::new()),
Err(QuorumError::InsufficientSuccesses {
achieved,
mut errors,
..
}) => {
errors.sort_unstable();
("insufficient", achieved, Vec::new(), errors)
}
}
}
// 2-of-3 with three successes: met, and all three successes are recorded.
#[test]
fn quorum_all_succeed() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = quorum_outcomes(2, outcomes);
assert!(result.quorum_met);
assert_eq!(result.success_count(), 3);
assert_eq!(result.failure_count(), 0);
}
// 2-of-3 with exactly two successes is met.
#[test]
fn quorum_exact_meet() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("e1"), Outcome::Ok(2)];
let result = quorum_outcomes(2, outcomes);
assert!(result.quorum_met);
assert_eq!(result.success_count(), 2);
assert_eq!(result.failure_count(), 1);
}
// 2-of-3 with a single success is not met.
#[test]
fn quorum_not_met() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("e1"), Outcome::Err("e2")];
let result = quorum_outcomes(2, outcomes);
assert!(!result.quorum_met);
assert_eq!(result.success_count(), 1);
assert_eq!(result.failure_count(), 2);
}
// required == 0: met, but even `Ok` branches are recorded as failures.
#[test]
fn quorum_zero_required() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = quorum_outcomes(0, outcomes);
assert!(result.quorum_met);
assert_eq!(result.success_count(), 0);
assert_eq!(result.failure_count(), 3);
}
// required == 0 short-circuits before the panic check in `quorum_to_result`.
#[test]
fn quorum_zero_to_result_returns_empty_even_if_losers_panic() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Err("e1"),
Outcome::Panicked(PanicPayload::new("boom")),
Outcome::Cancelled(CancelReason::timeout()),
];
let result = quorum_outcomes(0, outcomes);
let values = quorum_to_result(result).expect("quorum(0, N) should succeed");
assert!(values.is_empty());
}
// N-of-N with every branch succeeding is met.
#[test]
fn quorum_n_of_n_is_join() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = quorum_outcomes(3, outcomes);
assert!(result.quorum_met);
assert_eq!(result.success_count(), 3);
}
// N-of-N with one failing branch is not met.
#[test]
fn quorum_n_of_n_one_fails() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("e"), Outcome::Ok(3)];
let result = quorum_outcomes(3, outcomes);
assert!(!result.quorum_met);
assert_eq!(result.success_count(), 2);
}
// 1-of-N is met by a single success among errors.
#[test]
fn quorum_1_of_n_is_race() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Err("e1"), Outcome::Ok(2), Outcome::Err("e2")];
let result = quorum_outcomes(1, outcomes);
assert!(result.quorum_met);
assert_eq!(result.success_count(), 1);
}
// required > total: not met, and `quorum_to_result` reports `InvalidQuorum`.
#[test]
fn quorum_invalid_m_greater_than_n() {
let outcomes: Vec<Outcome<i32, &str>> = vec![Outcome::Ok(1), Outcome::Ok(2)];
let result = quorum_outcomes(5, outcomes);
assert!(!result.quorum_met);
let err = quorum_to_result(result).unwrap_err();
assert!(matches!(err, QuorumError::InvalidQuorum { .. }));
}
// A cancelled branch sets `has_cancellation` without blocking a met quorum.
#[test]
fn quorum_with_cancellation() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Cancelled(CancelReason::timeout()),
Outcome::Ok(2),
];
let result = quorum_outcomes(2, outcomes);
assert!(result.quorum_met);
assert!(result.has_cancellation);
}
// A panicked branch sets `has_panic`; `quorum_met` itself is still true.
#[test]
fn quorum_with_panic() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Panicked(PanicPayload::new("boom")),
Outcome::Ok(2),
];
let result = quorum_outcomes(2, outcomes);
assert!(result.quorum_met);
assert!(result.has_panic);
}
// A met quorum converts to `Ok` holding the success values.
#[test]
fn quorum_to_result_success() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Err("e")];
let result = quorum_outcomes(2, outcomes);
let values = quorum_to_result(result);
assert!(values.is_ok());
let v = values.unwrap();
assert_eq!(v.len(), 2);
assert!(v.contains(&1));
assert!(v.contains(&2));
}
// An unmet quorum with only errors converts to `InsufficientSuccesses`.
#[test]
fn quorum_to_result_insufficient() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("e1"), Outcome::Err("e2")];
let result = quorum_outcomes(2, outcomes);
let values = quorum_to_result(result);
assert!(values.is_err());
match values.unwrap_err() {
QuorumError::InsufficientSuccesses {
required,
achieved,
errors,
..
} => {
assert_eq!(required, 2);
assert_eq!(achieved, 1);
assert_eq!(errors.len(), 2);
}
_ => panic!("Expected InsufficientSuccesses"),
}
}
// A panicked branch makes `quorum_to_result` return `Panicked`.
#[test]
fn quorum_to_result_panic() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(1),
Outcome::Panicked(PanicPayload::new("boom")),
Outcome::Err("e"),
];
let result = quorum_outcomes(3, outcomes);
let values = quorum_to_result(result);
assert!(values.is_err());
assert!(matches!(values.unwrap_err(), QuorumError::Panicked(_)));
}
// Arguments are (required, total, successes, failures).
#[test]
fn quorum_still_possible_test() {
assert!(quorum_still_possible(2, 3, 0, 0));
assert!(quorum_still_possible(2, 3, 1, 0));
assert!(quorum_still_possible(2, 3, 2, 0));
assert!(!quorum_still_possible(2, 3, 0, 2));
assert!(quorum_still_possible(2, 3, 1, 1));
}
// Arguments are (required, successes); zero required is always achieved.
#[test]
fn quorum_achieved_test() {
assert!(!quorum_achieved(2, 0));
assert!(!quorum_achieved(2, 1));
assert!(quorum_achieved(2, 2));
assert!(quorum_achieved(2, 3));
assert!(quorum_achieved(0, 0)); }
// Spot-checks one distinguishing substring per `QuorumError` variant.
#[test]
fn quorum_error_display() {
let err: QuorumError<&str> = QuorumError::InsufficientSuccesses {
required: 2,
total: 3,
achieved: 1,
errors: vec!["e1", "e2"],
};
assert!(err.to_string().contains("needed 2/3"));
assert!(err.to_string().contains("got 1 successes"));
let err: QuorumError<&str> = QuorumError::Cancelled(CancelReason::timeout());
assert!(err.to_string().contains("cancelled"));
let err: QuorumError<&str> = QuorumError::Panicked(PanicPayload::new("boom"));
assert!(err.to_string().contains("panicked"));
let err: QuorumError<&str> = QuorumError::InvalidQuorum {
required: 5,
total: 3,
};
assert!(err.to_string().contains("invalid quorum"));
}
// Branch indices in successes and failures match positions in the input.
#[test]
fn quorum_preserves_indices() {
let outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Err("e0"),
Outcome::Ok(10),
Outcome::Err("e2"),
Outcome::Ok(30),
];
let result = quorum_outcomes(2, outcomes);
assert!(result.quorum_met);
assert!(result.successes.iter().any(|(i, v)| *i == 1 && *v == 10));
assert!(result.successes.iter().any(|(i, v)| *i == 3 && *v == 30));
assert!(result.failures.iter().any(|(i, _)| *i == 0));
assert!(result.failures.iter().any(|(i, _)| *i == 2));
}
// required == 1: met with one success, unmet when every branch errors.
#[test]
fn quorum_1_equals_race_semantics() {
let outcomes_success: Vec<Outcome<i32, &str>> =
vec![Outcome::Err("e1"), Outcome::Ok(2), Outcome::Err("e3")];
let result = quorum_outcomes(1, outcomes_success);
assert!(result.quorum_met);
let outcomes_fail: Vec<Outcome<i32, &str>> =
vec![Outcome::Err("e1"), Outcome::Err("e2"), Outcome::Err("e3")];
let result = quorum_outcomes(1, outcomes_fail);
assert!(!result.quorum_met);
}
// required == N: met only when every branch succeeds.
#[test]
fn quorum_n_equals_join_semantics() {
let outcomes_success: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Ok(3)];
let result = quorum_outcomes(3, outcomes_success);
assert!(result.quorum_met);
let outcomes_fail: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Err("e"), Outcome::Ok(3)];
let result = quorum_outcomes(3, outcomes_fail);
assert!(!result.quorum_met);
}
// Same outcomes, rising requirement: met at 1 and 2, unmet at 3.
#[test]
fn quorum_monotone_in_required() {
let outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(1), Outcome::Ok(2), Outcome::Err("e")];
let result_2 = quorum_outcomes(2, outcomes.clone());
let result_1 = quorum_outcomes(1, outcomes.clone());
let result_3 = quorum_outcomes(3, outcomes);
assert!(result_1.quorum_met);
assert!(result_2.quorum_met);
assert!(!result_3.quorum_met);
}
// Appending extra error branches to a met quorum leaves the success values
// and the `quorum_to_result` signature unchanged.
#[test]
fn metamorphic_appending_error_losers_preserves_met_quorum_result() {
let base_outcomes: Vec<Outcome<i32, &str>> =
vec![Outcome::Ok(10), Outcome::Err("e1"), Outcome::Ok(20)];
let extended_outcomes: Vec<Outcome<i32, &str>> = vec![
Outcome::Ok(10),
Outcome::Err("e1"),
Outcome::Ok(20),
Outcome::Err("e2"),
Outcome::Err("e3"),
];
let base_result = quorum_outcomes(2, base_outcomes);
let extended_result = quorum_outcomes(2, extended_outcomes);
assert!(base_result.quorum_met);
assert!(extended_result.quorum_met);
let mut base_success_values = base_result
.successes
.iter()
.map(|(_, value)| *value)
.collect::<Vec<_>>();
let mut extended_success_values = extended_result
.successes
.iter()
.map(|(_, value)| *value)
.collect::<Vec<_>>();
base_success_values.sort_unstable();
extended_success_values.sort_unstable();
assert_eq!(base_success_values, vec![10, 20]);
assert_eq!(extended_success_values, base_success_values);
assert_eq!(
quorum_to_result_signature(base_result),
quorum_to_result_signature(extended_result)
);
}
// No branches at all: met for required == 0, unmet for required == 1.
#[test]
fn quorum_empty_outcomes() {
let outcomes: Vec<Outcome<i32, &str>> = vec![];
let result = quorum_outcomes(0, outcomes.clone());
assert!(result.quorum_met);
let result = quorum_outcomes(1, outcomes);
assert!(!result.quorum_met);
}
// `QuorumError` supports `Clone` and `Debug`; both copies print the variant.
#[test]
fn quorum_error_debug_clone() {
let e: QuorumError<&str> = QuorumError::InsufficientSuccesses {
required: 3,
total: 5,
achieved: 1,
errors: vec!["e1"],
};
let e2 = e.clone();
let dbg = format!("{e:?}");
assert!(dbg.contains("InsufficientSuccesses"));
let dbg2 = format!("{e2:?}");
assert!(dbg2.contains("InsufficientSuccesses"));
}
// `QuorumFailure` supports `Clone` and `Debug`; both copies print the variant.
#[test]
fn quorum_failure_debug_clone() {
let f: QuorumFailure<&str> = QuorumFailure::Error("bad");
let f2 = f.clone();
let dbg = format!("{f:?}");
assert!(dbg.contains("Error"));
let dbg2 = format!("{f2:?}");
assert!(dbg2.contains("Error"));
}
proptest! {
// Property: rotating the branch order changes indices only. The verdict,
// flags, success/failure multisets, the index-ordered projection (rotated
// by the same shift) and the `quorum_to_result` signature must all agree.
#[test]
fn metamorphic_quorum_rotation_preserves_projection_and_verdict(
cases in prop::collection::vec(quorum_case_strategy(), 1..12),
raw_required in 0usize..16,
raw_shift in 0usize..32,
) {
// `cases` is never empty (length range 1..12), so the modulo is safe.
let shift = raw_shift % cases.len();
// Lets `required` range past `cases.len()` so the invalid-quorum path is hit too.
let required = raw_required % (cases.len() + 3);
let base_result = quorum_outcomes(
required,
cases
.iter()
.cloned()
.map(QuorumCase::into_outcome)
.collect::<Vec<_>>(),
);
let mut rotated_cases = cases.clone();
rotated_cases.rotate_left(shift);
let rotated_result = quorum_outcomes(
required,
rotated_cases
.iter()
.cloned()
.map(QuorumCase::into_outcome)
.collect::<Vec<_>>(),
);
prop_assert_eq!(base_result.quorum_met, rotated_result.quorum_met);
prop_assert_eq!(base_result.required, rotated_result.required);
prop_assert_eq!(base_result.total(), rotated_result.total());
prop_assert_eq!(base_result.has_cancellation, rotated_result.has_cancellation);
prop_assert_eq!(base_result.has_panic, rotated_result.has_panic);
let mut base_success_values = base_result
.successes
.iter()
.map(|(_, value)| *value)
.collect::<Vec<_>>();
let mut rotated_success_values = rotated_result
.successes
.iter()
.map(|(_, value)| *value)
.collect::<Vec<_>>();
base_success_values.sort_unstable();
rotated_success_values.sort_unstable();
prop_assert_eq!(
base_success_values,
rotated_success_values,
"rotating branch order must preserve the quorum success multiset"
);
let mut base_failure_signatures = base_result
.failures
.iter()
.map(|(_, failure)| failure_signature(failure))
.collect::<Vec<_>>();
let mut rotated_failure_signatures = rotated_result
.failures
.iter()
.map(|(_, failure)| failure_signature(failure))
.collect::<Vec<_>>();
base_failure_signatures.sort_unstable();
rotated_failure_signatures.sort_unstable();
prop_assert_eq!(
base_failure_signatures,
rotated_failure_signatures,
"rotating branch order must preserve the quorum failure multiset"
);
let mut expected_rotated_projection = ordered_projection(&base_result);
expected_rotated_projection.rotate_left(shift);
prop_assert_eq!(
ordered_projection(&rotated_result),
expected_rotated_projection,
"a quiescent quorum must preserve the branch projection under the same rotation"
);
prop_assert_eq!(
quorum_to_result_signature(base_result),
quorum_to_result_signature(rotated_result),
"rotating branch order must preserve the fail-fast quorum verdict class"
);
}
}
}