use crate::bridge::envelope::PhysicalPlan;
use crate::bridge::physical_plan::{
DocumentOp, GraphOp, KvOp, QueryOp, SpatialOp, TextOp, TimeseriesOp, VectorOp,
};
use crate::control::planner::physical::PhysicalTask;
use crate::control::security::auth_context::AuthContext;
use crate::control::security::rls::RlsPolicyStore;
use crate::types::TenantId;
/// Injects row-level-security read predicates into every task's plan.
///
/// Each task is processed with its own tenant id (taken from
/// `task.tenant_id`), using `rls_store` to look up policies and `auth` as the
/// caller's authentication context.
///
/// # Errors
///
/// Processing stops at the first task whose plan cannot be secured and that
/// error is returned; tasks after it are left untouched.
pub fn inject_rls(
    tasks: &mut [PhysicalTask],
    rls_store: &RlsPolicyStore,
    auth: &AuthContext,
) -> crate::Result<()> {
    tasks.iter_mut().try_for_each(|task| {
        inject_rls_for_plan(task.tenant_id.as_u32(), &mut task.plan, rls_store, auth)
    })
}
fn inject_rls_for_plan(
tenant_id: u32,
plan: &mut PhysicalPlan,
rls_store: &RlsPolicyStore,
auth: &AuthContext,
) -> crate::Result<()> {
match plan {
PhysicalPlan::Document(DocumentOp::Scan {
collection,
filters,
..
})
| PhysicalPlan::Kv(KvOp::Scan {
collection,
filters,
..
})
| PhysicalPlan::Query(QueryOp::Aggregate {
collection,
filters,
..
}) => {
let rls = get_rls(rls_store, tenant_id, collection, auth)?;
if !rls.is_empty() {
merge_filters(filters, &rls);
}
}
PhysicalPlan::Document(DocumentOp::PointGet {
collection,
rls_filters,
..
})
| PhysicalPlan::Kv(KvOp::Get {
collection,
rls_filters,
..
})
| PhysicalPlan::Vector(VectorOp::Search {
collection,
rls_filters,
..
})
| PhysicalPlan::Vector(VectorOp::MultiSearch {
collection,
rls_filters,
..
})
| PhysicalPlan::Text(TextOp::Search {
collection,
rls_filters,
..
})
| PhysicalPlan::Text(TextOp::HybridSearch {
collection,
rls_filters,
..
})
| PhysicalPlan::Timeseries(TimeseriesOp::Scan {
collection,
rls_filters,
..
})
| PhysicalPlan::Spatial(SpatialOp::Scan {
collection,
rls_filters,
..
}) => {
let rls = get_rls(rls_store, tenant_id, collection, auth)?;
if !rls.is_empty() {
*rls_filters = rls;
}
}
PhysicalPlan::Document(DocumentOp::RangeScan { collection, .. })
| PhysicalPlan::Document(DocumentOp::IndexLookup { collection, .. })
| PhysicalPlan::Kv(KvOp::BatchGet { collection, .. })
| PhysicalPlan::Kv(KvOp::FieldGet { collection, .. }) => {
let rls = get_rls(rls_store, tenant_id, collection, auth)?;
if !rls.is_empty() {
return Err(crate::Error::PlanError {
detail: format!(
"RLS policies on '{collection}' not supported with this operation type"
),
});
}
}
PhysicalPlan::Graph(
GraphOp::Hop { rls_filters, .. }
| GraphOp::Neighbors { rls_filters, .. }
| GraphOp::Path { rls_filters, .. }
| GraphOp::Subgraph { rls_filters, .. },
) => {
let _ = rls_filters;
}
_ => {}
}
Ok(())
}
fn get_rls(
rls_store: &RlsPolicyStore,
tenant_id: u32,
collection: &str,
auth: &AuthContext,
) -> crate::Result<Vec<u8>> {
rls_store
.combined_read_predicate_with_auth(tenant_id, collection, auth)
.ok_or_else(|| rls_deny_error(tenant_id, collection))
}
fn merge_filters(existing: &mut Vec<u8>, rls_bytes: &[u8]) {
if existing.is_empty() {
*existing = rls_bytes.to_vec();
return;
}
let mut all: Vec<crate::bridge::scan_filter::ScanFilter> =
rmp_serde::from_slice(existing).unwrap_or_default();
let rls: Vec<crate::bridge::scan_filter::ScanFilter> =
rmp_serde::from_slice(rls_bytes).unwrap_or_default();
all.extend(rls);
*existing = rmp_serde::to_vec_named(&all).unwrap_or_default();
}
/// Builds the authorization-rejection error for an RLS lookup that produced
/// no predicate.
///
/// The error carries the tenant (wrapped as a [`TenantId`]) and a resource
/// description naming the affected `collection`.
fn rls_deny_error(tenant_id: u32, collection: &str) -> crate::Error {
    let tenant_id = TenantId::new(tenant_id);
    let resource =
        format!("RLS policy on '{collection}': unresolved session variable (deny by default)");
    crate::Error::RejectedAuthz {
        tenant_id,
        resource,
    }
}