use anyhow::Result;
use fgoxide::io::Io;
use fgoxide::iter::{ChunkedReadAheadIterator, IntoChunkedReadAheadIterator};
use seq_io::fastq::Reader as FastqReader;
use std::cmp::min;
use std::io::{self, BufRead, Read};
use std::path::PathBuf;
/// Capacity in bytes (1 MiB) used both for the `Io` reader and for the FASTQ parser's buffer.
const BUFFER_SIZE: usize = 1 << 20;
/// Default number of bytes per chunk read from the input (128 KiB).
const DEFAULT_CHUNK_SIZE: usize = 128 << 10;
/// Default value for `ReadAheadBuilder::chunk_count`, which is passed to `read_ahead`.
const DEFAULT_CHUNK_COUNT: usize = 32;
/// An owned run of bytes read from the input, or the I/O error encountered while reading it.
type ByteChunk = io::Result<Vec<u8>>;
/// Read-ahead iterator over [`ByteChunk`]s (presumably filled on a background thread by
/// `fgoxide` — confirm against the `read_ahead` documentation).
type ByteChunkStream = ChunkedReadAheadIterator<ByteChunk>;
/// Configuration for opening an input file as a FASTQ reader backed by a read-ahead
/// stream of byte chunks.
pub(crate) struct ReadAheadBuilder {
    /// File to open via `fgoxide::io::Io::new_reader` (which presumably decompresses based on
    /// the file name — confirm).
    pub(crate) path: PathBuf,
    /// Number of bytes requested for each chunk pulled from the file.
    pub(crate) chunk_size: usize,
    /// Passed to `read_ahead`; presumably the number of chunks that may be buffered ahead of
    /// the parser — confirm against the `fgoxide` docs.
    pub(crate) chunk_count: usize,
}

impl ReadAheadBuilder {
    /// Opens `path` and returns a FASTQ reader whose bytes come from a read-ahead chunk stream.
    ///
    /// # Errors
    ///
    /// Returns an error if the file at `path` cannot be opened.
    pub(crate) fn build(self) -> Result<FastqReader<ChunkReader>> {
        let Self { path, chunk_size, chunk_count } = self;

        let opener = Io::new(5, BUFFER_SIZE);
        let reader = opener
            .new_reader(&path)
            .map_err(|err| anyhow::anyhow!("Failed to open {:?}: {err}", path))?;

        // Each read-ahead item is one byte chunk produced by `ByteChunker`.
        let chunker = ByteChunker { reader, chunk_size, done: false };
        let chunk_reader =
            ChunkReader { chunks: chunker.read_ahead(1, chunk_count), current: Vec::new(), pos: 0 };

        Ok(FastqReader::with_capacity(chunk_reader, BUFFER_SIZE))
    }
}

impl Default for ReadAheadBuilder {
    /// An empty path with the default chunk size and chunk count.
    fn default() -> Self {
        Self {
            path: PathBuf::default(),
            chunk_size: DEFAULT_CHUNK_SIZE,
            chunk_count: DEFAULT_CHUNK_COUNT,
        }
    }
}
/// Iterator that splits a byte stream into owned chunks of up to `chunk_size` bytes.
///
/// Each chunk is filled until it holds `chunk_size` bytes or the reader reports
/// end-of-stream (`Ok(0)`), so every yielded `Ok` chunk is non-empty. Once end-of-stream
/// or a non-retryable error has been observed, the iterator yields `None` forever after.
struct ByteChunker {
    /// Source of the bytes being chunked.
    reader: Box<dyn BufRead + Send>,
    /// Target number of bytes per chunk; a value of 0 is treated as 1.
    chunk_size: usize,
    /// Set once end-of-stream or an error has been observed.
    done: bool,
}

impl Iterator for ByteChunker {
    // Same type as the file-level `ByteChunk` alias, spelled out so this impl stands alone.
    type Item = io::Result<Vec<u8>>;

    /// Reads the next chunk from the underlying reader.
    ///
    /// Returns `Some(Ok(chunk))` with between 1 and `chunk_size` bytes, `Some(Err(e))` for a
    /// non-retryable read error (bytes already read into the partial chunk are discarded),
    /// or `None` once the reader is exhausted.
    fn next(&mut self) -> Option<Self::Item> {
        if self.done {
            return None;
        }
        // A chunk size of zero would make the fill loop a no-op and falsely report the stream
        // as empty; clamp it so every byte is still delivered.
        let chunk_size = self.chunk_size.max(1);
        let mut buf = vec![0u8; chunk_size];
        let mut filled = 0;
        while filled < chunk_size {
            match self.reader.read(&mut buf[filled..]) {
                Ok(0) => break,
                Ok(n) => filled += n,
                // Interruptions are transient; retry just like `Read::read_exact` does.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => {
                    self.done = true;
                    return Some(Err(e));
                }
            }
        }
        if filled == 0 {
            self.done = true;
            None
        } else {
            buf.truncate(filled);
            Some(Ok(buf))
        }
    }
}
/// `Read` adapter that serves the bytes of a read-ahead chunk stream in order.
///
/// Bytes are copied out of `current` until it is exhausted, after which the next chunk is
/// pulled from `chunks`.
pub(crate) struct ChunkReader {
    /// Stream of byte chunks produced ahead of this reader.
    chunks: ByteChunkStream,
    /// The chunk currently being served.
    current: Vec<u8>,
    /// Offset of the next unread byte within `current`.
    pos: usize,
}

impl Read for ChunkReader {
    /// Copies up to `buf.len()` bytes from the current chunk into `buf`, fetching the next
    /// chunk first if the current one is used up.
    ///
    /// Returns `Ok(0)` when `buf` is empty or the chunk stream is exhausted, and forwards any
    /// I/O error carried by the stream.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        if buf.is_empty() {
            return Ok(0);
        }
        // Advance until the current chunk has unread bytes, skipping any empty chunks.
        while self.pos >= self.current.len() {
            match self.chunks.next() {
                None => return Ok(0),
                Some(Err(e)) => return Err(e),
                Some(Ok(next_chunk)) => {
                    self.current = next_chunk;
                    self.pos = 0;
                }
            }
        }
        let unread = &self.current[self.pos..];
        let count = min(buf.len(), unread.len());
        buf[..count].copy_from_slice(&unread[..count]);
        self.pos += count;
        Ok(count)
    }
}