use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use nv_core::error::MediaError;
use nv_core::health::HealthEvent;
use nv_core::id::FeedId;
use nv_frame::FrameEnvelope;
use nv_media::ingress::FrameSink;
use tokio::sync::broadcast;
use crate::queue::{FrameQueue, PushOutcome};
use super::shared_state::FeedSharedState;
/// Minimum time that must elapse since the last emitted
/// `HealthEvent::BackpressureDrop` before `BackpressureThrottle::record_drop`
/// emits another one (the very first drop is reported immediately).
const BP_THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
/// Rate limiter for `HealthEvent::BackpressureDrop` events.
///
/// Individual frame drops are counted and coalesced so that subscribers of
/// the health channel receive periodic summaries instead of one event per
/// dropped frame.
pub(super) struct BackpressureThrottle {
// Mutable throttle state; `record_drop` only ever accesses it via `try_lock`.
inner: Mutex<BpThrottleInner>,
}
/// Mutex-protected state of a `BackpressureThrottle`.
struct BpThrottleInner {
// Set to `true` when the first drop is reported. NOTE(review): nothing in
// the visible code resets it to `false` — confirm whether that is intended.
in_backpressure: bool,
// Number of drops counted since the last emitted event.
accumulated: u64,
// Time the last event was emitted (initialised to construction time).
last_event: Instant,
}
impl BackpressureThrottle {
    /// Creates a throttle that has not yet reported any drop.
    pub(super) fn new() -> Self {
        Self {
            inner: Mutex::new(BpThrottleInner {
                in_backpressure: false,
                accumulated: 0,
                last_event: Instant::now(),
            }),
        }
    }

    /// Records one dropped frame and, when due, emits a coalesced
    /// `HealthEvent::BackpressureDrop` on `health_tx`.
    ///
    /// An event is emitted immediately for the first drop ever recorded, and
    /// afterwards at most once per `BP_THROTTLE_INTERVAL`; each event carries
    /// the number of drops counted since the previous event.
    ///
    /// This never blocks: if the state lock is currently held elsewhere the
    /// call returns without counting the drop (best effort). The result of
    /// `send` is deliberately ignored so a failed send does not disturb the
    /// caller.
    fn record_drop(&self, health_tx: &broadcast::Sender<HealthEvent>, feed_id: FeedId) {
        let mut inner = match self.inner.try_lock() {
            Ok(guard) => guard,
            // The guarded state is just counters and a timestamp, so it cannot
            // be left logically inconsistent by a panic. Recover the guard
            // instead of silently disabling drop reporting forever.
            Err(std::sync::TryLockError::Poisoned(poisoned)) => poisoned.into_inner(),
            // Contended: skip rather than stall the thread delivering frames.
            Err(std::sync::TryLockError::WouldBlock) => return,
        };

        inner.accumulated += 1;

        // The first drop is reported right away; later ones are throttled.
        let first_drop = !inner.in_backpressure;
        if first_drop || inner.last_event.elapsed() >= BP_THROTTLE_INTERVAL {
            inner.in_backpressure = true;
            let frames_dropped = std::mem::take(&mut inner.accumulated);
            inner.last_event = Instant::now();
            let _ = health_tx.send(HealthEvent::BackpressureDrop {
                feed_id,
                frames_dropped,
            });
        }
    }
}
/// `FrameSink` implementation for a single feed: forwards incoming frames to
/// the feed's queue, maintains receive/drop counters and reports drops on the
/// health channel.
pub(super) struct FeedFrameSink {
// Queue that every incoming frame is pushed into.
pub queue: Arc<FrameQueue>,
// Shared feed state; this sink updates `frames_received` and `frames_dropped`.
pub shared: Arc<FeedSharedState>,
// Channel on which backpressure health events are published.
pub health_tx: broadcast::Sender<HealthEvent>,
// Identifier attached to the health events emitted by this sink.
pub feed_id: FeedId,
// Coalesces per-frame drops into rate-limited health events.
pub bp_throttle: BackpressureThrottle,
}
impl FrameSink for FeedFrameSink {
    /// Counts the incoming frame, pushes it into the queue and, if the push
    /// caused a frame to be lost, updates the drop counter and notifies the
    /// backpressure throttle.
    fn on_frame(&self, frame: FrameEnvelope) {
        self.shared.frames_received.fetch_add(1, Ordering::Relaxed);

        // Exhaustive on purpose: a new `PushOutcome` variant must be
        // classified here explicitly.
        let dropped = match self.queue.push(frame) {
            PushOutcome::Accepted => false,
            PushOutcome::DroppedOldest | PushOutcome::Rejected => true,
        };
        if !dropped {
            return;
        }

        self.shared.frames_dropped.fetch_add(1, Ordering::Relaxed);
        self.bp_throttle.record_drop(&self.health_tx, self.feed_id);
    }

    /// Wakes the queue consumer. The error value itself is not inspected
    /// here. NOTE(review): presumably it is reported through another path —
    /// confirm.
    fn on_error(&self, _error: MediaError) {
        self.queue.wake_consumer();
    }

    /// Closes the queue on end of stream.
    fn on_eos(&self) {
        self.queue.close();
    }

    /// Wakes the queue consumer.
    fn wake(&self) {
        self.queue.wake_consumer();
    }
}