powdb_query/executor/mem_budget.rs
1//! Per-query memory budget accumulator (WS2).
2//!
3//! The blunt row-count caps (`MAX_SORT_ROWS`, `MAX_JOIN_ROWS`) cannot stop a
4//! query that materializes a small number of very large rows, and they don't
5//! cover GROUP BY hash tables or IN-list materialization at all. A crafted
6//! query could therefore OOM-kill the server process — fatal on AWS / Railway /
7//! Cloudflare where the process has a hard memory ceiling.
8//!
9//! This module adds a lightweight byte-budget accumulator that each
10//! materialization point charges as it grows its buffer. When the running
11//! total would exceed the configured limit we return
12//! [`QueryError::MemoryLimitExceeded`] cleanly — no panic, no partial state.
13//!
14//! ## Why a thread-local accumulator
15//!
16//! The read path runs concurrently behind `Arc<RwLock<Engine>>`: many threads
17//! call `execute_powql_readonly(&self)` at once. A single accumulator field on
18//! the `Engine` would (a) make `Engine` `!Sync` if it used `Cell`, and (b) be
19//! *wrong* even with an atomic, because concurrent queries would sum and reset
20//! each other's totals. Each query, however, runs to completion synchronously
21//! on a single thread (`spawn_blocking` → `dispatch_query` → `execute_powql*`),
22//! so a thread-local running total is both correct and contention-free. The
23//! per-query limit is passed explicitly (it lives on the `Engine` as a plain
24//! `usize`, which is `Copy`/`Sync`).
25//!
26//! Disk-spill (so over-budget queries still succeed) is explicitly deferred to
27//! Phase 3; for now over-budget is a clean error.
28
29use std::cell::Cell;
30
31use powdb_storage::types::Value;
32
33use crate::result::QueryError;
34
/// Default per-query memory budget: 256 MiB (268_435_456 bytes). The server
/// plumbs the effective limit from `POWDB_QUERY_MEMORY_LIMIT`.
pub const DEFAULT_QUERY_MEMORY_LIMIT: usize = 256 << 20;
38
thread_local! {
    /// Nesting level of budgeted statements currently active on this thread's
    /// call stack. `create_view` / `refresh_view` call back into
    /// `execute_powql` part-way through a statement, so that inner entry
    /// observes a depth greater than zero.
    static DEPTH: Cell<u32> = const { Cell::new(0) };

    /// Running total of bytes charged by the query executing on this thread.
    /// [`enter`] zeroes it whenever an *outermost* statement begins.
    static USED_BYTES: Cell<usize> = const { Cell::new(0) };
}
49
50/// Enter a budgeted statement frame. Returns an [`EnterGuard`] that decrements
51/// the depth on drop so the count stays correct even if execution unwinds via
52/// `?`/panic. Only the *outermost* entry (depth 0 → 1) zeroes the accumulator;
53/// nested entries (e.g. the source query of a `create_view`/`refresh_view`
54/// recursively calling `execute_powql`) leave the outer frame's charged bytes
55/// intact. This is a reentrancy guard rather than a save/restore because it is
56/// simpler — there is exactly one running total to protect and the guard makes
57/// the "outermost statement owns the reset" rule self-evident at the call site.
58#[inline]
59#[must_use = "the guard must be held for the duration of the statement"]
60pub fn enter() -> EnterGuard {
61 DEPTH.with(|d| {
62 let depth = d.get();
63 if depth == 0 {
64 USED_BYTES.with(|u| u.set(0));
65 }
66 d.set(depth + 1);
67 });
68 EnterGuard
69}
70
71/// RAII guard returned by [`enter`]; decrements the reentrancy depth on drop.
72pub struct EnterGuard;
73
74impl Drop for EnterGuard {
75 #[inline]
76 fn drop(&mut self) {
77 DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
78 }
79}
80
/// Unconditionally clear this thread's budget state: the reentrancy depth and
/// the running byte total both go back to zero.
///
/// Test-only escape hatch. Production code goes through [`enter`], which only
/// clears the total for an outermost statement so nested statements never
/// clobber an outer frame's accounting.
#[cfg(test)]
pub fn reset() {
    DEPTH.with(|depth| depth.set(0));
    USED_BYTES.with(|total| total.set(0));
}
89
90/// Charge `bytes` against the current thread's running total, checking it
91/// against `limit_bytes`. Returns [`QueryError::MemoryLimitExceeded`] if this
92/// allocation would push the total over the limit. On error nothing is charged.
93#[inline]
94pub fn charge(bytes: usize, limit_bytes: usize) -> Result<(), QueryError> {
95 USED_BYTES.with(|u| {
96 let requested = u.get().saturating_add(bytes);
97 if requested > limit_bytes {
98 return Err(QueryError::MemoryLimitExceeded {
99 limit_bytes,
100 requested_bytes: requested,
101 });
102 }
103 u.set(requested);
104 Ok(())
105 })
106}
107
/// Current thread's running byte total, as accumulated by [`charge`]
/// (test/diagnostic helper).
#[cfg(test)]
pub fn used() -> usize {
    USED_BYTES.with(Cell::get)
}
113
114/// Estimate the in-memory footprint of a single `Value`, including the heap
115/// allocation behind `Str`/`Bytes`. The estimate counts the enum slot plus any
116/// owned heap bytes — it is intentionally an over-approximation so the guard
117/// trips slightly early rather than slightly late.
118#[inline]
119pub fn estimate_value_size(v: &Value) -> usize {
120 let base = std::mem::size_of::<Value>();
121 let heap = match v {
122 Value::Str(s) => s.capacity(),
123 Value::Bytes(b) => b.capacity(),
124 _ => 0,
125 };
126 base + heap
127}
128
129/// Estimate the in-memory footprint of a fully materialized row, including the
130/// `Vec<Value>` backing allocation.
131#[inline]
132pub fn estimate_row_size(row: &[Value]) -> usize {
133 let mut total = std::mem::size_of::<Vec<Value>>();
134 for v in row {
135 total += estimate_value_size(v);
136 }
137 total
138}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Charges that together land exactly on the limit are all accepted.
    #[test]
    fn charge_under_limit_succeeds() {
        reset();
        for _ in 0..2 {
            assert!(charge(512, 1024).is_ok());
        }
        assert_eq!(used(), 1024);
    }

    /// An over-limit charge reports the would-be total and leaves the
    /// accumulator where it was.
    #[test]
    fn charge_over_limit_errors_without_charging() {
        reset();
        assert!(charge(512, 1024).is_ok());

        match charge(1024, 1024).unwrap_err() {
            QueryError::MemoryLimitExceeded {
                limit_bytes,
                requested_bytes,
            } => {
                assert_eq!(limit_bytes, 1024);
                assert_eq!(requested_bytes, 1536);
            }
            other => panic!("expected MemoryLimitExceeded, got {other:?}"),
        }

        // The rejected charge must not have moved the counter.
        assert_eq!(used(), 512);
    }

    /// Models an outer statement that charges bytes and then re-enters a
    /// nested statement (as `create_view`/`refresh_view` do when their source
    /// query calls `execute_powql`). The nested `enter` must leave the running
    /// total alone so the outer frame's accounting survives.
    #[test]
    fn nested_enter_preserves_outer_accounting() {
        const LIMIT: usize = 1_000_000;

        reset();
        let outer_frame = enter();
        assert_eq!(used(), 0, "outermost enter zeroes the accumulator");
        charge(1000, LIMIT).unwrap();
        assert_eq!(used(), 1000);

        // Nested entry (depth 1 -> 2): no reset expected.
        let inner_frame = enter();
        assert_eq!(used(), 1000, "nested enter must not discard outer bytes");
        charge(500, LIMIT).unwrap();
        assert_eq!(used(), 1500);
        drop(inner_frame); // depth 2 -> 1, accumulator untouched

        assert_eq!(used(), 1500, "outer accounting includes nested charges");
        drop(outer_frame); // depth 1 -> 0

        // The next outermost statement starts from zero again.
        let _next_frame = enter();
        assert_eq!(used(), 0, "next outermost statement resets");
    }

    /// A string value's estimate includes its heap payload, not just the slot.
    #[test]
    fn string_value_counts_heap_bytes() {
        const PAYLOAD_LEN: usize = 10_000;

        let int_size = estimate_value_size(&Value::Int(1));
        let str_size = estimate_value_size(&Value::Str("x".repeat(PAYLOAD_LEN)));
        assert!(str_size >= int_size + PAYLOAD_LEN);
    }
}
205}