Skip to main content

scouter_dataframe/parquet/genai/
eval.rs

1// This module contains dataframe operations for GenAI drift records (input, response, context, prompt).
2use crate::error::DataFrameError;
3use crate::parquet::traits::ParquetFrame;
4use crate::parquet::types::BinnedTableName;
5use crate::storage::ObjectStore;
6use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
7use arrow_array::array::{Int32Array, StringArray, TimestampNanosecondArray};
8use arrow_array::RecordBatch;
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use datafusion::dataframe::DataFrame;
12use datafusion::prelude::SessionContext;
13use scouter_settings::ObjectStorageSettings;
14use scouter_types::ToDriftRecords;
15use scouter_types::{BoxedGenAIEvalRecord, ServerRecords, StorageType};
16use std::sync::Arc;
17
/// Parquet-backed dataframe wrapper for GenAI evaluation drift records.
///
/// Pairs the Arrow schema used to build `RecordBatch`es of eval records
/// with the object store used to resolve storage paths and open
/// DataFusion sessions.
pub struct GenAIEvalDataFrame {
    // Arrow schema for one GenAI eval record row; built once in `new`
    // and shared with every batch via `Arc`.
    schema: Arc<Schema>,
    // Object store handle; used for session creation, storage root and
    // storage type lookups.
    pub object_store: ObjectStore,
}
22
23#[async_trait]
24impl ParquetFrame for GenAIEvalDataFrame {
25    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
26        GenAIEvalDataFrame::new(storage_settings)
27    }
28
29    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
30        let records = records.to_genai_eval_records()?;
31        let batch = self.build_batch(records)?;
32
33        let ctx = self.object_store.get_session()?;
34
35        let df = ctx.read_batches(vec![batch])?;
36
37        Ok(df)
38    }
39
40    fn storage_root(&self) -> String {
41        self.object_store.storage_settings.canonicalized_path()
42    }
43
44    fn storage_type(&self) -> StorageType {
45        self.object_store.storage_settings.storage_type.clone()
46    }
47
48    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
49        Ok(self.object_store.get_session()?)
50    }
51
52    fn get_binned_sql(
53        &self,
54        _bin: &f64,
55        _start_time: &DateTime<Utc>,
56        _end_time: &DateTime<Utc>,
57        _entity_id: &i32,
58    ) -> String {
59        "None".to_string()
60    }
61
62    fn table_name(&self) -> String {
63        BinnedTableName::GenAIEval.to_string()
64    }
65}
66
67impl GenAIEvalDataFrame {
68    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
69        let schema = Arc::new(Schema::new(vec![
70            Field::new("id", DataType::Int64, false),
71            Field::new("record_id", DataType::Utf8, true),
72            Field::new("session_id", DataType::Utf8, true),
73            Field::new(
74                "created_at",
75                DataType::Timestamp(TimeUnit::Nanosecond, None),
76                false,
77            ),
78            Field::new("uid", DataType::Utf8, false),
79            Field::new("context", DataType::Utf8, false),
80            Field::new(
81                "updated_at",
82                DataType::Timestamp(TimeUnit::Nanosecond, None),
83                true,
84            ),
85            Field::new("status", DataType::Utf8, false),
86            Field::new(
87                "processing_started_at",
88                DataType::Timestamp(TimeUnit::Nanosecond, None),
89                true,
90            ),
91            Field::new(
92                "processing_ended_at",
93                DataType::Timestamp(TimeUnit::Nanosecond, None),
94                true,
95            ),
96            Field::new("processing_duration", DataType::Int32, true),
97            Field::new("entity_id", DataType::Int32, false),
98        ]));
99
100        let object_store = ObjectStore::new(storage_settings)?;
101
102        Ok(GenAIEvalDataFrame {
103            schema,
104            object_store,
105        })
106    }
107
108    fn build_batch(
109        &self,
110        records: Vec<BoxedGenAIEvalRecord>,
111    ) -> Result<RecordBatch, DataFrameError> {
112        let id_array =
113            arrow_array::Int64Array::from_iter_values(records.iter().map(|r| r.record.id));
114        let record_id_array =
115            StringArray::from_iter_values(records.iter().map(|r| r.record.record_id.as_str()));
116        let session_id_array =
117            StringArray::from_iter_values(records.iter().map(|r| r.record.session_id.as_str()));
118        let created_at_array =
119            TimestampNanosecondArray::from_iter_values(records.iter().map(|r| {
120                r.record
121                    .created_at
122                    .timestamp_nanos_opt()
123                    .unwrap_or_default()
124            }));
125        let uid_array =
126            StringArray::from_iter_values(records.iter().map(|r| r.record.uid.as_str()));
127        let entity_id_array =
128            Int32Array::from_iter_values(records.iter().map(|r| r.record.entity_id));
129        let context_array = StringArray::from_iter_values(records.iter().map(|r| {
130            serde_json::to_string(&r.record.context).unwrap_or_else(|_| "{}".to_string())
131        }));
132        let updated_at_array = TimestampNanosecondArray::from_iter(
133            records
134                .iter()
135                .map(|r| r.record.updated_at.and_then(|dt| dt.timestamp_nanos_opt())),
136        );
137        let status_array =
138            StringArray::from_iter_values(records.iter().map(|r| r.record.status.to_string()));
139
140        let processing_started_at_array =
141            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
142                r.record
143                    .processing_started_at
144                    .and_then(|dt| dt.timestamp_nanos_opt())
145            }));
146
147        let processing_ended_at_array =
148            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
149                r.record
150                    .processing_ended_at
151                    .and_then(|dt| dt.timestamp_nanos_opt())
152            }));
153
154        // Calculate processing duration in seconds
155        let processing_duration_array =
156            Int32Array::from_iter(records.iter().map(|r| r.record.processing_duration));
157
158        let batch = RecordBatch::try_new(
159            self.schema.clone(),
160            vec![
161                Arc::new(id_array),
162                Arc::new(record_id_array),
163                Arc::new(session_id_array),
164                Arc::new(created_at_array),
165                Arc::new(uid_array),
166                Arc::new(context_array),
167                Arc::new(updated_at_array),
168                Arc::new(status_array),
169                Arc::new(processing_started_at_array),
170                Arc::new(processing_ended_at_array),
171                Arc::new(processing_duration_array),
172                Arc::new(entity_id_array),
173            ],
174        )?;
175
176        Ok(batch)
177    }
178}