use nodedb::bridge::envelope::{PhysicalPlan, Status};
use nodedb::bridge::physical_plan::KvOp;
use crate::helpers::*;
/// Drives one key through put, read, overwrite, read, delete, and a failing
/// read, then runs a batch put / batch get / multi-key delete, all against the
/// `"default"` collection on a single core.
#[test]
fn kv_protocol_command_sequence() {
    const COLLECTION: &str = "default";
    const KEY: &[u8] = b"key1";

    let (mut core, mut tx, mut rx) = make_core();

    // Initial write.
    let first_put = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        value: b"value1".to_vec(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, first_put);

    // The value just written must be readable.
    let first_get = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        rls_filters: vec![],
    });
    let first_read = send_ok(&mut core, &mut tx, &mut rx, first_get);
    assert_eq!(first_read, b"value1");

    // Overwrite the same key with a new value.
    let second_put = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        value: b"value2".to_vec(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, second_put);

    // The read now returns the replacement value.
    let second_get = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        rls_filters: vec![],
    });
    let second_read = send_ok(&mut core, &mut tx, &mut rx, second_get);
    assert_eq!(second_read, b"value2");

    // Same read through `send_raw`; only the response status is checked here.
    let status_probe = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        rls_filters: vec![],
    });
    let present = send_raw(&mut core, &mut tx, &mut rx, status_probe);
    assert_eq!(present.status, Status::Ok);

    // Deleting the single key reports one deletion in the JSON payload.
    let single_delete = PhysicalPlan::Kv(KvOp::Delete {
        collection: COLLECTION.into(),
        keys: vec![KEY.to_vec()],
    });
    let delete_payload = send_ok(&mut core, &mut tx, &mut rx, single_delete);
    let delete_report: serde_json::Value = serde_json::from_slice(&delete_payload).unwrap();
    assert_eq!(delete_report["deleted"], 1);

    // Reading the deleted key yields an error status.
    let get_after_delete = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        rls_filters: vec![],
    });
    let missing = send_raw(&mut core, &mut tx, &mut rx, get_after_delete);
    assert_eq!(missing.status, Status::Error);

    // Batch write of three entries.
    let batch_put = PhysicalPlan::Kv(KvOp::BatchPut {
        collection: COLLECTION.into(),
        entries: vec![
            (b"a".to_vec(), b"1".to_vec()),
            (b"b".to_vec(), b"2".to_vec()),
            (b"c".to_vec(), b"3".to_vec()),
        ],
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, batch_put);

    // Batch read including one key that was never written. Only success of the
    // request is checked; the payload contents are not inspected.
    let batch_get = PhysicalPlan::Kv(KvOp::BatchGet {
        collection: COLLECTION.into(),
        keys: vec![
            b"a".to_vec(),
            b"b".to_vec(),
            b"c".to_vec(),
            b"missing".to_vec(),
        ],
    });
    let _batch_payload = send_ok(&mut core, &mut tx, &mut rx, batch_get);

    // Multi-key delete reports three deletions.
    let batch_delete = PhysicalPlan::Kv(KvOp::Delete {
        collection: COLLECTION.into(),
        keys: vec![b"a".to_vec(), b"b".to_vec(), b"c".to_vec()],
    });
    let batch_delete_payload = send_ok(&mut core, &mut tx, &mut rx, batch_delete);
    let batch_delete_report: serde_json::Value =
        serde_json::from_slice(&batch_delete_payload).unwrap();
    assert_eq!(batch_delete_report["deleted"], 3);
}
/// Issues KV puts into `"users"` and vector inserts into `"embeddings"` on the
/// same core, then checks that a KV read still returns its value and that a
/// vector search request succeeds.
#[test]
fn kv_and_vector_coexist() {
    use std::sync::Arc;

    use nodedb::bridge::physical_plan::VectorOp;

    let (mut core, mut tx, mut rx) = make_core();

    // Five KV records: user:<n> -> data:<n>.
    for n in 0..5u32 {
        let put = PhysicalPlan::Kv(KvOp::Put {
            collection: "users".into(),
            key: format!("user:{n}").into_bytes(),
            value: format!("data:{n}").into_bytes(),
            ttl_ms: 0,
        });
        send_ok(&mut core, &mut tx, &mut rx, put);
    }

    // Five 3-dimensional vectors whose doc ids reuse the KV key strings.
    for n in 0..5u32 {
        let first_component = n as f32;
        let insert = PhysicalPlan::Vector(VectorOp::Insert {
            collection: "embeddings".into(),
            vector: vec![first_component, 0.0, 0.0],
            dim: 3,
            field_name: String::new(),
            doc_id: Some(format!("user:{n}")),
        });
        send_ok(&mut core, &mut tx, &mut rx, insert);
    }

    // The KV record is still readable after the vector inserts.
    let get_user = PhysicalPlan::Kv(KvOp::Get {
        collection: "users".into(),
        key: b"user:3".to_vec(),
        rls_filters: vec![],
    });
    let user_payload = send_ok(&mut core, &mut tx, &mut rx, get_user);
    assert_eq!(user_payload, b"data:3");

    // The search must succeed; the returned hits are not inspected.
    let query = [3.0f32, 0.0, 0.0];
    let search = PhysicalPlan::Vector(VectorOp::Search {
        collection: "embeddings".into(),
        query_vector: Arc::from(query.as_slice()),
        top_k: 2,
        ef_search: 0,
        filter_bitmap: None,
        field_name: String::new(),
        rls_filters: vec![],
    });
    let _search_payload = send_ok(&mut core, &mut tx, &mut rx, search);
}
/// Stores one key with `ttl_ms: 1000` and one with `ttl_ms: 0` in `"sessions"`
/// and checks that both are readable immediately after being written.
///
/// NOTE(review): despite the name, nothing here waits for the TTL to elapse or
/// inspects any expiry output — confirm whether that coverage was intended.
#[test]
fn ttl_expiry_produces_expired_key_info() {
    const COLLECTION: &str = "sessions";

    let (mut core, mut tx, mut rx) = make_core();

    // Key written with a non-zero TTL.
    let put_with_ttl = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: b"s1".to_vec(),
        value: b"data".to_vec(),
        ttl_ms: 1000,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_with_ttl);

    // Read back right away; only the status is asserted.
    let get_with_ttl = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: b"s1".to_vec(),
        rls_filters: vec![],
    });
    let ttl_response = send_raw(&mut core, &mut tx, &mut rx, get_with_ttl);
    assert_eq!(ttl_response.status, Status::Ok);

    // Key written with ttl_ms 0 (presumably "no expiry" — verify against KvOp docs).
    let put_persistent = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: b"persistent".to_vec(),
        value: b"forever".to_vec(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_persistent);

    let get_persistent = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: b"persistent".to_vec(),
        rls_filters: vec![],
    });
    let persistent_payload = send_ok(&mut core, &mut tx, &mut rx, get_persistent);
    assert_eq!(persistent_payload, b"forever");
}
/// Stores a MessagePack-encoded document under `"users"/"u1"`, reads selected
/// fields back as JSON, updates one field with `FieldSet`, and checks that the
/// new value is returned by a subsequent `FieldGet`.
#[test]
fn kv_field_get_and_set() {
    const COLLECTION: &str = "users";
    const KEY: &[u8] = b"u1";

    let (mut core, mut tx, mut rx) = make_core();

    // The stored value is the rmp_serde encoding of this JSON object.
    let profile = serde_json::json!({
        "name": "alice",
        "age": 30,
        "region": "us-east"
    });
    let encoded_profile = rmp_serde::to_vec(&profile).unwrap();

    let put_profile = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        value: encoded_profile,
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_profile);

    // Project two of the three fields; the response payload parses as JSON.
    let get_name_and_age = PhysicalPlan::Kv(KvOp::FieldGet {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        fields: vec!["name".into(), "age".into()],
    });
    let projection_payload = send_ok(&mut core, &mut tx, &mut rx, get_name_and_age);
    let projection: serde_json::Value = serde_json::from_slice(&projection_payload).unwrap();
    assert_eq!(projection["name"], "alice");
    assert_eq!(projection["age"], 30);

    // The update value is passed as JSON-encoded bytes.
    let new_region = serde_json::to_vec(&serde_json::json!("eu-west")).unwrap();
    let set_region = PhysicalPlan::Kv(KvOp::FieldSet {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        updates: vec![("region".into(), new_region)],
    });
    send_ok(&mut core, &mut tx, &mut rx, set_region);

    // The updated field is visible on the next read.
    let get_region = PhysicalPlan::Kv(KvOp::FieldGet {
        collection: COLLECTION.into(),
        key: KEY.to_vec(),
        fields: vec!["region".into()],
    });
    let region_payload = send_ok(&mut core, &mut tx, &mut rx, get_region);
    let region_view: serde_json::Value = serde_json::from_slice(&region_payload).unwrap();
    assert_eq!(region_view["region"], "eu-west");
}
/// Writes ten keys into `"ephemeral"`, truncates the collection, and checks
/// that the truncate response reports ten deletions and that a read of one of
/// the keys afterwards returns an error status.
#[test]
fn kv_truncate_clears_all() {
    const COLLECTION: &str = "ephemeral";

    let (mut core, mut tx, mut rx) = make_core();

    // Keys k0 through k9, all with the same one-byte value.
    for n in 0..10u32 {
        let put = PhysicalPlan::Kv(KvOp::Put {
            collection: COLLECTION.into(),
            key: format!("k{n}").into_bytes(),
            value: b"v".to_vec(),
            ttl_ms: 0,
        });
        send_ok(&mut core, &mut tx, &mut rx, put);
    }

    // The truncate payload is JSON carrying a "deleted" count.
    let truncate = PhysicalPlan::Kv(KvOp::Truncate {
        collection: COLLECTION.into(),
    });
    let truncate_payload = send_ok(&mut core, &mut tx, &mut rx, truncate);
    let truncate_report: serde_json::Value = serde_json::from_slice(&truncate_payload).unwrap();
    assert_eq!(truncate_report["deleted"], 10);

    // A previously written key can no longer be read.
    let get_after_truncate = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: b"k0".to_vec(),
        rls_filters: vec![],
    });
    let response = send_raw(&mut core, &mut tx, &mut rx, get_after_truncate);
    assert_eq!(response.status, Status::Error);
}
/// Registers three secondary indexes on `"indexed"`, writes 100 MessagePack
/// documents containing the indexed fields, and checks that an unfiltered scan
/// returns all 100 entries.
///
/// NOTE(review): the name mentions a write-amplification ratio, but no such
/// figure is read or asserted here — confirm whether that check is missing.
#[test]
fn kv_index_write_amp_ratio_matches() {
    const COLLECTION: &str = "indexed";

    let (mut core, mut tx, mut rx) = make_core();

    // NOTE(review): every index is registered with field_position 0 — confirm
    // that sharing a position across fields is intended.
    for indexed_field in ["region", "status", "tier"].iter() {
        let register = PhysicalPlan::Kv(KvOp::RegisterIndex {
            collection: COLLECTION.into(),
            field: indexed_field.to_string(),
            field_position: 0,
            backfill: false,
        });
        send_ok(&mut core, &mut tx, &mut rx, register);
    }

    // 100 documents cycling through 5 regions, 2 statuses, and 3 tiers.
    for n in 0..100u32 {
        let region = format!("r{}", n % 5);
        let status = if n % 2 == 0 { "active" } else { "inactive" };
        let tier = format!("t{}", n % 3);
        let document = serde_json::json!({
            "region": region,
            "status": status,
            "tier": tier,
        });
        let encoded = rmp_serde::to_vec(&document).unwrap();

        let put = PhysicalPlan::Kv(KvOp::Put {
            collection: COLLECTION.into(),
            key: format!("k{n:03}").into_bytes(),
            value: encoded,
            ttl_ms: 0,
        });
        send_ok(&mut core, &mut tx, &mut rx, put);
    }

    // Scan with a count larger than the number of documents written, no
    // filters, and no match pattern.
    let scan = PhysicalPlan::Kv(KvOp::Scan {
        collection: COLLECTION.into(),
        cursor: vec![],
        count: 200,
        filters: vec![],
        match_pattern: None,
    });
    let scan_payload = send_ok(&mut core, &mut tx, &mut rx, scan);
    let scan_result: serde_json::Value = serde_json::from_slice(&scan_payload).unwrap();
    let entry_count = scan_result["entries"].as_array().unwrap().len();
    assert_eq!(entry_count, 100);
}
/// Writes 1000 keys with `ttl_ms: 100` plus one key with `ttl_ms: 0` into
/// `"mass_expire"`, and checks that the latter is readable both before and
/// after a call to `maybe_run_maintenance`.
///
/// NOTE(review): there is no wait between the writes and the maintenance call,
/// and no reap count is asserted — confirm the intended coverage of the
/// "reap budget" in the name.
#[test]
fn kv_mass_expiry_respects_reap_budget() {
    const COLLECTION: &str = "mass_expire";
    const SURVIVOR_KEY: &[u8] = b"survivor";

    let (mut core, mut tx, mut rx) = make_core();

    // Bulk of short-TTL keys: k0000 through k0999.
    for n in 0..1000u32 {
        let short_lived = PhysicalPlan::Kv(KvOp::Put {
            collection: COLLECTION.into(),
            key: format!("k{n:04}").into_bytes(),
            value: b"v".to_vec(),
            ttl_ms: 100,
        });
        send_ok(&mut core, &mut tx, &mut rx, short_lived);
    }

    // One key written with ttl_ms 0.
    let put_survivor = PhysicalPlan::Kv(KvOp::Put {
        collection: COLLECTION.into(),
        key: SURVIVOR_KEY.to_vec(),
        value: b"alive".to_vec(),
        ttl_ms: 0,
    });
    send_ok(&mut core, &mut tx, &mut rx, put_survivor);

    // Readable before maintenance.
    let get_before = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: SURVIVOR_KEY.to_vec(),
        rls_filters: vec![],
    });
    let before_maintenance = send_ok(&mut core, &mut tx, &mut rx, get_before);
    assert_eq!(before_maintenance, b"alive");

    core.maybe_run_maintenance();

    // Still readable after maintenance.
    let get_after = PhysicalPlan::Kv(KvOp::Get {
        collection: COLLECTION.into(),
        key: SURVIVOR_KEY.to_vec(),
        rls_filters: vec![],
    });
    let after_maintenance = send_ok(&mut core, &mut tx, &mut rx, get_after);
    assert_eq!(after_maintenance, b"alive");
}