1mod baseline;
21mod builder;
22mod custom;
23mod elapsed_compute;
24mod expression;
25mod value;
26
27use datafusion_common::HashMap;
28pub use datafusion_common::format::{MetricCategory, MetricType};
29use parking_lot::Mutex;
30use std::{
31 borrow::Cow,
32 fmt::{self, Debug, Display},
33 sync::Arc,
34 vec::IntoIter,
35};
36
37pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
40pub use builder::MetricBuilder;
41pub use custom::CustomMetricValue;
42pub use elapsed_compute::{ElapsedComputeFuture, ElapsedComputeFutureExt};
43pub use expression::ExpressionEvaluatorMetrics;
44pub use value::{
45 Count, Gauge, MetricValue, PruningMetrics, RatioMergeStrategy, RatioMetrics,
46 ScopedTimerGuard, Time, Timestamp,
47};
48
49#[derive(Debug)]
76pub struct Metric {
77 value: MetricValue,
79
80 labels: Vec<Label>,
82
83 partition: Option<usize>,
86
87 metric_type: MetricType,
88
89 metric_category: Option<MetricCategory>,
95}
96
97impl Display for Metric {
98 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
99 write!(f, "{}", self.value.name())?;
100
101 let mut iter = self
102 .partition
103 .iter()
104 .map(|partition| Label::new("partition", partition.to_string()))
105 .chain(self.labels().iter().cloned())
106 .peekable();
107
108 if iter.peek().is_some() {
110 write!(f, "{{")?;
111
112 let mut is_first = true;
113 for i in iter {
114 if !is_first {
115 write!(f, ", ")?;
116 } else {
117 is_first = false;
118 }
119
120 write!(f, "{i}")?;
121 }
122
123 write!(f, "}}")?;
124 }
125
126 write!(f, "={}", self.value)
128 }
129}
130
131impl Metric {
132 pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
135 Self {
136 value,
137 labels: vec![],
138 partition,
139 metric_type: MetricType::Dev,
140 metric_category: None,
141 }
142 }
143
144 pub fn new_with_labels(
147 value: MetricValue,
148 partition: Option<usize>,
149 labels: Vec<Label>,
150 ) -> Self {
151 Self {
152 value,
153 labels,
154 partition,
155 metric_type: MetricType::Dev,
156 metric_category: None,
157 }
158 }
159
160 pub fn with_type(mut self, metric_type: MetricType) -> Self {
162 self.metric_type = metric_type;
163 self
164 }
165
166 pub fn with_category(mut self, category: MetricCategory) -> Self {
171 self.metric_category = Some(category);
172 self
173 }
174
175 pub fn with_label(mut self, label: Label) -> Self {
177 self.labels.push(label);
178 self
179 }
180
181 pub fn labels(&self) -> &[Label] {
183 &self.labels
184 }
185
186 pub fn value(&self) -> &MetricValue {
188 &self.value
189 }
190
191 pub fn value_mut(&mut self) -> &mut MetricValue {
193 &mut self.value
194 }
195
196 pub fn partition(&self) -> Option<usize> {
198 self.partition
199 }
200
201 pub fn metric_type(&self) -> MetricType {
203 self.metric_type
204 }
205
206 pub fn metric_category(&self) -> Option<MetricCategory> {
210 self.metric_category
211 }
212}
213
214#[derive(Default, Debug, Clone)]
216pub struct MetricsSet {
217 metrics: Vec<Arc<Metric>>,
218}
219
220impl MetricsSet {
221 pub fn new() -> Self {
223 Default::default()
224 }
225
226 pub fn push(&mut self, metric: Arc<Metric>) {
228 self.metrics.push(metric)
229 }
230
231 pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
233 self.metrics.iter()
234 }
235
236 pub fn output_rows(&self) -> Option<usize> {
239 self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
240 .map(|v| v.as_usize())
241 }
242
243 pub fn spill_count(&self) -> Option<usize> {
246 self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
247 .map(|v| v.as_usize())
248 }
249
250 pub fn spilled_bytes(&self) -> Option<usize> {
253 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
254 .map(|v| v.as_usize())
255 }
256
257 pub fn spilled_rows(&self) -> Option<usize> {
260 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
261 .map(|v| v.as_usize())
262 }
263
264 pub fn elapsed_compute(&self) -> Option<usize> {
267 self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
268 .map(|v| v.as_usize())
269 }
270
271 pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
275 where
276 F: FnMut(&Metric) -> bool,
277 {
278 let mut iter = self
279 .metrics
280 .iter()
281 .filter(|metric| f(metric.as_ref()))
282 .peekable();
283
284 let mut accum = match iter.peek() {
285 None => {
286 return None;
287 }
288 Some(metric) => metric.value().new_empty(),
289 };
290
291 iter.for_each(|metric| accum.aggregate(metric.value()));
292
293 Some(accum)
294 }
295
296 pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
299 self.sum(|m| match m.value() {
300 MetricValue::Count { name, .. } => name == metric_name,
301 MetricValue::Time { name, .. } => name == metric_name,
302 MetricValue::OutputRows(_) => false,
303 MetricValue::ElapsedCompute(_) => false,
304 MetricValue::SpillCount(_) => false,
305 MetricValue::SpilledBytes(_) => false,
306 MetricValue::OutputBytes(_) => false,
307 MetricValue::OutputBatches(_) => false,
308 MetricValue::SpilledRows(_) => false,
309 MetricValue::CurrentMemoryUsage(_) => false,
310 MetricValue::Gauge { name, .. } => name == metric_name,
311 MetricValue::StartTimestamp(_) => false,
312 MetricValue::EndTimestamp(_) => false,
313 MetricValue::PruningMetrics { name, .. } => name == metric_name,
314 MetricValue::Ratio { name, .. } => name == metric_name,
315 MetricValue::Custom { .. } => false,
316 })
317 }
318
319 pub fn aggregate_by_name(&self) -> Self {
324 let mut map = HashMap::new();
325
326 for metric in &self.metrics {
328 let key = metric.value.name();
329 map.entry(key)
330 .and_modify(|accum: &mut Metric| {
331 accum.value_mut().aggregate(metric.value());
332 })
333 .or_insert_with(|| {
334 let partition = None;
336 let mut accum = Metric::new(metric.value().new_empty(), partition)
337 .with_type(metric.metric_type());
338 if let Some(cat) = metric.metric_category() {
339 accum = accum.with_category(cat);
340 }
341 accum.value_mut().aggregate(metric.value());
342 accum
343 });
344 }
345
346 let new_metrics = map
347 .into_iter()
348 .map(|(_k, v)| Arc::new(v))
349 .collect::<Vec<_>>();
350
351 Self {
352 metrics: new_metrics,
353 }
354 }
355
356 pub fn sorted_for_display(mut self) -> Self {
358 self.metrics.sort_unstable_by_key(|metric| {
359 (
360 metric.value().display_sort_key(),
361 metric.value().name().to_owned(),
362 )
363 });
364 self
365 }
366
367 pub fn timestamps_removed(self) -> Self {
369 let Self { metrics } = self;
370
371 let metrics = metrics
372 .into_iter()
373 .filter(|m| !m.value.is_timestamp())
374 .collect::<Vec<_>>();
375
376 Self { metrics }
377 }
378
379 pub fn filter_by_metric_types(self, allowed: &[MetricType]) -> Self {
382 if allowed.is_empty() {
383 return Self { metrics: vec![] };
384 }
385
386 let metrics = self
387 .metrics
388 .into_iter()
389 .filter(|metric| allowed.contains(&metric.metric_type()))
390 .collect::<Vec<_>>();
391 Self { metrics }
392 }
393
394 pub fn filter_by_categories(self, allowed: &[MetricCategory]) -> Self {
403 if allowed.is_empty() {
404 return Self { metrics: vec![] };
405 }
406
407 let metrics = self
408 .metrics
409 .into_iter()
410 .filter(|metric| {
411 let cat = metric
412 .metric_category()
413 .unwrap_or(MetricCategory::Uncategorized);
414 allowed.contains(&cat)
415 })
416 .collect::<Vec<_>>();
417 Self { metrics }
418 }
419}
420
421impl Display for MetricsSet {
422 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
424 let mut is_first = true;
425 for i in self.metrics.iter() {
426 if !is_first {
427 write!(f, ", ")?;
428 } else {
429 is_first = false;
430 }
431
432 write!(f, "{i}")?;
433 }
434 Ok(())
435 }
436}
437
438impl IntoIterator for MetricsSet {
439 type Item = Arc<Metric>;
440 type IntoIter = IntoIter<Self::Item>;
441
442 fn into_iter(self) -> Self::IntoIter {
443 self.metrics.into_iter()
444 }
445}
446
447impl<'a> IntoIterator for &'a MetricsSet {
448 type Item = &'a Arc<Metric>;
449 type IntoIter = std::slice::Iter<'a, Arc<Metric>>;
450
451 fn into_iter(self) -> Self::IntoIter {
452 self.metrics.iter()
453 }
454}
455
456impl Extend<Arc<Metric>> for MetricsSet {
457 fn extend<I: IntoIterator<Item = Arc<Metric>>>(&mut self, iter: I) {
458 self.metrics.extend(iter);
459 }
460}
461
462impl FromIterator<Arc<Metric>> for MetricsSet {
463 fn from_iter<T: IntoIterator<Item = Arc<Metric>>>(iter: T) -> Self {
464 Self {
465 metrics: iter.into_iter().collect(),
466 }
467 }
468}
469
470#[derive(Default, Debug, Clone)]
479pub struct ExecutionPlanMetricsSet {
480 inner: Arc<Mutex<MetricsSet>>,
481}
482
483impl ExecutionPlanMetricsSet {
484 pub fn new() -> Self {
486 Self {
487 inner: Arc::new(Mutex::new(MetricsSet::new())),
488 }
489 }
490
491 pub fn register(&self, metric: Arc<Metric>) {
493 self.inner.lock().push(metric)
494 }
495
496 pub fn clone_inner(&self) -> MetricsSet {
498 let guard = self.inner.lock();
499 (*guard).clone()
500 }
501}
502
503impl From<MetricsSet> for ExecutionPlanMetricsSet {
504 fn from(metrics: MetricsSet) -> Self {
505 Self {
506 inner: Arc::new(Mutex::new(metrics)),
507 }
508 }
509}
510
/// A `name=value` pair attached to a [`Metric`] to describe it further.
///
/// Both halves are `Cow<'static, str>`. Static strings are kept without
/// allocating, and owned `String`s are accepted as well.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Creates a label from anything convertible into `Cow<'static, str>`
    /// (such as `&'static str` or `String`).
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// The label's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The label's value.
    pub fn value(&self) -> &str {
        &self.value
    }
}

impl Display for Label {
    /// Formats the label as `name=value`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self { name, value } = self;
        write!(f, "{name}={value}")
    }
}
557
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        // A counter and a time registered under the same name cannot be summed.
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().sum(|_| true);
    }

    #[test]
    fn test_sum_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).counter("my_counter", 0).add(1);
        MetricBuilder::new(&metrics).counter("my_counter", 1).add(2);
        MetricBuilder::new(&metrics).counter("other_counter", 0).add(10);
        MetricBuilder::new(&metrics).output_rows(0).add(100);

        let metrics = metrics.clone_inner();
        assert_eq!(metrics.sum_by_name("my_counter").unwrap().as_usize(), 3);
        assert_eq!(metrics.sum_by_name("other_counter").unwrap().as_usize(), 10);
        // Built-in metrics are never matched by name.
        assert!(metrics.sum_by_name("output_rows").is_none());
        assert!(metrics.sum_by_name("missing").is_none());
    }

    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Aggregation groups by name only, so the label does not split the group.
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1);
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());
        assert!(elapsed_computes[0].labels().is_empty());

        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition().is_none())
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().aggregate_by_name();
    }

    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        let t1 = Utc.timestamp_nanos(1431648000000000);
        let t2 = Utc.timestamp_nanos(1531648000000000);
        let t3 = Utc.timestamp_nanos(1631648000000000);
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // Two partitions, so the aggregation really crosses partitions:
        // the earliest start and the latest end must win.
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(1);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(1);
        end_timestamp1.set(t4);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    #[test]
    fn test_timestamps_removed() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).output_rows(0);
        MetricBuilder::new(&metrics).end_timestamp(0);

        let metrics = metrics.clone_inner().timestamps_removed();
        assert_eq!(metrics.iter().count(), 1);
        assert!(metrics
            .iter()
            .all(|m| matches!(m.value(), MetricValue::OutputRows(_))));
    }

    #[test]
    fn test_filter_by_metric_types() {
        let mut metrics = MetricsSet::new();
        metrics.push(Arc::new(Metric::new(
            MetricValue::OutputRows(Count::new()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::SpillCount(Count::new()),
            None,
        )));

        assert_eq!(metrics.clone().filter_by_metric_types(&[]).iter().count(), 0);
        // `Metric::new` defaults to `MetricType::Dev`.
        assert_eq!(
            metrics
                .filter_by_metric_types(&[MetricType::Dev])
                .iter()
                .count(),
            2
        );
    }

    #[test]
    fn test_filter_by_categories() {
        let mut metrics = MetricsSet::new();
        metrics.push(Arc::new(Metric::new(
            MetricValue::OutputRows(Count::new()),
            None,
        )));

        assert_eq!(metrics.clone().filter_by_categories(&[]).iter().count(), 0);
        // Metrics without a category are treated as `Uncategorized`.
        assert_eq!(
            metrics
                .filter_by_categories(&[MetricCategory::Uncategorized])
                .iter()
                .count(),
            1
        );
    }

    #[test]
    fn test_extend() {
        let mut metrics = MetricsSet::new();
        let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
        let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

        metrics.extend([Arc::clone(&m1), Arc::clone(&m2)]);
        assert_eq!(metrics.iter().count(), 2);

        let m3 = Arc::new(Metric::new(MetricValue::SpilledBytes(Count::new()), None));
        metrics.extend(std::iter::once(Arc::clone(&m3)));
        assert_eq!(metrics.iter().count(), 3);
    }

    #[test]
    fn test_collect() {
        let m1 = Arc::new(Metric::new(MetricValue::OutputRows(Count::new()), None));
        let m2 = Arc::new(Metric::new(MetricValue::SpillCount(Count::new()), None));

        let metrics: MetricsSet =
            vec![Arc::clone(&m1), Arc::clone(&m2)].into_iter().collect();
        assert_eq!(metrics.iter().count(), 2);

        let empty: MetricsSet = std::iter::empty().collect();
        assert_eq!(empty.iter().count(), 0);
    }

    #[test]
    fn test_into_iterator_by_ref() {
        let mut metrics = MetricsSet::new();
        metrics.push(Arc::new(Metric::new(
            MetricValue::OutputRows(Count::new()),
            None,
        )));
        metrics.push(Arc::new(Metric::new(
            MetricValue::SpillCount(Count::new()),
            None,
        )));

        let mut count = 0;
        for _m in &metrics {
            count += 1;
        }
        assert_eq!(count, 2);
    }

    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        assert_eq!(
            "end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows",
            metric_names(&metrics)
        );

        let metrics = metrics.sorted_for_display();
        assert_eq!(
            "output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp",
            metric_names(&metrics)
        );
    }
}