Skip to main content

sqry_core/query/
budget.rs

1//! Per-tool runtime row budget (per `C_budget.md` §§1–6 +
2//! `00_contracts.md` §3.CC-1 + §3.CC-2).
3//!
4//! Runtime backstop for queries that slipped past the static cost
5//! gate ([`crate::query::cost_gate`] / Subagent B). The budget caps
6//! how many rows the executor may examine inside `evaluate_all`'s
7//! hot loop; on overflow the budget trips Subagent A's
8//! [`CancellationToken`] so the existing drain / drop pathway fires
9//! uniformly. A shared [`CancellationSource`] tag records *which*
10//! signal cancelled the token so the wrapper-side downcast can
11//! choose between [`BudgetExceeded`] (→ `query_too_broad` with
12//! `details.source = "runtime_budget"`) and
13//! [`crate::query::error::QueryError::Cancelled`] (→
14//! `deadline_exceeded`) deterministically — even when the
15//! sequential / parallel evaluator paths interleave with the
16//! wrapper deadline drop-guard.
17//!
18//! [`CancellationToken`]: crate::query::cancellation::CancellationToken
19
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
22
23use thiserror::Error;
24
/// Documented default row budget per tool invocation.
///
/// `5_000_000` rows is large enough that healthy queries on
/// realistic workspaces never trip it (a worst-case `kind:function`
/// scan across every node in a multi-million-line monorepo is
/// bounded by node-count, not by query shape). It exists to bound
/// the worst-case runtime on a query that bypassed the static cost
/// gate via a coupling rule that turned out to be inadequate
/// (e.g. a `kind:function` coupled regex over a synthetic-graph
/// monorepo with millions of generated function nodes — the
/// maintainer's reported failure mode).
///
/// This is the last fallback in
/// [`QueryBudget::from_per_call_or_env`]: it applies when neither
/// the per-call parameter nor the environment variable yields a
/// positive row count.
pub const DEFAULT_BUDGET_ROWS: u64 = 5_000_000;

/// Environment-variable name for the global default override.
/// Per-call `budget_rows` MCP parameters (per
/// `C_budget.md` §C5) take precedence over the env-var default.
///
/// The value is parsed as a `u64`; an unset, unparsable, or zero
/// value is ignored and [`DEFAULT_BUDGET_ROWS`] is used instead.
pub const ENV_TOOL_BUDGET_ROWS: &str = "SQRY_TOOL_BUDGET_ROWS";

/// Default per-row check stride. Trades early-trip precision for
/// `fetch_add` overhead. 256 keeps the per-row overhead under one
/// extra branch + one `Relaxed` load on the cancel-poll path.
///
/// [`QueryBudget::new`] installs this value as
/// [`QueryBudget::check_stride`].
///
/// NOTE(review): [`QueryBudget::tick`] in this module compares on
/// every call and never reads the stride — presumably the stride is
/// consumed by the caller's hot loop; confirm against `evaluate_all`.
pub const DEFAULT_CHECK_STRIDE: u64 = 256;
47
/// Records which signal was first to cancel the token shared with a
/// [`QueryBudget`]. The value lives in an `AtomicU8` (hence
/// `repr(u8)`) so it can be written once and then read by every
/// Rayon worker that observes `cancel.is_cancelled()`. Per
/// `C_budget.md` §3 + Codex iter-1 finding 3 fix.
///
/// The tag is first-observer-wins: a worker that sees the token
/// cancelled while the tag is still `None` installs `External` with
/// a compare-and-swap. Because only the first CAS succeeds, a
/// deadline cancel racing a budget overflow resolves to exactly one
/// source, and every later reader — on any core — sees that same
/// source and emits the matching typed error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CancellationSource {
    /// No source recorded: either the token has not been cancelled,
    /// or it has and nobody has tagged it yet (`evaluate_all`'s
    /// `classify_cancel` treats `None` seen together with
    /// `is_cancelled()` as `External`).
    None = 0,
    /// [`QueryBudget::tick`] hit `max_rows` and called
    /// `cancel.cancel()`.
    Budget = 1,
    /// Anything else flipped the shared token: the wrapper deadline
    /// drop-guard, an admin `daemon/cancel-tool` future hook, or a
    /// parent build cancellation.
    External = 2,
}

impl CancellationSource {
    /// Decodes the `repr(u8)` discriminant back into a variant.
    ///
    /// Any byte that is not a known discriminant decodes to `None`,
    /// so a reader that predates a newly added variant degrades
    /// gracefully instead of faulting.
    #[inline]
    #[must_use]
    pub fn from_u8(v: u8) -> Self {
        if v == Self::Budget as u8 {
            Self::Budget
        } else if v == Self::External as u8 {
            Self::External
        } else {
            Self::None
        }
    }
}
93
/// Per-tool runtime budget. Constructed at the
/// [`crate::query::executor::QueryExecutor`] boundary; threaded
/// into [`crate::query::executor::graph_eval::GraphEvalContext`]
/// so `evaluate_all` can sample-and-check it without re-reading
/// env vars per call.
///
/// `Clone` shares state rather than copying it: `examined` and
/// `state` are `Arc`-wrapped, so every clone increments the same
/// counter and observes the same source tag. (The token's own clone
/// cost is defined by `CancellationToken` — presumably also a
/// refcount bump; confirm there.)
#[derive(Debug, Clone)]
pub struct QueryBudget {
    /// Maximum rows the executor may examine before tripping
    /// cancellation. [`QueryBudget::new`] stores the value as given
    /// and does not validate it; [`QueryBudget::from_per_call_or_env`]
    /// never produces `0` because it filters non-positive inputs.
    pub max_rows: u64,
    /// How many rows have been counted by [`QueryBudget::tick`].
    /// Shared across all clones of this budget.
    ///
    /// NOTE(review): nothing in this module resets the counter —
    /// presumably the caller resets it per `evaluate_all` call;
    /// confirm at the call site.
    pub examined: Arc<AtomicU64>,
    /// Shared cancellation token (canonical type per
    /// `00_contracts.md` §3.CC-1). [`QueryBudget::tick`] calls
    /// `cancel()` on it once the post-increment `examined` count
    /// reaches `max_rows`.
    pub cancel: crate::query::cancellation::CancellationToken,
    /// First-observer-wins source tag, holding a
    /// [`CancellationSource`] discriminant (initially `None`).
    /// Only ever moved away from `None` by a compare-and-swap, so
    /// it is written at most once — by [`Self::tick`] on budget
    /// overflow or by [`Self::mark_external_cancel`].
    pub state: Arc<AtomicU8>,
    /// How often (in rows) to compare `examined` against
    /// `max_rows`. Trades early-trip precision for `fetch_add`
    /// overhead. Set to [`DEFAULT_CHECK_STRIDE`] by
    /// [`QueryBudget::new`]; not read by any method in this module.
    pub check_stride: u64,
}
125
126impl QueryBudget {
127    /// Construct a fresh budget with `max_rows` and the documented
128    /// [`DEFAULT_CHECK_STRIDE`]. The supplied token is the canonical
129    /// per-request `CancellationToken` from Subagent A's wrapper —
130    /// `tick()` calls `cancel()` on it when the budget trips, so
131    /// every clone of the token observes the cancellation through
132    /// the same shared `Arc<AtomicBool>`.
133    #[must_use]
134    pub fn new(max_rows: u64, cancel: crate::query::cancellation::CancellationToken) -> Self {
135        Self {
136            max_rows,
137            examined: Arc::new(AtomicU64::new(0)),
138            cancel,
139            state: Arc::new(AtomicU8::new(CancellationSource::None as u8)),
140            check_stride: DEFAULT_CHECK_STRIDE,
141        }
142    }
143
144    /// Construct an effectively-unbounded budget (`u64::MAX` rows)
145    /// for back-compat callers that have not opted into per-tool
146    /// budgeting yet. The cancellation token is still wired so
147    /// external cancel signals propagate normally.
148    #[must_use]
149    pub fn unbounded(cancel: crate::query::cancellation::CancellationToken) -> Self {
150        Self::new(u64::MAX, cancel)
151    }
152
153    /// Resolve the effective budget for an MCP tool call.
154    ///
155    /// Priority (per `C_budget.md` §2):
156    ///
157    /// 1. Per-call `budget_rows` MCP parameter (the caller can
158    ///    opt to a tighter or looser bound for diagnostics).
159    /// 2. Environment-variable `SQRY_TOOL_BUDGET_ROWS`.
160    /// 3. [`DEFAULT_BUDGET_ROWS`].
161    ///
162    /// `0` is rejected at this boundary by mapping to the
163    /// effectively-unbounded variant — a budget of zero would
164    /// trip on the first row, which is never the operator
165    /// intent. Negative env values are similarly mapped to the
166    /// default.
167    #[must_use]
168    pub fn from_per_call_or_env(
169        per_call_budget: Option<u64>,
170        cancel: crate::query::cancellation::CancellationToken,
171    ) -> Self {
172        if let Some(rows) = per_call_budget
173            && rows > 0
174        {
175            return Self::new(rows, cancel);
176        }
177        let env_rows = std::env::var(ENV_TOOL_BUDGET_ROWS)
178            .ok()
179            .and_then(|s| s.parse::<u64>().ok())
180            .filter(|n| *n > 0);
181        let max_rows = env_rows.unwrap_or(DEFAULT_BUDGET_ROWS);
182        Self::new(max_rows, cancel)
183    }
184
185    /// Returns true iff the budget has been exceeded. Cheap: one
186    /// `Relaxed` load + comparison.
187    #[inline]
188    #[must_use]
189    pub fn exceeded(&self) -> bool {
190        self.examined.load(Ordering::Relaxed) >= self.max_rows
191    }
192
193    /// Reads the (possibly still `None`) cancellation source tag.
194    #[inline]
195    #[must_use]
196    pub fn source(&self) -> CancellationSource {
197        CancellationSource::from_u8(self.state.load(Ordering::Acquire))
198    }
199
200    /// Tag the source as `External` IFF it is still `None`. CAS,
201    /// safe to call from any worker on observation. No-op if
202    /// `Budget` already won the race. Returned bool indicates
203    /// whether THIS call won the CAS (used in tests).
204    #[inline]
205    #[must_use]
206    pub fn mark_external_cancel(&self) -> bool {
207        self.state
208            .compare_exchange(
209                CancellationSource::None as u8,
210                CancellationSource::External as u8,
211                Ordering::AcqRel,
212                Ordering::Acquire,
213            )
214            .is_ok()
215    }
216
217    /// Tag the source as `Budget` IFF it is still `None`.
218    #[inline]
219    fn mark_budget_cancel(&self) -> bool {
220        self.state
221            .compare_exchange(
222                CancellationSource::None as u8,
223                CancellationSource::Budget as u8,
224                Ordering::AcqRel,
225                Ordering::Acquire,
226            )
227            .is_ok()
228    }
229
230    /// Increment by 1. Returns `Err(BudgetExceeded)` the first
231    /// time the post-increment count crosses `max_rows`.
232    /// Idempotent on subsequent crosses (so multiple rayon
233    /// workers racing past the threshold all see `Err` but the
234    /// cancel-once contract holds via `CancellationToken::cancel`'s
235    /// internal `AtomicBool`).
236    ///
237    /// Source-tag invariant: the tag is set to `Budget` BEFORE
238    /// `cancel.cancel()` is called. This ordering means any
239    /// observer that subsequently reads
240    /// `cancel.is_cancelled() == true` is guaranteed to see
241    /// `source() != None` — the consumer side of the invariant
242    /// lives in `evaluate_all`'s `classify_cancel` block.
243    ///
244    /// # Errors
245    ///
246    /// Returns [`BudgetExceeded`] when the post-increment
247    /// `examined` count meets or exceeds `max_rows`.
248    #[inline]
249    pub fn tick(&self) -> Result<(), BudgetExceeded> {
250        let prev = self.examined.fetch_add(1, Ordering::Relaxed);
251        if prev + 1 >= self.max_rows {
252            // Order matters: stamp source BEFORE flipping the
253            // token, so observers that see is_cancelled() also
254            // see Budget.
255            self.mark_budget_cancel();
256            self.cancel.cancel();
257            return Err(BudgetExceeded {
258                examined: prev + 1,
259                limit: self.max_rows,
260                predicate_shape: None,
261            });
262        }
263        Ok(())
264    }
265}
266
/// Typed signal that a query exceeded its row budget. Surfaced
/// from `evaluate_all` as `anyhow::Error::from(BudgetExceeded { .. })`;
/// the MCP / daemon outer wrappers downcast on it (the same pattern
/// `sqry-mcp/src/server.rs` uses for `RpcError`) and emit the
/// `query_too_broad` envelope variant declared by Subagent B with
/// `details.source = "runtime_budget"`,
/// `details.examined = examined`, `details.limit = limit`.
///
/// The `Display` text reports only `examined` and `limit`;
/// `predicate_shape` is carried as data and is not part of the
/// message.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
#[error("query exceeded row budget: examined {examined} rows, limit {limit}")]
pub struct BudgetExceeded {
    /// Post-increment row count observed by the `tick()` call that
    /// produced this error. Surfaced as `details.examined` in the
    /// wire envelope. May be larger than `limit` when several
    /// workers tick past the threshold.
    pub examined: u64,
    /// Configured row budget (`QueryBudget::max_rows`) at the time
    /// of the trip. Surfaced as `details.limit` in the wire envelope.
    pub limit: u64,
    /// Sanitised AST shape (`Expr::shape_summary`) of the offending
    /// query, ≤256 bytes, no values / paths / regex patterns. Cluster-C
    /// iter-2: surfaced as `details.predicate_shape` on the runtime-
    /// budget envelope so MCP clients see the same comparable shape
    /// the cluster-B static-estimate envelope already exposes
    /// (codex iter-1 review).
    ///
    /// `QueryBudget::tick` always constructs the error with `None`;
    /// the shape is presumably filled in later by the executor's
    /// finalize step (not visible in this module). `None` therefore
    /// remains only when budget exhaustion is surfaced before that
    /// step has run (e.g. the cancellable wrappers' own downcast).
    /// The envelope serializes `None` as JSON null.
    pub predicate_shape: Option<String>,
}
295
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::cancellation::CancellationToken;

    /// Test-only RAII handle over [`ENV_TOOL_BUDGET_ROWS`].
    ///
    /// Creating the guard puts the variable into a known state;
    /// dropping it removes the variable again. Because the cleanup
    /// lives in `Drop`, it also runs when an assertion panics, so a
    /// failing test cannot leak its value into later tests.
    ///
    /// Only construct this inside `#[serial_test::serial]` tests.
    struct EnvRowsGuard;

    impl EnvRowsGuard {
        /// Sets the variable to `value` for the guard's lifetime.
        fn set(value: &str) -> Self {
            // SAFETY: env mutation is process-wide; every caller is a
            // `#[serial_test::serial]` test, so no other env-mutating
            // test runs concurrently.
            unsafe {
                std::env::set_var(ENV_TOOL_BUDGET_ROWS, value);
            }
            Self
        }

        /// Ensures the variable is absent for the guard's lifetime.
        fn unset() -> Self {
            // SAFETY: see `set`.
            unsafe {
                std::env::remove_var(ENV_TOOL_BUDGET_ROWS);
            }
            Self
        }
    }

    impl Drop for EnvRowsGuard {
        fn drop(&mut self) {
            // SAFETY: see `EnvRowsGuard::set`; the guard is dropped
            // before the serialized test returns.
            unsafe {
                std::env::remove_var(ENV_TOOL_BUDGET_ROWS);
            }
        }
    }

    #[test]
    fn tick_below_max_returns_ok() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(10, token.clone());
        for _ in 0..9 {
            budget.tick().expect("first 9 ticks must succeed");
        }
        assert!(!budget.exceeded(), "9 ticks must not exceed budget of 10");
        assert!(!token.is_cancelled(), "token must remain uncancelled");
        assert_eq!(budget.source(), CancellationSource::None);
    }

    #[test]
    fn tick_at_max_trips_cancel_and_stamps_budget_source() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(3, token.clone());
        budget.tick().expect("tick 1 ok");
        budget.tick().expect("tick 2 ok");
        let err = budget.tick().expect_err("tick 3 must trip");
        assert_eq!(err.examined, 3);
        assert_eq!(err.limit, 3);
        assert_eq!(
            err.predicate_shape, None,
            "tick never fills in a shape"
        );
        assert!(budget.exceeded(), "counter must read as exceeded after the trip");
        assert!(token.is_cancelled(), "tick must flip the token");
        assert_eq!(
            budget.source(),
            CancellationSource::Budget,
            "budget overflow must stamp source = Budget"
        );
    }

    #[test]
    fn external_cancel_first_blocks_budget_tag() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(3, token.clone());
        // External arrives first.
        assert!(budget.mark_external_cancel(), "External wins CAS");
        // Subsequent budget overflow MUST NOT overwrite the tag.
        budget.tick().expect("first tick ok");
        budget.tick().expect("second tick ok");
        let _ = budget.tick();
        assert_eq!(
            budget.source(),
            CancellationSource::External,
            "external-first must keep tag = External even after budget overflow"
        );
    }

    #[test]
    fn budget_cancel_first_blocks_external_tag() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(2, token.clone());
        budget.tick().expect("first tick ok");
        let _ = budget.tick(); // trips
        assert_eq!(budget.source(), CancellationSource::Budget);
        // External arrives after budget — CAS fails, tag stays Budget.
        assert!(
            !budget.mark_external_cancel(),
            "external-second CAS must fail"
        );
        assert_eq!(budget.source(), CancellationSource::Budget);
    }

    // Every `from_per_call_*` / `env_*` test below mutates the
    // process-global ENV_TOOL_BUDGET_ROWS variable. Cargo runs tests
    // on parallel threads, so without serialization a set_var in one
    // test races the remove_var-then-read in its siblings (observed
    // as a CI flake, default 5_000_000 read back as 999).
    // #[serial_test::serial] puts every env-mutating test in this
    // binary on one lane; `EnvRowsGuard` guarantees cleanup.
    #[test]
    #[serial_test::serial]
    fn from_per_call_prefers_per_call_value_over_env() {
        // The per-call value should win regardless of the env var.
        let _env = EnvRowsGuard::set("999");
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(Some(42), token);
        assert_eq!(budget.max_rows, 42, "per-call value must override env var");
    }

    #[test]
    #[serial_test::serial]
    fn from_per_call_zero_falls_back_to_default() {
        let _env = EnvRowsGuard::unset();
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(Some(0), token);
        assert_eq!(
            budget.max_rows, DEFAULT_BUDGET_ROWS,
            "per-call zero must map to the default rather than trip immediately"
        );
    }

    #[test]
    #[serial_test::serial]
    fn from_per_call_none_uses_default_when_env_unset() {
        let _env = EnvRowsGuard::unset();
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(None, token);
        assert_eq!(budget.max_rows, DEFAULT_BUDGET_ROWS);
    }

    #[test]
    #[serial_test::serial]
    fn from_per_call_none_uses_env_when_set() {
        let _env = EnvRowsGuard::set("1234");
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(None, token);
        assert_eq!(
            budget.max_rows, 1234,
            "env var must be honoured when no per-call value is given"
        );
    }

    #[test]
    #[serial_test::serial]
    fn from_per_call_zero_falls_through_to_env() {
        let _env = EnvRowsGuard::set("1234");
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(Some(0), token);
        assert_eq!(
            budget.max_rows, 1234,
            "per-call zero counts as absent, so the env var applies"
        );
    }

    #[test]
    #[serial_test::serial]
    fn env_zero_or_unparsable_falls_back_to_default() {
        for bad in ["0", "-5", "not-a-number", ""] {
            let _env = EnvRowsGuard::set(bad);
            let token = CancellationToken::new();
            let budget = QueryBudget::from_per_call_or_env(None, token);
            assert_eq!(
                budget.max_rows, DEFAULT_BUDGET_ROWS,
                "env value {bad:?} must be ignored in favour of the default"
            );
        }
    }

    #[test]
    fn unbounded_budget_never_trips_on_realistic_iteration_count() {
        let token = CancellationToken::new();
        let budget = QueryBudget::unbounded(token.clone());
        // 1k ticks against a u64::MAX cap is nowhere near the
        // overflow boundary — the unbounded variant must not
        // accidentally trip.
        for _ in 0..1_000 {
            budget.tick().expect("unbounded must not trip");
        }
        assert!(!token.is_cancelled());
    }
}