Skip to main content

hypersync_client/
metrics.rs

1//! Observability for the streaming engine.
2//!
3//! The scheduler can optionally report, for every completed HTTP request, a
4//! [`RequestStats`] record and, when the stream ends, an aggregate
5//! [`StreamSummary`]. These are surfaced through the [`StreamObserver`] trait,
6//! which is attached explicitly via
7//! [`Client::stream_arrow_with_observer`](crate::Client::stream_arrow_with_observer).
8//!
9//! [`StreamMetrics`] is a ready-made, thread-safe observer implementation
10//! (atomics plus a small fixed histogram) that the caller constructs, passes in,
11//! and reads from during or after the stream. Power users can implement
12//! [`StreamObserver`] themselves to collect raw [`RequestStats`] (for exact
13//! percentiles, custom exporters, etc.).
14//!
15//! When no observer is attached the engine does no metrics work at all — there
16//! is zero overhead on the default streaming paths.
17
18use std::sync::atomic::{AtomicU64, Ordering};
19use std::time::{Duration, Instant};
20
/// Classifies a scheduled request by the role it played in covering the
/// block range.
///
/// A request either pushes the frontier forward, or re-requests a gap that an
/// earlier truncated response left behind.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum RequestKind {
    /// The request pushed the frontier further toward the upper bound.
    Frontier,
    /// The request covered blocks below the frontier that an earlier
    /// truncated response did not return.
    GapFill,
}
30
31/// Metrics for a single completed HTTP request, reported via
32/// [`StreamObserver::on_request`].
33#[derive(Debug, Clone)]
34pub struct RequestStats {
35    /// First block of the request (inclusive).
36    pub from_block: u64,
37    /// Requested exclusive end of the range.
38    pub requested_end: u64,
39    /// Exclusive end actually covered by the response.
40    pub next_block: u64,
41    /// `requested_end - from_block`.
42    pub requested_blocks: u64,
43    /// `next_block - from_block` (blocks actually covered).
44    pub actual_blocks: u64,
45    /// Block span the projector aimed for, before clamping to the hole.
46    pub projected_blocks: u64,
47    /// HTTP response body size in bytes.
48    pub response_bytes: u64,
49    /// Configured `response_bytes_target` at the time of the request.
50    pub target_bytes: u64,
51    /// `response_bytes / target_bytes`.
52    pub size_ratio: f64,
53    /// Observed density, `response_bytes / actual_blocks`.
54    pub bytes_per_block: f64,
55    /// `next_block < requested_end` — the server stopped early.
56    pub truncated: bool,
57    /// Whether this request extended the frontier or backfilled a gap.
58    pub kind: RequestKind,
59    /// Wall-clock latency of the request (fetch + decode/map).
60    pub duration: Duration,
61}
62
/// How many buckets the size-vs-target histogram has.
pub const NUM_SIZE_BUCKETS: usize = 6;

/// Display label for each size-vs-target histogram bucket; entry `i` names
/// bucket `i`. Bounds are fractions of `response_bytes_target`.
pub const SIZE_BUCKET_LABELS: [&str; NUM_SIZE_BUCKETS] =
    ["<0.25", "0.25-0.5", "0.5-0.75", "0.75-1.0", "1.0-1.25", ">1.25"];
71
/// Maps a `size_ratio` onto its size-vs-target histogram bucket.
///
/// Bucket edges sit at 0.25, 0.5, 0.75, 1.0 and 1.25. Each bucket includes its
/// lower edge and excludes its upper edge, so values below 0.25 (including
/// negatives) land in bucket 0. Values at or above 1.25 fall into the last
/// bucket, as does any value that compares false against every edge (e.g. NaN).
fn size_bucket(ratio: f64) -> usize {
    const UPPER_EDGES: [f64; 5] = [0.25, 0.5, 0.75, 1.0, 1.25];
    UPPER_EDGES
        .iter()
        .position(|&edge| ratio < edge)
        .unwrap_or(UPPER_EDGES.len())
}
88
89/// Observer that receives streaming metrics.
90///
91/// Implementations must be cheap and non-blocking: `on_request` is called on the
92/// scheduler task once per completed HTTP request.
93pub trait StreamObserver: Send + Sync {
94    /// Called once for every completed HTTP request.
95    fn on_request(&self, stats: &RequestStats);
96    /// Called once per scheduler iteration with the current in-flight request
97    /// count and undelivered reorder-buffer size. Lets observers track buffer
98    /// high-water and concurrency saturation live. Default: no-op.
99    fn on_progress(&self, _in_flight: u64, _buffered_bytes: u64) {}
100    /// Called once when the stream finishes (or is closed early), with the final
101    /// aggregate summary. Default: no-op.
102    fn on_finish(&self, _summary: &StreamSummary) {}
103}
104
105/// Aggregate summary of a stream run, produced by [`StreamMetrics::summary`] and
106/// passed to [`StreamObserver::on_finish`].
107#[derive(Debug, Clone)]
108pub struct StreamSummary {
109    /// Total completed requests.
110    pub num_requests: u64,
111    /// Requests whose response was truncated before the requested end.
112    pub num_truncated: u64,
113    /// `num_truncated / num_requests` (0 when no requests).
114    pub truncation_rate: f64,
115    /// Sum of response body sizes.
116    pub total_bytes: u64,
117    /// Sum of blocks covered.
118    pub total_blocks: u64,
119    /// Wall-clock span of the run (engine-fed when finished, else measured live
120    /// from when the metrics handle was created).
121    pub wall_clock: Duration,
122    /// `total_blocks / wall_clock` (0 when no elapsed time).
123    pub blocks_per_sec: f64,
124    /// `total_bytes / wall_clock` (0 when no elapsed time).
125    pub bytes_per_sec: f64,
126    /// Mean `size_ratio` across requests.
127    pub mean_size_ratio: f64,
128    /// Size-vs-target histogram counts, see [`SIZE_BUCKET_LABELS`].
129    pub size_histogram: [u64; NUM_SIZE_BUCKETS],
130    /// Mean density (`total_bytes / total_blocks`).
131    pub mean_bytes_per_block: f64,
132    /// Smallest block-range size requested-and-covered.
133    pub min_blocks: u64,
134    /// Mean covered block-range size (`total_blocks / num_requests`).
135    pub mean_blocks: f64,
136    /// Largest block-range size covered.
137    pub max_blocks: u64,
138    /// High-water mark of the undelivered reorder buffer, in bytes.
139    pub max_buffered_bytes_observed: u64,
140    /// Mean number of in-flight requests across scheduler iterations.
141    pub mean_in_flight: f64,
142    /// Requests that extended the frontier.
143    pub num_frontier: u64,
144    /// Requests that backfilled a gap.
145    pub num_gap_fill: u64,
146}
147
148/// Thread-safe aggregate metrics handle.
149///
150/// Construct one, pass an `Arc<StreamMetrics>` to
151/// [`Client::stream_arrow_with_observer`](crate::Client::stream_arrow_with_observer),
152/// keep a clone, and call [`summary`](Self::summary) during or after the stream.
153#[derive(Debug)]
154pub struct StreamMetrics {
155    num_requests: AtomicU64,
156    num_truncated: AtomicU64,
157    total_bytes: AtomicU64,
158    total_blocks: AtomicU64,
159    /// Sum of `size_ratio * 1_000_000`, for the running mean.
160    size_ratio_micros: AtomicU64,
161    size_histogram: [AtomicU64; NUM_SIZE_BUCKETS],
162    min_blocks: AtomicU64,
163    max_blocks: AtomicU64,
164    max_buffered_bytes_observed: AtomicU64,
165    in_flight_sum: AtomicU64,
166    in_flight_samples: AtomicU64,
167    num_frontier: AtomicU64,
168    num_gap_fill: AtomicU64,
169    /// When the handle was created; used to derive wall-clock live.
170    created: Instant,
171    /// Optional precise elapsed override (nanos) fed by the engine at end of
172    /// stream. `0` means "unset", in which case `created.elapsed()` is used.
173    elapsed_nanos: AtomicU64,
174}
175
176impl Default for StreamMetrics {
177    fn default() -> Self {
178        Self {
179            num_requests: AtomicU64::new(0),
180            num_truncated: AtomicU64::new(0),
181            total_bytes: AtomicU64::new(0),
182            total_blocks: AtomicU64::new(0),
183            size_ratio_micros: AtomicU64::new(0),
184            size_histogram: Default::default(),
185            min_blocks: AtomicU64::new(u64::MAX),
186            max_blocks: AtomicU64::new(0),
187            max_buffered_bytes_observed: AtomicU64::new(0),
188            in_flight_sum: AtomicU64::new(0),
189            in_flight_samples: AtomicU64::new(0),
190            num_frontier: AtomicU64::new(0),
191            num_gap_fill: AtomicU64::new(0),
192            created: Instant::now(),
193            elapsed_nanos: AtomicU64::new(0),
194        }
195    }
196}
197
198impl StreamMetrics {
199    /// Create an empty metrics handle.
200    pub fn new() -> Self {
201        Self::default()
202    }
203
204    /// Record the current undelivered-buffer size; keeps the observed maximum.
205    pub fn record_buffered_bytes(&self, bytes: u64) {
206        self.max_buffered_bytes_observed
207            .fetch_max(bytes, Ordering::Relaxed);
208    }
209
210    /// Record the current in-flight request count (one sample per scheduler
211    /// iteration), for the mean-in-flight estimate.
212    pub fn record_in_flight(&self, count: u64) {
213        self.in_flight_sum.fetch_add(count, Ordering::Relaxed);
214        self.in_flight_samples.fetch_add(1, Ordering::Relaxed);
215    }
216
217    /// Feed the engine's monotonic elapsed time so throughput can be derived.
218    pub fn record_elapsed(&self, elapsed: Duration) {
219        self.elapsed_nanos
220            .fetch_max(elapsed.as_nanos() as u64, Ordering::Relaxed);
221    }
222
223    /// Snapshot the current aggregate summary. Safe to call at any time.
224    pub fn summary(&self) -> StreamSummary {
225        let num_requests = self.num_requests.load(Ordering::Relaxed);
226        let num_truncated = self.num_truncated.load(Ordering::Relaxed);
227        let total_bytes = self.total_bytes.load(Ordering::Relaxed);
228        let total_blocks = self.total_blocks.load(Ordering::Relaxed);
229        let in_flight_samples = self.in_flight_samples.load(Ordering::Relaxed);
230        let in_flight_sum = self.in_flight_sum.load(Ordering::Relaxed);
231
232        let mut size_histogram = [0u64; NUM_SIZE_BUCKETS];
233        for (dst, src) in size_histogram.iter_mut().zip(self.size_histogram.iter()) {
234            *dst = src.load(Ordering::Relaxed);
235        }
236
237        // Prefer the engine-fed precise elapsed override; otherwise measure live
238        // from when this handle was created.
239        let wall_clock = {
240            let fed = self.elapsed_nanos.load(Ordering::Relaxed);
241            if fed > 0 {
242                Duration::from_nanos(fed)
243            } else {
244                self.created.elapsed()
245            }
246        };
247        let secs = wall_clock.as_secs_f64();
248
249        let div = |num: f64, den: f64| if den > 0.0 { num / den } else { 0.0 };
250
251        StreamSummary {
252            num_requests,
253            num_truncated,
254            truncation_rate: div(num_truncated as f64, num_requests as f64),
255            total_bytes,
256            total_blocks,
257            wall_clock,
258            blocks_per_sec: div(total_blocks as f64, secs),
259            bytes_per_sec: div(total_bytes as f64, secs),
260            mean_size_ratio: div(
261                self.size_ratio_micros.load(Ordering::Relaxed) as f64 / 1_000_000.0,
262                num_requests as f64,
263            ),
264            size_histogram,
265            mean_bytes_per_block: div(total_bytes as f64, total_blocks as f64),
266            min_blocks: {
267                let m = self.min_blocks.load(Ordering::Relaxed);
268                if m == u64::MAX {
269                    0
270                } else {
271                    m
272                }
273            },
274            mean_blocks: div(total_blocks as f64, num_requests as f64),
275            max_blocks: self.max_blocks.load(Ordering::Relaxed),
276            max_buffered_bytes_observed: self.max_buffered_bytes_observed.load(Ordering::Relaxed),
277            mean_in_flight: div(in_flight_sum as f64, in_flight_samples as f64),
278            num_frontier: self.num_frontier.load(Ordering::Relaxed),
279            num_gap_fill: self.num_gap_fill.load(Ordering::Relaxed),
280        }
281    }
282}
283
284impl StreamObserver for StreamMetrics {
285    fn on_request(&self, stats: &RequestStats) {
286        self.num_requests.fetch_add(1, Ordering::Relaxed);
287        if stats.truncated {
288            self.num_truncated.fetch_add(1, Ordering::Relaxed);
289        }
290        self.total_bytes
291            .fetch_add(stats.response_bytes, Ordering::Relaxed);
292        self.total_blocks
293            .fetch_add(stats.actual_blocks, Ordering::Relaxed);
294        self.size_ratio_micros.fetch_add(
295            (stats.size_ratio * 1_000_000.0).round() as u64,
296            Ordering::Relaxed,
297        );
298        self.size_histogram[size_bucket(stats.size_ratio)].fetch_add(1, Ordering::Relaxed);
299        self.min_blocks
300            .fetch_min(stats.actual_blocks, Ordering::Relaxed);
301        self.max_blocks
302            .fetch_max(stats.actual_blocks, Ordering::Relaxed);
303        match stats.kind {
304            RequestKind::Frontier => self.num_frontier.fetch_add(1, Ordering::Relaxed),
305            RequestKind::GapFill => self.num_gap_fill.fetch_add(1, Ordering::Relaxed),
306        };
307    }
308
309    fn on_progress(&self, in_flight: u64, buffered_bytes: u64) {
310        self.record_in_flight(in_flight);
311        self.record_buffered_bytes(buffered_bytes);
312    }
313}
314
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixture: a fully served frontier request over `[0, actual_blocks)`
    /// whose ratio and density fields are derived from the given sizes.
    fn stats(
        actual_blocks: u64,
        response_bytes: u64,
        target: u64,
        truncated: bool,
    ) -> RequestStats {
        let body = response_bytes as f64;
        RequestStats {
            from_block: 0,
            requested_end: actual_blocks,
            next_block: actual_blocks,
            requested_blocks: actual_blocks,
            actual_blocks,
            projected_blocks: actual_blocks,
            response_bytes,
            target_bytes: target,
            size_ratio: body / target as f64,
            bytes_per_block: body / actual_blocks as f64,
            truncated,
            kind: RequestKind::Frontier,
            duration: Duration::from_millis(10),
        }
    }

    /// Absolute-tolerance comparison for derived float means and rates.
    fn close(actual: f64, expected: f64, tol: f64) -> bool {
        (actual - expected).abs() < tol
    }

    #[test]
    fn size_bucket_boundaries() {
        let table: [(f64, usize); 12] = [
            (0.0, 0),
            (0.24, 0),
            (0.25, 1),
            (0.49, 1),
            (0.5, 2),
            (0.74, 2),
            (0.75, 3),
            (0.99, 3),
            (1.0, 4),
            (1.24, 4),
            (1.25, 5),
            (10.0, 5),
        ];
        for (ratio, bucket) in table {
            assert_eq!(size_bucket(ratio), bucket, "ratio {}", ratio);
        }
    }

    #[test]
    fn empty_summary_is_zeroed() {
        let s = StreamMetrics::new().summary();
        assert_eq!(s.num_requests, 0);
        assert_eq!(s.truncation_rate, 0.0);
        assert_eq!(s.min_blocks, 0);
        assert_eq!(s.max_blocks, 0);
        assert_eq!(s.mean_in_flight, 0.0);
        assert_eq!(s.blocks_per_sec, 0.0);
    }

    #[test]
    fn aggregates_requests() {
        let m = StreamMetrics::new();
        // Target is 400k: one request under, one on, and one over target.
        for (blocks, bytes, truncated) in [
            (100, 200_000, true),  // ratio 0.5 -> bucket 2
            (200, 400_000, false), // ratio 1.0 -> bucket 4
            (50, 600_000, false),  // ratio 1.5 -> bucket 5
        ] {
            m.on_request(&stats(blocks, bytes, 400_000, truncated));
        }

        let s = m.summary();
        assert_eq!((s.num_requests, s.num_truncated), (3, 1));
        assert!(close(s.truncation_rate, 1.0 / 3.0, 1e-9));
        assert_eq!(s.total_bytes, 1_200_000);
        assert_eq!(s.total_blocks, 350);
        assert_eq!((s.min_blocks, s.max_blocks), (50, 200));
        assert!(close(s.mean_blocks, 350.0 / 3.0, 1e-9));
        // (0.5 + 1.0 + 1.5) / 3
        assert!(close(s.mean_size_ratio, 1.0, 1e-9));
        assert_eq!(s.size_histogram, [0, 0, 1, 0, 1, 1]);
        assert!(close(s.mean_bytes_per_block, 1_200_000.0 / 350.0, 1e-6));
        assert_eq!((s.num_frontier, s.num_gap_fill), (3, 0));
    }

    #[test]
    fn in_flight_and_buffer_tracking() {
        let m = StreamMetrics::new();
        for count in [2, 4] {
            m.record_in_flight(count);
        }
        for bytes in [1000, 500] {
            m.record_buffered_bytes(bytes);
        }
        let s = m.summary();
        assert_eq!(s.mean_in_flight, 3.0);
        assert_eq!(s.max_buffered_bytes_observed, 1000);
    }

    #[test]
    fn on_progress_feeds_in_flight_and_buffer() {
        let m = StreamMetrics::new();
        for (in_flight, buffered) in [(1, 100), (5, 50)] {
            m.on_progress(in_flight, buffered);
        }
        let s = m.summary();
        assert_eq!(s.mean_in_flight, 3.0);
        assert_eq!(s.max_buffered_bytes_observed, 100);
    }

    #[test]
    fn throughput_from_elapsed() {
        let m = StreamMetrics::new();
        m.on_request(&stats(1000, 400_000, 400_000, false));
        m.record_elapsed(Duration::from_secs(2));
        let s = m.summary();
        assert_eq!(s.blocks_per_sec, 500.0);
        assert_eq!(s.bytes_per_sec, 200_000.0);
    }

    #[test]
    fn gap_fill_counted() {
        let m = StreamMetrics::new();
        m.on_request(&RequestStats {
            kind: RequestKind::GapFill,
            ..stats(100, 100_000, 400_000, false)
        });
        let s = m.summary();
        assert_eq!(s.num_gap_fill, 1);
        assert_eq!(s.num_frontier, 0);
    }
}