Skip to main content

brec/packet/
read.rs

1use crate::*;
2#[cfg(feature = "resilient")]
3use brec_common::{BLOCK_CRC_LEN, BLOCK_SIG_LEN, BLOCK_SIZE_FIELD_LEN};
4
5/// Reads a complete `PacketDef` from a stream, including header, blocks, and optional payload.
6///
7/// This implementation **does not support partial reads** - if the header is successfully
8/// read but the blocks or payload data are incomplete, an I/O error will be returned.
9///
10/// # Notes
11/// - Does **not** return `Error::NotEnoughData`; instead, read failures always result in `std::io::Error`.
12/// - Use this implementation when you're sure the entire packet is available in the stream.
13///
14/// # Errors
15/// - `Error::SignatureDismatch` or `Error::CrcDismatch` if header validation fails.
16/// - `Error::NotEnoughData` if there’s insufficient data in the inner block stream.
17/// - `Error::MaxBlocksCount` if the block count exceeds the allowed maximum.
18/// - Any decoding or payload-related error from underlying implementations.
19impl<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> ReadPacketFrom
20    for PacketDef<B, P, Inner>
21{
22    fn read<T: std::io::Read>(
23        buf: &mut T,
24        ctx: &mut <Self as PayloadSchema>::Context<'_>,
25    ) -> Result<Self, Error>
26    where
27        Self: Sized,
28    {
29        let header = PacketHeader::read(buf)?;
30        let mut pkg = PacketDef::default();
31        let mut read = 0;
32        if header.blocks_len > 0 {
33            let mut blocks = vec![0u8; header.blocks_len as usize];
34            buf.read_exact(&mut blocks)?;
35            let mut reader = std::io::Cursor::new(blocks);
36            let mut iterations = 0;
37            loop {
38                match <B as TryReadFromBuffered>::try_read(&mut reader)? {
39                    ReadStatus::Success(blk) => {
40                        read += blk.size();
41                        pkg.blocks.push(blk);
42                        if read == header.blocks_len {
43                            break;
44                        }
45                    }
46                    ReadStatus::NotEnoughData(needed) => {
47                        return Err(Error::NotEnoughData(needed as usize));
48                    }
49                }
50                iterations += 1;
51                if iterations > MAX_BLOCKS_COUNT as usize {
52                    return Err(Error::MaxBlocksCount);
53                }
54            }
55        }
56        if header.payload {
57            let header = <PayloadHeader as ReadFrom>::read(buf)?;
58            let payload = <P as ExtractPayloadFrom<Inner>>::read(buf, &header, ctx)?;
59            pkg.payload = Some(payload);
60        }
61        Ok(pkg)
62    }
63}
64
65/// Attempts to read a `PacketDef` from a seekable stream with partial read awareness.
66///
67/// This implementation checks if enough data is available before attempting to decode,
68/// and can return `ReadStatus::NotEnoughData(...)` instead of failing with an I/O error.
69///
70/// # Behavior
71/// - If not enough data is available for the entire payload, stream position is reset.
72/// - If read fails partway through (block or payload), stream position is reset and the error returned.
73/// - If block count exceeds `MAX_BLOCKS_COUNT`, returns `Error::MaxBlocksCount`.
74///
75/// # Returns
76/// - `PacketReadStatus::Success(packet)` - full packet successfully read.
77/// - `PacketReadStatus::NotEnoughData(bytes)` - more data needed to complete the packet.
78/// - `Error` - on decoding, CRC, signature, or logic errors.
79///
80/// # Stream behavior
81/// Seeks forward to read the packet, and seeks back on early return or error.
82impl<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> TryReadPacketFrom
83    for PacketDef<B, P, Inner>
84{
85    fn try_read<T: std::io::Read + std::io::Seek>(
86        buf: &mut T,
87        ctx: &mut <Self as PayloadSchema>::Context<'_>,
88    ) -> Result<PacketReadStatus<Self>, Error>
89    where
90        Self: Sized,
91    {
92        let start_pos = buf.stream_position()?;
93        let available = buf.seek(std::io::SeekFrom::End(0))? - start_pos;
94        buf.seek(std::io::SeekFrom::Start(start_pos))?;
95        #[cfg(feature = "resilient")]
96        let mut unrecognized = Vec::new();
97        let packet_header = match <PacketHeader as TryReadFrom>::try_read(buf)? {
98            ReadStatus::NotEnoughData(needed) => {
99                return Ok(PacketReadStatus::NotEnoughData(needed));
100            }
101            ReadStatus::Success(header) => header,
102        };
103        let packet_size = PacketHeader::ssize() + packet_header.size;
104        if packet_size > available {
105            return Ok(PacketReadStatus::NotEnoughData(packet_size - available));
106        }
107        let mut pkg = PacketDef::default();
108        let mut read = 0;
109        if packet_header.blocks_len > 0 {
110            let mut iterations = 0;
111            loop {
112                match <B as TryReadFrom>::try_read(buf) {
113                    Ok(ReadStatus::Success(blk)) => {
114                        read += blk.size();
115                        pkg.blocks.push(blk);
116                        if read == packet_header.blocks_len {
117                            break;
118                        }
119                    }
120                    Ok(ReadStatus::NotEnoughData(needed)) => {
121                        buf.seek(std::io::SeekFrom::Start(start_pos))?;
122                        return Ok(PacketReadStatus::NotEnoughData(needed));
123                    }
124                    Err(err) => {
125                        #[cfg(feature = "resilient")]
126                        if let Error::SignatureDismatch(mut entry) = err {
127                            let Some(body_len) = entry.len else {
128                                buf.seek(std::io::SeekFrom::Start(start_pos))?;
129                                return Err(Error::ZeroLengthBlock);
130                            };
131                            if body_len == 0 {
132                                buf.seek(std::io::SeekFrom::Start(start_pos))?;
133                                return Err(Error::InvalidLength);
134                            }
135                            let block_len = BLOCK_SIG_LEN as u64
136                                + BLOCK_SIZE_FIELD_LEN as u64
137                                + body_len
138                                + BLOCK_CRC_LEN as u64;
139                            let blocks_left = packet_header.blocks_len.saturating_sub(read);
140                            if block_len > blocks_left {
141                                buf.seek(std::io::SeekFrom::Start(start_pos))?;
142                                return Err(Error::InvalidLength);
143                            }
144                            entry.pos = Some(PacketHeader::ssize() + read);
145                            buf.seek(std::io::SeekFrom::Current(block_len as i64))?;
146                            read += block_len;
147                            unrecognized.push(entry);
148                            if read == packet_header.blocks_len {
149                                break;
150                            }
151                            iterations += 1;
152                            if iterations > MAX_BLOCKS_COUNT as usize {
153                                buf.seek(std::io::SeekFrom::Start(start_pos))?;
154                                return Err(Error::MaxBlocksCount);
155                            }
156                            continue;
157                        }
158                        buf.seek(std::io::SeekFrom::Start(start_pos))?;
159                        return Err(err);
160                    }
161                }
162                iterations += 1;
163                if iterations > MAX_BLOCKS_COUNT as usize {
164                    buf.seek(std::io::SeekFrom::Start(start_pos))?;
165                    return Err(Error::MaxBlocksCount);
166                }
167            }
168        }
169        if packet_header.payload {
170            match <PayloadHeader as TryReadFrom>::try_read(buf)? {
171                ReadStatus::Success(payload_header) => {
172                    let payload_total =
173                        payload_header.size() as u64 + payload_header.payload_len() as u64;
174                    let packet_payload_left = packet_header.size - packet_header.blocks_len;
175                    if payload_total > packet_payload_left {
176                        buf.seek(std::io::SeekFrom::Start(start_pos))?;
177                        return Err(Error::InvalidLength);
178                    }
179                    match <P as TryExtractPayloadFrom<Inner>>::try_read(buf, &payload_header, ctx) {
180                        Ok(ReadStatus::Success(payload)) => {
181                            pkg.payload = Some(payload);
182                        }
183                        Ok(ReadStatus::NotEnoughData(needed)) => {
184                            buf.seek(std::io::SeekFrom::Start(start_pos))?;
185                            return Ok(PacketReadStatus::NotEnoughData(needed));
186                        }
187                        Err(err) => {
188                            #[cfg(feature = "resilient")]
189                            if let Error::SignatureDismatch(mut entry) = err {
190                                let payload_len = payload_header.payload_len() as u64;
191                                let payload_total = payload_len + payload_header.size() as u64;
192                                let packet_payload_left =
193                                    packet_header.size - packet_header.blocks_len;
194                                if payload_total > packet_payload_left {
195                                    buf.seek(std::io::SeekFrom::Start(start_pos))?;
196                                    return Err(Error::InvalidLength);
197                                }
198                                entry.pos =
199                                    Some(PacketHeader::ssize() + packet_header.blocks_len + 1);
200                                entry.len = Some(payload_len);
201                                buf.seek(std::io::SeekFrom::Current(payload_len as i64))?;
202                                unrecognized.push(entry);
203                            } else {
204                                buf.seek(std::io::SeekFrom::Start(start_pos))?;
205                                return Err(err);
206                            }
207                            #[cfg(not(feature = "resilient"))]
208                            {
209                                buf.seek(std::io::SeekFrom::Start(start_pos))?;
210                                return Err(err);
211                            }
212                        }
213                    }
214                }
215                ReadStatus::NotEnoughData(needed) => {
216                    buf.seek(std::io::SeekFrom::Start(start_pos))?;
217                    return Err(Error::NotEnoughData(needed as usize));
218                }
219            }
220        }
221        #[cfg(feature = "resilient")]
222        {
223            Ok(PacketReadStatus::success(pkg, unrecognized))
224        }
225        #[cfg(not(feature = "resilient"))]
226        {
227            Ok(PacketReadStatus::success(pkg))
228        }
229    }
230}
231
232/// Attempts to read a `PacketDef` from a buffered reader.
233///
234/// This is similar to `TryReadFrom`, but works with non-seekable buffered sources (e.g., network streams).
235///
236/// # Behavior
237/// - Reads header directly from `BufRead::fill_buf()` and consumes it.
238/// - Ensures that `header.size` bytes are available before decoding.
239/// - Supports partial reads using `ReadStatus::NotEnoughData(...)`.
240///
241/// # Returns
242/// - `PacketReadStatus::Success(packet)` - if all required data was read and validated.
243/// - `PacketReadStatus::NotEnoughData(bytes)` - if more bytes are needed.
244/// - `Error::MaxBlocksCount` - if the block limit is exceeded.
245/// - Any decoding or CRC/signature errors.
246///
247/// # Notes
248/// The header and block stream are parsed directly from the internal buffer. Payload data may be buffered or streamed depending on implementation.
249impl<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> TryReadPacketFromBuffered
250    for PacketDef<B, P, Inner>
251{
252    fn try_read<T: std::io::BufRead>(
253        reader: &mut T,
254        ctx: &mut <Self as PayloadSchema>::Context<'_>,
255    ) -> Result<PacketReadStatus<Self>, Error>
256    where
257        Self: Sized,
258    {
259        let bytes = reader.fill_buf()?;
260        let available = bytes.len() as u64;
261        if available < PacketHeader::ssize() {
262            return Ok(PacketReadStatus::NotEnoughData(
263                PacketHeader::ssize() - available,
264            ));
265        }
266        let packet_header = PacketHeader::read_from_slice(bytes, false)?;
267        let packet_size = PacketHeader::ssize() + packet_header.size;
268        if packet_size > available {
269            return Ok(PacketReadStatus::NotEnoughData(packet_size - available));
270        }
271        reader.consume(PacketHeader::ssize() as usize);
272        #[cfg(feature = "resilient")]
273        let mut unrecognized = Vec::new();
274        let mut pkg = PacketDef::default();
275        let mut read = 0;
276        if packet_header.blocks_len > 0 {
277            let mut iterations = 0;
278            loop {
279                match <B as TryReadFromBuffered>::try_read(reader) {
280                    Ok(ReadStatus::Success(blk)) => {
281                        read += blk.size();
282                        pkg.blocks.push(blk);
283                        if read == packet_header.blocks_len {
284                            break;
285                        }
286                    }
287                    Ok(ReadStatus::NotEnoughData(needed)) => {
288                        return Ok(PacketReadStatus::NotEnoughData(needed));
289                    }
290                    Err(err) => {
291                        #[cfg(feature = "resilient")]
292                        if let Error::SignatureDismatch(mut entry) = err {
293                            let Some(body_len) = entry.len else {
294                                return Err(Error::ZeroLengthBlock);
295                            };
296                            if body_len == 0 {
297                                return Err(Error::InvalidLength);
298                            }
299                            let block_len = BLOCK_SIG_LEN as u64
300                                + BLOCK_SIZE_FIELD_LEN as u64
301                                + body_len
302                                + BLOCK_CRC_LEN as u64;
303                            let blocks_left = packet_header.blocks_len.saturating_sub(read);
304                            if block_len > blocks_left {
305                                return Err(Error::InvalidLength);
306                            }
307                            entry.pos = Some(PacketHeader::ssize() + read);
308                            reader.consume(block_len as usize);
309                            read += block_len;
310                            unrecognized.push(entry);
311                            if read == packet_header.blocks_len {
312                                break;
313                            }
314                            iterations += 1;
315                            if iterations > MAX_BLOCKS_COUNT as usize {
316                                return Err(Error::MaxBlocksCount);
317                            }
318                            continue;
319                        }
320                        return Err(err);
321                    }
322                }
323                iterations += 1;
324                if iterations > MAX_BLOCKS_COUNT as usize {
325                    return Err(Error::MaxBlocksCount);
326                }
327            }
328        }
329        if packet_header.payload {
330            match <PayloadHeader as TryReadFromBuffered>::try_read(reader)? {
331                ReadStatus::Success(payload_header) => {
332                    let payload_total =
333                        payload_header.size() as u64 + payload_header.payload_len() as u64;
334                    let packet_payload_left = packet_header.size - packet_header.blocks_len;
335                    if payload_total > packet_payload_left {
336                        return Err(Error::InvalidLength);
337                    }
338                    reader.consume(payload_header.size());
339                    match <P as TryExtractPayloadFromBuffered<Inner>>::try_read(
340                        reader,
341                        &payload_header,
342                        ctx,
343                    ) {
344                        Ok(ReadStatus::Success(payload)) => {
345                            pkg.payload = Some(payload);
346                        }
347                        Ok(ReadStatus::NotEnoughData(needed)) => {
348                            return Ok(PacketReadStatus::NotEnoughData(needed));
349                        }
350                        Err(err) => {
351                            #[cfg(feature = "resilient")]
352                            if let Error::SignatureDismatch(mut entry) = err {
353                                let payload_len = payload_header.payload_len() as u64;
354                                let payload_total = payload_len + payload_header.size() as u64;
355                                let packet_payload_left =
356                                    packet_header.size - packet_header.blocks_len;
357                                if payload_total > packet_payload_left {
358                                    return Err(Error::InvalidLength);
359                                }
360                                entry.pos =
361                                    Some(PacketHeader::ssize() + packet_header.blocks_len + 1);
362                                entry.len = Some(payload_len);
363                                reader.consume(payload_len as usize);
364                                unrecognized.push(entry);
365                            } else {
366                                return Err(err);
367                            }
368                            #[cfg(not(feature = "resilient"))]
369                            return Err(err);
370                        }
371                    }
372                }
373                ReadStatus::NotEnoughData(needed) => {
374                    return Err(Error::NotEnoughData(needed as usize));
375                }
376            }
377        }
378        #[cfg(feature = "resilient")]
379        {
380            Ok(PacketReadStatus::success(pkg, unrecognized))
381        }
382        #[cfg(not(feature = "resilient"))]
383        {
384            Ok(PacketReadStatus::success(pkg))
385        }
386    }
387}