// nv_runtime/batch/mod.rs
//! Shared batch coordination infrastructure.
//!
//! This module provides the runtime-side machinery for cross-feed batch
//! inference: a `BatchCoordinator` that collects frames from feed
//! threads, forms bounded batches, dispatches to a user-supplied
//! `BatchProcessor`, and routes results back to feed-local pipeline
//! continuations.
//!
//! # Ownership
//!
//! The coordinator takes **sole ownership** of the processor via
//! `Box<dyn BatchProcessor>`. The processor is never shared — `Sync` is
//! not required. All lifecycle methods (`on_start`, `process`, `on_stop`)
//! are called exclusively from the coordinator thread.
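//!
//! A minimal sketch of the trait shape this implies (signatures other
//! than `process` are illustrative; see the `BatchProcessor` definition
//! for the authoritative API):
//!
//! ```rust,ignore
//! pub trait BatchProcessor: Send {
//!     /// Called once from the coordinator thread before any batch.
//!     fn on_start(&mut self) -> Result<(), BatchError>;
//!     /// Called from the coordinator thread with each formed batch.
//!     fn process(&mut self, entries: &mut [BatchEntry]);
//!     /// Called once from the coordinator thread during shutdown.
//!     fn on_stop(&mut self);
//! }
//! ```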
//!
//! # Thread model
//!
//! ```text
//! Feed-1 ──submit──┐                             ┌── response ──→ Feed-1
//! Feed-2 ──submit──┤   BatchCoordinator thread   ├── response ──→ Feed-2
//! Feed-3 ──submit──┘   (collects → dispatches)   └── response ──→ Feed-3
//!                                 │
//!       BatchProcessor::process(&mut self, &mut [BatchEntry])
//! ```
//!
//! Each feed thread submits a `BatchEntry` (frame + view snapshot) via
//! a bounded channel and blocks on a per-item response channel. The
//! coordinator thread collects items until `max_batch_size` is reached
//! or `max_latency` elapses, calls the processor, and routes results back.
//!
//! # Backpressure
//!
//! Submission uses `try_send` (non-blocking). If the coordinator queue is
//! full, the feed thread receives `BatchSubmitError::QueueFull` and the
//! frame is dropped with a `HealthEvent::BatchSubmissionRejected` event.
//! Rejection counts are coalesced per feed with a 1-second throttle
//! window, and flushed on recovery or lifecycle boundaries.
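//!
//! A sketch of the feed-side submit path under this policy (helper
//! names other than `submit_and_wait` and `BatchSubmitError::QueueFull`
//! are illustrative):
//!
//! ```rust,ignore
//! match handle.submit_and_wait(entry) {
//!     Ok(result) => continuation.resume(result),
//!     Err(BatchSubmitError::QueueFull) => {
//!         // Frame is dropped; the rejection is coalesced into a
//!         // throttled HealthEvent::BatchSubmissionRejected for this feed.
//!         health.record_batch_rejection(feed_id);
//!     }
//!     Err(err) => handle_submit_error(err),
//! }
//! ```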
//!
//! # Fairness model
//!
//! All feeds share a single FIFO submission queue. The coordinator
//! processes items strictly in submission order.
//!
//! ## Per-feed in-flight cap
//!
//! Each feed is allowed at most [`max_in_flight_per_feed`](BatchConfig::max_in_flight_per_feed)
//! items in flight simultaneously (default: **1**). An item is "in flight"
//! from the moment it enters the submission queue until the coordinator
//! routes its result back (or drains it at shutdown).
//!
//! Under normal operation, `submit_and_wait` is synchronous — the feed
//! thread blocks until the result arrives. With the default cap of 1,
//! at most one item per feed is in the queue at any time.
//!
//! **Timeout regime**: when `submit_and_wait` returns
//! `BatchSubmitError::Timeout`, the timed-out item remains in flight
//! inside the coordinator. The in-flight cap prevents the feed from
//! stacking additional items: the next `submit_and_wait` call returns
//! `BatchSubmitError::InFlightCapReached` immediately until the
//! coordinator processes (or drains) the orphaned item. This bounds
//! per-feed queue occupancy to `max_in_flight_per_feed` even under
//! sustained processor slowness.
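//!
//! A sketch of a feed loop observing both timeout-related outcomes
//! (helper names are illustrative):
//!
//! ```rust,ignore
//! match handle.submit_and_wait(entry) {
//!     Ok(result) => deliver(result),
//!     Err(BatchSubmitError::Timeout) => {
//!         // The item is still in flight inside the coordinator; the
//!         // next submit may fail with InFlightCapReached until the
//!         // coordinator processes or drains it.
//!         drop_frame(entry_id);
//!     }
//!     Err(BatchSubmitError::InFlightCapReached) => {
//!         // Returned immediately, without queueing; skip this frame
//!         // rather than block the feed thread.
//!         drop_frame(entry_id);
//!     }
//!     Err(err) => handle_submit_error(err),
//! }
//! ```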
//!
//! Under normal load (queue rarely full, no timeouts), every feed's
//! frames are accepted and batched fairly by arrival time. Under
//! sustained overload (queue persistently full), `try_send` fails at
//! the instant a feed attempts submission.
//!
//! **Not guaranteed**: strict round-robin or weighted fairness. After
//! a batch completes, all participating feeds are unblocked
//! simultaneously. Scheduling jitter determines which feed's next
//! `try_send` arrives first. Over time the distribution is
//! approximately uniform, but short-term skew is possible.
//!
//! **Diagnostic**: per-feed rejection counts are visible via
//! `HealthEvent::BatchSubmissionRejected` events, per-feed timeout
//! counts via `HealthEvent::BatchTimeout` events, and per-feed
//! in-flight cap rejections via `HealthEvent::BatchInFlightExceeded`
//! events (all coalesced per feed). Persistent in-flight rejections
//! indicate the processor is too slow for the configured timeout.
//!
//! # Queue sizing
//!
//! The default queue capacity is `max_batch_size * 4` (minimum 4).
//! Guidelines:
//!
//! - `queue_capacity >= num_feeds` is required for all feeds to have
//!   a slot simultaneously. Because `submit_and_wait` serializes per
//!   feed, this is the hard floor for avoiding unnecessary rejections.
//! - `queue_capacity >= max_batch_size * 2` prevents rejection during
//!   normal batch-formation cadence (one batch forming, one draining).
//! - When the two guidelines give different values, take the larger.
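//!
//! Worked example: with `max_batch_size = 8`, the default capacity is
//! `8 * 4 = 32`, which also covers up to 32 feeds. With 48 feeds, the
//! `num_feeds` floor dominates and the capacity should be raised to at
//! least 48. A sketch, assuming hypothetical builder methods on
//! [`BatchConfig`]:
//!
//! ```rust,ignore
//! let config = BatchConfig::default()
//!     .with_max_batch_size(8)    // default capacity would be 8 * 4 = 32
//!     .with_queue_capacity(48);  // raised to the num_feeds floor
//! ```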
//!
//! # Shutdown
//!
//! The coordinator checks its shutdown flag every 100 ms during both
//! idle waiting (Phase 1) and batch formation (Phase 2). During
//! runtime shutdown, coordinators are signaled *before* feed threads
//! are joined, so feed threads blocked in `submit_and_wait` unblock
//! promptly when the coordinator exits and the response channel
//! disconnects.
//!
//! ## Startup timeout
//!
//! `BatchProcessor::on_start()` is given [`BatchConfig::startup_timeout`]
//! (default 30 s, configurable) to complete. GPU-backed processors such
//! as TensorRT may need several minutes for first-run engine compilation;
//! set a longer timeout via [`BatchConfig::with_startup_timeout`].
//! If `on_start` exceeds the timeout, the coordinator attempts a
//! 2-second bounded join before returning an error. If the thread is
//! still alive after the grace period, it is detached (an inherent
//! safe-Rust limitation).
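//!
//! For example, to give a TensorRT-backed processor five minutes for
//! first-run engine compilation:
//!
//! ```rust,ignore
//! let config = BatchConfig::default()
//!     .with_startup_timeout(std::time::Duration::from_secs(300));
//! ```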
//!
//! ## Response timeout
//!
//! `submit_and_wait` blocks for at most `max_latency + response_timeout`
//! before returning `BatchSubmitError::Timeout`. The response timeout
//! defaults to 5 s (`DEFAULT_RESPONSE_TIMEOUT`) and can be configured
//! via [`BatchConfig::response_timeout`].
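//!
//! For example, with `max_latency = 50 ms` and the default 5 s response
//! timeout, `submit_and_wait` returns `BatchSubmitError::Timeout` after
//! at most `0.05 s + 5 s = 5.05 s`.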
//!
//! ## Expected vs unexpected coordinator loss
//!
//! `PipelineExecutor` distinguishes expected shutdown (the feed or
//! runtime is shutting down) from unexpected coordinator death by
//! checking the feed's shutdown flag. Unexpected loss emits exactly
//! one `HealthEvent::StageError` per feed.
//!
//! # Observability
//!
//! [`BatchHandle::metrics()`] returns a [`BatchMetrics`] snapshot with:
//! batches dispatched, items processed, items rejected, processing
//! latency, formation latency, and batch-size distribution.
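//!
//! A sketch of polling the snapshot (field names are illustrative; see
//! [`BatchMetrics`] for the authoritative set):
//!
//! ```rust,ignore
//! let metrics = handle.metrics();
//! log::info!(
//!     "batches={} items={} rejected={}",
//!     metrics.batches_dispatched,
//!     metrics.items_processed,
//!     metrics.items_rejected,
//! );
//! ```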

mod config;
mod coordinator;
mod handle;
mod metrics;

// Public re-exports (visible to downstream crates via `nv_runtime::batch::*`
// and re-exported from `nv_runtime` in lib.rs).
pub use config::BatchConfig;
pub use handle::BatchHandle;
pub use metrics::BatchMetrics;

// Crate-internal re-exports (used by executor, runtime, worker modules).
pub(crate) use coordinator::BatchCoordinator;
pub(crate) use handle::BatchSubmitError;

#[cfg(test)]
#[path = "tests.rs"]
mod tests;