Skip to main content

datafusion_distributed/metrics/
latency_metric.rs

1use datafusion::common::instant::Instant;
2use datafusion::common::{Result, human_readable_duration};
3use datafusion::physical_expr_common::metrics::{MetricBuilder, MetricValue};
4use datafusion::physical_plan::metrics::CustomMetricValue;
5use sketches_ddsketch::{Config, DDSketch};
6use std::any::Any;
7use std::borrow::Cow;
8use std::fmt::{Display, Formatter};
9use std::sync::Arc;
10use std::sync::Mutex;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::Relaxed;
13use std::time::Duration;
14
/// Extension trait for DataFusion's metric system that adds support for latency related metrics.
///
/// Every method consumes the builder, takes the name under which the metric is reported, and
/// returns a handle of the corresponding metric type that is used to record latency samples.
pub trait LatencyMetricExt {
    /// Builds a metric named `name` that tracks the smallest recorded latency.
    fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric;
    /// Builds a metric named `name` that tracks the largest recorded latency.
    fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric;
    /// Builds a metric named `name` that tracks the mean of all recorded latencies.
    fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric;
    /// Builds a metric named `name` that keeps only the first recorded latency.
    fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric;
    /// Builds a metric named `name` that reports the 50th percentile of recorded latencies.
    fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric;
    /// Builds a metric named `name` that reports the 75th percentile of recorded latencies.
    fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric;
    /// Builds a metric named `name` that reports the 95th percentile of recorded latencies.
    fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric;
    /// Builds a metric named `name` that reports the 99th percentile of recorded latencies.
    fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric;
}
26
27impl LatencyMetricExt for MetricBuilder<'_> {
28    fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric {
29        let value = MinLatencyMetric::default();
30        self.build(MetricValue::Custom {
31            name: name.into(),
32            value: Arc::new(value.clone()),
33        });
34        value
35    }
36
37    fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric {
38        let value = MaxLatencyMetric::default();
39        self.build(MetricValue::Custom {
40            name: name.into(),
41            value: Arc::new(value.clone()),
42        });
43        value
44    }
45
46    fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric {
47        let value = AvgLatencyMetric::default();
48        self.build(MetricValue::Custom {
49            name: name.into(),
50            value: Arc::new(value.clone()),
51        });
52        value
53    }
54
55    fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric {
56        let value = FirstLatencyMetric::default();
57        self.build(MetricValue::Custom {
58            name: name.into(),
59            value: Arc::new(value.clone()),
60        });
61        value
62    }
63
64    fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric {
65        let value = P50LatencyMetric::default();
66        self.build(MetricValue::Custom {
67            name: name.into(),
68            value: Arc::new(value.clone()),
69        });
70        value
71    }
72
73    fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric {
74        let value = P75LatencyMetric::default();
75        self.build(MetricValue::Custom {
76            name: name.into(),
77            value: Arc::new(value.clone()),
78        });
79        value
80    }
81
82    fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric {
83        let value = P95LatencyMetric::default();
84        self.build(MetricValue::Custom {
85            name: name.into(),
86            value: Arc::new(value.clone()),
87        });
88        value
89    }
90
91    fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric {
92        let value = P99LatencyMetric::default();
93        self.build(MetricValue::Custom {
94            name: name.into(),
95            value: Arc::new(value.clone()),
96        });
97        value
98    }
99}
100
/// Latency metric that keeps the smallest recorded duration, in nanoseconds.
///
/// `usize::MAX` is the "nothing recorded yet" sentinel. Clones share the same counter.
#[derive(Debug, Clone)]
pub struct MinLatencyMetric {
    nanos: Arc<AtomicUsize>,
}

impl Default for MinLatencyMetric {
    /// Creates an unset metric (stored value `usize::MAX`).
    fn default() -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(usize::MAX)),
        }
    }
}

impl MinLatencyMetric {
    /// Creates a metric whose stored value is exactly `nanos`.
    pub fn from_nanos(nanos: usize) -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(nanos)),
        }
    }

    /// Returns the stored minimum in nanoseconds (`usize::MAX` while unset).
    pub fn value(&self) -> usize {
        self.nanos.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Records `duration`, keeping it only if it is smaller than the current minimum.
    ///
    /// The sample is clamped to `1..=usize::MAX - 1` nanoseconds: zero becomes 1 ns, and a
    /// duration too large for `usize` saturates just below the unset sentinel instead of being
    /// truncated by an `as` cast.
    pub fn add_duration(&self, duration: Duration) {
        let sample = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .clamp(1, usize::MAX - 1);
        self.nanos.fetch_min(sample, Relaxed);
    }
}
134
135impl Display for MinLatencyMetric {
136    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137        match self.value() {
138            usize::MAX => write!(f, "0ns"),
139            v => write!(f, "{}", human_readable_duration(v as u64)),
140        }
141    }
142}
143
144impl CustomMetricValue for MinLatencyMetric {
145    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
146        Arc::new(MinLatencyMetric::default())
147    }
148
149    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
150        let Some(other) = other.as_any().downcast_ref::<Self>() else {
151            return;
152        };
153        self.nanos.fetch_min(other.nanos.load(Relaxed), Relaxed);
154    }
155
156    fn as_any(&self) -> &dyn Any {
157        self
158    }
159
160    fn as_usize(&self) -> usize {
161        self.value()
162    }
163
164    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
165        let Some(other) = other.as_any().downcast_ref::<Self>() else {
166            return false;
167        };
168        other.value() == self.value()
169    }
170}
171
/// Latency metric that keeps the largest recorded duration, in nanoseconds.
///
/// The default value is 0, meaning nothing has been recorded. Clones share the same counter.
#[derive(Debug, Clone, Default)]
pub struct MaxLatencyMetric {
    nanos: Arc<AtomicUsize>,
}

impl MaxLatencyMetric {
    /// Creates a metric whose stored value is exactly `nanos`.
    pub fn from_nanos(nanos: usize) -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(nanos)),
        }
    }

    /// Returns the stored maximum in nanoseconds (0 while unset).
    pub fn value(&self) -> usize {
        self.nanos.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Records `duration`, keeping it only if it is larger than the current maximum.
    ///
    /// Zero durations are clamped to 1 ns. A duration whose nanosecond count does not fit in
    /// `usize` saturates to `usize::MAX` instead of being truncated by an `as` cast, which
    /// could otherwise make a huge sample look small.
    pub fn add_duration(&self, duration: Duration) {
        let sample = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .max(1);
        self.nanos.fetch_max(sample, Relaxed);
    }
}
197
198impl Display for MaxLatencyMetric {
199    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200        write!(f, "{}", human_readable_duration(self.value() as u64))
201    }
202}
203
204impl CustomMetricValue for MaxLatencyMetric {
205    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
206        Arc::new(MaxLatencyMetric::default())
207    }
208
209    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
210        let Some(other) = other.as_any().downcast_ref::<Self>() else {
211            return;
212        };
213        self.nanos.fetch_max(other.nanos.load(Relaxed), Relaxed);
214    }
215
216    fn as_any(&self) -> &dyn Any {
217        self
218    }
219
220    fn as_usize(&self) -> usize {
221        self.value()
222    }
223
224    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
225        let Some(other) = other.as_any().downcast_ref::<Self>() else {
226            return false;
227        };
228        other.value() == self.value()
229    }
230}
231
/// Latency metric that reports the mean of all recorded durations, in nanoseconds.
///
/// It stores the running sum and the sample count separately so that several instances can be
/// combined exactly. Clones share the same counters.
#[derive(Debug, Clone, Default)]
pub struct AvgLatencyMetric {
    nanos_sum: Arc<AtomicUsize>,
    count: Arc<AtomicUsize>,
}

impl AvgLatencyMetric {
    /// Rebuilds a metric from a previously captured sum and sample count.
    pub(crate) fn from_raw(nanos_sum: usize, count: usize) -> Self {
        Self {
            nanos_sum: Arc::new(AtomicUsize::new(nanos_sum)),
            count: Arc::new(AtomicUsize::new(count)),
        }
    }

    /// Returns the integer mean `sum / count`, or 0 when nothing has been recorded
    /// (the divisor is clamped to at least 1).
    pub fn value(&self) -> usize {
        self.nanos_sum.load(Relaxed) / self.count.load(Relaxed).max(1)
    }

    /// Returns the raw sum of all recorded nanoseconds.
    pub(crate) fn nanos_sum(&self) -> usize {
        self.nanos_sum.load(Relaxed)
    }

    /// Returns the number of recorded samples.
    pub(crate) fn count(&self) -> usize {
        self.count.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Adds `duration` to the running sum and bumps the sample count.
    ///
    /// Zero durations count as 1 ns. Both the conversion to `usize` and the addition to the
    /// sum saturate at `usize::MAX`, so an oversized sample or an overflowing sum yields a
    /// capped value instead of wrapping around to an arbitrary small one.
    pub fn add_duration(&self, duration: Duration) {
        let sample = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .max(1);
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .nanos_sum
            .fetch_update(Relaxed, Relaxed, |sum| Some(sum.saturating_add(sample)));
        self.count.fetch_add(1, Relaxed);
    }
}
268
269impl Display for AvgLatencyMetric {
270    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
271        write!(f, "{}", human_readable_duration(self.value() as u64))
272    }
273}
274
275impl CustomMetricValue for AvgLatencyMetric {
276    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
277        Arc::new(AvgLatencyMetric::default())
278    }
279
280    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
281        let Some(other) = other.as_any().downcast_ref::<Self>() else {
282            return;
283        };
284        self.nanos_sum
285            .fetch_add(other.nanos_sum.load(Relaxed), Relaxed);
286        self.count.fetch_add(other.count.load(Relaxed), Relaxed);
287    }
288
289    fn as_any(&self) -> &dyn Any {
290        self
291    }
292
293    fn as_usize(&self) -> usize {
294        self.value()
295    }
296
297    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
298        let Some(other) = other.as_any().downcast_ref::<Self>() else {
299            return false;
300        };
301        other.value() == self.value()
302    }
303}
304
/// A latency metric that captures only the first recorded value, ignoring all subsequent ones.
/// Uses 0 as the unset sentinel (valid durations are clamped to at least 1 nanosecond).
/// Clones share the same counter.
#[derive(Debug, Clone, Default)]
pub struct FirstLatencyMetric {
    nanos: Arc<AtomicUsize>,
}

impl FirstLatencyMetric {
    /// Creates a metric whose stored value is exactly `nanos` (0 means unset).
    pub fn from_nanos(nanos: usize) -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(nanos)),
        }
    }

    /// Returns the captured latency in nanoseconds, or 0 if nothing was recorded.
    pub fn value(&self) -> usize {
        self.nanos.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Stores `duration` if no value has been captured yet; otherwise does nothing.
    ///
    /// Zero durations are clamped to 1 ns so they stay distinguishable from the unset
    /// sentinel. A duration whose nanosecond count does not fit in `usize` saturates to
    /// `usize::MAX` instead of being truncated by an `as` cast.
    pub fn add_duration(&self, duration: Duration) {
        let sample = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .max(1);
        // compare_exchange: only set if still at the sentinel value (0). A failed exchange
        // means a value was already captured, which is the intended no-op.
        let _ = self.nanos.compare_exchange(0, sample, Relaxed, Relaxed);
    }
}
335
336impl Display for FirstLatencyMetric {
337    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
338        write!(f, "{}", human_readable_duration(self.value() as u64))
339    }
340}
341
342impl CustomMetricValue for FirstLatencyMetric {
343    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
344        Arc::new(FirstLatencyMetric::default())
345    }
346
347    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
348        let Some(other) = other.as_any().downcast_ref::<Self>() else {
349            return;
350        };
351        // Keep self's value if already set, otherwise take other's.
352        let _ = self
353            .nanos
354            .compare_exchange(0, other.nanos.load(Relaxed), Relaxed, Relaxed);
355    }
356
357    fn as_any(&self) -> &dyn Any {
358        self
359    }
360
361    fn as_usize(&self) -> usize {
362        self.value()
363    }
364
365    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
366        let Some(other) = other.as_any().downcast_ref::<Self>() else {
367            return false;
368        };
369        other.value() == self.value()
370    }
371}
372
/// Generates a latency metric type that reports one fixed percentile of all recorded samples.
///
/// `$name` is the identifier of the generated struct and `$percentile` is the quantile passed
/// to `DDSketch::quantile`. Samples live in a `DDSketch` behind an `Arc<Mutex<_>>`, so clones
/// of a generated metric share the same sketch.
macro_rules! percentile_latency_metric {
    ($name:ident, $percentile:expr) => {
        #[derive(Clone)]
        pub struct $name {
            inner: Arc<Mutex<DDSketch>>,
        }

        // Hand-written so that only the sample count is printed, not the whole sketch.
        impl std::fmt::Debug for $name {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                f.debug_struct(stringify!($name))
                    .field("count", &self.count())
                    .finish()
            }
        }

        impl Default for $name {
            /// Creates an empty metric backed by a sketch with `Config::defaults()`.
            fn default() -> Self {
                Self {
                    inner: Arc::new(Mutex::new(DDSketch::new(Config::defaults()))),
                }
            }
        }

        impl $name {
            /// Wraps an existing sketch.
            pub(crate) fn from_sketch(sketch: DDSketch) -> Self {
                Self {
                    inner: Arc::new(Mutex::new(sketch)),
                }
            }

            /// Returns the configured percentile in nanoseconds, or 0 when the sketch yields
            /// no value (quantile error or no samples).
            pub fn value(&self) -> usize {
                let sketch = self.inner.lock().unwrap();
                sketch.quantile($percentile).unwrap_or(None).unwrap_or(0.0) as usize
            }

            /// Serializes the underlying sketch with `bincode`, mapping failures to
            /// `DataFusionError::Internal`.
            pub(crate) fn serialize_sketch(&self) -> Result<Vec<u8>> {
                let sketch = self.inner.lock().unwrap();
                bincode::serialize(&*sketch).map_err(|e| {
                    datafusion::error::DataFusionError::Internal(format!(
                        "failed to serialize DDSketch: {e}"
                    ))
                })
            }

            /// Returns the number of samples held by the sketch.
            pub(crate) fn count(&self) -> usize {
                let sketch = self.inner.lock().unwrap();
                sketch.count() as usize
            }

            /// Records the time elapsed since `start`.
            pub fn add_elapsed(&self, start: Instant) {
                self.add_duration(start.elapsed());
            }

            /// Adds `duration` to the sketch as a nanosecond sample (at least 1 ns).
            pub fn add_duration(&self, duration: Duration) {
                // Convert the u128 nanosecond count straight to f64; going through `usize`
                // first would silently truncate values that do not fit.
                let nanos = duration.as_nanos().max(1) as f64;
                self.inner.lock().unwrap().add(nanos);
            }
        }

        impl Display for $name {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", human_readable_duration(self.value() as u64))
            }
        }

        impl CustomMetricValue for $name {
            /// Returns a fresh, empty metric that shares no state with `self`.
            fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
                Arc::new($name::default())
            }

            /// Merges `other`'s sketch into `self`; metrics of a different concrete type are
            /// ignored.
            fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
                let Some(other) = other.as_any().downcast_ref::<Self>() else {
                    return;
                };
                // Copy the other sketch and release its lock before taking our own. Holding
                // both at once deadlocks when `other` shares this metric's sketch (e.g. a
                // clone of `self`) and allows lock-order inversion between two threads
                // aggregating in opposite directions.
                let snapshot: DDSketch = other.inner.lock().unwrap().clone();
                // `aggregate` has no way to report failure, so a failed merge is dropped.
                let _ = self.inner.lock().unwrap().merge(&snapshot);
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            /// Returns the configured percentile in nanoseconds.
            fn as_usize(&self) -> usize {
                self.value()
            }

            /// Two metrics are equal when `other` has the same concrete type and reports the
            /// same percentile value.
            fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
                let Some(other) = other.as_any().downcast_ref::<Self>() else {
                    return false;
                };
                other.value() == self.value()
            }
        }
    };
}
468
// Concrete percentile metric types; the second argument is the quantile (0.0..=1.0 scale)
// that each generated type reports.
percentile_latency_metric!(P50LatencyMetric, 0.50);
percentile_latency_metric!(P75LatencyMetric, 0.75);
percentile_latency_metric!(P95LatencyMetric, 0.95);
percentile_latency_metric!(P99LatencyMetric, 0.99);
473
// Unit tests for the latency metric types defined in this file.
#[cfg(test)]
mod tests {
    use super::*;

    // The minimum starts at the `usize::MAX` sentinel, follows the smallest sample, and
    // aggregation adopts a smaller value from another metric.
    #[test]
    fn min_latency_tracks_minimum_and_aggregates() {
        let m = MinLatencyMetric::default();
        assert_eq!(m.value(), usize::MAX);
        m.add_duration(Duration::from_millis(100));
        m.add_duration(Duration::from_millis(50));
        m.add_duration(Duration::from_millis(200));
        assert_eq!(m.value(), Duration::from_millis(50).as_nanos() as usize);

        let other = MinLatencyMetric::default();
        other.add_duration(Duration::from_millis(10));
        m.aggregate(Arc::new(other));
        assert_eq!(m.value(), Duration::from_millis(10).as_nanos() as usize);
    }

    // The maximum starts at 0, follows the largest sample, and aggregation adopts a larger
    // value from another metric.
    #[test]
    fn max_latency_tracks_maximum_and_aggregates() {
        let m = MaxLatencyMetric::default();
        assert_eq!(m.value(), 0);
        m.add_duration(Duration::from_millis(100));
        m.add_duration(Duration::from_millis(200));
        m.add_duration(Duration::from_millis(50));
        assert_eq!(m.value(), Duration::from_millis(200).as_nanos() as usize);

        let other = MaxLatencyMetric::default();
        other.add_duration(Duration::from_millis(500));
        m.aggregate(Arc::new(other));
        assert_eq!(m.value(), Duration::from_millis(500).as_nanos() as usize);
    }

    // The average is 0 while empty, equals sum / count, and aggregation combines both the
    // sums and the counts.
    #[test]
    fn avg_latency_computes_average_and_aggregates() {
        let m = AvgLatencyMetric::default();
        assert_eq!(m.value(), 0);
        m.add_duration(Duration::from_millis(100));
        m.add_duration(Duration::from_millis(200));
        assert_eq!(m.value(), Duration::from_millis(150).as_nanos() as usize);

        let other = AvgLatencyMetric::default();
        other.add_duration(Duration::from_millis(300));
        m.aggregate(Arc::new(other));
        // sum=600ms, count=3 -> avg=200ms
        assert_eq!(m.value(), Duration::from_millis(200).as_nanos() as usize);
    }

    // Only the first sample is kept; aggregation never overrides a set value but fills in
    // an unset one.
    #[test]
    fn first_latency_captures_first_value_and_aggregates() {
        let m = FirstLatencyMetric::default();
        assert_eq!(m.value(), 0);
        m.add_duration(Duration::from_millis(100));
        m.add_duration(Duration::from_millis(200));
        assert_eq!(m.value(), Duration::from_millis(100).as_nanos() as usize);

        // Aggregate keeps self's value when already set.
        let other = FirstLatencyMetric::default();
        other.add_duration(Duration::from_millis(50));
        m.aggregate(Arc::new(other));
        assert_eq!(m.value(), Duration::from_millis(100).as_nanos() as usize);

        // Aggregate takes other's value when self is unset.
        let unset = FirstLatencyMetric::default();
        let other2 = FirstLatencyMetric::default();
        other2.add_duration(Duration::from_millis(77));
        unset.aggregate(Arc::new(other2));
        assert_eq!(unset.value(), Duration::from_millis(77).as_nanos() as usize);
    }

    // An empty sketch reports 0; with half the samples at 1ms the median stays below 2ms.
    #[test]
    fn p50_latency_returns_median() {
        let m = P50LatencyMetric::default();
        assert_eq!(m.value(), 0);
        // Add 100 samples: 50 at 1ms, 50 at 100ms
        for _ in 0..50 {
            m.add_duration(Duration::from_millis(1));
        }
        for _ in 0..50 {
            m.add_duration(Duration::from_millis(100));
        }
        // p50 should be near 1ms (DDSketch gives approximate quantiles)
        let val = m.value();
        assert!(val < Duration::from_millis(2).as_nanos() as usize);
    }

    // With 2 of 100 samples at 100ms, the 99th percentile must land in the slow tail.
    #[test]
    fn p99_latency_returns_high_value() {
        let m = P99LatencyMetric::default();
        // Add 98 samples at 1ms and 2 samples at 100ms
        for _ in 0..98 {
            m.add_duration(Duration::from_millis(1));
        }
        m.add_duration(Duration::from_millis(100));
        m.add_duration(Duration::from_millis(100));
        // p99 should be near 100ms
        let val = m.value();
        assert!(val >= Duration::from_millis(50).as_nanos() as usize);
    }

    // Aggregating two sketches must behave like one sketch that saw all samples.
    #[test]
    fn percentile_latency_aggregates() {
        let m1 = P75LatencyMetric::default();
        let m2 = P75LatencyMetric::default();
        for _ in 0..75 {
            m1.add_duration(Duration::from_millis(1));
        }
        for _ in 0..25 {
            m2.add_duration(Duration::from_millis(100));
        }
        m1.aggregate(Arc::new(m2));
        // After aggregation: 75 at 1ms, 25 at 100ms. p75 should be near 1ms.
        let val = m1.value();
        assert!(val < Duration::from_millis(2).as_nanos() as usize);
    }

    // A zero duration is recorded as 1ns by every atomic-backed metric.
    #[test]
    fn zero_duration_clamped_to_one_nano() {
        let min = MinLatencyMetric::default();
        min.add_duration(Duration::ZERO);
        assert_eq!(min.value(), 1);

        let max = MaxLatencyMetric::default();
        max.add_duration(Duration::ZERO);
        assert_eq!(max.value(), 1);

        let avg = AvgLatencyMetric::default();
        avg.add_duration(Duration::ZERO);
        assert_eq!(avg.value(), 1);

        let first = FirstLatencyMetric::default();
        first.add_duration(Duration::ZERO);
        assert_eq!(first.value(), 1);
    }
}