vortex_datafusion/persistent/
metrics.rs1use std::sync::Arc;
6use std::time::Duration;
7
8use datafusion_datasource::file_scan_config::FileScanConfig;
9use datafusion_datasource::source::DataSourceExec;
10use datafusion_physical_plan::ExecutionPlan;
11use datafusion_physical_plan::ExecutionPlanVisitor;
12use datafusion_physical_plan::Metric as DatafusionMetric;
13use datafusion_physical_plan::accept;
14use datafusion_physical_plan::metrics::Count;
15use datafusion_physical_plan::metrics::Gauge;
16use datafusion_physical_plan::metrics::Label as DatafusionLabel;
17use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
18use datafusion_physical_plan::metrics::MetricsSet;
19use datafusion_physical_plan::metrics::Time;
20use vortex::error::VortexExpect;
21use vortex::metrics::Label;
22use vortex::metrics::Metric;
23use vortex::metrics::MetricValue;
24
25use crate::persistent::source::VortexSource;
26
/// Label key whose value is parsed as the DataFusion partition index when Vortex labels are
/// converted to DataFusion labels in this module.
pub(crate) static PARTITION_LABEL: &str = "partition";
/// Label key `file_path`.
///
/// NOTE(review): not referenced by the conversion code in this file; presumably attached by the
/// code that records per-file metrics — confirm against the callers.
pub(crate) static PATH_LABEL: &str = "file_path";
29
30#[derive(Default)]
59pub struct VortexMetricsFinder(Vec<MetricsSet>);
60
61impl VortexMetricsFinder {
62 pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
65 let mut finder = Self::default();
66 match accept(plan, &mut finder) {
67 Ok(()) => finder.0,
68 Err(_) => Vec::new(),
69 }
70 }
71}
72
73impl ExecutionPlanVisitor for VortexMetricsFinder {
74 type Error = std::convert::Infallible;
75 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
76 if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
77 let mut set = exec.metrics().unwrap_or_default();
79
80 if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
82 && let Some(scan) = file_scan
83 .file_source
84 .as_any()
85 .downcast_ref::<VortexSource>()
86 {
87 for metric in scan
88 .metrics_registry()
89 .snapshot()
90 .iter()
91 .flat_map(metric_to_datafusion)
92 {
93 set.push(Arc::new(metric));
94 }
95 }
96
97 self.0.push(set);
98
99 Ok(false)
100 } else {
101 Ok(true)
102 }
103 }
104}
105
106fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
107 let (partition, labels) = labels_to_datafusion(metric.labels());
108 metric_value_to_datafusion(metric.name(), metric.value())
109 .into_iter()
110 .map(move |metric_value| {
111 DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
112 })
113}
114
115fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
116 tags.iter()
117 .fold((None, Vec::new()), |(mut partition, mut labels), metric| {
118 if metric.key() == PARTITION_LABEL {
119 partition = metric.value().parse().ok();
120 } else {
121 labels.push(DatafusionLabel::new(
122 metric.key().to_string(),
123 metric.value().to_string(),
124 ));
125 }
126 (partition, labels)
127 })
128}
129
130fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
131 match metric {
132 MetricValue::Counter(counter) => counter
133 .value()
134 .try_into()
135 .into_iter()
136 .map(|count| df_counter(name.to_string(), count))
137 .collect(),
138 MetricValue::Histogram(hist) => {
139 let mut res = Vec::new();
140
141 res.push(df_counter(format!("{name}_count"), hist.count()));
142
143 if !hist.is_empty() {
144 if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
145 res.push(df_gauge(format!("{name}_max"), max));
146 }
147
148 if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
149 res.push(df_gauge(format!("{name}_min"), min));
150 }
151
152 if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
153 res.push(df_gauge(format!("{name}_p95"), p95));
154 }
155 if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
156 res.push(df_gauge(format!("{name}_p99"), p99));
157 }
158 }
159
160 res
161 }
162 MetricValue::Timer(timer) => {
163 let mut res = Vec::new();
164 res.push(df_counter(format!("{name}_count"), timer.count()));
165
166 if !timer.is_empty() {
167 let max = timer.quantile(1.0).vortex_expect("must not be empty");
168 res.push(df_timer(format!("{name}_max"), max));
169
170 let min = timer.quantile(0.0).vortex_expect("must not be empty");
171 res.push(df_timer(format!("{name}_min"), min));
172
173 let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
174 res.push(df_timer(format!("{name}_p95"), p95));
175
176 let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
177 res.push(df_timer(format!("{name}_p99"), p99));
178 }
179
180 res
181 }
182 _ => vec![],
184 }
185}
186
187fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
188 let count = Count::new();
189 count.add(value);
190 DatafusionMetricValue::Count {
191 name: name.into(),
192 count,
193 }
194}
195
196fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
197 let gauge = Gauge::new();
198 gauge.set(value);
199 DatafusionMetricValue::Gauge {
200 name: name.into(),
201 gauge,
202 }
203}
204
205fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
206 let time = Time::new();
207 time.add_duration(value);
208 DatafusionMetricValue::Time {
209 name: name.into(),
210 time,
211 }
212}
213
/// Converts `f` to `usize` by truncating toward zero.
///
/// Returns `None` when `f` is NaN, infinite, negative, or too large to be represented as a
/// `usize`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // `usize::MAX as f64` rounds up to 2^64 on 64-bit targets, so an inclusive comparison
    // against it would admit 2^64 itself, which the cast below could only saturate to
    // `usize::MAX`. Compare against the exact exclusive bound 2^BITS instead; a power of two
    // converts to `f64` without rounding.
    let upper_exclusive = (1u128 << usize::BITS) as f64;

    (f.is_finite() && f >= 0.0 && f < upper_exclusive).then(|| f.trunc() as usize)
}
223
#[cfg(test)]
mod tests {

    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor that counts the `DataSourceExec` nodes it is shown.
    struct DataSourceExecCounter(usize);

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        fn pre_visit(
            &mut self,
            plan: &dyn datafusion_physical_plan::ExecutionPlan,
        ) -> Result<bool, Self::Error> {
            let is_data_source = plan.as_any().is::<DataSourceExec>();
            if is_data_source {
                self.0 += 1;
            }
            // Mirrors the finder: `false` for a data source node, `true` otherwise.
            Ok(!is_data_source)
        }
    }

    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();

        let create_table = "CREATE EXTERNAL TABLE my_tbl \
            (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
            STORED AS vortex \
            LOCATION 'files/'";
        ctx.session.sql(create_table).await?;

        let insert_rows = "INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)";
        ctx.session.sql(insert_rows).await?.collect().await?;

        let (state, plan) = ctx.session.sql("SELECT * FROM my_tbl").await?.into_parts();
        let physical_plan = state.create_physical_plan(&plan).await?;

        // Independently count the data source nodes in the physical plan.
        let mut counter = DataSourceExecCounter(0);
        accept(physical_plan.as_ref(), &mut counter)?;

        // The finder must report exactly one metrics set for each of those nodes.
        let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());

        assert!(!metrics_sets.is_empty());
        assert_eq!(
            metrics_sets.len(),
            counter.0,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            metrics_sets.len(),
            counter.0
        );

        Ok(())
    }
}