Skip to main content

drasi_core/query/
auto_future_queue_consumer.rs

1// Copyright 2024 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    sync::{atomic::AtomicU64, Arc, Weak},
17    time::{Duration, SystemTime},
18};
19
20use async_trait::async_trait;
21use tokio::sync::{mpsc, Mutex};
22
23use crate::{evaluation::context::QueryPartEvaluationContext, interface::FutureQueueConsumer};
24
25use super::ContinuousQuery;
26
27pub struct AutoFutureQueueConsumer {
28    continuous_query: Weak<ContinuousQuery>,
29    channel_tx: mpsc::UnboundedSender<Vec<QueryPartEvaluationContext>>,
30    channel_rx: Mutex<mpsc::UnboundedReceiver<Vec<QueryPartEvaluationContext>>>,
31    now_override: Option<Arc<AtomicU64>>,
32}
33
34impl AutoFutureQueueConsumer {
35    pub fn new(continuous_query: Arc<ContinuousQuery>) -> Self {
36        let (channel_tx, channel_rx) = mpsc::unbounded_channel();
37
38        AutoFutureQueueConsumer {
39            continuous_query: Arc::downgrade(&continuous_query),
40            channel_tx,
41            channel_rx: Mutex::new(channel_rx),
42            now_override: None,
43        }
44    }
45
46    pub fn with_now_override(mut self, now_override: Arc<AtomicU64>) -> Self {
47        self.now_override = Some(now_override);
48        self
49    }
50
51    pub async fn recv(&self, timeout: Duration) -> Option<Vec<QueryPartEvaluationContext>> {
52        let mut rx = self.channel_rx.lock().await;
53        match tokio::time::timeout(timeout, rx.recv()).await {
54            Ok(Some(result)) => Some(result),
55            Ok(None) => None,
56            Err(_) => None,
57        }
58    }
59}
60
61#[async_trait]
62impl FutureQueueConsumer for AutoFutureQueueConsumer {
63    async fn on_items_due(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
64        let cq = self
65            .continuous_query
66            .upgrade()
67            .ok_or("ContinuousQuery has been dropped")?;
68
69        if let Some(due_result) = cq.process_due_futures().await? {
70            if !due_result.results.is_empty() {
71                self.channel_tx.send(due_result.results)?;
72            }
73        }
74        Ok(())
75    }
76
77    async fn on_error(&self, error: Box<dyn std::error::Error + Send + Sync>) {
78        log::error!("Error processing future queue item: {error:?}");
79    }
80
81    fn now(&self) -> u64 {
82        if let Some(now_override) = &self.now_override {
83            return now_override.load(std::sync::atomic::Ordering::Relaxed);
84        }
85
86        SystemTime::now()
87            .duration_since(SystemTime::UNIX_EPOCH)
88            .unwrap_or_default()
89            .as_secs()
90            * 1000
91    }
92}