// scouter_dataframe/parquet/custom.rs
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::parquet::types::BinnedTableName;
use crate::sql::helper::get_binned_custom_metric_values_query;
use crate::storage::ObjectStore;
use arrow::array::AsArray;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
use arrow_array::types::Float64Type;
use arrow_array::{ListArray, StringViewArray};
use arrow_array::{RecordBatch, StructArray};
use async_trait::async_trait;
use chrono::{DateTime, TimeZone, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;

use scouter_types::{
    custom::{BinnedCustomMetric, BinnedCustomMetricStats, BinnedCustomMetrics},
    CustomMetricServerRecord, ServerRecords, StorageType, ToDriftRecords,
};

/// Parquet-backed frame for custom-metric drift records: holds the fixed
/// Arrow schema used to build record batches and the object store the
/// batches are written to / queried from.
pub struct CustomMetricDataFrame {
    // Arrow schema built in `new`: created_at (ns timestamp), space, name,
    // version, metric (Utf8), value (Float64) — all non-nullable.
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}
28
29#[async_trait]
30impl ParquetFrame for CustomMetricDataFrame {
31 fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
32 CustomMetricDataFrame::new(storage_settings)
33 }
34
35 async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
36 let records = records.to_custom_metric_drift_records()?;
37 let batch = self.build_batch(records)?;
38
39 let ctx = self.object_store.get_session()?;
40
41 let df = ctx.read_batches(vec![batch])?;
42
43 Ok(df)
44 }
45
46 fn storage_root(&self) -> String {
47 self.object_store.storage_settings.canonicalized_path()
48 }
49
50 fn storage_type(&self) -> StorageType {
51 self.object_store.storage_settings.storage_type.clone()
52 }
53
54 fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
55 Ok(self.object_store.get_session()?)
56 }
57
58 fn get_binned_sql(
59 &self,
60 bin: &f64,
61 start_time: &DateTime<Utc>,
62 end_time: &DateTime<Utc>,
63 space: &str,
64 name: &str,
65 version: &str,
66 ) -> String {
67 get_binned_custom_metric_values_query(bin, start_time, end_time, space, name, version)
68 }
69
70 fn table_name(&self) -> String {
71 BinnedTableName::CustomMetric.to_string()
72 }
73}
74
75impl CustomMetricDataFrame {
76 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
77 let schema = Arc::new(Schema::new(vec![
78 Field::new(
79 "created_at",
80 DataType::Timestamp(TimeUnit::Nanosecond, None),
81 false,
82 ),
83 Field::new("space", DataType::Utf8, false),
84 Field::new("name", DataType::Utf8, false),
85 Field::new("version", DataType::Utf8, false),
86 Field::new("metric", DataType::Utf8, false),
87 Field::new("value", DataType::Float64, false),
88 ]));
89
90 let object_store = ObjectStore::new(storage_settings)?;
91
92 Ok(CustomMetricDataFrame {
93 schema,
94 object_store,
95 })
96 }
97
98 fn build_batch(
99 &self,
100 records: Vec<CustomMetricServerRecord>,
101 ) -> Result<RecordBatch, DataFrameError> {
102 let created_at_array = TimestampNanosecondArray::from_iter_values(
103 records
104 .iter()
105 .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
106 );
107
108 let space_array = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
109 let name_array = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
110 let version_array =
111 StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
112 let metric_array = StringArray::from_iter_values(records.iter().map(|r| r.metric.as_str()));
113
114 let value_array = Float64Array::from_iter_values(records.iter().map(|r| r.value));
115
116 let batch = RecordBatch::try_new(
117 self.schema.clone(),
118 vec![
119 Arc::new(created_at_array),
120 Arc::new(space_array),
121 Arc::new(name_array),
122 Arc::new(version_array),
123 Arc::new(metric_array),
124 Arc::new(value_array),
125 ],
126 )?;
127
128 Ok(batch)
129 }
130}
131
132fn extract_created_at(batch: &RecordBatch) -> Result<Vec<DateTime<Utc>>, DataFrameError> {
133 let created_at_list = batch
134 .column(1)
135 .as_any()
136 .downcast_ref::<ListArray>()
137 .ok_or_else(|| DataFrameError::DowncastError("ListArray"))?;
138
139 let created_at_array = created_at_list.value(0);
140 Ok(created_at_array
141 .as_primitive::<arrow::datatypes::TimestampNanosecondType>()
142 .iter()
143 .filter_map(|ts| ts.map(|t| Utc.timestamp_nanos(t)))
144 .collect())
145}
146
147fn extract_stats(batch: &RecordBatch) -> Result<BinnedCustomMetricStats, DataFrameError> {
148 let stats_list = batch
149 .column(2)
150 .as_any()
151 .downcast_ref::<ListArray>()
152 .ok_or_else(|| DataFrameError::DowncastError("ListArray"))?
153 .value(0);
154
155 let stats_structs = stats_list
156 .as_any()
157 .downcast_ref::<StructArray>()
158 .ok_or_else(|| DataFrameError::DowncastError("StructArray"))?;
159
160 let avg = stats_structs
164 .column_by_name("avg")
165 .ok_or_else(|| DataFrameError::MissingFieldError("avg"))?
166 .as_primitive::<Float64Type>()
167 .value(0);
168
169 let lower_bound = stats_structs
170 .column_by_name("lower_bound")
171 .ok_or_else(|| DataFrameError::MissingFieldError("lower_bound"))?
172 .as_primitive::<Float64Type>()
173 .value(0);
174
175 let upper_bound = stats_structs
176 .column_by_name("upper_bound")
177 .ok_or_else(|| DataFrameError::MissingFieldError("upper_bound"))?
178 .as_primitive::<Float64Type>()
179 .value(0);
180
181 Ok(BinnedCustomMetricStats {
182 avg,
183 lower_bound,
184 upper_bound,
185 })
186}
187
188fn process_custom_record_batch(batch: &RecordBatch) -> Result<BinnedCustomMetric, DataFrameError> {
189 let metric_array = batch
190 .column(0)
191 .as_any()
192 .downcast_ref::<StringViewArray>()
193 .expect("Failed to downcast to StringViewArray");
194 let metric_name = metric_array.value(0).to_string();
195 let created_at_list = extract_created_at(batch)?;
196 let stats = extract_stats(batch)?;
197
198 Ok(BinnedCustomMetric {
199 metric: metric_name,
200 created_at: created_at_list,
201 stats: vec![stats],
202 })
203}
204
205pub async fn dataframe_to_custom_drift_metrics(
213 df: DataFrame,
214) -> Result<BinnedCustomMetrics, DataFrameError> {
215 let batches = df.collect().await?;
216
217 let metrics: Vec<BinnedCustomMetric> = batches
218 .iter()
219 .map(process_custom_record_batch)
220 .collect::<Result<Vec<_>, _>>()?;
221
222 Ok(BinnedCustomMetrics::from_vec(metrics))
223}