// nv_runtime/pipeline.rs
1//! Unified feed pipeline — stages with an optional shared batch point.
2//!
3//! [`FeedPipeline`] is the public model for defining a feed's processing
4//! pipeline. It presents a single linear pipeline where an optional
5//! **batch point** can be inserted between per-feed stages:
6//!
7//! ```text
8//! [pre-batch stages] → [batch point] → [post-batch stages]
9//!                           │
10//!                    SharedBatchProcessor (frames from multiple feeds)
11//! ```
12//!
13//! A pipeline without a batch point is simply all stages executed in
14//! order on the feed thread — identical to the original per-feed model.
15//!
16//! A pipeline with a batch point splits into:
17//! - **Pre-batch stages** — per-feed, on the feed thread (e.g., preprocessing)
18//! - **Batch point** — frame submitted to a shared [`BatchHandle`], results
19//!   merged into the artifact accumulator
20//! - **Post-batch stages** — per-feed, on the feed thread (e.g., tracking,
21//!   temporal analysis)
22//!
23//! This keeps batching as a natural part of the pipeline instead of a
24//! separate subsystem.
25//!
26//! # Example
27//!
28//! ```rust,no_run
29//! # use nv_perception::{Stage, StageContext, StageOutput};
30//! # use nv_core::{StageId, StageError};
31//! # struct Preprocessor;
32//! # impl Stage for Preprocessor {
33//! #     fn id(&self) -> StageId { StageId("pre") }
34//! #     fn process(&mut self, _: &StageContext<'_>) -> Result<StageOutput, StageError> { Ok(StageOutput::empty()) }
35//! # }
36//! # struct Tracker;
37//! # impl Stage for Tracker {
38//! #     fn id(&self) -> StageId { StageId("trk") }
39//! #     fn process(&mut self, _: &StageContext<'_>) -> Result<StageOutput, StageError> { Ok(StageOutput::empty()) }
40//! # }
41//! use nv_runtime::pipeline::FeedPipeline;
42//! # fn example(batch_handle: nv_runtime::batch::BatchHandle) -> Result<(), Box<dyn std::error::Error>> {
43//!
//! let pipeline = FeedPipeline::builder()
//!     .add_stage(Preprocessor)    // pre-batch
//!     .batch(batch_handle)?       // shared batch point
//!     .add_stage(Tracker)         // post-batch
//!     .build();
49//! # Ok(())
50//! # }
51//! ```
52
53use nv_perception::Stage;
54
55use crate::batch::BatchHandle;
56
/// Errors from pipeline construction.
#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
    /// A second batch point was inserted into the pipeline.
    ///
    /// Returned by `FeedPipelineBuilder::batch` when it is called more than
    /// once on the same builder; a pipeline has at most one batch point.
    #[error("only one batch point is allowed per pipeline")]
    DuplicateBatchPoint,
}
64
/// The decomposed parts of a [`FeedPipeline`] consumed by the executor.
///
/// Produced by `FeedPipeline::into_parts`; tuple order mirrors pipeline
/// order: pre-batch stages, optional batch point, post-batch stages.
pub(crate) type PipelineParts = (
    Vec<Box<dyn Stage>>, // pre-batch per-feed stages
    Option<BatchHandle>, // shared batch point, if configured
    Vec<Box<dyn Stage>>, // post-batch per-feed stages
);
71
/// A feed processing pipeline with an optional shared batch point.
///
/// Created via [`FeedPipeline::builder()`]. Consumed by
/// [`FeedConfigBuilder::feed_pipeline()`](crate::FeedConfigBuilder).
pub struct FeedPipeline {
    // Stages that run on the feed thread before the batch point.
    pub(crate) pre_batch: Vec<Box<dyn Stage>>,
    // Shared batch point; `None` means a plain linear pipeline.
    pub(crate) batch: Option<BatchHandle>,
    // Stages that run on the feed thread after the batch point.
    pub(crate) post_batch: Vec<Box<dyn Stage>>,
}
81
82impl FeedPipeline {
83    /// Create a new builder.
84    #[must_use]
85    pub fn builder() -> FeedPipelineBuilder {
86        FeedPipelineBuilder {
87            pre_batch: Vec::new(),
88            batch: None,
89            post_batch: Vec::new(),
90            after_batch: false,
91        }
92    }
93
94    /// Total number of per-feed stages (excluding the batch point).
95    #[must_use]
96    pub fn stage_count(&self) -> usize {
97        self.pre_batch.len() + self.post_batch.len()
98    }
99
100    /// Whether this pipeline includes a shared batch point.
101    #[must_use]
102    pub fn has_batch(&self) -> bool {
103        self.batch.is_some()
104    }
105
106    /// Decompose into parts consumed by the executor.
107    pub(crate) fn into_parts(self) -> PipelineParts {
108        (self.pre_batch, self.batch, self.post_batch)
109    }
110}
111
/// Builder for [`FeedPipeline`].
///
/// Stages added before [`batch()`](Self::batch) are pre-batch stages.
/// Stages added after are post-batch stages. If `batch()` is never
/// called, all stages are treated as a normal linear pipeline (no
/// batch boundary).
pub struct FeedPipelineBuilder {
    // Stages accumulated before the batch point (or all stages if none).
    pre_batch: Vec<Box<dyn Stage>>,
    // Set exactly once by `batch()`; a second call is rejected.
    batch: Option<BatchHandle>,
    // Stages accumulated after the batch point.
    post_batch: Vec<Box<dyn Stage>>,
    // Routing flag: flipped to `true` by `batch()` so later `add_*`
    // calls land in `post_batch` instead of `pre_batch`.
    after_batch: bool,
}
124
125impl FeedPipelineBuilder {
126    /// Append a per-feed stage to the pipeline.
127    ///
128    /// If called before [`batch()`](Self::batch), the stage runs
129    /// pre-batch. If called after, the stage runs post-batch.
130    #[must_use]
131    pub fn add_stage(mut self, stage: impl Stage) -> Self {
132        if self.after_batch {
133            self.post_batch.push(Box::new(stage));
134        } else {
135            self.pre_batch.push(Box::new(stage));
136        }
137        self
138    }
139
140    /// Append a boxed per-feed stage.
141    #[must_use]
142    pub fn add_boxed_stage(mut self, stage: Box<dyn Stage>) -> Self {
143        if self.after_batch {
144            self.post_batch.push(stage);
145        } else {
146            self.pre_batch.push(stage);
147        }
148        self
149    }
150
151    /// Insert the shared batch point at the current position.
152    ///
153    /// Stages added before this call are pre-batch; stages added after
154    /// are post-batch. Only one batch point is allowed per pipeline.
155    ///
156    /// # Errors
157    ///
158    /// Returns [`PipelineError::DuplicateBatchPoint`] if called more
159    /// than once on the same builder.
160    pub fn batch(mut self, handle: BatchHandle) -> Result<Self, PipelineError> {
161        if self.batch.is_some() {
162            return Err(PipelineError::DuplicateBatchPoint);
163        }
164        self.batch = Some(handle);
165        self.after_batch = true;
166        Ok(self)
167    }
168
169    /// Build the pipeline.
170    #[must_use]
171    pub fn build(self) -> FeedPipeline {
172        FeedPipeline {
173            pre_batch: self.pre_batch,
174            batch: self.batch,
175            post_batch: self.post_batch,
176        }
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use nv_core::error::StageError;
    use nv_core::id::StageId;
    use nv_perception::{StageContext, StageOutput};

    /// Minimal no-op stage; the inner `&'static str` doubles as its id.
    struct DummyStage(&'static str);
    impl Stage for DummyStage {
        fn id(&self) -> StageId {
            StageId(self.0)
        }
        // Always succeeds with an empty output; tests here only exercise
        // pipeline *structure*, never stage execution.
        fn process(&mut self, _ctx: &StageContext<'_>) -> Result<StageOutput, StageError> {
            Ok(StageOutput::empty())
        }
    }

    #[test]
    fn pipeline_without_batch() {
        let p = FeedPipeline::builder()
            .add_stage(DummyStage("a"))
            .add_stage(DummyStage("b"))
            .build();

        // With no batch point, both stages count and land pre-batch.
        assert_eq!(p.stage_count(), 2);
        assert!(!p.has_batch());
        let (pre, batch, post) = p.into_parts();
        assert_eq!(pre.len(), 2);
        assert!(batch.is_none());
        assert!(post.is_empty());
    }

    #[test]
    fn pipeline_stage_count_with_batch() {
        // Can't easily create a real BatchHandle in unit tests without
        // the coordinator, so we test the builder's structural correctness
        // by verifying pre/post split via an indirect approach.
        //
        // NOTE(review): despite the name, this test never inserts a batch
        // point — it only re-checks the no-batch routing already covered
        // by `pipeline_without_batch`. Consider renaming or extending it
        // to use a real handle (as `double_batch_returns_error` does).
        let p = FeedPipeline::builder()
            .add_stage(DummyStage("pre1"))
            .add_stage(DummyStage("pre2"))
            .build();

        // Without batch, all stages are pre-batch.
        let (pre, _, post) = p.into_parts();
        assert_eq!(pre.len(), 2);
        assert!(post.is_empty());
    }

    #[test]
    fn double_batch_returns_error() {
        // Create a real BatchHandle via the coordinator.
        use crate::batch::{BatchConfig, BatchCoordinator};
        use nv_core::health::HealthEvent;
        use nv_core::id::StageId;
        use nv_perception::batch::{BatchEntry, BatchProcessor};
        use std::time::Duration;

        /// Batch processor that accepts every batch and does nothing —
        /// only needed so the coordinator can start.
        struct Noop;
        impl BatchProcessor for Noop {
            fn id(&self) -> StageId {
                StageId("noop")
            }
            fn process(&mut self, _: &mut [BatchEntry]) -> Result<(), nv_core::error::StageError> {
                Ok(())
            }
        }

        // Health events are never read here; the receiver half is dropped.
        let (health_tx, _) = tokio::sync::broadcast::channel::<HealthEvent>(4);
        let coord = BatchCoordinator::start(
            Box::new(Noop),
            BatchConfig {
                max_batch_size: 1,
                max_latency: Duration::from_millis(10),
                queue_capacity: None,
                response_timeout: None,
                max_in_flight_per_feed: 1,
                startup_timeout: None,
            },
            health_tx,
        )
        .unwrap();
        let handle = coord.handle();

        // The second `.batch()` call should return an error.
        let result = FeedPipeline::builder()
            .batch(handle.clone())
            .expect("first batch() should succeed")
            .batch(handle);
        assert!(result.is_err(), "duplicate batch point should return error");
    }
}
270}