extern crate std;
mod builder;
mod fan_out;
mod pipeline;
pub use builder::{PipelineBuilder, StageBuilder};
pub use fan_out::FanOutBuilder;
pub use pipeline::Pipeline;
use crate::channel::{Publisher, Subscriber};
use crate::pod::Pod;
use crate::wait::WaitStrategy;
use alloc::sync::Arc;
use alloc::vec::Vec;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::thread::{self, JoinHandle};
const DEFAULT_CAPACITY: usize = 1024;
const STAGE_RUNNING: u8 = 0;
const STAGE_COMPLETED: u8 = 1;
const STAGE_PANICKED: u8 = 2;
struct SharedState {
shutdown: Arc<AtomicBool>,
handles: Vec<JoinHandle<()>>,
statuses: Vec<Arc<AtomicU8>>,
}
impl SharedState {
fn new() -> Self {
SharedState {
shutdown: Arc::new(AtomicBool::new(false)),
handles: Vec::new(),
statuses: Vec::new(),
}
}
}
fn spawn_stage<T, U>(
mut input: Subscriber<T>,
mut output: Publisher<U>,
shutdown: Arc<AtomicBool>,
f: impl Fn(T) -> U + Send + 'static,
strategy: WaitStrategy,
) -> (Arc<AtomicU8>, JoinHandle<()>)
where
T: Pod,
U: Pod,
{
let status = Arc::new(AtomicU8::new(STAGE_RUNNING));
let status_inner = status.clone();
let handle = thread::spawn(move || {
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let mut iter: u32 = 0;
loop {
if shutdown.load(Ordering::Acquire) {
return;
}
match input.try_recv() {
Ok(value) => {
let out = f(value);
output.publish(out);
iter = 0;
}
Err(crate::channel::TryRecvError::Empty) => {
strategy.wait(iter);
iter = iter.saturating_add(1);
}
Err(crate::channel::TryRecvError::Lagged { .. }) => {
iter = 0;
}
}
}
}));
match result {
Ok(()) => status_inner.store(STAGE_COMPLETED, Ordering::Release),
Err(_) => status_inner.store(STAGE_PANICKED, Ordering::Release),
}
});
(status, handle)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::wait::WaitStrategy;
#[test]
fn single_stage_pipeline() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (mut output, pipeline) = stages.then(|x: u64| x * 3).build();
pub_.publish(7);
assert_eq!(output.recv(), 21);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn two_stage_pipeline() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (mut output, pipeline) = stages.then(|x: u64| x * 2).then(|x: u64| x + 1).build();
pub_.publish(10);
assert_eq!(output.recv(), 21);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn three_stage_pipeline() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<i64>();
let (mut output, pipeline) = stages
.then(|x: i64| x + 10)
.then(|x: i64| x * 2)
.then(|x: i64| x - 5)
.build();
pub_.publish(5);
assert_eq!(output.recv(), 25);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_multiple_messages() {
let (mut pub_, stages) = Pipeline::builder().capacity(256).input::<u64>();
let (mut output, pipeline) = stages.then(|x: u64| x + 1).build();
for i in 0..100u64 {
pub_.publish(i);
}
for i in 0..100u64 {
assert_eq!(output.recv(), i + 1);
}
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_type_transform() {
#[derive(Clone, Copy)]
#[repr(C)]
struct Input {
value: f64,
}
unsafe impl crate::Pod for Input {}
#[derive(Clone, Copy, Debug, PartialEq)]
#[repr(C)]
struct Output {
doubled: f64,
positive: u8,
}
unsafe impl crate::Pod for Output {}
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<Input>();
let (mut output, pipeline) = stages
.then(|inp: Input| Output {
doubled: inp.value * 2.0,
positive: if inp.value > 0.0 { 1 } else { 0 },
})
.build();
pub_.publish(Input { value: 3.5 });
let out = output.recv();
assert_eq!(out.doubled, 7.0);
assert_eq!(out.positive, 1);
pub_.publish(Input { value: -1.0 });
let out = output.recv();
assert_eq!(out.doubled, -2.0);
assert_eq!(out.positive, 0);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_basic() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) =
stages.fan_out(|x: u64| x * 2, |x: u64| x + 100).build();
pub_.publish(5);
assert_eq!(out_a.recv(), 10);
assert_eq!(out_b.recv(), 105);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_multiple_messages() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) =
stages.fan_out(|x: u64| x * 10, |x: u64| x + 1).build();
for i in 0..50u64 {
pub_.publish(i);
}
for i in 0..50u64 {
assert_eq!(out_a.recv(), i * 10);
assert_eq!(out_b.recv(), i + 1);
}
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_then_a() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) = stages
.fan_out(|x: u64| x * 2, |x: u64| x + 100)
.then_a(|x: u64| x + 1)
.build();
pub_.publish(5);
assert_eq!(out_a.recv(), 11); assert_eq!(out_b.recv(), 105);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_then_b() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) = stages
.fan_out(|x: u64| x * 2, |x: u64| x + 100)
.then_b(|x: u64| x * 3)
.build();
pub_.publish(5);
assert_eq!(out_a.recv(), 10); assert_eq!(out_b.recv(), 315);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_then_both() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) = stages
.fan_out(|x: u64| x * 2, |x: u64| x + 100)
.then_a(|x: u64| x + 1)
.then_b(|x: u64| x * 3)
.build();
pub_.publish(5);
assert_eq!(out_a.recv(), 11); assert_eq!(out_b.recv(), 315);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_stage_count() {
let (_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (_, pipeline) = stages.then(|x: u64| x).then(|x: u64| x).build();
assert_eq!(pipeline.stage_count(), 2);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_is_healthy() {
let (_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (_, pipeline) = stages.then(|x: u64| x).build();
assert!(pipeline.is_healthy());
assert!(pipeline.panicked_stages().is_empty());
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_detects_panic() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (_, pipeline) = stages
.then(|x: u64| -> u64 {
if x == 42 {
panic!("test panic");
}
x
})
.build();
pub_.publish(42);
let mut panicked = false;
for _ in 0..100 {
if !pipeline.panicked_stages().is_empty() {
panicked = true;
break;
}
std::thread::sleep(std::time::Duration::from_millis(10));
}
assert!(panicked, "expected stage to detect panic");
assert_eq!(pipeline.panicked_stages(), alloc::vec![0]);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_default_builder() {
let builder = PipelineBuilder::default();
let (mut pub_, stages) = builder.input::<u64>();
let (mut output, pipeline) = stages.then(|x: u64| x + 1).build();
pub_.publish(9);
assert_eq!(output.recv(), 10);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_linear_then_fan_out() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) = stages
.then(|x: u64| x + 10)
.fan_out(|x: u64| x * 2, |x: u64| x * 3)
.build();
pub_.publish(5);
assert_eq!(out_a.recv(), 30);
assert_eq!(out_b.recv(), 45);
assert_eq!(pipeline.stage_count(), 3);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn zero_stage_pipeline() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (mut output, pipeline) = stages.build();
pub_.publish(42);
assert_eq!(output.recv(), 42);
assert_eq!(pipeline.stage_count(), 0);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_with_wait_strategy() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (mut output, pipeline) = stages
.then_with(|x: u64| x * 2, WaitStrategy::YieldSpin)
.then_with(|x: u64| x + 1, WaitStrategy::BackoffSpin)
.build();
pub_.publish(10);
assert_eq!(output.recv(), 21);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn pipeline_mixed_then_and_then_with() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let (mut output, pipeline) = stages
.then(|x: u64| x + 10)
.then_with(|x: u64| x * 2, WaitStrategy::BusySpin)
.then(|x: u64| x - 5)
.build();
pub_.publish(5);
assert_eq!(output.recv(), 25);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_then_a_with_strategy() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) = stages
.fan_out(|x: u64| x * 2, |x: u64| x + 100)
.then_a_with(|x: u64| x + 1, WaitStrategy::YieldSpin)
.build();
pub_.publish(5);
assert_eq!(out_a.recv(), 11); assert_eq!(out_b.recv(), 105);
pipeline.shutdown();
pipeline.join();
}
#[test]
fn fan_out_then_b_with_strategy() {
let (mut pub_, stages) = Pipeline::builder().capacity(64).input::<u64>();
let ((mut out_a, mut out_b), pipeline) = stages
.fan_out(|x: u64| x * 2, |x: u64| x + 100)
.then_b_with(|x: u64| x * 3, WaitStrategy::BackoffSpin)
.build();
pub_.publish(5);
assert_eq!(out_a.recv(), 10); assert_eq!(out_b.recv(), 315);
pipeline.shutdown();
pipeline.join();
}
}