// scouter_dataframe/parquet/custom.rs
1use crate::error::DataFrameError;
2use crate::parquet::traits::ParquetFrame;
3use crate::parquet::types::BinnedTableName;
4use crate::sql::helper::get_binned_custom_metric_values_query;
5use crate::storage::ObjectStore;
6use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
7use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
8use arrow_array::{Int32Array, RecordBatch};
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use datafusion::dataframe::DataFrame;
12use datafusion::prelude::SessionContext;
13use scouter_settings::ObjectStorageSettings;
14
15use scouter_types::{CustomMetricRecord, ServerRecords, StorageType, ToDriftRecords};
16use std::sync::Arc;
17
/// Parquet-backed frame for custom-metric drift records: converts
/// `CustomMetricRecord`s into Arrow `RecordBatch`es and exposes them as a
/// DataFusion `DataFrame` over the configured object store.
pub struct CustomMetricDataFrame {
    // Fixed Arrow schema (created_at, entity_id, metric, value) shared by
    // every batch this frame builds; see `CustomMetricDataFrame::new`.
    schema: Arc<Schema>,
    // Handle to the backing object storage; public so callers can reach
    // storage settings and sessions directly.
    pub object_store: ObjectStore,
}
22
23#[async_trait]
24impl ParquetFrame for CustomMetricDataFrame {
25    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
26        CustomMetricDataFrame::new(storage_settings)
27    }
28
29    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
30        let records = records.to_custom_metric_drift_records()?;
31        let batch = self.build_batch(records)?;
32
33        let ctx = self.object_store.get_session()?;
34
35        let df = ctx.read_batches(vec![batch])?;
36
37        Ok(df)
38    }
39
40    fn storage_root(&self) -> String {
41        self.object_store.storage_settings.canonicalized_path()
42    }
43
44    fn storage_type(&self) -> StorageType {
45        self.object_store.storage_settings.storage_type.clone()
46    }
47
48    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
49        Ok(self.object_store.get_session()?)
50    }
51
52    fn get_binned_sql(
53        &self,
54        bin: &f64,
55        start_time: &DateTime<Utc>,
56        end_time: &DateTime<Utc>,
57        entity_id: &i32,
58    ) -> String {
59        get_binned_custom_metric_values_query(bin, start_time, end_time, entity_id)
60    }
61
62    fn table_name(&self) -> String {
63        BinnedTableName::CustomMetric.to_string()
64    }
65}
66
67impl CustomMetricDataFrame {
68    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
69        let schema = Arc::new(Schema::new(vec![
70            Field::new(
71                "created_at",
72                DataType::Timestamp(TimeUnit::Nanosecond, None),
73                false,
74            ),
75            Field::new("entity_id", DataType::Int32, false),
76            Field::new("metric", DataType::Utf8, false),
77            Field::new("value", DataType::Float64, false),
78        ]));
79
80        let object_store = ObjectStore::new(storage_settings)?;
81
82        Ok(CustomMetricDataFrame {
83            schema,
84            object_store,
85        })
86    }
87
88    fn build_batch(&self, records: Vec<CustomMetricRecord>) -> Result<RecordBatch, DataFrameError> {
89        let created_at_array = TimestampNanosecondArray::from_iter_values(
90            records
91                .iter()
92                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
93        );
94        let entity_id_array = Int32Array::from_iter_values(records.iter().map(|r| r.entity_id));
95        let metric_array = StringArray::from_iter_values(records.iter().map(|r| r.metric.as_str()));
96        let value_array = Float64Array::from_iter_values(records.iter().map(|r| r.value));
97        let batch = RecordBatch::try_new(
98            self.schema.clone(),
99            vec![
100                Arc::new(created_at_array),
101                Arc::new(entity_id_array),
102                Arc::new(metric_array),
103                Arc::new(value_array),
104            ],
105        )?;
106
107        Ok(batch)
108    }
109}