Skip to main content

nv_perception/
stage.rs

1//! The [`Stage`] trait — the core user-implementable perception contract.
2//!
3//! Stages are the primary extension point for adding perception capabilities
4//! to the pipeline. Each stage processes one frame at a time and produces
5//! structured output.
6//!
7//! # Design intent: one trait, composed pipelines
8//!
//! The library uses a **single** `Stage` trait for all stage types —
//! detection, tracking, classification, scene analysis, etc. — rather than
//! a hierarchy of specialized traits, so the abstraction stays minimal.
//!
//! Instead, the pipeline _composes_ stages linearly: earlier stages write
//! fields into [`StageOutput`] that later stages read from
//! [`StageContext::artifacts`]. Specialization happens by convention (which
//! fields a stage populates), not by type hierarchy.
18//!
19//! # Supported model patterns
20//!
21//! The single-trait design naturally supports several model architectures:
22//!
23//! - **Classical detector → tracker**: a `FrameAnalysis` stage produces
24//!   detections, then an `Association` stage consumes them and produces tracks.
25//! - **Joint detection+tracking**: a single `FrameAnalysis` stage sets both
26//!   `detections` and `tracks` in its [`StageOutput`]. No separate tracker
27//!   stage is needed.
28//! - **Direct track emitter**: a stage produces `tracks` without intermediate
29//!   detections. Set `detection_id` to `None` on each `TrackObservation`.
30//! - **Richer observations**: per-observation metadata (embeddings, features,
31//!   model-specific scores) is stored in `TrackObservation::metadata`.
32//!   Per-track metadata goes in [`Track::metadata`]. Per-frame shared data
33//!   goes in [`StageOutput::artifacts`].
34//!
35//! # What a stage should do
36//!
37//! - Process a single frame and return structured results.
38//! - Read upstream artifacts from [`StageContext::artifacts`].
39//! - Read temporal history from [`StageContext::temporal`].
40//! - Populate only the [`StageOutput`] fields it owns.
//! - Keep no state that is shared across feeds — any internal state is per-feed.
42//!
43//! # What a stage should NOT do
44//!
45//! - Block on network I/O (manage async bridges internally).
46//! - Mutate shared global state.
47//! - Produce side-channel output bypassing [`StageOutput`].
48//! - Depend on stage execution order beyond what upstream artifacts provide.
49//! - Accumulate unbounded internal state (use the temporal store instead).
50
51use crate::artifact::PerceptionArtifacts;
52use crate::detection::DetectionSet;
53use crate::scene::SceneFeature;
54use crate::signal::DerivedSignal;
55use crate::temporal_access::TemporalAccess;
56use crate::track::Track;
57use nv_core::TypedMetadata;
58use nv_core::error::StageError;
59use nv_core::id::{FeedId, StageId};
60use nv_core::metrics::StageMetrics;
61use nv_frame::FrameEnvelope;
62use nv_view::{ViewEpoch, ViewSnapshot};
63
/// Advisory label describing the primary role of a perception stage.
///
/// The pipeline executor treats every stage uniformly, so a category never
/// influences execution order or behavior. It exists for:
///
/// - **Documentation** — a pipeline's composition becomes self-describing.
/// - **Metrics grouping** — dashboards and provenance can be category-aware.
/// - **Composition validation** — pipeline builders can warn about
///   unusual orderings (e.g., a tracker placed before a detector).
///
/// A category names a stage's *primary role*; it is not a hard constraint
/// on output. Whatever category it declares, a stage may populate **any
/// combination** of output fields. For example, a joint detection+tracking
/// model that reads pixels and emits both detections and tracks in a single
/// forward pass is best categorized as `FrameAnalysis`.
///
/// The category is reported through [`Stage::category()`], whose default
/// is [`StageCategory::Custom`].
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum StageCategory {
    /// Takes frame pixel data as input and produces perception artifacts.
    ///
    /// The natural category for any model whose primary input is raw
    /// pixels, whatever it outputs:
    ///
    /// - Classical object detectors (outputs: detections)
    /// - Joint detection+tracking models (outputs: detections + tracks)
    /// - Direct track-emitting models (outputs: tracks only)
    /// - Feature extractors, scene classifiers (outputs: scene features)
    /// - Background subtractors (outputs: detections or scene features)
    FrameAnalysis,
    /// Takes upstream artifacts (detections, features) as input and
    /// produces tracks.
    ///
    /// Intended for stages whose primary role is *association* — matching
    /// observations across time — where the tracking logic lives apart
    /// from the pixel-reading model.
    ///
    /// Examples: multi-object tracker, re-identification matcher.
    Association,
    /// Takes temporal state and accumulated artifacts as input and produces
    /// derived signals or scene-level features.
    ///
    /// Examples: trajectory analyzer, anomaly detector, dwell-time estimator.
    TemporalAnalysis,
    /// Takes accumulated artifacts as input and performs side-effect output.
    ///
    /// Returns an empty [`StageOutput`] — the artifact accumulator is left
    /// unmodified.
    /// Examples: structured logger, metric exporter, event publisher.
    Sink,
    /// A stage that is uncategorized or serves several purposes.
    Custom,
}
116
/// Declares what artifact types a stage produces and consumes.
///
/// Used by [`StagePipeline::validate()`](crate::StagePipeline::validate) to
/// detect unsatisfied dependencies (e.g., a tracker that consumes detections
/// placed before the detector that produces them).
///
/// Stages report capabilities via [`Stage::capabilities()`]. The default
/// implementation returns `None`, meaning the stage opts out of validation.
///
/// # Validated fields
///
/// The validator checks:
/// - `consumes_detections` / `produces_detections`
/// - `consumes_tracks` / `produces_tracks`
///
/// The remaining fields (`consumes_temporal`, `produces_signals`,
/// `produces_scene_features`) are informational — available
/// for external tooling but not enforced by the built-in validator.
///
/// # Example
///
/// ```
/// use nv_perception::StageCapabilities;
///
/// let caps = StageCapabilities::new()
///     .consumes_detections()
///     .produces_tracks();
/// ```
///
/// Every constructor and setter is a `const fn`, so a capability set can
/// also be declared as a compile-time constant:
///
/// ```
/// use nv_perception::StageCapabilities;
///
/// const TRACKER_CAPS: StageCapabilities = StageCapabilities::new()
///     .consumes_detections()
///     .produces_tracks();
/// assert!(TRACKER_CAPS.consumes_detections);
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default)]
pub struct StageCapabilities {
    /// Stage reads detections from the artifact accumulator.
    pub consumes_detections: bool,
    /// Stage reads tracks from the artifact accumulator.
    pub consumes_tracks: bool,
    /// Stage reads temporal state.
    pub consumes_temporal: bool,
    /// Stage produces detections.
    pub produces_detections: bool,
    /// Stage produces tracks.
    pub produces_tracks: bool,
    /// Stage produces signals.
    pub produces_signals: bool,
    /// Stage produces scene features.
    pub produces_scene_features: bool,
}

impl StageCapabilities {
    /// Create empty capabilities (nothing consumed or produced).
    ///
    /// Yields the same value as `StageCapabilities::default()`; the flags
    /// are spelled out explicitly so the constructor can be a `const fn`.
    #[must_use]
    pub const fn new() -> Self {
        Self {
            consumes_detections: false,
            consumes_tracks: false,
            consumes_temporal: false,
            produces_detections: false,
            produces_tracks: false,
            produces_signals: false,
            produces_scene_features: false,
        }
    }

    /// Mark this stage as consuming detections.
    ///
    /// Returns a copy with `consumes_detections` set; all other flags are
    /// carried over unchanged.
    #[must_use]
    pub const fn consumes_detections(self) -> Self {
        Self {
            consumes_detections: true,
            ..self
        }
    }

    /// Mark this stage as consuming tracks.
    ///
    /// Returns a copy with `consumes_tracks` set; all other flags are
    /// carried over unchanged.
    #[must_use]
    pub const fn consumes_tracks(self) -> Self {
        Self {
            consumes_tracks: true,
            ..self
        }
    }

    /// Mark this stage as consuming temporal state.
    ///
    /// Returns a copy with `consumes_temporal` set; all other flags are
    /// carried over unchanged.
    #[must_use]
    pub const fn consumes_temporal(self) -> Self {
        Self {
            consumes_temporal: true,
            ..self
        }
    }

    /// Mark this stage as producing detections.
    ///
    /// Returns a copy with `produces_detections` set; all other flags are
    /// carried over unchanged.
    #[must_use]
    pub const fn produces_detections(self) -> Self {
        Self {
            produces_detections: true,
            ..self
        }
    }

    /// Mark this stage as producing tracks.
    ///
    /// Returns a copy with `produces_tracks` set; all other flags are
    /// carried over unchanged.
    #[must_use]
    pub const fn produces_tracks(self) -> Self {
        Self {
            produces_tracks: true,
            ..self
        }
    }

    /// Mark this stage as producing signals.
    ///
    /// Returns a copy with `produces_signals` set; all other flags are
    /// carried over unchanged.
    #[must_use]
    pub const fn produces_signals(self) -> Self {
        Self {
            produces_signals: true,
            ..self
        }
    }

    /// Mark this stage as producing scene features.
    ///
    /// Returns a copy with `produces_scene_features` set; all other flags
    /// are carried over unchanged.
    #[must_use]
    pub const fn produces_scene_features(self) -> Self {
        Self {
            produces_scene_features: true,
            ..self
        }
    }
}
219
220/// Context provided to every stage invocation.
221///
222/// Contains the current frame, accumulated artifacts from prior stages,
223/// read-only view and temporal snapshots,  and stage-level metrics.
224///
225/// All references are valid for the duration of the `process()` call.
226pub struct StageContext<'a> {
227    /// The feed this frame belongs to.
228    pub feed_id: FeedId,
229    /// The current video frame.
230    pub frame: &'a FrameEnvelope,
231    /// Accumulated outputs of all prior stages for this frame.
232    pub artifacts: &'a PerceptionArtifacts,
233    /// View-state snapshot for this frame.
234    pub view: &'a ViewSnapshot,
235    /// Read-only temporal state snapshot.
236    ///
237    /// Provides typed access to track histories, observation windows, and
238    /// view-epoch context. Implemented by `nv_temporal::TemporalStoreSnapshot`.
239    pub temporal: &'a dyn TemporalAccess,
240    /// Performance metrics for this stage.
241    pub metrics: &'a StageMetrics,
242}
243
244/// Output returned by a stage's `process()` method.
245///
246/// Each field is optional — a stage only sets the fields it produces.
247/// The pipeline executor merges this into the [`PerceptionArtifacts`] accumulator.
248///
249/// # Joint-model and direct-track-emitter patterns
250///
251/// A stage is not limited to producing a single artifact type. Common
252/// patterns beyond classical detection:
253///
254/// - **Joint det+track model**: set both `detections` and `tracks` in a
255///   single `StageOutput`. The executor merges both into the accumulator.
256///   Tracks produced this way can carry per-observation metadata via
257///   [`TrackObservation::metadata`](crate::TrackObservation::metadata).
258/// - **Direct track emitter**: set only `tracks` (leave `detections` as
259///   `None`). No intermediate `DetectionSet` is required. Set
260///   `TrackObservation::detection_id` to `None` since there are no
261///   upstream detections to reference.
262/// - **Richer observations**: stages that produce typed artifacts
263///   (embeddings, feature maps, attention weights) can store them in
264///   [`TrackObservation::metadata`](crate::TrackObservation::metadata)
265///   (per-observation) or [`Track::metadata`](crate::Track::metadata)
266///   (per-track), or pass them as `artifacts` (per-frame typed metadata)
267///   for downstream stage consumption.
268#[derive(Clone, Debug, Default)]
269pub struct StageOutput {
270    /// New or updated detection set.
271    ///
272    /// If `Some`, **replaces** the current detection set in the accumulator.
273    /// If `None`, the previous detection set is kept.
274    pub detections: Option<DetectionSet>,
275
276    /// New or updated track set.
277    ///
278    /// `Some(Vec<Track>)` is **authoritative** for this frame: it replaces
279    /// the current track set in the accumulator and marks the output as
280    /// authoritative. Previously-known tracks absent from this set are
281    /// treated as normally ended (`TrackEnded`) by the temporal store.
282    ///
283    /// `None` means this stage does not produce tracks — the previous
284    /// track set is kept and authoritativeness is unchanged.
285    pub tracks: Option<Vec<Track>>,
286
287    /// Derived signals — always **appended** to the accumulator.
288    pub signals: Vec<DerivedSignal>,
289
290    /// Scene-level features — always **appended** to the accumulator.
291    pub scene_features: Vec<SceneFeature>,
292
293    /// Typed artifacts for downstream stages — **merged** (last-writer-wins by `TypeId`).
294    ///
295    /// This is the primary extension seam for inter-stage communication
296    /// beyond the built-in fields. Any `Clone + Send + Sync + 'static`
297    /// value can be stored here by type, and downstream stages retrieve
298    /// it from [`StageContext::artifacts.stage_artifacts`](crate::PerceptionArtifacts::stage_artifacts).
299    ///
300    /// Example uses:
301    /// - Feature maps or embeddings from a detector for a downstream classifier.
302    /// - Prepared multi-frame input tensors for a temporal/clip-based model.
303    /// - Confidence calibration metadata from an upstream stage.
304    pub artifacts: TypedMetadata,
305}
306
307impl StageOutput {
308    /// Create an empty stage output (no detections, tracks, signals, or artifacts).
309    #[must_use]
310    pub fn empty() -> Self {
311        Self::default()
312    }
313
314    /// Create a stage output containing only detections.
315    #[must_use]
316    pub fn with_detections(detections: DetectionSet) -> Self {
317        Self {
318            detections: Some(detections),
319            ..Self::default()
320        }
321    }
322
323    /// Create a stage output containing only tracks.
324    #[must_use]
325    pub fn with_tracks(tracks: Vec<Track>) -> Self {
326        Self {
327            tracks: Some(tracks),
328            ..Self::default()
329        }
330    }
331
332    /// Create a stage output containing only signals.
333    #[must_use]
334    pub fn with_signals(signals: Vec<DerivedSignal>) -> Self {
335        Self {
336            signals,
337            ..Self::default()
338        }
339    }
340
341    /// Create a stage output containing a single signal.
342    #[must_use]
343    pub fn with_signal(signal: DerivedSignal) -> Self {
344        Self {
345            signals: vec![signal],
346            ..Self::default()
347        }
348    }
349
350    /// Create a stage output containing only scene features.
351    #[must_use]
352    pub fn with_scene_features(features: Vec<SceneFeature>) -> Self {
353        Self {
354            scene_features: features,
355            ..Self::default()
356        }
357    }
358
359    /// Create a stage output containing a single typed artifact.
360    ///
361    /// This is useful for stages that produce a single custom artifact
362    /// type for downstream consumption.
363    #[must_use]
364    pub fn with_artifact<T: Clone + Send + Sync + 'static>(value: T) -> Self {
365        let mut artifacts = TypedMetadata::new();
366        artifacts.insert(value);
367        Self {
368            artifacts,
369            ..Self::default()
370        }
371    }
372
373    /// Start building a [`StageOutput`] incrementally.
374    ///
375    /// # Example
376    ///
377    /// ```
378    /// use nv_perception::StageOutput;
379    ///
380    /// let output = StageOutput::build()
381    ///     .detections(Default::default())
382    ///     .artifact(42_u32)
383    ///     .finish();
384    /// ```
385    #[must_use]
386    pub fn build() -> StageOutputBuilder {
387        StageOutputBuilder {
388            inner: Self::default(),
389        }
390    }
391}
392
393/// Incremental builder for [`StageOutput`].
394///
395/// Created via [`StageOutput::build()`]. Each setter returns `self` for chaining.
396pub struct StageOutputBuilder {
397    inner: StageOutput,
398}
399
400impl StageOutputBuilder {
401    /// Set the detection set.
402    #[must_use]
403    pub fn detections(mut self, detections: DetectionSet) -> Self {
404        self.inner.detections = Some(detections);
405        self
406    }
407
408    /// Set the track set.
409    #[must_use]
410    pub fn tracks(mut self, tracks: Vec<Track>) -> Self {
411        self.inner.tracks = Some(tracks);
412        self
413    }
414
415    /// Append a signal.
416    #[must_use]
417    pub fn signal(mut self, signal: DerivedSignal) -> Self {
418        self.inner.signals.push(signal);
419        self
420    }
421
422    /// Append signals.
423    #[must_use]
424    pub fn signals(mut self, signals: Vec<DerivedSignal>) -> Self {
425        self.inner.signals.extend(signals);
426        self
427    }
428
429    /// Append a scene feature.
430    #[must_use]
431    pub fn scene_feature(mut self, feature: SceneFeature) -> Self {
432        self.inner.scene_features.push(feature);
433        self
434    }
435
436    /// Insert a typed artifact.
437    #[must_use]
438    pub fn artifact<T: Clone + Send + Sync + 'static>(mut self, value: T) -> Self {
439        self.inner.artifacts.insert(value);
440        self
441    }
442
443    /// Consume the builder and produce the [`StageOutput`].
444    #[must_use]
445    pub fn finish(self) -> StageOutput {
446        self.inner
447    }
448}
449
450/// The core user-implementable perception trait.
451///
452/// This is the **only** extension point for adding perception logic to the
453/// pipeline. Stages run in a fixed linear order; each stage sees the
454/// accumulated [`PerceptionArtifacts`] from all
455/// prior stages.
456///
457/// All methods take `&mut self`. The executor holds exclusive ownership of each
458/// stage on the feed's dedicated OS thread — stages are never shared across
459/// threads or called concurrently within a feed.
460///
461/// Requires `Send + 'static` (moved onto the feed thread at startup).
462/// Does **not** require `Sync` — stages do not need to be shareable.
463///
464/// # Lifecycle
465///
466/// 1. `on_start()` — called once when the feed starts.
467/// 2. `process()` — called once per frame, in order.
468/// 3. `on_view_epoch_change()` — called when the camera view changes significantly.
469/// 4. `on_stop()` — called once when the feed stops.
470///
471/// # Error handling
472///
473/// - `process()` returning `Err(StageError)` drops that frame and emits a health event.
474///   The feed continues processing subsequent frames.
475/// - `on_start()` returning `Err` prevents the feed from starting.
476/// - A panic in `process()` is caught; the feed restarts per its restart policy.
477///
478/// # Supported model patterns
479///
480/// The library supports multiple perception model patterns through the
481/// same `Stage` trait. A stage may produce *any combination* of output
482/// fields — the pipeline does not enforce a single pattern.
483///
484/// | Pattern | Reads | Writes | Category |
485/// |---|---|---|---|
486/// | Classical detector | frame pixels | `detections` | `FrameAnalysis` |
487/// | Classical tracker | `detections`, temporal | `tracks` | `Association` |
488/// | Joint det+track model | frame pixels, temporal | `detections` + `tracks` | `FrameAnalysis` |
489/// | Direct track emitter | frame pixels | `tracks` (skip detections) | `FrameAnalysis` |
490/// | Classifier / enricher | `detections` or `tracks` | `artifacts` (typed metadata) | `Custom` |
491/// | Scene analysis | frame pixels, temporal | `scene_features`, `signals` | `TemporalAnalysis` |
492///
493/// These are conventions, not enforced constraints. A stage may read and write
494/// any combination of fields.
495pub trait Stage: Send + 'static {
496    /// Unique name for this stage (used in provenance, metrics, and logging).
497    ///
498    /// Must be a compile-time `&'static str`. Each stage in a pipeline should
499    /// have a distinct ID.
500    fn id(&self) -> StageId;
501
502    /// Process one frame.
503    ///
504    /// **Must not block on I/O.** Stages wrapping inference servers should
505    /// manage their own connection pool or async bridge internally.
506    ///
507    /// Called on the feed's dedicated stage-execution thread.
508    fn process(&mut self, ctx: &StageContext<'_>) -> Result<StageOutput, StageError>;
509
510    /// Called once when the feed starts.
511    ///
512    /// Allocate GPU resources, load models, open connections here.
513    /// Returning `Err` prevents the feed from starting.
514    fn on_start(&mut self) -> Result<(), StageError> {
515        Ok(())
516    }
517
518    /// Called once on feed shutdown.
519    ///
520    /// Release resources here. Best-effort — errors are logged but not fatal.
521    fn on_stop(&mut self) -> Result<(), StageError> {
522        Ok(())
523    }
524
525    /// Called when the view epoch changes (significant camera motion detected).
526    ///
527    /// Stages that maintain internal state dependent on camera view
528    /// (background models, local maps, feature caches) should reset or
529    /// adapt here. The `new_epoch` value identifies the new view epoch.
530    fn on_view_epoch_change(&mut self, _new_epoch: ViewEpoch) -> Result<(), StageError> {
531        Ok(())
532    }
533
534    /// Optional category hint for this stage.
535    ///
536    /// Defaults to [`StageCategory::Custom`]. Override to enable
537    /// composition validation and category-aware metrics.
538    fn category(&self) -> StageCategory {
539        StageCategory::Custom
540    }
541
542    /// Declare this stage's input/output capabilities for pipeline validation.
543    ///
544    /// Returns `None` by default, opting the stage out of dependency
545    /// validation. Override to enable
546    /// [`StagePipeline::validate()`](crate::StagePipeline::validate).
547    fn capabilities(&self) -> Option<StageCapabilities> {
548        None
549    }
550}
551
#[cfg(test)]
mod tests {
    use super::*;

    /// `with_artifact` stores the value by type and leaves detections and
    /// tracks unset.
    #[test]
    fn stage_output_with_artifact() {
        #[derive(Clone, Debug, PartialEq)]
        struct CustomScore(f64);

        let expected = CustomScore(0.42);
        let out = StageOutput::with_artifact(expected.clone());

        assert!(out.detections.is_none());
        assert!(out.tracks.is_none());
        assert_eq!(out.artifacts.get::<CustomScore>(), Some(&expected));
    }

    /// The builder carries both a detection set and a typed artifact
    /// through to the finished output.
    #[test]
    fn stage_output_builder() {
        #[derive(Clone, Debug, PartialEq)]
        struct Tag(u32);

        let expected_tag = Tag(7);
        let built = StageOutput::build()
            .detections(DetectionSet::empty())
            .artifact(expected_tag.clone())
            .finish();

        assert!(built.detections.is_some());
        assert_eq!(built.artifacts.get::<Tag>(), Some(&expected_tag));
    }

    /// Chained capability setters flip exactly the requested flags.
    #[test]
    fn stage_capabilities_builder() {
        let tracker_caps = StageCapabilities::new()
            .consumes_detections()
            .produces_tracks();

        // Requested flags are set.
        assert!(tracker_caps.consumes_detections);
        assert!(tracker_caps.produces_tracks);

        // Flags that were not requested stay cleared.
        assert!(!tracker_caps.consumes_tracks);
        assert!(!tracker_caps.produces_detections);
    }
}