Skip to main content

powdb_query/executor/
mem_budget.rs

1//! Per-query memory budget accumulator (WS2).
2//!
3//! The blunt row-count caps (`MAX_SORT_ROWS`, `MAX_JOIN_ROWS`) cannot stop a
4//! query that materializes a small number of very large rows, and they don't
5//! cover GROUP BY hash tables or IN-list materialization at all. A crafted
6//! query could therefore OOM-kill the server process — fatal on AWS / Railway /
7//! Cloudflare where the process has a hard memory ceiling.
8//!
9//! This module adds a lightweight byte-budget accumulator that each
10//! materialization point charges as it grows its buffer. When the running
11//! total would exceed the configured limit we return
12//! [`QueryError::MemoryLimitExceeded`] cleanly — no panic, no partial state.
13//!
14//! ## Why a thread-local accumulator
15//!
16//! The read path runs concurrently behind `Arc<RwLock<Engine>>`: many threads
17//! call `execute_powql_readonly(&self)` at once. A single accumulator field on
18//! the `Engine` would (a) make `Engine` `!Sync` if it used `Cell`, and (b) be
19//! *wrong* even with an atomic, because concurrent queries would sum and reset
20//! each other's totals. Each query, however, runs to completion synchronously
21//! on a single thread (`spawn_blocking` → `dispatch_query` → `execute_powql*`),
22//! so a thread-local running total is both correct and contention-free. The
23//! per-query limit is passed explicitly (it lives on the `Engine` as a plain
24//! `usize`, which is `Copy`/`Sync`).
25//!
26//! Disk-spill (so over-budget queries still succeed) is explicitly deferred to
27//! Phase 3; for now over-budget is a clean error.
28
29use std::cell::Cell;
30
31use powdb_storage::types::Value;
32
33use crate::result::QueryError;
34
/// Fallback per-query memory budget: 256 MiB (268,435,456 bytes). The server
/// plumbs the effective value from `POWDB_QUERY_MEMORY_LIMIT`.
pub const DEFAULT_QUERY_MEMORY_LIMIT: usize = 256 << 20;
38
thread_local! {
    /// Number of budgeted statements currently active on this thread's call
    /// stack. `create_view` / `refresh_view` call back into `execute_powql`
    /// mid-statement, so that inner entry observes a depth greater than zero.
    static DEPTH: Cell<u32> = const { Cell::new(0_u32) };

    /// Running byte total charged by the query executing on this thread.
    /// Zeroed by [`enter`] whenever an *outermost* statement begins.
    static USED_BYTES: Cell<usize> = const { Cell::new(0_usize) };
}
49
50/// Enter a budgeted statement frame. Returns an [`EnterGuard`] that decrements
51/// the depth on drop so the count stays correct even if execution unwinds via
52/// `?`/panic. Only the *outermost* entry (depth 0 → 1) zeroes the accumulator;
53/// nested entries (e.g. the source query of a `create_view`/`refresh_view`
54/// recursively calling `execute_powql`) leave the outer frame's charged bytes
55/// intact. This is a reentrancy guard rather than a save/restore because it is
56/// simpler — there is exactly one running total to protect and the guard makes
57/// the "outermost statement owns the reset" rule self-evident at the call site.
58#[inline]
59#[must_use = "the guard must be held for the duration of the statement"]
60pub fn enter() -> EnterGuard {
61    DEPTH.with(|d| {
62        let depth = d.get();
63        if depth == 0 {
64            USED_BYTES.with(|u| u.set(0));
65        }
66        d.set(depth + 1);
67    });
68    EnterGuard
69}
70
71/// RAII guard returned by [`enter`]; decrements the reentrancy depth on drop.
72pub struct EnterGuard;
73
74impl Drop for EnterGuard {
75    #[inline]
76    fn drop(&mut self) {
77        DEPTH.with(|d| d.set(d.get().saturating_sub(1)));
78    }
79}
80
/// Unconditionally clear this thread's budget state: both the reentrancy
/// depth and the running byte total go back to zero.
///
/// Test-only escape hatch. Production code goes through [`enter`], which only
/// resets at the outermost frame so nested statements never clobber an outer
/// frame's accounting.
#[cfg(test)]
pub fn reset() {
    DEPTH.with(|depth| depth.set(0));
    USED_BYTES.with(|total| total.set(0));
}
89
90/// Charge `bytes` against the current thread's running total, checking it
91/// against `limit_bytes`. Returns [`QueryError::MemoryLimitExceeded`] if this
92/// allocation would push the total over the limit. On error nothing is charged.
93#[inline]
94pub fn charge(bytes: usize, limit_bytes: usize) -> Result<(), QueryError> {
95    USED_BYTES.with(|u| {
96        let requested = u.get().saturating_add(bytes);
97        if requested > limit_bytes {
98            return Err(QueryError::MemoryLimitExceeded {
99                limit_bytes,
100                requested_bytes: requested,
101            });
102        }
103        u.set(requested);
104        Ok(())
105    })
106}
107
/// Current value of this thread's running byte total (test/diagnostic helper).
#[cfg(test)]
pub fn used() -> usize {
    USED_BYTES.with(Cell::get)
}
113
114/// Estimate the in-memory footprint of a single `Value`, including the heap
115/// allocation behind `Str`/`Bytes`. The estimate counts the enum slot plus any
116/// owned heap bytes — it is intentionally an over-approximation so the guard
117/// trips slightly early rather than slightly late.
118#[inline]
119pub fn estimate_value_size(v: &Value) -> usize {
120    let base = std::mem::size_of::<Value>();
121    let heap = match v {
122        Value::Str(s) => s.capacity(),
123        Value::Bytes(b) => b.capacity(),
124        _ => 0,
125    };
126    base + heap
127}
128
129/// Estimate the in-memory footprint of a fully materialized row, including the
130/// `Vec<Value>` backing allocation.
131#[inline]
132pub fn estimate_row_size(row: &[Value]) -> usize {
133    let mut total = std::mem::size_of::<Vec<Value>>();
134    for v in row {
135        total += estimate_value_size(v);
136    }
137    total
138}
139
#[cfg(test)]
mod tests {
    use super::*;

    /// Two charges that together land exactly on the limit are both accepted.
    #[test]
    fn charge_under_limit_succeeds() {
        reset();
        for _ in 0..2 {
            assert!(charge(512, 1024).is_ok());
        }
        assert_eq!(used(), 1024);
    }

    /// A rejected charge reports the attempted total and leaves the running
    /// total where it was.
    #[test]
    fn charge_over_limit_errors_without_charging() {
        reset();
        assert!(charge(512, 1024).is_ok());

        match charge(1024, 1024).unwrap_err() {
            QueryError::MemoryLimitExceeded {
                limit_bytes,
                requested_bytes,
            } => {
                assert_eq!(limit_bytes, 1024);
                assert_eq!(requested_bytes, 1536);
            }
            other => panic!("expected MemoryLimitExceeded, got {other:?}"),
        }

        // Still only the first, successful charge.
        assert_eq!(used(), 512);
    }

    /// Models an outer statement that charges bytes and then recursively
    /// enters a nested statement (the shape create_view/refresh_view produce
    /// when their source query calls execute_powql). Only the outermost
    /// `enter` may zero the running total.
    #[test]
    fn nested_enter_preserves_outer_accounting() {
        const LIMIT: usize = 1_000_000;

        reset();
        let outer = enter();
        assert_eq!(used(), 0, "outermost enter zeroes the accumulator");
        charge(1000, LIMIT).unwrap();
        assert_eq!(used(), 1000);

        // Nested frame: depth 1 -> 2 on entry, back to 1 when the guard drops.
        {
            let _inner = enter();
            assert_eq!(used(), 1000, "nested enter must not discard outer bytes");
            charge(500, LIMIT).unwrap();
            assert_eq!(used(), 1500);
        }

        assert_eq!(used(), 1500, "outer accounting includes nested charges");
        drop(outer); // depth 1 -> 0

        // The next outermost statement starts from a clean slate.
        let _next = enter();
        assert_eq!(used(), 0, "next outermost statement resets");
    }

    /// A string value is charged for its heap buffer on top of the enum slot.
    #[test]
    fn string_value_counts_heap_bytes() {
        let payload = "x".repeat(10_000);
        let small = estimate_value_size(&Value::Int(1));
        let big = estimate_value_size(&Value::Str(payload));
        assert!(big >= small + 10_000);
    }
}
205}