Skip to main content

scouter_dataframe/parquet/genai/
eval.rs

// This module contains dataframe operations for GenAI evaluation records: it converts them into Arrow record batches (identifiers, status, processing timestamps, context, trace id).
2use crate::error::DataFrameError;
3use crate::parquet::traits::ParquetFrame;
4use crate::parquet::types::BinnedTableName;
5use crate::storage::ObjectStore;
6use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
7use arrow_array::array::{
8    DictionaryArray, FixedSizeBinaryArray, Int32Array, Int64Array, StringArray,
9    TimestampNanosecondArray, UInt32Array, UInt8Array,
10};
11use arrow_array::RecordBatch;
12use async_trait::async_trait;
13use chrono::{DateTime, Utc};
14use datafusion::dataframe::DataFrame;
15use datafusion::prelude::SessionContext;
16use scouter_settings::ObjectStorageSettings;
17use scouter_types::ToDriftRecords;
18use scouter_types::{BoxedGenAIEvalRecord, ServerRecords, StorageType};
19use std::sync::Arc;
20
21pub struct GenAIEvalDataFrame {
22    schema: Arc<Schema>,
23    pub object_store: ObjectStore,
24}
25
26#[async_trait]
27impl ParquetFrame for GenAIEvalDataFrame {
28    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
29        GenAIEvalDataFrame::new(storage_settings)
30    }
31
32    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
33        let records = records.to_genai_eval_records()?;
34        let batch = self.build_batch(records)?;
35
36        let ctx = self.object_store.get_session()?;
37
38        let df = ctx.read_batches(vec![batch])?;
39
40        Ok(df)
41    }
42
43    fn storage_root(&self) -> String {
44        self.object_store.storage_settings.canonicalized_path()
45    }
46
47    fn storage_type(&self) -> StorageType {
48        self.object_store.storage_settings.storage_type.clone()
49    }
50
51    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
52        Ok(self.object_store.get_session()?)
53    }
54
55    fn get_binned_sql(
56        &self,
57        _bin: &f64,
58        _start_time: &DateTime<Utc>,
59        _end_time: &DateTime<Utc>,
60        _entity_id: &i32,
61    ) -> String {
62        "None".to_string()
63    }
64
65    fn table_name(&self) -> String {
66        BinnedTableName::GenAIEval.to_string()
67    }
68}
69
70impl GenAIEvalDataFrame {
71    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
72        let schema = Arc::new(Schema::new(vec![
73            // Primary keys and identifiers
74            Field::new("id", DataType::Int64, false),
75            Field::new("uid", DataType::Utf8, false),
76            Field::new("entity_id", DataType::Int32, false),
77            Field::new("entity_uid", DataType::Utf8, false),
78            Field::new(
79                "entity_type",
80                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
81                false,
82            ),
83            Field::new(
84                "record_id",
85                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
86                true,
87            ),
88            Field::new(
89                "session_id",
90                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
91                true,
92            ),
93            Field::new(
94                "status",
95                DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
96                false,
97            ),
98            Field::new(
99                "created_at",
100                DataType::Timestamp(TimeUnit::Nanosecond, None),
101                false,
102            ),
103            Field::new(
104                "updated_at",
105                DataType::Timestamp(TimeUnit::Nanosecond, None),
106                true,
107            ),
108            Field::new(
109                "processing_started_at",
110                DataType::Timestamp(TimeUnit::Nanosecond, None),
111                true,
112            ),
113            Field::new(
114                "processing_ended_at",
115                DataType::Timestamp(TimeUnit::Nanosecond, None),
116                true,
117            ),
118            Field::new("processing_duration", DataType::Int32, true),
119            Field::new("retry_count", DataType::Int32, false),
120            Field::new("context", DataType::Utf8, false),
121            Field::new("trace_id", DataType::FixedSizeBinary(16), true),
122        ]));
123
124        let object_store = ObjectStore::new(storage_settings)?;
125
126        Ok(GenAIEvalDataFrame {
127            schema,
128            object_store,
129        })
130    }
131
132    fn build_batch(
133        &self,
134        records: Vec<BoxedGenAIEvalRecord>,
135    ) -> Result<RecordBatch, DataFrameError> {
136        // Build ID and UID arrays
137        let id_array = Int64Array::from_iter_values(records.iter().map(|r| r.record.id));
138        let uid_array =
139            StringArray::from_iter_values(records.iter().map(|r| r.record.uid.as_str()));
140
141        let entity_id_array =
142            Int32Array::from_iter_values(records.iter().map(|r| r.record.entity_id));
143
144        let entity_uid_array =
145            StringArray::from_iter_values(records.iter().map(|r| r.record.entity_uid.as_str()));
146
147        let entity_type_values =
148            StringArray::from_iter_values(records.iter().map(|r| r.record.entity_type.to_string()));
149        let entity_type_keys = UInt8Array::from_iter_values(0..records.len() as u8);
150        let entity_type_array =
151            DictionaryArray::new(entity_type_keys, Arc::new(entity_type_values));
152
153        let record_id_values =
154            StringArray::from_iter_values(records.iter().map(|r| r.record.record_id.as_str()));
155        let record_id_keys = UInt32Array::from_iter_values(0..records.len() as u32);
156        let record_id_array = DictionaryArray::new(record_id_keys, Arc::new(record_id_values));
157
158        let session_id_values =
159            StringArray::from_iter_values(records.iter().map(|r| r.record.session_id.as_str()));
160        let session_id_keys = UInt32Array::from_iter_values(0..records.len() as u32);
161        let session_id_array = DictionaryArray::new(session_id_keys, Arc::new(session_id_values));
162
163        let status_values =
164            StringArray::from_iter_values(records.iter().map(|r| r.record.status.to_string()));
165        let status_keys = UInt8Array::from_iter_values(0..records.len() as u8);
166        let status_array = DictionaryArray::new(status_keys, Arc::new(status_values));
167
168        let created_at_array =
169            TimestampNanosecondArray::from_iter_values(records.iter().map(|r| {
170                r.record
171                    .created_at
172                    .timestamp_nanos_opt()
173                    .unwrap_or_default()
174            }));
175        let updated_at_array = TimestampNanosecondArray::from_iter(
176            records
177                .iter()
178                .map(|r| r.record.updated_at.and_then(|dt| dt.timestamp_nanos_opt())),
179        );
180        let processing_started_at_array =
181            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
182                r.record
183                    .processing_started_at
184                    .and_then(|dt| dt.timestamp_nanos_opt())
185            }));
186        let processing_ended_at_array =
187            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
188                r.record
189                    .processing_ended_at
190                    .and_then(|dt| dt.timestamp_nanos_opt())
191            }));
192
193        let processing_duration_array =
194            Int32Array::from_iter(records.iter().map(|r| r.record.processing_duration));
195        let retry_count_array =
196            Int32Array::from_iter_values(records.iter().map(|r| r.record.retry_count));
197
198        let context_array = StringArray::from_iter_values(records.iter().map(|r| {
199            serde_json::to_string(&r.record.context).unwrap_or_else(|_| "{}".to_string())
200        }));
201
202        let trace_id_array = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
203            records.iter().map(|r| {
204                r.record
205                    .trace_id
206                    .as_ref()
207                    .map(|tid| tid.as_bytes().to_vec())
208            }),
209            16,
210        )?;
211
212        let batch = RecordBatch::try_new(
213            self.schema.clone(),
214            vec![
215                Arc::new(id_array),
216                Arc::new(uid_array),
217                Arc::new(entity_id_array),
218                Arc::new(entity_uid_array),
219                Arc::new(entity_type_array),
220                Arc::new(record_id_array),
221                Arc::new(session_id_array),
222                Arc::new(status_array),
223                Arc::new(created_at_array),
224                Arc::new(updated_at_array),
225                Arc::new(processing_started_at_array),
226                Arc::new(processing_ended_at_array),
227                Arc::new(processing_duration_array),
228                Arc::new(retry_count_array),
229                Arc::new(context_array),
230                Arc::new(trace_id_array),
231            ],
232        )?;
233
234        Ok(batch)
235    }
236}