datafusion_distributed/metrics/
latency_metric.rs1use datafusion::common::instant::Instant;
2use datafusion::common::{Result, human_readable_duration};
3use datafusion::physical_expr_common::metrics::{MetricBuilder, MetricValue};
4use datafusion::physical_plan::metrics::CustomMetricValue;
5use sketches_ddsketch::{Config, DDSketch};
6use std::any::Any;
7use std::borrow::Cow;
8use std::fmt::{Display, Formatter};
9use std::sync::Arc;
10use std::sync::Mutex;
11use std::sync::atomic::AtomicUsize;
12use std::sync::atomic::Ordering::Relaxed;
13use std::time::Duration;
14
15pub trait LatencyMetricExt {
17 fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric;
18 fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric;
19 fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric;
20 fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric;
21 fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric;
22 fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric;
23 fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric;
24 fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric;
25}
26
27impl LatencyMetricExt for MetricBuilder<'_> {
28 fn min_latency(self, name: impl Into<Cow<'static, str>>) -> MinLatencyMetric {
29 let value = MinLatencyMetric::default();
30 self.build(MetricValue::Custom {
31 name: name.into(),
32 value: Arc::new(value.clone()),
33 });
34 value
35 }
36
37 fn max_latency(self, name: impl Into<Cow<'static, str>>) -> MaxLatencyMetric {
38 let value = MaxLatencyMetric::default();
39 self.build(MetricValue::Custom {
40 name: name.into(),
41 value: Arc::new(value.clone()),
42 });
43 value
44 }
45
46 fn avg_latency(self, name: impl Into<Cow<'static, str>>) -> AvgLatencyMetric {
47 let value = AvgLatencyMetric::default();
48 self.build(MetricValue::Custom {
49 name: name.into(),
50 value: Arc::new(value.clone()),
51 });
52 value
53 }
54
55 fn first_latency(self, name: impl Into<Cow<'static, str>>) -> FirstLatencyMetric {
56 let value = FirstLatencyMetric::default();
57 self.build(MetricValue::Custom {
58 name: name.into(),
59 value: Arc::new(value.clone()),
60 });
61 value
62 }
63
64 fn p50_latency(self, name: impl Into<Cow<'static, str>>) -> P50LatencyMetric {
65 let value = P50LatencyMetric::default();
66 self.build(MetricValue::Custom {
67 name: name.into(),
68 value: Arc::new(value.clone()),
69 });
70 value
71 }
72
73 fn p75_latency(self, name: impl Into<Cow<'static, str>>) -> P75LatencyMetric {
74 let value = P75LatencyMetric::default();
75 self.build(MetricValue::Custom {
76 name: name.into(),
77 value: Arc::new(value.clone()),
78 });
79 value
80 }
81
82 fn p95_latency(self, name: impl Into<Cow<'static, str>>) -> P95LatencyMetric {
83 let value = P95LatencyMetric::default();
84 self.build(MetricValue::Custom {
85 name: name.into(),
86 value: Arc::new(value.clone()),
87 });
88 value
89 }
90
91 fn p99_latency(self, name: impl Into<Cow<'static, str>>) -> P99LatencyMetric {
92 let value = P99LatencyMetric::default();
93 self.build(MetricValue::Custom {
94 name: name.into(),
95 value: Arc::new(value.clone()),
96 });
97 value
98 }
99}
100
101#[derive(Debug, Clone)]
102pub struct MinLatencyMetric {
103 nanos: Arc<AtomicUsize>,
104}
105
106impl Default for MinLatencyMetric {
107 fn default() -> Self {
108 Self {
109 nanos: Arc::new(AtomicUsize::new(usize::MAX)),
110 }
111 }
112}
113
114impl MinLatencyMetric {
115 pub fn from_nanos(nanos: usize) -> Self {
116 Self {
117 nanos: Arc::new(AtomicUsize::new(nanos)),
118 }
119 }
120
121 pub fn value(&self) -> usize {
122 self.nanos.load(Relaxed)
123 }
124
125 pub fn add_elapsed(&self, start: Instant) {
126 self.add_duration(start.elapsed());
127 }
128
129 pub fn add_duration(&self, duration: Duration) {
130 let more_nanos = duration.as_nanos() as usize;
131 self.add_nanos(more_nanos);
132 }
133
134 pub fn add_nanos(&self, nanos: usize) {
135 self.nanos.fetch_min(nanos.max(1), Relaxed);
136 }
137}
138
139impl Display for MinLatencyMetric {
140 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
141 match self.value() {
142 usize::MAX => write!(f, "0ns"),
143 v => write!(f, "{}", human_readable_duration(v as u64)),
144 }
145 }
146}
147
148impl CustomMetricValue for MinLatencyMetric {
149 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
150 Arc::new(MinLatencyMetric::default())
151 }
152
153 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
154 let Some(other) = other.as_any().downcast_ref::<Self>() else {
155 return;
156 };
157 self.nanos.fetch_min(other.nanos.load(Relaxed), Relaxed);
158 }
159
160 fn as_any(&self) -> &dyn Any {
161 self
162 }
163
164 fn as_usize(&self) -> usize {
165 self.value()
166 }
167
168 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
169 let Some(other) = other.as_any().downcast_ref::<Self>() else {
170 return false;
171 };
172 other.value() == self.value()
173 }
174}
175
176#[derive(Debug, Clone, Default)]
177pub struct MaxLatencyMetric {
178 nanos: Arc<AtomicUsize>,
179}
180
181impl MaxLatencyMetric {
182 pub fn from_nanos(nanos: usize) -> Self {
183 Self {
184 nanos: Arc::new(AtomicUsize::new(nanos)),
185 }
186 }
187
188 pub fn value(&self) -> usize {
189 self.nanos.load(Relaxed)
190 }
191
192 pub fn add_elapsed(&self, start: Instant) {
193 self.add_duration(start.elapsed());
194 }
195
196 pub fn add_duration(&self, duration: Duration) {
197 let more_nanos = duration.as_nanos() as usize;
198 self.add_nanos(more_nanos);
199 }
200
201 pub fn add_nanos(&self, nanos: usize) {
202 self.nanos.fetch_max(nanos.max(1), Relaxed);
203 }
204}
205
206impl Display for MaxLatencyMetric {
207 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
208 write!(f, "{}", human_readable_duration(self.value() as u64))
209 }
210}
211
212impl CustomMetricValue for MaxLatencyMetric {
213 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
214 Arc::new(MaxLatencyMetric::default())
215 }
216
217 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
218 let Some(other) = other.as_any().downcast_ref::<Self>() else {
219 return;
220 };
221 self.nanos.fetch_max(other.nanos.load(Relaxed), Relaxed);
222 }
223
224 fn as_any(&self) -> &dyn Any {
225 self
226 }
227
228 fn as_usize(&self) -> usize {
229 self.value()
230 }
231
232 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
233 let Some(other) = other.as_any().downcast_ref::<Self>() else {
234 return false;
235 };
236 other.value() == self.value()
237 }
238}
239
240#[derive(Debug, Clone, Default)]
241pub struct AvgLatencyMetric {
242 nanos_sum: Arc<AtomicUsize>,
243 count: Arc<AtomicUsize>,
244}
245
246impl AvgLatencyMetric {
247 pub(crate) fn from_raw(nanos_sum: usize, count: usize) -> Self {
248 Self {
249 nanos_sum: Arc::new(AtomicUsize::new(nanos_sum)),
250 count: Arc::new(AtomicUsize::new(count)),
251 }
252 }
253
254 pub fn value(&self) -> usize {
255 self.nanos_sum.load(Relaxed) / self.count.load(Relaxed).max(1)
256 }
257
258 pub(crate) fn nanos_sum(&self) -> usize {
259 self.nanos_sum.load(Relaxed)
260 }
261
262 pub(crate) fn count(&self) -> usize {
263 self.count.load(Relaxed)
264 }
265
266 pub fn add_elapsed(&self, start: Instant) {
267 self.add_duration(start.elapsed());
268 }
269
270 pub fn add_duration(&self, duration: Duration) {
271 let more_nanos = duration.as_nanos() as usize;
272 self.add_nanos(more_nanos);
273 }
274
275 pub fn add_nanos(&self, nanos: usize) {
276 self.nanos_sum.fetch_add(nanos.max(1), Relaxed);
277 self.count.fetch_add(1, Relaxed);
278 }
279}
280
281impl Display for AvgLatencyMetric {
282 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
283 write!(f, "{}", human_readable_duration(self.value() as u64))
284 }
285}
286
287impl CustomMetricValue for AvgLatencyMetric {
288 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
289 Arc::new(AvgLatencyMetric::default())
290 }
291
292 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
293 let Some(other) = other.as_any().downcast_ref::<Self>() else {
294 return;
295 };
296 self.nanos_sum
297 .fetch_add(other.nanos_sum.load(Relaxed), Relaxed);
298 self.count.fetch_add(other.count.load(Relaxed), Relaxed);
299 }
300
301 fn as_any(&self) -> &dyn Any {
302 self
303 }
304
305 fn as_usize(&self) -> usize {
306 self.value()
307 }
308
309 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
310 let Some(other) = other.as_any().downcast_ref::<Self>() else {
311 return false;
312 };
313 other.value() == self.value()
314 }
315}
316
317#[derive(Debug, Clone, Default)]
320pub struct FirstLatencyMetric {
321 nanos: Arc<AtomicUsize>,
322}
323
324impl FirstLatencyMetric {
325 pub fn from_nanos(nanos: usize) -> Self {
326 Self {
327 nanos: Arc::new(AtomicUsize::new(nanos)),
328 }
329 }
330
331 pub fn value(&self) -> usize {
332 self.nanos.load(Relaxed)
333 }
334
335 pub fn add_elapsed(&self, start: Instant) {
336 self.add_duration(start.elapsed());
337 }
338
339 pub fn add_duration(&self, duration: Duration) {
340 let nanos = duration.as_nanos() as usize;
341 self.add_nanos(nanos);
342 }
343
344 pub fn add_nanos(&self, nanos: usize) {
345 let _ = self
347 .nanos
348 .compare_exchange(0, nanos.max(1), Relaxed, Relaxed);
349 }
350}
351
352impl Display for FirstLatencyMetric {
353 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
354 write!(f, "{}", human_readable_duration(self.value() as u64))
355 }
356}
357
358impl CustomMetricValue for FirstLatencyMetric {
359 fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
360 Arc::new(FirstLatencyMetric::default())
361 }
362
363 fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
364 let Some(other) = other.as_any().downcast_ref::<Self>() else {
365 return;
366 };
367 let _ = self
369 .nanos
370 .compare_exchange(0, other.nanos.load(Relaxed), Relaxed, Relaxed);
371 }
372
373 fn as_any(&self) -> &dyn Any {
374 self
375 }
376
377 fn as_usize(&self) -> usize {
378 self.value()
379 }
380
381 fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
382 let Some(other) = other.as_any().downcast_ref::<Self>() else {
383 return false;
384 };
385 other.value() == self.value()
386 }
387}
388
macro_rules! percentile_latency_metric {
    ($name:ident, $percentile:expr) => {
        #[doc = concat!(
            "Latency metric reporting the ",
            stringify!($percentile),
            " quantile of the recorded durations, estimated with a [`DDSketch`]."
        )]
        ///
        /// Clones share the same sketch (behind an `Arc<Mutex<_>>`), so a handle
        /// returned by [`LatencyMetricExt`] keeps updating the registered metric.
        #[derive(Clone)]
        pub struct $name {
            inner: Arc<Mutex<DDSketch>>,
        }

        impl std::fmt::Debug for $name {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                // Only the sample count is shown; the sketch itself is not printed.
                f.debug_struct(stringify!($name))
                    .field("count", &self.count())
                    .finish()
            }
        }

        impl Default for $name {
            fn default() -> Self {
                Self::from_sketch(DDSketch::new(Config::defaults()))
            }
        }

        impl $name {
            /// Wraps an existing sketch.
            pub(crate) fn from_sketch(sketch: DDSketch) -> Self {
                Self {
                    inner: Arc::new(Mutex::new(sketch)),
                }
            }

            /// Locks the sketch, recovering the guard if the mutex was poisoned.
            fn sketch(&self) -> std::sync::MutexGuard<'_, DDSketch> {
                // A panic on another thread while it held the lock should not make
                // every later metric read, write, or display panic as well.
                self.inner
                    .lock()
                    .unwrap_or_else(std::sync::PoisonError::into_inner)
            }

            /// Returns the estimated quantile in nanoseconds, or 0 when no
            /// estimate is available (e.g. nothing has been recorded).
            pub fn value(&self) -> usize {
                let estimate = self.sketch().quantile($percentile).ok().flatten();
                // Float-to-int `as` casts saturate (NaN becomes 0), so this cannot wrap.
                estimate.unwrap_or(0.0) as usize
            }

            /// Serializes the underlying sketch with `bincode`.
            ///
            /// # Errors
            ///
            /// Returns an internal error if serialization fails.
            pub(crate) fn serialize_sketch(&self) -> Result<Vec<u8>> {
                let sketch = self.sketch();
                bincode::serialize(&*sketch).map_err(|e| {
                    datafusion::error::DataFusionError::Internal(format!(
                        "failed to serialize DDSketch: {e}"
                    ))
                })
            }

            /// Number of samples recorded in (or merged into) the sketch.
            pub(crate) fn count(&self) -> usize {
                self.sketch().count() as usize
            }

            /// Records the time elapsed since `start`.
            pub fn add_elapsed(&self, start: Instant) {
                self.add_duration(start.elapsed());
            }

            /// Records `duration`, saturating at `usize::MAX` nanoseconds.
            pub fn add_duration(&self, duration: Duration) {
                // `as usize` would silently truncate the `u128` nanosecond count (on
                // 32-bit targets any latency above ~4.29s would wrap around).
                let nanos = usize::try_from(duration.as_nanos()).unwrap_or(usize::MAX);
                self.add_nanos(nanos);
            }

            /// Records a latency of `nanos` nanoseconds; zero is clamped to 1ns.
            pub fn add_nanos(&self, nanos: usize) {
                // Clamp here, not only in `add_duration`, so every entry point
                // treats a zero sample like the other latency metrics do.
                self.sketch().add(nanos.max(1) as f64);
            }
        }

        impl Display for $name {
            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
                write!(f, "{}", human_readable_duration(self.value() as u64))
            }
        }

        impl CustomMetricValue for $name {
            fn new_empty(&self) -> Arc<dyn CustomMetricValue> {
                Arc::new(Self::default())
            }

            fn aggregate(&self, other: Arc<dyn CustomMetricValue + 'static>) {
                // Metrics of a different concrete type are ignored.
                let Some(other) = other.as_any().downcast_ref::<Self>() else {
                    return;
                };
                // Copy `other`'s sketch and release its lock before locking `self`.
                // Holding both locks at once never returns when `other` shares
                // `self`'s sketch (e.g. a clone aggregated into itself: a
                // `std::sync::Mutex` cannot be locked twice by one thread), and it
                // can deadlock when two metrics are aggregated into each other
                // concurrently.
                let snapshot: DDSketch = other.sketch().clone();
                // `merge` reports incompatible sketches as an error; aggregation is
                // best-effort, so such a sketch is skipped.
                let _ = self.sketch().merge(&snapshot);
            }

            fn as_any(&self) -> &dyn Any {
                self
            }

            fn as_usize(&self) -> usize {
                self.value()
            }

            fn is_eq(&self, other: &Arc<dyn CustomMetricValue>) -> bool {
                let Some(other) = other.as_any().downcast_ref::<Self>() else {
                    return false;
                };
                other.value() == self.value()
            }
        }
    };
}
488
489percentile_latency_metric!(P50LatencyMetric, 0.50);
490percentile_latency_metric!(P75LatencyMetric, 0.75);
491percentile_latency_metric!(P95LatencyMetric, 0.95);
492percentile_latency_metric!(P99LatencyMetric, 0.99);
493
#[cfg(test)]
mod tests {
    use super::*;

    /// Nanosecond count for `millis` milliseconds, in the unit the metrics store.
    fn nanos_of_ms(millis: u64) -> usize {
        Duration::from_millis(millis).as_nanos() as usize
    }

    /// `times` copies of a `millis`-millisecond duration.
    fn repeated_ms(millis: u64, times: usize) -> impl Iterator<Item = Duration> {
        std::iter::repeat(Duration::from_millis(millis)).take(times)
    }

    #[test]
    fn min_latency_tracks_minimum_and_aggregates() {
        let metric = MinLatencyMetric::default();
        assert_eq!(metric.value(), usize::MAX);
        for millis in [100, 50, 200] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), nanos_of_ms(50));

        let lower = MinLatencyMetric::default();
        lower.add_duration(Duration::from_millis(10));
        metric.aggregate(Arc::new(lower));
        assert_eq!(metric.value(), nanos_of_ms(10));
    }

    #[test]
    fn max_latency_tracks_maximum_and_aggregates() {
        let metric = MaxLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for millis in [100, 200, 50] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), nanos_of_ms(200));

        let higher = MaxLatencyMetric::default();
        higher.add_duration(Duration::from_millis(500));
        metric.aggregate(Arc::new(higher));
        assert_eq!(metric.value(), nanos_of_ms(500));
    }

    #[test]
    fn avg_latency_computes_average_and_aggregates() {
        let metric = AvgLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for millis in [100, 200] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), nanos_of_ms(150));

        let extra = AvgLatencyMetric::default();
        extra.add_duration(Duration::from_millis(300));
        metric.aggregate(Arc::new(extra));
        // (100 + 200 + 300) / 3
        assert_eq!(metric.value(), nanos_of_ms(200));
    }

    #[test]
    fn first_latency_captures_first_value_and_aggregates() {
        let metric = FirstLatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for millis in [100, 200] {
            metric.add_duration(Duration::from_millis(millis));
        }
        assert_eq!(metric.value(), nanos_of_ms(100));

        // An already-set metric ignores the aggregated value...
        let later = FirstLatencyMetric::default();
        later.add_duration(Duration::from_millis(50));
        metric.aggregate(Arc::new(later));
        assert_eq!(metric.value(), nanos_of_ms(100));

        // ...while an unset one adopts it.
        let unset = FirstLatencyMetric::default();
        let source = FirstLatencyMetric::default();
        source.add_duration(Duration::from_millis(77));
        unset.aggregate(Arc::new(source));
        assert_eq!(unset.value(), nanos_of_ms(77));
    }

    #[test]
    fn p50_latency_returns_median() {
        let metric = P50LatencyMetric::default();
        assert_eq!(metric.value(), 0);
        for sample in repeated_ms(1, 50).chain(repeated_ms(100, 50)) {
            metric.add_duration(sample);
        }
        // Half of the samples are 1ms, so the median lands in the 1ms bucket.
        assert!(metric.value() < nanos_of_ms(2));
    }

    #[test]
    fn p99_latency_returns_high_value() {
        let metric = P99LatencyMetric::default();
        for sample in repeated_ms(1, 98).chain(repeated_ms(100, 2)) {
            metric.add_duration(sample);
        }
        // The slowest 2% are 100ms, so p99 must reflect them.
        assert!(metric.value() >= nanos_of_ms(50));
    }

    #[test]
    fn percentile_latency_aggregates() {
        let fast = P75LatencyMetric::default();
        let slow = P75LatencyMetric::default();
        repeated_ms(1, 75).for_each(|sample| fast.add_duration(sample));
        repeated_ms(100, 25).for_each(|sample| slow.add_duration(sample));
        fast.aggregate(Arc::new(slow));
        // 75 of the 100 merged samples are 1ms, so p75 stays in the 1ms bucket.
        assert!(fast.value() < nanos_of_ms(2));
    }

    #[test]
    fn zero_duration_clamped_to_one_nano() {
        let min = MinLatencyMetric::default();
        let max = MaxLatencyMetric::default();
        let avg = AvgLatencyMetric::default();
        let first = FirstLatencyMetric::default();

        min.add_duration(Duration::ZERO);
        max.add_duration(Duration::ZERO);
        avg.add_duration(Duration::ZERO);
        first.add_duration(Duration::ZERO);

        assert_eq!(
            [min.value(), max.value(), avg.value(), first.value()],
            [1, 1, 1, 1]
        );
    }
}