use crate::common::time_compat::Instant;
use lru::LruCache;
use rustc_hash::FxHashMap;
use std::cell::RefCell;
use std::collections::BinaryHeap;
use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, LazyLock, Mutex};
use std::time::Duration;
// Maximum number of entries held by the per-thread scalar subquery LRU cache.
const SCALAR_SUBQUERY_CACHE_SIZE: usize = 128;
// Maximum number of entries held by the per-thread IN-subquery LRU cache.
const IN_SUBQUERY_CACHE_SIZE: usize = 128;
// Maximum number of entries held by the per-thread semi-join LRU cache.
const SEMI_JOIN_CACHE_SIZE: usize = 256;
use crate::api::params::ParamVec;
use crate::common::{CompactArc, StringMap};
use crate::core::{Result, Row, Value, ValueMap, ValueSet};
// Lazily-initialised empty containers. `ExecutionContext::new` clones these
// shared handles instead of building a new empty container per context.

// Empty positional parameter list.
static EMPTY_PARAMS: LazyLock<CompactArc<ParamVec>> =
LazyLock::new(|| CompactArc::new(ParamVec::new()));
// Empty named parameter map.
static EMPTY_NAMED_PARAMS: LazyLock<Arc<FxHashMap<String, Value>>> =
LazyLock::new(|| Arc::new(FxHashMap::default()));
// "No current database" marker (`None`).
static EMPTY_DATABASE: LazyLock<Arc<Option<String>>> = LazyLock::new(|| Arc::new(None));
// Empty session variable map.
static EMPTY_SESSION_VARS: LazyLock<Arc<AHashMap<String, Value>>> =
LazyLock::new(|| Arc::new(AHashMap::new()));
use smallvec::SmallVec;
// Cache entry: (names of the tables the entry is associated with, cached scalar value).
// The table list is what `invalidate_scalar_subquery_cache_for_table` matches against.
type ScalarSubqueryCacheEntry = (SmallVec<[CompactArc<str>; 2]>, Value);
thread_local! {
// Per-thread LRU cache of scalar subquery results keyed by a caller-supplied string,
// bounded to SCALAR_SUBQUERY_CACHE_SIZE entries.
static SCALAR_SUBQUERY_CACHE: RefCell<LruCache<String, ScalarSubqueryCacheEntry>> =
RefCell::new(LruCache::new(NonZeroUsize::new(SCALAR_SUBQUERY_CACHE_SIZE).unwrap()));
}
/// Removes every entry from the calling thread's scalar subquery cache.
pub fn clear_scalar_subquery_cache() {
    SCALAR_SUBQUERY_CACHE.with_borrow_mut(|lru| lru.clear());
}
/// Evicts every scalar subquery cache entry (on the calling thread) whose
/// table list contains `table_name`, compared ASCII case-insensitively.
#[inline]
pub fn invalidate_scalar_subquery_cache_for_table(table_name: &str) {
    SCALAR_SUBQUERY_CACHE.with_borrow_mut(|lru| {
        if lru.is_empty() {
            return;
        }
        // Keys are collected first because the cache cannot be mutated while
        // it is being iterated.
        let mut stale: Vec<String> = Vec::new();
        for (key, (tables, _)) in lru.iter() {
            if tables.iter().any(|t| t.eq_ignore_ascii_case(table_name)) {
                stale.push(key.clone());
            }
        }
        for key in &stale {
            lru.pop(key);
        }
    });
}
/// Returns a clone of the scalar value cached under `key` on this thread, if any.
///
/// The lookup goes through `LruCache::get`, so it needs mutable access to the cache.
pub fn get_cached_scalar_subquery(key: &str) -> Option<Value> {
    SCALAR_SUBQUERY_CACHE.with_borrow_mut(|lru| {
        let entry = lru.get(key)?;
        Some(entry.1.clone())
    })
}
pub fn cache_scalar_subquery(key: String, tables: SmallVec<[CompactArc<str>; 2]>, value: Value) {
SCALAR_SUBQUERY_CACHE.with(|cache| {
cache.borrow_mut().put(key, (tables, value));
});
}
// Cache entry: (names of the tables the entry is associated with, cached list of values).
// The table list is what `invalidate_in_subquery_cache_for_table` matches against.
type InSubqueryCacheEntry = (SmallVec<[CompactArc<str>; 2]>, Vec<Value>);
thread_local! {
// Per-thread LRU cache of IN-subquery value lists keyed by a caller-supplied string,
// bounded to IN_SUBQUERY_CACHE_SIZE entries.
static IN_SUBQUERY_CACHE: RefCell<LruCache<String, InSubqueryCacheEntry>> =
RefCell::new(LruCache::new(NonZeroUsize::new(IN_SUBQUERY_CACHE_SIZE).unwrap()));
}
/// Removes every entry from the calling thread's IN-subquery cache.
pub fn clear_in_subquery_cache() {
    IN_SUBQUERY_CACHE.with_borrow_mut(|lru| lru.clear());
}
/// Evicts every IN-subquery cache entry (on the calling thread) whose table
/// list contains `table_name`, compared ASCII case-insensitively.
#[inline]
pub fn invalidate_in_subquery_cache_for_table(table_name: &str) {
    IN_SUBQUERY_CACHE.with_borrow_mut(|lru| {
        if lru.is_empty() {
            return;
        }
        // Keys are collected first because the cache cannot be mutated while
        // it is being iterated.
        let mut stale: Vec<String> = Vec::new();
        for (key, (tables, _)) in lru.iter() {
            if tables.iter().any(|t| t.eq_ignore_ascii_case(table_name)) {
                stale.push(key.clone());
            }
        }
        for key in &stale {
            lru.pop(key);
        }
    });
}
/// Returns a clone of the value list cached under `key` on this thread, if any.
///
/// The lookup goes through `LruCache::get`, so it needs mutable access to the cache.
pub fn get_cached_in_subquery(key: &str) -> Option<Vec<Value>> {
    IN_SUBQUERY_CACHE.with_borrow_mut(|lru| {
        let entry = lru.get(key)?;
        Some(entry.1.clone())
    })
}
pub fn cache_in_subquery(key: String, tables: SmallVec<[CompactArc<str>; 2]>, values: Vec<Value>) {
IN_SUBQUERY_CACHE.with(|cache| {
cache.borrow_mut().put(key, (tables, values));
});
}
use crate::parser::ast::{Expression, SelectStatement};
/// Collects the lower-cased table names reachable from the statement's
/// `table_expr` (through joins and subquery sources).
///
/// Returns an empty list when the statement has no `table_expr`.
pub fn extract_table_names_for_cache(stmt: &SelectStatement) -> SmallVec<[CompactArc<str>; 2]> {
    let mut names = SmallVec::new();
    if let Some(source) = &stmt.table_expr {
        collect_real_table_names(source, &mut names);
    }
    names
}
/// Appends to `tables` the lower-cased name of every table source found under
/// `source`, recursing into both sides of joins and into the `table_expr` of
/// subquery sources. Any other expression kind contributes nothing.
fn collect_real_table_names(source: &Expression, tables: &mut SmallVec<[CompactArc<str>; 2]>) {
    match source {
        Expression::TableSource(table) => {
            let lowered: &str = table.name.value_lower.as_str();
            tables.push(CompactArc::from(lowered));
        }
        Expression::JoinSource(join) => {
            // Left side first so names appear in source order.
            collect_real_table_names(&join.left, tables);
            collect_real_table_names(&join.right, tables);
        }
        Expression::SubquerySource(derived) => {
            if let Some(inner) = &derived.subquery.table_expr {
                collect_real_table_names(inner, tables);
            }
        }
        _ => {}
    }
}
use ahash::AHashMap;
use std::hash::{Hash, Hasher};
// Semi-join cache entry: (table name used for invalidation, shared set of values).
type SemiJoinCacheEntry = (CompactArc<str>, CompactArc<ValueSet>);
/// Derives the `u64` semi-join cache key by feeding `table`, `column` and
/// `pred_hash` (in that order) into an `FxHasher`.
#[inline]
pub fn compute_semi_join_cache_key(table: &str, column: &str, pred_hash: u64) -> u64 {
    let mut state = rustc_hash::FxHasher::default();
    Hash::hash(table, &mut state);
    Hash::hash(column, &mut state);
    Hash::hash(&pred_hash, &mut state);
    state.finish()
}
thread_local! {
// Per-thread LRU cache of semi-join value sets keyed by the hash produced by
// `compute_semi_join_cache_key`, bounded to SEMI_JOIN_CACHE_SIZE entries.
static SEMI_JOIN_CACHE: RefCell<LruCache<u64, SemiJoinCacheEntry>> =
RefCell::new(LruCache::new(NonZeroUsize::new(SEMI_JOIN_CACHE_SIZE).unwrap()));
}
/// Removes every entry from the calling thread's semi-join cache.
pub fn clear_semi_join_cache() {
    SEMI_JOIN_CACHE.with_borrow_mut(|lru| lru.clear());
}
/// Evicts every semi-join cache entry (on the calling thread) that was stored
/// for `table_name`, compared ASCII case-insensitively.
#[inline]
pub fn invalidate_semi_join_cache_for_table(table_name: &str) {
    SEMI_JOIN_CACHE.with_borrow_mut(|lru| {
        if lru.is_empty() {
            return;
        }
        // Keys are collected first because the cache cannot be mutated while
        // it is being iterated.
        let mut stale: Vec<u64> = Vec::new();
        for (hash, (owner, _)) in lru.iter() {
            if owner.eq_ignore_ascii_case(table_name) {
                stale.push(*hash);
            }
        }
        for hash in &stale {
            lru.pop(hash);
        }
    });
}
/// Returns a shared handle to the value set cached under `key_hash` on this
/// thread, if any. Only the handle is cloned, not the set.
#[inline]
pub fn get_cached_semi_join(key_hash: u64) -> Option<CompactArc<ValueSet>> {
    SEMI_JOIN_CACHE.with_borrow_mut(|lru| {
        let (_, set) = lru.get(&key_hash)?;
        Some(CompactArc::clone(set))
    })
}
#[inline]
pub fn cache_semi_join_arc(key_hash: u64, table: &str, values: CompactArc<ValueSet>) {
SEMI_JOIN_CACHE.with(|cache| {
cache
.borrow_mut()
.put(key_hash, (CompactArc::from(table), values));
});
}
use super::expression::RowFilter;
thread_local! {
// Per-thread, unbounded map from a caller-supplied string key to a `RowFilter`.
static EXISTS_PREDICATE_CACHE: RefCell<FxHashMap<String, RowFilter>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS predicate cache.
pub fn clear_exists_predicate_cache() {
    EXISTS_PREDICATE_CACHE.with_borrow_mut(|map| map.clear());
}
/// Returns a clone of the `RowFilter` cached under `key` on this thread, if any.
pub fn get_cached_exists_predicate(key: &str) -> Option<RowFilter> {
    EXISTS_PREDICATE_CACHE.with_borrow(|map| map.get(key).cloned())
}
/// Stores `filter` under `key` in this thread's EXISTS predicate cache,
/// replacing any previous entry for the same key.
pub fn cache_exists_predicate(key: String, filter: RowFilter) {
    EXISTS_PREDICATE_CACHE.with_borrow_mut(|map| {
        map.insert(key, filter);
    });
}
use crate::storage::traits::Index;
thread_local! {
// Per-thread, unbounded map from a caller-supplied string key to a shared `Index` handle.
static EXISTS_INDEX_CACHE: RefCell<FxHashMap<String, std::sync::Arc<dyn Index>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS index cache.
pub fn clear_exists_index_cache() {
    EXISTS_INDEX_CACHE.with_borrow_mut(|map| map.clear());
}
/// Returns a shared handle to the index cached under `key` on this thread, if any.
pub fn get_cached_exists_index(key: &str) -> Option<std::sync::Arc<dyn Index>> {
    EXISTS_INDEX_CACHE.with_borrow(|map| map.get(key).cloned())
}
/// Stores `index` under `key` in this thread's EXISTS index cache, replacing
/// any previous entry for the same key.
pub fn cache_exists_index(key: String, index: std::sync::Arc<dyn Index>) {
    EXISTS_INDEX_CACHE.with_borrow_mut(|map| {
        map.insert(key, index);
    });
}
/// Boxed, thread-safe callback that maps a slice of `i64` to a `RowVec`.
/// NOTE(review): the `i64`s are presumably row ids — confirm against callers.
pub type RowFetcher = Box<dyn Fn(&[i64]) -> crate::core::RowVec + Send + Sync>;
/// Boxed, thread-safe callback that maps a slice of `i64` to a count.
/// NOTE(review): the `i64`s are presumably row ids — confirm against callers.
pub type RowCounter = Box<dyn Fn(&[i64]) -> usize + Send + Sync>;
thread_local! {
// Per-thread, unbounded map from a caller-supplied string key to a shared `RowFetcher`.
static EXISTS_FETCHER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowFetcher>>> = RefCell::new(FxHashMap::default());
}
thread_local! {
// Per-thread, unbounded map from a caller-supplied string key to a shared `RowCounter`.
static COUNT_COUNTER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowCounter>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's row-fetcher cache.
pub fn clear_exists_fetcher_cache() {
    EXISTS_FETCHER_CACHE.with_borrow_mut(|map| map.clear());
}
/// Removes every entry from the calling thread's row-counter cache.
pub fn clear_count_counter_cache() {
    COUNT_COUNTER_CACHE.with_borrow_mut(|map| map.clear());
}
/// Returns a shared handle to the `RowFetcher` cached under `key` on this thread, if any.
pub fn get_cached_exists_fetcher(key: &str) -> Option<std::sync::Arc<RowFetcher>> {
    EXISTS_FETCHER_CACHE.with_borrow(|map| map.get(key).cloned())
}
/// Returns a shared handle to the `RowCounter` cached under `key` on this thread, if any.
pub fn get_cached_count_counter(key: &str) -> Option<std::sync::Arc<RowCounter>> {
    COUNT_COUNTER_CACHE.with_borrow(|map| map.get(key).cloned())
}
/// Wraps `fetcher` in an `Arc` and stores it under `key` in this thread's
/// row-fetcher cache, replacing any previous entry for the same key.
pub fn cache_exists_fetcher(key: String, fetcher: RowFetcher) {
    let shared = std::sync::Arc::new(fetcher);
    EXISTS_FETCHER_CACHE.with_borrow_mut(|map| {
        map.insert(key, shared);
    });
}
/// Wraps `counter` in an `Arc` and stores it under `key` in this thread's
/// row-counter cache, replacing any previous entry for the same key.
pub fn cache_count_counter(key: String, counter: RowCounter) {
    let shared = std::sync::Arc::new(counter);
    COUNT_COUNTER_CACHE.with_borrow_mut(|map| {
        map.insert(key, shared);
    });
}
thread_local! {
// Per-thread, unbounded map from a caller-supplied string key to a shared list of
// strings (called `columns` by `cache_exists_schema`).
static EXISTS_SCHEMA_CACHE: RefCell<FxHashMap<String, CompactArc<Vec<String>>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS schema cache.
pub fn clear_exists_schema_cache() {
    EXISTS_SCHEMA_CACHE.with_borrow_mut(|map| map.clear());
}
/// Returns a shared handle to the column list cached under `key` on this thread, if any.
pub fn get_cached_exists_schema(key: &str) -> Option<CompactArc<Vec<String>>> {
    EXISTS_SCHEMA_CACHE.with_borrow(|map| map.get(key).cloned())
}
/// Stores `columns` under `key` in this thread's EXISTS schema cache,
/// replacing any previous entry for the same key.
pub fn cache_exists_schema(key: String, columns: CompactArc<Vec<String>>) {
    EXISTS_SCHEMA_CACHE.with_borrow_mut(|map| {
        map.insert(key, columns);
    });
}
thread_local! {
// Per-thread, unbounded map from a caller-supplied `usize` (named `subquery_ptr` by the
// accessors; presumably a subquery node address — confirm) to a predicate key string.
static EXISTS_PRED_KEY_CACHE: RefCell<FxHashMap<usize, String>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS predicate-key cache.
pub fn clear_exists_pred_key_cache() {
    EXISTS_PRED_KEY_CACHE.with_borrow_mut(|map| map.clear());
}
/// Returns a clone of the predicate key cached for `subquery_ptr` on this thread, if any.
#[inline]
pub fn get_cached_exists_pred_key(subquery_ptr: usize) -> Option<String> {
    EXISTS_PRED_KEY_CACHE.with_borrow(|map| map.get(&subquery_ptr).cloned())
}
/// Stores `pred_key` for `subquery_ptr` in this thread's EXISTS predicate-key
/// cache, replacing any previous entry.
#[inline]
pub fn cache_exists_pred_key(subquery_ptr: usize, pred_key: String) {
    EXISTS_PRED_KEY_CACHE.with_borrow_mut(|map| {
        map.insert(subquery_ptr, pred_key);
    });
}
thread_local! {
// Per-thread, unbounded map from a caller-supplied string key to a shared `ValueMap<Value>`.
static BATCH_AGGREGATE_CACHE: RefCell<FxHashMap<String, CompactArc<ValueMap<Value>>>> = RefCell::new(FxHashMap::default());
}
/// Empties the calling thread's batch aggregate cache and releases its
/// backing storage.
pub fn clear_batch_aggregate_cache() {
    BATCH_AGGREGATE_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
}
/// Returns a shared handle to the value map cached under `key` on this thread, if any.
pub fn get_cached_batch_aggregate(key: &str) -> Option<CompactArc<ValueMap<Value>>> {
    BATCH_AGGREGATE_CACHE.with_borrow(|map| map.get(key).cloned())
}
/// Wraps `values` in a `CompactArc` and stores it under `key` in this thread's
/// batch aggregate cache, replacing any previous entry for the same key.
pub fn cache_batch_aggregate(key: String, values: ValueMap<Value>) {
    let shared = CompactArc::new(values);
    BATCH_AGGREGATE_CACHE.with_borrow_mut(|map| {
        map.insert(key, shared);
    });
}
/// Lookup metadata stored per subquery in `BATCH_AGGREGATE_INFO_CACHE`.
///
/// NOTE(review): field meanings below are inferred from names only — confirm
/// against the code that builds and consumes this struct.
#[derive(Clone)]
pub struct BatchAggregateLookupInfo {
/// Presumably the key into the batch aggregate cache.
pub cache_key: String,
/// Presumably the lower-cased outer column name.
pub outer_column_lower: String,
/// Presumably the lower-cased, table-qualified outer column name, when one exists.
pub outer_qualified_lower: Option<String>,
/// Presumably whether the aggregate is a COUNT.
pub is_count: bool,
}
thread_local! {
// Per-thread, unbounded map from a caller-supplied `usize` (`subquery_ptr`) to optional
// lookup info. A stored `None` is distinct from a missing key: the accessor returns
// `Some(None)` for the former and `None` for the latter.
static BATCH_AGGREGATE_INFO_CACHE: RefCell<FxHashMap<usize, Option<Arc<BatchAggregateLookupInfo>>>> = RefCell::new(FxHashMap::default());
}
/// Empties the calling thread's batch aggregate info cache and releases its
/// backing storage.
pub fn clear_batch_aggregate_info_cache() {
    BATCH_AGGREGATE_INFO_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
}
/// Looks up the batch aggregate info recorded for `subquery_ptr` on this thread.
///
/// Returns `None` when nothing was recorded, `Some(None)` when a negative
/// result was recorded, and `Some(Some(info))` otherwise.
#[inline]
pub fn get_cached_batch_aggregate_info(
    subquery_ptr: usize,
) -> Option<Option<Arc<BatchAggregateLookupInfo>>> {
    BATCH_AGGREGATE_INFO_CACHE.with_borrow(|map| map.get(&subquery_ptr).cloned())
}
/// Records `info` (or its absence) for `subquery_ptr` in this thread's batch
/// aggregate info cache and returns the shared handle that was stored.
#[inline]
pub fn cache_batch_aggregate_info(
    subquery_ptr: usize,
    info: Option<BatchAggregateLookupInfo>,
) -> Option<Arc<BatchAggregateLookupInfo>> {
    let shared = info.map(Arc::new);
    BATCH_AGGREGATE_INFO_CACHE.with_borrow_mut(|map| {
        // The cache and the caller share the same allocation.
        map.insert(subquery_ptr, shared.clone());
    });
    shared
}
/// Correlation metadata stored per subquery in `EXISTS_CORRELATION_CACHE`.
///
/// NOTE(review): field meanings below are inferred from names only — confirm
/// against the code that builds and consumes this struct.
#[derive(Clone)]
pub struct ExistsCorrelationInfo {
/// Presumably the outer-query column taking part in the correlation.
pub outer_column: String,
/// Presumably the outer table qualifier, when one was given.
pub outer_table: Option<String>,
/// Presumably the subquery-side column taking part in the correlation.
pub inner_column: String,
/// Presumably the subquery-side table.
pub inner_table: String,
/// Presumably `outer_column` lower-cased.
pub outer_column_lower: String,
/// Presumably the lower-cased, table-qualified outer column name, when one exists.
pub outer_qualified_lower: Option<String>,
/// Presumably the remaining subquery predicate besides the correlation condition.
pub additional_predicate: Option<Expression>,
/// Presumably the key used with the EXISTS index cache.
pub index_cache_key: String,
}
thread_local! {
// Per-thread, unbounded map from a caller-supplied `usize` (`subquery_ptr`) to optional
// correlation info. A stored `None` is distinct from a missing key: the accessor returns
// `Some(None)` for the former and `None` for the latter.
static EXISTS_CORRELATION_CACHE: RefCell<FxHashMap<usize, Option<Arc<ExistsCorrelationInfo>>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS correlation cache.
pub fn clear_exists_correlation_cache() {
    EXISTS_CORRELATION_CACHE.with_borrow_mut(|map| map.clear());
}
/// Clears every thread-local cache declared in this module for the calling
/// thread, then invokes the clear routines of the other modules listed at the
/// end of the body.
///
/// The LRU caches are only cleared; the hash-map caches are cleared and then
/// shrunk so their backing storage is released.
pub fn clear_all_thread_local_caches() {
    // LRU caches.
    SCALAR_SUBQUERY_CACHE.with_borrow_mut(|lru| lru.clear());
    IN_SUBQUERY_CACHE.with_borrow_mut(|lru| lru.clear());
    SEMI_JOIN_CACHE.with_borrow_mut(|lru| lru.clear());

    // Hash-map caches: clear, then release capacity.
    EXISTS_PREDICATE_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    EXISTS_INDEX_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    EXISTS_FETCHER_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    COUNT_COUNTER_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    EXISTS_SCHEMA_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    EXISTS_PRED_KEY_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    BATCH_AGGREGATE_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    BATCH_AGGREGATE_INFO_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });
    EXISTS_CORRELATION_CACHE.with_borrow_mut(|map| {
        map.clear();
        map.shrink_to_fit();
    });

    // Caches and pools owned by other modules.
    crate::storage::expression::clear_regex_cache();
    crate::storage::expression::clear_like_regex_cache();
    crate::core::row_vec::clear_row_vec_pool();
    crate::core::row_vec::clear_row_id_vec_pool();
    crate::storage::mvcc::clear_version_map_pools();
    super::expression::clear_program_cache();
    super::query_classification::clear_classification_cache();
}
/// Looks up the correlation info recorded for `subquery_ptr` on this thread.
///
/// Returns `None` when nothing was recorded, `Some(None)` when a negative
/// result was recorded, and `Some(Some(info))` otherwise.
#[inline]
pub fn get_cached_exists_correlation(
    subquery_ptr: usize,
) -> Option<Option<Arc<ExistsCorrelationInfo>>> {
    EXISTS_CORRELATION_CACHE.with_borrow(|map| map.get(&subquery_ptr).cloned())
}
/// Records `info` (or its absence) for `subquery_ptr` in this thread's EXISTS
/// correlation cache and returns the shared handle that was stored.
#[inline]
pub fn cache_exists_correlation(
    subquery_ptr: usize,
    info: Option<ExistsCorrelationInfo>,
) -> Option<Arc<ExistsCorrelationInfo>> {
    let shared = info.map(Arc::new);
    EXISTS_CORRELATION_CACHE.with_borrow_mut(|map| {
        // The cache and the caller share the same allocation.
        map.insert(subquery_ptr, shared.clone());
    });
    shared
}
/// Per-query execution state: parameters, session settings, cancellation flag,
/// nesting depths, and optional outer-row / CTE / transaction information.
///
/// Cloning shares the cancellation flag, so cancelling one clone is visible
/// through all of them.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
// Positional parameters; `get_param` addresses them 1-based.
params: CompactArc<ParamVec>,
// Named parameters, looked up by exact name.
named_params: Arc<FxHashMap<String, Value>>,
// Auto-commit flag; `new` sets it to `true`.
auto_commit: bool,
// Cancellation flag shared with `CancellationHandle`s and derived contexts.
cancelled: Arc<AtomicBool>,
// Current database name, if one has been set.
current_database: Arc<Option<String>>,
// Session variables, looked up by exact name.
session_vars: Arc<AHashMap<String, Value>>,
// Timeout in milliseconds; 0 means no timeout (see `has_timeout`).
timeout_ms: u64,
// Incremented by `with_incremented_view_depth`.
view_depth: usize,
// Incremented by `with_incremented_view_depth`, `with_incremented_query_depth`
// and `with_outer_row`.
pub(crate) query_depth: usize,
// Name -> value map installed by `with_outer_row`.
// NOTE(review): presumably the enclosing query's current row for correlated
// subqueries — confirm.
pub(crate) outer_row: Option<FxHashMap<CompactArc<str>, Value>>,
// Column list installed together with `outer_row`.
outer_columns: Option<CompactArc<Vec<String>>>,
// CTE results by name; `get_cte` lower-cases the name before lookup.
cte_data: Option<Arc<CteDataMap>>,
// Transaction id, if one has been attached.
transaction_id: Option<u64>,
}
// One CTE's data: (shared list of strings, shared list of (i64, Row) pairs).
// NOTE(review): presumably column names and (row id, row) pairs — confirm.
type CteData = (CompactArc<Vec<String>>, CompactArc<Vec<(i64, Row)>>);
// CTE data keyed by name (lookups in `ExecutionContext` use the lower-cased name).
type CteDataMap = StringMap<CteData>;
impl Default for ExecutionContext {
fn default() -> Self {
Self::new()
}
}
impl ExecutionContext {
pub fn new() -> Self {
Self {
params: EMPTY_PARAMS.clone(),
named_params: EMPTY_NAMED_PARAMS.clone(),
auto_commit: true,
cancelled: Arc::new(AtomicBool::new(false)), current_database: EMPTY_DATABASE.clone(),
session_vars: EMPTY_SESSION_VARS.clone(),
timeout_ms: 0,
view_depth: 0,
query_depth: 0,
outer_row: None,
outer_columns: None,
cte_data: None,
transaction_id: None,
}
}
pub fn with_params(params: ParamVec) -> Self {
Self {
params: CompactArc::new(params),
..Self::new()
}
}
pub fn with_named_params(named_params: FxHashMap<String, Value>) -> Self {
Self {
named_params: Arc::new(named_params),
..Self::new()
}
}
pub fn get_param(&self, index: usize) -> Option<&Value> {
if index == 0 || index > self.params.len() {
None
} else {
self.params.get(index - 1)
}
}
pub fn get_named_param(&self, name: &str) -> Option<&Value> {
self.named_params.get(name)
}
pub fn params(&self) -> &[Value] {
&self.params
}
pub fn params_arc(&self) -> &CompactArc<ParamVec> {
&self.params
}
pub fn named_params(&self) -> &FxHashMap<String, Value> {
&self.named_params
}
pub fn named_params_arc(&self) -> &Arc<FxHashMap<String, Value>> {
&self.named_params
}
pub fn param_count(&self) -> usize {
self.params.len()
}
pub fn set_params(&mut self, params: ParamVec) {
self.params = CompactArc::new(params);
}
pub fn add_param(&mut self, value: Value) {
CompactArc::make_mut(&mut self.params).push(value);
}
pub fn set_named_param(&mut self, name: impl Into<String>, value: Value) {
Arc::make_mut(&mut self.named_params).insert(name.into(), value);
}
pub fn auto_commit(&self) -> bool {
self.auto_commit
}
pub fn set_auto_commit(&mut self, auto_commit: bool) {
self.auto_commit = auto_commit;
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed);
}
pub fn cancellation_handle(&self) -> CancellationHandle {
CancellationHandle {
cancelled: self.cancelled.clone(),
}
}
pub fn current_database(&self) -> Option<&str> {
self.current_database.as_ref().as_deref()
}
pub fn set_current_database(&mut self, database: impl Into<String>) {
self.current_database = Arc::new(Some(database.into()));
}
pub fn get_session_var(&self, name: &str) -> Option<&Value> {
self.session_vars.get(name)
}
pub fn set_session_var(&mut self, name: impl Into<String>, value: Value) {
Arc::make_mut(&mut self.session_vars).insert(name.into(), value);
}
pub fn timeout_ms(&self) -> u64 {
self.timeout_ms
}
pub fn set_timeout_ms(&mut self, timeout_ms: u64) {
self.timeout_ms = timeout_ms;
}
pub fn has_timeout(&self) -> bool {
self.timeout_ms > 0
}
pub fn view_depth(&self) -> usize {
self.view_depth
}
pub fn with_incremented_view_depth(&self) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_database: self.current_database.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth + 1,
query_depth: self.query_depth + 1, outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: self.cte_data.clone(),
transaction_id: self.transaction_id,
}
}
pub fn with_incremented_query_depth(&self) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_database: self.current_database.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth + 1,
outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: self.cte_data.clone(),
transaction_id: self.transaction_id,
}
}
pub fn outer_row(&self) -> Option<&FxHashMap<CompactArc<str>, Value>> {
self.outer_row.as_ref()
}
pub fn outer_columns(&self) -> Option<&[String]> {
self.outer_columns.as_ref().map(|v| v.as_slice())
}
pub fn with_outer_row(
&self,
outer_row: FxHashMap<CompactArc<str>, Value>,
outer_columns: CompactArc<Vec<String>>,
) -> Self {
Self {
params: self.params.clone(), named_params: self.named_params.clone(), auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(), current_database: self.current_database.clone(), session_vars: self.session_vars.clone(), timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth + 1, outer_row: Some(outer_row),
outer_columns: Some(outer_columns), cte_data: self.cte_data.clone(), transaction_id: self.transaction_id,
}
}
pub fn get_cte(&self, name: &str) -> Option<&CteData> {
self.cte_data
.as_ref()
.and_then(|data| data.get(&name.to_lowercase()))
}
#[inline]
pub fn get_cte_by_lower(&self, name_lower: &str) -> Option<&CteData> {
self.cte_data.as_ref().and_then(|data| data.get(name_lower))
}
pub fn has_cte(&self, name: &str) -> bool {
self.cte_data
.as_ref()
.is_some_and(|data| data.contains_key(&name.to_lowercase()))
}
#[inline]
pub fn has_cte_by_lower(&self, name_lower: &str) -> bool {
self.cte_data
.as_ref()
.is_some_and(|data| data.contains_key(name_lower))
}
pub fn with_cte_data(&self, cte_data: Arc<CteDataMap>) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_database: self.current_database.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth,
outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: Some(cte_data),
transaction_id: self.transaction_id,
}
}
pub fn transaction_id(&self) -> Option<u64> {
self.transaction_id
}
pub fn set_transaction_id(&mut self, txn_id: u64) {
self.transaction_id = Some(txn_id);
}
pub fn with_transaction_id(&self, txn_id: u64) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_database: self.current_database.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth,
outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: self.cte_data.clone(),
transaction_id: Some(txn_id),
}
}
pub fn check_cancelled(&self) -> Result<()> {
if self.is_cancelled() {
Err(crate::core::Error::QueryCancelled)
} else {
Ok(())
}
}
}
/// Cloneable handle onto a shared cancellation flag.
///
/// All clones, and the `ExecutionContext` that produced the handle, observe
/// the same flag.
#[derive(Debug, Clone)]
pub struct CancellationHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Sets the shared flag.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Relaxed);
    }

    /// Returns whether the shared flag has been set.
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Relaxed)
    }
}
/// One pending timeout tracked by the timeout manager.
struct TimeoutEntry {
    /// Instant at which the timeout fires.
    deadline: Instant,
    /// Unique id assigned at registration; used as the ordering tie-breaker.
    id: u64,
    /// Handle used to cancel the query when the deadline passes.
    cancel_handle: CancellationHandle,
    /// Set by `TimeoutGuard` on drop; a set flag means the timeout must not fire.
    cancelled: Arc<AtomicBool>,
}

impl PartialEq for TimeoutEntry {
    fn eq(&self, other: &Self) -> bool {
        self.deadline == other.deadline && self.id == other.id
    }
}

impl Eq for TimeoutEntry {}

impl PartialOrd for TimeoutEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TimeoutEntry {
    /// Reversed ordering so that `BinaryHeap` (a max-heap) yields the entry
    /// with the earliest deadline first.
    ///
    /// Ties on `deadline` are broken by `id` (lower id first). This keeps
    /// `cmp` consistent with `eq`, which compares both fields: two entries
    /// compare `Equal` only when they are `==`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other
            .deadline
            .cmp(&self.deadline)
            .then_with(|| other.id.cmp(&self.id))
    }
}
// State protected by `TimeoutManager::state`.
struct TimeoutManagerState {
// Pending timeouts, ordered by `TimeoutEntry`'s `Ord` impl.
timeouts: BinaryHeap<TimeoutEntry>,
// When true and no timeouts remain, the manager's `run` loop returns.
shutdown: bool,
}
// Tracks registered timeouts and cancels queries from a dedicated thread
// (spawned in `TimeoutManager::new`) once their deadlines pass.
struct TimeoutManager {
// Pending timeouts plus the shutdown flag.
state: Mutex<TimeoutManagerState>,
// Signalled by `register` when the new entry becomes the earliest deadline.
condvar: Condvar,
// Source of entry ids; starts at 1.
next_id: AtomicU64,
}
impl TimeoutManager {
    /// Creates the manager and spawns its worker thread
    /// ("stoolap-timeout-manager"), which runs [`TimeoutManager::run`].
    ///
    /// Panics if the thread cannot be spawned.
    fn new() -> Arc<Self> {
        let initial = TimeoutManagerState {
            timeouts: BinaryHeap::new(),
            shutdown: false,
        };
        let manager = Arc::new(Self {
            state: Mutex::new(initial),
            condvar: Condvar::new(),
            next_id: AtomicU64::new(1),
        });
        let worker = Arc::clone(&manager);
        std::thread::Builder::new()
            .name("stoolap-timeout-manager".to_string())
            .spawn(move || worker.run())
            .expect("Failed to spawn timeout manager thread");
        manager
    }

    /// Worker loop: fires every expired timeout, then sleeps on the condvar
    /// until the next deadline (or one hour when nothing is pending).
    ///
    /// Returns once `shutdown` is set and no timeouts remain.
    fn run(&self) {
        loop {
            let mut guard = self.state.lock().unwrap();
            if guard.shutdown && guard.timeouts.is_empty() {
                return;
            }

            let now = Instant::now();

            // Pop everything that is due. Entries whose guard was already
            // dropped are discarded without cancelling the query.
            while guard.timeouts.peek().is_some_and(|head| head.deadline <= now) {
                let expired = guard.timeouts.pop().unwrap();
                if !expired.cancelled.load(Ordering::Relaxed) {
                    expired.cancel_handle.cancel();
                }
            }

            let wait_duration = match guard.timeouts.peek() {
                Some(head) => head.deadline.saturating_duration_since(now),
                None => Duration::from_secs(3600),
            };
            if wait_duration.is_zero() {
                // Re-evaluate immediately with a fresh `now`.
                continue;
            }

            let (guard, _timed_out) = self.condvar.wait_timeout(guard, wait_duration).unwrap();
            if guard.shutdown && guard.timeouts.is_empty() {
                return;
            }
        }
    }

    /// Registers a timeout that fires `timeout_ms` milliseconds from now and
    /// returns its id.
    ///
    /// When the timeout fires, `cancel_handle.cancel()` is invoked unless
    /// `cancelled` has been set in the meantime. The worker is woken only if
    /// the new entry is now the earliest deadline.
    fn register(
        &self,
        timeout_ms: u64,
        cancel_handle: CancellationHandle,
        cancelled: Arc<AtomicBool>,
    ) -> u64 {
        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
        let deadline = Instant::now() + Duration::from_millis(timeout_ms);

        let mut guard = self.state.lock().unwrap();
        // An empty heap also counts: `is_none_or` is true when there is no head.
        let becomes_head = guard
            .timeouts
            .peek()
            .is_none_or(|head| deadline < head.deadline);
        guard.timeouts.push(TimeoutEntry {
            deadline,
            id,
            cancel_handle,
            cancelled,
        });
        if becomes_head {
            self.condvar.notify_one();
        }
        id
    }
}
/// Returns the process-wide timeout manager, creating it (and its worker
/// thread) on first use.
fn global_timeout_manager() -> &'static Arc<TimeoutManager> {
    static MANAGER: std::sync::OnceLock<Arc<TimeoutManager>> = std::sync::OnceLock::new();
    MANAGER.get_or_init(TimeoutManager::new)
}
/// Scope guard for a registered query timeout.
///
/// Dropping the guard marks the timeout as no longer wanted, so the manager
/// will not cancel the query when the deadline passes.
pub struct TimeoutGuard {
    cancelled: Arc<AtomicBool>,
}

impl TimeoutGuard {
    /// Registers `ctx`'s timeout with the global timeout manager.
    ///
    /// Returns `None` (and registers nothing) when the context's timeout is 0.
    pub fn new(ctx: &ExecutionContext) -> Option<Self> {
        match ctx.timeout_ms() {
            0 => None,
            timeout_ms => {
                let cancelled = Arc::new(AtomicBool::new(false));
                global_timeout_manager().register(
                    timeout_ms,
                    ctx.cancellation_handle(),
                    Arc::clone(&cancelled),
                );
                Some(Self { cancelled })
            }
        }
    }
}

impl Drop for TimeoutGuard {
    /// Flags the registered timeout as cancelled.
    fn drop(&mut self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Relaxed);
    }
}
/// Chainable builder for [`ExecutionContext`]; starts from `ExecutionContext::new()`.
pub struct ExecutionContextBuilder {
// Context being assembled; returned by `build`.
ctx: ExecutionContext,
}
impl ExecutionContextBuilder {
pub fn new() -> Self {
Self {
ctx: ExecutionContext::new(),
}
}
pub fn params(mut self, params: ParamVec) -> Self {
self.ctx.params = CompactArc::new(params);
self
}
pub fn param(self, value: Value) -> Self {
let mut v = (*self.ctx.params).clone();
v.push(value);
Self {
ctx: ExecutionContext {
params: CompactArc::new(v),
..self.ctx
},
}
}
pub fn named_param(self, name: impl Into<String>, value: Value) -> Self {
Self {
ctx: ExecutionContext {
named_params: Arc::new({
let mut m = (*self.ctx.named_params).clone();
m.insert(name.into(), value);
m
}),
..self.ctx
},
}
}
pub fn auto_commit(mut self, auto_commit: bool) -> Self {
self.ctx.auto_commit = auto_commit;
self
}
pub fn database(mut self, database: impl Into<String>) -> Self {
self.ctx.current_database = Arc::new(Some(database.into()));
self
}
pub fn session_var(self, name: impl Into<String>, value: Value) -> Self {
Self {
ctx: ExecutionContext {
session_vars: Arc::new({
let mut m = (*self.ctx.session_vars).clone();
m.insert(name.into(), value);
m
}),
..self.ctx
},
}
}
pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
self.ctx.timeout_ms = timeout_ms;
self
}
pub fn build(self) -> ExecutionContext {
self.ctx
}
}
impl Default for ExecutionContextBuilder {
fn default() -> Self {
Self::new()
}
}
// Unit tests for `ExecutionContext`, `CancellationHandle` and `ExecutionContextBuilder`.
#[cfg(test)]
mod tests {
use super::*;
use rustc_hash::FxHashMap;
// A fresh context has no params, auto-commit on, and is not cancelled.
#[test]
fn test_context_new() {
let ctx = ExecutionContext::new();
assert_eq!(ctx.param_count(), 0);
assert!(ctx.auto_commit());
assert!(!ctx.is_cancelled());
}
// Positional params are 1-based; index 0 and out-of-range indices yield None.
#[test]
fn test_context_with_params() {
let ctx = ExecutionContext::with_params(smallvec::smallvec![
Value::Integer(1),
Value::text("hello")
]);
assert_eq!(ctx.param_count(), 2);
assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
assert_eq!(ctx.get_param(2), Some(&Value::text("hello")));
assert_eq!(ctx.get_param(0), None); assert_eq!(ctx.get_param(3), None); }
// Named params are found by exact name; unknown names yield None.
#[test]
fn test_context_named_params() {
let mut params = FxHashMap::default();
params.insert("name".to_string(), Value::text("Alice"));
params.insert("age".to_string(), Value::Integer(30));
let ctx = ExecutionContext::with_named_params(params);
assert_eq!(ctx.get_named_param("name"), Some(&Value::text("Alice")));
assert_eq!(ctx.get_named_param("age"), Some(&Value::Integer(30)));
assert_eq!(ctx.get_named_param("unknown"), None);
}
// Cancelling through a handle is visible on the originating context.
#[test]
fn test_context_cancellation() {
let ctx = ExecutionContext::new();
assert!(!ctx.is_cancelled());
let handle = ctx.cancellation_handle();
assert!(!handle.is_cancelled());
handle.cancel();
assert!(ctx.is_cancelled());
assert!(handle.is_cancelled());
}
// check_cancelled is Ok before cancel() and Err afterwards.
#[test]
fn test_context_check_cancelled() {
let ctx = ExecutionContext::new();
assert!(ctx.check_cancelled().is_ok());
ctx.cancel();
assert!(ctx.check_cancelled().is_err());
}
// Session variables round-trip; unknown names yield None.
#[test]
fn test_context_session_vars() {
let mut ctx = ExecutionContext::new();
ctx.set_session_var("timezone", Value::text("UTC"));
assert_eq!(ctx.get_session_var("timezone"), Some(&Value::text("UTC")));
assert_eq!(ctx.get_session_var("unknown"), None);
}
// The builder applies every setting; `param` appends after `params`.
#[test]
fn test_context_builder() {
let ctx = ExecutionContextBuilder::new()
.params(smallvec::smallvec![Value::Integer(1)])
.param(Value::Integer(2))
.named_param("name", Value::text("test"))
.auto_commit(false)
.database("mydb")
.timeout_ms(5000)
.build();
assert_eq!(ctx.param_count(), 2);
assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
assert_eq!(ctx.get_param(2), Some(&Value::Integer(2)));
assert_eq!(ctx.get_named_param("name"), Some(&Value::text("test")));
assert!(!ctx.auto_commit());
assert_eq!(ctx.current_database(), Some("mydb"));
assert_eq!(ctx.timeout_ms(), 5000);
}
}