#![cfg(test)]
use crate::messaging::jetstream::{
fuzz_probe_publish_backpressure, fuzz_probe_publish_backpressure_cohort_tail_evidence,
fuzz_probe_publish_backpressure_tail_evidence,
};
/// Prints the START banner for the test called `name`.
///
/// The banner goes to stdout in the form
/// `[jetstream-flow-control] START <name>`.
fn init_test(name: &str) {
    const TAG: &str = "[jetstream-flow-control]";
    println!("{TAG} START {name}");
}
/// Prints the PASS banner for the test called `name`.
///
/// The banner goes to stdout in the form
/// `[jetstream-flow-control] PASS <name>`.
fn test_complete(name: &str) {
    const TAG: &str = "[jetstream-flow-control]";
    println!("{TAG} PASS {name}");
}
/// Audits the refusal path of the publish backpressure probe.
///
/// Runs `fuzz_probe_publish_backpressure(None, 1)` and checks that the
/// snapshot reports a limit of one in-flight publish, zero waiters, a failed
/// acquisition, one refused publish, and an error message that mentions
/// "local publish backpressure".
#[test]
fn audit_jetstream_publish_flow_control_backpressure() {
    const NAME: &str = "audit_jetstream_publish_flow_control_backpressure";
    init_test(NAME);

    let probe = fuzz_probe_publish_backpressure(None, 1);

    // Limits reported by the probe.
    assert_eq!(probe.effective_max_in_flight_publishes, 1);
    assert_eq!(probe.max_waiters, 0);

    // Outcome of the attempted acquisition.
    assert!(!probe.acquired);
    assert_eq!(probe.in_flight_publishes_after, 1);
    assert_eq!(probe.refused_publishes, 1);

    // An error must be present and must name the backpressure cause.
    let names_backpressure = matches!(
        probe.error.as_deref(),
        Some(text) if text.contains("local publish backpressure")
    );
    assert!(names_backpressure);

    test_complete(NAME);
}
/// Audits that the publish path stays bounded when the only slot is occupied.
///
/// Runs `fuzz_probe_publish_backpressure(None, 1)` and checks that the
/// in-flight count stays at the limit of one, that the next publish is
/// refused, and that nothing is queued behind it: `max_waiters` is zero and
/// the acquisition did not succeed. The last two checks back up the
/// "no hidden waiters" wording of the refusal assertion.
#[test]
fn audit_publish_memory_bounds_under_slow_acks() {
    const NAME: &str = "audit_publish_memory_bounds_under_slow_acks";
    init_test(NAME);

    let snapshot = fuzz_probe_publish_backpressure(None, 1);

    assert_eq!(snapshot.effective_max_in_flight_publishes, 1);
    assert_eq!(
        snapshot.max_waiters, 0,
        "refusal-only policy must not allow any queued waiters"
    );
    assert!(
        !snapshot.acquired,
        "publish must not acquire a slot while the only slot is occupied"
    );
    assert_eq!(
        snapshot.in_flight_publishes_after, 1,
        "occupied publish slot must stay bounded under slow ACK assumptions"
    );
    assert_eq!(
        snapshot.refused_publishes, 1,
        "slow ACK path must refuse the next publish instead of accumulating hidden waiters"
    );

    test_complete(NAME);
}
/// Audits how the pressure level surfaces in the backpressure probe.
///
/// Runs `fuzz_probe_publish_backpressure(Some(0.0), 0)` and checks that the
/// snapshot reports an effective in-flight limit of zero, a pressure level of
/// "emergency", a failed acquisition, and an error message containing
/// "pressure=emergency".
#[test]
fn audit_pressure_signaling_integration() {
    const NAME: &str = "audit_pressure_signaling_integration";
    init_test(NAME);

    let probe = fuzz_probe_publish_backpressure(Some(0.0), 0);

    assert_eq!(probe.effective_max_in_flight_publishes, 0);
    assert_eq!(probe.pressure_level.as_deref(), Some("emergency"));
    assert!(!probe.acquired);

    // The refusal error must carry the pressure level that caused it.
    let names_emergency = matches!(
        probe.error.as_deref(),
        Some(text) if text.contains("pressure=emergency")
    );
    assert!(names_emergency);

    test_complete(NAME);
}
/// Emits the START and PASS banners for the TCP flow-control audit.
///
/// NOTE(review): this test makes no assertions, so it always passes. It
/// looks like a placeholder; confirm whether real checks are expected here.
#[test]
fn audit_current_tcp_flow_control_behavior() {
    const NAME: &str = "audit_current_tcp_flow_control_behavior";
    init_test(NAME);
    test_complete(NAME);
}
/// Emits the START and PASS banners for the reference backpressure audit.
///
/// NOTE(review): this test makes no assertions, so it always passes. It
/// looks like a placeholder; confirm whether real checks are expected here.
#[test]
fn audit_reference_backpressure_pattern() {
    const NAME: &str = "audit_reference_backpressure_pattern";
    init_test(NAME);
    test_complete(NAME);
}
/// Audits the tail-latency evidence produced by the single-publisher probe.
///
/// Runs `fuzz_probe_publish_backpressure_tail_evidence(None, 1, 64)` and
/// checks that all 64 samples were refused and none accepted, that the
/// snapshot reports no waiter queue and a refusal-only policy, and that the
/// p95, p99 and p99.9 publish-wait latencies are all zero microseconds.
#[test]
fn audit_publish_wait_tail_zero_for_refusal_only_policy() {
    const NAME: &str = "audit_publish_wait_tail_zero_for_refusal_only_policy";
    init_test(NAME);

    let evidence = fuzz_probe_publish_backpressure_tail_evidence(None, 1, 64);

    // Sample accounting: every sample must be a refusal.
    assert_eq!(evidence.tail_sample_count, 64);
    assert_eq!(evidence.accepted_count, 0);
    assert_eq!(evidence.refused_count, 64);

    // Policy description reported by the probe.
    assert!(evidence.waiter_queue_absent);
    assert_eq!(evidence.waiter_fairness_mode, "vacuous_zero_wait_refusal");
    assert!(evidence.refusal_only_policy);
    assert_eq!(evidence.tail_evidence_mode, "zero_wait_refusal_only");

    // Wait-latency tail percentiles (p95, p99, p99.9) must all be zero.
    assert_eq!(
        (
            evidence.publish_wait_latency_p95_micros,
            evidence.publish_wait_latency_p99_micros,
            evidence.publish_wait_latency_p999_micros,
        ),
        (0, 0, 0)
    );

    test_complete(NAME);
}
/// Audits the tail-latency evidence produced by the publisher-cohort probe.
///
/// Runs `fuzz_probe_publish_backpressure_cohort_tail_evidence(32, 16)` and
/// checks that the snapshot reports 32 publishers, 16 of them occupied, 16
/// accepted and 16 refused publishes, no waiter queue, a refusal-only policy,
/// the "mg11_loss_system" queueing model, and zero-microsecond p95, p99 and
/// p99.9 publish-wait latencies.
#[test]
fn audit_publish_wait_tail_zero_for_multi_publisher_loss_system() {
    const NAME: &str = "audit_publish_wait_tail_zero_for_multi_publisher_loss_system";
    init_test(NAME);

    let evidence = fuzz_probe_publish_backpressure_cohort_tail_evidence(32, 16);

    // Cohort accounting.
    assert_eq!(evidence.publisher_count, 32);
    assert_eq!(evidence.occupied_publisher_count, 16);
    assert_eq!(evidence.accepted_count, 16);
    assert_eq!(evidence.refused_count, 16);

    // Policy description reported by the probe.
    assert!(evidence.waiter_queue_absent);
    assert_eq!(evidence.waiter_fairness_mode, "vacuous_zero_wait_refusal");
    assert!(evidence.refusal_only_policy);
    assert!(evidence.multi_publisher_tail_evidence_present);
    assert_eq!(evidence.queueing_model, "mg11_loss_system");

    // Wait-latency tail percentiles (p95, p99, p99.9) must all be zero.
    assert_eq!(
        (
            evidence.publish_wait_latency_p95_micros,
            evidence.publish_wait_latency_p99_micros,
            evidence.publish_wait_latency_p999_micros,
        ),
        (0, 0, 0)
    );

    test_complete(NAME);
}