scouter_dataframe/parquet/
utils.rs

1use crate::error::DataFrameError;
2use arrow::array::AsArray;
3use arrow_array::types::Float64Type;
4use arrow_array::types::TimestampNanosecondType;
5use arrow_array::RecordBatch;
6use arrow_array::StringViewArray;
7use chrono::{DateTime, TimeZone, Utc};
8use datafusion::prelude::DataFrame;
9use scouter_types::{BinnedMetric, BinnedMetricStats, BinnedMetrics};
10use tracing::{debug, error, instrument};
11
12/// Now that we have at least 2 metric types that calculate avg, lower_bound, and upper_bound as part of their stats,
13/// it makes sense to implement a generic trait that we can use.
14pub struct ParquetHelper {}
15
16impl ParquetHelper {
17    #[instrument(skip_all)]
18    pub fn extract_feature_array(batch: &RecordBatch) -> Result<&StringViewArray, DataFrameError> {
19        let feature_array = batch
20            .column_by_name("feature")
21            .ok_or_else(|| {
22                error!("Missing 'feature' field in RecordBatch");
23                DataFrameError::MissingFieldError("feature")
24            })?
25            .as_string_view_opt()
26            .ok_or_else(|| {
27                error!("Failed to downcast 'feature' field to StringViewArray");
28                DataFrameError::DowncastError("StringViewArray")
29            })?;
30        Ok(feature_array)
31    }
32
33    #[instrument(skip_all)]
34    pub fn extract_created_at(batch: &RecordBatch) -> Result<Vec<DateTime<Utc>>, DataFrameError> {
35        let created_at_list = batch
36            .column_by_name("created_at")
37            .ok_or_else(|| {
38                error!("Missing 'created_at' field in RecordBatch");
39                DataFrameError::MissingFieldError("created_at")
40            })?
41            .as_list_opt::<i32>()
42            .ok_or_else(|| {
43                error!("Failed to downcast 'created_at' field to ListArray");
44                DataFrameError::DowncastError("ListArray")
45            })?;
46
47        let created_at_array = created_at_list.value(0);
48        Ok(created_at_array
49            .as_primitive::<TimestampNanosecondType>()
50            .iter()
51            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
52            .collect())
53    }
54}
55pub struct BinnedMetricsExtractor {}
56
57impl BinnedMetricsExtractor {
58    #[instrument(skip_all)]
59    fn extract_stats(batch: &RecordBatch) -> Result<BinnedMetricStats, DataFrameError> {
60        let stats_list = batch
61            .column_by_name("stats")
62            .ok_or_else(|| {
63                error!("Missing 'stats' field in RecordBatch");
64                DataFrameError::MissingFieldError("stats")
65            })?
66            .as_list_opt::<i32>()
67            .ok_or_else(|| {
68                error!("Failed to downcast 'stats' field to ListArray");
69                DataFrameError::DowncastError("ListArray")
70            })?
71            .value(0);
72
73        let stats_structs = stats_list.as_struct_opt().ok_or_else(|| {
74            error!("Failed to downcast 'stats' field to StructArray");
75            DataFrameError::DowncastError("StructArray")
76        })?;
77
78        // extract avg, lower_bound, and upper_bound from the struct
79
80        // Extract avg, lower_bound, and upper_bound from the struct
81        let avg = stats_structs
82            .column_by_name("avg")
83            .ok_or_else(|| DataFrameError::MissingFieldError("avg"))
84            .inspect_err(|e| {
85                error!("Failed to get 'avg' field from stats: {:?}", e);
86            })?
87            .as_primitive_opt::<Float64Type>()
88            .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
89            .value(0);
90
91        let lower_bound = stats_structs
92            .column_by_name("lower_bound")
93            .ok_or_else(|| DataFrameError::MissingFieldError("lower_bound"))
94            .inspect_err(|e| {
95                error!("Failed to get 'lower_bound' field from stats: {:?}", e);
96            })?
97            .as_primitive_opt::<Float64Type>()
98            .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
99            .value(0);
100
101        let upper_bound = stats_structs
102            .column_by_name("upper_bound")
103            .ok_or_else(|| DataFrameError::MissingFieldError("upper_bound"))
104            .inspect_err(|e| {
105                error!("Failed to get 'upper_bound' field from stats: {:?}", e);
106            })?
107            .as_primitive_opt::<Float64Type>()
108            .ok_or_else(|| DataFrameError::DowncastError("Float64Array"))?
109            .value(0);
110
111        Ok(BinnedMetricStats {
112            avg,
113            lower_bound,
114            upper_bound,
115        })
116    }
117
118    #[instrument(skip_all)]
119    fn process_metric_record_batch(batch: &RecordBatch) -> Result<BinnedMetric, DataFrameError> {
120        debug!("Processing metric record batch");
121
122        let metric_array = batch
123            .column_by_name("metric")
124            .ok_or_else(|| {
125                error!("Missing 'metric' field in RecordBatch");
126                DataFrameError::MissingFieldError("metric")
127            })?
128            .as_string_view_opt()
129            .ok_or_else(|| {
130                error!("Failed to downcast 'metric' field to StringViewArray");
131                DataFrameError::DowncastError("StringViewArray")
132            })?;
133        let metric_name = metric_array.value(0).to_string();
134        let created_at_list = ParquetHelper::extract_created_at(batch)?;
135        let stats = Self::extract_stats(batch)?;
136
137        Ok(BinnedMetric {
138            metric: metric_name,
139            created_at: created_at_list,
140            stats: vec![stats],
141        })
142    }
143
144    /// Convert a DataFrame to BinnedMetrics.
145    ///
146    /// # Arguments
147    /// * `df` - The DataFrame to convert
148    ///
149    /// # Returns
150    /// * `BinnedMetrics` - The converted BinnedMetrics
151    #[instrument(skip_all)]
152    pub async fn dataframe_to_binned_metrics(
153        df: DataFrame,
154    ) -> Result<BinnedMetrics, DataFrameError> {
155        debug!("Converting DataFrame to binned metrics");
156
157        let batches = df.collect().await?;
158
159        let metrics: Vec<BinnedMetric> = batches
160            .iter()
161            .map(Self::process_metric_record_batch)
162            .collect::<Result<Vec<_>, _>>()
163            .inspect_err(|e| {
164                error!("Failed to process metric record batch: {:?}", e);
165            })?;
166
167        Ok(BinnedMetrics::from_vec(metrics))
168    }
169}