use std::sync::Arc;
use crossbeam_queue::SegQueue;
use qubit_function::{
Callable,
Runnable,
};
use crate::{
BatchExecutionError,
BatchOutcome,
};
use super::{
BatchCallResult,
callable_task::CallableTask,
for_each_task::ForEachTask,
};
/// Executes batches of tasks.
///
/// Implementors provide [`BatchExecutor::execute`]; the `call` and `for_each`
/// methods are provided adapters that wrap their inputs into runnable tasks
/// and delegate to `execute`.
pub trait BatchExecutor: Send + Sync {
    /// Runs every task yielded by `tasks`.
    ///
    /// `count` is the declared number of tasks.
    /// NOTE(review): how a mismatch between `count` and the number of yielded
    /// tasks is handled is decided by the implementor — not visible here.
    ///
    /// # Errors
    ///
    /// Returns a [`BatchExecutionError`] as defined by the implementor.
    fn execute<T, E, I>(
        &self,
        tasks: I,
        count: usize,
    ) -> Result<BatchOutcome<E>, BatchExecutionError<E>>
    where
        I: IntoIterator<Item = T>,
        T: Runnable<E> + Send,
        E: Send;

    /// Runs every callable yielded by `tasks` and gathers their outputs.
    ///
    /// Each callable is paired with its position in the input sequence and
    /// wrapped in a [`CallableTask`] that shares one output queue. The wrapped
    /// tasks go through [`BatchExecutor::execute`]; afterwards the queue is
    /// drained into a vector of `count` slots indexed by input position, and
    /// both the outcome and the values are bundled into a [`BatchCallResult`].
    ///
    /// # Errors
    ///
    /// Propagates any error returned by [`BatchExecutor::execute`].
    ///
    /// # Panics
    ///
    /// Panics if the output queue is still shared once `execute` has returned,
    /// or if a recorded position is not below `count`.
    fn call<C, R, E, I>(
        &self,
        tasks: I,
        count: usize,
    ) -> Result<BatchCallResult<R, E>, BatchExecutionError<E>>
    where
        I: IntoIterator<Item = C>,
        C: Callable<R, E> + Send,
        R: Send,
        E: Send,
    {
        let sink = Arc::new(SegQueue::new());
        // Handle owned by the mapping closure; every task gets its own clone.
        let task_sink = Arc::clone(&sink);
        let wrapped = tasks
            .into_iter()
            .enumerate()
            .map(move |(position, callable)| {
                CallableTask::new(callable, position, Arc::clone(&task_sink))
            });
        // `wrapped` is moved into `execute`, so `sink` is expected to be the
        // last handle to the queue by the time the outputs are collected.
        self.execute(wrapped, count)
            .map(|outcome| BatchCallResult::new(outcome, collect_call_outputs(sink, count)))
    }

    /// Applies `action` to every item yielded by `items`.
    ///
    /// The action is placed behind an [`Arc`] and each item is wrapped in a
    /// [`ForEachTask`] holding a clone of that handle; the resulting tasks are
    /// handed to [`BatchExecutor::execute`] together with `count`.
    ///
    /// # Errors
    ///
    /// Returns whatever [`BatchExecutor::execute`] returns.
    fn for_each<Item, E, I, F>(
        &self,
        items: I,
        count: usize,
        action: F,
    ) -> Result<BatchOutcome<E>, BatchExecutionError<E>>
    where
        I: IntoIterator<Item = Item>,
        Item: Send,
        F: Fn(Item) -> Result<(), E> + Send + Sync,
        E: Send,
    {
        let shared_action = Arc::new(action);
        self.execute(
            items
                .into_iter()
                .map(move |item| ForEachTask::new(item, Arc::clone(&shared_action))),
            count,
        )
    }
}
/// Drains the shared output queue into a vector indexed by task position.
///
/// The returned vector has exactly `count` entries. An entry is `Some(value)`
/// when the queue held a `(position, value)` pair for that position and `None`
/// otherwise. If several pairs share a position, the one popped last wins.
///
/// # Panics
///
/// Panics if `outputs` still has other owners, or if a queued position is not
/// below `count`.
fn collect_call_outputs<R>(outputs: Arc<SegQueue<(usize, R)>>, count: usize) -> Vec<Option<R>> {
    // Taking the queue out of the `Arc` asserts that no task still holds it.
    let queue = Arc::try_unwrap(outputs).unwrap_or_else(|_| {
        panic!("callable output queue should have a single owner after execution")
    });
    // `R` need not be `Clone`, so every empty slot is produced by the closure.
    let mut slots: Vec<Option<R>> = std::iter::repeat_with(|| None).take(count).collect();
    while let Some((position, output)) = queue.pop() {
        *slots
            .get_mut(position)
            .expect("callable index must be within the declared count") = Some(output);
    }
    slots
}