// scouter_dataframe/parquet/spc.rs

use super::types::BinnedTableName;
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::sql::helper::get_binned_spc_drift_records_query;
use crate::storage::ObjectStore;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::spc::{SpcDriftFeature, SpcDriftFeatures};
use scouter_types::{ServerRecords, SpcServerRecord};
use scouter_types::{StorageType, ToDriftRecords};
use std::collections::BTreeMap;
use std::sync::Arc;

/// Parquet-backed frame for SPC (statistical process control) drift records.
///
/// Holds the fixed Arrow schema used when converting `SpcServerRecord`s into
/// record batches, plus the object store used to create DataFusion sessions.
pub struct SpcDataFrame {
    // Arrow schema for SPC drift record batches (built in `SpcDataFrame::new`).
    schema: Arc<Schema>,
    // Storage backend handle; also the source of DataFusion session contexts.
    pub object_store: ObjectStore,
}

26#[async_trait]
27impl ParquetFrame for SpcDataFrame {
28    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
29        SpcDataFrame::new(storage_settings)
30    }
31
32    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
33        let records = records.to_spc_drift_records()?;
34        let batch = self.build_batch(records)?;
35
36        let ctx = self.object_store.get_session()?;
37
38        let df = ctx.read_batches(vec![batch])?;
39
40        Ok(df)
41    }
42
43    fn storage_root(&self) -> String {
44        self.object_store.storage_settings.canonicalized_path()
45    }
46
47    fn storage_type(&self) -> StorageType {
48        self.object_store.storage_settings.storage_type.clone()
49    }
50
51    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
52        Ok(self.object_store.get_session()?)
53    }
54
55    fn get_binned_sql(
56        &self,
57        bin: &f64,
58        start_time: &DateTime<Utc>,
59        end_time: &DateTime<Utc>,
60        space: &str,
61        name: &str,
62        version: &str,
63    ) -> String {
64        get_binned_spc_drift_records_query(bin, start_time, end_time, space, name, version)
65    }
66
67    fn table_name(&self) -> String {
68        BinnedTableName::Spc.to_string()
69    }
70}
71impl SpcDataFrame {
72    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
73        let schema = Arc::new(Schema::new(vec![
74            Field::new(
75                "created_at",
76                DataType::Timestamp(TimeUnit::Nanosecond, None),
77                false,
78            ),
79            Field::new("space", DataType::Utf8, false),
80            Field::new("name", DataType::Utf8, false),
81            Field::new("version", DataType::Utf8, false),
82            Field::new("feature", DataType::Utf8, false),
83            Field::new("value", DataType::Float64, false),
84        ]));
85
86        let object_store = ObjectStore::new(storage_settings)?;
87
88        Ok(SpcDataFrame {
89            schema,
90            object_store,
91        })
92    }
93
94    pub fn build_batch(
95        &self,
96        records: Vec<SpcServerRecord>,
97    ) -> Result<RecordBatch, DataFrameError> {
98        let created_at = TimestampNanosecondArray::from_iter_values(
99            records
100                .iter()
101                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
102        );
103        let space = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
104        let name = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
105        let version = StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
106        let feature = StringArray::from_iter_values(records.iter().map(|r| r.feature.as_str()));
107        let value = Float64Array::from_iter_values(records.iter().map(|r| r.value));
108
109        Ok(RecordBatch::try_new(
110            self.schema.clone(),
111            vec![
112                Arc::new(created_at),
113                Arc::new(space),
114                Arc::new(name),
115                Arc::new(version),
116                Arc::new(feature),
117                Arc::new(value),
118            ],
119        )?)
120    }
121}
122
123/// Helper function to process a record batch to feature and SpcDriftFeature
124///
125/// # Arguments
126/// * `batch` - The record batch to process
127/// * `features` - The features to populate
128///
129/// # Returns
130/// * `Result<(), DataFrameError>` - The result of the processing
131fn process_spc_record_batch(
132    batch: &RecordBatch,
133    features: &mut BTreeMap<String, SpcDriftFeature>,
134) -> Result<(), DataFrameError> {
135    // Feature is the first column and is stringarray
136    let feature_array = batch
137        .column_by_name("feature")
138        .ok_or_else(|| DataFrameError::MissingFieldError("feature"))?
139        .as_string_view_opt()
140        .ok_or_else(|| DataFrameError::DowncastError("feature"))?;
141
142    // The created_at and values columns are lists<i32>
143    let created_at_list = batch
144        .column_by_name("created_at")
145        .ok_or_else(|| DataFrameError::MissingFieldError("created_at"))?
146        .as_list_opt::<i32>()
147        .ok_or_else(|| DataFrameError::DowncastError("created_at"))?;
148
149    let values_list = batch
150        .column_by_name("values")
151        .ok_or_else(|| DataFrameError::MissingFieldError("values"))?
152        .as_list_opt::<i32>()
153        .ok_or_else(|| DataFrameError::DowncastError("values"))?;
154
155    for row in 0..batch.num_rows() {
156        let feature_name = feature_array.value(row).to_string();
157
158        // Convert timestamps to DateTime<Utc>
159        let created_at = created_at_list
160            .value(row)
161            .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
162            .iter()
163            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
164            .collect::<Vec<_>>();
165
166        // Convert values to Vec<f64>
167        let values = values_list
168            .value(row)
169            .as_primitive::<arrow::datatypes::Float64Type>()
170            .iter()
171            .flatten()
172            .collect::<Vec<_>>();
173
174        features.insert(feature_name, SpcDriftFeature { created_at, values });
175    }
176
177    Ok(())
178}
179
180/// Convert a DataFrame to SpcDriftFeatures
181///
182/// # Arguments
183/// * `df` - The DataFrame to convert
184///
185/// # Returns
186/// * `SpcDriftFeatures` - The converted SpcDriftFeatures
187pub async fn dataframe_to_spc_drift_features(
188    df: DataFrame,
189) -> Result<SpcDriftFeatures, DataFrameError> {
190    let batches = df.collect().await?;
191
192    let mut features = BTreeMap::new();
193
194    for batch in batches {
195        process_spc_record_batch(&batch, &mut features)?;
196    }
197
198    Ok(SpcDriftFeatures { features })
199}