use std::borrow::Cow;
use std::io;
#[cfg(feature = "niffler")]
use crate::BoxedReader;
#[cfg(feature = "niffler")]
use std::path::Path;
use crate::{fastx::GenericReader, Error, Record, DEFAULT_MAX_RECORDS};
/// Streaming FASTA reader over an arbitrary byte source.
///
/// Bytes read past the last complete record are kept in `overflow`
/// between `RecordSet::fill` calls so no input is lost across batches.
pub struct Reader<R: io::Read> {
    // Underlying byte source.
    reader: R,
    // Carry-over bytes: read from `reader` but not yet parsed into a
    // complete record; consumed first by the next fill.
    overflow: Vec<u8>,
    // Set once `reader` returns a 0-byte read (end of input).
    eof: bool,
    // Target number of records per batch, when configured.
    batch_size: Option<usize>,
    // Remaining number of records this reader may emit, when limited.
    record_limit: Option<usize>,
}
#[cfg(feature = "niffler")]
impl Reader<BoxedReader> {
    /// Open a (possibly compressed) FASTA file at `path`, auto-detecting
    /// the compression format.
    pub fn from_path<P: AsRef<Path>>(path: P) -> Result<Self, Error> {
        let (inner, _format) = niffler::send::from_path(path)?;
        Ok(Self::new(inner))
    }

    /// Read (possibly compressed) FASTA from standard input.
    pub fn from_stdin() -> Result<Self, Error> {
        let stdin = Box::new(io::stdin());
        let (inner, _format) = niffler::send::get_reader(stdin)?;
        Ok(Self::new(inner))
    }

    /// Open `path` when given, otherwise fall back to standard input.
    pub fn from_optional_path<P: AsRef<Path>>(path: Option<P>) -> Result<Self, Error> {
        path.map_or_else(Self::from_stdin, Self::from_path)
    }

    /// [`Self::from_path`] with an explicit records-per-batch target.
    pub fn from_path_with_batch_size<P: AsRef<Path>>(
        path: P,
        batch_size: usize,
    ) -> Result<Self, Error> {
        let (inner, _format) = niffler::send::from_path(path)?;
        Self::with_batch_size(inner, batch_size)
    }

    /// [`Self::from_stdin`] with an explicit records-per-batch target.
    pub fn from_stdin_with_batch_size(batch_size: usize) -> Result<Self, Error> {
        let stdin = Box::new(io::stdin());
        let (inner, _format) = niffler::send::get_reader(stdin)?;
        Self::with_batch_size(inner, batch_size)
    }

    /// [`Self::from_optional_path`] with an explicit records-per-batch target.
    pub fn from_optional_path_with_batch_size<P: AsRef<Path>>(
        path: Option<P>,
        batch_size: usize,
    ) -> Result<Self, Error> {
        match path {
            None => Self::from_stdin_with_batch_size(batch_size),
            Some(p) => Self::from_path_with_batch_size(p, batch_size),
        }
    }
}
#[cfg(feature = "url")]
impl Reader<BoxedReader> {
    /// Stream (possibly compressed) FASTA from an HTTP(S) URL.
    pub fn from_url(url: &str) -> Result<Self, Error> {
        let response = reqwest::blocking::get(url)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(response))?;
        Ok(Self::new(decoded))
    }

    /// [`Self::from_url`] with an explicit records-per-batch target.
    pub fn from_url_with_batch_size(url: &str, batch_size: usize) -> Result<Self, Error> {
        let response = reqwest::blocking::get(url)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(response))?;
        Self::with_batch_size(decoded, batch_size)
    }
}
impl<R: io::Read> Reader<R> {
    /// Wrap `reader` with default settings: no batch size, no record limit.
    pub fn new(reader: R) -> Self {
        Self {
            overflow: Vec::with_capacity(1024),
            reader,
            eof: false,
            batch_size: None,
            record_limit: None,
        }
    }

    /// Wrap `reader` with an explicit records-per-batch target.
    ///
    /// # Errors
    /// Returns `Error::InvalidBatchSize` when `batch_size` is zero.
    pub fn with_batch_size(reader: R, batch_size: usize) -> Result<Self, Error> {
        if batch_size == 0 {
            return Err(Error::InvalidBatchSize(batch_size));
        }
        let mut reader = Self::new(reader);
        reader.batch_size = Some(batch_size);
        Ok(reader)
    }

    /// Cap the total number of records this reader will emit.
    pub fn set_record_limit(&mut self, n: usize) {
        self.record_limit = Some(n);
    }

    /// Derive a records-per-batch target from a basepair budget.
    ///
    /// Samples one record to estimate record length, sets
    /// `batch_size = ceil(batch_size_in_bp / record_len)`, then pushes the
    /// sampled bytes back via `reload` so no data is lost.
    ///
    /// NOTE(review): if the sampled record fails validation, the `?` on the
    /// iterator item returns before `reload`, dropping the buffered bytes —
    /// confirm that is acceptable on the error path.
    pub fn update_batch_size_in_bp(&mut self, batch_size_in_bp: usize) -> Result<(), Error> {
        let mut rset = self.new_record_set_with_size(1);
        rset.fill(self)?;
        let mut batch_size = 1;
        if let Some(record) = rset.iter().next() {
            let len = record?.seq_raw().len();
            if len > 0 {
                batch_size = batch_size_in_bp.div_ceil(len);
            }
        }
        self.reload(&mut rset);
        self.batch_size = Some(batch_size);
        Ok(())
    }

    /// New `RecordSet` sized to this reader's batch size (or the default).
    pub fn new_record_set(&self) -> RecordSet {
        if let Some(batch_size) = self.batch_size {
            RecordSet::new(batch_size)
        } else {
            RecordSet::default()
        }
    }

    /// New `RecordSet` with an explicit capacity.
    pub fn new_record_set_with_size(&self, size: usize) -> RecordSet {
        RecordSet::new(size)
    }

    /// Append raw bytes to the carry-over buffer.
    pub fn add_to_overflow(&mut self, buffer: &[u8]) {
        self.overflow.extend_from_slice(buffer);
    }

    /// Effective batch size: the configured value or `DEFAULT_MAX_RECORDS`.
    pub fn batch_size(&self) -> usize {
        self.batch_size.unwrap_or(DEFAULT_MAX_RECORDS)
    }

    /// Mark the underlying source as exhausted.
    pub fn set_eof(&mut self) {
        self.eof = true;
    }

    /// True once EOF was reached and no carry-over bytes remain.
    pub fn exhausted(&self) -> bool {
        self.eof && self.overflow.is_empty()
    }

    /// Push the unparsed contents of `rset` back in FRONT of any existing
    /// overflow, then clear `rset`.
    ///
    /// The incoming bytes must precede the existing overflow because they
    /// were read from the stream earlier.
    pub fn reload(&mut self, rset: &mut RecordSet) {
        let buffer_slice = &rset.buffer;
        let num_incoming = buffer_slice.len();
        let num_existing = self.overflow.len();
        let required_space = num_existing + num_incoming;
        // Grow (zero-filled) to hold both spans; truncated back below.
        // NOTE(review): growing to `capacity().max(required_space)` zero-fills
        // the unused tail before the truncate — a plain
        // `resize(required_space, 0)` looks sufficient; confirm intent.
        self.overflow
            .resize(self.overflow.capacity().max(required_space), 0);
        // Shift the existing overflow right, then copy the incoming bytes
        // into the freed front region.
        self.overflow.copy_within(..num_existing, num_incoming);
        self.overflow[..num_incoming].copy_from_slice(buffer_slice);
        self.overflow.truncate(required_space);
        rset.clear();
    }
}
#[cfg(feature = "ssh")]
impl Reader<BoxedReader> {
    /// Stream a (possibly compressed) FASTA file over SSH.
    pub fn from_ssh(ssh_url: &str) -> Result<Self, Error> {
        let remote = crate::ssh::SshReader::new(ssh_url)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(remote))?;
        Ok(Self::new(decoded))
    }

    /// [`Self::from_ssh`] with an explicit records-per-batch target.
    pub fn from_ssh_with_batch_size(ssh_url: &str, batch_size: usize) -> Result<Self, Error> {
        let remote = crate::ssh::SshReader::new(ssh_url)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(remote))?;
        Self::with_batch_size(decoded, batch_size)
    }
}
#[cfg(feature = "gcs")]
impl Reader<BoxedReader> {
    /// Stream a (possibly compressed) FASTA object from Google Cloud Storage.
    pub fn from_gcs(gcs_url: &str) -> Result<Self, Error> {
        let source = crate::gcs::GcsReader::new(gcs_url)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(source))?;
        Ok(Self::new(decoded))
    }

    /// Like [`Self::from_gcs`], forwarding extra arguments to `gcloud`.
    pub fn from_gcs_with_gcloud_args(gcs_url: &str, args: &[&str]) -> Result<Self, Error> {
        let source = crate::gcs::GcsReader::with_gcloud_args(gcs_url, args)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(source))?;
        Ok(Self::new(decoded))
    }

    /// Like [`Self::from_gcs`], pinning an explicit project id.
    pub fn from_gcs_with_project(gcs_url: &str, project_id: &str) -> Result<Self, Error> {
        let source = crate::gcs::GcsReader::with_project(gcs_url, project_id)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(source))?;
        Ok(Self::new(decoded))
    }

    /// [`Self::from_gcs`] with an explicit records-per-batch target.
    pub fn from_gcs_with_batch_size(gcs_url: &str, batch_size: usize) -> Result<Self, Error> {
        let source = crate::gcs::GcsReader::new(gcs_url)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(source))?;
        Self::with_batch_size(decoded, batch_size)
    }

    /// [`Self::from_gcs_with_gcloud_args`] with an explicit batch size.
    pub fn from_gcs_with_gcloud_args_and_batch_size(
        gcs_url: &str,
        gcloud_args: &[&str],
        batch_size: usize,
    ) -> Result<Self, Error> {
        let source = crate::gcs::GcsReader::with_gcloud_args(gcs_url, gcloud_args)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(source))?;
        Self::with_batch_size(decoded, batch_size)
    }

    /// [`Self::from_gcs_with_project`] with an explicit batch size.
    pub fn from_gcs_with_project_and_batch_size(
        gcs_url: &str,
        project_id: &str,
        batch_size: usize,
    ) -> Result<Self, Error> {
        let source = crate::gcs::GcsReader::with_project(gcs_url, project_id)?;
        let (decoded, _format) = niffler::send::get_reader(Box::new(source))?;
        Self::with_batch_size(decoded, batch_size)
    }
}
/// A batch of FASTA records parsed out of one contiguous byte buffer.
#[derive(Debug)]
pub struct RecordSet {
    // Raw bytes backing the records of the current batch.
    buffer: Vec<u8>,
    // Byte offsets of each record-leading '>' found in `buffer`.
    record_starts: Vec<usize>,
    // How far into `buffer` the '>' scan has already progressed,
    // so refills never rescan the same bytes.
    last_searched_pos: usize,
    // start / seq_start / end offsets for each parsed record.
    positions: Vec<Positions>,
    // Maximum number of records held per fill.
    capacity: usize,
    // Running bytes-per-record estimate; sizes read-ahead buffers.
    avg_record_size: usize,
}
impl Default for RecordSet {
fn default() -> Self {
Self::new(1024)
}
}
impl RecordSet {
    /// New set holding up to `capacity` records per fill.
    #[must_use]
    pub fn new(capacity: usize) -> Self {
        Self {
            // 256 KiB up front keeps typical refills from reallocating.
            buffer: Vec::with_capacity(256 * 1024),
            record_starts: Vec::new(),
            last_searched_pos: 0,
            positions: Vec::with_capacity(capacity),
            capacity,
            // Initial guess; refined by `update_avg_record_size`.
            avg_record_size: 1024,
        }
    }

    /// Reset all parse state; allocated capacity is retained.
    pub fn clear(&mut self) {
        self.buffer.clear();
        self.record_starts.clear();
        self.positions.clear();
        self.last_searched_pos = 0;
    }

    /// Number of parsed records currently held.
    pub fn n_records(&self) -> usize {
        self.positions.len()
    }

    /// Keep only the first `n` parsed records.
    ///
    /// NOTE(review): only `positions` is truncated; `buffer` still holds
    /// the dropped records' bytes — confirm callers do not expect them
    /// returned to the reader.
    pub fn truncate(&mut self, n: usize) {
        self.positions.truncate(n);
    }

    /// Scan the not-yet-searched region `buffer[last_searched_pos..current_pos]`
    /// for '>' bytes that begin a record (at offset 0 or immediately after a
    /// newline) and append their absolute offsets to `record_starts`.
    fn find_record_starts(&mut self, current_pos: usize) {
        let search_buffer = &self.buffer[self.last_searched_pos..current_pos];
        memchr::memchr_iter(b'>', search_buffer).for_each(|i| {
            let abs_pos = i + self.last_searched_pos;
            // Only a '>' at the start of a line opens a record; a '>'
            // inside a header or sequence line is ignored.
            if abs_pos == 0 || self.buffer[abs_pos - 1] == b'\n' {
                self.record_starts.push(abs_pos);
            }
        });
        self.last_searched_pos = current_pos;
    }

    /// Refresh the bytes-per-record estimate from this batch's totals.
    fn update_avg_record_size(&mut self, total_bytes: usize) {
        let total_records = self.positions.len();
        if total_records > 0 {
            self.avg_record_size = total_bytes / total_records;
        }
    }

    /// Fill this set from `reader`, consuming carry-over bytes first and
    /// then reading until `capacity` complete records are available (or EOF).
    ///
    /// Returns `Ok(true)` when at least one record was parsed.
    pub fn fill<R: io::Read>(
        &mut self,
        reader: &mut Reader<R>,
    ) -> std::result::Result<bool, Error> {
        self.clear();
        // Start from any bytes left over from the previous fill.
        if !reader.overflow.is_empty() {
            self.buffer.extend_from_slice(&reader.overflow);
            reader.overflow.clear();
        }
        self.find_record_starts(self.buffer.len());
        // A record is only known complete once the next record's '>' has
        // been seen — or once EOF guarantees nothing more follows it.
        let initial_complete_records = if self.record_starts.len() > 1 {
            self.record_starts.len() - 1
        } else if self.record_starts.len() == 1 && reader.eof {
            1
        } else {
            0
        };
        if initial_complete_records >= self.capacity {
            return self.process_records(reader);
        }
        let records_needed = self.capacity.saturating_sub(initial_complete_records);
        // Size the read-ahead from the running record-size estimate, with
        // two extra records of slack.
        // NOTE(review): `.min(4096)` caps the initial read at 4 KiB and
        // relies on the grow branch below; if a read-at-least floor was
        // intended this should be `.max(4096)` — confirm intent.
        let target_read_size = self
            .avg_record_size
            .saturating_mul(records_needed)
            .saturating_add(self.avg_record_size * 2)
            .min(4096);
        let mut current_pos = self.buffer.len();
        self.buffer.resize(current_pos + target_read_size, 0);
        // We need one start beyond capacity to prove the capacity-th
        // record is complete.
        let required_record_starts = self.capacity + 1;
        while self.record_starts.len() < required_record_starts && !reader.eof {
            let remaining_space = self.buffer.len() - current_pos;
            if remaining_space == 0 {
                // Out of room: grow by ~10% of the target, at least 4 KiB.
                let additional = (target_read_size / 10).max(4096);
                self.buffer.resize(self.buffer.len() + additional, 0);
            }
            match reader.reader.read(&mut self.buffer[current_pos..]) {
                Ok(0) => {
                    reader.set_eof();
                    break;
                }
                Ok(n) => {
                    current_pos += n;
                    self.find_record_starts(current_pos);
                }
                // Interrupted reads are retried, per io::Read convention.
                Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(e.into()),
            }
        }
        // Drop the unread zero-filled tail.
        self.buffer.truncate(current_pos);
        self.process_records(reader)
    }

    /// Convert discovered record starts into `Positions`, spilling any
    /// trailing incomplete record (and any bytes beyond `capacity`) back
    /// into the reader's overflow.
    fn process_records<R: io::Read>(&mut self, reader: &mut Reader<R>) -> Result<bool, Error> {
        // At EOF the final start is complete by definition; otherwise the
        // last start may still be mid-record and is withheld.
        let available_complete = if reader.eof && !self.record_starts.is_empty() {
            self.record_starts.len()
        } else if self.record_starts.len() > 1 {
            self.record_starts.len() - 1
        } else {
            0
        };
        let records_to_process = available_complete.min(self.capacity);
        if records_to_process > 0 {
            for i in 0..records_to_process {
                let record_start = self.record_starts[i];
                let record_end = if i + 1 < self.record_starts.len() {
                    self.record_starts[i + 1]
                } else {
                    self.buffer.len()
                };
                // Sequence begins just past the header line's newline; a
                // header with no newline yields an empty sequence region.
                let seq_start = memchr::memchr(b'\n', &self.buffer[record_start..record_end])
                    .map_or(record_end, |pos| record_start + pos + 1);
                self.positions.push(Positions {
                    start: record_start,
                    seq_start,
                    end: record_end,
                });
            }
            // Everything past the processed records goes back to the reader.
            let truncate_pos = if records_to_process < self.record_starts.len() {
                self.record_starts[records_to_process]
            } else {
                self.buffer.len()
            };
            self.update_avg_record_size(truncate_pos);
            if truncate_pos < self.buffer.len() {
                reader
                    .overflow
                    .extend_from_slice(&self.buffer[truncate_pos..]);
            }
            self.buffer.truncate(truncate_pos);
        } else if !self.buffer.is_empty() {
            // No complete record yet: return all buffered bytes to the reader.
            reader.overflow.extend_from_slice(&self.buffer);
            self.buffer.clear();
        }
        Ok(!self.positions.is_empty())
    }

    /// Iterate over the parsed records; each item is validated lazily.
    pub fn iter(&self) -> impl Iterator<Item = Result<RefRecord<'_>, Error>> {
        self.positions
            .iter()
            .map(move |&pos| RefRecord::new(&self.buffer, pos))
    }
}
/// Byte offsets of one record within a `RecordSet` buffer.
#[derive(Debug, Default, Clone, Copy)]
struct Positions {
    // Offset of the record's leading '>'.
    start: usize,
    // Offset of the first sequence byte (just past the header newline).
    seq_start: usize,
    // One past the record's last byte.
    end: usize,
}
/// Zero-copy view of a single FASTA record inside a `RecordSet` buffer.
#[derive(Debug, Default, Clone)]
pub struct RefRecord<'a> {
    // Backing bytes of the whole record set.
    buffer: &'a [u8],
    // This record's offsets within `buffer`.
    positions: Positions,
}
impl<'a> RefRecord<'a> {
    /// Create a record view over `buffer` at `positions`, validating the
    /// bounds and header byte up front.
    fn new(buffer: &'a [u8], positions: Positions) -> Result<Self, Error> {
        let ref_record = Self { buffer, positions };
        ref_record.validate_record()?;
        Ok(ref_record)
    }

    /// Ensure the positions lie within the buffer and the record begins
    /// with the FASTA header marker `>`.
    fn validate_record(&self) -> Result<(), Error> {
        if self.positions.start >= self.buffer.len() || self.positions.end > self.buffer.len() {
            return Err(Error::UnboundedPositions);
        }
        if self.buffer[self.positions.start] != b'>' {
            return Err(Error::InvalidHeader(
                self.buffer[self.positions.start].into(),
                '>',
            ));
        }
        Ok(())
    }

    /// Header line without the leading `>` or trailing newline.
    ///
    /// NOTE(review): a `\r` from CRLF line endings is retained — confirm
    /// whether callers expect it stripped.
    #[inline]
    #[must_use]
    pub fn id(&self) -> &[u8] {
        self.access_buffer(self.positions.start + 1, self.positions.seq_start)
    }

    /// Sequence with any internal newlines removed.
    ///
    /// Borrows when the sequence is a single line (with or without a
    /// trailing newline); allocates a filtered copy for wrapped
    /// multi-line sequences.
    #[inline]
    #[must_use]
    pub fn seq(&self) -> Cow<'_, [u8]> {
        let seq_region = self.seq_raw();
        let newlines = memchr::memchr_iter(b'\n', seq_region).collect::<Vec<_>>();
        if newlines.is_empty() {
            Cow::Borrowed(seq_region)
        } else if newlines.len() == 1 && seq_region.ends_with(b"\n") {
            Cow::Borrowed(&seq_region[..seq_region.len() - 1])
        } else {
            // Stitch the line fragments together, skipping each '\n'.
            let mut filtered = Vec::with_capacity(seq_region.len() - newlines.len());
            let mut start = 0;
            for &end in &newlines {
                filtered.extend_from_slice(&seq_region[start..end]);
                start = end + 1;
            }
            if start < seq_region.len() {
                filtered.extend_from_slice(&seq_region[start..]);
            }
            Cow::Owned(filtered)
        }
    }

    /// Raw sequence bytes, including any line-wrapping newlines.
    fn seq_raw(&self) -> &[u8] {
        &self.buffer[self.positions.seq_start..self.positions.end]
    }

    /// Slice `buffer[left..right]`, dropping a single trailing `\n`
    /// delimiter when present.
    ///
    /// Fix: the previous implementation did
    /// `get_unchecked(left..right - 1)`, which is undefined behavior when
    /// `right - 1 < left` (e.g. a final header with no trailing newline,
    /// where `seq_start == end`), and even when in-bounds it silently
    /// dropped the last header byte for newline-less headers. Bounds were
    /// validated by `validate_record`, so safe indexing costs nothing here.
    #[inline(always)]
    fn access_buffer(&self, left: usize, right: usize) -> &[u8] {
        let mut end = right;
        if end > left && self.buffer[end - 1] == b'\n' {
            end -= 1;
        }
        &self.buffer[left..end]
    }
}
impl Record for RefRecord<'_> {
fn id(&self) -> &[u8] {
self.id()
}
fn seq(&self) -> Cow<'_, [u8]> {
self.seq()
}
fn seq_raw(&self) -> &[u8] {
self.seq_raw()
}
fn qual(&self) -> Option<&[u8]> {
None
}
}
impl<R> GenericReader for crate::fasta::Reader<R>
where
    R: io::Read + Send,
{
    type RecordSet = crate::fasta::RecordSet;
    type Error = crate::Error;
    type RefRecord<'a> = crate::fasta::RefRecord<'a>;

    /// Build a record set sized to this reader's batch size (or default).
    fn new_record_set(&self) -> Self::RecordSet {
        match self.batch_size {
            Some(n) => Self::RecordSet::new(n),
            None => Self::RecordSet::default(),
        }
    }

    /// Fill `record` from this reader, honoring any configured record limit.
    fn fill(&mut self, record: &mut Self::RecordSet) -> std::result::Result<bool, crate::Error> {
        // Limit already exhausted: emit nothing further.
        if let Some(0) = self.record_limit {
            return Ok(false);
        }
        let filled = record.fill(self)?;
        if !filled {
            return Ok(false);
        }
        // Clamp the batch to whatever allowance remains.
        if let Some(remaining) = self.record_limit.as_mut() {
            let taken = record.n_records().min(*remaining);
            record.truncate(taken);
            *remaining -= taken;
        }
        Ok(true)
    }

    /// Iterate the records of `record_set`, validating each lazily.
    fn iter(
        record_set: &Self::RecordSet,
    ) -> impl ExactSizeIterator<Item = std::result::Result<Self::RefRecord<'_>, crate::Error>> {
        record_set
            .positions
            .iter()
            .map(move |&pos| Self::RefRecord::new(&record_set.buffer, pos))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// Build a minimal single-record FASTA string.
    fn create_test_record(id: &str, seq: &str) -> String {
        format!(">{id}\n{seq}\n")
    }

    #[test]
    fn test_basic_record_parsing() {
        let record = create_test_record("test1", "ACTG");
        let mut reader = Reader::new(Cursor::new(record));
        let mut record_set = RecordSet::new(1);
        assert!(record_set.fill(&mut reader).unwrap());
        let parsed_record = record_set.iter().next().unwrap().unwrap();
        assert_eq!(parsed_record.id_str(), "test1");
        assert_eq!(parsed_record.seq_str(), "ACTG");
    }

    #[test]
    fn test_multiple_records() {
        let records = [
            create_test_record("test1", "ACTG"),
            create_test_record("test2", "TGCA"),
        ]
        .join("");
        let mut reader = Reader::new(Cursor::new(records));
        let mut record_set = RecordSet::new(2);
        assert!(record_set.fill(&mut reader).unwrap());
        let records: Vec<_> = record_set.iter().collect::<Result<_, _>>().unwrap();
        assert_eq!(records.len(), 2);
        assert_eq!(records[0].id_str(), "test1");
        assert_eq!(records[1].id_str(), "test2");
    }

    // Input with no record-leading '>' yields no records at all.
    #[test]
    fn test_invalid_header() {
        let record = "XACTG\nTGCA\n";
        let mut reader = Reader::new(Cursor::new(record));
        let mut record_set = RecordSet::new(1);
        assert!(!record_set.fill(&mut reader).unwrap());
    }

    // Leading junk before the first '>' is skipped, not treated as a record.
    #[test]
    fn test_junk_before_valid_record() {
        let record = format!("X\n{}", create_test_record("test1", "ACTG"));
        let mut reader = Reader::new(Cursor::new(record));
        let mut record_set = RecordSet::new(1);
        assert!(record_set.fill(&mut reader).unwrap());
        let parsed_record = record_set.iter().next().unwrap().unwrap();
        assert_eq!(parsed_record.id_str(), "test1");
        assert_eq!(parsed_record.seq_str(), "ACTG");
    }

    // Single-line sequences must borrow (zero-copy); wrapped sequences
    // must allocate a newline-filtered copy.
    #[test]
    fn test_performance_single_vs_multiline() {
        let single_line = create_test_record("single", "ACTG");
        let mut reader = Reader::new(Cursor::new(single_line));
        let mut record_set = RecordSet::new(1);
        assert!(record_set.fill(&mut reader).unwrap());
        let record = record_set.iter().next().unwrap().unwrap();
        let seq = record.seq();
        match seq {
            std::borrow::Cow::Borrowed(_) => {
                assert_eq!(record.seq_str(), "ACTG");
            }
            std::borrow::Cow::Owned(_) => {
                panic!("Single-line sequence should return borrowed data for optimal performance");
            }
        }
        let multiline = ">multiline\nAC\nTG\n";
        let mut reader = Reader::new(Cursor::new(multiline));
        let mut record_set = RecordSet::new(1);
        assert!(record_set.fill(&mut reader).unwrap());
        let record = record_set.iter().next().unwrap().unwrap();
        let seq = record.seq();
        match seq {
            std::borrow::Cow::Borrowed(_) => {
                panic!("Multiline sequence should return owned data after newline filtering");
            }
            std::borrow::Cow::Owned(_) => {
                assert_eq!(record.seq_str(), "ACTG");
            }
        }
    }

    // Fix: this test calls `niffler` directly, so it must be gated on the
    // same feature as every other niffler use in this file; without the
    // gate the test target fails to compile when the feature is disabled.
    #[cfg(feature = "niffler")]
    #[test]
    fn test_passthrough_read() {
        let record = create_test_record("test1", "ACTG");
        let rdr = Cursor::new(record);
        let (pass, _comp) = niffler::get_reader(Box::new(rdr)).unwrap();
        let mut reader = Reader::new(pass);
        let mut record_set = RecordSet::new(1);
        assert!(record_set.fill(&mut reader).unwrap());
        let parsed_record = record_set.iter().next().unwrap().unwrap();
        assert_eq!(parsed_record.id_str(), "test1");
        assert_eq!(parsed_record.seq_str(), "ACTG");
        assert!(!record_set.fill(&mut reader).unwrap());
    }

    #[test]
    fn test_multiline_fasta() {
        let multiline_record = ">test_multiline\nACTG\nTGCA\nGGCC\n";
        let mut reader = Reader::new(Cursor::new(multiline_record));
        let mut record_set = RecordSet::new(1);
        assert!(record_set.fill(&mut reader).unwrap());
        let parsed_record = record_set.iter().next().unwrap().unwrap();
        assert_eq!(parsed_record.id_str(), "test_multiline");
        assert_eq!(parsed_record.seq_str(), "ACTGTGCAGGCC");
    }

    #[test]
    fn test_mixed_single_and_multiline() {
        let mixed_records = ">single\nACTG\n>multiline\nTGCA\nGGCC\nAAAA\n>another_single\nTTTT\n";
        let mut reader = Reader::new(Cursor::new(mixed_records));
        let mut record_set = RecordSet::new(3);
        assert!(record_set.fill(&mut reader).unwrap());
        let records: Vec<_> = record_set.iter().collect::<Result<_, _>>().unwrap();
        assert_eq!(records.len(), 3);
        assert_eq!(records[0].id_str(), "single");
        assert_eq!(records[0].seq_str(), "ACTG");
        assert_eq!(records[1].id_str(), "multiline");
        assert_eq!(records[1].seq_str(), "TGCAGGCCAAAA");
        assert_eq!(records[2].id_str(), "another_single");
        assert_eq!(records[2].seq_str(), "TTTT");
    }

    // Exercises plain, gzip, and zstd inputs via niffler auto-detection.
    #[cfg(feature = "niffler")]
    #[test]
    fn test_from_path() {
        for ext in ["", ".gz", ".zst"] {
            dbg!(ext);
            let path = if ext.is_empty() {
                String::from("./data/sample.fasta")
            } else {
                format!("./data/sample.fasta{}", ext)
            };
            let mut reader = Reader::from_path(path).unwrap();
            let mut record_set = RecordSet::new(1);
            assert!(record_set.fill(&mut reader).unwrap());
            let parsed_record = record_set.iter().next().unwrap().unwrap();
            println!("{}", parsed_record.id_str());
        }
    }

    #[cfg(feature = "niffler")]
    #[test]
    fn test_from_path_with_batch_size() {
        for ext in ["", ".gz", ".zst"] {
            dbg!(ext);
            let path = if ext.is_empty() {
                String::from("./data/sample.fasta")
            } else {
                format!("./data/sample.fasta{}", ext)
            };
            let mut reader = Reader::from_path_with_batch_size(path, 2).unwrap();
            let mut record_set = RecordSet::new(1);
            assert!(record_set.fill(&mut reader).unwrap());
            let parsed_record = record_set.iter().next().unwrap().unwrap();
            println!("{}", parsed_record.id_str());
        }
    }
}