use std::{
fmt::{Debug, Formatter},
hint::spin_loop,
io::{ErrorKind, Read},
};
use regex::bytes::Regex;
use crate::{ctrl::*, CustomChunker, RcErr, SimpleCustomChunker};
/// Length, in bytes, of the read buffer used unless one is set with
/// `ByteChunker::with_buffer_size`.
const DEFAULT_BUFFER_SIZE: usize = 1024;
/// Splits the bytes read from `source` into chunks separated by matches of a
/// regular expression. Chunks are produced through the `Iterator` impl (when
/// `R: Read`).
pub struct ByteChunker<R> {
    /// The reader bytes are pulled from.
    source: R,
    /// Compiled delimiter pattern.
    fence: Regex,
    /// Scratch buffer handed to each `read` call; its length is the read size.
    read_buff: Vec<u8>,
    /// Bytes that have been read but not yet yielded as part of a chunk.
    search_buff: Vec<u8>,
    /// How read errors are handled, and whether iteration has halted on one
    /// (`ErrorStatus::Errored`).
    error_status: ErrorStatus,
    /// What happens to the matched delimiter bytes (`Drop`, `Append`, `Prepend`).
    match_dispo: MatchDisposition,
    /// Whether the previous scan found a delimiter; if so, `next` scans the
    /// buffered bytes again before reading more input.
    last_scan_matched: bool,
    /// Offset into `search_buff` where the next scan starts. Only the `Prepend`
    /// branch of the scan sets it non-zero, so the delimiter that now begins the
    /// buffer is not matched again.
    scan_start_offset: usize,
}
impl<R> ByteChunker<R> {
    /// Creates a chunker that reads from `source` and splits its bytes on every
    /// match of the regular expression `delimiter`.
    ///
    /// The chunker starts with a [`DEFAULT_BUFFER_SIZE`]-byte read buffer. Read
    /// errors halt iteration after being reported once, and delimiter matches
    /// are handled per `MatchDisposition::default()`.
    ///
    /// # Errors
    ///
    /// Returns an error if `delimiter` does not compile as a regular expression.
    pub fn new(source: R, delimiter: &str) -> Result<Self, RcErr> {
        let fence = Regex::new(delimiter)?;
        Ok(Self {
            source,
            fence,
            read_buff: vec![0u8; DEFAULT_BUFFER_SIZE],
            search_buff: Vec::new(),
            error_status: ErrorStatus::Ok,
            match_dispo: MatchDisposition::default(),
            last_scan_matched: false,
            scan_start_offset: 0,
        })
    }

    /// Sets how many bytes are requested from the source per `read` call.
    ///
    /// A `size` of zero is raised to one byte. A zero-length buffer makes every
    /// `read` return `Ok(0)`, which the iterator treats as end of input, so the
    /// chunker would silently yield nothing.
    pub fn with_buffer_size(mut self, size: usize) -> Self {
        self.read_buff.resize(size.max(1), 0);
        self.read_buff.shrink_to_fit();
        self
    }

    /// Chooses how the iterator reacts to read errors. `WouldBlock` and
    /// `Interrupted` are always retried and never reach this policy.
    ///
    /// * `Halt`: yield the error once, then end iteration. A chunker that has
    ///   already halted on an error stays halted.
    /// * `Continue`: yield every error and keep reading afterwards.
    /// * `Ignore`: discard errors and keep reading. A reader that fails
    ///   persistently will make `next` retry indefinitely.
    pub fn on_error(mut self, response: ErrorResponse) -> Self {
        self.error_status = match response {
            ErrorResponse::Halt if self.error_status == ErrorStatus::Errored => {
                ErrorStatus::Errored
            }
            ErrorResponse::Halt => ErrorStatus::Ok,
            ErrorResponse::Continue => ErrorStatus::Continue,
            ErrorResponse::Ignore => ErrorStatus::Ignore,
        };
        self
    }

    /// Sets what happens to the bytes matched by the delimiter.
    ///
    /// * `Drop`: the match is discarded.
    /// * `Append`: the match ends the chunk it terminates.
    /// * `Prepend`: the match begins the following chunk.
    pub fn with_match(mut self, behavior: MatchDisposition) -> Self {
        self.match_dispo = behavior;
        // Only `Prepend` needs to step over a delimiter left at the buffer start.
        if matches!(behavior, MatchDisposition::Drop | MatchDisposition::Append) {
            self.scan_start_offset = 0;
        }
        self
    }

    /// Consumes the chunker and returns the underlying reader. Any bytes already
    /// read but not yet yielded are discarded.
    pub fn into_inner(self) -> R {
        self.source
    }

    /// Consumes the chunker and returns the underlying reader, together with the
    /// bytes that were read but not yet yielded as a chunk.
    pub fn into_innards(self) -> (R, Vec<u8>) {
        (self.source, self.search_buff)
    }

    /// Wraps this chunker and `adapter` into a [`CustomChunker`].
    pub fn with_adapter<A>(self, adapter: A) -> CustomChunker<R, A> {
        (self, adapter).into()
    }

    /// Wraps this chunker and `adapter` into a [`SimpleCustomChunker`].
    pub fn with_simple_adapter<A>(self, adapter: A) -> SimpleCustomChunker<R, A> {
        (self, adapter).into()
    }

    /// Looks for the next delimiter in `search_buff`, starting at
    /// `scan_start_offset`.
    ///
    /// On a match, returns the bytes forming the next chunk and leaves the
    /// remainder in `search_buff`. Which side receives the matched bytes depends
    /// on `match_dispo`. Returns `None` if no delimiter has been buffered yet.
    /// Either way, `last_scan_matched` records the outcome.
    fn scan_buffer(&mut self) -> Option<Vec<u8>> {
        // `find_at` panics when the start is past the end of the haystack.
        // Clamp defensively in case the offset outlived the bytes it referred to.
        let from = self.scan_start_offset.min(self.search_buff.len());
        let (start, end) = match self.fence.find_at(&self.search_buff, from) {
            Some(m) => (m.start(), m.end()),
            None => {
                self.last_scan_matched = false;
                return None;
            }
        };
        self.last_scan_matched = true;

        let remainder = match self.match_dispo {
            MatchDisposition::Drop => {
                let rest = self.search_buff.split_off(end);
                self.search_buff.truncate(start);
                rest
            }
            MatchDisposition::Append => self.search_buff.split_off(end),
            MatchDisposition::Prepend => {
                // The remainder now starts with the delimiter. Skip past it on
                // the next scan so the same match is not found again.
                self.scan_start_offset = end - start;
                self.search_buff.split_off(start)
            }
        };
        Some(std::mem::replace(&mut self.search_buff, remainder))
    }

    /// Returns the length of the read buffer in bytes.
    // NOTE(review): `dead_code` is allowed presumably because nothing calls this
    // helper yet; confirm before relying on or removing it.
    #[allow(dead_code)]
    fn buff_size(&self) -> usize {
        self.read_buff.len()
    }
}
impl<R> Debug for ByteChunker<R> {
    /// Formats the chunker's state for debugging.
    ///
    /// The reader is shown only by its type name, so `R` needs no `Debug` bound.
    /// Both byte buffers are rendered as lossy UTF-8.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let reader_type = std::any::type_name::<R>();
        let read_text = String::from_utf8_lossy(&self.read_buff);
        let search_text = String::from_utf8_lossy(&self.search_buff);

        let mut out = f.debug_struct("ByteChunker");
        out.field("source", &reader_type);
        out.field("fence", &self.fence);
        out.field("read_buff", &read_text);
        out.field("search_buff", &search_text);
        out.field("error_status", &self.error_status);
        out.field("match_dispo", &self.match_dispo);
        out.field("last_scan_matched", &self.last_scan_matched);
        out.field("scan_start_offset", &self.scan_start_offset);
        out.finish()
    }
}
impl<R: Read> Iterator for ByteChunker<R> {
    type Item = Result<Vec<u8>, RcErr>;

    /// Returns the next delimited chunk of bytes.
    ///
    /// Bytes are read from the source and buffered until the delimiter regex
    /// matches; the chunk ending at that match is then yielded. When a read
    /// returns `Ok(0)` (end of input), any buffered bytes are yielded as a final
    /// chunk. `None` is returned once nothing remains buffered.
    ///
    /// `WouldBlock` and `Interrupted` reads are retried immediately, which is a
    /// busy-wait. Other read errors follow the policy set by `on_error`. After a
    /// halting error, every later call returns `None`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.error_status == ErrorStatus::Errored {
            return None;
        }
        loop {
            // Only read more input if the last scan found nothing; after a match
            // the buffer may already hold further delimiters.
            if !self.last_scan_matched {
                match self.source.read(&mut self.read_buff) {
                    Err(e) => match e.kind() {
                        ErrorKind::WouldBlock | ErrorKind::Interrupted => {
                            spin_loop();
                            continue;
                        }
                        _ => match self.error_status {
                            ErrorStatus::Ok | ErrorStatus::Errored => {
                                self.error_status = ErrorStatus::Errored;
                                return Some(Err(e.into()));
                            }
                            ErrorStatus::Continue => return Some(Err(e.into())),
                            ErrorStatus::Ignore => continue,
                        },
                    },
                    Ok(0) => {
                        // The buffer is about to be flushed, so any delimiter skip
                        // left over from `Prepend` mode no longer refers to real
                        // bytes. Keeping it would make a reader that yields more
                        // data after EOF skip delimiters (or panic in `find_at`).
                        self.scan_start_offset = 0;
                        if self.search_buff.is_empty() {
                            return None;
                        }
                        return Some(Ok(std::mem::take(&mut self.search_buff)));
                    }
                    Ok(n) => self.search_buff.extend_from_slice(&self.read_buff[..n]),
                }
            }
            if let Some(chunk) = self.scan_buffer() {
                return Some(Ok(chunk));
            }
            spin_loop();
        }
    }
}