sqry_core/query/budget.rs
//! Per-tool runtime row budget (per `C_budget.md` §§1–6 +
//! `00_contracts.md` §3.CC-1 + §3.CC-2).
//!
//! Runtime backstop for queries that slipped past the static cost
//! gate ([`crate::query::cost_gate`] / Subagent B). The budget caps
//! how many rows the executor may examine inside `evaluate_all`'s
//! hot loop; on overflow the budget trips Subagent A's
//! [`CancellationToken`] so the existing drain / drop pathway fires
//! uniformly. A shared [`CancellationSource`] tag records *which*
//! signal cancelled the token so the wrapper-side downcast can
//! choose between [`BudgetExceeded`] (→ `query_too_broad` with
//! `details.source = "runtime_budget"`) and
//! [`crate::query::error::QueryError::Cancelled`] (→
//! `deadline_exceeded`) deterministically — even when the
//! sequential / parallel evaluator paths interleave with the
//! wrapper deadline drop-guard.
//!
//! [`CancellationToken`]: crate::query::cancellation::CancellationToken
19
20use std::sync::Arc;
21use std::sync::atomic::{AtomicU8, AtomicU64, Ordering};
22
23use thiserror::Error;
24
/// Row budget applied to a tool invocation when no other source
/// supplies a usable value.
///
/// The figure is deliberately generous: a healthy query on a realistic
/// workspace — even a worst-case `kind:function` scan across every node
/// of a multi-million-line monorepo — is bounded by node count rather
/// than by query shape and should never reach it. The cap exists to
/// bound the runtime of a query that got past the static cost gate
/// because a coupling rule proved inadequate, e.g. a `kind:function`
/// coupled regex over a synthetic-graph monorepo with millions of
/// generated function nodes (the failure mode reported by the
/// maintainer).
pub const DEFAULT_BUDGET_ROWS: u64 = 5_000_000;
37
/// Name of the environment variable that overrides the global default
/// row budget.
///
/// A per-call `budget_rows` MCP parameter (see `C_budget.md` §C5)
/// outranks the value read from this variable.
pub const ENV_TOOL_BUDGET_ROWS: &str = "SQRY_TOOL_BUDGET_ROWS";
42
/// Check stride, in rows, installed by default on a freshly constructed
/// budget.
///
/// A larger stride trades early-trip precision for lower `fetch_add`
/// overhead. 256 was chosen to keep the per-row overhead on the
/// cancel-poll path under one extra branch plus one `Relaxed` load.
pub const DEFAULT_CHECK_STRIDE: u64 = 256;
47
/// Records which subsystem was first to trigger cancellation against a
/// [`QueryBudget`].
///
/// The value is stored as an `AtomicU8` so it can be tagged once and
/// then read by every Rayon worker that observes
/// `cancel.is_cancelled()` (`C_budget.md` §3; fix for Codex iter-1
/// finding 3).
///
/// Tagging is first-writer-wins. A worker that notices the token is
/// cancelled while the tag is still `None` performs a CAS to install
/// `External`; the budget path performs the matching CAS to install
/// `Budget`. When a deadline cancel arrives just as the budget is
/// overflowing, whichever CAS lands first fixes the tag, so every later
/// observer — including Rayon workers on other cores — reads the same
/// value and emits the matching typed error.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum CancellationSource {
    /// The token is not cancelled, or it is cancelled but no tag has
    /// been written yet. `evaluate_all`'s `classify_cancel` treats
    /// `None` seen together with `is_cancelled()` as `External`.
    None = 0,
    /// [`QueryBudget::tick`] reached `max_rows` and called
    /// `cancel.cancel()`.
    Budget = 1,
    /// Something other than the budget flipped the shared token: the
    /// wrapper deadline drop-guard, a future admin `daemon/cancel-tool`
    /// hook, or a parent build cancellation.
    External = 2,
}

impl CancellationSource {
    /// Decodes the `repr(u8)` discriminant back into a variant.
    ///
    /// Any byte that is not a known discriminant decodes to `None`, so a
    /// reader that predates a newly added variant degrades gracefully
    /// instead of faulting.
    #[inline]
    #[must_use]
    pub fn from_u8(v: u8) -> Self {
        const BUDGET: u8 = CancellationSource::Budget as u8;
        const EXTERNAL: u8 = CancellationSource::External as u8;

        match v {
            BUDGET => Self::Budget,
            EXTERNAL => Self::External,
            _ => Self::None,
        }
    }
}
93
94/// Per-tool runtime budget. Constructed at the
95/// [`crate::query::executor::QueryExecutor`] boundary; threaded
96/// into [`crate::query::executor::graph_eval::GraphEvalContext`]
97/// so `evaluate_all` can sample-and-check it without re-reading
98/// env vars per call.
99///
100/// `Clone` is cheap: the inner counters are `Arc`-shared, so
101/// cloning across rayon workers is a refcount bump per field.
102#[derive(Debug, Clone)]
103pub struct QueryBudget {
104 /// Maximum rows the executor may examine before tripping
105 /// cancellation. `0` is rejected at construction sites (MCP
106 /// boundary + env-var parse).
107 pub max_rows: u64,
108 /// How many rows have actually been examined. Shared across
109 /// rayon worker threads; reset per `evaluate_all` call.
110 pub examined: Arc<AtomicU64>,
111 /// Shared cancellation token (canonical type per
112 /// `00_contracts.md` §3.CC-1). Tripped when
113 /// `examined >= max_rows`.
114 pub cancel: crate::query::cancellation::CancellationToken,
115 /// First-observer-wins source tag. Written exactly once by
116 /// the first signal to cancel the shared token (either
117 /// [`Self::tick`] on budget overflow or
118 /// [`Self::mark_external_cancel`]).
119 pub state: Arc<AtomicU8>,
120 /// How often (in rows) to compare `examined` against
121 /// `max_rows`. Trades early-trip precision for `fetch_add`
122 /// overhead.
123 pub check_stride: u64,
124}
125
126impl QueryBudget {
127 /// Construct a fresh budget with `max_rows` and the documented
128 /// [`DEFAULT_CHECK_STRIDE`]. The supplied token is the canonical
129 /// per-request `CancellationToken` from Subagent A's wrapper —
130 /// `tick()` calls `cancel()` on it when the budget trips, so
131 /// every clone of the token observes the cancellation through
132 /// the same shared `Arc<AtomicBool>`.
133 #[must_use]
134 pub fn new(max_rows: u64, cancel: crate::query::cancellation::CancellationToken) -> Self {
135 Self {
136 max_rows,
137 examined: Arc::new(AtomicU64::new(0)),
138 cancel,
139 state: Arc::new(AtomicU8::new(CancellationSource::None as u8)),
140 check_stride: DEFAULT_CHECK_STRIDE,
141 }
142 }
143
144 /// Construct an effectively-unbounded budget (`u64::MAX` rows)
145 /// for back-compat callers that have not opted into per-tool
146 /// budgeting yet. The cancellation token is still wired so
147 /// external cancel signals propagate normally.
148 #[must_use]
149 pub fn unbounded(cancel: crate::query::cancellation::CancellationToken) -> Self {
150 Self::new(u64::MAX, cancel)
151 }
152
153 /// Resolve the effective budget for an MCP tool call.
154 ///
155 /// Priority (per `C_budget.md` §2):
156 ///
157 /// 1. Per-call `budget_rows` MCP parameter (the caller can
158 /// opt to a tighter or looser bound for diagnostics).
159 /// 2. Environment-variable `SQRY_TOOL_BUDGET_ROWS`.
160 /// 3. [`DEFAULT_BUDGET_ROWS`].
161 ///
162 /// `0` is rejected at this boundary by mapping to the
163 /// effectively-unbounded variant — a budget of zero would
164 /// trip on the first row, which is never the operator
165 /// intent. Negative env values are similarly mapped to the
166 /// default.
167 #[must_use]
168 pub fn from_per_call_or_env(
169 per_call_budget: Option<u64>,
170 cancel: crate::query::cancellation::CancellationToken,
171 ) -> Self {
172 if let Some(rows) = per_call_budget
173 && rows > 0
174 {
175 return Self::new(rows, cancel);
176 }
177 let env_rows = std::env::var(ENV_TOOL_BUDGET_ROWS)
178 .ok()
179 .and_then(|s| s.parse::<u64>().ok())
180 .filter(|n| *n > 0);
181 let max_rows = env_rows.unwrap_or(DEFAULT_BUDGET_ROWS);
182 Self::new(max_rows, cancel)
183 }
184
185 /// Returns true iff the budget has been exceeded. Cheap: one
186 /// `Relaxed` load + comparison.
187 #[inline]
188 #[must_use]
189 pub fn exceeded(&self) -> bool {
190 self.examined.load(Ordering::Relaxed) >= self.max_rows
191 }
192
193 /// Reads the (possibly still `None`) cancellation source tag.
194 #[inline]
195 #[must_use]
196 pub fn source(&self) -> CancellationSource {
197 CancellationSource::from_u8(self.state.load(Ordering::Acquire))
198 }
199
200 /// Tag the source as `External` IFF it is still `None`. CAS,
201 /// safe to call from any worker on observation. No-op if
202 /// `Budget` already won the race. Returned bool indicates
203 /// whether THIS call won the CAS (used in tests).
204 #[inline]
205 pub fn mark_external_cancel(&self) -> bool {
206 self.state
207 .compare_exchange(
208 CancellationSource::None as u8,
209 CancellationSource::External as u8,
210 Ordering::AcqRel,
211 Ordering::Acquire,
212 )
213 .is_ok()
214 }
215
216 /// Tag the source as `Budget` IFF it is still `None`.
217 #[inline]
218 fn mark_budget_cancel(&self) -> bool {
219 self.state
220 .compare_exchange(
221 CancellationSource::None as u8,
222 CancellationSource::Budget as u8,
223 Ordering::AcqRel,
224 Ordering::Acquire,
225 )
226 .is_ok()
227 }
228
229 /// Increment by 1. Returns `Err(BudgetExceeded)` the first
230 /// time the post-increment count crosses `max_rows`.
231 /// Idempotent on subsequent crosses (so multiple rayon
232 /// workers racing past the threshold all see `Err` but the
233 /// cancel-once contract holds via `CancellationToken::cancel`'s
234 /// internal `AtomicBool`).
235 ///
236 /// Source-tag invariant: the tag is set to `Budget` BEFORE
237 /// `cancel.cancel()` is called. This ordering means any
238 /// observer that subsequently reads
239 /// `cancel.is_cancelled() == true` is guaranteed to see
240 /// `source() != None` — the consumer side of the invariant
241 /// lives in `evaluate_all`'s `classify_cancel` block.
242 ///
243 /// # Errors
244 ///
245 /// Returns [`BudgetExceeded`] when the post-increment
246 /// `examined` count meets or exceeds `max_rows`.
247 #[inline]
248 pub fn tick(&self) -> Result<(), BudgetExceeded> {
249 let prev = self.examined.fetch_add(1, Ordering::Relaxed);
250 if prev + 1 >= self.max_rows {
251 // Order matters: stamp source BEFORE flipping the
252 // token, so observers that see is_cancelled() also
253 // see Budget.
254 self.mark_budget_cancel();
255 self.cancel.cancel();
256 return Err(BudgetExceeded {
257 examined: prev + 1,
258 limit: self.max_rows,
259 predicate_shape: None,
260 });
261 }
262 Ok(())
263 }
264}
265
266/// Typed signal that a query exceeded its row budget. Surfaced
267/// from `evaluate_all` as `anyhow::Error::from(BudgetExceeded { .. })`;
268/// the MCP / daemon outer wrappers downcast on it (the same pattern
269/// `sqry-mcp/src/server.rs` uses for `RpcError`) and emit the
270/// `query_too_broad` envelope variant declared by Subagent B with
271/// `details.source = "runtime_budget"`,
272/// `details.examined = examined`, `details.limit = limit`.
273#[derive(Debug, Clone, Error, PartialEq, Eq)]
274#[error("query exceeded row budget: examined {examined} rows, limit {limit}")]
275pub struct BudgetExceeded {
276 /// Rows examined before the budget tripped. Surfaced as
277 /// `details.examined` in the wire envelope.
278 pub examined: u64,
279 /// Configured row budget at the time of the trip. Surfaced as
280 /// `details.limit` in the wire envelope.
281 pub limit: u64,
282 /// Sanitised AST shape (`Expr::shape_summary`) of the offending
283 /// query, ≤256 bytes, no values / paths / regex patterns. Cluster-C
284 /// iter-2: surfaced as `details.predicate_shape` on the runtime-
285 /// budget envelope so MCP clients see the same comparable shape
286 /// the cluster-B static-estimate envelope already exposes
287 /// (codex iter-1 review).
288 ///
289 /// `None` only when the executor surfaces budget exhaustion before
290 /// the finalize step has run (e.g. the cancellable wrappers' own
291 /// downcast). The envelope serializes `None` as JSON null.
292 pub predicate_shape: Option<String>,
293}
294
#[cfg(test)]
mod tests {
    use super::*;
    use crate::query::cancellation::CancellationToken;
    use std::sync::{Mutex, MutexGuard, PoisonError};

    /// Serialises every test that reads or writes
    /// [`ENV_TOOL_BUDGET_ROWS`]. The default test harness runs tests on
    /// several threads, so without this lock one test's `set_var` could
    /// be observed by another test that expects the variable to be
    /// unset.
    static ENV_LOCK: Mutex<()> = Mutex::new(());

    /// Holds [`ENV_LOCK`] for as long as a test depends on the value of
    /// [`ENV_TOOL_BUDGET_ROWS`], and removes the variable again on drop
    /// — including when the test panics — so state never leaks into the
    /// next test.
    struct EnvOverride {
        _serial: MutexGuard<'static, ()>,
    }

    impl EnvOverride {
        /// Takes the lock, then sets the variable to `value` or removes
        /// it when `value` is `None`.
        fn apply(value: Option<&str>) -> Self {
            // A panicking env test only poisons the lock; the `()` it
            // guards cannot be left inconsistent, so recover the guard.
            let serial = ENV_LOCK.lock().unwrap_or_else(PoisonError::into_inner);
            // SAFETY: `ENV_LOCK` is held, so no other test in this
            // module touches the variable concurrently. This still
            // assumes no other thread in the test binary reads the
            // process environment through non-std calls (e.g. libc
            // `getenv`) at the same time.
            unsafe {
                match value {
                    Some(rows) => std::env::set_var(ENV_TOOL_BUDGET_ROWS, rows),
                    None => std::env::remove_var(ENV_TOOL_BUDGET_ROWS),
                }
            }
            Self { _serial: serial }
        }
    }

    impl Drop for EnvOverride {
        fn drop(&mut self) {
            // SAFETY: the lock guard is a field of `self` and is only
            // released after this body returns, so the same exclusivity
            // argument as in `apply` holds.
            unsafe {
                std::env::remove_var(ENV_TOOL_BUDGET_ROWS);
            }
        }
    }

    #[test]
    fn tick_below_max_returns_ok() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(10, token.clone());
        for _ in 0..9 {
            budget.tick().expect("first 9 ticks must succeed");
        }
        assert!(!budget.exceeded(), "9 ticks must not exceed budget of 10");
        assert!(!token.is_cancelled(), "token must remain uncancelled");
        assert_eq!(budget.source(), CancellationSource::None);
    }

    #[test]
    fn tick_at_max_trips_cancel_and_stamps_budget_source() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(3, token.clone());
        budget.tick().expect("tick 1 ok");
        budget.tick().expect("tick 2 ok");
        let err = budget.tick().expect_err("tick 3 must trip");
        assert_eq!(err.examined, 3);
        assert_eq!(err.limit, 3);
        assert!(token.is_cancelled(), "tick must flip the token");
        assert_eq!(
            budget.source(),
            CancellationSource::Budget,
            "budget overflow must stamp source = Budget"
        );
    }

    #[test]
    fn external_cancel_first_blocks_budget_tag() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(3, token.clone());
        // External arrives first.
        assert!(budget.mark_external_cancel(), "External wins CAS");
        // Subsequent budget overflow MUST NOT overwrite the tag.
        budget.tick().expect("first tick ok");
        budget.tick().expect("second tick ok");
        let _ = budget.tick();
        assert_eq!(
            budget.source(),
            CancellationSource::External,
            "external-first must keep tag = External even after budget overflow"
        );
    }

    #[test]
    fn budget_cancel_first_blocks_external_tag() {
        let token = CancellationToken::new();
        let budget = QueryBudget::new(2, token.clone());
        budget.tick().expect("first tick ok");
        let _ = budget.tick(); // trips
        assert_eq!(budget.source(), CancellationSource::Budget);
        // External arrives after budget — CAS fails, tag stays Budget.
        assert!(
            !budget.mark_external_cancel(),
            "external-second CAS must fail"
        );
        assert_eq!(budget.source(), CancellationSource::Budget);
    }

    #[test]
    fn from_per_call_prefers_per_call_value_over_env() {
        let _env = EnvOverride::apply(Some("999"));
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(Some(42), token);
        assert_eq!(budget.max_rows, 42, "per-call value must override env var");
    }

    #[test]
    fn from_per_call_none_uses_env_when_set() {
        let _env = EnvOverride::apply(Some("1234"));
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(None, token);
        assert_eq!(
            budget.max_rows, 1234,
            "env var must apply when no per-call value is given"
        );
    }

    #[test]
    fn from_per_call_zero_falls_back_to_default() {
        let _env = EnvOverride::apply(None);
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(Some(0), token);
        assert_eq!(
            budget.max_rows, DEFAULT_BUDGET_ROWS,
            "per-call zero must map to the default rather than trip immediately"
        );
    }

    #[test]
    fn from_per_call_none_uses_default_when_env_unset() {
        let _env = EnvOverride::apply(None);
        let token = CancellationToken::new();
        let budget = QueryBudget::from_per_call_or_env(None, token);
        assert_eq!(budget.max_rows, DEFAULT_BUDGET_ROWS);
    }

    #[test]
    fn unbounded_budget_never_trips_on_realistic_iteration_count() {
        let token = CancellationToken::new();
        let budget = QueryBudget::unbounded(token.clone());
        // 1k ticks against a u64::MAX cap is nowhere near the
        // overflow boundary — the unbounded variant must not
        // accidentally trip.
        for _ in 0..1_000 {
            budget.tick().expect("unbounded must not trip");
        }
        assert!(!token.is_cancelled());
    }
}
414}