// nv_runtime/batch/metrics.rs
1use std::sync::atomic::{AtomicU64, Ordering};
2
3// ---------------------------------------------------------------------------
4// BatchMetrics
5// ---------------------------------------------------------------------------
6
/// Point-in-time view of a batch coordinator's counters.
///
/// Every counter only ever grows; derive rates by diffing two snapshots
/// taken at different times.
///
/// # How the counters relate
///
/// - **`items_submitted`** — bumped once per `submit_and_wait` call on a
///   feed thread, *before* the channel send, whatever the outcome.
/// - **`items_rejected`** — bumped when `try_send` fails (`QueueFull` or
///   `Disconnected`); such items never reach the coordinator.
/// - **`items_processed`** — bumped by the coordinator once the batch
///   processor returns (success or error), once per item in the
///   dispatched batch.
///
/// **Invariant** (approximate while counters are being written):
/// `items_submitted >= items_processed + items_rejected`.
#[derive(Debug, Clone, Copy, Default)]
pub struct BatchMetrics {
    /// Total batches handed to the processor.
    pub batches_dispatched: u64,
    /// Total items dispatched and processed (success or error) by the
    /// batch processor; rejected items are excluded.
    pub items_processed: u64,
    /// Total items submitted by feed threads, counting both accepted
    /// and rejected submissions.
    pub items_submitted: u64,
    /// Items dropped because the submission queue was full or the
    /// coordinator had shut down (`Disconnected`). These never reached
    /// the coordinator thread.
    pub items_rejected: u64,
    /// Items whose response did not arrive before the feed-side safety
    /// timeout (`max_latency + response_timeout`). The coordinator may
    /// still process them; the feed thread simply stopped waiting.
    ///
    /// A non-zero value means the batch processor is slower than the
    /// configured safety margin allows — consider a larger
    /// `response_timeout` or a smaller `max_batch_size`.
    pub items_timed_out: u64,
    /// Cumulative time spent inside the batch processor, in nanoseconds.
    pub total_processing_ns: u64,
    /// Cumulative batch formation wait (first item in to dispatch), in
    /// nanoseconds.
    pub total_formation_ns: u64,
    /// Smallest batch dispatched so far (0 until the first batch).
    pub min_batch_size: u64,
    /// Largest batch dispatched so far.
    pub max_batch_size_seen: u64,
    /// The coordinator's configured `max_batch_size`, carried in the
    /// snapshot so callers can compute fill ratios without keeping a
    /// reference to the original config.
    pub configured_max_batch_size: u64,
    /// Consecutive batch failures (processor `Err` or panic) since the
    /// last successful dispatch; reset to 0 on success.
    ///
    /// A steadily climbing value is a useful alerting signal for a
    /// persistently broken processor. Zero means the last batch
    /// succeeded.
    pub consecutive_errors: u64,
}

impl BatchMetrics {
    /// `num / den` as `f64`, or `None` when `den` is zero.
    fn ratio(num: u64, den: u64) -> Option<f64> {
        (den != 0).then(|| num as f64 / den as f64)
    }

    /// Approximate count of items submitted but not yet processed or
    /// rejected.
    ///
    /// Derived from independently-updated atomic counters, so it can be
    /// momentarily inconsistent under heavy contention — good enough
    /// for monitoring and dashboards.
    ///
    /// **Caveat**: a feed-side timeout does not stop the coordinator
    /// from processing the item. When that happens `items_processed`
    /// still counts it and this value can undercount; the discrepancy
    /// is bounded by `items_timed_out`.
    #[must_use]
    pub fn pending_items(&self) -> u64 {
        let after_processing = self.items_submitted.saturating_sub(self.items_processed);
        after_processing.saturating_sub(self.items_rejected)
    }

    /// Share of submissions that were rejected
    /// (`items_rejected / items_submitted`).
    ///
    /// `None` until anything has been submitted. A consistently
    /// non-zero value indicates sustained overload.
    #[must_use]
    pub fn rejection_rate(&self) -> Option<f64> {
        Self::ratio(self.items_rejected, self.items_submitted)
    }

    /// Share of submissions that timed out
    /// (`items_timed_out / items_submitted`).
    ///
    /// `None` until anything has been submitted. Non-zero means the
    /// processor is too slow for the configured safety timeout.
    #[must_use]
    pub fn timeout_rate(&self) -> Option<f64> {
        Self::ratio(self.items_timed_out, self.items_submitted)
    }

    /// Mean batch size over all dispatched batches.
    ///
    /// `None` before the first dispatch. O(1), zero-allocation.
    #[must_use]
    pub fn avg_batch_size(&self) -> Option<f64> {
        Self::ratio(self.items_processed, self.batches_dispatched)
    }

    /// Mean fill ratio: `avg_batch_size / configured_max_batch_size`.
    ///
    /// `1.0` means batches are consistently full; lower values mean
    /// partial batches (dispatched on timeout rather than on size).
    ///
    /// `None` before the first dispatch or when the configured size is
    /// zero. O(1), zero-allocation.
    #[must_use]
    pub fn avg_fill_ratio(&self) -> Option<f64> {
        let cap = self.configured_max_batch_size;
        self.avg_batch_size()
            .and_then(|avg| (cap != 0).then(|| avg / cap as f64))
    }

    /// Mean processing time per batch, in nanoseconds.
    ///
    /// `None` before the first dispatch. O(1), zero-allocation.
    #[must_use]
    pub fn avg_processing_ns(&self) -> Option<f64> {
        Self::ratio(self.total_processing_ns, self.batches_dispatched)
    }

    /// Mean formation wait per batch, in nanoseconds.
    ///
    /// Formation time spans from the first item arriving to the batch
    /// being dispatched; lower values mean batches fill fast (high
    /// submission rate or a small batch size).
    ///
    /// `None` before the first dispatch. O(1), zero-allocation.
    #[must_use]
    pub fn avg_formation_ns(&self) -> Option<f64> {
        Self::ratio(self.total_formation_ns, self.batches_dispatched)
    }
}

impl std::fmt::Display for BatchMetrics {
    /// Human-readable one-line diagnostic summary, e.g.:
    ///
    /// ```text
    /// batches=120 items=480/500 rejected=15 timed_out=2 fill=0.80 avg_proc=12.5ms avg_form=8.2ms consec_err=0
    /// ```
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let na = || "n/a".to_string();
        let fill = self.avg_fill_ratio().map_or_else(na, |r| format!("{r:.2}"));
        let as_ms = |ns: Option<f64>| ns.map_or_else(na, |v| format!("{:.1}ms", v / 1_000_000.0));
        write!(
            f,
            "batches={} items={}/{} rejected={} timed_out={} fill={} avg_proc={} avg_form={} consec_err={}",
            self.batches_dispatched,
            self.items_processed,
            self.items_submitted,
            self.items_rejected,
            self.items_timed_out,
            fill,
            as_ms(self.avg_processing_ns()),
            as_ms(self.avg_formation_ns()),
            self.consecutive_errors,
        )
    }
}
202
203// ---------------------------------------------------------------------------
204// BatchMetricsInner
205// ---------------------------------------------------------------------------
206
/// Internal atomic counters for lock-free metrics recording.
///
/// Shared between feed threads and the coordinator; all reads and
/// writes use `Ordering::Relaxed` (see the `impl` below), so a
/// `snapshot` taken during concurrent updates is approximate, though
/// each individual counter is accurate.
pub(super) struct BatchMetricsInner {
    // Each counter mirrors the like-named public field on `BatchMetrics`;
    // see that struct's docs for exact semantics.
    batches_dispatched: AtomicU64,
    items_processed: AtomicU64,
    items_submitted: AtomicU64,
    items_rejected: AtomicU64,
    items_timed_out: AtomicU64,
    total_processing_ns: AtomicU64,
    total_formation_ns: AtomicU64,
    // Initialized to `u64::MAX` as a "no batches yet" sentinel;
    // `snapshot` maps the untouched sentinel back to 0.
    min_batch_size: AtomicU64,
    max_batch_size_seen: AtomicU64,
    // Cleared to 0 on batch success, incremented on processor error/panic.
    consecutive_errors: AtomicU64,
}
220
221impl BatchMetricsInner {
222 pub(super) fn new() -> Self {
223 Self {
224 batches_dispatched: AtomicU64::new(0),
225 items_processed: AtomicU64::new(0),
226 items_submitted: AtomicU64::new(0),
227 items_rejected: AtomicU64::new(0),
228 items_timed_out: AtomicU64::new(0),
229 total_processing_ns: AtomicU64::new(0),
230 total_formation_ns: AtomicU64::new(0),
231 min_batch_size: AtomicU64::new(u64::MAX),
232 max_batch_size_seen: AtomicU64::new(0),
233 consecutive_errors: AtomicU64::new(0),
234 }
235 }
236
237 pub(super) fn snapshot(&self, configured_max_batch_size: u64) -> BatchMetrics {
238 let min = self.min_batch_size.load(Ordering::Relaxed);
239 BatchMetrics {
240 batches_dispatched: self.batches_dispatched.load(Ordering::Relaxed),
241 items_processed: self.items_processed.load(Ordering::Relaxed),
242 items_submitted: self.items_submitted.load(Ordering::Relaxed),
243 items_rejected: self.items_rejected.load(Ordering::Relaxed),
244 items_timed_out: self.items_timed_out.load(Ordering::Relaxed),
245 total_processing_ns: self.total_processing_ns.load(Ordering::Relaxed),
246 total_formation_ns: self.total_formation_ns.load(Ordering::Relaxed),
247 min_batch_size: if min == u64::MAX { 0 } else { min },
248 max_batch_size_seen: self.max_batch_size_seen.load(Ordering::Relaxed),
249 configured_max_batch_size,
250 consecutive_errors: self.consecutive_errors.load(Ordering::Relaxed),
251 }
252 }
253
254 pub(super) fn record_dispatch(&self, batch_size: usize, formation_ns: u64, processing_ns: u64) {
255 let bs = batch_size as u64;
256 self.batches_dispatched.fetch_add(1, Ordering::Relaxed);
257 self.items_processed.fetch_add(bs, Ordering::Relaxed);
258 self.total_processing_ns
259 .fetch_add(processing_ns, Ordering::Relaxed);
260 self.total_formation_ns
261 .fetch_add(formation_ns, Ordering::Relaxed);
262 self.min_batch_size.fetch_min(bs, Ordering::Relaxed);
263 self.max_batch_size_seen.fetch_max(bs, Ordering::Relaxed);
264 }
265
266 pub(super) fn record_submission(&self) {
267 self.items_submitted.fetch_add(1, Ordering::Relaxed);
268 }
269
270 pub(super) fn record_rejection(&self) {
271 self.items_rejected.fetch_add(1, Ordering::Relaxed);
272 }
273
274 pub(super) fn record_timeout(&self) {
275 self.items_timed_out.fetch_add(1, Ordering::Relaxed);
276 }
277
278 pub(super) fn record_batch_success(&self) {
279 self.consecutive_errors.store(0, Ordering::Relaxed);
280 }
281
282 pub(super) fn record_batch_error(&self) {
283 self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
284 }
285}