scouter_events/queue/llm/
record_queue.rs

1use crate::error::FeatureQueueError;
2use crate::queue::traits::FeatureQueue;
3use core::result::Result::Ok;
4use scouter_types::BoxedLLMDriftServerRecord;
5use scouter_types::LLMRecord;
6use scouter_types::QueueExt;
7use scouter_types::{llm::LLMDriftProfile, LLMDriftServerRecord, ServerRecord, ServerRecords};
8use tracing::instrument;
9pub struct LLMRecordQueue {
10    drift_profile: LLMDriftProfile,
11    empty_queue: Vec<LLMRecord>,
12}
13
14impl LLMRecordQueue {
15    pub fn new(drift_profile: LLMDriftProfile) -> Self {
16        LLMRecordQueue {
17            drift_profile,
18            empty_queue: Vec::new(),
19        }
20    }
21
22    /// Insert llm records into the queue
23    ///
24    /// # Arguments
25    ///
26    /// * `records` - A vector of llm records to insert into the queue
27    ///
28    /// # Returns
29    ///
30    /// * `Result<(), FeatureQueueError>` - A result indicating success or failure
31    #[instrument(skip_all, name = "insert_llm")]
32    pub fn insert(
33        &self,
34        records: Vec<&LLMRecord>,
35        queue: &mut Vec<LLMRecord>,
36    ) -> Result<(), FeatureQueueError> {
37        for record in records {
38            queue.push(record.clone());
39        }
40        Ok(())
41    }
42
43    fn create_drift_records(
44        &self,
45        queue: Vec<LLMRecord>,
46    ) -> Result<ServerRecords, FeatureQueueError> {
47        let records = queue
48            .iter()
49            .map(|record| {
50                ServerRecord::LLMDrift(BoxedLLMDriftServerRecord::new(
51                    LLMDriftServerRecord::new_rs(
52                        self.drift_profile.config.space.clone(),
53                        self.drift_profile.config.name.clone(),
54                        self.drift_profile.config.version.clone(),
55                        record.prompt.clone(),
56                        record.context.clone(),
57                        record.created_at,
58                        record.uid.clone(),
59                        record.score.clone(),
60                    ),
61                )) // Removed the semicolon here
62            })
63            .collect::<Vec<ServerRecord>>();
64
65        Ok(ServerRecords::new(records))
66    }
67}
68
69impl FeatureQueue for LLMRecordQueue {
70    fn create_drift_records_from_batch<T: QueueExt>(
71        &self,
72        batch: Vec<T>,
73    ) -> Result<ServerRecords, FeatureQueueError> {
74        // clones the empty map (so we don't need to recreate it on each call)
75        let mut queue = self.empty_queue.clone();
76
77        for elem in batch {
78            self.insert(elem.llm_records(), &mut queue)?;
79        }
80
81        self.create_drift_records(queue)
82    }
83}
84
#[cfg(test)]
mod tests {

    use super::*;
    use potato_head::create_score_prompt;
    use scouter_types::llm::{LLMAlertConfig, LLMDriftConfig, LLMDriftMetric, LLMDriftProfile};
    use scouter_types::AlertThreshold;

    /// Builds a drift profile with two metrics ("coherence" and "relevancy") that
    /// share a single score prompt taking an `input` parameter.
    async fn get_test_drift_profile() -> LLMDriftProfile {
        let prompt = create_score_prompt(Some(vec!["input".to_string()]));

        let metrics = vec![
            LLMDriftMetric::new(
                "coherence",
                5.0,
                AlertThreshold::Below,
                Some(0.5),
                Some(prompt.clone()),
            )
            .unwrap(),
            LLMDriftMetric::new(
                "relevancy",
                5.0,
                AlertThreshold::Below,
                None,
                Some(prompt.clone()),
            )
            .unwrap(),
        ];

        let drift_config = LLMDriftConfig::new(
            "scouter",
            "ML",
            "0.1.0",
            25,
            LLMAlertConfig::default(),
            None,
        )
        .unwrap();

        LLMDriftProfile::from_metrics(drift_config, metrics)
            .await
            .unwrap()
    }

    #[test]
    fn test_feature_queue_llm_insert_record() {
        // The profile builder is async, so drive it to completion on a local runtime.
        let runtime = tokio::runtime::Runtime::new().unwrap();
        let feature_queue = LLMRecordQueue::new(runtime.block_on(get_test_drift_profile()));

        // A freshly created queue holds no records.
        assert_eq!(feature_queue.empty_queue.len(), 0);

        // A batch of exactly one record whose context is `{"input": "test"}`.
        let context = serde_json::json!({ "input": "test" });
        let record_batch = vec![LLMRecord::new_rs(Some(context), None)];

        let records = feature_queue
            .create_drift_records_from_batch(record_batch)
            .unwrap();

        // One record in, one drift record out.
        assert_eq!(records.records.len(), 1);
    }
}