scouter_events/queue/traits/
queue.rs

1// implements a BackgroundQueue trait
2
3use crate::error::{EventError, FeatureQueueError};
4use crate::producer::RustScouterProducer;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use crossbeam_queue::ArrayQueue;
8use scouter_types::QueueExt;
9use scouter_types::ServerRecords;
10use std::sync::Arc;
11use std::sync::RwLock;
12use tokio::runtime::Runtime;
13use tokio::sync::watch;
14use tokio::time::{sleep, Duration};
15
16use tracing::{debug, error, info, info_span, Instrument};
17
/// Converts a drained batch of queue items into publishable drift records.
///
/// Called from both the foreground publish path (`QueueMethods::try_publish`)
/// and the periodic background task, so implementations must be `Send + Sync`.
pub trait FeatureQueue: Send + Sync {
    /// Turn a batch of popped queue items into `ServerRecords`.
    ///
    /// # Errors
    /// Returns a `FeatureQueueError` when the batch cannot be converted.
    fn create_drift_records_from_batch<T: QueueExt>(
        &self,
        batch: Vec<T>,
    ) -> Result<ServerRecords, FeatureQueueError>;
}
24
/// Spawns a long-running Tokio task that periodically drains a lock-free
/// queue, converts the batch into drift records, and publishes them.
pub trait BackgroundTask {
    /// Item type stored in the shared `ArrayQueue`.
    type DataItem: QueueExt + Send + Sync + 'static;
    /// Converter from a drained batch into `ServerRecords`.
    type Processor: FeatureQueue + Send + Sync + 'static;

    /// Spawn the background publishing loop onto `runtime` and return
    /// immediately; the spawned future takes ownership of `producer` and
    /// `stop_rx`.
    ///
    /// The loop wakes every 2 seconds; if at least 30 seconds have elapsed
    /// since `last_publish`, it drains `data_queue` into a batch, stamps
    /// `last_publish`, and publishes the converted records. A signal on
    /// `stop_rx` flushes the producer and ends the task. Publish/convert
    /// failures are logged, never propagated.
    ///
    /// NOTE(review): items still in `data_queue` when `stop_rx` fires are not
    /// drained here — confirm the concrete queue's `flush` handles them.
    #[allow(clippy::too_many_arguments)]
    fn start_background_task(
        &self,
        data_queue: Arc<ArrayQueue<Self::DataItem>>,
        processor: Arc<Self::Processor>,
        mut producer: RustScouterProducer,
        last_publish: Arc<RwLock<DateTime<Utc>>>,
        runtime: Arc<Runtime>,
        mut stop_rx: watch::Receiver<()>,
        queue_capacity: usize,
        label: &'static str,
    ) -> Result<(), EventError> {
        let future = async move {
            loop {
                tokio::select! {
                    // Wake every 2 seconds to check whether a publish is due.
                    _ = sleep(Duration::from_secs(2)) => {
                        let now = Utc::now();

                        // Scope the read guard to drop it before the future is sent
                        // (the std RwLock guard must not be held across an .await).
                        let should_process = {
                            if let Ok(last) = last_publish.read() {
                                // Publish at most once every 30 seconds.
                                (now - *last).num_seconds() >= 30
                            } else {
                                // Poisoned lock: skip this tick rather than panic.
                                false
                            }
                        };

                        if should_process {
                            debug!("Processing queued data");

                            // Drain the entire queue into a single batch.
                            let mut batch = Vec::with_capacity(queue_capacity);
                            while let Some(item) = data_queue.pop() {
                                batch.push(item);
                            }

                             // Always update last_publish time, regardless of batch processing result
                             if let Ok(mut guard) = last_publish.write() {
                                *guard = now;
                            }

                            if !batch.is_empty() {
                                match processor.create_drift_records_from_batch(batch) {
                                    Ok(records) => {
                                        // Failures are logged only; the loop keeps running.
                                        if let Err(e) = producer.publish(records).await {
                                            error!("Failed to publish records: {}", e);
                                        } else {

                                            debug!("Successfully published records");
                                        }
                                    }
                                    Err(e) => error!("Failed to create drift records: {}", e),
                                }
                            }

                        }
                    },
                    // Shutdown signal: flush the producer's buffer and exit.
                    _ = stop_rx.changed() => {
                        info!("Stopping background task");
                        if let Err(e) = producer.flush().await {
                            error!("Failed to flush producer: {}", e);
                        }
                        break;
                    }
                }
            }
        };

        // Attach a span so log lines from this task carry its label.
        let span = info_span!("background_task", task = %label);
        runtime.spawn(future.instrument(span));
        Ok(())
    }
}
101
102/// This is a primary trait implemented on all queues
103/// It provides the basic functionality for inserting, publishing, and flushing
104#[async_trait]
105pub trait QueueMethods {
106    type ItemType: QueueExt + 'static + Clone;
107    type FeatureQueue: FeatureQueue + 'static;
108
109    /// These all need to be implemented in the concrete queue type
110    fn capacity(&self) -> usize;
111    fn get_producer(&mut self) -> &mut RustScouterProducer;
112    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>>;
113    fn feature_queue(&self) -> Arc<Self::FeatureQueue>;
114    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>>;
115    fn should_process(&self, current_count: usize) -> bool;
116
117    fn update_last_publish(&mut self) -> Result<(), EventError> {
118        if let Ok(mut last_publish) = self.last_publish().write() {
119            *last_publish = Utc::now();
120        }
121
122        Ok(())
123    }
124
125    /// Publish the records to the producer
126    /// Remember - everything flows down from python, so the async producers need
127    /// to be called in a blocking manner
128    async fn publish(&mut self, records: ServerRecords) -> Result<(), EventError> {
129        let producer = self.get_producer();
130        producer.publish(records).await
131    }
132
133    /// Insert an item into the queue
134    async fn insert(&mut self, item: Self::ItemType) -> Result<(), EventError> {
135        self.insert_with_backpressure(item).await?;
136
137        let queue = self.queue();
138
139        // Check if we need to process the queue
140        // queues have a buffer in case of overflow, so we need to check if we are over the capacity, which is smaller
141        if queue.len() >= self.capacity() {
142            self.try_publish(queue).await?;
143        }
144
145        Ok(())
146    }
147
148    /// Process the queue and publish records
149    async fn try_publish(
150        &mut self,
151        queue: Arc<ArrayQueue<Self::ItemType>>,
152    ) -> Result<(), EventError> {
153        let mut batch = Vec::with_capacity(queue.capacity());
154
155        while let Some(metrics) = queue.pop() {
156            batch.push(metrics);
157        }
158
159        if !batch.is_empty() {
160            let feature_queue = self.feature_queue();
161            match feature_queue.create_drift_records_from_batch(batch) {
162                Ok(records) => {
163                    self.publish(records).await?;
164                    self.update_last_publish()?;
165                }
166                Err(e) => error!("Failed to create drift records: {}", e),
167            }
168        }
169
170        Ok(())
171    }
172
173    /// Flush the queue and shut down background tasks
174    async fn flush(&mut self) -> Result<(), EventError>;
175
176    /// Backpressure handling for inserting items into the queue
177    /// This will retry inserting the item a few times with exponential backoff
178    async fn insert_with_backpressure(&mut self, item: Self::ItemType) -> Result<(), EventError> {
179        let queue = self.queue();
180        let max_retries = 3;
181        let mut current_retry = 0;
182
183        while current_retry < max_retries {
184            match queue.push(item.clone()) {
185                Ok(_) => return Ok(()),
186                Err(_) => {
187                    current_retry += 1;
188                    if current_retry == max_retries {
189                        return Err(EventError::QueuePushError);
190                    }
191                    // Exponential backoff: 100ms, 200ms, 400ms
192                    sleep(Duration::from_millis(100 * 2_u64.pow(current_retry))).await;
193                }
194            }
195        }
196
197        Err(EventError::QueuePushRetryError)
198    }
199}