scouter_events/queue/
bus.rs1use std::sync::Arc;
2
3use crate::error::{EventError, PyEventError};
4use pyo3::prelude::*;
5use scouter_types::QueueItem;
6use std::sync::RwLock;
7use tokio::task::JoinHandle;
8use tokio::time::Duration;
9use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle};
10use tokio_util::sync::CancellationToken;
11use tracing::{debug, instrument, warn};
12
13#[derive(Debug)]
14pub enum Event {
15 Task(QueueItem),
16 Flush,
17}
18
19#[derive(Debug)]
20pub struct Task {
21 pub abort_handle: Option<AbortHandle>,
22 pub running: bool,
23 pub cancel_token: Option<CancellationToken>,
24}
25
26impl Task {
27 pub fn new() -> Self {
28 Self {
29 abort_handle: None,
30 running: false,
31 cancel_token: None,
32 }
33 }
34}
35
36impl Default for Task {
37 fn default() -> Self {
38 Self::new()
39 }
40}
41
42#[derive(Debug, Clone)]
43pub struct TaskState {
44 pub event_task: Arc<RwLock<Task>>,
46
47 pub background_task: Arc<RwLock<Task>>,
49
50 pub event_tx: UnboundedSender<Event>,
52
53 pub id: String,
54}
55
56impl TaskState {
57 pub fn add_background_cancellation_token(&mut self, token: CancellationToken) {
58 self.background_task.write().unwrap().cancel_token = Some(token);
59 }
60
61 pub fn cancel_background_task(&self) {
62 let cancel_token = &self.background_task.read().unwrap().cancel_token;
63 if let Some(cancel_token) = cancel_token {
64 debug!("Cancelling background task");
65 cancel_token.cancel();
66 }
67 }
68
69 pub fn add_event_cancellation_token(&mut self, token: CancellationToken) {
70 self.event_task.write().unwrap().cancel_token = Some(token);
71 }
72
73 fn flush_event_task(&self) -> Result<(), EventError> {
74 Ok(self.event_tx.send(Event::Flush)?)
75 }
76
77 fn cancel_event_task(&self) {
78 let cancel_token = &self.event_task.read().unwrap().cancel_token;
79 if let Some(cancel_token) = cancel_token {
80 debug!("Cancelling event task");
81 cancel_token.cancel();
82 }
83 }
84
85 pub fn add_event_abort_handle(&mut self, handle: JoinHandle<()>) {
86 self.event_task
87 .write()
88 .unwrap()
89 .abort_handle
90 .replace(handle.abort_handle());
91 }
92
93 pub fn add_background_abort_handle(&mut self, handle: JoinHandle<()>) {
94 self.background_task
95 .write()
96 .unwrap()
97 .abort_handle
98 .replace(handle.abort_handle());
99 }
100
101 pub fn is_event_running(&self) -> bool {
102 self.event_task.read().unwrap().running
103 }
104
105 pub fn has_background_handle(&self) -> bool {
106 self.background_task.read().unwrap().abort_handle.is_some()
107 }
108
109 pub fn is_background_running(&self) -> bool {
110 self.background_task.read().unwrap().running
111 }
112
113 pub fn set_event_running(&self, running: bool) {
114 let mut event_task = self.event_task.write().unwrap();
115 event_task.running = running;
116 }
117
118 pub fn set_background_running(&self, running: bool) {
119 let mut background_task = self.background_task.write().unwrap();
120 background_task.running = running;
121 }
122
123 fn shutdown_background_task(&self) -> Result<(), EventError> {
130 self.cancel_background_task();
132
133 let background_handle = {
135 let guard = self.background_task.write().unwrap().abort_handle.take();
136 guard
137 };
138
139 if let Some(handle) = background_handle {
140 handle.abort();
141 debug!("Background task handle shut down");
142 }
143
144 Ok(())
145 }
146
147 fn shutdown_event_task(&self) -> Result<(), EventError> {
154 match self.flush_event_task() {
155 Ok(_) => debug!("Event task flush signal sent"),
156 Err(e) => {
157 let error_msg = e.to_string();
158 if error_msg.contains("channel closed") {
159 debug!("Channel already closed for event task: {}", self.id);
160 } else {
161 warn!("Failed to send flush signal to event task: {}", e);
162 }
163 }
164 }
165
166 debug!("Waiting 250 ms to allow time for flush before cancelling event task");
167 std::thread::sleep(Duration::from_millis(250));
168
169 self.cancel_event_task();
170
171 debug!("Waiting 250 ms to allow time for flush before aborting event task");
173 std::thread::sleep(Duration::from_millis(250));
174
175 let event_handle = {
177 let guard = self.event_task.write().unwrap().abort_handle.take();
178 guard
179 };
180
181 if let Some(handle) = event_handle {
182 handle.abort();
183 debug!("Event task handle shut down");
184 }
185
186 Ok(())
187 }
188
189 pub fn shutdown_tasks(&self) -> Result<(), EventError> {
191 self.shutdown_background_task()?;
192 self.shutdown_event_task()?;
193 Ok(())
194 }
195}
196
197#[pyclass(name = "Queue")]
201pub struct QueueBus {
202 pub task_state: TaskState,
203
204 #[pyo3(get)]
205 pub identifier: String,
206}
207
208impl QueueBus {
209 #[instrument(skip_all)]
210 pub fn new(task_state: TaskState, identifier: String) -> Self {
211 debug!("Creating unbounded QueueBus");
212
213 Self {
214 task_state,
215 identifier,
216 }
217 }
218
219 #[instrument(skip_all)]
220 pub fn publish(&self, event: Event) -> Result<(), EventError> {
221 Ok(self.task_state.event_tx.send(event)?)
222 }
223}
224
225#[pymethods]
226impl QueueBus {
227 pub fn insert(&self, entity: &Bound<'_, PyAny>) -> Result<(), PyEventError> {
232 let entity = QueueItem::from_py_entity(entity)?;
233 debug!("Inserting event into QueueBus: {:?}", entity);
234 let event = Event::Task(entity);
235 self.publish(event)?;
236 Ok(())
237 }
238}