scouter_dataframe/parquet/dataframe.rs

use crate::error::DataFrameError;
use crate::parquet::custom::CustomMetricDataFrame;
use crate::parquet::psi::PsiDataFrame;
use crate::parquet::spc::SpcDataFrame;
use crate::parquet::traits::ParquetFrame;
use crate::storage::ObjectStore;
use chrono::{DateTime, Utc};
use datafusion::prelude::DataFrame;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{RecordType, ServerRecords, StorageType};
use tracing::instrument;

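/// Dispatches parquet reads and writes to the record-type specific
/// dataframe implementation (custom metrics, PSI, or SPC).
///
/// Illustrative sketch, adapted from the tests below (exact import paths
/// depend on how the crate re-exports these types):
///
/// ```ignore
/// // `records` is a `ServerRecords` batch built by the caller.
/// let settings = ObjectStorageSettings::default();
/// let frame = ParquetDataFrame::new(&settings, &RecordType::Custom)?;
/// frame.write_parquet("custom", records).await?;
/// ```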
pub enum ParquetDataFrame {
    CustomMetric(CustomMetricDataFrame),
    Psi(PsiDataFrame),
    Spc(SpcDataFrame),
}

impl ParquetDataFrame {
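    /// Builds the variant matching `record_type`; any other record type
    /// yields `DataFrameError::InvalidRecordTypeError`.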
    pub fn new(
        storage_settings: &ObjectStorageSettings,
        record_type: &RecordType,
    ) -> Result<Self, DataFrameError> {
        match record_type {
            RecordType::Custom => Ok(ParquetDataFrame::CustomMetric(CustomMetricDataFrame::new(
                storage_settings,
            )?)),
            RecordType::Psi => Ok(ParquetDataFrame::Psi(PsiDataFrame::new(storage_settings)?)),
            RecordType::Spc => Ok(ParquetDataFrame::Spc(SpcDataFrame::new(storage_settings)?)),

            _ => Err(DataFrameError::InvalidRecordTypeError),
        }
    }

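    /// Resolves `rpath` against the storage root and writes `records` to
    /// parquet via the record-type specific implementation.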
    #[instrument(skip_all, err)]
    pub async fn write_parquet(
        &self,
        rpath: &str,
        records: ServerRecords,
    ) -> Result<(), DataFrameError> {
        let rpath = &self.resolve_path(rpath);

        match self {
            ParquetDataFrame::CustomMetric(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Psi(df) => df.write_parquet(rpath, records).await,
            ParquetDataFrame::Spc(df) => df.write_parquet(rpath, records).await,
        }
    }

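    /// Storage root configured for the underlying object store.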
    pub fn storage_root(&self) -> String {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.storage_root(),
            ParquetDataFrame::Psi(df) => df.storage_root(),
            ParquetDataFrame::Spc(df) => df.storage_root(),
        }
    }

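    /// Clone of the underlying `ObjectStore` client.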
    pub fn storage_client(&self) -> ObjectStore {
        match self {
            ParquetDataFrame::CustomMetric(df) => df.object_store.clone(),
            ParquetDataFrame::Psi(df) => df.object_store.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.clone(),
        }
    }

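    /// Reads the parquet files under `path` and returns a binned `DataFrame`
    /// of metrics for the given time window, filtered by `space`, `name`,
    /// and `version`.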
    #[allow(clippy::too_many_arguments)]
    pub async fn get_binned_metrics(
        &self,
        path: &str,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> Result<DataFrame, DataFrameError> {
        let read_path = &self.resolve_path(path);

        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Psi(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
            ParquetDataFrame::Spc(df) => {
                df.get_binned_metrics(read_path, bin, start_time, end_time, space, name, version)
                    .await
            }
        }
    }

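    /// The `StorageType` configured on the underlying object store settings.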
    pub fn storage_type(&self) -> StorageType {
        match self {
            ParquetDataFrame::CustomMetric(df) => {
                df.object_store.storage_settings.storage_type.clone()
            }
            ParquetDataFrame::Psi(df) => df.object_store.storage_settings.storage_type.clone(),
            ParquetDataFrame::Spc(df) => df.object_store.storage_settings.storage_type.clone(),
        }
    }

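    /// Joins the storage root and `path` into the full location used for
    /// reads and writes.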
    pub fn resolve_path(&self, path: &str) -> String {
        format!("{}/{}/", self.storage_root(), path)
    }
}

#[cfg(test)]
mod tests {

    use super::*;
    use crate::parquet::custom::dataframe_to_custom_drift_metrics;
    use crate::parquet::psi::dataframe_to_psi_drift_features;
    use crate::parquet::spc::dataframe_to_spc_drift_features;
    use chrono::Utc;
    use object_store::path::Path;
    use rand::Rng;
    use scouter_settings::ObjectStorageSettings;
    use scouter_types::{
        CustomMetricServerRecord, PsiServerRecord, ServerRecord, ServerRecords, SpcServerRecord,
    };
    fn cleanup() {
        let storage_settings = ObjectStorageSettings::default();
        let current_dir = std::env::current_dir().unwrap();
        let storage_path = current_dir.join(storage_settings.storage_root());
        if storage_path.exists() {
            std::fs::remove_dir_all(storage_path).unwrap();
        }
    }

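    // End-to-end check for custom metric records: write three hours of records,
    // verify the expected parquet files land in storage, read them back as
    // binned metrics, then delete the files.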
    #[tokio::test]
    async fn test_write_custom_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..50 {
                let record = ServerRecord::Custom(CustomMetricServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    metric: format!("metric{}", i),
                    value: j as f64,
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "custom";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let new_df = ParquetDataFrame::new(&storage_settings, &RecordType::Custom).unwrap();

        let read_df = new_df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let binned_metrics = dataframe_to_custom_drift_metrics(read_df).await.unwrap();

        assert_eq!(binned_metrics.metrics.len(), 3);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

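    // Same flow for PSI records across two features: write, list, read back
    // binned drift features, then clean up.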
    #[tokio::test]
    async fn test_write_psi_dataframe_local() {
        cleanup();

        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Psi).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature1".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        for i in 0..3 {
            for j in 0..5 {
                let record = ServerRecord::Psi(PsiServerRecord {
                    created_at: Utc::now() + chrono::Duration::hours(i),
                    name: "test".to_string(),
                    space: "test".to_string(),
                    version: "1.0".to_string(),
                    feature: "feature2".to_string(),
                    bin_id: j as usize,
                    bin_count: rand::rng().random_range(0..100),
                });

                batch.push(record);
            }
        }

        let records = ServerRecords::new(batch);
        let rpath = "psi";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 3);

        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let psi_drift = dataframe_to_psi_drift_features(read_df).await.unwrap();
        assert_eq!(psi_drift.len(), 2);

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }

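    // Same flow for SPC records across two features; the read-back result is
    // only checked for successful conversion.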
    #[tokio::test]
    async fn test_write_spc_dataframe_local() {
        cleanup();
        let storage_settings = ObjectStorageSettings::default();
        let df = ParquetDataFrame::new(&storage_settings, &RecordType::Spc).unwrap();
        let mut batch = Vec::new();
        let start_utc = Utc::now();
        let end_utc_for_test = start_utc + chrono::Duration::hours(3);

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature1".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        for i in 0..5 {
            let record = ServerRecord::Spc(SpcServerRecord {
                created_at: Utc::now() + chrono::Duration::hours(i),
                name: "test".to_string(),
                space: "test".to_string(),
                version: "1.0".to_string(),
                feature: "feature2".to_string(),
                value: i as f64,
            });

            batch.push(record);
        }

        let records = ServerRecords::new(batch);
        let rpath = "spc";
        df.write_parquet(rpath, records.clone()).await.unwrap();

        let canonical_path = df.storage_root();
        let data_path = object_store::path::Path::from(canonical_path);

        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 5);

        let read_df = df
            .get_binned_metrics(
                rpath,
                &0.01,
                &start_utc,
                &end_utc_for_test,
                "test",
                "test",
                "1.0",
            )
            .await
            .unwrap();

        let _spc_drift = dataframe_to_spc_drift_features(read_df).await.unwrap();

        for file in files.iter() {
            let path = Path::from(file.to_string());
            df.storage_client()
                .delete(&path)
                .await
                .expect("Failed to delete file");
        }
        let files = df.storage_client().list(Some(&data_path)).await.unwrap();
        assert_eq!(files.len(), 0);

        cleanup();
    }
}