scouter_dataframe/parquet/llm/drift.rs

use crate::error::DataFrameError;
use crate::parquet::traits::ParquetFrame;
use crate::parquet::types::BinnedTableName;
use crate::storage::ObjectStore;
use arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use arrow_array::array::{Int32Array, StringArray, TimestampNanosecondArray};
use arrow_array::RecordBatch;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use datafusion::dataframe::DataFrame;
use datafusion::prelude::SessionContext;
use scouter_settings::ObjectStorageSettings;

use scouter_types::{LLMDriftServerRecord, ServerRecords, StorageType, ToDriftRecords};
use std::sync::Arc;

/// Parquet-backed frame for LLM drift server records, holding the Arrow
/// schema and the object store used to read and write batches.
pub struct LLMDriftDataFrame {
    schema: Arc<Schema>,
    pub object_store: ObjectStore,
}

#[async_trait]
impl ParquetFrame for LLMDriftDataFrame {
    fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        // Delegates to the inherent constructor below; inherent associated
        // functions take precedence over trait methods, so this does not recurse.
        LLMDriftDataFrame::new(storage_settings)
    }

    async fn get_dataframe(&self, records: ServerRecords) -> Result<DataFrame, DataFrameError> {
        // Convert the generic server records to typed LLM drift records, build
        // a single Arrow batch, and expose it as a DataFusion DataFrame.
        let records = records.to_llm_drift_records()?;
        let batch = self.build_batch(records)?;

        let ctx = self.object_store.get_session()?;

        let df = ctx.read_batches(vec![batch])?;

        Ok(df)
    }

    fn storage_root(&self) -> String {
        self.object_store.storage_settings.canonicalized_path()
    }

    fn storage_type(&self) -> StorageType {
        self.object_store.storage_settings.storage_type.clone()
    }

    fn get_session_context(&self) -> Result<SessionContext, DataFrameError> {
        Ok(self.object_store.get_session()?)
    }

    fn get_binned_sql(
        &self,
        _bin: &f64,
        _start_time: &DateTime<Utc>,
        _end_time: &DateTime<Utc>,
        _space: &str,
        _name: &str,
        _version: &str,
    ) -> String {
        // Binned SQL queries are not implemented for LLM drift records; every
        // argument is ignored and a placeholder string is returned.
        "None".to_string()
    }

    fn table_name(&self) -> String {
        BinnedTableName::LLMDrift.to_string()
    }
}

impl LLMDriftDataFrame {
    pub fn new(storage_settings: &ObjectStorageSettings) -> Result<Self, DataFrameError> {
        // Arrow schema for LLM drift records. JSON-valued fields (context,
        // prompt, score) are stored as UTF-8 strings; timestamps use
        // nanosecond precision. Nullability mirrors the record's optional fields.
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
            Field::new("uid", DataType::Utf8, false),
            Field::new("space", DataType::Utf8, false),
            Field::new("name", DataType::Utf8, false),
            Field::new("version", DataType::Utf8, false),
            Field::new("context", DataType::Utf8, false),
            Field::new("prompt", DataType::Utf8, true),
            Field::new("score", DataType::Utf8, true),
            Field::new(
                "updated_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new("status", DataType::Utf8, false),
            Field::new(
                "processing_started_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "processing_ended_at",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new("processing_duration", DataType::Int32, true),
        ]));

        let object_store = ObjectStore::new(storage_settings)?;

        Ok(LLMDriftDataFrame {
            schema,
            object_store,
        })
    }

    fn build_batch(
        &self,
        records: Vec<LLMDriftServerRecord>,
    ) -> Result<RecordBatch, DataFrameError> {
        // Non-nullable columns use `from_iter_values`; nullable columns use
        // `from_iter`, so a `None` becomes a null entry in the array.
        let id_array = arrow_array::Int64Array::from_iter_values(records.iter().map(|r| r.id));
        let created_at_array = TimestampNanosecondArray::from_iter_values(
            records
                .iter()
                .map(|r| r.created_at.timestamp_nanos_opt().unwrap_or_default()),
        );
        let uid_array = StringArray::from_iter_values(records.iter().map(|r| r.uid.as_str()));
        let space_array = StringArray::from_iter_values(records.iter().map(|r| r.space.as_str()));
        let name_array = StringArray::from_iter_values(records.iter().map(|r| r.name.as_str()));
        let version_array =
            StringArray::from_iter_values(records.iter().map(|r| r.version.as_str()));

        // JSON fields are serialized to strings, falling back to "{}" on a
        // serialization failure rather than aborting the whole batch.
        let score_array = StringArray::from_iter_values(
            records
                .iter()
                .map(|r| serde_json::to_string(&r.score).unwrap_or_else(|_| "{}".to_string())),
        );
        let context_array = StringArray::from_iter_values(
            records
                .iter()
                .map(|r| serde_json::to_string(&r.context).unwrap_or_else(|_| "{}".to_string())),
        );

        let prompt_array = StringArray::from_iter(records.iter().map(|r| {
            r.prompt
                .as_ref()
                .map(|p| serde_json::to_string(p).unwrap_or_else(|_| "{}".to_string()))
        }));
        let updated_at_array = TimestampNanosecondArray::from_iter(
            records
                .iter()
                .map(|r| r.updated_at.and_then(|dt| dt.timestamp_nanos_opt())),
        );
        let status_array =
            StringArray::from_iter_values(records.iter().map(|r| r.status.to_string()));

        let processing_started_at_array =
            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
                r.processing_started_at
                    .and_then(|dt| dt.timestamp_nanos_opt())
            }));

        let processing_ended_at_array =
            TimestampNanosecondArray::from_iter(records.iter().map(|r| {
                r.processing_ended_at
                    .and_then(|dt| dt.timestamp_nanos_opt())
            }));

        let processing_duration_array =
            Int32Array::from_iter(records.iter().map(|r| r.processing_duration));

        // Column order here must match the schema's field order exactly;
        // `try_new` validates each array against the corresponding field.
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(id_array),
                Arc::new(created_at_array),
                Arc::new(uid_array),
                Arc::new(space_array),
                Arc::new(name_array),
                Arc::new(version_array),
                Arc::new(context_array),
                Arc::new(prompt_array),
                Arc::new(score_array),
                Arc::new(updated_at_array),
                Arc::new(status_array),
                Arc::new(processing_started_at_array),
                Arc::new(processing_ended_at_array),
                Arc::new(processing_duration_array),
            ],
        )?;

        Ok(batch)
    }
}
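
// A minimal usage sketch, kept as a comment since it is not part of this file.
// It assumes `ObjectStorageSettings` implements `Default` and that a populated
// `ServerRecords` containing LLM drift records is supplied by the caller; both
// are assumptions about the surrounding crates, not confirmed APIs.
//
// async fn preview_llm_drift(records: ServerRecords) -> Result<(), DataFrameError> {
//     let settings = ObjectStorageSettings::default(); // assumed constructor
//     let frame = LLMDriftDataFrame::new(&settings)?;
//     let df = frame.get_dataframe(records).await?;
//     df.show().await.ok(); // DataFusion's DataFrame::show prints a preview
//     Ok(())
// }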