use super::*;
/// T5: a durable write failure partway through a batch must fail every member
/// and leave the engine poisoned.
///
/// Five single-node transactions are sealed and then submitted through
/// `submit_sealed_async_for_test`, with the first-sealed one submitted last.
/// Every reply must arrive within 10 seconds and be an `Err`. Afterwards the
/// published generation and node count must still be zero, and a fresh commit
/// must fail with `GraphError::Durable`.
#[test]
fn t5_partial_batch_append_failure_errs_all_and_poisons() {
    // NOTE(review): `fail_write_on(tag, 3)` presumably makes the third durable
    // write fail — confirm against `CountingDurable`.
    let durable = CountingDurable::fail_write_on(b"CNT6", 3);
    // The durable handle is never inspected in this test, so it is moved into
    // the graph rather than cloned.
    let shared = Arc::new(graph_with_durable(
        70_006,
        durable,
        on(8, 8 * 1024 * 1024),
    ));

    // Seal all five transactions before submitting any of them.
    let mut sealeds = Vec::new();
    for label in ["a", "b", "c", "d", "e"] {
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        sealeds.push(txn.seal(None, None).expect("seals"));
    }

    // Hold back the first-sealed transaction ("seq 0") and enqueue the others
    // first, last-sealed to first-sealed.
    let sealed_a = sealeds.remove(0);
    let replies: Vec<_> = sealeds
        .into_iter()
        .rev()
        .map(|sealed| {
            shared
                .submit_sealed_async_for_test(sealed)
                .expect("later seq enqueued")
        })
        .collect();

    // Submitting seq 0 last; its reply is awaited first.
    let a_reply = shared
        .submit_sealed_async_for_test(sealed_a)
        .expect("seq 0 enqueued");
    let a_result = a_reply
        .recv_timeout(Duration::from_secs(10))
        .expect("seq 0 waiter did not hang");

    // Count failures across all five members; each wait is bounded so a lost
    // reply fails the test instead of hanging it.
    let mut err_count = usize::from(a_result.is_err());
    for reply in replies {
        let result = reply
            .recv_timeout(Duration::from_secs(10))
            .expect("later waiter did not hang");
        if result.is_err() {
            err_count += 1;
        }
    }
    assert_eq!(err_count, 5, "all 5 members Err (incl. already-appended)");

    // Nothing from the failed batch may be visible to readers.
    assert_eq!(shared.read().meta.generation, 0);
    assert_eq!(shared.read().node_count(), 0);

    // A brand-new transaction must now be rejected with a durable error.
    let mut txn = shared.begin_write();
    txn.mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    assert!(
        matches!(txn.commit(), Err(GraphError::Durable { .. })),
        "post-poison commit fails fast",
    );
}
/// T6: a flush failure must fail every waiter and publish nothing.
///
/// Four single-node transactions are sealed and each is submitted from its own
/// thread via `submit_sealed_for_test`. All four must return `Err` within a
/// 10-second deadline, the published generation must stay at zero, and the
/// durable's `flush_count()` must still be zero.
#[test]
fn t6_flush_failure_poisons_all() {
    let durable = CountingDurable::fail_flush(b"CNT7");
    // A clone is handed to the graph because `durable` is inspected at the end.
    let shared = Arc::new(graph_with_durable(
        70_007,
        durable.clone(),
        on(8, 8 * 1024 * 1024),
    ));

    // Seal all four transactions before submitting any of them.
    let mut sealeds = Vec::new();
    for label in ["a", "b", "c", "d"] {
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        sealeds.push(txn.seal(None, None).expect("seals"));
    }

    let deadline = Instant::now() + Duration::from_secs(10);

    // One submitting thread per sealed transaction, spawned in seal order.
    let handles: Vec<_> = sealeds
        .into_iter()
        .map(|sealed| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || shared.submit_sealed_for_test(sealed))
        })
        .collect();

    let mut err_count = 0;
    for handle in handles {
        // `join` has no timeout, so a hung waiter would block the test forever
        // and the deadline assertion below would never run. Poll for
        // completion first so a hang is reported as a failure.
        while !handle.is_finished() {
            assert!(Instant::now() < deadline, "no waiter hung");
            thread::sleep(Duration::from_millis(1));
        }
        if handle.join().expect("no panic").is_err() {
            err_count += 1;
        }
    }
    assert!(Instant::now() < deadline, "no waiter hung");
    assert_eq!(err_count, 4, "all 4 Err on flush failure");
    assert_eq!(shared.read().meta.generation, 0, "nothing published");
    assert_eq!(durable.flush_count(), 0);
}
/// Test-only index provider that arms the publish-panic injection hook from
/// its `on_change` callback.
struct ArmPublishPanicProvider {
    /// Tag reported by `provider_tag`.
    tag: ProviderTag,
    /// Flipped to `true` by the first `Change::NodeCreated` seen in
    /// `on_change`, so the injection hook is armed at most once.
    armed: AtomicBool,
}
impl ArmPublishPanicProvider {
    /// Builds a shared provider carrying the given four-byte tag, with the
    /// `armed` flag initially cleared.
    fn new(tag: &[u8; 4]) -> Arc<Self> {
        let provider = Self {
            armed: AtomicBool::new(false),
            tag: ProviderTag(*tag),
        };
        Arc::new(provider)
    }
}
impl IndexProvider for ArmPublishPanicProvider {
    /// Returns the tag supplied at construction.
    fn provider_tag(&self) -> ProviderTag {
        self.tag
    }

    /// Accepts any section without inspecting it.
    fn read_section(&self, _sub: SubTag, _bytes: &[u8]) -> Result<(), ProviderError> {
        Ok(())
    }

    /// Always produces an empty section.
    fn write_section(&self, _sub: SubTag) -> Result<Vec<u8>, ProviderError> {
        Ok(vec![])
    }

    /// Arms the publish-panic injection hook on the first `NodeCreated` change
    /// this provider observes; every other change is ignored.
    ///
    /// NOTE(review): the meaning of the `1` passed to `arm` is defined by
    /// `publish_panic_inject` — confirm there.
    fn on_change(&self, change: &Change) -> Result<(), ProviderError> {
        if let Change::NodeCreated { .. } = change {
            // The flag is only touched for node creations; `swap` returns the
            // previous value, so only the first such change reaches `arm`.
            let already_armed = self.armed.swap(true, Ordering::SeqCst);
            if !already_armed {
                crate::write_txn::publish_panic_inject::arm(1);
            }
        }
        Ok(())
    }

    /// Declares no sub-tags.
    fn declared_sub_tags(&self) -> &[SubTag] {
        &[]
    }
}
/// T5b: a panic during publish must leave the already-published member `Ok`,
/// fail the remaining members, and poison the engine.
///
/// Three single-node transactions are sealed against a graph that has an
/// `ArmPublishPanicProvider` registered. The two later-sealed ones are
/// submitted from spawned threads, then the first-sealed one from the test
/// thread. Expected outcome: the first returns `Ok` at generation 1, the other
/// two return `Err`, exactly one node is visible, and a fresh commit fails with
/// `GraphError::Durable`.
#[test]
fn t5b_publish_tail_panic_acks_member_errs_rest_and_poisons() {
    let durable = CountingDurable::new(b"CNT8");
    let provider = ArmPublishPanicProvider::new(b"ARMP");
    let shared = Arc::new(graph_with_durable_and_provider(
        70_008,
        durable,
        provider,
        on(8, 8 * 1024 * 1024),
    ));

    // Seal all three transactions before submitting any of them.
    let mut sealeds = Vec::new();
    for label in ["x", "y", "z"] {
        let mut txn = shared.begin_write();
        txn.mutator()
            .create_node(LabelSet::single(db_string(label)), PropertyMap::new())
            .unwrap();
        sealeds.push(txn.seal(None, None).expect("seals"));
    }

    let deadline = Instant::now() + Duration::from_secs(10);

    // Hold back the first-sealed transaction; submit the others from their own
    // threads, last-sealed first.
    let sealed_0 = sealeds.remove(0);
    let mut handles = Vec::new();
    for sealed in sealeds.into_iter().rev() {
        let shared = Arc::clone(&shared);
        handles.push(thread::spawn(move || shared.submit_sealed_for_test(sealed)));
        // NOTE(review): presumably this gives the spawned waiter time to
        // enqueue before the next submission; yielding is best-effort only.
        for _ in 0..1_000 {
            thread::yield_now();
        }
    }

    // NOTE(review): this call runs on the test thread and is not bounded by
    // the deadline; a hang here would still block the test.
    let r0 = shared.submit_sealed_for_test(sealed_0);

    let mut later = Vec::new();
    for handle in handles {
        // `join` has no timeout, so a hung waiter would block the test forever
        // and the deadline assertion below would never run. Poll for
        // completion first so a hang is reported as a failure.
        while !handle.is_finished() {
            assert!(Instant::now() < deadline, "no waiter hung");
            thread::sleep(Duration::from_millis(1));
        }
        later.push(handle.join().expect("waiter thread did not panic"));
    }
    assert!(Instant::now() < deadline, "no waiter hung");

    assert!(
        r0.is_ok(),
        "member 0 published before the panic, got {r0:?}"
    );
    assert_eq!(r0.unwrap().generation, 1);
    assert_eq!(
        later.iter().filter(|r| r.is_err()).count(),
        2,
        "the panicking member + the remaining member both Err",
    );

    // Only the first member's node is visible to readers.
    assert_eq!(shared.read().node_count(), 1);
    assert_eq!(shared.read().meta.generation, 1);

    // A brand-new transaction must now be rejected with a durable error.
    let mut txn = shared.begin_write();
    txn.mutator()
        .create_node(LabelSet::new(), PropertyMap::new())
        .unwrap();
    assert!(
        matches!(txn.commit(), Err(GraphError::Durable { .. })),
        "post-panic commit fails fast (engine poisoned)",
    );
}