scouter_events/queue/
py_queue.rs

1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::{Event, QueueBus};
4use crate::queue::custom::CustomQueue;
5use crate::queue::psi::PsiQueue;
6use crate::queue::spc::SpcQueue;
7use crate::queue::traits::queue::QueueMethods;
8use crate::queue::types::TransportConfig;
9use pyo3::prelude::*;
10use scouter_types::{DriftProfile, QueueItem};
11use scouter_types::{Features, Metrics};
12use std::collections::HashMap;
13use std::path::PathBuf;
14use std::sync::Arc;
15use tokio::sync::mpsc::UnboundedReceiver;
16use tokio::sync::oneshot;
17use tracing::{debug, error};
18
19pub enum QueueNum {
20    Spc(SpcQueue),
21    Psi(PsiQueue),
22    Custom(CustomQueue),
23}
24
25impl QueueNum {
26    pub async fn new(
27        drift_profile: DriftProfile,
28        config: TransportConfig,
29        queue_runtime: Arc<tokio::runtime::Runtime>,
30    ) -> Result<Self, EventError> {
31        match drift_profile {
32            DriftProfile::Spc(spc_profile) => {
33                let queue = SpcQueue::new(spc_profile, config).await?;
34                Ok(QueueNum::Spc(queue))
35            }
36            DriftProfile::Psi(psi_profile) => {
37                let queue = PsiQueue::new(psi_profile, config, queue_runtime).await?;
38                Ok(QueueNum::Psi(queue))
39            }
40            DriftProfile::Custom(custom_profile) => {
41                let queue = CustomQueue::new(custom_profile, config, queue_runtime).await?;
42                Ok(QueueNum::Custom(queue))
43            }
44        }
45    }
46
47    /// Top-level insert method for the queue
48    /// This method will take a QueueItem and insert it into the appropriate queue
49    /// If features, inserts using insert_features (spc, psi)
50    /// If metrics, inserts using insert_metrics (custom)
51    ///
52    /// # Arguments
53    /// * `entity` - The entity to insert into the queue
54    pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
55        match entity {
56            QueueItem::Features(features) => self.insert_features(features).await,
57            QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
58        }
59    }
60
61    /// Insert features into the queue. Currently only applies to PSI and SPC queues
62    ///
63    /// # Arguments
64    /// * `features` - The features to insert into the queue
65    ///
66    ///
67    pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
68        match self {
69            QueueNum::Psi(queue) => queue.insert(features).await,
70            QueueNum::Spc(queue) => queue.insert(features).await,
71            _ => Err(EventError::QueueNotSupportedFeatureError),
72        }
73    }
74
75    /// Insert metrics into the queue. Currently only applies to custom queues
76    ///
77    /// # Arguments
78    /// * `metrics` - The metrics to insert into the queue
79    ///
80    pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
81        match self {
82            QueueNum::Custom(queue) => queue.insert(metrics).await,
83            _ => Err(EventError::QueueNotSupportedMetricsError),
84        }
85    }
86
87    /// Flush the queue. This will publish the records to the producer
88    /// and shut down the background tasks
89    pub async fn flush(&mut self) -> Result<(), EventError> {
90        match self {
91            QueueNum::Spc(queue) => queue.flush().await,
92            QueueNum::Psi(queue) => queue.flush().await,
93            QueueNum::Custom(queue) => queue.flush().await,
94        }
95    }
96}
97
98#[allow(clippy::too_many_arguments)]
99async fn handle_queue_events(
100    mut rx: UnboundedReceiver<Event>,
101    mut shutdown_rx: oneshot::Receiver<()>,
102    drift_profile: DriftProfile,
103    config: TransportConfig,
104    id: String,
105    queue_runtime: Arc<tokio::runtime::Runtime>,
106    startup_tx: oneshot::Sender<()>,
107    completion_tx: oneshot::Sender<()>,
108) -> Result<(), EventError> {
109    let mut queue = QueueNum::new(drift_profile, config, queue_runtime).await?;
110
111    // Signal that initialization is complete
112    startup_tx
113        .send(())
114        .map_err(|_| EventError::SignalStartupError)?;
115
116    loop {
117        tokio::select! {
118            Some(event) = rx.recv() => {
119                match event {
120                    Event::Task(entity) => {
121                        match queue.insert(entity).await {
122                            Ok(_) => {
123                                debug!("Inserted entity into queue {}", id);
124                            }
125                            Err(e) => {
126                                error!("Error inserting entity into queue {}: {}", id, e);
127                            }
128                        }
129                    }
130                }
131            }
132            _ = &mut shutdown_rx => {
133                debug!("Shutdown signal received for queue {}", id);
134                queue.flush().await?;
135                completion_tx.send(()).map_err(|_| EventError::SignalCompletionError)?;
136                break;
137            }
138        }
139    }
140    Ok(())
141}
142
143#[pyclass]
144pub struct ScouterQueue {
145    queues: HashMap<String, Py<QueueBus>>,
146    _shared_runtime: Arc<tokio::runtime::Runtime>,
147    completion_rxs: HashMap<String, oneshot::Receiver<()>>,
148}
149
150#[pymethods]
151impl ScouterQueue {
152    /// Create a new ScouterQueue from a map of aliases and paths
153    /// This will create a new ScouterQueue for each path in the map
154    ///
155    /// # Process
156    /// 1. Create empty queues
157    /// 2. Extract transport config from python object
158    /// 3. Create a shared tokio runtime that is used to create background queues
159    /// 4. For each path in the map, create a new queue
160    /// 5. Spawn a new thread for each queue (some queues require background tasks)
161    /// 6. Return the ScouterQueue
162    ///
163    /// # Arguments
164    /// * `paths` - A map of aliases to paths
165    /// * `transport_config` - The transport config to use
166    ///
167    /// # Returns
168    /// * `ScouterQueue` - A new ScouterQueue
169    #[staticmethod]
170    #[pyo3(signature = (path, transport_config))]
171    pub fn from_path(
172        py: Python,
173        path: HashMap<String, PathBuf>,
174        transport_config: &Bound<'_, PyAny>,
175    ) -> Result<Self, PyEventError> {
176        let mut queues = HashMap::new();
177        let mut startup_rxs = Vec::new();
178        let mut completion_rxs = HashMap::new();
179
180        // Extract transport config from python object
181        let config = TransportConfig::from_py_config(transport_config)?;
182
183        // create a tokio runtime to run the background tasks
184        let shared_runtime =
185            Arc::new(tokio::runtime::Runtime::new().map_err(EventError::SetupTokioRuntimeError)?);
186
187        // load each profile from path
188        // In practice you can load as many profiles as you want
189        for (id, profile_path) in path {
190            let cloned_config = config.clone();
191            let drift_profile = DriftProfile::from_profile_path(profile_path)?;
192
193            // create startup channels to ensure queues are initialized before use
194            let (startup_tx, startup_rx) = oneshot::channel();
195
196            // create completion channels to ensure queues are flushed before shutdown
197            let (completion_tx, completion_rx) = oneshot::channel();
198            let (bus, rx, shutdown_rx) = QueueBus::new();
199
200            let queue_runtime = shared_runtime.clone();
201
202            // spawn a new thread for each queue
203            let id_clone = id.clone();
204            shared_runtime.spawn(handle_queue_events(
205                rx,
206                shutdown_rx,
207                drift_profile,
208                cloned_config,
209                id_clone,
210                queue_runtime,
211                startup_tx,
212                completion_tx,
213            ));
214
215            let queue = Py::new(py, bus)?;
216
217            queues.insert(id.clone(), queue);
218            startup_rxs.push((id.clone(), startup_rx));
219            completion_rxs.insert(id, completion_rx);
220        }
221
222        // wait for all queues to start up
223        shared_runtime.block_on(async {
224            for (id, startup_rx) in startup_rxs {
225                startup_rx.await.map_err(EventError::StartupReceiverError)?;
226                debug!("Queue {} initialized successfully", id);
227            }
228            Ok::<_, EventError>(())
229        })?;
230
231        Ok(ScouterQueue {
232            queues,
233            // need to keep the runtime alive for the life of ScouterQueue
234            _shared_runtime: shared_runtime,
235            completion_rxs,
236        })
237    }
238
239    /// Get a queue by its alias
240    ///
241    /// # Example
242    /// ```python
243    /// from scouter import ScouterQueue
244    ///
245    /// scouter_queues = ScouterQueue.from_path(...)
246    /// scouter_queues["queue_alias"].insert(features)
247    /// ```
248    pub fn __getitem__<'py>(
249        &self,
250        py: Python<'py>,
251        key: &str,
252    ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
253        match self.queues.get(key) {
254            Some(queue) => Ok(queue.bind(py)),
255            None => Err(PyEventError::MissingQueueError(key.to_string())),
256        }
257    }
258
259    /// Triggers a global shutdown for all queues
260    ///
261    /// # Example
262    ///
263    /// ```python
264    /// from scouter import ScouterQueue
265    ///
266    /// scouter_queues = ScouterQueue.from_path(...)
267    /// scouter_queues.shutdown()
268    ///
269    /// ```
270    pub fn shutdown(&mut self, py: Python) -> Result<(), PyEventError> {
271        // trigger shutdown for all queues
272        for queue in self.queues.values() {
273            let bound = queue.bind(py);
274            bound
275                .call_method0("shutdown")
276                .map_err(PyEventError::ShutdownQueueError)?;
277        }
278
279        self._shared_runtime.block_on(async {
280            for (id, completion_rx) in self.completion_rxs.drain() {
281                completion_rx
282                    .await
283                    .map_err(EventError::ShutdownReceiverError)?;
284                debug!("Queue {} initialized successfully", id);
285            }
286            Ok::<_, PyEventError>(())
287        })?;
288
289        self.queues.clear();
290
291        Ok(())
292    }
293}