use super::{
Config, DerefOutcomes, Hive, OpenBuilder, Outcome, OutcomeBatch, OutcomeSender, OutcomeStore,
OwnedOutcomes, TaskQueues,
};
use crate::bee::{Queen, TaskId, Worker};
use derive_more::Debug;
use std::any;
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
/// The remnants of a `Hive`: its configuration, its `Queen`, the number of
/// observed worker panics, and any task outcomes that were stored rather than
/// sent. Created from a `Hive` (see `Hive::try_into_husk` usage in the tests)
/// and convertible back into a new `Hive` via the `into_hive*` methods.
#[derive(Debug)]
pub struct Husk<Q: Queen> {
    // Configuration carried over from the originating `Hive`; cloned into a
    // fresh builder by `as_builder`.
    config: Config,
    // Rendered in `Debug` output as the type name only, since `Q` itself is
    // not required to implement `Debug`.
    #[debug("{}", any::type_name::<Q>())]
    queen: Q,
    // Count of worker-thread panics; exposed via `num_panics()`.
    num_panics: usize,
    // Stored task outcomes keyed by task ID; skipped in `Debug` output.
    #[debug(skip)]
    outcomes: HashMap<TaskId, Outcome<Q::Kind>>,
}
impl<Q: Queen> Husk<Q> {
pub(super) fn new(
config: Config,
queen: Q,
num_panics: usize,
outcomes: HashMap<TaskId, Outcome<Q::Kind>>,
) -> Self {
Self {
config,
queen,
num_panics,
outcomes,
}
}
pub fn queen(&self) -> &Q {
&self.queen
}
pub fn num_panics(&self) -> usize {
self.num_panics
}
pub fn into_parts(self) -> (Q, OutcomeBatch<Q::Kind>) {
(self.queen, OutcomeBatch::new(self.outcomes))
}
pub fn as_builder(&self) -> OpenBuilder {
OpenBuilder::from(self.config.clone())
}
pub fn into_hive<T: TaskQueues<Q::Kind>>(self) -> Hive<Q, T> {
self.as_builder()
.with_queen(self.queen)
.with_queues::<T>()
.build()
}
pub fn into_hive_swarm_send_unprocessed<T: TaskQueues<Q::Kind>>(
mut self,
tx: &OutcomeSender<Q::Kind>,
) -> (Hive<Q, T>, Vec<TaskId>) {
let unprocessed: Vec<_> = self
.remove_all_unprocessed()
.into_iter()
.map(|(_, input)| input)
.collect();
let hive = self
.as_builder()
.with_queen(self.queen)
.with_queues::<T>()
.build();
let task_ids = hive.swarm_send(unprocessed, tx);
(hive, task_ids)
}
pub fn into_hive_swarm_store_unprocessed<T: TaskQueues<Q::Kind>>(
mut self,
) -> (Hive<Q, T>, Vec<TaskId>) {
let unprocessed: Vec<_> = self
.remove_all_unprocessed()
.into_iter()
.map(|(_, input)| input)
.collect();
let hive = self
.as_builder()
.with_queen(self.queen)
.with_queues::<T>()
.build();
let task_ids = hive.swarm_store(unprocessed);
(hive, task_ids)
}
}
// Exposes the internal outcome map by shared and mutable reference.
// NOTE(review): the inherent methods above call `OutcomeStore` methods (e.g.
// `remove_all_unprocessed`) on `Husk`; presumably that API is provided via
// this `DerefOutcomes` impl through a blanket impl — confirm in the trait's
// defining module.
impl<W: Worker, Q: Queen<Kind = W>> DerefOutcomes<Q::Kind> for Husk<Q> {
    #[inline]
    fn outcomes_deref(&self) -> impl Deref<Target = HashMap<TaskId, Outcome<Q::Kind>>> {
        &self.outcomes
    }
    #[inline]
    fn outcomes_deref_mut(&mut self) -> impl DerefMut<Target = HashMap<TaskId, Outcome<Q::Kind>>> {
        &mut self.outcomes
    }
}
// Exposes the internal outcome map by value (consuming the `Husk`) and by
// shared reference.
impl<W: Worker, Q: Queen<Kind = W>> OwnedOutcomes<Q::Kind> for Husk<Q> {
    #[inline]
    fn outcomes(self) -> HashMap<TaskId, Outcome<Q::Kind>> {
        self.outcomes
    }
    #[inline]
    fn outcomes_ref(&self) -> &HashMap<TaskId, Outcome<Q::Kind>> {
        &self.outcomes
    }
}
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
mod tests {
    use crate::bee::stock::{PunkWorker, Thunk, ThunkWorker};
    use crate::hive::ChannelTaskQueues;
    use crate::hive::{
        Builder, ChannelBuilder, Outcome, OutcomeIteratorExt, OutcomeStore, TaskQueuesBuilder,
        outcome_channel,
    };

    #[test]
    fn test_unprocessed() {
        // With zero worker threads, stored tasks are never picked up, so the
        // husk must report every one of them as unprocessed.
        let hive = ChannelBuilder::empty()
            .num_threads(0)
            .with_worker_default::<ThunkWorker<u8>>()
            .build();
        let mut submitted = hive.map_store((0..10).map(|i| Thunk::from(move || i)));
        hive.suspend();
        let mut husk = hive.try_into_husk(false).unwrap();
        assert!(husk.has_unprocessed());
        for task_id in submitted.iter() {
            assert!(husk.get(*task_id).unwrap().is_unprocessed());
        }
        assert_eq!(10, husk.iter_unprocessed().count());
        // The set of unprocessed task IDs must match the submitted set.
        let mut found = husk
            .iter_unprocessed()
            .map(|(task_id, _)| *task_id)
            .collect::<Vec<_>>();
        submitted.sort();
        found.sort();
        assert_eq!(submitted, found);
        assert_eq!(10, husk.remove_all_unprocessed().len());
    }

    #[test]
    fn test_reprocess_unprocessed() {
        // Strand ten tasks in a thread-less hive, then hand them to a fresh
        // hive that stores its outcomes and actually has workers.
        let stalled = ChannelBuilder::empty()
            .num_threads(0)
            .with_worker_default::<ThunkWorker<u8>>()
            .build();
        let _ = stalled.map_store((0..10).map(|i| Thunk::from(move || i)));
        stalled.suspend();
        let husk = stalled.try_into_husk(false).unwrap();
        let (fresh, _) = husk.into_hive_swarm_store_unprocessed::<ChannelTaskQueues<_>>();
        fresh.grow(8).expect("error spawning threads");
        fresh.join();
        let final_husk = fresh.try_into_husk(false).unwrap();
        assert!(!final_husk.has_unprocessed());
        assert!(final_husk.has_successes());
        assert_eq!(10, final_husk.iter_successes().count());
    }

    #[test]
    fn test_reprocess_unprocessed_to() {
        // Same as above, but re-submitted tasks send their outcomes to a
        // channel instead of storing them in the new hive.
        let stalled = ChannelBuilder::empty()
            .num_threads(0)
            .with_worker_default::<ThunkWorker<u8>>()
            .build();
        let _ = stalled.map_store((0..10).map(|i| Thunk::from(move || i)));
        stalled.suspend();
        let husk = stalled.try_into_husk(false).unwrap();
        let (tx, rx) = outcome_channel();
        let (fresh, task_ids) =
            husk.into_hive_swarm_send_unprocessed::<ChannelTaskQueues<_>>(&tx);
        fresh.grow(8).expect("error spawning threads");
        fresh.join();
        let final_husk = fresh.try_into_husk(false).unwrap();
        assert!(final_husk.is_empty());
        let mut outputs: Vec<_> = rx.select_ordered(task_ids).map(Outcome::unwrap).collect();
        outputs.sort();
        assert_eq!(outputs, Vec::from_iter(0..10));
    }

    #[test]
    fn test_into_result() {
        let hive = ChannelBuilder::empty()
            .num_threads(4)
            .with_worker_default::<ThunkWorker<u8>>()
            .build();
        hive.map_store((0..10).map(|i| Thunk::from(move || i)));
        hive.join();
        let (_, batch) = hive.try_into_husk(false).unwrap().into_parts();
        let mut outputs = batch.unwrap();
        outputs.sort();
        assert_eq!(outputs, Vec::from_iter(0..10));
    }

    #[test]
    #[should_panic]
    fn test_into_result_panic() {
        // One task panics, so unwrapping the errors out of the batch panics.
        let hive = ChannelBuilder::empty()
            .num_threads(4)
            .with_worker_default::<PunkWorker<u8>>()
            .build();
        hive.map_store(
            (0..10).map(|i| Thunk::from(move || if i == 5 { panic!("oh no!") } else { i })),
        );
        hive.join();
        let (_, result) = hive.try_into_husk(false).unwrap().into_parts();
        let _ = result.ok_or_unwrap_errors(true);
    }
}