drasi_core/query/
auto_future_queue_consumer.rs1use std::{
16 sync::{atomic::AtomicU64, Arc},
17 time::{Duration, SystemTime},
18};
19
20use async_trait::async_trait;
21use tokio::sync::{mpsc, Mutex};
22
23use crate::{
24 evaluation::context::QueryPartEvaluationContext,
25 interface::{FutureElementRef, FutureQueueConsumer},
26 models::SourceChange,
27};
28
29use super::ContinuousQuery;
30
31pub struct AutoFutureQueueConsumer {
32 continuous_query: Arc<ContinuousQuery>,
33 channel_tx: mpsc::UnboundedSender<Vec<QueryPartEvaluationContext>>,
34 channel_rx: Mutex<mpsc::UnboundedReceiver<Vec<QueryPartEvaluationContext>>>,
35 now_override: Option<Arc<AtomicU64>>,
36}
37
38impl AutoFutureQueueConsumer {
39 pub fn new(continuous_query: Arc<ContinuousQuery>) -> Self {
40 let (channel_tx, channel_rx) = mpsc::unbounded_channel();
41
42 AutoFutureQueueConsumer {
43 continuous_query,
44 channel_tx,
45 channel_rx: Mutex::new(channel_rx),
46 now_override: None,
47 }
48 }
49
50 pub fn with_now_override(mut self, now_override: Arc<AtomicU64>) -> Self {
51 self.now_override = Some(now_override);
52 self
53 }
54
55 pub async fn recv(&self, timeout: Duration) -> Option<Vec<QueryPartEvaluationContext>> {
56 let mut rx = self.channel_rx.lock().await;
57 match tokio::time::timeout(timeout, rx.recv()).await {
58 Ok(Some(result)) => Some(result),
59 Ok(None) => None,
60 Err(_) => None,
61 }
62 }
63}
64
65#[async_trait]
66impl FutureQueueConsumer for AutoFutureQueueConsumer {
67 async fn on_due(
68 &self,
69 future_ref: &FutureElementRef,
70 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
71 let change = SourceChange::Future {
72 future_ref: future_ref.clone(),
73 };
74
75 let result = self.continuous_query.process_source_change(change).await?;
76 if !result.is_empty() {
77 self.channel_tx.send(result)?;
78 }
79 Ok(())
80 }
81 async fn on_error(
82 &self,
83 future_ref: &FutureElementRef,
84 error: Box<dyn std::error::Error + Send + Sync>,
85 ) {
86 log::error!(
87 "Error processing {} off future queue: {:?}",
88 future_ref.element_ref,
89 error
90 );
91 }
92
93 fn now(&self) -> u64 {
94 if let Some(now_override) = &self.now_override {
95 return now_override.load(std::sync::atomic::Ordering::Relaxed);
96 }
97
98 SystemTime::now()
99 .duration_since(SystemTime::UNIX_EPOCH)
100 .unwrap()
101 .as_secs()
102 * 1000
103 }
104}