use std::{
sync::{atomic::AtomicU64, Arc, Weak},
time::{Duration, SystemTime},
};
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use crate::{evaluation::context::QueryPartEvaluationContext, interface::FutureQueueConsumer};
use super::ContinuousQuery;
pub struct AutoFutureQueueConsumer {
continuous_query: Weak<ContinuousQuery>,
channel_tx: mpsc::UnboundedSender<Vec<QueryPartEvaluationContext>>,
channel_rx: Mutex<mpsc::UnboundedReceiver<Vec<QueryPartEvaluationContext>>>,
now_override: Option<Arc<AtomicU64>>,
}
impl AutoFutureQueueConsumer {
pub fn new(continuous_query: Arc<ContinuousQuery>) -> Self {
let (channel_tx, channel_rx) = mpsc::unbounded_channel();
AutoFutureQueueConsumer {
continuous_query: Arc::downgrade(&continuous_query),
channel_tx,
channel_rx: Mutex::new(channel_rx),
now_override: None,
}
}
pub fn with_now_override(mut self, now_override: Arc<AtomicU64>) -> Self {
self.now_override = Some(now_override);
self
}
pub async fn recv(&self, timeout: Duration) -> Option<Vec<QueryPartEvaluationContext>> {
let mut rx = self.channel_rx.lock().await;
match tokio::time::timeout(timeout, rx.recv()).await {
Ok(Some(result)) => Some(result),
Ok(None) => None,
Err(_) => None,
}
}
}
#[async_trait]
impl FutureQueueConsumer for AutoFutureQueueConsumer {
async fn on_items_due(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let cq = self
.continuous_query
.upgrade()
.ok_or("ContinuousQuery has been dropped")?;
if let Some(due_result) = cq.process_due_futures().await? {
if !due_result.results.is_empty() {
self.channel_tx.send(due_result.results)?;
}
}
Ok(())
}
async fn on_error(&self, error: Box<dyn std::error::Error + Send + Sync>) {
log::error!("Error processing future queue item: {error:?}");
}
fn now(&self) -> u64 {
if let Some(now_override) = &self.now_override {
return now_override.load(std::sync::atomic::Ordering::Relaxed);
}
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
* 1000
}
}