extern crate std;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::thread::JoinHandle;
use super::{STAGE_PANICKED, STAGE_RUNNING};
/// Handle to a set of spawned stage threads together with the shared state
/// used to signal and observe them.
///
/// Dropping the handle sets the shutdown flag (see the `Drop` impl); it does
/// not wait for the threads. Use `join` / `try_join` to wait.
pub struct Pipeline {
/// Join handles of the spawned threads. `join` and `try_join` drain this
/// vector, leaving it empty.
// NOTE(review): presumably one handle per stage — confirm against the builder.
pub(super) handles: Vec<JoinHandle<()>>,
/// Shared flag stored as `true` (Release) by `shutdown()` and on drop.
// NOTE(review): stage threads presumably poll this flag — verify in the stage loop.
pub(super) shutdown: Arc<AtomicBool>,
/// Per-stage status bytes, compared against `STAGE_RUNNING` and
/// `STAGE_PANICKED`; the vector length is what `stage_count` reports.
pub(super) statuses: Vec<Arc<AtomicU8>>,
}
impl Pipeline {
    /// Creates a new `PipelineBuilder` by calling `PipelineBuilder::new()`.
    pub fn builder() -> super::builder::PipelineBuilder {
        super::builder::PipelineBuilder::new()
    }

    /// Sets the shared shutdown flag to `true` (Release ordering).
    ///
    /// This only stores the flag; it does not wait for any thread. Calling it
    /// more than once simply stores `true` again.
    // NOTE(review): how quickly stages react depends on how they poll the
    // flag, which is not visible from this file.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Waits for every stage thread to finish, then propagates the first
    /// panic (if any) into the calling thread.
    ///
    /// This method does not set the shutdown flag before waiting; the flag is
    /// only stored once the `Pipeline` value is dropped after all threads
    /// have been joined.
    // NOTE(review): if stages only exit once the flag is set, callers
    // presumably need to call `shutdown()` first — confirm against the stages.
    ///
    /// # Panics
    ///
    /// Resumes unwinding with the payload of the first joined thread that
    /// panicked (in handle order). Payloads of later panicked threads are
    /// discarded.
    pub fn join(self) {
        // All waiting/collection logic lives in `try_join` so the two entry
        // points cannot drift apart; this wrapper only re-raises the payload.
        if let Err(payload) = self.try_join() {
            std::panic::resume_unwind(payload);
        }
    }

    /// Waits for every stage thread to finish and reports the first panic as
    /// a value instead of unwinding.
    ///
    /// Like `join`, this does not set the shutdown flag before waiting.
    ///
    /// # Errors
    ///
    /// Returns the panic payload of the first joined thread that panicked
    /// (in handle order). All threads are still joined; payloads of later
    /// panicked threads are discarded.
    pub fn try_join(mut self) -> Result<(), alloc::boxed::Box<dyn core::any::Any + Send>> {
        // `Pipeline` implements `Drop`, so fields cannot be moved out of
        // `self`; swap the handle list for an empty one instead.
        let first_panic = core::mem::take(&mut self.handles)
            .into_iter()
            .fold(None, |first, handle| {
                // Join unconditionally so every thread is waited for even
                // after a panic has already been recorded.
                let outcome = handle.join();
                // `or` keeps an earlier payload and drops the newer one.
                first.or(outcome.err())
            });
        first_panic.map_or(Ok(()), Err)
    }

    /// Returns the indices of all stages whose status currently equals
    /// `STAGE_PANICKED`, in ascending order.
    ///
    /// Each status is read with Acquire ordering. The result is a snapshot
    /// and may be stale as soon as it is returned.
    pub fn panicked_stages(&self) -> Vec<usize> {
        self.statuses
            .iter()
            .enumerate()
            .filter(|(_, status)| status.load(Ordering::Acquire) == STAGE_PANICKED)
            .map(|(index, _)| index)
            .collect()
    }

    /// Returns `true` when every stage's status equals `STAGE_RUNNING`.
    ///
    /// Any other status value, including `STAGE_PANICKED`, makes this return
    /// `false`. A pipeline with zero stages is reported as healthy.
    pub fn is_healthy(&self) -> bool {
        self.statuses
            .iter()
            .all(|status| status.load(Ordering::Acquire) == STAGE_RUNNING)
    }

    /// Returns the number of stages, i.e. the number of status slots.
    pub fn stage_count(&self) -> usize {
        self.statuses.len()
    }
}
impl Drop for Pipeline {
fn drop(&mut self) {
self.shutdown.store(true, Ordering::Release);
}
}