scouter_dataframe/parquet/
utils.rs1use crate::error::DataFrameError;
2use arrow::array::AsArray;
3use arrow_array::types::Float64Type;
4use arrow_array::types::TimestampNanosecondType;
5use arrow_array::RecordBatch;
6use arrow_array::StringViewArray;
7use chrono::{DateTime, TimeZone, Utc};
8use datafusion::prelude::DataFrame;
9use scouter_types::{BinnedMetric, BinnedMetricStats, BinnedMetrics};
10use tracing::{debug, error, instrument};
11
12pub struct ParquetHelper {}
15
16impl ParquetHelper {
17 #[instrument(skip_all)]
18 pub fn extract_feature_array(batch: &RecordBatch) -> Result<&StringViewArray, DataFrameError> {
19 let feature_array = batch
20 .column_by_name("feature")
21 .ok_or_else(|| {
22 error!("Missing 'feature' field in RecordBatch");
23 DataFrameError::MissingFieldError("feature")
24 })?
25 .as_string_view_opt()
26 .ok_or_else(|| {
27 error!("Failed to downcast 'feature' field to StringViewArray");
28 DataFrameError::DowncastError("StringViewArray")
29 })?;
30 Ok(feature_array)
31 }
32
33 #[instrument(skip_all)]
34 pub fn extract_created_at(batch: &RecordBatch) -> Result<Vec<DateTime<Utc>>, DataFrameError> {
35 let created_at_list = batch
36 .column_by_name("created_at")
37 .ok_or_else(|| {
38 error!("Missing 'created_at' field in RecordBatch");
39 DataFrameError::MissingFieldError("created_at")
40 })?
41 .as_list_opt::<i32>()
42 .ok_or_else(|| {
43 error!("Failed to downcast 'created_at' field to ListArray");
44 DataFrameError::DowncastError("ListArray")
45 })?;
46
47 let created_at_array = created_at_list.value(0);
48 Ok(created_at_array
49 .as_primitive::<TimestampNanosecondType>()
50 .iter()
51 .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
52 .collect())
53 }
54}
55pub struct BinnedMetricsExtractor {}
56
57impl BinnedMetricsExtractor {
58 #[instrument(skip_all)]
59 fn extract_stats(batch: &RecordBatch) -> Result<BinnedMetricStats, DataFrameError> {
60 let stats_list = batch
61 .column_by_name("stats")
62 .ok_or_else(|| {
63 error!("Missing 'stats' field in RecordBatch");
64 DataFrameError::MissingFieldError("stats")
65 })?
66 .as_list_opt::<i32>()
67 .ok_or_else(|| {
68 error!("Failed to downcast 'stats' field to ListArray");
69 DataFrameError::DowncastError("ListArray")
70 })?
71 .value(0);
72
73 let stats_structs = stats_list.as_struct_opt().ok_or_else(|| {
74 error!("Failed to downcast 'stats' field to StructArray");
75 DataFrameError::DowncastError("StructArray")
76 })?;
77
78 let avg = stats_structs
82 .column_by_name("avg")
83 .ok_or_else(|| DataFrameError::MissingFieldError("avg"))
84 .inspect_err(|e| {
85 error!("Failed to get 'avg' field from stats: {:?}", e);
86 })?
87 .as_primitive_opt::<Float64Type>()
88 .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
89 .value(0);
90
91 let lower_bound = stats_structs
92 .column_by_name("lower_bound")
93 .ok_or_else(|| DataFrameError::MissingFieldError("lower_bound"))
94 .inspect_err(|e| {
95 error!("Failed to get 'lower_bound' field from stats: {:?}", e);
96 })?
97 .as_primitive_opt::<Float64Type>()
98 .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
99 .value(0);
100
101 let upper_bound = stats_structs
102 .column_by_name("upper_bound")
103 .ok_or_else(|| DataFrameError::MissingFieldError("upper_bound"))
104 .inspect_err(|e| {
105 error!("Failed to get 'upper_bound' field from stats: {:?}", e);
106 })?
107 .as_primitive_opt::<Float64Type>()
108 .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
109 .value(0);
110
111 Ok(BinnedMetricStats {
112 avg,
113 lower_bound,
114 upper_bound,
115 })
116 }
117
118 #[instrument(skip_all)]
119 fn process_metric_record_batch(batch: &RecordBatch) -> Result<BinnedMetric, DataFrameError> {
120 debug!("Processing metric record batch");
121
122 let metric_array = batch
123 .column_by_name("metric")
124 .ok_or_else(|| {
125 error!("Missing 'metric' field in RecordBatch");
126 DataFrameError::MissingFieldError("metric")
127 })?
128 .as_string_view_opt()
129 .ok_or_else(|| {
130 error!("Failed to downcast 'metric' field to StringViewArray");
131 DataFrameError::DowncastError("StringViewArray")
132 })?;
133 let metric_name = metric_array.value(0).to_string();
134 let created_at_list = ParquetHelper::extract_created_at(batch)?;
135 let stats = Self::extract_stats(batch)?;
136
137 Ok(BinnedMetric {
138 metric: metric_name,
139 created_at: created_at_list,
140 stats: vec![stats],
141 })
142 }
143
144 #[instrument(skip_all)]
152 pub async fn dataframe_to_binned_metrics(
153 df: DataFrame,
154 ) -> Result<BinnedMetrics, DataFrameError> {
155 debug!("Converting DataFrame to binned metrics");
156
157 let batches = df.collect().await?;
158
159 let metrics: Vec<BinnedMetric> = batches
160 .iter()
161 .map(Self::process_metric_record_batch)
162 .collect::<Result<Vec<_>, _>>()
163 .inspect_err(|e| {
164 error!("Failed to process metric record batch: {:?}", e);
165 })?;
166
167 Ok(BinnedMetrics::from_vec(metrics))
168 }
169}