// nv_runtime/runtime.rs
//! Runtime, runtime builder, and runtime handle.
//!
//! The [`Runtime`] is the top-level owner constructed via [`RuntimeBuilder`].
//! After building, call [`Runtime::handle()`] to obtain a [`RuntimeHandle`] —
//! a cheaply cloneable control surface for adding/removing feeds, subscribing
//! to health and output events, and triggering shutdown.
//!
//! [`Runtime::shutdown()`] consumes the runtime, joins all worker threads,
//! and guarantees a clean stop.
//!
//! Each feed runs on a dedicated OS thread (see the `worker` module).

use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use nv_core::error::{NvError, RuntimeError};
use nv_core::health::HealthEvent;
use nv_core::id::{FeedId, StageId};
use nv_media::DefaultMediaFactory;
use nv_media::ingress::MediaIngressFactory;
use nv_perception::BatchProcessor;
use tokio::sync::broadcast;

use crate::batch::{BatchConfig, BatchCoordinator, BatchHandle};
use crate::feed::FeedConfig;
use crate::feed_handle::FeedHandle;
use crate::output::{LagDetector, SharedOutput};
use crate::worker::{self, BroadcastHealthSink, FeedSharedState};

/// Default health broadcast channel capacity.
const DEFAULT_HEALTH_CAPACITY: usize = 256;

/// Default output broadcast channel capacity.
const DEFAULT_OUTPUT_CAPACITY: usize = 256;

/// Default time to wait for a feed worker thread to join during
/// remove/shutdown. If exceeded, the thread is detached.
const DEFAULT_FEED_JOIN_TIMEOUT: Duration = Duration::from_secs(10);

/// Default time to wait for a batch coordinator thread to join.
const DEFAULT_COORDINATOR_JOIN_TIMEOUT: Duration = Duration::from_secs(10);

/// Builder for constructing a [`Runtime`].
///
/// The runtime uses a default media backend unless a custom
/// [`MediaIngressFactory`] is supplied via [`ingress_factory()`](Self::ingress_factory).
///
/// # Example
///
/// ```
/// use nv_runtime::Runtime;
///
/// let runtime = Runtime::builder()
///     .max_feeds(16)
///     .build()
///     .expect("failed to build runtime");
/// ```
pub struct RuntimeBuilder {
    /// Maximum number of concurrent feeds (default 64).
    max_feeds: usize,
    /// Capacity of the health broadcast channel.
    health_capacity: usize,
    /// Capacity of the aggregate output broadcast channel.
    output_capacity: usize,
    /// Custom media backend; `None` selects [`DefaultMediaFactory`].
    ingress_factory: Option<Box<dyn MediaIngressFactory>>,
    /// Maximum wait when joining a feed worker thread.
    feed_join_timeout: Duration,
    /// Maximum wait when joining a batch coordinator thread.
    coordinator_join_timeout: Duration,
    /// Policy for accepting `SourceSpec::Custom` pipeline fragments.
    custom_pipeline_policy: nv_core::security::CustomPipelinePolicy,
}

70impl RuntimeBuilder {
71    /// Set the maximum number of concurrent feeds. Default: `64`.
72    #[must_use]
73    pub fn max_feeds(mut self, max: usize) -> Self {
74        self.max_feeds = max;
75        self
76    }
77
78    /// Set the health broadcast channel capacity. Default: `256`.
79    #[must_use]
80    pub fn health_capacity(mut self, cap: usize) -> Self {
81        self.health_capacity = cap;
82        self
83    }
84
85    /// Set the output broadcast channel capacity. Default: `256`.
86    ///
87    /// Controls how many `OutputEnvelope`s the aggregate output
88    /// subscription channel can buffer before the ring buffer wraps.
89    ///
90    /// When the internal sentinel receiver detects ring-buffer wrap,
91    /// the runtime emits a global [`HealthEvent::OutputLagged`] event
92    /// carrying the sentinel-observed per-event delta (not cumulative).
93    /// This indicates channel saturation / backpressure risk — it does
94    /// **not** guarantee that any specific external subscriber lost
95    /// messages.
96    #[must_use]
97    pub fn output_capacity(mut self, cap: usize) -> Self {
98        self.output_capacity = cap;
99        self
100    }
101
102    /// Set a custom `MediaIngressFactory`.
103    ///
104    /// By default the runtime uses the built-in media backend
105    /// ([`DefaultMediaFactory`]).
106    /// Replace this for testing or alternative backends.
107    #[must_use]
108    pub fn ingress_factory(mut self, factory: Box<dyn MediaIngressFactory>) -> Self {
109        self.ingress_factory = Some(factory);
110        self
111    }
112
113    /// Set the maximum time to wait for a feed worker thread to join
114    /// during shutdown or removal. Default: `10s`.
115    ///
116    /// If a feed thread does not finish within this timeout, it is
117    /// detached and a health event is emitted.
118    #[must_use]
119    pub fn feed_join_timeout(mut self, timeout: Duration) -> Self {
120        self.feed_join_timeout = timeout;
121        self
122    }
123
124    /// Set the maximum time to wait for a batch coordinator thread
125    /// to join during shutdown. Default: `10s`.
126    #[must_use]
127    pub fn coordinator_join_timeout(mut self, timeout: Duration) -> Self {
128        self.coordinator_join_timeout = timeout;
129        self
130    }
131
132    /// Set the custom pipeline security policy.
133    ///
134    /// Controls whether `SourceSpec::Custom` pipeline fragments are
135    /// accepted. Default: [`Reject`](nv_core::security::CustomPipelinePolicy::Reject).
136    ///
137    /// Set to [`AllowTrusted`](nv_core::security::CustomPipelinePolicy::AllowTrusted)
138    /// when pipeline strings originate from trusted sources (e.g.,
139    /// hard-coded application code).
140    #[must_use]
141    pub fn custom_pipeline_policy(
142        mut self,
143        policy: nv_core::security::CustomPipelinePolicy,
144    ) -> Self {
145        self.custom_pipeline_policy = policy;
146        self
147    }
148
149    /// Build the runtime.
150    ///
151    /// # Errors
152    ///
153    /// Returns `ConfigError::InvalidCapacity` if `health_capacity` or
154    /// `output_capacity` is zero.
155    pub fn build(self) -> Result<Runtime, NvError> {
156        use nv_core::error::ConfigError;
157
158        if self.health_capacity == 0 {
159            return Err(ConfigError::InvalidCapacity {
160                field: "health_capacity",
161            }
162            .into());
163        }
164        if self.output_capacity == 0 {
165            return Err(ConfigError::InvalidCapacity {
166                field: "output_capacity",
167            }
168            .into());
169        }
170
171        let (health_tx, _) = broadcast::channel(self.health_capacity);
172        let (output_tx, sentinel_rx) = broadcast::channel(self.output_capacity);
173        let lag_detector = Arc::new(LagDetector::new(sentinel_rx, self.output_capacity));
174
175        let health_sink = Arc::new(BroadcastHealthSink::new(health_tx.clone()));
176        let factory: Arc<dyn MediaIngressFactory> = match self.ingress_factory {
177            Some(f) => Arc::from(f),
178            None => Arc::new(DefaultMediaFactory::with_health_sink(health_sink as _)),
179        };
180
181        let inner = Arc::new(RuntimeInner {
182            max_feeds: self.max_feeds,
183            next_feed_id: AtomicU64::new(1),
184            feeds: Mutex::new(HashMap::new()),
185            coordinators: Mutex::new(Vec::new()),
186            batch_ids: Mutex::new(HashSet::new()),
187            health_tx,
188            output_tx,
189            lag_detector,
190            shutdown: AtomicBool::new(false),
191            factory,
192            started_at: Instant::now(),
193            feed_join_timeout: self.feed_join_timeout,
194            coordinator_join_timeout: self.coordinator_join_timeout,
195            detached_threads: Mutex::new(Vec::new()),
196            custom_pipeline_policy: self.custom_pipeline_policy,
197        });
198
199        Ok(Runtime { inner })
200    }
201}
202
// ---------------------------------------------------------------------------
// Shared interior — Arc-wrapped, accessible from both Runtime and RuntimeHandle
// ---------------------------------------------------------------------------

/// Shared interior state behind both [`Runtime`] and [`RuntimeHandle`].
struct RuntimeInner {
    /// Hard cap on concurrent feeds; enforced in `add_feed`.
    max_feeds: usize,
    /// Monotonically increasing source of fresh `FeedId`s.
    next_feed_id: AtomicU64,
    /// Registry of running feeds, keyed by feed ID.
    feeds: Mutex<HashMap<FeedId, RunningFeed>>,
    /// Batch coordinators owned by the runtime for lifecycle management.
    coordinators: Mutex<Vec<BatchCoordinator>>,
    /// Tracks all claimed batch-processor IDs (active + in-progress starts).
    /// Used as the serialization point for duplicate-ID rejection without
    /// holding the heavier `coordinators` lock during `BatchCoordinator::start`.
    batch_ids: Mutex<HashSet<StageId>>,
    /// Aggregate health event channel shared by all feeds.
    health_tx: broadcast::Sender<HealthEvent>,
    /// Aggregate output channel shared by all feeds.
    output_tx: broadcast::Sender<SharedOutput>,
    /// Sentinel-based saturation monitor for `output_tx`.
    lag_detector: Arc<LagDetector>,
    /// Set once by `shutdown_all`; gates `add_feed`/`create_batch`.
    shutdown: AtomicBool,
    /// Media backend used to open feed sources.
    factory: Arc<dyn MediaIngressFactory>,
    /// Instant when the runtime was created. Used by `uptime()`.
    started_at: Instant,
    /// Timeout for joining feed worker threads.
    feed_join_timeout: Duration,
    /// Timeout for joining batch coordinator threads.
    coordinator_join_timeout: Duration,
    /// Threads that were detached due to join timeouts, tracked for
    /// periodic reaping to prevent unbounded thread accumulation.
    detached_threads: Mutex<Vec<DetachedJoin>>,
    /// Security policy for custom pipeline fragments.
    custom_pipeline_policy: nv_core::security::CustomPipelinePolicy,
}

/// Internal state tracked per running feed.
struct RunningFeed {
    /// State shared with the worker thread (also backs `FeedHandle`s).
    shared: Arc<FeedSharedState>,
    /// Worker thread handle; `None` once taken for joining.
    thread: Option<std::thread::JoinHandle<()>>,
}

241impl RuntimeInner {
242    fn feed_count(&self) -> Result<usize, NvError> {
243        let feeds = self
244            .feeds
245            .lock()
246            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
247        Ok(feeds.len())
248    }
249
250    /// Attempt to reap detached threads that have since finished.
251    /// Returns the number of threads still detached after reaping.
252    fn reap_detached(&self) -> usize {
253        let mut detached = self
254            .detached_threads
255            .lock()
256            .unwrap_or_else(|e| e.into_inner());
257        detached.retain(|d| {
258            match d.done_rx.try_recv() {
259                Ok(_) | Err(std::sync::mpsc::TryRecvError::Disconnected) => {
260                    // Thread finished or channel dropped — safe to remove.
261                    false
262                }
263                Err(std::sync::mpsc::TryRecvError::Empty) => true,
264            }
265        });
266        detached.len()
267    }
268
269    /// Store detached thread handles for later reaping.
270    fn track_detached(&self, joins: impl IntoIterator<Item = DetachedJoin>) {
271        let mut detached = self
272            .detached_threads
273            .lock()
274            .unwrap_or_else(|e| e.into_inner());
275        detached.extend(joins);
276    }
277
278    fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
279        // Snapshot Arc refs under the feed lock, then release immediately.
280        let shared_refs: Vec<Arc<FeedSharedState>> = {
281            let feeds = self
282                .feeds
283                .lock()
284                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
285            feeds
286                .values()
287                .map(|entry| Arc::clone(&entry.shared))
288                .collect()
289        };
290
291        // Build per-feed diagnostics outside the lock.
292        let mut feed_diags: Vec<crate::diagnostics::FeedDiagnostics> = shared_refs
293            .iter()
294            .map(|shared| FeedHandle::new(Arc::clone(shared)).diagnostics())
295            .collect();
296
297        // Stable ordering by FeedId for deterministic dashboard diffing.
298        feed_diags.sort_by_key(|d| d.feed_id.as_u64());
299
300        // Batch coordinator diagnostics.
301        let batches: Vec<crate::diagnostics::BatchDiagnostics> = self
302            .coordinators
303            .lock()
304            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?
305            .iter()
306            .map(|c| {
307                let h = c.handle();
308                crate::diagnostics::BatchDiagnostics {
309                    processor_id: h.processor_id(),
310                    metrics: h.metrics(),
311                }
312            })
313            .collect();
314
315        // Output channel lag status.
316        let output_lag = self.lag_detector.status();
317
318        // Reap any detached threads that have since finished and
319        // report how many are still outstanding.
320        let detached_thread_count = self.reap_detached();
321
322        Ok(crate::diagnostics::RuntimeDiagnostics {
323            uptime: self.started_at.elapsed(),
324            feed_count: feed_diags.len(),
325            max_feeds: self.max_feeds,
326            feeds: feed_diags,
327            batches,
328            output_lag,
329            detached_thread_count,
330        })
331    }
332
333    fn create_batch(
334        &self,
335        processor: Box<dyn BatchProcessor>,
336        config: BatchConfig,
337    ) -> Result<BatchHandle, NvError> {
338        if self.shutdown.load(Ordering::Relaxed) {
339            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
340        }
341
342        // Reserve the processor ID in the lightweight batch_ids set.  This
343        // is the serialization point for duplicate rejection — it avoids
344        // holding the heavier `coordinators` lock during the potentially
345        // slow `BatchCoordinator::start` (up to startup_timeout).
346        let processor_id = processor.id();
347        {
348            let mut ids = self
349                .batch_ids
350                .lock()
351                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
352            if !ids.insert(processor_id) {
353                return Err(NvError::Config(
354                    nv_core::error::ConfigError::DuplicateBatchProcessorId { id: processor_id },
355                ));
356            }
357        }
358
359        // Start coordinator without holding any lock (can block up to 30 s).
360        let coordinator = match BatchCoordinator::start(processor, config, self.health_tx.clone()) {
361            Ok(c) => c,
362            Err(e) => {
363                // Release the ID reservation on startup failure.
364                if let Ok(mut ids) = self.batch_ids.lock() {
365                    ids.remove(&processor_id);
366                }
367                return Err(e);
368            }
369        };
370        let handle = coordinator.handle();
371
372        {
373            let mut coords = self
374                .coordinators
375                .lock()
376                .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
377            coords.push(coordinator);
378
379            // Re-check shutdown *under the same lock* that shutdown_all
380            // acquires to iterate coordinators.  If shutdown slipped in
381            // during the (potentially 30 s) BatchCoordinator::start()
382            // window, this coordinator was never signaled.  Remove it,
383            // shut it down inline, and return an error so the caller
384            // does not receive a handle to a dead coordinator.
385            if self.shutdown.load(Ordering::Acquire) {
386                if let Some(orphan) = coords.pop() {
387                    drop(coords);
388                    if let Some(detached) = orphan.shutdown(self.coordinator_join_timeout) {
389                        self.track_detached(std::iter::once(detached));
390                    }
391                }
392                if let Ok(mut ids) = self.batch_ids.lock() {
393                    ids.remove(&processor_id);
394                }
395                return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
396            }
397        }
398
399        Ok(handle)
400    }
401
402    fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
403        if self.shutdown.load(Ordering::Relaxed) {
404            return Err(NvError::Runtime(RuntimeError::ShutdownInProgress));
405        }
406
407        // --- Security validation ---
408        // Reject Custom pipelines unless policy allows them.
409        if matches!(config.source, nv_core::config::SourceSpec::Custom { .. })
410            && self.custom_pipeline_policy == nv_core::security::CustomPipelinePolicy::Reject
411        {
412            return Err(NvError::Media(
413                nv_core::error::MediaError::CustomPipelineRejected,
414            ));
415        }
416        // Reject insecure RTSP URLs when RequireTls is set.
417        if let nv_core::config::SourceSpec::Rtsp {
418            ref url, security, ..
419        } = config.source
420            && security == nv_core::security::RtspSecurityPolicy::RequireTls
421            && nv_core::security::is_insecure_rtsp(url)
422        {
423            return Err(NvError::Media(
424                nv_core::error::MediaError::InsecureRtspRejected,
425            ));
426        }
427
428        let mut feeds = self
429            .feeds
430            .lock()
431            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
432
433        if feeds.len() >= self.max_feeds {
434            return Err(NvError::Runtime(RuntimeError::FeedLimitExceeded {
435                max: self.max_feeds,
436            }));
437        }
438
439        let id = FeedId::new(self.next_feed_id.fetch_add(1, Ordering::Relaxed));
440
441        let (shared, thread) = worker::spawn_feed_worker(
442            id,
443            config,
444            Arc::clone(&self.factory),
445            self.health_tx.clone(),
446            self.output_tx.clone(),
447            Arc::clone(&self.lag_detector),
448        )?;
449
450        let handle = FeedHandle::new(Arc::clone(&shared));
451
452        feeds.insert(
453            id,
454            RunningFeed {
455                shared,
456                thread: Some(thread),
457            },
458        );
459
460        Ok(handle)
461    }
462
463    fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
464        let mut feeds = self
465            .feeds
466            .lock()
467            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
468
469        let entry = feeds
470            .remove(&feed_id)
471            .ok_or(NvError::Runtime(RuntimeError::FeedNotFound { feed_id }))?;
472
473        entry.shared.request_shutdown();
474        drop(feeds);
475
476        if let Some(handle) = entry.thread
477            && let Some(detached) =
478                bounded_join(handle, feed_id, &self.health_tx, self.feed_join_timeout)
479        {
480            self.track_detached(std::iter::once(detached));
481        }
482        Ok(())
483    }
484
485    fn shutdown_all(&self) -> Result<(), NvError> {
486        // Release so that the Acquire re-check in create_batch() sees it.
487        self.shutdown.store(true, Ordering::Release);
488
489        // --- Phase 1: Signal everything to stop. ---
490        //
491        // Signal feeds first so they stop submitting new frames.
492        let mut feeds = self
493            .feeds
494            .lock()
495            .map_err(|_| NvError::Runtime(RuntimeError::RegistryPoisoned))?;
496
497        for entry in feeds.values() {
498            entry.shared.request_shutdown();
499        }
500
501        let entries: Vec<_> = feeds.drain().collect();
502        drop(feeds);
503
504        // Signal batch coordinators *before* joining feed threads.
505        // Feed threads blocked in `submit_and_wait` unblock once the
506        // coordinator processes remaining items and exits (disconnecting
507        // response channels). Without this, feeds can hang for up to
508        // `max_latency + RESPONSE_TIMEOUT_SAFETY` waiting for a batch
509        // that the coordinator hasn't been told to stop.
510        //
511        // Use unwrap_or_else(PoisonError::into_inner) to guarantee
512        // coordinator signaling even when the mutex is poisoned —
513        // shutdown must not silently skip cleanup.
514        {
515            let coordinators = self.coordinators.lock().unwrap_or_else(|e| e.into_inner());
516            for coordinator in coordinators.iter() {
517                coordinator.signal_shutdown();
518            }
519        }
520
521        // --- Phase 2: Join feed threads. ---
522        //
523        // Coordinators are shutting down concurrently, so feed threads
524        // blocked on batch responses will unblock promptly.
525        for (id, mut entry) in entries {
526            if let Some(handle) = entry.thread.take()
527                && let Some(detached) =
528                    bounded_join(handle, id, &self.health_tx, self.feed_join_timeout)
529            {
530                self.track_detached(std::iter::once(detached));
531            }
532        }
533
534        // --- Phase 3: Join batch coordinators. ---
535        //
536        // All feed threads are done (or detached). No new submissions.
537        {
538            let mut coordinators = self.coordinators.lock().unwrap_or_else(|e| e.into_inner());
539            let detached: Vec<_> = coordinators
540                .drain(..)
541                .filter_map(|c| c.shutdown(self.coordinator_join_timeout))
542                .collect();
543            if !detached.is_empty() {
544                self.track_detached(detached);
545            }
546        }
547
548        // Flush any pending sentinel-observed delta that was throttled
549        // but never emitted.
550        self.lag_detector.flush(&self.health_tx);
551
552        Ok(())
553    }
554}
555
556/// Join a feed worker thread with a bounded timeout.
557///
558/// If the thread does not finish within `timeout`, it is
559/// detached (the helper thread will eventually join when the worker
560/// finishes) and a `FeedStopped` health event with a timeout reason
561/// is emitted.
562///
563/// Returns `Some(DetachedJoin)` when the thread did not finish in time,
564/// allowing the caller to track the detached thread for later reaping.
565fn bounded_join(
566    handle: std::thread::JoinHandle<()>,
567    feed_id: FeedId,
568    health_tx: &broadcast::Sender<HealthEvent>,
569    timeout: Duration,
570) -> Option<DetachedJoin> {
571    let (done_tx, done_rx) = std::sync::mpsc::channel();
572    let label = format!("nv-join-{feed_id}");
573    let joiner = std::thread::Builder::new()
574        .name(label.clone())
575        .spawn(move || {
576            let result = handle.join();
577            let _ = done_tx.send(result);
578        });
579    match done_rx.recv_timeout(timeout) {
580        Ok(Ok(())) => None,
581        Ok(Err(_)) => {
582            tracing::error!(
583                feed_id = %feed_id,
584                "feed worker thread panicked during join",
585            );
586            None
587        }
588        Err(_) => {
589            tracing::warn!(
590                feed_id = %feed_id,
591                timeout_secs = timeout.as_secs(),
592                "feed worker thread did not finish within timeout — detaching",
593            );
594            let _ = health_tx.send(HealthEvent::FeedStopped {
595                feed_id,
596                reason: nv_core::health::StopReason::Fatal {
597                    detail: format!(
598                        "worker thread did not join within {}s — detached",
599                        timeout.as_secs()
600                    ),
601                },
602            });
603            joiner.ok().map(|j| DetachedJoin {
604                label,
605                done_rx,
606                joiner: j,
607            })
608        }
609    }
610}
611
/// Tracks a thread that was detached due to a join timeout.
///
/// When a `bounded_join` times out, the helper thread continues waiting
/// for the original thread to finish. This struct retains the helper's
/// `JoinHandle` and the completion channel so the runtime can
/// periodically attempt to reap finished threads (see
/// `RuntimeInner::reap_detached`).
pub(crate) struct DetachedJoin {
    /// Human-readable label for diagnostics logging.
    #[allow(dead_code)]
    pub(crate) label: String,
    /// Receives the original thread's join result when the helper
    /// thread finishes its blocking join.
    pub(crate) done_rx: std::sync::mpsc::Receiver<std::thread::Result<()>>,
    /// Helper thread handle — joining this is instantaneous once
    /// `done_rx` has a message.
    #[allow(dead_code)]
    pub(crate) joiner: std::thread::JoinHandle<()>,
}

// ---------------------------------------------------------------------------
// Runtime — owning entry point
// ---------------------------------------------------------------------------

/// The top-level NextVision runtime.
///
/// Manages cross-feed concerns: feed registry, global limits, and shutdown.
/// Create via [`Runtime::builder()`].
///
/// Use [`handle()`](Runtime::handle) to obtain a cloneable [`RuntimeHandle`]
/// for concurrent control from multiple threads. The `Runtime` itself can
/// also be used directly for convenience.
pub struct Runtime {
    /// Shared state; also held by every `RuntimeHandle`.
    inner: Arc<RuntimeInner>,
}

647impl Runtime {
648    /// Create a new [`RuntimeBuilder`].
649    #[must_use]
650    pub fn builder() -> RuntimeBuilder {
651        RuntimeBuilder {
652            max_feeds: 64,
653            health_capacity: DEFAULT_HEALTH_CAPACITY,
654            output_capacity: DEFAULT_OUTPUT_CAPACITY,
655            ingress_factory: None,
656            feed_join_timeout: DEFAULT_FEED_JOIN_TIMEOUT,
657            coordinator_join_timeout: DEFAULT_COORDINATOR_JOIN_TIMEOUT,
658            custom_pipeline_policy: nv_core::security::CustomPipelinePolicy::default(),
659        }
660    }
661
662    /// Obtain a cloneable [`RuntimeHandle`].
663    ///
664    /// The handle provides the same control surface as `Runtime` but can
665    /// be cloned and shared across threads.
666    #[must_use]
667    pub fn handle(&self) -> RuntimeHandle {
668        RuntimeHandle {
669            inner: Arc::clone(&self.inner),
670        }
671    }
672
673    /// Number of currently active feeds.
674    ///
675    /// # Errors
676    ///
677    /// Returns `RuntimeError::RegistryPoisoned` if the internal lock is poisoned.
678    pub fn feed_count(&self) -> Result<usize, NvError> {
679        self.inner.feed_count()
680    }
681
682    /// Maximum allowed concurrent feeds.
683    #[must_use]
684    pub fn max_feeds(&self) -> usize {
685        self.inner.max_feeds
686    }
687
688    /// Elapsed time since the runtime was created.
689    ///
690    /// Monotonically increasing. Useful for uptime dashboards and
691    /// health checks.
692    #[must_use]
693    pub fn uptime(&self) -> Duration {
694        self.inner.started_at.elapsed()
695    }
696
697    /// Get a consolidated diagnostics snapshot of the runtime and all feeds.
698    ///
699    /// Returns per-feed lifecycle state, metrics, queue depths, decode status,
700    /// and view-system health, plus batch coordinator metrics and output
701    /// channel lag status.
702    ///
703    /// Designed for periodic polling (1–5 s) by dashboards and health
704    /// probes. Complement with [`health_subscribe()`](Self::health_subscribe)
705    /// for event-driven state transitions.
706    ///
707    /// # Errors
708    ///
709    /// Returns `RuntimeError::RegistryPoisoned` if an internal lock is poisoned.
710    pub fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
711        self.inner.diagnostics()
712    }
713
714    /// Subscribe to aggregate health events from all feeds.
715    pub fn health_subscribe(&self) -> broadcast::Receiver<HealthEvent> {
716        self.inner.health_tx.subscribe()
717    }
718
719    /// Subscribe to aggregate output from all feeds.
720    ///
721    /// Each subscriber receives an `Arc<OutputEnvelope>` for every output
722    /// produced by any feed. The channel is bounded by the configured
723    /// `output_capacity` (default 256). Slow subscribers will receive
724    /// `RecvError::Lagged` when they fall behind.
725    ///
726    /// Channel saturation is monitored by an internal sentinel receiver.
727    /// When the sentinel detects ring-buffer wrap, the runtime emits a
728    /// global [`HealthEvent::OutputLagged`] event carrying the
729    /// sentinel-observed per-event delta. This is a saturation signal,
730    /// not a per-subscriber loss report. Live saturation state is also
731    /// available via [`Runtime::diagnostics()`] in
732    /// [`RuntimeDiagnostics::output_lag`](crate::diagnostics::RuntimeDiagnostics::output_lag).
733    pub fn output_subscribe(&self) -> broadcast::Receiver<SharedOutput> {
734        self.inner.output_tx.subscribe()
735    }
736
737    /// Add a new feed to the runtime.
738    ///
739    /// # Errors
740    ///
741    /// - `RuntimeError::FeedLimitExceeded` if the max feed count is reached.
742    /// - `RuntimeError::ShutdownInProgress` if shutdown has been initiated.
743    /// - `RuntimeError::ThreadSpawnFailed` if the OS thread cannot be created.
744    pub fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
745        self.inner.add_feed(config)
746    }
747
748    /// Create a shared batch coordinator for cross-feed inference.
749    ///
750    /// Returns a clonable [`BatchHandle`] that can be shared across
751    /// multiple feeds via `FeedPipeline::builder().batch(handle)`.
752    ///
753    /// The coordinator takes **ownership** of the processor (via `Box`).
754    /// A single coordinator thread is the sole caller of all processor
755    /// methods — no `Sync` bound is required. The coordinator is shut
756    /// down automatically when the runtime shuts down.
757    ///
758    /// # Errors
759    ///
760    /// - `RuntimeError::ShutdownInProgress` if shutdown has been initiated.
761    /// - `ConfigError::DuplicateBatchProcessorId` if a coordinator with the
762    ///   same processor ID already exists.
763    /// - `ConfigError::InvalidPolicy` if `config` has invalid values.
764    /// - `RuntimeError::ThreadSpawnFailed` if the coordinator thread fails.
765    pub fn create_batch(
766        &self,
767        processor: Box<dyn BatchProcessor>,
768        config: BatchConfig,
769    ) -> Result<BatchHandle, NvError> {
770        self.inner.create_batch(processor, config)
771    }
772
773    /// Remove a feed by ID, stopping it gracefully.
774    ///
775    /// # Errors
776    ///
777    /// Returns `RuntimeError::FeedNotFound` if the ID does not exist.
778    pub fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
779        self.inner.remove_feed(feed_id)
780    }
781
782    /// Initiate graceful shutdown of all feeds.
783    ///
784    /// Signals all worker threads to stop, waits for them to terminate,
785    /// and returns. After shutdown the runtime cannot accept new feeds.
786    pub fn shutdown(self) -> Result<(), NvError> {
787        self.inner.shutdown_all()
788    }
789}
790
791impl Drop for Runtime {
792    fn drop(&mut self) {
793        // Best-effort cleanup: if the user forgot to call shutdown(), make
794        // sure all worker threads are signaled and joined rather than
795        // silently detached.
796        if !self.inner.shutdown.load(Ordering::Acquire) {
797            let _ = self.inner.shutdown_all();
798        }
799    }
800}
801
// ---------------------------------------------------------------------------
// RuntimeHandle — cloneable control surface
// ---------------------------------------------------------------------------

/// Cloneable handle to the runtime.
///
/// Provides the same control surface as [`Runtime`] — add/remove feeds,
/// subscribe to health and output events, and trigger shutdown.
///
/// Obtain via [`Runtime::handle()`].
#[derive(Clone)]
pub struct RuntimeHandle {
    /// Shared state; the same `Arc` held by the owning `Runtime`.
    inner: Arc<RuntimeInner>,
}

817impl RuntimeHandle {
818    /// Number of currently active feeds.
819    pub fn feed_count(&self) -> Result<usize, NvError> {
820        self.inner.feed_count()
821    }
822
823    /// Maximum allowed concurrent feeds.
824    #[must_use]
825    pub fn max_feeds(&self) -> usize {
826        self.inner.max_feeds
827    }
828
829    /// Elapsed time since the runtime was created.
830    ///
831    /// Monotonically increasing. Useful for uptime dashboards and
832    /// health checks.
833    #[must_use]
834    pub fn uptime(&self) -> Duration {
835        self.inner.started_at.elapsed()
836    }
837
838    /// Get a consolidated diagnostics snapshot of the runtime and all feeds.
839    ///
840    /// See [`Runtime::diagnostics()`] for details.
841    pub fn diagnostics(&self) -> Result<crate::diagnostics::RuntimeDiagnostics, NvError> {
842        self.inner.diagnostics()
843    }
844
845    /// Subscribe to aggregate health events from all feeds.
846    pub fn health_subscribe(&self) -> broadcast::Receiver<HealthEvent> {
847        self.inner.health_tx.subscribe()
848    }
849
850    /// Subscribe to aggregate output from all feeds.
851    ///
852    /// Bounded by the configured `output_capacity`. Slow subscribers
853    /// receive `RecvError::Lagged`. Channel saturation is reported via
854    /// [`HealthEvent::OutputLagged`] (sentinel-observed, not
855    /// per-subscriber loss).
856    pub fn output_subscribe(&self) -> broadcast::Receiver<SharedOutput> {
857        self.inner.output_tx.subscribe()
858    }
859
860    /// Add a new feed to the runtime.
861    ///
862    /// # Errors
863    ///
864    /// See [`Runtime::add_feed()`].
865    pub fn add_feed(&self, config: FeedConfig) -> Result<FeedHandle, NvError> {
866        self.inner.add_feed(config)
867    }
868
869    /// Create a shared batch coordinator.
870    ///
871    /// See [`Runtime::create_batch()`] for details.
872    pub fn create_batch(
873        &self,
874        processor: Box<dyn BatchProcessor>,
875        config: BatchConfig,
876    ) -> Result<BatchHandle, NvError> {
877        self.inner.create_batch(processor, config)
878    }
879
880    /// Remove a feed by ID, stopping it gracefully.
881    ///
882    /// # Errors
883    ///
884    /// Returns `RuntimeError::FeedNotFound` if the ID does not exist.
885    pub fn remove_feed(&self, feed_id: FeedId) -> Result<(), NvError> {
886        self.inner.remove_feed(feed_id)
887    }
888
889    /// Trigger graceful shutdown of all feeds.
890    ///
891    /// Signals all worker threads to stop, waits for them to terminate,
892    /// and returns. Unlike [`Runtime::shutdown()`], this does not consume
893    /// the handle — it can be called from any clone.
894    pub fn shutdown(&self) -> Result<(), NvError> {
895        self.inner.shutdown_all()
896    }
897}
898
// Unit tests live out-of-line to keep this file focused.
#[cfg(test)]
#[path = "runtime_tests/mod.rs"]
mod tests;