use nodedb::bridge::envelope::{ErrorCode, PhysicalPlan, Status};
use nodedb::bridge::physical_plan::{CrdtOp, DocumentOp, GraphOp, MetaOp, TextOp, VectorOp};
use crate::helpers::*;
fn vector_set_params(collection: &str) -> PhysicalPlan {
PhysicalPlan::Vector(VectorOp::SetParams {
collection: collection.into(),
field_name: String::new(),
m: 16,
ef_construction: 200,
metric: "cosine".into(),
index_type: String::new(),
pq_m: 0,
ivf_cells: 0,
ivf_nprobe: 0,
})
}
fn vector_seed(collection: &str) -> PhysicalPlan {
PhysicalPlan::Vector(VectorOp::Insert {
collection: collection.into(),
vector: vec![1.0, 2.0, 3.0],
dim: 3,
field_name: String::new(),
surrogate: nodedb_types::Surrogate::ZERO,
})
}
fn vector_fail(collection: &str) -> PhysicalPlan {
PhysicalPlan::Vector(VectorOp::Insert {
collection: collection.into(),
vector: vec![1.0, 2.0],
dim: 3,
field_name: String::new(),
surrogate: nodedb_types::Surrogate::ZERO,
})
}
fn doc_put(coll: &str, val: &[u8]) -> PhysicalPlan {
PhysicalPlan::Document(DocumentOp::PointPut {
collection: coll.into(),
document_id: "doc1".into(),
value: val.to_vec(),
surrogate: nodedb_types::Surrogate::ZERO,
pk_bytes: Vec::new(),
})
}
fn doc_get(coll: &str) -> PhysicalPlan {
PhysicalPlan::Document(DocumentOp::PointGet {
collection: coll.into(),
document_id: "doc1".into(),
rls_filters: Vec::new(),
system_as_of_ms: None,
valid_at_ms: None,
surrogate: nodedb_types::Surrogate::ZERO,
pk_bytes: Vec::new(),
})
}
fn doc_insert_conflict(coll: &str) -> PhysicalPlan {
PhysicalPlan::Document(DocumentOp::PointInsert {
collection: coll.into(),
document_id: "doc1".into(),
value: b"{\"conflict\":true}".to_vec(),
surrogate: nodedb_types::Surrogate::ZERO,
if_absent: false,
})
}
fn edge_put(coll: &str, src: &str, dst: &str) -> PhysicalPlan {
PhysicalPlan::Graph(GraphOp::EdgePut {
collection: coll.into(),
src_id: src.into(),
label: "REL".into(),
dst_id: dst.into(),
properties: Vec::new(),
src_surrogate: nodedb_types::Surrogate::ZERO,
dst_surrogate: nodedb_types::Surrogate::ZERO,
})
}
fn neighbors(src: &str) -> PhysicalPlan {
PhysicalPlan::Graph(GraphOp::Neighbors {
node_id: src.into(),
edge_label: Some("REL".into()),
direction: nodedb::engine::graph::edge_store::Direction::Out,
rls_filters: Vec::new(),
})
}
#[test]
fn rollback_matrix_doc_then_vector_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, doc_put("docs", b"original"));
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![doc_put("docs", b"modified"), vector_fail("vec")],
}),
);
assert_eq!(resp.status, Status::Error, "batch should fail");
let r = send_raw(&mut core, &mut tx, &mut rx, doc_get("docs"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"original");
}
#[test]
fn rollback_matrix_vector_then_doc_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
send_ok(&mut core, &mut tx, &mut rx, doc_put("docs", b"preexisting"));
let count_before: usize = {
1 };
let _ = count_before;
let v_plan = PhysicalPlan::Vector(VectorOp::Insert {
collection: "vec".into(),
vector: vec![0.5, 0.5, 0.5],
dim: 3,
field_name: String::new(),
surrogate: nodedb_types::Surrogate::ZERO,
});
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
v_plan,
doc_insert_conflict("docs"), ],
}),
);
assert_eq!(resp.status, Status::Error, "batch should fail");
assert!(
!matches!(resp.error_code, Some(ErrorCode::RollbackFailed { .. })),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let r = send_raw(&mut core, &mut tx, &mut rx, doc_get("docs"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"preexisting");
}
#[test]
fn rollback_matrix_doc_then_graph_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, doc_put("docs", b"original"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
doc_put("docs", b"modified"),
doc_insert_conflict("docs"), ],
}),
);
assert_eq!(resp.status, Status::Error);
let r = send_raw(&mut core, &mut tx, &mut rx, doc_get("docs"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"original");
}
#[test]
fn rollback_matrix_graph_then_vector_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![edge_put("col", "alice", "bob"), vector_fail("vec")],
}),
);
assert_eq!(resp.status, Status::Error);
assert!(
!matches!(resp.error_code, Some(ErrorCode::RollbackFailed { .. })),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let n = send_raw(&mut core, &mut tx, &mut rx, neighbors("alice"));
assert_eq!(n.status, Status::Ok);
assert!(
n.payload.len() <= 3,
"edge should have been rolled back, payload len={}",
n.payload.len()
);
}
#[test]
fn rollback_matrix_graph_then_graph_and_vector_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
edge_put("col", "a", "b"),
edge_put("col", "c", "d"),
vector_fail("vec"),
],
}),
);
assert_eq!(resp.status, Status::Error);
let n_ab = send_raw(&mut core, &mut tx, &mut rx, neighbors("a"));
assert_eq!(n_ab.status, Status::Ok);
assert!(n_ab.payload.len() <= 3, "edge a→b should be rolled back");
let n_cd = send_raw(&mut core, &mut tx, &mut rx, neighbors("c"));
assert_eq!(n_cd.status, Status::Ok);
assert!(n_cd.payload.len() <= 3, "edge c→d should be rolled back");
}
#[test]
fn rollback_matrix_crdt_buffered_then_vector_fail() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, doc_put("docs", b"original"));
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
let crdt_delta: Vec<u8> = vec![0u8; 8]; let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Crdt(CrdtOp::Apply {
collection: "crdt_coll".into(),
document_id: "crdt_doc1".into(),
delta: crdt_delta,
peer_id: 1,
mutation_id: 42,
surrogate: nodedb_types::Surrogate::ZERO,
}),
doc_put("docs", b"modified"),
vector_fail("vec"),
],
}),
);
assert_eq!(resp.status, Status::Error);
assert!(
!matches!(resp.error_code, Some(ErrorCode::RollbackFailed { .. })),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let r = send_raw(&mut core, &mut tx, &mut rx, doc_get("docs"));
assert_eq!(r.status, Status::Ok);
assert_eq!(&*r.payload, b"original");
}
#[test]
fn rollback_matrix_doc_doc_second_fails() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, doc_put("docs", b"preexisting"));
let other_put = PhysicalPlan::Document(DocumentOp::PointPut {
collection: "docs".into(),
document_id: "other_doc".into(),
value: b"should_not_persist".to_vec(),
surrogate: nodedb_types::Surrogate::new(99),
pk_bytes: Vec::new(),
});
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
other_put,
doc_insert_conflict("docs"), ],
}),
);
assert_eq!(resp.status, Status::Error);
let r = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Document(DocumentOp::PointGet {
collection: "docs".into(),
document_id: "other_doc".into(),
rls_filters: Vec::new(),
system_as_of_ms: None,
valid_at_ms: None,
surrogate: nodedb_types::Surrogate::new(99),
pk_bytes: Vec::new(),
}),
);
assert!(
r.status == Status::Error || r.payload.is_empty() || r.payload.len() <= 3,
"other_doc should have been rolled back; status={:?} payload_len={}",
r.status,
r.payload.len()
);
}
#[test]
fn rollback_matrix_fts_side_effect_rolled_back() {
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
let doc_value = r#"{"title":"unique_rollback_sentinel quantum database"}"#;
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Document(DocumentOp::PointPut {
collection: "articles".into(),
document_id: "fts_rollback_doc".into(),
value: doc_value.as_bytes().to_vec(),
surrogate: nodedb_types::Surrogate::new(7001),
pk_bytes: b"fts_rollback_doc".to_vec(),
}),
vector_fail("vec"),
],
}),
);
assert_eq!(
resp.status,
Status::Error,
"batch must fail on dim-mismatch"
);
assert!(
!matches!(resp.error_code, Some(ErrorCode::RollbackFailed { .. })),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let search_resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Text(TextOp::Search {
collection: "articles".into(),
query: "unique_rollback_sentinel".into(),
top_k: 10,
fuzzy: false,
rls_filters: Vec::new(),
prefilter: None,
}),
);
assert_eq!(search_resp.status, Status::Ok);
let json = crate::helpers::payload_json(&search_resp.payload);
let val: serde_json::Value =
serde_json::from_str(&json).unwrap_or(serde_json::Value::Array(vec![]));
let empty = vec![];
let arr = val.as_array().unwrap_or(&empty);
assert!(
arr.is_empty(),
"FTS posting for rolled-back doc must not appear in search results; got {json}"
);
}
#[test]
fn rollback_matrix_spatial_not_written_in_tx_path() {
use nodedb::bridge::physical_plan::{SpatialOp, SpatialPredicate};
use nodedb_types::geometry::Geometry;
let (mut core, mut tx, mut rx, _dir) = make_core();
send_ok(&mut core, &mut tx, &mut rx, vector_set_params("vec"));
send_ok(&mut core, &mut tx, &mut rx, vector_seed("vec"));
let geo_doc = r#"{"name":"poi","location":{"type":"Point","coordinates":[10.0,20.0]}}"#;
let resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Meta(MetaOp::TransactionBatch {
plans: vec![
PhysicalPlan::Document(DocumentOp::PointPut {
collection: "places".into(),
document_id: "geo_rollback_doc".into(),
value: geo_doc.as_bytes().to_vec(),
surrogate: nodedb_types::Surrogate::new(8001),
pk_bytes: b"geo_rollback_doc".to_vec(),
}),
vector_fail("vec"),
],
}),
);
assert_eq!(
resp.status,
Status::Error,
"batch must fail on dim-mismatch"
);
assert!(
!matches!(resp.error_code, Some(ErrorCode::RollbackFailed { .. })),
"rollback itself must succeed; got {:?}",
resp.error_code
);
let query_geometry = Geometry::Point {
coordinates: [10.0, 20.0],
};
let scan_resp = send_raw(
&mut core,
&mut tx,
&mut rx,
PhysicalPlan::Spatial(SpatialOp::Scan {
collection: "places".into(),
field: "location".into(),
predicate: SpatialPredicate::DWithin,
query_geometry,
distance_meters: 1_000_000.0,
attribute_filters: Vec::new(),
limit: 10,
projection: Vec::new(),
rls_filters: Vec::new(),
prefilter: None,
}),
);
assert_eq!(scan_resp.status, Status::Ok);
let json = crate::helpers::payload_json(&scan_resp.payload);
let val: serde_json::Value =
serde_json::from_str(&json).unwrap_or(serde_json::Value::Array(vec![]));
let empty = vec![];
let arr = val.as_array().unwrap_or(&empty);
assert!(
arr.is_empty(),
"spatial scan after rollback must return zero results (tx path never writes R-tree); \
got {json}"
);
}
#[test]
fn rollback_failed_error_code_is_typed() {
let code = ErrorCode::RollbackFailed {
entry_index: 2,
detail: "sparse store error: disk full".into(),
};
assert!(
matches!(code, ErrorCode::RollbackFailed { entry_index: 2, .. }),
"RollbackFailed must carry structured fields"
);
}