vibesql_executor/select/executor/
builder.rs

1//! SelectExecutor construction and initialization
2
3use std::{
4    cell::{Cell, OnceCell, RefCell},
5    collections::HashMap,
6};
7
8use instant::Instant;
9
10use crate::{
11    errors::ExecutorError,
12    evaluator::compiled_pivot::PivotAggregateGroup,
13    limits::{MAX_MEMORY_BYTES, MEMORY_WARNING_BYTES},
14    memory::QueryArena,
15};
16
/// Executes SELECT queries.
///
/// Bundles the borrowed database handle, the optional context inherited from an
/// enclosing query (outer row/schema/rows, procedural context, CTE results), and
/// per-execution bookkeeping: memory counters, start time and timeout, and
/// lazily created caches.
pub struct SelectExecutor<'a> {
    /// Borrowed database handle used by this executor.
    pub(super) database: &'a vibesql_storage::Database,
    /// Current outer row for correlated subqueries (`None` when there is no outer query).
    pub(super) outer_row: Option<&'a vibesql_storage::Row>,
    /// Schema describing `outer_row` (`None` when there is no outer query).
    pub(super) outer_schema: Option<&'a crate::schema::CombinedSchema>,
    /// All outer rows for outer-correlated aggregates (issue #4930).
    /// When an aggregate in a scalar subquery references only outer columns,
    /// it should aggregate over ALL outer rows, not just the current one.
    pub(super) outer_rows: Option<&'a [vibesql_storage::Row]>,
    /// Procedural context for stored procedure/function variable resolution
    pub(super) procedural_context: Option<&'a crate::procedural::ExecutionContext>,
    /// CTE (Common Table Expression) context for accessing WITH clause results.
    /// Enables scalar subqueries to reference CTEs defined in the outer query.
    /// Keyed by `String` (presumably the CTE name; confirm against the CTE module).
    pub(super) cte_context: Option<&'a HashMap<String, super::super::cte::CteResult>>,
    /// Subquery nesting depth (for preventing stack overflow).
    /// The depth-aware constructors set this to `parent_depth + 1`.
    pub(super) subquery_depth: usize,
    /// Memory used by this query execution (in bytes).
    /// `Cell` so it can be updated through `&self`.
    pub(super) memory_used_bytes: Cell<usize>,
    /// Flag to prevent logging the same memory warning multiple times
    pub(super) memory_warning_logged: Cell<bool>,
    /// Query start time (for timeout enforcement)
    pub(crate) start_time: Instant,
    /// Timeout in seconds (defaults to MAX_QUERY_EXECUTION_SECONDS)
    pub timeout_seconds: u64,
    /// Cache for aggregate results within a single group
    /// Key: string identifying the aggregate expression
    ///      (documented format: "{name}:{distinct}:{arg_debug}"; the key is built elsewhere)
    /// Value: Cached aggregate result
    /// Scope: Per-group evaluation (cleared between groups)
    /// Lazily initialized - only created when first aggregate is evaluated
    pub(super) aggregate_cache: OnceCell<RefCell<HashMap<String, vibesql_types::SqlValue>>>,
    /// Arena allocator for query-scoped allocations
    /// Eliminates malloc/free overhead by using bump-pointer allocation
    /// All allocations are freed when query completes
    /// Lazily initialized - only created when first allocation is needed
    pub(super) arena: OnceCell<RefCell<QueryArena>>,
    /// Pivot aggregate group for batched SUM(CASE...) optimization
    /// Detected once per query, executed once per group
    /// Stores results directly in aggregate_cache
    pub(super) pivot_group: RefCell<Option<PivotAggregateGroup>>,
    /// Index of the "representative row" for aggregate context subqueries.
    /// When evaluating correlated subqueries in aggregate SELECT lists, SQLite uses
    /// the row that corresponds to the aggregate result (e.g., the row where the column
    /// has its MAX value for MAX(col)). This field stores that row's index within the
    /// current group's rows, or `None` when no representative row has been chosen.
    /// See issue #4683 for details.
    pub(super) aggregate_representative_row_idx: RefCell<Option<usize>>,
}
63
64impl<'a> SelectExecutor<'a> {
65    /// Create a new SELECT executor
66    ///
67    /// # Performance
68    ///
69    /// This constructor is optimized for OLTP workloads:
70    /// - Arena is lazily initialized (10MB allocation deferred until needed)
71    /// - Aggregate cache is lazily initialized (HashMap allocation deferred)
72    /// - Simple queries that don't use aggregates or complex allocations skip these costs
73    pub fn new(database: &'a vibesql_storage::Database) -> Self {
74        SelectExecutor {
75            database,
76            outer_row: None,
77            outer_schema: None,
78            outer_rows: None,
79            procedural_context: None,
80            cte_context: None,
81            subquery_depth: 0,
82            memory_used_bytes: Cell::new(0),
83            memory_warning_logged: Cell::new(false),
84            start_time: Instant::now(),
85            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
86            aggregate_cache: OnceCell::new(),
87            arena: OnceCell::new(),
88            pivot_group: RefCell::new(None),
89            aggregate_representative_row_idx: RefCell::new(None),
90        }
91    }
92
93    /// Create a new SELECT executor with CTE context
94    /// Used for INSERT ... SELECT with CTEs (WITH clause on INSERT statement)
95    pub fn new_with_cte(
96        database: &'a vibesql_storage::Database,
97        cte_context: &'a HashMap<String, super::super::cte::CteResult>,
98    ) -> Self {
99        Self::new_with_cte_and_depth(database, cte_context, 0)
100    }
101
102    /// Create a new SELECT executor with outer context for correlated subqueries
103    pub fn new_with_outer_context(
104        database: &'a vibesql_storage::Database,
105        outer_row: &'a vibesql_storage::Row,
106        outer_schema: &'a crate::schema::CombinedSchema,
107    ) -> Self {
108        SelectExecutor {
109            database,
110            outer_row: Some(outer_row),
111            outer_schema: Some(outer_schema),
112            outer_rows: None,
113            procedural_context: None,
114            cte_context: None,
115            subquery_depth: 0,
116            memory_used_bytes: Cell::new(0),
117            memory_warning_logged: Cell::new(false),
118            start_time: Instant::now(),
119            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
120            aggregate_cache: OnceCell::new(),
121            arena: OnceCell::new(),
122            pivot_group: RefCell::new(None),
123            aggregate_representative_row_idx: RefCell::new(None),
124        }
125    }
126
127    /// Create a new SELECT executor with explicit depth tracking
128    /// Used for non-correlated subqueries to propagate depth limit enforcement
129    pub fn new_with_depth(database: &'a vibesql_storage::Database, parent_depth: usize) -> Self {
130        SelectExecutor {
131            database,
132            outer_row: None,
133            outer_schema: None,
134            outer_rows: None,
135            procedural_context: None,
136            cte_context: None,
137            subquery_depth: parent_depth + 1,
138            memory_used_bytes: Cell::new(0),
139            memory_warning_logged: Cell::new(false),
140            start_time: Instant::now(),
141            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
142            aggregate_cache: OnceCell::new(),
143            arena: OnceCell::new(),
144            pivot_group: RefCell::new(None),
145            aggregate_representative_row_idx: RefCell::new(None),
146        }
147    }
148
149    /// Create a new SELECT executor with outer context and explicit depth
150    /// Used when creating subquery executors to track nesting depth
151    ///
152    /// # Note on Timeout Inheritance
153    ///
154    /// Currently subqueries get their own 60s timeout rather than sharing parent's timeout.
155    /// This means a query with N subqueries could run for up to N*60s instead of 60s total.
156    ///
157    /// However, this is acceptable for the initial fix because:
158    /// 1. The main regression (100% timeout) was caused by ZERO timeout enforcement
159    /// 2. Having per-subquery timeouts still prevents infinite loops (the core issue)
160    /// 3. Most problematic queries cause recursive subquery execution, which IS caught
161    /// 4. Threading timeout through evaluators requires extensive refactoring
162    ///
163    /// Future improvement: Add timeout fields to ExpressionEvaluator and pass through
164    /// See: <https://github.com/rjwalters/vibesql/issues/1012#subquery-timeout>
165    pub fn new_with_outer_context_and_depth(
166        database: &'a vibesql_storage::Database,
167        outer_row: &'a vibesql_storage::Row,
168        outer_schema: &'a crate::schema::CombinedSchema,
169        parent_depth: usize,
170    ) -> Self {
171        SelectExecutor {
172            database,
173            outer_row: Some(outer_row),
174            outer_schema: Some(outer_schema),
175            outer_rows: None,
176            procedural_context: None,
177            cte_context: None,
178            subquery_depth: parent_depth + 1,
179            memory_used_bytes: Cell::new(0),
180            memory_warning_logged: Cell::new(false),
181            start_time: Instant::now(),
182            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
183            aggregate_cache: OnceCell::new(),
184            arena: OnceCell::new(),
185            pivot_group: RefCell::new(None),
186            aggregate_representative_row_idx: RefCell::new(None),
187        }
188    }
189
190    /// Create a new SELECT executor with procedural context for stored procedures/functions
191    pub fn new_with_procedural_context(
192        database: &'a vibesql_storage::Database,
193        procedural_context: &'a crate::procedural::ExecutionContext,
194    ) -> Self {
195        SelectExecutor {
196            database,
197            outer_row: None,
198            outer_schema: None,
199            outer_rows: None,
200            procedural_context: Some(procedural_context),
201            cte_context: None,
202            subquery_depth: 0,
203            memory_used_bytes: Cell::new(0),
204            memory_warning_logged: Cell::new(false),
205            start_time: Instant::now(),
206            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
207            aggregate_cache: OnceCell::new(),
208            arena: OnceCell::new(),
209            pivot_group: RefCell::new(None),
210            aggregate_representative_row_idx: RefCell::new(None),
211        }
212    }
213
214    /// Create a new SELECT executor with CTE context and depth tracking
215    /// Used for non-correlated subqueries that need access to parent CTEs
216    pub fn new_with_cte_and_depth(
217        database: &'a vibesql_storage::Database,
218        cte_context: &'a HashMap<String, super::super::cte::CteResult>,
219        parent_depth: usize,
220    ) -> Self {
221        SelectExecutor {
222            database,
223            outer_row: None,
224            outer_schema: None,
225            outer_rows: None,
226            procedural_context: None,
227            cte_context: Some(cte_context),
228            subquery_depth: parent_depth + 1,
229            memory_used_bytes: Cell::new(0),
230            memory_warning_logged: Cell::new(false),
231            start_time: Instant::now(),
232            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
233            aggregate_cache: OnceCell::new(),
234            arena: OnceCell::new(),
235            pivot_group: RefCell::new(None),
236            aggregate_representative_row_idx: RefCell::new(None),
237        }
238    }
239
240    /// Create a new SELECT executor with outer context, CTE context, and depth tracking
241    /// Used for correlated subqueries that need access to both outer row and parent CTEs
242    pub fn new_with_outer_and_cte_and_depth(
243        database: &'a vibesql_storage::Database,
244        outer_row: &'a vibesql_storage::Row,
245        outer_schema: &'a crate::schema::CombinedSchema,
246        cte_context: &'a HashMap<String, super::super::cte::CteResult>,
247        parent_depth: usize,
248    ) -> Self {
249        SelectExecutor {
250            database,
251            outer_row: Some(outer_row),
252            outer_schema: Some(outer_schema),
253            outer_rows: None,
254            procedural_context: None,
255            cte_context: Some(cte_context),
256            subquery_depth: parent_depth + 1,
257            memory_used_bytes: Cell::new(0),
258            memory_warning_logged: Cell::new(false),
259            start_time: Instant::now(),
260            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
261            aggregate_cache: OnceCell::new(),
262            arena: OnceCell::new(),
263            pivot_group: RefCell::new(None),
264            aggregate_representative_row_idx: RefCell::new(None),
265        }
266    }
267
268    /// Create a new SELECT executor with outer context, outer rows, and depth tracking
269    /// Used for outer-correlated aggregates (issue #4930) where aggregates in scalar
270    /// subqueries need access to ALL outer rows, not just the current one.
271    pub fn new_with_outer_rows_and_depth(
272        database: &'a vibesql_storage::Database,
273        outer_row: &'a vibesql_storage::Row,
274        outer_schema: &'a crate::schema::CombinedSchema,
275        outer_rows: &'a [vibesql_storage::Row],
276        parent_depth: usize,
277    ) -> Self {
278        SelectExecutor {
279            database,
280            outer_row: Some(outer_row),
281            outer_schema: Some(outer_schema),
282            outer_rows: Some(outer_rows),
283            procedural_context: None,
284            cte_context: None,
285            subquery_depth: parent_depth + 1,
286            memory_used_bytes: Cell::new(0),
287            memory_warning_logged: Cell::new(false),
288            start_time: Instant::now(),
289            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
290            aggregate_cache: OnceCell::new(),
291            arena: OnceCell::new(),
292            pivot_group: RefCell::new(None),
293            aggregate_representative_row_idx: RefCell::new(None),
294        }
295    }
296
297    /// Create a new SELECT executor with outer context, outer rows, CTE, and depth tracking
298    /// Used for outer-correlated aggregates (issue #4930) with CTE support
299    pub fn new_with_outer_rows_and_cte_and_depth(
300        database: &'a vibesql_storage::Database,
301        outer_row: &'a vibesql_storage::Row,
302        outer_schema: &'a crate::schema::CombinedSchema,
303        outer_rows: &'a [vibesql_storage::Row],
304        cte_context: &'a HashMap<String, super::super::cte::CteResult>,
305        parent_depth: usize,
306    ) -> Self {
307        SelectExecutor {
308            database,
309            outer_row: Some(outer_row),
310            outer_schema: Some(outer_schema),
311            outer_rows: Some(outer_rows),
312            procedural_context: None,
313            cte_context: Some(cte_context),
314            subquery_depth: parent_depth + 1,
315            memory_used_bytes: Cell::new(0),
316            memory_warning_logged: Cell::new(false),
317            start_time: Instant::now(),
318            timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
319            aggregate_cache: OnceCell::new(),
320            arena: OnceCell::new(),
321            pivot_group: RefCell::new(None),
322            aggregate_representative_row_idx: RefCell::new(None),
323        }
324    }
325
326    /// Track memory allocation
327    pub(super) fn track_memory_allocation(&self, bytes: usize) -> Result<(), ExecutorError> {
328        let mut current = self.memory_used_bytes.get();
329        current += bytes;
330        self.memory_used_bytes.set(current);
331
332        // Log warning at threshold
333        if !self.memory_warning_logged.get() && current > MEMORY_WARNING_BYTES {
334            eprintln!(
335                "⚠️  Query memory usage: {:.2} GB",
336                current as f64 / 1024.0 / 1024.0 / 1024.0
337            );
338            self.memory_warning_logged.set(true);
339        }
340
341        // Hard limit
342        if current > MAX_MEMORY_BYTES {
343            return Err(ExecutorError::MemoryLimitExceeded {
344                used_bytes: current,
345                max_bytes: MAX_MEMORY_BYTES,
346            });
347        }
348
349        Ok(())
350    }
351
352    /// Track memory deallocation
353    #[cfg(test)]
354    pub(super) fn track_memory_deallocation(&self, bytes: usize) {
355        let current = self.memory_used_bytes.get();
356        self.memory_used_bytes.set(current.saturating_sub(bytes));
357    }
358
359    /// Override default timeout for this query (useful for testing)
360    pub fn with_timeout(mut self, seconds: u64) -> Self {
361        self.timeout_seconds = seconds;
362        self
363    }
364
365    /// Clear aggregate cache (should be called between group evaluations)
366    /// No-op if the cache has not been initialized (lazy initialization)
367    pub(super) fn clear_aggregate_cache(&self) {
368        if let Some(cache) = self.aggregate_cache.get() {
369            cache.borrow_mut().clear();
370        }
371    }
372
373    /// Get access to the aggregate cache, initializing it lazily if needed
374    pub(super) fn get_aggregate_cache(&self) -> &RefCell<HashMap<String, vibesql_types::SqlValue>> {
375        self.aggregate_cache.get_or_init(|| RefCell::new(HashMap::new()))
376    }
377
378    /// Get access to the query buffer pool for reducing allocations
379    pub(crate) fn query_buffer_pool(&self) -> &vibesql_storage::QueryBufferPool {
380        self.database.query_buffer_pool()
381    }
382
383    /// Check if query has exceeded timeout
384    /// Call this in hot loops to prevent infinite execution
385    pub fn check_timeout(&self) -> Result<(), crate::errors::ExecutorError> {
386        let elapsed = self.start_time.elapsed().as_secs();
387        if elapsed >= self.timeout_seconds {
388            return Err(crate::errors::ExecutorError::QueryTimeoutExceeded {
389                elapsed_seconds: elapsed,
390                max_seconds: self.timeout_seconds,
391            });
392        }
393        Ok(())
394    }
395
396    /// Get access to the query arena for allocations
397    /// The arena is lazily initialized on first access
398    #[allow(dead_code)]
399    pub(crate) fn arena(&self) -> &RefCell<QueryArena> {
400        self.arena.get_or_init(|| RefCell::new(QueryArena::new()))
401    }
402
403    /// Reset the arena for query reuse
404    /// Called at the start of each query execution
405    /// No-op if the arena has not been initialized (lazy initialization)
406    pub(super) fn reset_arena(&self) {
407        if let Some(arena) = self.arena.get() {
408            arena.borrow_mut().reset();
409        }
410    }
411
412    /// Reset the executor for reuse between queries
413    ///
414    /// This method prepares the executor for a new query execution by:
415    /// - Resetting the start time to now
416    /// - Clearing memory tracking counters
417    /// - Resetting the arena (if initialized)
418    /// - Clearing the aggregate cache (if initialized)
419    ///
420    /// # Performance
421    ///
422    /// Call this method to reuse an executor instead of creating a new one.
423    /// This avoids the allocation overhead of creating new HashMap and arena instances.
424    pub fn reset_for_reuse(&mut self) {
425        self.start_time = Instant::now();
426        self.memory_used_bytes.set(0);
427        self.memory_warning_logged.set(false);
428        self.subquery_depth = 0;
429        self.outer_row = None;
430        self.outer_schema = None;
431        self.procedural_context = None;
432        self.cte_context = None;
433
434        // Reset arena if it was initialized (clears offset, keeps buffer allocation)
435        if let Some(arena) = self.arena.get() {
436            arena.borrow_mut().reset();
437        }
438
439        // Clear aggregate cache if it was initialized (clears entries, keeps HashMap allocation)
440        if let Some(cache) = self.aggregate_cache.get() {
441            cache.borrow_mut().clear();
442        }
443
444        // Clear pivot group
445        *self.pivot_group.borrow_mut() = None;
446
447        // Clear representative row index
448        *self.aggregate_representative_row_idx.borrow_mut() = None;
449    }
450
451    /// Set the pivot aggregate group for this query
452    ///
453    /// Called once during query planning when a pivot pattern is detected.
454    /// The pivot group is then executed once per group in aggregation.
455    pub(super) fn set_pivot_group(&self, group: PivotAggregateGroup) {
456        *self.pivot_group.borrow_mut() = Some(group);
457    }
458
459    /// Execute pivot aggregates for the current group and cache results
460    ///
461    /// This executes all pivot aggregates in a single pass over the rows,
462    /// storing results in the aggregate cache. Subsequent calls to evaluate
463    /// individual pivot aggregates will hit the cache.
464    pub(super) fn execute_pivot_aggregates(
465        &self,
466        group_rows: &[vibesql_storage::Row],
467    ) -> Result<(), ExecutorError> {
468        let pivot_group = self.pivot_group.borrow();
469        if let Some(ref pivot) = *pivot_group {
470            let results = pivot.execute(group_rows)?;
471
472            // Store all pivot results in the aggregate cache
473            let cache = self.get_aggregate_cache();
474            let mut cache_mut = cache.borrow_mut();
475            for (cache_key, value) in results {
476                cache_mut.insert(cache_key, value);
477            }
478        }
479        Ok(())
480    }
481
482    /// Check if a pivot group is set for this query
483    pub(super) fn has_pivot_group(&self) -> bool {
484        self.pivot_group.borrow().is_some()
485    }
486
487    /// Set the representative row index for aggregate context subquery evaluation.
488    ///
489    /// This is used to implement SQLite's behavior where correlated subqueries in
490    /// aggregate SELECT lists use the row that corresponds to the aggregate result.
491    /// For example, in `SELECT max(a), (SELECT d FROM t2 WHERE a=c) FROM t1`,
492    /// the subquery uses `a` from the row where `a` has its maximum value.
493    ///
494    /// # Arguments
495    /// * `idx` - The index of the representative row in the current group's rows
496    pub(super) fn set_aggregate_representative_row(&self, idx: Option<usize>) {
497        *self.aggregate_representative_row_idx.borrow_mut() = idx;
498    }
499
500    /// Get the representative row index for aggregate context subquery evaluation.
501    /// Returns None if no representative row has been set.
502    pub(super) fn get_aggregate_representative_row(&self) -> Option<usize> {
503        *self.aggregate_representative_row_idx.borrow()
504    }
505
506    /// Find the representative row based on aggregates in the SELECT list.
507    ///
508    /// For MAX(col) aggregates, returns the index of the row where col has its maximum value.
509    /// For MIN(col) aggregates, returns the index of the row where col has its minimum value.
510    /// For other aggregates or no aggregates, returns None (fallback to first row behavior).
511    ///
512    /// This implements SQLite's behavior where bare column references in aggregate queries
513    /// use values from the row that contributed to the aggregate result.
514    ///
515    /// # Arguments
516    /// * `select_list` - The expanded SELECT list to scan for aggregates
517    /// * `group_rows` - The rows in the current group
518    /// * `evaluator` - Expression evaluator for computing column values
519    pub(super) fn find_representative_row_index(
520        &self,
521        select_list: &[vibesql_ast::SelectItem],
522        group_rows: &[vibesql_storage::Row],
523        evaluator: &crate::evaluator::CombinedExpressionEvaluator,
524    ) -> Option<usize> {
525        use crate::select::grouping::compare_sql_values;
526        use vibesql_types::SqlValue;
527
528        if group_rows.is_empty() {
529            return None;
530        }
531
532        // Scan SELECT list for MAX or MIN aggregates on a column
533        for item in select_list {
534            if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
535                if let vibesql_ast::Expression::AggregateFunction { name, args, .. } = expr {
536                    let name_upper = name.to_uppercase();
537
538                    // We only care about MAX/MIN aggregates on columns
539                    if (name_upper == "MAX" || name_upper == "MIN") && args.len() == 1 {
540                        // Find the row where the column has its max/min value
541                        let mut best_idx = 0;
542                        let mut best_val: Option<SqlValue> = None;
543
544                        for (idx, row) in group_rows.iter().enumerate() {
545                            if let Ok(val) = evaluator.eval(&args[0], row) {
546                                // Skip NULL values
547                                if matches!(val, SqlValue::Null) {
548                                    continue;
549                                }
550
551                                let is_better = match &best_val {
552                                    None => true,
553                                    Some(best) => {
554                                        let cmp = compare_sql_values(&val, best);
555                                        if name_upper == "MAX" {
556                                            cmp == std::cmp::Ordering::Greater
557                                        } else {
558                                            cmp == std::cmp::Ordering::Less
559                                        }
560                                    }
561                                };
562
563                                if is_better {
564                                    best_idx = idx;
565                                    best_val = Some(val);
566                                }
567                            }
568                        }
569
570                        // If we found a non-NULL value, use that row
571                        if best_val.is_some() {
572                            return Some(best_idx);
573                        }
574                    }
575                }
576            }
577        }
578
579        // No suitable aggregate found, return None (will fall back to first row)
580        None
581    }
582}