Skip to main content

vortex_datafusion/persistent/
metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Helpers for extracting Vortex scan metrics from DataFusion execution plans.
5use std::sync::Arc;
6use std::time::Duration;
7
8use datafusion_datasource::file_scan_config::FileScanConfig;
9use datafusion_datasource::source::DataSourceExec;
10use datafusion_physical_plan::ExecutionPlan;
11use datafusion_physical_plan::ExecutionPlanVisitor;
12use datafusion_physical_plan::Metric as DatafusionMetric;
13use datafusion_physical_plan::accept;
14use datafusion_physical_plan::metrics::Count;
15use datafusion_physical_plan::metrics::Gauge;
16use datafusion_physical_plan::metrics::Label as DatafusionLabel;
17use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
18use datafusion_physical_plan::metrics::MetricsSet;
19use datafusion_physical_plan::metrics::Time;
20use vortex::error::VortexExpect;
21use vortex::metrics::Label;
22use vortex::metrics::Metric;
23use vortex::metrics::MetricValue;
24
25use crate::persistent::source::VortexSource;
26
/// Label key whose value is parsed as the metric's partition index (rather than
/// being forwarded as a regular label) when Vortex metrics are converted into
/// DataFusion metrics in this module.
pub(crate) static PARTITION_LABEL: &str = "partition";
/// Label key intended to carry a file path.
// NOTE(review): not read in this module; presumably attached to per-file scan
// metrics elsewhere in the crate — confirm against its users.
pub(crate) static PATH_LABEL: &str = "file_path";
29
30/// Walks a physical plan and returns one [`MetricsSet`] per
31/// [`DataSourceExec`].
32///
33/// For Vortex-backed scans, the returned metrics include both the metrics
34/// already attached to the `DataSourceExec` and the Vortex metrics accumulated
35/// in [`VortexSource::metrics_registry`].
36///
37/// This helper exists because the Vortex read path records most scan metrics in
38/// a Vortex [`MetricsRegistry`] rather than in DataFusion's native metrics set.
39///
40/// # Example
41///
42/// ```no_run
43/// # use std::sync::Arc;
44/// use datafusion_physical_plan::ExecutionPlan;
45/// use vortex_datafusion::metrics::VortexMetricsFinder;
46///
47/// # let plan: Arc<dyn ExecutionPlan> = todo!();
48/// for metrics in VortexMetricsFinder::find_all(plan.as_ref()) {
49///     for metric in metrics.aggregate_by_name().sorted_for_display().iter() {
50///         println!("{metric}");
51///     }
52/// }
53/// ```
54///
55/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
56/// [`VortexSource::metrics_registry`]: crate::VortexSource::metrics_registry
57/// [`MetricsRegistry`]: vortex::metrics::MetricsRegistry
58#[derive(Default)]
59pub struct VortexMetricsFinder(Vec<MetricsSet>);
60
61impl VortexMetricsFinder {
62    /// Collects metrics for each `DataSourceExec` in `plan`, augmenting any
63    /// Vortex-backed scan with the attached Vortex registry snapshot.
64    pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
65        let mut finder = Self::default();
66        match accept(plan, &mut finder) {
67            Ok(()) => finder.0,
68            Err(_) => Vec::new(),
69        }
70    }
71}
72
73impl ExecutionPlanVisitor for VortexMetricsFinder {
74    type Error = std::convert::Infallible;
75    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
76        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
77            // Start with exec metrics or create a new set
78            let mut set = exec.metrics().unwrap_or_default();
79
80            // Include our own metrics from VortexSource
81            if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
82                && let Some(scan) = file_scan
83                    .file_source
84                    .as_any()
85                    .downcast_ref::<VortexSource>()
86            {
87                for metric in scan
88                    .metrics_registry()
89                    .snapshot()
90                    .iter()
91                    .flat_map(metric_to_datafusion)
92                {
93                    set.push(Arc::new(metric));
94                }
95            }
96
97            self.0.push(set);
98
99            Ok(false)
100        } else {
101            Ok(true)
102        }
103    }
104}
105
106fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
107    let (partition, labels) = labels_to_datafusion(metric.labels());
108    metric_value_to_datafusion(metric.name(), metric.value())
109        .into_iter()
110        .map(move |metric_value| {
111            DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
112        })
113}
114
115fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
116    tags.iter()
117        .fold((None, Vec::new()), |(mut partition, mut labels), metric| {
118            if metric.key() == PARTITION_LABEL {
119                partition = metric.value().parse().ok();
120            } else {
121                labels.push(DatafusionLabel::new(
122                    metric.key().to_string(),
123                    metric.value().to_string(),
124                ));
125            }
126            (partition, labels)
127        })
128}
129
130fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
131    match metric {
132        MetricValue::Counter(counter) => counter
133            .value()
134            .try_into()
135            .into_iter()
136            .map(|count| df_counter(name.to_string(), count))
137            .collect(),
138        MetricValue::Histogram(hist) => {
139            let mut res = Vec::new();
140
141            res.push(df_counter(format!("{name}_count"), hist.count()));
142
143            if !hist.is_empty() {
144                if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
145                    res.push(df_gauge(format!("{name}_max"), max));
146                }
147
148                if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
149                    res.push(df_gauge(format!("{name}_min"), min));
150                }
151
152                if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
153                    res.push(df_gauge(format!("{name}_p95"), p95));
154                }
155                if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
156                    res.push(df_gauge(format!("{name}_p99"), p99));
157                }
158            }
159
160            res
161        }
162        MetricValue::Timer(timer) => {
163            let mut res = Vec::new();
164            res.push(df_counter(format!("{name}_count"), timer.count()));
165
166            if !timer.is_empty() {
167                let max = timer.quantile(1.0).vortex_expect("must not be empty");
168                res.push(df_timer(format!("{name}_max"), max));
169
170                let min = timer.quantile(0.0).vortex_expect("must not be empty");
171                res.push(df_timer(format!("{name}_min"), min));
172
173                let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
174                res.push(df_timer(format!("{name}_p95"), p95));
175
176                let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
177                res.push(df_timer(format!("{name}_p99"), p99));
178            }
179
180            res
181        }
182        // TODO(os): add more metric types when added to VortexMetrics
183        _ => vec![],
184    }
185}
186
187fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
188    let count = Count::new();
189    count.add(value);
190    DatafusionMetricValue::Count {
191        name: name.into(),
192        count,
193    }
194}
195
196fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
197    let gauge = Gauge::new();
198    gauge.set(value);
199    DatafusionMetricValue::Gauge {
200        name: name.into(),
201        gauge,
202    }
203}
204
205fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
206    let time = Time::new();
207    time.add_duration(value);
208    DatafusionMetricValue::Time {
209        name: name.into(),
210        time,
211    }
212}
213
/// Converts a finite, non-negative `f64` to `usize`, truncating toward zero.
///
/// Returns `None` for NaN, infinities, negative values, and any value whose
/// truncation does not fit in a `usize`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // Exclusive upper bound: 2^usize::BITS, the smallest integer that does not fit.
    // `usize::MAX as f64` is NOT safe as an inclusive bound: on 64-bit targets it
    // rounds up to 2^64, which would let exactly 2^64 through and the cast would
    // then silently saturate to `usize::MAX`. Adding 1.0 gives 2^BITS on both
    // 32-bit (exact) and 64-bit (the +1.0 is absorbed by rounding) targets.
    const UPPER_EXCLUSIVE: f64 = usize::MAX as f64 + 1.0;

    // NaN and ±infinity fail the range check, so no separate `is_finite` test is needed.
    // Any `f` in [0, 2^BITS) truncates to an integer in [0, usize::MAX].
    (0.0..UPPER_EXCLUSIVE)
        .contains(&f)
        .then(|| f.trunc() as usize)
}
223
#[cfg(test)]
mod tests {

    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor that tallies the `DataSourceExec` nodes in a plan, without
    /// descending below any of them (mirroring `VortexMetricsFinder`).
    struct DataSourceExecCounter(usize);

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        fn pre_visit(
            &mut self,
            plan: &dyn datafusion_physical_plan::ExecutionPlan,
        ) -> Result<bool, Self::Error> {
            let is_scan = plan.as_any().is::<DataSourceExec>();
            if is_scan {
                self.0 += 1;
            }
            // Stop at scans, keep walking everywhere else.
            Ok(!is_scan)
        }
    }

    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();
        let session = &ctx.session;

        session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'files/'",
            )
            .await?;

        session
            .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
            .await?
            .collect()
            .await?;

        let (state, logical_plan) = session.sql("SELECT * FROM my_tbl").await?.into_parts();
        let physical_plan = state.create_physical_plan(&logical_plan).await?;

        // Independent count of the scan nodes to compare against.
        let mut counter = DataSourceExecCounter(0);
        accept(physical_plan.as_ref(), &mut counter)?;
        let scan_count = counter.0;

        let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());

        assert!(!metrics_sets.is_empty());
        assert_eq!(
            metrics_sets.len(),
            scan_count,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            metrics_sets.len(),
            scan_count
        );

        Ok(())
    }
}