1use std::sync::Arc;
4use std::time::Duration;
5
6use nv_core::config::{CameraMode, ReconnectPolicy, SourceSpec};
7use nv_core::error::{ConfigError, NvError};
8use nv_media::DecodePreference;
9use nv_media::DeviceResidency;
10use nv_media::PostDecodeHook;
11use nv_media::PtzProvider;
12use nv_perception::{Stage, StagePipeline, ValidationMode, validate_pipeline_phased};
13use nv_temporal::RetentionPolicy;
14use nv_view::{EpochPolicy, ViewStateProvider};
15
16use crate::backpressure::BackpressurePolicy;
17use crate::batch::BatchHandle;
18use crate::output::{FrameInclusion, OutputSink, SinkFactory};
19use crate::pipeline::FeedPipeline;
20use crate::shutdown::RestartPolicy;
21use crate::worker::sink::DEFAULT_SINK_SHUTDOWN_TIMEOUT;
22
23pub struct FeedConfig {
27 pub(crate) source: SourceSpec,
28 pub(crate) camera_mode: CameraMode,
29 pub(crate) stages: Vec<Box<dyn Stage>>,
30 pub(crate) batch: Option<BatchHandle>,
31 pub(crate) post_batch_stages: Vec<Box<dyn Stage>>,
32 pub(crate) view_state_provider: Option<Box<dyn ViewStateProvider>>,
33 pub(crate) epoch_policy: Box<dyn EpochPolicy>,
34 pub(crate) output_sink: Box<dyn OutputSink>,
35 pub(crate) sink_factory: Option<SinkFactory>,
36 pub(crate) backpressure: BackpressurePolicy,
37 pub(crate) temporal: RetentionPolicy,
38 pub(crate) reconnect: ReconnectPolicy,
39 pub(crate) restart: RestartPolicy,
40 pub(crate) ptz_provider: Option<Arc<dyn PtzProvider>>,
41 pub(crate) frame_inclusion: FrameInclusion,
42 pub(crate) sink_queue_capacity: usize,
43 pub(crate) sink_shutdown_timeout: Duration,
44 pub(crate) decode_preference: DecodePreference,
45 pub(crate) post_decode_hook: Option<PostDecodeHook>,
46 pub(crate) device_residency: DeviceResidency,
47}
48
49pub struct FeedConfigBuilder {
64 source: Option<SourceSpec>,
65 camera_mode: Option<CameraMode>,
66 stages: Option<Vec<Box<dyn Stage>>>,
67 feed_pipeline: Option<FeedPipeline>,
68 view_state_provider: Option<Box<dyn ViewStateProvider>>,
69 epoch_policy: Option<Box<dyn EpochPolicy>>,
70 output_sink: Option<Box<dyn OutputSink>>,
71 sink_factory: Option<SinkFactory>,
72 backpressure: BackpressurePolicy,
73 temporal: RetentionPolicy,
74 reconnect: ReconnectPolicy,
75 restart: RestartPolicy,
76 ptz_provider: Option<Arc<dyn PtzProvider>>,
77 frame_inclusion: FrameInclusion,
78 validation_mode: ValidationMode,
79 sink_queue_capacity: usize,
80 sink_shutdown_timeout: Duration,
81 decode_preference: DecodePreference,
82 post_decode_hook: Option<PostDecodeHook>,
83 device_residency: DeviceResidency,
84}
85
86impl FeedConfig {
87 #[must_use]
89 pub fn builder() -> FeedConfigBuilder {
90 FeedConfigBuilder {
91 source: None,
92 camera_mode: None,
93 stages: None,
94 feed_pipeline: None,
95 view_state_provider: None,
96 epoch_policy: None,
97 output_sink: None,
98 sink_factory: None,
99 backpressure: BackpressurePolicy::default(),
100 temporal: RetentionPolicy::default(),
101 reconnect: ReconnectPolicy::default(),
102 restart: RestartPolicy::default(),
103 ptz_provider: None,
104 frame_inclusion: FrameInclusion::default(),
105 validation_mode: ValidationMode::default(),
106 sink_queue_capacity: 16,
107 sink_shutdown_timeout: DEFAULT_SINK_SHUTDOWN_TIMEOUT,
108 decode_preference: DecodePreference::default(),
109 post_decode_hook: None,
110 device_residency: DeviceResidency::default(),
111 }
112 }
113}
114
115impl FeedConfigBuilder {
116 #[must_use]
118 pub fn source(mut self, source: SourceSpec) -> Self {
119 self.source = Some(source);
120 self
121 }
122
123 #[must_use]
125 pub fn camera_mode(mut self, mode: CameraMode) -> Self {
126 self.camera_mode = Some(mode);
127 self
128 }
129
130 #[must_use]
135 pub fn stages(mut self, stages: Vec<Box<dyn Stage>>) -> Self {
136 self.stages = Some(stages);
137 self
138 }
139
140 #[must_use]
146 pub fn pipeline(mut self, pipeline: StagePipeline) -> Self {
147 self.stages = Some(pipeline.into_stages());
148 self
149 }
150
151 #[must_use]
160 pub fn feed_pipeline(mut self, pipeline: FeedPipeline) -> Self {
161 self.feed_pipeline = Some(pipeline);
162 self
163 }
164
165 #[must_use]
167 pub fn view_state_provider(mut self, provider: Box<dyn ViewStateProvider>) -> Self {
168 self.view_state_provider = Some(provider);
169 self
170 }
171
172 #[must_use]
174 pub fn epoch_policy(mut self, policy: Box<dyn EpochPolicy>) -> Self {
175 self.epoch_policy = Some(policy);
176 self
177 }
178
179 #[must_use]
181 pub fn output_sink(mut self, sink: Box<dyn OutputSink>) -> Self {
182 self.output_sink = Some(sink);
183 self
184 }
185
186 #[must_use]
193 pub fn sink_factory(mut self, factory: SinkFactory) -> Self {
194 self.sink_factory = Some(factory);
195 self
196 }
197
198 #[must_use]
200 pub fn backpressure(mut self, policy: BackpressurePolicy) -> Self {
201 self.backpressure = policy;
202 self
203 }
204
205 #[must_use]
207 pub fn temporal(mut self, policy: RetentionPolicy) -> Self {
208 self.temporal = policy;
209 self
210 }
211
212 #[must_use]
214 pub fn reconnect(mut self, policy: ReconnectPolicy) -> Self {
215 self.reconnect = policy;
216 self
217 }
218
219 #[must_use]
221 pub fn restart(mut self, policy: RestartPolicy) -> Self {
222 self.restart = policy;
223 self
224 }
225
226 #[must_use]
231 pub fn ptz_provider(mut self, provider: Arc<dyn PtzProvider>) -> Self {
232 self.ptz_provider = Some(provider);
233 self
234 }
235
236 #[must_use]
255 pub fn frame_inclusion(mut self, policy: FrameInclusion) -> Self {
256 self.frame_inclusion = policy;
257 self
258 }
259
260 #[must_use]
266 pub fn validation_mode(mut self, mode: ValidationMode) -> Self {
267 self.validation_mode = mode;
268 self
269 }
270
271 #[must_use]
276 pub fn sink_queue_capacity(mut self, capacity: usize) -> Self {
277 self.sink_queue_capacity = capacity;
278 self
279 }
280
281 #[must_use]
284 pub fn sink_shutdown_timeout(mut self, timeout: Duration) -> Self {
285 self.sink_shutdown_timeout = timeout;
286 self
287 }
288
289 #[must_use]
296 pub fn decode_preference(mut self, pref: DecodePreference) -> Self {
297 self.decode_preference = pref;
298 self
299 }
300
301 #[must_use]
311 pub fn post_decode_hook(mut self, hook: PostDecodeHook) -> Self {
312 self.post_decode_hook = Some(hook);
313 self
314 }
315
316 #[must_use]
325 pub fn device_residency(mut self, residency: DeviceResidency) -> Self {
326 self.device_residency = residency;
327 self
328 }
329
330 #[must_use]
335 pub fn add_stage(mut self, stage: impl Stage) -> Self {
336 self.stages
337 .get_or_insert_with(Vec::new)
338 .push(Box::new(stage));
339 self
340 }
341
342 #[must_use]
344 pub fn add_boxed_stage(mut self, stage: Box<dyn Stage>) -> Self {
345 self.stages.get_or_insert_with(Vec::new).push(stage);
346 self
347 }
348
349 pub fn build(self) -> Result<FeedConfig, NvError> {
356 let source = self
357 .source
358 .ok_or(ConfigError::MissingRequired { field: "source" })?;
359 let camera_mode = self.camera_mode.ok_or(ConfigError::MissingRequired {
360 field: "camera_mode",
361 })?;
362
363 let (stages, batch, post_batch_stages) = if let Some(fp) = self.feed_pipeline {
365 if self.stages.is_some() {
366 return Err(ConfigError::InvalidPolicy {
367 detail: "cannot set both stages() and feed_pipeline() — use one or the other"
368 .into(),
369 }
370 .into());
371 }
372 fp.into_parts()
373 } else {
374 let stages = self.stages.ok_or(ConfigError::MissingRequired {
375 field: "stages (or feed_pipeline)",
376 })?;
377 (stages, None, Vec::new())
378 };
379
380 if stages.is_empty() && post_batch_stages.is_empty() && batch.is_none() {
382 return Err(ConfigError::InvalidPolicy {
383 detail: "at least one perception stage or a batch point is required".into(),
384 }
385 .into());
386 }
387
388 let output_sink = self.output_sink.ok_or(ConfigError::MissingRequired {
389 field: "output_sink",
390 })?;
391
392 let batch_caps = batch.as_ref().and_then(|b| b.capabilities().cloned());
396 let batch_id = batch.as_ref().map(|b| b.processor_id());
397 match self.validation_mode {
398 ValidationMode::Off => {}
399 ValidationMode::Warn => {
400 for w in
401 validate_pipeline(&stages, batch_caps.as_ref(), batch_id, &post_batch_stages)
402 {
403 tracing::warn!("stage validation: {w:?}");
404 }
405 }
406 ValidationMode::Error => {
407 let warnings =
408 validate_pipeline(&stages, batch_caps.as_ref(), batch_id, &post_batch_stages);
409 if !warnings.is_empty() {
410 let detail = warnings
411 .iter()
412 .map(|w| format!("{w:?}"))
413 .collect::<Vec<_>>()
414 .join("; ");
415 return Err(ConfigError::StageValidation { detail }.into());
416 }
417 }
418 }
419
420 if self.backpressure.queue_depth() == 0 {
422 return Err(ConfigError::InvalidCapacity {
423 field: "queue_depth",
424 }
425 .into());
426 }
427
428 match camera_mode {
430 CameraMode::Observed if self.view_state_provider.is_none() => {
431 return Err(ConfigError::CameraModeConflict {
432 detail: "CameraMode::Observed requires a ViewStateProvider".into(),
433 }
434 .into());
435 }
436 CameraMode::Fixed if self.view_state_provider.is_some() => {
437 return Err(ConfigError::CameraModeConflict {
438 detail: "CameraMode::Fixed must not have a ViewStateProvider".into(),
439 }
440 .into());
441 }
442 _ => {}
443 }
444
445 let epoch_policy = self
447 .epoch_policy
448 .unwrap_or_else(|| Box::new(nv_view::DefaultEpochPolicy::default()));
449
450 Ok(FeedConfig {
451 source,
452 camera_mode,
453 stages,
454 batch,
455 post_batch_stages,
456 view_state_provider: self.view_state_provider,
457 epoch_policy,
458 output_sink,
459 sink_factory: self.sink_factory,
460 backpressure: self.backpressure,
461 temporal: self.temporal,
462 reconnect: self.reconnect,
463 restart: self.restart,
464 ptz_provider: self.ptz_provider,
465 frame_inclusion: self.frame_inclusion,
466 sink_queue_capacity: self.sink_queue_capacity.max(1),
467 sink_shutdown_timeout: self.sink_shutdown_timeout,
468 decode_preference: self.decode_preference,
469 post_decode_hook: self.post_decode_hook,
470 device_residency: self.device_residency,
471 })
472 }
473}
474
475fn validate_pipeline(
478 pre_batch: &[Box<dyn Stage>],
479 batch_caps: Option<&nv_perception::stage::StageCapabilities>,
480 batch_id: Option<nv_core::id::StageId>,
481 post_batch: &[Box<dyn Stage>],
482) -> Vec<nv_perception::ValidationWarning> {
483 validate_pipeline_phased(pre_batch, batch_caps, batch_id, post_batch)
484}
485
#[cfg(test)]
mod tests {
    use super::*;
    use nv_core::id::StageId;
    use nv_perception::stage::StageCapabilities;
    use nv_perception::{Stage, StageContext, StageOutput, ValidationWarning};

    /// Test stage that reports a fixed id and fixed capabilities and
    /// produces an empty output.
    struct CapStage {
        id: &'static str,
        caps: StageCapabilities,
    }

    impl Stage for CapStage {
        fn id(&self) -> StageId {
            StageId(self.id)
        }

        fn process(
            &mut self,
            _: &StageContext<'_>,
        ) -> Result<StageOutput, nv_core::error::StageError> {
            Ok(StageOutput::empty())
        }

        fn capabilities(&self) -> Option<StageCapabilities> {
            Some(self.caps)
        }
    }

    /// An empty stage list, for phases that contain no stages.
    fn no_stages() -> Vec<Box<dyn Stage>> {
        Vec::new()
    }

    /// A single-element stage list holding a stage that produces detections.
    fn detection_producer(id: &'static str) -> Vec<Box<dyn Stage>> {
        vec![Box::new(CapStage {
            id,
            caps: StageCapabilities::new().produces_detections(),
        })]
    }

    /// Capabilities of a batch point that consumes detections and produces
    /// tracks.
    fn detections_to_tracks() -> StageCapabilities {
        StageCapabilities::new()
            .consumes_detections()
            .produces_tracks()
    }

    #[test]
    fn batch_consumes_validated_against_pre_batch() {
        // Nothing before the batch point produces detections.
        let pre = no_stages();
        let post = no_stages();
        let caps = detections_to_tracks();

        let warnings = validate_pipeline(&pre, Some(&caps), Some(StageId("detector")), &post);

        let reported = warnings.iter().any(|w| {
            matches!(
                w,
                ValidationWarning::UnsatisfiedDependency { stage_id, missing: "detections" }
                    if *stage_id == StageId("detector")
            )
        });
        assert!(
            reported,
            "expected UnsatisfiedDependency for detections, got: {warnings:?}"
        );
    }

    #[test]
    fn batch_consumes_satisfied_by_pre_batch() {
        // A pre-batch stage produces the detections the batch point consumes.
        let pre = detection_producer("det_stage");
        let post = no_stages();
        let caps = detections_to_tracks();

        let warnings = validate_pipeline(&pre, Some(&caps), Some(StageId("tracker")), &post);

        let any_unsatisfied = warnings
            .iter()
            .any(|w| matches!(w, ValidationWarning::UnsatisfiedDependency { .. }));
        assert!(
            !any_unsatisfied,
            "no unsatisfied dependencies expected, got: {warnings:?}"
        );
    }

    #[test]
    fn batch_id_collision_detected() {
        // The batch point reuses the id of a pre-batch stage.
        let pre = detection_producer("detector");
        let post = no_stages();

        let warnings = validate_pipeline(&pre, None, Some(StageId("detector")), &post);

        let collision = warnings.iter().any(|w| {
            matches!(
                w,
                ValidationWarning::DuplicateStageId { stage_id } if *stage_id == StageId("detector")
            )
        });
        assert!(collision, "expected DuplicateStageId, got: {warnings:?}");
    }
}