scouter_events/queue/psi/
queue.rs1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::psi::feature_queue::PsiFeatureQueue;
4use crate::queue::traits::{BackgroundTask, QueueMethods};
5use crate::queue::types::TransportConfig;
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use crossbeam_queue::ArrayQueue;
9use scouter_types::psi::PsiDriftProfile;
10use scouter_types::Features;
11use std::sync::Arc;
12use std::sync::RwLock;
13use tokio::runtime;
14use tokio::sync::watch;
15use tracing::debug;
16
/// Record-count threshold for the PSI queue.
///
/// Stored as `PsiQueue::capacity` (compared against in `should_process`) and
/// passed to the background task when it is started. The backing `ArrayQueue`
/// is allocated with twice this many slots.
const PSI_MAX_QUEUE_SIZE: usize = 1000;
18
/// Queue that buffers `Features` records for PSI drift monitoring and
/// publishes them through a `RustScouterProducer`.
///
/// The `queue`, `feature_queue` and `last_publish` handles are shared with a
/// background task started in `PsiQueue::new`.
pub struct PsiQueue {
    /// Bounded queue of incoming `Features`; a clone of this handle is given
    /// to the background worker.
    queue: Arc<ArrayQueue<Features>>,
    /// Feature queue built from the PSI drift profile; also shared with the
    /// background worker.
    feature_queue: Arc<PsiFeatureQueue>,
    /// Producer created from the transport config; flushed in `flush`.
    producer: RustScouterProducer,
    /// Timestamp initialised to `Utc::now()` at construction and shared with
    /// the background task.
    last_publish: Arc<RwLock<DateTime<Utc>>>,
    /// Stop-signal sender for the background task. Taken (left as `None`) the
    /// first time `flush` runs, so the signal is sent at most once.
    stop_tx: Option<watch::Sender<()>>,
    /// Threshold returned by `capacity()`; set to `PSI_MAX_QUEUE_SIZE`.
    capacity: usize,
}
27
28impl PsiQueue {
29 pub async fn new(
30 drift_profile: PsiDriftProfile,
31 config: TransportConfig,
32 runtime: Arc<runtime::Runtime>,
33 ) -> Result<Self, EventError> {
34 let queue = Arc::new(ArrayQueue::new(PSI_MAX_QUEUE_SIZE * 2));
37 let feature_queue = Arc::new(PsiFeatureQueue::new(drift_profile));
38 let last_publish: Arc<RwLock<DateTime<Utc>>> = Arc::new(RwLock::new(Utc::now()));
39 let producer = RustScouterProducer::new(config).await?;
40
41 debug!("Creating PSI Queue");
42
43 let (stop_tx, stop_rx) = watch::channel(());
44
45 let psi_queue = PsiQueue {
46 queue: queue.clone(),
47 feature_queue: feature_queue.clone(),
48 producer,
49 last_publish,
50 stop_tx: Some(stop_tx),
51 capacity: PSI_MAX_QUEUE_SIZE,
52 };
53
54 debug!("Starting Background Task");
55 psi_queue.start_background_worker(queue, feature_queue, stop_rx, runtime)?;
56
57 Ok(psi_queue)
58 }
59
60 fn start_background_worker(
61 &self,
62 metrics_queue: Arc<ArrayQueue<Features>>,
63 feature_queue: Arc<PsiFeatureQueue>,
64 stop_rx: watch::Receiver<()>,
65 rt: Arc<tokio::runtime::Runtime>,
66 ) -> Result<(), EventError> {
67 self.start_background_task(
68 metrics_queue,
69 feature_queue,
70 self.producer.clone(),
71 self.last_publish.clone(),
72 rt.clone(),
73 stop_rx,
74 PSI_MAX_QUEUE_SIZE,
75 "Psi Background Polling",
76 )
77 }
78}
79
80#[async_trait]
81impl QueueMethods for PsiQueue {
83 type ItemType = Features;
84 type FeatureQueue = PsiFeatureQueue;
85
86 fn capacity(&self) -> usize {
87 self.capacity
88 }
89
90 fn get_producer(&mut self) -> &mut RustScouterProducer {
91 &mut self.producer
92 }
93
94 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
95 self.queue.clone()
96 }
97
98 fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
99 self.feature_queue.clone()
100 }
101
102 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
103 self.last_publish.clone()
104 }
105
106 fn should_process(&self, current_count: usize) -> bool {
107 current_count >= self.capacity()
108 }
109
110 async fn flush(&mut self) -> Result<(), EventError> {
111 self.try_publish(self.queue()).await?;
113 if let Some(stop_tx) = self.stop_tx.take() {
114 let _ = stop_tx.send(());
115 }
116 self.producer.flush().await
117 }
118}
119
/// Binds the `BackgroundTask` associated types for the PSI queue. No trait
/// methods are overridden here.
// NOTE(review): `start_background_task`, called from `start_background_worker`,
// is presumably a provided method of this trait — confirm in `queue::traits`.
impl BackgroundTask for PsiQueue {
    /// Items handled by the background task.
    type DataItem = Features;
    /// Processor type used by the background task.
    type Processor = PsiFeatureQueue;
}