use std::{
backtrace::Backtrace,
error::Error,
fmt::{Debug, Display},
pin::Pin,
sync::Arc,
time::Duration,
};
use colored::Colorize;
use futures::{future::try_join_all, stream::FuturesOrdered, Future, StreamExt, TryFutureExt};
use itertools::Itertools;
use log::debug;
type DynError = Box<dyn Error + Send + Sync + 'static>;
pub struct Shuffle<T: Clone + Sync + Send + 'static, V: Clone + Sync + Send + Debug + 'static> {
variants: Vec<V>,
setup_fn: Option<Arc<dyn Fn(usize, V) -> Box<dyn Future<Output = Result<T, DynError>> + Send + Sync> + Send + Sync + 'static>>,
tasks: Vec<(&'static str, Arc<dyn Fn(Task, T) -> Box<dyn Future<Output = Result<(), DynError>> + Send + Sync> + Send + Sync + 'static>)>,
teardown_fn: Option<Arc<dyn Fn(T) -> Box<dyn Future<Output = Result<(), DynError>> + Send + Sync> + Send + Sync + 'static>>,
barrier_count: usize,
}
pub struct Task {
pub name: &'static str,
task_id: usize,
standard_barrier: Arc<tokio::sync::Barrier>,
barriers: Arc<Vec<tokio::sync::Barrier>>,
next_barrier: usize,
history: usize,
tasks_count: usize,
}
impl<T: Clone + Sync + Send + 'static, V: Clone + Sync + Send + Debug + 'static> Shuffle<T, V> {
pub fn new(fuzzy_barrier_count: usize, variants: Vec<V>) -> Shuffle<T, V> {
Shuffle { setup_fn: None, tasks: Vec::new(), teardown_fn: None, barrier_count: fuzzy_barrier_count, variants }
}
pub fn task<R: Future<Output = Result<(), DynError>> + Send + Sync + 'static>(
mut self,
name: &'static str,
lambda: impl (Fn(Task, T) -> R) + Send + Sync + Clone + 'static,
) -> Shuffle<T, V> {
self.tasks.push((name, Arc::new(move |task, t| Box::new(lambda(task, t)))));
self
}
pub fn setup<Fut: Future<Output = Result<T, DynError>> + Send + Sync + 'static>(
mut self,
lambda: impl Fn(usize, V) -> Fut + Send + Sync + 'static,
) -> Shuffle<T, V> {
self.setup_fn = Some(Arc::new(move |tasks_count, variant| Box::new(lambda(tasks_count, variant))));
self
}
pub fn teardown<E: Error + Sync + Send + 'static, Fut: Future<Output = Result<(), E>> + Send + Sync + 'static>(
mut self,
lambda: impl Fn(T) -> Fut + Send + Sync + 'static,
) -> Shuffle<T, V> {
self.teardown_fn = Some(Arc::new(move |shared| Box::new(lambda(shared).map_err(|error| Box::new(error) as DynError))));
self
}
pub async fn run(self, timeout: Duration) -> Result<(), Box<dyn Error + Sync + Send>> {
std::panic::set_hook(Box::new(|info| {
let stacktrace = Backtrace::force_capture();
println!(
"▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒\n{}\nstacktrace:{}",
info, stacktrace
);
std::process::abort();
}));
let histories_count = permutations::Permutations::new(self.tasks.len()).len().pow(self.barrier_count as u32);
let histories_to_test: Vec<usize> = match std::env::var("HISTORY") {
Ok(values) if !values.is_empty() => values.split(',').map(|value| value.parse().unwrap()).collect(),
Ok(_) | Err(_) => (0..histories_count).collect(),
};
let mut join_handles = FuturesOrdered::new();
for variant in self.variants.iter() {
for i in &histories_to_test {
let setup = self.setup_fn.clone();
let teardown = self.teardown_fn.clone();
let tasks = self.tasks.clone();
let variant = variant.clone();
let tasks_count = tasks.len();
let history = *i;
join_handles.push_back(
tokio::spawn(async move {
debug!("───────────────────────────────────────────────────────── Running history {}", history);
let standard_barrier = Arc::new(tokio::sync::Barrier::new(tasks_count));
let barriers: Arc<Vec<_>> = Arc::new((0..self.barrier_count).map(|_| tokio::sync::Barrier::new(tasks_count)).collect());
let setup_future = Box::into_pin(setup.unwrap()(tasks_count, variant));
tokio::time::timeout(
timeout,
setup_future.and_then(|shared| async move {
let result = try_join_all(tasks.iter().enumerate().map(|(task_id, task_plan)| {
let task = Task {
name: task_plan.0,
barriers: barriers.clone(),
standard_barrier: standard_barrier.clone(),
next_barrier: 0,
history,
tasks_count,
task_id,
};
Box::pin(
tokio::spawn(Box::into_pin(task_plan.1(task, shared.clone())))
.map_err(|join_error| Box::new(join_error) as DynError)
.and_then(|x| async { x }),
) as Pin<Box<dyn Future<Output = Result<(), DynError>> + Send + Sync>>
}))
.await
.map(|_| ());
if let Some(teardown) = teardown {
Box::into_pin(teardown(shared)).await?;
};
result
}),
)
.await
.map_err(|err| Box::new(err) as DynError)
.and_then(|x| x) })
.map_err(|join_error| Box::new(join_error) as DynError)
.and_then(|x| async { x }),
);
}
}
let mut fails_count = 0;
let report = join_handles
.collect::<Vec<Result<(), DynError>>>()
.await
.into_iter()
.zip(self.variants.iter().flat_map(|variant| histories_to_test.iter().map(|history| vec![format!("{:?}", *variant), format!("{:3}", history)])))
.map(|(result, history_and_variant)| match result {
Ok(()) => (history_and_variant[0].clone(), format!("{} {}", history_and_variant[1], "pass".green())),
Err(error) => {
fails_count += 1;
(history_and_variant[0].clone(), format!("{} {} {:?}", history_and_variant[1], "fail".red(), error))
}
})
.chunk_by(|(variant, _history_result)| variant.clone())
.into_iter()
.map(|(variant, history_results)| format!("{variant:27}: {}", history_results.map(|(_variant, history_result)| history_result).join(" ")))
.join("\n");
eprintln!("\n{}", report);
if fails_count == 0 {
Ok(())
} else {
Err(Box::new(std::io::Error::other(format!("{} histor-y/ies failed, run with RUST_LOG=debug to find out more", fails_count))))
}
}
}
impl Task {
pub async fn fuzzy_barrier(&mut self) {
self.barriers[self.next_barrier].wait().await;
let permutations = permutations::Permutations::new(self.tasks_count);
let permutations_per_barrier = permutations.len();
let permutation_id = (self.history / permutations_per_barrier.pow(self.next_barrier as u32)).checked_rem_euclid(permutations_per_barrier).unwrap();
let position = permutations.get(permutation_id).unwrap().apply(self.task_id);
tokio::time::sleep(Duration::from_millis(100 * (position + 1) as u64)).await;
if position == 0 {
debug!("──────────────── Barrier {}", self.next_barrier);
}
debug!("Exiting fuzzy barrier {:2}, task {:10}", self.next_barrier, self.name);
self.next_barrier += 1;
}
pub async fn barrier(&mut self) {
self.standard_barrier.wait().await;
debug!("Exiting classic barrier, task {:10}", self.name);
}
}