vortex_datafusion/persistent/metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vortex table provider metrics.
5use std::sync::Arc;
6
7use datafusion_datasource::file_scan_config::FileScanConfig;
8use datafusion_datasource::source::DataSourceExec;
9use datafusion_physical_plan::ExecutionPlan;
10use datafusion_physical_plan::ExecutionPlanVisitor;
11use datafusion_physical_plan::Metric as DatafusionMetric;
12use datafusion_physical_plan::accept;
13use datafusion_physical_plan::metrics::Count;
14use datafusion_physical_plan::metrics::Gauge;
15use datafusion_physical_plan::metrics::Label as DatafusionLabel;
16use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
17use datafusion_physical_plan::metrics::MetricsSet;
18use vortex::metrics::Metric;
19use vortex::metrics::MetricId;
20use vortex::metrics::MetricsSessionExt;
21use vortex::metrics::Tags;
22
23use crate::persistent::source::VortexSource;
24
/// Tag key whose value is parsed as the DataFusion partition index when Vortex metric
/// tags are converted (see `tags_to_datafusion`); a tag with this key is not emitted
/// as a DataFusion label.
pub(crate) static PARTITION_LABEL: &str = "partition";
26
27/// Extracts datafusion metrics from all VortexExec instances in
28/// a given physical plan.
29#[derive(Default)]
30pub struct VortexMetricsFinder(Vec<MetricsSet>);
31
32impl VortexMetricsFinder {
33    /// find all metrics for VortexExec nodes.
34    pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
35        let mut finder = Self::default();
36        match accept(plan, &mut finder) {
37            Ok(()) => finder.0,
38            Err(_) => Vec::new(),
39        }
40    }
41}
42
43impl ExecutionPlanVisitor for VortexMetricsFinder {
44    type Error = std::convert::Infallible;
45    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
46        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
47            if let Some(metrics) = exec.metrics() {
48                self.0.push(metrics);
49            }
50
51            // Include our own metrics from VortexSource
52            if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
53                && let Some(scan) = file_scan
54                    .file_source
55                    .as_any()
56                    .downcast_ref::<VortexSource>()
57            {
58                let mut set = MetricsSet::new();
59                for metric in scan
60                    .session
61                    .metrics()
62                    .snapshot()
63                    .iter()
64                    .flat_map(|(id, metric)| metric_to_datafusion(id, metric))
65                {
66                    set.push(Arc::new(metric));
67                }
68
69                self.0.push(set);
70            }
71        }
72
73        Ok(true)
74    }
75}
76
77fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
78    let (partition, labels) = tags_to_datafusion(id.tags());
79    metric_value_to_datafusion(id.name(), metric)
80        .into_iter()
81        .map(move |metric_value| {
82            DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
83        })
84}
85
86fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
87    tags.iter()
88        .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
89            if k == PARTITION_LABEL {
90                partition = v.parse().ok();
91            } else {
92                labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
93            }
94            (partition, labels)
95        })
96}
97
98fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
99    match metric {
100        Metric::Counter(counter) => counter
101            .count()
102            .try_into()
103            .into_iter()
104            .map(|count| df_counter(name.to_string(), count))
105            .collect(),
106        Metric::Histogram(hist) => {
107            let mut res = Vec::new();
108            if let Ok(count) = hist.count().try_into() {
109                res.push(df_counter(format!("{name}_count"), count));
110            }
111            let snapshot = hist.snapshot();
112            if let Ok(max) = snapshot.max().try_into() {
113                res.push(df_gauge(format!("{name}_max"), max));
114            }
115            if let Ok(min) = snapshot.min().try_into() {
116                res.push(df_gauge(format!("{name}_min"), min));
117            }
118            if let Some(p90) = f_to_u(snapshot.value(0.90)) {
119                res.push(df_gauge(format!("{name}_p95"), p90));
120            }
121            if let Some(p99) = f_to_u(snapshot.value(0.99)) {
122                res.push(df_gauge(format!("{name}_p99"), p99));
123            }
124            res
125        }
126        Metric::Timer(timer) => {
127            let mut res = Vec::new();
128            if let Ok(count) = timer.count().try_into() {
129                res.push(df_counter(format!("{name}_count"), count));
130            }
131            let snapshot = timer.snapshot();
132            if let Ok(max) = snapshot.max().try_into() {
133                // NOTE(os): unlike Time metrics, gauges allow custom aggregation
134                res.push(df_gauge(format!("{name}_max"), max));
135            }
136            if let Ok(min) = snapshot.min().try_into() {
137                res.push(df_gauge(format!("{name}_min"), min));
138            }
139            if let Some(p95) = f_to_u(snapshot.value(0.95)) {
140                res.push(df_gauge(format!("{name}_p95"), p95));
141            }
142            if let Some(p99) = f_to_u(snapshot.value(0.95)) {
143                res.push(df_gauge(format!("{name}_p99"), p99));
144            }
145            res
146        }
147        // TODO(os): add more metric types when added to VortexMetrics
148        _ => vec![],
149    }
150}
151
152fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
153    let count = Count::new();
154    count.add(value);
155    DatafusionMetricValue::Count {
156        name: name.into(),
157        count,
158    }
159}
160
161fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
162    let gauge = Gauge::new();
163    gauge.set(value);
164    DatafusionMetricValue::Gauge {
165        name: name.into(),
166        gauge,
167    }
168}
169
/// Converts `f` to a `usize` by truncating toward zero.
///
/// Returns `None` for NaN, infinities, and values outside `usize::MIN..=usize::MAX`.
/// The comparison is done in `f64`. Because `usize::MAX as f64` rounds up, an input equal
/// to that bound is accepted, and the saturating `as` cast maps it to `usize::MAX`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    let lower = usize::MIN as f64;
    let upper = usize::MAX as f64;
    if !f.is_finite() || f < lower || f > upper {
        return None;
    }
    // The range check above keeps the truncated value within usize bounds.
    Some(f.trunc() as usize)
}