drasi_core/query/
auto_future_queue_consumer.rs

// Copyright 2024 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::{
16    sync::{atomic::AtomicU64, Arc},
17    time::{Duration, SystemTime},
18};
19
20use async_trait::async_trait;
21use tokio::sync::{mpsc, Mutex};
22
23use crate::{
24    evaluation::context::QueryPartEvaluationContext,
25    interface::{FutureElementRef, FutureQueueConsumer},
26    models::SourceChange,
27};
28
29use super::ContinuousQuery;
30
31pub struct AutoFutureQueueConsumer {
32    continuous_query: Arc<ContinuousQuery>,
33    channel_tx: mpsc::UnboundedSender<Vec<QueryPartEvaluationContext>>,
34    channel_rx: Mutex<mpsc::UnboundedReceiver<Vec<QueryPartEvaluationContext>>>,
35    now_override: Option<Arc<AtomicU64>>,
36}
37
38impl AutoFutureQueueConsumer {
39    pub fn new(continuous_query: Arc<ContinuousQuery>) -> Self {
40        let (channel_tx, channel_rx) = mpsc::unbounded_channel();
41
42        AutoFutureQueueConsumer {
43            continuous_query,
44            channel_tx,
45            channel_rx: Mutex::new(channel_rx),
46            now_override: None,
47        }
48    }
49
50    pub fn with_now_override(mut self, now_override: Arc<AtomicU64>) -> Self {
51        self.now_override = Some(now_override);
52        self
53    }
54
55    pub async fn recv(&self, timeout: Duration) -> Option<Vec<QueryPartEvaluationContext>> {
56        let mut rx = self.channel_rx.lock().await;
57        match tokio::time::timeout(timeout, rx.recv()).await {
58            Ok(Some(result)) => Some(result),
59            Ok(None) => None,
60            Err(_) => None,
61        }
62    }
63}
64
65#[async_trait]
66impl FutureQueueConsumer for AutoFutureQueueConsumer {
67    async fn on_due(
68        &self,
69        future_ref: &FutureElementRef,
70    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
71        let change = SourceChange::Future {
72            future_ref: future_ref.clone(),
73        };
74
75        let result = self.continuous_query.process_source_change(change).await?;
76        if !result.is_empty() {
77            self.channel_tx.send(result)?;
78        }
79        Ok(())
80    }
81    async fn on_error(
82        &self,
83        future_ref: &FutureElementRef,
84        error: Box<dyn std::error::Error + Send + Sync>,
85    ) {
86        log::error!(
87            "Error processing {} off future queue: {:?}",
88            future_ref.element_ref,
89            error
90        );
91    }
92
93    fn now(&self) -> u64 {
94        if let Some(now_override) = &self.now_override {
95            return now_override.load(std::sync::atomic::Ordering::Relaxed);
96        }
97
98        SystemTime::now()
99            .duration_since(SystemTime::UNIX_EPOCH)
100            .unwrap()
101            .as_secs()
102            * 1000
103    }
104}