scouter_events/queue/psi/
queue.rs

1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::bus::TaskState;
4use crate::queue::psi::feature_queue::PsiFeatureQueue;
5use crate::queue::traits::{BackgroundTask, QueueMethods};
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::psi::PsiDriftProfile;
11use scouter_types::Features;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime;
15use tokio_util::sync::CancellationToken;
16use tracing::debug;
17
/// Item-count threshold used by the PSI queue.
///
/// This value is stored as the queue's `capacity`, handed to the background
/// task on start-up, and doubled to size the underlying `ArrayQueue`.
const PSI_MAX_QUEUE_SIZE: usize = 1_000;
19
20pub struct PsiQueue {
21    queue: Arc<ArrayQueue<Features>>,
22    feature_queue: Arc<PsiFeatureQueue>,
23    producer: RustScouterProducer,
24    last_publish: Arc<RwLock<DateTime<Utc>>>,
25    capacity: usize,
26}
27
28impl PsiQueue {
29    pub async fn new(
30        drift_profile: PsiDriftProfile,
31        config: TransportConfig,
32        runtime: Arc<runtime::Runtime>,
33        task_state: &mut TaskState,
34        identifier: String,
35    ) -> Result<Self, EventError> {
36        // ArrayQueue size is based on the max PSI queue size
37
38        let queue = Arc::new(ArrayQueue::new(PSI_MAX_QUEUE_SIZE * 2));
39        let feature_queue = Arc::new(PsiFeatureQueue::new(drift_profile));
40        let last_publish: Arc<RwLock<DateTime<Utc>>> = Arc::new(RwLock::new(Utc::now()));
41        let producer = RustScouterProducer::new(config).await?;
42        let cancellation_token = CancellationToken::new();
43
44        let psi_queue = PsiQueue {
45            queue: queue.clone(),
46            feature_queue: feature_queue.clone(),
47            producer,
48            last_publish,
49            capacity: PSI_MAX_QUEUE_SIZE,
50        };
51
52        let handle = psi_queue.start_background_task(
53            queue,
54            feature_queue,
55            psi_queue.producer.clone(),
56            psi_queue.last_publish.clone(),
57            runtime.clone(),
58            PSI_MAX_QUEUE_SIZE,
59            identifier,
60            task_state.clone(),
61            cancellation_token.clone(),
62        )?;
63
64        task_state.add_background_abort_handle(handle);
65        task_state.add_background_cancellation_token(cancellation_token);
66
67        debug!("Created PSI Queue with capacity: {}", PSI_MAX_QUEUE_SIZE);
68
69        Ok(psi_queue)
70    }
71}
72
73/// Psi requires a background timed-task as a secondary processing mechanism
74/// i.e. Its possible that queue insertion is slow, and so we need a background
75/// task to process the queue at a regular interval
76impl BackgroundTask for PsiQueue {
77    type DataItem = Features;
78    type Processor = PsiFeatureQueue;
79}
80
81#[async_trait]
82/// Implementing primary methods
83impl QueueMethods for PsiQueue {
84    type ItemType = Features;
85    type FeatureQueue = PsiFeatureQueue;
86
87    fn capacity(&self) -> usize {
88        self.capacity
89    }
90
91    fn get_producer(&mut self) -> &mut RustScouterProducer {
92        &mut self.producer
93    }
94
95    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
96        self.queue.clone()
97    }
98
99    fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
100        self.feature_queue.clone()
101    }
102
103    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
104        self.last_publish.clone()
105    }
106
107    fn should_process(&self, current_count: usize) -> bool {
108        current_count >= self.capacity()
109    }
110
111    async fn flush(&mut self) -> Result<(), EventError> {
112        // publish any remaining drift records
113        self.try_publish(self.queue()).await?;
114        self.producer.flush().await?;
115
116        Ok(())
117    }
118}