vortex_datafusion/persistent/
metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vortex table provider metrics.
5use std::sync::Arc;
6
7use datafusion_datasource::file_scan_config::FileScanConfig;
8use datafusion_datasource::source::DataSourceExec;
9use datafusion_physical_plan::metrics::{
10    Count, Gauge, Label as DatafusionLabel, MetricValue as DatafusionMetricValue, MetricsSet,
11};
12use datafusion_physical_plan::{
13    ExecutionPlan, ExecutionPlanVisitor, Metric as DatafusionMetric, accept,
14};
15use vortex::metrics::{Metric, MetricId, Tags};
16
17use crate::persistent::source::VortexSource;
18
/// Tag key whose value is parsed as the partition index (rather than being exported as a
/// regular label) when Vortex tags are converted to DataFusion labels.
pub(crate) static PARTITION_LABEL: &str = "partition";
20
21/// Extracts datafusion metrics from all VortexExec instances in
22/// a given physical plan.
23#[derive(Default)]
24pub struct VortexMetricsFinder(Vec<MetricsSet>);
25
26impl VortexMetricsFinder {
27    /// find all metrics for VortexExec nodes.
28    pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
29        let mut finder = Self::default();
30        match accept(plan, &mut finder) {
31            Ok(()) => finder.0,
32            Err(_) => Vec::new(),
33        }
34    }
35}
36
37impl ExecutionPlanVisitor for VortexMetricsFinder {
38    type Error = std::convert::Infallible;
39    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
40        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
41            if let Some(metrics) = exec.metrics() {
42                self.0.push(metrics);
43            }
44
45            // Include our own metrics from VortexSource
46            if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
47                && let Some(scan) = file_scan
48                    .file_source
49                    .as_any()
50                    .downcast_ref::<VortexSource>()
51            {
52                let mut set = MetricsSet::new();
53                for metric in scan
54                    .metrics
55                    .snapshot()
56                    .iter()
57                    .flat_map(|(id, metric)| metric_to_datafusion(id, metric))
58                {
59                    set.push(Arc::new(metric));
60                }
61
62                self.0.push(set);
63            }
64        }
65
66        Ok(true)
67    }
68}
69
70fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
71    let (partition, labels) = tags_to_datafusion(id.tags());
72    metric_value_to_datafusion(id.name(), metric)
73        .into_iter()
74        .map(move |metric_value| {
75            DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
76        })
77}
78
79fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
80    tags.iter()
81        .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
82            if k == PARTITION_LABEL {
83                partition = v.parse().ok();
84            } else {
85                labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
86            }
87            (partition, labels)
88        })
89}
90
91fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
92    match metric {
93        Metric::Counter(counter) => counter
94            .count()
95            .try_into()
96            .into_iter()
97            .map(|count| df_counter(name.to_string(), count))
98            .collect(),
99        Metric::Histogram(hist) => {
100            let mut res = Vec::new();
101            if let Ok(count) = hist.count().try_into() {
102                res.push(df_counter(format!("{name}_count"), count));
103            }
104            let snapshot = hist.snapshot();
105            if let Ok(max) = snapshot.max().try_into() {
106                res.push(df_gauge(format!("{name}_max"), max));
107            }
108            if let Ok(min) = snapshot.min().try_into() {
109                res.push(df_gauge(format!("{name}_min"), min));
110            }
111            if let Some(p90) = f_to_u(snapshot.value(0.90)) {
112                res.push(df_gauge(format!("{name}_p95"), p90));
113            }
114            if let Some(p99) = f_to_u(snapshot.value(0.99)) {
115                res.push(df_gauge(format!("{name}_p99"), p99));
116            }
117            res
118        }
119        Metric::Timer(timer) => {
120            let mut res = Vec::new();
121            if let Ok(count) = timer.count().try_into() {
122                res.push(df_counter(format!("{name}_count"), count));
123            }
124            let snapshot = timer.snapshot();
125            if let Ok(max) = snapshot.max().try_into() {
126                // NOTE(os): unlike Time metrics, gauges allow custom aggregation
127                res.push(df_gauge(format!("{name}_max"), max));
128            }
129            if let Ok(min) = snapshot.min().try_into() {
130                res.push(df_gauge(format!("{name}_min"), min));
131            }
132            if let Some(p95) = f_to_u(snapshot.value(0.95)) {
133                res.push(df_gauge(format!("{name}_p95"), p95));
134            }
135            if let Some(p99) = f_to_u(snapshot.value(0.95)) {
136                res.push(df_gauge(format!("{name}_p99"), p99));
137            }
138            res
139        }
140        // TODO(os): add more metric types when added to VortexMetrics
141        _ => vec![],
142    }
143}
144
145fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
146    let count = Count::new();
147    count.add(value);
148    DatafusionMetricValue::Count {
149        name: name.into(),
150        count,
151    }
152}
153
154fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
155    let gauge = Gauge::new();
156    gauge.set(value);
157    DatafusionMetricValue::Gauge {
158        name: name.into(),
159        gauge,
160    }
161}
162
/// Converts a floating-point value to `usize`, discarding any fractional part.
///
/// Returns `None` when `f` is NaN, infinite, negative, or too large to fit in a `usize`.
// Truncation is the intended behavior; the range check keeps the cast within `usize` bounds.
#[allow(clippy::cast_possible_truncation)]
fn f_to_u(f: f64) -> Option<usize> {
    // 2^BITS is the first value past `usize::MAX`. It is a power of two and therefore exactly
    // representable, unlike `usize::MAX as f64`, which rounds up to 2^64 on 64-bit targets
    // and would let that out-of-range value through an inclusive comparison.
    const UPPER_EXCLUSIVE: f64 = (1u128 << usize::BITS) as f64;

    (f.is_finite() && f >= 0.0 && f < UPPER_EXCLUSIVE).then(|| f.trunc() as usize)
}