scouter_events/queue/custom/
queue.rs

1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::bus::TaskState;
4use crate::queue::custom::feature_queue::CustomMetricFeatureQueue;
5use crate::queue::traits::{BackgroundTask, QueueMethods};
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::custom::CustomDriftProfile;
11use scouter_types::Metrics;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime;
15use tokio_util::sync::CancellationToken;
16
17/// The following code is a custom queue implementation for handling custom metrics.
18/// It consists of a `CustomQueue` struct that manages a queue of metrics and a background task
19///
20/// components:
21/// - `metrics_queue`: Crossbeam queue for storing metrics.
22/// - `feature_queue`: A Custom metric helper for converting batches of metrics to drift records.
23/// - `producer`: A RustScouter producer for publishing records.
24/// - `count`: Atomic counter for tracking the number of metrics.
25/// - `last_publish`: A timestamp for the last publish time.
26/// - `stop_tx`: A channel for stopping the background task.
27/// - `rt`: A Tokio runtime for executing asynchronous tasks.
28/// - `sample_size`: The size of the sample.
29/// - `sample`: A boolean indicating whether to sample metrics.
30pub struct CustomQueue {
31    queue: Arc<ArrayQueue<Metrics>>,
32    feature_queue: Arc<CustomMetricFeatureQueue>,
33    producer: RustScouterProducer,
34    last_publish: Arc<RwLock<DateTime<Utc>>>,
35    capacity: usize,
36}
37
38impl CustomQueue {
39    pub async fn new(
40        drift_profile: CustomDriftProfile,
41        config: TransportConfig,
42        runtime: Arc<runtime::Runtime>,
43        task_state: &mut TaskState,
44        identifier: String,
45    ) -> Result<Self, EventError> {
46        let sample_size = drift_profile.config.sample_size;
47
48        // ArrayQueue size is based on sample size
49        let metrics_queue = Arc::new(ArrayQueue::new(sample_size * 2));
50        let feature_queue = Arc::new(CustomMetricFeatureQueue::new(drift_profile));
51        let last_publish = Arc::new(RwLock::new(Utc::now()));
52
53        let producer = RustScouterProducer::new(config).await?;
54        let cancellation_token = CancellationToken::new();
55
56        let custom_queue = CustomQueue {
57            queue: metrics_queue.clone(),
58            feature_queue: feature_queue.clone(),
59            producer,
60            last_publish,
61            capacity: sample_size,
62        };
63        let handle = custom_queue.start_background_task(
64            metrics_queue,
65            feature_queue,
66            custom_queue.producer.clone(),
67            custom_queue.last_publish.clone(),
68            runtime.clone(),
69            custom_queue.capacity,
70            identifier,
71            task_state.clone(),
72            cancellation_token.clone(),
73        )?;
74
75        task_state.add_background_abort_handle(handle);
76        task_state.add_background_cancellation_token(cancellation_token);
77
78        Ok(custom_queue)
79    }
80}
81
82/// Custom requires a background timed-task as a secondary processing mechanism
83/// i.e. Its possible that queue insertion is slow, and so we need a background
84/// task to process the queue at a regular interval
85impl BackgroundTask for CustomQueue {
86    type DataItem = Metrics;
87    type Processor = CustomMetricFeatureQueue;
88}
89
90#[async_trait]
91/// Implementing primary methods
92impl QueueMethods for CustomQueue {
93    type ItemType = Metrics;
94    type FeatureQueue = CustomMetricFeatureQueue;
95
96    fn capacity(&self) -> usize {
97        self.capacity
98    }
99
100    fn get_producer(&mut self) -> &mut RustScouterProducer {
101        &mut self.producer
102    }
103
104    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
105        self.queue.clone()
106    }
107
108    fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
109        self.feature_queue.clone()
110    }
111
112    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
113        self.last_publish.clone()
114    }
115
116    fn should_process(&self, current_count: usize) -> bool {
117        current_count >= self.capacity()
118    }
119
120    async fn flush(&mut self) -> Result<(), EventError> {
121        // publish any remaining drift records
122        self.try_publish(self.queue()).await?;
123        self.producer.flush().await?;
124
125        Ok(())
126    }
127}