use super::Outcome;
use crate::bee::{TaskId, Worker};
use std::collections::{BTreeSet, HashMap, VecDeque};
/// The `Result` form of a task's outcome: `Ok` carries the worker's `Output` and `Err` carries
/// the worker's `Error`.
pub type TaskResult<W> = Result<<W as Worker>::Output, <W as Worker>::Error>;
impl<W: Worker> From<Outcome<W>> for TaskResult<W> {
    /// Converts an [`Outcome`] into a [`TaskResult`].
    ///
    /// `Outcome::Success` becomes `Ok` holding the success value. Every other outcome is passed
    /// through `try_into_error` and the resulting error is wrapped in `Err`.
    ///
    /// # Panics
    ///
    /// Panics with the message `"not an error outcome"` if the outcome is not a `Success` and
    /// `try_into_error` does not produce an error.
    fn from(value: Outcome<W>) -> TaskResult<W> {
        match value {
            Outcome::Success { value, .. } => Ok(value),
            other => Err(other.try_into_error().expect("not an error outcome")),
        }
    }
}
/// Iterator adaptor that yields only the outcomes whose task ID is in a tracked set, in the
/// order the wrapped iterator produces them.
///
/// Once the wrapped iterator is exhausted, each task ID still in the set is reported as
/// `Outcome::Missing`.
pub struct UnorderedOutcomeIterator<W: Worker> {
// The wrapped source of outcomes.
inner: Box<dyn Iterator<Item = Outcome<W>>>,
// Task IDs for which an outcome has not been yielded yet.
task_ids: BTreeSet<TaskId>,
}
impl<W: Worker> UnorderedOutcomeIterator<W> {
    /// Creates an iterator over the outcomes in `inner` whose task IDs appear in `task_ids`.
    ///
    /// `task_ids` is collected into a set, so duplicate IDs are tracked once.
    pub fn new<T, I: IntoIterator<Item = TaskId>>(inner: T, task_ids: I) -> Self
    where
        T: IntoIterator<Item = Outcome<W>>,
        T::IntoIter: 'static,
    {
        let pending = task_ids.into_iter().collect::<BTreeSet<_>>();
        let source: Box<dyn Iterator<Item = Outcome<W>>> = Box::new(inner.into_iter());
        Self {
            inner: source,
            task_ids: pending,
        }
    }
}
impl<W: Worker> Iterator for UnorderedOutcomeIterator<W> {
    type Item = Outcome<W>;

    /// Returns the next outcome whose task ID is still being tracked.
    ///
    /// Outcomes from the wrapped iterator whose ID is not in the tracked set are discarded.
    /// When the wrapped iterator runs dry, the smallest remaining tracked ID is reported as
    /// `Outcome::Missing`. Iteration ends once the tracked set is empty.
    ///
    /// If the yielded outcome reports subtask IDs, those IDs are added to the tracked set.
    ///
    /// NOTE(review): subtask IDs are only registered after their parent's outcome is yielded,
    /// so a subtask outcome that arrives earlier is discarded and that subtask is later
    /// reported as `Missing`. Confirm the source guarantees parent-first ordering.
    fn next(&mut self) -> Option<Self::Item> {
        if self.task_ids.is_empty() {
            return None;
        }
        let outcome = loop {
            match self.inner.next() {
                Some(candidate) => {
                    // `remove` returns true only for IDs that were still awaited; anything
                    // else is an unrequested outcome and is dropped here.
                    if self.task_ids.remove(candidate.task_id()) {
                        break candidate;
                    }
                }
                None => {
                    // The set was non-empty on entry and nothing has been removed since, so
                    // there is always an ID to pop.
                    let task_id = self.task_ids.pop_first().unwrap();
                    break Outcome::Missing { task_id };
                }
            }
        };
        if let Some(subtask_ids) = outcome.subtask_ids() {
            self.task_ids.extend(subtask_ids);
        }
        Some(outcome)
    }
}
/// Iterator adaptor that yields one outcome per tracked task ID, in the order the IDs were
/// queued.
///
/// Outcomes that arrive from the wrapped iterator before their turn are held in a buffer. A
/// queued ID with no buffered outcome once the wrapped iterator is exhausted is reported as
/// `Outcome::Missing`.
pub struct OrderedOutcomeIterator<W: Worker> {
// The wrapped source of outcomes.
inner: Box<dyn Iterator<Item = Outcome<W>>>,
// Task IDs still to be yielded; the front is the next ID to yield.
task_ids: VecDeque<TaskId>,
// Outcomes pulled from `inner` that did not match the front of `task_ids`, keyed by task ID.
buf: HashMap<TaskId, Outcome<W>>,
}
impl<W: Worker> OrderedOutcomeIterator<W> {
    /// Creates an iterator that yields the outcomes in `inner` in the order given by
    /// `task_ids`.
    ///
    /// The reorder buffer is pre-sized to the number of IDs supplied.
    pub fn new<T, I: IntoIterator<Item = TaskId>>(inner: T, task_ids: I) -> Self
    where
        T: IntoIterator<Item = Outcome<W>>,
        T::IntoIter: 'static,
    {
        let queue = task_ids.into_iter().collect::<VecDeque<TaskId>>();
        let source: Box<dyn Iterator<Item = Outcome<W>>> = Box::new(inner.into_iter());
        let buf = HashMap::with_capacity(queue.len());
        Self {
            inner: source,
            task_ids: queue,
            buf,
        }
    }
}
impl<W: Worker> Iterator for OrderedOutcomeIterator<W> {
    type Item = Outcome<W>;

    /// Returns the outcome for the task ID at the front of the queue.
    ///
    /// The buffer is consulted first and then the wrapped iterator. Outcomes pulled from the
    /// wrapped iterator that belong to a different ID are buffered and the search continues.
    /// If the wrapped iterator is exhausted before the expected outcome is found, the ID is
    /// reported as `Outcome::Missing`. Iteration ends once the queue is empty.
    ///
    /// If the yielded outcome reports subtask IDs, those IDs are appended to the back of the
    /// queue.
    ///
    /// Note that outcomes for IDs that are never queued stay in the buffer until the iterator
    /// is dropped.
    fn next(&mut self) -> Option<Self::Item> {
        let outcome = loop {
            // An empty queue means every requested ID has been yielded.
            let expected = self.task_ids.front()?;
            match self.buf.remove(expected).or_else(|| self.inner.next()) {
                None => {
                    break Outcome::Missing {
                        task_id: self.task_ids.pop_front().unwrap(),
                    };
                }
                Some(candidate) => {
                    if candidate.task_id() == expected {
                        self.task_ids.pop_front();
                        break candidate;
                    }
                    // Not its turn yet: park it until its ID reaches the front of the queue.
                    let id = *candidate.task_id();
                    self.buf.insert(id, candidate);
                }
            }
        };
        if let Some(subtask_ids) = outcome.subtask_ids() {
            self.task_ids.extend(subtask_ids);
        }
        Some(outcome)
    }
}
/// Extension methods for anything that can be iterated as a sequence of [`Outcome`]s.
pub trait OutcomeIteratorExt<W: Worker>: IntoIterator<Item = Outcome<W>> + Sized {
    /// Selects the outcomes whose task IDs are in `task_ids`, in the order they are produced.
    ///
    /// See [`UnorderedOutcomeIterator`].
    fn select_unordered<I>(self, task_ids: I) -> impl Iterator<Item = Outcome<W>>
    where
        I: IntoIterator<Item = TaskId>,
        Self::IntoIter: 'static,
    {
        UnorderedOutcomeIterator::new(self, task_ids)
    }

    /// Selects the outcomes whose task IDs are in `task_ids`, in the order of `task_ids`.
    ///
    /// See [`OrderedOutcomeIterator`].
    fn select_ordered<I>(self, task_ids: I) -> impl Iterator<Item = Outcome<W>>
    where
        I: IntoIterator<Item = TaskId>,
        Self::IntoIter: 'static,
    {
        OrderedOutcomeIterator::new(self, task_ids)
    }

    /// Converts every outcome into a [`TaskResult`] using its `From<Outcome<W>>` conversion.
    fn into_results(self) -> impl Iterator<Item = TaskResult<W>>
    where
        Self::IntoIter: 'static,
    {
        self.into_iter()
            .map(|outcome| -> TaskResult<W> { outcome.into() })
    }

    /// Like [`select_unordered`](Self::select_unordered), with each selected outcome
    /// converted into a [`TaskResult`].
    fn select_unordered_results<I>(self, task_ids: I) -> impl Iterator<Item = TaskResult<W>>
    where
        I: IntoIterator<Item = TaskId>,
        Self::IntoIter: 'static,
    {
        let selected = UnorderedOutcomeIterator::new(self, task_ids);
        selected.map(|outcome| -> TaskResult<W> { outcome.into() })
    }

    /// Like [`select_ordered`](Self::select_ordered), with each selected outcome converted
    /// into a [`TaskResult`].
    fn select_ordered_results<I>(self, task_ids: I) -> impl Iterator<Item = TaskResult<W>>
    where
        I: IntoIterator<Item = TaskId>,
        Self::IntoIter: 'static,
    {
        let selected = OrderedOutcomeIterator::new(self, task_ids);
        selected.map(|outcome| -> TaskResult<W> { outcome.into() })
    }

    /// Unwraps every outcome into the worker's output by calling `Outcome::unwrap`.
    ///
    /// NOTE(review): presumably panics for outcomes that are not `Success`; confirm against
    /// `Outcome::unwrap`.
    fn into_outputs(self) -> impl Iterator<Item = W::Output>
    where
        Self::IntoIter: 'static,
    {
        self.into_iter().map(|outcome| outcome.unwrap())
    }

    /// Like [`select_unordered`](Self::select_unordered), with each selected outcome
    /// unwrapped by calling `Outcome::unwrap`.
    fn select_unordered_outputs<I>(self, task_ids: I) -> impl Iterator<Item = W::Output>
    where
        I: IntoIterator<Item = TaskId>,
        Self::IntoIter: 'static,
    {
        let selected = UnorderedOutcomeIterator::new(self, task_ids);
        selected.map(|outcome| outcome.unwrap())
    }

    /// Like [`select_ordered`](Self::select_ordered), with each selected outcome unwrapped
    /// by calling `Outcome::unwrap`.
    fn select_ordered_outputs<I>(self, task_ids: I) -> impl Iterator<Item = W::Output>
    where
        I: IntoIterator<Item = TaskId>,
        Self::IntoIter: 'static,
    {
        let selected = OrderedOutcomeIterator::new(self, task_ids);
        selected.map(|outcome| outcome.unwrap())
    }
}
/// Blanket implementation: every type that can be iterated as `Outcome<W>` items gets the
/// `OutcomeIteratorExt` methods.
impl<W: Worker, T: IntoIterator<Item = Outcome<W>>> OutcomeIteratorExt<W> for T {}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use super::{OrderedOutcomeIterator, UnorderedOutcomeIterator};
    use crate::bee::stock::EchoWorker;
    use crate::hive::Outcome;

    type Worker = EchoWorker<usize>;
    type WorkerOutcome = Outcome<Worker>;

    /// Outcomes for tasks 2 and 1, in that order; no outcome is provided for task 0.
    fn sample_outcomes() -> Vec<WorkerOutcome> {
        vec![
            WorkerOutcome::Success {
                value: 2,
                task_id: 2,
            },
            WorkerOutcome::Success {
                value: 1,
                task_id: 1,
            },
        ]
    }

    /// The unordered iterator yields outcomes in source order, then reports the task with no
    /// outcome as missing.
    #[test]
    fn test_unordered_missing() {
        let actual: Vec<_> = UnorderedOutcomeIterator::new(sample_outcomes(), 0..3).collect();
        let expected = vec![
            WorkerOutcome::Success {
                value: 2,
                task_id: 2,
            },
            WorkerOutcome::Success {
                value: 1,
                task_id: 1,
            },
            WorkerOutcome::Missing { task_id: 0 },
        ];
        assert_eq!(actual.len(), 3);
        assert_eq!(actual, expected);
    }

    /// The ordered iterator yields outcomes in task ID order, reporting the task with no
    /// outcome as missing in its own position.
    #[test]
    fn test_ordered_missing() {
        let actual: Vec<_> = OrderedOutcomeIterator::new(sample_outcomes(), 0..3).collect();
        let expected = vec![
            WorkerOutcome::Missing { task_id: 0 },
            WorkerOutcome::Success {
                value: 1,
                task_id: 1,
            },
            WorkerOutcome::Success {
                value: 2,
                task_id: 2,
            },
        ];
        assert_eq!(actual.len(), 3);
        assert_eq!(actual, expected);
    }
}