datafusion_distributed/metrics/
latency_metric.rs1use datafusion::common::instant::Instant;
2use datafusion::common::{Result, human_readable_duration};
3use datafusion::physical_expr_common::metrics::{MetricBuilder, MetricValue};
4use datafusion::physical_plan::metrics::CustomMetricValue;
5use sketches_ddsketch::{Config, DDSketch};
6use std::any::Any;
7use std::borrow::Cow;
8use std::fmt::{Display, Formatter};
9use std::sync::Arc;
10use std::sync::Mutex;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::Relaxed;
13use std::time::Duration;
14
15pub trait LatencyMetricExt {
17 fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric;
18 fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric;
19 fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric;
20 fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric;
21 fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric;
22 fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric;
23 fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric;
24 fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric;
25}
26
27impl LatencyMetricExt for MetricBuilder<'_> {
28 fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric {
29 let value = MinLatencyMetric::default();
30 self.build(MetricValue::Custom {
31 name: name.into(),
32 value: Arc::new(value.clone()),
33 });
34 value
35 }
36
37 fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric {
38 let value = MaxLatencyMetric::default();
39 self.build(MetricValue::Custom {
40 name: name.into(),
41 value: Arc::new(value.clone()),
42 });
43 value
44 }
45
46 fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric {
47 let value = AvgLatencyMetric::default();
48 self.build(MetricValue::Custom {
49 name: name.into(),
50 value: Arc::new(value.clone()),
51 });
52 value
53 }
54
55 fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric {
56 let value = FirstLatencyMetric::default();
57 self.build(MetricValue::Custom {
58 name: name.into(),
59 value: Arc::new(value.clone()),
60 });
61 value
62 }
63
64 fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric {
65 let value = P50LatencyMetric::default();
66 self.build(MetricValue::Custom {
67 name: name.into(),
68 value: Arc::new(value.clone()),
69 });
70 value
71 }
72
73 fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric {
74 let value = P75LatencyMetric::default();
75 self.build(MetricValue::Custom {
76 name: name.into(),
77 value: Arc::new(value.clone()),
78 });
79 value
80 }
81
82 fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric {
83 let value = P95LatencyMetric::default();
84 self.build(MetricValue::Custom {
85 name: name.into(),
86 value: Arc::new(value.clone()),
87 });
88 value
89 }
90
91 fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric {
92 let value = P99LatencyMetric::default();
93 self.build(MetricValue::Custom {
94 name: name.into(),
95 value: Arc::new(value.clone()),
96 });
97 value
98 }
99}
100
/// Tracks the smallest latency recorded, in nanoseconds.
///
/// Clones share the same atomic counter. `usize::MAX` is the "no sample recorded yet"
/// sentinel; recorded samples are always kept strictly below it.
#[derive(Debug, Clone)]
pub struct MinLatencyMetric {
    nanos: Arc<AtomicUsize>,
}

impl Default for MinLatencyMetric {
    /// Creates an empty metric holding the `usize::MAX` "unset" sentinel.
    fn default() -> Self {
        Self::from_nanos(usize::MAX)
    }
}

impl MinLatencyMetric {
    /// Creates a metric whose current minimum is `nanos`.
    pub fn from_nanos(nanos: usize) -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(nanos)),
        }
    }

    /// Current minimum in nanoseconds, or `usize::MAX` when nothing has been recorded.
    pub fn value(&self) -> usize {
        self.nanos.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Records `duration`, keeping it if it is smaller than the current minimum.
    ///
    /// Zero durations count as 1ns. Durations too large for `usize` saturate instead of
    /// wrapping (a plain `as usize` wraps after ~4.3s on 32-bit targets). Samples are
    /// capped at `usize::MAX - 1` so a real observation never reads as "unset".
    pub fn add_duration(&self, duration: Duration) {
        let nanos = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .clamp(1, usize::MAX - 1);
        self.nanos.fetch_min(nanos, Relaxed);
    }
}
134
135impl Display for MinLatencyMetric {
136 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
137 match self.value() {
138 usize::MAX => write!(f, "0ns"),
139 v => write!(f, "{}", human_readable_duration(v as u64)),
140 }
141 }
142}
143
144impl CustomMetricValue for MinLatencyMetric {
145 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
146 Arc::new(MinLatencyMetric::default())
147 }
148
149 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
150 let Some(other) = other.as_any().downcast_ref::<Self>() else {
151 return;
152 };
153 self.nanos.fetch_min(other.nanos.load(Relaxed), Relaxed);
154 }
155
156 fn as_any(&self) -> &dyn Any {
157 self
158 }
159
160 fn as_usize(&self) -> usize {
161 self.value()
162 }
163
164 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
165 let Some(other) = other.as_any().downcast_ref::<Self>() else {
166 return false;
167 };
168 other.value() == self.value()
169 }
170}
171
/// Tracks the largest latency recorded, in nanoseconds.
///
/// Clones share the same atomic counter. A fresh metric holds 0.
#[derive(Debug, Clone, Default)]
pub struct MaxLatencyMetric {
    nanos: Arc<AtomicUsize>,
}

impl MaxLatencyMetric {
    /// Creates a metric whose current maximum is `nanos`.
    pub fn from_nanos(nanos: usize) -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(nanos)),
        }
    }

    /// Current maximum in nanoseconds (0 when nothing has been recorded).
    pub fn value(&self) -> usize {
        self.nanos.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Records `duration`, keeping it if it is larger than the current maximum.
    ///
    /// Zero durations count as 1ns. Durations too large for `usize` saturate to
    /// `usize::MAX` instead of wrapping (a plain `as usize` wraps after ~4.3s on
    /// 32-bit targets).
    pub fn add_duration(&self, duration: Duration) {
        let nanos = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .max(1);
        self.nanos.fetch_max(nanos, Relaxed);
    }
}
197
198impl Display for MaxLatencyMetric {
199 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200 write!(f, "{}", human_readable_duration(self.value() as u64))
201 }
202}
203
204impl CustomMetricValue for MaxLatencyMetric {
205 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
206 Arc::new(MaxLatencyMetric::default())
207 }
208
209 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
210 let Some(other) = other.as_any().downcast_ref::<Self>() else {
211 return;
212 };
213 self.nanos.fetch_max(other.nanos.load(Relaxed), Relaxed);
214 }
215
216 fn as_any(&self) -> &dyn Any {
217 self
218 }
219
220 fn as_usize(&self) -> usize {
221 self.value()
222 }
223
224 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
225 let Some(other) = other.as_any().downcast_ref::<Self>() else {
226 return false;
227 };
228 other.value() == self.value()
229 }
230}
231
/// Tracks the mean latency as a running nanosecond sum plus a sample count.
///
/// Clones share both atomic counters.
#[derive(Debug, Clone, Default)]
pub struct AvgLatencyMetric {
    nanos_sum: Arc<AtomicUsize>,
    count: Arc<AtomicUsize>,
}

impl AvgLatencyMetric {
    /// Rebuilds a metric from a previously captured sum and count.
    pub(crate) fn from_raw(nanos_sum: usize, count: usize) -> Self {
        Self {
            nanos_sum: Arc::new(AtomicUsize::new(nanos_sum)),
            count: Arc::new(AtomicUsize::new(count)),
        }
    }

    /// Mean latency in nanoseconds (integer division), or 0 when no samples exist.
    pub fn value(&self) -> usize {
        self.nanos_sum.load(Relaxed) / self.count.load(Relaxed).max(1)
    }

    /// Sum of all recorded samples in nanoseconds.
    pub(crate) fn nanos_sum(&self) -> usize {
        self.nanos_sum.load(Relaxed)
    }

    /// Number of recorded samples.
    pub(crate) fn count(&self) -> usize {
        self.count.load(Relaxed)
    }

    /// Records the time elapsed since `start`.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Records one sample of `duration`.
    ///
    /// Zero durations count as 1ns. Both the sample conversion and the running sum
    /// saturate at `usize::MAX` instead of wrapping (a real risk when `usize` is 32 bits).
    pub fn add_duration(&self, duration: Duration) {
        let nanos = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .max(1);
        // The closure always returns `Some`, so `fetch_update` cannot fail.
        let _ = self
            .nanos_sum
            .fetch_update(Relaxed, Relaxed, |sum| Some(sum.saturating_add(nanos)));
        self.count.fetch_add(1, Relaxed);
    }
}
268
269impl Display for AvgLatencyMetric {
270 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
271 write!(f, "{}", human_readable_duration(self.value() as u64))
272 }
273}
274
275impl CustomMetricValue for AvgLatencyMetric {
276 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
277 Arc::new(AvgLatencyMetric::default())
278 }
279
280 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
281 let Some(other) = other.as_any().downcast_ref::<Self>() else {
282 return;
283 };
284 self.nanos_sum
285 .fetch_add(other.nanos_sum.load(Relaxed), Relaxed);
286 self.count.fetch_add(other.count.load(Relaxed), Relaxed);
287 }
288
289 fn as_any(&self) -> &dyn Any {
290 self
291 }
292
293 fn as_usize(&self) -> usize {
294 self.value()
295 }
296
297 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
298 let Some(other) = other.as_any().downcast_ref::<Self>() else {
299 return false;
300 };
301 other.value() == self.value()
302 }
303}
304
/// Captures only the first latency recorded, in nanoseconds.
///
/// Clones share the same atomic counter. 0 means "nothing recorded yet"; recorded
/// samples are always at least 1ns, so they never collide with that marker.
#[derive(Debug, Clone, Default)]
pub struct FirstLatencyMetric {
    nanos: Arc<AtomicUsize>,
}

impl FirstLatencyMetric {
    /// Creates a metric holding `nanos` (0 leaves it unset).
    pub fn from_nanos(nanos: usize) -> Self {
        Self {
            nanos: Arc::new(AtomicUsize::new(nanos)),
        }
    }

    /// The captured latency in nanoseconds, or 0 when nothing has been recorded.
    pub fn value(&self) -> usize {
        self.nanos.load(Relaxed)
    }

    /// Records the time elapsed since `start` if no sample was captured yet.
    pub fn add_elapsed(&self, start: Instant) {
        self.add_duration(start.elapsed());
    }

    /// Records `duration` only if no sample was captured yet.
    ///
    /// Zero durations count as 1ns. Durations too large for `usize` saturate to
    /// `usize::MAX` instead of wrapping (a plain `as usize` wraps after ~4.3s on
    /// 32-bit targets).
    pub fn add_duration(&self, duration: Duration) {
        let nanos = usize::try_from(duration.as_nanos())
            .unwrap_or(usize::MAX)
            .max(1);
        // Succeeds only while the slot still holds the 0 "unset" marker; later samples
        // are intentionally dropped.
        let _ = self.nanos.compare_exchange(0, nanos, Relaxed, Relaxed);
    }
}
335
336impl Display for FirstLatencyMetric {
337 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
338 write!(f, "{}", human_readable_duration(self.value() as u64))
339 }
340}
341
342impl CustomMetricValue for FirstLatencyMetric {
343 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
344 Arc::new(FirstLatencyMetric::default())
345 }
346
347 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
348 let Some(other) = other.as_any().downcast_ref::<Self>() else {
349 return;
350 };
351 let _ = self
353 .nanos
354 .compare_exchange(0, other.nanos.load(Relaxed), Relaxed, Relaxed);
355 }
356
357 fn as_any(&self) -> &dyn Any {
358 self
359 }
360
361 fn as_usize(&self) -> usize {
362 self.value()
363 }
364
365 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
366 let Some(other) = other.as_any().downcast_ref::<Self>() else {
367 return false;
368 };
369 other.value() == self.value()
370 }
371}
372
373macro_rules! percentile_latency_metric {
374 ($name:ident, $percentile:expr) => {
375 #[derive(Clone)]
376 pub struct $name {
377 inner: Arc<Mutex<DDSketch>>,
378 }
379
380 impl std::fmt::Debug for $name {
381 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
382 f.debug_struct(stringify!($name))
383 .field("count", &self.count())
384 .finish()
385 }
386 }
387
388 impl Default for $name {
389 fn default() -> Self {
390 Self {
391 inner: Arc::new(Mutex::new(DDSketch::new(Config::defaults()))),
392 }
393 }
394 }
395
396 impl $name {
397 pub(crate) fn from_sketch(sketch: DDSketch) -> Self {
398 Self {
399 inner: Arc::new(Mutex::new(sketch)),
400 }
401 }
402
403 pub fn value(&self) -> usize {
404 let sketch = self.inner.lock().unwrap();
405 sketch.quantile($percentile).unwrap_or(None).unwrap_or(0.0) as usize
406 }
407
408 pub(crate) fn serialize_sketch(&self) -> Result<Vec<u8>> {
409 let sketch = self.inner.lock().unwrap();
410 bincode::serialize(&*sketch).map_err(|e| {
411 datafusion::error::DataFusionError::Internal(format!(
412 "failed to serialize DDSketch: {e}"
413 ))
414 })
415 }
416
417 pub(crate) fn count(&self) -> usize {
418 let sketch = self.inner.lock().unwrap();
419 sketch.count() as usize
420 }
421
422 pub fn add_elapsed(&self, start: Instant) {
423 self.add_duration(start.elapsed());
424 }
425
426 pub fn add_duration(&self, duration: Duration) {
427 let nanos = (duration.as_nanos() as usize).max(1) as f64;
428 self.inner.lock().unwrap().add(nanos);
429 }
430 }
431
432 impl Display for $name {
433 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
434 write!(f, "{}", human_readable_duration(self.value() as u64))
435 }
436 }
437
438 impl CustomMetricValue for $name {
439 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
440 Arc::new($name::default())
441 }
442
443 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
444 let Some(other) = other.as_any().downcast_ref::<Self>() else {
445 return;
446 };
447 let other_sketch = other.inner.lock().unwrap();
448 let _ = self.inner.lock().unwrap().merge(&other_sketch);
449 }
450
451 fn as_any(&self) -> &dyn Any {
452 self
453 }
454
455 fn as_usize(&self) -> usize {
456 self.value()
457 }
458
459 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
460 let Some(other) = other.as_any().downcast_ref::<Self>() else {
461 return false;
462 };
463 other.value() == self.value()
464 }
465 }
466 };
467}
468
469percentile_latency_metric!(P50LatencyMetric, 0.50);
470percentile_latency_metric!(P75LatencyMetric, 0.75);
471percentile_latency_metric!(P95LatencyMetric, 0.95);
472percentile_latency_metric!(P99LatencyMetric, 0.99);
473
#[cfg(test)]
mod tests {
    use super::*;

    /// Nanoseconds in `ms` milliseconds, in the `usize` unit the metrics report.
    fn ms_as_nanos(ms: u64) -> usize {
        Duration::from_millis(ms).as_nanos() as usize
    }

    #[test]
    fn min_latency_tracks_minimum_and_aggregates() {
        let metric = MinLatencyMetric::default();
        assert_eq!(metric.value(), usize::MAX);
        for ms in [100, 50, 200] {
            metric.add_duration(Duration::from_millis(ms));
        }
        assert_eq!(metric.value(), ms_as_nanos(50));

        let incoming = MinLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(10));
        metric.aggregate(Arc::new(incoming));
        assert_eq!(metric.value(), ms_as_nanos(10));
    }

    #[test]
    fn max_latency_tracks_maximum_and_aggregates() {
        let metric = MaxLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for ms in [100, 200, 50] {
            metric.add_duration(Duration::from_millis(ms));
        }
        assert_eq!(metric.value(), ms_as_nanos(200));

        let incoming = MaxLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(500));
        metric.aggregate(Arc::new(incoming));
        assert_eq!(metric.value(), ms_as_nanos(500));
    }

    #[test]
    fn avg_latency_computes_average_and_aggregates() {
        let metric = AvgLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for ms in [100, 200] {
            metric.add_duration(Duration::from_millis(ms));
        }
        assert_eq!(metric.value(), ms_as_nanos(150));

        let incoming = AvgLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(300));
        metric.aggregate(Arc::new(incoming));
        assert_eq!(metric.value(), ms_as_nanos(200));
    }

    #[test]
    fn first_latency_captures_first_value_and_aggregates() {
        let metric = FirstLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for ms in [100, 200] {
            metric.add_duration(Duration::from_millis(ms));
        }
        assert_eq!(metric.value(), ms_as_nanos(100));

        // An already-set metric ignores the incoming value.
        let incoming = FirstLatencyMetric::default();
        incoming.add_duration(Duration::from_millis(50));
        metric.aggregate(Arc::new(incoming));
        assert_eq!(metric.value(), ms_as_nanos(100));

        // An unset metric adopts the incoming value.
        let unset = FirstLatencyMetric::default();
        let donor = FirstLatencyMetric::default();
        donor.add_duration(Duration::from_millis(77));
        unset.aggregate(Arc::new(donor));
        assert_eq!(unset.value(), ms_as_nanos(77));
    }

    #[test]
    fn p50_latency_returns_median() {
        let metric = P50LatencyMetric::default();
        assert_eq!(metric.value(), 0);
        let samples = std::iter::repeat(1).take(50).chain(std::iter::repeat(100).take(50));
        for ms in samples {
            metric.add_duration(Duration::from_millis(ms));
        }
        assert!(metric.value() < ms_as_nanos(2));
    }

    #[test]
    fn p99_latency_returns_high_value() {
        let metric = P99LatencyMetric::default();
        let samples = std::iter::repeat(1).take(98).chain(std::iter::repeat(100).take(2));
        for ms in samples {
            metric.add_duration(Duration::from_millis(ms));
        }
        assert!(metric.value() >= ms_as_nanos(50));
    }

    #[test]
    fn percentile_latency_aggregates() {
        let fast = P75LatencyMetric::default();
        let slow = P75LatencyMetric::default();
        for _ in 0..75 {
            fast.add_duration(Duration::from_millis(1));
        }
        for _ in 0..25 {
            slow.add_duration(Duration::from_millis(100));
        }
        fast.aggregate(Arc::new(slow));
        assert!(fast.value() < ms_as_nanos(2));
    }

    #[test]
    fn zero_duration_clamped_to_one_nano() {
        let min = MinLatencyMetric::default();
        min.add_duration(Duration::ZERO);
        assert_eq!(min.value(), 1);

        let max = MaxLatencyMetric::default();
        max.add_duration(Duration::ZERO);
        assert_eq!(max.value(), 1);

        let avg = AvgLatencyMetric::default();
        avg.add_duration(Duration::ZERO);
        assert_eq!(avg.value(), 1);

        let first = FirstLatencyMetric::default();
        first.add_duration(Duration::ZERO);
        assert_eq!(first.value(), 1);
    }
}