scouter_events/queue/traits/
queue.rs

// Defines the background queue traits (FeatureQueue, BackgroundTask, QueueMethods)

use crate::error::{EventError, FeatureQueueError};
use crate::producer::RustScouterProducer;
use crate::queue::bus::TaskState;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use crossbeam_queue::ArrayQueue;
use scouter_types::QueueExt;
use scouter_types::ServerRecords;
use std::fmt::Debug;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;
use tokio::time::{sleep, Duration};
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, info, info_span, Instrument};
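
/// Converts a drained batch of queue items into `ServerRecords` that can be
/// published to the server. Each concrete queue supplies its own implementation.
///
/// A minimal sketch of an implementation, using a hypothetical `NoOpFeatureQueue`
/// for illustration only:
///
/// ```ignore
/// struct NoOpFeatureQueue;
///
/// impl FeatureQueue for NoOpFeatureQueue {
///     fn create_drift_records_from_batch<T: QueueExt>(
///         &self,
///         _batch: Vec<T>,
///     ) -> Result<ServerRecords, FeatureQueueError> {
///         // A real implementation would aggregate `_batch` into drift records
///         // and return them as `ServerRecords`.
///         todo!()
///     }
/// }
/// ```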
pub trait FeatureQueue: Send + Sync {
    fn create_drift_records_from_batch<T: QueueExt>(
        &self,
        batch: Vec<T>,
    ) -> Result<ServerRecords, FeatureQueueError>;
}
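
/// Periodic background publisher shared by the queue implementations.
///
/// `start_background_task` spawns a loop on the provided runtime that wakes up
/// every 2 seconds. If at least 30 seconds have elapsed since `last_publish`, it
/// drains `data_queue`, converts the batch into records via the `Processor`, and
/// publishes them with the `RustScouterProducer`. The loop exits (and flips
/// `task_state` back to not running) once `cancellation_token` is cancelled.
///
/// A rough usage sketch from a concrete queue; the argument bindings are
/// illustrative only:
///
/// ```ignore
/// let handle = queue.start_background_task(
///     data_queue,
///     processor,
///     producer,
///     last_publish,
///     runtime,
///     capacity,
///     "my_queue".to_string(),
///     task_state.clone(),
///     cancellation_token.clone(),
/// )?;
/// wait_for_background_task(&task_state)?;
/// ```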
pub trait BackgroundTask: Send + Sync + 'static {
    type DataItem: QueueExt + Send + Sync + 'static;
    type Processor: FeatureQueue + Send + Sync + 'static;

    #[allow(clippy::too_many_arguments)]
    fn start_background_task(
        &self,
        data_queue: Arc<ArrayQueue<Self::DataItem>>,
        processor: Arc<Self::Processor>,
        mut producer: RustScouterProducer,
        last_publish: Arc<RwLock<DateTime<Utc>>>,
        runtime: Arc<Runtime>,
        queue_capacity: usize,
        identifier: String,
        task_state: TaskState,
        cancellation_token: CancellationToken,
    ) -> Result<JoinHandle<()>, EventError> {
        let span = info_span!("background_task", task = %identifier);

        let future = async move {
            debug!("Starting background task");

            // Set running state immediately
            task_state.set_background_running(true);
            debug!("Background task set to running");

            // Small delay to ensure state is propagated
            sleep(Duration::from_millis(10)).await;
            loop {
                tokio::select! {
                    _ = sleep(Duration::from_secs(2)) => {
                        debug!("Waking up background task");

                        let now = Utc::now();

                        // Scope the read guard to drop it before the future is sent
                        let should_process = {
                            if let Ok(last) = last_publish.read() {
                                (now - *last).num_seconds() >= 30
                            } else {
                                false
                            }
                        };

                        if should_process {
                            let mut batch = Vec::with_capacity(queue_capacity);
                            while let Some(item) = data_queue.pop() {
                                batch.push(item);
                            }

                            // Always update last_publish time, regardless of batch processing result
                            if let Ok(mut guard) = last_publish.write() {
                                *guard = now;
                            }

                            if !batch.is_empty() {
                                match processor.create_drift_records_from_batch(batch) {
                                    Ok(records) => {
                                        // publish
                                        if let Err(e) = producer.publish(records).await {
                                            error!("Failed to publish records: {}", e);
                                        } else {
                                            info!("Successfully published records");
                                        }
                                    }
                                    Err(e) => error!("Failed to create drift records: {}", e),
                                }
                            }
                        }
                    }
                    _ = cancellation_token.cancelled() => {
                        info!("Stop signal received, shutting down background task");
                        task_state.set_background_running(false);
                        break;
                    }
                    else => {
                        // The `else` arm only runs if every other select branch is
                        // disabled; treat it as a shutdown signal as well.
                        info!("All select branches disabled, shutting down background task");
                        task_state.set_background_running(false);
                        break;
                    }
                }
            }
            debug!("Background task finished");
        };

        let handle = runtime.spawn(async move { future.instrument(span).await });
        Ok(handle)
    }
}

/// This is the primary trait implemented on all queues.
/// It provides the basic functionality for inserting, publishing, and flushing.
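///
/// A concrete queue implements only the accessors (`capacity`, `get_producer`,
/// `queue`, `feature_queue`, `last_publish`, `should_process`) and `flush`;
/// `insert`, `try_publish`, and the backpressure handling come from the default
/// methods. A rough usage sketch with a hypothetical concrete queue type,
/// shown for illustration only:
///
/// ```ignore
/// let mut queue = MyConcreteQueue::new(/* config */)?;
/// // Items are buffered and published automatically once the queue
/// // reaches its configured capacity.
/// queue.insert(item).await?;
/// // Flush any remaining items and shut down background tasks.
/// queue.flush().await?;
/// ```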
#[async_trait]
pub trait QueueMethods {
    type ItemType: QueueExt + 'static + Clone + Debug;
    type FeatureQueue: FeatureQueue + 'static;

    /// These all need to be implemented in the concrete queue type
    fn capacity(&self) -> usize;
    fn get_producer(&mut self) -> &mut RustScouterProducer;
    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>>;
    fn feature_queue(&self) -> Arc<Self::FeatureQueue>;
    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>>;
    fn should_process(&self, current_count: usize) -> bool;

    fn update_last_publish(&mut self) -> Result<(), EventError> {
        if let Ok(mut last_publish) = self.last_publish().write() {
            *last_publish = Utc::now();
        }

        Ok(())
    }

    /// Publish the records to the producer.
    /// Remember: everything flows down from Python, so the async producers need
    /// to be called in a blocking manner.
    async fn publish(&mut self, records: ServerRecords) -> Result<(), EventError> {
        let producer = self.get_producer();
        producer.publish(records).await
    }

    /// Insert an item into the queue
    async fn insert(&mut self, item: Self::ItemType) -> Result<(), EventError> {
        debug!("Inserting item into queue: {:?}", item);

        self.insert_with_backpressure(item).await?;

        let queue = self.queue();

        // Check if we need to process the queue.
        // Queues are allocated with an overflow buffer, so compare against the
        // configured capacity, which is smaller than the queue's true size.
        if queue.len() >= self.capacity() {
            debug!(
                "Queue reached capacity, processing queue, current count: {}, current_capacity: {}",
                queue.len(),
                self.capacity()
            );
            self.try_publish(queue.clone()).await?;
        }

        Ok(())
    }

    /// Process the queue and publish records
    async fn try_publish(
        &mut self,
        queue: Arc<ArrayQueue<Self::ItemType>>,
    ) -> Result<(), EventError> {
        let mut batch = Vec::with_capacity(queue.capacity());

        while let Some(metrics) = queue.pop() {
            batch.push(metrics);
        }

        if !batch.is_empty() {
            let feature_queue = self.feature_queue();
            match feature_queue.create_drift_records_from_batch(batch) {
                Ok(records) => {
                    self.publish(records).await?;
                    self.update_last_publish()?;
                }
                Err(e) => error!("Failed to create drift records: {}", e),
            }
        }

        Ok(())
    }

    /// Flush the queue and shut down background tasks
    async fn flush(&mut self) -> Result<(), EventError>;

    /// Backpressure handling for inserting items into the queue
    /// This will retry inserting the item a few times with exponential backoff
    async fn insert_with_backpressure(&mut self, item: Self::ItemType) -> Result<(), EventError> {
        let queue = self.queue();
        let max_retries = 3;
        let mut current_retry: u32 = 0;

        while current_retry < max_retries {
            match queue.push(item.clone()) {
                Ok(_) => return Ok(()),
                Err(_) => {
                    current_retry += 1;
                    if current_retry == max_retries {
                        return Err(EventError::QueuePushError);
                    }
                    // Exponential backoff between retries: 200ms, then 400ms
                    sleep(Duration::from_millis(100 * 2_u64.pow(current_retry))).await;
                }
            }
        }

        Err(EventError::QueuePushRetryError)
    }
}

/// Waits for the background loop to start
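/// Polls `TaskState` every 200 ms, up to 50 times (roughly 10 seconds), and
/// returns an error if the task never reports itself as running. If no background
/// handle was spawned there is nothing to wait for and it returns Ok immediately.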
pub fn wait_for_background_task(task_state: &TaskState) -> Result<(), EventError> {
    // Confirm that the background task has signalled it is running
    if task_state.has_background_handle() {
        let mut max_retries = 50;
        while max_retries > 0 {
            if task_state.is_background_running() {
                debug!("Background loop started successfully");
                return Ok(());
            }
            max_retries -= 1;
            std::thread::sleep(Duration::from_millis(200));
        }
        error!("Background task failed to start");
        Err(EventError::BackgroundTaskFailedToStartError)
    } else {
        debug!("No background handle to wait for");
        Ok(())
    }
}

/// Waits for the event task to start
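/// Uses the same polling scheme as `wait_for_background_task`: up to 50 attempts
/// at 200 ms intervals (roughly 10 seconds) before giving up with an error.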
pub fn wait_for_event_task(task_state: &TaskState) -> Result<(), EventError> {
    // Confirm that the event task has signalled it is running
    let mut max_retries = 50;
    while max_retries > 0 {
        if task_state.is_event_running() {
            debug!("Event task started successfully");
            return Ok(());
        }
        max_retries -= 1;
        std::thread::sleep(Duration::from_millis(200));
    }
    error!("Event task failed to start");
    Err(EventError::EventTaskFailedToStartError)
}