scouter_events/queue/custom/
queue.rs

1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::custom::feature_queue::CustomMetricFeatureQueue;
4use crate::queue::traits::BackgroundTask;
5use crate::queue::traits::QueueMethods;
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::custom::CustomDriftProfile;
11use scouter_types::Metrics;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime;
15use tokio::sync::watch;
16use tracing::debug;
17
18/// The following code is a custom queue implementation for handling custom metrics.
19/// It consists of a `CustomQueue` struct that manages a queue of metrics and a background task
20///
21/// components:
22/// - `metrics_queue`: Crossbeam queue for storing metrics.
23/// - `feature_queue`: A Custom metric helper for converting batches of metrics to drift records.
24/// - `producer`: A RustScouter producer for publishing records.
25/// - `count`: Atomic counter for tracking the number of metrics.
26/// - `last_publish`: A timestamp for the last publish time.
27/// - `stop_tx`: A channel for stopping the background task.
28/// - `rt`: A Tokio runtime for executing asynchronous tasks.
29/// - `sample_size`: The size of the sample.
30/// - `sample`: A boolean indicating whether to sample metrics.
31pub struct CustomQueue {
32    queue: Arc<ArrayQueue<Metrics>>,
33    feature_queue: Arc<CustomMetricFeatureQueue>,
34    producer: RustScouterProducer,
35    last_publish: Arc<RwLock<DateTime<Utc>>>,
36    stop_tx: Option<watch::Sender<()>>,
37    capacity: usize,
38}
39
40impl CustomQueue {
41    pub async fn new(
42        drift_profile: CustomDriftProfile,
43        config: TransportConfig,
44        runtime: Arc<runtime::Runtime>,
45    ) -> Result<Self, EventError> {
46        let sample_size = drift_profile.config.sample_size;
47
48        debug!("Creating Custom Metric Queue");
49        // ArrayQueue size is based on sample size
50        let metrics_queue = Arc::new(ArrayQueue::new(sample_size * 2));
51        let feature_queue = Arc::new(CustomMetricFeatureQueue::new(drift_profile));
52        let last_publish = Arc::new(RwLock::new(Utc::now()));
53
54        debug!("Creating Producer");
55        let producer = RustScouterProducer::new(config).await?;
56
57        let (stop_tx, stop_rx) = watch::channel(());
58
59        let custom_queue = CustomQueue {
60            queue: metrics_queue.clone(),
61            feature_queue: feature_queue.clone(),
62            producer,
63            last_publish,
64            stop_tx: Some(stop_tx),
65
66            capacity: sample_size,
67        };
68
69        debug!("Starting Background Task");
70        custom_queue.start_background_worker(metrics_queue, feature_queue, stop_rx, runtime)?;
71
72        Ok(custom_queue)
73    }
74
75    fn start_background_worker(
76        &self,
77        metrics_queue: Arc<ArrayQueue<Metrics>>,
78        feature_queue: Arc<CustomMetricFeatureQueue>,
79        stop_rx: watch::Receiver<()>,
80        rt: Arc<tokio::runtime::Runtime>,
81    ) -> Result<(), EventError> {
82        self.start_background_task(
83            metrics_queue,
84            feature_queue,
85            self.producer.clone(),
86            self.last_publish.clone(),
87            rt.clone(),
88            stop_rx,
89            self.capacity,
90            "Custom Background Polling",
91        )
92    }
93}
94
95impl BackgroundTask for CustomQueue {
96    type DataItem = Metrics;
97    type Processor = CustomMetricFeatureQueue;
98}
99
100#[async_trait]
101/// Implementing primary methods
102impl QueueMethods for CustomQueue {
103    type ItemType = Metrics;
104    type FeatureQueue = CustomMetricFeatureQueue;
105
106    fn capacity(&self) -> usize {
107        self.capacity
108    }
109
110    fn get_producer(&mut self) -> &mut RustScouterProducer {
111        &mut self.producer
112    }
113
114    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
115        self.queue.clone()
116    }
117
118    fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
119        self.feature_queue.clone()
120    }
121
122    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
123        self.last_publish.clone()
124    }
125
126    fn should_process(&self, current_count: usize) -> bool {
127        current_count >= self.capacity()
128    }
129
130    async fn flush(&mut self) -> Result<(), EventError> {
131        // publish any remaining drift records
132        self.try_publish(self.queue()).await?;
133        if let Some(stop_tx) = self.stop_tx.take() {
134            let _ = stop_tx.send(());
135        }
136        self.producer.flush().await
137    }
138}