Skip to main content

datafusion_distributed/metrics/
latency_metric.rs

1use datafusion::common::instant::Instant;
2use datafusion::common::{Result, human_readable_duration};
3use datafusion::physical_expr_common::metrics::{MetricBuilder, MetricValue};
4use datafusion::physical_plan::metrics::CustomMetricValue;
5use sketches_ddsketch::{Config, DDSketch};
6use std::any::Any;
7use std::borrow::Cow;
8use std::fmt::{Display, Formatter};
9use std::sync::Arc;
10use std::sync::Mutex;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::Relaxed;
13use std::time::Duration;
14
15/// Extension trait for DataFusion's metric system that adds support for latency related metrics.
16pub trait LatencyMetricExt {
17    fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric;
18    fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric;
19    fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric;
20    fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric;
21    fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric;
22    fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric;
23    fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric;
24    fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric;
25}
26
27impl LatencyMetricExt for MetricBuilder<'_> {
28    fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric {
29        let value = MinLatencyMetric::default();
30        self.build(MetricValue::Custom {
31            name: name.into(),
32            value: Arc::new(value.clone()),
33        });
34        value
35    }
36
37    fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric {
38        let value = MaxLatencyMetric::default();
39        self.build(MetricValue::Custom {
40            name: name.into(),
41            value: Arc::new(value.clone()),
42        });
43        value
44    }
45
46    fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric {
47        let value = AvgLatencyMetric::default();
48        self.build(MetricValue::Custom {
49            name: name.into(),
50            value: Arc::new(value.clone()),
51        });
52        value
53    }
54
55    fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric {
56        let value = FirstLatencyMetric::default();
57        self.build(MetricValue::Custom {
58            name: name.into(),
59            value: Arc::new(value.clone()),
60        });
61        value
62    }
63
64    fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric {
65        let value = P50LatencyMetric::default();
66        self.build(MetricValue::Custom {
67            name: name.into(),
68            value: Arc::new(value.clone()),
69        });
70        value
71    }
72
73    fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric {
74        let value = P75LatencyMetric::default();
75        self.build(MetricValue::Custom {
76            name: name.into(),
77            value: Arc::new(value.clone()),
78        });
79        value
80    }
81
82    fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric {
83        let value = P95LatencyMetric::default();
84        self.build(MetricValue::Custom {
85            name: name.into(),
86            value: Arc::new(value.clone()),
87        });
88        value
89    }
90
91    fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric {
92        let value = P99LatencyMetric::default();
93        self.build(MetricValue::Custom {
94            name: name.into(),
95            value: Arc::new(value.clone()),
96        });
97        value
98    }
99}
100
101#[derive(Debug, Clone)]
102pub struct MinLatencyMetric {
103    nanos: Arc<AtomicUsize>,
104}
105
106impl Default for MinLatencyMetric {
107    fn default() -> Self {
108        Self {
109            nanos: Arc::new(AtomicUsize::new(usize::MAX)),
110        }
111    }
112}
113
114impl MinLatencyMetric {
115    pub fn from_nanos(nanos: usize) -> Self {
116        Self {
117            nanos: Arc::new(AtomicUsize::new(nanos)),
118        }
119    }
120
121    pub fn value(&self) -> usize {
122        self.nanos.load(Relaxed)
123    }
124
125    pub fn add_elapsed(&self, start: Instant) {
126        self.add_duration(start.elapsed());
127    }
128
129    pub fn add_duration(&self, duration: Duration) {
130        let more_nanos = duration.as_nanos() as usize;
131        self.add_nanos(more_nanos);
132    }
133
134    pub fn add_nanos(&self, nanos: usize) {
135        self.nanos.fetch_min(nanos.max(1), Relaxed);
136    }
137}
138
139impl Display for MinLatencyMetric {
140    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
141        match self.value() {
142            usize::MAX => write!(f, "0ns"),
143            v => write!(f, "{}", human_readable_duration(v as u64)),
144        }
145    }
146}
147
148impl CustomMetricValue for MinLatencyMetric {
149    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
150        Arc::new(MinLatencyMetric::default())
151    }
152
153    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
154        let Some(other) = other.as_any().downcast_ref::<Self>() else {
155            return;
156        };
157        self.nanos.fetch_min(other.nanos.load(Relaxed), Relaxed);
158    }
159
160    fn as_any(&self) -> &dyn Any {
161        self
162    }
163
164    fn as_usize(&self) -> usize {
165        self.value()
166    }
167
168    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
169        let Some(other) = other.as_any().downcast_ref::<Self>() else {
170            return false;
171        };
172        other.value() == self.value()
173    }
174}
175
176#[derive(Debug, Clone, Default)]
177pub struct MaxLatencyMetric {
178    nanos: Arc<AtomicUsize>,
179}
180
181impl MaxLatencyMetric {
182    pub fn from_nanos(nanos: usize) -> Self {
183        Self {
184            nanos: Arc::new(AtomicUsize::new(nanos)),
185        }
186    }
187
188    pub fn value(&self) -> usize {
189        self.nanos.load(Relaxed)
190    }
191
192    pub fn add_elapsed(&self, start: Instant) {
193        self.add_duration(start.elapsed());
194    }
195
196    pub fn add_duration(&self, duration: Duration) {
197        let more_nanos = duration.as_nanos() as usize;
198        self.add_nanos(more_nanos);
199    }
200
201    pub fn add_nanos(&self, nanos: usize) {
202        self.nanos.fetch_max(nanos.max(1), Relaxed);
203    }
204}
205
206impl Display for MaxLatencyMetric {
207    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208        write!(f, "{}", human_readable_duration(self.value() as u64))
209    }
210}
211
212impl CustomMetricValue for MaxLatencyMetric {
213    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
214        Arc::new(MaxLatencyMetric::default())
215    }
216
217    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
218        let Some(other) = other.as_any().downcast_ref::<Self>() else {
219            return;
220        };
221        self.nanos.fetch_max(other.nanos.load(Relaxed), Relaxed);
222    }
223
224    fn as_any(&self) -> &dyn Any {
225        self
226    }
227
228    fn as_usize(&self) -> usize {
229        self.value()
230    }
231
232    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
233        let Some(other) = other.as_any().downcast_ref::<Self>() else {
234            return false;
235        };
236        other.value() == self.value()
237    }
238}
239
240#[derive(Debug, Clone, Default)]
241pub struct AvgLatencyMetric {
242    nanos_sum: Arc<AtomicUsize>,
243    count: Arc<AtomicUsize>,
244}
245
246impl AvgLatencyMetric {
247    pub(crate) fn from_raw(nanos_sum: usize, count: usize) -> Self {
248        Self {
249            nanos_sum: Arc::new(AtomicUsize::new(nanos_sum)),
250            count: Arc::new(AtomicUsize::new(count)),
251        }
252    }
253
254    pub fn value(&self) -> usize {
255        self.nanos_sum.load(Relaxed) / self.count.load(Relaxed).max(1)
256    }
257
258    pub(crate) fn nanos_sum(&self) -> usize {
259        self.nanos_sum.load(Relaxed)
260    }
261
262    pub(crate) fn count(&self) -> usize {
263        self.count.load(Relaxed)
264    }
265
266    pub fn add_elapsed(&self, start: Instant) {
267        self.add_duration(start.elapsed());
268    }
269
270    pub fn add_duration(&self, duration: Duration) {
271        let more_nanos = duration.as_nanos() as usize;
272        self.add_nanos(more_nanos);
273    }
274
275    pub fn add_nanos(&self, nanos: usize) {
276        self.nanos_sum.fetch_add(nanos.max(1), Relaxed);
277        self.count.fetch_add(1, Relaxed);
278    }
279}
280
281impl Display for AvgLatencyMetric {
282    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
283        write!(f, "{}", human_readable_duration(self.value() as u64))
284    }
285}
286
287impl CustomMetricValue for AvgLatencyMetric {
288    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
289        Arc::new(AvgLatencyMetric::default())
290    }
291
292    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
293        let Some(other) = other.as_any().downcast_ref::<Self>() else {
294            return;
295        };
296        self.nanos_sum
297            .fetch_add(other.nanos_sum.load(Relaxed), Relaxed);
298        self.count.fetch_add(other.count.load(Relaxed), Relaxed);
299    }
300
301    fn as_any(&self) -> &dyn Any {
302        self
303    }
304
305    fn as_usize(&self) -> usize {
306        self.value()
307    }
308
309    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
310        let Some(other) = other.as_any().downcast_ref::<Self>() else {
311            return false;
312        };
313        other.value() == self.value()
314    }
315}
316
317/// A latency metric that captures only the first recorded value, ignoring all subsequent ones.
318/// Uses 0 as the unset sentinel (valid durations are clamped to at least 1 nanosecond).
319#[derive(Debug, Clone, Default)]
320pub struct FirstLatencyMetric {
321    nanos: Arc<AtomicUsize>,
322}
323
324impl FirstLatencyMetric {
325    pub fn from_nanos(nanos: usize) -> Self {
326        Self {
327            nanos: Arc::new(AtomicUsize::new(nanos)),
328        }
329    }
330
331    pub fn value(&self) -> usize {
332        self.nanos.load(Relaxed)
333    }
334
335    pub fn add_elapsed(&self, start: Instant) {
336        self.add_duration(start.elapsed());
337    }
338
339    pub fn add_duration(&self, duration: Duration) {
340        let nanos = duration.as_nanos() as usize;
341        self.add_nanos(nanos);
342    }
343
344    pub fn add_nanos(&self, nanos: usize) {
345        // compare_exchange: only set if still at the sentinel value (0).
346        let _ = self
347            .nanos
348            .compare_exchange(0, nanos.max(1), Relaxed, Relaxed);
349    }
350}
351
352impl Display for FirstLatencyMetric {
353    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
354        write!(f, "{}", human_readable_duration(self.value() as u64))
355    }
356}
357
358impl CustomMetricValue for FirstLatencyMetric {
359    fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
360        Arc::new(FirstLatencyMetric::default())
361    }
362
363    fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
364        let Some(other) = other.as_any().downcast_ref::<Self>() else {
365            return;
366        };
367        // Keep self's value if already set, otherwise take other's.
368        let _ = self
369            .nanos
370            .compare_exchange(0, other.nanos.load(Relaxed), Relaxed, Relaxed);
371    }
372
373    fn as_any(&self) -> &dyn Any {
374        self
375    }
376
377    fn as_usize(&self) -> usize {
378        self.value()
379    }
380
381    fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
382        let Some(other) = other.as_any().downcast_ref::<Self>() else {
383            return false;
384        };
385        other.value() == self.value()
386    }
387}
388
389macro_rules! percentile_latency_metric {
390    ($name:ident, $percentile:expr) => {
391        #[derive(Clone)]
392        pub struct $name {
393            inner: Arc<Mutex<DDSketch>>,
394        }
395
396        impl std::fmt::Debug for $name {
397            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
398                f.debug_struct(stringify!($name))
399                    .field("count", &self.count())
400                    .finish()
401            }
402        }
403
404        impl Default for $name {
405            fn default() -> Self {
406                Self {
407                    inner: Arc::new(Mutex::new(DDSketch::new(Config::defaults()))),
408                }
409            }
410        }
411
412        impl $name {
413            pub(crate) fn from_sketch(sketch: DDSketch) -> Self {
414                Self {
415                    inner: Arc::new(Mutex::new(sketch)),
416                }
417            }
418
419            pub fn value(&self) -> usize {
420                let sketch = self.inner.lock().unwrap();
421                sketch.quantile($percentile).unwrap_or(None).unwrap_or(0.0) as usize
422            }
423
424            pub(crate) fn serialize_sketch(&self) -> Result<Vec<u8>> {
425                let sketch = self.inner.lock().unwrap();
426                bincode::serialize(&*sketch).map_err(|e| {
427                    datafusion::error::DataFusionError::Internal(format!(
428                        "failed to serialize DDSketch: {e}"
429                    ))
430                })
431            }
432
433            pub(crate) fn count(&self) -> usize {
434                let sketch = self.inner.lock().unwrap();
435                sketch.count() as usize
436            }
437
438            pub fn add_elapsed(&self, start: Instant) {
439                self.add_duration(start.elapsed());
440            }
441
442            pub fn add_duration(&self, duration: Duration) {
443                let nanos = (duration.as_nanos() as usize).max(1);
444                self.add_nanos(nanos);
445            }
446
447            pub fn add_nanos(&self, nanos: usize) {
448                self.inner.lock().unwrap().add(nanos as f64);
449            }
450        }
451
452        impl Display for $name {
453            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
454                write!(f, "{}", human_readable_duration(self.value() as u64))
455            }
456        }
457
458        impl CustomMetricValue for $name {
459            fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
460                Arc::new($name::default())
461            }
462
463            fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
464                let Some(other) = other.as_any().downcast_ref::<Self>() else {
465                    return;
466                };
467                let other_sketch = other.inner.lock().unwrap();
468                let _ = self.inner.lock().unwrap().merge(&other_sketch);
469            }
470
471            fn as_any(&self) -> &dyn Any {
472                self
473            }
474
475            fn as_usize(&self) -> usize {
476                self.value()
477            }
478
479            fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
480                let Some(other) = other.as_any().downcast_ref::<Self>() else {
481                    return false;
482                };
483                other.value() == self.value()
484            }
485        }
486    };
487}
488
489percentile_latency_metric!(P50LatencyMetric, 0.50);
490percentile_latency_metric!(P75LatencyMetric, 0.75);
491percentile_latency_metric!(P95LatencyMetric, 0.95);
492percentile_latency_metric!(P99LatencyMetric, 0.99);
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Nanoseconds in `millis` milliseconds, in the unit the metrics report.
    fn ms(millis: u64) -> usize {
        Duration::from_millis(millis).as_nanos() as usize
    }

    #[test]
    fn min_latency_tracks_minimum_and_aggregates() {
        let metric = MinLatencyMetric::default();
        assert_eq!(metric.value(), usize::MAX);
        for millis in [100, 50, 200] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), ms(50));

        let incoming = MinLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(10));
        metric.aggregate(Arc::new(incoming));
        assert_eq!(metric.value(), ms(10));
    }

    #[test]
    fn max_latency_tracks_maximum_and_aggregates() {
        let metric = MaxLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for millis in [100, 200, 50] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), ms(200));

        let incoming = MaxLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(500));
        metric.aggregate(Arc::new(incoming));
        assert_eq!(metric.value(), ms(500));
    }

    #[test]
    fn avg_latency_computes_average_and_aggregates() {
        let metric = AvgLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for millis in [100, 200] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), ms(150));

        let incoming = AvgLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(300));
        metric.aggregate(Arc::new(incoming));
        // Three samples totalling 600ms average to 200ms.
        assert_eq!(metric.value(), ms(200));
    }

    #[test]
    fn first_latency_captures_first_value_and_aggregates() {
        let metric = FirstLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for millis in [100, 200] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), ms(100));

        // An already-set metric ignores whatever is aggregated into it.
        let later = FirstLatencyMetric::default();
        later.add_duration(Duration::from_millis(50));
        metric.aggregate(Arc::new(later));
        assert_eq!(metric.value(), ms(100));

        // An unset metric adopts the aggregated value.
        let empty = FirstLatencyMetric::default();
        let donor = FirstLatencyMetric::default();
        donor.add_duration(Duration::from_millis(77));
        empty.aggregate(Arc::new(donor));
        assert_eq!(empty.value(), ms(77));
    }

    #[test]
    fn p50_latency_returns_median() {
        let metric = P50LatencyMetric::default();
        assert_eq!(metric.value(), 0);
        // Half of the 100 samples sit at 1ms, the other half at 100ms.
        (0..50).for_each(|_| metric.add_duration(Duration::from_millis(1)));
        (0..50).for_each(|_| metric.add_duration(Duration::from_millis(100)));
        // DDSketch quantiles are approximate, so only bound the estimate.
        let estimate = metric.value();
        assert!(estimate < ms(2));
    }

    #[test]
    fn p99_latency_returns_high_value() {
        let metric = P99LatencyMetric::default();
        // 98 fast samples followed by 2 slow ones.
        (0..98).for_each(|_| metric.add_duration(Duration::from_millis(1)));
        (0..2).for_each(|_| metric.add_duration(Duration::from_millis(100)));
        // The 99th percentile should land near the slow samples.
        let estimate = metric.value();
        assert!(estimate >= ms(50));
    }

    #[test]
    fn percentile_latency_aggregates() {
        let fast = P75LatencyMetric::default();
        let slow = P75LatencyMetric::default();
        (0..75).for_each(|_| fast.add_duration(Duration::from_millis(1)));
        (0..25).for_each(|_| slow.add_duration(Duration::from_millis(100)));
        fast.aggregate(Arc::new(slow));
        // Merged: 75 samples at 1ms and 25 at 100ms, so p75 stays near 1ms.
        let estimate = fast.value();
        assert!(estimate < ms(2));
    }

    #[test]
    fn zero_duration_clamped_to_one_nano() {
        let min = MinLatencyMetric::default();
        let max = MaxLatencyMetric::default();
        let avg = AvgLatencyMetric::default();
        let first = FirstLatencyMetric::default();

        min.add_duration(Duration::ZERO);
        max.add_duration(Duration::ZERO);
        avg.add_duration(Duration::ZERO);
        first.add_duration(Duration::ZERO);

        assert_eq!(
            [min.value(), max.value(), avg.value(), first.value()],
            [1, 1, 1, 1]
        );
    }
}