scouter_events/queue/psi/
queue.rs

1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::psi::feature_queue::PsiFeatureQueue;
4use crate::queue::traits::{BackgroundTask, QueueMethods};
5use crate::queue::types::TransportConfig;
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use crossbeam_queue::ArrayQueue;
9use scouter_types::psi::PsiDriftProfile;
10use scouter_types::Features;
11use std::sync::Arc;
12use std::sync::RwLock;
13use tokio::runtime;
14use tokio::sync::watch;
15use tracing::debug;
16
17const PSI_MAX_QUEUE_SIZE: usize = 1000;
18
19pub struct PsiQueue {
20    queue: Arc<ArrayQueue<Features>>,
21    feature_queue: Arc<PsiFeatureQueue>,
22    producer: RustScouterProducer,
23    last_publish: Arc<RwLock<DateTime<Utc>>>,
24    stop_tx: Option<watch::Sender<()>>,
25    capacity: usize,
26}
27
28impl PsiQueue {
29    pub async fn new(
30        drift_profile: PsiDriftProfile,
31        config: TransportConfig,
32        runtime: Arc<runtime::Runtime>,
33    ) -> Result<Self, EventError> {
34        // ArrayQueue size is based on the max PSI queue size
35
36        let queue = Arc::new(ArrayQueue::new(PSI_MAX_QUEUE_SIZE * 2));
37        let feature_queue = Arc::new(PsiFeatureQueue::new(drift_profile));
38        let last_publish: Arc<RwLock<DateTime<Utc>>> = Arc::new(RwLock::new(Utc::now()));
39        let producer = RustScouterProducer::new(config).await?;
40
41        debug!("Creating PSI Queue");
42
43        let (stop_tx, stop_rx) = watch::channel(());
44
45        let psi_queue = PsiQueue {
46            queue: queue.clone(),
47            feature_queue: feature_queue.clone(),
48            producer,
49            last_publish,
50            stop_tx: Some(stop_tx),
51            capacity: PSI_MAX_QUEUE_SIZE,
52        };
53
54        debug!("Starting Background Task");
55        psi_queue.start_background_worker(queue, feature_queue, stop_rx, runtime)?;
56
57        Ok(psi_queue)
58    }
59
60    fn start_background_worker(
61        &self,
62        metrics_queue: Arc<ArrayQueue<Features>>,
63        feature_queue: Arc<PsiFeatureQueue>,
64        stop_rx: watch::Receiver<()>,
65        rt: Arc<tokio::runtime::Runtime>,
66    ) -> Result<(), EventError> {
67        self.start_background_task(
68            metrics_queue,
69            feature_queue,
70            self.producer.clone(),
71            self.last_publish.clone(),
72            rt.clone(),
73            stop_rx,
74            PSI_MAX_QUEUE_SIZE,
75            "Psi Background Polling",
76        )
77    }
78}
79
80#[async_trait]
81/// Implementing primary methods
82impl QueueMethods for PsiQueue {
83    type ItemType = Features;
84    type FeatureQueue = PsiFeatureQueue;
85
86    fn capacity(&self) -> usize {
87        self.capacity
88    }
89
90    fn get_producer(&mut self) -> &mut RustScouterProducer {
91        &mut self.producer
92    }
93
94    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
95        self.queue.clone()
96    }
97
98    fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
99        self.feature_queue.clone()
100    }
101
102    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
103        self.last_publish.clone()
104    }
105
106    fn should_process(&self, current_count: usize) -> bool {
107        current_count >= self.capacity()
108    }
109
110    async fn flush(&mut self) -> Result<(), EventError> {
111        // publish any remaining drift records
112        self.try_publish(self.queue()).await?;
113        if let Some(stop_tx) = self.stop_tx.take() {
114            let _ = stop_tx.send(());
115        }
116        self.producer.flush().await
117    }
118}
119
120/// Psi requires a background timed-task as a secondary processing mechanism
121/// i.e. Its possible that queue insertion is slow, and so we need a background
122/// task to process the queue at a regular interval
123impl BackgroundTask for PsiQueue {
124    type DataItem = Features;
125    type Processor = PsiFeatureQueue;
126}