use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::scan_filter::FilterOp;
use crate::control::security::auth_context::AuthContext;
use crate::control::security::rls::RlsPolicyStore;
use crate::types::TenantId;
use nodedb_physical::physical_plan::{
ColumnarOp, DocumentOp, GraphOp, KvOp, QueryOp, SpatialOp, TextOp, TimeseriesOp, VectorOp,
};
use nodedb_physical::physical_task::PhysicalTask;
pub fn inject_rls(
tasks: &mut [PhysicalTask],
rls_store: &RlsPolicyStore,
auth: &AuthContext,
) -> crate::Result<()> {
for task in tasks.iter_mut() {
let tenant_id = task.tenant_id.as_u64();
inject_rls_for_plan(tenant_id, &mut task.plan, rls_store, auth)?;
}
Ok(())
}
pub fn inject_rls_for_single_plan(
tenant_id: u64,
plan: &mut PhysicalPlan,
rls_store: &RlsPolicyStore,
auth: &AuthContext,
) -> crate::Result<()> {
inject_rls_for_plan(tenant_id, plan, rls_store, auth)
}
fn inject_rls_for_plan(
tenant_id: u64,
plan: &mut PhysicalPlan,
rls_store: &RlsPolicyStore,
auth: &AuthContext,
) -> crate::Result<()> {
match plan {
PhysicalPlan::Document(DocumentOp::Scan {
collection,
filters,
..
})
| PhysicalPlan::Kv(KvOp::Scan {
collection,
filters,
..
})
| PhysicalPlan::Query(QueryOp::Aggregate {
collection,
filters,
..
}) => {
let rls = get_rls(rls_store, tenant_id, collection, auth)?;
if !rls.is_empty() {
merge_filters(filters, &rls)?;
}
}
PhysicalPlan::Document(DocumentOp::PointGet {
collection,
rls_filters,
..
})
| PhysicalPlan::Kv(KvOp::Get {
collection,
rls_filters,
..
})
| PhysicalPlan::Vector(VectorOp::Search {
collection,
rls_filters,
..
})
| PhysicalPlan::Vector(VectorOp::MultiSearch {
collection,
rls_filters,
..
})
| PhysicalPlan::Text(TextOp::Search {
collection,
rls_filters,
..
})
| PhysicalPlan::Text(TextOp::HybridSearch {
collection,
rls_filters,
..
})
| PhysicalPlan::Text(TextOp::HybridSearchTriple {
collection,
rls_filters,
..
})
| PhysicalPlan::Columnar(ColumnarOp::Scan {
collection,
rls_filters,
..
})
| PhysicalPlan::Timeseries(TimeseriesOp::Scan {
collection,
rls_filters,
..
})
| PhysicalPlan::Spatial(SpatialOp::Scan {
collection,
rls_filters,
..
}) => {
let rls = get_rls(rls_store, tenant_id, collection, auth)?;
if !rls.is_empty() {
*rls_filters = rls;
}
}
PhysicalPlan::Document(DocumentOp::RangeScan { collection, .. })
| PhysicalPlan::Document(DocumentOp::IndexLookup { collection, .. })
| PhysicalPlan::Kv(KvOp::BatchGet { collection, .. })
| PhysicalPlan::Kv(KvOp::FieldGet { collection, .. }) => {
let rls = get_rls(rls_store, tenant_id, collection, auth)?;
if !rls.is_empty() {
return Err(crate::Error::PlanError {
detail: format!(
"RLS policies on '{collection}' not supported with this operation type"
),
});
}
}
PhysicalPlan::Graph(
GraphOp::Hop { rls_filters, .. }
| GraphOp::Neighbors { rls_filters, .. }
| GraphOp::Path { rls_filters, .. }
| GraphOp::Subgraph { rls_filters, .. },
) => {
let _ = rls_filters;
}
_ => {}
}
Ok(())
}
fn get_rls(
rls_store: &RlsPolicyStore,
tenant_id: u64,
collection: &str,
auth: &AuthContext,
) -> crate::Result<Vec<u8>> {
rls_store
.combined_read_predicate_with_auth(tenant_id, collection, auth)
.ok_or_else(|| rls_deny_error(tenant_id, collection))
}
fn merge_filters(existing: &mut Vec<u8>, rls_bytes: &[u8]) -> crate::Result<()> {
if existing.is_empty() {
*existing = rls_bytes.to_vec();
return Ok(());
}
let mut all: Vec<crate::bridge::scan_filter::ScanFilter> = zerompk::from_msgpack(existing)
.map_err(|e| crate::Error::PlanError {
detail: format!("RLS filter deserialization failed (existing): {e}"),
})?;
let rls: Vec<crate::bridge::scan_filter::ScanFilter> = zerompk::from_msgpack(rls_bytes)
.map_err(|e| crate::Error::PlanError {
detail: format!("RLS filter deserialization failed (new): {e}"),
})?;
all.extend(rls);
*existing = zerompk::to_msgpack_vec(&all).map_err(|e| crate::Error::PlanError {
detail: format!("RLS filter serialization failed: {e}"),
})?;
Ok(())
}
fn rls_deny_error(tenant_id: u64, collection: &str) -> crate::Error {
crate::Error::RejectedAuthz {
tenant_id: TenantId::new(tenant_id),
resource: format!(
"RLS policy on '{}': unresolved session variable (deny by default)",
collection
),
}
}
pub fn inject_permission_tree(
tasks: &mut [PhysicalTask],
cache: &crate::control::security::permission_tree::PermissionCache,
auth: &AuthContext,
) -> crate::Result<()> {
if auth.is_superuser() {
return Ok(());
}
for task in tasks.iter_mut() {
let tenant_id = task.tenant_id.as_u64();
inject_permission_tree_for_plan(tenant_id, &mut task.plan, cache, auth)?;
}
Ok(())
}
fn inject_permission_tree_for_plan(
tenant_id: u64,
plan: &mut PhysicalPlan,
cache: &crate::control::security::permission_tree::PermissionCache,
auth: &AuthContext,
) -> crate::Result<()> {
let (collection, filters, level_key) = match plan {
PhysicalPlan::Document(DocumentOp::Scan {
collection,
filters,
..
})
| PhysicalPlan::Kv(KvOp::Scan {
collection,
filters,
..
})
| PhysicalPlan::Query(QueryOp::Aggregate {
collection,
filters,
..
}) => (collection.as_str(), Some(filters), PermTreeLevel::Read),
PhysicalPlan::Document(DocumentOp::Upsert { collection, .. })
| PhysicalPlan::Document(DocumentOp::BatchInsert { collection, .. })
| PhysicalPlan::Kv(KvOp::Put { collection, .. })
| PhysicalPlan::Kv(KvOp::Insert { collection, .. })
| PhysicalPlan::Kv(KvOp::InsertIfAbsent { collection, .. })
| PhysicalPlan::Kv(KvOp::InsertOnConflictUpdate { collection, .. })
| PhysicalPlan::Kv(KvOp::BatchPut { collection, .. }) => {
(collection.as_str(), None, PermTreeLevel::Write)
}
PhysicalPlan::Document(DocumentOp::PointDelete { collection, .. })
| PhysicalPlan::Document(DocumentOp::BulkDelete { collection, .. })
| PhysicalPlan::Document(DocumentOp::Truncate { collection, .. })
| PhysicalPlan::Kv(KvOp::Delete { collection, .. }) => {
(collection.as_str(), None, PermTreeLevel::Delete)
}
_ => return Ok(()),
};
let Some(def) = cache.get_tree_def(tenant_id, collection) else {
return Ok(()); };
let required_level = match level_key {
PermTreeLevel::Read => &def.read_level,
PermTreeLevel::Write => &def.write_level,
PermTreeLevel::Delete => &def.delete_level,
};
let user_id = &auth.id;
let user_roles = &auth.roles;
if filters.is_none() {
let accessible = crate::control::security::permission_tree::resolver::accessible_resources(
cache,
def,
tenant_id,
user_id,
user_roles,
required_level,
);
if accessible.is_empty() {
return Err(crate::Error::RejectedAuthz {
tenant_id: TenantId::new(tenant_id),
resource: format!(
"permission tree on '{collection}': user has no '{required_level}' access"
),
});
}
return Ok(());
}
let accessible = crate::control::security::permission_tree::resolver::accessible_resources(
cache,
def,
tenant_id,
user_id,
user_roles,
required_level,
);
let in_filter = crate::bridge::scan_filter::ScanFilter {
field: def.resource_column.clone(),
op: FilterOp::In,
value: nodedb_types::Value::Array(
accessible
.into_iter()
.map(nodedb_types::Value::String)
.collect(),
),
clauses: Vec::new(),
expr: None,
};
let filter_bytes =
zerompk::to_msgpack_vec(&vec![in_filter]).map_err(|e| crate::Error::PlanError {
detail: format!("permission tree filter serialization: {e}"),
})?;
if let Some(filters) = filters {
merge_filters(filters, &filter_bytes)?;
}
Ok(())
}
enum PermTreeLevel {
Read,
Write,
Delete,
}