vortex_datafusion/persistent/
metrics.rs1use std::sync::Arc;
6
7use datafusion_datasource::file_scan_config::FileScanConfig;
8use datafusion_datasource::source::DataSourceExec;
9use datafusion_physical_plan::ExecutionPlan;
10use datafusion_physical_plan::ExecutionPlanVisitor;
11use datafusion_physical_plan::Metric as DatafusionMetric;
12use datafusion_physical_plan::accept;
13use datafusion_physical_plan::metrics::Count;
14use datafusion_physical_plan::metrics::Gauge;
15use datafusion_physical_plan::metrics::Label as DatafusionLabel;
16use datafusion_physical_plan::metrics::MetricValue as DatafusionMetricValue;
17use datafusion_physical_plan::metrics::MetricsSet;
18use vortex::metrics::Metric;
19use vortex::metrics::MetricId;
20use vortex::metrics::MetricsSessionExt;
21use vortex::metrics::Tags;
22
23use crate::persistent::source::VortexSource;
24
/// Tag key that carries the partition index.
///
/// When converting Vortex tags to DataFusion labels, a tag with this key is parsed into the
/// metric's partition instead of being emitted as a regular label.
pub(crate) static PARTITION_LABEL: &str = "partition";
26
/// Execution-plan visitor that accumulates the [`MetricsSet`]s it finds while walking a plan.
///
/// The wrapped vector holds every metrics set collected so far, in visit order.
#[derive(Default)]
pub struct VortexMetricsFinder(Vec<MetricsSet>);
31
32impl VortexMetricsFinder {
33 pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
35 let mut finder = Self::default();
36 match accept(plan, &mut finder) {
37 Ok(()) => finder.0,
38 Err(_) => Vec::new(),
39 }
40 }
41}
42
43impl ExecutionPlanVisitor for VortexMetricsFinder {
44 type Error = std::convert::Infallible;
45 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
46 if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
47 if let Some(metrics) = exec.metrics() {
48 self.0.push(metrics);
49 }
50
51 if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
53 && let Some(scan) = file_scan
54 .file_source
55 .as_any()
56 .downcast_ref::<VortexSource>()
57 {
58 let mut set = MetricsSet::new();
59 for metric in scan
60 .session
61 .metrics()
62 .snapshot()
63 .iter()
64 .flat_map(|(id, metric)| metric_to_datafusion(id, metric))
65 {
66 set.push(Arc::new(metric));
67 }
68
69 self.0.push(set);
70 }
71 }
72
73 Ok(true)
74 }
75}
76
77fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
78 let (partition, labels) = tags_to_datafusion(id.tags());
79 metric_value_to_datafusion(id.name(), metric)
80 .into_iter()
81 .map(move |metric_value| {
82 DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
83 })
84}
85
86fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
87 tags.iter()
88 .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
89 if k == PARTITION_LABEL {
90 partition = v.parse().ok();
91 } else {
92 labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
93 }
94 (partition, labels)
95 })
96}
97
98fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
99 match metric {
100 Metric::Counter(counter) => counter
101 .count()
102 .try_into()
103 .into_iter()
104 .map(|count| df_counter(name.to_string(), count))
105 .collect(),
106 Metric::Histogram(hist) => {
107 let mut res = Vec::new();
108 if let Ok(count) = hist.count().try_into() {
109 res.push(df_counter(format!("{name}_count"), count));
110 }
111 let snapshot = hist.snapshot();
112 if let Ok(max) = snapshot.max().try_into() {
113 res.push(df_gauge(format!("{name}_max"), max));
114 }
115 if let Ok(min) = snapshot.min().try_into() {
116 res.push(df_gauge(format!("{name}_min"), min));
117 }
118 if let Some(p90) = f_to_u(snapshot.value(0.90)) {
119 res.push(df_gauge(format!("{name}_p95"), p90));
120 }
121 if let Some(p99) = f_to_u(snapshot.value(0.99)) {
122 res.push(df_gauge(format!("{name}_p99"), p99));
123 }
124 res
125 }
126 Metric::Timer(timer) => {
127 let mut res = Vec::new();
128 if let Ok(count) = timer.count().try_into() {
129 res.push(df_counter(format!("{name}_count"), count));
130 }
131 let snapshot = timer.snapshot();
132 if let Ok(max) = snapshot.max().try_into() {
133 res.push(df_gauge(format!("{name}_max"), max));
135 }
136 if let Ok(min) = snapshot.min().try_into() {
137 res.push(df_gauge(format!("{name}_min"), min));
138 }
139 if let Some(p95) = f_to_u(snapshot.value(0.95)) {
140 res.push(df_gauge(format!("{name}_p95"), p95));
141 }
142 if let Some(p99) = f_to_u(snapshot.value(0.95)) {
143 res.push(df_gauge(format!("{name}_p99"), p99));
144 }
145 res
146 }
147 _ => vec![],
149 }
150}
151
152fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
153 let count = Count::new();
154 count.add(value);
155 DatafusionMetricValue::Count {
156 name: name.into(),
157 count,
158 }
159}
160
161fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
162 let gauge = Gauge::new();
163 gauge.set(value);
164 DatafusionMetricValue::Gauge {
165 name: name.into(),
166 gauge,
167 }
168}
169
/// Converts a float to `usize` by truncating toward zero.
///
/// Returns `None` when `f` is NaN, infinite, negative, or too large to fit in `usize`.
#[expect(
    clippy::cast_possible_truncation,
    reason = "truncation is checked before cast"
)]
fn f_to_u(f: f64) -> Option<usize> {
    // `usize::MAX as f64` rounds up to 2^BITS on 64-bit targets, so comparing with `<=` would
    // accept an out-of-range input and let the cast saturate. Build 2^BITS exactly instead
    // (2^(BITS-1) is always representable) and use it as an exclusive upper bound.
    let upper_exclusive = (usize::MAX / 2 + 1) as f64 * 2.0;
    (f.is_finite() && f >= 0.0 && f < upper_exclusive).then(|| f.trunc() as usize)
}