scouter_events/queue/
bus.rs

1use crate::error::{EventError, PyEventError};
2use pyo3::prelude::*;
3use scouter_types::QueueItem;
4use std::sync::Arc;
5use std::sync::RwLock;
6use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
7use tokio::sync::oneshot;
8use tracing::{debug, instrument};
9
10#[derive(Debug)]
11pub enum Event {
12    Task(QueueItem),
13    Init,
14}
15
16/// QueueBus is an mpsc bus that allows for publishing events to subscribers.
17/// It leverage an unbounded channel
18/// Primary way to publish non-blocking events to background queues with ScouterQueue
19#[pyclass(name = "Queue")]
20pub struct QueueBus {
21    tx: UnboundedSender<Event>,
22    shutdown_tx: Option<oneshot::Sender<()>>,
23    pub initialized: Arc<RwLock<bool>>,
24}
25
26impl QueueBus {
27    #[instrument(skip_all)]
28    pub fn new() -> (Self, UnboundedReceiver<Event>, oneshot::Receiver<()>) {
29        debug!("Creating unbounded QueueBus");
30        let (tx, rx) = mpsc::unbounded_channel();
31        let (shutdown_tx, shutdown_rx) = oneshot::channel();
32        let initialized = Arc::new(RwLock::new(false));
33
34        (
35            Self {
36                tx,
37                shutdown_tx: Some(shutdown_tx),
38                initialized,
39            },
40            rx,
41            shutdown_rx,
42        )
43    }
44
45    #[instrument(skip_all)]
46    pub fn publish(&self, event: Event) -> Result<(), EventError> {
47        Ok(self.tx.send(event)?)
48    }
49
50    pub fn is_initialized(&self) -> bool {
51        // Check if the bus is initialized
52        if let Ok(initialized) = self.initialized.read() {
53            *initialized
54        } else {
55            false
56        }
57    }
58}
59
60#[pymethods]
61impl QueueBus {
62    /// Insert an event to the bus
63    ///
64    /// # Arguments
65    /// * `event` - The event to publish
66    pub fn insert(&mut self, entity: &Bound<'_, PyAny>) -> Result<(), PyEventError> {
67        let entity = QueueItem::from_py_entity(entity)?;
68        debug!("Inserting event into QueueBus: {:?}", entity);
69        let event = Event::Task(entity);
70        self.publish(event)?;
71        Ok(())
72    }
73
74    /// Shutdown the bus
75    /// This will send a messages to the background queue, which will trigger a flush on the queue
76    #[instrument(skip_all)]
77    pub fn shutdown(&mut self) {
78        // Signal shutdown
79        if let Some(shutdown_tx) = self.shutdown_tx.take() {
80            let _ = shutdown_tx.send(());
81        }
82    }
83}
84
85impl QueueBus {
86    /// Check if the bus is initialized
87    pub fn init(&self) -> Result<(), EventError> {
88        std::thread::sleep(std::time::Duration::from_millis(20));
89        let mut attempts = 0;
90        while !self.is_initialized() {
91            debug!("QueueBus is not initialized, waiting...");
92            if attempts >= 100 {
93                return Err(EventError::InitializationError);
94            }
95            attempts += 1;
96            std::thread::sleep(std::time::Duration::from_millis(10));
97
98            let event = Event::Init;
99            debug!("Initializing QueueBus");
100            self.publish(event)?;
101        }
102        Ok(())
103    }
104}