scouter_events/queue/psi/
queue.rs1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::bus::TaskState;
4use crate::queue::psi::feature_queue::PsiFeatureQueue;
5use crate::queue::traits::{BackgroundTask, QueueMethods};
6use crate::queue::types::TransportConfig;
7use async_trait::async_trait;
8use chrono::{DateTime, Utc};
9use crossbeam_queue::ArrayQueue;
10use scouter_types::psi::PsiDriftProfile;
11use scouter_types::Features;
12use std::sync::Arc;
13use std::sync::RwLock;
14use tokio::runtime;
15use tokio_util::sync::CancellationToken;
16use tracing::debug;
17
18const PSI_MAX_QUEUE_SIZE: usize = 1000;
19
20pub struct PsiQueue {
21 queue: Arc<ArrayQueue<Features>>,
22 feature_queue: Arc<PsiFeatureQueue>,
23 producer: RustScouterProducer,
24 last_publish: Arc<RwLock<DateTime<Utc>>>,
25 capacity: usize,
26}
27
28impl PsiQueue {
29 pub async fn new(
30 drift_profile: PsiDriftProfile,
31 config: TransportConfig,
32 runtime: Arc<runtime::Runtime>,
33 task_state: &mut TaskState,
34 identifier: String,
35 ) -> Result<Self, EventError> {
36 let queue = Arc::new(ArrayQueue::new(PSI_MAX_QUEUE_SIZE * 2));
39 let feature_queue = Arc::new(PsiFeatureQueue::new(drift_profile));
40 let last_publish: Arc<RwLock<DateTime<Utc>>> = Arc::new(RwLock::new(Utc::now()));
41 let producer = RustScouterProducer::new(config).await?;
42 let cancellation_token = CancellationToken::new();
43
44 let psi_queue = PsiQueue {
45 queue: queue.clone(),
46 feature_queue: feature_queue.clone(),
47 producer,
48 last_publish,
49 capacity: PSI_MAX_QUEUE_SIZE,
50 };
51
52 let handle = psi_queue.start_background_task(
53 queue,
54 feature_queue,
55 psi_queue.producer.clone(),
56 psi_queue.last_publish.clone(),
57 runtime.clone(),
58 PSI_MAX_QUEUE_SIZE,
59 identifier,
60 task_state.clone(),
61 cancellation_token.clone(),
62 )?;
63
64 task_state.add_background_abort_handle(handle);
65 task_state.add_background_cancellation_token(cancellation_token);
66
67 debug!("Created PSI Queue with capacity: {}", PSI_MAX_QUEUE_SIZE);
68
69 Ok(psi_queue)
70 }
71}
72
73impl BackgroundTask for PsiQueue {
77 type DataItem = Features;
78 type Processor = PsiFeatureQueue;
79}
80
81#[async_trait]
82impl QueueMethods for PsiQueue {
84 type ItemType = Features;
85 type FeatureQueue = PsiFeatureQueue;
86
87 fn capacity(&self) -> usize {
88 self.capacity
89 }
90
91 fn get_producer(&mut self) -> &mut RustScouterProducer {
92 &mut self.producer
93 }
94
95 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
96 self.queue.clone()
97 }
98
99 fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
100 self.feature_queue.clone()
101 }
102
103 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
104 self.last_publish.clone()
105 }
106
107 fn should_process(&self, current_count: usize) -> bool {
108 current_count >= self.capacity()
109 }
110
111 async fn flush(&mut self) -> Result<(), EventError> {
112 self.try_publish(self.queue()).await?;
114 self.producer.flush().await?;
115
116 Ok(())
117 }
118}