scouter_events/queue/
py_queue.rs

1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::{Event, QueueBus};
4use crate::queue::custom::CustomQueue;
5use crate::queue::psi::PsiQueue;
6use crate::queue::spc::SpcQueue;
7use crate::queue::traits::queue::QueueMethods;
8use crate::queue::types::TransportConfig;
9use pyo3::prelude::*;
10use scouter_types::{DriftProfile, QueueItem};
11use scouter_types::{Features, Metrics};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use std::sync::RwLock;
16use tokio::sync::mpsc::UnboundedReceiver;
17use tokio::sync::oneshot;
18use tracing::{debug, error, instrument};
19
20pub enum QueueNum {
21    Spc(SpcQueue),
22    Psi(PsiQueue),
23    Custom(CustomQueue),
24}
25
26impl QueueNum {
27    pub async fn new(
28        drift_profile: DriftProfile,
29        config: TransportConfig,
30        queue_runtime: Arc<tokio::runtime::Runtime>,
31    ) -> Result<Self, EventError> {
32        match drift_profile {
33            DriftProfile::Spc(spc_profile) => {
34                let queue = SpcQueue::new(spc_profile, config).await?;
35                Ok(QueueNum::Spc(queue))
36            }
37            DriftProfile::Psi(psi_profile) => {
38                let queue = PsiQueue::new(psi_profile, config, queue_runtime).await?;
39                Ok(QueueNum::Psi(queue))
40            }
41            DriftProfile::Custom(custom_profile) => {
42                let queue = CustomQueue::new(custom_profile, config, queue_runtime).await?;
43                Ok(QueueNum::Custom(queue))
44            }
45        }
46    }
47
48    /// Top-level insert method for the queue
49    /// This method will take a QueueItem and insert it into the appropriate queue
50    /// If features, inserts using insert_features (spc, psi)
51    /// If metrics, inserts using insert_metrics (custom)
52    ///
53    /// # Arguments
54    /// * `entity` - The entity to insert into the queue
55    #[instrument(skip_all)]
56    pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
57        debug!("Inserting entity into queue: {:?}", entity);
58        match entity {
59            QueueItem::Features(features) => self.insert_features(features).await,
60            QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
61        }
62    }
63
64    /// Insert features into the queue. Currently only applies to PSI and SPC queues
65    ///
66    /// # Arguments
67    /// * `features` - The features to insert into the queue
68    ///
69    ///
70    #[instrument(skip_all)]
71    pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
72        match self {
73            QueueNum::Psi(queue) => queue.insert(features).await,
74            QueueNum::Spc(queue) => queue.insert(features).await,
75            _ => Err(EventError::QueueNotSupportedFeatureError),
76        }
77    }
78
79    /// Insert metrics into the queue. Currently only applies to custom queues
80    ///
81    /// # Arguments
82    /// * `metrics` - The metrics to insert into the queue
83    ///
84    pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
85        match self {
86            QueueNum::Custom(queue) => queue.insert(metrics).await,
87            _ => Err(EventError::QueueNotSupportedMetricsError),
88        }
89    }
90
91    /// Flush the queue. This will publish the records to the producer
92    /// and shut down the background tasks
93    pub async fn flush(&mut self) -> Result<(), EventError> {
94        match self {
95            QueueNum::Spc(queue) => queue.flush().await,
96            QueueNum::Psi(queue) => queue.flush().await,
97            QueueNum::Custom(queue) => queue.flush().await,
98        }
99    }
100}
101
102#[allow(clippy::too_many_arguments)]
103async fn handle_queue_events(
104    mut rx: UnboundedReceiver<Event>,
105    mut shutdown_rx: oneshot::Receiver<()>,
106    drift_profile: DriftProfile,
107    config: TransportConfig,
108    id: String,
109    queue_runtime: Arc<tokio::runtime::Runtime>,
110    completion_tx: oneshot::Sender<()>,
111    initialized: Arc<RwLock<bool>>,
112) -> Result<(), EventError> {
113    let mut queue = match QueueNum::new(drift_profile, config.clone(), queue_runtime).await {
114        Ok(q) => q,
115        Err(e) => {
116            error!("Failed to initialize queue {}: {}", id, e);
117            return Err(e);
118        }
119    };
120    loop {
121        tokio::select! {
122            Some(event) = rx.recv() => {
123                match event {
124                    Event::Task(entity) => {
125                        match queue.insert(entity).await {
126                            Ok(_) => {
127                                debug!("Inserted entity into queue {}", id);
128                            }
129                            Err(e) => {
130                                error!("Error inserting entity into queue {}: {}", id, e);
131                            }
132                        }
133                    }
134                    Event::Init => {
135                        debug!("Received Init event for queue {}", id);
136                        match initialized.write() {
137                            Ok(mut init) => {
138                                *init = true;
139                                debug!("Queue {} initialized successfully", id);
140                            }
141                            Err(e) => {
142                                error!("Failed to write to initialized lock for queue {}: {}", id, e);
143                            }
144                        }
145                    }
146                }
147            }
148            _ = &mut shutdown_rx => {
149                debug!("Shutdown signal received for queue {}", id);
150                queue.flush().await?;
151                completion_tx.send(()).map_err(|_| EventError::SignalCompletionError)?;
152                break;
153            }
154        }
155    }
156    Ok(())
157}
158
159#[pyclass]
160pub struct ScouterQueue {
161    queues: HashMap<String, Py<QueueBus>>,
162    _shared_runtime: Arc<tokio::runtime::Runtime>,
163    completion_rxs: HashMap<String, oneshot::Receiver<()>>,
164    transport_config: TransportConfig,
165}
166
167#[pymethods]
168impl ScouterQueue {
169    /// Create a new ScouterQueue from a map of aliases and paths
170    /// This will create a new ScouterQueue for each path in the map
171    ///
172    /// # Process
173    /// 1. Create empty queues
174    /// 2. Extract transport config from python object
175    /// 3. Create a shared tokio runtime that is used to create background queues
176    /// 4. For each path in the map, create a new queue
177    /// 5. Spawn a new thread for each queue (some queues require background tasks)
178    /// 6. Return the ScouterQueue
179    ///
180    /// # Arguments
181    /// * `paths` - A map of aliases to paths
182    /// * `transport_config` - The transport config to use
183    ///
184    /// # Returns
185    /// * `ScouterQueue` - A new ScouterQueue
186    #[staticmethod]
187    #[pyo3(signature = (path, transport_config))]
188    pub fn from_path(
189        py: Python,
190        path: HashMap<String, PathBuf>,
191        transport_config: &Bound<'_, PyAny>,
192    ) -> Result<Self, PyEventError> {
193        // create a tokio runtime to run the background tasks
194        let shared_runtime =
195            Arc::new(tokio::runtime::Runtime::new().map_err(EventError::SetupTokioRuntimeError)?);
196
197        ScouterQueue::from_path_rs(py, path, transport_config, shared_runtime)
198    }
199
200    /// Get a queue by its alias
201    ///
202    /// # Example
203    /// ```python
204    /// from scouter import ScouterQueue
205    ///
206    /// scouter_queues = ScouterQueue.from_path(...)
207    /// scouter_queues["queue_alias"].insert(features)
208    /// ```
209    pub fn __getitem__<'py>(
210        &self,
211        py: Python<'py>,
212        key: &str,
213    ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
214        match self.queues.get(key) {
215            Some(queue) => Ok(queue.bind(py)),
216            None => Err(PyEventError::MissingQueueError(key.to_string())),
217        }
218    }
219
220    #[getter]
221    /// Get the transport config for the ScouterQueue
222    pub fn transport_config<'py>(
223        &self,
224        py: Python<'py>,
225    ) -> Result<Bound<'py, PyAny>, PyEventError> {
226        self.transport_config.to_py(py)
227    }
228
229    /// Triggers a global shutdown for all queues
230    ///
231    /// # Example
232    ///
233    /// ```python
234    /// from scouter import ScouterQueue
235    ///
236    /// scouter_queues = ScouterQueue.from_path(...)
237    /// scouter_queues.shutdown()
238    ///
239    /// ```
240    pub fn shutdown(&mut self, py: Python) -> Result<(), PyEventError> {
241        // trigger shutdown for all queues
242        for queue in self.queues.values() {
243            let bound = queue.bind(py);
244            bound
245                .call_method0("shutdown")
246                .map_err(PyEventError::ShutdownQueueError)?;
247        }
248
249        self._shared_runtime.block_on(async {
250            for (id, completion_rx) in self.completion_rxs.drain() {
251                completion_rx
252                    .await
253                    .map_err(EventError::ShutdownReceiverError)?;
254                debug!("Queue {} initialized successfully", id);
255            }
256            Ok::<_, PyEventError>(())
257        })?;
258
259        self.queues.clear();
260
261        Ok(())
262    }
263}
264
265impl ScouterQueue {
266    /// Create a new ScouterQueue from a map of aliases and paths
267    /// This will create a new ScouterQueue for each path in the map
268    /// This method was created to help with integration into the Opsml CardDeck where this
269    /// method is called directly.
270    ///
271    /// # Process
272    /// 1. Create empty queues
273    /// 2. Extract transport config from python object
274    /// 3. Create a shared tokio runtime that is used to create background queues
275    /// 4. For each path in the map, create a new queue
276    /// 5. Spawn a new thread for each queue (some queues require background tasks)
277    /// 6. Return the ScouterQueue
278    ///
279    /// # Arguments
280    /// * `paths` - A map of aliases to paths
281    /// * `transport_config` - The transport config to use
282    /// * *shared_runtime* - A shared tokio runtime that is used to create background queues
283    ///
284    /// # Returns
285    /// * `ScouterQueue` - A new ScouterQueue
286    pub fn from_path_rs(
287        py: Python,
288        path: HashMap<String, PathBuf>,
289        transport_config: &Bound<'_, PyAny>,
290        shared_runtime: Arc<tokio::runtime::Runtime>,
291    ) -> Result<Self, PyEventError> {
292        debug!("Creating ScouterQueue from path");
293        let mut queues = HashMap::new();
294        let mut completion_rxs = HashMap::new();
295
296        // assert transport config is not None
297        if transport_config.is_none() {
298            return Err(PyEventError::MissingTransportConfig);
299        }
300
301        // Extract transport config from python object
302        let config = TransportConfig::from_py_config(transport_config)?;
303
304        // load each profile from path
305        // In practice you can load as many profiles as you want
306        for (id, profile_path) in path {
307            let cloned_config = config.clone();
308            let drift_profile = DriftProfile::from_profile_path(profile_path)?;
309
310            // create startup channels to ensure queues are initialized before use
311            //let (startup_tx, startup_rx) = oneshot::channel();
312
313            // create completion channels to ensure queues are flushed before shutdown
314            let (completion_tx, completion_rx) = oneshot::channel();
315            let (bus, rx, shutdown_rx) = QueueBus::new();
316
317            let queue_runtime = shared_runtime.clone();
318
319            // spawn a new thread for each queue
320            let id_clone = id.clone();
321            let initialized = bus.initialized.clone();
322
323            // Just spawn the task without waiting for initialization
324            shared_runtime.spawn(async move {
325                match handle_queue_events(
326                    rx,
327                    shutdown_rx,
328                    drift_profile,
329                    cloned_config,
330                    id_clone,
331                    queue_runtime,
332                    completion_tx,
333                    initialized,
334                )
335                .await
336                {
337                    Ok(_) => debug!("Queue handler started successfully"),
338                    Err(e) => error!("Queue handler exited with error: {}", e),
339                }
340            });
341
342            // Check bus initialization.
343            // This will send an init event to ensure the spawned loop is working.
344            // If loop is running, loop will set the initialized flag to true
345            bus.init()?;
346
347            let queue = Py::new(py, bus)?;
348            queues.insert(id.clone(), queue);
349            completion_rxs.insert(id, completion_rx);
350        }
351
352        Ok(ScouterQueue {
353            queues,
354            // need to keep the runtime alive for the life of ScouterQueue
355            _shared_runtime: shared_runtime,
356            completion_rxs,
357            transport_config: config,
358        })
359    }
360}