1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::Task;
4use crate::queue::bus::{Event, QueueBus, TaskState};
5use crate::queue::custom::CustomQueue;
6use crate::queue::llm::LLMQueue;
7use crate::queue::psi::PsiQueue;
8use crate::queue::spc::SpcQueue;
9use crate::queue::traits::queue::wait_for_background_task;
10use crate::queue::traits::queue::wait_for_event_task;
11use crate::queue::traits::queue::QueueMethods;
12use crate::queue::types::TransportConfig;
13use pyo3::prelude::*;
14use scouter_state::app_state;
15use scouter_types::{DriftProfile, LLMRecord, QueueItem};
16use scouter_types::{Features, Metrics};
17use std::collections::HashMap;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::RwLock;
21use tokio::runtime;
22use tokio::sync::mpsc::UnboundedReceiver;
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, error, instrument};
25
26fn create_event_state(id: String) -> (TaskState, UnboundedReceiver<Event>) {
27 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
28
29 let background_task = Arc::new(RwLock::new(Task::new()));
31
32 let event_task = Arc::new(RwLock::new(Task::new()));
34
35 let event_state = TaskState {
36 event_task,
37 background_task,
38 event_tx,
39 id,
40 };
41
42 (event_state, event_rx)
43}
44pub enum QueueNum {
45 Spc(SpcQueue),
46 Psi(PsiQueue),
47 Custom(CustomQueue),
48 LLM(LLMQueue),
49}
50impl QueueNum {
52 pub async fn new(
53 transport_config: TransportConfig,
54 drift_profile: DriftProfile,
55 runtime: Arc<runtime::Runtime>,
56 task_state: &mut TaskState,
57 ) -> Result<Self, EventError> {
58 let identifier = drift_profile.identifier();
59 match drift_profile {
60 DriftProfile::Spc(spc_profile) => {
61 let queue = SpcQueue::new(spc_profile, transport_config).await?;
62 Ok(QueueNum::Spc(queue))
63 }
64 DriftProfile::Psi(psi_profile) => {
65 let queue = PsiQueue::new(
66 psi_profile,
67 transport_config,
68 runtime,
69 task_state,
70 identifier,
71 )
72 .await?;
73 Ok(QueueNum::Psi(queue))
74 }
75 DriftProfile::Custom(custom_profile) => {
76 let queue = CustomQueue::new(
77 custom_profile,
78 transport_config,
79 runtime,
80 task_state,
81 identifier,
82 )
83 .await?;
84 Ok(QueueNum::Custom(queue))
85 }
86 DriftProfile::LLM(llm_profile) => {
87 let queue = LLMQueue::new(llm_profile, transport_config).await?;
88 Ok(QueueNum::LLM(queue))
89 }
90 }
91 }
92
93 #[instrument(skip_all)]
101 pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
102 debug!("Inserting entity into queue: {:?}", entity);
103 match entity {
104 QueueItem::Features(features) => self.insert_features(features).await,
105 QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
106 QueueItem::LLM(llm_record) => self.insert_llm_record(*llm_record).await,
107 }
108 }
109
110 #[instrument(skip_all)]
117 pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
118 match self {
119 QueueNum::Psi(queue) => queue.insert(features).await,
120 QueueNum::Spc(queue) => queue.insert(features).await,
121 _ => Err(EventError::QueueNotSupportedFeatureError),
122 }
123 }
124
125 pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
131 match self {
132 QueueNum::Custom(queue) => queue.insert(metrics).await,
133 _ => Err(EventError::QueueNotSupportedMetricsError),
134 }
135 }
136
137 pub async fn insert_llm_record(&mut self, llm_record: LLMRecord) -> Result<(), EventError> {
143 match self {
144 QueueNum::LLM(queue) => {
145 if !queue.should_insert() {
146 debug!("Skipping LLM record insertion due to sampling rate");
147 return Ok(());
148 }
149 queue.insert(llm_record).await
150 }
151 _ => Err(EventError::QueueNotSupportedLLMError),
152 }
153 }
154
155 pub async fn flush(&mut self) -> Result<(), EventError> {
158 match self {
159 QueueNum::Spc(queue) => queue.flush().await,
160 QueueNum::Psi(queue) => queue.flush().await,
161 QueueNum::Custom(queue) => queue.flush().await,
162 QueueNum::LLM(queue) => queue.flush().await,
163 }
164 }
165}
166
167#[allow(clippy::too_many_arguments)]
168async fn spawn_queue_event_handler(
169 mut event_rx: UnboundedReceiver<Event>,
170 transport_config: TransportConfig,
171 drift_profile: DriftProfile,
172 runtime: Arc<runtime::Runtime>,
173 id: String,
174 mut task_state: TaskState,
175 cancellation_token: CancellationToken,
176) -> Result<(), EventError> {
177 let mut queue =
185 match QueueNum::new(transport_config, drift_profile, runtime, &mut task_state).await {
186 Ok(q) => q,
187 Err(e) => {
188 error!("Failed to initialize queue {}: {}", id, e);
189 return Err(e);
190 }
191 };
192
193 task_state.set_event_running(true);
194 debug!("Event loop for queue {} set to running", id);
195 loop {
196 tokio::select! {
197 Some(event) = event_rx.recv() => {
198 match event {
199 Event::Task(entity) => {
200 match queue.insert(entity).await {
201 Ok(_) => {
202 debug!("Inserted entity into queue {}", id);
203 }
204 Err(e) => {
205 error!("Error inserting entity into queue {}: {}", id, e);
206 }
207 }
208 }
209 Event::Flush => {
210 debug!("Flush event received for queue {}", id);
211 match queue.flush().await {
212 Ok(_) => {
213 debug!("Successfully flushed queue {}", id);
214 }
215 Err(e) => {
216 error!("Error flushing queue {}: {}", id, e);
217 }
218 }
219 }
220 }
221 }
222
223 _ = cancellation_token.cancelled() => {
224 debug!("Stop signal received for queue {}", id);
225 match queue.flush().await {
226 Ok(_) => {
227 debug!("Successfully flushed queue {}", id);
228 }
229 Err(e) => {
230 error!("Error flushing queue {}: {}", id, e);
231 }
232 }
233 task_state.set_event_running(false);
234 break;
235 }
236
237 else => {
238 debug!("Event channel closed for queue {}, shutting down", id);
239 match queue.flush().await {
240 Ok(_) => {
241 debug!("Successfully flushed queue {}", id);
242 }
243 Err(e) => {
244 error!("Error flushing queue {}: {}", id, e);
245 }
246 }
247 task_state.set_event_running(false);
248 break;
249 }
250 }
251 }
252 Ok(())
253}
254
255#[pyclass]
257pub struct ScouterQueue {
258 queues: HashMap<String, Py<QueueBus>>,
259 _shared_runtime: Arc<tokio::runtime::Runtime>,
260 transport_config: TransportConfig,
261 pub queue_state: Arc<HashMap<String, TaskState>>,
262}
263
264#[pymethods]
265impl ScouterQueue {
266 #[staticmethod]
284 #[pyo3(signature = (path, transport_config))]
285 pub fn from_path(
286 py: Python,
287 path: HashMap<String, PathBuf>,
288 transport_config: &Bound<'_, PyAny>,
289 ) -> Result<Self, PyEventError> {
290 let shared_runtime = app_state().start_runtime();
292 ScouterQueue::from_path_rs(py, path, transport_config, shared_runtime, false)
293 }
294
295 pub fn __getitem__<'py>(
305 &self,
306 py: Python<'py>,
307 key: &str,
308 ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
309 match self.queues.get(key) {
310 Some(queue) => Ok(queue.bind(py)),
311 None => Err(PyEventError::MissingQueueError(key.to_string())),
312 }
313 }
314
315 #[getter]
316 pub fn transport_config<'py>(
318 &self,
319 py: Python<'py>,
320 ) -> Result<Bound<'py, PyAny>, PyEventError> {
321 self.transport_config.to_py(py)
322 }
323
324 pub fn is_empty(&self) -> bool {
325 self.queues.is_empty()
326 }
327
328 pub fn shutdown(&mut self) -> Result<(), PyEventError> {
343 debug!("Starting ScouterQueue shutdown");
344
345 for (alias, event_state) in self.queue_state.iter() {
346 debug!("Shutting down queue: {}", alias);
347 event_state.shutdown_tasks()?;
350 }
351
352 self.queues.clear();
354 if !self.queues.is_empty() {
355 return Err(PyEventError::PendingEventsError);
356 }
357
358 debug!("All queues have been shutdown and cleared");
359
360 Ok(())
361 }
362}
363
364impl ScouterQueue {
365 pub fn from_path_rs(
386 py: Python,
387 path: HashMap<String, PathBuf>,
388 transport_config: &Bound<'_, PyAny>,
389 shared_runtime: Arc<tokio::runtime::Runtime>,
390 wait_for_startup: bool,
391 ) -> Result<Self, PyEventError> {
392 debug!("Creating ScouterQueue from path");
393 let mut queues = HashMap::new();
394 let mut queue_state = HashMap::new();
395
396 if transport_config.is_none() {
398 return Err(PyEventError::MissingTransportConfig);
399 }
400
401 let config = TransportConfig::from_py_config(transport_config)?;
403
404 for (id, profile_path) in path {
407 let cloned_config = config.clone();
408 let drift_profile = DriftProfile::from_profile_path(profile_path)?;
409
410 let (mut event_state, event_rx) = create_event_state(id.clone());
411
412 let bus = QueueBus::new(event_state.clone(), drift_profile.identifier());
414 queue_state.insert(id.clone(), event_state.clone());
415 let cancellation_token = CancellationToken::new();
416 let cloned_cancellation_token = cancellation_token.clone();
417
418 let clone_runtime = shared_runtime.clone();
420 let id_clone = id.clone();
421 let cloned_event_state = event_state.clone();
422
423 let handle = shared_runtime.spawn(async move {
425 match spawn_queue_event_handler(
426 event_rx,
427 cloned_config,
428 drift_profile,
429 clone_runtime,
430 id_clone,
431 cloned_event_state,
432 cloned_cancellation_token,
433 )
434 .await
435 {
436 Ok(running) => running,
437 Err(e) => {
438 error!("Queue initialization failed: {}", e);
439 }
440 }
441 });
442
443 event_state.add_event_abort_handle(handle);
445 event_state.add_event_cancellation_token(cancellation_token);
446
447 std::thread::sleep(std::time::Duration::from_millis(1000));
448
449 if wait_for_startup {
451 debug!("Waiting for queue {} to signal startup", id);
452 wait_for_background_task(&event_state)?;
453 wait_for_event_task(&event_state)?;
454 }
455
456 let queue = Py::new(py, bus)?;
457 queues.insert(id.clone(), queue);
458 }
459
460 Ok(ScouterQueue {
461 queues,
462 _shared_runtime: shared_runtime,
464 transport_config: config,
465 queue_state: Arc::new(queue_state),
466 })
467 }
468}