// nv_runtime/batch/handle.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicUsize, Ordering};
3use std::time::Duration;
4
5use nv_core::error::StageError;
6use nv_core::id::StageId;
7use nv_perception::StageOutput;
8use nv_perception::batch::BatchEntry;
9
10use super::config::BatchConfig;
11use super::metrics::{BatchMetrics, BatchMetricsInner};
12
/// Default response safety timeout when [`BatchConfig::response_timeout`]
/// is `None`. See that field's documentation for semantics.
const DEFAULT_RESPONSE_TIMEOUT: Duration = Duration::from_secs(5);

// ---------------------------------------------------------------------------
// Internal submission types
// ---------------------------------------------------------------------------
20
/// A pending item: the entry to process plus the response channel
/// and optional per-feed in-flight guard.
pub(super) struct PendingEntry {
    /// The entry to be batched and processed by the coordinator.
    pub(super) entry: BatchEntry,
    /// Response channel back to the submitting feed thread. Created with
    /// capacity 1 per submission, so the coordinator's single send never
    /// blocks even if the feed has already timed out and gone away.
    pub(super) response_tx: std::sync::mpsc::SyncSender<Result<StageOutput, StageError>>,
    /// Per-feed in-flight counter. The coordinator decrements this
    /// after routing the result (or on drain). Must be decremented
    /// exactly once per PendingEntry that reached the coordinator.
    pub(super) in_flight_guard: Option<Arc<AtomicUsize>>,
}
31
// ---------------------------------------------------------------------------
// BatchHandle
// ---------------------------------------------------------------------------

/// Clonable handle to a batch coordinator.
///
/// Obtained from [`Runtime::create_batch`](crate::Runtime::create_batch).
/// Pass to [`FeedPipeline::builder().batch()`](crate::pipeline::FeedPipelineBuilder::batch)
/// to enable shared batch processing for a feed.
///
/// Multiple feeds can share the same handle — their frames are batched
/// together by the coordinator.
///
/// Clone is cheap (Arc bump).
#[derive(Clone)]
pub struct BatchHandle {
    // Shared state: every clone points at the same coordinator channel
    // and metrics store.
    pub(crate) inner: Arc<BatchHandleInner>,
}
50
/// Shared state behind [`BatchHandle`]; one instance per coordinator.
pub(crate) struct BatchHandleInner {
    /// Channel into the coordinator. Submissions use `try_send`, so a
    /// full queue fails fast instead of blocking the feed thread.
    pub(super) submit_tx: std::sync::mpsc::SyncSender<PendingEntry>,
    /// Shared metrics store, snapshotted by [`BatchHandle::metrics`].
    pub(super) metrics: Arc<BatchMetricsInner>,
    /// Stage ID of the batch processor this handle fronts.
    pub(super) processor_id: StageId,
    /// Batch configuration (max batch size, max latency, per-feed
    /// in-flight cap, optional response timeout).
    pub(super) config: BatchConfig,
    /// The processor's declared capabilities, if it provided any.
    pub(super) capabilities: Option<nv_perception::stage::StageCapabilities>,
}
58
59impl BatchHandle {
60    /// The processor's stage ID.
61    #[must_use]
62    pub fn processor_id(&self) -> StageId {
63        self.inner.processor_id
64    }
65
66    /// The processor's declared capabilities (if any).
67    ///
68    /// Used by the feed pipeline validation to verify that post-batch
69    /// stages' input requirements are satisfied by the batch processor's
70    /// declared outputs.
71    #[must_use]
72    pub fn capabilities(&self) -> Option<&nv_perception::stage::StageCapabilities> {
73        self.inner.capabilities.as_ref()
74    }
75
76    /// Snapshot current batch metrics.
77    #[must_use]
78    pub fn metrics(&self) -> BatchMetrics {
79        self.inner
80            .metrics
81            .snapshot(self.inner.config.max_batch_size as u64)
82    }
83
84    /// Record that a feed-side timeout occurred.
85    ///
86    /// Called by the executor when `submit_and_wait` returns
87    /// `BatchSubmitError::Timeout`. The coordinator is unaware of
88    /// feed-side timeouts (it continues processing the batch), so
89    /// this counter is maintained from the feed side.
90    pub(crate) fn record_timeout(&self) {
91        self.inner.metrics.record_timeout();
92    }
93
94    /// Submit an entry and block until the batch result is ready.
95    ///
96    /// Called by the feed executor on the feed's OS thread.
97    ///
98    /// # In-flight guard
99    ///
100    /// When `in_flight` is `Some`, the counter is checked against
101    /// [`BatchConfig::max_in_flight_per_feed`] before submission.
102    /// If the feed is at its cap, returns
103    /// [`BatchSubmitError::InFlightCapReached`] immediately.
104    /// On successful `try_send`, the counter has been incremented;
105    /// the **coordinator** is responsible for decrementing it after
106    /// routing the result or during drain.
107    ///
108    /// # Per-submit channel allocation
109    ///
110    /// Each call allocates a `sync_channel(1)` for the response. This is
111    /// intentional: the allocation is a single small heap pair, occurs at
112    /// batch timescale (typically 20–100 ms), and is negligible relative to
113    /// the total per-frame cost. Alternatives (pre-allocated slot pool,
114    /// shared condvar) would add complexity and contention without
115    /// measurable benefit at realistic batch rates.
116    pub(crate) fn submit_and_wait(
117        &self,
118        entry: BatchEntry,
119        in_flight: Option<&Arc<AtomicUsize>>,
120    ) -> Result<StageOutput, BatchSubmitError> {
121        let (response_tx, response_rx) = std::sync::mpsc::sync_channel(1);
122        self.inner.metrics.record_submission();
123
124        // Check in-flight cap before attempting submission.
125        let guard = if let Some(counter) = in_flight {
126            let prev = counter.fetch_add(1, Ordering::Acquire);
127            if prev >= self.inner.config.max_in_flight_per_feed {
128                counter.fetch_sub(1, Ordering::Release);
129                self.inner.metrics.record_rejection();
130                return Err(BatchSubmitError::InFlightCapReached);
131            }
132            Some(Arc::clone(counter))
133        } else {
134            None
135        };
136
137        // Non-blocking submit. If the queue is full, fail immediately
138        // rather than blocking the feed thread. Decrement in-flight
139        // since the entry never reached the coordinator.
140        self.inner
141            .submit_tx
142            .try_send(PendingEntry {
143                entry,
144                response_tx,
145                in_flight_guard: guard.clone(),
146            })
147            .map_err(|e| {
148                // Entry rejected — coordinator never saw it, decrement.
149                if let Some(ref g) = guard {
150                    g.fetch_sub(1, Ordering::Release);
151                }
152                match e {
153                    std::sync::mpsc::TrySendError::Full(_) => {
154                        self.inner.metrics.record_rejection();
155                        BatchSubmitError::QueueFull
156                    }
157                    std::sync::mpsc::TrySendError::Disconnected(_) => {
158                        self.inner.metrics.record_rejection();
159                        BatchSubmitError::CoordinatorShutdown
160                    }
161                }
162            })?;
163
164        // Bounded wait for the response. The coordinator decrements
165        // the in-flight guard before sending the result, so by the
166        // time we receive the response the counter is already updated.
167        let safety = self
168            .inner
169            .config
170            .response_timeout
171            .unwrap_or(DEFAULT_RESPONSE_TIMEOUT);
172        let timeout = self.inner.config.max_latency + safety;
173        match response_rx.recv_timeout(timeout) {
174            Ok(Ok(output)) => Ok(output),
175            Ok(Err(stage_err)) => Err(BatchSubmitError::ProcessingFailed(stage_err)),
176            // Timeout: do NOT decrement — item is still in-flight
177            // inside the coordinator. The coordinator will decrement
178            // when it processes or drains the item.
179            Err(std::sync::mpsc::RecvTimeoutError::Timeout) => Err(BatchSubmitError::Timeout),
180            Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
181                Err(BatchSubmitError::CoordinatorShutdown)
182            }
183        }
184    }
185}
186
/// Internal batch submission errors returned by
/// `BatchHandle::submit_and_wait`.
#[derive(Debug)]
pub(crate) enum BatchSubmitError {
    /// Submission queue is full — coordinator is overloaded.
    QueueFull,
    /// Coordinator thread has shut down (submit or response channel
    /// disconnected).
    CoordinatorShutdown,
    /// Batch processor returned an error.
    ProcessingFailed(StageError),
    /// No response within safety timeout (`max_latency` plus the
    /// configured or default response timeout).
    Timeout,
    /// Feed has reached its per-feed in-flight cap. A prior timed-out
    /// submission is still being processed by the coordinator.
    InFlightCapReached,
}