1use std::collections::VecDeque;
6
7use crate::crc::CRC_LEN;
8
9#[derive(Default)]
13pub struct PacketReassembler {
14 buf: bytes::BytesMut,
15 synced: bool,
16 pending: VecDeque<bytes::Bytes>,
17}
18
19const HEADER_LEN: usize = 6;
21
22impl PacketReassembler {
23 pub fn new() -> Self {
25 Self::default()
26 }
27
28 pub fn feed(&mut self, payload: &[u8], pusi: bool) {
39 if payload.is_empty() {
40 return;
41 }
42
43 if pusi {
44 let ptr = payload[0] as usize;
45
46 let boundary = 1 + ptr;
48 if self.synced {
49 let end = boundary.min(payload.len());
50 if end > 1 {
51 self.buf.extend_from_slice(&payload[1..end]);
52 }
53 self.try_extract_packets();
55 }
56
57 self.buf.clear();
63
64 if boundary <= payload.len() {
65 if boundary < payload.len() {
66 self.buf.extend_from_slice(&payload[boundary..]);
67 self.try_extract_packets();
68 }
69 self.synced = true;
72 } else {
73 self.synced = false;
76 }
77 } else if self.synced {
78 self.buf.extend_from_slice(payload);
80 self.try_extract_packets();
81 }
82 }
84
85 fn try_extract_packets(&mut self) {
87 loop {
88 if self.buf.len() < HEADER_LEN {
90 break;
91 }
92
93 let payload_len_bits = ((self.buf[4] as u16) << 8) | (self.buf[5] as u16);
95 let payload_len_bytes = payload_len_bits.div_ceil(8);
96 let total_packet_len = HEADER_LEN + payload_len_bytes as usize + CRC_LEN;
97
98 if self.buf.len() < total_packet_len {
99 break;
100 }
101
102 let packet_bytes = self.buf.split_to(total_packet_len);
104 self.pending.push_back(packet_bytes.freeze());
105 }
106 }
107
108 pub fn pop_packet(&mut self) -> Option<bytes::Bytes> {
110 self.pending.pop_front()
111 }
112
113 pub fn drain_packets(&mut self) -> impl Iterator<Item = bytes::Bytes> + '_ {
115 self.pending.drain(..)
116 }
117}
118
119#[cfg(test)]
120mod tests {
121 use super::*;
122
123 fn make_t2mi_packet(packet_type: u8, count: u8, payload: &[u8]) -> Vec<u8> {
124 let mut pkt = Vec::with_capacity(6 + payload.len() + 4);
125 pkt.push(packet_type);
126 pkt.push(count);
127 let header_byte2 = 0x00;
129 let header_byte3 = 0x00;
130 pkt.push(header_byte2);
131 pkt.push(header_byte3);
132 let bits = (payload.len() * 8) as u16;
133 pkt.extend_from_slice(&bits.to_be_bytes());
134 pkt.extend_from_slice(payload);
135 pkt.extend_from_slice(&[0u8; 4]);
137 pkt
138 }
139
140 #[test]
141 fn reassembles_single_packet_with_pusi_offset_0() {
142 let t2mi = make_t2mi_packet(0x00, 0, &[0xAA, 0xBB]);
143 let mut reasm = PacketReassembler::new();
144 let mut ts_payload = vec![0x00];
146 ts_payload.extend_from_slice(&t2mi);
147 reasm.feed(&ts_payload, true);
148 let pkt = reasm.pop_packet().unwrap();
149 assert_eq!(&pkt[..], &t2mi[..]);
150 }
151
152 #[test]
153 fn reassembles_packet_spanning_two_ts_packets() {
154 let t2mi = make_t2mi_packet(0x00, 0, &[0xCC; 200]);
155 let mut reasm = PacketReassembler::new();
156
157 let mut ts1 = vec![0x00];
159 ts1.extend_from_slice(&t2mi[..100]);
160 reasm.feed(&ts1, true);
161 assert!(reasm.pop_packet().is_none());
162
163 reasm.feed(&t2mi[100..], false);
165 let pkt = reasm.pop_packet().unwrap();
166 assert_eq!(&pkt[..], &t2mi[..]);
167 }
168
169 #[test]
170 fn reassembles_two_packets_in_one_ts_payload() {
171 let t1 = make_t2mi_packet(0x00, 0, &[0x11]);
172 let t2 = make_t2mi_packet(0x01, 1, &[0x22]);
173 let mut reasm = PacketReassembler::new();
174
175 let mut ts_payload = vec![0x00]; ts_payload.extend_from_slice(&t1);
177 ts_payload.extend_from_slice(&t2);
178 reasm.feed(&ts_payload, true);
179
180 let p1 = reasm.pop_packet().unwrap();
181 let p2 = reasm.pop_packet().unwrap();
182 assert_eq!(&p1[..], &t1[..]);
183 assert_eq!(&p2[..], &t2[..]);
184 }
185
186 #[test]
187 fn handles_pusi_with_nonzero_pointer() {
188 let t2mi = make_t2mi_packet(0x00, 5, &[0xDE]);
189 let mut reasm = PacketReassembler::new();
190 let mut ts_payload = vec![0x03, 0xFF, 0xFF, 0xFF];
192 ts_payload.extend_from_slice(&t2mi);
193 reasm.feed(&ts_payload, true);
194 let pkt = reasm.pop_packet().unwrap();
195 assert_eq!(&pkt[..], &t2mi[..]);
196 }
197
198 #[test]
199 fn discards_data_before_first_pusi() {
200 let mut reasm = PacketReassembler::new();
201 reasm.feed(&[0xAA, 0xBB], false); assert!(reasm.pop_packet().is_none());
203 }
204
205 #[test]
206 fn handles_empty_payload() {
207 let mut reasm = PacketReassembler::new();
208 reasm.feed(&[], true);
209 assert!(reasm.pop_packet().is_none());
210 }
211
212 #[test]
217 fn corrupt_length_resyncs_at_next_pusi() {
218 let mut reasm = PacketReassembler::new();
219
220 let mut corrupt = vec![0x00u8, 0x00, 0x00, 0x00];
223 corrupt.extend_from_slice(&8000u16.to_be_bytes());
224 corrupt.extend_from_slice(&[0xEE; 20]);
225 let mut ts1 = vec![0x00]; ts1.extend_from_slice(&corrupt);
227 reasm.feed(&ts1, true);
228 assert!(reasm.pop_packet().is_none());
229
230 let good = make_t2mi_packet(0x00, 7, &[0xAB, 0xCD]);
232 let mut ts2 = vec![0x00];
233 ts2.extend_from_slice(&good);
234 reasm.feed(&ts2, true);
235
236 let pkt = reasm.pop_packet().expect("good packet must survive resync");
237 assert_eq!(&pkt[..], &good[..]);
238 assert!(reasm.pop_packet().is_none());
239 }
240
241 #[test]
244 fn pointer_past_payload_end_drops_sync() {
245 let mut reasm = PacketReassembler::new();
246 reasm.feed(&[0xFF, 0xAA, 0xBB], true); assert!(reasm.pop_packet().is_none());
248 reasm.feed(&[0xCC; 8], false);
250 assert!(reasm.pop_packet().is_none());
251 let good = make_t2mi_packet(0x00, 1, &[0x55]);
253 let mut ts = vec![0x00];
254 ts.extend_from_slice(&good);
255 reasm.feed(&ts, true);
256 assert_eq!(&reasm.pop_packet().unwrap()[..], &good[..]);
257 }
258
259 #[test]
260 fn drains_multiple_pending_packets() {
261 let t1 = make_t2mi_packet(0x00, 0, &[0xAA]);
262 let t2 = make_t2mi_packet(0x00, 1, &[0xBB]);
263 let t3 = make_t2mi_packet(0x00, 2, &[0xCC]);
264 let mut reasm = PacketReassembler::new();
265
266 let mut ts_payload = vec![0x00];
267 ts_payload.extend_from_slice(&t1);
268 ts_payload.extend_from_slice(&t2);
269 ts_payload.extend_from_slice(&t3);
270 reasm.feed(&ts_payload, true);
271
272 let packets: Vec<_> = reasm.drain_packets().collect();
273 assert_eq!(packets.len(), 3);
274 assert_eq!(&packets[0][..], &t1[..]);
275 assert_eq!(&packets[1][..], &t2[..]);
276 assert_eq!(&packets[2][..], &t3[..]);
277 }
278}