scouter_dataframe/parquet/spc.rs

use super::types::BinnedTableName;
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::sql::helper::get_binned_spc_drift_records_query;
use crate::storage::ObjectStore;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Float64Array, Int32Array, StringArray, TimestampNanosecondArray};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::spc::{SpcDriftFeature, SpcDriftFeatures};
use scouter_types::{ServerRecords, SpcRecord};
use scouter_types::{StorageType, ToDriftRecords};
use std::collections::BTreeMap;
use std::sync::Arc;

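/// Parquet-backed frame for SPC (statistical process control) drift records.
/// Holds the Arrow schema used to encode records and the object store that
/// backs reads and writes.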
pub struct SpcDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

#[async_trait]
impl ParquetFrame for SpcDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        SpcDataFrame::new(storage_settings)
    }

    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        let records = records.to_spc_drift_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        entity_id: &i32,
    ) -> String {
        get_binned_spc_drift_records_query(bin, start_time, end_time, entity_id)
    }

    fn table_name(&self) -> String {
        BinnedTableName::Spc.to_string()
    }
}

impl SpcDataFrame {
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("entity_id", DataType::Int32, false),
            Field::new("feature", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(SpcDataFrame {
            schema,
            object_store,
        })
    }

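    /// Converts SPC drift records into an Arrow `RecordBatch` matching the
    /// schema above: timestamps are encoded as nanoseconds, and entity id,
    /// feature name, and value map to their respective columns.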
    pub fn build_batch(&self, records: Vec<SpcRecord>) -> Result<RecordBatch, DataFrameError> {
        let created_at = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let entity_id = Int32Array::from_iter_values(records.iter().map(|r| r.entity_id));
        let feature = StringArray::from_iter_values(records.iter().map(|r| r.feature.as_str()));
        let value = Float64Array::from_iter_values(records.iter().map(|r| r.value));

        Ok(RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(created_at),
                Arc::new(entity_id),
                Arc::new(feature),
                Arc::new(value),
            ],
        )?)
    }
}

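/// Folds one record batch into the `features` map. Expects a string-view
/// `feature` column plus list-typed `created_at` (nanosecond timestamps) and
/// `values` (f64) columns; rows sharing a feature name overwrite earlier entries.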
fn process_spc_record_batch(
    batch: &RecordBatch,
    features: &mut BTreeMap<String, SpcDriftFeature>,
) -> Result<(), DataFrameError> {
    let feature_array = batch
        .column_by_name("feature")
        .ok_or_else(|| DataFrameError::MissingFieldError("feature"))?
        .as_string_view_opt()
        .ok_or_else(|| DataFrameError::DowncastError("feature"))?;

    let created_at_list = batch
        .column_by_name("created_at")
        .ok_or_else(|| DataFrameError::MissingFieldError("created_at"))?
        .as_list_opt::<i32>()
        .ok_or_else(|| DataFrameError::DowncastError("created_at"))?;

    let values_list = batch
        .column_by_name("values")
        .ok_or_else(|| DataFrameError::MissingFieldError("values"))?
        .as_list_opt::<i32>()
        .ok_or_else(|| DataFrameError::DowncastError("values"))?;

    for row in 0..batch.num_rows() {
        let feature_name = feature_array.value(row).to_string();

        let created_at = created_at_list
            .value(row)
            .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
            .iter()
            .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
            .collect::<Vec<_>>();

        let values = values_list
            .value(row)
            .as_primitive::<arrow::datatypes::Float64Type>()
            .iter()
            .flatten()
            .collect::<Vec<_>>();

        features.insert(feature_name, SpcDriftFeature { created_at, values });
    }

    Ok(())
}

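/// Collects a DataFusion `DataFrame` and converts its batches into
/// `SpcDriftFeatures`, keyed by feature name.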
pub async fn dataframe_to_spc_drift_features(
    df: DataFrame,
) -> Result<SpcDriftFeatures, DataFrameError> {
    let batches = df.collect().await?;

    let mut features = BTreeMap::new();

    for batch in batches {
        process_spc_record_batch(&batch, &mut features)?;
    }

    Ok(SpcDriftFeatures { features })
}