1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::{Event, QueueBus};
4use crate::queue::custom::CustomQueue;
5use crate::queue::psi::PsiQueue;
6use crate::queue::spc::SpcQueue;
7use crate::queue::traits::queue::QueueMethods;
8use crate::queue::types::TransportConfig;
9use pyo3::prelude::*;
10use scouter_types::{DriftProfile, QueueItem};
11use scouter_types::{Features, Metrics};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tokio::sync::oneshot;
17use tracing::{debug, error};
18
19pub enum QueueNum {
20 Spc(SpcQueue),
21 Psi(PsiQueue),
22 Custom(CustomQueue),
23}
24
25impl QueueNum {
26 pub async fn new(
27 drift_profile: DriftProfile,
28 config: TransportConfig,
29 queue_runtime: Arc<tokio::runtime::Runtime>,
30 ) -> Result<Self, EventError> {
31 match drift_profile {
32 DriftProfile::Spc(spc_profile) => {
33 let queue = SpcQueue::new(spc_profile, config).await?;
34 Ok(QueueNum::Spc(queue))
35 }
36 DriftProfile::Psi(psi_profile) => {
37 let queue = PsiQueue::new(psi_profile, config, queue_runtime).await?;
38 Ok(QueueNum::Psi(queue))
39 }
40 DriftProfile::Custom(custom_profile) => {
41 let queue = CustomQueue::new(custom_profile, config, queue_runtime).await?;
42 Ok(QueueNum::Custom(queue))
43 }
44 }
45 }
46
47 pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
55 match entity {
56 QueueItem::Features(features) => self.insert_features(features).await,
57 QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
58 }
59 }
60
61 pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
68 match self {
69 QueueNum::Psi(queue) => queue.insert(features).await,
70 QueueNum::Spc(queue) => queue.insert(features).await,
71 _ => Err(EventError::QueueNotSupportedFeatureError),
72 }
73 }
74
75 pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
81 match self {
82 QueueNum::Custom(queue) => queue.insert(metrics).await,
83 _ => Err(EventError::QueueNotSupportedMetricsError),
84 }
85 }
86
87 pub async fn flush(&mut self) -> Result<(), EventError> {
90 match self {
91 QueueNum::Spc(queue) => queue.flush().await,
92 QueueNum::Psi(queue) => queue.flush().await,
93 QueueNum::Custom(queue) => queue.flush().await,
94 }
95 }
96}
97
98#[allow(clippy::too_many_arguments)]
99async fn handle_queue_events(
100 mut rx: UnboundedReceiver<Event>,
101 mut shutdown_rx: oneshot::Receiver<()>,
102 drift_profile: DriftProfile,
103 config: TransportConfig,
104 id: String,
105 queue_runtime: Arc<tokio::runtime::Runtime>,
106 startup_tx: oneshot::Sender<()>,
107 completion_tx: oneshot::Sender<()>,
108) -> Result<(), EventError> {
109 let mut queue = QueueNum::new(drift_profile, config, queue_runtime).await?;
110
111 startup_tx
113 .send(())
114 .map_err(|_| EventError::SignalStartupError)?;
115
116 loop {
117 tokio::select! {
118 Some(event) = rx.recv() => {
119 match event {
120 Event::Task(entity) => {
121 match queue.insert(entity).await {
122 Ok(_) => {
123 debug!("Inserted entity into queue {}", id);
124 }
125 Err(e) => {
126 error!("Error inserting entity into queue {}: {}", id, e);
127 }
128 }
129 }
130 }
131 }
132 _ = &mut shutdown_rx => {
133 debug!("Shutdown signal received for queue {}", id);
134 queue.flush().await?;
135 completion_tx.send(()).map_err(|_| EventError::SignalCompletionError)?;
136 break;
137 }
138 }
139 }
140 Ok(())
141}
142
143#[pyclass]
144pub struct ScouterQueue {
145 queues: HashMap<String, Py<QueueBus>>,
146 _shared_runtime: Arc<tokio::runtime::Runtime>,
147 completion_rxs: HashMap<String, oneshot::Receiver<()>>,
148}
149
150#[pymethods]
151impl ScouterQueue {
152 #[staticmethod]
170 #[pyo3(signature = (path, transport_config))]
171 pub fn from_path(
172 py: Python,
173 path: HashMap<String, PathBuf>,
174 transport_config: &Bound<'_, PyAny>,
175 ) -> Result<Self, PyEventError> {
176 let mut queues = HashMap::new();
177 let mut startup_rxs = Vec::new();
178 let mut completion_rxs = HashMap::new();
179
180 let config = TransportConfig::from_py_config(transport_config)?;
182
183 let shared_runtime =
185 Arc::new(tokio::runtime::Runtime::new().map_err(EventError::SetupTokioRuntimeError)?);
186
187 for (id, profile_path) in path {
190 let cloned_config = config.clone();
191 let drift_profile = DriftProfile::from_profile_path(profile_path)?;
192
193 let (startup_tx, startup_rx) = oneshot::channel();
195
196 let (completion_tx, completion_rx) = oneshot::channel();
198 let (bus, rx, shutdown_rx) = QueueBus::new();
199
200 let queue_runtime = shared_runtime.clone();
201
202 let id_clone = id.clone();
204 shared_runtime.spawn(handle_queue_events(
205 rx,
206 shutdown_rx,
207 drift_profile,
208 cloned_config,
209 id_clone,
210 queue_runtime,
211 startup_tx,
212 completion_tx,
213 ));
214
215 let queue = Py::new(py, bus)?;
216
217 queues.insert(id.clone(), queue);
218 startup_rxs.push((id.clone(), startup_rx));
219 completion_rxs.insert(id, completion_rx);
220 }
221
222 shared_runtime.block_on(async {
224 for (id, startup_rx) in startup_rxs {
225 startup_rx.await.map_err(EventError::StartupReceiverError)?;
226 debug!("Queue {} initialized successfully", id);
227 }
228 Ok::<_, EventError>(())
229 })?;
230
231 Ok(ScouterQueue {
232 queues,
233 _shared_runtime: shared_runtime,
235 completion_rxs,
236 })
237 }
238
239 pub fn __getitem__<'py>(
249 &self,
250 py: Python<'py>,
251 key: &str,
252 ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
253 match self.queues.get(key) {
254 Some(queue) => Ok(queue.bind(py)),
255 None => Err(PyEventError::MissingQueueError(key.to_string())),
256 }
257 }
258
259 pub fn shutdown(&mut self, py: Python) -> Result<(), PyEventError> {
271 for queue in self.queues.values() {
273 let bound = queue.bind(py);
274 bound
275 .call_method0("shutdown")
276 .map_err(PyEventError::ShutdownQueueError)?;
277 }
278
279 self._shared_runtime.block_on(async {
280 for (id, completion_rx) in self.completion_rxs.drain() {
281 completion_rx
282 .await
283 .map_err(EventError::ShutdownReceiverError)?;
284 debug!("Queue {} initialized successfully", id);
285 }
286 Ok::<_, PyEventError>(())
287 })?;
288
289 self.queues.clear();
290
291 Ok(())
292 }
293}