Skip to main content

uni_store/runtime/
context.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4use crate::runtime::l0::L0Buffer;
5use parking_lot::RwLock;
6use std::sync::Arc;
7use std::time::Instant;
8use tokio_util::sync::CancellationToken;
9
10#[derive(Clone)]
11pub struct QueryContext {
12    pub l0: Arc<RwLock<L0Buffer>>,
13    pub transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
14    /// L0 buffers currently being flushed to L1.
15    /// These remain visible to reads until flush completes successfully.
16    pub pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
17    pub deadline: Option<Instant>,
18    /// Cooperative cancellation token. Checked alongside the deadline in
19    /// `check_timeout()`.
20    pub cancellation_token: Option<CancellationToken>,
21}
22
23impl QueryContext {
24    pub fn new(l0: Arc<RwLock<L0Buffer>>) -> Self {
25        Self {
26            l0,
27            transaction_l0: None,
28            pending_flush_l0s: Vec::new(),
29            deadline: None,
30            cancellation_token: None,
31        }
32    }
33
34    pub fn new_with_tx(
35        l0: Arc<RwLock<L0Buffer>>,
36        transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
37    ) -> Self {
38        Self {
39            l0,
40            transaction_l0,
41            pending_flush_l0s: Vec::new(),
42            deadline: None,
43            cancellation_token: None,
44        }
45    }
46
47    pub fn new_with_pending(
48        l0: Arc<RwLock<L0Buffer>>,
49        transaction_l0: Option<Arc<RwLock<L0Buffer>>>,
50        pending_flush_l0s: Vec<Arc<RwLock<L0Buffer>>>,
51    ) -> Self {
52        Self {
53            l0,
54            transaction_l0,
55            pending_flush_l0s,
56            deadline: None,
57            cancellation_token: None,
58        }
59    }
60
61    pub fn set_deadline(&mut self, deadline: Instant) {
62        self.deadline = Some(deadline);
63    }
64
65    pub fn set_cancellation_token(&mut self, token: CancellationToken) {
66        self.cancellation_token = Some(token);
67    }
68
69    pub fn check_timeout(&self) -> anyhow::Result<()> {
70        if let Some(ref token) = self.cancellation_token
71            && token.is_cancelled()
72        {
73            return Err(anyhow::anyhow!("Query cancelled"));
74        }
75        if let Some(deadline) = self.deadline
76            && Instant::now() > deadline
77        {
78            return Err(anyhow::anyhow!("Query timed out"));
79        }
80        Ok(())
81    }
82}