use std::sync::Arc;
use crate::control::security::credential::CredentialStore;
use crate::control::security::identity::AuthenticatedIdentity;
use crate::control::security::permission::PermissionStore;
use crate::control::security::role::RoleStore;
pub struct PlanSecurityContext<'a> {
pub identity: &'a AuthenticatedIdentity,
pub auth: &'a crate::control::security::auth_context::AuthContext,
pub rls_store: &'a crate::control::security::rls::RlsPolicyStore,
pub permissions: &'a PermissionStore,
pub roles: &'a RoleStore,
pub permission_cache: Option<&'a crate::control::security::permission_tree::PermissionCache>,
}
pub struct QueryContext {
catalog_inputs: Option<CatalogInputs>,
retention_registry:
Option<Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>>,
array_catalog: Option<crate::control::array_catalog::ArrayCatalogHandle>,
wal: Option<Arc<crate::wal::WalManager>>,
surrogate_assigner: Option<Arc<crate::control::surrogate::SurrogateAssigner>>,
cluster_enabled: bool,
bitemporal_retention_registry:
Option<Arc<crate::engine::bitemporal::BitemporalRetentionRegistry>>,
max_vector_dim: std::sync::atomic::AtomicU32,
}
#[derive(Clone)]
struct CatalogInputs {
credentials: Arc<CredentialStore>,
shared: Option<std::sync::Weak<crate::control::state::SharedState>>,
retention_policy_registry:
Option<Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>>,
}
impl CatalogInputs {
fn build_adapter(
&self,
tenant_id: u64,
database_id: crate::types::DatabaseId,
) -> super::catalog_adapter::OriginCatalog {
if let Some(weak) = &self.shared
&& let Some(shared) = weak.upgrade()
{
super::catalog_adapter::OriginCatalog::new_with_lease(
&shared,
tenant_id,
database_id,
self.retention_policy_registry.clone(),
)
} else {
super::catalog_adapter::OriginCatalog::new(
Arc::clone(&self.credentials),
tenant_id,
database_id,
self.retention_policy_registry.clone(),
)
}
}
}
impl QueryContext {
pub fn new() -> Self {
Self {
catalog_inputs: None,
retention_registry: None,
array_catalog: None,
wal: None,
surrogate_assigner: None,
cluster_enabled: false,
bitemporal_retention_registry: None,
max_vector_dim: std::sync::atomic::AtomicU32::new(0),
}
}
pub fn for_state(state: &crate::control::state::SharedState) -> Self {
let mut ctx = Self::with_catalog(
Arc::clone(&state.credentials),
Some(Arc::clone(&state.retention_policy_registry)),
);
ctx.surrogate_assigner = Some(Arc::clone(&state.surrogate_assigner));
ctx.cluster_enabled = state.cluster_topology.is_some();
ctx.bitemporal_retention_registry = Some(Arc::clone(&state.bitemporal_retention_registry));
ctx.max_vector_dim
.store(0, std::sync::atomic::Ordering::Relaxed);
ctx
}
pub fn for_state_with_lease(state: &Arc<crate::control::state::SharedState>) -> Self {
let retention = Some(Arc::clone(&state.retention_policy_registry));
Self {
catalog_inputs: Some(CatalogInputs {
credentials: Arc::clone(&state.credentials),
shared: Some(Arc::downgrade(state)),
retention_policy_registry: retention.clone(),
}),
retention_registry: retention,
array_catalog: Some(state.array_catalog.clone()),
wal: Some(Arc::clone(&state.wal)),
surrogate_assigner: Some(Arc::clone(&state.surrogate_assigner)),
cluster_enabled: state.cluster_topology.is_some(),
bitemporal_retention_registry: Some(Arc::clone(&state.bitemporal_retention_registry)),
max_vector_dim: std::sync::atomic::AtomicU32::new(0),
}
}
pub fn set_max_vector_dim(&self, dim: u32) {
self.max_vector_dim
.store(dim, std::sync::atomic::Ordering::Relaxed);
}
pub fn set_rounding_mode(&self, _mode: &str) {}
pub fn with_catalog(
credentials: Arc<CredentialStore>,
retention_policy_registry: Option<
Arc<crate::engine::timeseries::retention_policy::RetentionPolicyRegistry>,
>,
) -> Self {
let catalog_inputs = Some(CatalogInputs {
credentials,
shared: None,
retention_policy_registry: retention_policy_registry.clone(),
});
Self {
catalog_inputs,
retention_registry: retention_policy_registry,
array_catalog: None,
wal: None,
surrogate_assigner: None,
cluster_enabled: false,
bitemporal_retention_registry: None,
max_vector_dim: std::sync::atomic::AtomicU32::new(0),
}
}
pub async fn plan_sql(
&self,
sql: &str,
tenant_id: crate::types::TenantId,
database_id: crate::types::DatabaseId,
) -> crate::Result<Vec<super::physical::PhysicalTask>> {
self.plan_with_nodedb_sql(sql, tenant_id, database_id)
.map(|(t, _)| t)
}
fn plan_with_nodedb_sql(
&self,
sql: &str,
tenant_id: crate::types::TenantId,
database_id: crate::types::DatabaseId,
) -> crate::Result<(
Vec<super::physical::PhysicalTask>,
super::descriptor_set::DescriptorVersionSet,
)> {
let inputs = match &self.catalog_inputs {
Some(i) => i,
None => {
return Err(crate::Error::PlanError {
detail: "no catalog available for SQL planning".into(),
});
}
};
let catalog = inputs.build_adapter(tenant_id.as_u64(), database_id);
let plans = nodedb_sql::plan_sql(sql, &catalog).map_err(|e| match e {
nodedb_sql::SqlError::RetryableSchemaChanged { descriptor } => {
crate::Error::RetryableSchemaChanged { descriptor }
}
nodedb_sql::SqlError::CollectionDeactivated {
name,
retention_expires_at_ns,
..
} => crate::Error::CollectionDeactivated {
tenant_id,
collection: name,
retention_expires_at_ns,
},
other => crate::Error::PlanError {
detail: format!("{other}"),
},
})?;
let version_set = catalog.take_recorded_versions();
let ctx = super::sql_plan_convert::ConvertContext {
retention_registry: self.retention_registry.clone(),
array_catalog: self.array_catalog.clone(),
credentials: self
.catalog_inputs
.as_ref()
.map(|i| Arc::clone(&i.credentials)),
wal: self.wal.clone(),
surrogate_assigner: self.surrogate_assigner.clone(),
cluster_enabled: self.cluster_enabled,
bitemporal_retention_registry: self.bitemporal_retention_registry.clone(),
max_vector_dim: self
.max_vector_dim
.load(std::sync::atomic::Ordering::Relaxed),
database_id,
};
let tasks = super::sql_plan_convert::convert(&plans, tenant_id, &ctx)?;
Ok((tasks, version_set))
}
#[allow(clippy::too_many_arguments)]
pub async fn plan_sql_with_rls(
&self,
sql: &str,
tenant_id: crate::types::TenantId,
database_id: crate::types::DatabaseId,
sec: &PlanSecurityContext<'_>,
) -> crate::Result<Vec<super::physical::PhysicalTask>> {
self.plan_sql_with_rls_returning(sql, tenant_id, database_id, sec, false)
.await
}
pub async fn plan_sql_with_rls_returning(
&self,
sql: &str,
tenant_id: crate::types::TenantId,
database_id: crate::types::DatabaseId,
sec: &PlanSecurityContext<'_>,
returning: bool,
) -> crate::Result<Vec<super::physical::PhysicalTask>> {
self.plan_sql_with_rls_and_versions(sql, tenant_id, database_id, sec, returning)
.await
.map(|(tasks, _)| tasks)
}
pub async fn plan_sql_with_rls_and_versions(
&self,
sql: &str,
tenant_id: crate::types::TenantId,
database_id: crate::types::DatabaseId,
sec: &PlanSecurityContext<'_>,
_returning: bool,
) -> crate::Result<(
Vec<super::physical::PhysicalTask>,
super::descriptor_set::DescriptorVersionSet,
)> {
let (mut tasks, version_set) = self.plan_with_nodedb_sql(sql, tenant_id, database_id)?;
super::rls_injection::inject_rls(&mut tasks, sec.rls_store, sec.auth)?;
if let Some(cache) = sec.permission_cache {
super::rls_injection::inject_permission_tree(&mut tasks, cache, sec.auth)?;
}
Ok((tasks, version_set))
}
pub async fn plan_sql_with_params_and_rls(
&self,
sql: &str,
params: &[nodedb_sql::ParamValue],
tenant_id: crate::types::TenantId,
database_id: crate::types::DatabaseId,
sec: &PlanSecurityContext<'_>,
) -> crate::Result<Vec<super::physical::PhysicalTask>> {
let inputs = match &self.catalog_inputs {
Some(i) => i,
None => {
return Err(crate::Error::PlanError {
detail: "no catalog available for SQL planning".into(),
});
}
};
let catalog = inputs.build_adapter(tenant_id.as_u64(), database_id);
let plans = nodedb_sql::plan_sql_with_params(sql, params, &catalog).map_err(|e| {
crate::Error::PlanError {
detail: format!("{e}"),
}
})?;
let ctx = super::sql_plan_convert::ConvertContext {
retention_registry: self.retention_registry.clone(),
array_catalog: self.array_catalog.clone(),
credentials: self
.catalog_inputs
.as_ref()
.map(|i| Arc::clone(&i.credentials)),
wal: self.wal.clone(),
surrogate_assigner: self.surrogate_assigner.clone(),
cluster_enabled: self.cluster_enabled,
bitemporal_retention_registry: self.bitemporal_retention_registry.clone(),
max_vector_dim: self
.max_vector_dim
.load(std::sync::atomic::Ordering::Relaxed),
database_id,
};
let mut tasks = super::sql_plan_convert::convert(&plans, tenant_id, &ctx)?;
super::rls_injection::inject_rls(&mut tasks, sec.rls_store, sec.auth)?;
if let Some(cache) = sec.permission_cache {
super::rls_injection::inject_permission_tree(&mut tasks, cache, sec.auth)?;
}
Ok(tasks)
}
}
impl Default for QueryContext {
fn default() -> Self {
Self::new()
}
}
pub const SYSTEM_FUNCTION_NAMES: &[&str] = &[
"doc_get",
"doc_exists",
"doc_array_contains",
"vector_distance",
"multi_vector_search",
"rrf_score",
"bm25_score",
"text_match",
"st_dwithin",
"st_contains",
"st_intersects",
"st_within",
"st_distance",
"geo_distance",
"time_bucket",
"ts_rate",
"ts_derivative",
"ts_moving_avg",
"ts_ema",
"ts_delta",
"ts_interpolate",
"ts_lag",
"ts_lead",
"ts_rank",
"ts_percentile",
"ts_stddev",
"ts_correlate",
"ts_zscore",
"ts_bollinger_upper",
"ts_bollinger_lower",
"ts_bollinger_mid",
"ts_bollinger_width",
"ts_moving_percentile",
"approx_count_distinct",
"approx_percentile",
"approx_topk",
"approx_count",
"round",
"nextval",
"currval",
"setval",
"next_preview",
];