use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use nv_core::error::{NvError, RuntimeError};
use nv_core::health::HealthEvent;
use nv_core::id::{FeedId, StageId};
use nv_media::DefaultMediaFactory;
use nv_media::ingress::MediaIngressFactory;
use nv_perception::BatchProcessor;
use tokio::sync::broadcast;
use crate::batch::{BatchConfig, BatchCoordinator, BatchHandle};
use crate::feed::FeedConfig;
use crate::feed_handle::FeedHandle;
use crate::output::{LagDetector, SharedOutput};
use crate::worker::{self, BroadcastHealthSink, FeedSharedState};
/// Default capacity of the broadcast channel that carries [`HealthEvent`]s.
const DEFAULT_HEALTH_CAPACITY: usize = 256;
/// Default capacity of the broadcast channel that carries pipeline output.
const DEFAULT_OUTPUT_CAPACITY: usize = 256;
/// Default bounded wait when joining a feed worker thread before detaching it.
const DEFAULT_FEED_JOIN_TIMEOUT: Duration = Duration::from_secs(10);
/// Default bounded wait when joining a batch coordinator before detaching it.
const DEFAULT_COORDINATOR_JOIN_TIMEOUT: Duration = Duration::from_secs(10);
/// Configuration builder for [`Runtime`]; obtained via [`Runtime::builder`]
/// and consumed by [`RuntimeBuilder::build`].
pub struct RuntimeBuilder {
    /// Upper bound on concurrently registered feeds.
    max_feeds: usize,
    /// Capacity of the health-event broadcast channel (must be non-zero).
    health_capacity: usize,
    /// Capacity of the output broadcast channel (must be non-zero).
    output_capacity: usize,
    /// Optional caller-supplied ingress factory; a default factory is
    /// installed by `build` when this is `None`.
    ingress_factory: Option<Box<dyn MediaIngressFactory>>,
    /// Bounded wait when joining a feed worker thread.
    feed_join_timeout: Duration,
    /// Bounded wait when joining a batch coordinator.
    coordinator_join_timeout: Duration,
    /// Policy applied to feeds using a custom source pipeline.
    custom_pipeline_policy: nv_core::security::CustomPipelinePolicy,
}
impl RuntimeBuilder {
    /// Sets the maximum number of concurrently running feeds.
    #[must_use]
    pub fn max_feeds(mut self, max: usize) -> Self {
        self.max_feeds = max;
        self
    }

    /// Sets the capacity of the health-event broadcast channel.
    /// Must be non-zero; validated by [`build`](Self::build).
    #[must_use]
    pub fn health_capacity(mut self, cap: usize) -> Self {
        self.health_capacity = cap;
        self
    }

    /// Sets the capacity of the output broadcast channel.
    /// Must be non-zero; validated by [`build`](Self::build).
    #[must_use]
    pub fn output_capacity(mut self, cap: usize) -> Self {
        self.output_capacity = cap;
        self
    }

    /// Overrides the media ingress factory used to open feed sources.
    /// When unset, `build` installs a `DefaultMediaFactory` wired to the
    /// runtime's health channel.
    #[must_use]
    pub fn ingress_factory(mut self, factory: Box<dyn MediaIngressFactory>) -> Self {
        self.ingress_factory = Some(factory);
        self
    }

    /// Sets how long to wait for a feed worker thread to join before it is
    /// detached.
    #[must_use]
    pub fn feed_join_timeout(mut self, timeout: Duration) -> Self {
        self.feed_join_timeout = timeout;
        self
    }

    /// Sets how long to wait for a batch coordinator to join before it is
    /// detached.
    #[must_use]
    pub fn coordinator_join_timeout(mut self, timeout: Duration) -> Self {
        self.coordinator_join_timeout = timeout;
        self
    }

    /// Sets the policy applied to feeds whose source is
    /// `SourceSpec::Custom`.
    #[must_use]
    pub fn custom_pipeline_policy(
        mut self,
        policy: nv_core::security::CustomPipelinePolicy,
    ) -> Self {
        self.custom_pipeline_policy = policy;
        self
    }

    /// Validates the configuration and assembles the runtime.
    ///
    /// # Errors
    ///
    /// Returns `ConfigError::InvalidCapacity` when either channel capacity
    /// is zero (`tokio::sync::broadcast::channel` panics on a zero
    /// capacity, so this is rejected up front).
    pub fn build(self) -> Result<Runtime, NvError> {
        use nv_core::error::ConfigError;
        if self.health_capacity == 0 {
            return Err(ConfigError::InvalidCapacity {
                field: "health_capacity",
            }
            .into());
        }
        if self.output_capacity == 0 {
            return Err(ConfigError::InvalidCapacity {
                field: "output_capacity",
            }
            .into());
        }
        let (health_tx, _) = broadcast::channel(self.health_capacity);
        // The sentinel receiver stays with the lag detector so it can
        // observe how far output consumers fall behind.
        let (output_tx, sentinel_rx) = broadcast::channel(self.output_capacity);
        let lag_detector = Arc::new(LagDetector::new(sentinel_rx, self.output_capacity));
        let factory: Arc<dyn MediaIngressFactory> = match self.ingress_factory {
            Some(f) => Arc::from(f),
            None => {
                // Built lazily: a caller-supplied factory never uses this
                // sink, so don't allocate it (and clone health_tx)
                // unconditionally.
                let health_sink = Arc::new(BroadcastHealthSink::new(health_tx.clone()));
                Arc::new(DefaultMediaFactory::with_health_sink(health_sink as _))
            }
        };
        let inner = Arc::new(RuntimeInner {
            max_feeds: self.max_feeds,
            // Ids start at 1 so 0 never collides with a live feed.
            next_feed_id: AtomicU64::new(1),
            feeds: Mutex::new(HashMap::new()),
            coordinators: Mutex::new(Vec::new()),
            batch_ids: Mutex::new(HashSet::new()),
            health_tx,
            output_tx,
            lag_detector,
            shutdown: AtomicBool::new(false),
            factory,
            started_at: Instant::now(),
            feed_join_timeout: self.feed_join_timeout,
            coordinator_join_timeout: self.coordinator_join_timeout,
            detached_threads: Mutex::new(Vec::new()),
            custom_pipeline_policy: self.custom_pipeline_policy,
        });
        Ok(Runtime { inner })
    }
}
/// State shared by the owning [`Runtime`] and every cloned [`RuntimeHandle`].
struct RuntimeInner {
    /// Upper bound on concurrently registered feeds.
    max_feeds: usize,
    /// Monotonic source of fresh feed ids (starts at 1).
    next_feed_id: AtomicU64,
    /// Registry of live feeds keyed by id.
    feeds: Mutex<HashMap<FeedId, RunningFeed>>,
    /// Live batch coordinators, drained on shutdown.
    coordinators: Mutex<Vec<BatchCoordinator>>,
    /// Processor ids currently registered, used to reject duplicates.
    batch_ids: Mutex<HashSet<StageId>>,
    /// Broadcast side of the health-event channel.
    health_tx: broadcast::Sender<HealthEvent>,
    /// Broadcast side of the pipeline-output channel.
    output_tx: broadcast::Sender<SharedOutput>,
    /// Watches the output channel for lagging consumers.
    lag_detector: Arc<LagDetector>,
    /// Set once shutdown begins; add/create paths check it.
    shutdown: AtomicBool,
    /// Factory used to open media sources for new feeds.
    factory: Arc<dyn MediaIngressFactory>,
    /// Construction time, for uptime reporting.
    started_at: Instant,
    /// Bounded wait when joining a feed worker thread.
    feed_join_timeout: Duration,
    /// Bounded wait when joining a batch coordinator.
    coordinator_join_timeout: Duration,
    /// Join helpers that timed out and were detached; reaped lazily.
    detached_threads: Mutex<Vec<DetachedJoin>>,
    /// Policy applied to `SourceSpec::Custom` feeds.
    custom_pipeline_policy: nv_core::security::CustomPipelinePolicy,
}
/// A feed registered with the runtime: shared state plus its worker thread.
struct RunningFeed {
    /// State shared with the worker and any outstanding `FeedHandle`s.
    shared: Arc<FeedSharedState>,
    /// Worker thread handle; `None` once taken for joining during shutdown.
    thread: Option<std::thread::JoinHandle<()>>,
}
impl RuntimeInner {
    /// Number of feeds currently registered.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::RegistryPoisoned` when the feed map's mutex
    /// was poisoned by a panicking thread.
    fn feed_count(&self) -> Result<usize, NvError> {
        let feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
        Ok(feeds.len())
    }

    /// Drops bookkeeping for detached join helpers that have since finished
    /// and returns how many are still outstanding.
    fn reap_detached(&self) -> usize {
        // Poisoning only means a thread panicked while holding the lock;
        // the Vec itself is still coherent, so recover it.
        let mut detached = self
            .detached_threads
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        detached.retain(|d| {
            match d.done_rx.try_recv() {
                // The helper reported a join result, or it died and dropped
                // its sender — either way nothing is left to track.
                Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                    false
                }
                // Still running: keep tracking it.
                Err(std::sync::mpsc::TryRecvError::Empty) => true,
            }
        });
        detached.len()
    }

    /// Records join helpers that outlived their timeout so diagnostics can
    /// report them and `reap_detached` can retire them later.
    fn track_detached(&self, joins: impl IntoIterator<Item = DetachedJoin>) {
        let mut detached = self
            .detached_threads
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        detached.extend(joins);
    }

    /// Collects a point-in-time snapshot of feed, batch, output-lag and
    /// detached-thread state.
    ///
    /// # Errors
    ///
    /// Returns `RuntimeError::RegistryPoisoned` when a registry mutex was
    /// poisoned.
    fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
        // Clone the Arcs out first so per-feed diagnostics run without
        // holding the registry lock.
        let shared_refs: Vec<Arc<FeedSharedState>> = {
            let feeds = self
                .feeds
                .lock()
                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
            feeds
                .values()
                .map(|entry| Arc::clone(&entry.shared))
                .collect()
        };
        let mut feed_diags: Vec<crate::diagnostics::FeedDiagnostics> = shared_refs
            .iter()
            .map(|shared| FeedHandle::new(Arc::clone(shared)).diagnostics())
            .collect();
        // HashMap iteration order is arbitrary; sort for stable output.
        feed_diags.sort_by_key(|d| d.feed_id.as_u64());
        let batches: Vec<crate::diagnostics::BatchDiagnostics> = self
            .coordinators
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?
            .iter()
            .map(|c| {
                let h = c.handle();
                crate::diagnostics::BatchDiagnostics {
                    processor_id: h.processor_id(),
                    metrics: h.metrics(),
                }
            })
            .collect();
        let output_lag = self.lag_detector.status();
        let detached_thread_count = self.reap_detached();
        Ok(crate::diagnostics::RuntimeDiagnostics {
            uptime: self.started_at.elapsed(),
            feed_count: feed_diags.len(),
            max_feeds: self.max_feeds,
            feeds: feed_diags,
            batches,
            output_lag,
            detached_thread_count,
        })
    }

    /// Starts a batch coordinator for `processor` and registers it.
    ///
    /// # Errors
    ///
    /// Fails when the runtime is shutting down, the processor id is already
    /// registered, or the coordinator cannot be started.
    fn create_batch(
        &self,
        processor: Box<dyn BatchProcessor>,
        config: BatchConfig,
    ) -> Result<BatchHandle, NvError> {
        if self.shutdown.load(Ordering::Relaxed) {
            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
        }
        let processor_id = processor.id();
        // Reserve the id before starting the coordinator so a concurrent
        // duplicate fails fast.
        {
            let mut ids = self
                .batch_ids
                .lock()
                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
            if !ids.insert(processor_id) {
                return Err(NvError::Config(
                    nv_core::error::ConfigError::DuplicateBatchProcessorId { id: processor_id },
                ));
            }
        }
        let coordinator = match BatchCoordinator::start(processor, config, self.health_tx.clone()) {
            Ok(c) => c,
            Err(e) => {
                // Roll back the id reservation; best effort if poisoned.
                if let Ok(mut ids) = self.batch_ids.lock() {
                    ids.remove(&processor_id);
                }
                return Err(e);
            }
        };
        let handle = coordinator.handle();
        {
            let mut coords = self
                .coordinators
                .lock()
                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
            coords.push(coordinator);
            // Re-check under the lock: shutdown_all() may have drained the
            // registry between the first check and this push, which would
            // orphan the coordinator we just started.
            if self.shutdown.load(Ordering::Acquire) {
                if let Some(orphan) = coords.pop() {
                    drop(coords);
                    if let Some(detached) = orphan.shutdown(self.coordinator_join_timeout) {
                        self.track_detached(std::iter::once(detached));
                    }
                }
                if let Ok(mut ids) = self.batch_ids.lock() {
                    ids.remove(&processor_id);
                }
                return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
            }
        }
        Ok(handle)
    }

    /// Validates `config`, spawns a worker for the feed and registers it
    /// under a fresh [`FeedId`].
    ///
    /// # Errors
    ///
    /// Fails when the runtime is shutting down, the config violates the
    /// custom-pipeline or RTSP security policy, the feed limit is reached,
    /// or the worker cannot be spawned.
    fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
        if self.shutdown.load(Ordering::Relaxed) {
            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
        }
        if matches!(config.source, nv_core::config::SourceSpec::Custom { .. })
            && self.custom_pipeline_policy == nv_core::security::CustomPipelinePolicy::Reject
        {
            return Err(NvError::Media(
                nv_core::error::MediaError::CustomPipelineRejected,
            ));
        }
        if let nv_core::config::SourceSpec::Rtsp {
            ref url, security, ..
        } = config.source
            && security == nv_core::security::RtspSecurityPolicy::RequireTls
            && nv_core::security::is_insecure_rtsp(url)
        {
            return Err(NvError::Media(
                nv_core::error::MediaError::InsecureRtspRejected,
            ));
        }
        let mut feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
        // Re-check under the lock: shutdown_all() sets the flag (Release)
        // before taking this lock, so without this check a feed inserted
        // here could slip in after the drain and never be shut down.
        // create_batch performs the analogous re-check.
        if self.shutdown.load(Ordering::Acquire) {
            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
        }
        if feeds.len() >= self.max_feeds {
            return Err(NvError::Runtime(RuntimeError::FeedLimitExceeded {
                max: self.max_feeds,
            }));
        }
        let id = FeedId::new(self.next_feed_id.fetch_add(1, Ordering::Relaxed));
        let (shared, thread) = worker::spawn_feed_worker(
            id,
            config,
            Arc::clone(&self.factory),
            self.health_tx.clone(),
            self.output_tx.clone(),
            Arc::clone(&self.lag_detector),
        )?;
        let handle = FeedHandle::new(Arc::clone(&shared));
        feeds.insert(
            id,
            RunningFeed {
                shared,
                thread: Some(thread),
            },
        );
        Ok(handle)
    }

    /// Stops one feed: deregisters it, signals its worker and joins it with
    /// a bounded wait (detaching on timeout).
    ///
    /// # Errors
    ///
    /// Returns `FeedNotFound` for an unknown id and `RegistryPoisoned` when
    /// the feed map's mutex was poisoned.
    fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
        let mut feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
        let entry = feeds
            .remove(&feed_id)
            .ok_or(NvError::Runtime(RuntimeError::FeedNotFound { feed_id }))?;
        entry.shared.request_shutdown();
        // Release the registry before the (possibly long) join so other
        // feeds can still be added or removed in the meantime.
        drop(feeds);
        if let Some(handle) = entry.thread
            && let Some(detached) =
                bounded_join(handle, feed_id, &self.health_tx, self.feed_join_timeout)
        {
            self.track_detached(std::iter::once(detached));
        }
        Ok(())
    }

    /// Stops every feed and coordinator: signals everything first, then
    /// joins with bounded waits, detaching whatever does not finish in time.
    fn shutdown_all(&self) -> Result<(), NvError> {
        self.shutdown.store(true, Ordering::Release);
        // Recover from poisoning here instead of erroring out: shutdown
        // must still stop workers and coordinators even if another thread
        // panicked while holding the map (the coordinator and detached
        // locks below already recover the same way).
        let mut feeds = self.feeds.lock().unwrap_or_else(|e| e.into_inner());
        for entry in feeds.values() {
            entry.shared.request_shutdown();
        }
        let entries: Vec<_> = feeds.drain().collect();
        drop(feeds);
        {
            let coordinators = self.coordinators.lock().unwrap_or_else(|e| e.into_inner());
            // Signal all coordinators before joining anything so they wind
            // down concurrently with the feed joins.
            for coordinator in coordinators.iter() {
                coordinator.signal_shutdown();
            }
        }
        for (id, mut entry) in entries {
            if let Some(handle) = entry.thread.take()
                && let Some(detached) =
                    bounded_join(handle, id, &self.health_tx, self.feed_join_timeout)
            {
                self.track_detached(std::iter::once(detached));
            }
        }
        {
            let mut coordinators = self.coordinators.lock().unwrap_or_else(|e| e.into_inner());
            let detached: Vec<_> = coordinators
                .drain(..)
                .filter_map(|c| c.shutdown(self.coordinator_join_timeout))
                .collect();
            if !detached.is_empty() {
                self.track_detached(detached);
            }
        }
        self.lag_detector.flush(&self.health_tx);
        Ok(())
    }
}
/// Joins a feed worker thread with a bounded wait.
///
/// Spawns a helper thread that performs the blocking `join` and reports the
/// result over a channel. If the worker finishes (or panicked) within
/// `timeout`, returns `None`. Otherwise emits a `FeedStopped` health event
/// and returns a [`DetachedJoin`] so the caller can keep tracking the
/// still-running worker.
fn bounded_join(
    handle: std::thread::JoinHandle<()>,
    feed_id: FeedId,
    health_tx: &broadcast::Sender<HealthEvent>,
    timeout: Duration,
) -> Option<DetachedJoin> {
    let (done_tx, done_rx) = std::sync::mpsc::channel();
    let label = format!("nv-join-{feed_id}");
    let joiner = match std::thread::Builder::new()
        .name(label.clone())
        .spawn(move || {
            let result = handle.join();
            let _ = done_tx.send(result);
        }) {
        Ok(j) => j,
        Err(err) => {
            // Spawn failure drops the closure — and with it the worker's
            // JoinHandle — so the worker is detached without ever being
            // joined. Report that accurately instead of falling through to
            // the timeout branch (done_tx was dropped with the closure, so
            // recv_timeout would return Disconnected immediately and log a
            // misleading "did not finish within timeout" message).
            tracing::error!(
                feed_id = %feed_id,
                error = %err,
                "failed to spawn join helper thread — worker detached unjoined",
            );
            let _ = health_tx.send(HealthEvent::FeedStopped {
                feed_id,
                reason: nv_core::health::StopReason::Fatal {
                    detail: "join helper thread could not be spawned — worker detached"
                        .to_string(),
                },
            });
            return None;
        }
    };
    match done_rx.recv_timeout(timeout) {
        // Worker exited cleanly within the timeout.
        Ok(Ok(())) => None,
        // Worker panicked; the helper still joined it, so nothing leaks.
        Ok(Err(_)) => {
            tracing::error!(
                feed_id = %feed_id,
                "feed worker thread panicked during join",
            );
            None
        }
        // Timed out (or the helper died): hand back a DetachedJoin so the
        // runtime can keep polling done_rx via reap_detached.
        Err(_) => {
            tracing::warn!(
                feed_id = %feed_id,
                timeout_secs = timeout.as_secs(),
                "feed worker thread did not finish within timeout — detaching",
            );
            let _ = health_tx.send(HealthEvent::FeedStopped {
                feed_id,
                reason: nv_core::health::StopReason::Fatal {
                    detail: format!(
                        "worker thread did not join within {}s — detached",
                        timeout.as_secs()
                    ),
                },
            });
            Some(DetachedJoin {
                label,
                done_rx,
                joiner,
            })
        }
    }
}
/// Bookkeeping for a join-helper thread whose worker outlived the join
/// timeout; `RuntimeInner::reap_detached` polls `done_rx` to retire it.
pub(crate) struct DetachedJoin {
    /// Helper thread's name, kept for debugging.
    #[allow(dead_code)]
    pub(crate) label: String,
    /// Receives the worker's join result once the helper finishes.
    pub(crate) done_rx: std::sync::mpsc::Receiver<std::thread::Result<()>>,
    /// Handle to the helper thread itself; never joined, only retained.
    #[allow(dead_code)]
    pub(crate) joiner: std::thread::JoinHandle<()>,
}
/// Owning facade over the runtime; triggers a full shutdown when dropped
/// (see the `Drop` impl) unless `shutdown()` was already called.
pub struct Runtime {
    inner: Arc<RuntimeInner>,
}
impl Runtime {
    /// Starts a builder pre-populated with the default limits, capacities,
    /// timeouts and security policy.
    #[must_use]
    pub fn builder() -> RuntimeBuilder {
        RuntimeBuilder {
            max_feeds: 64,
            health_capacity: DEFAULT_HEALTH_CAPACITY,
            output_capacity: DEFAULT_OUTPUT_CAPACITY,
            ingress_factory: None,
            feed_join_timeout: DEFAULT_FEED_JOIN_TIMEOUT,
            coordinator_join_timeout: DEFAULT_COORDINATOR_JOIN_TIMEOUT,
            custom_pipeline_policy: nv_core::security::CustomPipelinePolicy::default(),
        }
    }
    /// Returns a cloneable handle sharing this runtime's state.
    #[must_use]
    pub fn handle(&self) -> RuntimeHandle {
        RuntimeHandle {
            inner: Arc::clone(&self.inner),
        }
    }
    /// Number of currently registered feeds.
    pub fn feed_count(&self) -> Result<usize, NvError> {
        self.inner.feed_count()
    }
    /// Configured upper bound on concurrent feeds.
    #[must_use]
    pub fn max_feeds(&self) -> usize {
        self.inner.max_feeds
    }
    /// Time elapsed since the runtime was built.
    #[must_use]
    pub fn uptime(&self) -> Duration {
        self.inner.started_at.elapsed()
    }
    /// Snapshot of feed, batch, output-lag and detached-thread state.
    pub fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
        self.inner.diagnostics()
    }
    /// Subscribes to the health-event broadcast channel.
    pub fn health_subscribe(&self) -> broadcast::Receiver<HealthEvent> {
        self.inner.health_tx.subscribe()
    }
    /// Subscribes to the pipeline-output broadcast channel.
    pub fn output_subscribe(&self) -> broadcast::Receiver<SharedOutput> {
        self.inner.output_tx.subscribe()
    }
    /// Registers a new feed and spawns its worker.
    pub fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
        self.inner.add_feed(config)
    }
    /// Registers and starts a batch coordinator for `processor`.
    pub fn create_batch(
        &self,
        processor: Box<dyn BatchProcessor>,
        config: BatchConfig,
    ) -> Result<BatchHandle, NvError> {
        self.inner.create_batch(processor, config)
    }
    /// Stops and deregisters a single feed.
    pub fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
        self.inner.remove_feed(feed_id)
    }
    /// Consumes the runtime, stopping every feed and coordinator.
    pub fn shutdown(self) -> Result<(), NvError> {
        self.inner.shutdown_all()
    }
}
impl Drop for Runtime {
    /// Best-effort shutdown when the runtime is dropped without an explicit
    /// `shutdown()` call; the result is ignored since drop cannot report it.
    fn drop(&mut self) {
        let already_down = self.inner.shutdown.load(Ordering::Acquire);
        if already_down {
            return;
        }
        let _ = self.inner.shutdown_all();
    }
}
/// Cloneable handle to a running [`Runtime`]; shares the same inner state
/// but, unlike `Runtime`, has no `Drop` impl that triggers shutdown.
#[derive(Clone)]
pub struct RuntimeHandle {
    inner: Arc<RuntimeInner>,
}
impl RuntimeHandle {
    /// Number of currently registered feeds.
    pub fn feed_count(&self) -> Result<usize, NvError> {
        self.inner.feed_count()
    }
    /// Configured upper bound on concurrent feeds.
    #[must_use]
    pub fn max_feeds(&self) -> usize {
        self.inner.max_feeds
    }
    /// Time elapsed since the runtime was built.
    #[must_use]
    pub fn uptime(&self) -> Duration {
        self.inner.started_at.elapsed()
    }
    /// Snapshot of feed, batch, output-lag and detached-thread state.
    pub fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
        self.inner.diagnostics()
    }
    /// Subscribes to the health-event broadcast channel.
    pub fn health_subscribe(&self) -> broadcast::Receiver<HealthEvent> {
        self.inner.health_tx.subscribe()
    }
    /// Subscribes to the pipeline-output broadcast channel.
    pub fn output_subscribe(&self) -> broadcast::Receiver<SharedOutput> {
        self.inner.output_tx.subscribe()
    }
    /// Registers a new feed and spawns its worker.
    pub fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
        self.inner.add_feed(config)
    }
    /// Registers and starts a batch coordinator for `processor`.
    pub fn create_batch(
        &self,
        processor: Box<dyn BatchProcessor>,
        config: BatchConfig,
    ) -> Result<BatchHandle, NvError> {
        self.inner.create_batch(processor, config)
    }
    /// Stops and deregisters a single feed.
    pub fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
        self.inner.remove_feed(feed_id)
    }
    /// Stops every feed and coordinator without consuming the handle.
    pub fn shutdown(&self) -> Result<(), NvError> {
        self.inner.shutdown_all()
    }
}
#[cfg(test)]
#[path = "runtime_tests/mod.rs"]
mod tests;