Skip to main content

vortex_datafusion/persistent/
metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Helpers for extracting Vortex scan metrics from DataFusion execution plans.
5use std::sync::Arc;
6use std::time::Duration;
7
8use datafusion_datasource::file_scan_config::FileScanConfig;
9use datafusion_datasource::source::DataSourceExec;
10use datafusion_physical_plan::ExecutionPlan;
11use datafusion_physical_plan::ExecutionPlanVisitor;
12use datafusion_physical_plan::Metric as DatafusionMetric;
13use datafusion_physical_plan::accept;
14use datafusion_physical_plan::metrics::Count;
15use datafusion_physical_plan::metrics::Gauge;
16use datafusion_physical_plan::metrics::Label as DatafusionLabel;
17use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
18use datafusion_physical_plan::metrics::MetricsSet;
19use datafusion_physical_plan::metrics::Time;
20use vortex::error::VortexExpect;
21use vortex::metrics::Label;
22use vortex::metrics::Metric;
23use vortex::metrics::MetricValue;
24
25use crate::persistent::source::VortexSource;
26
/// Key of the Vortex metric label that carries the partition index.
///
/// When Vortex labels are converted to DataFusion labels in this module, a
/// label with this key is parsed into the metric's partition instead of being
/// forwarded as a regular label.
pub(crate) static PARTITION_LABEL: &str = "partition";
/// Key of the Vortex metric label that carries a file path.
///
/// NOTE(review): not referenced in this module; presumably set by the scan
/// code that records per-file metrics -- confirm against the producer.
pub(crate) static PATH_LABEL: &str = "file_path";
29
/// Plan visitor that collects one [`MetricsSet`] per [`DataSourceExec`] found
/// in a physical plan.
///
/// Use [`VortexMetricsFinder::find_all`] as the entry point; it drives the
/// visitor over a plan and returns the collected sets.
///
/// For Vortex-backed scans, the returned metrics include both the metrics
/// already attached to the `DataSourceExec` and the Vortex metrics accumulated
/// in [`VortexSource::metrics_registry`].
///
/// This helper exists because the Vortex read path records most scan metrics in
/// a Vortex [`MetricsRegistry`] rather than in DataFusion's native metrics set.
///
/// # Example
///
/// ```no_run
/// # use std::sync::Arc;
/// use datafusion_physical_plan::ExecutionPlan;
/// use vortex_datafusion::metrics::VortexMetricsFinder;
///
/// # let plan: Arc<dyn ExecutionPlan> = todo!();
/// for metrics in VortexMetricsFinder::find_all(plan.as_ref()) {
///     for metric in metrics.aggregate_by_name().sorted_for_display().iter() {
///         println!("{metric}");
///     }
/// }
/// ```
///
/// [`DataSourceExec`]: datafusion_datasource::source::DataSourceExec
/// [`VortexSource::metrics_registry`]: crate::VortexSource::metrics_registry
/// [`MetricsRegistry`]: vortex::metrics::MetricsRegistry
// The wrapped vector holds the sets in the order the visitor encountered the
// corresponding `DataSourceExec` nodes.
#[derive(Default)]
pub struct VortexMetricsFinder(Vec<MetricsSet>);
60
61impl VortexMetricsFinder {
62    /// Collects metrics for each `DataSourceExec` in `plan`, augmenting any
63    /// Vortex-backed scan with the attached Vortex registry snapshot.
64    pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
65        let mut finder = Self::default();
66        match accept(plan, &mut finder) {
67            Ok(()) => finder.0,
68            Err(_) => Vec::new(),
69        }
70    }
71}
72
73impl ExecutionPlanVisitor for VortexMetricsFinder {
74    type Error = std::convert::Infallible;
75    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
76        if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
77            // Start with exec metrics or create a new set
78            let mut set = exec.metrics().unwrap_or_default();
79
80            // Include our own metrics from VortexSource
81            if let Some(file_scan) = exec.data_source().downcast_ref::<FileScanConfig>()
82                && let Some(scan) = file_scan.file_source.downcast_ref::<VortexSource>()
83            {
84                for metric in scan
85                    .metrics_registry()
86                    .snapshot()
87                    .iter()
88                    .flat_map(metric_to_datafusion)
89                {
90                    set.push(Arc::new(metric));
91                }
92            }
93
94            self.0.push(set);
95
96            Ok(false)
97        } else {
98            Ok(true)
99        }
100    }
101}
102
103fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
104    let (partition, labels) = labels_to_datafusion(metric.labels());
105    metric_value_to_datafusion(metric.name(), metric.value())
106        .into_iter()
107        .map(move |metric_value| {
108            DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
109        })
110}
111
112fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
113    tags.iter()
114        .fold((None, Vec::new()), |(mut partition, mut labels), metric| {
115            if metric.key() == PARTITION_LABEL {
116                partition = metric.value().parse().ok();
117            } else {
118                labels.push(DatafusionLabel::new(
119                    metric.key().to_string(),
120                    metric.value().to_string(),
121                ));
122            }
123            (partition, labels)
124        })
125}
126
127fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
128    match metric {
129        MetricValue::Counter(counter) => counter
130            .value()
131            .try_into()
132            .into_iter()
133            .map(|count| df_counter(name.to_string(), count))
134            .collect(),
135        MetricValue::Histogram(hist) => {
136            let mut res = Vec::new();
137
138            res.push(df_counter(format!("{name}_count"), hist.count()));
139
140            if !hist.is_empty() {
141                if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
142                    res.push(df_gauge(format!("{name}_max"), max));
143                }
144
145                if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
146                    res.push(df_gauge(format!("{name}_min"), min));
147                }
148
149                if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
150                    res.push(df_gauge(format!("{name}_p95"), p95));
151                }
152                if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
153                    res.push(df_gauge(format!("{name}_p99"), p99));
154                }
155            }
156
157            res
158        }
159        MetricValue::Timer(timer) => {
160            let mut res = Vec::new();
161            res.push(df_counter(format!("{name}_count"), timer.count()));
162
163            if !timer.is_empty() {
164                let max = timer.quantile(1.0).vortex_expect("must not be empty");
165                res.push(df_timer(format!("{name}_max"), max));
166
167                let min = timer.quantile(0.0).vortex_expect("must not be empty");
168                res.push(df_timer(format!("{name}_min"), min));
169
170                let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
171                res.push(df_timer(format!("{name}_p95"), p95));
172
173                let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
174                res.push(df_timer(format!("{name}_p99"), p99));
175            }
176
177            res
178        }
179        // TODO(os): add more metric types when added to VortexMetrics
180        _ => vec![],
181    }
182}
183
184fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
185    let count = Count::new();
186    count.add(value);
187    DatafusionMetricValue::Count {
188        name: name.into(),
189        count,
190    }
191}
192
193fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
194    let gauge = Gauge::new();
195    gauge.set(value);
196    DatafusionMetricValue::Gauge {
197        name: name.into(),
198        gauge,
199    }
200}
201
202fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
203    let time = Time::new();
204    time.add_duration(value);
205    DatafusionMetricValue::Time {
206        name: name.into(),
207        time,
208    }
209}
210
/// Converts a floating-point sample to `usize`, truncating toward zero.
///
/// Returns `None` when `f` has no `usize` representation: NaN, an infinity, a
/// negative value, or a value greater than or equal to `2^usize::BITS`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // Exclusive upper bound `2^usize::BITS`. As a power of two it is exactly
    // representable as an `f64`. `usize::MAX as f64` is not suitable as an
    // inclusive bound: on 64-bit targets it rounds up to `2^64`, which would
    // be accepted and then silently saturate to `usize::MAX` in the cast.
    const UPPER_EXCLUSIVE: f64 = (1u128 << usize::BITS) as f64;

    (f.is_finite() && f >= 0.0 && f < UPPER_EXCLUSIVE).then(||
        // After the range check the truncated value is within usize bounds.
        f.trunc() as usize)
}
220
#[cfg(test)]
mod tests {

    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor that tallies the `DataSourceExec` nodes it is shown.
    struct DataSourceExecCounter(usize);

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        fn pre_visit(
            &mut self,
            plan: &dyn datafusion_physical_plan::ExecutionPlan,
        ) -> Result<bool, Self::Error> {
            let is_data_source = plan.downcast_ref::<DataSourceExec>().is_some();
            if is_data_source {
                self.0 += 1;
            }
            // Same return convention as `VortexMetricsFinder::pre_visit`.
            Ok(!is_data_source)
        }
    }

    /// The finder must yield exactly one metrics set for each `DataSourceExec`
    /// in the physical plan of a simple scan over a Vortex table.
    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        // Register an external Vortex table.
        ctx.session
            .sql(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex \
                LOCATION 'files/'",
            )
            .await?;

        // Write two rows so the table has data to scan.
        ctx.session
            .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
            .await?
            .collect()
            .await?;

        // Plan (but do not execute) a full scan.
        let scan = ctx.session.sql("SELECT * FROM my_tbl").await?;
        let (session_state, logical_plan) = scan.into_parts();
        let physical_plan = session_state.create_physical_plan(&logical_plan).await?;

        // Reference count of DataSourceExec nodes.
        let mut node_counter = DataSourceExecCounter(0);
        accept(physical_plan.as_ref(), &mut node_counter)?;

        // Sets produced by the code under test.
        let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());

        assert!(!metrics_sets.is_empty());
        assert_eq!(
            metrics_sets.len(),
            node_counter.0,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            metrics_sets.len(),
            node_counter.0
        );

        Ok(())
    }
}