1use std::{
4 cell::{Cell, OnceCell, RefCell},
5 collections::HashMap,
6};
7
8use instant::Instant;
9
10use crate::{
11 errors::ExecutorError,
12 evaluator::compiled_pivot::PivotAggregateGroup,
13 limits::{MAX_MEMORY_BYTES, MEMORY_WARNING_BYTES},
14 memory::QueryArena,
15};
16
/// Per-query state used while executing a `SELECT`.
///
/// All borrowed context shares the lifetime `'a`. Mutable bookkeeping that must be
/// updated through `&self` is stored in `Cell`/`RefCell`, and the arena and aggregate
/// cache sit behind `OnceCell` so they are only created when first requested.
pub struct SelectExecutor<'a> {
    /// Borrowed database handle; also the source of the query buffer pool
    /// (see `query_buffer_pool`).
    pub(super) database: &'a vibesql_storage::Database,
    /// Outer row supplied by the `new_with_outer_*` constructors, `None` otherwise.
    /// Presumably the current row of an enclosing query — confirm with callers.
    pub(super) outer_row: Option<&'a vibesql_storage::Row>,
    /// Schema paired with `outer_row`; set by the same constructors.
    pub(super) outer_schema: Option<&'a crate::schema::CombinedSchema>,
    /// Slice of outer rows, set only by the `new_with_outer_rows_*` constructors.
    pub(super) outer_rows: Option<&'a [vibesql_storage::Row]>,
    /// Procedural execution context, set only by `new_with_procedural_context`.
    pub(super) procedural_context: Option<&'a crate::procedural::ExecutionContext>,
    /// CTE results keyed by name, set by the `*_cte*` constructors.
    pub(super) cte_context: Option<&'a HashMap<String, super::super::cte::CteResult>>,
    /// Nesting depth: 0 from `new`, `parent_depth + 1` from the `*_depth` constructors.
    pub(super) subquery_depth: usize,
    /// Running total of bytes reported through `track_memory_allocation`.
    pub(super) memory_used_bytes: Cell<usize>,
    /// Set once the memory warning has been printed, so it is emitted at most once
    /// between resets.
    pub(super) memory_warning_logged: Cell<bool>,
    /// Instant captured at construction (and again in `reset_for_reuse`); the
    /// reference point for `check_timeout`.
    pub(crate) start_time: Instant,
    /// Timeout in whole seconds compared against the elapsed time in `check_timeout`.
    pub timeout_seconds: u64,
    /// Lazily created map of cached aggregate values keyed by a string cache key.
    pub(super) aggregate_cache: OnceCell<RefCell<HashMap<String, vibesql_types::SqlValue>>>,
    /// Lazily created query arena (see `arena` / `reset_arena`).
    pub(super) arena: OnceCell<RefCell<QueryArena>>,
    /// Pivot aggregate group installed via `set_pivot_group`, if any.
    pub(super) pivot_group: RefCell<Option<PivotAggregateGroup>>,
    /// Row index recorded via `set_aggregate_representative_row`, if any.
    pub(super) aggregate_representative_row_idx: RefCell<Option<usize>>,
}
63
64impl<'a> SelectExecutor<'a> {
65 pub fn new(database: &'a vibesql_storage::Database) -> Self {
74 SelectExecutor {
75 database,
76 outer_row: None,
77 outer_schema: None,
78 outer_rows: None,
79 procedural_context: None,
80 cte_context: None,
81 subquery_depth: 0,
82 memory_used_bytes: Cell::new(0),
83 memory_warning_logged: Cell::new(false),
84 start_time: Instant::now(),
85 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
86 aggregate_cache: OnceCell::new(),
87 arena: OnceCell::new(),
88 pivot_group: RefCell::new(None),
89 aggregate_representative_row_idx: RefCell::new(None),
90 }
91 }
92
93 pub fn new_with_cte(
96 database: &'a vibesql_storage::Database,
97 cte_context: &'a HashMap<String, super::super::cte::CteResult>,
98 ) -> Self {
99 Self::new_with_cte_and_depth(database, cte_context, 0)
100 }
101
102 pub fn new_with_outer_context(
104 database: &'a vibesql_storage::Database,
105 outer_row: &'a vibesql_storage::Row,
106 outer_schema: &'a crate::schema::CombinedSchema,
107 ) -> Self {
108 SelectExecutor {
109 database,
110 outer_row: Some(outer_row),
111 outer_schema: Some(outer_schema),
112 outer_rows: None,
113 procedural_context: None,
114 cte_context: None,
115 subquery_depth: 0,
116 memory_used_bytes: Cell::new(0),
117 memory_warning_logged: Cell::new(false),
118 start_time: Instant::now(),
119 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
120 aggregate_cache: OnceCell::new(),
121 arena: OnceCell::new(),
122 pivot_group: RefCell::new(None),
123 aggregate_representative_row_idx: RefCell::new(None),
124 }
125 }
126
127 pub fn new_with_depth(database: &'a vibesql_storage::Database, parent_depth: usize) -> Self {
130 SelectExecutor {
131 database,
132 outer_row: None,
133 outer_schema: None,
134 outer_rows: None,
135 procedural_context: None,
136 cte_context: None,
137 subquery_depth: parent_depth + 1,
138 memory_used_bytes: Cell::new(0),
139 memory_warning_logged: Cell::new(false),
140 start_time: Instant::now(),
141 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
142 aggregate_cache: OnceCell::new(),
143 arena: OnceCell::new(),
144 pivot_group: RefCell::new(None),
145 aggregate_representative_row_idx: RefCell::new(None),
146 }
147 }
148
149 pub fn new_with_outer_context_and_depth(
166 database: &'a vibesql_storage::Database,
167 outer_row: &'a vibesql_storage::Row,
168 outer_schema: &'a crate::schema::CombinedSchema,
169 parent_depth: usize,
170 ) -> Self {
171 SelectExecutor {
172 database,
173 outer_row: Some(outer_row),
174 outer_schema: Some(outer_schema),
175 outer_rows: None,
176 procedural_context: None,
177 cte_context: None,
178 subquery_depth: parent_depth + 1,
179 memory_used_bytes: Cell::new(0),
180 memory_warning_logged: Cell::new(false),
181 start_time: Instant::now(),
182 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
183 aggregate_cache: OnceCell::new(),
184 arena: OnceCell::new(),
185 pivot_group: RefCell::new(None),
186 aggregate_representative_row_idx: RefCell::new(None),
187 }
188 }
189
190 pub fn new_with_procedural_context(
192 database: &'a vibesql_storage::Database,
193 procedural_context: &'a crate::procedural::ExecutionContext,
194 ) -> Self {
195 SelectExecutor {
196 database,
197 outer_row: None,
198 outer_schema: None,
199 outer_rows: None,
200 procedural_context: Some(procedural_context),
201 cte_context: None,
202 subquery_depth: 0,
203 memory_used_bytes: Cell::new(0),
204 memory_warning_logged: Cell::new(false),
205 start_time: Instant::now(),
206 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
207 aggregate_cache: OnceCell::new(),
208 arena: OnceCell::new(),
209 pivot_group: RefCell::new(None),
210 aggregate_representative_row_idx: RefCell::new(None),
211 }
212 }
213
214 pub fn new_with_cte_and_depth(
217 database: &'a vibesql_storage::Database,
218 cte_context: &'a HashMap<String, super::super::cte::CteResult>,
219 parent_depth: usize,
220 ) -> Self {
221 SelectExecutor {
222 database,
223 outer_row: None,
224 outer_schema: None,
225 outer_rows: None,
226 procedural_context: None,
227 cte_context: Some(cte_context),
228 subquery_depth: parent_depth + 1,
229 memory_used_bytes: Cell::new(0),
230 memory_warning_logged: Cell::new(false),
231 start_time: Instant::now(),
232 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
233 aggregate_cache: OnceCell::new(),
234 arena: OnceCell::new(),
235 pivot_group: RefCell::new(None),
236 aggregate_representative_row_idx: RefCell::new(None),
237 }
238 }
239
240 pub fn new_with_outer_and_cte_and_depth(
243 database: &'a vibesql_storage::Database,
244 outer_row: &'a vibesql_storage::Row,
245 outer_schema: &'a crate::schema::CombinedSchema,
246 cte_context: &'a HashMap<String, super::super::cte::CteResult>,
247 parent_depth: usize,
248 ) -> Self {
249 SelectExecutor {
250 database,
251 outer_row: Some(outer_row),
252 outer_schema: Some(outer_schema),
253 outer_rows: None,
254 procedural_context: None,
255 cte_context: Some(cte_context),
256 subquery_depth: parent_depth + 1,
257 memory_used_bytes: Cell::new(0),
258 memory_warning_logged: Cell::new(false),
259 start_time: Instant::now(),
260 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
261 aggregate_cache: OnceCell::new(),
262 arena: OnceCell::new(),
263 pivot_group: RefCell::new(None),
264 aggregate_representative_row_idx: RefCell::new(None),
265 }
266 }
267
268 pub fn new_with_outer_rows_and_depth(
272 database: &'a vibesql_storage::Database,
273 outer_row: &'a vibesql_storage::Row,
274 outer_schema: &'a crate::schema::CombinedSchema,
275 outer_rows: &'a [vibesql_storage::Row],
276 parent_depth: usize,
277 ) -> Self {
278 SelectExecutor {
279 database,
280 outer_row: Some(outer_row),
281 outer_schema: Some(outer_schema),
282 outer_rows: Some(outer_rows),
283 procedural_context: None,
284 cte_context: None,
285 subquery_depth: parent_depth + 1,
286 memory_used_bytes: Cell::new(0),
287 memory_warning_logged: Cell::new(false),
288 start_time: Instant::now(),
289 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
290 aggregate_cache: OnceCell::new(),
291 arena: OnceCell::new(),
292 pivot_group: RefCell::new(None),
293 aggregate_representative_row_idx: RefCell::new(None),
294 }
295 }
296
297 pub fn new_with_outer_rows_and_cte_and_depth(
300 database: &'a vibesql_storage::Database,
301 outer_row: &'a vibesql_storage::Row,
302 outer_schema: &'a crate::schema::CombinedSchema,
303 outer_rows: &'a [vibesql_storage::Row],
304 cte_context: &'a HashMap<String, super::super::cte::CteResult>,
305 parent_depth: usize,
306 ) -> Self {
307 SelectExecutor {
308 database,
309 outer_row: Some(outer_row),
310 outer_schema: Some(outer_schema),
311 outer_rows: Some(outer_rows),
312 procedural_context: None,
313 cte_context: Some(cte_context),
314 subquery_depth: parent_depth + 1,
315 memory_used_bytes: Cell::new(0),
316 memory_warning_logged: Cell::new(false),
317 start_time: Instant::now(),
318 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
319 aggregate_cache: OnceCell::new(),
320 arena: OnceCell::new(),
321 pivot_group: RefCell::new(None),
322 aggregate_representative_row_idx: RefCell::new(None),
323 }
324 }
325
326 pub(super) fn track_memory_allocation(&self, bytes: usize) -> Result<(), ExecutorError> {
328 let mut current = self.memory_used_bytes.get();
329 current += bytes;
330 self.memory_used_bytes.set(current);
331
332 if !self.memory_warning_logged.get() && current > MEMORY_WARNING_BYTES {
334 eprintln!(
335 "⚠️ Query memory usage: {:.2} GB",
336 current as f64 / 1024.0 / 1024.0 / 1024.0
337 );
338 self.memory_warning_logged.set(true);
339 }
340
341 if current > MAX_MEMORY_BYTES {
343 return Err(ExecutorError::MemoryLimitExceeded {
344 used_bytes: current,
345 max_bytes: MAX_MEMORY_BYTES,
346 });
347 }
348
349 Ok(())
350 }
351
352 #[cfg(test)]
354 pub(super) fn track_memory_deallocation(&self, bytes: usize) {
355 let current = self.memory_used_bytes.get();
356 self.memory_used_bytes.set(current.saturating_sub(bytes));
357 }
358
359 pub fn with_timeout(mut self, seconds: u64) -> Self {
361 self.timeout_seconds = seconds;
362 self
363 }
364
365 pub(super) fn clear_aggregate_cache(&self) {
368 if let Some(cache) = self.aggregate_cache.get() {
369 cache.borrow_mut().clear();
370 }
371 }
372
373 pub(super) fn get_aggregate_cache(&self) -> &RefCell<HashMap<String, vibesql_types::SqlValue>> {
375 self.aggregate_cache.get_or_init(|| RefCell::new(HashMap::new()))
376 }
377
378 pub(crate) fn query_buffer_pool(&self) -> &vibesql_storage::QueryBufferPool {
380 self.database.query_buffer_pool()
381 }
382
383 pub fn check_timeout(&self) -> Result<(), crate::errors::ExecutorError> {
386 let elapsed = self.start_time.elapsed().as_secs();
387 if elapsed >= self.timeout_seconds {
388 return Err(crate::errors::ExecutorError::QueryTimeoutExceeded {
389 elapsed_seconds: elapsed,
390 max_seconds: self.timeout_seconds,
391 });
392 }
393 Ok(())
394 }
395
396 #[allow(dead_code)]
399 pub(crate) fn arena(&self) -> &RefCell<QueryArena> {
400 self.arena.get_or_init(|| RefCell::new(QueryArena::new()))
401 }
402
403 pub(super) fn reset_arena(&self) {
407 if let Some(arena) = self.arena.get() {
408 arena.borrow_mut().reset();
409 }
410 }
411
412 pub fn reset_for_reuse(&mut self) {
425 self.start_time = Instant::now();
426 self.memory_used_bytes.set(0);
427 self.memory_warning_logged.set(false);
428 self.subquery_depth = 0;
429 self.outer_row = None;
430 self.outer_schema = None;
431 self.procedural_context = None;
432 self.cte_context = None;
433
434 if let Some(arena) = self.arena.get() {
436 arena.borrow_mut().reset();
437 }
438
439 if let Some(cache) = self.aggregate_cache.get() {
441 cache.borrow_mut().clear();
442 }
443
444 *self.pivot_group.borrow_mut() = None;
446
447 *self.aggregate_representative_row_idx.borrow_mut() = None;
449 }
450
451 pub(super) fn set_pivot_group(&self, group: PivotAggregateGroup) {
456 *self.pivot_group.borrow_mut() = Some(group);
457 }
458
459 pub(super) fn execute_pivot_aggregates(
465 &self,
466 group_rows: &[vibesql_storage::Row],
467 ) -> Result<(), ExecutorError> {
468 let pivot_group = self.pivot_group.borrow();
469 if let Some(ref pivot) = *pivot_group {
470 let results = pivot.execute(group_rows)?;
471
472 let cache = self.get_aggregate_cache();
474 let mut cache_mut = cache.borrow_mut();
475 for (cache_key, value) in results {
476 cache_mut.insert(cache_key, value);
477 }
478 }
479 Ok(())
480 }
481
482 pub(super) fn has_pivot_group(&self) -> bool {
484 self.pivot_group.borrow().is_some()
485 }
486
487 pub(super) fn set_aggregate_representative_row(&self, idx: Option<usize>) {
497 *self.aggregate_representative_row_idx.borrow_mut() = idx;
498 }
499
500 pub(super) fn get_aggregate_representative_row(&self) -> Option<usize> {
503 *self.aggregate_representative_row_idx.borrow()
504 }
505
506 pub(super) fn find_representative_row_index(
520 &self,
521 select_list: &[vibesql_ast::SelectItem],
522 group_rows: &[vibesql_storage::Row],
523 evaluator: &crate::evaluator::CombinedExpressionEvaluator,
524 ) -> Option<usize> {
525 use crate::select::grouping::compare_sql_values;
526 use vibesql_types::SqlValue;
527
528 if group_rows.is_empty() {
529 return None;
530 }
531
532 for item in select_list {
534 if let vibesql_ast::SelectItem::Expression { expr, .. } = item {
535 if let vibesql_ast::Expression::AggregateFunction { name, args, .. } = expr {
536 let name_upper = name.to_uppercase();
537
538 if (name_upper == "MAX" || name_upper == "MIN") && args.len() == 1 {
540 let mut best_idx = 0;
542 let mut best_val: Option<SqlValue> = None;
543
544 for (idx, row) in group_rows.iter().enumerate() {
545 if let Ok(val) = evaluator.eval(&args[0], row) {
546 if matches!(val, SqlValue::Null) {
548 continue;
549 }
550
551 let is_better = match &best_val {
552 None => true,
553 Some(best) => {
554 let cmp = compare_sql_values(&val, best);
555 if name_upper == "MAX" {
556 cmp == std::cmp::Ordering::Greater
557 } else {
558 cmp == std::cmp::Ordering::Less
559 }
560 }
561 };
562
563 if is_better {
564 best_idx = idx;
565 best_val = Some(val);
566 }
567 }
568 }
569
570 if best_val.is_some() {
572 return Some(best_idx);
573 }
574 }
575 }
576 }
577 }
578
579 None
581 }
582}