scouter_dataframe/parquet/spc.rs

use super::types::BinnedTableName;
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::sql::helper::get_binned_spc_drift_records_query;
use crate::storage::ObjectStore;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{
    Float64Array, ListArray, StringArray, StringViewArray, TimestampNanosecondArray,
};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::spc::{SpcDriftFeature, SpcDriftFeatures};
use scouter_types::{ServerRecords, SpcServerRecord};
use scouter_types::{StorageType, ToDriftRecords};
use std::collections::BTreeMap;
use std::sync::Arc;

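/// SPC drift record frame backed by an object store. Holds the fixed Arrow
/// schema used when converting `SpcServerRecord`s into record batches.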
pub struct SpcDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

#[async_trait]
impl ParquetFrame for SpcDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        SpcDataFrame::new(storage_settings)
    }

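    /// Converts raw `ServerRecords` into SPC drift records, builds a single
    /// `RecordBatch`, and loads it into a DataFusion `DataFrame`.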
    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        let records = records.to_spc_drift_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> String {
        get_binned_spc_drift_records_query(bin, start_time, end_time, space, name, version)
    }

    fn table_name(&self) -> String {
        BinnedTableName::Spc.to_string()
    }
}

impl SpcDataFrame {
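    /// Creates a new `SpcDataFrame`, defining the record schema and
    /// initializing the object store from the given storage settings.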
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("space", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("version", DataType::Utf8, false),
            Field::new("feature", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(SpcDataFrame {
            schema,
            object_store,
        })
    }

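    /// Converts SPC server records into a single Arrow `RecordBatch` whose
    /// column order matches the schema built in [`SpcDataFrame::new`].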
    pub fn build_batch(
        &self,
        records: Vec<SpcServerRecord>,
    ) -> Result<RecordBatch, DataFrameError> {
        // Timestamps outside the nanosecond-representable range fall back to 0 (the epoch).
        let created_at = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let space = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
        let name = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
        let version = StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
        let feature = StringArray::from_iter_values(records.iter().map(|r| r.feature.as_str()));
        let value = Float64Array::from_iter_values(records.iter().map(|r| r.value));

        Ok(RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(created_at),
                Arc::new(space),
                Arc::new(name),
                Arc::new(version),
                Arc::new(feature),
                Arc::new(value),
            ],
        )?)
    }
}

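/// Extracts per-feature drift series from one batch of binned query results.
/// Expects three columns: `feature` (Utf8View), `created_at`
/// (List<Timestamp(Nanosecond)>), and `values` (List<Float64>).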
fn process_spc_record_batch(
    batch: &RecordBatch,
    features: &mut BTreeMap<String, SpcDriftFeature>,
) -> Result<(), DataFrameError> {
    let feature_array = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringViewArray>()
        .ok_or_else(|| DataFrameError::GetColumnError("feature"))?;

    let created_at_list = batch
        .column(1)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| DataFrameError::GetColumnError("created_at"))?;

    let values_list = batch
        .column(2)
        .as_any()
        .downcast_ref::<ListArray>()
        .ok_or_else(|| DataFrameError::GetColumnError("values"))?;

    for row in 0..batch.num_rows() {
        let feature_name = feature_array.value(row).to_string();

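        // Each row holds list-typed cells: unpack this feature's parallel
        // timestamp and value arrays.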
        let created_at_array = created_at_list.value(row);
        let values_array = values_list.value(row);

        let created_at = created_at_array
            .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
            .iter()
            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
            .collect::<Vec<_>>();

        let values = values_array
            .as_primitive::<arrow::datatypes::Float64Type>()
            .iter()
            .flatten()
            .collect::<Vec<_>>();

        features.insert(feature_name, SpcDriftFeature { created_at, values });
    }

    Ok(())
}

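/// Collects a binned SPC `DataFrame` into record batches and flattens them
/// into `SpcDriftFeatures` keyed by feature name.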
pub async fn dataframe_to_spc_drift_features(
    df: DataFrame,
) -> Result<SpcDriftFeatures, DataFrameError> {
    let batches = df.collect().await?;

    let mut features = BTreeMap::new();

    for batch in batches {
        process_spc_record_batch(&batch, &mut features)?;
    }

    Ok(SpcDriftFeatures { features })
}