scouter_events/queue/
bus.rs

1use crate::error::{EventError, PyEventError};
2use pyo3::prelude::*;
3use scouter_types::QueueItem;
4use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
5use tokio::sync::oneshot;
6use tracing::{debug, instrument};
7
8#[derive(Debug)]
9pub enum Event {
10    Task(QueueItem),
11}
12
13/// QueueBus is an mpsc bus that allows for publishing events to subscribers.
14/// It leverage an unbounded channel
15/// Primary way to publish non-blocking events to background queues with ScouterQueue
16#[pyclass(name = "Queue")]
17pub struct QueueBus {
18    tx: UnboundedSender<Event>,
19    shutdown_tx: Option<oneshot::Sender<()>>,
20}
21
22impl QueueBus {
23    #[instrument(skip_all)]
24    pub fn new() -> (Self, UnboundedReceiver<Event>, oneshot::Receiver<()>) {
25        debug!("Creating unbounded QueueBus");
26        let (tx, rx) = mpsc::unbounded_channel();
27        let (shutdown_tx, shutdown_rx) = oneshot::channel();
28
29        (
30            Self {
31                tx,
32                shutdown_tx: Some(shutdown_tx),
33            },
34            rx,
35            shutdown_rx,
36        )
37    }
38
39    #[instrument(skip_all)]
40    pub fn publish(&self, event: Event) -> Result<(), EventError> {
41        Ok(self.tx.send(event)?)
42    }
43}
44
45#[pymethods]
46impl QueueBus {
47    /// Insert an event to the bus
48    ///
49    /// # Arguments
50    /// * `event` - The event to publish
51    pub fn insert(&mut self, entity: &Bound<'_, PyAny>) -> Result<(), PyEventError> {
52        let entity = QueueItem::from_py_entity(entity)?;
53        let event = Event::Task(entity);
54        self.publish(event)?;
55        Ok(())
56    }
57
58    /// Shutdown the bus
59    /// This will send a messages to the background queue, which will trigger a flush on the queue
60    #[instrument(skip_all)]
61    pub fn shutdown(&mut self) {
62        // Signal shutdown
63        if let Some(shutdown_tx) = self.shutdown_tx.take() {
64            let _ = shutdown_tx.send(());
65        }
66    }
67}