1use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{Duration, Instant};
20
/// Category of a request, carried in `RequestStats::kind`.
///
/// `StreamMetrics::on_request` keeps one counter per variant.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum RequestKind {
    /// Tallied in the `num_frontier` counter.
    ///
    /// NOTE(review): presumably a request at the leading edge of the stream —
    /// confirm against the code that issues requests.
    Frontier,
    /// Tallied in the `num_gap_fill` counter.
    ///
    /// NOTE(review): presumably a request that back-fills a range left
    /// uncovered by an earlier one — confirm against the code that issues
    /// requests.
    GapFill,
}
30
31#[derive(Debug, Clone)]
34pub struct RequestStats {
35 pub from_block: u64,
37 pub requested_end: u64,
39 pub next_block: u64,
41 pub requested_blocks: u64,
43 pub actual_blocks: u64,
45 pub projected_blocks: u64,
47 pub response_bytes: u64,
49 pub target_bytes: u64,
51 pub size_ratio: f64,
53 pub bytes_per_block: f64,
55 pub truncated: bool,
57 pub kind: RequestKind,
59 pub duration: Duration,
61}
62
/// Number of buckets in the size-ratio histogram.
pub const NUM_SIZE_BUCKETS: usize = 6;

/// Display label for each histogram bucket, in bucket-index order.
///
/// Each range includes its lower bound and excludes its upper bound, which
/// matches the thresholds applied by `size_bucket`.
pub const SIZE_BUCKET_LABELS: [&str; NUM_SIZE_BUCKETS] = [
    "<0.25",
    "0.25-0.5",
    "0.5-0.75",
    "0.75-1.0",
    "1.0-1.25",
    ">1.25",
];
71
/// Maps a size ratio to its histogram bucket index (`0..=5`).
///
/// Bucket `i` holds ratios below the `i`-th upper bound that were not already
/// claimed by an earlier bucket; anything not below the last bound lands in
/// bucket 5. Because every comparison against NaN is false, NaN also lands in
/// bucket 5, and negative ratios land in bucket 0.
fn size_bucket(ratio: f64) -> usize {
    // Exclusive upper bounds of buckets 0 through 4, in ascending order.
    const UPPER_BOUNDS: [f64; 5] = [0.25, 0.5, 0.75, 1.0, 1.25];

    UPPER_BOUNDS
        .iter()
        .position(|&bound| ratio < bound)
        .unwrap_or(UPPER_BOUNDS.len())
}
88
89pub trait StreamObserver: Send + Sync {
94 fn on_request(&self, stats: &RequestStats);
96 fn on_progress(&self, _in_flight: u64, _buffered_bytes: u64) {}
100 fn on_finish(&self, _summary: &StreamSummary) {}
103}
104
105#[derive(Debug, Clone)]
108pub struct StreamSummary {
109 pub num_requests: u64,
111 pub num_truncated: u64,
113 pub truncation_rate: f64,
115 pub total_bytes: u64,
117 pub total_blocks: u64,
119 pub wall_clock: Duration,
122 pub blocks_per_sec: f64,
124 pub bytes_per_sec: f64,
126 pub mean_size_ratio: f64,
128 pub size_histogram: [u64; NUM_SIZE_BUCKETS],
130 pub mean_bytes_per_block: f64,
132 pub min_blocks: u64,
134 pub mean_blocks: f64,
136 pub max_blocks: u64,
138 pub max_buffered_bytes_observed: u64,
140 pub mean_in_flight: f64,
142 pub num_frontier: u64,
144 pub num_gap_fill: u64,
146}
147
148#[derive(Debug)]
154pub struct StreamMetrics {
155 num_requests: AtomicU64,
156 num_truncated: AtomicU64,
157 total_bytes: AtomicU64,
158 total_blocks: AtomicU64,
159 size_ratio_micros: AtomicU64,
161 size_histogram: [AtomicU64; NUM_SIZE_BUCKETS],
162 min_blocks: AtomicU64,
163 max_blocks: AtomicU64,
164 max_buffered_bytes_observed: AtomicU64,
165 in_flight_sum: AtomicU64,
166 in_flight_samples: AtomicU64,
167 num_frontier: AtomicU64,
168 num_gap_fill: AtomicU64,
169 created: Instant,
171 elapsed_nanos: AtomicU64,
174}
175
176impl Default for StreamMetrics {
177 fn default() -> Self {
178 Self {
179 num_requests: AtomicU64::new(0),
180 num_truncated: AtomicU64::new(0),
181 total_bytes: AtomicU64::new(0),
182 total_blocks: AtomicU64::new(0),
183 size_ratio_micros: AtomicU64::new(0),
184 size_histogram: Default::default(),
185 min_blocks: AtomicU64::new(u64::MAX),
186 max_blocks: AtomicU64::new(0),
187 max_buffered_bytes_observed: AtomicU64::new(0),
188 in_flight_sum: AtomicU64::new(0),
189 in_flight_samples: AtomicU64::new(0),
190 num_frontier: AtomicU64::new(0),
191 num_gap_fill: AtomicU64::new(0),
192 created: Instant::now(),
193 elapsed_nanos: AtomicU64::new(0),
194 }
195 }
196}
197
198impl StreamMetrics {
199 pub fn new() -> Self {
201 Self::default()
202 }
203
204 pub fn record_buffered_bytes(&self, bytes: u64) {
206 self.max_buffered_bytes_observed
207 .fetch_max(bytes, Ordering::Relaxed);
208 }
209
210 pub fn record_in_flight(&self, count: u64) {
213 self.in_flight_sum.fetch_add(count, Ordering::Relaxed);
214 self.in_flight_samples.fetch_add(1, Ordering::Relaxed);
215 }
216
217 pub fn record_elapsed(&self, elapsed: Duration) {
219 self.elapsed_nanos
220 .fetch_max(elapsed.as_nanos() as u64, Ordering::Relaxed);
221 }
222
223 pub fn summary(&self) -> StreamSummary {
225 let num_requests = self.num_requests.load(Ordering::Relaxed);
226 let num_truncated = self.num_truncated.load(Ordering::Relaxed);
227 let total_bytes = self.total_bytes.load(Ordering::Relaxed);
228 let total_blocks = self.total_blocks.load(Ordering::Relaxed);
229 let in_flight_samples = self.in_flight_samples.load(Ordering::Relaxed);
230 let in_flight_sum = self.in_flight_sum.load(Ordering::Relaxed);
231
232 let mut size_histogram = [0u64; NUM_SIZE_BUCKETS];
233 for (dst, src) in size_histogram.iter_mut().zip(self.size_histogram.iter()) {
234 *dst = src.load(Ordering::Relaxed);
235 }
236
237 let wall_clock = {
240 let fed = self.elapsed_nanos.load(Ordering::Relaxed);
241 if fed > 0 {
242 Duration::from_nanos(fed)
243 } else {
244 self.created.elapsed()
245 }
246 };
247 let secs = wall_clock.as_secs_f64();
248
249 let div = |num: f64, den: f64| if den > 0.0 { num / den } else { 0.0 };
250
251 StreamSummary {
252 num_requests,
253 num_truncated,
254 truncation_rate: div(num_truncated as f64, num_requests as f64),
255 total_bytes,
256 total_blocks,
257 wall_clock,
258 blocks_per_sec: div(total_blocks as f64, secs),
259 bytes_per_sec: div(total_bytes as f64, secs),
260 mean_size_ratio: div(
261 self.size_ratio_micros.load(Ordering::Relaxed) as f64 / 1_000_000.0,
262 num_requests as f64,
263 ),
264 size_histogram,
265 mean_bytes_per_block: div(total_bytes as f64, total_blocks as f64),
266 min_blocks: {
267 let m = self.min_blocks.load(Ordering::Relaxed);
268 if m == u64::MAX {
269 0
270 } else {
271 m
272 }
273 },
274 mean_blocks: div(total_blocks as f64, num_requests as f64),
275 max_blocks: self.max_blocks.load(Ordering::Relaxed),
276 max_buffered_bytes_observed: self.max_buffered_bytes_observed.load(Ordering::Relaxed),
277 mean_in_flight: div(in_flight_sum as f64, in_flight_samples as f64),
278 num_frontier: self.num_frontier.load(Ordering::Relaxed),
279 num_gap_fill: self.num_gap_fill.load(Ordering::Relaxed),
280 }
281 }
282}
283
284impl StreamObserver for StreamMetrics {
285 fn on_request(&self, stats: &RequestStats) {
286 self.num_requests.fetch_add(1, Ordering::Relaxed);
287 if stats.truncated {
288 self.num_truncated.fetch_add(1, Ordering::Relaxed);
289 }
290 self.total_bytes
291 .fetch_add(stats.response_bytes, Ordering::Relaxed);
292 self.total_blocks
293 .fetch_add(stats.actual_blocks, Ordering::Relaxed);
294 self.size_ratio_micros.fetch_add(
295 (stats.size_ratio * 1_000_000.0).round() as u64,
296 Ordering::Relaxed,
297 );
298 self.size_histogram[size_bucket(stats.size_ratio)].fetch_add(1, Ordering::Relaxed);
299 self.min_blocks
300 .fetch_min(stats.actual_blocks, Ordering::Relaxed);
301 self.max_blocks
302 .fetch_max(stats.actual_blocks, Ordering::Relaxed);
303 match stats.kind {
304 RequestKind::Frontier => self.num_frontier.fetch_add(1, Ordering::Relaxed),
305 RequestKind::GapFill => self.num_gap_fill.fetch_add(1, Ordering::Relaxed),
306 };
307 }
308
309 fn on_progress(&self, in_flight: u64, buffered_bytes: u64) {
310 self.record_in_flight(in_flight);
311 self.record_buffered_bytes(buffered_bytes);
312 }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Frontier` request fixture starting at block 0, with
    /// `size_ratio` and `bytes_per_block` derived from the arguments.
    fn stats(
        actual_blocks: u64,
        response_bytes: u64,
        target: u64,
        truncated: bool,
    ) -> RequestStats {
        let bytes = response_bytes as f64;
        RequestStats {
            from_block: 0,
            requested_end: actual_blocks,
            next_block: actual_blocks,
            requested_blocks: actual_blocks,
            actual_blocks,
            projected_blocks: actual_blocks,
            response_bytes,
            target_bytes: target,
            size_ratio: bytes / target as f64,
            bytes_per_block: bytes / actual_blocks as f64,
            truncated,
            kind: RequestKind::Frontier,
            duration: Duration::from_millis(10),
        }
    }

    /// Asserts that `actual` is strictly within `tolerance` of `expected`.
    fn assert_close(actual: f64, expected: f64, tolerance: f64) {
        assert!((actual - expected).abs() < tolerance);
    }

    #[test]
    fn size_bucket_boundaries() {
        // (ratio, expected bucket): the lower edge and a value just under
        // the upper edge of every bucket.
        let cases = [
            (0.0, 0),
            (0.24, 0),
            (0.25, 1),
            (0.49, 1),
            (0.5, 2),
            (0.74, 2),
            (0.75, 3),
            (0.99, 3),
            (1.0, 4),
            (1.24, 4),
            (1.25, 5),
            (10.0, 5),
        ];
        for &(ratio, expected) in &cases {
            assert_eq!(size_bucket(ratio), expected);
        }
    }

    #[test]
    fn empty_summary_is_zeroed() {
        let summary = StreamMetrics::new().summary();

        assert_eq!(summary.num_requests, 0);
        assert_eq!(summary.truncation_rate, 0.0);
        assert_eq!(summary.min_blocks, 0);
        assert_eq!(summary.max_blocks, 0);
        assert_eq!(summary.mean_in_flight, 0.0);
        assert_eq!(summary.blocks_per_sec, 0.0);
    }

    #[test]
    fn aggregates_requests() {
        let metrics = StreamMetrics::new();
        // Size ratios 0.5, 1.0 and 1.5 land in buckets 2, 4 and 5.
        metrics.on_request(&stats(100, 200_000, 400_000, true));
        metrics.on_request(&stats(200, 400_000, 400_000, false));
        metrics.on_request(&stats(50, 600_000, 400_000, false));

        let summary = metrics.summary();

        assert_eq!(summary.num_requests, 3);
        assert_eq!(summary.num_truncated, 1);
        assert_close(summary.truncation_rate, 1.0 / 3.0, 1e-9);
        assert_eq!(summary.total_bytes, 1_200_000);
        assert_eq!(summary.total_blocks, 350);
        assert_eq!(summary.min_blocks, 50);
        assert_eq!(summary.max_blocks, 200);
        assert_close(summary.mean_blocks, 350.0 / 3.0, 1e-9);
        assert_close(summary.mean_size_ratio, 1.0, 1e-9);
        assert_eq!(summary.size_histogram[2], 1);
        assert_eq!(summary.size_histogram[4], 1);
        assert_eq!(summary.size_histogram[5], 1);
        assert_close(summary.mean_bytes_per_block, 1_200_000.0 / 350.0, 1e-6);
        assert_eq!(summary.num_frontier, 3);
        assert_eq!(summary.num_gap_fill, 0);
    }

    #[test]
    fn in_flight_and_buffer_tracking() {
        let metrics = StreamMetrics::new();
        for &count in &[2, 4] {
            metrics.record_in_flight(count);
        }
        for &bytes in &[1000, 500] {
            metrics.record_buffered_bytes(bytes);
        }

        let summary = metrics.summary();

        assert_eq!(summary.mean_in_flight, 3.0);
        assert_eq!(summary.max_buffered_bytes_observed, 1000);
    }

    #[test]
    fn on_progress_feeds_in_flight_and_buffer() {
        let metrics = StreamMetrics::new();
        metrics.on_progress(1, 100);
        metrics.on_progress(5, 50);

        let summary = metrics.summary();

        assert_eq!(summary.mean_in_flight, 3.0);
        assert_eq!(summary.max_buffered_bytes_observed, 100);
    }

    #[test]
    fn throughput_from_elapsed() {
        let metrics = StreamMetrics::new();
        metrics.on_request(&stats(1000, 400_000, 400_000, false));
        // A fed elapsed time takes precedence over the internal clock, which
        // makes the throughput figures exact.
        metrics.record_elapsed(Duration::from_secs(2));

        let summary = metrics.summary();

        assert_eq!(summary.blocks_per_sec, 500.0);
        assert_eq!(summary.bytes_per_sec, 200_000.0);
    }

    #[test]
    fn gap_fill_counted() {
        let metrics = StreamMetrics::new();
        let request = RequestStats {
            kind: RequestKind::GapFill,
            ..stats(100, 100_000, 400_000, false)
        };
        metrics.on_request(&request);

        let summary = metrics.summary();

        assert_eq!(summary.num_gap_fill, 1);
        assert_eq!(summary.num_frontier, 0);
    }
}