vortex_datafusion/persistent/
metrics.rs1use std::sync::Arc;
6
7use datafusion_datasource::file_scan_config::FileScanConfig;
8use datafusion_datasource::source::DataSourceExec;
9use datafusion_physical_plan::metrics::{
10 Count, Gauge, Label as DatafusionLabel, MetricValue as DatafusionMetricValue, MetricsSet,
11};
12use datafusion_physical_plan::{
13 ExecutionPlan, ExecutionPlanVisitor, Metric as DatafusionMetric, accept,
14};
15use vortex::metrics::{Metric, MetricId, Tags};
16
17use crate::persistent::source::VortexSource;
18
/// Tag key that identifies the partition a metric belongs to.
///
/// `tags_to_datafusion` compares every tag key against this value: the matching
/// tag's value is parsed as the partition index instead of being emitted as a label.
pub(crate) static PARTITION_LABEL: &str = "partition";
20
21#[derive(Default)]
24pub struct VortexMetricsFinder(Vec<MetricsSet>);
25
26impl VortexMetricsFinder {
27 pub fn find_all(plan: &dyn ExecutionPlan) -> Vec<MetricsSet> {
29 let mut finder = Self::default();
30 match accept(plan, &mut finder) {
31 Ok(()) => finder.0,
32 Err(_) => Vec::new(),
33 }
34 }
35}
36
37impl ExecutionPlanVisitor for VortexMetricsFinder {
38 type Error = std::convert::Infallible;
39 fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result<bool, Self::Error> {
40 if let Some(exec) = plan.as_any().downcast_ref::<DataSourceExec>() {
41 if let Some(metrics) = exec.metrics() {
42 self.0.push(metrics);
43 }
44
45 if let Some(file_scan) = exec.data_source().as_any().downcast_ref::<FileScanConfig>()
47 && let Some(scan) = file_scan
48 .file_source
49 .as_any()
50 .downcast_ref::<VortexSource>()
51 {
52 let mut set = MetricsSet::new();
53 for metric in scan
54 .metrics
55 .snapshot()
56 .iter()
57 .flat_map(|(id, metric)| metric_to_datafusion(id, metric))
58 {
59 set.push(Arc::new(metric));
60 }
61
62 self.0.push(set);
63 }
64 }
65
66 Ok(true)
67 }
68}
69
70fn metric_to_datafusion(id: MetricId, metric: &Metric) -> impl Iterator<Item = DatafusionMetric> {
71 let (partition, labels) = tags_to_datafusion(id.tags());
72 metric_value_to_datafusion(id.name(), metric)
73 .into_iter()
74 .map(move |metric_value| {
75 DatafusionMetric::new_with_labels(metric_value, partition, labels.clone())
76 })
77}
78
79fn tags_to_datafusion(tags: &Tags) -> (Option<usize>, Vec<DatafusionLabel>) {
80 tags.iter()
81 .fold((None, Vec::new()), |(mut partition, mut labels), (k, v)| {
82 if k == PARTITION_LABEL {
83 partition = v.parse().ok();
84 } else {
85 labels.push(DatafusionLabel::new(k.to_string(), v.to_string()));
86 }
87 (partition, labels)
88 })
89}
90
91fn metric_value_to_datafusion(name: &str, metric: &Metric) -> Vec<DatafusionMetricValue> {
92 match metric {
93 Metric::Counter(counter) => counter
94 .count()
95 .try_into()
96 .into_iter()
97 .map(|count| df_counter(name.to_string(), count))
98 .collect(),
99 Metric::Histogram(hist) => {
100 let mut res = Vec::new();
101 if let Ok(count) = hist.count().try_into() {
102 res.push(df_counter(format!("{name}_count"), count));
103 }
104 let snapshot = hist.snapshot();
105 if let Ok(max) = snapshot.max().try_into() {
106 res.push(df_gauge(format!("{name}_max"), max));
107 }
108 if let Ok(min) = snapshot.min().try_into() {
109 res.push(df_gauge(format!("{name}_min"), min));
110 }
111 if let Some(p90) = f_to_u(snapshot.value(0.90)) {
112 res.push(df_gauge(format!("{name}_p95"), p90));
113 }
114 if let Some(p99) = f_to_u(snapshot.value(0.99)) {
115 res.push(df_gauge(format!("{name}_p99"), p99));
116 }
117 res
118 }
119 Metric::Timer(timer) => {
120 let mut res = Vec::new();
121 if let Ok(count) = timer.count().try_into() {
122 res.push(df_counter(format!("{name}_count"), count));
123 }
124 let snapshot = timer.snapshot();
125 if let Ok(max) = snapshot.max().try_into() {
126 res.push(df_gauge(format!("{name}_max"), max));
128 }
129 if let Ok(min) = snapshot.min().try_into() {
130 res.push(df_gauge(format!("{name}_min"), min));
131 }
132 if let Some(p95) = f_to_u(snapshot.value(0.95)) {
133 res.push(df_gauge(format!("{name}_p95"), p95));
134 }
135 if let Some(p99) = f_to_u(snapshot.value(0.95)) {
136 res.push(df_gauge(format!("{name}_p99"), p99));
137 }
138 res
139 }
140 _ => vec![],
142 }
143}
144
145fn df_counter(name: String, value: usize) -> DatafusionMetricValue {
146 let count = Count::new();
147 count.add(value);
148 DatafusionMetricValue::Count {
149 name: name.into(),
150 count,
151 }
152}
153
154fn df_gauge(name: String, value: usize) -> DatafusionMetricValue {
155 let gauge = Gauge::new();
156 gauge.set(value);
157 DatafusionMetricValue::Gauge {
158 name: name.into(),
159 gauge,
160 }
161}
162
/// Converts a float to `usize` by truncating toward zero.
///
/// Returns `None` for NaN, infinities, and values outside the closed interval
/// `[usize::MIN as f64, usize::MAX as f64]`.
#[allow(clippy::cast_possible_truncation)]
fn f_to_u(f: f64) -> Option<usize> {
    let lower = usize::MIN as f64;
    let upper = usize::MAX as f64;

    // The finiteness check comes first so NaN never reaches the comparisons.
    if !f.is_finite() || f < lower || f > upper {
        return None;
    }

    Some(f.trunc() as usize)
}