use alloc::collections::BTreeMap;
use alloc::vec::Vec;
pub const CHUNK_HEADER_LEN: usize = 8;
pub const DEFAULT_CHUNK_PAYLOAD: usize = 1200;
const MAX_PARTIAL_MESSAGES: usize = 256;
pub fn chunk_message(msg_id: u32, data: &[u8], max_payload: usize) -> Vec<Vec<u8>> {
let max_payload = max_payload.max(1);
let count = if data.is_empty() {
1
} else {
data.len().div_ceil(max_payload)
};
let count_u16 = count.min(u16::MAX as usize) as u16;
let mut out = Vec::with_capacity(count_u16 as usize);
for idx in 0..count_u16 {
let start = idx as usize * max_payload;
let end = (start + max_payload).min(data.len());
let mut p = Vec::with_capacity(CHUNK_HEADER_LEN + end.saturating_sub(start));
p.extend_from_slice(&msg_id.to_le_bytes());
p.extend_from_slice(&idx.to_le_bytes());
p.extend_from_slice(&count_u16.to_le_bytes());
if start < end {
p.extend_from_slice(&data[start..end]);
}
out.push(p);
}
out
}
struct PartialMessage {
count: u16,
chunks: BTreeMap<u16, Vec<u8>>,
}
#[derive(Default)]
pub struct UdpReassembler {
partial: BTreeMap<u32, PartialMessage>,
}
impl UdpReassembler {
pub fn new() -> Self {
Self::default()
}
pub fn ingest(&mut self, datagram: &[u8]) -> Option<Vec<u8>> {
if datagram.len() < CHUNK_HEADER_LEN {
return None;
}
let msg_id = u32::from_le_bytes([datagram[0], datagram[1], datagram[2], datagram[3]]);
let idx = u16::from_le_bytes([datagram[4], datagram[5]]);
let count = u16::from_le_bytes([datagram[6], datagram[7]]);
if count == 0 || idx >= count {
return None;
}
let entry = self
.partial
.entry(msg_id)
.or_insert_with(|| PartialMessage {
count,
chunks: BTreeMap::new(),
});
entry.chunks.insert(idx, datagram[CHUNK_HEADER_LEN..].to_vec());
if entry.chunks.len() == entry.count as usize {
let msg = self.partial.remove(&msg_id).unwrap();
let mut out = Vec::new();
for (_, chunk) in msg.chunks {
out.extend_from_slice(&chunk);
}
return Some(out);
}
if self.partial.len() > MAX_PARTIAL_MESSAGES {
if let Some((&oldest, _)) = self.partial.iter().next() {
self.partial.remove(&oldest);
}
}
None
}
}
#[cfg(test)]
mod tests {
use super::*;
use alloc::vec;
#[test]
fn chunk_reassemble_roundtrip() {
let data: Vec<u8> = (0..3000u32).map(|i| (i % 256) as u8).collect();
let chunks = chunk_message(7, &data, DEFAULT_CHUNK_PAYLOAD);
assert_eq!(chunks.len(), 3, "3000 bytes / 1200 = 3 chunks");
let mut r = UdpReassembler::new();
let mut done = None;
for c in &chunks {
if let Some(m) = r.ingest(c) {
done = Some(m);
}
}
assert_eq!(done.expect("message completes"), data);
}
#[test]
fn reassembles_out_of_order() {
let data: Vec<u8> = (0..2500u32).map(|i| (i % 7) as u8).collect();
let mut chunks = chunk_message(1, &data, DEFAULT_CHUNK_PAYLOAD);
chunks.reverse();
let mut r = UdpReassembler::new();
let mut done = None;
for c in &chunks {
if let Some(m) = r.ingest(c) {
done = Some(m);
}
}
assert_eq!(done.expect("reorder-tolerant"), data);
}
#[test]
fn incomplete_message_yields_nothing() {
let data: Vec<u8> = vec![9u8; 2000]; let chunks = chunk_message(2, &data, DEFAULT_CHUNK_PAYLOAD);
let mut r = UdpReassembler::new();
assert!(
r.ingest(&chunks[0]).is_none(),
"one of two chunks is not a complete message"
);
}
#[test]
fn empty_message_roundtrips() {
let chunks = chunk_message(3, &[], DEFAULT_CHUNK_PAYLOAD);
assert_eq!(chunks.len(), 1, "empty payload still sends one chunk");
let mut r = UdpReassembler::new();
assert_eq!(r.ingest(&chunks[0]).expect("completes"), Vec::<u8>::new());
}
#[test]
fn two_interleaved_messages() {
let a: Vec<u8> = vec![1u8; 1500]; let b: Vec<u8> = vec![2u8; 1500]; let ca = chunk_message(10, &a, DEFAULT_CHUNK_PAYLOAD);
let cb = chunk_message(11, &b, DEFAULT_CHUNK_PAYLOAD);
let mut r = UdpReassembler::new();
assert!(r.ingest(&ca[0]).is_none());
assert!(r.ingest(&cb[0]).is_none());
assert_eq!(r.ingest(&ca[1]).expect("a done"), a);
assert_eq!(r.ingest(&cb[1]).expect("b done"), b);
}
}