// scouter_events/queue/llm/queue.rs

1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::llm::record_queue::LLMRecordQueue;
4use crate::queue::traits::BackgroundTask;
5use crate::queue::traits::QueueMethods;
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::llm::LLMDriftProfile;
11use scouter_types::LLMRecord;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tracing::debug;
15
16/// The following code is a custom queue implementation for handling custom metrics.
17/// It consists of a `CustomQueue` struct that manages a queue of metrics and a background task
18///
19/// components:
20/// - `metrics_queue`: Crossbeam queue for storing metrics.
21/// - `feature_queue`: A Custom metric helper for converting batches of metrics to drift records.
22/// - `producer`: A RustScouter producer for publishing records.
23/// - `count`: Atomic counter for tracking the number of metrics.
24/// - `last_publish`: A timestamp for the last publish time.
25/// - `stop_tx`: A channel for stopping the background task.
26/// - `rt`: A Tokio runtime for executing asynchronous tasks.
27/// - `sample_size`: The size of the sample.
28/// - `sample`: A boolean indicating whether to sample metrics.
29pub struct LLMQueue {
30    queue: Arc<ArrayQueue<LLMRecord>>,
31    record_queue: Arc<LLMRecordQueue>,
32    producer: RustScouterProducer,
33    last_publish: Arc<RwLock<DateTime<Utc>>>,
34    capacity: usize,
35    sample_rate_percentage: f64,
36}
37
38impl LLMQueue {
39    pub async fn new(
40        drift_profile: LLMDriftProfile,
41        config: TransportConfig,
42    ) -> Result<Self, EventError> {
43        let sample_rate = drift_profile.config.sample_rate;
44
45        // calculate sample rate percentage (1 / sample_rate)
46        let sample_rate_percentage = 1.0 / sample_rate as f64;
47
48        debug!("Creating LLM Drift Queue");
49        // ArrayQueue size is based on sample rate
50        let queue = Arc::new(ArrayQueue::new(sample_rate * 2));
51        let record_queue = Arc::new(LLMRecordQueue::new(drift_profile));
52        let last_publish = Arc::new(RwLock::new(Utc::now()));
53
54        let producer = RustScouterProducer::new(config).await?;
55
56        let llm_queue = LLMQueue {
57            queue,
58            record_queue,
59            producer,
60            last_publish,
61            capacity: sample_rate,
62            sample_rate_percentage,
63        };
64
65        Ok(llm_queue)
66    }
67
68    pub fn should_insert(&self) -> bool {
69        // if the sample rate is 1, we always insert
70        if self.sample_rate_percentage == 1.0 {
71            return true;
72        }
73        // otherwise, we use the sample rate to determine if we should insert
74        rand::random::<f64>() < self.sample_rate_percentage
75    }
76}
77
78impl BackgroundTask for LLMQueue {
79    type DataItem = LLMRecord;
80    type Processor = LLMRecordQueue;
81}
82
83#[async_trait]
84/// Implementing primary methods
85impl QueueMethods for LLMQueue {
86    type ItemType = LLMRecord;
87    type FeatureQueue = LLMRecordQueue;
88
89    fn capacity(&self) -> usize {
90        self.capacity
91    }
92
93    fn get_producer(&mut self) -> &mut RustScouterProducer {
94        &mut self.producer
95    }
96
97    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
98        self.queue.clone()
99    }
100
101    fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
102        self.record_queue.clone()
103    }
104
105    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
106        self.last_publish.clone()
107    }
108
109    fn should_process(&self, current_count: usize) -> bool {
110        current_count >= self.capacity()
111    }
112
113    async fn flush(&mut self) -> Result<(), EventError> {
114        // publish any remaining drift records
115        self.try_publish(self.queue()).await?;
116
117        self.producer.flush().await?;
118        Ok(())
119    }
120}