Skip to main content

sqry_core/query/
budget.rs

1//! Per-tool runtime row budget (per `C_budget.md` §§1–6 +
2//! `00_contracts.md` §3.CC-1 + §3.CC-2).
3//!
4//! Runtime backstop for queries that slipped past the static cost
5//! gate ([`crate::query::cost_gate`] / Subagent B). The budget caps
6//! how many rows the executor may examine inside `evaluate_all`'s
7//! hot loop; on overflow the budget trips Subagent A's
8//! [`CancellationToken`] so the existing drain / drop pathway fires
9//! uniformly. A shared [`CancellationSource`] tag records *which*
10//! signal cancelled the token so the wrapper-side downcast can
11//! choose between [`BudgetExceeded`] (→ `query_too_broad` with
12//! `details.source = "runtime_budget"`) and
13//! [`crate::query::error::QueryError::Cancelled`] (→
14//! `deadline_exceeded`) deterministically — even when the
15//! sequential / parallel evaluator paths interleave with the
16//! wrapper deadline drop-guard.
17//!
18//! [`CancellationToken`]: crate::query::cancellation::CancellationToken
19
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
22
23use thiserror::Error;
24
/// Documented default row budget per tool invocation.
///
/// This is the final fallback used by
/// [`QueryBudget::from_per_call_or_env`] when neither a usable per-call
/// value nor a usable environment override is present.
///
/// 5_000_000 rows is large enough that healthy queries on
/// realistic workspaces never trip it (a worst-case `kind:function`
/// scan across every node in a multi-million-line monorepo is
/// bounded by node-count, not by query shape). It exists to bound
/// the worst-case runtime on a query that bypassed the static cost
/// gate via a coupling rule that turned out to be inadequate
/// (e.g. a `kind:function` coupled regex over a synthetic-graph
/// monorepo with millions of generated function nodes — the
/// maintainer's reported failure mode).
pub const DEFAULT_BUDGET_ROWS: u64 = 5_000_000;
37
/// Name of the environment variable that overrides the global default
/// row budget.
///
/// Read by [`QueryBudget::from_per_call_or_env`]. The value is used only
/// when it parses as a `u64` greater than zero; anything else (unset,
/// non-numeric, negative, `0`) is ignored and [`DEFAULT_BUDGET_ROWS`]
/// applies instead. Per-call `budget_rows` MCP parameters (per
/// `C_budget.md` §C5) take precedence over the env-var default.
pub const ENV_TOOL_BUDGET_ROWS: &str = "SQRY_TOOL_BUDGET_ROWS";
42
/// Default per-row check stride, installed as
/// [`QueryBudget::check_stride`] by [`QueryBudget::new`].
///
/// Trades early-trip precision for `fetch_add` overhead. 256 keeps the
/// per-row overhead under one extra branch + one `Relaxed` load on the
/// cancel-poll path.
///
/// NOTE(review): [`QueryBudget::tick`] in this module does not read the
/// stride — it increments and compares on every call — so the striding
/// is presumably applied by the caller's hot loop; confirm against
/// `evaluate_all`.
pub const DEFAULT_CHECK_STRIDE: u64 = 256;
47
/// Records which subsystem was first to claim the cancellation of a
/// [`QueryBudget`]'s shared token.
///
/// The tag is kept in an `AtomicU8` (see [`QueryBudget::state`]) so it can
/// be written once and then read by every Rayon worker that observes
/// `cancel.is_cancelled()`. Per `C_budget.md` §3 + Codex iter-1
/// finding 3 fix.
///
/// Resolution is first-observer-wins: a worker that sees the token
/// cancelled while the tag is still `None` attempts a compare-and-swap to
/// install `External`. Because only one compare-and-swap from `None` can
/// succeed, the "deadline cancel arrived while the budget was just
/// overflowing" race has a single winner, and every later reader sees
/// that same tag and emits the matching typed error.
///
/// The explicit discriminants are the byte values stored in the atomic;
/// [`CancellationSource::from_u8`] is the inverse mapping.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CancellationSource {
    /// No tag has been written: either the token is not cancelled, or
    /// it was cancelled and nobody has stamped a source yet (see
    /// `evaluate_all`'s `classify_cancel` for the rule that treats
    /// `None` observed alongside `is_cancelled()` as `External`).
    None = 0,
    /// [`QueryBudget::tick`] reached `max_rows` and called
    /// `cancel.cancel()`.
    Budget = 1,
    /// Any other origin flipped the shared token — the wrapper deadline
    /// drop-guard, an admin `daemon/cancel-tool` future hook, or a
    /// parent build cancellation.
    External = 2,
}

impl CancellationSource {
    /// Decodes a raw tag byte back into a variant.
    ///
    /// Bytes that match no known discriminant decode to `None`, so a
    /// reader built before a newer variant was introduced degrades
    /// gracefully instead of faulting.
    #[inline]
    #[must_use]
    pub fn from_u8(v: u8) -> Self {
        if v == Self::Budget as u8 {
            Self::Budget
        } else if v == Self::External as u8 {
            Self::External
        } else {
            Self::None
        }
    }
}
93
/// Per-tool runtime budget. Constructed at the
/// [`crate::query::executor::QueryExecutor`] boundary; threaded
/// into [`crate::query::executor::graph_eval::GraphEvalContext`]
/// so `evaluate_all` can sample-and-check it without re-reading
/// env vars per call.
///
/// `Clone` is cheap: the inner counters are `Arc`-shared, so
/// cloning across rayon workers is a refcount bump per field.
/// Clones therefore share the same `examined` counter and the same
/// source tag; only `max_rows` and `check_stride` are copied by value.
#[derive(Debug, Clone)]
pub struct QueryBudget {
    /// Maximum rows the executor may examine before tripping
    /// cancellation. `0` is rejected at construction sites (MCP
    /// boundary + env-var parse); [`Self::new`] itself performs no
    /// validation, so a budget built with `0` trips on its first
    /// [`Self::tick`].
    pub max_rows: u64,
    /// How many rows have actually been examined. Shared across
    /// rayon worker threads; reset per `evaluate_all` call.
    /// NOTE(review): the reset is not performed anywhere in this
    /// module — confirm it happens at the `evaluate_all` call site.
    pub examined: Arc<AtomicU64>,
    /// Shared cancellation token (canonical type per
    /// `00_contracts.md` §3.CC-1). Tripped by [`Self::tick`] when
    /// `examined >= max_rows`.
    pub cancel: crate::query::cancellation::CancellationToken,
    /// First-observer-wins source tag, holding a
    /// [`CancellationSource`] discriminant. Written at most once,
    /// by whichever of [`Self::tick`] (on budget overflow) or
    /// [`Self::mark_external_cancel`] wins the compare-and-swap
    /// from `None`.
    pub state: Arc<AtomicU8>,
    /// How often (in rows) to compare `examined` against
    /// `max_rows`. Trades early-trip precision for `fetch_add`
    /// overhead.
    /// NOTE(review): no method in this module reads this field;
    /// presumably the executor's hot loop uses it to decide how
    /// often to call in — confirm against the caller.
    pub check_stride: u64,
}
125
126impl QueryBudget {
127    /// Construct a fresh budget with `max_rows` and the documented
128    /// [`DEFAULT_CHECK_STRIDE`]. The supplied token is the canonical
129    /// per-request `CancellationToken` from Subagent A's wrapper —
130    /// `tick()` calls `cancel()` on it when the budget trips, so
131    /// every clone of the token observes the cancellation through
132    /// the same shared `Arc<AtomicBool>`.
133    #[must_use]
134    pub fn new(max_rows: u64, cancel: crate::query::cancellation::CancellationToken) -> Self {
135        Self {
136            max_rows,
137            examined: Arc::new(AtomicU64::new(0)),
138            cancel,
139            state: Arc::new(AtomicU8::new(CancellationSource::None as u8)),
140            check_stride: DEFAULT_CHECK_STRIDE,
141        }
142    }
143
144    /// Construct an effectively-unbounded budget (`u64::MAX` rows)
145    /// for back-compat callers that have not opted into per-tool
146    /// budgeting yet. The cancellation token is still wired so
147    /// external cancel signals propagate normally.
148    #[must_use]
149    pub fn unbounded(cancel: crate::query::cancellation::CancellationToken) -> Self {
150        Self::new(u64::MAX, cancel)
151    }
152
153    /// Resolve the effective budget for an MCP tool call.
154    ///
155    /// Priority (per `C_budget.md` §2):
156    ///
157    /// 1. Per-call `budget_rows` MCP parameter (the caller can
158    ///    opt to a tighter or looser bound for diagnostics).
159    /// 2. Environment-variable `SQRY_TOOL_BUDGET_ROWS`.
160    /// 3. [`DEFAULT_BUDGET_ROWS`].
161    ///
162    /// `0` is rejected at this boundary by mapping to the
163    /// effectively-unbounded variant — a budget of zero would
164    /// trip on the first row, which is never the operator
165    /// intent. Negative env values are similarly mapped to the
166    /// default.
167    #[must_use]
168    pub fn from_per_call_or_env(
169        per_call_budget: Option<u64>,
170        cancel: crate::query::cancellation::CancellationToken,
171    ) -> Self {
172        if let Some(rows) = per_call_budget
173            && rows > 0
174        {
175            return Self::new(rows, cancel);
176        }
177        let env_rows = std::env::var(ENV_TOOL_BUDGET_ROWS)
178            .ok()
179            .and_then(|s| s.parse::<u64>().ok())
180            .filter(|n| *n > 0);
181        let max_rows = env_rows.unwrap_or(DEFAULT_BUDGET_ROWS);
182        Self::new(max_rows, cancel)
183    }
184
185    /// Returns true iff the budget has been exceeded. Cheap: one
186    /// `Relaxed` load + comparison.
187    #[inline]
188    #[must_use]
189    pub fn exceeded(&self) -> bool {
190        self.examined.load(Ordering::Relaxed) >= self.max_rows
191    }
192
193    /// Reads the (possibly still `None`) cancellation source tag.
194    #[inline]
195    #[must_use]
196    pub fn source(&self) -> CancellationSource {
197        CancellationSource::from_u8(self.state.load(Ordering::Acquire))
198    }
199
200    /// Tag the source as `External` IFF it is still `None`. CAS,
201    /// safe to call from any worker on observation. No-op if
202    /// `Budget` already won the race. Returned bool indicates
203    /// whether THIS call won the CAS (used in tests).
204    #[inline]
205    pub fn mark_external_cancel(&self) -> bool {
206        self.state
207            .compare_exchange(
208                CancellationSource::None as u8,
209                CancellationSource::External as u8,
210                Ordering::AcqRel,
211                Ordering::Acquire,
212            )
213            .is_ok()
214    }
215
216    /// Tag the source as `Budget` IFF it is still `None`.
217    #[inline]
218    fn mark_budget_cancel(&self) -> bool {
219        self.state
220            .compare_exchange(
221                CancellationSource::None as u8,
222                CancellationSource::Budget as u8,
223                Ordering::AcqRel,
224                Ordering::Acquire,
225            )
226            .is_ok()
227    }
228
229    /// Increment by 1. Returns `Err(BudgetExceeded)` the first
230    /// time the post-increment count crosses `max_rows`.
231    /// Idempotent on subsequent crosses (so multiple rayon
232    /// workers racing past the threshold all see `Err` but the
233    /// cancel-once contract holds via `CancellationToken::cancel`'s
234    /// internal `AtomicBool`).
235    ///
236    /// Source-tag invariant: the tag is set to `Budget` BEFORE
237    /// `cancel.cancel()` is called. This ordering means any
238    /// observer that subsequently reads
239    /// `cancel.is_cancelled() == true` is guaranteed to see
240    /// `source() != None` — the consumer side of the invariant
241    /// lives in `evaluate_all`'s `classify_cancel` block.
242    ///
243    /// # Errors
244    ///
245    /// Returns [`BudgetExceeded`] when the post-increment
246    /// `examined` count meets or exceeds `max_rows`.
247    #[inline]
248    pub fn tick(&self) -> Result<(), BudgetExceeded> {
249        let prev = self.examined.fetch_add(1, Ordering::Relaxed);
250        if prev + 1 >= self.max_rows {
251            // Order matters: stamp source BEFORE flipping the
252            // token, so observers that see is_cancelled() also
253            // see Budget.
254            self.mark_budget_cancel();
255            self.cancel.cancel();
256            return Err(BudgetExceeded {
257                examined: prev + 1,
258                limit: self.max_rows,
259                predicate_shape: None,
260            });
261        }
262        Ok(())
263    }
264}
265
/// Typed signal that a query exceeded its row budget. Surfaced
/// from `evaluate_all` as `anyhow::Error::from(BudgetExceeded { .. })`;
/// the MCP / daemon outer wrappers downcast on it (the same pattern
/// `sqry-mcp/src/server.rs` uses for `RpcError`) and emit the
/// `query_too_broad` envelope variant declared by Subagent B with
/// `details.source = "runtime_budget"`,
/// `details.examined = examined`, `details.limit = limit`.
///
/// The `Display` text comes from the `#[error(...)]` attribute below
/// and includes only `examined` and `limit`, never `predicate_shape`.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[error("query exceeded row budget: examined {examined} rows, limit {limit}")]
pub struct BudgetExceeded {
    /// Rows examined before the budget tripped. Surfaced as
    /// `details.examined` in the wire envelope. When produced by
    /// [`QueryBudget::tick`] this is the post-increment counter value
    /// seen by the tripping call.
    pub examined: u64,
    /// Configured row budget at the time of the trip. Surfaced as
    /// `details.limit` in the wire envelope.
    pub limit: u64,
    /// Sanitised AST shape (`Expr::shape_summary`) of the offending
    /// query, ≤256 bytes, no values / paths / regex patterns. Cluster-C
    /// iter-2: surfaced as `details.predicate_shape` on the runtime-
    /// budget envelope so MCP clients see the same comparable shape
    /// the cluster-B static-estimate envelope already exposes
    /// (codex iter-1 review).
    ///
    /// `None` only when the executor surfaces budget exhaustion before
    /// the finalize step has run (e.g. the cancellable wrappers' own
    /// downcast). [`QueryBudget::tick`] always constructs the error
    /// with `None`; any shape is filled in later by code outside this
    /// module. The envelope serializes `None` as JSON null.
    pub predicate_shape: Option<String>,
}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::cancellation::CancellationToken;
    use std::ffi::OsString;
    use std::sync::{Mutex, MutexGuard};

    /// Serialises every test in this module that reads or writes
    /// `SQRY_TOOL_BUDGET_ROWS`. The default test harness runs tests on
    /// parallel threads, so without this lock one test's `set_var` can
    /// leak into another test's "env unset" expectation.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// RAII scope that holds [`ENV_LOCK`], forces the budget env var to
    /// a known state, and restores the previous state on drop (also on
    /// panic, so a failing assertion cannot poison later tests' view of
    /// the environment).
    struct EnvScope {
        previous: Option<OsString>,
        // Declared last on purpose: `Drop::drop` runs first, then fields
        // drop in declaration order, so the lock outlives the restore.
        _guard: MutexGuard<'static, ()>,
    }

    impl EnvScope {
        /// `Some(v)` sets the variable to `v`; `None` removes it.
        fn with_value(value: Option<&str>) -> Self {
            // A panic in another env test only poisons the lock; the
            // protected state is restored by that test's `Drop`, so the
            // poison can be ignored.
            let guard = ENV_LOCK
                .lock()
                .unwrap_or_else(|poisoned| poisoned.into_inner());
            let previous = std::env::var_os(ENV_TOOL_BUDGET_ROWS);
            // SAFETY: all accesses to this variable from this module are
            // serialised by `ENV_LOCK`, which is held here. This does not
            // cover code outside this module touching the environment
            // concurrently.
            unsafe {
                match value {
                    Some(v) => std::env::set_var(ENV_TOOL_BUDGET_ROWS, v),
                    None => std::env::remove_var(ENV_TOOL_BUDGET_ROWS),
                }
            }
            Self {
                previous,
                _guard: guard,
            }
        }
    }

    impl Drop for EnvScope {
        fn drop(&mut self) {
            // SAFETY: `ENV_LOCK` is still held via `_guard` until after
            // this body returns.
            unsafe {
                match self.previous.take() {
                    Some(v) => std::env::set_var(ENV_TOOL_BUDGET_ROWS, v),
                    None => std::env::remove_var(ENV_TOOL_BUDGET_ROWS),
                }
            }
        }
    }

    #[test]
    fn tick_below_max_returns_ok() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(10, token.clone());
        for _ in 0..9 {
            budget.tick().expect("first 9 ticks must succeed");
        }
        assert!(!budget.exceeded(), "9 ticks must not exceed budget of 10");
        assert!(!token.is_cancelled(), "token must remain uncancelled");
        assert_eq!(budget.source(), CancellationSource::None);
    }

    #[test]
    fn tick_at_max_trips_cancel_and_stamps_budget_source() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(3, token.clone());
        budget.tick().expect("tick 1 ok");
        budget.tick().expect("tick 2 ok");
        let err = budget.tick().expect_err("tick 3 must trip");
        assert_eq!(err.examined, 3);
        assert_eq!(err.limit, 3);
        assert!(token.is_cancelled(), "tick must flip the token");
        assert_eq!(
            budget.source(),
            CancellationSource::Budget,
            "budget overflow must stamp source = Budget"
        );
    }

    #[test]
    fn external_cancel_first_blocks_budget_tag() {
        let budget = QueryBudget::new(3, CancellationToken::new());
        // External arrives first.
        assert!(budget.mark_external_cancel(), "External wins CAS");
        // Subsequent budget overflow MUST NOT overwrite the tag.
        budget.tick().expect("first tick ok");
        budget.tick().expect("second tick ok");
        let _ = budget.tick();
        assert_eq!(
            budget.source(),
            CancellationSource::External,
            "external-first must keep tag = External even after budget overflow"
        );
    }

    #[test]
    fn budget_cancel_first_blocks_external_tag() {
        let budget = QueryBudget::new(2, CancellationToken::new());
        budget.tick().expect("first tick ok");
        let _ = budget.tick(); // trips
        assert_eq!(budget.source(), CancellationSource::Budget);
        // External arrives after budget — CAS fails, tag stays Budget.
        assert!(
            !budget.mark_external_cancel(),
            "external-second CAS must fail"
        );
        assert_eq!(budget.source(), CancellationSource::Budget);
    }

    #[test]
    fn from_per_call_prefers_per_call_value_over_env() {
        let _env = EnvScope::with_value(Some("999"));
        let budget = QueryBudget::from_per_call_or_env(Some(42), CancellationToken::new());
        assert_eq!(budget.max_rows, 42, "per-call value must override env var");
    }

    #[test]
    fn from_per_call_zero_falls_back_to_default() {
        let _env = EnvScope::with_value(None);
        let budget = QueryBudget::from_per_call_or_env(Some(0), CancellationToken::new());
        assert_eq!(
            budget.max_rows, DEFAULT_BUDGET_ROWS,
            "per-call zero must map to the default rather than trip immediately"
        );
    }

    #[test]
    fn from_per_call_none_uses_default_when_env_unset() {
        let _env = EnvScope::with_value(None);
        let budget = QueryBudget::from_per_call_or_env(None, CancellationToken::new());
        assert_eq!(budget.max_rows, DEFAULT_BUDGET_ROWS);
    }

    #[test]
    fn from_per_call_none_uses_env_value_when_set() {
        let _env = EnvScope::with_value(Some("123"));
        let budget = QueryBudget::from_per_call_or_env(None, CancellationToken::new());
        assert_eq!(budget.max_rows, 123, "env var must apply when no per-call value");
    }

    #[test]
    fn unusable_env_values_fall_back_to_default() {
        for raw in ["0", "-5", "not-a-number", ""] {
            let _env = EnvScope::with_value(Some(raw));
            let budget = QueryBudget::from_per_call_or_env(None, CancellationToken::new());
            assert_eq!(
                budget.max_rows, DEFAULT_BUDGET_ROWS,
                "env value {raw:?} must be ignored"
            );
        }
    }

    #[test]
    fn unbounded_budget_never_trips_on_realistic_iteration_count() {
        let token = CancellationToken::new();
        let budget = QueryBudget::unbounded(token.clone());
        // 1k ticks against a u64::MAX cap is nowhere near the
        // overflow boundary — the unbounded variant must not
        // accidentally trip.
        for _ in 0..1_000 {
            budget.tick().expect("unbounded must not trip");
        }
        assert!(!token.is_cancelled());
    }
}