1#![allow(clippy::useless_conversion)]
2use crate::error::{EventError, PyEventError};
3use crate::queue::bus::Task;
4use crate::queue::bus::{Event, QueueBus, TaskState};
5use crate::queue::custom::CustomQueue;
6use crate::queue::llm::LLMQueue;
7use crate::queue::psi::PsiQueue;
8use crate::queue::spc::SpcQueue;
9use crate::queue::traits::queue::wait_for_background_task;
10use crate::queue::traits::queue::wait_for_event_task;
11use crate::queue::traits::queue::QueueMethods;
12use crate::queue::types::TransportConfig;
13use pyo3::prelude::*;
14use scouter_state::app_state;
15use scouter_types::{DriftProfile, LLMRecord, QueueItem};
16use scouter_types::{Features, Metrics};
17use std::collections::HashMap;
18use std::path::PathBuf;
19use std::sync::Arc;
20use std::sync::RwLock;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio_util::sync::CancellationToken;
23use tracing::{debug, error, instrument};
24
25fn create_event_state(id: String) -> (TaskState, UnboundedReceiver<Event>) {
26 let (event_tx, event_rx) = tokio::sync::mpsc::unbounded_channel();
27
28 let background_task = Arc::new(RwLock::new(Task::new()));
30
31 let event_task = Arc::new(RwLock::new(Task::new()));
33
34 let event_state = TaskState {
35 event_task,
36 background_task,
37 event_tx,
38 id,
39 };
40
41 (event_state, event_rx)
42}
43pub enum QueueNum {
44 Spc(SpcQueue),
45 Psi(PsiQueue),
46 Custom(CustomQueue),
47 LLM(LLMQueue),
48}
49impl QueueNum {
51 pub async fn new(
52 transport_config: TransportConfig,
53 drift_profile: DriftProfile,
54 task_state: &mut TaskState,
55 ) -> Result<Self, EventError> {
56 let identifier = drift_profile.identifier();
57 match drift_profile {
58 DriftProfile::Spc(spc_profile) => {
59 let queue = SpcQueue::new(spc_profile, transport_config).await?;
60 Ok(QueueNum::Spc(queue))
61 }
62 DriftProfile::Psi(psi_profile) => {
63 let queue =
64 PsiQueue::new(psi_profile, transport_config, task_state, identifier).await?;
65 Ok(QueueNum::Psi(queue))
66 }
67 DriftProfile::Custom(custom_profile) => {
68 let queue =
69 CustomQueue::new(custom_profile, transport_config, task_state, identifier)
70 .await?;
71 Ok(QueueNum::Custom(queue))
72 }
73 DriftProfile::LLM(llm_profile) => {
74 let queue = LLMQueue::new(llm_profile, transport_config).await?;
75 Ok(QueueNum::LLM(queue))
76 }
77 }
78 }
79
80 #[instrument(skip_all)]
88 pub async fn insert(&mut self, entity: QueueItem) -> Result<(), EventError> {
89 debug!("Inserting entity into queue: {:?}", entity);
90 match entity {
91 QueueItem::Features(features) => self.insert_features(features).await,
92 QueueItem::Metrics(metrics) => self.insert_metrics(metrics).await,
93 QueueItem::LLM(llm_record) => self.insert_llm_record(*llm_record).await,
94 }
95 }
96
97 #[instrument(skip_all)]
104 pub async fn insert_features(&mut self, features: Features) -> Result<(), EventError> {
105 match self {
106 QueueNum::Psi(queue) => queue.insert(features).await,
107 QueueNum::Spc(queue) => queue.insert(features).await,
108 _ => Err(EventError::QueueNotSupportedFeatureError),
109 }
110 }
111
112 pub async fn insert_metrics(&mut self, metrics: Metrics) -> Result<(), EventError> {
118 match self {
119 QueueNum::Custom(queue) => queue.insert(metrics).await,
120 _ => Err(EventError::QueueNotSupportedMetricsError),
121 }
122 }
123
124 pub async fn insert_llm_record(&mut self, llm_record: LLMRecord) -> Result<(), EventError> {
130 match self {
131 QueueNum::LLM(queue) => {
132 if !queue.should_insert() {
133 debug!("Skipping LLM record insertion due to sampling rate");
134 return Ok(());
135 }
136 queue.insert(llm_record).await
137 }
138 _ => Err(EventError::QueueNotSupportedLLMError),
139 }
140 }
141
142 pub async fn flush(&mut self) -> Result<(), EventError> {
145 match self {
146 QueueNum::Spc(queue) => queue.flush().await,
147 QueueNum::Psi(queue) => queue.flush().await,
148 QueueNum::Custom(queue) => queue.flush().await,
149 QueueNum::LLM(queue) => queue.flush().await,
150 }
151 }
152}
153
154#[allow(clippy::too_many_arguments)]
155async fn spawn_queue_event_handler(
156 mut event_rx: UnboundedReceiver<Event>,
157 transport_config: TransportConfig,
158 drift_profile: DriftProfile,
159 id: String,
160 mut task_state: TaskState,
161 cancellation_token: CancellationToken,
162) -> Result<(), EventError> {
163 let mut queue = match QueueNum::new(transport_config, drift_profile, &mut task_state).await {
171 Ok(q) => q,
172 Err(e) => {
173 error!("Failed to initialize queue {}: {}", id, e);
174 return Err(e);
175 }
176 };
177
178 task_state.set_event_running(true);
179 debug!("Event loop for queue {} set to running", id);
180 loop {
181 tokio::select! {
182 Some(event) = event_rx.recv() => {
183 match event {
184 Event::Task(entity) => {
185 match queue.insert(entity).await {
186 Ok(_) => {
187 debug!("Inserted entity into queue {}", id);
188 }
189 Err(e) => {
190 error!("Error inserting entity into queue {}: {}", id, e);
191 }
192 }
193 }
194 Event::Flush => {
195 debug!("Flush event received for queue {}", id);
196 match queue.flush().await {
197 Ok(_) => {
198 debug!("Successfully flushed queue {}", id);
199 }
200 Err(e) => {
201 error!("Error flushing queue {}: {}", id, e);
202 }
203 }
204 }
205 }
206 }
207
208 _ = cancellation_token.cancelled() => {
209 debug!("Stop signal received for queue {}", id);
210 match queue.flush().await {
211 Ok(_) => {
212 debug!("Successfully flushed queue {}", id);
213 }
214 Err(e) => {
215 error!("Error flushing queue {}: {}", id, e);
216 }
217 }
218 task_state.set_event_running(false);
219 break;
220 }
221
222 else => {
223 debug!("Event channel closed for queue {}, shutting down", id);
224 match queue.flush().await {
225 Ok(_) => {
226 debug!("Successfully flushed queue {}", id);
227 }
228 Err(e) => {
229 error!("Error flushing queue {}: {}", id, e);
230 }
231 }
232 task_state.set_event_running(false);
233 break;
234 }
235 }
236 }
237 Ok(())
238}
239
240#[pyclass]
242pub struct ScouterQueue {
243 queues: HashMap<String, Py<QueueBus>>,
244 transport_config: TransportConfig,
245 pub queue_state: Arc<HashMap<String, TaskState>>,
246}
247
248#[pymethods]
249impl ScouterQueue {
250 #[staticmethod]
268 #[pyo3(signature = (path, transport_config))]
269 pub fn from_path(
270 py: Python,
271 path: HashMap<String, PathBuf>,
272 transport_config: &Bound<'_, PyAny>,
273 ) -> Result<Self, PyEventError> {
274 ScouterQueue::from_path_rs(py, path, transport_config, false)
275 }
276
277 pub fn __getitem__<'py>(
287 &self,
288 py: Python<'py>,
289 key: &str,
290 ) -> Result<&Bound<'py, QueueBus>, PyEventError> {
291 match self.queues.get(key) {
292 Some(queue) => Ok(queue.bind(py)),
293 None => Err(PyEventError::MissingQueueError(key.to_string())),
294 }
295 }
296
297 #[getter]
298 pub fn transport_config<'py>(
300 &self,
301 py: Python<'py>,
302 ) -> Result<Bound<'py, PyAny>, PyEventError> {
303 self.transport_config.to_py(py)
304 }
305
306 pub fn is_empty(&self) -> bool {
307 self.queues.is_empty()
308 }
309
310 pub fn shutdown(&mut self) -> Result<(), PyEventError> {
325 debug!("Starting ScouterQueue shutdown");
326
327 for (alias, event_state) in self.queue_state.iter() {
328 debug!("Shutting down queue: {}", alias);
329 event_state.shutdown_tasks()?;
332 }
333
334 self.queues.clear();
336 if !self.queues.is_empty() {
337 return Err(PyEventError::PendingEventsError);
338 }
339
340 debug!("All queues have been shutdown and cleared");
341
342 Ok(())
343 }
344}
345
346impl ScouterQueue {
347 pub fn from_path_rs(
368 py: Python,
369 path: HashMap<String, PathBuf>,
370 transport_config: &Bound<'_, PyAny>,
371 wait_for_startup: bool,
372 ) -> Result<Self, PyEventError> {
373 debug!("Creating ScouterQueue from path");
374 let mut queues = HashMap::new();
375 let mut queue_state = HashMap::new();
376
377 if transport_config.is_none() {
379 return Err(PyEventError::MissingTransportConfig);
380 }
381
382 let config = TransportConfig::from_py_config(transport_config)?;
384
385 for (id, profile_path) in path {
388 let cloned_config = config.clone();
389 let drift_profile = DriftProfile::from_profile_path(profile_path)?;
390
391 let (mut event_state, event_rx) = create_event_state(id.clone());
392
393 let bus = QueueBus::new(event_state.clone(), drift_profile.identifier());
395 queue_state.insert(id.clone(), event_state.clone());
396 let cancellation_token = CancellationToken::new();
397 let cloned_cancellation_token = cancellation_token.clone();
398
399 let runtime_handle = app_state().handle();
401 let id_clone = id.clone();
402 let cloned_event_state = event_state.clone();
403
404 let handle = runtime_handle.spawn(async move {
406 match spawn_queue_event_handler(
407 event_rx,
408 cloned_config,
409 drift_profile,
410 id_clone,
411 cloned_event_state,
412 cloned_cancellation_token,
413 )
414 .await
415 {
416 Ok(running) => running,
417 Err(e) => {
418 error!("Queue initialization failed: {}", e);
419 }
420 }
421 });
422
423 event_state.add_event_abort_handle(handle);
425 event_state.add_event_cancellation_token(cancellation_token);
426
427 std::thread::sleep(std::time::Duration::from_millis(1000));
428
429 if wait_for_startup {
431 debug!("Waiting for queue {} to signal startup", id);
432 wait_for_background_task(&event_state)?;
433 wait_for_event_task(&event_state)?;
434 }
435
436 let queue = Py::new(py, bus)?;
437 queues.insert(id.clone(), queue);
438 }
439
440 Ok(ScouterQueue {
441 queues,
442 transport_config: config,
443 queue_state: Arc::new(queue_state),
444 })
445 }
446}