use std::time::Duration;
use crate::error::CanoError;
use crate::task::TaskResult;
#[derive(Clone, Debug, PartialEq)]
pub enum JoinStrategy {
All,
Any,
Quorum(usize),
Percentage(f64),
PartialResults(usize),
PartialTimeout,
}
impl JoinStrategy {
pub fn is_satisfied(&self, completed: usize, total: usize) -> bool {
match self {
JoinStrategy::All => completed >= total,
JoinStrategy::Any => completed >= 1,
JoinStrategy::Quorum(n) => completed >= *n,
JoinStrategy::Percentage(p) => {
let required_f = (total as f64 * p).ceil();
let required = if required_f >= usize::MAX as f64 {
usize::MAX
} else {
required_f as usize
};
completed >= required
}
JoinStrategy::PartialResults(min) => completed >= *min,
JoinStrategy::PartialTimeout => completed >= 1, }
}
}
#[derive(Clone, Debug)]
pub struct SplitTaskResult<TState> {
pub task_index: usize,
pub result: Result<TaskResult<TState>, CanoError>,
}
#[derive(Clone, Debug)]
pub struct SplitResult<TState> {
pub successes: Vec<SplitTaskResult<TState>>,
pub errors: Vec<SplitTaskResult<TState>>,
pub cancelled: Vec<usize>,
}
impl<TState> SplitResult<TState> {
pub fn new() -> Self {
Self {
successes: Vec::new(),
errors: Vec::new(),
cancelled: Vec::new(),
}
}
pub fn with_capacity(total_tasks: usize) -> Self {
Self {
successes: Vec::with_capacity(total_tasks),
errors: Vec::with_capacity(total_tasks),
cancelled: Vec::with_capacity(total_tasks),
}
}
pub fn completed_count(&self) -> usize {
self.successes.len() + self.errors.len()
}
pub fn total_count(&self) -> usize {
self.successes.len() + self.errors.len() + self.cancelled.len()
}
}
impl<TState> Default for SplitResult<TState> {
fn default() -> Self {
Self::new()
}
}
#[must_use]
#[derive(Clone)]
pub struct JoinConfig<TState> {
pub strategy: JoinStrategy,
pub timeout: Option<Duration>,
pub join_state: TState,
pub bulkhead: Option<usize>,
}
impl<TState> JoinConfig<TState>
where
TState: Clone,
{
pub fn new(strategy: JoinStrategy, join_state: TState) -> Self {
Self {
strategy,
timeout: None,
join_state,
bulkhead: None,
}
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = Some(timeout);
self
}
pub fn with_join_state(mut self, state: TState) -> Self {
self.join_state = state;
self
}
pub fn with_bulkhead(mut self, n: usize) -> Self {
self.bulkhead = Some(n);
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_join_strategy_is_satisfied() {
assert!(JoinStrategy::All.is_satisfied(3, 3));
assert!(!JoinStrategy::All.is_satisfied(2, 3));
assert!(JoinStrategy::Any.is_satisfied(1, 3));
assert!(!JoinStrategy::Any.is_satisfied(0, 3));
assert!(JoinStrategy::Quorum(2).is_satisfied(2, 3));
assert!(JoinStrategy::Quorum(2).is_satisfied(3, 3));
assert!(!JoinStrategy::Quorum(2).is_satisfied(1, 3));
assert!(JoinStrategy::Percentage(0.5).is_satisfied(2, 4));
assert!(JoinStrategy::Percentage(0.75).is_satisfied(3, 4));
assert!(!JoinStrategy::Percentage(0.75).is_satisfied(2, 4));
assert!(JoinStrategy::PartialResults(2).is_satisfied(2, 4));
assert!(JoinStrategy::PartialResults(2).is_satisfied(3, 4));
assert!(!JoinStrategy::PartialResults(2).is_satisfied(1, 4));
assert!(JoinStrategy::PartialTimeout.is_satisfied(1, 4));
assert!(JoinStrategy::PartialTimeout.is_satisfied(3, 4));
assert!(!JoinStrategy::PartialTimeout.is_satisfied(0, 4));
}
}