scouter_events/queue/
bus.rs1use crate::error::{EventError, PyEventError};
2use pyo3::prelude::*;
3use scouter_types::QueueItem;
4use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
5use tokio::sync::oneshot;
6use tracing::{debug, instrument};
7
8#[derive(Debug)]
9pub enum Event {
10 Task(QueueItem),
11}
12
13#[pyclass(name = "Queue")]
17pub struct QueueBus {
18 tx: UnboundedSender<Event>,
19 shutdown_tx: Option<oneshot::Sender<()>>,
20}
21
22impl QueueBus {
23 #[instrument(skip_all)]
24 pub fn new() -> (Self, UnboundedReceiver<Event>, oneshot::Receiver<()>) {
25 debug!("Creating unbounded QueueBus");
26 let (tx, rx) = mpsc::unbounded_channel();
27 let (shutdown_tx, shutdown_rx) = oneshot::channel();
28
29 (
30 Self {
31 tx,
32 shutdown_tx: Some(shutdown_tx),
33 },
34 rx,
35 shutdown_rx,
36 )
37 }
38
39 #[instrument(skip_all)]
40 pub fn publish(&self, event: Event) -> Result<(), EventError> {
41 Ok(self.tx.send(event)?)
42 }
43}
44
45#[pymethods]
46impl QueueBus {
47 pub fn insert(&mut self, entity: &Bound<'_, PyAny>) -> Result<(), PyEventError> {
52 let entity = QueueItem::from_py_entity(entity)?;
53 let event = Event::Task(entity);
54 self.publish(event)?;
55 Ok(())
56 }
57
58 #[instrument(skip_all)]
61 pub fn shutdown(&mut self) {
62 if let Some(shutdown_tx) = self.shutdown_tx.take() {
64 let _ = shutdown_tx.send(());
65 }
66 }
67}