vortex_datafusion/persistent/
metrics.rs1use std::sync::Arc;
6use std::time::Duration;
7
8use datafusion_datasource::file_scan_config::FileScanConfig;
9use datafusion_datasource::source::DataSourceExec;
10use datafusion_physical_plan::ExecutionPlan;
11use datafusion_physical_plan::ExecutionPlanVisitor;
12use datafusion_physical_plan::Metric as DatafusionMetric;
13use datafusion_physical_plan::accept;
14use datafusion_physical_plan::metrics::Count;
15use datafusion_physical_plan::metrics::Gauge;
16use datafusion_physical_plan::metrics::Label as DatafusionLabel;
17use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
18use datafusion_physical_plan::metrics::MetricsSet;
19use datafusion_physical_plan::metrics::Time;
20use vortex::error::VortexExpect;
21use vortex::metrics::Label;
22use vortex::metrics::Metric;
23use vortex::metrics::MetricValue;
24
25use crate::persistent::source::VortexSource;
26
/// Metric label key whose value is parsed as the DataFusion partition index
/// (see `labels_to_datafusion`); labels with this key are not forwarded as plain labels.
pub(crate) static PARTITION_LABEL: &str = "partition";
/// Metric label key `file_path`; presumably identifies the scanned file — TODO confirm at the
/// call sites that attach it (not referenced in this section).
pub(crate) static PATH_LABEL: &str = "file_path";
29
30#[derive(Default)]
59pub struct VortexMetricsFinder(Vec<MetricsSet>);
60
61impl VortexMetricsFinder {
62 pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
65 let mut finder = Self::default();
66 match accept(plan, &mut finder) {
67 Ok(()) => finder.0,
68 Err(_) => Vec::new(),
69 }
70 }
71}
72
73impl ExecutionPlanVisitor for VortexMetricsFinder {
74 type Error = std::convert::Infallible;
75 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
76 if let Some(exec) = plan.downcast_ref::<DataSourceExec>() {
77 let mut set = exec.metrics().unwrap_or_default();
79
80 if let Some(file_scan) = exec.data_source().downcast_ref::<FileScanConfig>()
82 && let Some(scan) = file_scan.file_source.downcast_ref::<VortexSource>()
83 {
84 for metric in scan
85 .metrics_registry()
86 .snapshot()
87 .iter()
88 .flat_map(metric_to_datafusion)
89 {
90 set.push(Arc::new(metric));
91 }
92 }
93
94 self.0.push(set);
95
96 Ok(false)
97 } else {
98 Ok(true)
99 }
100 }
101}
102
103fn metric_to_datafusion(metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
104 let (partition, labels) = labels_to_datafusion(metric.labels());
105 metric_value_to_datafusion(metric.name(), metric.value())
106 .into_iter()
107 .map(move |metric_value| {
108 DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
109 })
110}
111
112fn labels_to_datafusion(tags: &[Label]) -> (Option<usize>, Vec<DatafusionLabel>) {
113 tags.iter()
114 .fold((None, Vec::new()), |(mut partition, mut labels), metric| {
115 if metric.key() == PARTITION_LABEL {
116 partition = metric.value().parse().ok();
117 } else {
118 labels.push(DatafusionLabel::new(
119 metric.key().to_string(),
120 metric.value().to_string(),
121 ));
122 }
123 (partition, labels)
124 })
125}
126
127fn metric_value_to_datafusion(name: &str, metric: &MetricValue) -> Vec<DatafusionMetricValue> {
128 match metric {
129 MetricValue::Counter(counter) => counter
130 .value()
131 .try_into()
132 .into_iter()
133 .map(|count| df_counter(name.to_string(), count))
134 .collect(),
135 MetricValue::Histogram(hist) => {
136 let mut res = Vec::new();
137
138 res.push(df_counter(format!("{name}_count"), hist.count()));
139
140 if !hist.is_empty() {
141 if let Some(max) = f_to_u(hist.quantile(1.0).vortex_expect("must not be empty")) {
142 res.push(df_gauge(format!("{name}_max"), max));
143 }
144
145 if let Some(min) = f_to_u(hist.quantile(0.0).vortex_expect("must not be empty")) {
146 res.push(df_gauge(format!("{name}_min"), min));
147 }
148
149 if let Some(p95) = f_to_u(hist.quantile(0.95).vortex_expect("must not be empty")) {
150 res.push(df_gauge(format!("{name}_p95"), p95));
151 }
152 if let Some(p99) = f_to_u(hist.quantile(0.99).vortex_expect("must not be empty")) {
153 res.push(df_gauge(format!("{name}_p99"), p99));
154 }
155 }
156
157 res
158 }
159 MetricValue::Timer(timer) => {
160 let mut res = Vec::new();
161 res.push(df_counter(format!("{name}_count"), timer.count()));
162
163 if !timer.is_empty() {
164 let max = timer.quantile(1.0).vortex_expect("must not be empty");
165 res.push(df_timer(format!("{name}_max"), max));
166
167 let min = timer.quantile(0.0).vortex_expect("must not be empty");
168 res.push(df_timer(format!("{name}_min"), min));
169
170 let p95 = timer.quantile(0.95).vortex_expect("must not be empty");
171 res.push(df_timer(format!("{name}_p95"), p95));
172
173 let p99 = timer.quantile(0.99).vortex_expect("must not be empty");
174 res.push(df_timer(format!("{name}_p99"), p99));
175 }
176
177 res
178 }
179 _ => vec![],
181 }
182}
183
184fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
185 let count = Count::new();
186 count.add(value);
187 DatafusionMetricValue::Count {
188 name: name.into(),
189 count,
190 }
191}
192
193fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
194 let gauge = Gauge::new();
195 gauge.set(value);
196 DatafusionMetricValue::Gauge {
197 name: name.into(),
198 gauge,
199 }
200}
201
202fn df_timer(name: String, value: Duration) -> DatafusionMetricValue {
203 let time = Time::new();
204 time.add_duration(value);
205 DatafusionMetricValue::Time {
206 name: name.into(),
207 time,
208 }
209}
210
/// Converts `f` to `usize` by truncating toward zero.
///
/// Returns `None` for NaN, infinities, negative values, and values whose truncation does not
/// fit in `usize`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // Exclusive upper bound of exactly 2^usize::BITS. On 64-bit targets `usize::MAX as f64`
    // rounds *up* to 2^64, so an inclusive `f <= usize::MAX as f64` check would admit 2^64.
    // That value is out of range, and `as` would silently saturate it to `usize::MAX`.
    // Adding 1.0 is absorbed by rounding on 64-bit and is exact on narrower targets, so the
    // strict `<` below is correct everywhere.
    let upper = usize::MAX as f64 + 1.0;
    (f.is_finite() && f >= 0.0 && f < upper).then(|| f.trunc() as usize)
}
220
#[cfg(test)]
mod tests {
    use datafusion_datasource::source::DataSourceExec;
    use datafusion_physical_plan::ExecutionPlanVisitor;
    use datafusion_physical_plan::accept;

    use super::VortexMetricsFinder;
    use crate::common_tests::TestSessionContext;

    /// Visitor that counts `DataSourceExec` nodes, returning `Ok(false)` at each one.
    #[derive(Default)]
    struct DataSourceExecCounter {
        seen: usize,
    }

    impl ExecutionPlanVisitor for DataSourceExecCounter {
        type Error = std::convert::Infallible;

        fn pre_visit(
            &mut self,
            plan: &dyn datafusion_physical_plan::ExecutionPlan,
        ) -> Result<bool, Self::Error> {
            let is_scan = plan.downcast_ref::<DataSourceExec>().is_some();
            if is_scan {
                self.seen += 1;
            }
            // Mirror `VortexMetricsFinder`: stop at scan nodes, keep walking everything else.
            Ok(!is_scan)
        }
    }

    #[tokio::test]
    async fn metrics_finder_returns_one_set_per_data_source_exec() -> anyhow::Result<()> {
        let ctx = TestSessionContext::default();
        let session = &ctx.session;

        let create_table = "CREATE EXTERNAL TABLE my_tbl \
             (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
             STORED AS vortex \
             LOCATION 'files/'";
        session.sql(create_table).await?;

        session
            .sql("INSERT INTO my_tbl VALUES ('a', 1), ('b', 2)")
            .await?
            .collect()
            .await?;

        let (state, logical_plan) = session.sql("SELECT * FROM my_tbl").await?.into_parts();
        let physical_plan = state.create_physical_plan(&logical_plan).await?;

        let mut counter = DataSourceExecCounter::default();
        accept(physical_plan.as_ref(), &mut counter)?;
        let expected_sets = counter.seen;

        let metrics_sets = VortexMetricsFinder::find_all(physical_plan.as_ref());

        assert!(!metrics_sets.is_empty());
        assert_eq!(
            metrics_sets.len(),
            expected_sets,
            "Expected one MetricsSet per DataSourceExec, got {} sets for {} DataSourceExec nodes",
            metrics_sets.len(),
            expected_sets
        );

        Ok(())
    }
}