Skip to main content

scouter_dataframe/parquet/bifrost/
query.rs

1use std::collections::HashMap;
2use tokio::sync::RwLock;
3use tokio_util::sync::CancellationToken;
4
5/// Tracks active queries for cancellation support.
6#[derive(Default)]
7pub struct QueryTracker {
8    active: RwLock<HashMap<String, CancellationToken>>,
9}
10
11impl QueryTracker {
12    pub fn new() -> Self {
13        Self {
14            active: RwLock::new(HashMap::new()),
15        }
16    }
17
18    /// Register a query and return a cancellation token.
19    /// The caller should poll `token.cancelled()` in a `tokio::select!`.
20    /// Returns `Err(DuplicateQueryId)` if the query_id is already registered.
21    pub async fn register(
22        &self,
23        query_id: &str,
24    ) -> Result<CancellationToken, crate::error::DatasetEngineError> {
25        let mut active = self.active.write().await;
26        if active.contains_key(query_id) {
27            return Err(crate::error::DatasetEngineError::DuplicateQueryId(
28                query_id.to_string(),
29            ));
30        }
31        let token = CancellationToken::new();
32        active.insert(query_id.to_string(), token.clone());
33        Ok(token)
34    }
35
36    /// Cancel a running query. Returns `true` if the query was found and cancelled.
37    pub async fn cancel(&self, query_id: &str) -> bool {
38        if let Some(token) = self.active.write().await.remove(query_id) {
39            token.cancel();
40            true
41        } else {
42            false
43        }
44    }
45
46    /// Remove a completed query from tracking.
47    pub async fn remove(&self, query_id: &str) {
48        self.active.write().await.remove(query_id);
49    }
50}
51
52/// Result of an executed query with metadata.
53pub struct QueryResult {
54    pub batches: Vec<arrow_array::RecordBatch>,
55    pub metadata: QueryExecutionMetadata,
56}
57
58/// Metadata about query execution.
59#[derive(Debug, Clone)]
60pub struct QueryExecutionMetadata {
61    pub query_id: String,
62    pub rows_returned: u64,
63    pub truncated: bool,
64    pub execution_time_ms: u64,
65    pub bytes_scanned: Option<u64>,
66}