// hakuban 0.8.5
//
// Data-object sharing library
// Documentation
use std::{
	backtrace::Backtrace,
	error::Error,
	fmt::{Debug, Display},
	pin::Pin,
	sync::Arc,
	time::Duration,
};

use colored::Colorize;
use futures::{future::try_join_all, stream::FuturesOrdered, Future, StreamExt, TryFutureExt};
use itertools::Itertools;
use log::debug;

/// Boxed, thread-safe, `'static` error used as the error type of the setup, task and
/// teardown futures and of the per-history results in this file.
type DynError = Box<dyn Error + Send + Sync + 'static>;

/// Builder for a concurrency test: a setup closure, a list of named async tasks and an
/// optional teardown closure, executed once per (variant, history) pair by `run`.
///
/// `T` is the shared state returned by the setup closure; it is cloned into every task and
/// finally passed to the teardown closure. `V` is the type of the variants handed to setup.
pub struct Shuffle<T: Clone + Sync + Send + 'static, V: Clone + Sync + Send + Debug + 'static> {
	//TODO: de-onionize; move Arc outside?
	/// Variants to test; `run` executes every selected history for each of them.
	variants: Vec<V>,
	/// Invoked as `setup(tasks_count, variant)` at the start of each history to produce the shared state.
	/// `run` unwraps this option, so it must have been set through `setup` beforehand.
	setup_fn: Option<Arc<dyn Fn(usize, V) -> Box<dyn Future<Output = Result<T, DynError>> + Send + Sync> + Send + Sync + 'static>>,
	/// Registered tasks as `(name, closure)` pairs, in registration order; the position in this
	/// vector becomes the task's id.
	tasks: Vec<(&'static str, Arc<dyn Fn(Task, T) -> Box<dyn Future<Output = Result<(), DynError>> + Send + Sync> + Send + Sync + 'static>)>,
	/// Optional closure invoked with the shared state after the tasks of a history have been joined.
	teardown_fn: Option<Arc<dyn Fn(T) -> Box<dyn Future<Output = Result<(), DynError>> + Send + Sync> + Send + Sync + 'static>>,
	/// Number of fuzzy barriers created for each history; also the exponent used when counting
	/// how many histories exist.
	barrier_count: usize,
}

/// Per-task handle passed to every task closure; gives the task access to the barriers
/// shared by all tasks of the same history.
pub struct Task {
	/// Name the task was registered under.
	pub name: &'static str,
	/// Index of the task in registration order; fed into the permutation chosen at each fuzzy barrier.
	task_id: usize,
	/// Plain barrier shared by all tasks of the history, used by `barrier`.
	standard_barrier: Arc<tokio::sync::Barrier>,
	/// Barriers used by `fuzzy_barrier`, one per configured fuzzy barrier, consumed in order.
	barriers: Arc<Vec<tokio::sync::Barrier>>,
	/// Index into `barriers` of the fuzzy barrier this task will wait on next; starts at 0.
	next_barrier: usize,
	/// Number of the history being run; selects the permutation used at each fuzzy barrier.
	history: usize,
	/// Total number of tasks taking part in the history.
	tasks_count: usize,
}

impl<T: Clone + Sync + Send + 'static, V: Clone + Sync + Send + Debug + 'static> Shuffle<T, V> {
	pub fn new(fuzzy_barrier_count: usize, variants: Vec<V>) -> Shuffle<T, V> {
		Shuffle { setup_fn: None, tasks: Vec::new(), teardown_fn: None, barrier_count: fuzzy_barrier_count, variants }
	}

	pub fn task<R: Future<Output = Result<(), DynError>> + Send + Sync + 'static>(
		mut self,
		name: &'static str,
		lambda: impl (Fn(Task, T) -> R) + Send + Sync + Clone + 'static,
	) -> Shuffle<T, V> {
		self.tasks.push((name, Arc::new(move |task, t| Box::new(lambda(task, t)))));
		self
	}

	pub fn setup<Fut: Future<Output = Result<T, DynError>> + Send + Sync + 'static>(
		mut self,
		lambda: impl Fn(usize, V) -> Fut + Send + Sync + 'static,
	) -> Shuffle<T, V> {
		self.setup_fn = Some(Arc::new(move |tasks_count, variant| Box::new(lambda(tasks_count, variant))));
		self
	}

	pub fn teardown<E: Error + Sync + Send + 'static, Fut: Future<Output = Result<(), E>> + Send + Sync + 'static>(
		mut self,
		lambda: impl Fn(T) -> Fut + Send + Sync + 'static,
	) -> Shuffle<T, V> {
		self.teardown_fn = Some(Arc::new(move |shared| Box::new(lambda(shared).map_err(|error| Box::new(error) as DynError))));
		self
	}

	//TODO: timeout in any task hides fails
	pub async fn run(self, timeout: Duration) -> Result<(), Box<dyn Error + Sync + Send>> {
		//TODO: change this to report it at the end of proper test (if possible), or not at all for "should_panic" tests
		std::panic::set_hook(Box::new(|info| {
			let stacktrace = Backtrace::force_capture();
			println!(
				"▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒\n{}\nstacktrace:{}",
				info, stacktrace
			);
			std::process::abort();
		}));

		let histories_count = permutations::Permutations::new(self.tasks.len()).len().pow(self.barrier_count as u32);
		let histories_to_test: Vec<usize> = match std::env::var("HISTORY") {
			Ok(values) if !values.is_empty() => values.split(',').map(|value| value.parse().unwrap()).collect(),
			Ok(_) | Err(_) => (0..histories_count).collect(),
		};
		let mut join_handles = FuturesOrdered::new();
		for variant in self.variants.iter() {
			for i in &histories_to_test {
				let setup = self.setup_fn.clone();
				let teardown = self.teardown_fn.clone();
				let tasks = self.tasks.clone();
				let variant = variant.clone();
				let tasks_count = tasks.len();
				let history = *i;
				join_handles.push_back(
					tokio::spawn(async move {
						debug!("─────────────────────────────────────────────────────────  Running history {}", history);
						let standard_barrier = Arc::new(tokio::sync::Barrier::new(tasks_count));
						let barriers: Arc<Vec<_>> = Arc::new((0..self.barrier_count).map(|_| tokio::sync::Barrier::new(tasks_count)).collect());
						let setup_future = Box::into_pin(setup.unwrap()(tasks_count, variant));

						tokio::time::timeout(
							timeout,
							setup_future.and_then(|shared| async move {
								let result = try_join_all(tasks.iter().enumerate().map(|(task_id, task_plan)| {
									let task = Task {
										name: task_plan.0,
										barriers: barriers.clone(),
										standard_barrier: standard_barrier.clone(),
										next_barrier: 0,
										history,
										tasks_count,
										task_id,
									};
									Box::pin(
										tokio::spawn(Box::into_pin(task_plan.1(task, shared.clone())))
											.map_err(|join_error| Box::new(join_error) as DynError)
											.and_then(|x| async { x }),
									) as Pin<Box<dyn Future<Output = Result<(), DynError>> + Send + Sync>>
								}))
								.await
								.map(|_| ());

								if let Some(teardown) = teardown {
									Box::into_pin(teardown(shared)).await?;
								};
								result
							}),
						)
						.await
						.map_err(|err| Box::new(err) as DynError)
						.and_then(|x| x) //.map(|_|())
					})
					.map_err(|join_error| Box::new(join_error) as DynError)
					.and_then(|x| async { x }),
				);
			}
		}
		let mut fails_count = 0;
		let report = join_handles
			.collect::<Vec<Result<(), DynError>>>()
			.await
			.into_iter()
			.zip(self.variants.iter().flat_map(|variant| histories_to_test.iter().map(|history| vec![format!("{:?}", *variant), format!("{:3}", history)])))
			.map(|(result, history_and_variant)| match result {
				Ok(()) => (history_and_variant[0].clone(), format!("{} {}", history_and_variant[1], "pass".green())),
				Err(error) => {
					fails_count += 1;
					(history_and_variant[0].clone(), format!("{} {} {:?}", history_and_variant[1], "fail".red(), error))
				}
			})
			.chunk_by(|(variant, _history_result)| variant.clone())
			.into_iter()
			.map(|(variant, history_results)| format!("{variant:27}:    {}", history_results.map(|(_variant, history_result)| history_result).join(" ")))
			.join("\n");
		eprintln!("\n{}", report);

		if fails_count == 0 {
			Ok(())
		} else {
			Err(Box::new(std::io::Error::other(format!("{} histor-y/ies failed, run with RUST_LOG=debug to find out more", fails_count))))
		}
	}
}

impl Task {
	/// Waits at the next fuzzy barrier, then leaves it after a history-dependent delay.
	///
	/// Once all tasks have reached the barrier, the history number selects one permutation of
	/// the tasks for this barrier; the task's position in that permutation determines how long
	/// it sleeps (100 ms per position, starting at 100 ms) before returning.
	///
	/// # Panics
	///
	/// Panics if called more times than there are fuzzy barriers configured.
	pub async fn fuzzy_barrier(&mut self) {
		let barrier_index = self.next_barrier;
		self.barriers[barrier_index].wait().await;

		// Treat the history number as a sequence of digits in base `orderings_count`, one per barrier.
		let orderings = permutations::Permutations::new(self.tasks_count);
		let orderings_count = orderings.len();
		let stride = orderings_count.pow(barrier_index as u32);
		let ordering_id = (self.history / stride).checked_rem_euclid(orderings_count).unwrap();
		let position = orderings.get(ordering_id).unwrap().apply(self.task_id);

		let delay = Duration::from_millis(100 * (position + 1) as u64);
		tokio::time::sleep(delay).await;

		// Only the task that wakes first prints the barrier header.
		if position == 0 {
			debug!("────────────────  Barrier {}", barrier_index);
		}
		debug!("Exiting fuzzy barrier {:2}, task {:10}", barrier_index, self.name);
		self.next_barrier = barrier_index + 1;
	}

	/// Waits on the plain barrier shared by all tasks of the history, without any added delay.
	pub async fn barrier(&mut self) {
		let shared_barrier = &self.standard_barrier;
		shared_barrier.wait().await;
		debug!("Exiting classic barrier, task {:10}", self.name);
	}
}