scouter_events/queue/llm/
queue.rs1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::llm::record_queue::LLMRecordQueue;
4use crate::queue::traits::BackgroundTask;
5use crate::queue::traits::QueueMethods;
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::llm::LLMDriftProfile;
11use scouter_types::LLMRecord;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tracing::debug;
15
16pub struct LLMQueue {
30 queue: Arc<ArrayQueue<LLMRecord>>,
31 record_queue: Arc<LLMRecordQueue>,
32 producer: RustScouterProducer,
33 last_publish: Arc<RwLock<DateTime<Utc>>>,
34 capacity: usize,
35 sample_rate_percentage: f64,
36}
37
38impl LLMQueue {
39 pub async fn new(
40 drift_profile: LLMDriftProfile,
41 config: TransportConfig,
42 ) -> Result<Self, EventError> {
43 let sample_rate = drift_profile.config.sample_rate;
44
45 let sample_rate_percentage = 1.0 / sample_rate as f64;
47
48 debug!("Creating LLM Drift Queue");
49 let queue = Arc::new(ArrayQueue::new(sample_rate * 2));
51 let record_queue = Arc::new(LLMRecordQueue::new(drift_profile));
52 let last_publish = Arc::new(RwLock::new(Utc::now()));
53
54 let producer = RustScouterProducer::new(config).await?;
55
56 let llm_queue = LLMQueue {
57 queue,
58 record_queue,
59 producer,
60 last_publish,
61 capacity: sample_rate,
62 sample_rate_percentage,
63 };
64
65 Ok(llm_queue)
66 }
67
68 pub fn should_insert(&self) -> bool {
69 if self.sample_rate_percentage == 1.0 {
71 return true;
72 }
73 rand::random::<f64>() < self.sample_rate_percentage
75 }
76}
77
78impl BackgroundTask for LLMQueue {
79 type DataItem = LLMRecord;
80 type Processor = LLMRecordQueue;
81}
82
83#[async_trait]
84impl QueueMethods for LLMQueue {
86 type ItemType = LLMRecord;
87 type FeatureQueue = LLMRecordQueue;
88
89 fn capacity(&self) -> usize {
90 self.capacity
91 }
92
93 fn get_producer(&mut self) -> &mut RustScouterProducer {
94 &mut self.producer
95 }
96
97 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
98 self.queue.clone()
99 }
100
101 fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
102 self.record_queue.clone()
103 }
104
105 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
106 self.last_publish.clone()
107 }
108
109 fn should_process(&self, current_count: usize) -> bool {
110 current_count >= self.capacity()
111 }
112
113 async fn flush(&mut self) -> Result<(), EventError> {
114 self.try_publish(self.queue()).await?;
116
117 self.producer.flush().await?;
118 Ok(())
119 }
120}