mod frame_processing;
mod lifecycle;
mod view;
#[cfg(test)]
#[path = "tests.rs"]
mod tests;
use std::collections::HashSet;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::time::Instant;
use nv_core::TrackId;
use nv_core::config::CameraMode;
use nv_core::id::FeedId;
use nv_core::metrics::StageMetrics;
use nv_core::timestamp::MonotonicTs;
use nv_perception::Stage;
use nv_temporal::{RetentionPolicy, TemporalStore};
use nv_view::{ContextValidity, EpochPolicy, ViewSnapshot, ViewState, ViewStateProvider};
use crate::batch::BatchHandle;
use crate::output::FrameInclusion;
// Throttle intervals and the frame-lag threshold. None of these constants is
// referenced in the visible part of this file; the per-constant notes below are
// therefore inferred from the names and should be confirmed at the use sites.

// One second. NOTE(review): presumably the minimum spacing between reported
// batch-rejection events (see `last_batch_rejection_event`) — confirm.
const BATCH_REJECTION_THROTTLE: std::time::Duration = std::time::Duration::from_secs(1);
// One second. NOTE(review): presumably the minimum spacing between reported
// batch-timeout events (see `last_batch_timeout_event`) — confirm.
const BATCH_TIMEOUT_THROTTLE: std::time::Duration = std::time::Duration::from_secs(1);
// One second. NOTE(review): presumably the minimum spacing between reported
// in-flight rejection events (see `last_batch_in_flight_rejection_event`) — confirm.
const BATCH_IN_FLIGHT_THROTTLE: std::time::Duration = std::time::Duration::from_secs(1);
// One second. NOTE(review): presumably the minimum spacing between reported
// frame-lag events (see `last_frame_lag_event`) — confirm.
const FRAME_LAG_THROTTLE: std::time::Duration = std::time::Duration::from_secs(1);
// 2000, in milliseconds per the `_MS` suffix. NOTE(review): presumably the frame
// age above which a frame counts as lagging — confirm.
const FRAME_LAG_THRESHOLD_MS: u64 = 2_000;
/// Per-feed pipeline state: the configured stages, optional batch handle,
/// temporal store, view tracking, and a set of counters/timestamps.
///
/// All fields are `pub(super)`; the initial values noted below are the ones
/// assigned by `PipelineExecutor::new`.
pub(crate) struct PipelineExecutor {
/// Feed identifier passed to `new`.
pub(super) feed_id: FeedId,
/// Camera mode passed to `new`; it also selects the initial `view_state`.
pub(super) camera_mode: CameraMode,
/// Stages passed to `new` as `stages`.
/// NOTE(review): presumably run before the batch step — confirm.
pub(super) stages: Vec<Box<dyn Stage>>,
/// Optional batch handle. `batch_in_flight` is only allocated when this is `Some`.
pub(super) batch: Option<BatchHandle>,
/// Stages passed to `new` as `post_batch_stages`.
/// NOTE(review): presumably run after the batch step — confirm.
pub(super) post_batch_stages: Vec<Box<dyn Stage>>,
/// Temporal store created from the retention policy given to `new`.
pub(super) temporal: TemporalStore,
/// Current view state; starts as the fixed or observed initial state depending on `camera_mode`.
pub(super) view_state: ViewState,
/// View snapshot; initially built from a clone of the initial `view_state`.
pub(super) view_snapshot: ViewSnapshot,
/// Optional provider of view state, stored as passed to `new`.
pub(super) view_state_provider: Option<Box<dyn ViewStateProvider>>,
/// Epoch policy, stored as passed to `new`.
pub(super) epoch_policy: Box<dyn EpochPolicy>,
/// One zeroed entry per stage: `stages.len() + post_batch_stages.len()` entries at construction.
pub(super) stage_metrics: Vec<StageMetrics>,
/// Starts at 0.
pub(super) frames_processed: u64,
/// Set to the construction instant.
/// NOTE(review): presumably paired with `clock_anchor_ts` for `instant_to_ts_impl` — confirm.
pub(super) clock_anchor: Instant,
/// Starts at 0 ns.
pub(super) clock_anchor_ts: MonotonicTs,
/// Set to the construction instant.
pub(super) motion_state_start: Instant,
/// Frame inclusion setting, stored as passed to `new`.
pub(super) frame_inclusion: FrameInclusion,
/// Starts at 0.
pub(super) frame_sample_counter: u64,
/// Starts as `None`.
pub(super) fps_warmup_start_ts: Option<MonotonicTs>,
/// Starts empty. NOTE(review): presumably a scratch buffer reused across frames — confirm.
pub(super) track_id_buf: HashSet<TrackId>,
/// Starts empty. NOTE(review): presumably a scratch buffer reused across frames — confirm.
pub(super) ended_buf: Vec<TrackId>,
/// Starts at 0.
pub(super) batch_rejection_count: u64,
/// Starts as `None`.
pub(super) last_batch_rejection_event: Option<Instant>,
/// Starts at 0.
pub(super) batch_timeout_count: u64,
/// Starts as `None`.
pub(super) last_batch_timeout_event: Option<Instant>,
/// Shared counter initialised to 0; `Some` only when `batch` is `Some`.
pub(super) batch_in_flight: Option<Arc<std::sync::atomic::AtomicUsize>>,
/// Starts at 0.
pub(super) batch_in_flight_rejection_count: u64,
/// Starts as `None`.
pub(super) last_batch_in_flight_rejection_event: Option<Instant>,
/// Shared shutdown flag, stored as passed to `new`.
pub(super) feed_shutdown: Arc<AtomicBool>,
/// Starts as `false`.
pub(super) coordinator_loss_emitted: bool,
/// Starts at 0.
pub(super) frame_lag_count: u64,
/// Starts at 0.
pub(super) frame_lag_peak_age_ms: u64,
/// Starts as `None`.
pub(super) last_frame_lag_event: Option<Instant>,
}
impl PipelineExecutor {
    /// Builds an executor for one feed.
    ///
    /// * The initial view state is picked from `camera_mode`, and the view
    ///   snapshot is created from a clone of it.
    /// * A zeroed in-flight counter is allocated only when `batch` is `Some`.
    /// * `stage_metrics` gets one zeroed entry for every stage in `stages`
    ///   and `post_batch_stages` combined.
    /// * Both `clock_anchor` and `motion_state_start` are set to the same
    ///   construction instant; every counter starts at zero and every
    ///   "last event" timestamp starts as `None`.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        feed_id: FeedId,
        stages: Vec<Box<dyn Stage>>,
        batch: Option<BatchHandle>,
        post_batch_stages: Vec<Box<dyn Stage>>,
        retention: RetentionPolicy,
        camera_mode: CameraMode,
        view_state_provider: Option<Box<dyn ViewStateProvider>>,
        epoch_policy: Box<dyn EpochPolicy>,
        frame_inclusion: FrameInclusion,
        feed_shutdown: Arc<AtomicBool>,
    ) -> Self {
        // The starting view state differs between fixed and observed cameras.
        let initial_view = match camera_mode {
            CameraMode::Observed => ViewState::observed_initial(),
            CameraMode::Fixed => ViewState::fixed_initial(),
        };
        let initial_snapshot = ViewSnapshot::new(initial_view.clone());

        // Metrics are tracked for pre-batch and post-batch stages alike.
        let stage_total = stages.len() + post_batch_stages.len();

        // The in-flight counter only exists when batching is configured.
        let in_flight_counter = batch
            .is_some()
            .then(|| Arc::new(std::sync::atomic::AtomicUsize::new(0)));

        let created_at = Instant::now();
        let temporal = TemporalStore::new(retention);
        let zeroed_metrics = StageMetrics {
            frames_processed: 0,
            errors: 0,
        };
        let stage_metrics = vec![zeroed_metrics; stage_total];

        Self {
            feed_id,
            camera_mode,
            stages,
            batch,
            post_batch_stages,
            temporal,
            view_state: initial_view,
            view_snapshot: initial_snapshot,
            view_state_provider,
            epoch_policy,
            stage_metrics,
            frames_processed: 0,
            clock_anchor: created_at,
            clock_anchor_ts: MonotonicTs::from_nanos(0),
            motion_state_start: created_at,
            frame_inclusion,
            frame_sample_counter: 0,
            fps_warmup_start_ts: None,
            track_id_buf: HashSet::new(),
            ended_buf: Vec::new(),
            batch_rejection_count: 0,
            last_batch_rejection_event: None,
            batch_timeout_count: 0,
            last_batch_timeout_event: None,
            batch_in_flight: in_flight_counter,
            batch_in_flight_rejection_count: 0,
            last_batch_in_flight_rejection_event: None,
            feed_shutdown,
            coordinator_loss_emitted: false,
            frame_lag_count: 0,
            frame_lag_peak_age_ms: 0,
            last_frame_lag_event: None,
        }
    }

    /// Returns the epoch of the current view snapshot as a `u64`.
    pub fn view_epoch(&self) -> u64 {
        let epoch = self.view_snapshot.epoch();
        epoch.as_u64()
    }

    /// Returns the track count reported by the temporal store.
    pub fn track_count(&self) -> usize {
        let store = &self.temporal;
        store.track_count()
    }

    /// Returns the stability score reported by the current view snapshot.
    pub fn stability_score(&self) -> f32 {
        let snapshot = &self.view_snapshot;
        snapshot.stability_score()
    }

    /// Encodes the current snapshot's context validity as a small integer:
    /// `Valid` is 0, `Degraded` is 1 and `Invalid` is 2.
    pub fn context_validity_ordinal(&self) -> u8 {
        let validity = self.view_snapshot.validity();
        match validity {
            ContextValidity::Invalid => 2,
            ContextValidity::Degraded { .. } => 1,
            ContextValidity::Valid => 0,
        }
    }
}
/// Converts the instant `t` into a `MonotonicTs` relative to an anchor pair.
///
/// The result is `anchor_ts` plus the time elapsed from `anchor` to `t`, in
/// nanoseconds.
///
/// Edge cases are clamped rather than left to panic or wrap:
/// * if `t` is earlier than `anchor`, the elapsed time is treated as zero and
///   `anchor_ts` is returned unchanged;
/// * if the elapsed nanoseconds do not fit in a `u64`, or adding them to
///   `anchor_ts` would overflow, the result saturates at `u64::MAX` nanoseconds.
fn instant_to_ts_impl(anchor: Instant, anchor_ts: MonotonicTs, t: Instant) -> MonotonicTs {
    // Explicitly saturating: `duration_since` is documented as possibly
    // panicking again in future Rust versions when `t < anchor`.
    let elapsed = t.saturating_duration_since(anchor);
    // `Duration::as_nanos` yields a `u128`; clamp instead of truncating with `as`.
    let elapsed_nanos = u64::try_from(elapsed.as_nanos()).unwrap_or(u64::MAX);
    MonotonicTs::from_nanos(anchor_ts.as_nanos().saturating_add(elapsed_nanos))
}
#[cfg(test)]
impl PipelineExecutor {
    /// Test-only accessor returning the current value of the
    /// `frames_processed` counter.
    pub fn frames_processed(&self) -> u64 {
        let Self {
            frames_processed, ..
        } = self;
        *frames_processed
    }
}