// scouter_dataframe/parquet/genai/eval.rs
use crate::error::DataFrameError;
3use crate::parquet::traits::ParquetFrame;
4use crate::parquet::types::BinnedTableName;
5use crate::storage::ObjectStore;
6use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
7use arrow_array::array::{Int32Array, StringArray, TimestampNanosecondArray};
8use arrow_array::RecordBatch;
9use async_trait::async_trait;
10use chrono::{DateTime, Utc};
11use datafusion::dataframe::DataFrame;
12use datafusion::prelude::SessionContext;
13use scouter_settings::ObjectStorageSettings;
14use scouter_types::ToDriftRecords;
15use scouter_types::{BoxedGenAIEvalRecord, ServerRecords, StorageType};
16use std::sync::Arc;
17
/// Parquet/DataFusion adapter for GenAI evaluation records.
///
/// Holds the Arrow schema used to build record batches plus the object
/// store handle that supplies both storage paths and DataFusion sessions.
pub struct GenAIEvalDataFrame {
    // Arrow schema shared by every RecordBatch this frame builds (see `new`).
    schema: Arc<Schema>,
    // Configured object store; also the source of the DataFusion SessionContext.
    pub object_store: ObjectStore,
}
22
23#[async_trait]
24impl ParquetFrame for GenAIEvalDataFrame {
25 fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
26 GenAIEvalDataFrame::new(storage_settings)
27 }
28
29 async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
30 let records = records.to_genai_eval_records()?;
31 let batch = self.build_batch(records)?;
32
33 let ctx = self.object_store.get_session()?;
34
35 let df = ctx.read_batches(vec![batch])?;
36
37 Ok(df)
38 }
39
40 fn storage_root(&self) -> String {
41 self.object_store.storage_settings.canonicalized_path()
42 }
43
44 fn storage_type(&self) -> StorageType {
45 self.object_store.storage_settings.storage_type.clone()
46 }
47
48 fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
49 Ok(self.object_store.get_session()?)
50 }
51
52 fn get_binned_sql(
53 &self,
54 _bin: &f64,
55 _start_time: &DateTime<Utc>,
56 _end_time: &DateTime<Utc>,
57 _entity_id: &i32,
58 ) -> String {
59 "None".to_string()
60 }
61
62 fn table_name(&self) -> String {
63 BinnedTableName::GenAIEval.to_string()
64 }
65}
66
67impl GenAIEvalDataFrame {
68 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
69 let schema = Arc::new(Schema::new(vec![
70 Field::new("id", DataType::Int64, false),
71 Field::new("record_id", DataType::Utf8, true),
72 Field::new("session_id", DataType::Utf8, true),
73 Field::new(
74 "created_at",
75 DataType::Timestamp(TimeUnit::Nanosecond, None),
76 false,
77 ),
78 Field::new("uid", DataType::Utf8, false),
79 Field::new("context", DataType::Utf8, false),
80 Field::new(
81 "updated_at",
82 DataType::Timestamp(TimeUnit::Nanosecond, None),
83 true,
84 ),
85 Field::new("status", DataType::Utf8, false),
86 Field::new(
87 "processing_started_at",
88 DataType::Timestamp(TimeUnit::Nanosecond, None),
89 true,
90 ),
91 Field::new(
92 "processing_ended_at",
93 DataType::Timestamp(TimeUnit::Nanosecond, None),
94 true,
95 ),
96 Field::new("processing_duration", DataType::Int32, true),
97 Field::new("entity_id", DataType::Int32, false),
98 ]));
99
100 let object_store = ObjectStore::new(storage_settings)?;
101
102 Ok(GenAIEvalDataFrame {
103 schema,
104 object_store,
105 })
106 }
107
108 fn build_batch(
109 &self,
110 records: Vec<BoxedGenAIEvalRecord>,
111 ) -> Result<RecordBatch, DataFrameError> {
112 let id_array =
113 arrow_array::Int64Array::from_iter_values(records.iter().map(|r| r.record.id));
114 let record_id_array =
115 StringArray::from_iter_values(records.iter().map(|r| r.record.record_id.as_str()));
116 let session_id_array =
117 StringArray::from_iter_values(records.iter().map(|r| r.record.session_id.as_str()));
118 let created_at_array =
119 TimestampNanosecondArray::from_iter_values(records.iter().map(|r| {
120 r.record
121 .created_at
122 .timestamp_nanos_opt()
123 .unwrap_or_default()
124 }));
125 let uid_array =
126 StringArray::from_iter_values(records.iter().map(|r| r.record.uid.as_str()));
127 let entity_id_array =
128 Int32Array::from_iter_values(records.iter().map(|r| r.record.entity_id));
129 let context_array = StringArray::from_iter_values(records.iter().map(|r| {
130 serde_json::to_string(&r.record.context).unwrap_or_else(|_| "{}".to_string())
131 }));
132 let updated_at_array = TimestampNanosecondArray::from_iter(
133 records
134 .iter()
135 .map(|r| r.record.updated_at.and_then(|dt| dt.timestamp_nanos_opt())),
136 );
137 let status_array =
138 StringArray::from_iter_values(records.iter().map(|r| r.record.status.to_string()));
139
140 let processing_started_at_array =
141 TimestampNanosecondArray::from_iter(records.iter().map(|r| {
142 r.record
143 .processing_started_at
144 .and_then(|dt| dt.timestamp_nanos_opt())
145 }));
146
147 let processing_ended_at_array =
148 TimestampNanosecondArray::from_iter(records.iter().map(|r| {
149 r.record
150 .processing_ended_at
151 .and_then(|dt| dt.timestamp_nanos_opt())
152 }));
153
154 let processing_duration_array =
156 Int32Array::from_iter(records.iter().map(|r| r.record.processing_duration));
157
158 let batch = RecordBatch::try_new(
159 self.schema.clone(),
160 vec![
161 Arc::new(id_array),
162 Arc::new(record_id_array),
163 Arc::new(session_id_array),
164 Arc::new(created_at_array),
165 Arc::new(uid_array),
166 Arc::new(context_array),
167 Arc::new(updated_at_array),
168 Arc::new(status_array),
169 Arc::new(processing_started_at_array),
170 Arc::new(processing_ended_at_array),
171 Arc::new(processing_duration_array),
172 Arc::new(entity_id_array),
173 ],
174 )?;
175
176 Ok(batch)
177 }
178}