// nv_perception/batch.rs
//! Shared batch processing trait and entry types.
//!
//! # Design intent
//!
//! Most perception stages are per-feed: each feed has its own detector,
//! tracker, classifier, etc. However, inference-heavy stages (object
//! detection, embedding extraction, scene classification) benefit from
//! batching frames across multiple feeds into a single accelerator call.
//!
//! The [`BatchProcessor`] trait captures this pattern. The runtime
//! collects frames from multiple feeds, dispatches them as a batch, and
//! routes per-item results back to each feed's pipeline continuation.
//!
//! # Backend independence
//!
//! `BatchProcessor` does not assume ONNX, TensorRT, OpenVINO, or any
//! specific inference framework. Implementors choose their own backend.
//!
//! # Temporal window support
//!
//! Each [`BatchEntry`] carries a single [`FrameEnvelope`] — the right
//! granularity for single-frame inference.
//!
//! For models that operate on **temporal windows or clips** (e.g.,
//! video transformers, clip-based action recognition), the extension
//! path is:
//!
//! 1. A per-feed stage assembles the frame window from internal state
//!    or the temporal store.
//! 2. The assembled window is stored as a typed artifact in
//!    [`StageOutput::artifacts`](crate::StageOutput) for a downstream
//!    stage, or the batch processor manages its own per-feed window
//!    buffers internally.
//!
//! This keeps the `BatchEntry` type and coordinator protocol focused on
//! the single-frame case.

use nv_core::error::StageError;
use nv_core::id::{FeedId, StageId};
use nv_frame::FrameEnvelope;
use nv_view::ViewSnapshot;

use crate::stage::{StageCapabilities, StageCategory, StageOutput};

45/// An entry in a batch, passed to [`BatchProcessor::process`].
46///
47/// Each entry represents one frame from one feed. The processor reads
48/// `frame` (and optionally `view` and `feed_id`) then writes the
49/// per-item result into `output`.
50///
51/// # Contract
52///
53/// After [`BatchProcessor::process`] returns `Ok(())`, every entry's
54/// `output` should be `Some(StageOutput)`. Entries left as `None` are
55/// treated as if the processor returned [`StageOutput::empty()`] for
56/// that item.
57pub struct BatchEntry {
58 /// The feed that submitted this frame.
59 pub feed_id: FeedId,
60 /// The video frame to process.
61 ///
62 /// `FrameEnvelope` is `Arc`-backed — zero-copy reference, cheap to
63 /// clone. Use `frame.require_host_data()` to obtain host-readable
64 /// bytes (zero-copy for host frames, cached materialization for
65 /// device frames), or `frame.host_data()` when host residency is
66 /// guaranteed.
67 pub frame: FrameEnvelope,
68 /// View-state snapshot at the time of this frame.
69 ///
70 /// Processors may use this to skip inference during rapid camera
71 /// movement or adapt behavior based on camera stability.
72 pub view: ViewSnapshot,
73 /// Slot for the processor to write its per-item output.
74 ///
75 /// Must be set to `Some(...)` for each successfully processed item.
76 /// Any [`StageOutput`] variant is valid — detections, scene features,
77 /// signals, or typed artifacts for downstream consumption.
78 pub output: Option<StageOutput>,
79}
80
81/// User-implementable trait for shared batch inference.
82///
83/// A `BatchProcessor` receives frames from potentially multiple feeds,
84/// processes them together (typically via GPU-accelerated inference),
85/// and writes per-frame results back into each [`BatchEntry::output`].
86///
87/// # Ownership model
88///
89/// The processor is **owned** by its coordinator thread (moved in via
90/// `Box<dyn BatchProcessor>`). It is never shared across threads —
91/// `Sync` is not required. The coordinator is the sole caller of
92/// every method on the trait.
93///
94/// `process()` takes `&mut self`, so the processor can hold mutable
95/// state (GPU session handles, scratch buffers, etc.) without interior
96/// mutability.
97///
98/// # Lifecycle
99///
100/// 1. `on_start()` — called once when the batch coordinator starts.
101/// 2. `process()` — called once per formed batch.
102/// 3. `on_stop()` — called once when the coordinator shuts down.
103///
104/// # Error handling
105///
106/// If `process()` returns `Err(StageError)`, the entire batch fails.
107/// All feed threads waiting on that batch receive the error and drop
108/// their frames (same semantics as a per-feed stage error).
109///
110/// # Output flexibility
111///
112/// [`StageOutput`] is the same type used by per-feed stages. A batch
113/// processor is not limited to detection — it can produce:
114///
115/// - **Detections** via [`StageOutput::with_detections`].
116/// - **Scene features** via [`StageOutput::with_scene_features`]
117/// (e.g., scene classification, embedding extraction).
118/// - **Signals** via [`StageOutput::with_signals`].
119/// - **Arbitrary typed artifacts** via [`StageOutput::with_artifact`]
120/// for downstream per-feed stages to consume.
121///
122/// Post-batch per-feed stages see these outputs through the normal
123/// [`StageContext::artifacts`](crate::StageContext) accumulator.
124///
125/// # Example
126///
127/// ```rust,no_run
128/// use nv_perception::batch::{BatchProcessor, BatchEntry};
129/// use nv_perception::{StageId, StageOutput, DetectionSet};
130/// use nv_core::error::StageError;
131///
132/// struct MyDetector { /* model handle */ }
133///
134/// impl BatchProcessor for MyDetector {
135/// fn id(&self) -> StageId { StageId("my_detector") }
136///
137/// fn process(&mut self, items: &mut [BatchEntry]) -> Result<(), StageError> {
138/// for item in items.iter_mut() {
139/// let pixels = item.frame.require_host_data()
140/// .map_err(|e| StageError::ProcessingFailed {
141/// stage_id: self.id(),
142/// detail: e.to_string(),
143/// })?;
144/// // ... run model on &*pixels ...
145/// item.output = Some(StageOutput::with_detections(DetectionSet::empty()));
146/// }
147/// Ok(())
148/// }
149/// }
150/// ```
151pub trait BatchProcessor: Send + 'static {
152 /// Unique name for this processor (used in provenance, metrics, logging).
153 fn id(&self) -> StageId;
154
155 /// Process a batch of frames.
156 ///
157 /// For each entry in `items`, read `frame` (and optionally `view`),
158 /// perform inference, and set `output` to `Some(StageOutput)`.
159 ///
160 /// The batch may contain frames from multiple feeds.
161 /// `items.len()` is bounded by the configured `max_batch_size`.
162 fn process(&mut self, items: &mut [BatchEntry]) -> Result<(), StageError>;
163
164 /// Called once when the batch coordinator starts.
165 ///
166 /// Allocate GPU resources, load models, warm up the runtime here.
167 fn on_start(&mut self) -> Result<(), StageError> {
168 Ok(())
169 }
170
171 /// Called once when the batch coordinator shuts down.
172 ///
173 /// Release resources here. Best-effort — errors are logged but not
174 /// fatal.
175 fn on_stop(&mut self) -> Result<(), StageError> {
176 Ok(())
177 }
178
179 /// Optional category hint — defaults to [`StageCategory::FrameAnalysis`].
180 ///
181 /// Aligns with [`Stage::category()`](crate::Stage::category) so that
182 /// validation and metrics treat the batch processor as a pipeline
183 /// participant, not a foreign construct.
184 fn category(&self) -> StageCategory {
185 StageCategory::FrameAnalysis
186 }
187
188 /// Declare input/output capabilities for pipeline validation.
189 ///
190 /// When provided, the feed pipeline builder can validate that
191 /// pre-batch stages satisfy the processor's input requirements
192 /// and post-batch stages' inputs are met by the processor's outputs.
193 ///
194 /// Returns `None` by default (opts out of validation).
195 fn capabilities(&self) -> Option<StageCapabilities> {
196 None
197 }
198}