// scouter_dataframe/parquet/custom.rs
use crate::error::DataFrameError;
2use crate::parquet::traits::ParquetFrame;
3use crate::parquet::types::BinnedTableName;
4use crate::sql::helper::get_binned_custom_metric_values_query;
5use crate::storage::ObjectStore;
6use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
7use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
8use arrow_array::RecordBatch;
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use datafusion::dataframe::DataFrame;
12use datafusion::prelude::SessionContext;
13use scouter_settings::ObjectStorageSettings;
14
15use scouter_types::{CustomMetricServerRecord, ServerRecords, StorageType, ToDriftRecords};
16use std::sync::Arc;
17
/// Parquet-backed frame for custom metric drift records.
///
/// Holds the fixed Arrow schema used when building record batches from
/// [`CustomMetricServerRecord`]s, plus the object store handle used for
/// session creation and storage access.
pub struct CustomMetricDataFrame {
    // Shared Arrow schema; cloned cheaply (Arc) into every RecordBatch.
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}
22
23#[async_trait]
24impl ParquetFrame for CustomMetricDataFrame {
25 fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
26 CustomMetricDataFrame::new(storage_settings)
27 }
28
29 async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
30 let records = records.to_custom_metric_drift_records()?;
31 let batch = self.build_batch(records)?;
32
33 let ctx = self.object_store.get_session()?;
34
35 let df = ctx.read_batches(vec![batch])?;
36
37 Ok(df)
38 }
39
40 fn storage_root(&self) -> String {
41 self.object_store.storage_settings.canonicalized_path()
42 }
43
44 fn storage_type(&self) -> StorageType {
45 self.object_store.storage_settings.storage_type.clone()
46 }
47
48 fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
49 Ok(self.object_store.get_session()?)
50 }
51
52 fn get_binned_sql(
53 &self,
54 bin: &f64,
55 start_time: &DateTime<Utc>,
56 end_time: &DateTime<Utc>,
57 space: &str,
58 name: &str,
59 version: &str,
60 ) -> String {
61 get_binned_custom_metric_values_query(bin, start_time, end_time, space, name, version)
62 }
63
64 fn table_name(&self) -> String {
65 BinnedTableName::CustomMetric.to_string()
66 }
67}
68
69impl CustomMetricDataFrame {
70 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
71 let schema = Arc::new(Schema::new(vec![
72 Field::new(
73 "created_at",
74 DataType::Timestamp(TimeUnit::Nanosecond, None),
75 false,
76 ),
77 Field::new("space", DataType::Utf8, false),
78 Field::new("name", DataType::Utf8, false),
79 Field::new("version", DataType::Utf8, false),
80 Field::new("metric", DataType::Utf8, false),
81 Field::new("value", DataType::Float64, false),
82 ]));
83
84 let object_store = ObjectStore::new(storage_settings)?;
85
86 Ok(CustomMetricDataFrame {
87 schema,
88 object_store,
89 })
90 }
91
92 fn build_batch(
93 &self,
94 records: Vec<CustomMetricServerRecord>,
95 ) -> Result<RecordBatch, DataFrameError> {
96 let created_at_array = TimestampNanosecondArray::from_iter_values(
97 records
98 .iter()
99 .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
100 );
101
102 let space_array = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
103 let name_array = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
104 let version_array =
105 StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
106 let metric_array = StringArray::from_iter_values(records.iter().map(|r| r.metric.as_str()));
107
108 let value_array = Float64Array::from_iter_values(records.iter().map(|r| r.value));
109
110 let batch = RecordBatch::try_new(
111 self.schema.clone(),
112 vec![
113 Arc::new(created_at_array),
114 Arc::new(space_array),
115 Arc::new(name_array),
116 Arc::new(version_array),
117 Arc::new(metric_array),
118 Arc::new(value_array),
119 ],
120 )?;
121
122 Ok(batch)
123 }
124}