use std::{
io::{BufRead, Cursor},
ops::RangeInclusive,
};
use crate::*;
/// Turns a sequence of [`Slot`]s into byte ranges.
///
/// The locator keeps a running `offset` which grows by `size() + width()` for
/// every slot it steps over, so each produced range is expressed relative to
/// the position the locator started from (offset `0`).
pub struct PacketsLocatorIterator<'a, I>
where
    I: Iterator<Item = &'a Slot>,
{
    /// Accumulated `size() + width()` of all slots already passed.
    offset: u64,
    /// Slots that have not been visited yet.
    slots: I,
}
impl<'a, I: Iterator<Item = &'a Slot>> PacketsLocatorIterator<'a, I> {
pub fn new(slots: I) -> Self {
Self { offset: 0, slots }
}
pub fn from(&mut self, packet: usize) -> Result<RangeInclusive<u64>, Error> {
let mut count = 0;
let mut target = packet;
for (idx, slot) in self.slots.by_ref().enumerate() {
count += 1;
if slot.count() <= target {
target -= slot.count();
self.offset += slot.size() + slot.width();
continue;
}
if !slot.is_used(target) {
return Err(Error::OutOfBounds(
slot.count() + idx * DEFAULT_SLOT_CAPACITY,
packet,
));
}
let Some(packet_offset) = slot.offset_of(target) else {
return Err(Error::OutOfBounds(
slot.count() + idx * DEFAULT_SLOT_CAPACITY,
packet,
));
};
let slot_offset = self.offset;
self.offset += slot.size() + slot.width();
return Ok(RangeInclusive::new(
slot_offset + slot.size() + packet_offset,
slot_offset + slot.size() + slot.width(),
));
}
if count == 0 {
Err(Error::EmptySource)
} else {
Err(Error::OutOfBounds(count * DEFAULT_SLOT_CAPACITY, packet))
}
}
}
impl<'a, I> Iterator for PacketsLocatorIterator<'a, I>
where
    I: Iterator<Item = &'a Slot>,
{
    type Item = RangeInclusive<u64>;

    /// Yields the data range of the next slot.
    ///
    /// The range starts `size()` bytes after the current running offset and
    /// spans `width()` bytes; afterwards the running offset points at its end.
    ///
    /// Returns `None` when the slots are exhausted or when the next slot has a
    /// width of `0`. In the latter case the slot is still consumed and the
    /// running offset is not advanced.
    fn next(&mut self) -> Option<Self::Item> {
        let slot = self.slots.next()?;
        let width = slot.width();
        if width == 0 {
            return None;
        }
        let start = self.offset + slot.size();
        let end = start + width;
        self.offset = end;
        Some(RangeInclusive::new(start, end))
    }
}
/// Sequential packet reader over a seekable `source`.
///
/// Byte ranges come from a [`PacketsLocatorIterator`]; each range is loaded
/// into an in-memory `buffer`, and packets are decoded from that buffer.
pub struct ReaderIterator<'a, I, S, B, P, Inner>
where
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Produces the byte ranges to load from `source`.
    locator: PacketsLocatorIterator<'a, I>,
    /// Underlying data source; repositioned before every load.
    source: &'a mut S,
    /// Bytes of the most recently loaded range, consumed while decoding.
    buffer: Cursor<Vec<u8>>,
    /// Context handed to the packet decoder on every read.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    /// Marker binding the block type `B` to this iterator.
    _block: std::marker::PhantomData<B>,
    /// Marker binding the payload type `P` to this iterator.
    _payload: std::marker::PhantomData<P>,
    /// Marker binding the inner payload type `Inner` to this iterator.
    _payload_inner: std::marker::PhantomData<Inner>,
}
impl<'a, I, S, B, P, Inner> ReaderIterator<'a, I, S, B, P, Inner>
where
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Creates a reader over `source` that walks `slots` from the beginning.
    ///
    /// The internal buffer starts empty, so nothing is read from `source`
    /// until the iterator is advanced or [`Self::seek`] is called.
    pub fn new(
        source: &'a mut S,
        slots: I,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        Self {
            locator: PacketsLocatorIterator::new(slots),
            source,
            buffer: Cursor::new(Vec::new()),
            ctx,
            _block: std::marker::PhantomData,
            _payload: std::marker::PhantomData,
            _payload_inner: std::marker::PhantomData,
        }
    }

    /// Positions the reader on packet number `packet`.
    ///
    /// The packet is located through the slot locator, `source` is moved to
    /// the start of the returned range and `end - start` bytes are loaded into
    /// the internal buffer, replacing whatever it held before.
    ///
    /// # Errors
    ///
    /// Returns the locator's error when the packet cannot be found, and the
    /// converted I/O error when seeking or reading `source` fails (including a
    /// source that is shorter than the located range). Read failures used to
    /// panic; they are now reported to the caller.
    pub fn seek(mut self, packet: usize) -> Result<Self, Error> {
        let location = self.locator.from(packet)?;
        let (start, end) = (*location.start(), *location.end());
        self.source.seek(std::io::SeekFrom::Start(start))?;
        let mut inner = vec![0u8; (end - start) as usize];
        // A truncated or failing source must surface as an error, not a panic.
        self.source.read_exact(&mut inner)?;
        self.buffer = Cursor::new(inner);
        Ok(self)
    }
}
impl<'a, I, S, B, P, Inner> Iterator for ReaderIterator<'a, I, S, B, P, Inner>
where
    I: Iterator<Item = &'a Slot>,
    // Only reading and seeking are performed here, so `Write` is not required.
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Decodes the next packet.
    ///
    /// When the internal buffer has no unread bytes, the next range is taken
    /// from the locator and loaded from `source` first. Iteration ends
    /// (`None`) once the locator has no further range.
    ///
    /// I/O failures while refilling the buffer are yielded as `Some(Err(..))`
    /// instead of panicking; decoding errors are yielded unchanged.
    fn next(&mut self) -> Option<Self::Item> {
        let drained = match self.buffer.fill_buf() {
            Ok(pending) => pending.is_empty(),
            Err(err) => return Some(Err(err.into())),
        };
        if drained {
            let location = self.locator.next()?;
            let (start, end) = (*location.start(), *location.end());
            if let Err(err) = self.source.seek(std::io::SeekFrom::Start(start)) {
                return Some(Err(err.into()));
            }
            let mut inner = vec![0u8; (end - start) as usize];
            // A short or failing read is reported to the caller.
            if let Err(err) = self.source.read_exact(&mut inner) {
                return Some(Err(err.into()));
            }
            self.buffer = Cursor::new(inner);
        }
        Some(<PacketDef<B, P, Inner> as ReadPacketFrom>::read(
            &mut self.buffer,
            self.ctx,
        ))
    }
}
/// Sequential packet reader that applies `rules` while decoding.
///
/// Byte ranges come from a [`PacketsLocatorIterator`]; each range is loaded
/// from `source` into `buffer`, and `PacketDef::filtered` decides per packet
/// whether it is yielded.
pub struct ReaderFilteredIterator<'a, I, S, B, BR, P, Inner>
where
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Produces the byte ranges to load from `source`.
    locator: PacketsLocatorIterator<'a, I>,
    /// Underlying data source; repositioned before every load.
    source: &'a mut S,
    /// Rules passed to the filtering decoder.
    rules: &'a RulesDef<B, BR, P, Inner>,
    /// Bytes of the most recently loaded range, consumed while decoding.
    buffer: Cursor<Vec<u8>>,
    /// Context handed to the filtering decoder on every read.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
}
impl<'a, I, S, B, BR, P, Inner> ReaderFilteredIterator<'a, I, S, B, BR, P, Inner>
where
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Creates a filtering reader over `source` that walks `slots` from the
    /// beginning and evaluates every packet against `rules`.
    ///
    /// The internal buffer starts empty; nothing is read from `source` until
    /// the iterator is advanced.
    pub fn new(
        source: &'a mut S,
        slots: I,
        rules: &'a RulesDef<B, BR, P, Inner>,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        let locator = PacketsLocatorIterator::new(slots);
        let buffer = Cursor::new(Vec::new());
        Self {
            locator,
            source,
            rules,
            buffer,
            ctx,
        }
    }
}
impl<'a, I, S, B, BR, P, Inner> Iterator for ReaderFilteredIterator<'a, I, S, B, BR, P, Inner>
where
    I: Iterator<Item = &'a Slot>,
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Yields the next packet accepted by the rules.
    ///
    /// Denied packets are skipped silently. When the internal buffer has no
    /// unread bytes, the next range is taken from the locator and loaded from
    /// `source`; iteration ends (`None`) once the locator has no further range.
    ///
    /// A `NotEnoughData` status is reported as [`Error::NotEnoughData`]. I/O
    /// failures while refilling the buffer are yielded as `Some(Err(..))`
    /// instead of panicking.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let drained = match self.buffer.fill_buf() {
                Ok(pending) => pending.is_empty(),
                Err(err) => return Some(Err(err.into())),
            };
            if drained {
                let location = self.locator.next()?;
                let (start, end) = (*location.start(), *location.end());
                if let Err(err) = self.source.seek(std::io::SeekFrom::Start(start)) {
                    return Some(Err(err.into()));
                }
                let mut inner = vec![0u8; (end - start) as usize];
                // A short or failing read is reported to the caller.
                if let Err(err) = self.source.read_exact(&mut inner) {
                    return Some(Err(err.into()));
                }
                self.buffer = Cursor::new(inner);
            }
            match PacketDef::filtered(&mut self.buffer, self.rules, self.ctx) {
                Ok(LookInStatus::Accepted(_, packet)) => return Some(Ok(packet)),
                // Rejected by the rules: try the next packet.
                Ok(LookInStatus::Denied(_)) => continue,
                Ok(LookInStatus::NotEnoughData(needed)) => {
                    return Some(Err(Error::NotEnoughData(needed)));
                }
                Err(err) => return Some(Err(err)),
            }
        }
    }
}
/// Iterator over a run of packets fetched by index from a `ReaderDef`.
///
/// Starting at index `from`, it asks the storage for one packet per step and
/// stops after `len` steps.
pub struct ReaderRangeIterator<'a, S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Reader that resolves packets by index.
    storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
    /// Number of steps still allowed.
    len: usize,
    /// Index of the next packet to request.
    from: usize,
    /// Context handed to the storage on every lookup.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    /// Marker binding the block type `B` to this iterator.
    _block: std::marker::PhantomData<B>,
    /// Marker binding the payload type `P` to this iterator.
    _payload: std::marker::PhantomData<P>,
    /// Marker binding the inner payload type `Inner` to this iterator.
    _payload_inner: std::marker::PhantomData<Inner>,
}
impl<'a, S, B, BR, P, Inner> ReaderRangeIterator<'a, S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Creates an iterator that requests `len` packets from `storage`,
    /// beginning with index `from`.
    pub fn new(
        storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
        from: usize,
        len: usize,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        use std::marker::PhantomData;

        Self {
            storage,
            from,
            len,
            ctx,
            _block: PhantomData,
            _payload: PhantomData,
            _payload_inner: PhantomData,
        }
    }
}
impl<S, B, BR, P, Inner> Iterator for ReaderRangeIterator<'_, S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Requests the packet at the current index from the storage.
    ///
    /// Every call that reaches the storage advances the index and consumes one
    /// unit of the remaining length, whatever the outcome. Iteration ends when
    /// the length is used up or the storage reports no packet at the index;
    /// storage errors are yielded as `Some(Err(..))`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.len == 0 {
            return None;
        }
        let fetched = self.storage.nth(self.from, self.ctx);
        self.from += 1;
        self.len -= 1;
        // `Ok(None)` ends iteration, `Ok(Some(p))` yields `Ok(p)`, `Err(e)` yields `Err(e)`.
        fetched.transpose()
    }
}
/// Iterator over packets fetched by index from a `ReaderDef` through its
/// filtering lookup.
///
/// Starting at index `from`, it yields packets the lookup accepts until `len`
/// of them have been produced or the storage has nothing more to offer.
pub struct ReaderRangeFilteredIterator<'a, S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Reader that resolves and filters packets by index.
    storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
    /// Number of accepted packets still to be yielded.
    len: usize,
    /// Index of the next packet to request.
    from: usize,
    /// Context handed to the storage on every lookup.
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
}
impl<'a, S, B, BR, P, Inner> ReaderRangeFilteredIterator<'a, S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Creates a filtering iterator over `storage` that starts at index `from`
    /// and is initialised with a length of `len`.
    pub fn new(
        storage: &'a mut ReaderDef<S, B, BR, P, Inner>,
        from: usize,
        len: usize,
        ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
    ) -> Self {
        Self {
            storage,
            from,
            len,
            ctx,
        }
    }
}
impl<S, B, BR, P, Inner> Iterator for ReaderRangeFilteredIterator<'_, S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Write + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    type Item = Result<PacketDef<B, P, Inner>, Error>;

    /// Yields the next packet accepted by the storage's filtering lookup.
    ///
    /// The index advances on every lookup, while the remaining length is
    /// reduced only when a packet is accepted, so denied packets do not use up
    /// the length. Iteration ends when the length reaches zero or the storage
    /// reports no packet at the index. A `NotEnoughData` status is yielded as
    /// [`Error::NotEnoughData`]; storage errors are yielded unchanged.
    ///
    /// NOTE(review): the unfiltered range iterator reduces its length on every
    /// lookup; confirm that counting only accepted packets is intended here.
    fn next(&mut self) -> Option<Self::Item> {
        while self.len != 0 {
            let looked = self.storage.nth_filtered(self.from, self.ctx);
            self.from += 1;

            let status = match looked {
                Ok(Some(status)) => status,
                Ok(None) => return None,
                Err(err) => return Some(Err(err)),
            };

            match status {
                LookInStatus::Accepted(_, packet) => {
                    self.len -= 1;
                    return Some(Ok(packet));
                }
                // Rejected by the filter: move on to the next index.
                LookInStatus::Denied(_) => {}
                LookInStatus::NotEnoughData(needed) => {
                    return Some(Err(Error::NotEnoughData(needed)));
                }
            }
        }
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a slot from `lengths`, passing the number of entries as the
    /// second constructor argument, and refreshes its CRC.
    fn slot_with_lengths(lengths: &[u64]) -> Slot {
        let entries = lengths.len() as u64;
        let mut slot = Slot::new(lengths.to_vec(), entries, [0; 4]);
        slot.overwrite_crc();
        slot
    }

    /// Two consecutive slots must produce two back-to-back data ranges and
    /// then end the iteration.
    #[test]
    fn packets_locator_next_tracks_offsets_across_slots() {
        let slots = [slot_with_lengths(&[10, 20, 0]), slot_with_lengths(&[7, 0])];
        let [head_slot, tail_slot] = &slots;

        let mut locator = PacketsLocatorIterator::new(slots.iter());
        let head = locator.next().expect("first slot range");
        let tail = locator.next().expect("second slot range");
        assert!(locator.next().is_none());

        // Data of the first slot starts right after the slot itself.
        let head_start = head_slot.size();
        let head_end = head_start + head_slot.width();
        assert_eq!(head, RangeInclusive::new(head_start, head_end));

        // The second slot begins where the first slot's data ended.
        let tail_start = head_end + tail_slot.size();
        assert_eq!(
            tail,
            RangeInclusive::new(tail_start, tail_start + tail_slot.width())
        );
    }

    /// `from` must skip a slot without packets, report out-of-range indexes
    /// and distinguish a source without any slots.
    #[test]
    fn packets_locator_from_handles_valid_empty_and_oob() {
        let slots = [slot_with_lengths(&[0, 0]), slot_with_lengths(&[5, 6, 0])];
        let [empty, used] = &slots;

        let mut locator = PacketsLocatorIterator::new(slots.iter());
        let located = locator
            .from(0)
            .expect("first packet should resolve in second slot");
        let data_start = empty.size() + empty.width() + used.size();
        assert_eq!(
            located,
            RangeInclusive::new(data_start, data_start + used.width())
        );

        let mut beyond = PacketsLocatorIterator::new(slots.iter());
        assert!(matches!(beyond.from(10), Err(Error::OutOfBounds(_, 10))));

        let mut nothing = PacketsLocatorIterator::new([].iter());
        assert!(matches!(nothing.from(0), Err(Error::EmptySource)));
    }
}