scouter_dataframe/parquet/spc.rs

use super::types::BinnedTableName;
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::sql::helper::get_binned_spc_drift_records_query;
use crate::storage::ObjectStore;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{
    Float64Array, ListArray, StringArray, StringViewArray, TimestampNanosecondArray,
};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::spc::{SpcDriftFeature, SpcDriftFeatures};
use scouter_types::{ServerRecords, SpcServerRecord};
use scouter_types::{StorageType, ToDriftRecords};
use std::collections::BTreeMap;
use std::sync::Arc;

pub struct SpcDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

#[async_trait]
impl ParquetFrame for SpcDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        SpcDataFrame::new(storage_settings)
    }

    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        let records = records.to_spc_drift_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> String {
        get_binned_spc_drift_records_query(bin, start_time, end_time, space, name, version)
    }

    fn table_name(&self) -> String {
        BinnedTableName::Spc.to_string()
    }
}

impl SpcDataFrame {
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("space", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("version", DataType::Utf8, false),
            Field::new("feature", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(SpcDataFrame {
            schema,
            object_store,
        })
    }

    pub fn build_batch(
        &self,
        records: Vec<SpcServerRecord>,
    ) -> Result<RecordBatch, DataFrameError> {
        let created_at = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let space = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
        let name = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
        let version = StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
        let feature = StringArray::from_iter_values(records.iter().map(|r| r.feature.as_str()));
        let value = Float64Array::from_iter_values(records.iter().map(|r| r.value));

        Ok(RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(created_at),
                Arc::new(space),
                Arc::new(name),
                Arc::new(version),
                Arc::new(feature),
                Arc::new(value),
            ],
        )?)
    }
}
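
// Illustrative sketch (not part of the original file): a minimal example of the
// write path defined above. It assumes you already have an `ObjectStorageSettings`
// and a `Vec<SpcServerRecord>`; the function name is hypothetical and it only uses
// items already defined or imported in this module.
#[allow(dead_code)]
fn example_spc_write_batch(
    storage_settings: &ObjectStorageSettings,
    records: Vec<SpcServerRecord>,
) -> Result<RecordBatch, DataFrameError> {
    // Build the frame, then convert the SPC records into an Arrow batch whose
    // columns follow the schema declared in `SpcDataFrame::new`:
    // created_at, space, name, version, feature, value.
    let frame = SpcDataFrame::new(storage_settings)?;
    frame.build_batch(records)
}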

/// Helper function to process a record batch into `SpcDriftFeature` entries keyed by feature name
///
/// # Arguments
/// * `batch` - The record batch to process
/// * `features` - The feature map to populate
///
/// # Returns
/// * `Result<(), DataFrameError>` - The result of the processing
fn process_spc_record_batch(
    batch: &RecordBatch,
    features: &mut BTreeMap<String, SpcDriftFeature>,
) -> Result<(), DataFrameError> {
    // Feature is the first column and is returned by DataFusion as a StringViewArray
    let feature_array = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringViewArray>()
        .ok_or_else(|| DataFrameError::GetColumnError("feature"))?;

    // The created_at and values columns are lists
    let created_at_list = batch
        .column(1)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| DataFrameError::GetColumnError("created_at"))?;

    let values_list = batch
        .column(2)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| DataFrameError::GetColumnError("values"))?;

    for row in 0..batch.num_rows() {
        let feature_name = feature_array.value(row).to_string();

        // Get the inner arrays for this row
        let created_at_array = created_at_list.value(row);
        let values_array = values_list.value(row);

        // Convert timestamps to DateTime<Utc>
        let created_at = created_at_array
            .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
            .iter()
            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
            .collect::<Vec<_>>();

        // Convert values to Vec<f64>
        let values = values_array
            .as_primitive::<arrow::datatypes::Float64Type>()
            .iter()
            .flatten()
            .collect::<Vec<_>>();

        features.insert(feature_name, SpcDriftFeature { created_at, values });
    }

    Ok(())
}
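
// For reference, a sketch of the binned layout `process_spc_record_batch` expects,
// inferred from the downcasts above rather than taken from the original file:
// a `feature` string-view column followed by list columns of timestamps and values.
#[allow(dead_code)]
fn example_binned_spc_schema() -> Schema {
    Schema::new(vec![
        // Feature name, read back as a StringViewArray
        Field::new("feature", DataType::Utf8View, false),
        // Per-feature list of bin timestamps
        Field::new(
            "created_at",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ))),
            false,
        ),
        // Per-feature list of binned SPC values
        Field::new(
            "values",
            DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
            false,
        ),
    ])
}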

/// Convert a DataFrame to SpcDriftFeatures
///
/// # Arguments
/// * `df` - The DataFrame to convert
///
/// # Returns
/// * `SpcDriftFeatures` - The converted SpcDriftFeatures
pub async fn dataframe_to_spc_drift_features(
    df: DataFrame,
) -> Result<SpcDriftFeatures, DataFrameError> {
    let batches = df.collect().await?;

    let mut features = BTreeMap::new();

    for batch in batches {
        process_spc_record_batch(&batch, &mut features)?;
    }

    Ok(SpcDriftFeatures { features })
}
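
// Illustrative sketch (hypothetical helper, not part of the original file): the
// DataFrame passed to `dataframe_to_spc_drift_features` is expected to already
// carry the binned layout consumed by `process_spc_record_batch`, e.g. the result
// of running the SQL produced by `get_binned_spc_drift_records_query`.
#[allow(dead_code)]
async fn example_binned_df_to_features(
    binned_df: DataFrame,
) -> Result<SpcDriftFeatures, DataFrameError> {
    let drift = dataframe_to_spc_drift_features(binned_df).await?;
    // Each entry maps a feature name to aligned timestamp/value series.
    for (feature, series) in &drift.features {
        debug_assert_eq!(
            series.created_at.len(),
            series.values.len(),
            "series for feature `{feature}` should be aligned"
        );
    }
    Ok(drift)
}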