// scouter_dataframe/parquet/genai/eval.rs
use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::parquet::types::BinnedTableName;
use crate::storage::ObjectStore;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow::error::ArrowError;
use arrow_array::array::{
    DictionaryArray, FixedSizeBinaryArray, Int32Array, Int64Array, StringArray,
    TimestampNanosecondArray, UInt32Array, UInt8Array,
};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;
use scouter_types::ToDriftRecords;
use scouter_types::{BoxedGenAIEvalRecord, ServerRecords, StorageType};
use std::collections::HashMap;
use std::sync::Arc;
20
/// Parquet-backed frame for GenAI evaluation records.
///
/// Pairs the Arrow schema used to encode eval rows (built in `new`) with the
/// object store that the resulting record batches are read from / written to.
pub struct GenAIEvalDataFrame {
    // Arrow schema describing one row per GenAI eval record; used by
    // `build_batch` when assembling the `RecordBatch`.
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}
25
26#[async_trait]
27impl ParquetFrame for GenAIEvalDataFrame {
28 fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
29 GenAIEvalDataFrame::new(storage_settings)
30 }
31
32 async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
33 let records = records.to_genai_eval_records()?;
34 let batch = self.build_batch(records)?;
35
36 let ctx = self.object_store.get_session()?;
37
38 let df = ctx.read_batches(vec![batch])?;
39
40 Ok(df)
41 }
42
43 fn storage_root(&self) -> String {
44 self.object_store.storage_settings.canonicalized_path()
45 }
46
47 fn storage_type(&self) -> StorageType {
48 self.object_store.storage_settings.storage_type.clone()
49 }
50
51 fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
52 Ok(self.object_store.get_session()?)
53 }
54
55 fn get_binned_sql(
56 &self,
57 _bin: &f64,
58 _start_time: &DateTime<Utc>,
59 _end_time: &DateTime<Utc>,
60 _entity_id: &i32,
61 ) -> String {
62 "None".to_string()
63 }
64
65 fn table_name(&self) -> String {
66 BinnedTableName::GenAIEval.to_string()
67 }
68}
69
70impl GenAIEvalDataFrame {
71 pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
72 let schema = Arc::new(Schema::new(vec![
73 Field::new("id", DataType::Int64, false),
75 Field::new("uid", DataType::Utf8, false),
76 Field::new("entity_id", DataType::Int32, false),
77 Field::new("entity_uid", DataType::Utf8, false),
78 Field::new(
79 "entity_type",
80 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
81 false,
82 ),
83 Field::new(
84 "record_id",
85 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
86 true,
87 ),
88 Field::new(
89 "session_id",
90 DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Utf8)),
91 true,
92 ),
93 Field::new(
94 "status",
95 DataType::Dictionary(Box::new(DataType::UInt8), Box::new(DataType::Utf8)),
96 false,
97 ),
98 Field::new(
99 "created_at",
100 DataType::Timestamp(TimeUnit::Nanosecond, None),
101 false,
102 ),
103 Field::new(
104 "updated_at",
105 DataType::Timestamp(TimeUnit::Nanosecond, None),
106 true,
107 ),
108 Field::new(
109 "processing_started_at",
110 DataType::Timestamp(TimeUnit::Nanosecond, None),
111 true,
112 ),
113 Field::new(
114 "processing_ended_at",
115 DataType::Timestamp(TimeUnit::Nanosecond, None),
116 true,
117 ),
118 Field::new("processing_duration", DataType::Int32, true),
119 Field::new("retry_count", DataType::Int32, false),
120 Field::new("context", DataType::Utf8, false),
121 Field::new("trace_id", DataType::FixedSizeBinary(16), true),
122 ]));
123
124 let object_store = ObjectStore::new(storage_settings)?;
125
126 Ok(GenAIEvalDataFrame {
127 schema,
128 object_store,
129 })
130 }
131
132 fn build_batch(
133 &self,
134 records: Vec<BoxedGenAIEvalRecord>,
135 ) -> Result<RecordBatch, DataFrameError> {
136 let id_array = Int64Array::from_iter_values(records.iter().map(|r| r.record.id));
138 let uid_array =
139 StringArray::from_iter_values(records.iter().map(|r| r.record.uid.as_str()));
140
141 let entity_id_array =
142 Int32Array::from_iter_values(records.iter().map(|r| r.record.entity_id));
143
144 let entity_uid_array =
145 StringArray::from_iter_values(records.iter().map(|r| r.record.entity_uid.as_str()));
146
147 let entity_type_values =
148 StringArray::from_iter_values(records.iter().map(|r| r.record.entity_type.to_string()));
149 let entity_type_keys = UInt8Array::from_iter_values(0..records.len() as u8);
150 let entity_type_array =
151 DictionaryArray::new(entity_type_keys, Arc::new(entity_type_values));
152
153 let record_id_values =
154 StringArray::from_iter_values(records.iter().map(|r| r.record.record_id.as_str()));
155 let record_id_keys = UInt32Array::from_iter_values(0..records.len() as u32);
156 let record_id_array = DictionaryArray::new(record_id_keys, Arc::new(record_id_values));
157
158 let session_id_values =
159 StringArray::from_iter_values(records.iter().map(|r| r.record.session_id.as_str()));
160 let session_id_keys = UInt32Array::from_iter_values(0..records.len() as u32);
161 let session_id_array = DictionaryArray::new(session_id_keys, Arc::new(session_id_values));
162
163 let status_values =
164 StringArray::from_iter_values(records.iter().map(|r| r.record.status.to_string()));
165 let status_keys = UInt8Array::from_iter_values(0..records.len() as u8);
166 let status_array = DictionaryArray::new(status_keys, Arc::new(status_values));
167
168 let created_at_array =
169 TimestampNanosecondArray::from_iter_values(records.iter().map(|r| {
170 r.record
171 .created_at
172 .timestamp_nanos_opt()
173 .unwrap_or_default()
174 }));
175 let updated_at_array = TimestampNanosecondArray::from_iter(
176 records
177 .iter()
178 .map(|r| r.record.updated_at.and_then(|dt| dt.timestamp_nanos_opt())),
179 );
180 let processing_started_at_array =
181 TimestampNanosecondArray::from_iter(records.iter().map(|r| {
182 r.record
183 .processing_started_at
184 .and_then(|dt| dt.timestamp_nanos_opt())
185 }));
186 let processing_ended_at_array =
187 TimestampNanosecondArray::from_iter(records.iter().map(|r| {
188 r.record
189 .processing_ended_at
190 .and_then(|dt| dt.timestamp_nanos_opt())
191 }));
192
193 let processing_duration_array =
194 Int32Array::from_iter(records.iter().map(|r| r.record.processing_duration));
195 let retry_count_array =
196 Int32Array::from_iter_values(records.iter().map(|r| r.record.retry_count));
197
198 let context_array = StringArray::from_iter_values(records.iter().map(|r| {
199 serde_json::to_string(&r.record.context).unwrap_or_else(|_| "{}".to_string())
200 }));
201
202 let trace_id_array = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
203 records.iter().map(|r| {
204 r.record
205 .trace_id
206 .as_ref()
207 .map(|tid| tid.as_bytes().to_vec())
208 }),
209 16,
210 )?;
211
212 let batch = RecordBatch::try_new(
213 self.schema.clone(),
214 vec![
215 Arc::new(id_array),
216 Arc::new(uid_array),
217 Arc::new(entity_id_array),
218 Arc::new(entity_uid_array),
219 Arc::new(entity_type_array),
220 Arc::new(record_id_array),
221 Arc::new(session_id_array),
222 Arc::new(status_array),
223 Arc::new(created_at_array),
224 Arc::new(updated_at_array),
225 Arc::new(processing_started_at_array),
226 Arc::new(processing_ended_at_array),
227 Arc::new(processing_duration_array),
228 Arc::new(retry_count_array),
229 Arc::new(context_array),
230 Arc::new(trace_id_array),
231 ],
232 )?;
233
234 Ok(batch)
235 }
236}