scouter_events/queue/
bus.rs1use crate::error::{EventError, PyEventError};
2use pyo3::prelude::*;
3use scouter_types::QueueItem;
4use std::sync::Arc;
5use std::sync::RwLock;
6use tokio::sync::mpsc::{self, UnboundedReceiver, UnboundedSender};
7use tokio::sync::oneshot;
8use tracing::{debug, instrument};
9
10#[derive(Debug)]
11pub enum Event {
12 Task(QueueItem),
13 Init,
14}
15
16#[pyclass(name = "Queue")]
20pub struct QueueBus {
21 tx: UnboundedSender<Event>,
22 shutdown_tx: Option<oneshot::Sender<()>>,
23 pub initialized: Arc<RwLock<bool>>,
24}
25
26impl QueueBus {
27 #[instrument(skip_all)]
28 pub fn new() -> (Self, UnboundedReceiver<Event>, oneshot::Receiver<()>) {
29 debug!("Creating unbounded QueueBus");
30 let (tx, rx) = mpsc::unbounded_channel();
31 let (shutdown_tx, shutdown_rx) = oneshot::channel();
32 let initialized = Arc::new(RwLock::new(false));
33
34 (
35 Self {
36 tx,
37 shutdown_tx: Some(shutdown_tx),
38 initialized,
39 },
40 rx,
41 shutdown_rx,
42 )
43 }
44
45 #[instrument(skip_all)]
46 pub fn publish(&self, event: Event) -> Result<(), EventError> {
47 Ok(self.tx.send(event)?)
48 }
49
50 pub fn is_initialized(&self) -> bool {
51 if let Ok(initialized) = self.initialized.read() {
53 *initialized
54 } else {
55 false
56 }
57 }
58}
59
60#[pymethods]
61impl QueueBus {
62 pub fn insert(&mut self, entity: &Bound<'_, PyAny>) -> Result<(), PyEventError> {
67 let entity = QueueItem::from_py_entity(entity)?;
68 debug!("Inserting event into QueueBus: {:?}", entity);
69 let event = Event::Task(entity);
70 self.publish(event)?;
71 Ok(())
72 }
73
74 #[instrument(skip_all)]
77 pub fn shutdown(&mut self) {
78 if let Some(shutdown_tx) = self.shutdown_tx.take() {
80 let _ = shutdown_tx.send(());
81 }
82 }
83}
84
85impl QueueBus {
86 pub fn init(&self) -> Result<(), EventError> {
88 std::thread::sleep(std::time::Duration::from_millis(20));
89 let mut attempts = 0;
90 while !self.is_initialized() {
91 debug!("QueueBus is not initialized, waiting...");
92 if attempts >= 100 {
93 return Err(EventError::InitializationError);
94 }
95 attempts += 1;
96 std::thread::sleep(std::time::Duration::from_millis(10));
97
98 let event = Event::Init;
99 debug!("Initializing QueueBus");
100 self.publish(event)?;
101 }
102 Ok(())
103 }
104}