scouter_events/queue/py_queue.rs

#![allow(clippy::useless_conversion)]
use crate::error::{EventError, PyEventError};
use crate::queue::bus::Task;
use crate::queue::bus::{Event, QueueBus, TaskState};
use crate::queue::custom::CustomQueue;
use crate::queue::llm::LLMQueue;
use crate::queue::psi::PsiQueue;
use crate::queue::spc::SpcQueue;
use crate::queue::traits::queue::wait_for_background_task;
use crate::queue::traits::queue::wait_for_event_task;
use crate::queue::traits::queue::QueueMethods;
use crate::queue::types::TransportConfig;
use pyo3::prelude::*;
use scouter_state::app_state;
use scouter_types::{DriftProfile, LLMRecord, QueueItem};
use scouter_types::{Features, Metrics};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::RwLock;
use tokio::runtime;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_util::sync::CancellationToken;
use tracing::{debug, error, instrument};

/// Create the task state and event channel for a single queue.
/// Returns the `TaskState` shared with the queue bus and the receiver
/// consumed by the queue's event handler.
fn create_event_state(id: String) -> (TaskState, UnboundedReceiver<Event>) {
    let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();

    // get background loop
    let background_task = Arc::new(RwLock::new(Task::new()));

    // get event loop
    let event_task = Arc::new(RwLock::new(Task::new()));

    let event_state = TaskState {
        event_task,
        background_task,
        event_tx,
        id,
    };

    (event_state, event_rx)
}
pub enum QueueNum {
    Spc(SpcQueue),
    Psi(PsiQueue),
    Custom(CustomQueue),
    LLM(LLMQueue),
}
// need to add queue running lock to each and return it to the queue bus
impl QueueNum {
    pub async fn new(
        transport_config: TransportConfig,
        drift_profile: DriftProfile,
        runtime: Arc<runtime::Runtime>,
        task_state: &mut TaskState,
    ) -> Result<Self, EventError> {
        let identifier = drift_profile.identifier();
        match drift_profile {
            DriftProfile::Spc(spc_profile) => {
                let queue = SpcQueue::new(spc_profile, transport_config).await?;
                Ok(QueueNum::Spc(queue))
            }
            DriftProfile::Psi(psi_profile) => {
                let queue = PsiQueue::new(
                    psi_profile,
                    transport_config,
                    runtime,
                    task_state,
                    identifier,
                )
                .await?;
                Ok(QueueNum::Psi(queue))
            }
            DriftProfile::Custom(custom_profile) => {
                let queue = CustomQueue::new(
                    custom_profile,
                    transport_config,
                    runtime,
                    task_state,
                    identifier,
                )
                .await?;
                Ok(QueueNum::Custom(queue))
            }
            DriftProfile::LLM(llm_profile) => {
                let queue = LLMQueue::new(llm_profile, transport_config).await?;
                Ok(QueueNum::LLM(queue))
            }
        }
    }

    /// Top-level insert method for the queue.
    /// Takes a `QueueItem` and routes it to the appropriate queue:
    /// * `Features` are inserted via `insert_features` (SPC, PSI)
    /// * `Metrics` are inserted via `insert_metrics` (Custom)
    /// * LLM records are inserted via `insert_llm_record` (LLM)
    ///
    /// # Arguments
    /// * `entity` - The entity to insert into the queue
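    ///
    /// # Example
    ///
    /// A minimal sketch of the dispatch; the `features`, `metrics`, and `llm_record`
    /// values are illustrative, and the LLM variant is assumed to be boxed, as in the
    /// match arm below:
    ///
    /// ```rust,ignore
    /// queue.insert(QueueItem::Features(features)).await?;        // routed to SPC / PSI
    /// queue.insert(QueueItem::Metrics(metrics)).await?;          // routed to Custom
    /// queue.insert(QueueItem::LLM(Box::new(llm_record))).await?; // routed to LLM
    /// ```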
    #[instrument(skip_all)]
    pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
        debug!("Inserting entity into queue: {:?}", entity);
        match entity {
            QueueItem::Features(features) => self.insert_features(features).await,
            QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
            QueueItem::LLM(llm_record) => self.insert_llm_record(*llm_record).await,
        }
    }

    /// Insert features into the queue. Currently only applies to PSI and SPC queues
    ///
    /// # Arguments
    /// * `features` - The features to insert into the queue
    ///
    #[instrument(skip_all)]
    pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
        match self {
            QueueNum::Psi(queue) => queue.insert(features).await,
            QueueNum::Spc(queue) => queue.insert(features).await,
            _ => Err(EventError::QueueNotSupportedFeatureError),
        }
    }

    /// Insert metrics into the queue. Currently only applies to custom queues
    ///
    /// # Arguments
    /// * `metrics` - The metrics to insert into the queue
    ///
    pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
        match self {
            QueueNum::Custom(queue) => queue.insert(metrics).await,
            _ => Err(EventError::QueueNotSupportedMetricsError),
        }
    }

    /// Insert LLM record into the queue. Currently only applies to LLM queues
    ///
    /// # Arguments
    /// * `llm_record` - The LLM record to insert into the queue
    ///
    pub async fn insert_llm_record(&mut self, llm_record: LLMRecord) -> Result<(), EventError> {
        match self {
            QueueNum::LLM(queue) => {
                if !queue.should_insert() {
                    debug!("Skipping LLM record insertion due to sampling rate");
                    return Ok(());
                }
                queue.insert(llm_record).await
            }
            _ => Err(EventError::QueueNotSupportedLLMError),
        }
    }

    /// Flush the queue. This will publish the records to the producer
    /// and shut down the background tasks
    pub async fn flush(&mut self) -> Result<(), EventError> {
        match self {
            QueueNum::Spc(queue) => queue.flush().await,
            QueueNum::Psi(queue) => queue.flush().await,
            QueueNum::Custom(queue) => queue.flush().await,
            QueueNum::LLM(queue) => queue.flush().await,
        }
    }
}

#[allow(clippy::too_many_arguments)]
async fn spawn_queue_event_handler(
    mut event_rx: UnboundedReceiver<Event>,
    transport_config: TransportConfig,
    drift_profile: DriftProfile,
    runtime: Arc<runtime::Runtime>,
    id: String,
    mut task_state: TaskState,
    cancellation_token: CancellationToken,
) -> Result<(), EventError> {
    // This will create the specific queue based on the transport config and drift profile
    // Available queues:
    // - PSI - will also create a background task
    // - SPC
    // - Custom - will also create a background task
    // - LLM
    // Event loops are used to monitor the background tasks of the PSI and Custom queues
    let mut queue =
        match QueueNum::new(transport_config, drift_profile, runtime, &mut task_state).await {
            Ok(q) => q,
            Err(e) => {
                error!("Failed to initialize queue {}: {}", id, e);
                return Err(e);
            }
        };

    task_state.set_event_running(true);
    debug!("Event loop for queue {} set to running", id);
    loop {
        tokio::select! {
            Some(event) = event_rx.recv() => {
                match event {
                    Event::Task(entity) => {
                        match queue.insert(entity).await {
                            Ok(_) => {
                                debug!("Inserted entity into queue {}", id);
                            }
                            Err(e) => {
                                error!("Error inserting entity into queue {}: {}", id, e);
                            }
                        }
                    }
                    Event::Flush => {
                        debug!("Flush event received for queue {}", id);
                        match queue.flush().await {
                            Ok(_) => {
                                debug!("Successfully flushed queue {}", id);
                            }
                            Err(e) => {
                                error!("Error flushing queue {}: {}", id, e);
                            }
                        }
                    }
                }
            }

            _ = cancellation_token.cancelled() => {
                debug!("Stop signal received for queue {}", id);
                match queue.flush().await {
                    Ok(_) => {
                        debug!("Successfully flushed queue {}", id);
                    }
                    Err(e) => {
                        error!("Error flushing queue {}: {}", id, e);
                    }
                }
                task_state.set_event_running(false);
                break;
            }

            else => {
                debug!("Event channel closed for queue {}, shutting down", id);
                match queue.flush().await {
                    Ok(_) => {
                        debug!("Successfully flushed queue {}", id);
                    }
                    Err(e) => {
                        error!("Error flushing queue {}: {}", id, e);
                    }
                }
                task_state.set_event_running(false);
                break;
            }
        }
    }
    Ok(())
}

// need to add version here
#[pyclass]
pub struct ScouterQueue {
    queues: HashMap<String, Py<QueueBus>>,
    _shared_runtime: Arc<tokio::runtime::Runtime>,
    transport_config: TransportConfig,
    pub queue_state: Arc<HashMap<String, TaskState>>,
}

#[pymethods]
impl ScouterQueue {
    /// Create a new ScouterQueue from a map of aliases and paths
    /// This will create a new queue for each path in the map
    ///
    /// # Process
    /// 1. Create empty queues
    /// 2. Extract transport config from python object
    /// 3. Create a shared tokio runtime that is used to create background queues
    /// 4. For each path in the map, create a new queue
    /// 5. Spawn an async task for each queue (some queues require background tasks)
    /// 6. Return the ScouterQueue
    ///
    /// # Arguments
    /// * `path` - A map of aliases to profile paths
    /// * `transport_config` - The transport config to use
    ///
    /// # Returns
    /// * `ScouterQueue` - A new ScouterQueue
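    ///
    /// # Example
    ///
    /// A minimal usage sketch; the alias, profile path, and transport config
    /// object shown are illustrative and depend on your deployment:
    ///
    /// ```python
    /// from pathlib import Path
    /// from scouter import ScouterQueue
    ///
    /// scouter_queues = ScouterQueue.from_path(
    ///     path={"model_a": Path("model_a/drift_profile.json")},
    ///     transport_config=transport_config,  # e.g. a Kafka or HTTP config object
    /// )
    /// scouter_queues["model_a"].insert(features)
    /// scouter_queues.shutdown()
    /// ```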
    #[staticmethod]
    #[pyo3(signature = (path, transport_config))]
    pub fn from_path(
        py: Python,
        path: HashMap<String, PathBuf>,
        transport_config: &Bound<'_, PyAny>,
    ) -> Result<Self, PyEventError> {
        // create a tokio runtime to run the background tasks
        let shared_runtime = app_state().start_runtime();
        ScouterQueue::from_path_rs(py, path, transport_config, shared_runtime, false)
    }

    /// Get a queue by its alias
    ///
    /// # Example
    /// ```python
    /// from scouter import ScouterQueue
    ///
    /// scouter_queues = ScouterQueue.from_path(...)
    /// scouter_queues["queue_alias"].insert(features)
    /// ```
    pub fn __getitem__<'py>(
        &self,
        py: Python<'py>,
        key: &str,
    ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
        match self.queues.get(key) {
            Some(queue) => Ok(queue.bind(py)),
            None => Err(PyEventError::MissingQueueError(key.to_string())),
        }
    }

    #[getter]
    /// Get the transport config for the ScouterQueue
    pub fn transport_config<'py>(
        &self,
        py: Python<'py>,
    ) -> Result<Bound<'py, PyAny>, PyEventError> {
        self.transport_config.to_py(py)
    }

    pub fn is_empty(&self) -> bool {
        self.queues.is_empty()
    }

    /// Triggers a global shutdown for all queues
    /// 1. Shut down the tasks for every queue (flushing any pending records)
    /// 2. Clear the queues from the hashmap
    /// 3. Return an error if any queues remain after clearing
    ///
    /// # Example
    ///
    /// ```python
    /// from scouter import ScouterQueue
    ///
    /// scouter_queues = ScouterQueue.from_path(...)
    /// scouter_queues.shutdown()
    /// ```
    pub fn shutdown(&mut self) -> Result<(), PyEventError> {
        debug!("Starting ScouterQueue shutdown");

        for (alias, event_state) in self.queue_state.iter() {
            debug!("Shutting down queue: {}", alias);
            // shut down the queue's tasks (the event handler flushes before exiting)
            event_state.shutdown_tasks()?;
        }

        // clear the queues
        self.queues.clear();
        if !self.queues.is_empty() {
            return Err(PyEventError::PendingEventsError);
        }

        debug!("All queues have been shut down and cleared");

        Ok(())
    }
}

impl ScouterQueue {
    /// Create a new ScouterQueue from a map of aliases and paths
    /// This will create a new queue for each path in the map
    /// This method exists to support integration with the Opsml CardDeck, which
    /// calls it directly.
    ///
    /// # Process
    /// 1. Create empty queues
    /// 2. Extract transport config from python object
    /// 3. Use the shared tokio runtime to spawn background queue tasks
    /// 4. For each path in the map, create a new queue
    /// 5. Spawn an async task for each queue (some queues require background tasks)
    /// 6. Return the ScouterQueue
    ///
    /// # Arguments
    /// * `path` - A map of aliases to profile paths
    /// * `transport_config` - The transport config to use
    /// * `shared_runtime` - A shared tokio runtime that is used to create background queues
    /// * `wait_for_startup` - Whether to block until each queue's tasks signal startup
    ///
    /// # Returns
    /// * `ScouterQueue` - A new ScouterQueue
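    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a Python transport config object already bound
    /// to `py` and a shared runtime started elsewhere (names are illustrative):
    ///
    /// ```rust,ignore
    /// let paths = HashMap::from([
    ///     ("model_a".to_string(), PathBuf::from("model_a/drift_profile.json")),
    /// ]);
    /// let queues =
    ///     ScouterQueue::from_path_rs(py, paths, &transport_config, shared_runtime, true)?;
    /// ```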
    pub fn from_path_rs(
        py: Python,
        path: HashMap<String, PathBuf>,
        transport_config: &Bound<'_, PyAny>,
        shared_runtime: Arc<tokio::runtime::Runtime>,
        wait_for_startup: bool,
    ) -> Result<Self, PyEventError> {
        debug!("Creating ScouterQueue from path");
        let mut queues = HashMap::new();
        let mut queue_state = HashMap::new();

        // assert transport config is not None
        if transport_config.is_none() {
            return Err(PyEventError::MissingTransportConfig);
        }

        // Extract transport config from python object
        let config = TransportConfig::from_py_config(transport_config)?;

        // load each profile from its path; any number of profiles can be loaded
        for (id, profile_path) in path {
            let cloned_config = config.clone();
            let drift_profile = DriftProfile::from_profile_path(profile_path)?;

            let (mut event_state, event_rx) = create_event_state(id.clone());

            // create the queue bus used to send events to this queue's handler
            let bus = QueueBus::new(event_state.clone(), drift_profile.identifier());
            queue_state.insert(id.clone(), event_state.clone());
            let cancellation_token = CancellationToken::new();
            let cloned_cancellation_token = cancellation_token.clone();

            // queue args
            let clone_runtime = shared_runtime.clone();
            let id_clone = id.clone();
            let cloned_event_state = event_state.clone();

            // Spawn the task without waiting for initialization
            let handle = shared_runtime.spawn(async move {
                match spawn_queue_event_handler(
                    event_rx,
                    cloned_config,
                    drift_profile,
                    clone_runtime,
                    id_clone,
                    cloned_event_state,
                    cloned_cancellation_token,
                )
                .await
                {
                    Ok(running) => running,
                    Err(e) => {
                        error!("Queue initialization failed: {}", e);
                    }
                }
            });

            // register the abort handle and cancellation token with the task state for shutdown
            event_state.add_event_abort_handle(handle);
            event_state.add_event_cancellation_token(cancellation_token);

            // give the spawned event handler a moment to start up
            std::thread::sleep(std::time::Duration::from_millis(1000));

            // wait for background task and event task to signal startup
            if wait_for_startup {
                debug!("Waiting for queue {} to signal startup", id);
                wait_for_background_task(&event_state)?;
                wait_for_event_task(&event_state)?;
            }

            let queue = Py::new(py, bus)?;
            queues.insert(id.clone(), queue);
        }

        Ok(ScouterQueue {
            queues,
            // need to keep the runtime alive for the life of ScouterQueue
            _shared_runtime: shared_runtime,
            transport_config: config,
            queue_state: Arc::new(queue_state),
        })
    }
}