use nodedb::bridge::envelope::{PhysicalPlan, Status};
use nodedb::bridge::physical_plan::KvOp;
use crate::helpers::*;
/// Round-trips one key through `Put`, `Get` and `Delete`, then checks that a
/// `Get` issued after the delete comes back with `Status::Error`.
#[test]
fn kv_put_get_delete() {
    const COLLECTION: &str = "cache";
    const KEY: &[u8] = b"key1";

    // Builds a fresh `Get` for the key under test; needed both before and
    // after the delete.
    let get_plan = || {
        PhysicalPlan::Kv(KvOp::Get {
            collection: COLLECTION.into(),
            key: KEY.to_vec(),
            rls_filters: Vec::new(),
        })
    };

    let (mut core, mut tx, mut rx) = make_core();

    // Write the value.
    let put_plan = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        value: b"value1".to_vec(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_plan);

    // Reading it back must return exactly the bytes that were written.
    let stored = send_ok(&mut core, &mut tx, &mut rx, get_plan());
    assert_eq!(stored, b"value1");

    // Deleting the single key reports one deletion in the JSON payload.
    let delete_plan = PhysicalPlan::Kv(KvOp::Delete {
        collection: COLLECTION.into(),
        keys: vec![KEY.to_vec()],
    });
    let delete_payload = send_ok(&mut core, &mut tx, &mut rx, delete_plan);
    let delete_report: serde_json::Value = serde_json::from_slice(&delete_payload).unwrap();
    assert_eq!(delete_report["deleted"], 1);

    // `send_raw` is used here because the response status itself is what is
    // being inspected: the key no longer exists, so an error is expected.
    let after_delete = send_raw(&mut core, &mut tx, &mut rx, get_plan());
    assert_eq!(after_delete.status, Status::Error);
}
/// Writes the same collection/key twice through `send_ok` and checks that a
/// following `Get` returns the value from the second write.
#[test]
fn kv_overwrite_returns_ok() {
    let (mut core, mut tx, mut rx) = make_core();

    // Two consecutive writes to the same key, first "v1" and then "v2".
    for value in [b"v1", b"v2"] {
        let put_plan = PhysicalPlan::Kv(KvOp::Put {
            collection: "c".into(),
            key: b"k".to_vec(),
            value: value.to_vec(),
            ttl_ms: 0,
        });
        send_ok(&mut core, &mut tx, &mut rx, put_plan);
    }

    // Only the most recent value should be visible.
    let get_plan = PhysicalPlan::Kv(KvOp::Get {
        collection: "c".into(),
        key: b"k".to_vec(),
        rls_filters: Vec::new(),
    });
    let current = send_ok(&mut core, &mut tx, &mut rx, get_plan);
    assert_eq!(current, b"v2");
}
/// Inserts five single-byte entries with one `BatchPut`, checks the reported
/// `inserted` count, then issues a `BatchGet` for the same five keys.
#[test]
fn kv_batch_put_and_get() {
    let (mut core, mut tx, mut rx) = make_core();

    // Keys are the bytes 0..5; each value is its key byte multiplied by ten.
    let batch_put = PhysicalPlan::Kv(KvOp::BatchPut {
        collection: "c".into(),
        entries: (0..5u8).map(|b| (vec![b], vec![b * 10])).collect(),
        ttl_ms: 0,
    });
    let put_payload = send_ok(&mut core, &mut tx, &mut rx, batch_put);
    let put_report: serde_json::Value = serde_json::from_slice(&put_payload).unwrap();
    assert_eq!(put_report["inserted"], 5);

    // The BatchGet goes through `send_ok`; the contents of its payload are
    // not inspected by this test.
    let batch_get = PhysicalPlan::Kv(KvOp::BatchGet {
        collection: "c".into(),
        keys: (0..5u8).map(|b| vec![b]).collect(),
    });
    send_ok(&mut core, &mut tx, &mut rx, batch_get);
}
/// Stores five keys in one collection and checks that an unfiltered `Scan`
/// with a count of 100 returns all five under the `entries` JSON array.
#[test]
fn kv_scan_returns_entries() {
    const COLLECTION: &str = "scantest";

    let (mut core, mut tx, mut rx) = make_core();

    // key0..key4 -> val0..val4, sent one Put at a time.
    let puts = (0..5u32).map(|i| {
        PhysicalPlan::Kv(KvOp::Put {
            collection: COLLECTION.into(),
            key: format!("key{i}").into_bytes(),
            value: format!("val{i}").into_bytes(),
            ttl_ms: 0,
        })
    });
    for put_plan in puts {
        send_ok(&mut core, &mut tx, &mut rx, put_plan);
    }

    // Empty cursor, no filters and no match pattern.
    let scan_plan = PhysicalPlan::Kv(KvOp::Scan {
        collection: COLLECTION.into(),
        cursor: Vec::new(),
        count: 100,
        filters: Vec::new(),
        match_pattern: None,
    });
    let scan_payload = send_ok(&mut core, &mut tx, &mut rx, scan_plan);
    let scan_report: serde_json::Value = serde_json::from_slice(&scan_payload).unwrap();
    let found = scan_report["entries"].as_array().unwrap();
    assert_eq!(found.len(), 5);
}
/// Stores keys under two different prefixes and checks that a `Scan` with the
/// match pattern `user:*` returns only the three `user:` keys.
#[test]
fn kv_scan_with_match_pattern() {
    const COLLECTION: &str = "mixed";

    let (mut core, mut tx, mut rx) = make_core();

    // NOTE: "user:" is listed twice, so the third pass writes user:0..user:2
    // a second time with the same value. The collection therefore ends up
    // with three distinct "user:" keys and three "session:" keys.
    for prefix in ["user:", "session:", "user:"] {
        for i in 0..3u32 {
            let put_plan = PhysicalPlan::Kv(KvOp::Put {
                collection: COLLECTION.into(),
                key: format!("{prefix}{i}").into_bytes(),
                value: b"data".to_vec(),
                ttl_ms: 0,
            });
            send_ok(&mut core, &mut tx, &mut rx, put_plan);
        }
    }

    // Same scan shape as an unfiltered scan, plus the match pattern.
    let scan_plan = PhysicalPlan::Kv(KvOp::Scan {
        collection: COLLECTION.into(),
        cursor: Vec::new(),
        count: 100,
        filters: Vec::new(),
        match_pattern: Some("user:*".into()),
    });
    let scan_payload = send_ok(&mut core, &mut tx, &mut rx, scan_plan);
    let scan_report: serde_json::Value = serde_json::from_slice(&scan_payload).unwrap();
    let matched = scan_report["entries"].as_array().unwrap();
    assert_eq!(matched.len(), 3);
}
/// Applies `Expire` (60 000 ms) and then `Persist` to an existing key, checks
/// that both respond with `Status::Ok`, and confirms the value is still
/// readable afterwards.
#[test]
fn kv_expire_and_persist() {
    const COLLECTION: &str = "c";
    const KEY: &[u8] = b"k";

    let (mut core, mut tx, mut rx) = make_core();

    // Seed the key the TTL operations will act on.
    let put_plan = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        value: b"v".to_vec(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_plan);

    // Attach a TTL to the key.
    let expire_plan = PhysicalPlan::Kv(KvOp::Expire {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        ttl_ms: 60_000,
    });
    let expire_resp = send_raw(&mut core, &mut tx, &mut rx, expire_plan);
    assert_eq!(expire_resp.status, Status::Ok);

    // Presumably clears the TTL again; only the response status is checked.
    let persist_plan = PhysicalPlan::Kv(KvOp::Persist {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
    });
    let persist_resp = send_raw(&mut core, &mut tx, &mut rx, persist_plan);
    assert_eq!(persist_resp.status, Status::Ok);

    // The stored value must be unaffected by the two TTL operations.
    let get_plan = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        rls_filters: Vec::new(),
    });
    let stored = send_ok(&mut core, &mut tx, &mut rx, get_plan);
    assert_eq!(stored, b"v");
}
/// Stores two `rmp_serde`-encoded documents, registers an index on `region`
/// with backfill enabled, and checks that the response reports two
/// backfilled entries.
///
/// NOTE: despite the name, no index lookup is issued here; only the
/// `backfilled` count is asserted.
#[test]
fn kv_register_index_and_lookup() {
    const COLLECTION: &str = "sessions";

    // Encodes a session document that differs only in its `region` field.
    let encode = |region: &str| {
        rmp_serde::to_vec(&serde_json::json!({"region": region, "status": "active"})).unwrap()
    };

    let (mut core, mut tx, mut rx) = make_core();

    // Both documents are encoded up front, then written one after the other.
    let sessions = [(b"s1", encode("us-east")), (b"s2", encode("eu-west"))];
    for (key, doc) in sessions {
        let put_plan = PhysicalPlan::Kv(KvOp::Put {
            collection: COLLECTION.into(),
            key: key.to_vec(),
            value: doc,
            ttl_ms: 0,
        });
        send_ok(&mut core, &mut tx, &mut rx, put_plan);
    }

    // Register the index after the data exists so the backfill has work to do.
    let register_plan = PhysicalPlan::Kv(KvOp::RegisterIndex {
        collection: COLLECTION.into(),
        field: "region".into(),
        field_position: 0,
        backfill: true,
    });
    let register_payload = send_ok(&mut core, &mut tx, &mut rx, register_plan);
    let register_report: serde_json::Value = serde_json::from_slice(&register_payload).unwrap();
    assert_eq!(register_report["backfilled"], 2);
}
/// Registers an index on `status` (without backfill), writes one document
/// containing that field, then drops the index and checks that the response
/// reports one removed entry.
#[test]
fn kv_drop_index() {
    const COLLECTION: &str = "c";
    const FIELD: &str = "status";

    let (mut core, mut tx, mut rx) = make_core();

    // The index is registered before any data is written.
    let register_plan = PhysicalPlan::Kv(KvOp::RegisterIndex {
        collection: COLLECTION.into(),
        field: FIELD.into(),
        field_position: 0,
        backfill: false,
    });
    send_ok(&mut core, &mut tx, &mut rx, register_plan);

    // One document carrying the indexed field.
    let put_plan = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: b"k1".to_vec(),
        value: rmp_serde::to_vec(&serde_json::json!({"status": "active"})).unwrap(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_plan);

    // Dropping the index should report the single entry written above.
    let drop_plan = PhysicalPlan::Kv(KvOp::DropIndex {
        collection: COLLECTION.into(),
        field: FIELD.into(),
    });
    let drop_payload = send_ok(&mut core, &mut tx, &mut rx, drop_plan);
    let drop_report: serde_json::Value = serde_json::from_slice(&drop_payload).unwrap();
    assert_eq!(drop_report["entries_removed"], 1);
}
/// Checks that two tenants writing the same collection/key pair each read
/// back their own value.
///
/// Requests are assembled by hand instead of going through `send_ok` so that
/// the `tenant_id` of the request returned by `make_request` can be
/// overridden per call.
#[test]
fn kv_tenant_isolation() {
    use nodedb::bridge::dispatch::BridgeRequest;
    use nodedb::bridge::envelope::Request;
    use nodedb::types::TenantId;

    let (mut core, mut tx, mut rx) = make_core();

    // Pushes `plan` on behalf of `tenant`, runs one core tick and returns the
    // response popped from the bridge.
    let mut run_as = |tenant: TenantId, plan: PhysicalPlan| {
        let req = Request {
            tenant_id: tenant,
            ..make_request(plan)
        };
        tx.try_push(BridgeRequest { inner: req }).unwrap();
        core.tick();
        rx.try_pop().unwrap()
    };

    // Both tenants use the identical collection and key on purpose.
    let put_plan = |value: Vec<u8>| {
        PhysicalPlan::Kv(KvOp::Put {
            collection: "shared".into(),
            key: b"k".to_vec(),
            value,
            ttl_ms: 0,
        })
    };
    let get_plan = || {
        PhysicalPlan::Kv(KvOp::Get {
            collection: "shared".into(),
            key: b"k".to_vec(),
            rls_filters: Vec::new(),
        })
    };

    let resp = run_as(TenantId::new(1), put_plan(b"tenant1".to_vec()));
    assert_eq!(resp.inner.status, Status::Ok);

    let resp = run_as(TenantId::new(2), put_plan(b"tenant2".to_vec()));
    assert_eq!(resp.inner.status, Status::Ok);

    // Tenant 1 must still see its own value even though tenant 2 wrote the
    // same key afterwards.
    let resp = run_as(TenantId::new(1), get_plan());
    assert_eq!(resp.inner.status, Status::Ok);
    assert_eq!(resp.inner.payload.to_vec(), b"tenant1");

    // Symmetric check: tenant 2's write must be readable by tenant 2, so the
    // test also fails if that write was dropped or misrouted.
    let resp = run_as(TenantId::new(2), get_plan());
    assert_eq!(resp.inner.status, Status::Ok);
    assert_eq!(resp.inner.payload.to_vec(), b"tenant2");
}