use nodedb::bridge::envelope::{ErrorCode, Status};
use nodedb::bridge::scan_filter::ScanFilter;
use nodedb_physical::physical_plan::{DocumentOp, PhysicalPlan, UpdateValue};
use crate::helpers::*;
/// Name of the collection that the helpers and tests below read from and write to.
const COLLECTION: &str = "ollp_items";
/// Serializes a one-element filter list (`active` `eq` `true`) to MessagePack.
///
/// The returned bytes are what the bulk plan builders pass as their `filters` field.
///
/// # Panics
/// Panics if `zerompk` fails to encode the filter list.
fn filter_active() -> Vec<u8> {
    let predicates = vec![ScanFilter {
        field: "active".into(),
        op: "eq".into(),
        value: nodedb_types::Value::Bool(true),
        clauses: Vec::new(),
        expr: None,
    }];
    zerompk::to_msgpack_vec(&predicates).unwrap()
}
/// Derives a deterministic surrogate from a document id.
///
/// The id's bytes are hashed with 32-bit FNV-1a (xor the byte, then multiply by
/// the prime, with wrapping arithmetic). The result is clamped to a minimum of 1.
// NOTE(review): the clamp presumably exists because 0 is not a usable surrogate —
// confirm against `Surrogate::new`.
fn surrogate_for(id: &str) -> nodedb_types::Surrogate {
    const FNV_OFFSET_BASIS: u32 = 2_166_136_261;
    const FNV_PRIME: u32 = 16_777_619;

    let hash = id.bytes().fold(FNV_OFFSET_BASIS, |acc, byte| {
        (acc ^ u32::from(byte)).wrapping_mul(FNV_PRIME)
    });
    nodedb_types::Surrogate::new(hash.max(1))
}
/// Returns the raw `u32` form of the surrogate that `surrogate_for` derives for `id`.
fn surrogate_u32(id: &str) -> u32 {
    let surrogate = surrogate_for(id);
    surrogate.as_u32()
}
/// Writes the document `{"active":true,"name":"<id>"}` into `COLLECTION` under `id`.
///
/// The write is a `PointPut` dispatched through `send_ok`; the surrogate comes
/// from `surrogate_for(id)` and the primary-key bytes are the id's UTF-8 bytes.
fn insert_active(ctx: &mut TestCtx, id: &str) {
    let body = format!(r#"{{"active":true,"name":"{id}"}}"#).into_bytes();
    let put = PhysicalPlan::Document(DocumentOp::PointPut {
        collection: COLLECTION.into(),
        document_id: id.into(),
        value: body,
        surrogate: surrogate_for(id),
        pk_bytes: id.as_bytes().to_vec(),
    });
    send_ok(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, put);
}
/// Builds a `BulkUpdate` plan that sets `name` to the literal `"updated"` on
/// every document matched by `filter_active()`.
///
/// `predicted` is forwarded unchanged as `ollp_predicted_surrogates`.
///
/// # Panics
/// Panics if the literal cannot be converted to MessagePack.
fn bulk_update_plan(predicted: Option<Vec<u32>>) -> PhysicalPlan {
    let new_name = nodedb_types::json_to_msgpack(&serde_json::json!("updated")).unwrap();
    let assignments = vec![(String::from("name"), UpdateValue::Literal(new_name))];
    let op = DocumentOp::BulkUpdate {
        collection: COLLECTION.into(),
        filters: filter_active(),
        updates: assignments,
        returning: None,
        ollp_predicted_surrogates: predicted,
    };
    PhysicalPlan::Document(op)
}
/// Builds a `BulkDelete` plan targeting every document matched by `filter_active()`.
///
/// `predicted` is forwarded unchanged as `ollp_predicted_surrogates`.
fn bulk_delete_plan(predicted: Option<Vec<u32>>) -> PhysicalPlan {
    let op = DocumentOp::BulkDelete {
        collection: COLLECTION.into(),
        filters: filter_active(),
        returning: None,
        ollp_predicted_surrogates: predicted,
    };
    PhysicalPlan::Document(op)
}
/// A `BulkUpdate` that carries no predicted surrogate set is expected to return `Status::Ok`.
#[test]
fn bulk_update_no_ollp_proceeds() {
    let mut ctx = make_ctx();
    for id in ["a", "b"] {
        insert_active(&mut ctx, id);
    }

    let plan = bulk_update_plan(None);
    let response = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, plan);

    assert_eq!(
        response.status,
        Status::Ok,
        "no-OLLP BulkUpdate should succeed"
    );
}
/// A `BulkDelete` that carries no predicted surrogate set is expected to return `Status::Ok`.
#[test]
fn bulk_delete_no_ollp_proceeds() {
    let mut ctx = make_ctx();
    for id in ["a", "b"] {
        insert_active(&mut ctx, id);
    }

    let plan = bulk_delete_plan(None);
    let response = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, plan);

    assert_eq!(
        response.status,
        Status::Ok,
        "no-OLLP BulkDelete should succeed"
    );
}
/// A `BulkUpdate` whose predicted surrogates are exactly those of the inserted
/// documents is expected to return `Status::Ok`.
#[test]
fn bulk_update_correct_prediction_succeeds() {
    let mut ctx = make_ctx();
    let ids = ["x1", "x2"];
    for id in ids {
        insert_active(&mut ctx, id);
    }

    let predicted: Vec<u32> = ids.iter().copied().map(surrogate_u32).collect();
    let plan = bulk_update_plan(Some(predicted));
    let response = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, plan);

    assert_eq!(
        response.status,
        Status::Ok,
        "BulkUpdate with correct prediction should succeed"
    );
}
/// A `BulkDelete` whose predicted surrogates are exactly those of the inserted
/// documents is expected to return `Status::Ok`.
#[test]
fn bulk_delete_correct_prediction_succeeds() {
    let mut ctx = make_ctx();
    let ids = ["y1", "y2"];
    for id in ids {
        insert_active(&mut ctx, id);
    }

    let predicted: Vec<u32> = ids.iter().copied().map(surrogate_u32).collect();
    let plan = bulk_delete_plan(Some(predicted));
    let response = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, plan);

    assert_eq!(
        response.status,
        Status::Ok,
        "BulkDelete with correct prediction should succeed"
    );
}
/// A `BulkUpdate` whose prediction was taken before another matching document
/// was inserted is expected to fail with `OllpRetryRequired` and leave the
/// already-stored document unchanged.
#[test]
fn bulk_update_stale_prediction_returns_ollp_retry_required() {
    let mut ctx = make_ctx();
    let predicted_ids = ["z1", "z2"];
    for id in predicted_ids {
        insert_active(&mut ctx, id);
    }
    let predicted: Vec<u32> = predicted_ids.iter().copied().map(surrogate_u32).collect();

    // This document is not in the predicted set.
    insert_active(&mut ctx, "z3");

    let plan = bulk_update_plan(Some(predicted));
    let response = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, plan);
    assert_eq!(
        response.status,
        Status::Error,
        "stale prediction should produce Status::Error"
    );
    assert_eq!(
        response.error_code,
        Some(ErrorCode::OllpRetryRequired),
        "error code must be OllpRetryRequired, got {:?}",
        response.error_code
    );

    // Read one predicted document back and check its `name` still has the inserted value.
    let probe_id = "z1";
    let lookup = PhysicalPlan::Document(DocumentOp::PointGet {
        collection: COLLECTION.into(),
        document_id: probe_id.into(),
        rls_filters: Vec::new(),
        system_as_of_ms: None,
        valid_at_ms: None,
        surrogate: surrogate_for(probe_id),
        pk_bytes: probe_id.as_bytes().to_vec(),
    });
    let payload = send_ok(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, lookup);
    let document = payload_value(&payload);
    let stored_name = document
        .get("name")
        .and_then(|field| field.as_str())
        .unwrap_or_default();
    assert_eq!(
        stored_name, "z1",
        "OllpRetryRequired must not have modified the document"
    );
}
/// A `BulkDelete` whose prediction was taken before another matching document
/// was inserted is expected to fail with `OllpRetryRequired` and must not have
/// removed or altered an already-stored document.
#[test]
fn bulk_delete_stale_prediction_returns_ollp_retry_required() {
    let mut ctx = make_ctx();
    insert_active(&mut ctx, "d1");
    insert_active(&mut ctx, "d2");
    let predicted = vec![surrogate_u32("d1"), surrogate_u32("d2")];
    // This document is not in the predicted set.
    insert_active(&mut ctx, "d3");

    let resp = send_raw(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        bulk_delete_plan(Some(predicted)),
    );
    assert_eq!(
        resp.status,
        Status::Error,
        "stale prediction should produce Status::Error"
    );
    assert_eq!(
        resp.error_code,
        Some(ErrorCode::OllpRetryRequired),
        "error code must be OllpRetryRequired"
    );

    // Same post-condition the BulkUpdate stale-prediction test checks: a
    // rejected operation must leave the predicted documents readable and intact.
    let payload = send_ok(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        PhysicalPlan::Document(DocumentOp::PointGet {
            collection: COLLECTION.into(),
            document_id: "d1".into(),
            rls_filters: Vec::new(),
            system_as_of_ms: None,
            valid_at_ms: None,
            surrogate: surrogate_for("d1"),
            pk_bytes: "d1".as_bytes().to_vec(),
        }),
    );
    let val = payload_value(&payload);
    let name = val.get("name").and_then(|n| n.as_str()).unwrap_or_default();
    assert_eq!(
        name, "d1",
        "OllpRetryRequired must not have deleted the document"
    );
}
/// After a stale prediction is rejected with `OllpRetryRequired`, resubmitting
/// the `BulkUpdate` with a prediction covering all three documents is expected
/// to succeed and set every document's `name` to `"updated"`.
#[test]
fn bulk_update_retry_with_corrected_prediction_succeeds() {
    let mut ctx = make_ctx();
    let all_ids = ["r1", "r2", "r3"];

    // First attempt: the prediction only covers r1 and r2, but r3 also exists.
    insert_active(&mut ctx, "r1");
    insert_active(&mut ctx, "r2");
    let stale_predicted = vec![surrogate_u32("r1"), surrogate_u32("r2")];
    insert_active(&mut ctx, "r3");
    let stale_plan = bulk_update_plan(Some(stale_predicted));
    let first_resp = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, stale_plan);
    assert_eq!(first_resp.error_code, Some(ErrorCode::OllpRetryRequired));

    // Second attempt: the prediction now covers all three documents.
    let corrected_predicted: Vec<u32> = all_ids.iter().copied().map(surrogate_u32).collect();
    let retry_plan = bulk_update_plan(Some(corrected_predicted));
    let retry_resp = send_raw(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, retry_plan);
    assert_eq!(
        retry_resp.status,
        Status::Ok,
        "retry with corrected prediction must succeed"
    );

    // Every document should now carry the updated name.
    for id in all_ids {
        let lookup = PhysicalPlan::Document(DocumentOp::PointGet {
            collection: COLLECTION.into(),
            document_id: id.into(),
            rls_filters: Vec::new(),
            system_as_of_ms: None,
            valid_at_ms: None,
            surrogate: surrogate_for(id),
            pk_bytes: id.as_bytes().to_vec(),
        });
        let payload = send_ok(&mut ctx.core, &mut ctx.tx, &mut ctx.rx, lookup);
        let document = payload_value(&payload);
        let stored_name = document
            .get("name")
            .and_then(|field| field.as_str())
            .unwrap_or_default();
        assert_eq!(
            stored_name, "updated",
            "doc {id} should have been updated on retry"
        );
    }
}
/// A `BulkUpdate` whose prediction still lists a document that has since been
/// deleted (the prediction is a superset of the current match set) is expected
/// to fail with `OllpRetryRequired` and must not have modified the surviving
/// document.
#[test]
fn bulk_update_superset_prediction_returns_ollp_retry_required() {
    let mut ctx = make_ctx();
    insert_active(&mut ctx, "s1");
    insert_active(&mut ctx, "s2");
    // Remove s2 so that the prediction below names a document that no longer exists.
    send_ok(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        PhysicalPlan::Document(DocumentOp::PointDelete {
            collection: COLLECTION.into(),
            document_id: "s2".into(),
            surrogate: surrogate_for("s2"),
            pk_bytes: "s2".as_bytes().to_vec(),
            returning: None,
        }),
    );
    let stale_predicted = vec![surrogate_u32("s1"), surrogate_u32("s2")];

    let resp = send_raw(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        bulk_update_plan(Some(stale_predicted)),
    );
    assert_eq!(
        resp.status,
        Status::Error,
        "superset prediction should produce Status::Error"
    );
    assert_eq!(
        resp.error_code,
        Some(ErrorCode::OllpRetryRequired),
        "error code must be OllpRetryRequired"
    );

    // Same post-condition the stale-prediction test checks: a rejected update
    // must leave the surviving document's `name` at its inserted value.
    let payload = send_ok(
        &mut ctx.core,
        &mut ctx.tx,
        &mut ctx.rx,
        PhysicalPlan::Document(DocumentOp::PointGet {
            collection: COLLECTION.into(),
            document_id: "s1".into(),
            rls_filters: Vec::new(),
            system_as_of_ms: None,
            valid_at_ms: None,
            surrogate: surrogate_for("s1"),
            pk_bytes: "s1".as_bytes().to_vec(),
        }),
    );
    let val = payload_value(&payload);
    let name = val.get("name").and_then(|n| n.as_str()).unwrap_or_default();
    assert_eq!(
        name, "s1",
        "OllpRetryRequired must not have modified the document"
    );
}