use proptest::prelude::*;
use rust_tango::{Consumer, DCache, Fctl, Fseq, MCache, Producer, TangoError};
/// Builds a strategy that yields byte vectors of arbitrary content whose length
/// ranges from zero up to and including `max_size`.
fn payload_strategy(max_size: usize) -> impl Strategy<Value = Vec<u8>> {
    let element = any::<u8>();
    let length = 0..=max_size;
    proptest::collection::vec(element, length)
}
proptest! {
/// Publishes a batch of payloads, then checks that the consumer yields them back
/// in the same order with the same bytes and reports nothing afterwards.
#[test]
fn publish_consume_roundtrip(
    payloads in proptest::collection::vec(payload_strategy(64), 1..32)
) {
    let mcache = MCache::<64>::new();
    let dcache = DCache::<64, 128>::new();
    let fseq = Fseq::new(1);
    let producer = Producer::new(&mcache, &dcache, &fseq);
    let mut consumer = Consumer::new(&mcache, &dcache, 1);

    // Publish everything first, recording at most the leading 128 bytes of each
    // payload as the bytes we expect to read back.
    let mut expected = Vec::new();
    for payload in &payloads {
        producer.publish(payload, 0, 0, 0).unwrap();
        let kept = payload.len().min(128);
        expected.push(payload[..kept].to_vec());
    }

    // Each poll must hand back the next recorded payload.
    for want in &expected {
        let got = consumer.poll().unwrap().expect("should have fragment");
        prop_assert_eq!(got.payload.as_slice(), want.as_slice());
    }

    // Once every published fragment has been read, the consumer must be empty.
    prop_assert!(consumer.poll().unwrap().is_none());
}
/// Interleaves one publish attempt and one poll per iteration on a
/// flow-controlled producer/consumer pair, then drains the consumer.
///
/// The test fails if the consumer ever reports an error (in particular
/// `TangoError::Overrun`) or if the number of consumed fragments differs from
/// the number of successful publishes. A publish rejected with
/// `TangoError::NoCredits` is tolerated and simply not counted.
#[test]
fn flow_control_prevents_overrun(
    count in 1usize..100,
    credits in 1u64..16,
) {
    let mcache = MCache::<64>::new();
    let dcache = DCache::<64, 64>::new();
    let fseq = Fseq::new(1);
    let fctl = Fctl::new(credits);
    let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
    let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);

    let mut published = 0usize;
    let mut consumed = 0usize;

    for _ in 0..count {
        match producer.publish(b"test", 0, 0, 0) {
            Ok(_) => published += 1,
            // A rejected publish is acceptable here; it just does not count.
            Err(TangoError::NoCredits) => {}
            Err(e) => prop_assert!(false, "unexpected error: {:?}", e),
        }
        match consumer.poll() {
            Ok(Some(_)) => consumed += 1,
            Ok(None) => {}
            Err(TangoError::Overrun) => {
                prop_assert!(false, "overrun should not happen with flow control");
            }
            Err(e) => prop_assert!(false, "unexpected error: {:?}", e),
        }
    }

    // Drain whatever is still queued. An error here must fail the test rather
    // than silently end the loop, otherwise an overrun during the drain would
    // be hidden or show up only as a misleading count mismatch.
    loop {
        match consumer.poll() {
            Ok(Some(_)) => consumed += 1,
            Ok(None) => break,
            Err(e) => prop_assert!(false, "unexpected error while draining: {:?}", e),
        }
    }

    prop_assert_eq!(consumed, published, "all published messages should be consumed");
}
/// Publishes `count` fragments and checks that the sequence numbers seen by the
/// consumer are strictly increasing, starting above zero.
#[test]
fn sequence_numbers_monotonic(count in 1usize..64) {
    let mcache = MCache::<64>::new();
    let dcache = DCache::<64, 64>::new();
    let fseq = Fseq::new(1);
    let fctl = Fctl::new(64);
    let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
    let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);

    // Fill the queue before reading anything back.
    (0..count).for_each(|_| {
        producer.publish(b"x", 0, 0, 0).unwrap();
    });

    // Every fragment must carry a sequence number greater than the previous one.
    let mut previous = 0u64;
    for _ in 0..count {
        let received = consumer.poll().unwrap().expect("should have fragment");
        let current = received.meta.seq;
        prop_assert!(current > previous, "seq should be monotonic");
        previous = current;
    }
}
/// Publishes a payload of up to 256 bytes through a `DCache::<8, 64>` and checks
/// that both the returned metadata and the consumed fragment reflect at most
/// the first 64 bytes of the input.
#[test]
fn payload_truncation(
    payload in payload_strategy(256),
) {
    let mcache = MCache::<8>::new();
    let dcache = DCache::<8, 64>::new();
    let fseq = Fseq::new(1);
    let producer = Producer::new(&mcache, &dcache, &fseq);
    let mut consumer = Consumer::new(&mcache, &dcache, 1);

    let meta = producer.publish(&payload, 0, 0, 0).unwrap();

    // The leading 64 bytes (or fewer, for short inputs) are what should survive.
    let expected_size = payload.len().min(64);
    let expected = &payload[..expected_size];
    prop_assert_eq!(meta.size as usize, expected_size);

    let received = consumer.poll().unwrap().expect("should have fragment");
    prop_assert_eq!(received.payload.len(), expected_size);
    prop_assert_eq!(received.payload.as_slice(), expected);
}
/// Runs a random sequence of publish (`true`) and poll (`false`) operations on a
/// flow-controlled pair and checks after every step that
/// `fctl.available()` plus the number of published-but-not-yet-consumed
/// fragments equals the initial credit count.
///
/// A publish rejected with `TangoError::NoCredits` is tolerated; any other
/// publish or poll error fails the test.
#[test]
fn credits_conserved(
    initial_credits in 1u64..32,
    operations in proptest::collection::vec(any::<bool>(), 1..100),
) {
    let fctl = Fctl::new(initial_credits);
    let mcache = MCache::<64>::new();
    let dcache = DCache::<64, 64>::new();
    let fseq = Fseq::new(1);
    let producer = Producer::with_flow_control(&mcache, &dcache, &fseq, &fctl);
    let mut consumer = Consumer::with_flow_control(&mcache, &dcache, &fctl, 1);

    // Fragments that have been published but not yet handed to the consumer.
    let mut in_flight = 0u64;

    for &is_publish in &operations {
        if is_publish {
            match producer.publish(b"x", 0, 0, 0) {
                Ok(_) => in_flight += 1,
                // Being refused for lack of credits is fine; nothing was published.
                Err(TangoError::NoCredits) => {}
                // Anything else is unexpected and must not be mistaken for
                // back-pressure, matching the handling in the flow-control test.
                Err(e) => prop_assert!(false, "unexpected publish error: {:?}", e),
            }
        } else if consumer.poll().unwrap().is_some() {
            in_flight -= 1;
        }

        let available = fctl.available();
        prop_assert_eq!(
            available + in_flight,
            initial_credits,
            "credits should be conserved"
        );
    }
}
/// Publishes one fragment with arbitrary `sig`, `ctl` and `ts` values and checks
/// that those values appear unchanged both in the metadata returned by
/// `publish` and in the metadata of the fragment the consumer reads back.
#[test]
fn metadata_preserved(
    sig in any::<u64>(),
    ctl in any::<u16>(),
    ts in any::<u32>(),
) {
    let mcache = MCache::<8>::new();
    let dcache = DCache::<8, 64>::new();
    let fseq = Fseq::new(1);
    let producer = Producer::new(&mcache, &dcache, &fseq);
    let mut consumer = Consumer::new(&mcache, &dcache, 1);

    // Producer side: the metadata handed back by `publish`.
    let sent = producer.publish(b"test", sig, ctl, ts).unwrap();
    prop_assert_eq!(sent.sig, sig);
    prop_assert_eq!(sent.ctl, ctl);
    prop_assert_eq!(sent.ts, ts);

    // Consumer side: the metadata attached to the polled fragment.
    let received = consumer.poll().unwrap().expect("should have fragment");
    let seen = &received.meta;
    prop_assert_eq!(seen.sig, sig);
    prop_assert_eq!(seen.ctl, ctl);
    prop_assert_eq!(seen.ts, ts);
}
}