use nodedb::bridge::envelope::{ErrorCode, PhysicalPlan, Status};
use nodedb::bridge::physical_plan::{DocumentOp, GraphOp, VectorOp};
use crate::helpers::*;
/// Stores one document in the `secrets` collection and fetches it back
/// through the same core, expecting the read to report `Status::Ok`.
///
/// NOTE(review): nothing in this body issues a request on behalf of a second
/// tenant, so only the owning side of the isolation story is exercised here.
#[test]
fn security_tenant_isolation() {
    let (mut core, mut tx, mut rx) = make_core();

    let write_secret = PhysicalPlan::Document(DocumentOp::PointPut {
        collection: "secrets".into(),
        document_id: "s1".into(),
        value: b"{\"data\":\"tenant1_secret\"}".to_vec(),
    });
    send_ok(&mut core, &mut tx, &mut rx, write_secret);

    let read_secret = PhysicalPlan::Document(DocumentOp::PointGet {
        collection: "secrets".into(),
        document_id: "s1".into(),
        rls_filters: Vec::new(),
    });
    let fetched = send_raw(&mut core, &mut tx, &mut rx, read_secret);
    assert_eq!(fetched.status, Status::Ok);
}
/// Registers an enabled write policy named `require_approved` on `orders` for
/// tenant 1 whose predicate is the single filter `status eq "approved"`, then
/// checks three `check_write` outcomes:
///
/// * a document with `status: "approved"` is accepted for tenant 1;
/// * a document with `status: "pending"` is rejected for tenant 1, and the
///   error text mentions the policy name;
/// * the same pending document is accepted for tenant 99, for which this test
///   registers no policy.
#[test]
fn security_rls_policy_enforcement() {
    use nodedb::bridge::scan_filter::ScanFilter;
    use nodedb::control::security::predicate::PolicyMode;
    use nodedb::control::security::rls::{PolicyType, RlsPolicy, RlsPolicyStore};

    let store = RlsPolicyStore::new();

    let approved_only = ScanFilter {
        field: "status".into(),
        op: "eq".into(),
        value: serde_json::json!("approved"),
        clauses: Vec::new(),
    };
    // The policy carries its predicate as a MessagePack-encoded list of filters.
    let encoded_filters = rmp_serde::to_vec_named(&vec![approved_only]).unwrap();

    let policy = RlsPolicy {
        name: "require_approved".into(),
        collection: "orders".into(),
        tenant_id: 1,
        policy_type: PolicyType::Write,
        predicate: encoded_filters,
        compiled_predicate: None,
        mode: PolicyMode::default(),
        on_deny: Default::default(),
        enabled: true,
        created_by: "admin".into(),
        created_at: 0,
    };
    store.create_policy(policy).unwrap();

    // Matching document: the write goes through.
    let approved_doc = serde_json::json!({"status": "approved", "amount": 100});
    let accepted = store.check_write(1, "orders", &approved_doc, "user1");
    assert!(accepted.is_ok());

    // Non-matching document: the write is denied and the error names the policy.
    let pending_doc = serde_json::json!({"status": "pending", "amount": 200});
    let rejected = store.check_write(1, "orders", &pending_doc, "user1");
    assert!(rejected.is_err());
    let denial = rejected.unwrap_err().to_string();
    assert!(denial.contains("require_approved"));

    // Same document under a tenant id with no policy registered in this test.
    let other_tenant = store.check_write(99, "orders", &pending_doc, "user1");
    assert!(other_tenant.is_ok());
}
/// Checks that a validator built from the default `JwtConfig` answers
/// `JwtError::MalformedToken` for two inputs: `not-a-jwt` (no dots at all) and
/// `header.payload` (only two dot-separated segments).
#[test]
fn security_jwt_validation() {
    use nodedb::control::security::jwt::{JwtConfig, JwtError, JwtValidator};

    let config = JwtConfig::default();
    let validator = JwtValidator::new(config);

    let no_separator = validator.validate("not-a-jwt");
    assert_eq!(no_separator.unwrap_err(), JwtError::MalformedToken);

    let two_segments = validator.validate("header.payload");
    assert_eq!(two_segments.unwrap_err(), JwtError::MalformedToken);
}
/// For each of 20 fresh document ids (`lin_0` .. `lin_19`) in the `linear`
/// collection, writes a small JSON body and immediately reads the same id
/// back, expecting `Status::Ok` on every read.
#[test]
fn linearizability_read_after_write() {
    let (mut core, mut tx, mut rx) = make_core();

    for seq in 0..20u32 {
        let doc_id = format!("lin_{seq}");

        let write = PhysicalPlan::Document(DocumentOp::PointPut {
            collection: "linear".into(),
            document_id: doc_id.clone(),
            value: format!("{{\"seq\":{seq}}}").into_bytes(),
        });
        send_ok(&mut core, &mut tx, &mut rx, write);

        // The read is issued right after the write, with nothing in between.
        let read = PhysicalPlan::Document(DocumentOp::PointGet {
            collection: "linear".into(),
            document_id: doc_id.clone(),
            rls_filters: Vec::new(),
        });
        let outcome = send_raw(&mut core, &mut tx, &mut rx, read);
        assert_eq!(
            outcome.status,
            Status::Ok,
            "read-after-write failed for {doc_id}"
        );
    }
}
/// Puts and then deletes document `del1` in the `linear` collection, and
/// expects a following point read of that id to carry
/// `ErrorCode::NotFound`.
#[test]
fn linearizability_delete_visibility() {
    let (mut core, mut tx, mut rx) = make_core();

    let create = PhysicalPlan::Document(DocumentOp::PointPut {
        collection: "linear".into(),
        document_id: "del1".into(),
        value: b"{\"x\":1}".to_vec(),
    });
    let remove = PhysicalPlan::Document(DocumentOp::PointDelete {
        collection: "linear".into(),
        document_id: "del1".into(),
    });
    let lookup = PhysicalPlan::Document(DocumentOp::PointGet {
        collection: "linear".into(),
        document_id: "del1".into(),
        rls_filters: Vec::new(),
    });

    // Requests are issued strictly in put -> delete -> get order.
    send_ok(&mut core, &mut tx, &mut rx, create);
    send_ok(&mut core, &mut tx, &mut rx, remove);
    let after_delete = send_raw(&mut core, &mut tx, &mut rx, lookup);

    assert_eq!(after_delete.error_code, Some(ErrorCode::NotFound));
}
/// Applies four puts to the `replay` collection, the last one overwriting
/// `d1`, and then reads `d1`, `d2` and `d3` back.
///
/// `d1` must return `Status::Ok` with a payload whose JSON text contains `10`;
/// `d2` and `d3` must return `Status::Ok`.
///
/// NOTE(review): this body never restarts the core or triggers a replay
/// explicitly; whether `make_core`/`send_ok` involve a WAL is not visible here.
#[test]
fn wal_replay_deterministic() {
    // Builds a point read against the `replay` collection for the given id.
    fn read_plan(document_id: &str) -> PhysicalPlan {
        PhysicalPlan::Document(DocumentOp::PointGet {
            collection: "replay".into(),
            document_id: document_id.into(),
            rls_filters: Vec::new(),
        })
    }

    let (mut core, mut tx, mut rx) = make_core();

    // Order matters: the final entry replaces the first write to `d1`.
    let writes = [
        ("d1", b"{\"a\":1}".to_vec()),
        ("d2", b"{\"b\":2}".to_vec()),
        ("d3", b"{\"c\":3}".to_vec()),
        ("d1", b"{\"a\":10}".to_vec()),
    ];
    for (doc_id, value) in writes {
        let put = PhysicalPlan::Document(DocumentOp::PointPut {
            collection: "replay".into(),
            document_id: doc_id.to_string(),
            value,
        });
        send_ok(&mut core, &mut tx, &mut rx, put);
    }

    let d1 = send_raw(&mut core, &mut tx, &mut rx, read_plan("d1"));
    assert_eq!(d1.status, Status::Ok);
    let d1_json = payload_json(&d1.payload);
    assert!(
        d1_json.contains("10"),
        "d1 should be overwritten: {d1_json}"
    );

    let d2 = send_raw(&mut core, &mut tx, &mut rx, read_plan("d2"));
    assert_eq!(d2.status, Status::Ok);

    let d3 = send_raw(&mut core, &mut tx, &mut rx, read_plan("d3"));
    assert_eq!(d3.status, Status::Ok);
}
/// Drives document, vector and graph operations through a single core and
/// then checks that each kind of request still succeeds.
///
/// Setup: 50 documents `doc_0..doc_49` and 50 three-component vectors in the
/// `mixed` collection, plus 49 `NEXT` edges linking `doc_i` to `doc_{i+1}`.
///
/// Checks: a point read of `doc_25`, a top-3 vector search around
/// `[25.0, 0.0, 0.0]`, and an outgoing `NEXT` neighbor query on `doc_25` all
/// return `Status::Ok`, and the neighbor payload mentions `doc_26`.
#[test]
fn mixed_engine_isolation_no_cross_eviction() {
    use nodedb::engine::graph::edge_store::Direction;

    let (mut core, mut tx, mut rx) = make_core();

    // Phase 1: documents.
    for n in 0..50u32 {
        let put_doc = PhysicalPlan::Document(DocumentOp::PointPut {
            collection: "mixed".into(),
            document_id: format!("doc_{n}"),
            value: format!("{{\"val\":{n}}}").into_bytes(),
        });
        send_ok(&mut core, &mut tx, &mut rx, put_doc);
    }

    // Phase 2: vectors, one per index, varying only the first component.
    for n in 0..50u32 {
        let insert_vec = PhysicalPlan::Vector(VectorOp::Insert {
            collection: "mixed".into(),
            vector: vec![n as f32, 0.0, 0.0],
            dim: 3,
            field_name: String::new(),
            doc_id: None,
        });
        send_ok(&mut core, &mut tx, &mut rx, insert_vec);
    }

    // Phase 3: a linear chain of NEXT edges between consecutive documents.
    for from in 0..49u32 {
        let to = from + 1;
        let put_edge = PhysicalPlan::Graph(GraphOp::EdgePut {
            src_id: format!("doc_{from}"),
            label: "NEXT".into(),
            dst_id: format!("doc_{to}"),
            properties: vec![],
        });
        send_ok(&mut core, &mut tx, &mut rx, put_edge);
    }

    // Document read.
    let get_doc = PhysicalPlan::Document(DocumentOp::PointGet {
        collection: "mixed".into(),
        document_id: "doc_25".into(),
        rls_filters: Vec::new(),
    });
    let doc = send_raw(&mut core, &mut tx, &mut rx, get_doc);
    assert_eq!(doc.status, Status::Ok, "sparse engine should be intact");

    // Vector search.
    let probe = [25.0f32, 0.0, 0.0];
    let search = PhysicalPlan::Vector(VectorOp::Search {
        collection: "mixed".into(),
        query_vector: std::sync::Arc::from(probe.as_slice()),
        top_k: 3,
        ef_search: 0,
        filter_bitmap: None,
        field_name: String::new(),
        rls_filters: Vec::new(),
    });
    let vec_resp = send_raw(&mut core, &mut tx, &mut rx, search);
    assert_eq!(
        vec_resp.status,
        Status::Ok,
        "vector engine should be intact"
    );

    // Graph neighbor lookup.
    let neighbors = PhysicalPlan::Graph(GraphOp::Neighbors {
        node_id: "doc_25".into(),
        edge_label: Some("NEXT".into()),
        direction: Direction::Out,
        rls_filters: Vec::new(),
    });
    let graph_resp = send_raw(&mut core, &mut tx, &mut rx, neighbors);
    assert_eq!(
        graph_resp.status,
        Status::Ok,
        "graph engine should be intact"
    );

    let neighbors_json = payload_json(&graph_resp.payload);
    assert!(
        neighbors_json.contains("doc_26"),
        "graph should have doc_25→doc_26: {neighbors_json}"
    );
}