use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use nv_core::error::RuntimeError;
use nv_core::health::HealthEvent;
use nv_core::id::FeedId;
use tokio::sync::broadcast;
use crate::output::{OutputSink, SharedOutput};
/// Default upper bound (5 seconds) for waiting on a sink worker during shutdown.
///
/// NOTE(review): this constant is not referenced in this part of the file; it is
/// presumably supplied by callers as the `timeout` argument of
/// `SinkWorker::shutdown` — confirm against the call sites.
pub(crate) const DEFAULT_SINK_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
/// Minimum spacing (1 second) between repeated `HealthEvent::SinkBackpressure`
/// events emitted by `SinkBpThrottle::record_drop` while backpressure persists.
pub(super) const SINK_BP_THROTTLE_INTERVAL: Duration = Duration::from_secs(1);
/// Handle to a dedicated OS thread that feeds outputs to an `OutputSink`.
///
/// Outputs are handed over through a bounded channel; the worker thread owns the
/// sink while it runs and returns it from the thread when the channel closes.
pub(super) struct SinkWorker {
// Sending half of the bounded channel created in `spawn`; dropping it lets the
// worker loop in `run` terminate.
tx: std::sync::mpsc::SyncSender<SharedOutput>,
// Join handle of the worker thread; the thread's return value is the sink itself.
// Taken (left as `None`) by `shutdown`.
thread: Option<std::thread::JoinHandle<Box<dyn OutputSink>>>,
// Shared counter of outputs currently queued: incremented in `send`, decremented
// by the worker in `run` as each output is dequeued. The `Arc` is supplied by the
// caller of `spawn`, so other holders can read it.
occupancy: Arc<AtomicUsize>,
}
impl SinkWorker {
    /// Starts a dedicated thread named `nv-sink-<feed_id>` that drives `sink`.
    ///
    /// * `feed_id` – feed this worker belongs to; used in the thread name, logs
    ///   and health events.
    /// * `sink` – the sink to drive; ownership moves to the worker thread and is
    ///   handed back when the thread exits.
    /// * `health_tx` – channel on which the worker reports sink panics.
    /// * `capacity` – bound of the output queue (passed to
    ///   `std::sync::mpsc::sync_channel`).
    /// * `occupancy` – shared counter tracking how many outputs are queued.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::ThreadSpawnFailed` if the OS thread cannot be
    /// created.
    pub(super) fn spawn(
        feed_id: FeedId,
        sink: Box<dyn OutputSink>,
        health_tx: broadcast::Sender<HealthEvent>,
        capacity: usize,
        occupancy: Arc<AtomicUsize>,
    ) -> Result<Self, RuntimeError> {
        let (tx, rx) = std::sync::mpsc::sync_channel::<SharedOutput>(capacity);
        // The worker gets its own handle to the counter; `self` keeps the other.
        let occ = Arc::clone(&occupancy);
        let thread = std::thread::Builder::new()
            .name(format!("nv-sink-{feed_id}"))
            .spawn(move || Self::run(feed_id, sink, rx, health_tx, occ))
            .map_err(|e| RuntimeError::ThreadSpawnFailed {
                detail: format!("sink worker for {feed_id}: {e}"),
            })?;
        Ok(Self {
            tx,
            thread: Some(thread),
            occupancy,
        })
    }

    /// Tries to enqueue `output` for the worker without blocking.
    ///
    /// Returns `true` if the output was queued. Returns `false` if the queue is
    /// full (the drop is recorded through `sink_bp`, which may emit a
    /// backpressure health event) or if the worker's receiving end is gone (no
    /// event is emitted in that case). In both failure cases the output is
    /// discarded.
    pub(super) fn send(
        &self,
        output: SharedOutput,
        sink_bp: &mut SinkBpThrottle,
        health_tx: &broadcast::Sender<HealthEvent>,
        feed_id: FeedId,
    ) -> bool {
        // Count the output *before* it becomes visible to the worker, so the
        // worker's decrement in `run` always follows the matching increment.
        // The increment is rolled back below if the send does not go through.
        self.occupancy.fetch_add(1, Ordering::Relaxed);
        match self.tx.try_send(output) {
            Ok(()) => true,
            Err(std::sync::mpsc::TrySendError::Full(_rejected)) => {
                self.occupancy.fetch_sub(1, Ordering::Relaxed);
                sink_bp.record_drop(health_tx, feed_id);
                false
            }
            Err(std::sync::mpsc::TrySendError::Disconnected(_)) => {
                self.occupancy.fetch_sub(1, Ordering::Relaxed);
                false
            }
        }
    }

    /// Body of the worker thread.
    ///
    /// Receives outputs until every sender has been dropped and the queue is
    /// empty, forwarding each one to `sink.emit`. A panic inside `emit` is
    /// caught, logged and reported as `HealthEvent::SinkPanic`; the loop then
    /// continues with the same sink. Returns the sink once the channel closes.
    fn run(
        feed_id: FeedId,
        sink: Box<dyn OutputSink>,
        rx: std::sync::mpsc::Receiver<SharedOutput>,
        health_tx: broadcast::Sender<HealthEvent>,
        occupancy: Arc<AtomicUsize>,
    ) -> Box<dyn OutputSink> {
        while let Ok(output) = rx.recv() {
            // The output has left the queue, whether or not `emit` succeeds.
            occupancy.fetch_sub(1, Ordering::Relaxed);
            // `AssertUnwindSafe`: the sink keeps being used after a panic, so
            // the sink implementation must tolerate that.
            let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                sink.emit(output);
            }));
            if result.is_err() {
                tracing::error!(
                    feed_id = %feed_id,
                    "OutputSink::emit() panicked — output dropped",
                );
                let _ = health_tx.send(HealthEvent::SinkPanic { feed_id });
            }
        }
        sink
    }

    /// Stops the worker and tries to get the sink back within `timeout`.
    ///
    /// Dropping the sender closes the queue, so the worker finishes whatever is
    /// still queued and exits. Because `JoinHandle::join` has no timeout, the
    /// join runs on a helper thread (`nv-sink-join-<feed_id>`) and this method
    /// waits for its result with `recv_timeout`.
    ///
    /// Returns `SinkRecovery::Recovered` with the sink when the worker exits in
    /// time, and `SinkRecovery::Lost` when:
    /// * the worker thread panicked (`HealthEvent::SinkPanic` is sent),
    /// * the worker did not finish within `timeout` (`HealthEvent::SinkTimeout`
    ///   is sent; the helper thread keeps waiting in the background),
    /// * the helper thread could not be spawned or ended without reporting
    ///   (`HealthEvent::SinkTimeout` is sent, with a log message that states the
    ///   real cause), or
    /// * there is no thread handle to join.
    pub(super) fn shutdown(
        mut self,
        health_tx: &broadcast::Sender<HealthEvent>,
        feed_id: FeedId,
        timeout: Duration,
    ) -> SinkRecovery {
        // Close the queue first; otherwise the worker would never leave `recv`.
        drop(self.tx);
        let Some(handle) = self.thread.take() else {
            return SinkRecovery::Lost;
        };
        let (done_tx, done_rx) = std::sync::mpsc::channel();
        let joiner = std::thread::Builder::new()
            .name(format!("nv-sink-join-{feed_id}"))
            .spawn(move || {
                let result = handle.join();
                let _ = done_tx.send(result);
            });
        if let Err(e) = joiner {
            // The closure (and with it the worker's join handle) was dropped, so
            // the worker is now detached and the sink cannot be recovered. Do
            // not report this as a timeout in the log: no waiting took place.
            tracing::error!(
                feed_id = %feed_id,
                error = %e,
                "failed to spawn sink join helper thread — detaching sink worker",
            );
            // Same health event as before, so subscribers still see the loss.
            let _ = health_tx.send(HealthEvent::SinkTimeout { feed_id });
            return SinkRecovery::Lost;
        }
        match done_rx.recv_timeout(timeout) {
            Ok(Ok(sink)) => SinkRecovery::Recovered(sink),
            Ok(Err(_)) => {
                tracing::error!(
                    feed_id = %feed_id,
                    "sink worker thread panicked during shutdown",
                );
                let _ = health_tx.send(HealthEvent::SinkPanic { feed_id });
                SinkRecovery::Lost
            }
            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
                tracing::warn!(
                    feed_id = %feed_id,
                    timeout_secs = timeout.as_secs(),
                    "sink worker thread did not finish within timeout — detaching",
                );
                let _ = health_tx.send(HealthEvent::SinkTimeout { feed_id });
                SinkRecovery::Lost
            }
            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
                // The helper thread went away without sending a result.
                tracing::error!(
                    feed_id = %feed_id,
                    "sink join helper thread exited without reporting — sink lost",
                );
                let _ = health_tx.send(HealthEvent::SinkTimeout { feed_id });
                SinkRecovery::Lost
            }
        }
    }
}
/// Outcome of `SinkWorker::shutdown`.
pub(super) enum SinkRecovery {
/// The worker thread exited in time and handed its sink back.
Recovered(Box<dyn OutputSink>),
/// The sink could not be retrieved (worker panicked, did not finish within the
/// timeout, or there was no thread handle to join).
Lost,
}
/// An `OutputSink` that discards every output it is given.
pub(super) struct NullSink;

impl OutputSink for NullSink {
    /// Accepts the output and does nothing with it.
    fn emit(&self, _discarded: SharedOutput) {
        // Intentionally empty: the output is simply dropped on return.
    }
}
/// Rate limiter for `HealthEvent::SinkBackpressure` events.
///
/// Counts dropped outputs and reports them in batches instead of one event per
/// drop (see `record_drop` and `flush`).
pub(super) struct SinkBpThrottle {
// Set when the first drop is reported; cleared by `flush` when it emits.
in_backpressure: bool,
// Drops recorded since the last emitted event.
accumulated: u64,
// When the last event was emitted by `record_drop` (initialised to the
// construction time in `new`).
last_event: Instant,
}
impl SinkBpThrottle {
    /// Creates a throttle that is not in backpressure and has no pending drops.
    pub(super) fn new() -> Self {
        let created_at = Instant::now();
        Self {
            in_backpressure: false,
            accumulated: 0,
            last_event: created_at,
        }
    }

    /// Records one dropped output and emits a `HealthEvent::SinkBackpressure`
    /// when one is due.
    ///
    /// An event is due on the first drop after the throttle was not in
    /// backpressure, and afterwards whenever at least
    /// `SINK_BP_THROTTLE_INTERVAL` has passed since the previous event. The
    /// event carries the number of drops counted since the last one, and the
    /// counter is reset. Send errors on `health_tx` are ignored.
    pub(super) fn record_drop(
        &mut self,
        health_tx: &broadcast::Sender<HealthEvent>,
        feed_id: FeedId,
    ) {
        self.accumulated += 1;

        // Entering backpressure always reports immediately; the elapsed-time
        // check only matters once we are already in backpressure.
        let entering = !self.in_backpressure;
        let due = entering || self.last_event.elapsed() >= SINK_BP_THROTTLE_INTERVAL;
        if !due {
            return;
        }

        self.in_backpressure = true;
        let outputs_dropped = std::mem::take(&mut self.accumulated);
        self.last_event = Instant::now();
        let _ = health_tx.send(HealthEvent::SinkBackpressure {
            feed_id,
            outputs_dropped,
        });
    }

    /// Emits any drops that are still pending as a single
    /// `HealthEvent::SinkBackpressure` and leaves the backpressure state.
    ///
    /// Does nothing at all when no drops are pending — in particular the
    /// backpressure flag is only cleared when an event is actually emitted.
    /// Send errors on `health_tx` are ignored.
    pub(super) fn flush(&mut self, health_tx: &broadcast::Sender<HealthEvent>, feed_id: FeedId) {
        if self.accumulated == 0 {
            return;
        }

        let outputs_dropped = std::mem::take(&mut self.accumulated);
        self.in_backpressure = false;
        let _ = health_tx.send(HealthEvent::SinkBackpressure {
            feed_id,
            outputs_dropped,
        });
    }
}