datafusion_physical_plan/metrics/
mod.rs1mod baseline;
21mod builder;
22mod custom;
23mod value;
24
25use parking_lot::Mutex;
26use std::{
27 borrow::Cow,
28 fmt::{Debug, Display},
29 sync::Arc,
30};
31
32use datafusion_common::HashMap;
33
34pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
36pub use builder::MetricBuilder;
37pub use custom::CustomMetricValue;
38pub use value::{Count, Gauge, MetricValue, ScopedTimerGuard, Time, Timestamp};
39
/// A single metric: a [`MetricValue`] together with an optional set of
/// [`Label`]s and an optional partition index.
#[derive(Debug)]
pub struct Metric {
    /// The value carried by this metric.
    value: MetricValue,

    /// Name/value labels attached to this metric, in insertion order.
    labels: Vec<Label>,

    /// The partition this metric belongs to, or `None` when it is not tied
    /// to a single partition.
    partition: Option<usize>,
}
82
83impl Display for Metric {
84 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
85 write!(f, "{}", self.value.name())?;
86
87 let mut iter = self
88 .partition
89 .iter()
90 .map(|partition| Label::new("partition", partition.to_string()))
91 .chain(self.labels().iter().cloned())
92 .peekable();
93
94 if iter.peek().is_some() {
96 write!(f, "{{")?;
97
98 let mut is_first = true;
99 for i in iter {
100 if !is_first {
101 write!(f, ", ")?;
102 } else {
103 is_first = false;
104 }
105
106 write!(f, "{i}")?;
107 }
108
109 write!(f, "}}")?;
110 }
111
112 write!(f, "={}", self.value)
114 }
115}
116
117impl Metric {
118 pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
121 Self {
122 value,
123 labels: vec![],
124 partition,
125 }
126 }
127
128 pub fn new_with_labels(
131 value: MetricValue,
132 partition: Option<usize>,
133 labels: Vec<Label>,
134 ) -> Self {
135 Self {
136 value,
137 labels,
138 partition,
139 }
140 }
141
142 pub fn with_label(mut self, label: Label) -> Self {
144 self.labels.push(label);
145 self
146 }
147
148 pub fn labels(&self) -> &[Label] {
150 &self.labels
151 }
152
153 pub fn value(&self) -> &MetricValue {
155 &self.value
156 }
157
158 pub fn value_mut(&mut self) -> &mut MetricValue {
160 &mut self.value
161 }
162
163 pub fn partition(&self) -> Option<usize> {
165 self.partition
166 }
167}
168
/// An ordered collection of reference-counted [`Metric`]s.
///
/// Cloning the set clones the `Arc` handles, not the metrics themselves.
#[derive(Default, Debug, Clone)]
pub struct MetricsSet {
    /// The metrics in this set, in the order they were pushed (unless the
    /// set has been re-ordered or rebuilt by one of its own methods).
    metrics: Vec<Arc<Metric>>,
}
176
177impl MetricsSet {
178 pub fn new() -> Self {
180 Default::default()
181 }
182
183 pub fn push(&mut self, metric: Arc<Metric>) {
185 self.metrics.push(metric)
186 }
187
188 pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
190 self.metrics.iter()
191 }
192
193 pub fn output_rows(&self) -> Option<usize> {
196 self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
197 .map(|v| v.as_usize())
198 }
199
200 pub fn spill_count(&self) -> Option<usize> {
203 self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
204 .map(|v| v.as_usize())
205 }
206
207 pub fn spilled_bytes(&self) -> Option<usize> {
210 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
211 .map(|v| v.as_usize())
212 }
213
214 pub fn spilled_rows(&self) -> Option<usize> {
217 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
218 .map(|v| v.as_usize())
219 }
220
221 pub fn elapsed_compute(&self) -> Option<usize> {
224 self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
225 .map(|v| v.as_usize())
226 }
227
228 pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
232 where
233 F: FnMut(&Metric) -> bool,
234 {
235 let mut iter = self
236 .metrics
237 .iter()
238 .filter(|metric| f(metric.as_ref()))
239 .peekable();
240
241 let mut accum = match iter.peek() {
242 None => {
243 return None;
244 }
245 Some(metric) => metric.value().new_empty(),
246 };
247
248 iter.for_each(|metric| accum.aggregate(metric.value()));
249
250 Some(accum)
251 }
252
253 pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
256 self.sum(|m| match m.value() {
257 MetricValue::Count { name, .. } => name == metric_name,
258 MetricValue::Time { name, .. } => name == metric_name,
259 MetricValue::OutputRows(_) => false,
260 MetricValue::ElapsedCompute(_) => false,
261 MetricValue::SpillCount(_) => false,
262 MetricValue::SpilledBytes(_) => false,
263 MetricValue::SpilledRows(_) => false,
264 MetricValue::CurrentMemoryUsage(_) => false,
265 MetricValue::Gauge { name, .. } => name == metric_name,
266 MetricValue::StartTimestamp(_) => false,
267 MetricValue::EndTimestamp(_) => false,
268 MetricValue::Custom { .. } => false,
269 })
270 }
271
272 pub fn aggregate_by_name(&self) -> Self {
277 let mut map = HashMap::new();
278
279 for metric in &self.metrics {
281 let key = metric.value.name();
282 map.entry(key)
283 .and_modify(|accum: &mut Metric| {
284 accum.value_mut().aggregate(metric.value());
285 })
286 .or_insert_with(|| {
287 let partition = None;
289 let mut accum = Metric::new(metric.value().new_empty(), partition);
290 accum.value_mut().aggregate(metric.value());
291 accum
292 });
293 }
294
295 let new_metrics = map
296 .into_iter()
297 .map(|(_k, v)| Arc::new(v))
298 .collect::<Vec<_>>();
299
300 Self {
301 metrics: new_metrics,
302 }
303 }
304
305 pub fn sorted_for_display(mut self) -> Self {
307 self.metrics.sort_unstable_by_key(|metric| {
308 (
309 metric.value().display_sort_key(),
310 metric.value().name().to_owned(),
311 )
312 });
313 self
314 }
315
316 pub fn timestamps_removed(self) -> Self {
318 let Self { metrics } = self;
319
320 let metrics = metrics
321 .into_iter()
322 .filter(|m| !m.value.is_timestamp())
323 .collect::<Vec<_>>();
324
325 Self { metrics }
326 }
327}
328
329impl Display for MetricsSet {
330 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
332 let mut is_first = true;
333 for i in self.metrics.iter() {
334 if !is_first {
335 write!(f, ", ")?;
336 } else {
337 is_first = false;
338 }
339
340 write!(f, "{i}")?;
341 }
342 Ok(())
343 }
344}
345
/// A shareable handle to a [`MetricsSet`] guarded by a mutex.
///
/// Cloning this handle clones the `Arc`, so all clones refer to the same
/// underlying set.
#[derive(Default, Debug, Clone)]
pub struct ExecutionPlanMetricsSet {
    /// The shared, lock-protected set of metrics.
    inner: Arc<Mutex<MetricsSet>>,
}
361
362impl ExecutionPlanMetricsSet {
363 pub fn new() -> Self {
365 Self {
366 inner: Arc::new(Mutex::new(MetricsSet::new())),
367 }
368 }
369
370 pub fn register(&self, metric: Arc<Metric>) {
372 self.inner.lock().push(metric)
373 }
374
375 pub fn clone_inner(&self) -> MetricsSet {
377 let guard = self.inner.lock();
378 (*guard).clone()
379 }
380}
381
/// A name/value pair that can be attached to a [`Metric`].
///
/// Both parts are stored as `Cow<'static, str>`, so they can hold either a
/// static string slice or an owned `String`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    /// The label's name.
    name: Cow<'static, str>,
    /// The label's value.
    value: Cow<'static, str>,
}
400
401impl Label {
402 pub fn new(
404 name: impl Into<Cow<'static, str>>,
405 value: impl Into<Cow<'static, str>>,
406 ) -> Self {
407 let name = name.into();
408 let value = value.into();
409 Self { name, value }
410 }
411
412 pub fn name(&self) -> &str {
414 self.name.as_ref()
415 }
416
417 pub fn value(&self) -> &str {
419 self.value.as_ref()
420 }
421}
422
423impl Display for Label {
424 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
425 write!(f, "{}={}", self.name, self.value)
426 }
427}
428
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    /// A metric with neither labels nor partition renders as `name=value`.
    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string());
    }

    /// The partition is rendered as a `partition` label.
    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string());
    }

    /// Labels are rendered inside braces when there is no partition.
    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string());
    }

    /// The partition label comes before the metric's own labels.
    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string());
    }

    /// `output_rows` is `None` for an empty set and sums across partitions.
    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    /// `elapsed_compute` is `None` for an empty set and sums across
    /// partitions.
    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    /// `sum` aggregates every metric accepted by the predicate, regardless
    /// of labels or partition.
    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    /// Summing metrics of different kinds panics.
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().sum(|_| true);
    }

    /// `aggregate_by_name` yields one partition-less metric per name.
    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1);
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        // Use the public accessor, as for the elapsed-compute check above.
        assert!(output_rows[0].partition().is_none());
    }

    /// Aggregating same-named metrics of different kinds panics.
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().aggregate_by_name();
    }

    /// Aggregated start timestamps keep the earliest value and aggregated
    /// end timestamps keep the latest one.
    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        let t1 = Utc.timestamp_nanos(1431648000000000);
        let t2 = Utc.timestamp_nanos(1531648000000000);
        let t3 = Utc.timestamp_nanos(1631648000000000);
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // Register the two start/end pairs on different partitions so the
        // aggregation really has to merge across partitions.
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(1);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(1);
        end_timestamp1.set(t4);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    /// `sorted_for_display` reorders by display sort key, then by name.
    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        /// Joins the metric names of `metrics`, in order, with `", "`.
        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        // Before sorting: registration order.
        assert_eq!(
            "end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows",
            metric_names(&metrics)
        );

        let metrics = metrics.sorted_for_display();
        assert_eq!(
            "output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp",
            metric_names(&metrics)
        );
    }
}