scouter_dataframe/parquet/llm/drift.rs
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::parquet::types::BinnedTableName;
use crate::storage::ObjectStore;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Int32Array, StringArray, TimestampNanosecondArray};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;

use scouter_types::{LLMDriftServerRecord, ServerRecords, StorageType, ToDriftRecords};
use std::sync::Arc;

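/// Parquet-backed frame for LLM drift records. Holds the Arrow schema used to
/// build record batches and the `ObjectStore` that provides storage settings
/// and the DataFusion session.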
pub struct LLMDriftDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

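// ParquetFrame implementation: converts incoming `ServerRecords` into an Arrow
// RecordBatch and exposes the storage and session details DataFusion needs.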
#[async_trait]
impl ParquetFrame for LLMDriftDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        LLMDriftDataFrame::new(storage_settings)
    }

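    // Convert the raw server records into a DataFusion DataFrame backed by a
    // single in-memory RecordBatch.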
    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        let records = records.to_llm_drift_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

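    // Binned SQL is not generated for LLM drift records; the arguments are
    // ignored and a placeholder string is returned.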
    fn get_binned_sql(
        &self,
        _bin: &f64,
        _start_time: &DateTime<Utc>,
        _end_time: &DateTime<Utc>,
        _space: &str,
        _name: &str,
        _version: &str,
    ) -> String {
        "None".to_string()
    }

    fn table_name(&self) -> String {
        BinnedTableName::LLMDrift.to_string()
    }
}

impl LLMDriftDataFrame {
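    /// Builds the Arrow schema for LLM drift records and creates the backing
    /// `ObjectStore` from the provided storage settings.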
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("uid", DataType::Utf8, false),
            Field::new("space", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("version", DataType::Utf8, false),
            Field::new("context", DataType::Utf8, false),
            Field::new("prompt", DataType::Utf8, true),
            Field::new("score", DataType::Utf8, true),
            Field::new(
                "updated_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new("status", DataType::Utf8, false),
            Field::new(
                "processing_started_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "processing_ended_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new("processing_duration", DataType::Int32, true),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(LLMDriftDataFrame {
            schema,
            object_store,
        })
    }

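    /// Converts LLM drift records into a single RecordBatch whose columns
    /// match the schema declared in `new`. JSON-valued fields (context,
    /// prompt, score) are serialized to strings; optional fields map to
    /// nullable columns.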
    fn build_batch(
        &self,
        records: Vec<LLMDriftServerRecord>,
    ) -> Result<RecordBatch, DataFrameError> {
        let id_array = arrow_array::Int64Array::from_iter_values(records.iter().map(|r| r.id));
        let created_at_array = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let uid_array = StringArray::from_iter_values(records.iter().map(|r| r.uid.as_str()));
        let space_array = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
        let name_array = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
        let version_array =
            StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));

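        // JSON payloads are stored as UTF-8 strings; serialization failures
        // fall back to an empty JSON object.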
        let score_array = StringArray::from_iter_values(
            records
                .iter()
                .map(|r| serde_json::to_string(&r.score).unwrap_or_else(|_| "{}".to_string())),
        );
        let context_array = StringArray::from_iter_values(
            records
                .iter()
                .map(|r| serde_json::to_string(&r.context).unwrap_or_else(|_| "{}".to_string())),
        );

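        // Optional fields use `from_iter` so that `None` values become nulls
        // in the corresponding nullable columns.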
        let prompt_array = StringArray::from_iter(records.iter().map(|r| {
            r.prompt
                .as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_string()))
        }));
        let updated_at_array = TimestampNanosecondArray::from_iter(
            records
                .iter()
                .map(|r| r.updated_at.and_then(|dt| dt.timestamp_nanos_opt())),
        );
        let status_array =
            StringArray::from_iter_values(records.iter().map(|r| r.status.to_string()));

        let processing_started_at_array =
            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
                r.processing_started_at
                    .and_then(|dt| dt.timestamp_nanos_opt())
            }));

        let processing_ended_at_array =
            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
                r.processing_ended_at
                    .and_then(|dt| dt.timestamp_nanos_opt())
            }));

        let processing_duration_array =
            Int32Array::from_iter(records.iter().map(|r| r.processing_duration));

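        // Column order must match the schema field order defined in `new`.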
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(created_at_array),
                Arc::new(uid_array),
                Arc::new(space_array),
                Arc::new(name_array),
                Arc::new(version_array),
                Arc::new(context_array),
                Arc::new(prompt_array),
                Arc::new(score_array),
                Arc::new(updated_at_array),
                Arc::new(status_array),
                Arc::new(processing_started_at_array),
                Arc::new(processing_ended_at_array),
                Arc::new(processing_duration_array),
            ],
        )?;

        Ok(batch)
    }
}