scouter_events/queue/custom/
queue.rs1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::custom::feature_queue::CustomMetricFeatureQueue;
4use crate::queue::traits::BackgroundTask;
5use crate::queue::traits::QueueMethods;
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::custom::CustomDriftProfile;
11use scouter_types::Metrics;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime;
15use tokio::sync::watch;
16use tracing::debug;
17
18pub struct CustomQueue {
32 queue: Arc<ArrayQueue<Metrics>>,
33 feature_queue: Arc<CustomMetricFeatureQueue>,
34 producer: RustScouterProducer,
35 last_publish: Arc<RwLock<DateTime<Utc>>>,
36 stop_tx: Option<watch::Sender<()>>,
37 capacity: usize,
38}
39
40impl CustomQueue {
41 pub async fn new(
42 drift_profile: CustomDriftProfile,
43 config: TransportConfig,
44 runtime: Arc<runtime::Runtime>,
45 ) -> Result<Self, EventError> {
46 let sample_size = drift_profile.config.sample_size;
47
48 debug!("Creating Custom Metric Queue");
49 let metrics_queue = Arc::new(ArrayQueue::new(sample_size * 2));
51 let feature_queue = Arc::new(CustomMetricFeatureQueue::new(drift_profile));
52 let last_publish = Arc::new(RwLock::new(Utc::now()));
53
54 debug!("Creating Producer");
55 let producer = RustScouterProducer::new(config).await?;
56
57 let (stop_tx, stop_rx) = watch::channel(());
58
59 let custom_queue = CustomQueue {
60 queue: metrics_queue.clone(),
61 feature_queue: feature_queue.clone(),
62 producer,
63 last_publish,
64 stop_tx: Some(stop_tx),
65
66 capacity: sample_size,
67 };
68
69 debug!("Starting Background Task");
70 custom_queue.start_background_worker(metrics_queue, feature_queue, stop_rx, runtime)?;
71
72 Ok(custom_queue)
73 }
74
75 fn start_background_worker(
76 &self,
77 metrics_queue: Arc<ArrayQueue<Metrics>>,
78 feature_queue: Arc<CustomMetricFeatureQueue>,
79 stop_rx: watch::Receiver<()>,
80 rt: Arc<tokio::runtime::Runtime>,
81 ) -> Result<(), EventError> {
82 self.start_background_task(
83 metrics_queue,
84 feature_queue,
85 self.producer.clone(),
86 self.last_publish.clone(),
87 rt.clone(),
88 stop_rx,
89 self.capacity,
90 "Custom Background Polling",
91 )
92 }
93}
94
95impl BackgroundTask for CustomQueue {
96 type DataItem = Metrics;
97 type Processor = CustomMetricFeatureQueue;
98}
99
100#[async_trait]
101impl QueueMethods for CustomQueue {
103 type ItemType = Metrics;
104 type FeatureQueue = CustomMetricFeatureQueue;
105
106 fn capacity(&self) -> usize {
107 self.capacity
108 }
109
110 fn get_producer(&mut self) -> &mut RustScouterProducer {
111 &mut self.producer
112 }
113
114 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
115 self.queue.clone()
116 }
117
118 fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
119 self.feature_queue.clone()
120 }
121
122 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
123 self.last_publish.clone()
124 }
125
126 fn should_process(&self, current_count: usize) -> bool {
127 current_count >= self.capacity()
128 }
129
130 async fn flush(&mut self) -> Result<(), EventError> {
131 self.try_publish(self.queue()).await?;
133 if let Some(stop_tx) = self.stop_tx.take() {
134 let _ = stop_tx.send(());
135 }
136 self.producer.flush().await
137 }
138}