use nodedb::bridge::envelope::Status;
use nodedb_physical::physical_plan::{
AggregateSpec, ColumnarInsertIntent, ColumnarOp, DocumentOp, KvOp, MetaOp, PhysicalPlan,
QueryOp, TimeseriesOp,
};
use crate::helpers::*;
fn kv_put(key: &[u8], value: &[u8]) -> PhysicalPlan {
PhysicalPlan::Kv(KvOp::Put {
collection: "kv_coll".into(),
key: key.to_vec(),
value: value.to_vec(),
ttl_ms: 0,
surrogate: nodedb_types::Surrogate::ZERO,
})
}
fn kv_get(key: &[u8]) -> PhysicalPlan {
PhysicalPlan::Kv(KvOp::Get {
collection: "kv_coll".into(),
key: key.to_vec(),
rls_filters: Vec::new(),
surrogate_ceiling: None,
})
}
fn doc_put_conflict_seed(coll: &str) -> PhysicalPlan {
PhysicalPlan::Document(DocumentOp::PointPut {
collection: coll.into(),
document_id: "conflict_doc".into(),
value: b"seed".to_vec(),
surrogate: nodedb_types::Surrogate::ZERO,
pk_bytes: Vec::new(),
})
}
fn doc_insert_conflict(coll: &str) -> PhysicalPlan {
PhysicalPlan::Document(DocumentOp::PointInsert {
collection: coll.into(),
document_id: "conflict_doc".into(),
value: b"conflict".to_vec(),
surrogate: nodedb_types::Surrogate::ZERO,
if_absent: false,
})
}
fn doc_get(coll: &str, doc_id: &str) -> PhysicalPlan {
PhysicalPlan::Document(DocumentOp::PointGet {
collection: coll.into(),
document_id: doc_id.into(),
rls_filters: Vec::new(),
system_as_of_ms: None,
valid_at_ms: None,
surrogate: nodedb_types::Surrogate::ZERO,
pk_bytes: Vec::new(),
})
}
#[test]
fn rollback_matrix_kv_then_doc_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, kv_put(b"k1", b"original"));
send_ok(&mut core, &mut tx, &mut rx, doc_put_conflict_seed("docs"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![kv_put(b"k1", b"modified"), doc_insert_conflict("docs")],
}),
);
assert_eq!(resp.status, Status::Error, "batch must fail on conflict");
let r = send_raw(&mut core, &mut tx, &mut rx, kv_get(b"k1"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"original", "kv put must be rolled back");
}
#[test]
fn rollback_matrix_doc_then_kv_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, doc_put_conflict_seed("docs"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
kv_put(b"new_key", b"should_not_persist"),
doc_insert_conflict("docs"),
],
}),
);
assert_eq!(resp.status, Status::Error, "batch must fail on conflict");
let r = send_raw(&mut core, &mut tx, &mut rx, kv_get(b"new_key"));
let is_absent = r.status == Status::Error || r.payload.is_empty();
assert!(
is_absent,
"new_key must not persist after rollback; payload={:?}",
r.payload
);
}
#[test]
fn rollback_matrix_kv_delete_then_doc_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, kv_put(b"del_key", b"keep_me"));
send_ok(&mut core, &mut tx, &mut rx, doc_put_conflict_seed("docs"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Kv(KvOp::Delete {
collection: "kv_coll".into(),
keys: vec![b"del_key".to_vec()],
}),
doc_insert_conflict("docs"),
],
}),
);
assert_eq!(resp.status, Status::Error, "batch must fail");
let r = send_raw(&mut core, &mut tx, &mut rx, kv_get(b"del_key"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"keep_me", "kv delete must be rolled back");
}
#[test]
fn rollback_matrix_kv_batch_put_then_doc_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, kv_put(b"k_a", b"a_orig"));
send_ok(&mut core, &mut tx, &mut rx, doc_put_conflict_seed("docs"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Kv(KvOp::BatchPut {
collection: "kv_coll".into(),
entries: vec![
(b"k_a".to_vec(), b"a_new".to_vec()),
(b"k_b".to_vec(), b"b_new".to_vec()),
],
ttl_ms: 0,
}),
doc_insert_conflict("docs"),
],
}),
);
assert_eq!(resp.status, Status::Error, "batch must fail");
let r = send_raw(&mut core, &mut tx, &mut rx, kv_get(b"k_a"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"a_orig", "k_a must be rolled back");
let r = send_raw(&mut core, &mut tx, &mut rx, kv_get(b"k_b"));
let is_absent = r.status == Status::Error || r.payload.is_empty();
assert!(is_absent, "k_b must not persist after rollback");
}
#[test]
fn rollback_matrix_doc_then_doc_conflict_kv_intact() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(
&mut core,
&mut tx,
&mut rx,
kv_put(b"anchor", b"anchor_val"),
);
send_ok(&mut core, &mut tx, &mut rx, doc_put_conflict_seed("docs"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
kv_put(b"anchor", b"anchor_modified"),
doc_insert_conflict("docs"),
],
}),
);
assert_eq!(resp.status, Status::Error);
let r = send_raw(&mut core, &mut tx, &mut rx, kv_get(b"anchor"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"anchor_val");
let r = send_raw(&mut core, &mut tx, &mut rx, doc_get("docs", "conflict_doc"));
assert_eq!(r.status, Status::Ok, "conflict_doc must still exist");
}
#[test]
fn rollback_matrix_columnar_then_doc_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(
&mut core,
&mut tx,
&mut rx,
doc_put_conflict_seed("conflict_coll"),
);
let before = core.scan_collection(1, "metrics", 100).unwrap();
assert!(before.is_empty(), "columnar must be empty before batch");
let rows = serde_json::json!([{"id": "r1", "val": 42}]);
let payload = nodedb_types::json_to_msgpack(&rows).unwrap();
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Columnar(ColumnarOp::Insert {
collection: "metrics".into(),
payload,
format: "msgpack".into(),
intent: ColumnarInsertIntent::Insert,
on_conflict_updates: Vec::new(),
surrogates: Vec::new(),
schema_bytes: Vec::new(),
}),
doc_insert_conflict("conflict_coll"),
],
}),
);
assert_eq!(
resp.status,
Status::Error,
"batch must fail on doc conflict"
);
assert!(
!matches!(
resp.error_code,
Some(nodedb::bridge::envelope::ErrorCode::RollbackFailed { .. })
),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let after = core.scan_collection(1, "metrics", 100).unwrap();
assert!(
after.is_empty(),
"columnar insert must be rolled back; found {} rows after rollback",
after.len()
);
}
#[test]
fn rollback_matrix_columnar_count_after_rollback() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(
&mut core,
&mut tx,
&mut rx,
doc_put_conflict_seed("conflict_coll"),
);
let baseline = serde_json::json!([{"id": "baseline", "val": 1}]);
let baseline_payload = nodedb_types::json_to_msgpack(&baseline).unwrap();
send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Columnar(ColumnarOp::Insert {
collection: "metrics2".into(),
payload: baseline_payload,
format: "msgpack".into(),
intent: ColumnarInsertIntent::Insert,
on_conflict_updates: Vec::new(),
surrogates: Vec::new(),
schema_bytes: Vec::new(),
}),
);
let extra = serde_json::json!([{"id": "rolled_back", "val": 2}]);
let extra_payload = nodedb_types::json_to_msgpack(&extra).unwrap();
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Columnar(ColumnarOp::Insert {
collection: "metrics2".into(),
payload: extra_payload,
format: "msgpack".into(),
intent: ColumnarInsertIntent::Insert,
on_conflict_updates: Vec::new(),
surrogates: Vec::new(),
schema_bytes: Vec::new(),
}),
doc_insert_conflict("conflict_coll"),
],
}),
);
assert_eq!(resp.status, Status::Error, "batch must fail");
let agg_payload = send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Query(QueryOp::Aggregate {
collection: "metrics2".into(),
group_by: Vec::new(),
aggregates: vec![AggregateSpec {
function: "count".into(),
alias: "count(*)".into(),
user_alias: None,
field: "*".into(),
expr: None,
}],
filters: Vec::new(),
having: Vec::new(),
limit: 10,
sub_group_by: Vec::new(),
sub_aggregates: Vec::new(),
grouping_sets: Vec::new(),
sort_keys: Vec::new(),
}),
);
let json = crate::helpers::payload_json(&agg_payload);
let val: serde_json::Value = serde_json::from_str(&json).unwrap();
let count = val
.as_array()
.and_then(|a| a.first())
.and_then(|row| row.get("count(*)"))
.and_then(|v| v.as_i64())
.unwrap_or(-1);
assert_eq!(
count, 1,
"columnar row count after rollback must be 1 (baseline only); got count={count} json={json}"
);
}
#[test]
fn rollback_matrix_timeseries_then_doc_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(
&mut core,
&mut tx,
&mut rx,
doc_put_conflict_seed("conflict_coll"),
);
let ilp = "cpu,host=s1 value=0.5 1000000000\n\
cpu,host=s1 value=0.6 2000000000\n\
cpu,host=s1 value=0.7 3000000000\n";
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection: "cpu".into(),
payload: ilp.as_bytes().to_vec(),
format: "ilp".into(),
wal_lsn: None,
surrogates: Vec::new(),
}),
doc_insert_conflict("conflict_coll"),
],
}),
);
assert_eq!(
resp.status,
Status::Error,
"batch must fail on doc conflict"
);
assert!(
!matches!(
resp.error_code,
Some(nodedb::bridge::envelope::ErrorCode::RollbackFailed { .. })
),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let scan_resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Timeseries(TimeseriesOp::Scan {
collection: "cpu".into(),
time_range: (0, i64::MAX),
projection: Vec::new(),
limit: 100,
filters: Vec::new(),
bucket_interval_ms: 0,
group_by: Vec::new(),
aggregates: Vec::new(),
gap_fill: String::new(),
rls_filters: Vec::new(),
system_as_of_ms: None,
valid_at_ms: None,
computed_columns: Vec::new(),
}),
);
assert_eq!(scan_resp.status, Status::Ok);
let json = crate::helpers::payload_json(&scan_resp.payload);
let val: serde_json::Value =
serde_json::from_str(&json).unwrap_or(serde_json::Value::Array(vec![]));
let empty = vec![];
let arr = val.as_array().unwrap_or(&empty);
assert!(
arr.is_empty(),
"timeseries rows must be rolled back (truncate_to called); got {} rows: {json}",
arr.len()
);
}
#[test]
fn rollback_matrix_timeseries_count_after_rollback() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(
&mut core,
&mut tx,
&mut rx,
doc_put_conflict_seed("conflict_coll"),
);
let baseline_ilp = "temp,host=s1 value=1.0 1000000000\n\
temp,host=s1 value=2.0 2000000000\n";
send_ok(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection: "temp".into(),
payload: baseline_ilp.as_bytes().to_vec(),
format: "ilp".into(),
wal_lsn: None,
surrogates: Vec::new(),
}),
);
let extra_ilp = "temp,host=s1 value=3.0 3000000000\n\
temp,host=s1 value=4.0 4000000000\n\
temp,host=s1 value=5.0 5000000000\n";
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Timeseries(TimeseriesOp::Ingest {
collection: "temp".into(),
payload: extra_ilp.as_bytes().to_vec(),
format: "ilp".into(),
wal_lsn: None,
surrogates: Vec::new(),
}),
doc_insert_conflict("conflict_coll"),
],
}),
);
assert_eq!(resp.status, Status::Error, "batch must fail");
let scan_resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Timeseries(TimeseriesOp::Scan {
collection: "temp".into(),
time_range: (0, i64::MAX),
projection: Vec::new(),
limit: 100,
filters: Vec::new(),
bucket_interval_ms: 0,
group_by: Vec::new(),
aggregates: Vec::new(),
gap_fill: String::new(),
rls_filters: Vec::new(),
system_as_of_ms: None,
valid_at_ms: None,
computed_columns: Vec::new(),
}),
);
assert_eq!(scan_resp.status, Status::Ok);
let json = crate::helpers::payload_json(&scan_resp.payload);
let val: serde_json::Value =
serde_json::from_str(&json).unwrap_or(serde_json::Value::Array(vec![]));
let empty = vec![];
let arr = val.as_array().unwrap_or(&empty);
assert_eq!(
arr.len(),
2,
"timeseries must have only baseline 2 rows after rollback; got {} rows: {json}",
arr.len()
);
}