// nv_perception/batch.rs
1//! Shared batch processing trait and entry types.
2//!
3//! # Design intent
4//!
5//! Most perception stages are per-feed: each feed has its own detector,
6//! tracker, classifier, etc. However, inference-heavy stages (object
7//! detection, embedding extraction, scene classification) benefit from
8//! batching frames across multiple feeds into a single accelerator call.
9//!
10//! The [`BatchProcessor`] trait captures this pattern. The runtime
11//! collects frames from multiple feeds, dispatches them as a batch, and
12//! routes per-item results back to each feed's pipeline continuation.
13//!
14//! # Backend independence
15//!
16//! `BatchProcessor` does not assume ONNX, TensorRT, OpenVINO, or any
17//! specific inference framework. Implementors choose their own backend.
18//!
19//! # Temporal window support
20//!
21//! Each [`BatchEntry`] carries a single [`FrameEnvelope`] — the right
22//! granularity for single-frame inference.
23//!
24//! For models that operate on **temporal windows or clips** (e.g.,
25//! video transformers, clip-based action recognition), the extension
26//! path is:
27//!
28//! 1. A per-feed stage assembles the frame window from internal state
29//!    or the temporal store.
30//! 2. The assembled window is stored as a typed artifact in
31//!    [`StageOutput::artifacts`](crate::StageOutput) for a downstream
32//!    stage, or the batch processor manages its own per-feed window
33//!    buffers internally.
34//!
35//! This keeps the `BatchEntry` type and coordinator protocol focused on
36//! the single-frame case.
37
38use nv_core::error::StageError;
39use nv_core::id::{FeedId, StageId};
40use nv_frame::FrameEnvelope;
41use nv_view::ViewSnapshot;
42
43use crate::stage::{StageCapabilities, StageCategory, StageOutput};
44
45/// An entry in a batch, passed to [`BatchProcessor::process`].
46///
47/// Each entry represents one frame from one feed. The processor reads
48/// `frame` (and optionally `view` and `feed_id`) then writes the
49/// per-item result into `output`.
50///
51/// # Contract
52///
53/// After [`BatchProcessor::process`] returns `Ok(())`, every entry's
54/// `output` should be `Some(StageOutput)`. Entries left as `None` are
55/// treated as if the processor returned [`StageOutput::empty()`] for
56/// that item.
57pub struct BatchEntry {
58    /// The feed that submitted this frame.
59    pub feed_id: FeedId,
60    /// The video frame to process.
61    ///
62    /// `FrameEnvelope` is `Arc`-backed — zero-copy reference, cheap to
63    /// clone. Use `frame.require_host_data()` to obtain host-readable
64    /// bytes (zero-copy for host frames, cached materialization for
65    /// device frames), or `frame.host_data()` when host residency is
66    /// guaranteed.
67    pub frame: FrameEnvelope,
68    /// View-state snapshot at the time of this frame.
69    ///
70    /// Processors may use this to skip inference during rapid camera
71    /// movement or adapt behavior based on camera stability.
72    pub view: ViewSnapshot,
73    /// Slot for the processor to write its per-item output.
74    ///
75    /// Must be set to `Some(...)` for each successfully processed item.
76    /// Any [`StageOutput`] variant is valid — detections, scene features,
77    /// signals, or typed artifacts for downstream consumption.
78    pub output: Option<StageOutput>,
79}
80
81/// User-implementable trait for shared batch inference.
82///
83/// A `BatchProcessor` receives frames from potentially multiple feeds,
84/// processes them together (typically via GPU-accelerated inference),
85/// and writes per-frame results back into each [`BatchEntry::output`].
86///
87/// # Ownership model
88///
89/// The processor is **owned** by its coordinator thread (moved in via
90/// `Box<dyn BatchProcessor>`). It is never shared across threads —
91/// `Sync` is not required. The coordinator is the sole caller of
92/// every method on the trait.
93///
94/// `process()` takes `&mut self`, so the processor can hold mutable
95/// state (GPU session handles, scratch buffers, etc.) without interior
96/// mutability.
97///
98/// # Lifecycle
99///
100/// 1. `on_start()` — called once when the batch coordinator starts.
101/// 2. `process()` — called once per formed batch.
102/// 3. `on_stop()` — called once when the coordinator shuts down.
103///
104/// # Error handling
105///
106/// If `process()` returns `Err(StageError)`, the entire batch fails.
107/// All feed threads waiting on that batch receive the error and drop
108/// their frames (same semantics as a per-feed stage error).
109///
110/// # Output flexibility
111///
112/// [`StageOutput`] is the same type used by per-feed stages. A batch
113/// processor is not limited to detection — it can produce:
114///
115/// - **Detections** via [`StageOutput::with_detections`].
116/// - **Scene features** via [`StageOutput::with_scene_features`]
117///   (e.g., scene classification, embedding extraction).
118/// - **Signals** via [`StageOutput::with_signals`].
119/// - **Arbitrary typed artifacts** via [`StageOutput::with_artifact`]
120///   for downstream per-feed stages to consume.
121///
122/// Post-batch per-feed stages see these outputs through the normal
123/// [`StageContext::artifacts`](crate::StageContext) accumulator.
124///
125/// # Example
126///
127/// ```rust,no_run
128/// use nv_perception::batch::{BatchProcessor, BatchEntry};
129/// use nv_perception::{StageId, StageOutput, DetectionSet};
130/// use nv_core::error::StageError;
131///
132/// struct MyDetector { /* model handle */ }
133///
134/// impl BatchProcessor for MyDetector {
135///     fn id(&self) -> StageId { StageId("my_detector") }
136///
137///     fn process(&mut self, items: &mut [BatchEntry]) -> Result<(), StageError> {
138///         for item in items.iter_mut() {
139///             let pixels = item.frame.require_host_data()
140///                 .map_err(|e| StageError::ProcessingFailed {
141///                     stage_id: self.id(),
142///                     detail: e.to_string(),
143///                 })?;
144///             // ... run model on &*pixels ...
145///             item.output = Some(StageOutput::with_detections(DetectionSet::empty()));
146///         }
147///         Ok(())
148///     }
149/// }
150/// ```
151pub trait BatchProcessor: Send + 'static {
152    /// Unique name for this processor (used in provenance, metrics, logging).
153    fn id(&self) -> StageId;
154
155    /// Process a batch of frames.
156    ///
157    /// For each entry in `items`, read `frame` (and optionally `view`),
158    /// perform inference, and set `output` to `Some(StageOutput)`.
159    ///
160    /// The batch may contain frames from multiple feeds.
161    /// `items.len()` is bounded by the configured `max_batch_size`.
162    fn process(&mut self, items: &mut [BatchEntry]) -> Result<(), StageError>;
163
164    /// Called once when the batch coordinator starts.
165    ///
166    /// Allocate GPU resources, load models, warm up the runtime here.
167    fn on_start(&mut self) -> Result<(), StageError> {
168        Ok(())
169    }
170
171    /// Called once when the batch coordinator shuts down.
172    ///
173    /// Release resources here. Best-effort — errors are logged but not
174    /// fatal.
175    fn on_stop(&mut self) -> Result<(), StageError> {
176        Ok(())
177    }
178
179    /// Optional category hint — defaults to [`StageCategory::FrameAnalysis`].
180    ///
181    /// Aligns with [`Stage::category()`](crate::Stage::category) so that
182    /// validation and metrics treat the batch processor as a pipeline
183    /// participant, not a foreign construct.
184    fn category(&self) -> StageCategory {
185        StageCategory::FrameAnalysis
186    }
187
188    /// Declare input/output capabilities for pipeline validation.
189    ///
190    /// When provided, the feed pipeline builder can validate that
191    /// pre-batch stages satisfy the processor's input requirements
192    /// and post-batch stages' inputs are met by the processor's outputs.
193    ///
194    /// Returns `None` by default (opts out of validation).
195    fn capabilities(&self) -> Option<StageCapabilities> {
196        None
197    }
198}