use crate::runtime::l0::L0Buffer;
use parking_lot::RwLock;
use std::sync::Arc;
use std::time::Instant;
use tokio_util::sync::CancellationToken;
/// Per-query state: the L0 buffers a query works with plus optional
/// deadline and cancellation controls.
///
/// Cloning copies the `Arc` handles, so a clone refers to the same
/// underlying lock-guarded buffers as the original.
#[derive(Clone)]
pub struct QueryContext {
/// Shared, `RwLock`-guarded primary L0 buffer.
pub l0: Arc<RwLock<L0Buffer>>,
/// Optional additional L0 buffer; `None` when absent.
/// NOTE(review): presumably holds the writes of an in-flight transaction — confirm.
pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
/// Further L0 buffers carried alongside the primary one; may be empty.
/// NOTE(review): presumably buffers that are awaiting flush — confirm.
pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
/// Instant after which `check_timeout` reports a timeout; `None` means no deadline.
pub deadline: Option<Instant>,
/// Token consulted by `check_timeout` to detect cancellation; `None` means
/// the query is never reported as cancelled.
pub cancellation_token: Option<CancellationToken>,
}
impl QueryContext {
pub fn new(l0: Arc<RwLock<L0Buffer>>) -> Self {
Self {
l0,
transaction_l0: None,
pending_flush_l0s: Vec::new(),
deadline: None,
cancellation_token: None,
}
}
pub fn new_with_tx(
l0: Arc<RwLock<L0Buffer>>,
transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
) -> Self {
Self {
l0,
transaction_l0,
pending_flush_l0s: Vec::new(),
deadline: None,
cancellation_token: None,
}
}
pub fn new_with_pending(
l0: Arc<RwLock<L0Buffer>>,
transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
) -> Self {
Self {
l0,
transaction_l0,
pending_flush_l0s,
deadline: None,
cancellation_token: None,
}
}
pub fn set_deadline(&mut self, deadline: Instant) {
self.deadline = Some(deadline);
}
pub fn set_cancellation_token(&mut self, token: CancellationToken) {
self.cancellation_token = Some(token);
}
pub fn check_timeout(&self) -> anyhow::Result<()> {
if let Some(ref token) = self.cancellation_token
&& token.is_cancelled()
{
return Err(anyhow::anyhow!("Query cancelled"));
}
if let Some(deadline) = self.deadline
&& Instant::now() > deadline
{
return Err(anyhow::anyhow!("Query timed out"));
}
Ok(())
}
}