scouter_dataframe/parquet/
utils.rs1use crate::error::DataFrameError;
2use arrow::array::AsArray;
3use arrow::datatypes::UInt32Type;
4use arrow_array::types::Float64Type;
5use arrow_array::types::TimestampNanosecondType;
6use arrow_array::RecordBatch;
7use arrow_array::StringViewArray;
8use chrono::{DateTime, TimeZone, Utc};
9use datafusion::prelude::DataFrame;
10use scouter_types::{BinnedMetric, BinnedMetricStats, BinnedMetrics};
11use tracing::{debug, error, instrument};
12
13pub struct ParquetHelper {}
16
17impl ParquetHelper {
18 #[instrument(skip_all)]
19 pub fn extract_feature_array(batch: &RecordBatch) -> Result<&StringViewArray, DataFrameError> {
20 let feature_array = batch
21 .column_by_name("feature")
22 .ok_or_else(|| {
23 error!("Missing 'feature' field in RecordBatch");
24 DataFrameError::MissingFieldError("feature")
25 })?
26 .as_string_view_opt()
27 .ok_or_else(|| {
28 error!("Failed to downcast 'feature' field to StringViewArray");
29 DataFrameError::DowncastError("StringViewArray")
30 })?;
31 Ok(feature_array)
32 }
33
34 #[instrument(skip_all)]
35 pub fn extract_created_at(batch: &RecordBatch) -> Result<Vec<DateTime<Utc>>, DataFrameError> {
36 let created_at_list = batch
37 .column_by_name("created_at")
38 .ok_or_else(|| {
39 error!("Missing 'created_at' field in RecordBatch");
40 DataFrameError::MissingFieldError("created_at")
41 })?
42 .as_list_opt::<i32>()
43 .ok_or_else(|| {
44 error!("Failed to downcast 'created_at' field to ListArray");
45 DataFrameError::DowncastError("ListArray")
46 })?;
47
48 let created_at_array = created_at_list.value(0);
49 Ok(created_at_array
50 .as_primitive::<TimestampNanosecondType>()
51 .iter()
52 .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
53 .collect())
54 }
55}
56pub struct BinnedMetricsExtractor {}
57
58impl BinnedMetricsExtractor {
59 #[instrument(skip_all)]
60 fn extract_stats(batch: &RecordBatch) -> Result<BinnedMetricStats, DataFrameError> {
61 let stats_list = batch
62 .column_by_name("stats")
63 .ok_or_else(|| {
64 error!("Missing 'stats' field in RecordBatch");
65 DataFrameError::MissingFieldError("stats")
66 })?
67 .as_list_opt::<i32>()
68 .ok_or_else(|| {
69 error!("Failed to downcast 'stats' field to ListArray");
70 DataFrameError::DowncastError("ListArray")
71 })?
72 .value(0);
73
74 let stats_structs = stats_list.as_struct_opt().ok_or_else(|| {
75 error!("Failed to downcast 'stats' field to StructArray");
76 DataFrameError::DowncastError("StructArray")
77 })?;
78
79 let avg = stats_structs
83 .column_by_name("avg")
84 .ok_or_else(|| DataFrameError::MissingFieldError("avg"))
85 .inspect_err(|e| {
86 error!("Failed to get 'avg' field from stats: {:?}", e);
87 })?
88 .as_primitive_opt::<Float64Type>()
89 .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
90 .value(0);
91
92 let lower_bound = stats_structs
93 .column_by_name("lower_bound")
94 .ok_or_else(|| DataFrameError::MissingFieldError("lower_bound"))
95 .inspect_err(|e| {
96 error!("Failed to get 'lower_bound' field from stats: {:?}", e);
97 })?
98 .as_primitive_opt::<Float64Type>()
99 .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
100 .value(0);
101
102 let upper_bound = stats_structs
103 .column_by_name("upper_bound")
104 .ok_or_else(|| DataFrameError::MissingFieldError("upper_bound"))
105 .inspect_err(|e| {
106 error!("Failed to get 'upper_bound' field from stats: {:?}", e);
107 })?
108 .as_primitive_opt::<Float64Type>()
109 .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
110 .value(0);
111
112 Ok(BinnedMetricStats {
113 avg,
114 lower_bound,
115 upper_bound,
116 })
117 }
118
119 #[instrument(skip_all)]
120 fn process_metric_record_batch(batch: &RecordBatch) -> Result<BinnedMetric, DataFrameError> {
121 debug!("Processing metric record batch");
122
123 let metric_column = batch.column_by_name("metric").ok_or_else(|| {
124 error!("Missing 'metric' field in RecordBatch");
125 DataFrameError::MissingFieldError("metric")
126 })?;
127
128 let metric_name = if let Some(dict_array) = metric_column.as_dictionary_opt::<UInt32Type>()
130 {
131 let values = dict_array.values();
133 let string_values = values.as_string_opt::<i32>().ok_or_else(|| {
134 error!("Failed to downcast dictionary values to StringArray");
135 DataFrameError::DowncastError("StringArray")
136 })?;
137 let key = dict_array.key(0).ok_or_else(|| {
138 error!("Failed to get key from dictionary array");
139 DataFrameError::MissingFieldError("dictionary key")
140 })?;
141 string_values.value(key).to_string()
142 } else if let Some(string_view_array) = metric_column.as_string_view_opt() {
143 string_view_array.value(0).to_string()
145 } else if let Some(string_array) = metric_column.as_string_opt::<i32>() {
146 string_array.value(0).to_string()
148 } else {
149 error!("Failed to downcast 'metric' field to any supported string type");
150 return Err(DataFrameError::DowncastError("String type"));
151 };
152
153 let created_at_list = ParquetHelper::extract_created_at(batch)?;
154 let stats = Self::extract_stats(batch)?;
155
156 Ok(BinnedMetric {
157 metric: metric_name,
158 created_at: created_at_list,
159 stats: vec![stats],
160 })
161 }
162
163 #[instrument(skip_all)]
171 pub async fn dataframe_to_binned_metrics(
172 df: DataFrame,
173 ) -> Result<BinnedMetrics, DataFrameError> {
174 debug!("Converting DataFrame to binned metrics");
175
176 let batches = df.collect().await?;
177
178 let metrics: Vec<BinnedMetric> = batches
179 .iter()
180 .map(Self::process_metric_record_batch)
181 .collect::<Result<Vec<_>, _>>()
182 .inspect_err(|e| {
183 error!("Failed to process metric record batch: {:?}", e);
184 })?;
185
186 Ok(BinnedMetrics::from_vec(metrics))
187 }
188}