1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
//! Cooperative query cancellation, split out of `lib.rs` (lib.rs split
//! 19). `CancelToken` is the lightweight handle threaded through every
//! scanning loop: it wraps an optional `&AtomicBool` flag (the server's
//! per-query watchdog) and an optional monotonic deadline (PG
//! `statement_timeout`), and `check()` returns `EngineError::Cancelled`
//! when either trips. `none()` is the zero-cost default for the
//! uncancellable path. Public API — `spg-server` / `spg-embedded`
//! construct tokens via `CancelToken::none().with_deadline(...)`.
use crate::EngineError;
/// v7.17.0 Phase 2.3 — monotonic time source for deadline-aware
/// cancellation (PG `statement_timeout`). Returns microseconds
/// since some host-stable monotonic origin (typically the first
/// call into `Instant::now()` on the server). The engine never
/// calls `Instant::now()` directly so the crate stays `#![no_std]`.
pub type MonotonicNowFn = fn() -> u64;
#[derive(Debug, Clone, Copy)]
struct Deadline {
now_fn: MonotonicNowFn,
/// Absolute deadline in `now_fn()` units (microseconds).
deadline_us: u64,
}
#[derive(Debug, Clone, Copy)]
pub struct CancelToken<'a> {
flag: Option<&'a core::sync::atomic::AtomicBool>,
// v7.17.0 Phase 2.3 — when set, every existing `cancel.check()`
// checkpoint also fires `EngineError::Cancelled` once
// `(now_fn)() >= deadline_us`. No new check sites, no thread
// spawn per query — the monotonic now-fn read is a vDSO
// `clock_gettime(CLOCK_MONOTONIC)` (~20ns) and only runs when
// the host actually wired a deadline (statement_timeout > 0).
deadline: Option<Deadline>,
}
impl<'a> CancelToken<'a> {
#[must_use]
pub const fn none() -> Self {
Self {
flag: None,
deadline: None,
}
}
#[must_use]
pub const fn from_flag(f: &'a core::sync::atomic::AtomicBool) -> Self {
Self {
flag: Some(f),
deadline: None,
}
}
/// v7.17.0 Phase 2.3 — attach a monotonic deadline. `now_fn`
/// must return microseconds since a stable origin; the token
/// trips when `now_fn() >= deadline_us`. Compose with
/// `from_flag(...)` when both a watchdog flag and a per-statement
/// timeout are in play (e.g. server-wide `SPG_QUERY_TIMEOUT_MS`
/// plus session `statement_timeout`); the tighter of the two
/// wins by virtue of either signaling first.
#[must_use]
pub const fn with_deadline(mut self, now_fn: MonotonicNowFn, deadline_us: u64) -> Self {
self.deadline = Some(Deadline {
now_fn,
deadline_us,
});
self
}
#[must_use]
pub fn is_cancelled(self) -> bool {
if self
.flag
.is_some_and(|f| f.load(core::sync::atomic::Ordering::Relaxed))
{
return true;
}
// Deadline check is the second branch so the "no timeout"
// hot path (`deadline: None`) elides the now-fn call —
// predicted-not-taken on the SLO INSERT loop.
if let Some(d) = self.deadline
&& (d.now_fn)() >= d.deadline_us
{
return true;
}
false
}
/// Returns `Err(Cancelled)` if the token has been tripped.
/// Used at row-loop checkpoints to bail cooperatively without
/// scattering raw `is_cancelled` checks across the executor.
#[inline]
pub fn check(self) -> Result<(), EngineError> {
if self.is_cancelled() {
Err(EngineError::Cancelled)
} else {
Ok(())
}
}
}