Skip to main content

selene_core/
cancellation.rs

1//! Cooperative cancellation primitives shared by the executor and algorithm crates.
2//!
3//! Cancellation is cooperative: callers request cancellation through a
4//! [`CancellationToken`], and long-running executor or algorithm loops observe
5//! it at explicit checkpoints. Work between checkpoints is allowed to finish
6//! before the cancellation surfaces.
7
8use std::{
9    sync::{
10        Arc,
11        atomic::{AtomicBool, AtomicUsize, Ordering},
12    },
13    time::{Duration, Instant},
14};
15
16/// Caller-owned cancellation flag that can be cloned across sessions or threads.
17///
18/// Clones share the same underlying flag, so a host can keep one handle and
19/// pass another into `selene-gql` session builders. Calling [`Self::cancel`]
20/// requests cancellation; it does not interrupt running Rust code
21/// preemptively.
22#[derive(Clone, Debug, Default)]
23pub struct CancellationToken(Arc<AtomicBool>);
24
25impl CancellationToken {
26    /// Construct a new token in the non-cancelled state.
27    #[must_use]
28    pub fn new() -> Self {
29        Self::default()
30    }
31
32    /// Request cooperative cancellation for all holders of this token.
33    pub fn cancel(&self) {
34        self.0.store(true, Ordering::Release);
35    }
36
37    /// Return true when cancellation has been requested.
38    #[must_use]
39    pub fn is_cancelled(&self) -> bool {
40        self.0.load(Ordering::Acquire)
41    }
42}
43
44/// Per-statement deterministic budget for scanned graph nodes.
45///
46/// The budget is separate from wall-clock deadlines: hot loops explicitly call
47/// [`CancellationChecker::note_nodes_scanned`] at batch boundaries, and the
48/// shared counter trips once the accumulated scanned node count exceeds the
49/// configured limit.
50#[derive(Debug)]
51pub struct NodeScanBudget {
52    max_nodes: usize,
53    scanned: AtomicUsize,
54}
55
56impl NodeScanBudget {
57    /// Construct a budget that permits at most `max_nodes` scanned nodes.
58    #[must_use]
59    pub const fn new(max_nodes: usize) -> Self {
60        Self {
61            max_nodes,
62            scanned: AtomicUsize::new(0),
63        }
64    }
65
66    /// Return the configured maximum node-scan count.
67    #[must_use]
68    pub const fn max_nodes(&self) -> usize {
69        self.max_nodes
70    }
71
72    /// Return the number of nodes observed by this budget so far.
73    #[must_use]
74    pub fn scanned(&self) -> usize {
75        self.scanned.load(Ordering::Acquire)
76    }
77
78    fn note_nodes_scanned(&self, nodes: usize) -> Result<(), CancellationCause> {
79        let mut observed = self.scanned.load(Ordering::Acquire);
80        loop {
81            let next = observed.saturating_add(nodes);
82            match self.scanned.compare_exchange_weak(
83                observed,
84                next,
85                Ordering::AcqRel,
86                Ordering::Acquire,
87            ) {
88                Ok(_) => {
89                    if next > self.max_nodes {
90                        return Err(CancellationCause::NodeScanBudgetExceeded {
91                            limit: self.max_nodes,
92                            scanned: next,
93                        });
94                    }
95                    return Ok(());
96                }
97                Err(actual) => observed = actual,
98            }
99        }
100    }
101}
102
103/// Cancellation outcome reported by cooperative checkpoint calls.
104#[derive(Clone, Copy, Debug, Eq, PartialEq)]
105pub enum CancellationCause {
106    /// The caller-owned token was cancelled.
107    Cancelled,
108    /// The statement deadline passed before the checkpoint.
109    Timeout {
110        /// Wall-clock duration since the deadline elapsed.
111        elapsed: Duration,
112    },
113    /// The deterministic node-scan budget was exceeded.
114    NodeScanBudgetExceeded {
115        /// Maximum allowed scanned nodes.
116        limit: usize,
117        /// Observed scanned nodes after the batch that crossed the limit.
118        scanned: usize,
119    },
120}
121
122/// Cheap composite checker passed into hot loops that cannot depend on `selene-gql`.
123///
124/// The checker combines an optional cancellation token, absolute deadline, and
125/// deterministic node-scan budget. It is intentionally `Copy` so callers can
126/// pass it into nested loops and algorithm hot loops without allocation.
127#[derive(Clone, Copy, Debug)]
128pub struct CancellationChecker<'a> {
129    token: Option<&'a CancellationToken>,
130    deadline: Option<Instant>,
131    node_scan_budget: Option<&'a NodeScanBudget>,
132}
133
134impl<'a> CancellationChecker<'a> {
135    /// Construct a checker from an optional token and optional deadline.
136    #[must_use]
137    pub const fn new(token: Option<&'a CancellationToken>, deadline: Option<Instant>) -> Self {
138        Self {
139            token,
140            deadline,
141            node_scan_budget: None,
142        }
143    }
144
145    /// Construct a checker with an additional deterministic node-scan budget.
146    #[must_use]
147    pub const fn new_with_node_scan_budget(
148        token: Option<&'a CancellationToken>,
149        deadline: Option<Instant>,
150        node_scan_budget: Option<&'a NodeScanBudget>,
151    ) -> Self {
152        Self {
153            token,
154            deadline,
155            node_scan_budget,
156        }
157    }
158
159    /// Construct a checker that never cancels or times out.
160    #[must_use]
161    pub const fn disabled() -> Self {
162        Self {
163            token: None,
164            deadline: None,
165            node_scan_budget: None,
166        }
167    }
168
169    /// Return true when this checker has no token, deadline, or node-scan budget to inspect.
170    #[must_use]
171    #[inline(always)]
172    pub const fn is_disabled(&self) -> bool {
173        self.token.is_none() && self.deadline.is_none() && self.node_scan_budget.is_none()
174    }
175
176    /// Return the first cancellation cause observed at this checkpoint.
177    ///
178    /// Token cancellation wins over deadline timeout so explicit caller
179    /// cancellation is not reported as a timeout when both are true.
180    #[inline]
181    pub fn check(&self) -> Result<(), CancellationCause> {
182        if self.token.is_some_and(CancellationToken::is_cancelled) {
183            return Err(CancellationCause::Cancelled);
184        }
185        if let Some(deadline) = self.deadline {
186            let now = Instant::now();
187            if now >= deadline {
188                return Err(CancellationCause::Timeout {
189                    elapsed: now.duration_since(deadline),
190                });
191            }
192        }
193        Ok(())
194    }
195
196    /// Check cancellation, then account for a batch of scanned graph nodes.
197    ///
198    /// Token cancellation wins over deadline timeout and scan-budget exhaustion;
199    /// deadline timeout wins over scan-budget exhaustion. Callers should invoke
200    /// this at scan batch boundaries rather than per row.
201    #[inline]
202    pub fn note_nodes_scanned(&self, nodes: usize) -> Result<(), CancellationCause> {
203        self.check()?;
204        if let Some(budget) = self.node_scan_budget {
205            budget.note_nodes_scanned(nodes)?;
206        }
207        Ok(())
208    }
209}
210
211#[cfg(test)]
212mod tests {
213    use super::*;
214
215    #[test]
216    fn disabled_checker_never_trips() {
217        let checker = CancellationChecker::disabled();
218        assert!(checker.is_disabled());
219        assert_eq!(checker.check(), Ok(()));
220    }
221
222    #[test]
223    fn checker_with_token_is_not_disabled() {
224        let token = CancellationToken::new();
225        let checker = CancellationChecker::new(Some(&token), None);
226        assert!(!checker.is_disabled());
227    }
228
229    #[test]
230    fn checker_with_deadline_is_not_disabled() {
231        let deadline = Instant::now();
232        let checker = CancellationChecker::new(None, Some(deadline));
233        assert!(!checker.is_disabled());
234    }
235
236    #[test]
237    fn checker_with_node_scan_budget_is_not_disabled() {
238        let budget = NodeScanBudget::new(10);
239        let checker = CancellationChecker::new_with_node_scan_budget(None, None, Some(&budget));
240        assert!(!checker.is_disabled());
241    }
242
243    #[test]
244    fn token_wins_over_deadline_when_both_tripped() {
245        // CORE-09: both a cancelled token AND an elapsed deadline are set. The
246        // checker must report Cancelled (explicit caller intent), not Timeout.
247        let token = CancellationToken::new();
248        token.cancel();
249        let elapsed_deadline = Instant::now() - Duration::from_secs(1);
250        let checker = CancellationChecker::new(Some(&token), Some(elapsed_deadline));
251        assert_eq!(checker.check(), Err(CancellationCause::Cancelled));
252    }
253
254    #[test]
255    fn token_wins_over_node_scan_budget_when_both_tripped() {
256        let token = CancellationToken::new();
257        token.cancel();
258        let budget = NodeScanBudget::new(0);
259        let checker =
260            CancellationChecker::new_with_node_scan_budget(Some(&token), None, Some(&budget));
261
262        assert_eq!(
263            checker.note_nodes_scanned(1),
264            Err(CancellationCause::Cancelled)
265        );
266        assert_eq!(budget.scanned(), 0);
267    }
268
269    #[test]
270    fn deadline_wins_over_node_scan_budget_when_both_tripped() {
271        let elapsed_deadline = Instant::now() - Duration::from_secs(1);
272        let budget = NodeScanBudget::new(0);
273        let checker = CancellationChecker::new_with_node_scan_budget(
274            None,
275            Some(elapsed_deadline),
276            Some(&budget),
277        );
278
279        assert!(matches!(
280            checker.note_nodes_scanned(1),
281            Err(CancellationCause::Timeout { .. })
282        ));
283        assert_eq!(budget.scanned(), 0);
284    }
285
286    #[test]
287    fn deadline_reported_when_only_deadline_tripped() {
288        let elapsed_deadline = Instant::now() - Duration::from_secs(1);
289        let checker = CancellationChecker::new(None, Some(elapsed_deadline));
290        assert!(matches!(
291            checker.check(),
292            Err(CancellationCause::Timeout { .. })
293        ));
294    }
295
296    #[test]
297    fn live_token_with_future_deadline_passes() {
298        let token = CancellationToken::new();
299        let future_deadline = Instant::now() + Duration::from_secs(3600);
300        let checker = CancellationChecker::new(Some(&token), Some(future_deadline));
301        assert_eq!(checker.check(), Ok(()));
302    }
303
304    #[test]
305    fn node_scan_budget_trips_after_crossing_limit() {
306        let budget = NodeScanBudget::new(3);
307        let checker = CancellationChecker::new_with_node_scan_budget(None, None, Some(&budget));
308
309        assert_eq!(checker.note_nodes_scanned(2), Ok(()));
310        assert_eq!(
311            checker.note_nodes_scanned(2),
312            Err(CancellationCause::NodeScanBudgetExceeded {
313                limit: 3,
314                scanned: 4
315            })
316        );
317        assert_eq!(budget.scanned(), 4);
318    }
319}