vibesql_executor/select/executor/
builder.rs

1//! SelectExecutor construction and initialization
2
3use std::{
4    cell::{Cell, OnceCell, RefCell},
5    collections::HashMap,
6};
7use instant::Instant;
8
9use crate::{
10    errors::ExecutorError,
11    evaluator::compiled_pivot::PivotAggregateGroup,
12    limits::{MAX_MEMORY_BYTES, MEMORY_WARNING_BYTES},
13    memory::QueryArena,
14};
15
/// Executes SELECT queries.
///
/// Holds all per-query execution state:
/// - borrowed context supplied by the caller (database, optional outer row/schema for
///   correlated subqueries, optional procedural and CTE context),
/// - subquery nesting depth,
/// - memory and timeout accounting,
/// - lazily created scratch structures (aggregate cache, arena, pivot group).
///
/// Bookkeeping uses interior mutability (`Cell` / `RefCell` / `OnceCell`) so that most
/// methods only need `&self`; as a consequence the executor is not `Sync`.
pub struct SelectExecutor<'a> {
    /// Database the query is executed against (also the source of the query buffer pool)
    pub(super) database: &'a vibesql_storage::Database,
    /// Current row of the enclosing query; `Some` only for correlated subquery executors
    pub(super) outer_row: Option<&'a vibesql_storage::Row>,
    /// Schema describing `outer_row`; `Some` only for correlated subquery executors
    pub(super) outer_schema: Option<&'a crate::schema::CombinedSchema>,
    /// Procedural context for stored procedure/function variable resolution
    pub(super) procedural_context: Option<&'a crate::procedural::ExecutionContext>,
    /// CTE (Common Table Expression) context for accessing WITH clause results
    /// Enables scalar subqueries to reference CTEs defined in the outer query
    pub(super) cte_context: Option<&'a HashMap<String, super::super::cte::CteResult>>,
    /// Subquery nesting depth (0 for a top-level query, parent depth + 1 for subqueries);
    /// tracked to enforce a depth limit and prevent stack overflow
    pub(super) subquery_depth: usize,
    /// Running total of bytes reported for this query execution
    pub(super) memory_used_bytes: Cell<usize>,
    /// Set once the memory-usage warning has been printed, so it is printed at most once
    pub(super) memory_warning_logged: Cell<bool>,
    /// Query start time; timeout checks measure elapsed time from this instant
    pub(crate) start_time: Instant,
    /// Timeout in seconds (defaults to MAX_QUERY_EXECUTION_SECONDS; see `with_timeout`)
    pub timeout_seconds: u64,
    /// Cache for aggregate results within a single group
    /// Key: string identifying the aggregate expression (format: "{name}:{distinct}:{arg_debug}")
    /// Value: Cached aggregate result
    /// Scope: Per-group evaluation (cleared between groups)
    /// Lazily initialized - only created when the cache is first accessed
    pub(super) aggregate_cache: OnceCell<RefCell<HashMap<String, vibesql_types::SqlValue>>>,
    /// Arena allocator for query-scoped allocations
    /// Intended to avoid per-allocation malloc/free overhead; the arena is reset
    /// (not freed) between queries so its buffer can be reused
    /// Lazily initialized - only created when the arena is first accessed
    pub(super) arena: OnceCell<RefCell<QueryArena>>,
    /// Pivot aggregate group for batched SUM(CASE...) optimization
    /// Detected once per query, executed once per group
    /// Its results are stored directly in `aggregate_cache`
    pub(super) pivot_group: RefCell<Option<PivotAggregateGroup>>,
}
52
53impl<'a> SelectExecutor<'a> {
54    /// Create a new SELECT executor
55    ///
56    /// # Performance
57    ///
58    /// This constructor is optimized for OLTP workloads:
59    /// - Arena is lazily initialized (10MB allocation deferred until needed)
60    /// - Aggregate cache is lazily initialized (HashMap allocation deferred)
61    /// - Simple queries that don't use aggregates or complex allocations skip these costs
62    pub fn new(database: &'a vibesql_storage::Database) -> Self {
63        SelectExecutor {
64            database,
65            outer_row: None,
66            outer_schema: None,
67            procedural_context: None,
68            cte_context: None,
69            subquery_depth: 0,
70            memory_used_bytes: Cell::new(0),
71            memory_warning_logged: Cell::new(false),
72            start_time: Instant::now(),
73            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
74            aggregate_cache: OnceCell::new(),
75            arena: OnceCell::new(),
76            pivot_group: RefCell::new(None),
77        }
78    }
79
80    /// Create a new SELECT executor with outer context for correlated subqueries
81    pub fn new_with_outer_context(
82        database: &'a vibesql_storage::Database,
83        outer_row: &'a vibesql_storage::Row,
84        outer_schema: &'a crate::schema::CombinedSchema,
85    ) -> Self {
86        SelectExecutor {
87            database,
88            outer_row: Some(outer_row),
89            outer_schema: Some(outer_schema),
90            procedural_context: None,
91            cte_context: None,
92            subquery_depth: 0,
93            memory_used_bytes: Cell::new(0),
94            memory_warning_logged: Cell::new(false),
95            start_time: Instant::now(),
96            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
97            aggregate_cache: OnceCell::new(),
98            arena: OnceCell::new(),
99            pivot_group: RefCell::new(None),
100        }
101    }
102
103    /// Create a new SELECT executor with explicit depth tracking
104    /// Used for non-correlated subqueries to propagate depth limit enforcement
105    pub fn new_with_depth(database: &'a vibesql_storage::Database, parent_depth: usize) -> Self {
106        SelectExecutor {
107            database,
108            outer_row: None,
109            outer_schema: None,
110            procedural_context: None,
111            cte_context: None,
112            subquery_depth: parent_depth + 1,
113            memory_used_bytes: Cell::new(0),
114            memory_warning_logged: Cell::new(false),
115            start_time: Instant::now(),
116            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
117            aggregate_cache: OnceCell::new(),
118            arena: OnceCell::new(),
119            pivot_group: RefCell::new(None),
120        }
121    }
122
123    /// Create a new SELECT executor with outer context and explicit depth
124    /// Used when creating subquery executors to track nesting depth
125    ///
126    /// # Note on Timeout Inheritance
127    ///
128    /// Currently subqueries get their own 60s timeout rather than sharing parent's timeout.
129    /// This means a query with N subqueries could run for up to N*60s instead of 60s total.
130    ///
131    /// However, this is acceptable for the initial fix because:
132    /// 1. The main regression (100% timeout) was caused by ZERO timeout enforcement
133    /// 2. Having per-subquery timeouts still prevents infinite loops (the core issue)
134    /// 3. Most problematic queries cause recursive subquery execution, which IS caught
135    /// 4. Threading timeout through evaluators requires extensive refactoring
136    ///
137    /// Future improvement: Add timeout fields to ExpressionEvaluator and pass through
138    /// See: <https://github.com/rjwalters/vibesql/issues/1012#subquery-timeout>
139    pub fn new_with_outer_context_and_depth(
140        database: &'a vibesql_storage::Database,
141        outer_row: &'a vibesql_storage::Row,
142        outer_schema: &'a crate::schema::CombinedSchema,
143        parent_depth: usize,
144    ) -> Self {
145        SelectExecutor {
146            database,
147            outer_row: Some(outer_row),
148            outer_schema: Some(outer_schema),
149            procedural_context: None,
150            cte_context: None,
151            subquery_depth: parent_depth + 1,
152            memory_used_bytes: Cell::new(0),
153            memory_warning_logged: Cell::new(false),
154            start_time: Instant::now(),
155            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
156            aggregate_cache: OnceCell::new(),
157            arena: OnceCell::new(),
158            pivot_group: RefCell::new(None),
159        }
160    }
161
162    /// Create a new SELECT executor with procedural context for stored procedures/functions
163    pub fn new_with_procedural_context(
164        database: &'a vibesql_storage::Database,
165        procedural_context: &'a crate::procedural::ExecutionContext,
166    ) -> Self {
167        SelectExecutor {
168            database,
169            outer_row: None,
170            outer_schema: None,
171            procedural_context: Some(procedural_context),
172            cte_context: None,
173            subquery_depth: 0,
174            memory_used_bytes: Cell::new(0),
175            memory_warning_logged: Cell::new(false),
176            start_time: Instant::now(),
177            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
178            aggregate_cache: OnceCell::new(),
179            arena: OnceCell::new(),
180            pivot_group: RefCell::new(None),
181        }
182    }
183
184    /// Create a new SELECT executor with CTE context and depth tracking
185    /// Used for non-correlated subqueries that need access to parent CTEs
186    pub fn new_with_cte_and_depth(
187        database: &'a vibesql_storage::Database,
188        cte_context: &'a HashMap<String, super::super::cte::CteResult>,
189        parent_depth: usize,
190    ) -> Self {
191        SelectExecutor {
192            database,
193            outer_row: None,
194            outer_schema: None,
195            procedural_context: None,
196            cte_context: Some(cte_context),
197            subquery_depth: parent_depth + 1,
198            memory_used_bytes: Cell::new(0),
199            memory_warning_logged: Cell::new(false),
200            start_time: Instant::now(),
201            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
202            aggregate_cache: OnceCell::new(),
203            arena: OnceCell::new(),
204            pivot_group: RefCell::new(None),
205        }
206    }
207
208    /// Create a new SELECT executor with outer context, CTE context, and depth tracking
209    /// Used for correlated subqueries that need access to both outer row and parent CTEs
210    pub fn new_with_outer_and_cte_and_depth(
211        database: &'a vibesql_storage::Database,
212        outer_row: &'a vibesql_storage::Row,
213        outer_schema: &'a crate::schema::CombinedSchema,
214        cte_context: &'a HashMap<String, super::super::cte::CteResult>,
215        parent_depth: usize,
216    ) -> Self {
217        SelectExecutor {
218            database,
219            outer_row: Some(outer_row),
220            outer_schema: Some(outer_schema),
221            procedural_context: None,
222            cte_context: Some(cte_context),
223            subquery_depth: parent_depth + 1,
224            memory_used_bytes: Cell::new(0),
225            memory_warning_logged: Cell::new(false),
226            start_time: Instant::now(),
227            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
228            aggregate_cache: OnceCell::new(),
229            arena: OnceCell::new(),
230            pivot_group: RefCell::new(None),
231        }
232    }
233
234    /// Track memory allocation
235    pub(super) fn track_memory_allocation(&self, bytes: usize) -> Result<(), ExecutorError> {
236        let mut current = self.memory_used_bytes.get();
237        current += bytes;
238        self.memory_used_bytes.set(current);
239
240        // Log warning at threshold
241        if !self.memory_warning_logged.get() && current > MEMORY_WARNING_BYTES {
242            eprintln!(
243                "⚠️  Query memory usage: {:.2} GB",
244                current as f64 / 1024.0 / 1024.0 / 1024.0
245            );
246            self.memory_warning_logged.set(true);
247        }
248
249        // Hard limit
250        if current > MAX_MEMORY_BYTES {
251            return Err(ExecutorError::MemoryLimitExceeded {
252                used_bytes: current,
253                max_bytes: MAX_MEMORY_BYTES,
254            });
255        }
256
257        Ok(())
258    }
259
260    /// Track memory deallocation
261    #[cfg(test)]
262    pub(super) fn track_memory_deallocation(&self, bytes: usize) {
263        let current = self.memory_used_bytes.get();
264        self.memory_used_bytes.set(current.saturating_sub(bytes));
265    }
266
267    /// Override default timeout for this query (useful for testing)
268    pub fn with_timeout(mut self, seconds: u64) -> Self {
269        self.timeout_seconds = seconds;
270        self
271    }
272
273    /// Clear aggregate cache (should be called between group evaluations)
274    /// No-op if the cache has not been initialized (lazy initialization)
275    pub(super) fn clear_aggregate_cache(&self) {
276        if let Some(cache) = self.aggregate_cache.get() {
277            cache.borrow_mut().clear();
278        }
279    }
280
281    /// Get access to the aggregate cache, initializing it lazily if needed
282    pub(super) fn get_aggregate_cache(&self) -> &RefCell<HashMap<String, vibesql_types::SqlValue>> {
283        self.aggregate_cache.get_or_init(|| RefCell::new(HashMap::new()))
284    }
285
286    /// Get access to the query buffer pool for reducing allocations
287    pub(crate) fn query_buffer_pool(&self) -> &vibesql_storage::QueryBufferPool {
288        self.database.query_buffer_pool()
289    }
290
291    /// Check if query has exceeded timeout
292    /// Call this in hot loops to prevent infinite execution
293    pub fn check_timeout(&self) -> Result<(), crate::errors::ExecutorError> {
294        let elapsed = self.start_time.elapsed().as_secs();
295        if elapsed >= self.timeout_seconds {
296            return Err(crate::errors::ExecutorError::QueryTimeoutExceeded {
297                elapsed_seconds: elapsed,
298                max_seconds: self.timeout_seconds,
299            });
300        }
301        Ok(())
302    }
303
304    /// Get access to the query arena for allocations
305    /// The arena is lazily initialized on first access
306    #[allow(dead_code)]
307    pub(crate) fn arena(&self) -> &RefCell<QueryArena> {
308        self.arena.get_or_init(|| RefCell::new(QueryArena::new()))
309    }
310
311    /// Reset the arena for query reuse
312    /// Called at the start of each query execution
313    /// No-op if the arena has not been initialized (lazy initialization)
314    pub(super) fn reset_arena(&self) {
315        if let Some(arena) = self.arena.get() {
316            arena.borrow_mut().reset();
317        }
318    }
319
320    /// Reset the executor for reuse between queries
321    ///
322    /// This method prepares the executor for a new query execution by:
323    /// - Resetting the start time to now
324    /// - Clearing memory tracking counters
325    /// - Resetting the arena (if initialized)
326    /// - Clearing the aggregate cache (if initialized)
327    ///
328    /// # Performance
329    ///
330    /// Call this method to reuse an executor instead of creating a new one.
331    /// This avoids the allocation overhead of creating new HashMap and arena instances.
332    pub fn reset_for_reuse(&mut self) {
333        self.start_time = Instant::now();
334        self.memory_used_bytes.set(0);
335        self.memory_warning_logged.set(false);
336        self.subquery_depth = 0;
337        self.outer_row = None;
338        self.outer_schema = None;
339        self.procedural_context = None;
340        self.cte_context = None;
341
342        // Reset arena if it was initialized (clears offset, keeps buffer allocation)
343        if let Some(arena) = self.arena.get() {
344            arena.borrow_mut().reset();
345        }
346
347        // Clear aggregate cache if it was initialized (clears entries, keeps HashMap allocation)
348        if let Some(cache) = self.aggregate_cache.get() {
349            cache.borrow_mut().clear();
350        }
351
352        // Clear pivot group
353        *self.pivot_group.borrow_mut() = None;
354    }
355
356    /// Set the pivot aggregate group for this query
357    ///
358    /// Called once during query planning when a pivot pattern is detected.
359    /// The pivot group is then executed once per group in aggregation.
360    pub(super) fn set_pivot_group(&self, group: PivotAggregateGroup) {
361        *self.pivot_group.borrow_mut() = Some(group);
362    }
363
364    /// Execute pivot aggregates for the current group and cache results
365    ///
366    /// This executes all pivot aggregates in a single pass over the rows,
367    /// storing results in the aggregate cache. Subsequent calls to evaluate
368    /// individual pivot aggregates will hit the cache.
369    pub(super) fn execute_pivot_aggregates(
370        &self,
371        group_rows: &[vibesql_storage::Row],
372    ) -> Result<(), ExecutorError> {
373        let pivot_group = self.pivot_group.borrow();
374        if let Some(ref pivot) = *pivot_group {
375            let results = pivot.execute(group_rows)?;
376
377            // Store all pivot results in the aggregate cache
378            let cache = self.get_aggregate_cache();
379            let mut cache_mut = cache.borrow_mut();
380            for (cache_key, value) in results {
381                cache_mut.insert(cache_key, value);
382            }
383        }
384        Ok(())
385    }
386
387    /// Check if a pivot group is set for this query
388    pub(super) fn has_pivot_group(&self) -> bool {
389        self.pivot_group.borrow().is_some()
390    }
391}