1mod baseline;
21mod builder;
22mod custom;
23mod expression;
24mod value;
25
26use datafusion_common::HashMap;
27use parking_lot::Mutex;
28use std::{
29 borrow::Cow,
30 fmt::{Debug, Display},
31 sync::Arc,
32};
33
34pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
37pub use builder::MetricBuilder;
38pub use custom::CustomMetricValue;
39pub use expression::ExpressionEvaluatorMetrics;
40pub use value::{
41 Count, Gauge, MetricValue, PruningMetrics, RatioMergeStrategy, RatioMetrics,
42 ScopedTimerGuard, Time, Timestamp,
43};
44
/// A single metric value together with the context it was recorded in.
///
/// A `Metric` pairs a [`MetricValue`] with an optional partition index, an
/// arbitrary list of [`Label`]s and a [`MetricType`] category. Its `Display`
/// form is `name{partition=P, label=value, ...}=value`, where the `{...}` part
/// is omitted when there is neither a partition nor any label.
#[derive(Debug)]
pub struct Metric {
    /// The recorded value; the metric's name is taken from `value.name()`.
    value: MetricValue,

    /// Extra `name=value` labels, printed after the partition label (if any).
    labels: Vec<Label>,

    /// Partition this metric was recorded for, or `None` when it is not
    /// partition specific (e.g. the result of `MetricsSet::aggregate_by_name`).
    partition: Option<usize>,

    /// Category used by `MetricsSet::filter_by_metric_types`; the constructors
    /// `Metric::new` / `Metric::new_with_labels` set it to `MetricType::DEV`.
    metric_type: MetricType,
}
85
/// Category attached to every [`Metric`], used to select a subset of metrics
/// via [`MetricsSet::filter_by_metric_types`].
///
/// NOTE(review): the intended audience of each category below is inferred from
/// the variant names only — confirm against the code that renders metrics.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricType {
    /// Presumably metrics meant for a condensed/summary view — TODO confirm.
    SUMMARY,
    /// Presumably detailed, developer-oriented metrics — TODO confirm.
    /// This is the type `Metric::new` and `Metric::new_with_labels` assign.
    DEV,
}
106
107impl Display for Metric {
108 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
109 write!(f, "{}", self.value.name())?;
110
111 let mut iter = self
112 .partition
113 .iter()
114 .map(|partition| Label::new("partition", partition.to_string()))
115 .chain(self.labels().iter().cloned())
116 .peekable();
117
118 if iter.peek().is_some() {
120 write!(f, "{{")?;
121
122 let mut is_first = true;
123 for i in iter {
124 if !is_first {
125 write!(f, ", ")?;
126 } else {
127 is_first = false;
128 }
129
130 write!(f, "{i}")?;
131 }
132
133 write!(f, "}}")?;
134 }
135
136 write!(f, "={}", self.value)
138 }
139}
140
141impl Metric {
142 pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
145 Self {
146 value,
147 labels: vec![],
148 partition,
149 metric_type: MetricType::DEV,
150 }
151 }
152
153 pub fn new_with_labels(
156 value: MetricValue,
157 partition: Option<usize>,
158 labels: Vec<Label>,
159 ) -> Self {
160 Self {
161 value,
162 labels,
163 partition,
164 metric_type: MetricType::DEV,
165 }
166 }
167
168 pub fn with_type(mut self, metric_type: MetricType) -> Self {
170 self.metric_type = metric_type;
171 self
172 }
173
174 pub fn with_label(mut self, label: Label) -> Self {
176 self.labels.push(label);
177 self
178 }
179
180 pub fn labels(&self) -> &[Label] {
182 &self.labels
183 }
184
185 pub fn value(&self) -> &MetricValue {
187 &self.value
188 }
189
190 pub fn value_mut(&mut self) -> &mut MetricValue {
192 &mut self.value
193 }
194
195 pub fn partition(&self) -> Option<usize> {
197 self.partition
198 }
199
200 pub fn metric_type(&self) -> MetricType {
202 self.metric_type
203 }
204}
205
/// An ordered collection of [`Metric`]s.
///
/// Metrics are stored behind `Arc`, so cloning a `MetricsSet` clones the
/// vector of pointers and shares the individual `Metric` objects rather than
/// copying them.
#[derive(Default, Debug, Clone)]
pub struct MetricsSet {
    metrics: Vec<Arc<Metric>>,
}
211
212impl MetricsSet {
213 pub fn new() -> Self {
215 Default::default()
216 }
217
218 pub fn push(&mut self, metric: Arc<Metric>) {
220 self.metrics.push(metric)
221 }
222
223 pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
225 self.metrics.iter()
226 }
227
228 pub fn output_rows(&self) -> Option<usize> {
231 self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
232 .map(|v| v.as_usize())
233 }
234
235 pub fn spill_count(&self) -> Option<usize> {
238 self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
239 .map(|v| v.as_usize())
240 }
241
242 pub fn spilled_bytes(&self) -> Option<usize> {
245 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
246 .map(|v| v.as_usize())
247 }
248
249 pub fn spilled_rows(&self) -> Option<usize> {
252 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
253 .map(|v| v.as_usize())
254 }
255
256 pub fn elapsed_compute(&self) -> Option<usize> {
259 self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
260 .map(|v| v.as_usize())
261 }
262
263 pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
267 where
268 F: FnMut(&Metric) -> bool,
269 {
270 let mut iter = self
271 .metrics
272 .iter()
273 .filter(|metric| f(metric.as_ref()))
274 .peekable();
275
276 let mut accum = match iter.peek() {
277 None => {
278 return None;
279 }
280 Some(metric) => metric.value().new_empty(),
281 };
282
283 iter.for_each(|metric| accum.aggregate(metric.value()));
284
285 Some(accum)
286 }
287
288 pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
291 self.sum(|m| match m.value() {
292 MetricValue::Count { name, .. } => name == metric_name,
293 MetricValue::Time { name, .. } => name == metric_name,
294 MetricValue::OutputRows(_) => false,
295 MetricValue::ElapsedCompute(_) => false,
296 MetricValue::SpillCount(_) => false,
297 MetricValue::SpilledBytes(_) => false,
298 MetricValue::OutputBytes(_) => false,
299 MetricValue::OutputBatches(_) => false,
300 MetricValue::SpilledRows(_) => false,
301 MetricValue::CurrentMemoryUsage(_) => false,
302 MetricValue::Gauge { name, .. } => name == metric_name,
303 MetricValue::StartTimestamp(_) => false,
304 MetricValue::EndTimestamp(_) => false,
305 MetricValue::PruningMetrics { name, .. } => name == metric_name,
306 MetricValue::Ratio { name, .. } => name == metric_name,
307 MetricValue::Custom { .. } => false,
308 })
309 }
310
311 pub fn aggregate_by_name(&self) -> Self {
316 let mut map = HashMap::new();
317
318 for metric in &self.metrics {
320 let key = metric.value.name();
321 map.entry(key)
322 .and_modify(|accum: &mut Metric| {
323 accum.value_mut().aggregate(metric.value());
324 })
325 .or_insert_with(|| {
326 let partition = None;
328 let mut accum = Metric::new(metric.value().new_empty(), partition)
329 .with_type(metric.metric_type());
330 accum.value_mut().aggregate(metric.value());
331 accum
332 });
333 }
334
335 let new_metrics = map
336 .into_iter()
337 .map(|(_k, v)| Arc::new(v))
338 .collect::<Vec<_>>();
339
340 Self {
341 metrics: new_metrics,
342 }
343 }
344
345 pub fn sorted_for_display(mut self) -> Self {
347 self.metrics.sort_unstable_by_key(|metric| {
348 (
349 metric.value().display_sort_key(),
350 metric.value().name().to_owned(),
351 )
352 });
353 self
354 }
355
356 pub fn timestamps_removed(self) -> Self {
358 let Self { metrics } = self;
359
360 let metrics = metrics
361 .into_iter()
362 .filter(|m| !m.value.is_timestamp())
363 .collect::<Vec<_>>();
364
365 Self { metrics }
366 }
367
368 pub fn filter_by_metric_types(self, allowed: &[MetricType]) -> Self {
371 if allowed.is_empty() {
372 return Self { metrics: vec![] };
373 }
374
375 let metrics = self
376 .metrics
377 .into_iter()
378 .filter(|metric| allowed.contains(&metric.metric_type()))
379 .collect::<Vec<_>>();
380 Self { metrics }
381 }
382}
383
384impl Display for MetricsSet {
385 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
387 let mut is_first = true;
388 for i in self.metrics.iter() {
389 if !is_first {
390 write!(f, ", ")?;
391 } else {
392 is_first = false;
393 }
394
395 write!(f, "{i}")?;
396 }
397 Ok(())
398 }
399}
400
/// A [`MetricsSet`] behind a mutex, shared through an `Arc`.
///
/// The derived `Clone` clones the `Arc`, so every clone refers to the same
/// underlying set: a metric registered through one clone is visible through
/// `clone_inner` on any other clone.
#[derive(Default, Debug, Clone)]
pub struct ExecutionPlanMetricsSet {
    inner: Arc<Mutex<MetricsSet>>,
}
413
414impl ExecutionPlanMetricsSet {
415 pub fn new() -> Self {
417 Self {
418 inner: Arc::new(Mutex::new(MetricsSet::new())),
419 }
420 }
421
422 pub fn register(&self, metric: Arc<Metric>) {
424 self.inner.lock().push(metric)
425 }
426
427 pub fn clone_inner(&self) -> MetricsSet {
429 let guard = self.inner.lock();
430 (*guard).clone()
431 }
432}
433
/// A `name=value` pair attached to a metric.
///
/// Both parts are `Cow<'static, str>`, so static string literals are stored
/// without allocation while owned `String`s are also accepted.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Builds a label from anything convertible into `Cow<'static, str>`
    /// (e.g. `&'static str` or `String`).
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// The label's key.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The label's value.
    pub fn value(&self) -> &str {
        &self.value
    }
}

impl Display for Label {
    /// Renders the label as `name=value`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { name, value } = self;
        write!(f, "{name}={value}")
    }
}
480
/// Unit tests for `Metric`, `MetricsSet`, `ExecutionPlanMetricsSet` and `Label`.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    #[test]
    fn test_sum_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).counter("my_counter", 1).add(1);
        MetricBuilder::new(&metrics).counter("my_counter", 2).add(2);
        MetricBuilder::new(&metrics).counter("other_counter", 1).add(100);
        MetricBuilder::new(&metrics).output_rows(1).add(1000);
        let metrics = metrics.clone_inner();

        assert_eq!(
            metrics.sum_by_name("my_counter").map(|v| v.as_usize()),
            Some(3)
        );
        // Built-in metrics are never matched by name.
        assert!(metrics.sum_by_name("output_rows").is_none());
        assert!(metrics.sum_by_name("missing").is_none());
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        // Same name, incompatible kinds: aggregation must panic.
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().sum(|_| true);
    }

    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Labels are dropped by aggregation.
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        let output_rows = MetricBuilder::new(&metrics).output_rows(1);
        output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition().is_none())
    }

    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        metrics.clone_inner().aggregate_by_name();
    }

    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        // t1 < t2 < t3 < t4
        let t1 = Utc.timestamp_nanos(1431648000000000);
        let t2 = Utc.timestamp_nanos(1531648000000000);
        let t3 = Utc.timestamp_nanos(1631648000000000);
        let t4 = Utc.timestamp_nanos(1731648000000000);

        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp1.set(t4);

        // Aggregation keeps the earliest start and the latest end.
        let aggregated = metrics.clone_inner().aggregate_by_name();

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(timestamp) => {
                assert_eq!(timestamp.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(timestamp) => {
                assert_eq!(timestamp.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    #[test]
    fn test_timestamps_removed() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).output_rows(0);
        MetricBuilder::new(&metrics).end_timestamp(0);

        let remaining = metrics.clone_inner().timestamps_removed();
        let names = remaining
            .iter()
            .map(|m| m.value().name())
            .collect::<Vec<_>>();
        assert_eq!(names, vec!["output_rows"]);
    }

    #[test]
    fn test_filter_by_metric_types() {
        let mut set = MetricsSet::new();
        set.push(Arc::new(
            Metric::new(MetricValue::OutputRows(Count::new()), None)
                .with_type(MetricType::SUMMARY),
        ));
        set.push(Arc::new(Metric::new(
            MetricValue::Count {
                name: "my_counter".into(),
                count: Count::new(),
            },
            None,
        )));

        let summary_only = set.clone().filter_by_metric_types(&[MetricType::SUMMARY]);
        let names = summary_only
            .iter()
            .map(|m| m.value().name())
            .collect::<Vec<_>>();
        assert_eq!(names, vec!["output_rows"]);

        let both = set
            .clone()
            .filter_by_metric_types(&[MetricType::SUMMARY, MetricType::DEV]);
        assert_eq!(both.iter().count(), 2);

        assert_eq!(set.filter_by_metric_types(&[]).iter().count(), 0);
    }

    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        assert_eq!(
            "end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows",
            metric_names(&metrics)
        );

        let metrics = metrics.sorted_for_display();
        assert_eq!(
            "output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp",
            metric_names(&metrics)
        );
    }
}