brec 0.6.0

A flexible binary format for storing and streaming structured data as packets with CRC protection and recoverability from corruption. Built for extensibility and robustness.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
use std::{
    io::{BufRead, Cursor},
    ops::RangeInclusive,
};

use crate::*;

/// Iterator over a sequence of `Slot`s, yielding the byte ranges that contain actual packet data.
///
/// Each iteration returns a `RangeInclusive<u64>` for one slot: the range starts right after the
/// slot header (`slot.size()`) and ends `slot.width()` bytes later. Offsets are cumulative, i.e.
/// they include the header and data of every slot consumed before the current one.
///
/// Plain iteration (`next`) stops at the first slot whose `width()` is zero; it does not skip
/// such a slot and continue. Only the index-based lookup (`from`) steps over slots that hold
/// fewer packets than the requested index.
///
/// Useful for scanning files or buffers that store serialized packets in slot-based format.
pub struct PacketsLocatorIterator<'a, I: Iterator<Item = &'a Slot>> {
    /// Byte offset of the next unread slot, accumulated over all slots consumed so far.
    offset: u64,
    /// Remaining slots to walk through.
    slots: I,
}

impl<'a, I: Iterator<Item = &'a Slot>> PacketsLocatorIterator<'a, I> {
    /// Creates a new `PacketsLocatorIterator` over the provided slice of slots.
    pub fn new(slots: I) -> Self {
        Self { offset: 0, slots }
    }

    /// Seeks to the specified packet index across the slots.
    ///
    /// # Arguments
    /// * `packet` - The logical packet index to locate.
    ///
    /// # Returns
    /// Returns a `RangeInclusive<u64>` representing the byte range of the packet data within the source
    ///
    /// # Errors
    /// Returns `Error::OutOfBounds` if the packet index exceeds the total number of packets available in the slots.
    /// Returns `Error::EmptySource` if there are no slots to search through.
    pub fn from(&mut self, packet: usize) -> Result<RangeInclusive<u64>, Error> {
        let mut count = 0;
        let mut target = packet;
        for (idx, slot) in self.slots.by_ref().enumerate() {
            count += 1;
            if slot.count() <= target {
                target -= slot.count();
                self.offset += slot.size() + slot.width();
                continue;
            }
            if !slot.is_used(target) {
                return Err(Error::OutOfBounds(
                    slot.count() + idx * DEFAULT_SLOT_CAPACITY,
                    packet,
                ));
            }
            let Some(packet_offset) = slot.offset_of(target) else {
                return Err(Error::OutOfBounds(
                    slot.count() + idx * DEFAULT_SLOT_CAPACITY,
                    packet,
                ));
            };
            let slot_offset = self.offset;
            self.offset += slot.size() + slot.width();
            return Ok(RangeInclusive::new(
                slot_offset + slot.size() + packet_offset,
                slot_offset + slot.size() + slot.width(),
            ));
        }
        if count == 0 {
            Err(Error::EmptySource)
        } else {
            Err(Error::OutOfBounds(count * DEFAULT_SLOT_CAPACITY, packet))
        }
    }
}

impl<'a, I: Iterator<Item = &'a Slot>> Iterator for PacketsLocatorIterator<'a, I> {
    type Item = RangeInclusive<u64>;

    /// Returns the data range of the next slot, or `None` if finished.
    ///
    /// Iteration ends when the underlying slots are exhausted or when a slot with
    /// zero `width()` is encountered.
    fn next(&mut self) -> Option<Self::Item> {
        let slot = self.slots.next()?;
        match slot.width() {
            0 => None,
            width => {
                let header = slot.size();
                // Data begins right after the slot header and spans `width` bytes.
                let range = (self.offset + header)..=(self.offset + width + header);
                self.offset += width + header;
                Some(range)
            }
        }
    }
}

/// An iterator over stored packets distributed across multiple slots.
///
/// `ReaderIterator` reads packets from a `Read + Seek` source (e.g. file or memory stream)
/// using a sequence of `Slot`s and their recorded layout. For every slot it seeks to the slot's
/// data region, loads that region into an in-memory buffer (`Cursor<Vec<u8>>`) and then parses
/// packets out of the buffer until it is drained.
///
/// It yields deserialized `PacketDef<B, P, Inner>` values or parsing errors.
///
/// # Type Parameters
/// - `I`: Iterator over the `Slot`s describing the layout of the source
/// - `S`: Source implementing `Read + Seek`
/// - `B`: Block type (must implement `BlockDef`)
/// - `P`: Payload container type (must implement `PayloadDef`)
/// - `Inner`: Inner payload object (must implement `PayloadInnerDef`)
pub struct ReaderIterator<
    'a,
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> {
    /// Produces the byte range of each slot's data region.
    locator: PacketsLocatorIterator<'a, I>,
    /// Underlying stream the packet bytes are read from.
    source: &'a mut S,
    /// Bytes of the slot data region currently being parsed.
    buffer: Cursor<Vec<u8>>,
    /// Context handed to the packet reader on every parse.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    _block: std::marker::PhantomData<B>,
    _payload: std::marker::PhantomData<P>,
    _payload_inner: std::marker::PhantomData<Inner>,
}

impl<
    'a,
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> ReaderIterator<'a, I, S, B, P, Inner>
{
    /// Constructs a new `ReaderIterator` from the given stream and slot layout.
    ///
    /// The internal buffer starts empty, so the first call to `next()` loads the first slot.
    pub fn new(
        source: &'a mut S,
        slots: I,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        Self {
            locator: PacketsLocatorIterator::new(slots),
            source,
            buffer: Cursor::new(Vec::new()),
            ctx,
            _block: std::marker::PhantomData,
            _payload: std::marker::PhantomData,
            _payload_inner: std::marker::PhantomData,
        }
    }

    /// Positions the iterator at the specified packet index across the slots.
    ///
    /// The bytes from the requested packet up to the end of its slot's data region are loaded
    /// into the internal buffer, so the next call to `next()` starts parsing at that packet.
    ///
    /// # Arguments
    /// * `packet` - The logical packet index to locate.
    ///
    /// # Returns
    /// Returns the iterator itself, ready to yield packets starting from `packet`.
    ///
    /// # Errors
    /// Returns `Error::OutOfBounds` if the packet index exceeds the total number of packets available in the slots.
    /// Returns `Error::EmptySource` if there are no slots to search through.
    /// Returns an I/O-derived error if seeking fails or the source cannot supply the full
    /// region (e.g. a truncated file); this is reported instead of panicking.
    pub fn seek(mut self, packet: usize) -> Result<Self, Error> {
        let location = self.locator.from(packet)?;
        self.source
            .seek(std::io::SeekFrom::Start(*location.start()))?;
        let size = (location.end() - location.start()) as usize;
        let mut inner = vec![0u8; size];
        // A short read must surface as an error to the caller, not abort the process.
        self.source.read_exact(&mut inner)?;
        self.buffer = Cursor::new(inner);
        Ok(self)
    }
}

impl<
    'a,
    I: Iterator<Item = &'a Slot>,
    // Only reading and seeking are performed here, so `Write` is not required. This lets
    // read-only sources (e.g. `Cursor<&[u8]>`) be iterated as well.
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> Iterator for ReaderIterator<'a, I, S, B, P, Inner>
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Reads and yields the next packet located in the slots.
    ///
    /// When the internal buffer is drained, the next slot's data region is loaded from the
    /// source; the packet is then parsed from the buffer.
    ///
    /// Returns `None` once the slot locator is exhausted. I/O failures while seeking or
    /// reading a slot region are yielded as `Some(Err(..))` rather than causing a panic.
    fn next(&mut self) -> Option<Self::Item> {
        let drained = match self.buffer.fill_buf() {
            Ok(pending) => pending.is_empty(),
            Err(err) => return Some(Err(err.into())),
        };
        if drained {
            let location = self.locator.next()?;
            if let Err(err) = self
                .source
                .seek(std::io::SeekFrom::Start(*location.start()))
            {
                return Some(Err(err.into()));
            }
            let size = (location.end() - location.start()) as usize;
            let mut inner = vec![0u8; size];
            // A truncated or failing source is reported to the caller instead of unwrapping.
            if let Err(err) = self.source.read_exact(&mut inner) {
                return Some(Err(err.into()));
            }
            self.buffer = Cursor::new(inner);
        }
        Some(<PacketDef<B, P, Inner> as ReadPacketFrom>::read(
            &mut self.buffer,
            self.ctx,
        ))
    }
}

/// An iterator over packets stored in slots with rule-based filtering.
///
/// This iterator functions like `ReaderIterator`, but applies `RulesDef`-based
/// filters to decide whether to yield, skip, or reject packets. The filtering is performed
/// during parsing using `PacketDef::filtered`, which allows:
/// - prefiltering by blocks
/// - filtering by payload
/// - filtering by fully parsed packet
///
/// # Type Parameters
/// - `I`: Iterator over the `Slot`s describing the layout of the source
/// - `S`: Input stream (`Read + Seek`)
/// - `B`: Block type
/// - `BR`: Referred block type for rule filtering
/// - `P`: Payload wrapper type
/// - `Inner`: Inner payload type
pub struct ReaderFilteredIterator<
    'a,
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> {
    /// Produces the byte range of each slot's data region.
    locator: PacketsLocatorIterator<'a, I>,
    /// Underlying stream the packet bytes are read from.
    source: &'a mut S,
    /// Filtering rules applied while parsing each packet.
    rules: &'a RulesDef<B, BR, P, Inner>,
    /// Bytes of the slot data region currently being parsed.
    buffer: Cursor<Vec<u8>>,
    /// Context handed to the packet reader on every parse.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
}

impl<
    'a,
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> ReaderFilteredIterator<'a, I, S, B, BR, P, Inner>
{
    /// Builds a filtered packet iterator over `source`, guided by `slots` and `rules`.
    ///
    /// The internal buffer starts out empty, so the first slot region is loaded lazily
    /// on the first call to `next()`.
    pub fn new(
        source: &'a mut S,
        slots: I,
        rules: &'a RulesDef<B, BR, P, Inner>,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        let locator = PacketsLocatorIterator::new(slots);
        let buffer = Cursor::new(Vec::new());
        Self {
            locator,
            source,
            rules,
            buffer,
            ctx,
        }
    }
}

impl<
    'a,
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> Iterator for ReaderFilteredIterator<'a, I, S, B, BR, P, Inner>
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Attempts to read and yield the next packet that passes all configured rules.
    ///
    /// Internally uses `PacketDef::filtered` to apply:
    /// - block-level filtering
    /// - payload-level filtering
    /// - full-packet filtering
    ///
    /// Returns `Some(Ok(...))` if accepted, skips if denied, and `Some(Err(...))` on error.
    /// Errors include `Error::NotEnoughData` reported by the filter as well as I/O failures
    /// while seeking or reading a slot region (these no longer panic). Returns `None` once
    /// the slot locator is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let drained = match self.buffer.fill_buf() {
                Ok(pending) => pending.is_empty(),
                Err(err) => return Some(Err(err.into())),
            };
            if drained {
                let location = self.locator.next()?;
                if let Err(err) = self
                    .source
                    .seek(std::io::SeekFrom::Start(*location.start()))
                {
                    return Some(Err(err.into()));
                }
                let size = (location.end() - location.start()) as usize;
                let mut inner = vec![0u8; size];
                // A truncated or failing source is reported to the caller instead of unwrapping.
                if let Err(err) = self.source.read_exact(&mut inner) {
                    return Some(Err(err.into()));
                }
                self.buffer = Cursor::new(inner);
            }
            match PacketDef::filtered(&mut self.buffer, self.rules, self.ctx) {
                Ok(LookInStatus::Accepted(_, packet)) => return Some(Ok(packet)),
                Ok(LookInStatus::Denied(_)) => {
                    // Rejected by the rules: try the next packet.
                    continue;
                }
                Ok(LookInStatus::NotEnoughData(needed)) => {
                    return Some(Err(Error::NotEnoughData(needed)));
                }
                Err(err) => return Some(Err(err)),
            }
        }
    }
}

/// An iterator over a specified range of packets within a `ReaderDef`.
///
/// Unlike `ReaderIterator`, this variant yields a bounded number of packets starting from a specific index.
/// It uses the internal `storage.nth(n)` method to fetch each packet by logical index.
///
/// # Type Parameters
/// - `S`: Underlying storage stream (`Read + Seek`)
/// - `B`: Block type
/// - `BR`: Referred block type
/// - `P`: Payload type wrapper
/// - `Inner`: Inner payload object
pub struct ReaderRangeIterator<
    'a,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> {
    /// Reader the packets are fetched from.
    storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
    /// Number of fetch attempts still allowed.
    len: usize,
    /// Logical index of the next packet to fetch.
    from: usize,
    /// Context handed to the reader on every fetch.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    _block: std::marker::PhantomData<B>,
    _payload: std::marker::PhantomData<P>,
    _payload_inner: std::marker::PhantomData<Inner>,
}

impl<
    'a,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> ReaderRangeIterator<'a, S, B, BR, P, Inner>
{
    /// Creates an iterator over at most `len` packets of `storage`, starting at index `from`.
    ///
    /// `ctx` is forwarded to the reader on every fetch.
    pub fn new(
        storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
        from: usize,
        len: usize,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        use std::marker::PhantomData;

        Self {
            storage,
            from,
            len,
            ctx,
            _block: PhantomData,
            _payload: PhantomData,
            _payload_inner: PhantomData,
        }
    }
}

impl<
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> Iterator for ReaderRangeIterator<'_, S, B, BR, P, Inner>
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Returns the next packet from the range by calling `storage.nth(current_index)`.
    ///
    /// Yields `None` once `len` fetches have been made or when the storage reports no packet
    /// at the current index. A fetch error is yielded as `Some(Err(..))` and still consumes
    /// one index and one unit of `len`.
    fn next(&mut self) -> Option<Self::Item> {
        // `None` here means the budget of `len` fetches is used up.
        let left = self.len.checked_sub(1)?;
        let fetched = self.storage.nth(self.from, self.ctx);
        self.from += 1;
        self.len = left;
        // Result<Option<_>, _> -> Option<Result<_, _>>
        fetched.transpose()
    }
}

/// An iterator over a specific range of packets from storage with rule-based filtering.
///
/// Similar to `ReaderRangeIterator`, but each packet is passed through the configured filtering rules.
/// Internally uses `storage.nth_filtered(index)` to fetch and filter packets.
///
/// Only packets that match all rule conditions are yielded (`LookInStatus::Accepted`).
/// Note that `len` is decremented only when a packet is accepted, so it bounds the number of
/// yielded packets rather than the number of indices inspected.
///
/// # Type Parameters
/// - `S`: Source stream (`Read + Seek`; the `Iterator` implementation additionally requires `Write`)
/// - `B`: Block type
/// - `BR`: Block reference type for filtering
/// - `P`: Payload container type
/// - `Inner`: Inner payload object
pub struct ReaderRangeFilteredIterator<
    'a,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> {
    /// Reader the packets are fetched from.
    storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
    /// Number of accepted packets still to be yielded.
    len: usize,
    /// Logical index of the next packet to fetch.
    from: usize,
    /// Context handed to the reader on every fetch.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
}

impl<
    'a,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> ReaderRangeFilteredIterator<'a, S, B, BR, P, Inner>
{
    /// Creates a filtered iterator over `storage` that starts at index `from` and yields
    /// at most `len` accepted packets.
    ///
    /// `ctx` is forwarded to the reader on every fetch.
    pub fn new(
        storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
        from: usize,
        len: usize,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        Self { storage, from, len, ctx }
    }
}

impl<
    S: std::io::Read + std::io::Write + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
> Iterator for ReaderRangeFilteredIterator<'_, S, B, BR, P, Inner>
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Attempts to read and yield the next packet in range that passes all filtering rules.
    ///
    /// Denied packets are skipped without consuming `len`; iteration stops when `len`
    /// reaches zero or the storage returns `None`. `NotEnoughData` and storage errors are
    /// yielded as `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        while self.len != 0 {
            let outcome = self.storage.nth_filtered(self.from, self.ctx);
            // The index advances regardless of how the fetch turned out.
            self.from += 1;
            let status = match outcome {
                Ok(Some(status)) => status,
                Ok(None) => return None,
                Err(err) => return Some(Err(err)),
            };
            match status {
                LookInStatus::Accepted(_, packet) => {
                    self.len -= 1;
                    return Some(Ok(packet));
                }
                // Rejected by the rules: move on to the next index.
                LookInStatus::Denied(_) => {}
                LookInStatus::NotEnoughData(needed) => {
                    return Some(Err(Error::NotEnoughData(needed)));
                }
            }
        }
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a slot from the given per-packet lengths and refreshes its CRC.
    ///
    /// The number of lengths is passed as the second `Slot::new` argument
    /// (presumably the slot capacity — confirm against `Slot::new`).
    fn slot_with_lengths(lengths: &[u64]) -> Slot {
        let total = lengths.len() as u64;
        let mut slot = Slot::new(lengths.to_vec(), total, [0; 4]);
        slot.overwrite_crc();
        slot
    }

    /// `next()` must report each slot's data range with offsets accumulated across slots.
    #[test]
    fn packets_locator_next_tracks_offsets_across_slots() {
        let slots = [slot_with_lengths(&[10, 20, 0]), slot_with_lengths(&[7, 0])];
        let (head, tail) = (&slots[0], &slots[1]);

        let mut locator = PacketsLocatorIterator::new(slots.iter());
        let first = locator.next().expect("first slot range");
        let second = locator.next().expect("second slot range");
        assert!(locator.next().is_none());

        // First slot: data follows the header directly.
        assert_eq!(*first.start(), head.size());
        assert_eq!(*first.end(), head.size() + head.width());

        // Second slot: shifted by the full extent (header + data) of the first one.
        let second_base = head.size() + head.width();
        assert_eq!(*second.start(), second_base + tail.size());
        assert_eq!(
            *second.end(),
            second_base + tail.size() + tail.width()
        );
    }

    /// `from()` must step over an empty slot, reject out-of-range indices and
    /// report an empty source.
    #[test]
    fn packets_locator_from_handles_valid_empty_and_oob() {
        let slots = [slot_with_lengths(&[0, 0]), slot_with_lengths(&[5, 6, 0])];
        let (empty, used) = (&slots[0], &slots[1]);

        let mut locator = PacketsLocatorIterator::new(slots.iter());
        let range = locator
            .from(0)
            .expect("first packet should resolve in second slot");
        let base = empty.size() + empty.width();
        assert_eq!(*range.start(), base + used.size());
        assert_eq!(*range.end(), base + used.size() + used.width());

        let mut out_of_range = PacketsLocatorIterator::new(slots.iter());
        assert!(matches!(
            out_of_range.from(10),
            Err(Error::OutOfBounds(_, 10))
        ));

        let nothing: [Slot; 0] = [];
        let mut no_slots = PacketsLocatorIterator::new(nothing.iter());
        assert!(matches!(no_slots.from(0), Err(Error::EmptySource)));
    }
}