vibesql_executor/select/executor/
builder.rs1use std::{
4 cell::{Cell, OnceCell, RefCell},
5 collections::HashMap,
6};
7use instant::Instant;
8
9use crate::{
10 errors::ExecutorError,
11 evaluator::compiled_pivot::PivotAggregateGroup,
12 limits::{MAX_MEMORY_BYTES, MEMORY_WARNING_BYTES},
13 memory::QueryArena,
14};
15
16pub struct SelectExecutor<'a> {
18 pub(super) database: &'a vibesql_storage::Database,
19 pub(super) outer_row: Option<&'a vibesql_storage::Row>,
20 pub(super) outer_schema: Option<&'a crate::schema::CombinedSchema>,
21 pub(super) procedural_context: Option<&'a crate::procedural::ExecutionContext>,
23 pub(super) cte_context: Option<&'a HashMap<String, super::super::cte::CteResult>>,
26 pub(super) subquery_depth: usize,
28 pub(super) memory_used_bytes: Cell<usize>,
30 pub(super) memory_warning_logged: Cell<bool>,
32 pub(crate) start_time: Instant,
34 pub timeout_seconds: u64,
36 pub(super) aggregate_cache: OnceCell<RefCell<HashMap<String, vibesql_types::SqlValue>>>,
42 pub(super) arena: OnceCell<RefCell<QueryArena>>,
47 pub(super) pivot_group: RefCell<Option<PivotAggregateGroup>>,
51}
52
53impl<'a> SelectExecutor<'a> {
54 pub fn new(database: &'a vibesql_storage::Database) -> Self {
63 SelectExecutor {
64 database,
65 outer_row: None,
66 outer_schema: None,
67 procedural_context: None,
68 cte_context: None,
69 subquery_depth: 0,
70 memory_used_bytes: Cell::new(0),
71 memory_warning_logged: Cell::new(false),
72 start_time: Instant::now(),
73 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
74 aggregate_cache: OnceCell::new(),
75 arena: OnceCell::new(),
76 pivot_group: RefCell::new(None),
77 }
78 }
79
80 pub fn new_with_outer_context(
82 database: &'a vibesql_storage::Database,
83 outer_row: &'a vibesql_storage::Row,
84 outer_schema: &'a crate::schema::CombinedSchema,
85 ) -> Self {
86 SelectExecutor {
87 database,
88 outer_row: Some(outer_row),
89 outer_schema: Some(outer_schema),
90 procedural_context: None,
91 cte_context: None,
92 subquery_depth: 0,
93 memory_used_bytes: Cell::new(0),
94 memory_warning_logged: Cell::new(false),
95 start_time: Instant::now(),
96 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
97 aggregate_cache: OnceCell::new(),
98 arena: OnceCell::new(),
99 pivot_group: RefCell::new(None),
100 }
101 }
102
103 pub fn new_with_depth(database: &'a vibesql_storage::Database, parent_depth: usize) -> Self {
106 SelectExecutor {
107 database,
108 outer_row: None,
109 outer_schema: None,
110 procedural_context: None,
111 cte_context: None,
112 subquery_depth: parent_depth + 1,
113 memory_used_bytes: Cell::new(0),
114 memory_warning_logged: Cell::new(false),
115 start_time: Instant::now(),
116 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
117 aggregate_cache: OnceCell::new(),
118 arena: OnceCell::new(),
119 pivot_group: RefCell::new(None),
120 }
121 }
122
123 pub fn new_with_outer_context_and_depth(
140 database: &'a vibesql_storage::Database,
141 outer_row: &'a vibesql_storage::Row,
142 outer_schema: &'a crate::schema::CombinedSchema,
143 parent_depth: usize,
144 ) -> Self {
145 SelectExecutor {
146 database,
147 outer_row: Some(outer_row),
148 outer_schema: Some(outer_schema),
149 procedural_context: None,
150 cte_context: None,
151 subquery_depth: parent_depth + 1,
152 memory_used_bytes: Cell::new(0),
153 memory_warning_logged: Cell::new(false),
154 start_time: Instant::now(),
155 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
156 aggregate_cache: OnceCell::new(),
157 arena: OnceCell::new(),
158 pivot_group: RefCell::new(None),
159 }
160 }
161
162 pub fn new_with_procedural_context(
164 database: &'a vibesql_storage::Database,
165 procedural_context: &'a crate::procedural::ExecutionContext,
166 ) -> Self {
167 SelectExecutor {
168 database,
169 outer_row: None,
170 outer_schema: None,
171 procedural_context: Some(procedural_context),
172 cte_context: None,
173 subquery_depth: 0,
174 memory_used_bytes: Cell::new(0),
175 memory_warning_logged: Cell::new(false),
176 start_time: Instant::now(),
177 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
178 aggregate_cache: OnceCell::new(),
179 arena: OnceCell::new(),
180 pivot_group: RefCell::new(None),
181 }
182 }
183
184 pub fn new_with_cte_and_depth(
187 database: &'a vibesql_storage::Database,
188 cte_context: &'a HashMap<String, super::super::cte::CteResult>,
189 parent_depth: usize,
190 ) -> Self {
191 SelectExecutor {
192 database,
193 outer_row: None,
194 outer_schema: None,
195 procedural_context: None,
196 cte_context: Some(cte_context),
197 subquery_depth: parent_depth + 1,
198 memory_used_bytes: Cell::new(0),
199 memory_warning_logged: Cell::new(false),
200 start_time: Instant::now(),
201 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
202 aggregate_cache: OnceCell::new(),
203 arena: OnceCell::new(),
204 pivot_group: RefCell::new(None),
205 }
206 }
207
208 pub fn new_with_outer_and_cte_and_depth(
211 database: &'a vibesql_storage::Database,
212 outer_row: &'a vibesql_storage::Row,
213 outer_schema: &'a crate::schema::CombinedSchema,
214 cte_context: &'a HashMap<String, super::super::cte::CteResult>,
215 parent_depth: usize,
216 ) -> Self {
217 SelectExecutor {
218 database,
219 outer_row: Some(outer_row),
220 outer_schema: Some(outer_schema),
221 procedural_context: None,
222 cte_context: Some(cte_context),
223 subquery_depth: parent_depth + 1,
224 memory_used_bytes: Cell::new(0),
225 memory_warning_logged: Cell::new(false),
226 start_time: Instant::now(),
227 timeout_seconds: crate::limits::MAX_QUERY_EXECUTION_SECONDS,
228 aggregate_cache: OnceCell::new(),
229 arena: OnceCell::new(),
230 pivot_group: RefCell::new(None),
231 }
232 }
233
234 pub(super) fn track_memory_allocation(&self, bytes: usize) -> Result<(), ExecutorError> {
236 let mut current = self.memory_used_bytes.get();
237 current += bytes;
238 self.memory_used_bytes.set(current);
239
240 if !self.memory_warning_logged.get() && current > MEMORY_WARNING_BYTES {
242 eprintln!(
243 "⚠️ Query memory usage: {:.2} GB",
244 current as f64 / 1024.0 / 1024.0 / 1024.0
245 );
246 self.memory_warning_logged.set(true);
247 }
248
249 if current > MAX_MEMORY_BYTES {
251 return Err(ExecutorError::MemoryLimitExceeded {
252 used_bytes: current,
253 max_bytes: MAX_MEMORY_BYTES,
254 });
255 }
256
257 Ok(())
258 }
259
/// Subtracts `bytes` from the running memory total, clamping at zero.
///
/// Compiled only in test builds.
#[cfg(test)]
pub(super) fn track_memory_deallocation(&self, bytes: usize) {
    let remaining = self.memory_used_bytes.get().saturating_sub(bytes);
    self.memory_used_bytes.set(remaining);
}
266
267 pub fn with_timeout(mut self, seconds: u64) -> Self {
269 self.timeout_seconds = seconds;
270 self
271 }
272
273 pub(super) fn clear_aggregate_cache(&self) {
276 if let Some(cache) = self.aggregate_cache.get() {
277 cache.borrow_mut().clear();
278 }
279 }
280
281 pub(super) fn get_aggregate_cache(&self) -> &RefCell<HashMap<String, vibesql_types::SqlValue>> {
283 self.aggregate_cache.get_or_init(|| RefCell::new(HashMap::new()))
284 }
285
286 pub(crate) fn query_buffer_pool(&self) -> &vibesql_storage::QueryBufferPool {
288 self.database.query_buffer_pool()
289 }
290
291 pub fn check_timeout(&self) -> Result<(), crate::errors::ExecutorError> {
294 let elapsed = self.start_time.elapsed().as_secs();
295 if elapsed >= self.timeout_seconds {
296 return Err(crate::errors::ExecutorError::QueryTimeoutExceeded {
297 elapsed_seconds: elapsed,
298 max_seconds: self.timeout_seconds,
299 });
300 }
301 Ok(())
302 }
303
304 #[allow(dead_code)]
307 pub(crate) fn arena(&self) -> &RefCell<QueryArena> {
308 self.arena.get_or_init(|| RefCell::new(QueryArena::new()))
309 }
310
311 pub(super) fn reset_arena(&self) {
315 if let Some(arena) = self.arena.get() {
316 arena.borrow_mut().reset();
317 }
318 }
319
320 pub fn reset_for_reuse(&mut self) {
333 self.start_time = Instant::now();
334 self.memory_used_bytes.set(0);
335 self.memory_warning_logged.set(false);
336 self.subquery_depth = 0;
337 self.outer_row = None;
338 self.outer_schema = None;
339 self.procedural_context = None;
340 self.cte_context = None;
341
342 if let Some(arena) = self.arena.get() {
344 arena.borrow_mut().reset();
345 }
346
347 if let Some(cache) = self.aggregate_cache.get() {
349 cache.borrow_mut().clear();
350 }
351
352 *self.pivot_group.borrow_mut() = None;
354 }
355
356 pub(super) fn set_pivot_group(&self, group: PivotAggregateGroup) {
361 *self.pivot_group.borrow_mut() = Some(group);
362 }
363
364 pub(super) fn execute_pivot_aggregates(
370 &self,
371 group_rows: &[vibesql_storage::Row],
372 ) -> Result<(), ExecutorError> {
373 let pivot_group = self.pivot_group.borrow();
374 if let Some(ref pivot) = *pivot_group {
375 let results = pivot.execute(group_rows)?;
376
377 let cache = self.get_aggregate_cache();
379 let mut cache_mut = cache.borrow_mut();
380 for (cache_key, value) in results {
381 cache_mut.insert(cache_key, value);
382 }
383 }
384 Ok(())
385 }
386
387 pub(super) fn has_pivot_group(&self) -> bool {
389 self.pivot_group.borrow().is_some()
390 }
391}