use crate::result::IonResult;
use std::io::BufRead;
pub trait IonDataSource: BufRead {
fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()>;
fn next_byte(&mut self) -> IonResult<Option<u8>>;
fn read_next_byte_while<F>(&mut self, byte_processor: &mut F) -> IonResult<usize>
where
F: FnMut(u8) -> bool;
fn read_slice<T, F>(
&mut self,
length: usize,
fallback_buffer: &mut Vec<u8>,
slice_processor: F,
) -> IonResult<T>
where
F: FnOnce(&[u8]) -> IonResult<T>;
}
impl<T: BufRead> IonDataSource for T {
fn skip_bytes(&mut self, number_of_bytes: usize) -> IonResult<()> {
let mut bytes_skipped = 0;
while bytes_skipped < number_of_bytes {
let buffer = self.fill_buf()?;
let bytes_in_buffer = buffer.len();
let bytes_to_skip = (number_of_bytes - bytes_skipped).min(bytes_in_buffer);
self.consume(bytes_to_skip);
bytes_skipped += bytes_to_skip;
}
Ok(())
}
#[inline(always)]
fn next_byte(&mut self) -> IonResult<Option<u8>> {
match self.fill_buf()?.first() {
Some(&byte) => {
self.consume(1);
Ok(Some(byte))
}
None => Ok(None),
}
}
fn read_next_byte_while<F>(&mut self, byte_processor: &mut F) -> IonResult<usize>
where
F: FnMut(u8) -> bool,
{
let mut number_of_bytes_processed: usize = 0;
let mut number_of_buffered_bytes: usize;
let mut number_of_bytes_consumed: usize = 0;
loop {
let buffer = self.fill_buf()?;
number_of_buffered_bytes = buffer.len();
for byte in buffer {
number_of_bytes_processed += 1;
if !byte_processor(*byte) {
self.consume(number_of_bytes_processed - number_of_bytes_consumed);
return Ok(number_of_bytes_processed);
}
}
self.consume(number_of_buffered_bytes);
number_of_bytes_consumed += number_of_buffered_bytes;
}
}
fn read_slice<V, F>(
&mut self,
number_of_bytes: usize,
fallback_buffer: &mut Vec<u8>,
slice_processor: F,
) -> IonResult<V>
where
F: FnOnce(&[u8]) -> IonResult<V>,
{
let buffer = self.fill_buf()?;
if buffer.len() >= number_of_bytes {
let result = slice_processor(&buffer[..number_of_bytes]);
self.consume(number_of_bytes);
return result;
}
let buffer: &mut [u8] = if fallback_buffer.len() < number_of_bytes {
fallback_buffer.resize(number_of_bytes, 0);
fallback_buffer.as_mut()
} else {
let (required_buffer, _) = fallback_buffer.split_at_mut(number_of_bytes);
required_buffer
};
self.read_exact(buffer)?;
slice_processor(buffer)
}
}
#[cfg(test)]
mod tests {
use super::IonDataSource;
use std::io::BufReader;
fn test_data(buffer_size: usize, data: &'static [u8]) -> impl IonDataSource {
BufReader::with_capacity(buffer_size, data)
}
#[test]
fn test_next_byte() {
let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
assert_eq!(Some(1), data_source.next_byte().unwrap());
assert_eq!(Some(2), data_source.next_byte().unwrap());
assert_eq!(Some(3), data_source.next_byte().unwrap());
assert_eq!(Some(4), data_source.next_byte().unwrap());
assert_eq!(Some(5), data_source.next_byte().unwrap());
}
#[test]
fn test_skip_bytes() {
let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
data_source.skip_bytes(3).unwrap();
assert_eq!(Some(4), data_source.next_byte().unwrap());
data_source.skip_bytes(1).unwrap();
assert_eq!(None, data_source.next_byte().unwrap());
}
#[test]
fn test_read_next_byte_while() {
let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
let mut sum: u64 = 0;
let processor = &mut |byte: u8| {
sum += byte as u64;
byte < 4
};
let number_of_bytes_processed = data_source.read_next_byte_while(processor).unwrap();
assert_eq!(number_of_bytes_processed, 4);
assert_eq!(sum, 10);
}
#[test]
fn test_read_slice() {
let mut data_source = test_data(2, &[1, 2, 3, 4, 5]);
let processor = &mut |data: &[u8]| Ok(data.iter().map(|byte| *byte as i32).sum());
let sum = data_source
.read_slice(4, &mut Vec::new(), processor)
.unwrap();
assert_eq!(10, sum);
}
}