use std::sync::atomic::{AtomicBool, AtomicU8, AtomicU32, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use nv_core::health::HealthEvent;
use nv_core::id::FeedId;
use nv_core::metrics::FeedMetrics;
use nv_media::ingress::HealthSink;
use tokio::sync::broadcast;
use crate::queue::FrameQueue;
/// State shared between the owner of a feed and the code that drives it.
///
/// Counters and flags are atomics so they can be read and updated through a
/// shared reference; the remaining mutable pieces sit behind mutexes.
pub(crate) struct FeedSharedState {
/// Identifier of the feed this state belongs to; copied into [`FeedMetrics`].
pub id: FeedId,
/// Pause flag, initialised to `false`.
/// NOTE(review): readers/writers are outside this file — presumably toggled by pause/resume.
pub paused: AtomicBool,
/// Shutdown flag, set to `true` by `request_shutdown`. Held in an `Arc` so the
/// flag itself can be handed to other components.
pub shutdown: Arc<AtomicBool>,
/// Counter reported as `frames_received` in `metrics()`.
pub frames_received: AtomicU64,
/// Counter reported as `frames_dropped` in `metrics()`.
pub frames_dropped: AtomicU64,
/// Counter reported as `frames_processed` in `metrics()`.
pub frames_processed: AtomicU64,
/// Value reported as `tracks_active` in `metrics()`.
pub tracks_active: AtomicU64,
/// Value reported as `view_epoch` in `metrics()`.
pub view_epoch: AtomicU64,
/// Counter reported as `restarts` in `metrics()`.
pub restarts: AtomicU32,
/// Liveness flag, initialised to `true`.
/// NOTE(review): who clears it is not visible here — confirm against the worker.
pub alive: AtomicBool,
/// Mutex/condvar pair; the condvar is notified by `request_shutdown`.
/// NOTE(review): the `bool` is presumably the "paused" predicate the waiter checks — confirm.
pub pause_condvar: (Mutex<bool>, Condvar),
/// Currently installed source frame queue, if any. Replaced via `set_queue`;
/// read for telemetry and to wake the consumer on shutdown.
queue: Mutex<Option<Arc<FrameQueue>>>,
/// Shared occupancy counter, initialised to 0.
/// NOTE(review): presumably the output-sink queue depth — updated outside this file.
pub sink_occupancy: Arc<AtomicUsize>,
/// Capacity value, initialised to 0.
/// NOTE(review): presumably the output-sink queue capacity — set outside this file.
pub sink_capacity: AtomicUsize,
/// Instant the current session started; initialised at construction time and
/// used by `session_uptime`.
pub session_started_at: Mutex<Instant>,
/// Most recently recorded decode outcome with an accompanying string, or
/// `None` if nothing has been recorded. Returned (cloned) by `decode_status()`.
pub decode_status: Mutex<Option<(nv_core::health::DecodeOutcome, String)>>,
/// An `f32` stored as its raw bit pattern (see `f32::to_bits`); initialised
/// to the bits of `1.0`.
pub view_stability_score: AtomicU32,
/// Small integer code, initialised to 0.
/// NOTE(review): the value encoding is defined outside this file.
pub view_context_validity: AtomicU8,
/// Optional stage id supplied at construction.
/// NOTE(review): presumably identifies the batch processor this feed uses — confirm.
pub batch_processor_id: Option<nv_core::id::StageId>,
}
impl FeedSharedState {
pub fn new(id: FeedId, batch_processor_id: Option<nv_core::id::StageId>) -> Self {
Self {
id,
paused: AtomicBool::new(false),
shutdown: Arc::new(AtomicBool::new(false)),
frames_received: AtomicU64::new(0),
frames_dropped: AtomicU64::new(0),
frames_processed: AtomicU64::new(0),
tracks_active: AtomicU64::new(0),
view_epoch: AtomicU64::new(0),
restarts: AtomicU32::new(0),
alive: AtomicBool::new(true),
pause_condvar: (Mutex::new(false), Condvar::new()),
queue: Mutex::new(None),
sink_occupancy: Arc::new(AtomicUsize::new(0)),
sink_capacity: AtomicUsize::new(0),
session_started_at: Mutex::new(Instant::now()),
decode_status: Mutex::new(None),
view_stability_score: AtomicU32::new(1.0_f32.to_bits()),
view_context_validity: AtomicU8::new(0),
batch_processor_id,
}
}
pub fn metrics(&self) -> FeedMetrics {
FeedMetrics {
feed_id: self.id,
frames_received: self.frames_received.load(Ordering::Relaxed),
frames_dropped: self.frames_dropped.load(Ordering::Relaxed),
frames_processed: self.frames_processed.load(Ordering::Relaxed),
tracks_active: self.tracks_active.load(Ordering::Relaxed),
view_epoch: self.view_epoch.load(Ordering::Relaxed),
restarts: self.restarts.load(Ordering::Relaxed),
}
}
pub fn source_queue_telemetry(&self) -> (usize, usize) {
if let Ok(guard) = self.queue.lock()
&& let Some(ref q) = *guard
{
return (q.depth(), q.capacity());
}
(0, 0)
}
pub fn session_uptime(&self) -> Duration {
self.session_started_at
.lock()
.map(|t| t.elapsed())
.unwrap_or(Duration::ZERO)
}
pub fn decode_status(&self) -> Option<(nv_core::health::DecodeOutcome, String)> {
self.decode_status.lock().ok().and_then(|g| g.clone())
}
pub fn request_shutdown(&self) {
self.shutdown.store(true, Ordering::Relaxed);
let (_lock, cvar) = &self.pause_condvar;
cvar.notify_one();
if let Ok(guard) = self.queue.lock()
&& let Some(ref q) = *guard
{
q.wake_consumer();
}
}
pub(crate) fn set_queue(&self, queue: Option<Arc<FrameQueue>>) {
if let Ok(mut guard) = self.queue.lock() {
*guard = queue;
}
}
}
/// [`HealthSink`] implementation that forwards every emitted [`HealthEvent`]
/// into a `tokio::sync::broadcast` channel.
pub(crate) struct BroadcastHealthSink {
/// Sending half of the broadcast channel that events are pushed into.
tx: broadcast::Sender<HealthEvent>,
}
impl BroadcastHealthSink {
pub fn new(tx: broadcast::Sender<HealthEvent>) -> Self {
Self { tx }
}
}
impl HealthSink for BroadcastHealthSink {
    /// Publishes `event` on the broadcast channel, best-effort.
    fn emit(&self, event: HealthEvent) {
        if let Err(_undelivered) = self.tx.send(event) {
            // Deliberately ignored: tokio documents that `send` fails only
            // when there are no active receivers, and emitting a health event
            // nobody listens to is not an error for the emitter.
        }
    }
}