vortex_datafusion/persistent/
metrics.rs1use std::sync::Arc;
6
7use datafusion_datasource::file_scan_config::FileScanConfig;
8use datafusion_datasource::source::DataSourceExec;
9use datafusion_physical_plan::metrics::{
10 Count, Gauge, Label as DatafusionLabel, MetricValue as DatafusionMetricValue, MetricsSet,
11};
12use datafusion_physical_plan::{
13 ExecutionPlan, ExecutionPlanVisitor, Metric as DatafusionMetric, accept,
14};
15use vortex::metrics::{Metric, MetricId, MetricsSessionExt, Tags};
16
17use crate::persistent::source::VortexSource;
18
/// Tag key whose value carries the DataFusion partition index of a metric.
///
/// When converting Vortex metric tags, a tag with this key is parsed as a `usize`
/// partition instead of being forwarded as an ordinary label.
pub(crate) static PARTITION_LABEL: &str = "partition";
20
21#[derive(Default)]
24pub struct VortexMetricsFinder(Vec<MetricsSet>);
25
26impl VortexMetricsFinder {
27 pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
29 let mut finder = Self::default();
30 match accept(plan, &mut finder) {
31 Ok(()) => finder.0,
32 Err(_) => Vec::new(),
33 }
34 }
35}
36
37impl ExecutionPlanVisitor for VortexMetricsFinder {
38 type Error = std::convert::Infallible;
39 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
40 if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
41 if let Some(metrics) = exec.metrics() {
42 self.0.push(metrics);
43 }
44
45 if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
47 && let Some(scan) = file_scan
48 .file_source
49 .as_any()
50 .downcast_ref::<VortexSource>()
51 {
52 let mut set = MetricsSet::new();
53 for metric in scan
54 .session
55 .metrics()
56 .snapshot()
57 .iter()
58 .flat_map(|(id, metric)| metric_to_datafusion(id, metric))
59 {
60 set.push(Arc::new(metric));
61 }
62
63 self.0.push(set);
64 }
65 }
66
67 Ok(true)
68 }
69}
70
71fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
72 let (partition, labels) = tags_to_datafusion(id.tags());
73 metric_value_to_datafusion(id.name(), metric)
74 .into_iter()
75 .map(move |metric_value| {
76 DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
77 })
78}
79
80fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
81 tags.iter()
82 .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
83 if k == PARTITION_LABEL {
84 partition = v.parse().ok();
85 } else {
86 labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
87 }
88 (partition, labels)
89 })
90}
91
92fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
93 match metric {
94 Metric::Counter(counter) => counter
95 .count()
96 .try_into()
97 .into_iter()
98 .map(|count| df_counter(name.to_string(), count))
99 .collect(),
100 Metric::Histogram(hist) => {
101 let mut res = Vec::new();
102 if let Ok(count) = hist.count().try_into() {
103 res.push(df_counter(format!("{name}_count"), count));
104 }
105 let snapshot = hist.snapshot();
106 if let Ok(max) = snapshot.max().try_into() {
107 res.push(df_gauge(format!("{name}_max"), max));
108 }
109 if let Ok(min) = snapshot.min().try_into() {
110 res.push(df_gauge(format!("{name}_min"), min));
111 }
112 if let Some(p90) = f_to_u(snapshot.value(0.90)) {
113 res.push(df_gauge(format!("{name}_p95"), p90));
114 }
115 if let Some(p99) = f_to_u(snapshot.value(0.99)) {
116 res.push(df_gauge(format!("{name}_p99"), p99));
117 }
118 res
119 }
120 Metric::Timer(timer) => {
121 let mut res = Vec::new();
122 if let Ok(count) = timer.count().try_into() {
123 res.push(df_counter(format!("{name}_count"), count));
124 }
125 let snapshot = timer.snapshot();
126 if let Ok(max) = snapshot.max().try_into() {
127 res.push(df_gauge(format!("{name}_max"), max));
129 }
130 if let Ok(min) = snapshot.min().try_into() {
131 res.push(df_gauge(format!("{name}_min"), min));
132 }
133 if let Some(p95) = f_to_u(snapshot.value(0.95)) {
134 res.push(df_gauge(format!("{name}_p95"), p95));
135 }
136 if let Some(p99) = f_to_u(snapshot.value(0.95)) {
137 res.push(df_gauge(format!("{name}_p99"), p99));
138 }
139 res
140 }
141 _ => vec![],
143 }
144}
145
146fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
147 let count = Count::new();
148 count.add(value);
149 DatafusionMetricValue::Count {
150 name: name.into(),
151 count,
152 }
153}
154
155fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
156 let gauge = Gauge::new();
157 gauge.set(value);
158 DatafusionMetricValue::Gauge {
159 name: name.into(),
160 gauge,
161 }
162}
163
/// Converts `f` to `usize` by truncating toward zero.
///
/// Returns `None` when `f` is NaN, infinite, negative, or at least 2^`usize::BITS`, i.e.
/// whenever the truncated value would not fit in a `usize`.
// The `as` cast is guarded by the range check, so truncation cannot lose integer bits.
#[allow(clippy::cast_possible_truncation)]
fn f_to_u(f: f64) -> Option<usize> {
    // Exact exclusive upper bound, 2^BITS. `usize::MAX as f64` is not a safe bound: on
    // 64-bit targets it rounds up to 2^64, which would let that out-of-range value
    // through to a silently saturating cast.
    let limit = (1u128 << usize::BITS) as f64;
    (f.is_finite() && f >= 0.0 && f < limit).then(|| f.trunc() as usize)
}