1use std::collections::VecDeque;
6
7use crate::crc::CRC_LEN;
8
9#[derive(Default)]
13pub struct PacketReassembler {
14 buf: bytes::BytesMut,
15 synced: bool,
16 pending: VecDeque<bytes::Bytes>,
17}
18
/// Size in bytes of the fixed packet header: four leading bytes followed by
/// the two-byte big-endian length field (read at offsets 4 and 5).
const HEADER_LEN: usize = 4 + 2;
21
22impl PacketReassembler {
23 #[must_use]
25 pub fn new() -> Self {
26 Self::default()
27 }
28
29 pub fn feed(&mut self, payload: &[u8], pusi: bool) {
40 if payload.is_empty() {
41 return;
42 }
43
44 if pusi {
45 let ptr = payload[0] as usize;
46
47 let boundary = 1 + ptr;
49 if self.synced {
50 let end = boundary.min(payload.len());
51 if end > 1 {
52 self.buf.extend_from_slice(&payload[1..end]);
53 }
54 self.try_extract_packets();
56 }
57
58 self.buf.clear();
64
65 if boundary <= payload.len() {
66 if boundary < payload.len() {
67 self.buf.extend_from_slice(&payload[boundary..]);
68 self.try_extract_packets();
69 }
70 self.synced = true;
73 } else {
74 self.synced = false;
77 }
78 } else if self.synced {
79 self.buf.extend_from_slice(payload);
81 self.try_extract_packets();
82 }
83 }
85
86 fn try_extract_packets(&mut self) {
88 loop {
89 if self.buf.len() < HEADER_LEN {
91 break;
92 }
93
94 let payload_len_bits = ((self.buf[4] as u16) << 8) | (self.buf[5] as u16);
96 let payload_len_bytes = payload_len_bits.div_ceil(8);
97 let total_packet_len = HEADER_LEN + payload_len_bytes as usize + CRC_LEN;
98
99 if self.buf.len() < total_packet_len {
100 break;
101 }
102
103 let packet_bytes = self.buf.split_to(total_packet_len);
105 self.pending.push_back(packet_bytes.freeze());
106 }
107 }
108
109 pub fn pop_packet(&mut self) -> Option<bytes::Bytes> {
111 self.pending.pop_front()
112 }
113
114 pub fn drain_packets(&mut self) -> impl Iterator<Item = bytes::Bytes> + '_ {
116 self.pending.drain(..)
117 }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a packet made of a header (type, count, two zero bytes and the
    /// big-endian payload length in bits), the payload, and a zeroed CRC.
    ///
    /// The sizes come from `HEADER_LEN` and `CRC_LEN` so the fixture always
    /// matches the layout the reassembler expects.
    fn make_t2mi_packet(packet_type: u8, count: u8, payload: &[u8]) -> Vec<u8> {
        let bits = u16::try_from(payload.len() * 8)
            .expect("test payload must fit the 16-bit bit-length field");

        let mut pkt = Vec::with_capacity(HEADER_LEN + payload.len() + CRC_LEN);
        pkt.push(packet_type);
        pkt.push(count);
        pkt.extend_from_slice(&[0x00, 0x00]);
        pkt.extend_from_slice(&bits.to_be_bytes());
        pkt.extend_from_slice(payload);
        // Zeroed CRC placeholder; the reassembler only counts these bytes.
        pkt.resize(pkt.len() + CRC_LEN, 0);
        pkt
    }

    #[test]
    fn reassembles_single_packet_with_pusi_offset_0() {
        let t2mi = make_t2mi_packet(0x00, 0, &[0xAA, 0xBB]);
        let mut reasm = PacketReassembler::new();
        let mut ts_payload = vec![0x00];
        ts_payload.extend_from_slice(&t2mi);
        reasm.feed(&ts_payload, true);
        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
    }

    #[test]
    fn reassembles_packet_spanning_two_ts_packets() {
        let t2mi = make_t2mi_packet(0x00, 0, &[0xCC; 200]);
        let mut reasm = PacketReassembler::new();

        let mut ts1 = vec![0x00];
        ts1.extend_from_slice(&t2mi[..100]);
        reasm.feed(&ts1, true);
        assert!(reasm.pop_packet().is_none());

        reasm.feed(&t2mi[100..], false);
        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
    }

    #[test]
    fn reassembles_two_packets_in_one_ts_payload() {
        let t1 = make_t2mi_packet(0x00, 0, &[0x11]);
        let t2 = make_t2mi_packet(0x01, 1, &[0x22]);
        let mut reasm = PacketReassembler::new();

        let mut ts_payload = vec![0x00];
        ts_payload.extend_from_slice(&t1);
        ts_payload.extend_from_slice(&t2);
        reasm.feed(&ts_payload, true);

        let p1 = reasm.pop_packet().unwrap();
        let p2 = reasm.pop_packet().unwrap();
        assert_eq!(&p1[..], &t1[..]);
        assert_eq!(&p2[..], &t2[..]);
    }

    #[test]
    fn handles_pusi_with_nonzero_pointer() {
        let t2mi = make_t2mi_packet(0x00, 5, &[0xDE]);
        let mut reasm = PacketReassembler::new();
        // Pointer of 3: three bytes precede the packet start.
        let mut ts_payload = vec![0x03, 0xFF, 0xFF, 0xFF];
        ts_payload.extend_from_slice(&t2mi);
        reasm.feed(&ts_payload, true);
        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
    }

    #[test]
    fn discards_data_before_first_pusi() {
        let mut reasm = PacketReassembler::new();
        reasm.feed(&[0xAA, 0xBB], false);
        assert!(reasm.pop_packet().is_none());
    }

    #[test]
    fn handles_empty_payload() {
        let mut reasm = PacketReassembler::new();
        reasm.feed(&[], true);
        assert!(reasm.pop_packet().is_none());
    }

    #[test]
    fn rounds_bit_length_up_to_whole_bytes() {
        // A length of 9 bits must occupy two payload bytes.
        let mut t2mi = vec![0x00u8, 0x00, 0x00, 0x00];
        t2mi.extend_from_slice(&9u16.to_be_bytes());
        t2mi.extend_from_slice(&[0xAB, 0x80]);
        t2mi.resize(t2mi.len() + CRC_LEN, 0);

        let mut reasm = PacketReassembler::new();
        let mut ts_payload = vec![0x00];
        ts_payload.extend_from_slice(&t2mi);
        reasm.feed(&ts_payload, true);

        let pkt = reasm.pop_packet().unwrap();
        assert_eq!(&pkt[..], &t2mi[..]);
        assert!(reasm.pop_packet().is_none());
    }

    #[test]
    fn corrupt_length_resyncs_at_next_pusi() {
        let mut reasm = PacketReassembler::new();

        // Header announcing 8000 bits of payload, followed by only 20 bytes,
        // so the packet can never complete.
        let mut corrupt = vec![0x00u8, 0x00, 0x00, 0x00];
        corrupt.extend_from_slice(&8000u16.to_be_bytes());
        corrupt.extend_from_slice(&[0xEE; 20]);
        let mut ts1 = vec![0x00];
        ts1.extend_from_slice(&corrupt);
        reasm.feed(&ts1, true);
        assert!(reasm.pop_packet().is_none());

        let good = make_t2mi_packet(0x00, 7, &[0xAB, 0xCD]);
        let mut ts2 = vec![0x00];
        ts2.extend_from_slice(&good);
        reasm.feed(&ts2, true);

        let pkt = reasm.pop_packet().expect("good packet must survive resync");
        assert_eq!(&pkt[..], &good[..]);
        assert!(reasm.pop_packet().is_none());
    }

    #[test]
    fn pointer_past_payload_end_drops_sync() {
        let mut reasm = PacketReassembler::new();

        // The pointer (0xFF) reaches far beyond the three-byte payload.
        reasm.feed(&[0xFF, 0xAA, 0xBB], true);
        assert!(reasm.pop_packet().is_none());

        // Continuation data must be ignored while unsynchronised.
        reasm.feed(&[0xCC; 8], false);
        assert!(reasm.pop_packet().is_none());

        let good = make_t2mi_packet(0x00, 1, &[0x55]);
        let mut ts = vec![0x00];
        ts.extend_from_slice(&good);
        reasm.feed(&ts, true);
        assert_eq!(&reasm.pop_packet().unwrap()[..], &good[..]);
    }

    #[test]
    fn drains_multiple_pending_packets() {
        let t1 = make_t2mi_packet(0x00, 0, &[0xAA]);
        let t2 = make_t2mi_packet(0x00, 1, &[0xBB]);
        let t3 = make_t2mi_packet(0x00, 2, &[0xCC]);
        let mut reasm = PacketReassembler::new();

        let mut ts_payload = vec![0x00];
        ts_payload.extend_from_slice(&t1);
        ts_payload.extend_from_slice(&t2);
        ts_payload.extend_from_slice(&t3);
        reasm.feed(&ts_payload, true);

        let packets: Vec<_> = reasm.drain_packets().collect();
        assert_eq!(packets.len(), 3);
        assert_eq!(&packets[0][..], &t1[..]);
        assert_eq!(&packets[1][..], &t2[..]);
        assert_eq!(&packets[2][..], &t3[..]);
    }
}