nv_runtime/pipeline.rs
1//! Unified feed pipeline — stages with an optional shared batch point.
2//!
3//! [`FeedPipeline`] is the public model for defining a feed's processing
4//! pipeline. It presents a single linear pipeline where an optional
5//! **batch point** can be inserted between per-feed stages:
6//!
//! ```text
//! [pre-batch stages] → [batch point] → [post-batch stages]
//!                            │
//!            SharedBatchProcessor (frames from multiple feeds)
//! ```
12//!
13//! A pipeline without a batch point is simply all stages executed in
14//! order on the feed thread — identical to the original per-feed model.
15//!
16//! A pipeline with a batch point splits into:
17//! - **Pre-batch stages** — per-feed, on the feed thread (e.g., preprocessing)
18//! - **Batch point** — frame submitted to a shared [`BatchHandle`], results
19//! merged into the artifact accumulator
20//! - **Post-batch stages** — per-feed, on the feed thread (e.g., tracking,
21//! temporal analysis)
22//!
23//! This keeps batching as a natural part of the pipeline instead of a
24//! separate subsystem.
25//!
26//! # Example
27//!
28//! ```rust,no_run
29//! # use nv_perception::{Stage, StageContext, StageOutput};
30//! # use nv_core::{StageId, StageError};
31//! # struct Preprocessor;
32//! # impl Stage for Preprocessor {
33//! # fn id(&self) -> StageId { StageId("pre") }
34//! # fn process(&mut self, _: &StageContext<'_>) -> Result<StageOutput, StageError> { Ok(StageOutput::empty()) }
35//! # }
36//! # struct Tracker;
37//! # impl Stage for Tracker {
38//! # fn id(&self) -> StageId { StageId("trk") }
39//! # fn process(&mut self, _: &StageContext<'_>) -> Result<StageOutput, StageError> { Ok(StageOutput::empty()) }
40//! # }
41//! use nv_runtime::pipeline::FeedPipeline;
42//! # fn example(batch_handle: nv_runtime::batch::BatchHandle) -> Result<(), Box<dyn std::error::Error>> {
43//!
44//! let pipeline = FeedPipeline::builder()
45//! .add_stage(Preprocessor) // pre-batch
46//! .batch(batch_handle)? // shared batch point
47//! .add_stage(Tracker) // post-batch
48//! .build();
49//! # Ok(())
50//! # }
51//! ```
52
53use nv_perception::Stage;
54
55use crate::batch::BatchHandle;
56
57/// Errors from pipeline construction.
58#[derive(Debug, thiserror::Error)]
59pub enum PipelineError {
60 /// A second batch point was inserted into the pipeline.
61 #[error("only one batch point is allowed per pipeline")]
62 DuplicateBatchPoint,
63}
64
65/// The decomposed parts of a [`FeedPipeline`] consumed by the executor.
66pub(crate) type PipelineParts = (
67 Vec<Box<dyn Stage>>,
68 Option<BatchHandle>,
69 Vec<Box<dyn Stage>>,
70);
71
72/// A feed processing pipeline with an optional shared batch point.
73///
74/// Created via [`FeedPipeline::builder()`]. Consumed by
75/// [`FeedConfigBuilder::feed_pipeline()`](crate::FeedConfigBuilder).
76pub struct FeedPipeline {
77 pub(crate) pre_batch: Vec<Box<dyn Stage>>,
78 pub(crate) batch: Option<BatchHandle>,
79 pub(crate) post_batch: Vec<Box<dyn Stage>>,
80}
81
82impl FeedPipeline {
83 /// Create a new builder.
84 #[must_use]
85 pub fn builder() -> FeedPipelineBuilder {
86 FeedPipelineBuilder {
87 pre_batch: Vec::new(),
88 batch: None,
89 post_batch: Vec::new(),
90 after_batch: false,
91 }
92 }
93
94 /// Total number of per-feed stages (excluding the batch point).
95 #[must_use]
96 pub fn stage_count(&self) -> usize {
97 self.pre_batch.len() + self.post_batch.len()
98 }
99
100 /// Whether this pipeline includes a shared batch point.
101 #[must_use]
102 pub fn has_batch(&self) -> bool {
103 self.batch.is_some()
104 }
105
106 /// Decompose into parts consumed by the executor.
107 pub(crate) fn into_parts(self) -> PipelineParts {
108 (self.pre_batch, self.batch, self.post_batch)
109 }
110}
111
112/// Builder for [`FeedPipeline`].
113///
114/// Stages added before [`batch()`](Self::batch) are pre-batch stages.
115/// Stages added after are post-batch stages. If `batch()` is never
116/// called, all stages are treated as a normal linear pipeline (no
117/// batch boundary).
118pub struct FeedPipelineBuilder {
119 pre_batch: Vec<Box<dyn Stage>>,
120 batch: Option<BatchHandle>,
121 post_batch: Vec<Box<dyn Stage>>,
122 after_batch: bool,
123}
124
125impl FeedPipelineBuilder {
126 /// Append a per-feed stage to the pipeline.
127 ///
128 /// If called before [`batch()`](Self::batch), the stage runs
129 /// pre-batch. If called after, the stage runs post-batch.
130 #[must_use]
131 pub fn add_stage(mut self, stage: impl Stage) -> Self {
132 if self.after_batch {
133 self.post_batch.push(Box::new(stage));
134 } else {
135 self.pre_batch.push(Box::new(stage));
136 }
137 self
138 }
139
140 /// Append a boxed per-feed stage.
141 #[must_use]
142 pub fn add_boxed_stage(mut self, stage: Box<dyn Stage>) -> Self {
143 if self.after_batch {
144 self.post_batch.push(stage);
145 } else {
146 self.pre_batch.push(stage);
147 }
148 self
149 }
150
151 /// Insert the shared batch point at the current position.
152 ///
153 /// Stages added before this call are pre-batch; stages added after
154 /// are post-batch. Only one batch point is allowed per pipeline.
155 ///
156 /// # Errors
157 ///
158 /// Returns [`PipelineError::DuplicateBatchPoint`] if called more
159 /// than once on the same builder.
160 pub fn batch(mut self, handle: BatchHandle) -> Result<Self, PipelineError> {
161 if self.batch.is_some() {
162 return Err(PipelineError::DuplicateBatchPoint);
163 }
164 self.batch = Some(handle);
165 self.after_batch = true;
166 Ok(self)
167 }
168
169 /// Build the pipeline.
170 #[must_use]
171 pub fn build(self) -> FeedPipeline {
172 FeedPipeline {
173 pre_batch: self.pre_batch,
174 batch: self.batch,
175 post_batch: self.post_batch,
176 }
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use nv_core::error::StageError;
    use nv_core::id::StageId;
    use nv_perception::{StageContext, StageOutput};

    /// Minimal stage that reports the wrapped id and produces empty output.
    struct DummyStage(&'static str);
    impl Stage for DummyStage {
        fn id(&self) -> StageId {
            StageId(self.0)
        }
        fn process(&mut self, _ctx: &StageContext<'_>) -> Result<StageOutput, StageError> {
            Ok(StageOutput::empty())
        }
    }

    /// Start a coordinator around a no-op batch processor and pass a
    /// handle to `f`.
    ///
    /// The coordinator stays in scope until `f` returns, so the handle is
    /// only used while its coordinator is still alive.
    fn with_batch_handle<R>(f: impl FnOnce(BatchHandle) -> R) -> R {
        use crate::batch::{BatchConfig, BatchCoordinator};
        use nv_core::health::HealthEvent;
        use nv_core::id::StageId;
        use nv_perception::batch::{BatchEntry, BatchProcessor};
        use std::time::Duration;

        struct Noop;
        impl BatchProcessor for Noop {
            fn id(&self) -> StageId {
                StageId("noop")
            }
            fn process(&mut self, _: &mut [BatchEntry]) -> Result<(), nv_core::error::StageError> {
                Ok(())
            }
        }

        let (health_tx, _) = tokio::sync::broadcast::channel::<HealthEvent>(4);
        let coord = BatchCoordinator::start(
            Box::new(Noop),
            BatchConfig {
                max_batch_size: 1,
                max_latency: Duration::from_millis(10),
                queue_capacity: None,
                response_timeout: None,
                max_in_flight_per_feed: 1,
                startup_timeout: None,
            },
            health_tx,
        )
        .unwrap();

        f(coord.handle())
    }

    #[test]
    fn pipeline_without_batch() {
        let p = FeedPipeline::builder()
            .add_stage(DummyStage("a"))
            .add_stage(DummyStage("b"))
            .build();

        assert_eq!(p.stage_count(), 2);
        assert!(!p.has_batch());
        // Without a batch point every stage is stored on the pre-batch side.
        let (pre, batch, post) = p.into_parts();
        assert_eq!(pre.len(), 2);
        assert!(batch.is_none());
        assert!(post.is_empty());
    }

    #[test]
    fn pipeline_stage_count_with_batch() {
        with_batch_handle(|handle| {
            let p = FeedPipeline::builder()
                .add_stage(DummyStage("pre1"))
                .add_stage(DummyStage("pre2"))
                .batch(handle)
                .expect("first batch() should succeed")
                .add_boxed_stage(Box::new(DummyStage("post1")))
                .build();

            // The batch point is not counted as a stage.
            assert_eq!(p.stage_count(), 3);
            assert!(p.has_batch());

            let (pre, batch, post) = p.into_parts();
            assert_eq!(pre.len(), 2);
            assert!(batch.is_some());
            assert_eq!(post.len(), 1);
        });
    }

    #[test]
    fn double_batch_returns_error() {
        with_batch_handle(|handle| {
            // The second `.batch()` call should return an error.
            let result = FeedPipeline::builder()
                .batch(handle.clone())
                .expect("first batch() should succeed")
                .batch(handle);
            assert!(
                matches!(result, Err(PipelineError::DuplicateBatchPoint)),
                "duplicate batch point should return error"
            );
        });
    }
}