//! Shared batch processing trait and entry types.
//!
//! # Design intent
//!
//! Most perception stages are per-feed: each feed has its own detector,
//! tracker, classifier, etc. However, inference-heavy stages (object
//! detection, embedding extraction, scene classification) benefit from
//! batching frames across multiple feeds into a single accelerator call.
//!
//! The [`BatchProcessor`] trait captures this pattern. The runtime
//! collects frames from multiple feeds, dispatches them as a batch, and
//! routes per-item results back to each feed's pipeline continuation.
//!
//! # Backend independence
//!
//! `BatchProcessor` does not assume ONNX, TensorRT, OpenVINO, or any
//! specific inference framework. Implementors choose their own backend.
//!
//! # Temporal window support
//!
//! Each [`BatchEntry`] carries a single [`FrameEnvelope`] — the right
//! granularity for single-frame inference.
//!
//! For models that operate on **temporal windows or clips** (e.g.,
//! video transformers, clip-based action recognition), the extension
//! path is:
//!
//! 1. A per-feed stage assembles the frame window from internal state
//!    or the temporal store.
//! 2. The assembled window is stored as a typed artifact in
//!    [`StageOutput::artifacts`](crate::StageOutput) for a downstream
//!    stage, or the batch processor manages its own per-feed window
//!    buffers internally.
//!
//! This keeps the `BatchEntry` type and coordinator protocol focused on
//! the single-frame case.
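//!
//! The extension path above can be sketched as follows. This is an
//! illustrative fragment, not part of the crate's API: the `FrameWindow`
//! artifact type, the `publish_window` helper, and the window length of
//! 16 are assumptions for the sake of the example.
//!
//! ```rust,ignore
//! use nv_perception::{FrameEnvelope, StageOutput};
//!
//! /// Hypothetical typed artifact holding an assembled clip.
//! struct FrameWindow {
//!     frames: Vec<FrameEnvelope>,
//! }
//!
//! // Inside a per-feed stage: buffer incoming frames, and once a full
//! // window is available, publish it as a typed artifact for a
//! // downstream stage (or batch processor) to consume.
//! fn publish_window(
//!     buffer: &mut Vec<FrameEnvelope>,
//!     frame: FrameEnvelope,
//! ) -> StageOutput {
//!     buffer.push(frame);
//!     if buffer.len() == 16 {
//!         let window = FrameWindow {
//!             frames: buffer.drain(..).collect(),
//!         };
//!         StageOutput::with_artifact(window)
//!     } else {
//!         StageOutput::empty()
//!     }
//! }
//! ```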

use nv_core::error::StageError;

use crate::{FrameEnvelope, StageId, StageOutput, ViewSnapshot};

/// An entry in a batch, passed to [`BatchProcessor::process`].
///
/// Each entry represents one frame from one feed. The processor reads
/// `frame` (and optionally `view` and `feed_id`) then writes the
/// per-item result into `output`.
///
/// # Contract
///
/// After [`BatchProcessor::process`] returns `Ok(())`, every entry's
/// `output` should be `Some(StageOutput)`. Entries left as `None` are
/// treated as if the processor returned [`StageOutput::empty()`] for
/// that item.
pub struct BatchEntry {
    /// Identifier of the feed this frame belongs to.
    /// (`crate::FeedId` is assumed to be the crate's feed id type.)
    pub feed_id: crate::FeedId,
    /// The frame to process.
    pub frame: FrameEnvelope,
    /// Snapshot of the feed's view state at capture time.
    pub view: ViewSnapshot,
    /// Per-item result, written by [`BatchProcessor::process`].
    /// `None` after processing is treated as [`StageOutput::empty()`].
    pub output: Option<StageOutput>,
}

/// User-implementable trait for shared batch inference.
///
/// A `BatchProcessor` receives frames from potentially multiple feeds,
/// processes them together (typically via GPU-accelerated inference),
/// and writes per-frame results back into each [`BatchEntry::output`].
///
/// # Ownership model
///
/// The processor is **owned** by its coordinator thread (moved in via
/// `Box<dyn BatchProcessor>`). It is never shared across threads —
/// `Sync` is not required. The coordinator is the sole caller of
/// every method on the trait.
///
/// `process()` takes `&mut self`, so the processor can hold mutable
/// state (GPU session handles, scratch buffers, etc.) without interior
/// mutability.
///
/// # Lifecycle
///
/// 1. `on_start()` — called once when the batch coordinator starts.
/// 2. `process()` — called once per formed batch.
/// 3. `on_stop()` — called once when the coordinator shuts down.
///
/// # Error handling
///
/// If `process()` returns `Err(StageError)`, the entire batch fails.
/// All feed threads waiting on that batch receive the error and drop
/// their frames (same semantics as a per-feed stage error).
///
/// # Output flexibility
///
/// [`StageOutput`] is the same type used by per-feed stages. A batch
/// processor is not limited to detection — it can produce:
///
/// - **Detections** via [`StageOutput::with_detections`].
/// - **Scene features** via [`StageOutput::with_scene_features`]
/// (e.g., scene classification, embedding extraction).
/// - **Signals** via [`StageOutput::with_signals`].
/// - **Arbitrary typed artifacts** via [`StageOutput::with_artifact`]
/// for downstream per-feed stages to consume.
///
/// Post-batch per-feed stages see these outputs through the normal
/// [`StageContext::artifacts`](crate::StageContext) accumulator.
///
/// # Example
///
/// ```rust,no_run
/// use nv_perception::batch::{BatchProcessor, BatchEntry};
/// use nv_perception::{StageId, StageOutput, DetectionSet};
/// use nv_core::error::StageError;
///
/// struct MyDetector { /* model handle */ }
///
/// impl BatchProcessor for MyDetector {
///     fn id(&self) -> StageId { StageId("my_detector") }
///
///     fn process(&mut self, items: &mut [BatchEntry]) -> Result<(), StageError> {
///         for item in items.iter_mut() {
///             let pixels = item.frame.require_host_data()
///                 .map_err(|e| StageError::ProcessingFailed {
///                     stage_id: self.id(),
///                     detail: e.to_string(),
///                 })?;
///             // ... run model on &*pixels ...
///             item.output = Some(StageOutput::with_detections(DetectionSet::empty()));
///         }
///         Ok(())
///     }
/// }
/// ```
pub trait BatchProcessor: Send {
    /// Stable identifier for this processor, used in errors and logs.
    fn id(&self) -> StageId;

    /// Called once when the batch coordinator starts.
    ///
    /// (Default no-op; the exact hook signature is reconstructed from
    /// the lifecycle notes above.)
    fn on_start(&mut self) -> Result<(), StageError> {
        Ok(())
    }

    /// Process one formed batch, writing each entry's `output`.
    fn process(&mut self, items: &mut [BatchEntry]) -> Result<(), StageError>;

    /// Called once when the coordinator shuts down.
    fn on_stop(&mut self) {}
}