use std::{
io::{self, ErrorKind, Read},
mem::{self, MaybeUninit},
num::NonZeroUsize,
};
use crate::tape::Tape;
use super::Run;
/// A reader that an `ExternalRun` streams its raw item bytes from.
pub trait RunBacking: Read {
    /// Hook invoked by `ExternalRun::next` whenever it finds no remaining items, so the
    /// backing can release whatever it holds. It may be called more than once.
    fn finalize(&mut self);
}

impl RunBacking for Box<dyn Read + Send> {
    /// Swaps the wrapped reader for an empty one, dropping (and thereby releasing) the old reader.
    fn finalize(&mut self) {
        let exhausted: Box<dyn Read + Send> = Box::new(io::empty());
        *self = exhausted;
    }
}
/// A run whose items are stored as raw bytes in a [`RunBacking`] and are pulled into memory
/// one batch at a time, then reinterpreted as `T`.
pub struct ExternalRun<T, TBacking: RunBacking> {
    /// Reader the raw item bytes are streamed from.
    source: TBacking,
    /// Current in-memory batch; slots before `read_idx` have already been moved out.
    buffer: Vec<MaybeUninit<T>>,
    /// Position of the next item to hand out within `buffer`.
    read_idx: usize,
    /// Items not yet handed out, counting both the in-memory batch and the unread backing.
    remaining_entries: usize,
}
impl<T, B> Drop for ExternalRun<T, B>
where
    B: RunBacking,
{
    /// Drops every item that has not been handed out yet.
    ///
    /// Unread items may still be in the backing rather than in the in-memory batch, so they are
    /// streamed back in through `next` before being dropped. Nothing is done when
    /// `mem::needs_drop::<T>()` is false.
    ///
    /// If the thread is already panicking (for example because a read in `refill_buffer`
    /// failed), the remaining items are leaked instead: draining in that state could panic a
    /// second time (the batch may be exhausted with entries still pending), and a panic during
    /// unwinding aborts the whole process.
    fn drop(&mut self) {
        if !mem::needs_drop::<T>() || std::thread::panicking() {
            return;
        }
        while self.next().is_some() {}
    }
}
/// Wraps an in-memory vector as a run that yields the items of `source` in order.
///
/// The backing is an empty reader, so no I/O is performed: once the vector's items are
/// exhausted, the refill reads nothing and the run ends.
pub fn create_buffer_run<T>(source: Vec<T>) -> ExternalRun<T, Box<dyn Read + Send>> {
    // Safe element-wise conversion. `MaybeUninit<T>` has the same size and alignment as `T`,
    // so std's in-place collection typically reuses `source`'s allocation. Transmuting the
    // `Vec` itself is not sound because `Vec`'s layout is not guaranteed across element types.
    let buffer: Vec<MaybeUninit<T>> = source.into_iter().map(MaybeUninit::new).collect();
    let remaining_entries = buffer.len();
    ExternalRun {
        source: Box::new(io::Cursor::new(&[])),
        buffer,
        read_idx: 0,
        remaining_entries,
    }
}
impl<T, TBacking> ExternalRun<T, TBacking>
where
    TBacking: RunBacking,
{
    /// Creates a run over the items stored in `tape`, keeping at most `buffer_size` of them
    /// in memory at a time.
    ///
    /// The first batch is read eagerly, so this already performs I/O on the tape's backing.
    ///
    /// # Panics
    ///
    /// Panics if reading from the backing fails, or if the number of bytes read is not a
    /// multiple of `size_of::<T>()`.
    pub fn from_tape(tape: Tape<TBacking>, buffer_size: NonZeroUsize) -> Self {
        let num_entries = tape.num_entries();
        let source = tape.into_backing();
        let buffer = std::iter::repeat_with(MaybeUninit::uninit)
            .take(buffer_size.get())
            .collect();
        let mut run = Self {
            buffer,
            read_idx: 0,
            remaining_entries: num_entries,
            source,
        };
        run.refill_buffer();
        run
    }

    /// Replaces the in-memory batch with the next batch of items from `source`.
    ///
    /// Reads up to `self.buffer.len()` items' worth of bytes. The buffer is then shrunk to the
    /// number of items actually read, so a short read (end of the backing) also shrinks every
    /// later batch. Must only be called once no unread item remains in the buffer, because
    /// the old slots are overwritten.
    ///
    /// # Panics
    ///
    /// Panics if the backing reports an I/O error other than `Interrupted`, or if the number of
    /// bytes read is not a multiple of `size_of::<T>()`.
    fn refill_buffer(&mut self) {
        /// Calls `read` once, retrying for as long as it reports `ErrorKind::Interrupted`.
        fn read_with_retry(source: &mut impl Read, buffer: &mut [u8]) -> io::Result<usize> {
            loop {
                match source.read(buffer) {
                    Ok(size) => break Ok(size),
                    Err(e) if e.kind() == ErrorKind::Interrupted => {}
                    err => break err,
                }
            }
        }

        /// Fills `buffer` until it is full or the source reports end of input, and returns the
        /// number of bytes read. Panics on any non-`Interrupted` I/O error.
        fn try_read_exact(source: &mut impl Read, mut buffer: &mut [u8]) -> usize {
            let mut bytes_read = 0;
            while !buffer.is_empty() {
                let read = read_with_retry(source, buffer).expect("Unable to perform read on FileRun. This means that the file was modified from under us!");
                if read == 0 {
                    break;
                }
                buffer = &mut buffer[read..];
                bytes_read += read;
            }
            bytes_read
        }

        let item_size = std::mem::size_of::<T>();
        if item_size == 0 {
            // Zero-sized items occupy no bytes, so every slot already holds a valid `T`.
            self.read_idx = 0;
            return;
        }

        let batch_len = self.buffer.len();
        let byte_len = batch_len * item_size;
        // Empty the buffer for the duration of the read. If the read or the size check below
        // panics, no partially overwritten slot can later be handed out by `next`/`peek` or
        // dropped. `MaybeUninit<T>` has no destructor, and `clear` keeps the allocation.
        self.buffer.clear();
        self.read_idx = 0;

        let start = self.buffer.as_mut_ptr().cast::<u8>();
        // SAFETY: the allocation still holds at least `batch_len` elements, i.e. `byte_len`
        // bytes (u8 needs no alignment), and nothing else accesses it while `bytes` is alive.
        // The bytes are zeroed first because `Read::read` must not be given uninitialized
        // memory: it is a safe trait, so an implementation is allowed to read the buffer.
        let bytes = unsafe {
            std::ptr::write_bytes(start, 0, byte_len);
            std::slice::from_raw_parts_mut(start, byte_len)
        };
        let bytes_read = try_read_exact(&mut self.source, bytes);
        assert_eq!(
            0,
            bytes_read % item_size,
            "The size of the file does not match anymore! was it modified?"
        );
        let items_read = bytes_read / item_size;
        // SAFETY: `bytes_read <= byte_len`, so `items_read <= batch_len <= capacity`, and every
        // byte in that prefix has been written, which is all `MaybeUninit<T>` requires.
        unsafe { self.buffer.set_len(items_read) };
    }
}
impl<T, TBacking> Run<T> for ExternalRun<T, TBacking>
where
    TBacking: RunBacking,
{
    /// Borrows the next item without consuming it, or returns `None` once the run is exhausted.
    fn peek(&self) -> Option<&T> {
        if self.remaining_entries > 0 {
            let slot = &self.buffer[self.read_idx];
            // SAFETY: while entries remain, the slot at `read_idx` holds an item that has not
            // been moved out yet (zero-sized items need no bytes at all).
            Some(unsafe { slot.assume_init_ref() })
        } else {
            None
        }
    }

    /// Moves the next item out of the run, fetching a new batch from the backing once the
    /// current one is used up. When nothing is left, finalizes the backing and returns `None`.
    fn next(&mut self) -> Option<T> {
        if self.remaining_entries > 0 {
            let slot = &self.buffer[self.read_idx];
            // SAFETY: as in `peek`; advancing `read_idx` below ensures the slot is read only once.
            let item = unsafe { slot.assume_init_read() };
            self.remaining_entries -= 1;
            self.read_idx += 1;
            if self.read_idx >= self.buffer.len() {
                self.refill_buffer();
            }
            Some(item)
        } else {
            self.source.finalize();
            None
        }
    }

    /// Number of items this run will still yield.
    fn remaining_items(&self) -> usize {
        self.remaining_entries
    }
}
#[cfg(test)]
mod test {
    use std::fmt::Debug;

    /// Lets in-memory tapes act as run backings; finalizing simply seeks to the end.
    impl RunBacking for std::io::Cursor<Vec<u8>> {
        fn finalize(&mut self) {
            let len = self.get_ref().len();
            self.set_position(len as u64);
        }
    }

    use crate::tape::vec_to_tape;

    use super::*;

    /// Round-trips `data` through a tape and checks that the run yields exactly those items.
    fn test_file_run<T>(data: Vec<T>, buffer_size: NonZeroUsize)
    where
        T: Clone + Eq + Debug,
    {
        let tape = vec_to_tape(data.clone());
        let mut run = ExternalRun::from_tape(tape, buffer_size);
        assert_eq!(data.len(), run.remaining_items());
        let collected = std::iter::from_fn(|| run.next()).collect::<Vec<_>>();
        assert_eq!(data, collected);
    }

    /// Dropping a partially consumed run of heap-owning items must not crash.
    #[test]
    fn test_drop() {
        let row: Vec<i32> = (1..5).collect();
        let data = vec![row; 20];
        let tape = vec_to_tape(data);
        let mut run: ExternalRun<Vec<i32>, _> =
            ExternalRun::from_tape(tape, NonZeroUsize::new(4096).unwrap());
        for _ in 0..10 {
            run.next();
        }
        drop(run);
    }

    /// Every item written to a tape is dropped exactly once, whether it is consumed through
    /// `next` or released when the run is dropped early.
    #[test]
    fn drop_releases_unread_items() {
        let marker = std::sync::Arc::new(());
        let data = vec![std::sync::Arc::clone(&marker); 20];
        // A small buffer leaves unread items both in memory and in the backing at drop time.
        let mut run = ExternalRun::from_tape(vec_to_tape(data), NonZeroUsize::new(4).unwrap());
        for _ in 0..10 {
            drop(run.next());
        }
        assert_eq!(std::sync::Arc::strong_count(&marker), 11);
        drop(run);
        assert_eq!(std::sync::Arc::strong_count(&marker), 1);
    }

    /// An in-memory run yields the vector's items in order and then stays exhausted.
    #[test]
    fn buffer_run_yields_items_in_order() {
        let data = vec![vec![1, 2], Vec::new(), vec![3]];
        let mut run = create_buffer_run(data.clone());
        assert_eq!(data.len(), run.remaining_items());
        let collected = std::iter::from_fn(|| run.next()).collect::<Vec<_>>();
        assert_eq!(data, collected);
        assert!(run.peek().is_none());
        assert_eq!(0, run.remaining_items());
    }

    #[test]
    fn works_with_vecs() {
        let d = (1..100).collect::<Vec<_>>();
        let data = vec![d; 10];
        test_file_run(data, NonZeroUsize::new(2).unwrap());
    }

    #[test]
    fn works_with_zst() {
        let data = vec![(); 10];
        test_file_run(data, NonZeroUsize::new(2).unwrap());
    }

    #[test]
    fn works_with_larger_buffer() {
        let size = NonZeroUsize::new(20).unwrap();
        let data = vec![(); 10];
        test_file_run(data, size);
        let data = vec![1337; 10];
        test_file_run(data, size);
    }

    #[test]
    fn works_with_empty_data() {
        let size = NonZeroUsize::new(10).unwrap();
        let data = vec![(); 0];
        test_file_run(data, size);
        let data = vec![1; 0];
        test_file_run(data, size);
    }
}