scouter_dataframe/parquet/spc.rs

use super::types::BinnedTableName;
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::sql::helper::get_binned_spc_drift_records_query;
use crate::storage::ObjectStore;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::spc::{SpcDriftFeature, SpcDriftFeatures};
use scouter_types::{ServerRecords, SpcServerRecord};
use scouter_types::{StorageType, ToDriftRecords};
use std::collections::BTreeMap;
use std::sync::Arc;

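/// Parquet-backed frame for SPC drift records: pairs the Arrow schema used to
/// encode [`SpcServerRecord`]s with the object store the parquet files live in.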
pub struct SpcDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

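// Implements the shared `ParquetFrame` interface: batch construction, session
// access, and the binned SQL used to query SPC drift records.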
#[async_trait]
impl ParquetFrame for SpcDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        SpcDataFrame::new(storage_settings)
    }

    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        let records = records.to_spc_drift_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> String {
        get_binned_spc_drift_records_query(bin, start_time, end_time, space, name, version)
    }

    fn table_name(&self) -> String {
        BinnedTableName::Spc.to_string()
    }
}

impl SpcDataFrame {
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("space", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("version", DataType::Utf8, false),
            Field::new("feature", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(SpcDataFrame {
            schema,
            object_store,
        })
    }

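    /// Encodes a vector of [`SpcServerRecord`]s as a single [`RecordBatch`]
    /// matching the schema declared in [`SpcDataFrame::new`]. Timestamps that
    /// fall outside the representable nanosecond range collapse to the epoch
    /// default rather than failing the batch.
    ///
    /// ```ignore
    /// // Sketch only: how the records are produced is up to the caller.
    /// let batch = frame.build_batch(records)?;
    /// assert_eq!(batch.num_columns(), 6);
    /// ```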
    pub fn build_batch(
        &self,
        records: Vec<SpcServerRecord>,
    ) -> Result<RecordBatch, DataFrameError> {
        let created_at = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let space = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
        let name = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
        let version = StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
        let feature = StringArray::from_iter_values(records.iter().map(|r| r.feature.as_str()));
        let value = Float64Array::from_iter_values(records.iter().map(|r| r.value));

        Ok(RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(created_at),
                Arc::new(space),
                Arc::new(name),
                Arc::new(version),
                Arc::new(feature),
                Arc::new(value),
            ],
        )?)
    }
}

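/// Folds a single [`RecordBatch`] from the binned SPC drift query into the
/// `features` map. Note the layout differs from the storage schema above:
/// the query aggregates each feature into one row, with `feature` as a
/// string-view column and `created_at` / `values` as list columns.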
fn process_spc_record_batch(
    batch: &RecordBatch,
    features: &mut BTreeMap<String, SpcDriftFeature>,
) -> Result<(), DataFrameError> {
    let feature_array = batch
        .column_by_name("feature")
        .ok_or_else(|| DataFrameError::MissingFieldError("feature"))?
        .as_string_view_opt()
        .ok_or_else(|| DataFrameError::DowncastError("feature"))?;

    let created_at_list = batch
        .column_by_name("created_at")
        .ok_or_else(|| DataFrameError::MissingFieldError("created_at"))?
        .as_list_opt::<i32>()
        .ok_or_else(|| DataFrameError::DowncastError("created_at"))?;

    let values_list = batch
        .column_by_name("values")
        .ok_or_else(|| DataFrameError::MissingFieldError("values"))?
        .as_list_opt::<i32>()
        .ok_or_else(|| DataFrameError::DowncastError("values"))?;

    for row in 0..batch.num_rows() {
        let feature_name = feature_array.value(row).to_string();

        // Nanosecond timestamps back into `DateTime<Utc>`; null entries are dropped.
        let created_at = created_at_list
            .value(row)
            .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
            .iter()
            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
            .collect::<Vec<_>>();

        // Likewise, `flatten` drops null values from the series.
        let values = values_list
            .value(row)
            .as_primitive::<arrow::datatypes::Float64Type>()
            .iter()
            .flatten()
            .collect::<Vec<_>>();

        features.insert(feature_name, SpcDriftFeature { created_at, values });
    }

    Ok(())
}

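/// Collects a binned SPC drift [`DataFrame`] and reshapes it into
/// [`SpcDriftFeatures`] keyed by feature name.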
pub async fn dataframe_to_spc_drift_features(
    df: DataFrame,
) -> Result<SpcDriftFeatures, DataFrameError> {
    let batches = df.collect().await?;

    let mut features = BTreeMap::new();

    for batch in batches {
        process_spc_record_batch(&batch, &mut features)?;
    }

    Ok(SpcDriftFeatures { features })
}
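
// A minimal end-to-end sketch of this module in use (comments only, not part
// of the API). It assumes an async caller holding `ObjectStorageSettings` and
// a `ServerRecords` payload of SPC records; only names defined above appear:
//
//     let frame = SpcDataFrame::new(&settings)?;
//     let df = frame.get_dataframe(records).await?;
//     let features = dataframe_to_spc_drift_features(df).await?;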