// scouter_events/queue/py_queue.rs

1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::Task;
4use crate::queue::bus::{Event, QueueBus, TaskState};
5use crate::queue::custom::CustomQueue;
6use crate::queue::llm::LLMQueue;
7use crate::queue::psi::PsiQueue;
8use crate::queue::spc::SpcQueue;
9use crate::queue::traits::queue::wait_for_background_task;
10use crate::queue::traits::queue::wait_for_event_task;
11use crate::queue::traits::queue::QueueMethods;
12use crate::queue::types::TransportConfig;
13use pyo3::prelude::*;
14use scouter_state::app_state;
15use scouter_types::{DriftProfile, LLMRecord, QueueItem};
16use scouter_types::{Features, Metrics};
17use std::collections::HashMap;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::RwLock;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, error, instrument};
24
25fn create_event_state(id: String) -> (TaskState, UnboundedReceiver<Event>) {
26    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
27
28    // get background loop
29    let background_task = Arc::new(RwLock::new(Task::new()));
30
31    // get event loop
32    let event_task = Arc::new(RwLock::new(Task::new()));
33
34    let event_state = TaskState {
35        event_task,
36        background_task,
37        event_tx,
38        id,
39    };
40
41    (event_state, event_rx)
42}
43pub enum QueueNum {
44    Spc(SpcQueue),
45    Psi(PsiQueue),
46    Custom(CustomQueue),
47    LLM(LLMQueue),
48}
49// need to add queue running lock to each and return it to the queue bus
50impl QueueNum {
51    pub async fn new(
52        transport_config: TransportConfig,
53        drift_profile: DriftProfile,
54        task_state: &mut TaskState,
55    ) -> Result<Self, EventError> {
56        let identifier = drift_profile.identifier();
57        match drift_profile {
58            DriftProfile::Spc(spc_profile) => {
59                let queue = SpcQueue::new(spc_profile, transport_config).await?;
60                Ok(QueueNum::Spc(queue))
61            }
62            DriftProfile::Psi(psi_profile) => {
63                let queue =
64                    PsiQueue::new(psi_profile, transport_config, task_state, identifier).await?;
65                Ok(QueueNum::Psi(queue))
66            }
67            DriftProfile::Custom(custom_profile) => {
68                let queue =
69                    CustomQueue::new(custom_profile, transport_config, task_state, identifier)
70                        .await?;
71                Ok(QueueNum::Custom(queue))
72            }
73            DriftProfile::LLM(llm_profile) => {
74                let queue = LLMQueue::new(llm_profile, transport_config).await?;
75                Ok(QueueNum::LLM(queue))
76            }
77        }
78    }
79
80    /// Top-level insert method for the queue
81    /// This method will take a QueueItem and insert it into the appropriate queue
82    /// If features, inserts using insert_features (spc, psi)
83    /// If metrics, inserts using insert_metrics (custom)
84    ///
85    /// # Arguments
86    /// * `entity` - The entity to insert into the queue
87    #[instrument(skip_all)]
88    pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
89        debug!("Inserting entity into queue: {:?}", entity);
90        match entity {
91            QueueItem::Features(features) => self.insert_features(features).await,
92            QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
93            QueueItem::LLM(llm_record) => self.insert_llm_record(*llm_record).await,
94        }
95    }
96
97    /// Insert features into the queue. Currently only applies to PSI and SPC queues
98    ///
99    /// # Arguments
100    /// * `features` - The features to insert into the queue
101    ///
102    ///
103    #[instrument(skip_all)]
104    pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
105        match self {
106            QueueNum::Psi(queue) => queue.insert(features).await,
107            QueueNum::Spc(queue) => queue.insert(features).await,
108            _ => Err(EventError::QueueNotSupportedFeatureError),
109        }
110    }
111
112    /// Insert metrics into the queue. Currently only applies to custom queues
113    ///
114    /// # Arguments
115    /// * `metrics` - The metrics to insert into the queue
116    ///
117    pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
118        match self {
119            QueueNum::Custom(queue) => queue.insert(metrics).await,
120            _ => Err(EventError::QueueNotSupportedMetricsError),
121        }
122    }
123
124    /// Insert LLM record into the queue. Currently only applies to LLM queues
125    ///
126    /// # Arguments
127    /// * `llm_record` - The LLM record to insert into the queue
128    ///
129    pub async fn insert_llm_record(&mut self, llm_record: LLMRecord) -> Result<(), EventError> {
130        match self {
131            QueueNum::LLM(queue) => {
132                if !queue.should_insert() {
133                    debug!("Skipping LLM record insertion due to sampling rate");
134                    return Ok(());
135                }
136                queue.insert(llm_record).await
137            }
138            _ => Err(EventError::QueueNotSupportedLLMError),
139        }
140    }
141
142    /// Flush the queue. This will publish the records to the producer
143    /// and shut down the background tasks
144    pub async fn flush(&mut self) -> Result<(), EventError> {
145        match self {
146            QueueNum::Spc(queue) => queue.flush().await,
147            QueueNum::Psi(queue) => queue.flush().await,
148            QueueNum::Custom(queue) => queue.flush().await,
149            QueueNum::LLM(queue) => queue.flush().await,
150        }
151    }
152}
153
154#[allow(clippy::too_many_arguments)]
155async fn spawn_queue_event_handler(
156    mut event_rx: UnboundedReceiver<Event>,
157    transport_config: TransportConfig,
158    drift_profile: DriftProfile,
159    id: String,
160    mut task_state: TaskState,
161    cancellation_token: CancellationToken,
162) -> Result<(), EventError> {
163    // This will create the specific queue based on the transport config and drift profile
164    // Available queues:
165    // - PSI - will also create a background task
166    // - SPC
167    // - Custom - will also create a background task
168    // - LLM
169    // event loops are used to monitor the background tasks of both custom and PSI queues
170    let mut queue = match QueueNum::new(transport_config, drift_profile, &mut task_state).await {
171        Ok(q) => q,
172        Err(e) => {
173            error!("Failed to initialize queue {}: {}", id, e);
174            return Err(e);
175        }
176    };
177
178    task_state.set_event_running(true);
179    debug!("Event loop for queue {} set to running", id);
180    loop {
181        tokio::select! {
182            Some(event) = event_rx.recv() => {
183                match event {
184                    Event::Task(entity) => {
185                        match queue.insert(entity).await {
186                            Ok(_) => {
187                                debug!("Inserted entity into queue {}", id);
188                            }
189                            Err(e) => {
190                                error!("Error inserting entity into queue {}: {}", id, e);
191                            }
192                        }
193                    }
194                    Event::Flush => {
195                        debug!("Flush event received for queue {}", id);
196                        match queue.flush().await {
197                            Ok(_) => {
198                                debug!("Successfully flushed queue {}", id);
199                            }
200                            Err(e) => {
201                                error!("Error flushing queue {}: {}", id, e);
202                            }
203                        }
204                    }
205                }
206            }
207
208            _ = cancellation_token.cancelled() => {
209                debug!("Stop signal received for queue {}", id);
210                match queue.flush().await {
211                    Ok(_) => {
212                        debug!("Successfully flushed queue {}", id);
213                    }
214                    Err(e) => {
215                        error!("Error flushing queue {}: {}", id, e);
216                    }
217                }
218                task_state.set_event_running(false);
219                break;
220            }
221
222            else => {
223                debug!("Event channel closed for queue {}, shutting down", id);
224                match queue.flush().await {
225                    Ok(_) => {
226                        debug!("Successfully flushed queue {}", id);
227                    }
228                    Err(e) => {
229                        error!("Error flushing queue {}: {}", id, e);
230                    }
231                }
232                task_state.set_event_running(false);
233                break;
234            }
235        }
236    }
237    Ok(())
238}
239
240// need to add version here
241#[pyclass]
242pub struct ScouterQueue {
243    queues: HashMap<String, Py<QueueBus>>,
244    transport_config: TransportConfig,
245    pub queue_state: Arc<HashMap<String, TaskState>>,
246}
247
248#[pymethods]
249impl ScouterQueue {
250    /// Create a new ScouterQueue from a map of aliases and paths
251    /// This will create a new ScouterQueue for each path in the map
252    ///
253    /// # Process
254    /// 1. Create empty queues
255    /// 2. Extract transport config from python object
256    /// 3. Create a shared tokio runtime that is used to create background queues
257    /// 4. For each path in the map, create a new queue
258    /// 5. Spawn a new thread for each queue (some queues require background tasks)
259    /// 6. Return the ScouterQueue
260    ///
261    /// # Arguments
262    /// * `paths` - A map of aliases to paths
263    /// * `transport_config` - The transport config to use
264    ///
265    /// # Returns
266    /// * `ScouterQueue` - A new ScouterQueue
267    #[staticmethod]
268    #[pyo3(signature = (path, transport_config))]
269    pub fn from_path(
270        py: Python,
271        path: HashMap<String, PathBuf>,
272        transport_config: &Bound<'_, PyAny>,
273    ) -> Result<Self, PyEventError> {
274        ScouterQueue::from_path_rs(py, path, transport_config, false)
275    }
276
277    /// Get a queue by its alias
278    ///
279    /// # Example
280    /// ```python
281    /// from scouter import ScouterQueue
282    ///
283    /// scouter_queues = ScouterQueue.from_path(...)
284    /// scouter_queues["queue_alias"].insert(features)
285    /// ```
286    pub fn __getitem__<'py>(
287        &self,
288        py: Python<'py>,
289        key: &str,
290    ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
291        match self.queues.get(key) {
292            Some(queue) => Ok(queue.bind(py)),
293            None => Err(PyEventError::MissingQueueError(key.to_string())),
294        }
295    }
296
297    #[getter]
298    /// Get the transport config for the ScouterQueue
299    pub fn transport_config<'py>(
300        &self,
301        py: Python<'py>,
302    ) -> Result<Bound<'py, PyAny>, PyEventError> {
303        self.transport_config.to_py(py)
304    }
305
306    pub fn is_empty(&self) -> bool {
307        self.queues.is_empty()
308    }
309
310    /// Triggers a global shutdown for all queues
311    /// 1. This will call shutdown for all queues
312    /// 2. The queues will be cleared from the hashmap
313    /// 3. A loop will be run to ensure all background tasks have been shut down
314    ///
315    /// # Example
316    ///
317    /// ```python
318    /// from scouter import ScouterQueue
319    ///
320    /// scouter_queues = ScouterQueue.from_path(...)
321    /// scouter_queues.shutdown()
322    ///
323    /// ```
324    pub fn shutdown(&mut self) -> Result<(), PyEventError> {
325        debug!("Starting ScouterQueue shutdown");
326
327        for (alias, event_state) in self.queue_state.iter() {
328            debug!("Shutting down queue: {}", alias);
329            // Flush first
330            // shutdown the queue
331            event_state.shutdown_tasks()?;
332        }
333
334        // clear the queues
335        self.queues.clear();
336        if !self.queues.is_empty() {
337            return Err(PyEventError::PendingEventsError);
338        }
339
340        debug!("All queues have been shutdown and cleared");
341
342        Ok(())
343    }
344}
345
346impl ScouterQueue {
347    /// Create a new ScouterQueue from a map of aliases and paths
348    /// This will create a new ScouterQueue for each path in the map
349    /// This method was created to help with integration into the Opsml CardDeck where this
350    /// method is called directly.
351    ///
352    /// # Process
353    /// 1. Create empty queues
354    /// 2. Extract transport config from python object
355    /// 3. Create a shared tokio runtime that is used to create background queues
356    /// 4. For each path in the map, create a new queue
357    /// 5. Spawn a new thread for each queue (some queues require background tasks)
358    /// 6. Return the ScouterQueue
359    ///
360    /// # Arguments
361    /// * `paths` - A map of aliases to paths
362    /// * `transport_config` - The transport config to use
363    /// * *shared_runtime* - A shared tokio runtime that is used to create background queues
364    ///
365    /// # Returns
366    /// * `ScouterQueue` - A new ScouterQueue
367    pub fn from_path_rs(
368        py: Python,
369        path: HashMap<String, PathBuf>,
370        transport_config: &Bound<'_, PyAny>,
371        wait_for_startup: bool,
372    ) -> Result<Self, PyEventError> {
373        debug!("Creating ScouterQueue from path");
374        let mut queues = HashMap::new();
375        let mut queue_state = HashMap::new();
376
377        // assert transport config is not None
378        if transport_config.is_none() {
379            return Err(PyEventError::MissingTransportConfig);
380        }
381
382        // Extract transport config from python object
383        let config = TransportConfig::from_py_config(transport_config)?;
384
385        // load each profile from path
386        // In practice you can load as many profiles as you want
387        for (id, profile_path) in path {
388            let cloned_config = config.clone();
389            let drift_profile = DriftProfile::from_profile_path(profile_path)?;
390
391            let (mut event_state, event_rx) = create_event_state(id.clone());
392
393            // create startup channels to ensure queues are initialized before use
394            let bus = QueueBus::new(event_state.clone(), drift_profile.identifier());
395            queue_state.insert(id.clone(), event_state.clone());
396            let cancellation_token = CancellationToken::new();
397            let cloned_cancellation_token = cancellation_token.clone();
398
399            // queue args
400            let runtime_handle = app_state().handle();
401            let id_clone = id.clone();
402            let cloned_event_state = event_state.clone();
403
404            // Spawn the task without waiting for initialization
405            let handle = runtime_handle.spawn(async move {
406                match spawn_queue_event_handler(
407                    event_rx,
408                    cloned_config,
409                    drift_profile,
410                    id_clone,
411                    cloned_event_state,
412                    cloned_cancellation_token,
413                )
414                .await
415                {
416                    Ok(running) => running,
417                    Err(e) => {
418                        error!("Queue initialization failed: {}", e);
419                    }
420                }
421            });
422
423            // add handle and stop tx to event loops for management
424            event_state.add_event_abort_handle(handle);
425            event_state.add_event_cancellation_token(cancellation_token);
426
427            std::thread::sleep(std::time::Duration::from_millis(1000));
428
429            // wait for background task and event task to signal startup
430            if wait_for_startup {
431                debug!("Waiting for queue {} to signal startup", id);
432                wait_for_background_task(&event_state)?;
433                wait_for_event_task(&event_state)?;
434            }
435
436            let queue = Py::new(py, bus)?;
437            queues.insert(id.clone(), queue);
438        }
439
440        Ok(ScouterQueue {
441            queues,
442            transport_config: config,
443            queue_state: Arc::new(queue_state),
444        })
445    }
446}