//! nv_runtime/batch/metrics.rs
//!
//! Lock-free metrics for the batch coordinator: internal atomic counters
//! (`BatchMetricsInner`) and the plain-value snapshot type (`BatchMetrics`).
1use std::sync::atomic::{AtomicU64, Ordering};
2
3// ---------------------------------------------------------------------------
4// BatchMetrics
5// ---------------------------------------------------------------------------
6
/// Live metrics snapshot for a batch coordinator.
///
/// Produced by `BatchMetricsInner::snapshot`: each field is a plain `u64`
/// copy of the corresponding atomic counter, so a snapshot is cheap to
/// take and pass around by value (`Copy`).
///
/// All counters are monotonically increasing. Compute rates by taking
/// deltas between snapshots.
///
/// # Counter semantics
///
/// - **`items_submitted`** — incremented when a feed thread calls
///   `submit_and_wait`, *before* the channel send. Every call
///   increments this exactly once, regardless of outcome.
/// - **`items_rejected`** — incremented when `try_send` fails
///   (`QueueFull` or `Disconnected`). Rejected items never reach
///   the coordinator.
/// - **`items_processed`** — incremented by the coordinator after
///   the batch processor returns (success or error). Each item in
///   the dispatched batch is counted.
///
/// **Invariant** (approximate under concurrent reads):
/// `items_submitted >= items_processed + items_rejected`.
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchMetrics {
    /// Total batches dispatched to the processor.
    pub batches_dispatched: u64,
    /// Total items successfully dispatched and processed (success or
    /// error) by the batch processor. Does not include rejected items.
    pub items_processed: u64,
    /// Total items submitted by feed threads (includes both accepted
    /// and rejected submissions).
    pub items_submitted: u64,
    /// Items rejected because the submission queue was full or the
    /// coordinator was shut down (`Disconnected`). These items never
    /// reached the coordinator thread.
    pub items_rejected: u64,
    /// Items whose response was not received before the feed-side
    /// safety timeout (`max_latency + response_timeout`). The
    /// coordinator may still process these items, but the feed thread
    /// abandoned waiting.
    ///
    /// A non-zero value indicates the batch processor is slower than
    /// the configured safety margin allows. Consider increasing
    /// `response_timeout` or reducing `max_batch_size`.
    pub items_timed_out: u64,
    /// Cumulative batch processing time (nanoseconds). Divide by
    /// `batches_dispatched` for the mean (see `avg_processing_ns`).
    pub total_processing_ns: u64,
    /// Cumulative batch formation wait time (nanoseconds). Divide by
    /// `batches_dispatched` for the mean (see `avg_formation_ns`).
    pub total_formation_ns: u64,
    /// Smallest batch size dispatched (0 if no batches yet).
    pub min_batch_size: u64,
    /// Largest batch size dispatched.
    pub max_batch_size_seen: u64,
    /// The configured `max_batch_size` for this coordinator.
    ///
    /// Included in the snapshot so callers can compute fill ratios
    /// without retaining a reference to the original config.
    pub configured_max_batch_size: u64,
    /// Number of consecutive batch errors (processor `Err` or panic)
    /// since the last successful dispatch. Reset to 0 on success.
    ///
    /// Useful for alerting: a steadily increasing value indicates a
    /// persistently broken processor. Zero means the last batch
    /// succeeded.
    pub consecutive_errors: u64,
}
70
71impl BatchMetrics {
72    /// Approximate number of items currently in-flight (submitted but not
73    /// yet processed or rejected).
74    ///
75    /// Computed from atomic counters — may be briefly inconsistent under
76    /// heavy contention, but sufficient for monitoring and dashboards.
77    ///
78    /// **Caveat**: items that timed out on the feed side may still be
79    /// processed by the coordinator. When this happens, `items_processed`
80    /// includes the timed-out item and this counter can undercount. The
81    /// discrepancy is bounded by `items_timed_out`.
82    #[must_use]
83    pub fn pending_items(&self) -> u64 {
84        self.items_submitted
85            .saturating_sub(self.items_processed)
86            .saturating_sub(self.items_rejected)
87    }
88
89    /// Fraction of submissions rejected (`items_rejected / items_submitted`).
90    ///
91    /// Returns `None` if no items have been submitted.
92    /// A consistently non-zero value indicates sustained overload.
93    #[must_use]
94    pub fn rejection_rate(&self) -> Option<f64> {
95        if self.items_submitted == 0 {
96            return None;
97        }
98        Some(self.items_rejected as f64 / self.items_submitted as f64)
99    }
100
101    /// Fraction of submissions that timed out (`items_timed_out / items_submitted`).
102    ///
103    /// Returns `None` if no items have been submitted.
104    /// A non-zero value indicates the processor is too slow for the
105    /// configured safety timeout.
106    #[must_use]
107    pub fn timeout_rate(&self) -> Option<f64> {
108        if self.items_submitted == 0 {
109            return None;
110        }
111        Some(self.items_timed_out as f64 / self.items_submitted as f64)
112    }
113
114    /// Average batch size across all dispatched batches.
115    ///
116    /// Returns `None` if no batches have been dispatched yet.
117    /// O(1), zero-allocation.
118    #[must_use]
119    pub fn avg_batch_size(&self) -> Option<f64> {
120        if self.batches_dispatched == 0 {
121            return None;
122        }
123        Some(self.items_processed as f64 / self.batches_dispatched as f64)
124    }
125
126    /// Average batch fill ratio: `avg_batch_size / configured_max_batch_size`.
127    ///
128    /// A value of `1.0` means batches are consistently full. Lower values
129    /// indicate partial batches (dispatched on timeout rather than on size).
130    ///
131    /// Returns `None` if no batches have been dispatched yet.
132    /// O(1), zero-allocation.
133    #[must_use]
134    pub fn avg_fill_ratio(&self) -> Option<f64> {
135        let avg = self.avg_batch_size()?;
136        if self.configured_max_batch_size == 0 {
137            return None;
138        }
139        Some(avg / self.configured_max_batch_size as f64)
140    }
141
142    /// Average processing time per batch (nanoseconds).
143    ///
144    /// Returns `None` if no batches have been dispatched yet.
145    /// O(1), zero-allocation.
146    #[must_use]
147    pub fn avg_processing_ns(&self) -> Option<f64> {
148        if self.batches_dispatched == 0 {
149            return None;
150        }
151        Some(self.total_processing_ns as f64 / self.batches_dispatched as f64)
152    }
153
154    /// Average formation wait time per batch (nanoseconds).
155    ///
156    /// Formation time is the interval from the first item arriving to
157    /// the batch being dispatched. Lower values indicate faster batch
158    /// filling (high submission rate or small batch size).
159    ///
160    /// Returns `None` if no batches have been dispatched yet.
161    /// O(1), zero-allocation.
162    #[must_use]
163    pub fn avg_formation_ns(&self) -> Option<f64> {
164        if self.batches_dispatched == 0 {
165            return None;
166        }
167        Some(self.total_formation_ns as f64 / self.batches_dispatched as f64)
168    }
169}
170
171impl std::fmt::Display for BatchMetrics {
172    /// Human-readable diagnostic summary.
173    ///
174    /// Example output:
175    /// ```text
176    /// batches=120 items=480/500 rejected=15 timed_out=2 fill=0.80 \
177    /// avg_proc=12.5ms avg_form=8.2ms consec_err=0
178    /// ```
179    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
180        write!(
181            f,
182            "batches={} items={}/{} rejected={} timed_out={} fill={} avg_proc={} avg_form={} consec_err={}",
183            self.batches_dispatched,
184            self.items_processed,
185            self.items_submitted,
186            self.items_rejected,
187            self.items_timed_out,
188            self.avg_fill_ratio()
189                .map_or_else(|| "n/a".to_string(), |r| format!("{r:.2}")),
190            self.avg_processing_ns().map_or_else(
191                || "n/a".to_string(),
192                |ns| format!("{:.1}ms", ns / 1_000_000.0)
193            ),
194            self.avg_formation_ns().map_or_else(
195                || "n/a".to_string(),
196                |ns| format!("{:.1}ms", ns / 1_000_000.0)
197            ),
198            self.consecutive_errors,
199        )
200    }
201}
202
203// ---------------------------------------------------------------------------
204// BatchMetricsInner
205// ---------------------------------------------------------------------------
206
/// Internal atomic counters for lock-free metrics recording.
///
/// Updated by feed threads and the coordinator via `fetch_add` /
/// `fetch_min` / `fetch_max` with `Ordering::Relaxed`; read out as a
/// plain `BatchMetrics` copy via `snapshot` (approximately consistent
/// under concurrent updates).
pub(super) struct BatchMetricsInner {
    // One atomic per `BatchMetrics` counter field; see that struct's
    // field docs for the exact semantics of each value.
    batches_dispatched: AtomicU64,
    items_processed: AtomicU64,
    items_submitted: AtomicU64,
    items_rejected: AtomicU64,
    items_timed_out: AtomicU64,
    total_processing_ns: AtomicU64,
    total_formation_ns: AtomicU64,
    // Sentinel `u64::MAX` until the first dispatch (set in `new`) so the
    // first `fetch_min` captures the first batch size; `snapshot` maps
    // the sentinel to 0.
    min_batch_size: AtomicU64,
    max_batch_size_seen: AtomicU64,
    consecutive_errors: AtomicU64,
}
220
221impl BatchMetricsInner {
222    pub(super) fn new() -> Self {
223        Self {
224            batches_dispatched: AtomicU64::new(0),
225            items_processed: AtomicU64::new(0),
226            items_submitted: AtomicU64::new(0),
227            items_rejected: AtomicU64::new(0),
228            items_timed_out: AtomicU64::new(0),
229            total_processing_ns: AtomicU64::new(0),
230            total_formation_ns: AtomicU64::new(0),
231            min_batch_size: AtomicU64::new(u64::MAX),
232            max_batch_size_seen: AtomicU64::new(0),
233            consecutive_errors: AtomicU64::new(0),
234        }
235    }
236
237    pub(super) fn snapshot(&self, configured_max_batch_size: u64) -> BatchMetrics {
238        let min = self.min_batch_size.load(Ordering::Relaxed);
239        BatchMetrics {
240            batches_dispatched: self.batches_dispatched.load(Ordering::Relaxed),
241            items_processed: self.items_processed.load(Ordering::Relaxed),
242            items_submitted: self.items_submitted.load(Ordering::Relaxed),
243            items_rejected: self.items_rejected.load(Ordering::Relaxed),
244            items_timed_out: self.items_timed_out.load(Ordering::Relaxed),
245            total_processing_ns: self.total_processing_ns.load(Ordering::Relaxed),
246            total_formation_ns: self.total_formation_ns.load(Ordering::Relaxed),
247            min_batch_size: if min == u64::MAX { 0 } else { min },
248            max_batch_size_seen: self.max_batch_size_seen.load(Ordering::Relaxed),
249            configured_max_batch_size,
250            consecutive_errors: self.consecutive_errors.load(Ordering::Relaxed),
251        }
252    }
253
254    pub(super) fn record_dispatch(&self, batch_size: usize, formation_ns: u64, processing_ns: u64) {
255        let bs = batch_size as u64;
256        self.batches_dispatched.fetch_add(1, Ordering::Relaxed);
257        self.items_processed.fetch_add(bs, Ordering::Relaxed);
258        self.total_processing_ns
259            .fetch_add(processing_ns, Ordering::Relaxed);
260        self.total_formation_ns
261            .fetch_add(formation_ns, Ordering::Relaxed);
262        self.min_batch_size.fetch_min(bs, Ordering::Relaxed);
263        self.max_batch_size_seen.fetch_max(bs, Ordering::Relaxed);
264    }
265
266    pub(super) fn record_submission(&self) {
267        self.items_submitted.fetch_add(1, Ordering::Relaxed);
268    }
269
270    pub(super) fn record_rejection(&self) {
271        self.items_rejected.fetch_add(1, Ordering::Relaxed);
272    }
273
274    pub(super) fn record_timeout(&self) {
275        self.items_timed_out.fetch_add(1, Ordering::Relaxed);
276    }
277
278    pub(super) fn record_batch_success(&self) {
279        self.consecutive_errors.store(0, Ordering::Relaxed);
280    }
281
282    pub(super) fn record_batch_error(&self) {
283        self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
284    }
285}