reddb_server/storage/query/engine/cancel.rs
1//! Cooperative query cancellation token.
2//!
3//! Issue #808 / PRD #750 (750d) — the propagation primitive for query
4//! cancellation. A [`CancelToken`] is a cheap, cloneable handle over a
5//! shared atomic flag. The streaming / cursor layer owns one token per
6//! live stream; the executor's pull-based iterators ([`super::iterator`])
7//! observe it between rows so a cancel raised by a client disconnect or an
8//! explicit cancel request stops scans, filters, joins, and the aggregates
9//! that consume them promptly, rather than after the whole result has been
10//! materialised.
11//!
12//! The token carries no reason of its own — the wire-visible reason lives
13//! at the stream layer ([`crate::server::output_stream::CloseReason`]).
14//! Here it is a single boolean: "should this query stop now?". Cooperative
15//! by design: an operator that never re-checks the token cannot be
16//! interrupted, so every long-running loop in the executor must poll
17//! [`CancelToken::is_cancelled`] on each iteration.
18
19use std::sync::atomic::{AtomicBool, Ordering};
20use std::sync::Arc;
21
/// Shared-flag cancellation handle that is cheap to clone.
///
/// Every clone points at one and the same atomic boolean, so raising the
/// signal through any handle is seen through all the others. That is how
/// the stream layer and the executor can each keep their own handle and
/// still agree on whether the query should stop.
#[derive(Clone, Debug, Default)]
pub struct CancelToken {
    /// The shared "stop now" bit; `false` until some handle cancels.
    flag: Arc<AtomicBool>,
}

impl CancelToken {
    /// Builds a new token whose flag starts out cleared (not cancelled).
    pub fn new() -> Self {
        // `AtomicBool::default()` is `false`, so the derived `Default`
        // yields exactly the fresh, un-cancelled state.
        Self::default()
    }

    /// Sets the shared flag, signalling cancellation to every clone.
    ///
    /// Calling this on an already-cancelled token just stores `true` over
    /// `true`, so repeated calls are harmless.
    pub fn cancel(&self) {
        let shared: &AtomicBool = &self.flag;
        shared.store(true, Ordering::SeqCst);
    }

    /// Reports whether the shared flag has been raised, either through this
    /// handle or through any of its clones.
    pub fn is_cancelled(&self) -> bool {
        let shared: &AtomicBool = &self.flag;
        shared.load(Ordering::SeqCst)
    }
}
51
#[cfg(test)]
mod tests {
    use super::CancelToken;

    /// A token straight out of `new()` must report "not cancelled".
    #[test]
    fn fresh_token_is_not_cancelled() {
        let raised = CancelToken::new().is_cancelled();
        assert!(!raised);
    }

    /// A single `cancel()` flips the flag as seen through the same handle.
    #[test]
    fn cancel_is_observed() {
        let handle = CancelToken::new();
        handle.cancel();
        let raised = handle.is_cancelled();
        assert!(raised);
    }

    /// Cancelling twice leaves the token cancelled, exactly as once does.
    #[test]
    fn cancel_is_idempotent() {
        let handle = CancelToken::new();
        for _ in 0..2 {
            handle.cancel();
        }
        assert!(handle.is_cancelled());
    }

    /// The stream layer and the executor hold separate clones; a cancel
    /// raised through one must be visible through the other.
    #[test]
    fn clones_share_the_same_flag() {
        let original = CancelToken::new();
        let duplicate = original.clone();
        duplicate.cancel();
        assert!(original.is_cancelled());
    }

    /// A cancel raised on another thread is visible here once that thread
    /// has been joined.
    #[test]
    fn cancel_crosses_thread_boundary() {
        let local = CancelToken::new();
        let worker = {
            let remote = local.clone();
            std::thread::spawn(move || remote.cancel())
        };
        worker.join().expect("canceller thread joins");
        assert!(local.is_cancelled());
    }
}
97}