scouter_events/queue/spc/
queue.rs1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::spc::feature_queue::SpcFeatureQueue;
4use crate::queue::traits::QueueMethods;
5use crate::queue::types::TransportConfig;
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use crossbeam_queue::ArrayQueue;
9use scouter_types::spc::SpcDriftProfile;
10use scouter_types::Features;
11use std::sync::Arc;
12use std::sync::RwLock;
13
14pub struct SpcQueue {
15 queue: Arc<ArrayQueue<Features>>,
16 feature_queue: Arc<SpcFeatureQueue>,
17 producer: RustScouterProducer,
18 last_publish: Arc<RwLock<DateTime<Utc>>>,
19 capacity: usize,
20}
21
22impl SpcQueue {
23 pub async fn new(
24 drift_profile: SpcDriftProfile,
25 config: TransportConfig,
26 ) -> Result<Self, EventError> {
27 let sample_size = drift_profile.config.sample_size;
28 let queue = Arc::new(ArrayQueue::new(sample_size * 2)); let feature_queue = Arc::new(SpcFeatureQueue::new(drift_profile));
30 let last_publish: Arc<RwLock<DateTime<Utc>>> = Arc::new(RwLock::new(Utc::now()));
31 let producer = RustScouterProducer::new(config).await?;
32
33 Ok(SpcQueue {
34 queue,
35 feature_queue,
36 producer,
37 last_publish,
38 capacity: sample_size,
39 })
40 }
41}
42
43#[async_trait]
44impl QueueMethods for SpcQueue {
45 type ItemType = Features;
46 type FeatureQueue = SpcFeatureQueue;
47
48 fn capacity(&self) -> usize {
49 self.capacity
50 }
51
52 fn get_producer(&mut self) -> &mut RustScouterProducer {
53 &mut self.producer
54 }
55
56 fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
57 self.queue.clone()
58 }
59
60 fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
61 self.feature_queue.clone()
62 }
63
64 fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
65 self.last_publish.clone()
66 }
67
68 fn should_process(&self, current_count: usize) -> bool {
69 current_count >= self.capacity()
70 }
71
72 async fn flush(&mut self) -> Result<(), EventError> {
73 self.producer.flush().await?;
74 Ok(())
75 }
76}