use bzip2::read::{BzDecoder, BzEncoder};
use flate2::Compression;
use flate2::bufread::MultiGzDecoder;
use gzp::ZWriter;
use gzp::deflate::Gzip;
use gzp::par::compress::{ParCompress, ParCompressBuilder};
use liblzma::read::{XzDecoder, XzEncoder};
use std::alloc::{Layout, alloc, alloc_zeroed, dealloc};
use std::fs::File;
use std::io::IsTerminal;
use std::io::{self, BufRead, Read, Write};
use std::ptr::NonNull;
use std::slice;
use zstd::stream::read::Decoder as ZstdDecoder;
use zstd::stream::write::Encoder as ZstdEncoder;
/// Byte alignment (4096) used for the internal I/O buffers of [`xopen`] and [`xwrite`].
///
/// The `*_with_alignment` variants accept any other non-zero power of two.
pub const DEFAULT_IO_BUFFER_ALIGNMENT: usize = 1 << 12;
/// Owned heap buffer of `len` bytes whose start address is a multiple of `align`.
///
/// The memory is zero-initialised when allocated, so it can safely be exposed as
/// `&[u8]` / `&mut [u8]` and passed to arbitrary `Read` implementations.
struct AlignedBytes {
    /// Start of the allocation; never null.
    ptr: NonNull<u8>,
    /// Size of the allocation in bytes (always at least 1).
    len: usize,
    /// Alignment the allocation was made with (a non-zero power of two).
    align: usize,
}

impl AlignedBytes {
    /// Allocates a zeroed buffer of `len.max(1)` bytes aligned to `align`.
    ///
    /// # Errors
    /// - `InvalidInput` if `align` is not a non-zero power of two, or if the size and
    ///   alignment do not form a valid [`Layout`];
    /// - a generic error if the allocator returns null.
    fn new(len: usize, align: usize) -> io::Result<Self> {
        let align = normalized_alignment(align)?;
        // Zero-sized allocations are not permitted by the global allocator API.
        let len = len.max(1);
        let layout = Layout::from_size_align(len, align).map_err(|_| {
            io::Error::new(
                io::ErrorKind::InvalidInput,
                "buffer size and alignment form an invalid layout",
            )
        })?;
        // SAFETY: `layout` has a non-zero size (`len >= 1`). Zeroing the allocation makes
        // every byte initialised, which `as_slice`/`as_mut_slice` rely on and which
        // `Read::read` requires of the buffers it is given.
        let ptr = unsafe { alloc_zeroed(layout) };
        let ptr = NonNull::new(ptr)
            .ok_or_else(|| io::Error::other("failed to allocate aligned I/O buffer"))?;
        Ok(Self { ptr, len, align })
    }

    /// Borrows the whole buffer.
    #[inline]
    fn as_slice(&self) -> &[u8] {
        // SAFETY: `ptr` points to `len` initialised bytes owned by `self` for its lifetime.
        unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
    }

    /// Mutably borrows the whole buffer.
    #[inline]
    fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: as in `as_slice`; `&mut self` guarantees exclusive access.
        unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
    }
}

impl Drop for AlignedBytes {
    fn drop(&mut self) {
        let layout = Layout::from_size_align(self.len, self.align)
            .expect("aligned buffer layout should stay valid");
        // SAFETY: `ptr` was returned by `alloc_zeroed` with exactly this layout and is
        // released only here.
        unsafe { dealloc(self.ptr.as_ptr(), layout) };
    }
}

/// Validates a caller-supplied buffer alignment and returns it unchanged.
///
/// # Errors
/// `InvalidInput` unless `align` is a non-zero power of two.
fn normalized_alignment(align: usize) -> io::Result<usize> {
    // `0usize.is_power_of_two()` is false, so this single check also rejects zero.
    if align.is_power_of_two() {
        Ok(align)
    } else {
        Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "buffer alignment must be a non-zero power of two",
        ))
    }
}
/// `BufRead` adapter whose internal buffer is an [`AlignedBytes`] allocation.
///
/// Unconsumed data always lives in `buf[pos..filled]`.
struct AlignedBufReader<R> {
    /// Wrapped reader.
    inner: R,
    /// Backing storage for buffered bytes.
    buf: AlignedBytes,
    /// Index of the next unconsumed byte.
    pos: usize,
    /// Number of valid bytes at the front of `buf`.
    filled: usize,
}

impl<R> AlignedBufReader<R> {
    /// Wraps `inner` with a buffer of `capacity.max(1)` bytes aligned to `alignment`.
    ///
    /// # Errors
    /// Propagates the validation/allocation error from [`AlignedBytes::new`].
    fn with_capacity_and_alignment(
        capacity: usize,
        alignment: usize,
        inner: R,
    ) -> io::Result<Self> {
        let buf = AlignedBytes::new(capacity.max(1), alignment)?;
        Ok(Self {
            inner,
            buf,
            pos: 0,
            filled: 0,
        })
    }

    /// `true` once every buffered byte has been consumed.
    #[inline]
    fn is_drained(&self) -> bool {
        self.pos >= self.filled
    }
}

impl<R: Read> Read for AlignedBufReader<R> {
    fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
        // Nothing is buffered and the caller's slice is at least as large as our buffer:
        // read straight into it and skip the intermediate copy.
        let bypass = self.is_drained() && out.len() >= self.buf.len;
        if bypass {
            self.pos = 0;
            self.filled = 0;
            return self.inner.read(out);
        }
        let copied = {
            let available = self.fill_buf()?;
            let n = out.len().min(available.len());
            out[..n].copy_from_slice(&available[..n]);
            n
        };
        self.consume(copied);
        Ok(copied)
    }
}

impl<R: Read> BufRead for AlignedBufReader<R> {
    fn fill_buf(&mut self) -> io::Result<&[u8]> {
        if self.is_drained() {
            let n = self.inner.read(self.buf.as_mut_slice())?;
            self.filled = n;
            self.pos = 0;
        }
        Ok(&self.buf.as_slice()[self.pos..self.filled])
    }

    fn consume(&mut self, amt: usize) {
        // Clamp so over-consuming never moves `pos` past the valid data.
        self.pos = self.filled.min(self.pos + amt);
    }
}
/// `Write` adapter that batches small writes in an [`AlignedBytes`] buffer.
///
/// Pending bytes are flushed on a best-effort basis when the writer is dropped.
struct AlignedBufWriter<W: Write> {
    /// Destination writer.
    inner: W,
    /// Backing storage for pending bytes.
    buf: AlignedBytes,
    /// Number of pending bytes at the front of `buf`.
    filled: usize,
}

impl<W: Write> AlignedBufWriter<W> {
    /// Wraps `inner` with a buffer of `capacity.max(1)` bytes aligned to `alignment`.
    ///
    /// # Errors
    /// Propagates the validation/allocation error from [`AlignedBytes::new`].
    fn with_capacity_and_alignment(
        capacity: usize,
        alignment: usize,
        inner: W,
    ) -> io::Result<Self> {
        Ok(Self {
            inner,
            buf: AlignedBytes::new(capacity.max(1), alignment)?,
            filled: 0,
        })
    }
}

impl<W: Write> AlignedBufWriter<W> {
    /// Writes every pending byte to `inner`.
    ///
    /// `Interrupted` errors are retried. On any other error, the bytes that did reach
    /// `inner` are removed from the buffer. A later retry (or the flush in `Drop`)
    /// then resends only the remainder instead of duplicating output.
    ///
    /// # Errors
    /// `WriteZero` if `inner` accepts no bytes, or the first non-`Interrupted` error
    /// returned by `inner.write`.
    fn flush_buf(&mut self) -> io::Result<()> {
        let mut written = 0;
        let result = loop {
            if written >= self.filled {
                break Ok(());
            }
            match self.inner.write(&self.buf.as_slice()[written..self.filled]) {
                Ok(0) => {
                    break Err(io::Error::new(
                        io::ErrorKind::WriteZero,
                        "failed to flush aligned I/O buffer",
                    ));
                }
                Ok(n) => written += n,
                Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                Err(e) => break Err(e),
            }
        };
        // Clamp in case a misbehaving `inner` reports more bytes than it was given.
        let written = written.min(self.filled);
        // Shift the unwritten tail to the front; a no-op copy when everything went out.
        self.buf.as_mut_slice().copy_within(written..self.filled, 0);
        self.filled -= written;
        result
    }
}

impl<W: Write> Drop for AlignedBufWriter<W> {
    fn drop(&mut self) {
        // `drop` cannot report errors. Callers that must observe write failures need
        // to call `flush` explicitly before the writer goes out of scope.
        let _ = self.flush();
    }
}

impl<W: Write> Write for AlignedBufWriter<W> {
    fn write(&mut self, data: &[u8]) -> io::Result<usize> {
        // Make room first when `data` does not fit behind the pending bytes.
        if data.len() > self.buf.len - self.filled {
            self.flush_buf()?;
        }
        // Nothing pending and `data` is at least a whole buffer: hand it straight to
        // `inner` instead of copying it through the buffer.
        if self.filled == 0 && data.len() >= self.buf.len {
            return self.inner.write(data);
        }
        let end = self.filled + data.len();
        self.buf.as_mut_slice()[self.filled..end].copy_from_slice(data);
        self.filled = end;
        Ok(data.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        self.flush_buf()?;
        self.inner.flush()
    }
}
/// `Write` adapter around a gzp parallel gzip compressor that finishes the stream
/// when dropped.
///
/// `inner` stays `Some` until `Drop` takes it out to call `finish`.
struct GzpGzipWriter<W: Write + Send + 'static> {
    inner: Option<ParCompress<'static, Gzip, W>>,
}

impl<W: Write + Send + 'static> GzpGzipWriter<W> {
    /// Returns the still-active compressor.
    ///
    /// # Panics
    /// If the compressor has already been taken for finishing.
    #[inline]
    fn compressor(&mut self) -> &mut ParCompress<'static, Gzip, W> {
        self.inner.as_mut().expect("gzp writer used after finish")
    }
}

impl<W: Write + Send + 'static> Write for GzpGzipWriter<W> {
    #[inline]
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.compressor().write(buf)
    }

    #[inline]
    fn flush(&mut self) -> io::Result<()> {
        self.compressor().flush()
    }
}

impl<W: Write + Send + 'static> Drop for GzpGzipWriter<W> {
    fn drop(&mut self) {
        // Errors cannot be surfaced from `drop`, so finishing is best effort.
        if let Some(mut compressor) = self.inner.take() {
            let _ = compressor.finish();
        }
    }
}
/// Opens `file` (standard input when `file` is `"-"`) for buffered reading.
///
/// gzip, xz, bzip2, zstd and lz4 input is decompressed transparently. Buffers use
/// [`DEFAULT_IO_BUFFER_ALIGNMENT`]; see [`xopen_with_alignment`] for the full contract.
pub fn xopen(file: &str, buf_size: usize) -> io::Result<Box<dyn BufRead>> {
    let alignment = DEFAULT_IO_BUFFER_ALIGNMENT;
    xopen_with_alignment(file, buf_size, alignment)
}
pub fn xopen_with_alignment(
file: &str,
buf_size: usize,
buf_align: usize,
) -> io::Result<Box<dyn BufRead>> {
let buf_size = buf_size.max(4096);
let mut r: Box<dyn BufRead> = if file == "-" {
if io::stdin().is_terminal() {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"no data detected in STDIN",
));
}
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
io::stdin().lock(),
)?)
} else {
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
File::open(file)?,
)?)
};
let buf = r.fill_buf()?;
let reader: Box<dyn BufRead> = if buf.starts_with(&[0x1f, 0x8b]) {
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
MultiGzDecoder::new(r),
)?)
} else if buf.starts_with(&[0xFD, b'7', b'z', b'X', b'Z', 0x00]) {
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
XzDecoder::new(r),
)?)
} else if buf.starts_with(b"BZh") {
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
BzDecoder::new(r),
)?)
} else if buf.starts_with(&[0x28, 0xB5, 0x2F, 0xFD]) {
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
ZstdDecoder::new(r)?,
)?)
} else if buf.starts_with(&[0x04, 0x22, 0x4D, 0x18])
|| buf.starts_with(&[0x02, 0x21, 0x4C, 0x18])
{
Box::new(AlignedBufReader::with_capacity_and_alignment(
buf_size,
buf_align,
lz4_flex::frame::FrameDecoder::new(r),
)?)
} else {
r
};
Ok(reader)
}
/// Creates `path` for buffered writing (standard output when `path` is `"-"`).
///
/// The compression format is chosen from the file extension. Buffers use
/// [`DEFAULT_IO_BUFFER_ALIGNMENT`]; see [`xwrite_with_alignment`] for the full contract.
pub fn xwrite(path: &str, buf_size: usize) -> io::Result<Box<dyn Write>> {
    let alignment = DEFAULT_IO_BUFFER_ALIGNMENT;
    xwrite_with_alignment(path, buf_size, alignment)
}
pub fn xwrite_with_alignment(
path: &str,
buf_size: usize,
buf_align: usize,
) -> io::Result<Box<dyn Write>> {
let buf_size = buf_size.max(4096);
if path == "-" {
return Ok(Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size,
buf_align,
io::stdout().lock(),
)?));
}
let file = File::create(path)?;
let path_lc = path.to_ascii_lowercase();
let writer: Box<dyn Write> = if path_lc.ends_with(".gz") {
let parz = ParCompressBuilder::<Gzip>::new()
.compression_level(Compression::default())
.from_writer(file);
Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size,
buf_align,
GzpGzipWriter { inner: Some(parz) },
)?)
} else if path_lc.ends_with(".xz") {
Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size,
buf_align,
XzEncoder::new(file, 6),
)?)
} else if path_lc.ends_with(".bz2") {
Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size,
buf_align,
BzEncoder::new(file, bzip2::Compression::default()),
)?)
} else if path_lc.ends_with(".zst") || path_lc.ends_with(".zstd") {
let encoder = ZstdEncoder::new(file, 0)?; Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size,
buf_align,
encoder.auto_finish(),
)?)
} else if path_lc.ends_with(".lz4") {
let encoder = lz4_flex::frame::FrameEncoder::new(file);
Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size,
buf_align,
encoder.auto_finish(),
)?)
} else {
Box::new(AlignedBufWriter::with_capacity_and_alignment(
buf_size, buf_align, file,
)?)
};
Ok(writer)
}
#[cfg(test)]
mod xwrite_drop_tests {
    use super::*;
    use std::fs;
    use std::path::PathBuf;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Returns a fresh path in the system temp directory ending in `suffix`.
    ///
    /// Tests run on parallel threads and several share a suffix (`.fa`, `.fa.gz`). A
    /// process-wide counter is therefore mixed in, because the clock alone can return
    /// the same value for two near-simultaneous calls.
    fn temp_path(suffix: &str) -> PathBuf {
        static NEXT_ID: AtomicUsize = AtomicUsize::new(0);
        // Relaxed suffices: only the uniqueness of each returned value matters.
        let id = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        std::env::temp_dir().join(format!(
            "fastseq-xopen-test-{}-{id}-{nanos}{suffix}",
            std::process::id()
        ))
    }

    /// Writes `data` through `xwrite`, drops the writer, reads the file back through
    /// `xopen`, removes it, and checks that the bytes survived unchanged.
    fn roundtrip(suffix: &str, data: &[u8]) {
        let path = temp_path(suffix);
        {
            let mut writer = xwrite(path.to_str().unwrap(), 8192).unwrap();
            writer.write_all(data).unwrap();
        }
        let mut reader = xopen(path.to_str().unwrap(), 8192).unwrap();
        let mut content = Vec::new();
        reader.read_to_end(&mut content).unwrap();
        fs::remove_file(&path).unwrap();
        assert_eq!(content, data);
    }

    const FASTA: &[u8] = b">chr1\nACGTACGTACGT\n>chrM\nTGCATGCATGCA\n";

    #[test]
    fn test_xwrite_flushes_on_drop_for_plain_file() {
        let path = temp_path(".fa");
        {
            let mut writer = xwrite(path.to_str().unwrap(), 8192).unwrap();
            writer.write_all(b">chr1\nACGT\n>chrM\nTGCA\n").unwrap();
        }
        let content = fs::read(&path).unwrap();
        fs::remove_file(&path).unwrap();
        assert_eq!(content, b">chr1\nACGT\n>chrM\nTGCA\n");
    }

    #[test]
    fn test_xwrite_flushes_on_drop_for_gzip_file() {
        let path = temp_path(".fa.gz");
        {
            let mut writer = xwrite(path.to_str().unwrap(), 8192).unwrap();
            writer.write_all(b">chr1\nACGT\n>chrM\nTGCA\n").unwrap();
        }
        let mut reader = xopen(path.to_str().unwrap(), 8192).unwrap();
        let mut content = Vec::new();
        reader.read_to_end(&mut content).unwrap();
        fs::remove_file(&path).unwrap();
        assert_eq!(content, b">chr1\nACGT\n>chrM\nTGCA\n");
    }

    #[test]
    fn test_roundtrip_plain() {
        roundtrip(".fa", FASTA);
    }

    #[test]
    fn test_roundtrip_gzip() {
        roundtrip(".fa.gz", FASTA);
    }

    #[test]
    fn test_roundtrip_xz() {
        roundtrip(".fa.xz", FASTA);
    }

    #[test]
    fn test_roundtrip_bzip2() {
        roundtrip(".fa.bz2", FASTA);
    }

    #[test]
    fn test_roundtrip_zstd() {
        roundtrip(".fa.zst", FASTA);
    }

    #[test]
    fn test_roundtrip_lz4() {
        roundtrip(".fa.lz4", FASTA);
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::io::Cursor;
    use std::rc::Rc;

    /// Numeric address of the first byte of `bytes`, for alignment checks.
    fn start_address(bytes: &AlignedBytes) -> usize {
        bytes.ptr.as_ptr() as usize
    }

    /// An empty in-memory source used by reader construction tests.
    fn empty_source() -> Cursor<Vec<u8>> {
        Cursor::new(Vec::new())
    }

    #[test]
    fn test_aligned_buf_reader_alignment() {
        let reader = AlignedBufReader::with_capacity_and_alignment(1024, 256, empty_source())
            .expect("256 is a valid alignment");
        assert_eq!(start_address(&reader.buf) % 256, 0);
    }

    #[test]
    fn test_aligned_buf_writer_alignment() {
        let writer = AlignedBufWriter::with_capacity_and_alignment(1024, 256, Vec::<u8>::new())
            .expect("256 is a valid alignment");
        assert_eq!(start_address(&writer.buf) % 256, 0);
    }

    #[test]
    fn test_invalid_alignment_is_rejected() {
        // 3000 is not a power of two.
        let result = AlignedBufReader::with_capacity_and_alignment(1024, 3000, empty_source());
        assert!(result.is_err());
    }

    /// Reader that serves `data` and records the length of every slice it is asked to fill.
    struct RecordingReader {
        data: Vec<u8>,
        pos: usize,
        requests: Rc<RefCell<Vec<usize>>>,
    }

    impl Read for RecordingReader {
        fn read(&mut self, out: &mut [u8]) -> io::Result<usize> {
            self.requests.borrow_mut().push(out.len());
            let rest = self.data.get(self.pos..).unwrap_or(&[]);
            let n = rest.len().min(out.len());
            out[..n].copy_from_slice(&rest[..n]);
            self.pos += n;
            Ok(n)
        }
    }

    #[test]
    fn test_aligned_buf_reader_large_read_bypasses_internal_buffer() {
        let requests = Rc::new(RefCell::new(Vec::new()));
        let source = RecordingReader {
            data: b"abcdefgh".to_vec(),
            pos: 0,
            requests: Rc::clone(&requests),
        };
        let mut reader = AlignedBufReader::with_capacity_and_alignment(4, 256, source).unwrap();
        let mut out = [0u8; 8];
        assert_eq!(reader.read(&mut out).unwrap(), 8);
        assert_eq!(&out, b"abcdefgh");
        // One request of the caller's full size shows the internal buffer was skipped.
        assert_eq!(*requests.borrow(), [8]);
    }
}