scouter_dataframe/parquet/bifrost/
query.rs1use std::collections::HashMap;
2use tokio::sync::RwLock;
3use tokio_util::sync::CancellationToken;
4
5#[derive(Default)]
7pub struct QueryTracker {
8 active: RwLock<HashMap<String, CancellationToken>>,
9}
10
11impl QueryTracker {
12 pub fn new() -> Self {
13 Self {
14 active: RwLock::new(HashMap::new()),
15 }
16 }
17
18 pub async fn register(
22 &self,
23 query_id: &str,
24 ) -> Result<CancellationToken, crate::error::DatasetEngineError> {
25 let mut active = self.active.write().await;
26 if active.contains_key(query_id) {
27 return Err(crate::error::DatasetEngineError::DuplicateQueryId(
28 query_id.to_string(),
29 ));
30 }
31 let token = CancellationToken::new();
32 active.insert(query_id.to_string(), token.clone());
33 Ok(token)
34 }
35
36 pub async fn cancel(&self, query_id: &str) -> bool {
38 if let Some(token) = self.active.write().await.remove(query_id) {
39 token.cancel();
40 true
41 } else {
42 false
43 }
44 }
45
46 pub async fn remove(&self, query_id: &str) {
48 self.active.write().await.remove(query_id);
49 }
50}
51
52pub struct QueryResult {
54 pub batches: Vec<arrow_array::RecordBatch>,
55 pub metadata: QueryExecutionMetadata,
56}
57
58#[derive(Debug, Clone)]
60pub struct QueryExecutionMetadata {
61 pub query_id: String,
62 pub rows_returned: u64,
63 pub truncated: bool,
64 pub execution_time_ms: u64,
65 pub bytes_scanned: Option<u64>,
66}