1mod baseline;
21mod builder;
22mod custom;
23mod value;
24
25use parking_lot::Mutex;
26use std::{
27 borrow::Cow,
28 fmt::{Debug, Display},
29 sync::Arc,
30};
31
32use datafusion_common::HashMap;
33
34pub use baseline::{BaselineMetrics, RecordOutput, SpillMetrics, SplitMetrics};
36pub use builder::MetricBuilder;
37pub use custom::CustomMetricValue;
38pub use value::{
39 Count, Gauge, MetricValue, PruningMetrics, RatioMetrics, ScopedTimerGuard, Time,
40 Timestamp,
41};
42
/// A single metric: a [`MetricValue`] tagged with optional labels, an
/// optional partition and a [`MetricType`].
///
/// Its `Display` impl renders it as `name{partition=N, label=value}=value`,
/// leaving the braces out when there is neither a partition nor a label.
#[derive(Debug)]
pub struct Metric {
    /// The measured value; it also supplies the metric's name.
    value: MetricValue,

    /// Extra `name=value` pairs attached to this metric.
    labels: Vec<Label>,

    /// Partition this metric was recorded for, if any. Metrics produced by
    /// `MetricsSet::aggregate_by_name` always carry `None`.
    partition: Option<usize>,

    /// Category consulted by `MetricsSet::filter_by_metric_types`; the
    /// constructors default it to [`MetricType::DEV`].
    metric_type: MetricType,
}
86
/// Category tag carried by every [`Metric`].
///
/// `MetricsSet::filter_by_metric_types` keeps only the metrics whose tag is
/// in an allow-list. Newly constructed metrics are tagged [`MetricType::DEV`]
/// unless `Metric::with_type` overrides it.
///
/// NOTE(review): the names suggest `SUMMARY` marks high-level metrics and
/// `DEV` marks developer-oriented detail — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum MetricType {
    /// Summary category.
    SUMMARY,
    /// Developer category; the default for new metrics.
    DEV,
}
107
108impl Display for Metric {
109 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
110 write!(f, "{}", self.value.name())?;
111
112 let mut iter = self
113 .partition
114 .iter()
115 .map(|partition| Label::new("partition", partition.to_string()))
116 .chain(self.labels().iter().cloned())
117 .peekable();
118
119 if iter.peek().is_some() {
121 write!(f, "{{")?;
122
123 let mut is_first = true;
124 for i in iter {
125 if !is_first {
126 write!(f, ", ")?;
127 } else {
128 is_first = false;
129 }
130
131 write!(f, "{i}")?;
132 }
133
134 write!(f, "}}")?;
135 }
136
137 write!(f, "={}", self.value)
139 }
140}
141
142impl Metric {
143 pub fn new(value: MetricValue, partition: Option<usize>) -> Self {
146 Self {
147 value,
148 labels: vec![],
149 partition,
150 metric_type: MetricType::DEV,
151 }
152 }
153
154 pub fn new_with_labels(
157 value: MetricValue,
158 partition: Option<usize>,
159 labels: Vec<Label>,
160 ) -> Self {
161 Self {
162 value,
163 labels,
164 partition,
165 metric_type: MetricType::DEV,
166 }
167 }
168
169 pub fn with_type(mut self, metric_type: MetricType) -> Self {
171 self.metric_type = metric_type;
172 self
173 }
174
175 pub fn with_label(mut self, label: Label) -> Self {
177 self.labels.push(label);
178 self
179 }
180
181 pub fn labels(&self) -> &[Label] {
183 &self.labels
184 }
185
186 pub fn value(&self) -> &MetricValue {
188 &self.value
189 }
190
191 pub fn value_mut(&mut self) -> &mut MetricValue {
193 &mut self.value
194 }
195
196 pub fn partition(&self) -> Option<usize> {
198 self.partition
199 }
200
201 pub fn metric_type(&self) -> MetricType {
203 self.metric_type
204 }
205}
206
/// An ordered collection of shared [`Metric`]s.
///
/// Cloning the set clones the `Arc` handles, not the metrics themselves.
#[derive(Default, Debug, Clone)]
pub struct MetricsSet {
    /// The metrics, in the order they were pushed.
    metrics: Vec<Arc<Metric>>,
}
214
215impl MetricsSet {
216 pub fn new() -> Self {
218 Default::default()
219 }
220
221 pub fn push(&mut self, metric: Arc<Metric>) {
223 self.metrics.push(metric)
224 }
225
226 pub fn iter(&self) -> impl Iterator<Item = &Arc<Metric>> {
228 self.metrics.iter()
229 }
230
231 pub fn output_rows(&self) -> Option<usize> {
234 self.sum(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
235 .map(|v| v.as_usize())
236 }
237
238 pub fn spill_count(&self) -> Option<usize> {
241 self.sum(|metric| matches!(metric.value(), MetricValue::SpillCount(_)))
242 .map(|v| v.as_usize())
243 }
244
245 pub fn spilled_bytes(&self) -> Option<usize> {
248 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledBytes(_)))
249 .map(|v| v.as_usize())
250 }
251
252 pub fn spilled_rows(&self) -> Option<usize> {
255 self.sum(|metric| matches!(metric.value(), MetricValue::SpilledRows(_)))
256 .map(|v| v.as_usize())
257 }
258
259 pub fn elapsed_compute(&self) -> Option<usize> {
262 self.sum(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
263 .map(|v| v.as_usize())
264 }
265
266 pub fn sum<F>(&self, mut f: F) -> Option<MetricValue>
270 where
271 F: FnMut(&Metric) -> bool,
272 {
273 let mut iter = self
274 .metrics
275 .iter()
276 .filter(|metric| f(metric.as_ref()))
277 .peekable();
278
279 let mut accum = match iter.peek() {
280 None => {
281 return None;
282 }
283 Some(metric) => metric.value().new_empty(),
284 };
285
286 iter.for_each(|metric| accum.aggregate(metric.value()));
287
288 Some(accum)
289 }
290
291 pub fn sum_by_name(&self, metric_name: &str) -> Option<MetricValue> {
294 self.sum(|m| match m.value() {
295 MetricValue::Count { name, .. } => name == metric_name,
296 MetricValue::Time { name, .. } => name == metric_name,
297 MetricValue::OutputRows(_) => false,
298 MetricValue::ElapsedCompute(_) => false,
299 MetricValue::SpillCount(_) => false,
300 MetricValue::SpilledBytes(_) => false,
301 MetricValue::OutputBytes(_) => false,
302 MetricValue::SpilledRows(_) => false,
303 MetricValue::CurrentMemoryUsage(_) => false,
304 MetricValue::Gauge { name, .. } => name == metric_name,
305 MetricValue::StartTimestamp(_) => false,
306 MetricValue::EndTimestamp(_) => false,
307 MetricValue::PruningMetrics { name, .. } => name == metric_name,
308 MetricValue::Ratio { name, .. } => name == metric_name,
309 MetricValue::Custom { .. } => false,
310 })
311 }
312
313 pub fn aggregate_by_name(&self) -> Self {
318 let mut map = HashMap::new();
319
320 for metric in &self.metrics {
322 let key = metric.value.name();
323 map.entry(key)
324 .and_modify(|accum: &mut Metric| {
325 accum.value_mut().aggregate(metric.value());
326 })
327 .or_insert_with(|| {
328 let partition = None;
330 let mut accum = Metric::new(metric.value().new_empty(), partition)
331 .with_type(metric.metric_type());
332 accum.value_mut().aggregate(metric.value());
333 accum
334 });
335 }
336
337 let new_metrics = map
338 .into_iter()
339 .map(|(_k, v)| Arc::new(v))
340 .collect::<Vec<_>>();
341
342 Self {
343 metrics: new_metrics,
344 }
345 }
346
347 pub fn sorted_for_display(mut self) -> Self {
349 self.metrics.sort_unstable_by_key(|metric| {
350 (
351 metric.value().display_sort_key(),
352 metric.value().name().to_owned(),
353 )
354 });
355 self
356 }
357
358 pub fn timestamps_removed(self) -> Self {
360 let Self { metrics } = self;
361
362 let metrics = metrics
363 .into_iter()
364 .filter(|m| !m.value.is_timestamp())
365 .collect::<Vec<_>>();
366
367 Self { metrics }
368 }
369
370 pub fn filter_by_metric_types(self, allowed: &[MetricType]) -> Self {
373 if allowed.is_empty() {
374 return Self { metrics: vec![] };
375 }
376
377 let metrics = self
378 .metrics
379 .into_iter()
380 .filter(|metric| allowed.contains(&metric.metric_type()))
381 .collect::<Vec<_>>();
382 Self { metrics }
383 }
384}
385
386impl Display for MetricsSet {
387 fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
389 let mut is_first = true;
390 for i in self.metrics.iter() {
391 if !is_first {
392 write!(f, ", ")?;
393 } else {
394 is_first = false;
395 }
396
397 write!(f, "{i}")?;
398 }
399 Ok(())
400 }
401}
402
/// A [`MetricsSet`] behind an `Arc<Mutex<_>>`.
///
/// Cloning this handle clones the `Arc`, so all clones register into and
/// read from the same underlying set.
#[derive(Default, Debug, Clone)]
pub struct ExecutionPlanMetricsSet {
    /// The shared, lock-protected set of registered metrics.
    inner: Arc<Mutex<MetricsSet>>,
}
418
419impl ExecutionPlanMetricsSet {
420 pub fn new() -> Self {
422 Self {
423 inner: Arc::new(Mutex::new(MetricsSet::new())),
424 }
425 }
426
427 pub fn register(&self, metric: Arc<Metric>) {
429 self.inner.lock().push(metric)
430 }
431
432 pub fn clone_inner(&self) -> MetricsSet {
434 let guard = self.inner.lock();
435 (*guard).clone()
436 }
437}
438
/// A `name=value` pair attached to a metric.
///
/// Both parts are stored as `Cow<'static, str>`, so string literals are kept
/// by reference while owned `String`s are accepted as well.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Label {
    name: Cow<'static, str>,
    value: Cow<'static, str>,
}

impl Label {
    /// Creates a label from anything convertible into `Cow<'static, str>`.
    pub fn new(
        name: impl Into<Cow<'static, str>>,
        value: impl Into<Cow<'static, str>>,
    ) -> Self {
        Self {
            name: name.into(),
            value: value.into(),
        }
    }

    /// The label's name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// The label's value.
    pub fn value(&self) -> &str {
        &self.value
    }
}

impl Display for Label {
    /// Writes the label as `name=value`.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let Self { name, value } = self;
        write!(f, "{name}={value}")
    }
}
485
// Unit tests for metric display formatting and for the summing,
// aggregation and sorting helpers on `MetricsSet`.
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use chrono::{TimeZone, Utc};

    use super::*;

    // A metric with neither labels nor a partition prints as `name=value`.
    #[test]
    fn test_display_no_labels_no_partition() {
        let count = Count::new();
        count.add(33);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows=33", metric.to_string())
    }

    // A partition alone is shown as a `partition` label inside braces.
    #[test]
    fn test_display_no_labels_with_partition() {
        let count = Count::new();
        count.add(44);
        let value = MetricValue::OutputRows(count);
        let partition = Some(1);
        let metric = Metric::new(value, partition);

        assert_eq!("output_rows{partition=1}=44", metric.to_string())
    }

    // Labels without a partition are shown on their own inside braces.
    #[test]
    fn test_display_labels_no_partition() {
        let count = Count::new();
        count.add(55);
        let value = MetricValue::OutputRows(count);
        let partition = None;
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{foo=bar}=55", metric.to_string())
    }

    // With both present, the partition label comes before the other labels.
    #[test]
    fn test_display_labels_and_partition() {
        let count = Count::new();
        count.add(66);
        let value = MetricValue::OutputRows(count);
        let partition = Some(2);
        let label = Label::new("foo", "bar");
        let metric = Metric::new_with_labels(value, partition, vec![label]);

        assert_eq!("output_rows{partition=2, foo=bar}=66", metric.to_string())
    }

    // `output_rows` is `None` on an empty set and otherwise sums across
    // partitions.
    #[test]
    fn test_output_rows() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().output_rows().is_none());

        let partition = 1;
        let output_rows = MetricBuilder::new(&metrics).output_rows(partition);
        output_rows.add(13);

        let output_rows = MetricBuilder::new(&metrics).output_rows(partition + 1);
        output_rows.add(7);
        assert_eq!(metrics.clone_inner().output_rows().unwrap(), 20);
    }

    // `elapsed_compute` is `None` on an empty set and otherwise sums the
    // recorded nanoseconds across partitions.
    #[test]
    fn test_elapsed_compute() {
        let metrics = ExecutionPlanMetricsSet::new();
        assert!(metrics.clone_inner().elapsed_compute().is_none());

        let partition = 1;
        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition);
        elapsed_compute.add_duration(Duration::from_nanos(1234));

        let elapsed_compute = MetricBuilder::new(&metrics).elapsed_compute(partition + 1);
        elapsed_compute.add_duration(Duration::from_nanos(6));
        assert_eq!(metrics.clone_inner().elapsed_compute().unwrap(), 1240);
    }

    // `sum` returns `None` when the predicate matches nothing and otherwise
    // adds up every matching metric, regardless of labels or partition.
    #[test]
    fn test_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .counter("my_counter", 1);
        count1.add(1);

        let count2 = MetricBuilder::new(&metrics).counter("my_counter", 2);
        count2.add(2);

        let metrics = metrics.clone_inner();
        assert!(metrics.sum(|_| false).is_none());

        let expected_count = Count::new();
        expected_count.add(3);
        let expected_sum = MetricValue::Count {
            name: "my_counter".into(),
            count: expected_count,
        };

        assert_eq!(metrics.sum(|_| true), Some(expected_sum));
    }

    // Summing a counter together with a time metric must panic.
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // The predicate accepts both metrics, which hold different kinds.
        metrics.clone_inner().sum(|_| true);
    }

    // `aggregate_by_name` collapses same-named metrics from different
    // partitions (labelled or not) into one metric without a partition.
    #[test]
    fn test_aggregate_by_name() {
        let metrics = ExecutionPlanMetricsSet::new();

        // Elapsed compute on partition 1, carrying a label.
        let elapsed_compute1 = MetricBuilder::new(&metrics)
            .with_new_label("foo", "bar")
            .elapsed_compute(1);
        elapsed_compute1.add_duration(Duration::from_nanos(12));

        // Elapsed compute on partition 2.
        let elapsed_compute2 = MetricBuilder::new(&metrics).elapsed_compute(2);
        elapsed_compute2.add_duration(Duration::from_nanos(34));

        // Elapsed compute on partition 4.
        let elapsed_compute3 = MetricBuilder::new(&metrics).elapsed_compute(4);
        elapsed_compute3.add_duration(Duration::from_nanos(56));

        // A single output-rows metric on partition 1.
        let output_rows = MetricBuilder::new(&metrics).output_rows(1); output_rows.add(56);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        // The three elapsed-compute metrics collapse into one.
        let elapsed_computes = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::ElapsedCompute(_)))
            .collect::<Vec<_>>();
        assert_eq!(elapsed_computes.len(), 1);
        assert_eq!(elapsed_computes[0].value().as_usize(), 12 + 34 + 56);
        assert!(elapsed_computes[0].partition().is_none());

        // The lone output-rows metric survives with its partition cleared.
        let output_rows = aggregated
            .iter()
            .filter(|metric| matches!(metric.value(), MetricValue::OutputRows(_)))
            .collect::<Vec<_>>();
        assert_eq!(output_rows.len(), 1);
        assert_eq!(output_rows[0].value().as_usize(), 56);
        assert!(output_rows[0].partition.is_none())
    }

    // Aggregating two same-named metrics of different kinds must panic.
    #[test]
    #[should_panic(expected = "Mismatched metric types. Can not aggregate Count")]
    fn test_aggregate_partition_bad_sum() {
        let metrics = ExecutionPlanMetricsSet::new();

        let count = MetricBuilder::new(&metrics).counter("my_metric", 1);
        count.add(1);

        let time = MetricBuilder::new(&metrics).subset_time("my_metric", 1);
        time.add_duration(Duration::from_nanos(10));

        // Both metrics are named "my_metric" but hold different kinds.
        metrics.clone_inner().aggregate_by_name();
    }

    // Aggregated start timestamps are expected to keep the earliest value
    // and aggregated end timestamps the latest one.
    #[test]
    fn test_aggregate_partition_timestamps() {
        let metrics = ExecutionPlanMetricsSet::new();

        // 1431648000000000 ns == 1970-01-17 13:40:48 UTC
        let t1 = Utc.timestamp_nanos(1431648000000000);
        // 1531648000000000 ns == 1970-01-18 17:27:28 UTC
        let t2 = Utc.timestamp_nanos(1531648000000000);
        // 1631648000000000 ns == 1970-01-19 21:14:08 UTC
        let t3 = Utc.timestamp_nanos(1631648000000000);
        // 1731648000000000 ns == 1970-01-21 01:00:48 UTC
        let t4 = Utc.timestamp_nanos(1731648000000000);

        // NOTE(review): both pairs are registered on partition 0 even though
        // the variable names suggest partitions 0 and 1. Aggregation is keyed
        // by name, so the assertions below do not depend on the partition.
        let start_timestamp0 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp0.set(t1);
        let end_timestamp0 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp0.set(t2);
        let start_timestamp1 = MetricBuilder::new(&metrics).start_timestamp(0);
        start_timestamp1.set(t3);
        let end_timestamp1 = MetricBuilder::new(&metrics).end_timestamp(0);
        end_timestamp1.set(t4);

        let aggregated = metrics.clone_inner().aggregate_by_name();

        // Exactly one start timestamp remains, holding the earliest time.
        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::StartTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::StartTimestamp(ts) => {
                assert_eq!(ts.value(), Some(t1));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };

        // Exactly one end timestamp remains, holding the latest time.
        let mut ts = aggregated
            .iter()
            .filter(|metric| {
                matches!(metric.value(), MetricValue::EndTimestamp(_))
                    && metric.labels().is_empty()
            })
            .collect::<Vec<_>>();
        assert_eq!(ts.len(), 1);
        match ts.remove(0).value() {
            MetricValue::EndTimestamp(ts) => {
                assert_eq!(ts.value(), Some(t4));
            }
            _ => {
                panic!("Not a timestamp");
            }
        };
    }

    // `sorted_for_display` reorders metrics from registration order into
    // display order (sort key first, then name).
    #[test]
    fn test_sorted_for_display() {
        let metrics = ExecutionPlanMetricsSet::new();
        // The registration order below is what the first assertion checks.
        MetricBuilder::new(&metrics).end_timestamp(0);
        MetricBuilder::new(&metrics).start_timestamp(0);
        MetricBuilder::new(&metrics).elapsed_compute(0);
        MetricBuilder::new(&metrics).counter("the_second_counter", 0);
        MetricBuilder::new(&metrics).counter("the_counter", 0);
        MetricBuilder::new(&metrics).counter("the_third_counter", 0);
        MetricBuilder::new(&metrics).subset_time("the_time", 0);
        MetricBuilder::new(&metrics).output_rows(0);
        let metrics = metrics.clone_inner();

        // Joins the metric names in iteration order into one string.
        fn metric_names(metrics: &MetricsSet) -> String {
            let n = metrics.iter().map(|m| m.value().name()).collect::<Vec<_>>();
            n.join(", ")
        }

        // Before sorting: plain registration order.
        assert_eq!("end_timestamp, start_timestamp, elapsed_compute, the_second_counter, the_counter, the_third_counter, the_time, output_rows", metric_names(&metrics));

        // After sorting: display order, with same-key metrics ordered by name.
        let metrics = metrics.sorted_for_display();
        assert_eq!("output_rows, elapsed_compute, the_counter, the_second_counter, the_third_counter, the_time, start_timestamp, end_timestamp", metric_names(&metrics));
    }
}