1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::{Event, QueueBus};
4use crate::queue::custom::CustomQueue;
5use crate::queue::psi::PsiQueue;
6use crate::queue::spc::SpcQueue;
7use crate::queue::traits::queue::QueueMethods;
8use crate::queue::types::TransportConfig;
9use pyo3::prelude::*;
10use scouter_types::{DriftProfile, QueueItem};
11use scouter_types::{Features, Metrics};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::sync::RwLock;
16use tokio::sync::mpsc::UnboundedReceiver;
17use tokio::sync::oneshot;
18use tracing::{debug, error, instrument};
19
20pub enum QueueNum {
21 Spc(SpcQueue),
22 Psi(PsiQueue),
23 Custom(CustomQueue),
24}
25
26impl QueueNum {
27 pub async fn new(
28 drift_profile: DriftProfile,
29 config: TransportConfig,
30 queue_runtime: Arc<tokio::runtime::Runtime>,
31 ) -> Result<Self, EventError> {
32 match drift_profile {
33 DriftProfile::Spc(spc_profile) => {
34 let queue = SpcQueue::new(spc_profile, config).await?;
35 Ok(QueueNum::Spc(queue))
36 }
37 DriftProfile::Psi(psi_profile) => {
38 let queue = PsiQueue::new(psi_profile, config, queue_runtime).await?;
39 Ok(QueueNum::Psi(queue))
40 }
41 DriftProfile::Custom(custom_profile) => {
42 let queue = CustomQueue::new(custom_profile, config, queue_runtime).await?;
43 Ok(QueueNum::Custom(queue))
44 }
45 }
46 }
47
48 #[instrument(skip_all)]
56 pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
57 debug!("Inserting entity into queue: {:?}", entity);
58 match entity {
59 QueueItem::Features(features) => self.insert_features(features).await,
60 QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
61 }
62 }
63
64 #[instrument(skip_all)]
71 pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
72 match self {
73 QueueNum::Psi(queue) => queue.insert(features).await,
74 QueueNum::Spc(queue) => queue.insert(features).await,
75 _ => Err(EventError::QueueNotSupportedFeatureError),
76 }
77 }
78
79 pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
85 match self {
86 QueueNum::Custom(queue) => queue.insert(metrics).await,
87 _ => Err(EventError::QueueNotSupportedMetricsError),
88 }
89 }
90
91 pub async fn flush(&mut self) -> Result<(), EventError> {
94 match self {
95 QueueNum::Spc(queue) => queue.flush().await,
96 QueueNum::Psi(queue) => queue.flush().await,
97 QueueNum::Custom(queue) => queue.flush().await,
98 }
99 }
100}
101
102#[allow(clippy::too_many_arguments)]
103async fn handle_queue_events(
104 mut rx: UnboundedReceiver<Event>,
105 mut shutdown_rx: oneshot::Receiver<()>,
106 drift_profile: DriftProfile,
107 config: TransportConfig,
108 id: String,
109 queue_runtime: Arc<tokio::runtime::Runtime>,
110 completion_tx: oneshot::Sender<()>,
111 initialized: Arc<RwLock<bool>>,
112) -> Result<(), EventError> {
113 let mut queue = match QueueNum::new(drift_profile, config.clone(), queue_runtime).await {
114 Ok(q) => q,
115 Err(e) => {
116 error!("Failed to initialize queue {}: {}", id, e);
117 return Err(e);
118 }
119 };
120 loop {
121 tokio::select! {
122 Some(event) = rx.recv() => {
123 match event {
124 Event::Task(entity) => {
125 match queue.insert(entity).await {
126 Ok(_) => {
127 debug!("Inserted entity into queue {}", id);
128 }
129 Err(e) => {
130 error!("Error inserting entity into queue {}: {}", id, e);
131 }
132 }
133 }
134 Event::Init => {
135 debug!("Received Init event for queue {}", id);
136 match initialized.write() {
137 Ok(mut init) => {
138 *init = true;
139 debug!("Queue {} initialized successfully", id);
140 }
141 Err(e) => {
142 error!("Failed to write to initialized lock for queue {}: {}", id, e);
143 }
144 }
145 }
146 }
147 }
148 _ = &mut shutdown_rx => {
149 debug!("Shutdown signal received for queue {}", id);
150 queue.flush().await?;
151 completion_tx.send(()).map_err(|_| EventError::SignalCompletionError)?;
152 break;
153 }
154 }
155 }
156 Ok(())
157}
158
159#[pyclass]
160pub struct ScouterQueue {
161 queues: HashMap<String, Py<QueueBus>>,
162 _shared_runtime: Arc<tokio::runtime::Runtime>,
163 completion_rxs: HashMap<String, oneshot::Receiver<()>>,
164 transport_config: TransportConfig,
165}
166
167#[pymethods]
168impl ScouterQueue {
169 #[staticmethod]
187 #[pyo3(signature = (path, transport_config))]
188 pub fn from_path(
189 py: Python,
190 path: HashMap<String, PathBuf>,
191 transport_config: &Bound<'_, PyAny>,
192 ) -> Result<Self, PyEventError> {
193 let shared_runtime =
195 Arc::new(tokio::runtime::Runtime::new().map_err(EventError::SetupTokioRuntimeError)?);
196
197 ScouterQueue::from_path_rs(py, path, transport_config, shared_runtime)
198 }
199
200 pub fn __getitem__<'py>(
210 &self,
211 py: Python<'py>,
212 key: &str,
213 ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
214 match self.queues.get(key) {
215 Some(queue) => Ok(queue.bind(py)),
216 None => Err(PyEventError::MissingQueueError(key.to_string())),
217 }
218 }
219
220 #[getter]
221 pub fn transport_config<'py>(
223 &self,
224 py: Python<'py>,
225 ) -> Result<Bound<'py, PyAny>, PyEventError> {
226 self.transport_config.to_py(py)
227 }
228
229 pub fn shutdown(&mut self, py: Python) -> Result<(), PyEventError> {
241 for queue in self.queues.values() {
243 let bound = queue.bind(py);
244 bound
245 .call_method0("shutdown")
246 .map_err(PyEventError::ShutdownQueueError)?;
247 }
248
249 self._shared_runtime.block_on(async {
250 for (id, completion_rx) in self.completion_rxs.drain() {
251 completion_rx
252 .await
253 .map_err(EventError::ShutdownReceiverError)?;
254 debug!("Queue {} initialized successfully", id);
255 }
256 Ok::<_, PyEventError>(())
257 })?;
258
259 self.queues.clear();
260
261 Ok(())
262 }
263}
264
265impl ScouterQueue {
266 pub fn from_path_rs(
287 py: Python,
288 path: HashMap<String, PathBuf>,
289 transport_config: &Bound<'_, PyAny>,
290 shared_runtime: Arc<tokio::runtime::Runtime>,
291 ) -> Result<Self, PyEventError> {
292 debug!("Creating ScouterQueue from path");
293 let mut queues = HashMap::new();
294 let mut completion_rxs = HashMap::new();
295
296 if transport_config.is_none() {
298 return Err(PyEventError::MissingTransportConfig);
299 }
300
301 let config = TransportConfig::from_py_config(transport_config)?;
303
304 for (id, profile_path) in path {
307 let cloned_config = config.clone();
308 let drift_profile = DriftProfile::from_profile_path(profile_path)?;
309
310 let (completion_tx, completion_rx) = oneshot::channel();
315 let (bus, rx, shutdown_rx) = QueueBus::new();
316
317 let queue_runtime = shared_runtime.clone();
318
319 let id_clone = id.clone();
321 let initialized = bus.initialized.clone();
322
323 shared_runtime.spawn(async move {
325 match handle_queue_events(
326 rx,
327 shutdown_rx,
328 drift_profile,
329 cloned_config,
330 id_clone,
331 queue_runtime,
332 completion_tx,
333 initialized,
334 )
335 .await
336 {
337 Ok(_) => debug!("Queue handler started successfully"),
338 Err(e) => error!("Queue handler exited with error: {}", e),
339 }
340 });
341
342 bus.init()?;
346
347 let queue = Py::new(py, bus)?;
348 queues.insert(id.clone(), queue);
349 completion_rxs.insert(id, completion_rx);
350 }
351
352 Ok(ScouterQueue {
353 queues,
354 _shared_runtime: shared_runtime,
356 completion_rxs,
357 transport_config: config,
358 })
359 }
360}