use std::{
num::NonZeroUsize,
time::{Duration, Instant},
};
use bincode::{BorrowDecode, Encode};
use rstest::{fixture, rstest};
use crate::fragment::{
FragmentError,
FragmentHeader,
FragmentIndex,
Fragmenter,
MessageId,
ReassembledMessage,
Reassembler,
ReassemblyError,
};
#[fixture]
fn reassembler_with_first_fragment(
#[default(1)] message_id: u64,
#[default(&[])] first_payload: &'static [u8],
) -> Reassembler {
let mut reassembler = Reassembler::new(
NonZeroUsize::new(8).expect("non-zero"),
Duration::from_secs(30),
);
let first = FragmentHeader::new(MessageId::new(message_id), FragmentIndex::zero(), false);
assert!(
reassembler
.push(first, first_payload)
.expect("first fragment accepted")
.is_none()
);
reassembler
}
#[test]
fn reassembler_allows_single_fragment_at_max_message_size() {
let max_message_size = NonZeroUsize::new(16).expect("non-zero");
let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5));
let header = FragmentHeader::new(MessageId::new(20), FragmentIndex::zero(), true);
let payload = vec![0_u8; max_message_size.get()];
let result = reassembler
.push(header, payload)
.expect("fragment within limit should be accepted");
let assembled = result.expect("single fragment should complete reassembly");
assert_eq!(assembled.payload().len(), max_message_size.get());
assert_eq!(reassembler.buffered_len(), 0);
}
#[test]
fn reassembler_allows_multi_fragment_at_max_message_size() {
let max_message_size = NonZeroUsize::new(16).expect("non-zero");
let mut reassembler = Reassembler::new(max_message_size, Duration::from_secs(5));
let first_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::zero(), false);
let second_header = FragmentHeader::new(MessageId::new(21), FragmentIndex::new(1), true);
let first_payload = vec![0_u8; 8];
let second_payload = vec![1_u8; max_message_size.get() - first_payload.len()];
assert!(
reassembler
.push(first_header, first_payload)
.expect("first fragment within limit")
.is_none(),
"first fragment should not complete the message",
);
let result = reassembler
.push(second_header, second_payload)
.expect("second fragment keeps total at limit");
let assembled = result.expect("fragments should complete reassembly at exact limit");
assert_eq!(assembled.payload().len(), max_message_size.get());
assert_eq!(reassembler.buffered_len(), 0);
}
#[test]
fn reassembler_returns_single_fragment_immediately() {
let mut reassembler = Reassembler::new(
NonZeroUsize::new(16).expect("non-zero"),
Duration::from_secs(5),
);
let header = FragmentHeader::new(MessageId::new(1), FragmentIndex::zero(), true);
let payload = vec![1_u8, 2, 3, 4];
let complete = reassembler
.push(header, payload.clone())
.expect("reassembly must succeed")
.expect("single fragment should complete message");
assert_eq!(complete.message_id(), MessageId::new(1));
assert_eq!(complete.payload(), payload.as_slice());
assert_eq!(reassembler.buffered_len(), 0);
}
#[rstest]
fn reassembler_accumulates_ordered_fragments(
#[with(2, &[5_u8, 6, 7])] mut reassembler_with_first_fragment: Reassembler,
) {
let final_fragment = FragmentHeader::new(MessageId::new(2), FragmentIndex::new(1), true);
let complete = reassembler_with_first_fragment
.push(final_fragment, [8_u8, 9])
.expect("final fragment accepted")
.expect("message should complete");
assert_eq!(complete.payload(), &[5, 6, 7, 8, 9]);
assert_eq!(reassembler_with_first_fragment.buffered_len(), 0);
}
#[rstest]
fn reassembler_rejects_out_of_order_and_drops_partial(
#[with(3, &[1_u8, 2])] mut reassembler_with_first_fragment: Reassembler,
) {
let skipped = FragmentHeader::new(MessageId::new(3), FragmentIndex::new(2), true);
let err = reassembler_with_first_fragment
.push(skipped, [3_u8])
.expect_err("out-of-order fragment must be rejected");
assert!(matches!(
err,
ReassemblyError::Fragment(FragmentError::IndexMismatch { .. })
));
assert_eq!(reassembler_with_first_fragment.buffered_len(), 0);
}
#[rstest]
fn reassembler_suppresses_duplicate_fragment(
#[with(31, &[1_u8, 2])] mut reassembler_with_first_fragment: Reassembler,
) {
let duplicate = FragmentHeader::new(MessageId::new(31), FragmentIndex::zero(), false);
let final_fragment = FragmentHeader::new(MessageId::new(31), FragmentIndex::new(1), true);
assert!(
reassembler_with_first_fragment
.push(duplicate, [9_u8, 9])
.expect("duplicate fragment should be suppressed")
.is_none()
);
assert_eq!(reassembler_with_first_fragment.buffered_len(), 1);
let complete = reassembler_with_first_fragment
.push(final_fragment, [3_u8])
.expect("final fragment should complete message")
.expect("message should be complete");
assert_eq!(complete.payload(), &[1, 2, 3]);
}
#[test]
fn reassembler_accepts_zero_length_fragments() {
let mut reassembler = Reassembler::new(
NonZeroUsize::new(8).expect("non-zero"),
Duration::from_secs(10),
);
let first = FragmentHeader::new(MessageId::new(44), FragmentIndex::zero(), false);
let second = FragmentHeader::new(MessageId::new(44), FragmentIndex::new(1), true);
assert!(
reassembler
.push(first, [])
.expect("empty fragment should be accepted")
.is_none()
);
let complete = reassembler
.push(second, [7_u8, 8])
.expect("final fragment should complete message")
.expect("message should be complete");
assert_eq!(complete.payload(), &[7, 8]);
}
#[test]
fn reassembler_enforces_maximum_payload_size() {
let mut reassembler = Reassembler::new(
NonZeroUsize::new(4).expect("non-zero"),
Duration::from_secs(30),
);
let first = FragmentHeader::new(MessageId::new(4), FragmentIndex::zero(), false);
let final_fragment = FragmentHeader::new(MessageId::new(4), FragmentIndex::new(1), true);
assert!(
reassembler
.push(first, [1_u8, 2, 3])
.expect("first fragment accepted")
.is_none()
);
let err = reassembler
.push(final_fragment, [4_u8, 5])
.expect_err("payload growth beyond cap must be rejected");
assert_eq!(
err,
ReassemblyError::MessageTooLarge {
message_id: MessageId::new(4),
attempted: 5,
limit: NonZeroUsize::new(4).expect("non-zero"),
}
);
assert_eq!(reassembler.buffered_len(), 0);
}
#[test]
fn reassembler_purges_expired_messages() {
let mut reassembler = Reassembler::new(
NonZeroUsize::new(8).expect("non-zero"),
Duration::from_secs(2),
);
let now = Instant::now();
let header = FragmentHeader::new(MessageId::new(5), FragmentIndex::zero(), false);
assert!(
reassembler
.push_at(header, [0_u8, 1], now)
.expect("first fragment accepted")
.is_none()
);
assert_eq!(reassembler.buffered_len(), 1);
let evicted = reassembler.purge_expired_at(now + Duration::from_secs(3));
assert_eq!(evicted, vec![MessageId::new(5)]);
assert_eq!(reassembler.buffered_len(), 0);
}
#[derive(Clone, Debug, Encode, BorrowDecode, PartialEq, Eq)]
struct ExampleMessage(u8);
#[test]
fn reassembler_decodes_reconstructed_message() {
let fragmenter = Fragmenter::new(NonZeroUsize::new(2).expect("non-zero"));
let batch = fragmenter
.fragment_message(&ExampleMessage(11))
.expect("fragment message");
let mut reassembler = Reassembler::new(
NonZeroUsize::new(4).expect("non-zero"),
Duration::from_secs(10),
);
let mut output: Option<ReassembledMessage> = None;
for fragment in batch {
let (header, payload) = fragment.into_parts();
output = reassembler
.push(header, payload)
.expect("fragment accepted");
}
let assembled = output.expect("message should complete");
let decoded: ExampleMessage = assembled.decode().expect("decode message");
assert_eq!(decoded, ExampleMessage(11));
}