mod iters;
use crate::*;
pub(crate) use iters::*;
/// Success value returned by `ReaderDef::nth_filtered`: `None` when no packet
/// could be located for the requested index, otherwise the `LookInStatus`
/// produced by the filtered read of that packet.
pub type NthFilteredPacket<B, P, Inner> = Option<LookInStatus<PacketDef<B, P, Inner>>>;
/// Reader over a seekable byte source that is organised into slots followed by
/// packet data.
///
/// Type parameters:
/// * `S` - the underlying source; must support reading and seeking.
/// * `B`, `BR` - block type and its referred counterpart.
/// * `P`, `Inner` - payload type and its inner payload type.
pub struct ReaderDef<S, B, BR, P, Inner>
where
    S: std::io::Read + std::io::Seek,
    B: BlockDef,
    BR: BlockReferredDef<B>,
    P: PayloadDef<Inner>,
    Inner: PayloadInnerDef,
{
    /// Slots discovered in the source, each stored together with the position
    /// it was read from.
    pub slots: Vec<AnchoredSlot>,
    /// Underlying source that slots and packets are read from.
    inner: S,
    /// Locator that is (re)initialised from `slots` after every load/reload.
    locator: FreeSlotLocator,
    /// Rules handed to filtered reads and filtered iterators.
    rules: RulesDef<B, BR, P, Inner>,
}
impl<
S: std::io::Read + std::io::Seek,
B: BlockDef,
BR: BlockReferredDef<B>,
P: PayloadDef<Inner>,
Inner: PayloadInnerDef,
> ReaderDef<S, B, BR, P, Inner>
{
/// Creates a reader on top of `inner` and immediately scans it for slots.
///
/// # Errors
/// Returns whatever error the initial slot scan (`load`) reports.
pub fn new(inner: S) -> Result<Self, Error> {
    let reader = Self {
        slots: Vec::new(),
        inner,
        locator: FreeSlotLocator::default(),
        rules: RulesDef::default(),
    };
    reader.load()
}
fn load(mut self) -> Result<Self, Error> {
let mut offset = 0;
loop {
self.inner.seek(std::io::SeekFrom::Start(offset))?;
match <Slot as TryReadFrom>::try_read::<_, ()>(&mut self.inner) {
Ok(ReadStatus::Success(slot)) => {
let position = offset;
offset += slot.size() + slot.width();
self.slots.push(AnchoredSlot::new(slot, position));
}
Ok(ReadStatus::NotEnoughData(_needed)) => {
break;
}
Err(Error::CrcDismatch) => {
return Err(Error::DamagedSlot(Box::new(Error::CrcDismatch)));
}
Err(Error::SignatureDismatch(data)) => {
return Err(Error::DamagedSlot(Box::new(Error::SignatureDismatch(data))));
}
Err(err) => return Err(err),
}
}
self.locator
.setup(self.slots.iter().map(|anchored| &anchored.inner));
Ok(self)
}
pub fn reload(&mut self) -> Result<usize, Error> {
let previous_count: usize = self.slots.iter().map(|slot| slot.inner.count()).sum();
let mut source_pos;
let last = match self.slots.last().map(|v| (v, v.inner.expand())) {
Some((last, (Some(offset), Some(index), crc))) => {
source_pos = last.offset;
Some((offset, index, crc))
}
Some((last, (None, None, _))) => {
source_pos = last.offset + last.inner.width() + last.inner.size();
None
}
_ => {
source_pos = 0;
None
}
};
let origin_source_pos = source_pos;
loop {
self.inner.seek(std::io::SeekFrom::Start(source_pos))?;
match <Slot as TryReadFrom>::try_read::<_, ()>(&mut self.inner) {
Ok(ReadStatus::Success(slot)) => {
if let Some((_, _, crc)) = last
&& source_pos == origin_source_pos
{
if crc == slot.crc {
return Ok(0);
}
if let Some(lst) = self.slots.last_mut() {
lst.inner = slot;
if lst.get_free_slot_index().is_none() {
source_pos += lst.size() + lst.width();
} else {
break;
}
} else {
return Err(Error::AccessSlot(self.slots.len().saturating_sub(1)));
}
} else {
let position = source_pos;
source_pos += slot.size() + slot.width();
self.slots.push(AnchoredSlot::new(slot, position));
}
}
Ok(ReadStatus::NotEnoughData(needed)) => {
match (last.is_none(), origin_source_pos == source_pos) {
(true, true) => {
return Ok(0);
}
(false, true) => {
if needed == SlotHeader::ssize() {
break;
}
return Err(Error::DamagedSlot(Box::new(Error::NotEnoughData(
needed as usize,
))));
}
(false, false) | (true, false) => break,
}
}
Err(Error::CrcDismatch) => {
return Err(Error::DamagedSlot(Box::new(Error::CrcDismatch)));
}
Err(Error::SignatureDismatch(data)) => {
return Err(Error::DamagedSlot(Box::new(Error::SignatureDismatch(data))));
}
Err(err) => return Err(err),
}
}
let current_count: usize = self.slots.iter().map(|slot| slot.inner.count()).sum();
let read = current_count.saturating_sub(previous_count);
self.locator
.setup(self.slots.iter().map(|anchored| &anchored.inner));
Ok(read)
}
/// Registers `rule` in the reader's rule set by delegating to
/// `RulesDef::add_rule`.
///
/// # Errors
/// Returns the error reported by the rule set unchanged; the tests in this
/// file show `Error::RuleDuplicate` when a second prefilter rule is added.
pub fn add_rule(&mut self, rule: RuleDef<B, BR, P, Inner>) -> Result<(), Error> {
self.rules.add_rule(rule)
}
/// Removes the rule identified by `rule` from the reader's rule set by
/// delegating to `RulesDef::remove_rule`. Nothing is returned to the caller.
pub fn remove_rule(&mut self, rule: RuleDefId) {
self.rules.remove_rule(rule);
}
/// Returns the number of records tracked by the reader.
///
/// The locator's current slot index is used: when that slot exists and reports
/// a free index, the result is `slot_index * DEFAULT_SLOT_CAPACITY + index`.
/// Otherwise every known slot is counted at `DEFAULT_SLOT_CAPACITY`.
pub fn count(&self) -> usize {
    let (slot_index, _) = self.locator.current();
    let free_index = self
        .slots
        .get(slot_index)
        .and_then(|slot| slot.get_free_slot_index());
    match free_index {
        Some(index) => slot_index * DEFAULT_SLOT_CAPACITY + index,
        None => self.slots.len() * DEFAULT_SLOT_CAPACITY,
    }
}
/// Returns the position just past the last known slot
/// (`offset + width() + size()` of that slot), or `0` when no slots are known.
pub fn get_offset(&self) -> u64 {
    match self.slots.last() {
        Some(tail) => tail.offset + tail.width() + tail.size(),
        None => 0,
    }
}
/// Creates a `ReaderIterator` over the source, built from the underlying
/// source, all known slots and the supplied context `ctx`.
pub fn iter<'a>(
    &'a mut self,
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
) -> ReaderIterator<'a, impl Iterator<Item = &'a Slot>, S, B, P, Inner> {
    let known_slots = self.slots.iter().map(|anchored| &anchored.inner);
    ReaderIterator::new(&mut self.inner, known_slots, ctx)
}
/// Creates a `ReaderIterator` (same inputs as `iter`) and asks it to seek to
/// `packet`, returning whatever that seek returns.
///
/// # Errors
/// Propagates the iterator's seek error; the tests in this file show
/// `Error::EmptySource` for a reader built on an empty source.
pub fn seek<'a>(
    &'a mut self,
    packet: usize,
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
) -> Result<ReaderIterator<'a, impl Iterator<Item = &'a Slot>, S, B, P, Inner>, Error> {
    let known_slots = self.slots.iter().map(|anchored| &anchored.inner);
    let iterator = ReaderIterator::new(&mut self.inner, known_slots, ctx);
    iterator.seek(packet)
}
/// Creates a `ReaderFilteredIterator` built from the underlying source, all
/// known slots, the reader's rules and the supplied context `ctx`.
pub fn filtered<'a>(
    &'a mut self,
    ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
) -> ReaderFilteredIterator<'a, impl Iterator<Item = &'a Slot>, S, B, BR, P, Inner> {
    let known_slots = self.slots.iter().map(|anchored| &anchored.inner);
    ReaderFilteredIterator::new(&mut self.inner, known_slots, &self.rules, ctx)
}
/// Reads the packet with index `nth` directly from the source.
///
/// The index is split into a slot index (`nth / DEFAULT_SLOT_CAPACITY`) and an
/// index inside that slot (`nth % DEFAULT_SLOT_CAPACITY`). The read position is
/// the offset reported by the slot plus `width() + size()` of every preceding
/// slot.
///
/// # Returns
/// `Ok(None)` when the slot does not exist, the cell is reported empty, or the
/// slot provides no offset for it; otherwise `Ok(Some(packet))`.
///
/// # Errors
/// * Errors from the emptiness check, the seek, or the packet read.
/// * `Error::NotEnoughData` when the packet cannot be read completely.
pub fn nth(
    &mut self,
    nth: usize,
    ctx: &mut <Inner as ProtocolSchema>::Context<'_>,
) -> Result<Option<PacketDef<B, P, Inner>>, Error> {
    let (slot_index, index_in_slot) = (nth / DEFAULT_SLOT_CAPACITY, nth % DEFAULT_SLOT_CAPACITY);
    let Some(slot) = self.slots.get(slot_index) else {
        return Ok(None);
    };
    if slot.is_empty(index_in_slot)? {
        return Ok(None);
    }
    let Some(offset_in_slot) = slot.get_slot_offset(index_in_slot) else {
        return Ok(None);
    };
    // Space taken by every slot that precedes the target one.
    let preceding: u64 = self.slots[..slot_index]
        .iter()
        .map(|slot| slot.width() + slot.size())
        .sum();
    self.inner
        .seek(std::io::SeekFrom::Start(offset_in_slot + preceding))?;
    match <PacketDef<B, P, Inner> as TryReadPacketFrom>::try_read(&mut self.inner, ctx)? {
        #[cfg(feature = "resilient")]
        PacketReadStatus::Success((pkg, _unrecognized)) => Ok(Some(pkg)),
        #[cfg(not(feature = "resilient"))]
        PacketReadStatus::Success(pkg) => Ok(Some(pkg)),
        PacketReadStatus::NotEnoughData(needed) => Err(Error::NotEnoughData(needed as usize)),
    }
}
/// Creates a `ReaderRangeIterator` for this reader.
///
/// `from`, `len` and `ctx` are passed unchanged to `ReaderRangeIterator::new`
/// together with a mutable borrow of the reader.
pub fn range<'a>(
&'a mut self,
from: usize,
len: usize,
ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
) -> ReaderRangeIterator<'a, S, B, BR, P, Inner> {
ReaderRangeIterator::new(self, from, len, ctx)
}
/// Creates a `ReaderRangeFilteredIterator` for this reader.
///
/// `from`, `len` and `ctx` are passed unchanged to
/// `ReaderRangeFilteredIterator::new` together with a mutable borrow of the
/// reader.
pub fn range_filtered<'a>(
&'a mut self,
from: usize,
len: usize,
ctx: &'a mut <Inner as ProtocolSchema>::Context<'a>,
) -> ReaderRangeFilteredIterator<'a, S, B, BR, P, Inner> {
ReaderRangeFilteredIterator::new(self, from, len, ctx)
}
pub(crate) fn nth_filtered(
&mut self,
from: usize,
ctx: &mut <Inner as ProtocolSchema>::Context<'_>,
) -> Result<NthFilteredPacket<B, P, Inner>, Error> {
let slot_index = from / DEFAULT_SLOT_CAPACITY;
let index_in_slot = from % DEFAULT_SLOT_CAPACITY;
let Some(slot) = self.slots.get(slot_index) else {
return Ok(None);
};
if slot.is_empty(index_in_slot)? {
return Ok(None);
}
let Some(mut offset) = slot.get_slot_offset(index_in_slot) else {
return Ok(None);
};
offset += self.slots[..slot_index]
.iter()
.map(|slot| slot.width() + slot.size())
.sum::<u64>();
self.inner.seek(std::io::SeekFrom::Start(offset))?;
match PacketDef::filtered(&mut self.inner, &self.rules, ctx)? {
LookInStatus::Accepted(size, pkg) => Ok(Some(LookInStatus::Accepted(size, pkg))),
LookInStatus::Denied(size) => Ok(Some(LookInStatus::Denied(size))),
LookInStatus::NotEnoughData(needed) => Err(Error::NotEnoughData(needed)),
}
}
}
// Unit tests for `ReaderDef` using an in-memory `Cursor` as the source.
#[cfg(test)]
mod tests {
use super::*;
use crate::{RuleDef, RuleFnDef, tests::*};
use std::io::Cursor;
// Concrete reader instantiation used by every test in this module.
type ReaderUnderTest =
ReaderDef<Cursor<Vec<u8>>, TestBlock, TestBlock, TestPayload, TestPayload>;
// Builds a reader on top of an empty in-memory buffer.
fn empty_reader() -> ReaderUnderTest {
ReaderDef::new(Cursor::new(Vec::new())).expect("reader must initialize on empty source")
}
// An empty source yields no slots, zero counters/offset, `None` from direct
// lookups, `Error::EmptySource` from `seek`, and empty iterators.
#[test]
fn reader_empty_source_basics_and_iterators() {
let mut reader = empty_reader();
assert!(reader.slots.is_empty());
assert_eq!(reader.count(), 0);
assert_eq!(reader.get_offset(), 0);
assert_eq!(reader.reload().expect("reload on empty"), 0);
assert!(reader.nth(0, &mut ()).expect("nth").is_none());
assert!(
reader
.nth_filtered(0, &mut ())
.expect("nth_filtered")
.is_none()
);
assert!(matches!(reader.seek(0, &mut ()), Err(Error::EmptySource)));
assert!(reader.iter(&mut ()).next().is_none());
assert!(reader.filtered(&mut ()).next().is_none());
assert!(reader.range(0, 10, &mut ()).next().is_none());
assert!(reader.range_filtered(0, 10, &mut ()).next().is_none());
}
// Adding a second prefilter rule is rejected as a duplicate; after removing
// the prefilter rule it can be added again.
#[test]
fn reader_add_rule_detects_duplicates_and_remove_is_safe() {
let mut reader = empty_reader();
reader
.add_rule(RuleDef::Prefilter(RuleFnDef::Static(|_| true)))
.expect("first prefilter rule should be added");
assert!(matches!(
reader.add_rule(RuleDef::Prefilter(RuleFnDef::Static(|_| true))),
Err(Error::RuleDuplicate)
));
reader.remove_rule(crate::RuleDefId::Prefilter);
reader
.add_rule(RuleDef::Prefilter(RuleFnDef::Static(|_| true)))
.expect("prefilter should be addable again after remove");
}
}