use std::io::BufRead;
use crate::*;
pub enum NextPacket<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> {
NotEnoughData(usize),
NoData,
NotFound,
Skipped,
Found(PacketDef<B, P, Inner>),
}
pub enum PacketHeaderState {
NotFound,
NotEnoughData(usize, usize),
Found(PacketHeader, std::ops::RangeInclusive<usize>),
}
pub enum HeaderReadState {
Ready(Option<PacketHeader>),
Refill(Option<(Vec<u8>, usize)>),
Empty,
}
pub enum ResolveHeaderReady<B: BlockDef, P: PayloadDef<Inner>, Inner: PayloadInnerDef> {
Next(NextPacket<B, P, Inner>),
Resolved(PacketHeader),
}
pub struct PacketBufReaderDef<
'a,
R: std::io::Read,
B: BlockDef,
BR: BlockReferredDef<B>,
P: PayloadDef<Inner>,
Inner: PayloadInnerDef,
> {
inner: std::io::BufReader<&'a mut R>,
rules: RulesDef<B, BR, P, Inner>,
recent: HeaderReadState,
buffered: Vec<u8>,
}
impl<
'a,
R: std::io::Read,
B: BlockDef,
BR: BlockReferredDef<B>,
P: PayloadDef<Inner>,
Inner: PayloadInnerDef,
> PacketBufReaderDef<'a, R, B, BR, P, Inner>
{
fn read_header(buffer: &[u8]) -> Result<PacketHeaderState, Error> {
let Some(offset) = PacketHeader::get_pos(buffer) else {
return Ok(PacketHeaderState::NotFound);
};
if let Some(needed) = PacketHeader::is_not_enought(&buffer[offset..]) {
return Ok(PacketHeaderState::NotEnoughData(offset, needed));
}
Ok(PacketHeaderState::Found(
PacketHeader::read_from_slice(&buffer[offset..], false)?,
std::ops::RangeInclusive::new(offset, offset + PacketHeader::ssize() as usize),
))
}
fn drop_and_consume(
&mut self,
consume: Option<usize>,
result: Result<NextPacket<B, P, Inner>, Error>,
) -> Result<NextPacket<B, P, Inner>, Error> {
self.buffered.clear();
if let Some(s) = consume {
self.inner.consume(s)
}
result
}
fn resolve_header_ready(
&mut self,
header: PacketHeader,
) -> Result<ResolveHeaderReady<B, P, Inner>, Error> {
let buffer = self.inner.fill_buf()?;
let packet_size = header.size as usize;
let available = self.buffered.len() + buffer.len();
if packet_size > available {
let consumed = buffer.len();
self.buffered.extend_from_slice(buffer);
self.inner.consume(consumed);
self.recent = HeaderReadState::Ready(Some(header));
return Ok(ResolveHeaderReady::Next(NextPacket::NotEnoughData(
packet_size - available,
)));
}
if packet_size < self.buffered.len() {
return Err(Error::InvalidPacketReaderLogic);
}
let rest_data = packet_size - self.buffered.len();
self.buffered.extend_from_slice(&buffer[..rest_data]);
self.inner.consume(rest_data);
Ok(ResolveHeaderReady::Resolved(header))
}
fn resolve_header_refill(
&mut self,
mut buffer: Vec<u8>,
needed: usize,
) -> Result<NextPacket<B, P, Inner>, Error> {
let extracted = self.inner.fill_buf()?;
if extracted.is_empty() {
self.rules.ignore(&buffer)?;
return Ok(NextPacket::NoData);
}
let extracted_len = extracted.len();
if extracted_len < needed {
buffer.extend_from_slice(extracted);
self.inner.consume(extracted_len);
self.recent = HeaderReadState::Refill(Some((buffer, needed - extracted_len)));
return Ok(NextPacket::NotEnoughData(needed - extracted_len));
}
let buffered = buffer.len();
buffer.extend_from_slice(&extracted[..needed]);
match PacketBufReaderDef::<'a, R, B, BR, P, Inner>::read_header(&buffer)? {
PacketHeaderState::Found(header, sgmt) => {
if sgmt.start() > &0 {
self.rules.ignore(&buffer[..*sgmt.start()])?;
}
self.inner.consume(sgmt.end() - buffered);
self.recent = HeaderReadState::Ready(Some(header));
return Ok(NextPacket::NotEnoughData(0));
}
_ => {
}
}
buffer.extend_from_slice(&extracted[needed..]);
let header_len = PacketHeader::ssize() as usize;
match PacketBufReaderDef::<'a, R, B, BR, P, Inner>::read_header(&buffer)? {
PacketHeaderState::Found(header, sgmt) => {
if sgmt.start() > &0 {
self.rules.ignore(&buffer[..*sgmt.start()])?;
}
self.inner.consume(sgmt.end() - buffered);
self.recent = HeaderReadState::Ready(Some(header));
Ok(NextPacket::NotEnoughData(0))
}
PacketHeaderState::NotEnoughData(from, needed) => {
if from > 0 {
self.rules.ignore(&buffer[..from])?;
buffer.drain(..from);
}
self.inner.consume(extracted_len);
self.recent = HeaderReadState::Refill(Some((buffer, needed)));
Ok(NextPacket::NotEnoughData(needed))
}
PacketHeaderState::NotFound => {
if buffer.len() > header_len {
self.rules.ignore(&buffer[..buffer.len() - header_len])?;
buffer.drain(..(buffer.len() - header_len));
}
self.inner.consume(extracted_len);
self.recent = HeaderReadState::Refill(Some((buffer, header_len)));
Ok(NextPacket::NotFound)
}
}
}
pub fn new(inner: &'a mut R) -> Self {
Self {
inner: std::io::BufReader::new(inner),
rules: RulesDef::default(),
recent: HeaderReadState::Empty,
buffered: Vec::with_capacity(u32::MAX as usize),
}
}
pub fn add_rule(&mut self, rule: RuleDef<B, BR, P, Inner>) -> Result<(), Error> {
self.rules.add_rule(rule)
}
pub fn remove_rule(&mut self, rule: RuleDefId) {
self.rules.remove_rule(rule);
}
pub fn read(
&mut self,
ctx: &mut <Inner as PayloadSchema>::Context<'_>,
) -> Result<NextPacket<B, P, Inner>, Error> {
let recent = std::mem::replace(&mut self.recent, HeaderReadState::Empty);
let (packet_buffer, header, consume) = match recent {
HeaderReadState::Ready(Some(header)) => match self.resolve_header_ready(header)? {
ResolveHeaderReady::Next(next) => return Ok(next),
ResolveHeaderReady::Resolved(header) => (self.buffered.as_slice(), header, None),
},
HeaderReadState::Refill(Some((buffer, needed))) => {
return self.resolve_header_refill(buffer, needed);
}
HeaderReadState::Empty => {
self.buffered.clear();
let buffer = self.inner.fill_buf()?;
if buffer.is_empty() {
return Ok(NextPacket::NoData);
}
let available = buffer.len();
if available < PacketHeader::ssize() as usize {
let needed = (PacketHeader::ssize() as usize) - available;
let mut data: Vec<u8> = Vec::with_capacity(available);
data.extend_from_slice(buffer);
self.recent = HeaderReadState::Refill(Some((data, needed)));
self.inner.consume(available);
return Ok(NextPacket::NotEnoughData(needed));
}
match PacketBufReaderDef::<'a, R, B, BR, P, Inner>::read_header(buffer)? {
PacketHeaderState::NotFound => {
let header_len = PacketHeader::ssize() as usize;
if available > header_len {
self.rules.ignore(&buffer[..available - header_len])?;
self.recent = HeaderReadState::Refill(Some((
buffer[available - header_len..].to_vec(),
header_len,
)));
} else {
self.recent =
HeaderReadState::Refill(Some((buffer.to_vec(), header_len)));
}
self.inner.consume(available);
return Ok(NextPacket::NotFound);
}
PacketHeaderState::NotEnoughData(from, needed) => {
if from > 0 {
self.rules.ignore(&buffer[..from])?;
}
let mut data: Vec<u8> = Vec::with_capacity(buffer.len() - from);
data.extend_from_slice(&buffer[from..]);
self.recent = HeaderReadState::Refill(Some((data, needed)));
self.inner.consume(available);
return Ok(NextPacket::NotEnoughData(needed));
}
PacketHeaderState::Found(header, sgmt) => {
if sgmt.start() > &0 {
self.rules.ignore(&buffer[..*sgmt.start()])?;
}
let packet_size = header.size as usize;
let needs = packet_size + *sgmt.end();
if needs > available {
self.buffered.extend_from_slice(&buffer[*sgmt.end()..]);
self.inner.consume(available);
self.recent = HeaderReadState::Ready(Some(header));
return Ok(NextPacket::NotEnoughData(needs - available));
}
let consume = Some(*sgmt.end() + header.size as usize);
(
&buffer[*sgmt.end()..*sgmt.end() + header.size as usize],
header,
consume,
)
}
}
}
_error => {
return Err(Error::InvalidPacketReaderLogic);
}
};
let blocks_len = header.blocks_len as usize;
let blocks_buffer = &packet_buffer[..blocks_len];
let mut blocks = Vec::new();
let mut processed = 0;
let mut count = 0;
if !blocks_buffer.is_empty() {
loop {
if count == MAX_BLOCKS_COUNT {
self.buffered.clear();
return Err(Error::MaxBlocksCount);
}
let blk = match BR::read_from_slice(&blocks_buffer[processed..], false) {
Ok(blk) => blk,
Err(err) => {
return self.drop_and_consume(consume, Err(err));
}
};
if blk.size() == 0 {
return self.drop_and_consume(consume, Err(Error::ZeroLengthBlock));
}
processed += blk.size() as usize;
count += 1;
blocks.push(blk);
if processed == blocks_buffer.len() {
break;
}
}
}
if !self.rules.prefilter(&blocks) {
return self.drop_and_consume(consume, Ok(NextPacket::Skipped));
}
let pkg = if header.payload {
let mut payload_buffer = &packet_buffer[blocks_len..];
match <PayloadHeader as TryReadFromBuffered>::try_read(&mut payload_buffer) {
Ok(ReadStatus::Success(header)) => {
let mut payload_buffer = &packet_buffer[blocks_len + header.size()..];
if !self.rules.filter_payload(payload_buffer) {
return self.drop_and_consume(consume, Ok(NextPacket::Skipped));
}
match <P as TryExtractPayloadFromBuffered<Inner>>::try_read(
&mut payload_buffer,
&header,
ctx,
)? {
ReadStatus::Success(payload) => PacketDef::new(
blocks.into_iter().map(|blk| blk.into()).collect::<Vec<B>>(),
Some(payload),
),
ReadStatus::NotEnoughData(needed) => {
return self.drop_and_consume(
consume,
Err(Error::NotEnoughData(needed as usize)),
);
}
}
}
Ok(ReadStatus::NotEnoughData(needed)) => {
return self
.drop_and_consume(consume, Err(Error::NotEnoughData(needed as usize)));
}
Err(err) => {
return self.drop_and_consume(consume, Err(err));
}
}
} else {
PacketDef::new(
blocks.into_iter().map(|blk| blk.into()).collect::<Vec<B>>(),
None,
)
};
if !self.rules.filter_packet(&pkg) {
self.drop_and_consume(consume, Ok(NextPacket::Skipped))
} else {
self.drop_and_consume(consume, Ok(NextPacket::Found(pkg)))
}
}
}