scouter_events/queue/llm/
record_queue.rs1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_types::BoxedLLMDriftServerRecord;
5use scouter_types::LLMRecord;
6use scouter_types::QueueExt;
7use scouter_types::{llm::LLMDriftProfile, LLMDriftServerRecord, ServerRecord, ServerRecords};
8use tracing::instrument;
9pub struct LLMRecordQueue {
10 drift_profile: LLMDriftProfile,
11 empty_queue: Vec<LLMRecord>,
12}
13
14impl LLMRecordQueue {
15 pub fn new(drift_profile: LLMDriftProfile) -> Self {
16 LLMRecordQueue {
17 drift_profile,
18 empty_queue: Vec::new(),
19 }
20 }
21
22 #[instrument(skip_all, name = "insert_llm")]
32 pub fn insert(
33 &self,
34 records: Vec<&LLMRecord>,
35 queue: &mut Vec<LLMRecord>,
36 ) -> Result<(), FeatureQueueError> {
37 for record in records {
38 queue.push(record.clone());
39 }
40 Ok(())
41 }
42
43 fn create_drift_records(
44 &self,
45 queue: Vec<LLMRecord>,
46 ) -> Result<ServerRecords, FeatureQueueError> {
47 let records = queue
48 .iter()
49 .map(|record| {
50 ServerRecord::LLMDrift(BoxedLLMDriftServerRecord::new(
51 LLMDriftServerRecord::new_rs(
52 self.drift_profile.config.space.clone(),
53 self.drift_profile.config.name.clone(),
54 self.drift_profile.config.version.clone(),
55 record.prompt.clone(),
56 record.context.clone(),
57 record.created_at,
58 record.uid.clone(),
59 record.score.clone(),
60 ),
61 )) })
63 .collect::<Vec<ServerRecord>>();
64
65 Ok(ServerRecords::new(records))
66 }
67}
68
69impl FeatureQueue for LLMRecordQueue {
70 fn create_drift_records_from_batch<T: QueueExt>(
71 &self,
72 batch: Vec<T>,
73 ) -> Result<ServerRecords, FeatureQueueError> {
74 let mut queue = self.empty_queue.clone();
76
77 for elem in batch {
78 self.insert(elem.llm_records(), &mut queue)?;
79 }
80
81 self.create_drift_records(queue)
82 }
83}
84
#[cfg(test)]
mod tests {

    use super::*;
    use potato_head::create_score_prompt;
    use scouter_types::llm::{LLMAlertConfig, LLMDriftConfig, LLMDriftMetric, LLMDriftProfile};
    use scouter_types::AlertThreshold;

    /// Builds a drift profile with two metrics ("coherence" and "relevancy")
    /// that share one score prompt taking an "input" parameter.
    async fn get_test_drift_profile() -> LLMDriftProfile {
        let score_prompt = create_score_prompt(Some(vec!["input".to_string()]));

        let coherence = LLMDriftMetric::new(
            "coherence",
            5.0,
            AlertThreshold::Below,
            Some(0.5),
            Some(score_prompt.clone()),
        )
        .unwrap();

        let relevancy = LLMDriftMetric::new(
            "relevancy",
            5.0,
            AlertThreshold::Below,
            None,
            Some(score_prompt.clone()),
        )
        .unwrap();

        let config = LLMDriftConfig::new(
            "scouter",
            "ML",
            "0.1.0",
            25,
            LLMAlertConfig::default(),
            None,
        )
        .unwrap();

        LLMDriftProfile::from_metrics(config, vec![coherence, relevancy])
            .await
            .unwrap()
    }

    /// A batch holding a single record must yield exactly one drift record.
    #[test]
    fn test_feature_queue_llm_insert_record() {
        let rt = tokio::runtime::Runtime::new().unwrap();
        let profile = rt.block_on(get_test_drift_profile());
        let llm_queue = LLMRecordQueue::new(profile);

        // A freshly constructed queue holds no records.
        assert_eq!(llm_queue.empty_queue.len(), 0);

        let batch = (0..1)
            .map(|_| {
                let mut fields = serde_json::Map::new();
                fields.insert("input".to_string(), serde_json::Value::from("test"));

                LLMRecord::new_rs(Some(serde_json::Value::Object(fields)), None)
            })
            .collect::<Vec<_>>();

        let drift_records = llm_queue.create_drift_records_from_batch(batch).unwrap();

        assert_eq!(drift_records.records.len(), 1);
    }
}