scouter_events/queue/spc/
queue.rs

1use crate::error::EventError;
2use crate::producer::RustScouterProducer;
3use crate::queue::spc::feature_queue::SpcFeatureQueue;
4use crate::queue::traits::QueueMethods;
5use crate::queue::types::TransportConfig;
6use async_trait::async_trait;
7use chrono::{DateTime, Utc};
8use crossbeam_queue::ArrayQueue;
9use scouter_types::spc::SpcDriftProfile;
10use scouter_types::Features;
11use std::sync::Arc;
12use std::sync::RwLock;
13
14pub struct SpcQueue {
15    queue: Arc<ArrayQueue<Features>>,
16    feature_queue: Arc<SpcFeatureQueue>,
17    producer: RustScouterProducer,
18    last_publish: Arc<RwLock<DateTime<Utc>>>,
19    capacity: usize,
20}
21
22impl SpcQueue {
23    pub async fn new(
24        drift_profile: SpcDriftProfile,
25        config: TransportConfig,
26    ) -> Result<Self, EventError> {
27        let sample_size = drift_profile.config.sample_size;
28        let queue = Arc::new(ArrayQueue::new(sample_size * 2)); // Add extra space for buffer
29        let feature_queue = Arc::new(SpcFeatureQueue::new(drift_profile));
30        let last_publish: Arc<RwLock<DateTime<Utc>>> = Arc::new(RwLock::new(Utc::now()));
31        let producer = RustScouterProducer::new(config).await?;
32
33        Ok(SpcQueue {
34            queue,
35            feature_queue,
36            producer,
37            last_publish,
38            capacity: sample_size,
39        })
40    }
41}
42
43#[async_trait]
44impl QueueMethods for SpcQueue {
45    type ItemType = Features;
46    type FeatureQueue = SpcFeatureQueue;
47
48    fn capacity(&self) -> usize {
49        self.capacity
50    }
51
52    fn get_producer(&mut self) -> &mut RustScouterProducer {
53        &mut self.producer
54    }
55
56    fn queue(&self) -> Arc<ArrayQueue<Self::ItemType>> {
57        self.queue.clone()
58    }
59
60    fn feature_queue(&self) -> Arc<Self::FeatureQueue> {
61        self.feature_queue.clone()
62    }
63
64    fn last_publish(&self) -> Arc<RwLock<DateTime<Utc>>> {
65        self.last_publish.clone()
66    }
67
68    fn should_process(&self, current_count: usize) -> bool {
69        current_count >= self.capacity()
70    }
71
72    async fn flush(&mut self) -> Result<(), EventError> {
73        self.producer.flush().await?;
74        Ok(())
75    }
76}