photon-ring 2.5.0

Ultra-low-latency SPMC/MPMC pub/sub using stamped ring buffers. Formally sound with atomic-slots feature. no_std compatible.
Documentation
// Copyright 2026 Photon Ring Contributors
// SPDX-License-Identifier: Apache-2.0

extern crate std;

use alloc::sync::Arc;
use alloc::vec::Vec;
use core::sync::atomic::{AtomicBool, AtomicU8, Ordering};
use std::thread::JoinHandle;

use super::{STAGE_PANICKED, STAGE_RUNNING};

/// A multi-stage processing pipeline.
///
/// Each stage reads from one Photon Ring channel, applies a transformation,
/// and publishes to the next channel. Stages run on dedicated threads.
///
/// Created via [`Pipeline::builder`] -> [`PipelineBuilder::input`] ->
/// [`StageBuilder::then`] / [`StageBuilder::fan_out`] -> [`StageBuilder::build`].
///
/// Dropping a `Pipeline` sets the shutdown flag but does not join the stage
/// threads; use [`join`](Pipeline::join) or [`try_join`](Pipeline::try_join)
/// to wait for them.
pub struct Pipeline {
    /// Join handles of the stage threads; drained by `join` / `try_join`.
    pub(super) handles: Vec<JoinHandle<()>>,
    /// Shutdown flag, set to `true` by `shutdown()` and on drop.
    /// NOTE(review): presumably shared with every stage thread, which polls
    /// it — the stage loops live outside this file; confirm there.
    pub(super) shutdown: Arc<AtomicBool>,
    /// One status byte per stage, indexed by stage position. The health
    /// accessors compare each against `STAGE_RUNNING` / `STAGE_PANICKED`.
    pub(super) statuses: Vec<Arc<AtomicU8>>,
}

impl Pipeline {
    /// Create a new pipeline builder.
    pub fn builder() -> super::builder::PipelineBuilder {
        super::builder::PipelineBuilder::new()
    }

    /// Signal all stages to shut down gracefully.
    ///
    /// Stages will finish processing their current item and then exit.
    /// Call [`join`](Pipeline::join) to wait for them to complete.
    ///
    /// This only raises the shutdown flag; it never blocks.
    pub fn shutdown(&self) {
        self.shutdown.store(true, Ordering::Release);
    }

    /// Wait for all stage threads to finish.
    ///
    /// This consumes the pipeline. Call [`shutdown`](Pipeline::shutdown)
    /// first, or the threads may run indefinitely.
    ///
    /// # Panics
    ///
    /// If any stage panicked, the first panic is resumed after all threads
    /// have been joined. Use [`try_join`](Pipeline::try_join) to handle
    /// panics without re-unwinding.
    pub fn join(mut self) {
        if let Some(payload) = self.join_all_stages() {
            std::panic::resume_unwind(payload);
        }
    }

    /// Wait for all stage threads to finish, returning any panic payload
    /// instead of re-unwinding.
    ///
    /// This consumes the pipeline.
    ///
    /// # Errors
    ///
    /// Returns `Ok(())` if all stages completed cleanly, or
    /// `Err(payload)` with the first panic payload if any stage panicked.
    /// Payloads of later panics are discarded.
    pub fn try_join(mut self) -> Result<(), alloc::boxed::Box<dyn core::any::Any + Send>> {
        match self.join_all_stages() {
            Some(payload) => Err(payload),
            None => Ok(()),
        }
    }

    /// Return the indices of stages that have panicked.
    ///
    /// Returns an empty vec if all stages are healthy.
    pub fn panicked_stages(&self) -> Vec<usize> {
        self.statuses
            .iter()
            .enumerate()
            .filter(|(_, status)| status.load(Ordering::Acquire) == STAGE_PANICKED)
            .map(|(index, _)| index)
            .collect()
    }

    /// Returns `true` if all stages are still running (none have panicked
    /// or completed).
    ///
    /// A pipeline with zero stages is reported as healthy.
    pub fn is_healthy(&self) -> bool {
        self.statuses
            .iter()
            .all(|status| status.load(Ordering::Acquire) == STAGE_RUNNING)
    }

    /// Number of stages in this pipeline.
    pub fn stage_count(&self) -> usize {
        self.statuses.len()
    }

    /// Join every stage thread and return the first panic payload, if any.
    ///
    /// Shared by [`join`](Pipeline::join) and [`try_join`](Pipeline::try_join)
    /// so both apply exactly the same joining policy. The handles are taken
    /// out of `self` (leaving an empty vec) because `Pipeline` implements
    /// `Drop`, which forbids moving fields out by value.
    fn join_all_stages(&mut self) -> Option<alloc::boxed::Box<dyn core::any::Any + Send>> {
        core::mem::take(&mut self.handles)
            .into_iter()
            .filter_map(|handle| handle.join().err())
            // `fold` (not `next`) so that every handle is joined even after
            // the first panic has been seen; later payloads are dropped.
            .fold(None, |first, payload| first.or(Some(payload)))
    }
}

/// Dropping a pipeline requests shutdown but does not wait for the stage
/// threads. Call `join()` before dropping for a clean shutdown.
impl Drop for Pipeline {
    fn drop(&mut self) {
        // Same effect as an explicit `shutdown()` call: raise the flag.
        self.shutdown();
    }
}