Skip to main content

nv_runtime/
feed.rs

1//! Feed configuration and validation.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use nv_core::config::{CameraMode, ReconnectPolicy, SourceSpec};
7use nv_core::error::{ConfigError, NvError};
8use nv_media::DecodePreference;
9use nv_media::DeviceResidency;
10use nv_media::PostDecodeHook;
11use nv_media::PtzProvider;
12use nv_perception::{Stage, StagePipeline, ValidationMode, validate_pipeline_phased};
13use nv_temporal::RetentionPolicy;
14use nv_view::{EpochPolicy, ViewStateProvider};
15
16use crate::backpressure::BackpressurePolicy;
17use crate::batch::BatchHandle;
18use crate::output::{FrameInclusion, OutputSink, SinkFactory};
19use crate::pipeline::FeedPipeline;
20use crate::shutdown::RestartPolicy;
21use crate::worker::sink::DEFAULT_SINK_SHUTDOWN_TIMEOUT;
22
/// Configuration for a single video feed.
///
/// Constructed via [`FeedConfig::builder()`]. Every value stored here has
/// already passed the checks performed by [`FeedConfigBuilder::build()`].
pub struct FeedConfig {
    /// Video source specification.
    pub(crate) source: SourceSpec,
    /// Camera mode (`Fixed` or `Observed`).
    pub(crate) camera_mode: CameraMode,
    /// Perception stages. When a batch point is configured these are the
    /// stages validated as running before it.
    pub(crate) stages: Vec<Box<dyn Stage>>,
    /// Shared batch point, if any. `None` when the feed was configured
    /// through `stages()` / `pipeline()` rather than `feed_pipeline()`.
    pub(crate) batch: Option<BatchHandle>,
    /// Stages validated as running after the batch point. Empty when the
    /// feed was configured through `stages()` / `pipeline()`.
    pub(crate) post_batch_stages: Vec<Box<dyn Stage>>,
    /// View state provider. `build()` guarantees this is `Some` for
    /// `CameraMode::Observed` and `None` for `CameraMode::Fixed`.
    pub(crate) view_state_provider: Option<Box<dyn ViewStateProvider>>,
    /// Epoch policy; `nv_view::DefaultEpochPolicy` when none was supplied.
    pub(crate) epoch_policy: Box<dyn EpochPolicy>,
    /// Sink that receives processed outputs.
    pub(crate) output_sink: Box<dyn OutputSink>,
    /// Optional factory for reconstructing the sink after a timeout or panic.
    pub(crate) sink_factory: Option<SinkFactory>,
    /// Backpressure policy; `build()` rejects a `queue_depth()` of zero.
    pub(crate) backpressure: BackpressurePolicy,
    /// Temporal retention policy.
    pub(crate) temporal: RetentionPolicy,
    /// Reconnection policy.
    pub(crate) reconnect: ReconnectPolicy,
    /// Restart policy.
    pub(crate) restart: RestartPolicy,
    /// Optional PTZ telemetry provider.
    pub(crate) ptz_provider: Option<Arc<dyn PtzProvider>>,
    /// Policy controlling whether outputs carry frame pixel data.
    pub(crate) frame_inclusion: FrameInclusion,
    /// Capacity of the per-feed output sink queue. `build()` clamps this
    /// to a minimum of 1.
    pub(crate) sink_queue_capacity: usize,
    /// Timeout for joining the sink worker thread during shutdown or restart.
    pub(crate) sink_shutdown_timeout: Duration,
    /// Hardware vs. software decoder preference.
    pub(crate) decode_preference: DecodePreference,
    /// Optional hook injected between the decoder and the color-space
    /// converter.
    pub(crate) post_decode_hook: Option<PostDecodeHook>,
    /// Device residency mode for decoded frames.
    pub(crate) device_residency: DeviceResidency,
}
48
/// Builder for [`FeedConfig`].
///
/// # Required fields
///
/// - `source` — the video source specification.
/// - `camera_mode` — `Fixed` or `Observed` (no default).
/// - `stages` or `feed_pipeline` — at least one perception stage or a
///   batch point. The two are mutually exclusive.
/// - `output_sink` — where to send processed outputs.
///
/// # Validation
///
/// `build()` validates:
/// - `stages` and `feed_pipeline` are not both set.
/// - The resolved pipeline is not completely empty.
/// - Stage capabilities, according to the configured `ValidationMode`.
/// - The backpressure `queue_depth()` is non-zero.
/// - `Observed` mode requires a `ViewStateProvider`.
/// - `Fixed` mode must not have a `ViewStateProvider`.
pub struct FeedConfigBuilder {
    // Required inputs: `None` until the matching setter is called.
    source: Option<SourceSpec>,
    camera_mode: Option<CameraMode>,
    // Set by `stages()`, `pipeline()`, `add_stage()` and `add_boxed_stage()`.
    stages: Option<Vec<Box<dyn Stage>>>,
    // Set by `feed_pipeline()`; mutually exclusive with `stages`.
    feed_pipeline: Option<FeedPipeline>,
    view_state_provider: Option<Box<dyn ViewStateProvider>>,
    // `None` means `build()` falls back to `nv_view::DefaultEpochPolicy`.
    epoch_policy: Option<Box<dyn EpochPolicy>>,
    output_sink: Option<Box<dyn OutputSink>>,
    sink_factory: Option<SinkFactory>,
    // Policies: initialised from each type's `default()` in `FeedConfig::builder()`.
    backpressure: BackpressurePolicy,
    temporal: RetentionPolicy,
    reconnect: ReconnectPolicy,
    restart: RestartPolicy,
    ptz_provider: Option<Arc<dyn PtzProvider>>,
    frame_inclusion: FrameInclusion,
    // Builder-only: consumed by `build()` and not stored in `FeedConfig`.
    validation_mode: ValidationMode,
    // Initial value is 16; `build()` clamps the final value to at least 1.
    sink_queue_capacity: usize,
    // Initial value is `DEFAULT_SINK_SHUTDOWN_TIMEOUT`.
    sink_shutdown_timeout: Duration,
    decode_preference: DecodePreference,
    post_decode_hook: Option<PostDecodeHook>,
    device_residency: DeviceResidency,
}
85
86impl FeedConfig {
87    /// Create a new builder.
88    #[must_use]
89    pub fn builder() -> FeedConfigBuilder {
90        FeedConfigBuilder {
91            source: None,
92            camera_mode: None,
93            stages: None,
94            feed_pipeline: None,
95            view_state_provider: None,
96            epoch_policy: None,
97            output_sink: None,
98            sink_factory: None,
99            backpressure: BackpressurePolicy::default(),
100            temporal: RetentionPolicy::default(),
101            reconnect: ReconnectPolicy::default(),
102            restart: RestartPolicy::default(),
103            ptz_provider: None,
104            frame_inclusion: FrameInclusion::default(),
105            validation_mode: ValidationMode::default(),
106            sink_queue_capacity: 16,
107            sink_shutdown_timeout: DEFAULT_SINK_SHUTDOWN_TIMEOUT,
108            decode_preference: DecodePreference::default(),
109            post_decode_hook: None,
110            device_residency: DeviceResidency::default(),
111        }
112    }
113}
114
115impl FeedConfigBuilder {
116    /// Set the video source.
117    #[must_use]
118    pub fn source(mut self, source: SourceSpec) -> Self {
119        self.source = Some(source);
120        self
121    }
122
123    /// Set the camera mode (`Fixed` or `Observed`). **Required.**
124    #[must_use]
125    pub fn camera_mode(mut self, mode: CameraMode) -> Self {
126        self.camera_mode = Some(mode);
127        self
128    }
129
130    /// Set the ordered list of perception stages.
131    ///
132    /// Mutually exclusive with [`feed_pipeline()`](Self::feed_pipeline).
133    /// For pipelines with a shared batch point, use `feed_pipeline()`.
134    #[must_use]
135    pub fn stages(mut self, stages: Vec<Box<dyn Stage>>) -> Self {
136        self.stages = Some(stages);
137        self
138    }
139
140    /// Set the perception pipeline from a [`StagePipeline`].
141    ///
142    /// This is a convenience alternative to [`stages()`](Self::stages)
143    /// that accepts a pre-composed pipeline.
144    /// Mutually exclusive with [`feed_pipeline()`](Self::feed_pipeline).
145    #[must_use]
146    pub fn pipeline(mut self, pipeline: StagePipeline) -> Self {
147        self.stages = Some(pipeline.into_stages());
148        self
149    }
150
151    /// Set the feed pipeline, optionally including a shared batch point.
152    ///
153    /// This is the recommended way to configure feeds that participate in
154    /// shared batch inference. Use [`FeedPipeline::builder()`] to compose
155    /// pre-batch stages, a batch point, and post-batch stages.
156    ///
157    /// Mutually exclusive with [`stages()`](Self::stages) and
158    /// [`pipeline()`](Self::pipeline).
159    #[must_use]
160    pub fn feed_pipeline(mut self, pipeline: FeedPipeline) -> Self {
161        self.feed_pipeline = Some(pipeline);
162        self
163    }
164
165    /// Set the view state provider (required for `CameraMode::Observed`).
166    #[must_use]
167    pub fn view_state_provider(mut self, provider: Box<dyn ViewStateProvider>) -> Self {
168        self.view_state_provider = Some(provider);
169        self
170    }
171
172    /// Set the epoch policy (optional; defaults to `DefaultEpochPolicy`).
173    #[must_use]
174    pub fn epoch_policy(mut self, policy: Box<dyn EpochPolicy>) -> Self {
175        self.epoch_policy = Some(policy);
176        self
177    }
178
179    /// Set the output sink. **Required.**
180    #[must_use]
181    pub fn output_sink(mut self, sink: Box<dyn OutputSink>) -> Self {
182        self.output_sink = Some(sink);
183        self
184    }
185
186    /// Set an optional sink factory for reconstructing the sink after
187    /// timeout or panic.
188    ///
189    /// Without a factory, a sink that times out during shutdown is
190    /// permanently replaced with a silent no-op. With a factory, the
191    /// next feed restart constructs a fresh sink.
192    #[must_use]
193    pub fn sink_factory(mut self, factory: SinkFactory) -> Self {
194        self.sink_factory = Some(factory);
195        self
196    }
197
198    /// Set the backpressure policy. Default: `DropOldest { queue_depth: 4 }`.
199    #[must_use]
200    pub fn backpressure(mut self, policy: BackpressurePolicy) -> Self {
201        self.backpressure = policy;
202        self
203    }
204
205    /// Set the temporal retention policy.
206    #[must_use]
207    pub fn temporal(mut self, policy: RetentionPolicy) -> Self {
208        self.temporal = policy;
209        self
210    }
211
212    /// Set the reconnection policy.
213    #[must_use]
214    pub fn reconnect(mut self, policy: ReconnectPolicy) -> Self {
215        self.reconnect = policy;
216        self
217    }
218
219    /// Set the restart policy.
220    #[must_use]
221    pub fn restart(mut self, policy: RestartPolicy) -> Self {
222        self.restart = policy;
223        self
224    }
225
226    /// Set an optional PTZ telemetry provider.
227    ///
228    /// When provided, the media backend queries this on every decoded
229    /// frame to attach PTZ telemetry to the frame metadata.
230    #[must_use]
231    pub fn ptz_provider(mut self, provider: Arc<dyn PtzProvider>) -> Self {
232        self.ptz_provider = Some(provider);
233        self
234    }
235
236    /// Set the frame inclusion policy.
237    ///
238    /// - [`FrameInclusion::Never`] (default) — no pixel data in outputs.
239    /// - [`FrameInclusion::Always`] — every output carries a frame.
240    /// - [`FrameInclusion::Sampled { interval }`] — a frame is included
241    ///   every `interval` outputs, reducing host materialization and sink
242    ///   thread cost while keeping perception artifacts at full rate.
243    /// - [`FrameInclusion::TargetFps { target, fallback_interval }`] —
244    ///   resolve the sample interval dynamically from the observed source
245    ///   rate. During a warmup window (first ~30 frames),
246    ///   `fallback_interval` is used. Once the source cadence is
247    ///   estimated from frame timestamps, the interval is computed as
248    ///   `round(source_fps / target)` and locked for the feed's lifetime.
249    ///
250    /// Use [`TargetFps`](FrameInclusion::TargetFps) when the source FPS
251    /// is unknown at config time or varies across feeds. Use
252    /// [`Sampled`](FrameInclusion::Sampled) when the interval is known
253    /// statically.
254    #[must_use]
255    pub fn frame_inclusion(mut self, policy: FrameInclusion) -> Self {
256        self.frame_inclusion = policy;
257        self
258    }
259
260    /// Set the stage capability validation mode.
261    ///
262    /// - [`ValidationMode::Off`] (default) — no validation.
263    /// - [`ValidationMode::Warn`] — log warnings via `tracing::warn!`.
264    /// - [`ValidationMode::Error`] — any warning becomes a build error.
265    #[must_use]
266    pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
267        self.validation_mode = mode;
268        self
269    }
270
271    /// Set the per-feed output sink queue capacity.
272    ///
273    /// This is the bounded channel between the feed worker thread and
274    /// the sink thread. Default: `16`. Must be at least 1.
275    #[must_use]
276    pub fn sink_queue_capacity(mut self, capacity: usize) -> Self {
277        self.sink_queue_capacity = capacity;
278        self
279    }
280
281    /// Set the timeout for joining the sink worker thread during
282    /// shutdown or restart. Default: 5 seconds.
283    #[must_use]
284    pub fn sink_shutdown_timeout(mut self, timeout: Duration) -> Self {
285        self.sink_shutdown_timeout = timeout;
286        self
287    }
288
289    /// Set the decode preference for this feed.
290    ///
291    /// Controls hardware vs. software decoder selection. Default is
292    /// [`DecodePreference::Auto`], which preserves existing behavior.
293    ///
294    /// See [`DecodePreference`] for variant semantics.
295    #[must_use]
296    pub fn decode_preference(mut self, pref: DecodePreference) -> Self {
297        self.decode_preference = pref;
298        self
299    }
300
301    /// Set a post-decode hook that can inject a pipeline element between
302    /// the decoder and the color-space converter.
303    ///
304    /// **Ignored when `device_residency` is `Provider`** — the provider
305    /// controls the full decoder-to-tail path.  Hooks are only evaluated
306    /// for `Host` and `Cuda` residency modes, so this can be set
307    /// unconditionally without conflicting with provider-managed feeds.
308    ///
309    /// See [`PostDecodeHook`] for details and usage examples.
310    #[must_use]
311    pub fn post_decode_hook(mut self, hook: PostDecodeHook) -> Self {
312        self.post_decode_hook = Some(hook);
313        self
314    }
315
316    /// Set the device residency mode for decoded frames.
317    ///
318    /// Controls the pipeline tail strategy:
319    /// - [`DeviceResidency::Host`] (default) — `videoconvert → appsink`
320    /// - [`DeviceResidency::Cuda`] — `cudaupload → cudaconvert → appsink(memory:CUDAMemory)`
321    /// - `DeviceResidency::Provider(p)` — delegates to the provider
322    ///
323    /// See [`DeviceResidency`] for details.
324    #[must_use]
325    pub fn device_residency(mut self, residency: DeviceResidency) -> Self {
326        self.device_residency = residency;
327        self
328    }
329
330    /// Append a single stage to the pipeline.
331    ///
332    /// Convenience alternative to [`stages()`](Self::stages) when
333    /// building one stage at a time.
334    #[must_use]
335    pub fn add_stage(mut self, stage: impl Stage) -> Self {
336        self.stages
337            .get_or_insert_with(Vec::new)
338            .push(Box::new(stage));
339        self
340    }
341
342    /// Append a boxed stage to the pipeline.
343    #[must_use]
344    pub fn add_boxed_stage(mut self, stage: Box<dyn Stage>) -> Self {
345        self.stages.get_or_insert_with(Vec::new).push(stage);
346        self
347    }
348
349    /// Build the feed configuration.
350    ///
351    /// # Errors
352    ///
353    /// - `MissingRequired` if `source`, `camera_mode`, stages (or feed_pipeline), or `output_sink` are not set.
354    /// - `CameraModeConflict` if `Observed` is set without a provider, or `Fixed` with a provider.
355    pub fn build(self) -> Result<FeedConfig, NvError> {
356        let source = self
357            .source
358            .ok_or(ConfigError::MissingRequired { field: "source" })?;
359        let camera_mode = self.camera_mode.ok_or(ConfigError::MissingRequired {
360            field: "camera_mode",
361        })?;
362
363        // Resolve stages: either from feed_pipeline or from stages/pipeline.
364        let (stages, batch, post_batch_stages) = if let Some(fp) = self.feed_pipeline {
365            if self.stages.is_some() {
366                return Err(ConfigError::InvalidPolicy {
367                    detail: "cannot set both stages() and feed_pipeline() — use one or the other"
368                        .into(),
369                }
370                .into());
371            }
372            fp.into_parts()
373        } else {
374            let stages = self.stages.ok_or(ConfigError::MissingRequired {
375                field: "stages (or feed_pipeline)",
376            })?;
377            (stages, None, Vec::new())
378        };
379
380        // At least one stage must exist somewhere in the pipeline.
381        if stages.is_empty() && post_batch_stages.is_empty() && batch.is_none() {
382            return Err(ConfigError::InvalidPolicy {
383                detail: "at least one perception stage or a batch point is required".into(),
384            }
385            .into());
386        }
387
388        let output_sink = self.output_sink.ok_or(ConfigError::MissingRequired {
389            field: "output_sink",
390        })?;
391
392        // Stage capability validation — batch-aware: pre-batch stages
393        // are validated first, then batch processor capabilities update
394        // the availability state, then post-batch stages are validated.
395        let batch_caps = batch.as_ref().and_then(|b| b.capabilities().cloned());
396        let batch_id = batch.as_ref().map(|b| b.processor_id());
397        match self.validation_mode {
398            ValidationMode::Off => {}
399            ValidationMode::Warn => {
400                for w in
401                    validate_pipeline(&stages, batch_caps.as_ref(), batch_id, &post_batch_stages)
402                {
403                    tracing::warn!("stage validation: {w:?}");
404                }
405            }
406            ValidationMode::Error => {
407                let warnings =
408                    validate_pipeline(&stages, batch_caps.as_ref(), batch_id, &post_batch_stages);
409                if !warnings.is_empty() {
410                    let detail = warnings
411                        .iter()
412                        .map(|w| format!("{w:?}"))
413                        .collect::<Vec<_>>()
414                        .join("; ");
415                    return Err(ConfigError::StageValidation { detail }.into());
416                }
417            }
418        }
419
420        // Validate queue depth.
421        if self.backpressure.queue_depth() == 0 {
422            return Err(ConfigError::InvalidCapacity {
423                field: "queue_depth",
424            }
425            .into());
426        }
427
428        // Validate camera mode vs. provider.
429        match camera_mode {
430            CameraMode::Observed if self.view_state_provider.is_none() => {
431                return Err(ConfigError::CameraModeConflict {
432                    detail: "CameraMode::Observed requires a ViewStateProvider".into(),
433                }
434                .into());
435            }
436            CameraMode::Fixed if self.view_state_provider.is_some() => {
437                return Err(ConfigError::CameraModeConflict {
438                    detail: "CameraMode::Fixed must not have a ViewStateProvider".into(),
439                }
440                .into());
441            }
442            _ => {}
443        }
444
445        // Default epoch policy when none is explicitly provided.
446        let epoch_policy = self
447            .epoch_policy
448            .unwrap_or_else(|| Box::new(nv_view::DefaultEpochPolicy::default()));
449
450        Ok(FeedConfig {
451            source,
452            camera_mode,
453            stages,
454            batch,
455            post_batch_stages,
456            view_state_provider: self.view_state_provider,
457            epoch_policy,
458            output_sink,
459            sink_factory: self.sink_factory,
460            backpressure: self.backpressure,
461            temporal: self.temporal,
462            reconnect: self.reconnect,
463            restart: self.restart,
464            ptz_provider: self.ptz_provider,
465            frame_inclusion: self.frame_inclusion,
466            sink_queue_capacity: self.sink_queue_capacity.max(1),
467            sink_shutdown_timeout: self.sink_shutdown_timeout,
468            decode_preference: self.decode_preference,
469            post_decode_hook: self.post_decode_hook,
470            device_residency: self.device_residency,
471        })
472    }
473}
474
/// Validate pipeline stages accounting for a batch processor between
/// pre-batch and post-batch stages.
///
/// This is a pass-through to [`validate_pipeline_phased`]: all four
/// arguments are forwarded unchanged and its result is returned as-is.
///
/// - `pre_batch` — stages that run before the batch point.
/// - `batch_caps` — capabilities of the batch processor, if it declares any.
/// - `batch_id` — id of the batch processor, if a batch point exists.
/// - `post_batch` — stages that run after the batch point.
///
/// Returns the warnings reported by the validator. `build()` logs them in
/// `ValidationMode::Warn` and turns a non-empty list into an error in
/// `ValidationMode::Error`.
fn validate_pipeline(
    pre_batch: &[Box<dyn Stage>],
    batch_caps: Option<&nv_perception::stage::StageCapabilities>,
    batch_id: Option<nv_core::id::StageId>,
    post_batch: &[Box<dyn Stage>],
) -> Vec<nv_perception::ValidationWarning> {
    validate_pipeline_phased(pre_batch, batch_caps, batch_id, post_batch)
}
485
#[cfg(test)]
mod tests {
    use super::*;
    use nv_core::id::StageId;
    use nv_perception::stage::StageCapabilities;
    use nv_perception::{Stage, StageContext, StageOutput, ValidationWarning};

    /// Test stage that does no work and only advertises fixed capabilities.
    struct CapStage {
        id: &'static str,
        caps: StageCapabilities,
    }

    impl Stage for CapStage {
        fn id(&self) -> StageId {
            StageId(self.id)
        }

        fn process(
            &mut self,
            _: &StageContext<'_>,
        ) -> Result<StageOutput, nv_core::error::StageError> {
            Ok(StageOutput::empty())
        }

        fn capabilities(&self) -> Option<StageCapabilities> {
            Some(self.caps)
        }
    }

    /// A stage list containing a single stage that produces detections.
    fn detection_producer(id: &'static str) -> Vec<Box<dyn Stage>> {
        vec![Box::new(CapStage {
            id,
            caps: StageCapabilities::new().produces_detections(),
        })]
    }

    /// An empty stage list.
    fn no_stages() -> Vec<Box<dyn Stage>> {
        Vec::new()
    }

    /// Capabilities of a batch processor that consumes detections and
    /// produces tracks.
    fn tracker_like_caps() -> StageCapabilities {
        StageCapabilities::new()
            .consumes_detections()
            .produces_tracks()
    }

    #[test]
    fn batch_consumes_validated_against_pre_batch() {
        // Nothing upstream produces detections, so the batch processor's
        // requirement must be reported.
        let pre = no_stages();
        let post = no_stages();
        let caps = tracker_like_caps();

        let warnings = validate_pipeline(&pre, Some(&caps), Some(StageId("detector")), &post);

        let flagged = warnings.iter().any(|w| match w {
            ValidationWarning::UnsatisfiedDependency { stage_id, missing: "detections" } => {
                *stage_id == StageId("detector")
            }
            _ => false,
        });
        assert!(
            flagged,
            "expected UnsatisfiedDependency for detections, got: {warnings:?}"
        );
    }

    #[test]
    fn batch_consumes_satisfied_by_pre_batch() {
        // A pre-batch stage produces the detections the batch processor needs.
        let pre = detection_producer("det_stage");
        let post = no_stages();
        let caps = tracker_like_caps();

        let warnings = validate_pipeline(&pre, Some(&caps), Some(StageId("tracker")), &post);

        let unsatisfied = warnings.iter().any(|w| match w {
            ValidationWarning::UnsatisfiedDependency { .. } => true,
            _ => false,
        });
        assert!(
            !unsatisfied,
            "no unsatisfied dependencies expected, got: {warnings:?}"
        );
    }

    #[test]
    fn batch_id_collision_detected() {
        // The batch processor reuses the id of a pre-batch stage.
        let pre = detection_producer("detector");
        let post = no_stages();

        let warnings = validate_pipeline(&pre, None, Some(StageId("detector")), &post);

        let collided = warnings.iter().any(|w| match w {
            ValidationWarning::DuplicateStageId { stage_id } => *stage_id == StageId("detector"),
            _ => false,
        });
        assert!(collided, "expected DuplicateStageId, got: {warnings:?}");
    }
}