use std::time::Duration;
use crate::error::CanoError;
use crate::task::TaskResult;
#[derive(Clone, Debug, PartialEq)]
pub enum JoinStrategy {
All,
Any,
Quorum(usize),
Percentage(f64),
PartialResults(usize),
PartialTimeout,
}
impl JoinStrategy {
pub fn is_satisfied(&self, completed: usize, total: usize) -> bool {
match self {
JoinStrategy::All => completed >= total,
JoinStrategy::Any => completed >= 1,
JoinStrategy::Quorum(n) => completed >= *n,
JoinStrategy::Percentage(p) => {
let required_f = (total as f64 * p).ceil();
let required = if required_f >= usize::MAX as f64 {
usize::MAX
} else {
required_f as usize
};
completed >= required
}
JoinStrategy::PartialResults(min) => completed >= *min,
JoinStrategy::PartialTimeout => completed >= 1, }
}
}
#[derive(Clone, Debug)]
pub struct SplitTaskResult<TState> {
pub task_index: usize,
pub result: Result<TaskResult<TState>, CanoError>,
}
#[derive(Clone, Debug)]
pub struct SplitResult<TState> {
pub successes: Vec<SplitTaskResult<TState>>,
pub errors: Vec<SplitTaskResult<TState>>,
pub cancelled: Vec<usize>,
}
impl<TState> SplitResult<TState> {
pub fn new() -> Self {
Self {
successes: Vec::new(),
errors: Vec::new(),
cancelled: Vec::new(),
}
}
pub fn with_capacity(total_tasks: usize) -> Self {
Self {
successes: Vec::with_capacity(total_tasks),
errors: Vec::with_capacity(total_tasks),
cancelled: Vec::with_capacity(total_tasks),
}
}
pub fn completed_count(&self) -> usize {
self.successes.len() + self.errors.len()
}
pub fn total_count(&self) -> usize {
self.successes.len() + self.errors.len() + self.cancelled.len()
}
}
impl<TState> Default for SplitResult<TState> {
fn default() -> Self {
Self::new()
}
}
#[must_use]
#[derive(Clone)]
pub struct JoinConfig<TState> {
pub strategy: JoinStrategy,
pub timeout: Option<Duration>,
pub join_state: TState,
pub bulkhead: Option<usize>,
}
impl<TState> JoinConfig<TState>
where
TState: Clone,
{
pub fn new(strategy: JoinStrategy, join_state: TState) -> Self {
Self {
strategy,
timeout: None,
join_state,
bulkhead: None,
}
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_join_state(mut self, state: TState) -> Self {
self.join_state = state;
self
}
pub fn with_bulkhead(mut self, n: usize) -> Self {
self.bulkhead = Some(n);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_join_strategy_is_satisfied() {
assert!(JoinStrategy::All.is_satisfied(3, 3));
assert!(!JoinStrategy::All.is_satisfied(2, 3));
assert!(JoinStrategy::Any.is_satisfied(1, 3));
assert!(!JoinStrategy::Any.is_satisfied(0, 3));
assert!(JoinStrategy::Quorum(2).is_satisfied(2, 3));
assert!(JoinStrategy::Quorum(2).is_satisfied(3, 3));
assert!(!JoinStrategy::Quorum(2).is_satisfied(1, 3));
assert!(JoinStrategy::Percentage(0.5).is_satisfied(2, 4));
assert!(JoinStrategy::Percentage(0.75).is_satisfied(3, 4));
assert!(!JoinStrategy::Percentage(0.75).is_satisfied(2, 4));
assert!(JoinStrategy::PartialResults(2).is_satisfied(2, 4));
assert!(JoinStrategy::PartialResults(2).is_satisfied(3, 4));
assert!(!JoinStrategy::PartialResults(2).is_satisfied(1, 4));
assert!(JoinStrategy::PartialTimeout.is_satisfied(1, 4));
assert!(JoinStrategy::PartialTimeout.is_satisfied(3, 4));
assert!(!JoinStrategy::PartialTimeout.is_satisfied(0, 4));
}
#[test]
fn is_satisfied_boundary_cases() {
assert!(JoinStrategy::All.is_satisfied(0, 0));
assert!(!JoinStrategy::Any.is_satisfied(0, 0));
assert!(JoinStrategy::Quorum(0).is_satisfied(0, 3));
assert!(!JoinStrategy::Quorum(5).is_satisfied(3, 3));
assert!(JoinStrategy::PartialResults(0).is_satisfied(0, 4));
assert!(!JoinStrategy::PartialTimeout.is_satisfied(0, 4));
}
#[test]
fn percentage_uses_ceil_and_handles_one_and_zero_total() {
assert!(JoinStrategy::Percentage(1.0).is_satisfied(3, 3));
assert!(!JoinStrategy::Percentage(1.0).is_satisfied(2, 3));
assert!(!JoinStrategy::Percentage(0.5).is_satisfied(1, 3));
assert!(JoinStrategy::Percentage(0.5).is_satisfied(2, 3));
assert!(JoinStrategy::Percentage(0.5).is_satisfied(0, 0));
}
#[test]
fn percentage_saturation_guard_does_not_overflow() {
assert!(JoinStrategy::Percentage(1.0).is_satisfied(usize::MAX, usize::MAX));
assert!(!JoinStrategy::Percentage(1.0).is_satisfied(0, usize::MAX));
}
#[test]
fn split_result_counts() {
let mut r: SplitResult<u8> = SplitResult::new();
assert_eq!(r.completed_count(), 0);
assert_eq!(r.total_count(), 0);
r.successes.push(SplitTaskResult {
task_index: 0,
result: Ok(TaskResult::Single(1)),
});
r.successes.push(SplitTaskResult {
task_index: 1,
result: Ok(TaskResult::Single(2)),
});
r.errors.push(SplitTaskResult {
task_index: 2,
result: Err(CanoError::generic("boom")),
});
r.cancelled.push(3);
r.cancelled.push(4);
assert_eq!(r.completed_count(), 3, "2 successes + 1 error");
assert_eq!(r.total_count(), 5, "completed + 2 cancelled");
}
#[test]
fn split_result_default_and_with_capacity_start_empty() {
let d: SplitResult<u8> = SplitResult::default();
assert_eq!(d.total_count(), 0);
assert!(d.successes.is_empty() && d.errors.is_empty() && d.cancelled.is_empty());
let c: SplitResult<u8> = SplitResult::with_capacity(10);
assert_eq!(c.total_count(), 0);
}
#[test]
fn join_config_builder_defaults_and_setters() {
let base = JoinConfig::new(JoinStrategy::All, "done");
assert_eq!(base.strategy, JoinStrategy::All);
assert_eq!(base.join_state, "done");
assert_eq!(base.timeout, None);
assert_eq!(base.bulkhead, None);
let tuned = JoinConfig::new(JoinStrategy::Any, "done")
.with_timeout(Duration::from_millis(250))
.with_join_state("elsewhere")
.with_bulkhead(4);
assert_eq!(tuned.strategy, JoinStrategy::Any);
assert_eq!(tuned.timeout, Some(Duration::from_millis(250)));
assert_eq!(tuned.join_state, "elsewhere");
assert_eq!(tuned.bulkhead, Some(4));
}
}