use crate::channel::broadcast::{self, RecvError, SendError};
use crate::cx::Cx;
use crate::lab::{LabConfig, LabRuntime};
use proptest::prelude::*;
use std::collections::HashMap;
use std::future::Future;
use std::rc::Rc;
use std::task::{Context, Poll};
fn test_cx() -> Cx<crate::cx::cap::All> {
Cx::for_testing()
}
fn block_on<F: Future>(f: F) -> F::Output {
let waker = std::task::Waker::noop().clone(); let mut cx = Context::from_waker(&waker);
let mut pinned = Box::pin(f);
loop {
match pinned.as_mut().poll(&mut cx) {
Poll::Ready(v) => return v,
Poll::Pending => std::thread::yield_now(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct BroadcastMessage {
id: u64,
payload: String,
timestamp: u32,
}
impl BroadcastMessage {
fn new(id: u64, payload: impl Into<String>, timestamp: u32) -> Self {
Self {
id,
payload: payload.into(),
timestamp,
}
}
}
#[derive(Debug)]
struct FastReceiverHarness {
sender: crate::channel::broadcast::Sender<BroadcastMessage>,
receivers: Vec<crate::channel::broadcast::Receiver<BroadcastMessage>>,
}
impl FastReceiverHarness {
fn new(capacity: usize, receiver_count: usize) -> Self {
let (sender, receiver) = broadcast::channel(capacity);
let mut receivers = vec![receiver];
for _ in 1..receiver_count {
receivers.push(sender.subscribe());
}
Self { sender, receivers }
}
fn send_message(
&mut self,
cx: &Cx,
message: BroadcastMessage,
) -> Result<usize, SendError<BroadcastMessage>> {
match self.sender.send(cx, message) {
Ok(receiver_count) => Ok(receiver_count),
Err(e) => Err(e),
}
}
fn send_messages(
&mut self,
cx: &Cx,
messages: &[BroadcastMessage],
) -> Result<usize, SendError<BroadcastMessage>> {
let mut total_receivers = 0;
for msg in messages {
total_receivers = self.send_message(cx, msg.clone())?;
}
Ok(total_receivers)
}
fn receiver_mut(
&mut self,
index: usize,
) -> &mut crate::channel::broadcast::Receiver<BroadcastMessage> {
&mut self.receivers[index]
}
fn receiver_count(&self) -> usize {
self.receivers.len()
}
}
#[cfg(test)]
mod tests {
#![allow(
clippy::pedantic,
clippy::nursery,
clippy::expect_fun_call,
clippy::map_unwrap_or,
clippy::cast_possible_wrap
)]
use super::*;
#[test]
fn mr1_fast_receiver_preservation() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
proptest!(|(
capacity in 3usize..8,
receiver_count in 2usize..5,
message_count in 2usize..6,
payload_seed in any::<u32>(),
)| {
let cx = test_cx();
let mut harness = FastReceiverHarness::new(capacity, receiver_count);
prop_assume!(capacity >= message_count);
let messages: Vec<BroadcastMessage> = (0..message_count)
.map(|i| {
BroadcastMessage::new(
i as u64,
format!("payload_{}_{}", payload_seed, i),
i as u32,
)
})
.collect();
let send_result = harness.send_messages(&cx, &messages);
prop_assert!(send_result.is_ok(), "MR1 VIOLATION: Send failed with fast receivers: {:?}", send_result);
let reported_receiver_count = send_result.unwrap(); prop_assert_eq!(reported_receiver_count, harness.receiver_count(),
"MR1 VIOLATION: Send reported wrong receiver count");
for receiver_idx in 0..harness.receiver_count() {
let mut received_messages = Vec::new();
for expected_idx in 0..message_count {
match block_on(harness.receiver_mut(receiver_idx).recv(&cx)) {
Ok(msg) => received_messages.push(msg),
Err(RecvError::Lagged(skip_count)) => {
prop_assert!(false,
"MR1 VIOLATION: Receiver {} lagged (skipped {}) with sufficient capacity {} for {} messages",
receiver_idx, skip_count, capacity, message_count);
}
Err(RecvError::Closed) => {
prop_assert!(false,
"MR1 VIOLATION: Receiver {} got Closed at message {} of {}",
receiver_idx, expected_idx, message_count);
}
Err(RecvError::Cancelled) => {
prop_assert!(false,
"MR1 VIOLATION: Receiver {} got Cancelled at message {} of {}",
receiver_idx, expected_idx, message_count);
}
Err(RecvError::PolledAfterCompletion) => {
prop_assert!(false,
"MR1 VIOLATION: Receiver {} recv future was polled after completion at message {} of {}",
receiver_idx, expected_idx, message_count);
}
}
}
prop_assert_eq!(received_messages.len(), message_count,
"MR1 VIOLATION: Receiver {} got {} messages, expected {}",
receiver_idx, received_messages.len(), message_count);
for (received, expected) in received_messages.iter().zip(&messages) {
prop_assert_eq!(received, expected,
"MR1 VIOLATION: Receiver {} got wrong message content", receiver_idx);
}
prop_assert_eq!(&received_messages, &messages,
"MR1 VIOLATION: Receiver {} got messages in wrong order", receiver_idx);
}
});
}
#[test]
fn mr2_receiver_count_independence() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
proptest!(|(
capacity in 4usize..8,
message_count in 2usize..5,
content_base in "[a-z]{2,4}",
)| {
prop_assume!(capacity >= message_count);
let cx = test_cx();
let messages: Vec<BroadcastMessage> = (0..message_count)
.map(|i| BroadcastMessage::new(i as u64, format!("{}{}", content_base, i), i as u32))
.collect();
let mut results_by_receiver_count = HashMap::new();
for receiver_count in [1, 2, 3, 4].iter().cloned() {
let mut harness = FastReceiverHarness::new(capacity, receiver_count);
harness.send_messages(&cx, &messages).unwrap();
let mut receiver_sequences = Vec::new();
for receiver_idx in 0..receiver_count {
let mut received = Vec::new();
for _ in 0..message_count {
match block_on(harness.receiver_mut(receiver_idx).recv(&cx)) {
Ok(msg) => received.push(msg),
Err(e) => {
prop_assert!(false,
"MR2 VIOLATION: Receiver {} failed with {} receivers: {:?}",
receiver_idx, receiver_count, e);
}
}
}
receiver_sequences.push(received);
}
results_by_receiver_count.insert(receiver_count, receiver_sequences);
}
let reference_sequence = &messages;
for (count, receiver_sequences) in &results_by_receiver_count {
for (receiver_idx, received_seq) in receiver_sequences.iter().enumerate() {
prop_assert_eq!(received_seq, reference_sequence,
"MR2 VIOLATION: With {} receivers, receiver {} saw different sequence",
count, receiver_idx);
}
if receiver_sequences.len() > 1 {
let first_receiver_seq = &receiver_sequences[0];
for (idx, other_seq) in receiver_sequences.iter().enumerate().skip(1) {
prop_assert_eq!(other_seq, first_receiver_seq,
"MR2 VIOLATION: Receivers 0 and {} saw different sequences with {} total receivers",
idx, count);
}
}
}
});
}
#[test]
fn mr3_send_rate_independence() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
proptest!(|(
capacity in 3usize..6,
message_count in 2usize..4, // Keep small for timing tests
payload_prefix in "[0-9]{2}",
)| {
prop_assume!(capacity >= message_count);
let cx = test_cx();
let messages: Vec<BroadcastMessage> = (0..message_count)
.map(|i| BroadcastMessage::new(i as u64, format!("{}_msg_{}", payload_prefix, i), i as u32))
.collect();
let mut harness_fast = FastReceiverHarness::new(capacity, 2);
harness_fast.send_messages(&cx, &messages).unwrap();
let mut fast_received_r0 = Vec::new();
let mut fast_received_r1 = Vec::new();
for _ in 0..message_count {
let msg0 = block_on(harness_fast.receiver_mut(0).recv(&cx)).unwrap(); let msg1 = block_on(harness_fast.receiver_mut(1).recv(&cx)).unwrap(); fast_received_r0.push(msg0);
fast_received_r1.push(msg1);
}
let mut harness_slow = FastReceiverHarness::new(capacity, 2);
let mut slow_received_r0 = Vec::new();
let mut slow_received_r1 = Vec::new();
for msg in &messages {
harness_slow.send_message(&cx, msg.clone()).unwrap();
let msg0 = block_on(harness_slow.receiver_mut(0).recv(&cx)).unwrap(); let msg1 = block_on(harness_slow.receiver_mut(1).recv(&cx)).unwrap();
slow_received_r0.push(msg0);
slow_received_r1.push(msg1);
}
prop_assert_eq!(&fast_received_r0, &slow_received_r0,
"MR3 VIOLATION: Receiver 0 saw different sequences for fast vs slow send rates");
prop_assert_eq!(&fast_received_r1, &slow_received_r1,
"MR3 VIOLATION: Receiver 1 saw different sequences for fast vs slow send rates");
prop_assert_eq!(&fast_received_r0, &messages,
"MR3 VIOLATION: Fast send rate didn't preserve all messages");
prop_assert_eq!(&slow_received_r0, &messages,
"MR3 VIOLATION: Slow send rate didn't preserve all messages");
});
}
#[test]
fn mr4_subscription_timing_independence() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
proptest!(|(
capacity in 4usize..8,
pre_messages in 1usize..3,
post_messages in 2usize..4,
data_tag in "[a-z]{3}",
)| {
prop_assume!(capacity >= pre_messages + post_messages);
let cx = test_cx();
let pre_subscription_messages: Vec<BroadcastMessage> = (0..pre_messages)
.map(|i| BroadcastMessage::new(i as u64, format!("pre_{}_{}", data_tag, i), i as u32))
.collect();
let post_subscription_messages: Vec<BroadcastMessage> = (pre_messages..pre_messages + post_messages)
.map(|i| BroadcastMessage::new(i as u64, format!("post_{}_{}", data_tag, i), i as u32))
.collect();
let (sender_early, mut early_receiver) = broadcast::channel(capacity);
for msg in &pre_subscription_messages {
sender_early.send(&cx, msg.clone()).unwrap(); }
for msg in &post_subscription_messages {
sender_early.send(&cx, msg.clone()).unwrap(); }
let mut early_all_received = Vec::new();
for _ in 0..(pre_messages + post_messages) {
match block_on(early_receiver.recv(&cx)) {
Ok(msg) => early_all_received.push(msg),
Err(e) => prop_assert!(false, "Early receiver error: {:?}", e),
}
}
let (sender_late, _initial_receiver) = broadcast::channel(capacity);
for msg in &pre_subscription_messages {
sender_late.send(&cx, msg.clone()).unwrap(); }
let mut late_receiver = sender_late.subscribe();
for msg in &post_subscription_messages {
sender_late.send(&cx, msg.clone()).unwrap(); }
let mut late_post_received = Vec::new();
for _ in 0..post_messages {
match block_on(late_receiver.recv(&cx)) {
Ok(msg) => late_post_received.push(msg),
Err(e) => prop_assert!(false, "Late receiver error: {:?}", e),
}
}
let early_post_received: Vec<_> = early_all_received
.into_iter()
.skip(pre_messages)
.collect();
prop_assert_eq!(early_post_received, late_post_received.clone(),
"MR4 VIOLATION: Early vs late subscription produced different post-subscription sequences");
prop_assert_eq!(late_post_received, post_subscription_messages,
"MR4 VIOLATION: Late subscription didn't preserve post-subscription messages");
});
}
#[test]
fn mr_composite_complete_preservation() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
let cx = test_cx();
let messages = vec![
BroadcastMessage::new(1, "alpha", 1),
BroadcastMessage::new(2, "beta", 2),
BroadcastMessage::new(3, "gamma", 3),
];
let mut harness = FastReceiverHarness::new(5, 3);
let send_result = harness.send_messages(&cx, &messages);
assert!(send_result.is_ok(), "Composite MR: Send failed");
assert_eq!(
send_result.unwrap(),
3,
"Composite MR: Wrong receiver count reported"
);
for receiver_idx in 0..3 {
for (expected_idx, expected_msg) in messages.iter().enumerate() {
let received = block_on(harness.receiver_mut(receiver_idx).recv(&cx));
match received {
Ok(msg) => {
assert_eq!(
&msg, expected_msg,
"Composite MR: Receiver {} got wrong message at position {}",
receiver_idx, expected_idx
);
}
Err(e) => panic!(
"Composite MR: Receiver {} failed at position {}: {:?}",
receiver_idx, expected_idx, e
),
}
}
}
}
}
#[cfg(test)]
mod validation_tests {
use super::*;
#[test]
fn validate_fast_receiver_harness() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
let cx = test_cx();
let mut harness = FastReceiverHarness::new(3, 2);
let msg = BroadcastMessage::new(1, "test", 1);
let send_result = harness.send_message(&cx, msg.clone());
assert!(
send_result.is_ok(),
"Harness validation: Send should succeed"
);
assert_eq!(
send_result.unwrap(),
2,
"Harness validation: Should report 2 receivers"
);
let recv1 = block_on(harness.receiver_mut(0).recv(&cx));
let recv2 = block_on(harness.receiver_mut(1).recv(&cx));
assert_eq!(
recv1.unwrap(),
msg,
"Harness validation: Receiver 0 should get message"
);
assert_eq!(
recv2.unwrap(),
msg,
"Harness validation: Receiver 1 should get message"
);
}
#[test]
fn validate_capacity_constraints() {
let _runtime = Rc::new(LabRuntime::new(LabConfig::default()));
let cx = test_cx();
let mut harness = FastReceiverHarness::new(2, 1);
let messages = vec![
BroadcastMessage::new(1, "msg1", 1),
BroadcastMessage::new(2, "msg2", 2),
];
let send_result = harness.send_messages(&cx, &messages);
assert!(
send_result.is_ok(),
"Capacity validation: Should handle exact capacity match"
);
for expected_msg in &messages {
let received = block_on(harness.receiver_mut(0).recv(&cx));
assert_eq!(
received.unwrap(),
*expected_msg,
"Capacity validation: Should receive correct message"
);
}
}
}