1use crate::*;
2#[cfg(feature = "resilient")]
3use brec_common::{BLOCK_CRC_LEN, BLOCK_SIG_LEN, BLOCK_SIZE_FIELD_LEN};
4
5impl<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> ReadPacketFrom
20 for PacketDef<B, P, Inner>
21{
22 fn read<T: std::io::Read>(
23 buf: &mut T,
24 ctx: &mut <Self as PayloadSchema>::Context<'_>,
25 ) -> Result<Self, Error>
26 where
27 Self: Sized,
28 {
29 let header = PacketHeader::read(buf)?;
30 let mut pkg = PacketDef::default();
31 let mut read = 0;
32 if header.blocks_len > 0 {
33 let mut blocks = vec![0u8; header.blocks_len as usize];
34 buf.read_exact(&mut blocks)?;
35 let mut reader = std::io::Cursor::new(blocks);
36 let mut iterations = 0;
37 loop {
38 match <B as TryReadFromBuffered>::try_read(&mut reader)? {
39 ReadStatus::Success(blk) => {
40 read += blk.size();
41 pkg.blocks.push(blk);
42 if read == header.blocks_len {
43 break;
44 }
45 }
46 ReadStatus::NotEnoughData(needed) => {
47 return Err(Error::NotEnoughData(needed as usize));
48 }
49 }
50 iterations += 1;
51 if iterations > MAX_BLOCKS_COUNT as usize {
52 return Err(Error::MaxBlocksCount);
53 }
54 }
55 }
56 if header.payload {
57 let header = <PayloadHeader as ReadFrom>::read(buf)?;
58 let payload = <P as ExtractPayloadFrom<Inner>>::read(buf, &header, ctx)?;
59 pkg.payload = Some(payload);
60 }
61 Ok(pkg)
62 }
63}
64
65impl<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> TryReadPacketFrom
83 for PacketDef<B, P, Inner>
84{
85 fn try_read<T: std::io::Read + std::io::Seek>(
86 buf: &mut T,
87 ctx: &mut <Self as PayloadSchema>::Context<'_>,
88 ) -> Result<PacketReadStatus<Self>, Error>
89 where
90 Self: Sized,
91 {
92 let start_pos = buf.stream_position()?;
93 let available = buf.seek(std::io::SeekFrom::End(0))? - start_pos;
94 buf.seek(std::io::SeekFrom::Start(start_pos))?;
95 #[cfg(feature = "resilient")]
96 let mut unrecognized = Vec::new();
97 let packet_header = match <PacketHeader as TryReadFrom>::try_read(buf)? {
98 ReadStatus::NotEnoughData(needed) => {
99 return Ok(PacketReadStatus::NotEnoughData(needed));
100 }
101 ReadStatus::Success(header) => header,
102 };
103 let packet_size = PacketHeader::ssize() + packet_header.size;
104 if packet_size > available {
105 return Ok(PacketReadStatus::NotEnoughData(packet_size - available));
106 }
107 let mut pkg = PacketDef::default();
108 let mut read = 0;
109 if packet_header.blocks_len > 0 {
110 let mut iterations = 0;
111 loop {
112 match <B as TryReadFrom>::try_read(buf) {
113 Ok(ReadStatus::Success(blk)) => {
114 read += blk.size();
115 pkg.blocks.push(blk);
116 if read == packet_header.blocks_len {
117 break;
118 }
119 }
120 Ok(ReadStatus::NotEnoughData(needed)) => {
121 buf.seek(std::io::SeekFrom::Start(start_pos))?;
122 return Ok(PacketReadStatus::NotEnoughData(needed));
123 }
124 Err(err) => {
125 #[cfg(feature = "resilient")]
126 if let Error::SignatureDismatch(mut entry) = err {
127 let Some(body_len) = entry.len else {
128 buf.seek(std::io::SeekFrom::Start(start_pos))?;
129 return Err(Error::ZeroLengthBlock);
130 };
131 if body_len == 0 {
132 buf.seek(std::io::SeekFrom::Start(start_pos))?;
133 return Err(Error::InvalidLength);
134 }
135 let block_len = BLOCK_SIG_LEN as u64
136 + BLOCK_SIZE_FIELD_LEN as u64
137 + body_len
138 + BLOCK_CRC_LEN as u64;
139 let blocks_left = packet_header.blocks_len.saturating_sub(read);
140 if block_len > blocks_left {
141 buf.seek(std::io::SeekFrom::Start(start_pos))?;
142 return Err(Error::InvalidLength);
143 }
144 entry.pos = Some(PacketHeader::ssize() + read);
145 buf.seek(std::io::SeekFrom::Current(block_len as i64))?;
146 read += block_len;
147 unrecognized.push(entry);
148 if read == packet_header.blocks_len {
149 break;
150 }
151 iterations += 1;
152 if iterations > MAX_BLOCKS_COUNT as usize {
153 buf.seek(std::io::SeekFrom::Start(start_pos))?;
154 return Err(Error::MaxBlocksCount);
155 }
156 continue;
157 }
158 buf.seek(std::io::SeekFrom::Start(start_pos))?;
159 return Err(err);
160 }
161 }
162 iterations += 1;
163 if iterations > MAX_BLOCKS_COUNT as usize {
164 buf.seek(std::io::SeekFrom::Start(start_pos))?;
165 return Err(Error::MaxBlocksCount);
166 }
167 }
168 }
169 if packet_header.payload {
170 match <PayloadHeader as TryReadFrom>::try_read(buf)? {
171 ReadStatus::Success(payload_header) => {
172 let payload_total =
173 payload_header.size() as u64 + payload_header.payload_len() as u64;
174 let packet_payload_left = packet_header.size - packet_header.blocks_len;
175 if payload_total > packet_payload_left {
176 buf.seek(std::io::SeekFrom::Start(start_pos))?;
177 return Err(Error::InvalidLength);
178 }
179 match <P as TryExtractPayloadFrom<Inner>>::try_read(buf, &payload_header, ctx) {
180 Ok(ReadStatus::Success(payload)) => {
181 pkg.payload = Some(payload);
182 }
183 Ok(ReadStatus::NotEnoughData(needed)) => {
184 buf.seek(std::io::SeekFrom::Start(start_pos))?;
185 return Ok(PacketReadStatus::NotEnoughData(needed));
186 }
187 Err(err) => {
188 #[cfg(feature = "resilient")]
189 if let Error::SignatureDismatch(mut entry) = err {
190 let payload_len = payload_header.payload_len() as u64;
191 let payload_total = payload_len + payload_header.size() as u64;
192 let packet_payload_left =
193 packet_header.size - packet_header.blocks_len;
194 if payload_total > packet_payload_left {
195 buf.seek(std::io::SeekFrom::Start(start_pos))?;
196 return Err(Error::InvalidLength);
197 }
198 entry.pos =
199 Some(PacketHeader::ssize() + packet_header.blocks_len + 1);
200 entry.len = Some(payload_len);
201 buf.seek(std::io::SeekFrom::Current(payload_len as i64))?;
202 unrecognized.push(entry);
203 } else {
204 buf.seek(std::io::SeekFrom::Start(start_pos))?;
205 return Err(err);
206 }
207 #[cfg(not(feature = "resilient"))]
208 {
209 buf.seek(std::io::SeekFrom::Start(start_pos))?;
210 return Err(err);
211 }
212 }
213 }
214 }
215 ReadStatus::NotEnoughData(needed) => {
216 buf.seek(std::io::SeekFrom::Start(start_pos))?;
217 return Err(Error::NotEnoughData(needed as usize));
218 }
219 }
220 }
221 #[cfg(feature = "resilient")]
222 {
223 Ok(PacketReadStatus::success(pkg, unrecognized))
224 }
225 #[cfg(not(feature = "resilient"))]
226 {
227 Ok(PacketReadStatus::success(pkg))
228 }
229 }
230}
231
232impl<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> TryReadPacketFromBuffered
250 for PacketDef<B, P, Inner>
251{
252 fn try_read<T: std::io::BufRead>(
253 reader: &mut T,
254 ctx: &mut <Self as PayloadSchema>::Context<'_>,
255 ) -> Result<PacketReadStatus<Self>, Error>
256 where
257 Self: Sized,
258 {
259 let bytes = reader.fill_buf()?;
260 let available = bytes.len() as u64;
261 if available < PacketHeader::ssize() {
262 return Ok(PacketReadStatus::NotEnoughData(
263 PacketHeader::ssize() - available,
264 ));
265 }
266 let packet_header = PacketHeader::read_from_slice(bytes, false)?;
267 let packet_size = PacketHeader::ssize() + packet_header.size;
268 if packet_size > available {
269 return Ok(PacketReadStatus::NotEnoughData(packet_size - available));
270 }
271 reader.consume(PacketHeader::ssize() as usize);
272 #[cfg(feature = "resilient")]
273 let mut unrecognized = Vec::new();
274 let mut pkg = PacketDef::default();
275 let mut read = 0;
276 if packet_header.blocks_len > 0 {
277 let mut iterations = 0;
278 loop {
279 match <B as TryReadFromBuffered>::try_read(reader) {
280 Ok(ReadStatus::Success(blk)) => {
281 read += blk.size();
282 pkg.blocks.push(blk);
283 if read == packet_header.blocks_len {
284 break;
285 }
286 }
287 Ok(ReadStatus::NotEnoughData(needed)) => {
288 return Ok(PacketReadStatus::NotEnoughData(needed));
289 }
290 Err(err) => {
291 #[cfg(feature = "resilient")]
292 if let Error::SignatureDismatch(mut entry) = err {
293 let Some(body_len) = entry.len else {
294 return Err(Error::ZeroLengthBlock);
295 };
296 if body_len == 0 {
297 return Err(Error::InvalidLength);
298 }
299 let block_len = BLOCK_SIG_LEN as u64
300 + BLOCK_SIZE_FIELD_LEN as u64
301 + body_len
302 + BLOCK_CRC_LEN as u64;
303 let blocks_left = packet_header.blocks_len.saturating_sub(read);
304 if block_len > blocks_left {
305 return Err(Error::InvalidLength);
306 }
307 entry.pos = Some(PacketHeader::ssize() + read);
308 reader.consume(block_len as usize);
309 read += block_len;
310 unrecognized.push(entry);
311 if read == packet_header.blocks_len {
312 break;
313 }
314 iterations += 1;
315 if iterations > MAX_BLOCKS_COUNT as usize {
316 return Err(Error::MaxBlocksCount);
317 }
318 continue;
319 }
320 return Err(err);
321 }
322 }
323 iterations += 1;
324 if iterations > MAX_BLOCKS_COUNT as usize {
325 return Err(Error::MaxBlocksCount);
326 }
327 }
328 }
329 if packet_header.payload {
330 match <PayloadHeader as TryReadFromBuffered>::try_read(reader)? {
331 ReadStatus::Success(payload_header) => {
332 let payload_total =
333 payload_header.size() as u64 + payload_header.payload_len() as u64;
334 let packet_payload_left = packet_header.size - packet_header.blocks_len;
335 if payload_total > packet_payload_left {
336 return Err(Error::InvalidLength);
337 }
338 reader.consume(payload_header.size());
339 match <P as TryExtractPayloadFromBuffered<Inner>>::try_read(
340 reader,
341 &payload_header,
342 ctx,
343 ) {
344 Ok(ReadStatus::Success(payload)) => {
345 pkg.payload = Some(payload);
346 }
347 Ok(ReadStatus::NotEnoughData(needed)) => {
348 return Ok(PacketReadStatus::NotEnoughData(needed));
349 }
350 Err(err) => {
351 #[cfg(feature = "resilient")]
352 if let Error::SignatureDismatch(mut entry) = err {
353 let payload_len = payload_header.payload_len() as u64;
354 let payload_total = payload_len + payload_header.size() as u64;
355 let packet_payload_left =
356 packet_header.size - packet_header.blocks_len;
357 if payload_total > packet_payload_left {
358 return Err(Error::InvalidLength);
359 }
360 entry.pos =
361 Some(PacketHeader::ssize() + packet_header.blocks_len + 1);
362 entry.len = Some(payload_len);
363 reader.consume(payload_len as usize);
364 unrecognized.push(entry);
365 } else {
366 return Err(err);
367 }
368 #[cfg(not(feature = "resilient"))]
369 return Err(err);
370 }
371 }
372 }
373 ReadStatus::NotEnoughData(needed) => {
374 return Err(Error::NotEnoughData(needed as usize));
375 }
376 }
377 }
378 #[cfg(feature = "resilient")]
379 {
380 Ok(PacketReadStatus::success(pkg, unrecognized))
381 }
382 #[cfg(not(feature = "resilient"))]
383 {
384 Ok(PacketReadStatus::success(pkg))
385 }
386 }
387}