use rustc_hash::FxHashMap;
use std::cell::RefCell;
use std::collections::{BinaryHeap, HashMap};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
const DEFAULT_SCHEMA: &str = "public";
use crate::core::{Result, Row, Schema, Value};
thread_local! {
/// Per-thread map from a caller-supplied string key to a cached `Value`
/// (going by the name, the result of a scalar subquery).
static SCALAR_SUBQUERY_CACHE: RefCell<FxHashMap<String, Value>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's scalar-subquery cache and
/// then shrinks the map's capacity as far as possible.
pub fn clear_scalar_subquery_cache() {
    SCALAR_SUBQUERY_CACHE.with_borrow_mut(|entries| {
        entries.clear();
        entries.shrink_to_fit();
    });
}
/// Looks up `key` in the calling thread's scalar-subquery cache.
///
/// Returns a clone of the stored value, or `None` when the key is absent.
pub fn get_cached_scalar_subquery(key: &str) -> Option<Value> {
    SCALAR_SUBQUERY_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Stores `value` under `key` in the calling thread's scalar-subquery cache,
/// replacing any value previously stored for that key.
pub fn cache_scalar_subquery(key: String, value: Value) {
    SCALAR_SUBQUERY_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, value);
    });
}
thread_local! {
/// Per-thread map from a caller-supplied string key to a cached list of
/// `Value`s (going by the name, the rows produced by an `IN` subquery).
static IN_SUBQUERY_CACHE: RefCell<FxHashMap<String, Vec<Value>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's IN-subquery cache and then
/// shrinks the map's capacity as far as possible.
pub fn clear_in_subquery_cache() {
    IN_SUBQUERY_CACHE.with_borrow_mut(|entries| {
        entries.clear();
        entries.shrink_to_fit();
    });
}
/// Looks up `key` in the calling thread's IN-subquery cache.
///
/// Returns a clone of the stored value list, or `None` when the key is absent.
pub fn get_cached_in_subquery(key: &str) -> Option<Vec<Value>> {
    IN_SUBQUERY_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Stores `values` under `key` in the calling thread's IN-subquery cache,
/// replacing any list previously stored for that key.
pub fn cache_in_subquery(key: String, values: Vec<Value>) {
    IN_SUBQUERY_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, values);
    });
}
use ahash::AHashSet;
thread_local! {
/// Per-thread map from a caller-supplied string key to a shared hash set of
/// `Value`s (going by the name, the key set used for a semi-join).
static SEMI_JOIN_CACHE: RefCell<FxHashMap<String, Arc<AHashSet<Value>>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's semi-join cache and then
/// shrinks the map's capacity as far as possible.
pub fn clear_semi_join_cache() {
    SEMI_JOIN_CACHE.with_borrow_mut(|entries| {
        entries.clear();
        entries.shrink_to_fit();
    });
}
/// Looks up `key` in the calling thread's semi-join cache.
///
/// Returns another `Arc` handle to the stored set (the set itself is not
/// copied), or `None` when the key is absent.
pub fn get_cached_semi_join(key: &str) -> Option<Arc<AHashSet<Value>>> {
    SEMI_JOIN_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Wraps `values` in an `Arc` and stores it under `key` in the calling
/// thread's semi-join cache, replacing any previous entry for that key.
pub fn cache_semi_join(key: String, values: AHashSet<Value>) {
    let shared = Arc::new(values);
    SEMI_JOIN_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, shared);
    });
}
/// Stores an already shared set under `key` in the calling thread's semi-join
/// cache, replacing any previous entry for that key.
pub fn cache_semi_join_arc(key: String, values: Arc<AHashSet<Value>>) {
    SEMI_JOIN_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, values);
    });
}
use super::expression::RowFilter;
thread_local! {
/// Per-thread map from a caller-supplied string key to a cached `RowFilter`
/// (going by the name, the compiled predicate of an `EXISTS` subquery).
static EXISTS_PREDICATE_CACHE: RefCell<FxHashMap<String, RowFilter>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS-predicate cache.
/// The map's allocated capacity is left untouched.
pub fn clear_exists_predicate_cache() {
    EXISTS_PREDICATE_CACHE.with_borrow_mut(|entries| entries.clear());
}
/// Looks up `key` in the calling thread's EXISTS-predicate cache.
///
/// Returns a clone of the stored filter, or `None` when the key is absent.
pub fn get_cached_exists_predicate(key: &str) -> Option<RowFilter> {
    EXISTS_PREDICATE_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Stores `filter` under `key` in the calling thread's EXISTS-predicate
/// cache, replacing any filter previously stored for that key.
pub fn cache_exists_predicate(key: String, filter: RowFilter) {
    EXISTS_PREDICATE_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, filter);
    });
}
use crate::storage::traits::Index;
thread_local! {
/// Per-thread map from a caller-supplied string key to a shared `Index`
/// trait object (going by the name, the index used to probe an `EXISTS`
/// subquery).
static EXISTS_INDEX_CACHE: RefCell<FxHashMap<String, std::sync::Arc<dyn Index>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS-index cache.
/// The map's allocated capacity is left untouched.
pub fn clear_exists_index_cache() {
    EXISTS_INDEX_CACHE.with_borrow_mut(|entries| entries.clear());
}
/// Looks up `key` in the calling thread's EXISTS-index cache.
///
/// Returns another `Arc` handle to the stored index, or `None` when the key
/// is absent.
pub fn get_cached_exists_index(key: &str) -> Option<std::sync::Arc<dyn Index>> {
    EXISTS_INDEX_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Stores `index` under `key` in the calling thread's EXISTS-index cache,
/// replacing any index previously stored for that key.
pub fn cache_exists_index(key: String, index: std::sync::Arc<dyn Index>) {
    EXISTS_INDEX_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, index);
    });
}
/// Boxed, thread-safe callback that takes a slice of `i64` values and returns
/// `(i64, Row)` pairs. Presumably the inputs are row ids and each output pair
/// is a row id with its row; confirm against the callers that build fetchers.
type RowFetcher = Box<dyn Fn(&[i64]) -> Vec<(i64, crate::core::Row)> + Send + Sync>;
thread_local! {
/// Per-thread map from a caller-supplied string key to a shared `RowFetcher`
/// callback (going by the name, used while evaluating `EXISTS` subqueries).
static EXISTS_FETCHER_CACHE: RefCell<FxHashMap<String, std::sync::Arc<RowFetcher>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS-fetcher cache.
/// The map's allocated capacity is left untouched.
pub fn clear_exists_fetcher_cache() {
    EXISTS_FETCHER_CACHE.with_borrow_mut(|entries| entries.clear());
}
/// Looks up `key` in the calling thread's EXISTS-fetcher cache.
///
/// Returns another `Arc` handle to the stored fetcher, or `None` when the key
/// is absent.
pub fn get_cached_exists_fetcher(key: &str) -> Option<std::sync::Arc<RowFetcher>> {
    EXISTS_FETCHER_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Wraps `fetcher` in an `Arc` and stores it under `key` in the calling
/// thread's EXISTS-fetcher cache, replacing any previous entry for that key.
pub fn cache_exists_fetcher(key: String, fetcher: RowFetcher) {
    let shared = std::sync::Arc::new(fetcher);
    EXISTS_FETCHER_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, shared);
    });
}
thread_local! {
/// Per-thread map from a caller-supplied string key to a shared list of
/// strings (going by `cache_exists_schema`, these are column names).
static EXISTS_SCHEMA_CACHE: RefCell<FxHashMap<String, std::sync::Arc<Vec<String>>>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS-schema cache.
/// The map's allocated capacity is left untouched.
pub fn clear_exists_schema_cache() {
    EXISTS_SCHEMA_CACHE.with_borrow_mut(|entries| entries.clear());
}
/// Looks up `key` in the calling thread's EXISTS-schema cache.
///
/// Returns another `Arc` handle to the stored column list, or `None` when the
/// key is absent.
pub fn get_cached_exists_schema(key: &str) -> Option<std::sync::Arc<Vec<String>>> {
    EXISTS_SCHEMA_CACHE.with_borrow(|entries| entries.get(key).cloned())
}
/// Wraps `columns` in an `Arc` and stores it under `key` in the calling
/// thread's EXISTS-schema cache, replacing any previous entry for that key.
pub fn cache_exists_schema(key: String, columns: Vec<String>) {
    let shared = std::sync::Arc::new(columns);
    EXISTS_SCHEMA_CACHE.with_borrow_mut(|entries| {
        entries.insert(key, shared);
    });
}
thread_local! {
/// Per-thread map from one string key to another (per the accessor
/// parameter names: subquery key -> predicate key).
static EXISTS_PRED_KEY_CACHE: RefCell<FxHashMap<String, String>> = RefCell::new(FxHashMap::default());
}
/// Removes every entry from the calling thread's EXISTS predicate-key cache.
/// The map's allocated capacity is left untouched.
pub fn clear_exists_pred_key_cache() {
    EXISTS_PRED_KEY_CACHE.with_borrow_mut(|entries| entries.clear());
}
/// Looks up `subquery_key` in the calling thread's EXISTS predicate-key cache.
///
/// Returns a clone of the stored predicate key, or `None` when nothing is
/// stored for `subquery_key`.
pub fn get_cached_exists_pred_key(subquery_key: &str) -> Option<String> {
    EXISTS_PRED_KEY_CACHE.with_borrow(|entries| entries.get(subquery_key).cloned())
}
/// Records `pred_key` for `subquery_key` in the calling thread's EXISTS
/// predicate-key cache, replacing any previous mapping for that subquery key.
pub fn cache_exists_pred_key(subquery_key: String, pred_key: String) {
    EXISTS_PRED_KEY_CACHE.with_borrow_mut(|entries| {
        entries.insert(subquery_key, pred_key);
    });
}
/// State carried through the execution of a query: bound parameters,
/// session settings, cancellation flag, nesting depths and optional
/// outer-row / CTE / transaction information.
///
/// Most collections are held behind `Arc`, so deriving a child context
/// (see the `with_*` methods) shares them instead of copying them.
#[derive(Debug, Clone)]
pub struct ExecutionContext {
/// Positional parameters; `get_param` addresses them starting at 1.
params: Arc<Vec<Value>>,
/// Parameters addressed by name.
named_params: Arc<FxHashMap<String, Value>>,
/// Auto-commit setting; `new()` initialises it to `true`.
auto_commit: bool,
/// Cancellation flag, shared with every `CancellationHandle` and every
/// context derived from this one.
cancelled: Arc<AtomicBool>,
/// Currently selected schema, if any; `new()` selects `DEFAULT_SCHEMA`.
current_schema: Arc<Option<String>>,
/// Session variables by name.
session_vars: Arc<HashMap<String, Value>>,
/// Timeout in milliseconds; `TimeoutGuard::new` treats 0 as "no timeout".
timeout_ms: u64,
/// Nesting depth incremented by `with_incremented_view_depth`.
view_depth: usize,
/// Nesting depth incremented whenever a nested query/view/outer-row
/// context is derived.
pub(crate) query_depth: usize,
/// Column values of the enclosing row, set by `with_outer_row`.
pub(crate) outer_row: Option<FxHashMap<String, Value>>,
/// Column names supplied together with `outer_row`.
outer_columns: Option<Arc<Vec<String>>>,
/// CTE results installed by `with_cte_data`; `get_cte`/`has_cte` look up
/// names after lowercasing them.
cte_data: Option<Arc<CteDataMap>>,
/// Transaction id, if one has been attached to this context.
transaction_id: Option<u64>,
}
/// CTE storage: maps a name to a `(Vec<String>, Vec<Row>)` pair, which
/// `get_cte` exposes as `(cols, rows)`.
type CteDataMap = FxHashMap<String, (Vec<String>, Vec<Row>)>;
use crate::storage::traits::Engine;
impl Default for ExecutionContext {
fn default() -> Self {
Self::new()
}
}
impl ExecutionContext {
pub fn new() -> Self {
Self {
params: Arc::new(Vec::new()),
named_params: Arc::new(FxHashMap::default()),
auto_commit: true,
cancelled: Arc::new(AtomicBool::new(false)),
current_schema: Arc::new(Some(DEFAULT_SCHEMA.to_string())),
session_vars: Arc::new(HashMap::new()),
timeout_ms: 0,
view_depth: 0,
query_depth: 0,
outer_row: None,
outer_columns: None,
cte_data: None,
transaction_id: None,
}
}
pub fn with_params(params: Vec<Value>) -> Self {
Self {
params: Arc::new(params),
..Self::new()
}
}
pub fn with_named_params(named_params: std::collections::HashMap<String, Value>) -> Self {
let fx_params: FxHashMap<String, Value> = named_params.into_iter().collect();
Self {
named_params: Arc::new(fx_params),
..Self::new()
}
}
pub fn get_param(&self, index: usize) -> Option<&Value> {
if index == 0 || index > self.params.len() {
None
} else {
self.params.get(index - 1)
}
}
pub fn get_named_param(&self, name: &str) -> Option<&Value> {
self.named_params.get(name)
}
pub fn param_count(&self) -> usize {
self.params.len()
}
pub fn params(&self) -> &[Value] {
&self.params
}
pub fn named_params(&self) -> &FxHashMap<String, Value> {
&self.named_params
}
pub fn named_params_arc(&self) -> &Arc<FxHashMap<String, Value>> {
&self.named_params
}
pub fn auto_commit(&self) -> bool {
self.auto_commit
}
pub fn cancel(&self) {
self.cancelled.store(true, Ordering::Relaxed);
}
pub fn is_cancelled(&self) -> bool {
self.cancelled.load(Ordering::Relaxed)
}
pub fn cancellation_handle(&self) -> CancellationHandle {
CancellationHandle {
cancelled: self.cancelled.clone(),
}
}
pub fn current_schema(&self) -> Option<&str> {
self.current_schema.as_deref()
}
pub fn set_current_schema(&mut self, schema: Option<String>) {
self.current_schema = Arc::new(schema);
}
pub fn session_vars(&self) -> &HashMap<String, Value> {
&self.session_vars
}
pub fn get_session_var(&self, name: &str) -> Option<&Value> {
self.session_vars.get(name)
}
pub fn set_session_var(&mut self, name: impl Into<String>, value: Value) {
let mut vars = (*self.session_vars).clone();
vars.insert(name.into(), value);
self.session_vars = Arc::new(vars);
}
pub fn timeout_ms(&self) -> u64 {
self.timeout_ms
}
pub fn view_depth(&self) -> usize {
self.view_depth
}
pub fn view_exists(&self, engine: &dyn Engine, view_name: &str) -> Result<bool> {
engine.view_exists(view_name)
}
pub fn table_exists(&self, engine: &dyn Engine, table_name: &str) -> Result<bool> {
engine.table_exists(table_name)
}
pub fn get_table_schema(&self, engine: &dyn Engine, table_name: &str) -> Result<Schema> {
engine.get_table_schema(table_name)
}
pub fn with_incremented_view_depth(&self) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_schema: self.current_schema.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth + 1,
query_depth: self.query_depth + 1, outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: self.cte_data.clone(),
transaction_id: self.transaction_id,
}
}
pub fn with_incremented_query_depth(&self) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_schema: self.current_schema.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth + 1,
outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: self.cte_data.clone(),
transaction_id: self.transaction_id,
}
}
pub fn outer_row(&self) -> Option<&FxHashMap<String, Value>> {
self.outer_row.as_ref()
}
pub fn outer_columns(&self) -> Option<&[String]> {
self.outer_columns.as_ref().map(|v| v.as_slice())
}
pub fn with_outer_row(
&self,
outer_row: FxHashMap<String, Value>,
outer_columns: Arc<Vec<String>>,
) -> Self {
Self {
params: self.params.clone(), named_params: self.named_params.clone(), auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(), current_schema: self.current_schema.clone(), session_vars: self.session_vars.clone(), timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth + 1, outer_row: Some(outer_row),
outer_columns: Some(outer_columns), cte_data: self.cte_data.clone(), transaction_id: self.transaction_id,
}
}
pub fn get_cte(&self, name: &str) -> Option<(&Vec<String>, &Vec<Row>)> {
self.cte_data.as_ref().and_then(|data| {
data.get(&name.to_lowercase())
.map(|(cols, rows)| (cols, rows))
})
}
pub fn has_cte(&self, name: &str) -> bool {
self.cte_data
.as_ref()
.is_some_and(|data| data.contains_key(&name.to_lowercase()))
}
pub fn with_cte_data(&self, cte_data: Arc<CteDataMap>) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_schema: self.current_schema.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth,
outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: Some(cte_data),
transaction_id: self.transaction_id,
}
}
pub fn transaction_id(&self) -> Option<u64> {
self.transaction_id
}
pub fn set_transaction_id(&mut self, txn_id: u64) {
self.transaction_id = Some(txn_id);
}
pub fn with_transaction_id(&self, txn_id: u64) -> Self {
Self {
params: self.params.clone(),
named_params: self.named_params.clone(),
auto_commit: self.auto_commit,
cancelled: self.cancelled.clone(),
current_schema: self.current_schema.clone(),
session_vars: self.session_vars.clone(),
timeout_ms: self.timeout_ms,
view_depth: self.view_depth,
query_depth: self.query_depth,
outer_row: self.outer_row.clone(),
outer_columns: self.outer_columns.clone(),
cte_data: self.cte_data.clone(),
transaction_id: Some(txn_id),
}
}
pub fn check_cancelled(&self) -> Result<()> {
if self.is_cancelled() {
Err(crate::core::Error::QueryCancelled)
} else {
Ok(())
}
}
}
/// Cloneable handle onto a shared cancellation flag.
///
/// Every clone refers to the same underlying `AtomicBool`, so cancelling
/// through one handle is visible through all of them.
#[derive(Debug, Clone)]
pub struct CancellationHandle {
    cancelled: Arc<AtomicBool>,
}

impl CancellationHandle {
    /// Raises the shared cancellation flag.
    pub fn cancel(&self) {
        let flag: &AtomicBool = &self.cancelled;
        flag.store(true, Ordering::Relaxed);
    }

    /// Reports whether the shared cancellation flag has been raised.
    pub fn is_cancelled(&self) -> bool {
        let flag: &AtomicBool = &self.cancelled;
        flag.load(Ordering::Relaxed)
    }
}
/// One pending timeout held in the timeout manager's heap.
struct TimeoutEntry {
    /// Instant at or after which the entry is considered expired.
    deadline: Instant,
    /// Identifier assigned at registration; also used to break ordering ties.
    id: u64,
    /// Handle that is cancelled when the entry expires.
    cancel_handle: CancellationHandle,
    /// When set, the entry is discarded on expiry without cancelling anything.
    cancelled: Arc<AtomicBool>,
}

impl PartialEq for TimeoutEntry {
    /// Two entries are equal when both their deadline and their id match.
    fn eq(&self, other: &Self) -> bool {
        self.deadline == other.deadline && self.id == other.id
    }
}

impl Eq for TimeoutEntry {}

impl PartialOrd for TimeoutEntry {
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for TimeoutEntry {
    /// Reversed ordering so that a max-heap (`BinaryHeap`) yields the entry
    /// with the earliest deadline first.
    ///
    /// Ties on `deadline` are broken by `id` (lower id ranks higher). This
    /// keeps `cmp` consistent with `eq`, as the `Ord` contract requires:
    /// `cmp` returns `Equal` only for entries that also compare `==`.
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        other
            .deadline
            .cmp(&self.deadline)
            .then_with(|| other.id.cmp(&self.id))
    }
}
/// Data protected by the timeout manager's mutex.
struct TimeoutManagerState {
/// Pending timeouts; `TimeoutEntry`'s reversed `Ord` puts the earliest
/// deadline at the top of the heap.
timeouts: BinaryHeap<TimeoutEntry>,
/// When true and the heap is empty, the manager's `run` loop returns.
shutdown: bool,
}
/// Tracks pending query timeouts and cancels the associated handles from a
/// dedicated background thread (spawned in `TimeoutManager::new`).
struct TimeoutManager {
/// Heap of pending timeouts plus the shutdown flag.
state: Mutex<TimeoutManagerState>,
/// Wakes the background thread when `register` adds a new earliest deadline.
condvar: Condvar,
/// Source of ids handed out by `register`; starts at 1.
next_id: AtomicU64,
}
impl TimeoutManager {
/// Creates the manager and spawns the "oxibase-timeout-manager" thread,
/// which executes `run` on a second `Arc` handle to the same manager.
///
/// # Panics
/// Panics if the background thread cannot be spawned.
fn new() -> Arc<Self> {
let manager = Arc::new(Self {
state: Mutex::new(TimeoutManagerState {
timeouts: BinaryHeap::new(),
shutdown: false,
}),
condvar: Condvar::new(),
next_id: AtomicU64::new(1),
});
let manager_clone = Arc::clone(&manager);
std::thread::Builder::new()
.name("oxibase-timeout-manager".to_string())
.spawn(move || {
manager_clone.run();
})
.expect("Failed to spawn timeout manager thread");
manager
}
/// Background loop: fires every expired entry, then sleeps on the condvar
/// until the next deadline (or for an hour when nothing is pending), or
/// until `register` wakes it.
///
/// Returns only when `shutdown` is set and no timeouts remain.
/// NOTE(review): nothing in this file sets `shutdown` to true, so as
/// written the loop appears to run for the life of the process - confirm.
///
/// # Panics
/// Panics if the state mutex is poisoned.
fn run(&self) {
loop {
// The guard is re-acquired at the top of every iteration; the one
// returned by `wait_timeout` below is dropped at the end of the body.
let mut state = self.state.lock().unwrap();
if state.shutdown && state.timeouts.is_empty() {
return;
}
let now = Instant::now();
// Pop every entry whose deadline has passed. The heap top is the
// earliest deadline, so the first non-expired entry ends the scan.
while let Some(entry) = state.timeouts.peek() {
if entry.deadline <= now {
let entry = state.timeouts.pop().unwrap();
// Entries flagged as cancelled (e.g. by a dropped `TimeoutGuard`)
// are discarded without firing.
// NOTE(review): such entries stay in the heap until their deadline
// passes; nothing removes them earlier.
if !entry.cancelled.load(Ordering::Relaxed) {
entry.cancel_handle.cancel();
}
} else {
break;
}
}
// Sleep until the next deadline, or for one hour if the heap is empty.
let wait_duration = if let Some(entry) = state.timeouts.peek() {
entry.deadline.saturating_duration_since(now)
} else {
Duration::from_secs(3600) };
if wait_duration.is_zero() {
continue; }
// Releases the lock while waiting; wakes on timeout, on `notify_one`
// from `register`, or spuriously. All cases are handled by looping.
let (new_state, _timeout_result) =
self.condvar.wait_timeout(state, wait_duration).unwrap();
state = new_state;
if state.shutdown && state.timeouts.is_empty() {
return;
}
}
}
/// Schedules `cancel_handle` to be cancelled `timeout_ms` milliseconds
/// from now, unless `cancelled` has been set by the time the deadline is
/// processed. Returns the id assigned to the new entry.
///
/// # Panics
/// Panics if the state mutex is poisoned.
/// NOTE(review): `Instant + Duration` is documented to panic when the
/// result is not representable; an extreme `timeout_ms` might hit that on
/// some platforms - consider `checked_add`.
fn register(
&self,
timeout_ms: u64,
cancel_handle: CancellationHandle,
cancelled: Arc<AtomicBool>,
) -> u64 {
let id = self.next_id.fetch_add(1, Ordering::Relaxed);
let deadline = Instant::now() + Duration::from_millis(timeout_ms);
let entry = TimeoutEntry {
deadline,
id,
cancel_handle,
cancelled,
};
let mut state = self.state.lock().unwrap();
let was_empty = state.timeouts.is_empty();
let is_earliest = state.timeouts.peek().is_none_or(|e| deadline < e.deadline);
state.timeouts.push(entry);
// Wake the background thread only when its current sleep target is stale,
// i.e. the new entry expires before everything already queued.
if was_empty || is_earliest {
self.condvar.notify_one();
}
id
}
}
/// Returns the process-wide timeout manager, creating it (and thereby
/// spawning its background thread) on first use.
fn global_timeout_manager() -> &'static Arc<TimeoutManager> {
    static MANAGER: std::sync::OnceLock<Arc<TimeoutManager>> = std::sync::OnceLock::new();
    MANAGER.get_or_init(TimeoutManager::new)
}
/// Guard tying a registered timeout to a scope.
///
/// While the guard is alive, the global timeout manager cancels the context
/// once its timeout elapses. Dropping the guard marks the registration as
/// cancelled so that a later expiry no longer cancels the context.
pub struct TimeoutGuard {
    cancelled: Arc<AtomicBool>,
}

impl TimeoutGuard {
    /// Registers `ctx`'s timeout with the global timeout manager.
    ///
    /// Returns `None` without registering anything when the context's
    /// timeout is `0`.
    pub fn new(ctx: &ExecutionContext) -> Option<Self> {
        match ctx.timeout_ms() {
            0 => None,
            timeout_ms => {
                let cancel_handle = ctx.cancellation_handle();
                let disarmed = Arc::new(AtomicBool::new(false));
                global_timeout_manager().register(
                    timeout_ms,
                    cancel_handle,
                    Arc::clone(&disarmed),
                );
                Some(Self {
                    cancelled: disarmed,
                })
            }
        }
    }
}

impl Drop for TimeoutGuard {
    /// Flags the registration as cancelled so the manager skips it on expiry.
    fn drop(&mut self) {
        self.cancelled.store(true, Ordering::Relaxed);
    }
}
/// Builder that assembles an `ExecutionContext` through chained setters and
/// hands it out via `build`.
pub struct ExecutionContextBuilder {
/// The context being assembled; starts out as `ExecutionContext::new()`.
ctx: ExecutionContext,
}
impl ExecutionContextBuilder {
pub fn new() -> Self {
Self {
ctx: ExecutionContext::new(),
}
}
pub fn params(mut self, params: Vec<Value>) -> Self {
self.ctx.params = Arc::new(params);
self
}
pub fn param(self, value: Value) -> Self {
let mut v = (*self.ctx.params).clone();
v.push(value);
Self {
ctx: ExecutionContext {
params: Arc::new(v),
..self.ctx
},
}
}
pub fn named_param(self, name: impl Into<String>, value: Value) -> Self {
Self {
ctx: ExecutionContext {
named_params: Arc::new({
let mut m = (*self.ctx.named_params).clone();
m.insert(name.into(), value);
m
}),
..self.ctx
},
}
}
pub fn auto_commit(mut self, auto_commit: bool) -> Self {
self.ctx.auto_commit = auto_commit;
self
}
pub fn database(mut self, database: impl Into<String>) -> Self {
self.ctx.current_schema = Arc::new(Some(database.into()));
self
}
pub fn session_var(self, name: impl Into<String>, value: Value) -> Self {
Self {
ctx: ExecutionContext {
session_vars: Arc::new({
let mut m = (*self.ctx.session_vars).clone();
m.insert(name.into(), value);
m
}),
..self.ctx
},
}
}
pub fn timeout_ms(mut self, timeout_ms: u64) -> Self {
self.ctx.timeout_ms = timeout_ms;
self
}
pub fn build(self) -> ExecutionContext {
self.ctx
}
}
impl Default for ExecutionContextBuilder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh context has no parameters, auto-commits and is not cancelled.
    #[test]
    fn test_context_new() {
        let fresh = ExecutionContext::new();
        assert_eq!(fresh.param_count(), 0);
        assert!(fresh.auto_commit());
        assert!(!fresh.is_cancelled());
    }

    /// Positional parameters are 1-based; 0 and out-of-range yield `None`.
    #[test]
    fn test_context_with_params() {
        let positional = vec![Value::Integer(1), Value::text("hello")];
        let ctx = ExecutionContext::with_params(positional);
        assert_eq!(ctx.param_count(), 2);
        assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
        assert_eq!(ctx.get_param(2), Some(&Value::text("hello")));
        assert_eq!(ctx.get_param(0), None);
        assert_eq!(ctx.get_param(3), None);
    }

    /// Named parameters are retrievable by name; unknown names yield `None`.
    #[test]
    fn test_context_named_params() {
        let mut named = HashMap::new();
        named.insert("name".to_string(), Value::text("Alice"));
        named.insert("age".to_string(), Value::Integer(30));
        let ctx = ExecutionContext::with_named_params(named);
        assert_eq!(ctx.get_named_param("name"), Some(&Value::text("Alice")));
        assert_eq!(ctx.get_named_param("age"), Some(&Value::Integer(30)));
        assert_eq!(ctx.get_named_param("unknown"), None);
    }

    /// Cancelling through a handle is visible on the context and the handle.
    #[test]
    fn test_context_cancellation() {
        let ctx = ExecutionContext::new();
        assert!(!ctx.is_cancelled());
        let handle = ctx.cancellation_handle();
        assert!(!handle.is_cancelled());
        handle.cancel();
        assert!(ctx.is_cancelled());
        assert!(handle.is_cancelled());
    }

    /// `check_cancelled` flips from `Ok` to `Err` once the context is cancelled.
    #[test]
    fn test_context_check_cancelled() {
        let ctx = ExecutionContext::new();
        assert!(ctx.check_cancelled().is_ok());
        ctx.cancel();
        assert!(ctx.check_cancelled().is_err());
    }

    /// Session variables round-trip; unknown names yield `None`.
    #[test]
    fn test_context_session_vars() {
        let mut ctx = ExecutionContext::new();
        ctx.set_session_var("timezone", Value::text("UTC"));
        assert_eq!(ctx.get_session_var("timezone"), Some(&Value::text("UTC")));
        assert_eq!(ctx.get_session_var("unknown"), None);
    }

    /// Every builder setter is reflected in the built context.
    #[test]
    fn test_context_builder() {
        let builder = ExecutionContextBuilder::new()
            .params(vec![Value::Integer(1)])
            .param(Value::Integer(2))
            .named_param("name", Value::text("test"))
            .auto_commit(false)
            .database("mydb")
            .timeout_ms(5000);
        let ctx = builder.build();
        assert_eq!(ctx.param_count(), 2);
        assert_eq!(ctx.get_param(1), Some(&Value::Integer(1)));
        assert_eq!(ctx.get_param(2), Some(&Value::Integer(2)));
        assert_eq!(ctx.get_named_param("name"), Some(&Value::text("test")));
        assert!(!ctx.auto_commit());
        assert_eq!(ctx.current_schema(), Some("mydb"));
        assert_eq!(ctx.timeout_ms(), 5000);
    }
}