nv_perception/stage.rs
1//! The [`Stage`] trait — the core user-implementable perception contract.
2//!
3//! Stages are the primary extension point for adding perception capabilities
4//! to the pipeline. Each stage processes one frame at a time and produces
5//! structured output.
6//!
7//! # Design intent: one trait, composed pipelines
8//!
9//! The library uses a **single** `Stage` trait for all stage types —
10//! detection, tracking, classification, scene analysis, etc.
//! The abstraction stays minimal. Instead of a type hierarchy, the
//! pipeline _composes_ stages linearly: earlier stages write fields into
//! [`StageOutput`] that later stages read from [`StageContext::artifacts`].
//! Specialization happens by convention — which fields a stage populates.
18//!
19//! # Supported model patterns
20//!
21//! The single-trait design naturally supports several model architectures:
22//!
23//! - **Classical detector → tracker**: a `FrameAnalysis` stage produces
24//! detections, then an `Association` stage consumes them and produces tracks.
25//! - **Joint detection+tracking**: a single `FrameAnalysis` stage sets both
26//! `detections` and `tracks` in its [`StageOutput`]. No separate tracker
27//! stage is needed.
28//! - **Direct track emitter**: a stage produces `tracks` without intermediate
29//! detections. Set `detection_id` to `None` on each `TrackObservation`.
30//! - **Richer observations**: per-observation metadata (embeddings, features,
31//! model-specific scores) is stored in `TrackObservation::metadata`.
32//! Per-track metadata goes in [`Track::metadata`]. Per-frame shared data
33//! goes in [`StageOutput::artifacts`].
34//!
35//! # What a stage should do
36//!
37//! - Process a single frame and return structured results.
38//! - Read upstream artifacts from [`StageContext::artifacts`].
39//! - Read temporal history from [`StageContext::temporal`].
40//! - Populate only the [`StageOutput`] fields it owns.
//! - Keep no state shared between feeds — any internal state is per-feed.
42//!
43//! # What a stage should NOT do
44//!
//! - Block on I/O (manage connection pools or async bridges internally).
46//! - Mutate shared global state.
47//! - Produce side-channel output bypassing [`StageOutput`].
48//! - Depend on stage execution order beyond what upstream artifacts provide.
49//! - Accumulate unbounded internal state (use the temporal store instead).
50
51use crate::artifact::PerceptionArtifacts;
52use crate::detection::DetectionSet;
53use crate::scene::SceneFeature;
54use crate::signal::DerivedSignal;
55use crate::temporal_access::TemporalAccess;
56use crate::track::Track;
57use nv_core::TypedMetadata;
58use nv_core::error::StageError;
59use nv_core::id::{FeedId, StageId};
60use nv_core::metrics::StageMetrics;
61use nv_frame::FrameEnvelope;
62use nv_view::{ViewEpoch, ViewSnapshot};
63
/// Optional category hint describing a perception stage's primary role.
///
/// The category has no effect on execution order or behavior: the pipeline
/// executor runs every stage the same way regardless of category. It exists
/// for:
///
/// - **Documentation** — pipelines describe their own composition.
/// - **Metrics grouping** — dashboards and provenance can group by category.
/// - **Composition validation** — pipeline builders can flag unusual
///   orderings (for example, a tracker placed before a detector).
///
/// Categories describe a stage's *primary role*; they do not restrict which
/// output fields it may populate. A joint detection+tracking model that
/// reads pixels and emits both detections and tracks in one forward pass,
/// for instance, is best categorized as `FrameAnalysis`.
///
/// A stage reports its category through [`Stage::category()`], whose default
/// is [`StageCategory::Custom`].
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum StageCategory {
    /// Consumes frame pixel data and produces perception artifacts.
    ///
    /// The natural fit for any model whose primary input is raw pixels,
    /// whatever it emits:
    ///
    /// - Classical object detectors (detections)
    /// - Joint detection+tracking models (detections and tracks)
    /// - Direct track-emitting models (tracks only)
    /// - Feature extractors and scene classifiers (scene features)
    /// - Background subtractors (detections or scene features)
    FrameAnalysis,
    /// Consumes upstream artifacts (detections, features) and produces tracks.
    ///
    /// For stages whose primary role is *association* — matching
    /// observations across time — when the tracking logic lives apart from
    /// the model that reads pixels.
    ///
    /// Examples: multi-object tracker, re-identification matcher.
    Association,
    /// Consumes temporal state and accumulated artifacts, producing derived
    /// signals or scene-level features.
    ///
    /// Examples: trajectory analyzer, anomaly detector, dwell-time estimator.
    TemporalAnalysis,
    /// Consumes accumulated artifacts and performs side-effecting output.
    ///
    /// Returns an empty [`StageOutput`], leaving the artifact accumulator
    /// untouched. Examples: structured logger, metric exporter, event
    /// publisher.
    Sink,
    /// A stage that is uncategorized or serves several purposes.
    Custom,
}
116
/// Declares what artifact types a stage produces and consumes.
///
/// Used by [`StagePipeline::validate()`](crate::StagePipeline::validate) to
/// detect unsatisfied dependencies (e.g., a tracker that consumes detections
/// placed before the detector that produces them).
///
/// Stages report capabilities via [`Stage::capabilities()`]. The default
/// implementation returns `None`, meaning the stage opts out of validation.
///
/// # Validated fields
///
/// The validator checks:
/// - `consumes_detections` / `produces_detections`
/// - `consumes_tracks` / `produces_tracks`
///
/// The remaining fields (`consumes_temporal`, `produces_signals`,
/// `produces_scene_features`) are informational — available
/// for external tooling but not enforced by the built-in validator.
///
/// # Example
///
/// The constructor and every builder method are `const fn`, so capabilities
/// can be computed at runtime or declared as constants:
///
/// ```
/// use nv_perception::StageCapabilities;
///
/// const TRACKER_CAPS: StageCapabilities = StageCapabilities::new()
///     .consumes_detections()
///     .produces_tracks();
///
/// let caps = StageCapabilities::new()
///     .consumes_detections()
///     .produces_tracks();
/// assert_eq!(caps, TRACKER_CAPS);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct StageCapabilities {
    /// Stage reads detections from the artifact accumulator.
    pub consumes_detections: bool,
    /// Stage reads tracks from the artifact accumulator.
    pub consumes_tracks: bool,
    /// Stage reads temporal state.
    pub consumes_temporal: bool,
    /// Stage produces detections.
    pub produces_detections: bool,
    /// Stage produces tracks.
    pub produces_tracks: bool,
    /// Stage produces signals.
    pub produces_signals: bool,
    /// Stage produces scene features.
    pub produces_scene_features: bool,
}

impl StageCapabilities {
    /// Create empty capabilities (nothing consumed or produced).
    ///
    /// Equal to [`StageCapabilities::default()`], but usable in `const`
    /// contexts (trait methods such as `Default::default` are not).
    #[must_use]
    pub const fn new() -> Self {
        Self {
            consumes_detections: false,
            consumes_tracks: false,
            consumes_temporal: false,
            produces_detections: false,
            produces_tracks: false,
            produces_signals: false,
            produces_scene_features: false,
        }
    }

    /// Mark this stage as consuming detections.
    #[must_use]
    pub const fn consumes_detections(mut self) -> Self {
        self.consumes_detections = true;
        self
    }

    /// Mark this stage as consuming tracks.
    #[must_use]
    pub const fn consumes_tracks(mut self) -> Self {
        self.consumes_tracks = true;
        self
    }

    /// Mark this stage as consuming temporal state.
    #[must_use]
    pub const fn consumes_temporal(mut self) -> Self {
        self.consumes_temporal = true;
        self
    }

    /// Mark this stage as producing detections.
    #[must_use]
    pub const fn produces_detections(mut self) -> Self {
        self.produces_detections = true;
        self
    }

    /// Mark this stage as producing tracks.
    #[must_use]
    pub const fn produces_tracks(mut self) -> Self {
        self.produces_tracks = true;
        self
    }

    /// Mark this stage as producing signals.
    #[must_use]
    pub const fn produces_signals(mut self) -> Self {
        self.produces_signals = true;
        self
    }

    /// Mark this stage as producing scene features.
    #[must_use]
    pub const fn produces_scene_features(mut self) -> Self {
        self.produces_scene_features = true;
        self
    }
}
219
220/// Context provided to every stage invocation.
221///
222/// Contains the current frame, accumulated artifacts from prior stages,
223/// read-only view and temporal snapshots, and stage-level metrics.
224///
225/// All references are valid for the duration of the `process()` call.
226pub struct StageContext<'a> {
227 /// The feed this frame belongs to.
228 pub feed_id: FeedId,
229 /// The current video frame.
230 pub frame: &'a FrameEnvelope,
231 /// Accumulated outputs of all prior stages for this frame.
232 pub artifacts: &'a PerceptionArtifacts,
233 /// View-state snapshot for this frame.
234 pub view: &'a ViewSnapshot,
235 /// Read-only temporal state snapshot.
236 ///
237 /// Provides typed access to track histories, observation windows, and
238 /// view-epoch context. Implemented by `nv_temporal::TemporalStoreSnapshot`.
239 pub temporal: &'a dyn TemporalAccess,
240 /// Performance metrics for this stage.
241 pub metrics: &'a StageMetrics,
242}
243
244/// Output returned by a stage's `process()` method.
245///
246/// Each field is optional — a stage only sets the fields it produces.
247/// The pipeline executor merges this into the [`PerceptionArtifacts`] accumulator.
248///
249/// # Joint-model and direct-track-emitter patterns
250///
251/// A stage is not limited to producing a single artifact type. Common
252/// patterns beyond classical detection:
253///
254/// - **Joint det+track model**: set both `detections` and `tracks` in a
255/// single `StageOutput`. The executor merges both into the accumulator.
256/// Tracks produced this way can carry per-observation metadata via
257/// [`TrackObservation::metadata`](crate::TrackObservation::metadata).
258/// - **Direct track emitter**: set only `tracks` (leave `detections` as
259/// `None`). No intermediate `DetectionSet` is required. Set
260/// `TrackObservation::detection_id` to `None` since there are no
261/// upstream detections to reference.
262/// - **Richer observations**: stages that produce typed artifacts
263/// (embeddings, feature maps, attention weights) can store them in
264/// [`TrackObservation::metadata`](crate::TrackObservation::metadata)
265/// (per-observation) or [`Track::metadata`](crate::Track::metadata)
266/// (per-track), or pass them as `artifacts` (per-frame typed metadata)
267/// for downstream stage consumption.
268#[derive(Clone, Debug, Default)]
269pub struct StageOutput {
270 /// New or updated detection set.
271 ///
272 /// If `Some`, **replaces** the current detection set in the accumulator.
273 /// If `None`, the previous detection set is kept.
274 pub detections: Option<DetectionSet>,
275
276 /// New or updated track set.
277 ///
278 /// `Some(Vec<Track>)` is **authoritative** for this frame: it replaces
279 /// the current track set in the accumulator and marks the output as
280 /// authoritative. Previously-known tracks absent from this set are
281 /// treated as normally ended (`TrackEnded`) by the temporal store.
282 ///
283 /// `None` means this stage does not produce tracks — the previous
284 /// track set is kept and authoritativeness is unchanged.
285 pub tracks: Option<Vec<Track>>,
286
287 /// Derived signals — always **appended** to the accumulator.
288 pub signals: Vec<DerivedSignal>,
289
290 /// Scene-level features — always **appended** to the accumulator.
291 pub scene_features: Vec<SceneFeature>,
292
293 /// Typed artifacts for downstream stages — **merged** (last-writer-wins by `TypeId`).
294 ///
295 /// This is the primary extension seam for inter-stage communication
296 /// beyond the built-in fields. Any `Clone + Send + Sync + 'static`
297 /// value can be stored here by type, and downstream stages retrieve
298 /// it from [`StageContext::artifacts.stage_artifacts`](crate::PerceptionArtifacts::stage_artifacts).
299 ///
300 /// Example uses:
301 /// - Feature maps or embeddings from a detector for a downstream classifier.
302 /// - Prepared multi-frame input tensors for a temporal/clip-based model.
303 /// - Confidence calibration metadata from an upstream stage.
304 pub artifacts: TypedMetadata,
305}
306
307impl StageOutput {
308 /// Create an empty stage output (no detections, tracks, signals, or artifacts).
309 #[must_use]
310 pub fn empty() -> Self {
311 Self::default()
312 }
313
314 /// Create a stage output containing only detections.
315 #[must_use]
316 pub fn with_detections(detections: DetectionSet) -> Self {
317 Self {
318 detections: Some(detections),
319 ..Self::default()
320 }
321 }
322
323 /// Create a stage output containing only tracks.
324 #[must_use]
325 pub fn with_tracks(tracks: Vec<Track>) -> Self {
326 Self {
327 tracks: Some(tracks),
328 ..Self::default()
329 }
330 }
331
332 /// Create a stage output containing only signals.
333 #[must_use]
334 pub fn with_signals(signals: Vec<DerivedSignal>) -> Self {
335 Self {
336 signals,
337 ..Self::default()
338 }
339 }
340
341 /// Create a stage output containing a single signal.
342 #[must_use]
343 pub fn with_signal(signal: DerivedSignal) -> Self {
344 Self {
345 signals: vec![signal],
346 ..Self::default()
347 }
348 }
349
350 /// Create a stage output containing only scene features.
351 #[must_use]
352 pub fn with_scene_features(features: Vec<SceneFeature>) -> Self {
353 Self {
354 scene_features: features,
355 ..Self::default()
356 }
357 }
358
359 /// Create a stage output containing a single typed artifact.
360 ///
361 /// This is useful for stages that produce a single custom artifact
362 /// type for downstream consumption.
363 #[must_use]
364 pub fn with_artifact<T: Clone + Send + Sync + 'static>(value: T) -> Self {
365 let mut artifacts = TypedMetadata::new();
366 artifacts.insert(value);
367 Self {
368 artifacts,
369 ..Self::default()
370 }
371 }
372
373 /// Start building a [`StageOutput`] incrementally.
374 ///
375 /// # Example
376 ///
377 /// ```
378 /// use nv_perception::StageOutput;
379 ///
380 /// let output = StageOutput::build()
381 /// .detections(Default::default())
382 /// .artifact(42_u32)
383 /// .finish();
384 /// ```
385 #[must_use]
386 pub fn build() -> StageOutputBuilder {
387 StageOutputBuilder {
388 inner: Self::default(),
389 }
390 }
391}
392
393/// Incremental builder for [`StageOutput`].
394///
395/// Created via [`StageOutput::build()`]. Each setter returns `self` for chaining.
396pub struct StageOutputBuilder {
397 inner: StageOutput,
398}
399
400impl StageOutputBuilder {
401 /// Set the detection set.
402 #[must_use]
403 pub fn detections(mut self, detections: DetectionSet) -> Self {
404 self.inner.detections = Some(detections);
405 self
406 }
407
408 /// Set the track set.
409 #[must_use]
410 pub fn tracks(mut self, tracks: Vec<Track>) -> Self {
411 self.inner.tracks = Some(tracks);
412 self
413 }
414
415 /// Append a signal.
416 #[must_use]
417 pub fn signal(mut self, signal: DerivedSignal) -> Self {
418 self.inner.signals.push(signal);
419 self
420 }
421
422 /// Append signals.
423 #[must_use]
424 pub fn signals(mut self, signals: Vec<DerivedSignal>) -> Self {
425 self.inner.signals.extend(signals);
426 self
427 }
428
429 /// Append a scene feature.
430 #[must_use]
431 pub fn scene_feature(mut self, feature: SceneFeature) -> Self {
432 self.inner.scene_features.push(feature);
433 self
434 }
435
436 /// Insert a typed artifact.
437 #[must_use]
438 pub fn artifact<T: Clone + Send + Sync + 'static>(mut self, value: T) -> Self {
439 self.inner.artifacts.insert(value);
440 self
441 }
442
443 /// Consume the builder and produce the [`StageOutput`].
444 #[must_use]
445 pub fn finish(self) -> StageOutput {
446 self.inner
447 }
448}
449
450/// The core user-implementable perception trait.
451///
452/// This is the **only** extension point for adding perception logic to the
453/// pipeline. Stages run in a fixed linear order; each stage sees the
454/// accumulated [`PerceptionArtifacts`] from all
455/// prior stages.
456///
457/// All methods take `&mut self`. The executor holds exclusive ownership of each
458/// stage on the feed's dedicated OS thread — stages are never shared across
459/// threads or called concurrently within a feed.
460///
461/// Requires `Send + 'static` (moved onto the feed thread at startup).
462/// Does **not** require `Sync` — stages do not need to be shareable.
463///
464/// # Lifecycle
465///
466/// 1. `on_start()` — called once when the feed starts.
467/// 2. `process()` — called once per frame, in order.
468/// 3. `on_view_epoch_change()` — called when the camera view changes significantly.
469/// 4. `on_stop()` — called once when the feed stops.
470///
471/// # Error handling
472///
473/// - `process()` returning `Err(StageError)` drops that frame and emits a health event.
474/// The feed continues processing subsequent frames.
475/// - `on_start()` returning `Err` prevents the feed from starting.
476/// - A panic in `process()` is caught; the feed restarts per its restart policy.
477///
478/// # Supported model patterns
479///
480/// The library supports multiple perception model patterns through the
481/// same `Stage` trait. A stage may produce *any combination* of output
482/// fields — the pipeline does not enforce a single pattern.
483///
484/// | Pattern | Reads | Writes | Category |
485/// |---|---|---|---|
486/// | Classical detector | frame pixels | `detections` | `FrameAnalysis` |
487/// | Classical tracker | `detections`, temporal | `tracks` | `Association` |
488/// | Joint det+track model | frame pixels, temporal | `detections` + `tracks` | `FrameAnalysis` |
489/// | Direct track emitter | frame pixels | `tracks` (skip detections) | `FrameAnalysis` |
490/// | Classifier / enricher | `detections` or `tracks` | `artifacts` (typed metadata) | `Custom` |
491/// | Scene analysis | frame pixels, temporal | `scene_features`, `signals` | `TemporalAnalysis` |
492///
493/// These are conventions, not enforced constraints. A stage may read and write
494/// any combination of fields.
495pub trait Stage: Send + 'static {
496 /// Unique name for this stage (used in provenance, metrics, and logging).
497 ///
498 /// Must be a compile-time `&'static str`. Each stage in a pipeline should
499 /// have a distinct ID.
500 fn id(&self) -> StageId;
501
502 /// Process one frame.
503 ///
504 /// **Must not block on I/O.** Stages wrapping inference servers should
505 /// manage their own connection pool or async bridge internally.
506 ///
507 /// Called on the feed's dedicated stage-execution thread.
508 fn process(&mut self, ctx: &StageContext<'_>) -> Result<StageOutput, StageError>;
509
510 /// Called once when the feed starts.
511 ///
512 /// Allocate GPU resources, load models, open connections here.
513 /// Returning `Err` prevents the feed from starting.
514 fn on_start(&mut self) -> Result<(), StageError> {
515 Ok(())
516 }
517
518 /// Called once on feed shutdown.
519 ///
520 /// Release resources here. Best-effort — errors are logged but not fatal.
521 fn on_stop(&mut self) -> Result<(), StageError> {
522 Ok(())
523 }
524
525 /// Called when the view epoch changes (significant camera motion detected).
526 ///
527 /// Stages that maintain internal state dependent on camera view
528 /// (background models, local maps, feature caches) should reset or
529 /// adapt here. The `new_epoch` value identifies the new view epoch.
530 fn on_view_epoch_change(&mut self, _new_epoch: ViewEpoch) -> Result<(), StageError> {
531 Ok(())
532 }
533
534 /// Optional category hint for this stage.
535 ///
536 /// Defaults to [`StageCategory::Custom`]. Override to enable
537 /// composition validation and category-aware metrics.
538 fn category(&self) -> StageCategory {
539 StageCategory::Custom
540 }
541
542 /// Declare this stage's input/output capabilities for pipeline validation.
543 ///
544 /// Returns `None` by default, opting the stage out of dependency
545 /// validation. Override to enable
546 /// [`StagePipeline::validate()`](crate::StagePipeline::validate).
547 fn capabilities(&self) -> Option<StageCapabilities> {
548 None
549 }
550}
551
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn stage_output_with_artifact() {
        #[derive(Clone, Debug, PartialEq)]
        struct CustomScore(f64);

        // Destructure so each checked field is named explicitly.
        let StageOutput {
            detections,
            tracks,
            artifacts,
            ..
        } = StageOutput::with_artifact(CustomScore(0.42));

        assert!(detections.is_none());
        assert!(tracks.is_none());
        assert_eq!(artifacts.get::<CustomScore>(), Some(&CustomScore(0.42)));
    }

    #[test]
    fn stage_output_builder() {
        #[derive(Clone, Debug, PartialEq)]
        struct Tag(u32);

        let built = StageOutput::build()
            .detections(DetectionSet::empty())
            .artifact(Tag(7))
            .finish();

        assert!(built.detections.is_some());
        assert_eq!(built.artifacts.get::<Tag>(), Some(&Tag(7)));
    }

    #[test]
    fn stage_capabilities_builder() {
        let caps = StageCapabilities::new()
            .consumes_detections()
            .produces_tracks();

        // (consumes_detections, consumes_tracks, produces_detections, produces_tracks)
        assert_eq!(
            (
                caps.consumes_detections,
                caps.consumes_tracks,
                caps.produces_detections,
                caps.produces_tracks,
            ),
            (true, false, false, true)
        );
    }
}