use std::sync::Arc;
use std::time::Duration;
use nv_core::id::FeedId;
use nv_core::metrics::FeedMetrics;
#[derive(Clone)]
pub struct FeedHandle {
shared: Arc<crate::worker::FeedSharedState>,
}
#[derive(Debug, Clone, Copy)]
pub struct QueueTelemetry {
pub source_depth: usize,
pub source_capacity: usize,
pub sink_depth: usize,
pub sink_capacity: usize,
}
#[derive(Debug, Clone)]
pub struct DecodeStatus {
pub outcome: nv_core::health::DecodeOutcome,
pub detail: String,
}
impl FeedHandle {
pub(crate) fn new(shared: Arc<crate::worker::FeedSharedState>) -> Self {
Self { shared }
}
#[must_use]
pub fn id(&self) -> FeedId {
self.shared.id
}
#[must_use]
pub fn is_paused(&self) -> bool {
self.shared
.paused
.load(std::sync::atomic::Ordering::Relaxed)
}
#[must_use]
pub fn is_alive(&self) -> bool {
self.shared.alive.load(std::sync::atomic::Ordering::Relaxed)
}
#[must_use]
pub fn metrics(&self) -> FeedMetrics {
self.shared.metrics()
}
#[must_use]
pub fn queue_telemetry(&self) -> QueueTelemetry {
let (source_depth, source_capacity) = self.shared.source_queue_telemetry();
QueueTelemetry {
source_depth,
source_capacity,
sink_depth: self
.shared
.sink_occupancy
.load(std::sync::atomic::Ordering::Relaxed),
sink_capacity: self
.shared
.sink_capacity
.load(std::sync::atomic::Ordering::Relaxed),
}
}
#[must_use]
pub fn uptime(&self) -> Duration {
self.shared.session_uptime()
}
#[must_use]
pub fn decode_status(&self) -> Option<DecodeStatus> {
let (outcome, detail) = self.shared.decode_status()?;
Some(DecodeStatus { outcome, detail })
}
#[must_use]
pub fn diagnostics(&self) -> crate::diagnostics::FeedDiagnostics {
use crate::diagnostics::{ViewDiagnostics, ViewStatus};
let metrics = self.metrics();
let validity_ordinal = self
.shared
.view_context_validity
.load(std::sync::atomic::Ordering::Relaxed);
let status = match validity_ordinal {
0 => ViewStatus::Stable,
1 => ViewStatus::Degraded,
_ => ViewStatus::Invalid,
};
let stability_bits = self
.shared
.view_stability_score
.load(std::sync::atomic::Ordering::Relaxed);
crate::diagnostics::FeedDiagnostics {
feed_id: self.id(),
alive: self.is_alive(),
paused: self.is_paused(),
uptime: self.uptime(),
metrics,
queues: self.queue_telemetry(),
decode: self.decode_status(),
view: ViewDiagnostics {
epoch: metrics.view_epoch,
stability_score: f32::from_bits(stability_bits),
status,
},
batch_processor_id: self.shared.batch_processor_id,
}
}
pub fn pause(&self) -> Result<(), nv_core::NvError> {
let was_paused = self
.shared
.paused
.swap(true, std::sync::atomic::Ordering::Relaxed);
if was_paused {
return Err(nv_core::NvError::Runtime(
nv_core::error::RuntimeError::AlreadyPaused,
));
}
let (lock, _cvar) = &self.shared.pause_condvar;
*lock.lock().unwrap_or_else(|e| e.into_inner()) = true;
Ok(())
}
pub fn resume(&self) -> Result<(), nv_core::NvError> {
let was_paused = self
.shared
.paused
.swap(false, std::sync::atomic::Ordering::Relaxed);
if !was_paused {
return Err(nv_core::NvError::Runtime(
nv_core::error::RuntimeError::NotPaused,
));
}
let (lock, cvar) = &self.shared.pause_condvar;
*lock.lock().unwrap_or_else(|e| e.into_inner()) = false;
cvar.notify_one();
Ok(())
}
}