pub(crate) mod utils;
#[cfg(feature = "threading")]
mod consumer;
#[cfg(feature = "threading")]
mod producer;
#[macro_use]
mod macros;
use std::collections::HashSet;
use std::io::BufRead;
use std::num::NonZeroUsize;
use std::str::FromStr;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::sync::Mutex;
use std::time::Duration;
#[cfg(feature = "threading")]
use crossbeam_channel::Receiver;
#[cfg(feature = "threading")]
use crossbeam_channel::RecvTimeoutError;
#[cfg(feature = "threading")]
use crossbeam_channel::Sender;
#[cfg(feature = "threading")]
use crossbeam_channel::TryRecvError;
use quick_xml::events::attributes::Attribute;
use quick_xml::events::BytesEnd;
use quick_xml::events::BytesStart;
use quick_xml::events::Event;
use quick_xml::Error as XmlError;
use quick_xml::Reader;
use super::error::Error;
#[cfg(feature = "threading")]
use self::consumer::Consumer;
#[cfg(feature = "threading")]
use self::producer::Producer;
/// Timeout used when polling the item channel before re-checking worker state.
///
/// In this file it is only read by the threaded parser, so it is dead code
/// when the `threading` feature is disabled.
#[allow(dead_code)]
const SLEEP_DURATION: Duration = Duration::from_millis(10);
/// Lifecycle of the worker threads driven by `ThreadedParser`.
#[cfg(feature = "threading")]
#[derive(Debug, Eq, PartialEq)]
enum State {
    /// Workers are constructed but have not been started yet.
    Idle,
    /// Workers have been started and items are being received.
    Started,
    /// The producer has stopped; workers are joined and leftover items drained.
    Waiting,
    /// No further items will be yielded.
    Finished,
}
/// A parser that hands the input to a producer and several consumers, and
/// yields the parsed items through its `Iterator` implementation.
///
/// NOTE(review): since several consumers share one item channel, items are
/// presumably not guaranteed to come out in document order — confirm.
#[cfg(feature = "threading")]
pub struct ThreadedParser<B: BufRead, D: UniprotDatabase> {
    // Where the iterator currently is in the worker lifecycle.
    state: State,
    // Reads the raw input and feeds the consumers' text channel.
    producer: Producer<B>,
    // Parse the texts received from the producer into `D::Entry` items.
    consumers: Vec<Consumer<D>>,
    // Receiving end of the channel the consumers (and the constructor) send items to.
    // NOTE(review): fields are dropped in declaration order; keep this order unless
    // the Drop behaviour of Producer/Consumer has been checked.
    r_item: Receiver<Result<D::Entry, Error>>,
}
#[cfg(feature = "threading")]
impl<B: BufRead + Send + 'static, D: UniprotDatabase> ThreadedParser<B, D> {
    /// Create a parser using as many consumers as there are logical CPUs.
    ///
    /// The CPU count is obtained from `num_cpus::get` once and cached for
    /// the rest of the process.
    pub fn new(reader: B) -> Self {
        lazy_static! {
            static ref THREADS: usize = num_cpus::get();
        }
        // `num_cpus::get` should never report 0, but clamp to 1 so this stays
        // sound without an `unsafe` block relying on that invariant.
        let threads = NonZeroUsize::new((*THREADS).max(1)).expect("max(1) is non-zero");
        Self::with_threads(reader, threads)
    }

    /// Create a parser using exactly `threads` consumers.
    ///
    /// The reader is advanced synchronously up to the first start tag.
    ///
    /// That first item is an error in three cases:
    /// - the tag is not one of `D::ROOTS`;
    /// - an XML error occurs first;
    /// - the input ends first.
    ///
    /// In each case the error is queued on the item channel, so it is the first
    /// item the iterator yields. The producer and consumers are constructed here.
    /// Their `start` methods are only called on the first call to `next`.
    pub fn with_threads(reader: B, threads: NonZeroUsize) -> Self {
        let threads = threads.get();
        let mut xml = Reader::from_reader(reader);
        xml.expand_empty_elements(true);

        let (s_text, r_text) = crossbeam_channel::bounded(threads);
        let (s_item, r_item) = crossbeam_channel::bounded(threads);

        if let Some(err) = Self::skip_to_root(&mut xml) {
            // The item channel is still empty and has capacity >= 1, and
            // `r_item` is alive, so this send neither blocks nor fails.
            s_item
                .send(Err(err))
                .expect("channel should still be connected");
        }

        let producer = Producer::new(xml.into_inner(), threads, s_text);
        let consumers = (0..threads)
            .map(|_| Consumer::new(r_text.clone(), s_item.clone()))
            .collect();

        Self {
            r_item,
            producer,
            consumers,
            state: State::Idle,
        }
    }

    /// Advance `xml` up to (and including) the first start tag.
    ///
    /// Returns `None` when that tag is one of `D::ROOTS`. Otherwise it returns the
    /// error to report: an unexpected root, an XML error, or end of input.
    fn skip_to_root(xml: &mut Reader<B>) -> Option<Error> {
        let mut buffer = Vec::new();
        loop {
            buffer.clear();
            match xml.read_event_into(&mut buffer) {
                Ok(Event::Start(e)) if D::ROOTS.contains(&e.local_name().as_ref()) => {
                    return None;
                }
                Ok(Event::Start(e)) => {
                    let x = String::from_utf8_lossy(e.local_name().as_ref()).into_owned();
                    return Some(Error::UnexpectedRoot(x));
                }
                Err(e) => return Some(Error::from(e)),
                Ok(Event::Eof) => {
                    let e = String::from("xml");
                    return Some(Error::from(XmlError::UnexpectedEof(e)));
                }
                _ => (),
            }
        }
    }
}
/// Yields the items produced by the worker threads of a [`ThreadedParser`].
#[cfg(feature = "threading")]
impl<B: BufRead + Send + 'static, D: UniprotDatabase> Iterator for ThreadedParser<B, D> {
    type Item = Result<D::Entry, Error>;

    /// Return the next parsed entry (or error).
    ///
    /// The parser moves through a small state machine:
    /// - `Idle`: start the producer and every consumer, then loop again.
    /// - `Started`: wait up to `SLEEP_DURATION` for an item. If none arrives
    ///   and the producer is no longer alive, switch to `Waiting`.
    /// - `Waiting`: join the producer and all consumers, then take one
    ///   leftover item without blocking. Once the channel is empty the parser
    ///   becomes `Finished`.
    /// - `Finished`: always return `None`.
    ///
    /// A disconnected item channel is reported once as
    /// `Error::DisconnectedChannel`, after which the parser is finished.
    ///
    /// # Panics
    ///
    /// Panics if `join` on the producer or a consumer returns an error
    /// (presumably when that worker thread panicked — confirm in the
    /// producer/consumer modules).
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.state {
                State::Idle => {
                    // First call: launch the workers, then loop to start receiving.
                    self.state = State::Started;
                    self.producer.start();
                    for consumer in &mut self.consumers {
                        consumer.start();
                    }
                }
                State::Finished => return None,
                State::Waiting => {
                    // The producer has stopped: wait for every worker, then drain
                    // the remaining items without blocking.
                    // NOTE(review): this branch runs again on every call while in
                    // `Waiting`, so it assumes `join` is harmless after a successful
                    // join. It also assumes no consumer can be blocked sending into
                    // a full item channel while it is being joined — confirm both.
                    self.producer.join().unwrap();
                    for consumer in &mut self.consumers {
                        consumer.join().unwrap();
                    }
                    match self.r_item.try_recv() {
                        Ok(item) => return Some(item),
                        Err(TryRecvError::Empty) => {
                            // Nothing left to deliver.
                            self.state = State::Finished;
                            return None;
                        }
                        Err(TryRecvError::Disconnected) => {
                            self.state = State::Finished;
                            return Some(Err(Error::DisconnectedChannel));
                        }
                    }
                }
                State::Started => {
                    // Poll with a timeout so producer liveness is re-checked
                    // periodically instead of blocking indefinitely.
                    match self.r_item.recv_timeout(SLEEP_DURATION) {
                        Ok(item) => return Some(item),
                        Err(RecvTimeoutError::Timeout) => {
                            if !self.producer.is_alive() {
                                self.state = State::Waiting
                            }
                        }
                        Err(RecvTimeoutError::Disconnected) => {
                            self.state = State::Finished;
                            return Some(Err(Error::DisconnectedChannel));
                        }
                    }
                }
            }
        }
    }
}
/// The default parser type: [`ThreadedParser`] when the `threading` feature is enabled.
#[cfg(feature = "threading")]
pub type Parser<B, D> = ThreadedParser<B, D>;
pub struct SequentialParser<B: BufRead, D: UniprotDatabase> {
xml: Reader<B>,
buffer: Vec<u8>,
cache: Option<<Self as Iterator>::Item>,
finished: bool,
root: Vec<u8>,
}
impl<B: BufRead, D: UniprotDatabase> SequentialParser<B, D> {
pub fn new(reader: B) -> Self {
let mut root = Vec::new();
let mut buffer = Vec::new();
let mut xml = Reader::from_reader(reader);
xml.expand_empty_elements(true);
let cache = loop {
buffer.clear();
match xml.read_event_into(&mut buffer) {
Err(e) => break Some(Err(Error::from(e))),
Ok(Event::Start(e)) if D::ROOTS.contains(&e.local_name().as_ref()) => {
root.extend(e.local_name().as_ref());
break None;
}
Ok(Event::Start(e)) => {
let x = String::from_utf8_lossy(e.local_name().as_ref()).into_owned();
break Some(Err(Error::UnexpectedRoot(x)));
}
Ok(Event::Eof) => {
let e = String::from("xml");
break Some(Err(Error::from(XmlError::UnexpectedEof(e))));
}
_ => (),
}
};
Self {
xml,
buffer,
cache,
finished: false,
root,
}
}
pub fn parse_entry(reader: B) -> <Self as Iterator>::Item {
let mut xml = Reader::from_reader(reader);
xml.expand_empty_elements(true);
let mut parser = Self {
xml,
buffer: Vec::new(),
cache: None,
finished: false,
root: Vec::new(),
};
parser.next().unwrap_or_else(|| {
let e = String::from("xml");
Err(Error::from(XmlError::UnexpectedEof(e)))
})
}
}
impl<B: BufRead, D: UniprotDatabase> Iterator for SequentialParser<B, D> {
type Item = Result<D::Entry, Error>;
fn next(&mut self) -> Option<Self::Item> {
if let Some(item) = self.cache.take() {
return Some(item);
}
if self.finished {
return None;
}
loop {
self.buffer.clear();
match self.xml.read_event_into(&mut self.buffer) {
Err(e) => return Some(Err(Error::from(e))),
Ok(Event::Eof) => {
let e = String::from("entry");
self.finished = true;
return Some(Err(Error::from(XmlError::UnexpectedEof(e))));
}
Ok(Event::End(ref e)) if e.local_name().as_ref() == &self.root => {
self.finished = true;
return None;
}
Ok(Event::Start(ref e)) if e.local_name().as_ref() == b"entry" => {
return Some(D::Entry::from_xml(
&e.clone().into_owned(),
&mut self.xml,
&mut self.buffer,
));
}
_ => (),
}
}
}
}
/// The default parser type: [`SequentialParser`] when the `threading` feature is disabled.
#[cfg(not(feature = "threading"))]
pub type Parser<B, D> = SequentialParser<B, D>;
pub trait FromXml: Sized {
fn from_xml<B: BufRead>(
event: &BytesStart,
reader: &mut Reader<B>,
buffer: &mut Vec<u8>,
) -> Result<Self, Error>;
}
/// A UniProt database flavour whose XML documents these parsers can read.
pub trait UniprotDatabase {
    /// Local names accepted for the document's root element.
    const ROOTS: &'static [&'static [u8]];
    /// Record type built from each `entry` element.
    ///
    /// It must be `Send + 'static` because the threaded parser passes entries
    /// through channels.
    type Entry: FromXml + Send + 'static;
}