Skip to main content

vortex_datafusion/persistent/
metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vortex table provider metrics.
5use std::sync::Arc;
6use std::time::Duration;
7
8use datafusion_datasource::file_scan_config::FileScanConfig;
9use datafusion_datasource::source::DataSourceExec;
10use datafusion_physical_plan::ExecutionPlan;
11use datafusion_physical_plan::ExecutionPlanVisitor;
12use datafusion_physical_plan::Metric as DatafusionMetric;
13use datafusion_physical_plan::accept;
14use datafusion_physical_plan::metrics::Count;
15use datafusion_physical_plan::metrics::Gauge;
16use datafusion_physical_plan::metrics::Label as DatafusionLabel;
17use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
18use datafusion_physical_plan::metrics::MetricsSet;
19use datafusion_physical_plan::metrics::Time;
20use vortex::error::VortexExpect;
21use vortex::metrics::Label;
22use vortex::metrics::Metric;
23use vortex::metrics::MetricValue;
24
25use crate::persistent::source::VortexSource;
26
/// Key of the metric label whose value is parsed as the partition index when
/// Vortex metrics are converted to DataFusion metrics in this module.
pub(crate) static PARTITION_LABEL: &str = "partition";
/// Key of the `file_path` metric label. Not referenced elsewhere in this file;
/// presumably attached by the scan code to identify the source file (TODO confirm).
pub(crate) static PATH_LABEL: &str = "file_path";
29
30/// Extracts datafusion metrics from all VortexExec instances in
31/// a given physical plan.
32#[derive(Default)]
33pub struct VortexMetricsFinder(Vec<MetricsSet>);
34
35impl VortexMetricsFinder {
36    /// find all metrics for VortexExec nodes.
37    pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
38        let mut finder = Self::default();
39        match accept(plan, &mut finder) {
40            Ok(()) => finder.0,
41            Err(_) => Vec::new(),
42        }
43    }
44}
45
46impl ExecutionPlanVisitor for VortexMetricsFinder {
47    type Error = std::convert::Infallible;
48    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
49        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
50            // Start with exec metrics or create a new set
51            let mut set = exec.metrics().unwrap_or_default();
52
53            // Include our own metrics from VortexSource
54            if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
55                && let Some(scan) = file_scan
56                    .file_source
57                    .as_any()
58                    .downcast_ref::<VortexSource>()
59            {
60                for metric in scan
61                    .metrics_registry()
62                    .snapshot()
63                    .iter()
64                    .flat_map(metric_to_datafusion)
65                {
66                    set.push(Arc::new(metric));
67                }
68            }
69
70            self.0.push(set);
71
72            Ok(false)
73        } else {
74            Ok(true)
75        }
76    }
77}
78
79fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
80    let (partition, labels) = labels_to_datafusion(metric.labels());
81    metric_value_to_datafusion(metric.name(), metric.value())
82        .into_iter()
83        .map(move |metric_value| {
84            DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
85        })
86}
87
88fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
89    tags.iter()
90        .fold((None, Vec::new()), |(mut partition, mut labels), metric| {
91            if metric.key() == PARTITION_LABEL {
92                partition = metric.value().parse().ok();
93            } else {
94                labels.push(DatafusionLabel::new(
95                    metric.key().to_string(),
96                    metric.value().to_string(),
97                ));
98            }
99            (partition, labels)
100        })
101}
102
103fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
104    match metric {
105        MetricValue::Counter(counter) => counter
106            .value()
107            .try_into()
108            .into_iter()
109            .map(|count| df_counter(name.to_string(), count))
110            .collect(),
111        MetricValue::Histogram(hist) => {
112            let mut res = Vec::new();
113
114            res.push(df_counter(format!("{name}_count"), hist.count()));
115
116            if !hist.is_empty() {
117                if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
118                    res.push(df_gauge(format!("{name}_max"), max));
119                }
120
121                if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
122                    res.push(df_gauge(format!("{name}_min"), min));
123                }
124
125                if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
126                    res.push(df_gauge(format!("{name}_p95"), p95));
127                }
128                if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
129                    res.push(df_gauge(format!("{name}_p99"), p99));
130                }
131            }
132
133            res
134        }
135        MetricValue::Timer(timer) => {
136            let mut res = Vec::new();
137            res.push(df_counter(format!("{name}_count"), timer.count()));
138
139            if !timer.is_empty() {
140                let max = timer.quantile(1.0).vortex_expect("must not be empty");
141                res.push(df_timer(format!("{name}_max"), max));
142
143                let min = timer.quantile(0.0).vortex_expect("must not be empty");
144                res.push(df_timer(format!("{name}_min"), min));
145
146                let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
147                res.push(df_timer(format!("{name}_p95"), p95));
148
149                let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
150                res.push(df_timer(format!("{name}_p99"), p99));
151            }
152
153            res
154        }
155        // TODO(os): add more metric types when added to VortexMetrics
156        _ => vec![],
157    }
158}
159
160fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
161    let count = Count::new();
162    count.add(value);
163    DatafusionMetricValue::Count {
164        name: name.into(),
165        count,
166    }
167}
168
169fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
170    let gauge = Gauge::new();
171    gauge.set(value);
172    DatafusionMetricValue::Gauge {
173        name: name.into(),
174        gauge,
175    }
176}
177
178fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
179    let time = Time::new();
180    time.add_duration(value);
181    DatafusionMetricValue::Time {
182        name: name.into(),
183        time,
184    }
185}
186
/// Converts a floating point value to `usize`, discarding any fractional part.
///
/// Returns `None` for NaN, infinities, negative values and values that are too
/// large to be represented as `usize`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // `usize::MAX as f64` is not exact on 64-bit targets: it rounds up to 2^64,
    // which is itself out of range. Adding 1.0 yields exactly 2^BITS on every
    // pointer width (the sum rounds back to 2^64 on 64-bit and is exact on
    // narrower targets), so it serves as an exclusive upper bound.
    let upper_exclusive = usize::MAX as f64 + 1.0;

    (f.is_finite() && f >= 0.0 && f < upper_exclusive).then(||
        // After the range check, the truncated value is guaranteed to fit in usize.
        f.trunc() as usize)
}
196
#[cfg(test)]
mod tests {

    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor tallying how many `DataSourceExec` nodes it is shown.
    struct DataSourceExecCounter(usize);

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        fn pre_visit(
            &mut self,
            plan: &dyn datafusion_physical_plan::ExecutionPlan,
        ) -> Result<bool, Self::Error> {
            if !plan.as_any().is::<DataSourceExec>() {
                return Ok(true);
            }
            self.0 += 1;
            Ok(false)
        }
    }

    /// The finder must report exactly one metrics set per `DataSourceExec`
    /// node of the physical plan.
    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        // Register the external table, then populate it so the scan has data.
        let create_table = "CREATE EXTERNAL TABLE my_tbl \
            (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
            STORED AS vortex \
            LOCATION 'files/'";
        ctx.session.sql(create_table).await?;

        let insert = ctx
            .session
            .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
            .await?;
        insert.collect().await?;

        // Plan (but do not execute) a full scan of the table.
        let select = ctx.session.sql("SELECT * FROM my_tbl").await?;
        let (state, logical_plan) = select.into_parts();
        let physical_plan = state.create_physical_plan(&logical_plan).await?;

        // Reference count of DataSourceExec nodes in the plan.
        let mut exec_counter = DataSourceExecCounter(0);
        accept(physical_plan.as_ref(), &mut exec_counter)?;
        let expected_sets = exec_counter.0;

        // Metrics sets reported by the finder for the same plan.
        let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());
        let found_sets = metrics_sets.len();

        assert!(!metrics_sets.is_empty());
        assert_eq!(
            found_sets, expected_sets,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            found_sets, expected_sets
        );

        Ok(())
    }
}