//! Shared batch coordination infrastructure.
//!
//! This module provides the runtime-side machinery for cross-feed batch
//! inference: a `BatchCoordinator` that collects frames from feed
//! threads, forms bounded batches, dispatches to a user-supplied
//! `BatchProcessor`, and routes results
//! back to feed-local pipeline continuations.
//!
//! # Ownership
//!
//! The coordinator takes **sole ownership** of the processor via
//! `Box<dyn BatchProcessor>`. The processor is never shared — `Sync` is
//! not required. All lifecycle methods (`on_start`, `process`, `on_stop`)
//! are called exclusively from the coordinator thread.
//!
//! # Thread model
//!
//! ```text
//! Feed-1 ──submit──┐                             ┌── response ──→ Feed-1
//! Feed-2 ──submit──┤   BatchCoordinator thread   ├── response ──→ Feed-2
//! Feed-3 ──submit──┘   (collects → dispatches)   └── response ──→ Feed-3
//!                                 │
//!       BatchProcessor::process(&mut self, &mut [BatchEntry])
//! ```
//!
//! Each feed thread submits a `BatchEntry` (frame + view snapshot) via
//! a bounded channel and blocks on a per-item response channel. The
//! coordinator thread collects items until `max_batch_size` or
//! `max_latency`, calls the processor, and routes results back.
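//!
//! A minimal sketch of that collection step, assuming a plain
//! `std::sync::mpsc` receiver. `collect_batch` is a hypothetical helper,
//! not this module's API, and it leaves out the shutdown polling that the
//! real coordinator performs:
//!
//! ```rust
//! use std::sync::mpsc::{Receiver, RecvTimeoutError};
//! use std::time::{Duration, Instant};
//!
//! /// Hypothetical helper: block for a first item, then keep collecting
//! /// until `max_batch_size` items are gathered or `max_latency` elapses.
//! fn collect_batch<T>(
//!     rx: &Receiver<T>,
//!     max_batch_size: usize,
//!     max_latency: Duration,
//! ) -> Vec<T> {
//!     let mut batch = Vec::with_capacity(max_batch_size);
//!     // Wait for the first item; an empty batch means all senders are gone.
//!     match rx.recv() {
//!         Ok(item) => batch.push(item),
//!         Err(_) => return batch,
//!     }
//!     // Fill the batch until the size bound or the latency bound is hit.
//!     let deadline = Instant::now() + max_latency;
//!     while batch.len() < max_batch_size {
//!         let remaining = deadline.saturating_duration_since(Instant::now());
//!         match rx.recv_timeout(remaining) {
//!             Ok(item) => batch.push(item),
//!             Err(RecvTimeoutError::Timeout)
//!             | Err(RecvTimeoutError::Disconnected) => break,
//!         }
//!     }
//!     batch
//! }
//! ```
//!
//! The latency clock in this sketch starts when the first item arrives, so
//! an idle coordinator does not dispatch empty batches.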
//!
//! # Backpressure
//!
//! Submission uses `try_send` (non-blocking). If the coordinator queue is
//! full, the feed thread receives `BatchSubmitError::QueueFull` and the
//! frame is dropped with a `HealthEvent::BatchSubmissionRejected` event.
//! Rejection counts are coalesced per-feed with a 1-second throttle
//! window, and flushed on recovery or lifecycle boundaries.
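//!
//! The non-blocking submission pattern can be sketched with a bounded
//! `std::sync::mpsc::sync_channel`. `SubmitError` and `try_submit` below
//! are hypothetical stand-ins for illustration, not the types defined in
//! this module:
//!
//! ```rust
//! use std::sync::mpsc::{SyncSender, TrySendError};
//!
//! /// Hypothetical stand-in for the submission error.
//! #[derive(Debug, PartialEq)]
//! enum SubmitError {
//!     QueueFull,
//!     CoordinatorGone,
//! }
//!
//! /// Non-blocking submit: the caller never waits for queue space.
//! fn try_submit<T>(tx: &SyncSender<T>, item: T) -> Result<(), SubmitError> {
//!     tx.try_send(item).map_err(|e| match e {
//!         // The rejected item is returned inside the error and dropped here.
//!         TrySendError::Full(_) => SubmitError::QueueFull,
//!         TrySendError::Disconnected(_) => SubmitError::CoordinatorGone,
//!     })
//! }
//! ```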
//!
//! # Fairness model
//!
//! All feeds share a single FIFO submission queue. The coordinator
//! processes items strictly in submission order.
//!
//! ## Per-feed in-flight cap
//!
//! Each feed is allowed at most [`max_in_flight_per_feed`](BatchConfig::max_in_flight_per_feed)
//! items in-flight simultaneously (default: **1**). An item is "in-flight"
//! from the moment it enters the submission queue until the coordinator
//! routes its result back (or drains it at shutdown).
//!
//! Under normal operation, `submit_and_wait`
//! is synchronous — the feed thread blocks until the result arrives.
//! With the default cap of 1, at most one item per feed is in the queue
//! at any time.
//!
//! **Timeout regime**: when `submit_and_wait` returns
//! `BatchSubmitError::Timeout`, the timed-out item remains in-flight
//! inside the coordinator. The in-flight cap prevents the feed from
//! stacking additional items: the next `submit_and_wait` call returns
//! `BatchSubmitError::InFlightCapReached` immediately until the
//! coordinator processes (or drains) the orphaned item. This bounds
//! per-feed queue occupancy to `max_in_flight_per_feed` even under
//! sustained processor slowness.
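//!
//! One way to enforce such a cap is a per-feed atomic counter that the feed
//! increments before submitting and the coordinator decrements when it
//! routes or drains the item. The sketch below assumes that design; the
//! `InFlight` type and its method names are hypothetical, and the actual
//! bookkeeping may differ:
//!
//! ```rust
//! use std::sync::atomic::{AtomicUsize, Ordering};
//!
//! /// Hypothetical per-feed in-flight counter.
//! struct InFlight {
//!     count: AtomicUsize,
//!     cap: usize,
//! }
//!
//! impl InFlight {
//!     fn new(cap: usize) -> Self {
//!         Self { count: AtomicUsize::new(0), cap }
//!     }
//!
//!     /// Feed side: reserve a slot, or report that the cap is reached.
//!     fn try_acquire(&self) -> bool {
//!         self.count
//!             .fetch_update(Ordering::AcqRel, Ordering::Acquire, |n| {
//!                 if n < self.cap { Some(n + 1) } else { None }
//!             })
//!             .is_ok()
//!     }
//!
//!     /// Coordinator side: free the slot once the result is routed back
//!     /// or drained, even if the feed has already stopped waiting.
//!     fn release(&self) {
//!         self.count.fetch_sub(1, Ordering::AcqRel);
//!     }
//! }
//! ```
//!
//! Because only the coordinator releases the slot, a feed that timed out
//! cannot submit again until its orphaned item has left the queue.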
//!
//! Under normal load (queue rarely full, no timeouts) every feed's
//! frames are accepted and batched in arrival order. Under sustained
//! overload (queue persistently full), `try_send` fails for whichever
//! feed attempts submission while the queue is full, so rejections
//! depend on submission timing rather than on any per-feed quota.
//!
//! **Not guaranteed**: strict round-robin or weighted fairness. After
//! a batch completes, all participating feeds are unblocked
//! simultaneously. Scheduling jitter determines which feed's next
//! `try_send` arrives first. Over time the distribution is
//! approximately uniform, but short-term skew is possible.
//!
//! **Diagnostic**: per-feed rejection counts are visible via
//! `HealthEvent::BatchSubmissionRejected` events, per-feed
//! timeout counts via `HealthEvent::BatchTimeout` events, and
//! per-feed in-flight cap rejections via
//! `HealthEvent::BatchInFlightExceeded` events (all coalesced
//! per feed). Persistent in-flight rejections indicate the
//! processor is too slow for the configured timeout.
//!
//! # Queue sizing
//!
//! The default queue capacity is `max_batch_size * 4` (minimum 4).
//! Guidelines:
//!
//! - `queue_capacity >= num_feeds` is required for all feeds to have
//!   a slot simultaneously. Because `submit_and_wait` serializes per
//!   feed, this is the hard floor for avoiding unnecessary rejections.
//! - `queue_capacity >= max_batch_size * 2` prevents rejection during
//!   normal batch-formation cadence (one batch forming, one draining).
//! - When both conditions conflict, prefer the larger value.
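//!
//! The arithmetic behind these guidelines, as a small sketch. Both function
//! names are hypothetical and only restate the rules above:
//!
//! ```rust
//! /// Default described above: four batches' worth of slots, at least 4.
//! fn default_queue_capacity(max_batch_size: usize) -> usize {
//!     max_batch_size.saturating_mul(4).max(4)
//! }
//!
//! /// Smallest capacity satisfying both guidelines: the larger of the
//! /// per-feed floor and the two-batch floor.
//! fn min_recommended_capacity(num_feeds: usize, max_batch_size: usize) -> usize {
//!     num_feeds.max(max_batch_size.saturating_mul(2))
//! }
//! ```
//!
//! For example, 20 feeds with `max_batch_size = 8` give a floor of 20,
//! which the default capacity of 32 already satisfies. With 40 feeds the
//! floor rises to 40, so the default would need to be raised.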
//!
//! # Shutdown
//!
//! The coordinator checks its shutdown flag every 100 ms during both
//! idle waiting (Phase 1) and batch formation (Phase 2). During
//! runtime shutdown, coordinators are signaled *before* feed threads
//! are joined, so feed threads blocked in `submit_and_wait` unblock
//! promptly when the coordinator exits and the response channel
//! disconnects.
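//!
//! The periodic flag check during idle waiting can be sketched with
//! `recv_timeout`. This assumes the shutdown flag is an `AtomicBool`;
//! `wait_first_item` and `SHUTDOWN_POLL` are hypothetical names:
//!
//! ```rust
//! use std::sync::atomic::{AtomicBool, Ordering};
//! use std::sync::mpsc::{Receiver, RecvTimeoutError};
//! use std::time::Duration;
//!
//! /// Poll interval matching the 100 ms figure above.
//! const SHUTDOWN_POLL: Duration = Duration::from_millis(100);
//!
//! /// Hypothetical idle wait: returns `None` once shutdown is flagged
//! /// or every sender has disconnected.
//! fn wait_first_item<T>(rx: &Receiver<T>, shutdown: &AtomicBool) -> Option<T> {
//!     loop {
//!         if shutdown.load(Ordering::Acquire) {
//!             return None;
//!         }
//!         match rx.recv_timeout(SHUTDOWN_POLL) {
//!             Ok(item) => return Some(item),
//!             // Nothing arrived within the interval: re-check the flag.
//!             Err(RecvTimeoutError::Timeout) => continue,
//!             Err(RecvTimeoutError::Disconnected) => return None,
//!         }
//!     }
//! }
//! ```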
//!
//! ## Startup timeout
//!
//! `BatchProcessor::on_start()` is given [`BatchConfig::startup_timeout`]
//! (default 30 s, configurable). GPU-backed processors such as TensorRT
//! may need several minutes for first-run engine compilation; set a
//! longer timeout via [`BatchConfig::with_startup_timeout`].
//! If `on_start()` exceeds the timeout, the coordinator attempts a 2-second
//! bounded join before returning an error. If the thread is still alive
//! after the grace period, it is detached, because safe Rust has no way to
//! forcibly terminate a running thread.
//!
//! ## Response timeout
//!
//! `submit_and_wait` blocks for at most `max_latency + response_timeout`
//! before returning `BatchSubmitError::Timeout`. The response timeout
//! defaults to 5 s (`DEFAULT_RESPONSE_TIMEOUT`) and can be configured
//! via [`BatchConfig::response_timeout`].
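//!
//! A sketch of the bounded wait on the per-item response channel, assuming
//! a `std::sync::mpsc` receiver. `WaitError` and `wait_for_response` are
//! hypothetical names used only for illustration:
//!
//! ```rust
//! use std::sync::mpsc::{Receiver, RecvTimeoutError};
//! use std::time::Duration;
//!
//! /// Hypothetical outcome of a failed wait.
//! #[derive(Debug, PartialEq)]
//! enum WaitError {
//!     Timeout,
//!     CoordinatorGone,
//! }
//!
//! /// Block for at most `max_latency + response_timeout`.
//! fn wait_for_response<T>(
//!     rx: &Receiver<T>,
//!     max_latency: Duration,
//!     response_timeout: Duration,
//! ) -> Result<T, WaitError> {
//!     let budget = max_latency.saturating_add(response_timeout);
//!     rx.recv_timeout(budget).map_err(|e| match e {
//!         RecvTimeoutError::Timeout => WaitError::Timeout,
//!         // The sender was dropped, e.g. because the coordinator exited.
//!         RecvTimeoutError::Disconnected => WaitError::CoordinatorGone,
//!     })
//! }
//! ```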
//!
//! ## Expected vs unexpected coordinator loss
//!
//! `PipelineExecutor` distinguishes
//! expected shutdown (feed/runtime is shutting down) from unexpected
//! coordinator death by checking the feed's shutdown flag. Unexpected
//! loss emits exactly one `HealthEvent::StageError` per feed.
//!
//! # Observability
//!
//! [`BatchHandle::metrics()`] returns a [`BatchMetrics`] snapshot with:
//! batches dispatched, items processed, items rejected, processing
//! latency, formation latency, and batch-size distribution.
// Public re-exports (visible to downstream crates via `nv_runtime::batch::*`
// and re-exported from `nv_runtime` in lib.rs).
pub use BatchConfig;
pub use BatchHandle;
pub use BatchMetrics;
// Crate-internal re-exports (used by executor, runtime, worker modules).
pub use BatchCoordinator;
pub use BatchSubmitError;