scouter_events/queue/traits/
queue.rs1use crate::error::{EventError, FeatureQueueError};
4use crate::producer::RustScouterProducer;
5use crate::queue::bus::TaskState;
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use crossbeam_queue::ArrayQueue;
9use scouter_types::QueueExt;
10use scouter_types::ServerRecords;
11use std::fmt::Debug;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime::Runtime;
15use tokio::task::JoinHandle;
16use tokio::time::{sleep, Duration};
17use tokio_util::sync::CancellationToken;
18use tracing::{debug, error, info, info_span, Instrument};
19
20pub trait FeatureQueue: Send + Sync {
21 fn create_drift_records_from_batch<T: QueueExt>(
22 &self,
23 batch: Vec<T>,
24 ) -> Result<ServerRecords, FeatureQueueError>;
25}
26
27pub trait BackgroundTask: Send + Sync + 'static {
28 type DataItem: QueueExt + Send + Sync + 'static;
29 type Processor: FeatureQueue + Send + Sync + 'static;
30
31 #[allow(clippy::too_many_arguments)]
32 fn start_background_task(
33 &self,
34 data_queue: Arc<ArrayQueue<Self::DataItem>>,
35 processor: Arc<Self::Processor>,
36 mut producer: RustScouterProducer,
37 last_publish: Arc<RwLock<DateTime<Utc>>>,
38 runtime: Arc<Runtime>,
39 queue_capacity: usize,
40 identifier: String,
41 task_state: TaskState,
42 cancellation_token: CancellationToken,
43 ) -> Result<JoinHandle<()>, EventError> {
44 let span = info_span!("background_task", task = %identifier);
45
46 let future = async move {
47 debug!("Starting background task");
48
49 task_state.set_background_running(true);
51 debug!("Background task set to running");
52
53 sleep(Duration::from_millis(10)).await;
55 loop {
56 tokio::select! {
57 _ = sleep(Duration::from_secs(2)) => {
58 debug!("Waking up background task");
59
60 let now = Utc::now();
61
62 let should_process = {
64 if let Ok(last) = last_publish.read() {
65 (now - *last).num_seconds() >= 30
66 } else {
67 false
68 }
69 };
70
71 if should_process {
72 let mut batch = Vec::with_capacity(queue_capacity);
73 while let Some(item) = data_queue.pop() {
74 batch.push(item);
75 }
76
77 if let Ok(mut guard) = last_publish.write() {
79 *guard = now;
80 }
81
82 if !batch.is_empty() {
83 match processor.create_drift_records_from_batch(batch) {
84 Ok(records) => {
85
86 if let Err(e) = producer.publish(records).await {
88 error!("Failed to publish records: {}", e);
89 } else {
90 info!("Successfully published records");
91 }
92 }
93 Err(e) => error!("Failed to create drift records: {}", e),
94 }
95 }
96
97 }
98 }
99 _ = cancellation_token.cancelled() => {
100 info!("Stop signal received, shutting down background task");
101 task_state.set_background_running(false);
102 break;
103 }
104 else => {
105 info!("Stop signal received, shutting down background task");
106 task_state.set_background_running(false);
107 break;
108 }
109 }
110 }
111 debug!("Background task finished");
112 };
113
114 let handle = runtime.spawn(async move { future.instrument(span).await });
115 Ok(handle)
116 }
117}
118
119#[async_trait]
122pub trait QueueMethods {
123 type ItemType: QueueExt + 'static + Clone + Debug;
124 type FeatureQueue: FeatureQueue + 'static;
125
126 fn capacity(&self) -> usize;
128 fn get_producer(&mut self) -> &mut RustScouterProducer;
129 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>>;
130 fn feature_queue(&self) -> Arc<Self::FeatureQueue>;
131 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>>;
132 fn should_process(&self, current_count: usize) -> bool;
133
134 fn update_last_publish(&mut self) -> Result<(), EventError> {
135 if let Ok(mut last_publish) = self.last_publish().write() {
136 *last_publish = Utc::now();
137 }
138
139 Ok(())
140 }
141
142 async fn publish(&mut self, records: ServerRecords) -> Result<(), EventError> {
146 let producer = self.get_producer();
147 producer.publish(records).await
148 }
149
150 async fn insert(&mut self, item: Self::ItemType) -> Result<(), EventError> {
152 debug!("Inserting item into queue: {:?}", item);
153
154 self.insert_with_backpressure(item).await?;
155
156 let queue = self.queue();
157
158 if queue.len() >= self.capacity() {
161 debug!(
162 "Queue reached capacity, processing queue, current count: {}, current_capacity: {}",
163 queue.len(),
164 self.capacity()
165 );
166 self.try_publish(queue.clone()).await?;
167 }
168
169 Ok(())
170 }
171
172 async fn try_publish(
174 &mut self,
175 queue: Arc<ArrayQueue<Self::ItemType>>,
176 ) -> Result<(), EventError> {
177 let mut batch = Vec::with_capacity(queue.capacity());
178
179 while let Some(metrics) = queue.pop() {
180 batch.push(metrics);
181 }
182
183 if !batch.is_empty() {
184 let feature_queue = self.feature_queue();
185 match feature_queue.create_drift_records_from_batch(batch) {
186 Ok(records) => {
187 self.publish(records).await?;
188 self.update_last_publish()?;
189 }
190 Err(e) => error!("Failed to create drift records: {}", e),
191 }
192 }
193
194 Ok(())
195 }
196
197 async fn flush(&mut self) -> Result<(), EventError>;
199
200 async fn insert_with_backpressure(&mut self, item: Self::ItemType) -> Result<(), EventError> {
203 let queue = self.queue();
204 let max_retries = 3;
205 let mut current_retry: u32 = 0;
206
207 while current_retry < max_retries {
208 match queue.push(item.clone()) {
209 Ok(_) => return Ok(()),
210 Err(_) => {
211 current_retry += 1;
212 if current_retry == max_retries {
213 return Err(EventError::QueuePushError);
214 }
215 sleep(Duration::from_millis(100 * 2_u64.pow(current_retry))).await;
217 }
218 }
219 }
220
221 Err(EventError::QueuePushRetryError)
222 }
223}
224
225pub fn wait_for_background_task(task_state: &TaskState) -> Result<(), EventError> {
227 if task_state.has_background_handle() {
229 let mut max_retries = 50;
230 while max_retries > 0 {
231 if task_state.is_background_running() {
232 debug!("Background loop started successfully");
233 return Ok(());
234 }
235 max_retries -= 1;
236 std::thread::sleep(Duration::from_millis(200));
237 }
238 error!("Background task failed to start");
239 Err(EventError::BackgroundTaskFailedToStartError)
240 } else {
241 debug!("No background handle to wait for");
242 Ok(())
243 }
244}
245
246pub fn wait_for_event_task(task_state: &TaskState) -> Result<(), EventError> {
248 let mut max_retries = 50;
251 while max_retries > 0 {
252 if task_state.is_event_running() {
253 debug!("Event task started successfully");
254 return Ok(());
255 }
256 max_retries -= 1;
257 std::thread::sleep(Duration::from_millis(200));
258 }
259 error!("Event task failed to start");
260 Err(EventError::EventTaskFailedToStartError)
261}