// scouter_dataframe/parquet/custom.rs

1use crate::error::DataFrameError;
2use crate::parquet::traits::ParquetFrame;
3use crate::parquet::types::BinnedTableName;
4use crate::sql::helper::get_binned_custom_metric_values_query;
5use crate::storage::ObjectStore;
6use arrow::array::AsArray;
7use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
8use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
9use arrow_array::types::Float64Type;
10use arrow_array::{ListArray, StringViewArray};
11use arrow_array::{RecordBatch, StructArray};
12use async_trait::async_trait;
13use chrono::{DateTime, TimeZone, Utc};
14use datafusion::dataframe::DataFrame;
15use datafusion::prelude::SessionContext;
16use scouter_settings::ObjectStorageSettings;
17
18use scouter_types::{
19    custom::{BinnedCustomMetric, BinnedCustomMetricStats, BinnedCustomMetrics},
20    CustomMetricServerRecord, ServerRecords, StorageType, ToDriftRecords,
21};
22use std::sync::Arc;
23
24pub struct CustomMetricDataFrame {
25    schema: Arc<Schema>,
26    pub object_store: ObjectStore,
27}
28
29#[async_trait]
30impl ParquetFrame for CustomMetricDataFrame {
31    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
32        CustomMetricDataFrame::new(storage_settings)
33    }
34
35    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
36        let records = records.to_custom_metric_drift_records()?;
37        let batch = self.build_batch(records)?;
38
39        let ctx = self.object_store.get_session()?;
40
41        let df = ctx.read_batches(vec![batch])?;
42
43        Ok(df)
44    }
45
46    fn storage_root(&self) -> String {
47        self.object_store.storage_settings.canonicalized_path()
48    }
49
50    fn storage_type(&self) -> StorageType {
51        self.object_store.storage_settings.storage_type.clone()
52    }
53
54    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
55        Ok(self.object_store.get_session()?)
56    }
57
58    fn get_binned_sql(
59        &self,
60        bin: &f64,
61        start_time: &DateTime<Utc>,
62        end_time: &DateTime<Utc>,
63        space: &str,
64        name: &str,
65        version: &str,
66    ) -> String {
67        get_binned_custom_metric_values_query(bin, start_time, end_time, space, name, version)
68    }
69
70    fn table_name(&self) -> String {
71        BinnedTableName::CustomMetric.to_string()
72    }
73}
74
75impl CustomMetricDataFrame {
76    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
77        let schema = Arc::new(Schema::new(vec![
78            Field::new(
79                "created_at",
80                DataType::Timestamp(TimeUnit::Nanosecond, None),
81                false,
82            ),
83            Field::new("space", DataType::Utf8, false),
84            Field::new("name", DataType::Utf8, false),
85            Field::new("version", DataType::Utf8, false),
86            Field::new("metric", DataType::Utf8, false),
87            Field::new("value", DataType::Float64, false),
88        ]));
89
90        let object_store = ObjectStore::new(storage_settings)?;
91
92        Ok(CustomMetricDataFrame {
93            schema,
94            object_store,
95        })
96    }
97
98    fn build_batch(
99        &self,
100        records: Vec<CustomMetricServerRecord>,
101    ) -> Result<RecordBatch, DataFrameError> {
102        let created_at_array = TimestampNanosecondArray::from_iter_values(
103            records
104                .iter()
105                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
106        );
107
108        let space_array = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
109        let name_array = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
110        let version_array =
111            StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
112        let metric_array = StringArray::from_iter_values(records.iter().map(|r| r.metric.as_str()));
113
114        let value_array = Float64Array::from_iter_values(records.iter().map(|r| r.value));
115
116        let batch = RecordBatch::try_new(
117            self.schema.clone(),
118            vec![
119                Arc::new(created_at_array),
120                Arc::new(space_array),
121                Arc::new(name_array),
122                Arc::new(version_array),
123                Arc::new(metric_array),
124                Arc::new(value_array),
125            ],
126        )?;
127
128        Ok(batch)
129    }
130}
131
132fn extract_created_at(batch: &RecordBatch) -> Result<Vec<DateTime<Utc>>, DataFrameError> {
133    let created_at_list = batch
134        .column(1)
135        .as_any()
136        .downcast_ref::<ListArray>()
137        .ok_or_else(|| DataFrameError::DowncastError("ListArray"))?;
138
139    let created_at_array = created_at_list.value(0);
140    Ok(created_at_array
141        .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
142        .iter()
143        .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
144        .collect())
145}
146
147fn extract_stats(batch: &RecordBatch) -> Result<BinnedCustomMetricStats, DataFrameError> {
148    let stats_list = batch
149        .column(2)
150        .as_any()
151        .downcast_ref::<ListArray>()
152        .ok_or_else(|| DataFrameError::DowncastError("ListArray"))?
153        .value(0);
154
155    let stats_structs = stats_list
156        .as_any()
157        .downcast_ref::<StructArray>()
158        .ok_or_else(|| DataFrameError::DowncastError("StructArray"))?;
159
160    // extract avg, lower_bound, and upper_bound from the struct
161
162    // Extract avg, lower_bound, and upper_bound from the struct
163    let avg = stats_structs
164        .column_by_name("avg")
165        .ok_or_else(|| DataFrameError::MissingFieldError("avg"))?
166        .as_primitive::<Float64Type>()
167        .value(0);
168
169    let lower_bound = stats_structs
170        .column_by_name("lower_bound")
171        .ok_or_else(|| DataFrameError::MissingFieldError("lower_bound"))?
172        .as_primitive::<Float64Type>()
173        .value(0);
174
175    let upper_bound = stats_structs
176        .column_by_name("upper_bound")
177        .ok_or_else(|| DataFrameError::MissingFieldError("upper_bound"))?
178        .as_primitive::<Float64Type>()
179        .value(0);
180
181    Ok(BinnedCustomMetricStats {
182        avg,
183        lower_bound,
184        upper_bound,
185    })
186}
187
188fn process_custom_record_batch(batch: &RecordBatch) -> Result<BinnedCustomMetric, DataFrameError> {
189    let metric_array = batch
190        .column(0)
191        .as_any()
192        .downcast_ref::<StringViewArray>()
193        .expect("Failed to downcast to StringViewArray");
194    let metric_name = metric_array.value(0).to_string();
195    let created_at_list = extract_created_at(batch)?;
196    let stats = extract_stats(batch)?;
197
198    Ok(BinnedCustomMetric {
199        metric: metric_name,
200        created_at: created_at_list,
201        stats: vec![stats],
202    })
203}
204
205/// Convert a DataFrame to SpcDriftFeatures
206///
207/// # Arguments
208/// * `df` - The DataFrame to convert
209///
210/// # Returns
211/// * `SpcDriftFeatures` - The converted SpcDriftFeatures
212pub async fn dataframe_to_custom_drift_metrics(
213    df: DataFrame,
214) -> Result<BinnedCustomMetrics, DataFrameError> {
215    let batches = df.collect().await?;
216
217    let metrics: Vec<BinnedCustomMetric> = batches
218        .iter()
219        .map(process_custom_record_batch)
220        .collect::<Result<Vec<_>, _>>()?;
221
222    Ok(BinnedCustomMetrics::from_vec(metrics))
223}