scouter_dataframe/parquet/llm/metric.rs

use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::parquet::types::BinnedTableName;
use crate::sql::helper::get_binned_llm_metric_values_query;
use crate::storage::ObjectStore;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Float64Array, StringArray, TimestampNanosecondArray};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::{LLMMetricRecord, ServerRecords, StorageType, ToDriftRecords};
use std::sync::Arc;

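/// Parquet-backed frame for LLM metric records: holds the Arrow schema for a
/// single metric row and the object store used to read and write batches.
///
/// Illustrative usage sketch (not compiled): `ObjectStorageSettings::default()`
/// and the shape of the incoming `ServerRecords` are assumptions here, so
/// substitute your real settings and records.
///
/// ```ignore
/// use crate::parquet::traits::ParquetFrame;
/// use scouter_settings::ObjectStorageSettings;
/// use scouter_types::ServerRecords;
///
/// async fn llm_metrics_to_df(records: ServerRecords) -> Result<(), Box<dyn std::error::Error>> {
///     // Assumption: a default/configured ObjectStorageSettings is available.
///     let settings = ObjectStorageSettings::default();
///     let frame = LLMMetricDataFrame::new(&settings)?;
///     // Converts the records into a RecordBatch wrapped in a DataFusion DataFrame.
///     let df = frame.get_dataframe(records).await?;
///     df.show().await?;
///     Ok(())
/// }
/// ```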
pub struct LLMMetricDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

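// Implements the shared `ParquetFrame` interface: converts LLM metric server
// records into Arrow batches, exposes the storage root/type, and supplies the
// binned-metric SQL used when querying the parquet data back.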
#[async_trait]
impl ParquetFrame for LLMMetricDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        // Delegates to the inherent constructor below (inherent associated
        // functions take precedence over this trait method, so the call is not
        // recursive).
        LLMMetricDataFrame::new(storage_settings)
    }

    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        let records = records.to_llm_metric_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

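    // Builds the binning SQL for LLM metric values over the requested time
    // window and entity (space/name/version).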
    fn get_binned_sql(
        &self,
        bin: &f64,
        start_time: &DateTime<Utc>,
        end_time: &DateTime<Utc>,
        space: &str,
        name: &str,
        version: &str,
    ) -> String {
        get_binned_llm_metric_values_query(bin, start_time, end_time, space, name, version)
    }

    fn table_name(&self) -> String {
        BinnedTableName::LLMMetric.to_string()
    }
}

impl LLMMetricDataFrame {
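    /// Defines the Arrow schema for a single LLM metric row and initializes
    /// the object store from the provided storage settings.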
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("record_uid", DataType::Utf8, false),
            Field::new("space", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("version", DataType::Utf8, false),
            Field::new("metric", DataType::Utf8, false),
            Field::new("value", DataType::Float64, false),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(LLMMetricDataFrame {
            schema,
            object_store,
        })
    }

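    /// Converts LLM metric records into a single Arrow `RecordBatch` whose
    /// columns match the schema defined in `new`.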
    fn build_batch(&self, records: Vec<LLMMetricRecord>) -> Result<RecordBatch, DataFrameError> {
        // Out-of-range timestamps fall back to 0 instead of erroring.
        let created_at_array = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let record_uid_array =
            StringArray::from_iter_values(records.iter().map(|r| r.record_uid.as_str()));
        let space_array = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
        let name_array = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
        let version_array =
            StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));
        let metric_array =
            StringArray::from_iter_values(records.iter().map(|r| r.metric.as_str()));

        let value_array = Float64Array::from_iter_values(records.iter().map(|r| r.value));

        // Column order must match the schema: created_at, record_uid, space,
        // name, version, metric, value.
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(created_at_array),
                Arc::new(record_uid_array),
                Arc::new(space_array),
                Arc::new(name_array),
                Arc::new(version_array),
                Arc::new(metric_array),
                Arc::new(value_array),
            ],
        )?;

        Ok(batch)
    }
}