scouter_events/queue/custom/
queue.rs1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::bus::TaskState;
4use crate::queue::custom::feature_queue::CustomMetricFeatureQueue;
5use crate::queue::traits::{BackgroundTask, QueueMethods};
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::custom::CustomDriftProfile;
11use scouter_types::Metrics;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime;
15use tokio_util::sync::CancellationToken;
16
17pub struct CustomQueue {
31 queue: Arc<ArrayQueue<Metrics>>,
32 feature_queue: Arc<CustomMetricFeatureQueue>,
33 producer: RustScouterProducer,
34 last_publish: Arc<RwLock<DateTime<Utc>>>,
35 capacity: usize,
36}
37
38impl CustomQueue {
39 pub async fn new(
40 drift_profile: CustomDriftProfile,
41 config: TransportConfig,
42 runtime: Arc<runtime::Runtime>,
43 task_state: &mut TaskState,
44 identifier: String,
45 ) -> Result<Self, EventError> {
46 let sample_size = drift_profile.config.sample_size;
47
48 let metrics_queue = Arc::new(ArrayQueue::new(sample_size * 2));
50 let feature_queue = Arc::new(CustomMetricFeatureQueue::new(drift_profile));
51 let last_publish = Arc::new(RwLock::new(Utc::now()));
52
53 let producer = RustScouterProducer::new(config).await?;
54 let cancellation_token = CancellationToken::new();
55
56 let custom_queue = CustomQueue {
57 queue: metrics_queue.clone(),
58 feature_queue: feature_queue.clone(),
59 producer,
60 last_publish,
61 capacity: sample_size,
62 };
63 let handle = custom_queue.start_background_task(
64 metrics_queue,
65 feature_queue,
66 custom_queue.producer.clone(),
67 custom_queue.last_publish.clone(),
68 runtime.clone(),
69 custom_queue.capacity,
70 identifier,
71 task_state.clone(),
72 cancellation_token.clone(),
73 )?;
74
75 task_state.add_background_abort_handle(handle);
76 task_state.add_background_cancellation_token(cancellation_token);
77
78 Ok(custom_queue)
79 }
80}
81
82impl BackgroundTask for CustomQueue {
86 type DataItem = Metrics;
87 type Processor = CustomMetricFeatureQueue;
88}
89
90#[async_trait]
91impl QueueMethods for CustomQueue {
93 type ItemType = Metrics;
94 type FeatureQueue = CustomMetricFeatureQueue;
95
96 fn capacity(&self) -> usize {
97 self.capacity
98 }
99
100 fn get_producer(&mut self) -> &mut RustScouterProducer {
101 &mut self.producer
102 }
103
104 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
105 self.queue.clone()
106 }
107
108 fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
109 self.feature_queue.clone()
110 }
111
112 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
113 self.last_publish.clone()
114 }
115
116 fn should_process(&self, current_count: usize) -> bool {
117 current_count >= self.capacity()
118 }
119
120 async fn flush(&mut self) -> Result<(), EventError> {
121 self.try_publish(self.queue()).await?;
123 self.producer.flush().await?;
124
125 Ok(())
126 }
127}