scouter_events/queue/traits/
queue.rs1use crate::error::{EventError, FeatureQueueError};
4use crate::producer::RustScouterProducer;
5use async_trait::async_trait;
6use chrono::{DateTime, Utc};
7use crossbeam_queue::ArrayQueue;
8use scouter_types::QueueExt;
9use scouter_types::ServerRecords;
10use std::sync::Arc;
11use std::sync::RwLock;
12use tokio::runtime::Runtime;
13use tokio::sync::watch;
14use tokio::time::{sleep, Duration};
15
16use tracing::{debug, error, info, info_span, Instrument};
17
18pub trait FeatureQueue: Send + Sync {
19 fn create_drift_records_from_batch<T: QueueExt>(
20 &self,
21 batch: Vec<T>,
22 ) -> Result<ServerRecords, FeatureQueueError>;
23}
24
25pub trait BackgroundTask {
26 type DataItem: QueueExt + Send + Sync + 'static;
27 type Processor: FeatureQueue + Send + Sync + 'static;
28
29 #[allow(clippy::too_many_arguments)]
30 fn start_background_task(
31 &self,
32 data_queue: Arc<ArrayQueue<Self::DataItem>>,
33 processor: Arc<Self::Processor>,
34 mut producer: RustScouterProducer,
35 last_publish: Arc<RwLock<DateTime<Utc>>>,
36 runtime: Arc<Runtime>,
37 mut stop_rx: watch::Receiver<()>,
38 queue_capacity: usize,
39 label: &'static str,
40 ) -> Result<(), EventError> {
41 let future = async move {
42 loop {
43 tokio::select! {
44 _ = sleep(Duration::from_secs(2)) => {
45 let now = Utc::now();
46
47 let should_process = {
49 if let Ok(last) = last_publish.read() {
50 (now - *last).num_seconds() >= 30
51 } else {
52 false
53 }
54 };
55
56 if should_process {
57 debug!("Processing queued data");
58
59 let mut batch = Vec::with_capacity(queue_capacity);
60 while let Some(item) = data_queue.pop() {
61 batch.push(item);
62 }
63
64 if let Ok(mut guard) = last_publish.write() {
66 *guard = now;
67 }
68
69 if !batch.is_empty() {
70 match processor.create_drift_records_from_batch(batch) {
71 Ok(records) => {
72 if let Err(e) = producer.publish(records).await {
73 error!("Failed to publish records: {}", e);
74 } else {
75
76 debug!("Successfully published records");
77 }
78 }
79 Err(e) => error!("Failed to create drift records: {}", e),
80 }
81 }
82
83 }
84 },
85 _ = stop_rx.changed() => {
86 info!("Stopping background task");
87 if let Err(e) = producer.flush().await {
88 error!("Failed to flush producer: {}", e);
89 }
90 break;
91 }
92 }
93 }
94 };
95
96 let span = info_span!("background_task", task = %label);
97 runtime.spawn(future.instrument(span));
98 Ok(())
99 }
100}
101
102#[async_trait]
105pub trait QueueMethods {
106 type ItemType: QueueExt + 'static + Clone;
107 type FeatureQueue: FeatureQueue + 'static;
108
109 fn capacity(&self) -> usize;
111 fn get_producer(&mut self) -> &mut RustScouterProducer;
112 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>>;
113 fn feature_queue(&self) -> Arc<Self::FeatureQueue>;
114 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>>;
115 fn should_process(&self, current_count: usize) -> bool;
116
117 fn update_last_publish(&mut self) -> Result<(), EventError> {
118 if let Ok(mut last_publish) = self.last_publish().write() {
119 *last_publish = Utc::now();
120 }
121
122 Ok(())
123 }
124
125 async fn publish(&mut self, records: ServerRecords) -> Result<(), EventError> {
129 let producer = self.get_producer();
130 producer.publish(records).await
131 }
132
133 async fn insert(&mut self, item: Self::ItemType) -> Result<(), EventError> {
135 self.insert_with_backpressure(item).await?;
136
137 let queue = self.queue();
138
139 if queue.len() >= self.capacity() {
142 self.try_publish(queue).await?;
143 }
144
145 Ok(())
146 }
147
148 async fn try_publish(
150 &mut self,
151 queue: Arc<ArrayQueue<Self::ItemType>>,
152 ) -> Result<(), EventError> {
153 let mut batch = Vec::with_capacity(queue.capacity());
154
155 while let Some(metrics) = queue.pop() {
156 batch.push(metrics);
157 }
158
159 if !batch.is_empty() {
160 let feature_queue = self.feature_queue();
161 match feature_queue.create_drift_records_from_batch(batch) {
162 Ok(records) => {
163 self.publish(records).await?;
164 self.update_last_publish()?;
165 }
166 Err(e) => error!("Failed to create drift records: {}", e),
167 }
168 }
169
170 Ok(())
171 }
172
173 async fn flush(&mut self) -> Result<(), EventError>;
175
176 async fn insert_with_backpressure(&mut self, item: Self::ItemType) -> Result<(), EventError> {
179 let queue = self.queue();
180 let max_retries = 3;
181 let mut current_retry = 0;
182
183 while current_retry < max_retries {
184 match queue.push(item.clone()) {
185 Ok(_) => return Ok(()),
186 Err(_) => {
187 current_retry += 1;
188 if current_retry == max_retries {
189 return Err(EventError::QueuePushError);
190 }
191 sleep(Duration::from_millis(100 * 2_u64.pow(current_retry))).await;
193 }
194 }
195 }
196
197 Err(EventError::QueuePushRetryError)
198 }
199}