vortex_datafusion/persistent/metrics.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! Vortex table provider metrics.
5use std::sync::Arc;
6
7use datafusion_datasource::file_scan_config::FileScanConfig;
8use datafusion_datasource::source::DataSourceExec;
9use datafusion_physical_plan::metrics::{
10    Count, Gauge, Label as DatafusionLabel, MetricValue as DatafusionMetricValue, MetricsSet,
11};
12use datafusion_physical_plan::{
13    ExecutionPlan, ExecutionPlanVisitor, Metric as DatafusionMetric, accept,
14};
15use vortex::metrics::{Metric, MetricId, MetricsSessionExt, Tags};
16
17use crate::persistent::source::VortexSource;
18
/// Tag key whose value is parsed as the DataFusion partition index when converting Vortex
/// metrics, instead of being forwarded as a regular DataFusion label.
pub(crate) static PARTITION_LABEL: &str = "partition";
20
21/// Extracts datafusion metrics from all VortexExec instances in
22/// a given physical plan.
23#[derive(Default)]
24pub struct VortexMetricsFinder(Vec<MetricsSet>);
25
26impl VortexMetricsFinder {
27    /// find all metrics for VortexExec nodes.
28    pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
29        let mut finder = Self::default();
30        match accept(plan, &mut finder) {
31            Ok(()) => finder.0,
32            Err(_) => Vec::new(),
33        }
34    }
35}
36
37impl ExecutionPlanVisitor for VortexMetricsFinder {
38    type Error = std::convert::Infallible;
39    fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
40        if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
41            if let Some(metrics) = exec.metrics() {
42                self.0.push(metrics);
43            }
44
45            // Include our own metrics from VortexSource
46            if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
47                && let Some(scan) = file_scan
48                    .file_source
49                    .as_any()
50                    .downcast_ref::<VortexSource>()
51            {
52                let mut set = MetricsSet::new();
53                for metric in scan
54                    .session
55                    .metrics()
56                    .snapshot()
57                    .iter()
58                    .flat_map(|(id, metric)| metric_to_datafusion(id, metric))
59                {
60                    set.push(Arc::new(metric));
61                }
62
63                self.0.push(set);
64            }
65        }
66
67        Ok(true)
68    }
69}
70
71fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
72    let (partition, labels) = tags_to_datafusion(id.tags());
73    metric_value_to_datafusion(id.name(), metric)
74        .into_iter()
75        .map(move |metric_value| {
76            DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
77        })
78}
79
80fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
81    tags.iter()
82        .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
83            if k == PARTITION_LABEL {
84                partition = v.parse().ok();
85            } else {
86                labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
87            }
88            (partition, labels)
89        })
90}
91
92fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
93    match metric {
94        Metric::Counter(counter) => counter
95            .count()
96            .try_into()
97            .into_iter()
98            .map(|count| df_counter(name.to_string(), count))
99            .collect(),
100        Metric::Histogram(hist) => {
101            let mut res = Vec::new();
102            if let Ok(count) = hist.count().try_into() {
103                res.push(df_counter(format!("{name}_count"), count));
104            }
105            let snapshot = hist.snapshot();
106            if let Ok(max) = snapshot.max().try_into() {
107                res.push(df_gauge(format!("{name}_max"), max));
108            }
109            if let Ok(min) = snapshot.min().try_into() {
110                res.push(df_gauge(format!("{name}_min"), min));
111            }
112            if let Some(p90) = f_to_u(snapshot.value(0.90)) {
113                res.push(df_gauge(format!("{name}_p95"), p90));
114            }
115            if let Some(p99) = f_to_u(snapshot.value(0.99)) {
116                res.push(df_gauge(format!("{name}_p99"), p99));
117            }
118            res
119        }
120        Metric::Timer(timer) => {
121            let mut res = Vec::new();
122            if let Ok(count) = timer.count().try_into() {
123                res.push(df_counter(format!("{name}_count"), count));
124            }
125            let snapshot = timer.snapshot();
126            if let Ok(max) = snapshot.max().try_into() {
127                // NOTE(os): unlike Time metrics, gauges allow custom aggregation
128                res.push(df_gauge(format!("{name}_max"), max));
129            }
130            if let Ok(min) = snapshot.min().try_into() {
131                res.push(df_gauge(format!("{name}_min"), min));
132            }
133            if let Some(p95) = f_to_u(snapshot.value(0.95)) {
134                res.push(df_gauge(format!("{name}_p95"), p95));
135            }
136            if let Some(p99) = f_to_u(snapshot.value(0.95)) {
137                res.push(df_gauge(format!("{name}_p99"), p99));
138            }
139            res
140        }
141        // TODO(os): add more metric types when added to VortexMetrics
142        _ => vec![],
143    }
144}
145
146fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
147    let count = Count::new();
148    count.add(value);
149    DatafusionMetricValue::Count {
150        name: name.into(),
151        count,
152    }
153}
154
155fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
156    let gauge = Gauge::new();
157    gauge.set(value);
158    DatafusionMetricValue::Gauge {
159        name: name.into(),
160        gauge,
161    }
162}
163
/// Converts a float to a `usize` by truncating toward zero.
///
/// Returns `None` for NaN, infinities, negative values (other than `-0.0`), and values whose
/// integer part does not fit in a `usize`.
// The range check below ensures the cast only discards the fractional part.
#[allow(clippy::cast_possible_truncation)]
fn f_to_u(f: f64) -> Option<usize> {
    // `usize::MAX as f64` is inexact on 64-bit targets: it rounds *up* to 2^64, so an inclusive
    // `<=` check would accept 2^64 itself (which then saturates to `usize::MAX`). Adding 1.0 and
    // comparing strictly gives the exclusive bound 2^BITS on both 32-bit targets (where the cast
    // is exact) and 64-bit targets.
    let upper_exclusive = usize::MAX as f64 + 1.0;
    (f.is_finite() && f >= 0.0 && f < upper_exclusive).then(|| f.trunc() as usize)
}