// nv_runtime/batch/handle.rs
1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Duration;
4
5use nv_core::error::StageError;
6use nv_core::id::StageId;
7use nv_perception::StageOutput;
8use nv_perception::batch::BatchEntry;
9
10use super::config::BatchConfig;
11use super::metrics::{BatchMetrics, BatchMetricsInner};
12
/// Fallback safety timeout applied when [`BatchConfig::response_timeout`]
/// is `None`; see that field's documentation for the full semantics.
const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_millis(5_000);
16
17// ---------------------------------------------------------------------------
18// Internal submission types
19// ---------------------------------------------------------------------------
20
/// A pending item: the entry to process plus the response channel
/// and optional per-feed in-flight guard.
pub(super) struct PendingEntry {
    /// The work item handed to the coordinator for batching.
    pub(super) entry: BatchEntry,
    /// One-shot result path back to the submitting feed thread
    /// (a capacity-1 sync channel allocated per submission).
    pub(super) response_tx: std::sync::mpsc::SyncSender<Result<StageOutput, StageError>>,
    /// Per-feed in-flight counter. The coordinator decrements this
    /// after routing the result (or on drain). Must be decremented
    /// exactly once per PendingEntry that reached the coordinator.
    pub(super) in_flight_guard: Option<Arc<AtomicUsize>>,
}
31
32// ---------------------------------------------------------------------------
33// BatchHandle
34// ---------------------------------------------------------------------------
35
/// Clonable handle to a batch coordinator.
///
/// Obtained from [`Runtime::create_batch`](crate::Runtime::create_batch).
/// Pass to [`FeedPipeline::builder().batch()`](crate::pipeline::FeedPipelineBuilder::batch)
/// to enable shared batch processing for a feed.
///
/// Multiple feeds can share the same handle — their frames are batched
/// together by the coordinator.
///
/// Clone is cheap (Arc bump).
#[derive(Clone)]
pub struct BatchHandle {
    // Shared state; every clone of the handle points at the same
    // coordinator queue, metrics, and config.
    pub(crate) inner: Arc<BatchHandleInner>,
}
50
/// Shared state behind a [`BatchHandle`] (one instance per coordinator).
pub(crate) struct BatchHandleInner {
    /// Bounded submission queue into the coordinator.
    pub(super) submit_tx: std::sync::mpsc::SyncSender<PendingEntry>,
    /// Shared metrics sink, snapshotted by [`BatchHandle::metrics`].
    pub(super) metrics: Arc<BatchMetricsInner>,
    /// Stage ID of the batch processor.
    pub(super) processor_id: StageId,
    /// Coordinator configuration (batch size, latency budget,
    /// per-feed in-flight cap, response timeout).
    pub(super) config: BatchConfig,
    /// The processor's declared capabilities, if it provided any.
    pub(super) capabilities: Option<nv_perception::stage::StageCapabilities>,
}
58
59impl BatchHandle {
60 /// The processor's stage ID.
61 #[must_use]
62 pub fn processor_id(&self) -> StageId {
63 self.inner.processor_id
64 }
65
66 /// The processor's declared capabilities (if any).
67 ///
68 /// Used by the feed pipeline validation to verify that post-batch
69 /// stages' input requirements are satisfied by the batch processor's
70 /// declared outputs.
71 #[must_use]
72 pub fn capabilities(&self) -> Option<&nv_perception::stage::StageCapabilities> {
73 self.inner.capabilities.as_ref()
74 }
75
76 /// Snapshot current batch metrics.
77 #[must_use]
78 pub fn metrics(&self) -> BatchMetrics {
79 self.inner
80 .metrics
81 .snapshot(self.inner.config.max_batch_size as u64)
82 }
83
84 /// Record that a feed-side timeout occurred.
85 ///
86 /// Called by the executor when `submit_and_wait` returns
87 /// `BatchSubmitError::Timeout`. The coordinator is unaware of
88 /// feed-side timeouts (it continues processing the batch), so
89 /// this counter is maintained from the feed side.
90 pub(crate) fn record_timeout(&self) {
91 self.inner.metrics.record_timeout();
92 }
93
94 /// Submit an entry and block until the batch result is ready.
95 ///
96 /// Called by the feed executor on the feed's OS thread.
97 ///
98 /// # In-flight guard
99 ///
100 /// When `in_flight` is `Some`, the counter is checked against
101 /// [`BatchConfig::max_in_flight_per_feed`] before submission.
102 /// If the feed is at its cap, returns
103 /// [`BatchSubmitError::InFlightCapReached`] immediately.
104 /// On successful `try_send`, the counter has been incremented;
105 /// the **coordinator** is responsible for decrementing it after
106 /// routing the result or during drain.
107 ///
108 /// # Per-submit channel allocation
109 ///
110 /// Each call allocates a `sync_channel(1)` for the response. This is
111 /// intentional: the allocation is a single small heap pair, occurs at
112 /// batch timescale (typically 20–100 ms), and is negligible relative to
113 /// the total per-frame cost. Alternatives (pre-allocated slot pool,
114 /// shared condvar) would add complexity and contention without
115 /// measurable benefit at realistic batch rates.
116 pub(crate) fn submit_and_wait(
117 &self,
118 entry: BatchEntry,
119 in_flight: Option<&Arc<AtomicUsize>>,
120 ) -> Result<StageOutput, BatchSubmitError> {
121 let (response_tx, response_rx) = std::sync::mpsc::sync_channel(1);
122 self.inner.metrics.record_submission();
123
124 // Check in-flight cap before attempting submission.
125 let guard = if let Some(counter) = in_flight {
126 let prev = counter.fetch_add(1, Ordering::Acquire);
127 if prev >= self.inner.config.max_in_flight_per_feed {
128 counter.fetch_sub(1, Ordering::Release);
129 self.inner.metrics.record_rejection();
130 return Err(BatchSubmitError::InFlightCapReached);
131 }
132 Some(Arc::clone(counter))
133 } else {
134 None
135 };
136
137 // Non-blocking submit. If the queue is full, fail immediately
138 // rather than blocking the feed thread. Decrement in-flight
139 // since the entry never reached the coordinator.
140 self.inner
141 .submit_tx
142 .try_send(PendingEntry {
143 entry,
144 response_tx,
145 in_flight_guard: guard.clone(),
146 })
147 .map_err(|e| {
148 // Entry rejected — coordinator never saw it, decrement.
149 if let Some(ref g) = guard {
150 g.fetch_sub(1, Ordering::Release);
151 }
152 match e {
153 std::sync::mpsc::TrySendError::Full(_) => {
154 self.inner.metrics.record_rejection();
155 BatchSubmitError::QueueFull
156 }
157 std::sync::mpsc::TrySendError::Disconnected(_) => {
158 self.inner.metrics.record_rejection();
159 BatchSubmitError::CoordinatorShutdown
160 }
161 }
162 })?;
163
164 // Bounded wait for the response. The coordinator decrements
165 // the in-flight guard before sending the result, so by the
166 // time we receive the response the counter is already updated.
167 let safety = self
168 .inner
169 .config
170 .response_timeout
171 .unwrap_or(DEFAULT_RESPONSE_TIMEOUT);
172 let timeout = self.inner.config.max_latency + safety;
173 match response_rx.recv_timeout(timeout) {
174 Ok(Ok(output)) => Ok(output),
175 Ok(Err(stage_err)) => Err(BatchSubmitError::ProcessingFailed(stage_err)),
176 // Timeout: do NOT decrement — item is still in-flight
177 // inside the coordinator. The coordinator will decrement
178 // when it processes or drains the item.
179 Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(BatchSubmitError::Timeout),
180 Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
181 Err(BatchSubmitError::CoordinatorShutdown)
182 }
183 }
184 }
185}
186
/// Internal batch submission errors.
///
/// Crate-internal (`pub(crate)`): produced by
/// `BatchHandle::submit_and_wait` for the feed executor to handle.
#[derive(Debug)]
pub(crate) enum BatchSubmitError {
    /// Submission queue is full — coordinator is overloaded.
    QueueFull,
    /// Coordinator thread has shut down.
    CoordinatorShutdown,
    /// Batch processor returned an error.
    ProcessingFailed(StageError),
    /// No response within safety timeout.
    // Note: the entry may still complete inside the coordinator after
    // this is returned; the feed side records it via record_timeout.
    Timeout,
    /// Feed has reached its per-feed in-flight cap. A prior timed-out
    /// submission is still being processed by the coordinator.
    InFlightCapReached,
}
201}