Skip to main content

scouter_dataframe/parquet/
utils.rs

1use crate::error::DataFrameError;
2use arrow::array::AsArray;
3use arrow::datatypes::UInt32Type;
4use arrow_array::types::Float64Type;
5use arrow_array::types::TimestampNanosecondType;
6use arrow_array::RecordBatch;
7use arrow_array::StringViewArray;
8use chrono::{DateTime, TimeZone, Utc};
9use datafusion::prelude::DataFrame;
10use scouter_types::{BinnedMetric, BinnedMetricStats, BinnedMetrics};
11use tracing::{debug, error, instrument};
12
13/// Now that we have at least 2 metric types that calculate avg, lower_bound, and upper_bound as part of their stats,
14/// it makes sense to implement a generic trait that we can use.
15pub struct ParquetHelper {}
16
17impl ParquetHelper {
18    #[instrument(skip_all)]
19    pub fn extract_feature_array(batch: &RecordBatch) -> Result<&StringViewArray, DataFrameError> {
20        let feature_array = batch
21            .column_by_name("feature")
22            .ok_or_else(|| {
23                error!("Missing 'feature' field in RecordBatch");
24                DataFrameError::MissingFieldError("feature")
25            })?
26            .as_string_view_opt()
27            .ok_or_else(|| {
28                error!("Failed to downcast 'feature' field to StringViewArray");
29                DataFrameError::DowncastError("StringViewArray")
30            })?;
31        Ok(feature_array)
32    }
33
34    #[instrument(skip_all)]
35    pub fn extract_created_at(batch: &RecordBatch) -> Result<Vec<DateTime<Utc>>, DataFrameError> {
36        let created_at_list = batch
37            .column_by_name("created_at")
38            .ok_or_else(|| {
39                error!("Missing 'created_at' field in RecordBatch");
40                DataFrameError::MissingFieldError("created_at")
41            })?
42            .as_list_opt::<i32>()
43            .ok_or_else(|| {
44                error!("Failed to downcast 'created_at' field to ListArray");
45                DataFrameError::DowncastError("ListArray")
46            })?;
47
48        let created_at_array = created_at_list.value(0);
49        Ok(created_at_array
50            .as_primitive::<TimestampNanosecondType>()
51            .iter()
52            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
53            .collect())
54    }
55}
56pub struct BinnedMetricsExtractor {}
57
58impl BinnedMetricsExtractor {
59    #[instrument(skip_all)]
60    fn extract_stats(batch: &RecordBatch) -> Result<BinnedMetricStats, DataFrameError> {
61        let stats_list = batch
62            .column_by_name("stats")
63            .ok_or_else(|| {
64                error!("Missing 'stats' field in RecordBatch");
65                DataFrameError::MissingFieldError("stats")
66            })?
67            .as_list_opt::<i32>()
68            .ok_or_else(|| {
69                error!("Failed to downcast 'stats' field to ListArray");
70                DataFrameError::DowncastError("ListArray")
71            })?
72            .value(0);
73
74        let stats_structs = stats_list.as_struct_opt().ok_or_else(|| {
75            error!("Failed to downcast 'stats' field to StructArray");
76            DataFrameError::DowncastError("StructArray")
77        })?;
78
79        // extract avg, lower_bound, and upper_bound from the struct
80
81        // Extract avg, lower_bound, and upper_bound from the struct
82        let avg = stats_structs
83            .column_by_name("avg")
84            .ok_or_else(|| DataFrameError::MissingFieldError("avg"))
85            .inspect_err(|e| {
86                error!("Failed to get 'avg' field from stats: {:?}", e);
87            })?
88            .as_primitive_opt::<Float64Type>()
89            .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
90            .value(0);
91
92        let lower_bound = stats_structs
93            .column_by_name("lower_bound")
94            .ok_or_else(|| DataFrameError::MissingFieldError("lower_bound"))
95            .inspect_err(|e| {
96                error!("Failed to get 'lower_bound' field from stats: {:?}", e);
97            })?
98            .as_primitive_opt::<Float64Type>()
99            .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
100            .value(0);
101
102        let upper_bound = stats_structs
103            .column_by_name("upper_bound")
104            .ok_or_else(|| DataFrameError::MissingFieldError("upper_bound"))
105            .inspect_err(|e| {
106                error!("Failed to get 'upper_bound' field from stats: {:?}", e);
107            })?
108            .as_primitive_opt::<Float64Type>()
109            .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
110            .value(0);
111
112        Ok(BinnedMetricStats {
113            avg,
114            lower_bound,
115            upper_bound,
116        })
117    }
118
119    #[instrument(skip_all)]
120    fn process_metric_record_batch(batch: &RecordBatch) -> Result<BinnedMetric, DataFrameError> {
121        debug!("Processing metric record batch");
122
123        let metric_column = batch.column_by_name("metric").ok_or_else(|| {
124            error!("Missing 'metric' field in RecordBatch");
125            DataFrameError::MissingFieldError("metric")
126        })?;
127
128        // Handle both Dictionary and plain string types
129        let metric_name = if let Some(dict_array) = metric_column.as_dictionary_opt::<UInt32Type>()
130        {
131            // Dictionary-encoded string (e.g., from GenAI task_id)
132            let values = dict_array.values();
133            let string_values = values.as_string_opt::<i32>().ok_or_else(|| {
134                error!("Failed to downcast dictionary values to StringArray");
135                DataFrameError::DowncastError("StringArray")
136            })?;
137            let key = dict_array.key(0).ok_or_else(|| {
138                error!("Failed to get key from dictionary array");
139                DataFrameError::MissingFieldError("dictionary key")
140            })?;
141            string_values.value(key).to_string()
142        } else if let Some(string_view_array) = metric_column.as_string_view_opt() {
143            // StringView type
144            string_view_array.value(0).to_string()
145        } else if let Some(string_array) = metric_column.as_string_opt::<i32>() {
146            // Plain string type
147            string_array.value(0).to_string()
148        } else {
149            error!("Failed to downcast 'metric' field to any supported string type");
150            return Err(DataFrameError::DowncastError("String type"));
151        };
152
153        let created_at_list = ParquetHelper::extract_created_at(batch)?;
154        let stats = Self::extract_stats(batch)?;
155
156        Ok(BinnedMetric {
157            metric: metric_name,
158            created_at: created_at_list,
159            stats: vec![stats],
160        })
161    }
162
163    /// Convert a DataFrame to BinnedMetrics.
164    ///
165    /// # Arguments
166    /// * `df` - The DataFrame to convert
167    ///
168    /// # Returns
169    /// * `BinnedMetrics` - The converted BinnedMetrics
170    #[instrument(skip_all)]
171    pub async fn dataframe_to_binned_metrics(
172        df: DataFrame,
173    ) -> Result<BinnedMetrics, DataFrameError> {
174        debug!("Converting DataFrame to binned metrics");
175
176        let batches = df.collect().await?;
177
178        let metrics: Vec<BinnedMetric> = batches
179            .iter()
180            .map(Self::process_metric_record_batch)
181            .collect::<Result<Vec<_>, _>>()
182            .inspect_err(|e| {
183                error!("Failed to process metric record batch: {:?}", e);
184            })?;
185
186        Ok(BinnedMetrics::from_vec(metrics))
187    }
188}