drasi_core/query/
auto_future_queue_consumer.rs1use std::{
16 sync::{atomic::AtomicU64, Arc, Weak},
17 time::{Duration, SystemTime},
18};
19
20use async_trait::async_trait;
21use tokio::sync::{mpsc, Mutex};
22
23use crate::{evaluation::context::QueryPartEvaluationContext, interface::FutureQueueConsumer};
24
25use super::ContinuousQuery;
26
27pub struct AutoFutureQueueConsumer {
28 continuous_query: Weak<ContinuousQuery>,
29 channel_tx: mpsc::UnboundedSender<Vec<QueryPartEvaluationContext>>,
30 channel_rx: Mutex<mpsc::UnboundedReceiver<Vec<QueryPartEvaluationContext>>>,
31 now_override: Option<Arc<AtomicU64>>,
32}
33
34impl AutoFutureQueueConsumer {
35 pub fn new(continuous_query: Arc<ContinuousQuery>) -> Self {
36 let (channel_tx, channel_rx) = mpsc::unbounded_channel();
37
38 AutoFutureQueueConsumer {
39 continuous_query: Arc::downgrade(&continuous_query),
40 channel_tx,
41 channel_rx: Mutex::new(channel_rx),
42 now_override: None,
43 }
44 }
45
46 pub fn with_now_override(mut self, now_override: Arc<AtomicU64>) -> Self {
47 self.now_override = Some(now_override);
48 self
49 }
50
51 pub async fn recv(&self, timeout: Duration) -> Option<Vec<QueryPartEvaluationContext>> {
52 let mut rx = self.channel_rx.lock().await;
53 match tokio::time::timeout(timeout, rx.recv()).await {
54 Ok(Some(result)) => Some(result),
55 Ok(None) => None,
56 Err(_) => None,
57 }
58 }
59}
60
61#[async_trait]
62impl FutureQueueConsumer for AutoFutureQueueConsumer {
63 async fn on_items_due(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
64 let cq = self
65 .continuous_query
66 .upgrade()
67 .ok_or("ContinuousQuery has been dropped")?;
68
69 if let Some(due_result) = cq.process_due_futures().await? {
70 if !due_result.results.is_empty() {
71 self.channel_tx.send(due_result.results)?;
72 }
73 }
74 Ok(())
75 }
76
77 async fn on_error(&self, error: Box<dyn std::error::Error + Send + Sync>) {
78 log::error!("Error processing future queue item: {error:?}");
79 }
80
81 fn now(&self) -> u64 {
82 if let Some(now_override) = &self.now_override {
83 return now_override.load(std::sync::atomic::Ordering::Relaxed);
84 }
85
86 SystemTime::now()
87 .duration_since(SystemTime::UNIX_EPOCH)
88 .unwrap_or_default()
89 .as_secs()
90 * 1000
91 }
92}