// stoolap/executor/context.rs
// Copyright 2025 Stoolap Contributors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Execution Context
//!
//! This module provides the execution context for SQL queries, including
//! parameter handling, transaction state, and query options.
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, LazyLock, Mutex};
use std::time::Duration;

use lru::LruCache;
use rustc_hash::FxHashMap;

use crate::common::time_compat::Instant;
29
30// Cache size limits for subquery caches to prevent unbounded memory growth.
31// These are per-thread limits since the caches are thread-local.
32const SCALAR_SUBQUERY_CACHE_SIZE: usize = 128;
33const IN_SUBQUERY_CACHE_SIZE: usize = 128;
34const SEMI_JOIN_CACHE_SIZE: usize = 256;
35
36use crate::api::params::ParamVec;
37use crate::common::{CompactArc, StringMap};
38use crate::core::{Result, Row, Value, ValueMap, ValueSet};
39
40// Static defaults for ExecutionContext to avoid allocations for empty values.
41// These are shared across all contexts and only require Arc refcount bump on clone.
42// Note: cancelled is NOT shared - each context needs its own cancellation flag.
43static EMPTY_PARAMS: LazyLock<CompactArc<ParamVec>> =
44    LazyLock::new(|| CompactArc::new(ParamVec::new()));
45static EMPTY_NAMED_PARAMS: LazyLock<Arc<FxHashMap<String, Value>>> =
46    LazyLock::new(|| Arc::new(FxHashMap::default()));
47static EMPTY_DATABASE: LazyLock<Arc<Option<String>>> = LazyLock::new(|| Arc::new(None));
48static EMPTY_SESSION_VARS: LazyLock<Arc<AHashMap<String, Value>>> =
49    LazyLock::new(|| Arc::new(AHashMap::new()));
50
51// Cache for scalar subquery results to avoid re-execution.
52// Thread-local to avoid synchronization overhead.
53// Uses SQL string as key (not hash) to avoid collision risk.
54// Stores (tables_referenced, result) for table-based invalidation.
55// LRU-bounded to prevent unbounded memory growth.
56use smallvec::SmallVec;
57
58/// Cached scalar subquery entry: (tables_referenced for invalidation, result value)
59type ScalarSubqueryCacheEntry = (SmallVec<[CompactArc<str>; 2]>, Value);
60
61thread_local! {
62    static SCALAR_SUBQUERY_CACHE: RefCell<LruCache<String, ScalarSubqueryCacheEntry>> =
63        RefCell::new(LruCache::new(NonZeroUsize::new(SCALAR_SUBQUERY_CACHE_SIZE).unwrap()));
64}
65
66/// Clear the scalar subquery cache completely.
67/// NOTE: For normal operation, use `invalidate_scalar_subquery_cache_for_table` instead.
68/// This is only used for explicit cache clearing (e.g., after DDL operations).
69pub fn clear_scalar_subquery_cache() {
70    SCALAR_SUBQUERY_CACHE.with(|cache| {
71        cache.borrow_mut().clear();
72    });
73}
74
75/// Invalidate scalar subquery cache entries for a specific table.
76/// Should be called after INSERT, UPDATE, DELETE, or TRUNCATE on a table.
77#[inline]
78pub fn invalidate_scalar_subquery_cache_for_table(table_name: &str) {
79    SCALAR_SUBQUERY_CACHE.with(|cache| {
80        let mut c = cache.borrow_mut();
81        if c.is_empty() {
82            return;
83        }
84        // Collect keys to remove (LruCache doesn't have retain)
85        let keys_to_remove: Vec<String> = c
86            .iter()
87            .filter(|(_, (tables, _))| tables.iter().any(|t| t.eq_ignore_ascii_case(table_name)))
88            .map(|(k, _)| k.clone())
89            .collect();
90        for key in keys_to_remove {
91            c.pop(&key);
92        }
93    });
94}
95
96/// Get a cached scalar subquery result by SQL string key.
97pub fn get_cached_scalar_subquery(key: &str) -> Option<Value> {
98    SCALAR_SUBQUERY_CACHE.with(|cache| cache.borrow_mut().get(key).map(|(_, v)| v.clone()))
99}
100
101/// Cache a scalar subquery result with the tables it references.
102pub fn cache_scalar_subquery(key: String, tables: SmallVec<[CompactArc<str>; 2]>, value: Value) {
103    SCALAR_SUBQUERY_CACHE.with(|cache| {
104        cache.borrow_mut().put(key, (tables, value));
105    });
106}
107
108// Cache for IN subquery results to avoid re-execution.
109// Thread-local to avoid synchronization overhead.
110// Uses SQL string as key (not hash) to avoid collision risk.
111// Stores (tables_referenced, result) for table-based invalidation.
112// LRU-bounded to prevent unbounded memory growth.
113
114/// Cached IN subquery entry: (tables_referenced for invalidation, result values)
115type InSubqueryCacheEntry = (SmallVec<[CompactArc<str>; 2]>, Vec<Value>);
116
117thread_local! {
118    static IN_SUBQUERY_CACHE: RefCell<LruCache<String, InSubqueryCacheEntry>> =
119        RefCell::new(LruCache::new(NonZeroUsize::new(IN_SUBQUERY_CACHE_SIZE).unwrap()));
120}
121
122/// Clear the IN subquery cache completely.
123/// NOTE: For normal operation, use `invalidate_in_subquery_cache_for_table` instead.
124/// This is only used for explicit cache clearing (e.g., after DDL operations).
125pub fn clear_in_subquery_cache() {
126    IN_SUBQUERY_CACHE.with(|cache| {
127        cache.borrow_mut().clear();
128    });
129}
130
131/// Invalidate IN subquery cache entries for a specific table.
132/// Should be called after INSERT, UPDATE, DELETE, or TRUNCATE on a table.
133#[inline]
134pub fn invalidate_in_subquery_cache_for_table(table_name: &str) {
135    IN_SUBQUERY_CACHE.with(|cache| {
136        let mut c = cache.borrow_mut();
137        if c.is_empty() {
138            return;
139        }
140        // Collect keys to remove (LruCache doesn't have retain)
141        let keys_to_remove: Vec<String> = c
142            .iter()
143            .filter(|(_, (tables, _))| tables.iter().any(|t| t.eq_ignore_ascii_case(table_name)))
144            .map(|(k, _)| k.clone())
145            .collect();
146        for key in keys_to_remove {
147            c.pop(&key);
148        }
149    });
150}
151
152/// Get a cached IN subquery result by SQL string key.
153pub fn get_cached_in_subquery(key: &str) -> Option<Vec<Value>> {
154    IN_SUBQUERY_CACHE.with(|cache| cache.borrow_mut().get(key).map(|(_, v)| v.clone()))
155}
156
157/// Cache an IN subquery result with the tables it references.
158pub fn cache_in_subquery(key: String, tables: SmallVec<[CompactArc<str>; 2]>, values: Vec<Value>) {
159    IN_SUBQUERY_CACHE.with(|cache| {
160        cache.borrow_mut().put(key, (tables, values));
161    });
162}
163
164use crate::parser::ast::{Expression, SelectStatement};
165
166/// Extract actual table names from a SelectStatement for cache invalidation.
167/// This returns the real table names (not aliases) because DML operations
168/// reference tables by their actual names, not aliases.
169pub fn extract_table_names_for_cache(stmt: &SelectStatement) -> SmallVec<[CompactArc<str>; 2]> {
170    let mut tables = SmallVec::new();
171    if let Some(ref table_expr) = stmt.table_expr {
172        collect_real_table_names(table_expr, &mut tables);
173    }
174    tables
175}
176
177/// Recursively collect actual table names (not aliases) from a table source expression.
178fn collect_real_table_names(source: &Expression, tables: &mut SmallVec<[CompactArc<str>; 2]>) {
179    match source {
180        Expression::TableSource(ts) => {
181            // Always use the actual table name for cache invalidation
182            tables.push(CompactArc::from(ts.name.value_lower.as_str()));
183        }
184        Expression::JoinSource(js) => {
185            collect_real_table_names(&js.left, tables);
186            collect_real_table_names(&js.right, tables);
187        }
188        Expression::SubquerySource(ss) => {
189            // Recursively extract tables from nested subquery
190            if let Some(ref table_expr) = ss.subquery.table_expr {
191                collect_real_table_names(table_expr, tables);
192            }
193        }
194        _ => {}
195    }
196}
197
198// Cache for semi-join (EXISTS) hash sets to avoid re-execution.
199// Thread-local to avoid synchronization overhead.
200// Uses u64 hash key to avoid string allocation entirely.
201// LRU-bounded to prevent unbounded memory growth.
202use ahash::AHashMap;
203use std::hash::{Hash, Hasher};
204
205/// Cached semi-join entry: (table_name for invalidation, hash_set values)
206type SemiJoinCacheEntry = (CompactArc<str>, CompactArc<ValueSet>);
207
208/// Compute a cache key hash from table, column, and predicate hash without allocation.
209#[inline]
210pub fn compute_semi_join_cache_key(table: &str, column: &str, pred_hash: u64) -> u64 {
211    let mut hasher = rustc_hash::FxHasher::default();
212    table.hash(&mut hasher);
213    column.hash(&mut hasher);
214    pred_hash.hash(&mut hasher);
215    hasher.finish()
216}
217
218thread_local! {
219    static SEMI_JOIN_CACHE: RefCell<LruCache<u64, SemiJoinCacheEntry>> =
220        RefCell::new(LruCache::new(NonZeroUsize::new(SEMI_JOIN_CACHE_SIZE).unwrap()));
221}
222
223/// Clear the semi-join cache completely.
224/// NOTE: This is now only used for explicit cache clearing (e.g., after DDL operations).
225/// For DML operations, use `invalidate_semi_join_cache_for_table` instead.
226pub fn clear_semi_join_cache() {
227    SEMI_JOIN_CACHE.with(|cache| {
228        cache.borrow_mut().clear();
229    });
230}
231
232/// Invalidate semi-join cache entries for a specific table.
233/// Should be called after INSERT, UPDATE, DELETE, or TRUNCATE on a table.
234#[inline]
235pub fn invalidate_semi_join_cache_for_table(table_name: &str) {
236    SEMI_JOIN_CACHE.with(|cache| {
237        let mut c = cache.borrow_mut();
238        if c.is_empty() {
239            return;
240        }
241        // Collect keys to remove (LruCache doesn't have retain)
242        let keys_to_remove: Vec<u64> = c
243            .iter()
244            .filter(|(_, (key_table, _))| key_table.eq_ignore_ascii_case(table_name))
245            .map(|(k, _)| *k)
246            .collect();
247        for key in keys_to_remove {
248            c.pop(&key);
249        }
250    });
251}
252
253/// Get a cached semi-join hash set by key hash.
254#[inline]
255pub fn get_cached_semi_join(key_hash: u64) -> Option<CompactArc<ValueSet>> {
256    SEMI_JOIN_CACHE.with(|cache| {
257        cache
258            .borrow_mut()
259            .get(&key_hash)
260            .map(|(_, v)| CompactArc::clone(v))
261    })
262}
263
264/// Cache a semi-join hash set result (CompactArc version for zero-copy).
265#[inline]
266pub fn cache_semi_join_arc(key_hash: u64, table: &str, values: CompactArc<ValueSet>) {
267    SEMI_JOIN_CACHE.with(|cache| {
268        cache
269            .borrow_mut()
270            .put(key_hash, (CompactArc::from(table), values));
271    });
272}
273
274// Cache for EXISTS predicate filters to avoid re-compilation per row.
275// The key is the predicate expression string (after alias stripping).
276// The value is the compiled RowFilter.
277use super::expression::RowFilter;
278thread_local! {
279    static EXISTS_PREDICATE_CACHE: RefCell<FxHashMap<String, RowFilter>> = RefCell::new(FxHashMap::default());
280}
281
282/// Clear the EXISTS predicate cache. Should be called at the start of each top-level query.
283pub fn clear_exists_predicate_cache() {
284    EXISTS_PREDICATE_CACHE.with(|cache| {
285        cache.borrow_mut().clear();
286    });
287}
288
289/// Get a cached EXISTS predicate filter by key.
290pub fn get_cached_exists_predicate(key: &str) -> Option<RowFilter> {
291    EXISTS_PREDICATE_CACHE.with(|cache| cache.borrow().get(key).cloned())
292}
293
294/// Cache an EXISTS predicate filter.
295pub fn cache_exists_predicate(key: String, filter: RowFilter) {
296    EXISTS_PREDICATE_CACHE.with(|cache| {
297        cache.borrow_mut().insert(key, filter);
298    });
299}
300
301// Cache for EXISTS index lookups to avoid re-fetching per row.
302// The key is "table_name:column_name", the value is the index reference.
303use crate::storage::traits::Index;
304thread_local! {
305    static EXISTS_INDEX_CACHE: RefCell<FxHashMap<String, std::sync::Arc<dyn Index>>> = RefCell::new(FxHashMap::default());
306}
307
308/// Clear the EXISTS index cache. Should be called at the start of each top-level query.
309pub fn clear_exists_index_cache() {
310    EXISTS_INDEX_CACHE.with(|cache| {
311        cache.borrow_mut().clear();
312    });
313}
314
315/// Get a cached EXISTS index by key.
316pub fn get_cached_exists_index(key: &str) -> Option<std::sync::Arc<dyn Index>> {
317    EXISTS_INDEX_CACHE.with(|cache| cache.borrow().get(key).cloned())
318}
319
320/// Cache an EXISTS index.
321pub fn cache_exists_index(key: String, index: std::sync::Arc<dyn Index>) {
322    EXISTS_INDEX_CACHE.with(|cache| {
323        cache.borrow_mut().insert(key, index);
324    });
325}
326
/// Row fetcher used in the EXISTS/COUNT optimization: maps a slice of row
/// ids to the corresponding rows.
pub type RowFetcher = Box<dyn Fn(&[i64]) -> crate::core::RowVec + Send + Sync>;

/// Row counter used in the COUNT(*) optimization: counts only visible rows
/// for the given row ids, without cloning any row data.
pub type RowCounter = Box<dyn Fn(&[i64]) -> usize + Send + Sync>;
333
334// Cache for EXISTS row fetchers to avoid repeated version store lookups.
335// The key is the table name, the value is the row fetcher function.
336thread_local! {
337    static EXISTS_FETCHER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowFetcher>>> = RefCell::new(FxHashMap::default());
338}
339
340// Cache for COUNT row counters to avoid repeated version store lookups.
341// The key is the table name, the value is the row counter function.
342thread_local! {
343    static COUNT_COUNTER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowCounter>>> = RefCell::new(FxHashMap::default());
344}
345
346/// Clear the EXISTS row fetcher cache. Should be called at the start of each top-level query.
347pub fn clear_exists_fetcher_cache() {
348    EXISTS_FETCHER_CACHE.with(|cache| {
349        cache.borrow_mut().clear();
350    });
351}
352
353/// Clear the COUNT row counter cache. Should be called at the start of each top-level query.
354pub fn clear_count_counter_cache() {
355    COUNT_COUNTER_CACHE.with(|cache| {
356        cache.borrow_mut().clear();
357    });
358}
359
360/// Get a cached EXISTS row fetcher by table name.
361pub fn get_cached_exists_fetcher(key: &str) -> Option<std::sync::Arc<RowFetcher>> {
362    EXISTS_FETCHER_CACHE.with(|cache| cache.borrow().get(key).cloned())
363}
364
365/// Get a cached COUNT row counter by table name.
366pub fn get_cached_count_counter(key: &str) -> Option<std::sync::Arc<RowCounter>> {
367    COUNT_COUNTER_CACHE.with(|cache| cache.borrow().get(key).cloned())
368}
369
370/// Cache an EXISTS row fetcher.
371pub fn cache_exists_fetcher(key: String, fetcher: RowFetcher) {
372    EXISTS_FETCHER_CACHE.with(|cache| {
373        cache.borrow_mut().insert(key, std::sync::Arc::new(fetcher));
374    });
375}
376
377/// Cache a COUNT row counter.
378pub fn cache_count_counter(key: String, counter: RowCounter) {
379    COUNT_COUNTER_CACHE.with(|cache| {
380        cache.borrow_mut().insert(key, std::sync::Arc::new(counter));
381    });
382}
383
384// Cache for table schema column names to avoid repeated get_table_schema() calls.
385// The key is the table name, the value is the list of column names.
386thread_local! {
387    static EXISTS_SCHEMA_CACHE: RefCell<FxHashMap<String, CompactArc<Vec<String>>>> = RefCell::new(FxHashMap::default());
388}
389
390/// Clear the EXISTS schema cache. Should be called at the start of each top-level query.
391pub fn clear_exists_schema_cache() {
392    EXISTS_SCHEMA_CACHE.with(|cache| {
393        cache.borrow_mut().clear();
394    });
395}
396
397/// Get cached table column names by table name.
398pub fn get_cached_exists_schema(key: &str) -> Option<CompactArc<Vec<String>>> {
399    EXISTS_SCHEMA_CACHE.with(|cache| cache.borrow().get(key).cloned())
400}
401
402/// Cache table column names (takes Arc for zero-copy sharing).
403pub fn cache_exists_schema(key: String, columns: CompactArc<Vec<String>>) {
404    EXISTS_SCHEMA_CACHE.with(|cache| {
405        cache.borrow_mut().insert(key, columns);
406    });
407}
408
409// Cache for pre-computed EXISTS predicate cache keys to avoid expensive format!("{:?}") on every probe.
410// The key is the subquery pointer address (usize), the value is the predicate cache key.
411thread_local! {
412    static EXISTS_PRED_KEY_CACHE: RefCell<FxHashMap<usize, String>> = RefCell::new(FxHashMap::default());
413}
414
415/// Clear the EXISTS predicate key cache.
416pub fn clear_exists_pred_key_cache() {
417    EXISTS_PRED_KEY_CACHE.with(|cache| {
418        cache.borrow_mut().clear();
419    });
420}
421
422/// Get cached predicate cache key by subquery pointer address.
423#[inline]
424pub fn get_cached_exists_pred_key(subquery_ptr: usize) -> Option<String> {
425    EXISTS_PRED_KEY_CACHE.with(|cache| cache.borrow().get(&subquery_ptr).cloned())
426}
427
428/// Cache a predicate cache key.
429#[inline]
430pub fn cache_exists_pred_key(subquery_ptr: usize, pred_key: String) {
431    EXISTS_PRED_KEY_CACHE.with(|cache| {
432        cache.borrow_mut().insert(subquery_ptr, pred_key);
433    });
434}
435
436// Cache for batch aggregate subquery results (e.g., COUNT(*) GROUP BY user_id).
437// Thread-local to avoid synchronization overhead.
438// The key is a stable identifier for the subquery, the value is a map from group key to aggregate value.
439thread_local! {
440    static BATCH_AGGREGATE_CACHE: RefCell<FxHashMap<String, CompactArc<ValueMap<Value>>>> = RefCell::new(FxHashMap::default());
441}
442
443/// Clear the batch aggregate cache. Should be called at the start of each top-level query.
444pub fn clear_batch_aggregate_cache() {
445    BATCH_AGGREGATE_CACHE.with(|cache| {
446        let mut c = cache.borrow_mut();
447        c.clear();
448        c.shrink_to_fit();
449    });
450}
451
452/// Get a cached batch aggregate result map by subquery identifier.
453pub fn get_cached_batch_aggregate(key: &str) -> Option<CompactArc<ValueMap<Value>>> {
454    BATCH_AGGREGATE_CACHE.with(|cache| cache.borrow().get(key).cloned())
455}
456
457/// Cache a batch aggregate result map.
458pub fn cache_batch_aggregate(key: String, values: ValueMap<Value>) {
459    BATCH_AGGREGATE_CACHE.with(|cache| {
460        cache.borrow_mut().insert(key, CompactArc::new(values));
461    });
462}
463
/// Pre-computed info for batch aggregate lookups, built once per subquery to
/// avoid per-row allocations while probing.
#[derive(Clone)]
pub struct BatchAggregateLookupInfo {
    /// The cache key under which the batch aggregate results are stored
    pub cache_key: String,
    /// The outer column name (lowercase) to look up in the outer row
    pub outer_column_lower: String,
    /// Optional qualified outer column name in lowercase (e.g., "u.id")
    pub outer_qualified_lower: Option<String>,
    /// Whether this is a COUNT expression (COUNT returns 0 for missing keys)
    pub is_count: bool,
}
476
477// Cache for batch aggregate lookup info to avoid recomputing per row.
478// The key is the subquery pointer address (usize), avoiding expensive to_string() per row.
479// Value is Arc-wrapped to avoid cloning strings on every lookup.
480thread_local! {
481    static BATCH_AGGREGATE_INFO_CACHE: RefCell<FxHashMap<usize, Option<Arc<BatchAggregateLookupInfo>>>> = RefCell::new(FxHashMap::default());
482}
483
484/// Clear the batch aggregate info cache.
485pub fn clear_batch_aggregate_info_cache() {
486    BATCH_AGGREGATE_INFO_CACHE.with(|cache| {
487        let mut c = cache.borrow_mut();
488        c.clear();
489        c.shrink_to_fit();
490    });
491}
492
493/// Get cached batch aggregate lookup info by subquery pointer address.
494/// Returns Arc to avoid cloning strings on every lookup.
495#[inline]
496pub fn get_cached_batch_aggregate_info(
497    subquery_ptr: usize,
498) -> Option<Option<Arc<BatchAggregateLookupInfo>>> {
499    BATCH_AGGREGATE_INFO_CACHE.with(|cache| cache.borrow().get(&subquery_ptr).cloned())
500}
501
502/// Cache batch aggregate lookup info and return the Arc-wrapped version.
503/// Returns None if info was None (not batchable).
504#[inline]
505pub fn cache_batch_aggregate_info(
506    subquery_ptr: usize,
507    info: Option<BatchAggregateLookupInfo>,
508) -> Option<Arc<BatchAggregateLookupInfo>> {
509    let arc_info = info.map(Arc::new);
510    let result = arc_info.clone();
511    BATCH_AGGREGATE_INFO_CACHE.with(|cache| {
512        cache.borrow_mut().insert(subquery_ptr, arc_info);
513    });
514    result
515}
516
/// Pre-computed info for index nested-loop EXISTS lookups, built once per
/// subquery so per-row probing needs no string operations. The lowercase
/// column names are pre-computed for O(1) outer-row HashMap lookups.
#[derive(Clone)]
pub struct ExistsCorrelationInfo {
    /// The outer column name in original case
    pub outer_column: String,
    /// The outer table name (optional)
    pub outer_table: Option<String>,
    /// The inner column name
    pub inner_column: String,
    /// The inner table name
    pub inner_table: String,
    /// Pre-computed lowercase outer column name for fast HashMap lookup
    pub outer_column_lower: String,
    /// Pre-computed qualified outer column name (e.g., "u.id") in lowercase
    pub outer_qualified_lower: Option<String>,
    /// The additional predicate beyond the correlation (if any)
    pub additional_predicate: Option<Expression>,
    /// Pre-computed index cache key ("table:column") to avoid per-probe format! allocation
    pub index_cache_key: String,
}
538
539// Cache for EXISTS correlation info to avoid per-row extraction.
540// The key is the subquery pointer address (usize), avoiding format! allocation.
541// Value is Arc-wrapped to avoid cloning strings on every lookup.
542thread_local! {
543    static EXISTS_CORRELATION_CACHE: RefCell<FxHashMap<usize, Option<Arc<ExistsCorrelationInfo>>>> = RefCell::new(FxHashMap::default());
544}
545
546/// Clear the EXISTS correlation cache.
547pub fn clear_exists_correlation_cache() {
548    EXISTS_CORRELATION_CACHE.with(|cache| {
549        cache.borrow_mut().clear();
550    });
551}
552
553/// Clear ALL thread-local caches to release memory.
554/// Call this when a database is dropped to prevent memory leaks.
555/// This also shrinks all cache capacities to zero where applicable.
556pub fn clear_all_thread_local_caches() {
557    // Clear LRU-bounded caches (no shrink_to_fit needed - fixed capacity)
558    SCALAR_SUBQUERY_CACHE.with(|cache| {
559        cache.borrow_mut().clear();
560    });
561    IN_SUBQUERY_CACHE.with(|cache| {
562        cache.borrow_mut().clear();
563    });
564    SEMI_JOIN_CACHE.with(|cache| {
565        cache.borrow_mut().clear();
566    });
567    // Clear and shrink unbounded caches
568    EXISTS_PREDICATE_CACHE.with(|cache| {
569        let mut c = cache.borrow_mut();
570        c.clear();
571        c.shrink_to_fit();
572    });
573    EXISTS_INDEX_CACHE.with(|cache| {
574        let mut c = cache.borrow_mut();
575        c.clear();
576        c.shrink_to_fit();
577    });
578    EXISTS_FETCHER_CACHE.with(|cache| {
579        let mut c = cache.borrow_mut();
580        c.clear();
581        c.shrink_to_fit();
582    });
583    COUNT_COUNTER_CACHE.with(|cache| {
584        let mut c = cache.borrow_mut();
585        c.clear();
586        c.shrink_to_fit();
587    });
588    EXISTS_SCHEMA_CACHE.with(|cache| {
589        let mut c = cache.borrow_mut();
590        c.clear();
591        c.shrink_to_fit();
592    });
593    EXISTS_PRED_KEY_CACHE.with(|cache| {
594        let mut c = cache.borrow_mut();
595        c.clear();
596        c.shrink_to_fit();
597    });
598    BATCH_AGGREGATE_CACHE.with(|cache| {
599        let mut c = cache.borrow_mut();
600        c.clear();
601        c.shrink_to_fit();
602    });
603    BATCH_AGGREGATE_INFO_CACHE.with(|cache| {
604        let mut c = cache.borrow_mut();
605        c.clear();
606        c.shrink_to_fit();
607    });
608    EXISTS_CORRELATION_CACHE.with(|cache| {
609        let mut c = cache.borrow_mut();
610        c.clear();
611        c.shrink_to_fit();
612    });
613
614    // Clear storage expression caches (regex patterns)
615    crate::storage::expression::clear_regex_cache();
616    crate::storage::expression::clear_like_regex_cache();
617
618    // Clear RowVec and RowIdVec thread-local pools
619    crate::core::row_vec::clear_row_vec_pool();
620    crate::core::row_vec::clear_row_id_vec_pool();
621
622    // Clear global transaction version map pools
623    crate::storage::mvcc::clear_version_map_pools();
624
625    // Clear global LRU caches
626    super::expression::clear_program_cache();
627    super::query_classification::clear_classification_cache();
628}
629
630/// Get cached EXISTS correlation info by subquery pointer address.
631/// Returns Arc to avoid cloning strings on every lookup.
632#[inline]
633pub fn get_cached_exists_correlation(
634    subquery_ptr: usize,
635) -> Option<Option<Arc<ExistsCorrelationInfo>>> {
636    EXISTS_CORRELATION_CACHE.with(|cache| cache.borrow().get(&subquery_ptr).cloned())
637}
638
639/// Cache EXISTS correlation info and return the Arc-wrapped version.
640/// Returns None if info was None (correlation not extractable).
641#[inline]
642pub fn cache_exists_correlation(
643    subquery_ptr: usize,
644    info: Option<ExistsCorrelationInfo>,
645) -> Option<Arc<ExistsCorrelationInfo>> {
646    let arc_info = info.map(Arc::new);
647    let result = arc_info.clone();
648    EXISTS_CORRELATION_CACHE.with(|cache| {
649        cache.borrow_mut().insert(subquery_ptr, arc_info);
650    });
651    result
652}
653
/// Execution context for SQL queries
///
/// The execution context carries state and configuration for query execution,
/// including parameters, transaction state, and cancellation support.
///
/// Note: This struct uses Arc for immutable shared data to make cloning cheap
/// during correlated subquery processing where context is cloned per row.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
    /// Query parameters ($1, $2, etc.) - wrapped in Arc for cheap cloning
    params: CompactArc<ParamVec>,
    /// Named parameters (:name) - wrapped in Arc for cheap cloning
    named_params: Arc<FxHashMap<String, Value>>,
    /// Whether to use auto-commit for DML statements
    auto_commit: bool,
    /// Cancellation flag; deliberately per-context (never shared via the
    /// static defaults) so cancelling one query cannot affect another
    cancelled: Arc<AtomicBool>,
    /// Current database/schema name - wrapped in Arc for cheap cloning
    current_database: Arc<Option<String>>,
    /// Session variables (SET key = value) - wrapped in Arc for cheap cloning
    session_vars: Arc<AHashMap<String, Value>>,
    /// Query timeout in milliseconds (0 = no timeout)
    timeout_ms: u64,
    /// Current view nesting depth (for detecting infinite recursion)
    view_depth: usize,
    /// Query execution depth (0 = top-level query, >0 = subquery/nested)
    /// Used to ensure TimeoutGuard is only created once at the top level
    pub(crate) query_depth: usize,
    /// Outer row context for correlated subqueries
    /// Maps column name (lowercase) to value from the outer query
    /// Uses FxHashMap<CompactArc<str>, Value> for zero-cost key cloning in hot loops
    /// pub(crate) to allow taking ownership back for reuse in optimized loops
    pub(crate) outer_row: Option<FxHashMap<CompactArc<str>, Value>>,
    /// Outer row column names (for qualified identifier resolution) - wrapped in Arc
    outer_columns: Option<CompactArc<Vec<String>>>,
    /// CTE data for subqueries to reference CTEs from outer query
    /// Maps CTE name (lowercase) to (columns, rows)
    cte_data: Option<Arc<CteDataMap>>,
    /// Current transaction ID for CURRENT_TRANSACTION_ID() function
    transaction_id: Option<u64>,
}
695
/// Type alias for CTE data: (columns, rows) with Arc for zero-copy sharing.
/// Uses Vec<(i64, Row)> for rows - same structure as RowVec but Arc-shareable.
type CteData = (CompactArc<Vec<String>>, CompactArc<Vec<(i64, Row)>>);

/// Type alias for the CTE data map to reduce type complexity.
/// Uses CompactArc<Vec<String>> for columns and CompactArc<Vec<(i64, Row)>> for
/// rows to enable zero-copy sharing of CTE results with joins.
type CteDataMap = StringMap<CteData>;
704
705impl Default for ExecutionContext {
706    fn default() -> Self {
707        Self::new()
708    }
709}
710
711impl ExecutionContext {
712    /// Create a new empty execution context
713    /// Uses static defaults for empty collections to avoid allocations
714    pub fn new() -> Self {
715        Self {
716            params: EMPTY_PARAMS.clone(),
717            named_params: EMPTY_NAMED_PARAMS.clone(),
718            auto_commit: true,
719            cancelled: Arc::new(AtomicBool::new(false)), // Each context needs own flag
720            current_database: EMPTY_DATABASE.clone(),
721            session_vars: EMPTY_SESSION_VARS.clone(),
722            timeout_ms: 0,
723            view_depth: 0,
724            query_depth: 0,
725            outer_row: None,
726            outer_columns: None,
727            cte_data: None,
728            transaction_id: None,
729        }
730    }
731
732    /// Create an execution context with positional parameters
733    pub fn with_params(params: ParamVec) -> Self {
734        Self {
735            params: CompactArc::new(params),
736            ..Self::new()
737        }
738    }
739
740    /// Create an execution context with named parameters
741    pub fn with_named_params(named_params: FxHashMap<String, Value>) -> Self {
742        Self {
743            named_params: Arc::new(named_params),
744            ..Self::new()
745        }
746    }
747
748    /// Get a positional parameter by index (1-based)
749    pub fn get_param(&self, index: usize) -> Option<&Value> {
750        if index == 0 || index > self.params.len() {
751            None
752        } else {
753            self.params.get(index - 1)
754        }
755    }
756
757    /// Get a named parameter by name
758    pub fn get_named_param(&self, name: &str) -> Option<&Value> {
759        self.named_params.get(name)
760    }
761
762    /// Get all positional parameters
763    pub fn params(&self) -> &[Value] {
764        &self.params
765    }
766
767    /// Get the params Arc for zero-copy sharing.
768    /// Used by evaluator bridge to avoid cloning params.
769    pub fn params_arc(&self) -> &CompactArc<ParamVec> {
770        &self.params
771    }
772
773    /// Get all named parameters
774    pub fn named_params(&self) -> &FxHashMap<String, Value> {
775        &self.named_params
776    }
777
778    /// Get the named_params Arc for zero-copy sharing.
779    /// Used by evaluator bridge to avoid cloning params.
780    pub fn named_params_arc(&self) -> &Arc<FxHashMap<String, Value>> {
781        &self.named_params
782    }
783
784    /// Get the number of positional parameters
785    pub fn param_count(&self) -> usize {
786        self.params.len()
787    }
788
    /// Replace all positional parameters with a fresh vector.
    pub fn set_params(&mut self, params: ParamVec) {
        self.params = CompactArc::new(params);
    }

    /// Append a positional parameter.
    /// make_mut is copy-on-write: it only clones the vector when the Arc
    /// is shared (e.g. when still pointing at the static empty default).
    pub fn add_param(&mut self, value: Value) {
        CompactArc::make_mut(&mut self.params).push(value);
    }

    /// Insert or overwrite a named parameter (copy-on-write, as above).
    pub fn set_named_param(&mut self, name: impl Into<String>, value: Value) {
        Arc::make_mut(&mut self.named_params).insert(name.into(), value);
    }
803
    /// Check if auto-commit is enabled.
    pub fn auto_commit(&self) -> bool {
        self.auto_commit
    }

    /// Set auto-commit mode.
    pub fn set_auto_commit(&mut self, auto_commit: bool) {
        self.auto_commit = auto_commit;
    }

    /// Check if the query has been cancelled.
    /// The flag is a simple one-way signal, read with Relaxed ordering.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Cancel the query; observable via is_cancelled / check_cancelled.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }

    /// Get a cancellation handle that can be used from another thread.
    /// The handle shares this context's flag (Arc clone), so cancelling
    /// through the handle cancels this context.
    pub fn cancellation_handle(&self) -> CancellationHandle {
        CancellationHandle {
            cancelled: self.cancelled.clone(),
        }
    }
830
    /// Get the current database/schema name.
    pub fn current_database(&self) -> Option<&str> {
        self.current_database.as_ref().as_deref()
    }

    /// Set the current database/schema name.
    /// Allocates a fresh Arc instead of mutating in place, since the Arc
    /// may be shared with contexts cloned via the with_* methods.
    pub fn set_current_database(&mut self, database: impl Into<String>) {
        self.current_database = Arc::new(Some(database.into()));
    }

    /// Get a session variable.
    pub fn get_session_var(&self, name: &str) -> Option<&Value> {
        self.session_vars.get(name)
    }

    /// Set a session variable (copy-on-write via make_mut).
    pub fn set_session_var(&mut self, name: impl Into<String>, value: Value) {
        Arc::make_mut(&mut self.session_vars).insert(name.into(), value);
    }

    /// Get the query timeout in milliseconds (0 means no timeout).
    pub fn timeout_ms(&self) -> u64 {
        self.timeout_ms
    }

    /// Set the query timeout in milliseconds (0 disables the timeout).
    pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
        self.timeout_ms = timeout_ms;
    }

    /// Check if a timeout has been set.
    pub fn has_timeout(&self) -> bool {
        self.timeout_ms > 0
    }

    /// Get the current view nesting depth.
    pub fn view_depth(&self) -> usize {
        self.view_depth
    }
870
871    /// Create a new context with incremented view depth.
872    /// Used when executing nested views to track recursion depth.
873    /// Also increments query_depth since views are nested queries.
874    pub fn with_incremented_view_depth(&self) -> Self {
875        Self {
876            params: self.params.clone(),
877            named_params: self.named_params.clone(),
878            auto_commit: self.auto_commit,
879            cancelled: self.cancelled.clone(),
880            current_database: self.current_database.clone(),
881            session_vars: self.session_vars.clone(),
882            timeout_ms: self.timeout_ms,
883            view_depth: self.view_depth + 1,
884            query_depth: self.query_depth + 1, // Views are nested queries
885            outer_row: self.outer_row.clone(),
886            outer_columns: self.outer_columns.clone(),
887            cte_data: self.cte_data.clone(),
888            transaction_id: self.transaction_id,
889        }
890    }
891
    /// Create a new context with incremented query depth.
    /// Used when executing subqueries to ensure TimeoutGuard is only created at the top level.
    /// All Arc-backed fields are shared with the parent (cheap refcount
    /// bumps); in particular `cancelled` is shared, so cancelling the
    /// parent context also cancels the subquery.
    pub fn with_incremented_query_depth(&self) -> Self {
        Self {
            params: self.params.clone(),
            named_params: self.named_params.clone(),
            auto_commit: self.auto_commit,
            cancelled: self.cancelled.clone(),
            current_database: self.current_database.clone(),
            session_vars: self.session_vars.clone(),
            timeout_ms: self.timeout_ms,
            view_depth: self.view_depth,
            query_depth: self.query_depth + 1,
            outer_row: self.outer_row.clone(),
            outer_columns: self.outer_columns.clone(),
            cte_data: self.cte_data.clone(),
            transaction_id: self.transaction_id,
        }
    }
911
912    /// Get the outer row context for correlated subqueries
913    pub fn outer_row(&self) -> Option<&FxHashMap<CompactArc<str>, Value>> {
914        self.outer_row.as_ref()
915    }
916
917    /// Get the outer row columns for correlated subqueries
918    pub fn outer_columns(&self) -> Option<&[String]> {
919        self.outer_columns.as_ref().map(|v| v.as_slice())
920    }
921
    /// Create a new context with outer row context for correlated subqueries.
    /// The outer_row maps lowercase column names (as CompactArc<str>) to their values.
    /// NOTE: This is now cheap to clone due to Arc wrapping of immutable fields.
    /// query_depth is incremented because the receiving query is a subquery
    /// of the one that produced the outer row.
    pub fn with_outer_row(
        &self,
        outer_row: FxHashMap<CompactArc<str>, Value>,
        outer_columns: CompactArc<Vec<String>>,
    ) -> Self {
        Self {
            params: self.params.clone(),             // Arc clone = cheap
            named_params: self.named_params.clone(), // Arc clone = cheap
            auto_commit: self.auto_commit,
            cancelled: self.cancelled.clone(), // Arc clone = cheap
            current_database: self.current_database.clone(), // Arc clone = cheap
            session_vars: self.session_vars.clone(), // Arc clone = cheap
            timeout_ms: self.timeout_ms,
            view_depth: self.view_depth,
            query_depth: self.query_depth + 1, // Increment for subquery
            outer_row: Some(outer_row),
            outer_columns: Some(outer_columns), // Arc clone = cheap
            cte_data: self.cte_data.clone(),    // Arc clone = cheap
            transaction_id: self.transaction_id,
        }
    }
946
947    /// Get CTE data by name (case-insensitive)
948    /// Returns Arc references to enable zero-copy sharing with joins
949    pub fn get_cte(&self, name: &str) -> Option<&CteData> {
950        self.cte_data
951            .as_ref()
952            .and_then(|data| data.get(&name.to_lowercase()))
953    }
954
955    /// Get CTE data by name that is already lowercase.
956    /// Use this when the name is known to be lowercase (e.g., from value_lower fields)
957    /// to avoid redundant to_lowercase() allocation.
958    #[inline]
959    pub fn get_cte_by_lower(&self, name_lower: &str) -> Option<&CteData> {
960        self.cte_data.as_ref().and_then(|data| data.get(name_lower))
961    }
962
963    /// Check if context has CTE data
964    pub fn has_cte(&self, name: &str) -> bool {
965        self.cte_data
966            .as_ref()
967            .is_some_and(|data| data.contains_key(&name.to_lowercase()))
968    }
969
970    /// Check if context has CTE data by name that is already lowercase.
971    /// Use this when the name is known to be lowercase to avoid allocation.
972    #[inline]
973    pub fn has_cte_by_lower(&self, name_lower: &str) -> bool {
974        self.cte_data
975            .as_ref()
976            .is_some_and(|data| data.contains_key(name_lower))
977    }
978
    /// Create a new context with CTE data for subqueries to reference.
    /// Takes an Arc to avoid cloning large CTE datasets.
    /// Unlike the subquery constructors above, query_depth is NOT
    /// incremented here.
    pub fn with_cte_data(&self, cte_data: Arc<CteDataMap>) -> Self {
        Self {
            params: self.params.clone(),
            named_params: self.named_params.clone(),
            auto_commit: self.auto_commit,
            cancelled: self.cancelled.clone(),
            current_database: self.current_database.clone(),
            session_vars: self.session_vars.clone(),
            timeout_ms: self.timeout_ms,
            view_depth: self.view_depth,
            query_depth: self.query_depth,
            outer_row: self.outer_row.clone(),
            outer_columns: self.outer_columns.clone(),
            cte_data: Some(cte_data),
            transaction_id: self.transaction_id,
        }
    }
998
    /// Get the current transaction ID, if one has been assigned.
    pub fn transaction_id(&self) -> Option<u64> {
        self.transaction_id
    }

    /// Set the transaction ID for this context.
    pub fn set_transaction_id(&mut self, txn_id: u64) {
        self.transaction_id = Some(txn_id);
    }
1008
    /// Create a new context with a transaction ID.
    /// All shared fields are cheap Arc clones; only transaction_id changes.
    pub fn with_transaction_id(&self, txn_id: u64) -> Self {
        Self {
            params: self.params.clone(),
            named_params: self.named_params.clone(),
            auto_commit: self.auto_commit,
            cancelled: self.cancelled.clone(),
            current_database: self.current_database.clone(),
            session_vars: self.session_vars.clone(),
            timeout_ms: self.timeout_ms,
            view_depth: self.view_depth,
            query_depth: self.query_depth,
            outer_row: self.outer_row.clone(),
            outer_columns: self.outer_columns.clone(),
            cte_data: self.cte_data.clone(),
            transaction_id: Some(txn_id),
        }
    }
1027
1028    /// Check for cancellation and return an error if cancelled
1029    pub fn check_cancelled(&self) -> Result<()> {
1030        if self.is_cancelled() {
1031            Err(crate::core::Error::QueryCancelled)
1032        } else {
1033            Ok(())
1034        }
1035    }
1036}
1037
/// Thread-safe handle for cancelling an in-flight query from another thread.
///
/// Clones share the same underlying flag, so any clone can cancel and
/// every clone (and the owning context) observes the cancellation.
#[derive(Debug, Clone)]
pub struct CancellationHandle {
    /// Shared cancellation flag, also held by the owning ExecutionContext.
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Check whether the query has been cancelled.
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::Relaxed)
    }

    /// Signal cancellation; `is_cancelled` reports true from then on.
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }
}
1055
1056// ============================================================================
1057// Global Timeout Manager
1058// ============================================================================
1059//
1060// Uses a single background thread to manage all query timeouts efficiently.
1061// This avoids spawning a new thread for each query with a timeout.
1062
/// Entry in the timeout priority queue
struct TimeoutEntry {
    /// When the timeout expires
    deadline: Instant,
    /// Unique ID for this timeout (for cancellation); assigned from
    /// TimeoutManager::next_id, so no two live entries share an id
    id: u64,
    /// Handle used to cancel the query once the deadline passes
    cancel_handle: CancellationHandle,
    /// Whether this timeout has been cancelled (query completed);
    /// set by TimeoutGuard::drop and checked before firing
    cancelled: Arc<AtomicBool>,
}
1074
// Entries are equal only when both deadline and id match; since ids are
// unique per registration, this distinguishes timeouts that happen to
// share the same deadline.
impl PartialEq for TimeoutEntry {
    fn eq(&self, other: &Self) -> bool {
        self.deadline == other.deadline && self.id == other.id
    }
}

impl Eq for TimeoutEntry {}
1082
1083impl PartialOrd for TimeoutEntry {
1084    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1085        Some(self.cmp(other))
1086    }
1087}
1088
1089impl Ord for TimeoutEntry {
1090    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1091        // Reverse ordering so BinaryHeap becomes a min-heap (earliest deadline first)
1092        other.deadline.cmp(&self.deadline)
1093    }
1094}
1095
/// Global timeout manager state
struct TimeoutManagerState {
    /// Priority queue of pending timeouts (min-heap by deadline, via the
    /// reversed Ord impl on TimeoutEntry)
    timeouts: BinaryHeap<TimeoutEntry>,
    /// Whether the manager is shutting down; the timer thread exits once
    /// this is set and the queue drains
    shutdown: bool,
}

/// Global timeout manager that handles all query timeouts in a single thread
struct TimeoutManager {
    /// Shared state protected by mutex
    state: Mutex<TimeoutManagerState>,
    /// Condition variable to wake the timer thread when a new earliest
    /// deadline is registered
    condvar: Condvar,
    /// Counter for generating unique timeout IDs
    next_id: AtomicU64,
}
1113
impl TimeoutManager {
    /// Create a new timeout manager and spawn its background timer thread.
    fn new() -> Arc<Self> {
        let manager = Arc::new(Self {
            state: Mutex::new(TimeoutManagerState {
                timeouts: BinaryHeap::new(),
                shutdown: false,
            }),
            condvar: Condvar::new(),
            next_id: AtomicU64::new(1),
        });

        // Spawn the background timer thread
        let manager_clone = Arc::clone(&manager);
        std::thread::Builder::new()
            .name("stoolap-timeout-manager".to_string())
            .spawn(move || {
                manager_clone.run();
            })
            .expect("Failed to spawn timeout manager thread");

        manager
    }

    /// Background thread loop: pops expired entries, fires their cancel
    /// handles, then sleeps on the condvar until the next deadline or
    /// until register() signals that a new earliest deadline arrived.
    fn run(&self) {
        loop {
            let mut state = self.state.lock().unwrap();

            // Check for shutdown (only once the queue has drained)
            if state.shutdown && state.timeouts.is_empty() {
                return;
            }

            // Process expired timeouts; the heap yields earliest-first,
            // so we can stop at the first non-expired entry.
            let now = Instant::now();
            while let Some(entry) = state.timeouts.peek() {
                if entry.deadline <= now {
                    let entry = state.timeouts.pop().unwrap();
                    // Only cancel if the timeout wasn't already cancelled
                    // (i.e. the query completed and its TimeoutGuard dropped)
                    if !entry.cancelled.load(Ordering::Relaxed) {
                        entry.cancel_handle.cancel();
                    }
                } else {
                    break;
                }
            }

            // Calculate wait time until next timeout
            let wait_duration = if let Some(entry) = state.timeouts.peek() {
                entry.deadline.saturating_duration_since(now)
            } else {
                // No timeouts pending; the 1-hour cap is only a safety net,
                // since register() wakes us via the condvar on new work.
                Duration::from_secs(3600) // 1 hour max wait
            };

            // Wait for new work or timeout
            if wait_duration.is_zero() {
                continue; // Immediately process
            }
            // wait_timeout releases the mutex while sleeping; spurious
            // wakeups are harmless because the loop re-derives everything.
            let (new_state, _timeout_result) =
                self.condvar.wait_timeout(state, wait_duration).unwrap();
            state = new_state;

            // Re-check shutdown after waking
            if state.shutdown && state.timeouts.is_empty() {
                return;
            }
        }
    }

    /// Register a new timeout, returns the timeout ID
    fn register(
        &self,
        timeout_ms: u64,
        cancel_handle: CancellationHandle,
        cancelled: Arc<AtomicBool>,
    ) -> u64 {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let deadline = Instant::now() + Duration::from_millis(timeout_ms);

        let entry = TimeoutEntry {
            deadline,
            id,
            cancel_handle,
            cancelled,
        };

        let mut state = self.state.lock().unwrap();
        let was_empty = state.timeouts.is_empty();
        // NOTE(review): is_none_or makes is_earliest true for an empty heap
        // too, so the `was_empty ||` below is redundant but harmless.
        let is_earliest = state.timeouts.peek().is_none_or(|e| deadline < e.deadline);

        state.timeouts.push(entry);

        // Wake the timer thread if this is the new earliest deadline
        if was_empty || is_earliest {
            self.condvar.notify_one();
        }

        id
    }
}
1216
1217/// Get or create the global timeout manager
1218fn global_timeout_manager() -> &'static Arc<TimeoutManager> {
1219    use std::sync::OnceLock;
1220    static MANAGER: OnceLock<Arc<TimeoutManager>> = OnceLock::new();
1221    MANAGER.get_or_init(TimeoutManager::new)
1222}
1223
/// Guard that automatically cancels a query after a timeout.
/// Uses a global timeout manager for efficient handling of many concurrent timeouts.
/// Dropping the guard disarms the timeout, so it must be kept alive for
/// the duration of the query.
pub struct TimeoutGuard {
    /// Flag to signal that the query completed (timeout should be ignored);
    /// shared with the TimeoutEntry registered in the global manager
    cancelled: Arc<AtomicBool>,
}
1230
impl TimeoutGuard {
    /// Create a new timeout guard that will cancel the query after timeout_ms.
    /// Returns None if timeout_ms is 0 (no timeout).
    pub fn new(ctx: &ExecutionContext) -> Option<Self> {
        let timeout_ms = ctx.timeout_ms();
        if timeout_ms == 0 {
            return None;
        }

        // Handle the manager will fire on expiry (shares the context's flag).
        let cancel_handle = ctx.cancellation_handle();
        // Completion flag: flipped by Drop so the manager skips this entry.
        let cancelled = Arc::new(AtomicBool::new(false));

        // Register with the global timeout manager
        global_timeout_manager().register(timeout_ms, cancel_handle, Arc::clone(&cancelled));

        Some(Self { cancelled })
    }
}
1249
impl Drop for TimeoutGuard {
    fn drop(&mut self) {
        // Mark this timeout as cancelled so the manager ignores it.
        // The entry stays in the manager's heap until its deadline passes,
        // at which point it is popped and skipped (see TimeoutManager::run).
        self.cancelled.store(true, Ordering::Relaxed);
    }
}
1256
/// Builder for ExecutionContext
pub struct ExecutionContextBuilder {
    /// The context being assembled; handed out by build()
    ctx: ExecutionContext,
}
1261
1262impl ExecutionContextBuilder {
1263    /// Create a new builder
1264    pub fn new() -> Self {
1265        Self {
1266            ctx: ExecutionContext::new(),
1267        }
1268    }
1269
1270    /// Add positional parameters
1271    pub fn params(mut self, params: ParamVec) -> Self {
1272        self.ctx.params = CompactArc::new(params);
1273        self
1274    }
1275
1276    /// Add a positional parameter
1277    pub fn param(self, value: Value) -> Self {
1278        let mut v = (*self.ctx.params).clone();
1279        v.push(value);
1280        Self {
1281            ctx: ExecutionContext {
1282                params: CompactArc::new(v),
1283                ..self.ctx
1284            },
1285        }
1286    }
1287
1288    /// Add a named parameter
1289    pub fn named_param(self, name: impl Into<String>, value: Value) -> Self {
1290        Self {
1291            ctx: ExecutionContext {
1292                named_params: Arc::new({
1293                    let mut m = (*self.ctx.named_params).clone();
1294                    m.insert(name.into(), value);
1295                    m
1296                }),
1297                ..self.ctx
1298            },
1299        }
1300    }
1301
1302    /// Set auto-commit mode
1303    pub fn auto_commit(mut self, auto_commit: bool) -> Self {
1304        self.ctx.auto_commit = auto_commit;
1305        self
1306    }
1307
1308    /// Set the current database
1309    pub fn database(mut self, database: impl Into<String>) -> Self {
1310        self.ctx.current_database = Arc::new(Some(database.into()));
1311        self
1312    }
1313
1314    /// Set a session variable
1315    pub fn session_var(self, name: impl Into<String>, value: Value) -> Self {
1316        Self {
1317            ctx: ExecutionContext {
1318                session_vars: Arc::new({
1319                    let mut m = (*self.ctx.session_vars).clone();
1320                    m.insert(name.into(), value);
1321                    m
1322                }),
1323                ..self.ctx
1324            },
1325        }
1326    }
1327
1328    /// Set the query timeout
1329    pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
1330        self.ctx.timeout_ms = timeout_ms;
1331        self
1332    }
1333
1334    /// Build the execution context
1335    pub fn build(self) -> ExecutionContext {
1336        self.ctx
1337    }
1338}
1339
impl Default for ExecutionContextBuilder {
    /// Equivalent to `ExecutionContextBuilder::new()`.
    fn default() -> Self {
        Self::new()
    }
}
1345
#[cfg(test)]
mod tests {
    //! Unit tests covering parameter access, cancellation, session
    //! variables, and the builder API.
    use super::*;
    use rustc_hash::FxHashMap;

    #[test]
    fn test_context_new() {
        // Defaults: no params, auto-commit on, not cancelled.
        let ctx = ExecutionContext::new();
        assert_eq!(ctx.param_count(), 0);
        assert!(ctx.auto_commit());
        assert!(!ctx.is_cancelled());
    }

    #[test]
    fn test_context_with_params() {
        let ctx = ExecutionContext::with_params(smallvec::smallvec![
            Value::Integer(1),
            Value::text("hello")
        ]);
        assert_eq!(ctx.param_count(), 2);
        // Positional parameters use 1-based SQL placeholder indexing.
        assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
        assert_eq!(ctx.get_param(2), Some(&Value::text("hello")));
        assert_eq!(ctx.get_param(0), None); // 0 is invalid
        assert_eq!(ctx.get_param(3), None); // Out of bounds
    }

    #[test]
    fn test_context_named_params() {
        let mut params = FxHashMap::default();
        params.insert("name".to_string(), Value::text("Alice"));
        params.insert("age".to_string(), Value::Integer(30));

        let ctx = ExecutionContext::with_named_params(params);
        assert_eq!(ctx.get_named_param("name"), Some(&Value::text("Alice")));
        assert_eq!(ctx.get_named_param("age"), Some(&Value::Integer(30)));
        assert_eq!(ctx.get_named_param("unknown"), None);
    }

    #[test]
    fn test_context_cancellation() {
        // The handle shares the context's flag, so both observe the cancel.
        let ctx = ExecutionContext::new();
        assert!(!ctx.is_cancelled());

        let handle = ctx.cancellation_handle();
        assert!(!handle.is_cancelled());

        handle.cancel();
        assert!(ctx.is_cancelled());
        assert!(handle.is_cancelled());
    }

    #[test]
    fn test_context_check_cancelled() {
        let ctx = ExecutionContext::new();
        assert!(ctx.check_cancelled().is_ok());

        ctx.cancel();
        assert!(ctx.check_cancelled().is_err());
    }

    #[test]
    fn test_context_session_vars() {
        let mut ctx = ExecutionContext::new();
        ctx.set_session_var("timezone", Value::text("UTC"));

        assert_eq!(ctx.get_session_var("timezone"), Some(&Value::text("UTC")));
        assert_eq!(ctx.get_session_var("unknown"), None);
    }

    #[test]
    fn test_context_builder() {
        // Exercises every builder method in one chain.
        let ctx = ExecutionContextBuilder::new()
            .params(smallvec::smallvec![Value::Integer(1)])
            .param(Value::Integer(2))
            .named_param("name", Value::text("test"))
            .auto_commit(false)
            .database("mydb")
            .timeout_ms(5000)
            .build();

        assert_eq!(ctx.param_count(), 2);
        assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
        assert_eq!(ctx.get_param(2), Some(&Value::Integer(2)));
        assert_eq!(ctx.get_named_param("name"), Some(&Value::text("test")));
        assert!(!ctx.auto_commit());
        assert_eq!(ctx.current_database(), Some("mydb"));
        assert_eq!(ctx.timeout_ms(), 5000);
    }
}