use crate::bee::stock::{Caller, OnceCaller};
use crate::hive::{Builder, ChannelBuilder, Outcome, OutcomeBatch, TaskQueuesBuilder};
use std::fmt::Debug;
/// Applies `f` to every item of `inputs` using a hive with `num_threads` worker threads and
/// returns the outputs collected into a `Vec`.
///
/// The hive is assembled from `ChannelBuilder::default()` with `f` wrapped in a [`Caller`].
/// All inputs are submitted through the hive's `map`, and each resulting [`Outcome`] is
/// unwrapped into its output value.
///
/// # Panics
///
/// Every outcome is passed through [`Outcome::unwrap`], so this presumably panics if any task
/// does not complete successfully (TODO confirm against `Outcome::unwrap`). Use [`try_map`]
/// when `f` can fail.
pub fn map<I, O, Inputs, F>(num_threads: usize, inputs: Inputs, f: F) -> Vec<O>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
    Inputs: IntoIterator<Item = I>,
    F: FnMut(I) -> O + Send + Sync + Clone + 'static,
{
    // Build the hive first so the submission step below reads on its own.
    let hive = ChannelBuilder::default()
        .num_threads(num_threads)
        .with_worker(Caller::from(f))
        .build();
    hive.map(inputs).map(Outcome::unwrap).collect()
}
/// Applies the fallible function `f` to every item of `inputs` using a hive with `num_threads`
/// worker threads and returns all of the outcomes as an [`OutcomeBatch`].
///
/// The hive is assembled from `ChannelBuilder::default()` with `f` wrapped in a [`OnceCaller`].
/// Unlike [`map`], outcomes are not unwrapped: the outcomes produced by the hive's `map` are
/// converted as-is into the returned batch, so an `Err` returned by `f` is reported through the
/// batch (the tests in this module observe it as an `Outcome::Failure`).
pub fn try_map<I, O, E, Inputs, F>(
    num_threads: usize,
    inputs: Inputs,
    f: F,
) -> OutcomeBatch<OnceCaller<I, O, E, F>>
where
    I: Send + Sync + 'static,
    O: Send + Sync + 'static,
    E: Send + Sync + Debug + 'static,
    Inputs: IntoIterator<Item = I>,
    F: FnMut(I) -> Result<O, E> + Send + Sync + Clone + 'static,
{
    let worker = OnceCaller::from(f);
    let hive = ChannelBuilder::default()
        .num_threads(num_threads)
        .with_worker(worker)
        .build();
    // The target type of the conversion is fixed by the function's return type.
    let outcomes = hive.map(inputs);
    outcomes.into()
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use crate::hive::{Outcome, OutcomeStore};

    /// `map` must produce one output per input, in the same order as the inputs.
    #[test]
    fn test_map() {
        let actual = super::map(4, 0..100, |i| i + 1);
        let expected: Vec<_> = (1..=100).collect();
        assert_eq!(actual, expected);
    }

    /// A single `Err` from the closure must show up as exactly one `Outcome::Failure`, while
    /// the other 99 inputs are reported as successes.
    #[test]
    fn test_try_map() {
        let batch = super::try_map(4, 0..100, |i| {
            if i != 50 { Ok(i + 1) } else { Err("Fiddy!") }
        });

        assert!(batch.has_failures());
        assert_eq!(batch.num_failures(), 1);

        let first_failure = batch.iter_failures().next().unwrap();
        assert!(matches!(first_failure, Outcome::Failure { .. }));

        assert_eq!(batch.num_successes(), 99);
        assert!(batch.ok_or_unwrap_errors(true).is_err());
    }
}
#[cfg(feature = "retry")]
pub use retry::try_map_retryable;
#[cfg(feature = "retry")]
mod retry {
    use crate::bee::stock::RetryCaller;
    use crate::bee::{ApplyError, Context};
    use crate::hive::{Builder, ChannelBuilder, OutcomeBatch, TaskQueuesBuilder};
    use std::fmt::Debug;

    /// Applies the fallible, retryable function `f` to every item of `inputs` using a hive
    /// with `num_threads` worker threads and returns all of the outcomes as an
    /// [`OutcomeBatch`].
    ///
    /// The hive is assembled from `ChannelBuilder::default()`, configured with `max_retries`,
    /// with `f` wrapped in a [`RetryCaller`]. `f` receives each input together with a
    /// [`Context`] (the tests in this module read the current attempt via `ctx.attempt()`) and
    /// reports errors as [`ApplyError`] values; returning `ApplyError::Retryable` hands the
    /// input back so the task can be attempted again.
    ///
    /// As exercised by the tests in this module, a task that keeps returning a retryable error
    /// ends up as an `Outcome::MaxRetriesAttempted` failure in the returned batch.
    pub fn try_map_retryable<I, O, E, Inputs, F>(
        num_threads: usize,
        max_retries: u8,
        inputs: Inputs,
        f: F,
    ) -> OutcomeBatch<RetryCaller<I, O, E, F>>
    where
        I: Send + Sync + 'static,
        O: Send + Sync + 'static,
        E: Send + Sync + Debug + 'static,
        Inputs: IntoIterator<Item = I>,
        F: FnMut(I, &Context<I>) -> Result<O, ApplyError<I, E>> + Send + Sync + Clone + 'static,
    {
        ChannelBuilder::default()
            .num_threads(num_threads)
            .max_retries(max_retries)
            .with_worker(RetryCaller::from(f))
            .build()
            .map(inputs)
            .into()
    }

    #[cfg(test)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    mod tests {
        use crate::bee::ApplyError;
        use crate::hive::{Outcome, OutcomeStore};

        /// Input 50 returns a retryable error until `ctx.attempt()` reaches 3 and then
        /// succeeds; with `max_retries` set to 3 the batch must contain no failures.
        #[test]
        fn test_try_map_retryable() {
            let result = super::try_map_retryable(4, 3, 0..100, |i, ctx| {
                if i != 50 {
                    Ok(i + 1)
                } else if ctx.attempt() == 3 {
                    Ok(500)
                } else {
                    Err(ApplyError::Retryable {
                        input: 50,
                        error: format!("Fiddy {}", ctx.attempt()),
                    })
                }
            });
            assert!(!result.has_failures());
        }

        /// Input 50 returns a retryable error on every attempt, so it must be reported as the
        /// single failure in the batch, with outcome `MaxRetriesAttempted`.
        #[test]
        fn test_try_map_retryable_fail() {
            let result = super::try_map_retryable(4, 3, 0..100, |i, ctx| {
                if i != 50 {
                    Ok(i + 1)
                } else {
                    Err(ApplyError::Retryable {
                        input: 50,
                        error: format!("Fiddy {}", ctx.attempt()),
                    })
                }
            });
            assert!(result.has_failures());
            // `assert_eq!` prints both values on failure, unlike `assert!(a == b)`.
            assert_eq!(result.num_failures(), 1);
            assert!(matches!(
                result.iter_failures().next().unwrap(),
                Outcome::MaxRetriesAttempted { .. }
            ));
        }
    }
}