1use crate::common::time_compat::Instant;
21use lru::LruCache;
22use rustc_hash::FxHashMap;
23use std::cell::RefCell;
24use std::collections::BinaryHeap;
25use std::num::NonZeroUsize;
26use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
27use std::sync::{Arc, Condvar, LazyLock, Mutex};
28use std::time::Duration;
29
30const SCALAR_SUBQUERY_CACHE_SIZE: usize = 128;
33const IN_SUBQUERY_CACHE_SIZE: usize = 128;
34const SEMI_JOIN_CACHE_SIZE: usize = 256;
35
36use crate::api::params::ParamVec;
37use crate::common::{CompactArc, StringMap};
38use crate::core::{Result, Row, Value, ValueMap, ValueSet};
39
40static EMPTY_PARAMS: LazyLock<CompactArc<ParamVec>> =
44 LazyLock::new(|| CompactArc::new(ParamVec::new()));
45static EMPTY_NAMED_PARAMS: LazyLock<Arc<FxHashMap<String, Value>>> =
46 LazyLock::new(|| Arc::new(FxHashMap::default()));
47static EMPTY_DATABASE: LazyLock<Arc<Option<String>>> = LazyLock::new(|| Arc::new(None));
48static EMPTY_SESSION_VARS: LazyLock<Arc<AHashMap<String, Value>>> =
49 LazyLock::new(|| Arc::new(AHashMap::new()));
50
51use smallvec::SmallVec;
57
58type ScalarSubqueryCacheEntry = (SmallVec<[CompactArc<str>; 2]>, Value);
60
61thread_local! {
62 static SCALAR_SUBQUERY_CACHE: RefCell<LruCache<String, ScalarSubqueryCacheEntry>> =
63 RefCell::new(LruCache::new(NonZeroUsize::new(SCALAR_SUBQUERY_CACHE_SIZE).unwrap()));
64}
65
66pub fn clear_scalar_subquery_cache() {
70 SCALAR_SUBQUERY_CACHE.with(|cache| {
71 cache.borrow_mut().clear();
72 });
73}
74
75#[inline]
78pub fn invalidate_scalar_subquery_cache_for_table(table_name: &str) {
79 SCALAR_SUBQUERY_CACHE.with(|cache| {
80 let mut c = cache.borrow_mut();
81 if c.is_empty() {
82 return;
83 }
84 let keys_to_remove: Vec<String> = c
86 .iter()
87 .filter(|(_, (tables, _))| tables.iter().any(|t| t.eq_ignore_ascii_case(table_name)))
88 .map(|(k, _)| k.clone())
89 .collect();
90 for key in keys_to_remove {
91 c.pop(&key);
92 }
93 });
94}
95
96pub fn get_cached_scalar_subquery(key: &str) -> Option<Value> {
98 SCALAR_SUBQUERY_CACHE.with(|cache| cache.borrow_mut().get(key).map(|(_, v)| v.clone()))
99}
100
101pub fn cache_scalar_subquery(key: String, tables: SmallVec<[CompactArc<str>; 2]>, value: Value) {
103 SCALAR_SUBQUERY_CACHE.with(|cache| {
104 cache.borrow_mut().put(key, (tables, value));
105 });
106}
107
108type InSubqueryCacheEntry = (SmallVec<[CompactArc<str>; 2]>, Vec<Value>);
116
117thread_local! {
118 static IN_SUBQUERY_CACHE: RefCell<LruCache<String, InSubqueryCacheEntry>> =
119 RefCell::new(LruCache::new(NonZeroUsize::new(IN_SUBQUERY_CACHE_SIZE).unwrap()));
120}
121
122pub fn clear_in_subquery_cache() {
126 IN_SUBQUERY_CACHE.with(|cache| {
127 cache.borrow_mut().clear();
128 });
129}
130
131#[inline]
134pub fn invalidate_in_subquery_cache_for_table(table_name: &str) {
135 IN_SUBQUERY_CACHE.with(|cache| {
136 let mut c = cache.borrow_mut();
137 if c.is_empty() {
138 return;
139 }
140 let keys_to_remove: Vec<String> = c
142 .iter()
143 .filter(|(_, (tables, _))| tables.iter().any(|t| t.eq_ignore_ascii_case(table_name)))
144 .map(|(k, _)| k.clone())
145 .collect();
146 for key in keys_to_remove {
147 c.pop(&key);
148 }
149 });
150}
151
152pub fn get_cached_in_subquery(key: &str) -> Option<Vec<Value>> {
154 IN_SUBQUERY_CACHE.with(|cache| cache.borrow_mut().get(key).map(|(_, v)| v.clone()))
155}
156
157pub fn cache_in_subquery(key: String, tables: SmallVec<[CompactArc<str>; 2]>, values: Vec<Value>) {
159 IN_SUBQUERY_CACHE.with(|cache| {
160 cache.borrow_mut().put(key, (tables, values));
161 });
162}
163
164use crate::parser::ast::{Expression, SelectStatement};
165
166pub fn extract_table_names_for_cache(stmt: &SelectStatement) -> SmallVec<[CompactArc<str>; 2]> {
170 let mut tables = SmallVec::new();
171 if let Some(ref table_expr) = stmt.table_expr {
172 collect_real_table_names(table_expr, &mut tables);
173 }
174 tables
175}
176
177fn collect_real_table_names(source: &Expression, tables: &mut SmallVec<[CompactArc<str>; 2]>) {
179 match source {
180 Expression::TableSource(ts) => {
181 tables.push(CompactArc::from(ts.name.value_lower.as_str()));
183 }
184 Expression::JoinSource(js) => {
185 collect_real_table_names(&js.left, tables);
186 collect_real_table_names(&js.right, tables);
187 }
188 Expression::SubquerySource(ss) => {
189 if let Some(ref table_expr) = ss.subquery.table_expr {
191 collect_real_table_names(table_expr, tables);
192 }
193 }
194 _ => {}
195 }
196}
197
198use ahash::AHashMap;
203use std::hash::{Hash, Hasher};
204
205type SemiJoinCacheEntry = (CompactArc<str>, CompactArc<ValueSet>);
207
208#[inline]
210pub fn compute_semi_join_cache_key(table: &str, column: &str, pred_hash: u64) -> u64 {
211 let mut hasher = rustc_hash::FxHasher::default();
212 table.hash(&mut hasher);
213 column.hash(&mut hasher);
214 pred_hash.hash(&mut hasher);
215 hasher.finish()
216}
217
218thread_local! {
219 static SEMI_JOIN_CACHE: RefCell<LruCache<u64, SemiJoinCacheEntry>> =
220 RefCell::new(LruCache::new(NonZeroUsize::new(SEMI_JOIN_CACHE_SIZE).unwrap()));
221}
222
223pub fn clear_semi_join_cache() {
227 SEMI_JOIN_CACHE.with(|cache| {
228 cache.borrow_mut().clear();
229 });
230}
231
232#[inline]
235pub fn invalidate_semi_join_cache_for_table(table_name: &str) {
236 SEMI_JOIN_CACHE.with(|cache| {
237 let mut c = cache.borrow_mut();
238 if c.is_empty() {
239 return;
240 }
241 let keys_to_remove: Vec<u64> = c
243 .iter()
244 .filter(|(_, (key_table, _))| key_table.eq_ignore_ascii_case(table_name))
245 .map(|(k, _)| *k)
246 .collect();
247 for key in keys_to_remove {
248 c.pop(&key);
249 }
250 });
251}
252
253#[inline]
255pub fn get_cached_semi_join(key_hash: u64) -> Option<CompactArc<ValueSet>> {
256 SEMI_JOIN_CACHE.with(|cache| {
257 cache
258 .borrow_mut()
259 .get(&key_hash)
260 .map(|(_, v)| CompactArc::clone(v))
261 })
262}
263
264#[inline]
266pub fn cache_semi_join_arc(key_hash: u64, table: &str, values: CompactArc<ValueSet>) {
267 SEMI_JOIN_CACHE.with(|cache| {
268 cache
269 .borrow_mut()
270 .put(key_hash, (CompactArc::from(table), values));
271 });
272}
273
274use super::expression::RowFilter;
278thread_local! {
279 static EXISTS_PREDICATE_CACHE: RefCell<FxHashMap<String, RowFilter>> = RefCell::new(FxHashMap::default());
280}
281
282pub fn clear_exists_predicate_cache() {
284 EXISTS_PREDICATE_CACHE.with(|cache| {
285 cache.borrow_mut().clear();
286 });
287}
288
289pub fn get_cached_exists_predicate(key: &str) -> Option<RowFilter> {
291 EXISTS_PREDICATE_CACHE.with(|cache| cache.borrow().get(key).cloned())
292}
293
294pub fn cache_exists_predicate(key: String, filter: RowFilter) {
296 EXISTS_PREDICATE_CACHE.with(|cache| {
297 cache.borrow_mut().insert(key, filter);
298 });
299}
300
301use crate::storage::traits::Index;
304thread_local! {
305 static EXISTS_INDEX_CACHE: RefCell<FxHashMap<String, std::sync::Arc<dyn Index>>> = RefCell::new(FxHashMap::default());
306}
307
308pub fn clear_exists_index_cache() {
310 EXISTS_INDEX_CACHE.with(|cache| {
311 cache.borrow_mut().clear();
312 });
313}
314
315pub fn get_cached_exists_index(key: &str) -> Option<std::sync::Arc<dyn Index>> {
317 EXISTS_INDEX_CACHE.with(|cache| cache.borrow().get(key).cloned())
318}
319
320pub fn cache_exists_index(key: String, index: std::sync::Arc<dyn Index>) {
322 EXISTS_INDEX_CACHE.with(|cache| {
323 cache.borrow_mut().insert(key, index);
324 });
325}
326
327pub type RowFetcher = Box<dyn Fn(&[i64]) -> crate::core::RowVec + Send + Sync>;
329
330pub type RowCounter = Box<dyn Fn(&[i64]) -> usize + Send + Sync>;
333
334thread_local! {
337 static EXISTS_FETCHER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowFetcher>>> = RefCell::new(FxHashMap::default());
338}
339
340thread_local! {
343 static COUNT_COUNTER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowCounter>>> = RefCell::new(FxHashMap::default());
344}
345
346pub fn clear_exists_fetcher_cache() {
348 EXISTS_FETCHER_CACHE.with(|cache| {
349 cache.borrow_mut().clear();
350 });
351}
352
353pub fn clear_count_counter_cache() {
355 COUNT_COUNTER_CACHE.with(|cache| {
356 cache.borrow_mut().clear();
357 });
358}
359
360pub fn get_cached_exists_fetcher(key: &str) -> Option<std::sync::Arc<RowFetcher>> {
362 EXISTS_FETCHER_CACHE.with(|cache| cache.borrow().get(key).cloned())
363}
364
365pub fn get_cached_count_counter(key: &str) -> Option<std::sync::Arc<RowCounter>> {
367 COUNT_COUNTER_CACHE.with(|cache| cache.borrow().get(key).cloned())
368}
369
370pub fn cache_exists_fetcher(key: String, fetcher: RowFetcher) {
372 EXISTS_FETCHER_CACHE.with(|cache| {
373 cache.borrow_mut().insert(key, std::sync::Arc::new(fetcher));
374 });
375}
376
377pub fn cache_count_counter(key: String, counter: RowCounter) {
379 COUNT_COUNTER_CACHE.with(|cache| {
380 cache.borrow_mut().insert(key, std::sync::Arc::new(counter));
381 });
382}
383
384thread_local! {
387 static EXISTS_SCHEMA_CACHE: RefCell<FxHashMap<String, CompactArc<Vec<String>>>> = RefCell::new(FxHashMap::default());
388}
389
390pub fn clear_exists_schema_cache() {
392 EXISTS_SCHEMA_CACHE.with(|cache| {
393 cache.borrow_mut().clear();
394 });
395}
396
397pub fn get_cached_exists_schema(key: &str) -> Option<CompactArc<Vec<String>>> {
399 EXISTS_SCHEMA_CACHE.with(|cache| cache.borrow().get(key).cloned())
400}
401
402pub fn cache_exists_schema(key: String, columns: CompactArc<Vec<String>>) {
404 EXISTS_SCHEMA_CACHE.with(|cache| {
405 cache.borrow_mut().insert(key, columns);
406 });
407}
408
409thread_local! {
412 static EXISTS_PRED_KEY_CACHE: RefCell<FxHashMap<usize, String>> = RefCell::new(FxHashMap::default());
413}
414
415pub fn clear_exists_pred_key_cache() {
417 EXISTS_PRED_KEY_CACHE.with(|cache| {
418 cache.borrow_mut().clear();
419 });
420}
421
422#[inline]
424pub fn get_cached_exists_pred_key(subquery_ptr: usize) -> Option<String> {
425 EXISTS_PRED_KEY_CACHE.with(|cache| cache.borrow().get(&subquery_ptr).cloned())
426}
427
428#[inline]
430pub fn cache_exists_pred_key(subquery_ptr: usize, pred_key: String) {
431 EXISTS_PRED_KEY_CACHE.with(|cache| {
432 cache.borrow_mut().insert(subquery_ptr, pred_key);
433 });
434}
435
436thread_local! {
440 static BATCH_AGGREGATE_CACHE: RefCell<FxHashMap<String, CompactArc<ValueMap<Value>>>> = RefCell::new(FxHashMap::default());
441}
442
443pub fn clear_batch_aggregate_cache() {
445 BATCH_AGGREGATE_CACHE.with(|cache| {
446 let mut c = cache.borrow_mut();
447 c.clear();
448 c.shrink_to_fit();
449 });
450}
451
452pub fn get_cached_batch_aggregate(key: &str) -> Option<CompactArc<ValueMap<Value>>> {
454 BATCH_AGGREGATE_CACHE.with(|cache| cache.borrow().get(key).cloned())
455}
456
457pub fn cache_batch_aggregate(key: String, values: ValueMap<Value>) {
459 BATCH_AGGREGATE_CACHE.with(|cache| {
460 cache.borrow_mut().insert(key, CompactArc::new(values));
461 });
462}
463
464#[derive(Clone)]
466pub struct BatchAggregateLookupInfo {
467 pub cache_key: String,
469 pub outer_column_lower: String,
471 pub outer_qualified_lower: Option<String>,
473 pub is_count: bool,
475}
476
477thread_local! {
481 static BATCH_AGGREGATE_INFO_CACHE: RefCell<FxHashMap<usize, Option<Arc<BatchAggregateLookupInfo>>>> = RefCell::new(FxHashMap::default());
482}
483
484pub fn clear_batch_aggregate_info_cache() {
486 BATCH_AGGREGATE_INFO_CACHE.with(|cache| {
487 let mut c = cache.borrow_mut();
488 c.clear();
489 c.shrink_to_fit();
490 });
491}
492
493#[inline]
496pub fn get_cached_batch_aggregate_info(
497 subquery_ptr: usize,
498) -> Option<Option<Arc<BatchAggregateLookupInfo>>> {
499 BATCH_AGGREGATE_INFO_CACHE.with(|cache| cache.borrow().get(&subquery_ptr).cloned())
500}
501
502#[inline]
505pub fn cache_batch_aggregate_info(
506 subquery_ptr: usize,
507 info: Option<BatchAggregateLookupInfo>,
508) -> Option<Arc<BatchAggregateLookupInfo>> {
509 let arc_info = info.map(Arc::new);
510 let result = arc_info.clone();
511 BATCH_AGGREGATE_INFO_CACHE.with(|cache| {
512 cache.borrow_mut().insert(subquery_ptr, arc_info);
513 });
514 result
515}
516
517#[derive(Clone)]
520pub struct ExistsCorrelationInfo {
521 pub outer_column: String,
523 pub outer_table: Option<String>,
525 pub inner_column: String,
527 pub inner_table: String,
529 pub outer_column_lower: String,
531 pub outer_qualified_lower: Option<String>,
533 pub additional_predicate: Option<Expression>,
535 pub index_cache_key: String,
537}
538
539thread_local! {
543 static EXISTS_CORRELATION_CACHE: RefCell<FxHashMap<usize, Option<Arc<ExistsCorrelationInfo>>>> = RefCell::new(FxHashMap::default());
544}
545
546pub fn clear_exists_correlation_cache() {
548 EXISTS_CORRELATION_CACHE.with(|cache| {
549 cache.borrow_mut().clear();
550 });
551}
552
553pub fn clear_all_thread_local_caches() {
557 SCALAR_SUBQUERY_CACHE.with(|cache| {
559 cache.borrow_mut().clear();
560 });
561 IN_SUBQUERY_CACHE.with(|cache| {
562 cache.borrow_mut().clear();
563 });
564 SEMI_JOIN_CACHE.with(|cache| {
565 cache.borrow_mut().clear();
566 });
567 EXISTS_PREDICATE_CACHE.with(|cache| {
569 let mut c = cache.borrow_mut();
570 c.clear();
571 c.shrink_to_fit();
572 });
573 EXISTS_INDEX_CACHE.with(|cache| {
574 let mut c = cache.borrow_mut();
575 c.clear();
576 c.shrink_to_fit();
577 });
578 EXISTS_FETCHER_CACHE.with(|cache| {
579 let mut c = cache.borrow_mut();
580 c.clear();
581 c.shrink_to_fit();
582 });
583 COUNT_COUNTER_CACHE.with(|cache| {
584 let mut c = cache.borrow_mut();
585 c.clear();
586 c.shrink_to_fit();
587 });
588 EXISTS_SCHEMA_CACHE.with(|cache| {
589 let mut c = cache.borrow_mut();
590 c.clear();
591 c.shrink_to_fit();
592 });
593 EXISTS_PRED_KEY_CACHE.with(|cache| {
594 let mut c = cache.borrow_mut();
595 c.clear();
596 c.shrink_to_fit();
597 });
598 BATCH_AGGREGATE_CACHE.with(|cache| {
599 let mut c = cache.borrow_mut();
600 c.clear();
601 c.shrink_to_fit();
602 });
603 BATCH_AGGREGATE_INFO_CACHE.with(|cache| {
604 let mut c = cache.borrow_mut();
605 c.clear();
606 c.shrink_to_fit();
607 });
608 EXISTS_CORRELATION_CACHE.with(|cache| {
609 let mut c = cache.borrow_mut();
610 c.clear();
611 c.shrink_to_fit();
612 });
613
614 crate::storage::expression::clear_regex_cache();
616 crate::storage::expression::clear_like_regex_cache();
617
618 crate::core::row_vec::clear_row_vec_pool();
620 crate::core::row_vec::clear_row_id_vec_pool();
621
622 crate::storage::mvcc::clear_version_map_pools();
624
625 super::expression::clear_program_cache();
627 super::query_classification::clear_classification_cache();
628}
629
630#[inline]
633pub fn get_cached_exists_correlation(
634 subquery_ptr: usize,
635) -> Option<Option<Arc<ExistsCorrelationInfo>>> {
636 EXISTS_CORRELATION_CACHE.with(|cache| cache.borrow().get(&subquery_ptr).cloned())
637}
638
639#[inline]
642pub fn cache_exists_correlation(
643 subquery_ptr: usize,
644 info: Option<ExistsCorrelationInfo>,
645) -> Option<Arc<ExistsCorrelationInfo>> {
646 let arc_info = info.map(Arc::new);
647 let result = arc_info.clone();
648 EXISTS_CORRELATION_CACHE.with(|cache| {
649 cache.borrow_mut().insert(subquery_ptr, arc_info);
650 });
651 result
652}
653
654#[derive(Debug, Clone)]
662pub struct ExecutionContext {
663 params: CompactArc<ParamVec>,
665 named_params: Arc<FxHashMap<String, Value>>,
667 auto_commit: bool,
669 cancelled: Arc<AtomicBool>,
671 current_database: Arc<Option<String>>,
673 session_vars: Arc<AHashMap<String, Value>>,
675 timeout_ms: u64,
677 view_depth: usize,
679 pub(crate) query_depth: usize,
682 pub(crate) outer_row: Option<FxHashMap<CompactArc<str>, Value>>,
687 outer_columns: Option<CompactArc<Vec<String>>>,
689 cte_data: Option<Arc<CteDataMap>>,
692 transaction_id: Option<u64>,
694}
695
696type CteData = (CompactArc<Vec<String>>, CompactArc<Vec<(i64, Row)>>);
699
700type CteDataMap = StringMap<CteData>;
704
705impl Default for ExecutionContext {
706 fn default() -> Self {
707 Self::new()
708 }
709}
710
711impl ExecutionContext {
712 pub fn new() -> Self {
715 Self {
716 params: EMPTY_PARAMS.clone(),
717 named_params: EMPTY_NAMED_PARAMS.clone(),
718 auto_commit: true,
719 cancelled: Arc::new(AtomicBool::new(false)), current_database: EMPTY_DATABASE.clone(),
721 session_vars: EMPTY_SESSION_VARS.clone(),
722 timeout_ms: 0,
723 view_depth: 0,
724 query_depth: 0,
725 outer_row: None,
726 outer_columns: None,
727 cte_data: None,
728 transaction_id: None,
729 }
730 }
731
732 pub fn with_params(params: ParamVec) -> Self {
734 Self {
735 params: CompactArc::new(params),
736 ..Self::new()
737 }
738 }
739
740 pub fn with_named_params(named_params: FxHashMap<String, Value>) -> Self {
742 Self {
743 named_params: Arc::new(named_params),
744 ..Self::new()
745 }
746 }
747
748 pub fn get_param(&self, index: usize) -> Option<&Value> {
750 if index == 0 || index > self.params.len() {
751 None
752 } else {
753 self.params.get(index - 1)
754 }
755 }
756
757 pub fn get_named_param(&self, name: &str) -> Option<&Value> {
759 self.named_params.get(name)
760 }
761
762 pub fn params(&self) -> &[Value] {
764 &self.params
765 }
766
767 pub fn params_arc(&self) -> &CompactArc<ParamVec> {
770 &self.params
771 }
772
773 pub fn named_params(&self) -> &FxHashMap<String, Value> {
775 &self.named_params
776 }
777
778 pub fn named_params_arc(&self) -> &Arc<FxHashMap<String, Value>> {
781 &self.named_params
782 }
783
784 pub fn param_count(&self) -> usize {
786 self.params.len()
787 }
788
789 pub fn set_params(&mut self, params: ParamVec) {
791 self.params = CompactArc::new(params);
792 }
793
794 pub fn add_param(&mut self, value: Value) {
796 CompactArc::make_mut(&mut self.params).push(value);
797 }
798
799 pub fn set_named_param(&mut self, name: impl Into<String>, value: Value) {
801 Arc::make_mut(&mut self.named_params).insert(name.into(), value);
802 }
803
804 pub fn auto_commit(&self) -> bool {
806 self.auto_commit
807 }
808
809 pub fn set_auto_commit(&mut self, auto_commit: bool) {
811 self.auto_commit = auto_commit;
812 }
813
814 pub fn is_cancelled(&self) -> bool {
816 self.cancelled.load(Ordering::Relaxed)
817 }
818
819 pub fn cancel(&self) {
821 self.cancelled.store(true, Ordering::Relaxed);
822 }
823
824 pub fn cancellation_handle(&self) -> CancellationHandle {
826 CancellationHandle {
827 cancelled: self.cancelled.clone(),
828 }
829 }
830
831 pub fn current_database(&self) -> Option<&str> {
833 self.current_database.as_ref().as_deref()
834 }
835
836 pub fn set_current_database(&mut self, database: impl Into<String>) {
838 self.current_database = Arc::new(Some(database.into()));
839 }
840
841 pub fn get_session_var(&self, name: &str) -> Option<&Value> {
843 self.session_vars.get(name)
844 }
845
846 pub fn set_session_var(&mut self, name: impl Into<String>, value: Value) {
848 Arc::make_mut(&mut self.session_vars).insert(name.into(), value);
849 }
850
851 pub fn timeout_ms(&self) -> u64 {
853 self.timeout_ms
854 }
855
856 pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
858 self.timeout_ms = timeout_ms;
859 }
860
861 pub fn has_timeout(&self) -> bool {
863 self.timeout_ms > 0
864 }
865
866 pub fn view_depth(&self) -> usize {
868 self.view_depth
869 }
870
871 pub fn with_incremented_view_depth(&self) -> Self {
875 Self {
876 params: self.params.clone(),
877 named_params: self.named_params.clone(),
878 auto_commit: self.auto_commit,
879 cancelled: self.cancelled.clone(),
880 current_database: self.current_database.clone(),
881 session_vars: self.session_vars.clone(),
882 timeout_ms: self.timeout_ms,
883 view_depth: self.view_depth + 1,
884 query_depth: self.query_depth + 1, outer_row: self.outer_row.clone(),
886 outer_columns: self.outer_columns.clone(),
887 cte_data: self.cte_data.clone(),
888 transaction_id: self.transaction_id,
889 }
890 }
891
892 pub fn with_incremented_query_depth(&self) -> Self {
895 Self {
896 params: self.params.clone(),
897 named_params: self.named_params.clone(),
898 auto_commit: self.auto_commit,
899 cancelled: self.cancelled.clone(),
900 current_database: self.current_database.clone(),
901 session_vars: self.session_vars.clone(),
902 timeout_ms: self.timeout_ms,
903 view_depth: self.view_depth,
904 query_depth: self.query_depth + 1,
905 outer_row: self.outer_row.clone(),
906 outer_columns: self.outer_columns.clone(),
907 cte_data: self.cte_data.clone(),
908 transaction_id: self.transaction_id,
909 }
910 }
911
912 pub fn outer_row(&self) -> Option<&FxHashMap<CompactArc<str>, Value>> {
914 self.outer_row.as_ref()
915 }
916
917 pub fn outer_columns(&self) -> Option<&[String]> {
919 self.outer_columns.as_ref().map(|v| v.as_slice())
920 }
921
922 pub fn with_outer_row(
926 &self,
927 outer_row: FxHashMap<CompactArc<str>, Value>,
928 outer_columns: CompactArc<Vec<String>>,
929 ) -> Self {
930 Self {
931 params: self.params.clone(), named_params: self.named_params.clone(), auto_commit: self.auto_commit,
934 cancelled: self.cancelled.clone(), current_database: self.current_database.clone(), session_vars: self.session_vars.clone(), timeout_ms: self.timeout_ms,
938 view_depth: self.view_depth,
939 query_depth: self.query_depth + 1, outer_row: Some(outer_row),
941 outer_columns: Some(outer_columns), cte_data: self.cte_data.clone(), transaction_id: self.transaction_id,
944 }
945 }
946
947 pub fn get_cte(&self, name: &str) -> Option<&CteData> {
950 self.cte_data
951 .as_ref()
952 .and_then(|data| data.get(&name.to_lowercase()))
953 }
954
955 #[inline]
959 pub fn get_cte_by_lower(&self, name_lower: &str) -> Option<&CteData> {
960 self.cte_data.as_ref().and_then(|data| data.get(name_lower))
961 }
962
963 pub fn has_cte(&self, name: &str) -> bool {
965 self.cte_data
966 .as_ref()
967 .is_some_and(|data| data.contains_key(&name.to_lowercase()))
968 }
969
970 #[inline]
973 pub fn has_cte_by_lower(&self, name_lower: &str) -> bool {
974 self.cte_data
975 .as_ref()
976 .is_some_and(|data| data.contains_key(name_lower))
977 }
978
979 pub fn with_cte_data(&self, cte_data: Arc<CteDataMap>) -> Self {
982 Self {
983 params: self.params.clone(),
984 named_params: self.named_params.clone(),
985 auto_commit: self.auto_commit,
986 cancelled: self.cancelled.clone(),
987 current_database: self.current_database.clone(),
988 session_vars: self.session_vars.clone(),
989 timeout_ms: self.timeout_ms,
990 view_depth: self.view_depth,
991 query_depth: self.query_depth,
992 outer_row: self.outer_row.clone(),
993 outer_columns: self.outer_columns.clone(),
994 cte_data: Some(cte_data),
995 transaction_id: self.transaction_id,
996 }
997 }
998
999 pub fn transaction_id(&self) -> Option<u64> {
1001 self.transaction_id
1002 }
1003
1004 pub fn set_transaction_id(&mut self, txn_id: u64) {
1006 self.transaction_id = Some(txn_id);
1007 }
1008
1009 pub fn with_transaction_id(&self, txn_id: u64) -> Self {
1011 Self {
1012 params: self.params.clone(),
1013 named_params: self.named_params.clone(),
1014 auto_commit: self.auto_commit,
1015 cancelled: self.cancelled.clone(),
1016 current_database: self.current_database.clone(),
1017 session_vars: self.session_vars.clone(),
1018 timeout_ms: self.timeout_ms,
1019 view_depth: self.view_depth,
1020 query_depth: self.query_depth,
1021 outer_row: self.outer_row.clone(),
1022 outer_columns: self.outer_columns.clone(),
1023 cte_data: self.cte_data.clone(),
1024 transaction_id: Some(txn_id),
1025 }
1026 }
1027
1028 pub fn check_cancelled(&self) -> Result<()> {
1030 if self.is_cancelled() {
1031 Err(crate::core::Error::QueryCancelled)
1032 } else {
1033 Ok(())
1034 }
1035 }
1036}
1037
1038#[derive(Debug, Clone)]
1040pub struct CancellationHandle {
1041 cancelled: Arc<AtomicBool>,
1042}
1043
1044impl CancellationHandle {
1045 pub fn cancel(&self) {
1047 self.cancelled.store(true, Ordering::Relaxed);
1048 }
1049
1050 pub fn is_cancelled(&self) -> bool {
1052 self.cancelled.load(Ordering::Relaxed)
1053 }
1054}
1055
1056struct TimeoutEntry {
1065 deadline: Instant,
1067 id: u64,
1069 cancel_handle: CancellationHandle,
1071 cancelled: Arc<AtomicBool>,
1073}
1074
1075impl PartialEq for TimeoutEntry {
1076 fn eq(&self, other: &Self) -> bool {
1077 self.deadline == other.deadline && self.id == other.id
1078 }
1079}
1080
1081impl Eq for TimeoutEntry {}
1082
1083impl PartialOrd for TimeoutEntry {
1084 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
1085 Some(self.cmp(other))
1086 }
1087}
1088
1089impl Ord for TimeoutEntry {
1090 fn cmp(&self, other: &Self) -> std::cmp::Ordering {
1091 other.deadline.cmp(&self.deadline)
1093 }
1094}
1095
1096struct TimeoutManagerState {
1098 timeouts: BinaryHeap<TimeoutEntry>,
1100 shutdown: bool,
1102}
1103
1104struct TimeoutManager {
1106 state: Mutex<TimeoutManagerState>,
1108 condvar: Condvar,
1110 next_id: AtomicU64,
1112}
1113
1114impl TimeoutManager {
1115 fn new() -> Arc<Self> {
1117 let manager = Arc::new(Self {
1118 state: Mutex::new(TimeoutManagerState {
1119 timeouts: BinaryHeap::new(),
1120 shutdown: false,
1121 }),
1122 condvar: Condvar::new(),
1123 next_id: AtomicU64::new(1),
1124 });
1125
1126 let manager_clone = Arc::clone(&manager);
1128 std::thread::Builder::new()
1129 .name("stoolap-timeout-manager".to_string())
1130 .spawn(move || {
1131 manager_clone.run();
1132 })
1133 .expect("Failed to spawn timeout manager thread");
1134
1135 manager
1136 }
1137
1138 fn run(&self) {
1140 loop {
1141 let mut state = self.state.lock().unwrap();
1142
1143 if state.shutdown && state.timeouts.is_empty() {
1145 return;
1146 }
1147
1148 let now = Instant::now();
1150 while let Some(entry) = state.timeouts.peek() {
1151 if entry.deadline <= now {
1152 let entry = state.timeouts.pop().unwrap();
1153 if !entry.cancelled.load(Ordering::Relaxed) {
1155 entry.cancel_handle.cancel();
1156 }
1157 } else {
1158 break;
1159 }
1160 }
1161
1162 let wait_duration = if let Some(entry) = state.timeouts.peek() {
1164 entry.deadline.saturating_duration_since(now)
1165 } else {
1166 Duration::from_secs(3600) };
1169
1170 if wait_duration.is_zero() {
1172 continue; }
1174 let (new_state, _timeout_result) =
1175 self.condvar.wait_timeout(state, wait_duration).unwrap();
1176 state = new_state;
1177
1178 if state.shutdown && state.timeouts.is_empty() {
1180 return;
1181 }
1182 }
1183 }
1184
1185 fn register(
1187 &self,
1188 timeout_ms: u64,
1189 cancel_handle: CancellationHandle,
1190 cancelled: Arc<AtomicBool>,
1191 ) -> u64 {
1192 let id = self.next_id.fetch_add(1, Ordering::Relaxed);
1193 let deadline = Instant::now() + Duration::from_millis(timeout_ms);
1194
1195 let entry = TimeoutEntry {
1196 deadline,
1197 id,
1198 cancel_handle,
1199 cancelled,
1200 };
1201
1202 let mut state = self.state.lock().unwrap();
1203 let was_empty = state.timeouts.is_empty();
1204 let is_earliest = state.timeouts.peek().is_none_or(|e| deadline < e.deadline);
1205
1206 state.timeouts.push(entry);
1207
1208 if was_empty || is_earliest {
1210 self.condvar.notify_one();
1211 }
1212
1213 id
1214 }
1215}
1216
1217fn global_timeout_manager() -> &'static Arc<TimeoutManager> {
1219 use std::sync::OnceLock;
1220 static MANAGER: OnceLock<Arc<TimeoutManager>> = OnceLock::new();
1221 MANAGER.get_or_init(TimeoutManager::new)
1222}
1223
1224pub struct TimeoutGuard {
1227 cancelled: Arc<AtomicBool>,
1229}
1230
1231impl TimeoutGuard {
1232 pub fn new(ctx: &ExecutionContext) -> Option<Self> {
1235 let timeout_ms = ctx.timeout_ms();
1236 if timeout_ms == 0 {
1237 return None;
1238 }
1239
1240 let cancel_handle = ctx.cancellation_handle();
1241 let cancelled = Arc::new(AtomicBool::new(false));
1242
1243 global_timeout_manager().register(timeout_ms, cancel_handle, Arc::clone(&cancelled));
1245
1246 Some(Self { cancelled })
1247 }
1248}
1249
1250impl Drop for TimeoutGuard {
1251 fn drop(&mut self) {
1252 self.cancelled.store(true, Ordering::Relaxed);
1254 }
1255}
1256
1257pub struct ExecutionContextBuilder {
1259 ctx: ExecutionContext,
1260}
1261
1262impl ExecutionContextBuilder {
1263 pub fn new() -> Self {
1265 Self {
1266 ctx: ExecutionContext::new(),
1267 }
1268 }
1269
1270 pub fn params(mut self, params: ParamVec) -> Self {
1272 self.ctx.params = CompactArc::new(params);
1273 self
1274 }
1275
1276 pub fn param(self, value: Value) -> Self {
1278 let mut v = (*self.ctx.params).clone();
1279 v.push(value);
1280 Self {
1281 ctx: ExecutionContext {
1282 params: CompactArc::new(v),
1283 ..self.ctx
1284 },
1285 }
1286 }
1287
1288 pub fn named_param(self, name: impl Into<String>, value: Value) -> Self {
1290 Self {
1291 ctx: ExecutionContext {
1292 named_params: Arc::new({
1293 let mut m = (*self.ctx.named_params).clone();
1294 m.insert(name.into(), value);
1295 m
1296 }),
1297 ..self.ctx
1298 },
1299 }
1300 }
1301
1302 pub fn auto_commit(mut self, auto_commit: bool) -> Self {
1304 self.ctx.auto_commit = auto_commit;
1305 self
1306 }
1307
1308 pub fn database(mut self, database: impl Into<String>) -> Self {
1310 self.ctx.current_database = Arc::new(Some(database.into()));
1311 self
1312 }
1313
1314 pub fn session_var(self, name: impl Into<String>, value: Value) -> Self {
1316 Self {
1317 ctx: ExecutionContext {
1318 session_vars: Arc::new({
1319 let mut m = (*self.ctx.session_vars).clone();
1320 m.insert(name.into(), value);
1321 m
1322 }),
1323 ..self.ctx
1324 },
1325 }
1326 }
1327
1328 pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
1330 self.ctx.timeout_ms = timeout_ms;
1331 self
1332 }
1333
1334 pub fn build(self) -> ExecutionContext {
1336 self.ctx
1337 }
1338}
1339
1340impl Default for ExecutionContextBuilder {
1341 fn default() -> Self {
1342 Self::new()
1343 }
1344}
1345
1346#[cfg(test)]
1347mod tests {
1348 use super::*;
1349 use rustc_hash::FxHashMap;
1350
1351 #[test]
1352 fn test_context_new() {
1353 let ctx = ExecutionContext::new();
1354 assert_eq!(ctx.param_count(), 0);
1355 assert!(ctx.auto_commit());
1356 assert!(!ctx.is_cancelled());
1357 }
1358
1359 #[test]
1360 fn test_context_with_params() {
1361 let ctx = ExecutionContext::with_params(smallvec::smallvec![
1362 Value::Integer(1),
1363 Value::text("hello")
1364 ]);
1365 assert_eq!(ctx.param_count(), 2);
1366 assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
1367 assert_eq!(ctx.get_param(2), Some(&Value::text("hello")));
1368 assert_eq!(ctx.get_param(0), None); assert_eq!(ctx.get_param(3), None); }
1371
1372 #[test]
1373 fn test_context_named_params() {
1374 let mut params = FxHashMap::default();
1375 params.insert("name".to_string(), Value::text("Alice"));
1376 params.insert("age".to_string(), Value::Integer(30));
1377
1378 let ctx = ExecutionContext::with_named_params(params);
1379 assert_eq!(ctx.get_named_param("name"), Some(&Value::text("Alice")));
1380 assert_eq!(ctx.get_named_param("age"), Some(&Value::Integer(30)));
1381 assert_eq!(ctx.get_named_param("unknown"), None);
1382 }
1383
1384 #[test]
1385 fn test_context_cancellation() {
1386 let ctx = ExecutionContext::new();
1387 assert!(!ctx.is_cancelled());
1388
1389 let handle = ctx.cancellation_handle();
1390 assert!(!handle.is_cancelled());
1391
1392 handle.cancel();
1393 assert!(ctx.is_cancelled());
1394 assert!(handle.is_cancelled());
1395 }
1396
1397 #[test]
1398 fn test_context_check_cancelled() {
1399 let ctx = ExecutionContext::new();
1400 assert!(ctx.check_cancelled().is_ok());
1401
1402 ctx.cancel();
1403 assert!(ctx.check_cancelled().is_err());
1404 }
1405
1406 #[test]
1407 fn test_context_session_vars() {
1408 let mut ctx = ExecutionContext::new();
1409 ctx.set_session_var("timezone", Value::text("UTC"));
1410
1411 assert_eq!(ctx.get_session_var("timezone"), Some(&Value::text("UTC")));
1412 assert_eq!(ctx.get_session_var("unknown"), None);
1413 }
1414
1415 #[test]
1416 fn test_context_builder() {
1417 let ctx = ExecutionContextBuilder::new()
1418 .params(smallvec::smallvec![Value::Integer(1)])
1419 .param(Value::Integer(2))
1420 .named_param("name", Value::text("test"))
1421 .auto_commit(false)
1422 .database("mydb")
1423 .timeout_ms(5000)
1424 .build();
1425
1426 assert_eq!(ctx.param_count(), 2);
1427 assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
1428 assert_eq!(ctx.get_param(2), Some(&Value::Integer(2)));
1429 assert_eq!(ctx.get_named_param("name"), Some(&Value::text("test")));
1430 assert!(!ctx.auto_commit());
1431 assert_eq!(ctx.current_database(), Some("mydb"));
1432 assert_eq!(ctx.timeout_ms(), 5000);
1433 }
1434}