Skip to main content

azul_core/
udp_framing.rs

1//! UDP chunked-message framing (SUPER_PLAN_2 P8).
2//!
3//! Splits a large payload (a video keyframe) into sequenced datagrams and
4//! reassembles them on the far side, tolerating reorder + loss: a message whose
5//! chunks never all arrive is dropped (bounded memory), never retransmitted -
6//! the fault-tolerant model realtime A/V wants. This is pure (no socket), so
7//! the dll's `Udp` handle builds on it and the logic is unit-testable here.
8//!
9//! Wire format per datagram: an 8-byte little-endian header
10//! (`msg_id: u32`, `chunk_idx: u16`, `chunk_count: u16`) followed by the chunk
11//! payload.
12
13use alloc::collections::BTreeMap;
14use alloc::vec::Vec;
15
/// Size in bytes of the header that prefixes every datagram:
/// `msg_id` (u32, 4 bytes) + `chunk_idx` (u16, 2 bytes) + `chunk_count` (u16, 2 bytes).
pub const CHUNK_HEADER_LEN: usize = 4 + 2 + 2;
/// Default number of payload bytes per datagram; chosen conservatively so that
/// header + payload stays below a ~1400-byte path MTU.
pub const DEFAULT_CHUNK_PAYLOAD: usize = 1200;
/// Upper bound on simultaneously tracked incomplete messages. Once exceeded,
/// the oldest one is evicted, so a lost chunk can never leak memory.
const MAX_PARTIAL_MESSAGES: usize = 256;
23
24/// Split `data` into chunk datagrams for `msg_id`. Each datagram is
25/// `CHUNK_HEADER_LEN + <= max_payload` bytes. An empty payload still produces
26/// one (header-only) chunk, so a zero-length message round-trips.
27pub fn chunk_message(msg_id: u32, data: &[u8], max_payload: usize) -> Vec<Vec<u8>> {
28    let max_payload = max_payload.max(1);
29    let count = if data.is_empty() {
30        1
31    } else {
32        data.len().div_ceil(max_payload)
33    };
34    let count_u16 = count.min(u16::MAX as usize) as u16;
35    let mut out = Vec::with_capacity(count_u16 as usize);
36    for idx in 0..count_u16 {
37        let start = idx as usize * max_payload;
38        let end = (start + max_payload).min(data.len());
39        let mut p = Vec::with_capacity(CHUNK_HEADER_LEN + end.saturating_sub(start));
40        p.extend_from_slice(&msg_id.to_le_bytes());
41        p.extend_from_slice(&idx.to_le_bytes());
42        p.extend_from_slice(&count_u16.to_le_bytes());
43        if start < end {
44            p.extend_from_slice(&data[start..end]);
45        }
46        out.push(p);
47    }
48    out
49}
50
51struct PartialMessage {
52    count: u16,
53    chunks: BTreeMap<u16, Vec<u8>>,
54}
55
56/// Reassembles chunk datagrams into complete messages, tolerating out-of-order
57/// delivery and dropping incomplete messages once too many pile up.
58#[derive(Default)]
59pub struct UdpReassembler {
60    partial: BTreeMap<u32, PartialMessage>,
61}
62
63impl UdpReassembler {
64    pub fn new() -> Self {
65        Self::default()
66    }
67
68    /// Ingest one datagram. Returns the fully-reassembled message if this
69    /// datagram completed one, else `None`. Malformed datagrams are ignored.
70    pub fn ingest(&mut self, datagram: &[u8]) -> Option<Vec<u8>> {
71        if datagram.len() < CHUNK_HEADER_LEN {
72            return None;
73        }
74        let msg_id = u32::from_le_bytes([datagram[0], datagram[1], datagram[2], datagram[3]]);
75        let idx = u16::from_le_bytes([datagram[4], datagram[5]]);
76        let count = u16::from_le_bytes([datagram[6], datagram[7]]);
77        if count == 0 || idx >= count {
78            return None;
79        }
80
81        let entry = self
82            .partial
83            .entry(msg_id)
84            .or_insert_with(|| PartialMessage {
85                count,
86                chunks: BTreeMap::new(),
87            });
88        entry.chunks.insert(idx, datagram[CHUNK_HEADER_LEN..].to_vec());
89
90        if entry.chunks.len() == entry.count as usize {
91            let msg = self.partial.remove(&msg_id).unwrap();
92            let mut out = Vec::new();
93            for (_, chunk) in msg.chunks {
94                out.extend_from_slice(&chunk);
95            }
96            return Some(out);
97        }
98
99        // Bound memory: evict the oldest partial message if too many pile up.
100        if self.partial.len() > MAX_PARTIAL_MESSAGES {
101            if let Some((&oldest, _)) = self.partial.iter().next() {
102                self.partial.remove(&oldest);
103            }
104        }
105        None
106    }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::vec;

    /// Feeds every datagram to `r` in the given order and returns the last
    /// message that completed, if any.
    fn feed_all(r: &mut UdpReassembler, datagrams: &[Vec<u8>]) -> Option<Vec<u8>> {
        datagrams.iter().filter_map(|d| r.ingest(d)).last()
    }

    /// A multi-chunk payload delivered in order comes back byte-for-byte.
    #[test]
    fn chunk_reassemble_roundtrip() {
        let data: Vec<u8> = (0u8..=255).cycle().take(3000).collect();
        let chunks = chunk_message(7, &data, DEFAULT_CHUNK_PAYLOAD);
        assert_eq!(chunks.len(), 3, "3000 bytes / 1200 = 3 chunks");

        let mut r = UdpReassembler::new();
        let done = feed_all(&mut r, &chunks);
        assert_eq!(done.expect("message completes"), data);
    }

    /// Delivering the chunks last-first still yields the original payload.
    #[test]
    fn reassembles_out_of_order() {
        let data: Vec<u8> = (0u8..7).cycle().take(2500).collect();
        let reversed: Vec<Vec<u8>> = chunk_message(1, &data, DEFAULT_CHUNK_PAYLOAD)
            .into_iter()
            .rev()
            .collect();

        let mut r = UdpReassembler::new();
        let done = feed_all(&mut r, &reversed);
        assert_eq!(done.expect("reorder-tolerant"), data);
    }

    /// Only the first of two chunks arrives: nothing is produced.
    #[test]
    fn incomplete_message_yields_nothing() {
        let data: Vec<u8> = vec![9u8; 2000];
        let chunks = chunk_message(2, &data, DEFAULT_CHUNK_PAYLOAD);
        let mut r = UdpReassembler::new();
        let first = r.ingest(&chunks[0]);
        assert!(
            first.is_none(),
            "one of two chunks is not a complete message"
        );
    }

    /// A zero-length payload is sent as one header-only chunk and reassembles
    /// to an empty message.
    #[test]
    fn empty_message_roundtrips() {
        let chunks = chunk_message(3, &[], DEFAULT_CHUNK_PAYLOAD);
        assert_eq!(chunks.len(), 1, "empty payload still sends one chunk");
        let mut r = UdpReassembler::new();
        let msg = r.ingest(&chunks[0]).expect("completes");
        assert_eq!(msg, Vec::<u8>::new());
    }

    /// Chunks of two different messages may arrive interleaved; each message
    /// completes independently when its own last chunk arrives.
    #[test]
    fn two_interleaved_messages() {
        let a: Vec<u8> = vec![1u8; 1500];
        let b: Vec<u8> = vec![2u8; 1500];
        let ca = chunk_message(10, &a, DEFAULT_CHUNK_PAYLOAD);
        let cb = chunk_message(11, &b, DEFAULT_CHUNK_PAYLOAD);
        let mut r = UdpReassembler::new();

        // Order of delivery: a0, b0, a1 (completes a), b1 (completes b).
        let after_a0 = r.ingest(&ca[0]);
        assert!(after_a0.is_none());
        let after_b0 = r.ingest(&cb[0]);
        assert!(after_b0.is_none());
        let after_a1 = r.ingest(&ca[1]);
        assert_eq!(after_a1.expect("a done"), a);
        let after_b1 = r.ingest(&cb[1]);
        assert_eq!(after_b1.expect("b done"), b);
    }
}