Skip to main content

reddb_server/storage/query/engine/
cancel.rs

1//! Cooperative query cancellation token.
2//!
3//! Issue #808 / PRD #750 (750d) — the propagation primitive for query
4//! cancellation. A [`CancelToken`] is a cheap, cloneable handle over a
5//! shared atomic flag. The streaming / cursor layer owns one token per
6//! live stream; the executor's pull-based iterators ([`super::iterator`])
7//! observe it between rows so a cancel raised by a client disconnect or an
8//! explicit cancel request stops scans, filters, joins, and the aggregates
9//! that consume them promptly, rather than after the whole result has been
10//! materialised.
11//!
12//! The token carries no reason of its own — the wire-visible reason lives
13//! at the stream layer ([`crate::server::output_stream::CloseReason`]).
14//! Here it is a single boolean: "should this query stop now?". Cooperative
15//! by design: an operator that never re-checks the token cannot be
16//! interrupted, so every long-running loop in the executor must poll
17//! [`CancelToken::is_cancelled`] on each iteration.
18
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21
22/// A cloneable handle over a shared cancellation flag.
23///
24/// Clones share the same underlying flag: cancelling any clone is observed
25/// by all of them. This is what lets the stream layer hold one handle while
26/// the executor holds another and still agree on whether to stop.
27#[derive(Clone, Debug, Default)]
28pub struct CancelToken {
29    flag: Arc<AtomicBool>,
30}
31
32impl CancelToken {
33    /// A fresh, un-cancelled token.
34    pub fn new() -> Self {
35        Self {
36            flag: Arc::new(AtomicBool::new(false)),
37        }
38    }
39
40    /// Raise the cancel signal. Idempotent — cancelling an already-cancelled
41    /// token is a no-op. Visible to every clone of this token.
42    pub fn cancel(&self) {
43        self.flag.store(true, Ordering::SeqCst);
44    }
45
46    /// `true` once any clone of this token has been cancelled.
47    pub fn is_cancelled(&self) -> bool {
48        self.flag.load(Ordering::SeqCst)
49    }
50}
51
52#[cfg(test)]
53mod tests {
54    use super::*;
55
56    #[test]
57    fn fresh_token_is_not_cancelled() {
58        let token = CancelToken::new();
59        assert!(!token.is_cancelled());
60    }
61
62    #[test]
63    fn cancel_is_observed() {
64        let token = CancelToken::new();
65        token.cancel();
66        assert!(token.is_cancelled());
67    }
68
69    #[test]
70    fn cancel_is_idempotent() {
71        let token = CancelToken::new();
72        token.cancel();
73        token.cancel();
74        assert!(token.is_cancelled());
75    }
76
77    #[test]
78    fn clones_share_the_same_flag() {
79        let token = CancelToken::new();
80        let clone = token.clone();
81        // Cancelling the clone is observed through the original handle —
82        // this is the property the stream layer and the executor rely on.
83        clone.cancel();
84        assert!(token.is_cancelled());
85    }
86
87    #[test]
88    fn cancel_crosses_thread_boundary() {
89        let token = CancelToken::new();
90        let remote = token.clone();
91        let handle = std::thread::spawn(move || {
92            remote.cancel();
93        });
94        handle.join().expect("canceller thread joins");
95        assert!(token.is_cancelled());
96    }
97}