scouter_events/queue/
bus.rs

1use std::sync::Arc;
2
3use crate::error::{EventError, PyEventError};
4use pyo3::prelude::*;
5use scouter_types::QueueItem;
6use std::sync::RwLock;
7use tokio::task::JoinHandle;
8use tokio::time::Duration;
9use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle};
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, instrument, warn};
12
13#[derive(Debug)]
14pub enum Event {
15    Task(QueueItem),
16    Flush,
17}
18
19#[derive(Debug)]
20pub struct Task {
21    pub abort_handle: Option<AbortHandle>,
22    pub running: bool,
23    pub cancel_token: Option<CancellationToken>,
24}
25
26impl Task {
27    pub fn new() -> Self {
28        Self {
29            abort_handle: None,
30            running: false,
31            cancel_token: None,
32        }
33    }
34}
35
36impl Default for Task {
37    fn default() -> Self {
38        Self::new()
39    }
40}
41
42#[derive(Debug, Clone)]
43pub struct TaskState {
44    // track the task that receives events
45    pub event_task: Arc<RwLock<Task>>,
46
47    // track the task that processes background tasks (only applies to psi and custom)
48    pub background_task: Arc<RwLock<Task>>,
49
50    // channel to send events to the event task
51    pub event_tx: UnboundedSender<Event>,
52
53    pub id: String,
54}
55
56impl TaskState {
57    pub fn add_background_cancellation_token(&mut self, token: CancellationToken) {
58        self.background_task.write().unwrap().cancel_token = Some(token);
59    }
60
61    pub fn cancel_background_task(&self) {
62        let cancel_token = &self.background_task.read().unwrap().cancel_token;
63        if let Some(cancel_token) = cancel_token {
64            debug!("Cancelling background task");
65            cancel_token.cancel();
66        }
67    }
68
69    pub fn add_event_cancellation_token(&mut self, token: CancellationToken) {
70        self.event_task.write().unwrap().cancel_token = Some(token);
71    }
72
73    fn flush_event_task(&self) -> Result<(), EventError> {
74        Ok(self.event_tx.send(Event::Flush)?)
75    }
76
77    fn cancel_event_task(&self) {
78        let cancel_token = &self.event_task.read().unwrap().cancel_token;
79        if let Some(cancel_token) = cancel_token {
80            debug!("Cancelling event task");
81            cancel_token.cancel();
82        }
83    }
84
85    pub fn add_event_abort_handle(&mut self, handle: JoinHandle<()>) {
86        self.event_task
87            .write()
88            .unwrap()
89            .abort_handle
90            .replace(handle.abort_handle());
91    }
92
93    pub fn add_background_abort_handle(&mut self, handle: JoinHandle<()>) {
94        self.background_task
95            .write()
96            .unwrap()
97            .abort_handle
98            .replace(handle.abort_handle());
99    }
100
101    pub fn is_event_running(&self) -> bool {
102        self.event_task.read().unwrap().running
103    }
104
105    pub fn has_background_handle(&self) -> bool {
106        self.background_task.read().unwrap().abort_handle.is_some()
107    }
108
109    pub fn is_background_running(&self) -> bool {
110        self.background_task.read().unwrap().running
111    }
112
113    pub fn set_event_running(&self, running: bool) {
114        let mut event_task = self.event_task.write().unwrap();
115        event_task.running = running;
116    }
117
118    pub fn set_background_running(&self, running: bool) {
119        let mut background_task = self.background_task.write().unwrap();
120        background_task.running = running;
121    }
122
123    /// Aborts the background task.
124    /// This will:
125    ///     (1) Send the cancel signal to the background task via the CancellationToken
126    ///     (2) Abort the background task's JoinHandle
127    /// This is intended to be called when shutting down and after
128    /// the associated queue has been flushed
129    fn shutdown_background_task(&self) -> Result<(), EventError> {
130        // check if handle
131        self.cancel_background_task();
132
133        // abort the background task
134        let background_handle = {
135            let guard = self.background_task.write().unwrap().abort_handle.take();
136            guard
137        };
138
139        if let Some(handle) = background_handle {
140            handle.abort();
141            debug!("Background task handle shut down");
142        }
143
144        Ok(())
145    }
146
147    /// Aborts the background task.
148    /// This will:
149    ///     (1) Send the cancel signal to the event task via the CancellationToken
150    ///     (2) Abort the event task's JoinHandle
151    /// This is intended to be called when shutting down and after
152    /// the associated queue has been flushed
153    fn shutdown_event_task(&self) -> Result<(), EventError> {
154        match self.flush_event_task() {
155            Ok(_) => debug!("Event task flush signal sent"),
156            Err(e) => {
157                let error_msg = e.to_string();
158                if error_msg.contains("channel closed") {
159                    debug!("Channel already closed for event task: {}", self.id);
160                } else {
161                    warn!("Failed to send flush signal to event task: {}", e);
162                }
163            }
164        }
165
166        debug!("Waiting 250 ms to allow time for flush before cancelling event task");
167        std::thread::sleep(Duration::from_millis(250));
168
169        self.cancel_event_task();
170
171        // wait 250 ms to allow time for flush before aborting thread
172        debug!("Waiting 250 ms to allow time for flush before aborting event task");
173        std::thread::sleep(Duration::from_millis(250));
174
175        // abort the event task
176        let event_handle = {
177            let guard = self.event_task.write().unwrap().abort_handle.take();
178            guard
179        };
180
181        if let Some(handle) = event_handle {
182            handle.abort();
183            debug!("Event task handle shut down");
184        }
185
186        Ok(())
187    }
188
189    /// Shuts down all async tasks
190    pub fn shutdown_tasks(&self) -> Result<(), EventError> {
191        self.shutdown_background_task()?;
192        self.shutdown_event_task()?;
193        Ok(())
194    }
195}
196
197/// QueueBus is an mpsc bus that allows for publishing events to subscribers.
198/// It leverage an unbounded channel
199/// Primary way to publish non-blocking events to background queues with ScouterQueue
200#[pyclass(name = "Queue")]
201pub struct QueueBus {
202    pub task_state: TaskState,
203
204    #[pyo3(get)]
205    pub identifier: String,
206}
207
208impl QueueBus {
209    #[instrument(skip_all)]
210    pub fn new(task_state: TaskState, identifier: String) -> Self {
211        debug!("Creating unbounded QueueBus");
212
213        Self {
214            task_state,
215            identifier,
216        }
217    }
218
219    #[instrument(skip_all)]
220    pub fn publish(&self, event: Event) -> Result<(), EventError> {
221        Ok(self.task_state.event_tx.send(event)?)
222    }
223}
224
225#[pymethods]
226impl QueueBus {
227    /// Insert an event to the bus
228    ///
229    /// # Arguments
230    /// * `event` - The event to publish
231    pub fn insert(&self, entity: &Bound<'_, PyAny>) -> Result<(), PyEventError> {
232        let entity = QueueItem::from_py_entity(entity)?;
233        debug!("Inserting event into QueueBus: {:?}", entity);
234        let event = Event::Task(entity);
235        self.publish(event)?;
236        Ok(())
237    }
238}