vortex_datafusion/persistent/
metrics.rs1use std::sync::Arc;
6use std::time::Duration;
7
8use datafusion_datasource::file_scan_config::FileScanConfig;
9use datafusion_datasource::source::DataSourceExec;
10use datafusion_physical_plan::ExecutionPlan;
11use datafusion_physical_plan::ExecutionPlanVisitor;
12use datafusion_physical_plan::Metric as DatafusionMetric;
13use datafusion_physical_plan::accept;
14use datafusion_physical_plan::metrics::Count;
15use datafusion_physical_plan::metrics::Gauge;
16use datafusion_physical_plan::metrics::Label as DatafusionLabel;
17use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
18use datafusion_physical_plan::metrics::MetricsSet;
19use datafusion_physical_plan::metrics::Time;
20use vortex::error::VortexExpect;
21use vortex::metrics::Label;
22use vortex::metrics::Metric;
23use vortex::metrics::MetricValue;
24
25use crate::persistent::source::VortexSource;
26
/// Label key whose value is parsed as the DataFusion partition index when Vortex labels are
/// converted by `labels_to_datafusion`; that label is not forwarded as a regular label.
pub(crate) static PARTITION_LABEL: &str = "partition";
/// Label key `"file_path"`.
///
/// NOTE(review): presumably tags a metric with the file it was recorded for; nothing in this
/// file reads it, so confirm against the code that emits the metrics.
pub(crate) static PATH_LABEL: &str = "file_path";
29
/// Plan visitor that accumulates one [`MetricsSet`] for every `DataSourceExec` node it visits.
///
/// When the node's file source is a `VortexSource`, the metrics from that source's registry are
/// converted to DataFusion metrics and appended to the node's own set.
#[derive(Default)]
pub struct VortexMetricsFinder(Vec<MetricsSet>);
34
35impl VortexMetricsFinder {
36 pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
38 let mut finder = Self::default();
39 match accept(plan, &mut finder) {
40 Ok(()) => finder.0,
41 Err(_) => Vec::new(),
42 }
43 }
44}
45
46impl ExecutionPlanVisitor for VortexMetricsFinder {
47 type Error = std::convert::Infallible;
48 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
49 if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
50 let mut set = exec.metrics().unwrap_or_default();
52
53 if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
55 && let Some(scan) = file_scan
56 .file_source
57 .as_any()
58 .downcast_ref::<VortexSource>()
59 {
60 for metric in scan
61 .metrics_registry()
62 .snapshot()
63 .iter()
64 .flat_map(metric_to_datafusion)
65 {
66 set.push(Arc::new(metric));
67 }
68 }
69
70 self.0.push(set);
71
72 Ok(false)
73 } else {
74 Ok(true)
75 }
76 }
77}
78
79fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
80 let (partition, labels) = labels_to_datafusion(metric.labels());
81 metric_value_to_datafusion(metric.name(), metric.value())
82 .into_iter()
83 .map(move |metric_value| {
84 DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
85 })
86}
87
88fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
89 tags.iter()
90 .fold((None, Vec::new()), |(mut partition, mut labels), metric| {
91 if metric.key() == PARTITION_LABEL {
92 partition = metric.value().parse().ok();
93 } else {
94 labels.push(DatafusionLabel::new(
95 metric.key().to_string(),
96 metric.value().to_string(),
97 ));
98 }
99 (partition, labels)
100 })
101}
102
103fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
104 match metric {
105 MetricValue::Counter(counter) => counter
106 .value()
107 .try_into()
108 .into_iter()
109 .map(|count| df_counter(name.to_string(), count))
110 .collect(),
111 MetricValue::Histogram(hist) => {
112 let mut res = Vec::new();
113
114 res.push(df_counter(format!("{name}_count"), hist.count()));
115
116 if !hist.is_empty() {
117 if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
118 res.push(df_gauge(format!("{name}_max"), max));
119 }
120
121 if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
122 res.push(df_gauge(format!("{name}_min"), min));
123 }
124
125 if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
126 res.push(df_gauge(format!("{name}_p95"), p95));
127 }
128 if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
129 res.push(df_gauge(format!("{name}_p99"), p99));
130 }
131 }
132
133 res
134 }
135 MetricValue::Timer(timer) => {
136 let mut res = Vec::new();
137 res.push(df_counter(format!("{name}_count"), timer.count()));
138
139 if !timer.is_empty() {
140 let max = timer.quantile(1.0).vortex_expect("must not be empty");
141 res.push(df_timer(format!("{name}_max"), max));
142
143 let min = timer.quantile(0.0).vortex_expect("must not be empty");
144 res.push(df_timer(format!("{name}_min"), min));
145
146 let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
147 res.push(df_timer(format!("{name}_p95"), p95));
148
149 let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
150 res.push(df_timer(format!("{name}_p99"), p99));
151 }
152
153 res
154 }
155 _ => vec![],
157 }
158}
159
160fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
161 let count = Count::new();
162 count.add(value);
163 DatafusionMetricValue::Count {
164 name: name.into(),
165 count,
166 }
167}
168
169fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
170 let gauge = Gauge::new();
171 gauge.set(value);
172 DatafusionMetricValue::Gauge {
173 name: name.into(),
174 gauge,
175 }
176}
177
178fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
179 let time = Time::new();
180 time.add_duration(value);
181 DatafusionMetricValue::Time {
182 name: name.into(),
183 time,
184 }
185}
186
/// Converts a float to `usize`, discarding the fractional part.
///
/// Returns `None` for NaN, infinities, negative values, and values whose integer part does not
/// fit in `usize`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // `usize::MAX as f64` can round up to 2^BITS (it does on 64-bit targets), which is itself out
    // of range, so an inclusive comparison against it would let the cast saturate silently.
    // Adding 1.0 yields exactly 2^BITS on 16-, 32- and 64-bit targets, giving an exact exclusive
    // upper bound.
    let upper_exclusive = usize::MAX as f64 + 1.0;
    (f.is_finite() && f >= 0.0 && f < upper_exclusive).then(|| f.trunc() as usize)
}
196
#[cfg(test)]
mod tests {

    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor that counts the `DataSourceExec` nodes it sees while walking a plan.
    struct DataSourceExecCounter(usize);

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        fn pre_visit(
            &mut self,
            plan: &dyn datafusion_physical_plan::ExecutionPlan,
        ) -> Result<bool, Self::Error> {
            let is_data_source = plan.as_any().is::<DataSourceExec>();
            if is_data_source {
                self.0 += 1;
            }
            // Mirror the finder: `false` on data-source nodes, `true` everywhere else.
            Ok(!is_data_source)
        }
    }

    /// The finder must return exactly one metrics set for each `DataSourceExec` in the plan.
    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        // Register an external Vortex table and write a couple of rows into it.
        let create_table = "CREATE EXTERNAL TABLE my_tbl \
             (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
             STORED AS vortex \
             LOCATION 'files/'";
        ctx.session.sql(create_table).await?;

        let insert = ctx
            .session
            .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
            .await?;
        insert.collect().await?;

        // Plan a full scan of the table without executing it.
        let (state, logical_plan) = ctx.session.sql("SELECT * FROM my_tbl").await?.into_parts();
        let physical_plan = state.create_physical_plan(&logical_plan).await?;

        let mut exec_counter = DataSourceExecCounter(0);
        accept(physical_plan.as_ref(), &mut exec_counter)?;
        let exec_nodes = exec_counter.0;

        let found_sets = VortexMetricsFinder::find_all(physical_plan.as_ref()).len();

        assert!(found_sets != 0);
        assert_eq!(
            found_sets, exec_nodes,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            found_sets, exec_nodes
        );

        Ok(())
    }
}