nv_runtime/runtime.rs
1//! Runtime, runtime builder, and runtime handle.
2//!
3//! The [`Runtime`] is the top-level owner constructed via [`RuntimeBuilder`].
4//! After building, call [`Runtime::handle()`] to obtain a [`RuntimeHandle`] —
5//! a cheaply cloneable control surface for adding/removing feeds, subscribing
6//! to health and output events, and triggering shutdown.
7//!
8//! [`Runtime::shutdown()`] consumes the runtime, joins all worker threads,
9//! and guarantees a clean stop.
10//!
11//! Each feed runs on a dedicated OS thread (see the `worker` module).
12
13use std::collections::{HashMap, HashSet};
14use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, Instant};
17
18use nv_core::error::{NvError, RuntimeError};
19use nv_core::health::HealthEvent;
20use nv_core::id::{FeedId, StageId};
21use nv_media::DefaultMediaFactory;
22use nv_media::ingress::MediaIngressFactory;
23use nv_perception::BatchProcessor;
24use tokio::sync::broadcast;
25
26use crate::batch::{BatchConfig, BatchCoordinator, BatchHandle};
27use crate::feed::FeedConfig;
28use crate::feed_handle::FeedHandle;
29use crate::output::{LagDetector, SharedOutput};
30use crate::worker::{self, BroadcastHealthSink, FeedSharedState};
31
/// Default health broadcast channel capacity
/// (override via [`RuntimeBuilder::health_capacity`]).
const DEFAULT_HEALTH_CAPACITY: usize = 256;

/// Default output broadcast channel capacity
/// (override via [`RuntimeBuilder::output_capacity`]).
const DEFAULT_OUTPUT_CAPACITY: usize = 256;

/// Default time to wait for a feed worker thread to join during
/// remove/shutdown. If exceeded, the thread is detached.
const DEFAULT_FEED_JOIN_TIMEOUT: Duration = Duration::from_secs(10);

/// Default time to wait for a batch coordinator thread to join.
/// If exceeded, the thread is detached.
const DEFAULT_COORDINATOR_JOIN_TIMEOUT: Duration = Duration::from_secs(10);
44
/// Builder for constructing a [`Runtime`].
///
/// The runtime uses a default media backend unless a custom
/// [`MediaIngressFactory`] is supplied via [`ingress_factory()`](Self::ingress_factory).
///
/// # Example
///
/// ```
/// use nv_runtime::Runtime;
///
/// let runtime = Runtime::builder()
/// .max_feeds(16)
/// .build()
/// .expect("failed to build runtime");
/// ```
pub struct RuntimeBuilder {
    /// Maximum number of concurrent feeds (default 64).
    max_feeds: usize,
    /// Health broadcast channel capacity (default [`DEFAULT_HEALTH_CAPACITY`]).
    health_capacity: usize,
    /// Output broadcast channel capacity (default [`DEFAULT_OUTPUT_CAPACITY`]).
    output_capacity: usize,
    /// Custom media backend; `None` selects [`DefaultMediaFactory`].
    ingress_factory: Option<Box<dyn MediaIngressFactory>>,
    /// Bounded join timeout for feed worker threads.
    feed_join_timeout: Duration,
    /// Bounded join timeout for batch coordinator threads.
    coordinator_join_timeout: Duration,
    /// Security policy for `SourceSpec::Custom` pipeline fragments.
    custom_pipeline_policy: nv_core::security::CustomPipelinePolicy,
}
69
70impl RuntimeBuilder {
71 /// Set the maximum number of concurrent feeds. Default: `64`.
72 #[must_use]
73 pub fn max_feeds(mut self, max: usize) -> Self {
74 self.max_feeds = max;
75 self
76 }
77
78 /// Set the health broadcast channel capacity. Default: `256`.
79 #[must_use]
80 pub fn health_capacity(mut self, cap: usize) -> Self {
81 self.health_capacity = cap;
82 self
83 }
84
85 /// Set the output broadcast channel capacity. Default: `256`.
86 ///
87 /// Controls how many `OutputEnvelope`s the aggregate output
88 /// subscription channel can buffer before the ring buffer wraps.
89 ///
90 /// When the internal sentinel receiver detects ring-buffer wrap,
91 /// the runtime emits a global [`HealthEvent::OutputLagged`] event
92 /// carrying the sentinel-observed per-event delta (not cumulative).
93 /// This indicates channel saturation / backpressure risk — it does
94 /// **not** guarantee that any specific external subscriber lost
95 /// messages.
96 #[must_use]
97 pub fn output_capacity(mut self, cap: usize) -> Self {
98 self.output_capacity = cap;
99 self
100 }
101
102 /// Set a custom `MediaIngressFactory`.
103 ///
104 /// By default the runtime uses the built-in media backend
105 /// ([`DefaultMediaFactory`]).
106 /// Replace this for testing or alternative backends.
107 #[must_use]
108 pub fn ingress_factory(mut self, factory: Box<dyn MediaIngressFactory>) -> Self {
109 self.ingress_factory = Some(factory);
110 self
111 }
112
113 /// Set the maximum time to wait for a feed worker thread to join
114 /// during shutdown or removal. Default: `10s`.
115 ///
116 /// If a feed thread does not finish within this timeout, it is
117 /// detached and a health event is emitted.
118 #[must_use]
119 pub fn feed_join_timeout(mut self, timeout: Duration) -> Self {
120 self.feed_join_timeout = timeout;
121 self
122 }
123
124 /// Set the maximum time to wait for a batch coordinator thread
125 /// to join during shutdown. Default: `10s`.
126 #[must_use]
127 pub fn coordinator_join_timeout(mut self, timeout: Duration) -> Self {
128 self.coordinator_join_timeout = timeout;
129 self
130 }
131
132 /// Set the custom pipeline security policy.
133 ///
134 /// Controls whether `SourceSpec::Custom` pipeline fragments are
135 /// accepted. Default: [`Reject`](nv_core::security::CustomPipelinePolicy::Reject).
136 ///
137 /// Set to [`AllowTrusted`](nv_core::security::CustomPipelinePolicy::AllowTrusted)
138 /// when pipeline strings originate from trusted sources (e.g.,
139 /// hard-coded application code).
140 #[must_use]
141 pub fn custom_pipeline_policy(
142 mut self,
143 policy: nv_core::security::CustomPipelinePolicy,
144 ) -> Self {
145 self.custom_pipeline_policy = policy;
146 self
147 }
148
149 /// Build the runtime.
150 ///
151 /// # Errors
152 ///
153 /// Returns `ConfigError::InvalidCapacity` if `health_capacity` or
154 /// `output_capacity` is zero.
155 pub fn build(self) -> Result<Runtime, NvError> {
156 use nv_core::error::ConfigError;
157
158 if self.health_capacity == 0 {
159 return Err(ConfigError::InvalidCapacity {
160 field: "health_capacity",
161 }
162 .into());
163 }
164 if self.output_capacity == 0 {
165 return Err(ConfigError::InvalidCapacity {
166 field: "output_capacity",
167 }
168 .into());
169 }
170
171 let (health_tx, _) = broadcast::channel(self.health_capacity);
172 let (output_tx, sentinel_rx) = broadcast::channel(self.output_capacity);
173 let lag_detector = Arc::new(LagDetector::new(sentinel_rx, self.output_capacity));
174
175 let health_sink = Arc::new(BroadcastHealthSink::new(health_tx.clone()));
176 let factory: Arc<dyn MediaIngressFactory> = match self.ingress_factory {
177 Some(f) => Arc::from(f),
178 None => Arc::new(DefaultMediaFactory::with_health_sink(health_sink as _)),
179 };
180
181 let inner = Arc::new(RuntimeInner {
182 max_feeds: self.max_feeds,
183 next_feed_id: AtomicU64::new(1),
184 feeds: Mutex::new(HashMap::new()),
185 coordinators: Mutex::new(Vec::new()),
186 batch_ids: Mutex::new(HashSet::new()),
187 health_tx,
188 output_tx,
189 lag_detector,
190 shutdown: AtomicBool::new(false),
191 factory,
192 started_at: Instant::now(),
193 feed_join_timeout: self.feed_join_timeout,
194 coordinator_join_timeout: self.coordinator_join_timeout,
195 detached_threads: Mutex::new(Vec::new()),
196 custom_pipeline_policy: self.custom_pipeline_policy,
197 });
198
199 Ok(Runtime { inner })
200 }
201}
202
203// ---------------------------------------------------------------------------
204// Shared interior — Arc-wrapped, accessible from both Runtime and RuntimeHandle
205// ---------------------------------------------------------------------------
206
/// Shared interior state, reference-counted and reachable from both
/// [`Runtime`] and every [`RuntimeHandle`] clone.
struct RuntimeInner {
    /// Maximum number of concurrent feeds.
    max_feeds: usize,
    /// Monotonic counter used to mint unique `FeedId`s.
    next_feed_id: AtomicU64,
    /// Registry of running feeds, keyed by feed ID.
    feeds: Mutex<HashMap<FeedId, RunningFeed>>,
    /// Batch coordinators owned by the runtime for lifecycle management.
    coordinators: Mutex<Vec<BatchCoordinator>>,
    /// Tracks all claimed batch-processor IDs (active + in-progress starts).
    /// Used as the serialization point for duplicate-ID rejection without
    /// holding the heavier `coordinators` lock during `BatchCoordinator::start`.
    batch_ids: Mutex<HashSet<StageId>>,
    /// Aggregate health event channel (feed events and runtime-level
    /// events such as output lag).
    health_tx: broadcast::Sender<HealthEvent>,
    /// Aggregate output channel shared by all feeds.
    output_tx: broadcast::Sender<SharedOutput>,
    /// Sentinel-based saturation monitor for `output_tx`.
    lag_detector: Arc<LagDetector>,
    /// Set once shutdown begins; feed/batch creation is rejected after.
    shutdown: AtomicBool,
    /// Factory used to create media ingress for new feeds.
    factory: Arc<dyn MediaIngressFactory>,
    /// Instant when the runtime was created. Used by `uptime()`.
    started_at: Instant,
    /// Timeout for joining feed worker threads.
    feed_join_timeout: Duration,
    /// Timeout for joining batch coordinator threads.
    coordinator_join_timeout: Duration,
    /// Threads that were detached due to join timeouts, tracked for
    /// periodic reaping to prevent unbounded thread accumulation.
    detached_threads: Mutex<Vec<DetachedJoin>>,
    /// Security policy for custom pipeline fragments.
    custom_pipeline_policy: nv_core::security::CustomPipelinePolicy,
}
234
/// Internal state tracked per running feed.
struct RunningFeed {
    /// State shared with the worker thread; also wrapped by `FeedHandle`s.
    shared: Arc<FeedSharedState>,
    /// Worker thread handle; `None` once taken for joining.
    thread: Option<std::thread::JoinHandle<()>>,
}
240
impl RuntimeInner {
    /// Number of currently registered feeds.
    ///
    /// # Errors
    ///
    /// `RuntimeError::RegistryPoisoned` if the feed registry lock is
    /// poisoned.
    fn feed_count(&self) -> Result<usize, NvError> {
        let feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
        Ok(feeds.len())
    }

    /// Attempt to reap detached threads that have since finished.
    /// Returns the number of threads still detached after reaping.
    fn reap_detached(&self) -> usize {
        // into_inner(): reap even if the mutex is poisoned — the Vec
        // itself is still usable.
        let mut detached = self
            .detached_threads
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        detached.retain(|d| {
            match d.done_rx.try_recv() {
                Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
                    // Thread finished or channel dropped — safe to remove.
                    false
                }
                // Helper thread is still blocked joining its worker.
                Err(std::sync::mpsc::TryRecvError::Empty) => true,
            }
        });
        detached.len()
    }

    /// Store detached thread handles for later reaping.
    fn track_detached(&self, joins: impl IntoIterator<Item = DetachedJoin>) {
        let mut detached = self
            .detached_threads
            .lock()
            .unwrap_or_else(|e| e.into_inner());
        detached.extend(joins);
    }

    /// Build a consolidated diagnostics snapshot: per-feed diagnostics,
    /// batch coordinator metrics, output-channel lag status, and the
    /// count of still-detached threads. Each internal lock is held only
    /// long enough to copy out the data it guards.
    fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
        // Snapshot Arc refs under the feed lock, then release immediately.
        let shared_refs: Vec<Arc<FeedSharedState>> = {
            let feeds = self
                .feeds
                .lock()
                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
            feeds
                .values()
                .map(|entry| Arc::clone(&entry.shared))
                .collect()
        };

        // Build per-feed diagnostics outside the lock.
        let mut feed_diags: Vec<crate::diagnostics::FeedDiagnostics> = shared_refs
            .iter()
            .map(|shared| FeedHandle::new(Arc::clone(shared)).diagnostics())
            .collect();

        // Stable ordering by FeedId for deterministic dashboard diffing.
        feed_diags.sort_by_key(|d| d.feed_id.as_u64());

        // Batch coordinator diagnostics.
        let batches: Vec<crate::diagnostics::BatchDiagnostics> = self
            .coordinators
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?
            .iter()
            .map(|c| {
                let h = c.handle();
                crate::diagnostics::BatchDiagnostics {
                    processor_id: h.processor_id(),
                    metrics: h.metrics(),
                }
            })
            .collect();

        // Output channel lag status.
        let output_lag = self.lag_detector.status();

        // Reap any detached threads that have since finished and
        // report how many are still outstanding.
        let detached_thread_count = self.reap_detached();

        Ok(crate::diagnostics::RuntimeDiagnostics {
            uptime: self.started_at.elapsed(),
            feed_count: feed_diags.len(),
            max_feeds: self.max_feeds,
            feeds: feed_diags,
            batches,
            output_lag,
            detached_thread_count,
        })
    }

    /// Start a batch coordinator for `processor` and register it for
    /// lifecycle management. Returns a clonable [`BatchHandle`].
    ///
    /// The processor ID is reserved in `batch_ids` *before* the slow
    /// coordinator startup so duplicate IDs are rejected without holding
    /// the `coordinators` lock across that startup.
    fn create_batch(
        &self,
        processor: Box<dyn BatchProcessor>,
        config: BatchConfig,
    ) -> Result<BatchHandle, NvError> {
        if self.shutdown.load(Ordering::Relaxed) {
            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
        }

        // Reserve the processor ID in the lightweight batch_ids set. This
        // is the serialization point for duplicate rejection — it avoids
        // holding the heavier `coordinators` lock during the potentially
        // slow `BatchCoordinator::start` (up to startup_timeout).
        let processor_id = processor.id();
        {
            let mut ids = self
                .batch_ids
                .lock()
                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
            if !ids.insert(processor_id) {
                return Err(NvError::Config(
                    nv_core::error::ConfigError::DuplicateBatchProcessorId { id: processor_id },
                ));
            }
        }

        // Start coordinator without holding any lock (can block up to 30 s).
        let coordinator = match BatchCoordinator::start(processor, config, self.health_tx.clone()) {
            Ok(c) => c,
            Err(e) => {
                // Release the ID reservation on startup failure.
                if let Ok(mut ids) = self.batch_ids.lock() {
                    ids.remove(&processor_id);
                }
                return Err(e);
            }
        };
        let handle = coordinator.handle();

        {
            let mut coords = self
                .coordinators
                .lock()
                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
            coords.push(coordinator);

            // Re-check shutdown *under the same lock* that shutdown_all
            // acquires to iterate coordinators. If shutdown slipped in
            // during the (potentially 30 s) BatchCoordinator::start()
            // window, this coordinator was never signaled. Remove it,
            // shut it down inline, and return an error so the caller
            // does not receive a handle to a dead coordinator.
            if self.shutdown.load(Ordering::Acquire) {
                if let Some(orphan) = coords.pop() {
                    drop(coords);
                    if let Some(detached) = orphan.shutdown(self.coordinator_join_timeout) {
                        self.track_detached(std::iter::once(detached));
                    }
                }
                if let Ok(mut ids) = self.batch_ids.lock() {
                    ids.remove(&processor_id);
                }
                return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
            }
        }

        Ok(handle)
    }

    /// Validate `config` against the security policy, spawn a dedicated
    /// worker thread, and register the feed in the registry.
    fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
        if self.shutdown.load(Ordering::Relaxed) {
            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
        }

        // --- Security validation ---
        // Reject Custom pipelines unless policy allows them.
        if matches!(config.source, nv_core::config::SourceSpec::Custom { .. })
            && self.custom_pipeline_policy == nv_core::security::CustomPipelinePolicy::Reject
        {
            return Err(NvError::Media(
                nv_core::error::MediaError::CustomPipelineRejected,
            ));
        }
        // Reject insecure RTSP URLs when RequireTls is set.
        if let nv_core::config::SourceSpec::Rtsp {
            ref url, security, ..
        } = config.source
            && security == nv_core::security::RtspSecurityPolicy::RequireTls
            && nv_core::security::is_insecure_rtsp(url)
        {
            return Err(NvError::Media(
                nv_core::error::MediaError::InsecureRtspRejected,
            ));
        }

        let mut feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;

        if feeds.len() >= self.max_feeds {
            return Err(NvError::Runtime(RuntimeError::FeedLimitExceeded {
                max: self.max_feeds,
            }));
        }

        // Mint a fresh ID. The registry lock is held, so the capacity
        // check above cannot be raced by a concurrent add_feed.
        let id = FeedId::new(self.next_feed_id.fetch_add(1, Ordering::Relaxed));

        let (shared, thread) = worker::spawn_feed_worker(
            id,
            config,
            Arc::clone(&self.factory),
            self.health_tx.clone(),
            self.output_tx.clone(),
            Arc::clone(&self.lag_detector),
        )?;

        let handle = FeedHandle::new(Arc::clone(&shared));

        feeds.insert(
            id,
            RunningFeed {
                shared,
                thread: Some(thread),
            },
        );

        Ok(handle)
    }

    /// Remove `feed_id` from the registry, signal its worker to stop,
    /// and join the worker thread with a bounded timeout.
    fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
        let mut feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;

        let entry = feeds
            .remove(&feed_id)
            .ok_or(NvError::Runtime(RuntimeError::FeedNotFound { feed_id }))?;

        entry.shared.request_shutdown();
        // Release the registry lock before the (potentially slow) join.
        drop(feeds);

        if let Some(handle) = entry.thread
            && let Some(detached) =
                bounded_join(handle, feed_id, &self.health_tx, self.feed_join_timeout)
        {
            self.track_detached(std::iter::once(detached));
        }
        Ok(())
    }

    /// Stop everything: signal all feeds and coordinators, then join
    /// their threads with bounded timeouts. Called from
    /// `Runtime::shutdown`, `RuntimeHandle::shutdown`, and `Drop`.
    fn shutdown_all(&self) -> Result<(), NvError> {
        // Release so that the Acquire re-check in create_batch() sees it.
        self.shutdown.store(true, Ordering::Release);

        // --- Phase 1: Signal everything to stop. ---
        //
        // Signal feeds first so they stop submitting new frames.
        let mut feeds = self
            .feeds
            .lock()
            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;

        for entry in feeds.values() {
            entry.shared.request_shutdown();
        }

        let entries: Vec<_> = feeds.drain().collect();
        drop(feeds);

        // Signal batch coordinators *before* joining feed threads.
        // Feed threads blocked in `submit_and_wait` unblock once the
        // coordinator processes remaining items and exits (disconnecting
        // response channels). Without this, feeds can hang for up to
        // `max_latency + RESPONSE_TIMEOUT_SAFETY` waiting for a batch
        // that the coordinator hasn't been told to stop.
        //
        // Use unwrap_or_else(PoisonError::into_inner) to guarantee
        // coordinator signaling even when the mutex is poisoned —
        // shutdown must not silently skip cleanup.
        {
            let coordinators = self.coordinators.lock().unwrap_or_else(|e| e.into_inner());
            for coordinator in coordinators.iter() {
                coordinator.signal_shutdown();
            }
        }

        // --- Phase 2: Join feed threads. ---
        //
        // Coordinators are shutting down concurrently, so feed threads
        // blocked on batch responses will unblock promptly.
        for (id, mut entry) in entries {
            if let Some(handle) = entry.thread.take()
                && let Some(detached) =
                    bounded_join(handle, id, &self.health_tx, self.feed_join_timeout)
            {
                self.track_detached(std::iter::once(detached));
            }
        }

        // --- Phase 3: Join batch coordinators. ---
        //
        // All feed threads are done (or detached). No new submissions.
        {
            let mut coordinators = self.coordinators.lock().unwrap_or_else(|e| e.into_inner());
            let detached: Vec<_> = coordinators
                .drain(..)
                .filter_map(|c| c.shutdown(self.coordinator_join_timeout))
                .collect();
            if !detached.is_empty() {
                self.track_detached(detached);
            }
        }

        // Flush any pending sentinel-observed delta that was throttled
        // but never emitted.
        self.lag_detector.flush(&self.health_tx);

        Ok(())
    }
}
555
556/// Join a feed worker thread with a bounded timeout.
557///
558/// If the thread does not finish within `timeout`, it is
559/// detached (the helper thread will eventually join when the worker
560/// finishes) and a `FeedStopped` health event with a timeout reason
561/// is emitted.
562///
563/// Returns `Some(DetachedJoin)` when the thread did not finish in time,
564/// allowing the caller to track the detached thread for later reaping.
565fn bounded_join(
566 handle: std::thread::JoinHandle<()>,
567 feed_id: FeedId,
568 health_tx: &broadcast::Sender<HealthEvent>,
569 timeout: Duration,
570) -> Option<DetachedJoin> {
571 let (done_tx, done_rx) = std::sync::mpsc::channel();
572 let label = format!("nv-join-{feed_id}");
573 let joiner = std::thread::Builder::new()
574 .name(label.clone())
575 .spawn(move || {
576 let result = handle.join();
577 let _ = done_tx.send(result);
578 });
579 match done_rx.recv_timeout(timeout) {
580 Ok(Ok(())) => None,
581 Ok(Err(_)) => {
582 tracing::error!(
583 feed_id = %feed_id,
584 "feed worker thread panicked during join",
585 );
586 None
587 }
588 Err(_) => {
589 tracing::warn!(
590 feed_id = %feed_id,
591 timeout_secs = timeout.as_secs(),
592 "feed worker thread did not finish within timeout — detaching",
593 );
594 let _ = health_tx.send(HealthEvent::FeedStopped {
595 feed_id,
596 reason: nv_core::health::StopReason::Fatal {
597 detail: format!(
598 "worker thread did not join within {}s — detached",
599 timeout.as_secs()
600 ),
601 },
602 });
603 joiner.ok().map(|j| DetachedJoin {
604 label,
605 done_rx,
606 joiner: j,
607 })
608 }
609 }
610}
611
/// Tracks a thread that was detached due to a join timeout.
///
/// When a `bounded_join` times out, the helper thread continues waiting
/// for the original thread to finish. This struct retains the helper's
/// `JoinHandle` and the completion channel so the runtime can
/// periodically attempt to reap finished threads (see
/// `RuntimeInner::reap_detached`).
pub(crate) struct DetachedJoin {
    /// Human-readable label for diagnostics logging (same string used as
    /// the helper thread's name).
    #[allow(dead_code)]
    pub(crate) label: String,
    /// Receives the original thread's join result when the helper
    /// thread finishes its blocking join.
    pub(crate) done_rx: std::sync::mpsc::Receiver<std::thread::Result<()>>,
    /// Helper thread handle — joining this is instantaneous once
    /// `done_rx` has a message.
    #[allow(dead_code)]
    pub(crate) joiner: std::thread::JoinHandle<()>,
}
630
631// ---------------------------------------------------------------------------
632// Runtime — owning entry point
633// ---------------------------------------------------------------------------
634
/// The top-level NextVision runtime.
///
/// Manages cross-feed concerns: feed registry, global limits, and shutdown.
/// Create via [`Runtime::builder()`].
///
/// Use [`handle()`](Runtime::handle) to obtain a cloneable [`RuntimeHandle`]
/// for concurrent control from multiple threads. The `Runtime` itself can
/// also be used directly for convenience.
pub struct Runtime {
    /// Shared interior; every `RuntimeHandle` holds another `Arc` to it.
    inner: Arc<RuntimeInner>,
}
646
647impl Runtime {
648 /// Create a new [`RuntimeBuilder`].
649 #[must_use]
650 pub fn builder() -> RuntimeBuilder {
651 RuntimeBuilder {
652 max_feeds: 64,
653 health_capacity: DEFAULT_HEALTH_CAPACITY,
654 output_capacity: DEFAULT_OUTPUT_CAPACITY,
655 ingress_factory: None,
656 feed_join_timeout: DEFAULT_FEED_JOIN_TIMEOUT,
657 coordinator_join_timeout: DEFAULT_COORDINATOR_JOIN_TIMEOUT,
658 custom_pipeline_policy: nv_core::security::CustomPipelinePolicy::default(),
659 }
660 }
661
662 /// Obtain a cloneable [`RuntimeHandle`].
663 ///
664 /// The handle provides the same control surface as `Runtime` but can
665 /// be cloned and shared across threads.
666 #[must_use]
667 pub fn handle(&self) -> RuntimeHandle {
668 RuntimeHandle {
669 inner: Arc::clone(&self.inner),
670 }
671 }
672
673 /// Number of currently active feeds.
674 ///
675 /// # Errors
676 ///
677 /// Returns `RuntimeError::RegistryPoisoned` if the internal lock is poisoned.
678 pub fn feed_count(&self) -> Result<usize, NvError> {
679 self.inner.feed_count()
680 }
681
682 /// Maximum allowed concurrent feeds.
683 #[must_use]
684 pub fn max_feeds(&self) -> usize {
685 self.inner.max_feeds
686 }
687
688 /// Elapsed time since the runtime was created.
689 ///
690 /// Monotonically increasing. Useful for uptime dashboards and
691 /// health checks.
692 #[must_use]
693 pub fn uptime(&self) -> Duration {
694 self.inner.started_at.elapsed()
695 }
696
697 /// Get a consolidated diagnostics snapshot of the runtime and all feeds.
698 ///
699 /// Returns per-feed lifecycle state, metrics, queue depths, decode status,
700 /// and view-system health, plus batch coordinator metrics and output
701 /// channel lag status.
702 ///
703 /// Designed for periodic polling (1–5 s) by dashboards and health
704 /// probes. Complement with [`health_subscribe()`](Self::health_subscribe)
705 /// for event-driven state transitions.
706 ///
707 /// # Errors
708 ///
709 /// Returns `RuntimeError::RegistryPoisoned` if an internal lock is poisoned.
710 pub fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
711 self.inner.diagnostics()
712 }
713
714 /// Subscribe to aggregate health events from all feeds.
715 pub fn health_subscribe(&self) -> broadcast::Receiver<HealthEvent> {
716 self.inner.health_tx.subscribe()
717 }
718
719 /// Subscribe to aggregate output from all feeds.
720 ///
721 /// Each subscriber receives an `Arc<OutputEnvelope>` for every output
722 /// produced by any feed. The channel is bounded by the configured
723 /// `output_capacity` (default 256). Slow subscribers will receive
724 /// `RecvError::Lagged` when they fall behind.
725 ///
726 /// Channel saturation is monitored by an internal sentinel receiver.
727 /// When the sentinel detects ring-buffer wrap, the runtime emits a
728 /// global [`HealthEvent::OutputLagged`] event carrying the
729 /// sentinel-observed per-event delta. This is a saturation signal,
730 /// not a per-subscriber loss report. Live saturation state is also
731 /// available via [`Runtime::diagnostics()`] in
732 /// [`RuntimeDiagnostics::output_lag`](crate::diagnostics::RuntimeDiagnostics::output_lag).
733 pub fn output_subscribe(&self) -> broadcast::Receiver<SharedOutput> {
734 self.inner.output_tx.subscribe()
735 }
736
737 /// Add a new feed to the runtime.
738 ///
739 /// # Errors
740 ///
741 /// - `RuntimeError::FeedLimitExceeded` if the max feed count is reached.
742 /// - `RuntimeError::ShutdownInProgress` if shutdown has been initiated.
743 /// - `RuntimeError::ThreadSpawnFailed` if the OS thread cannot be created.
744 pub fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
745 self.inner.add_feed(config)
746 }
747
748 /// Create a shared batch coordinator for cross-feed inference.
749 ///
750 /// Returns a clonable [`BatchHandle`] that can be shared across
751 /// multiple feeds via `FeedPipeline::builder().batch(handle)`.
752 ///
753 /// The coordinator takes **ownership** of the processor (via `Box`).
754 /// A single coordinator thread is the sole caller of all processor
755 /// methods — no `Sync` bound is required. The coordinator is shut
756 /// down automatically when the runtime shuts down.
757 ///
758 /// # Errors
759 ///
760 /// - `RuntimeError::ShutdownInProgress` if shutdown has been initiated.
761 /// - `ConfigError::DuplicateBatchProcessorId` if a coordinator with the
762 /// same processor ID already exists.
763 /// - `ConfigError::InvalidPolicy` if `config` has invalid values.
764 /// - `RuntimeError::ThreadSpawnFailed` if the coordinator thread fails.
765 pub fn create_batch(
766 &self,
767 processor: Box<dyn BatchProcessor>,
768 config: BatchConfig,
769 ) -> Result<BatchHandle, NvError> {
770 self.inner.create_batch(processor, config)
771 }
772
773 /// Remove a feed by ID, stopping it gracefully.
774 ///
775 /// # Errors
776 ///
777 /// Returns `RuntimeError::FeedNotFound` if the ID does not exist.
778 pub fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
779 self.inner.remove_feed(feed_id)
780 }
781
782 /// Initiate graceful shutdown of all feeds.
783 ///
784 /// Signals all worker threads to stop, waits for them to terminate,
785 /// and returns. After shutdown the runtime cannot accept new feeds.
786 pub fn shutdown(self) -> Result<(), NvError> {
787 self.inner.shutdown_all()
788 }
789}
790
791impl Drop for Runtime {
792 fn drop(&mut self) {
793 // Best-effort cleanup: if the user forgot to call shutdown(), make
794 // sure all worker threads are signaled and joined rather than
795 // silently detached.
796 if !self.inner.shutdown.load(Ordering::Acquire) {
797 let _ = self.inner.shutdown_all();
798 }
799 }
800}
801
802// ---------------------------------------------------------------------------
803// RuntimeHandle — cloneable control surface
804// ---------------------------------------------------------------------------
805
/// Cloneable handle to the runtime.
///
/// Provides the same control surface as [`Runtime`] — add/remove feeds,
/// subscribe to health and output events, and trigger shutdown.
///
/// Obtain via [`Runtime::handle()`].
#[derive(Clone)]
pub struct RuntimeHandle {
    /// Shared interior; the same allocation the owning `Runtime` holds.
    inner: Arc<RuntimeInner>,
}
816
817impl RuntimeHandle {
818 /// Number of currently active feeds.
819 pub fn feed_count(&self) -> Result<usize, NvError> {
820 self.inner.feed_count()
821 }
822
823 /// Maximum allowed concurrent feeds.
824 #[must_use]
825 pub fn max_feeds(&self) -> usize {
826 self.inner.max_feeds
827 }
828
829 /// Elapsed time since the runtime was created.
830 ///
831 /// Monotonically increasing. Useful for uptime dashboards and
832 /// health checks.
833 #[must_use]
834 pub fn uptime(&self) -> Duration {
835 self.inner.started_at.elapsed()
836 }
837
838 /// Get a consolidated diagnostics snapshot of the runtime and all feeds.
839 ///
840 /// See [`Runtime::diagnostics()`] for details.
841 pub fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
842 self.inner.diagnostics()
843 }
844
845 /// Subscribe to aggregate health events from all feeds.
846 pub fn health_subscribe(&self) -> broadcast::Receiver<HealthEvent> {
847 self.inner.health_tx.subscribe()
848 }
849
850 /// Subscribe to aggregate output from all feeds.
851 ///
852 /// Bounded by the configured `output_capacity`. Slow subscribers
853 /// receive `RecvError::Lagged`. Channel saturation is reported via
854 /// [`HealthEvent::OutputLagged`] (sentinel-observed, not
855 /// per-subscriber loss).
856 pub fn output_subscribe(&self) -> broadcast::Receiver<SharedOutput> {
857 self.inner.output_tx.subscribe()
858 }
859
860 /// Add a new feed to the runtime.
861 ///
862 /// # Errors
863 ///
864 /// See [`Runtime::add_feed()`].
865 pub fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
866 self.inner.add_feed(config)
867 }
868
869 /// Create a shared batch coordinator.
870 ///
871 /// See [`Runtime::create_batch()`] for details.
872 pub fn create_batch(
873 &self,
874 processor: Box<dyn BatchProcessor>,
875 config: BatchConfig,
876 ) -> Result<BatchHandle, NvError> {
877 self.inner.create_batch(processor, config)
878 }
879
880 /// Remove a feed by ID, stopping it gracefully.
881 ///
882 /// # Errors
883 ///
884 /// Returns `RuntimeError::FeedNotFound` if the ID does not exist.
885 pub fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
886 self.inner.remove_feed(feed_id)
887 }
888
889 /// Trigger graceful shutdown of all feeds.
890 ///
891 /// Signals all worker threads to stop, waits for them to terminate,
892 /// and returns. Unlike [`Runtime::shutdown()`], this does not consume
893 /// the handle — it can be called from any clone.
894 pub fn shutdown(&self) -> Result<(), NvError> {
895 self.inner.shutdown_all()
896 }
897}
898
899#[cfg(test)]
900#[path = "runtime_tests/mod.rs"]
901mod tests;