use std::collections::HashMap;
use std::io::BufRead;
use std::io::BufReader;
use std::num::NonZeroUsize;
use std::convert::TryFrom;
use std::fs::File;
use std::iter::Iterator;
use crossbeam_channel::Receiver;
use crossbeam_channel::Sender;
use crossbeam_channel::TryRecvError;
use lazy_static::lazy_static;
use crate::ast::EntityFrame;
use crate::ast::Frame;
use crate::ast::HeaderClause;
use crate::ast::HeaderFrame;
use crate::ast::OboDoc;
use crate::error::Error;
use crate::error::SyntaxError;
use crate::error::ThreadingError;
use crate::syntax::Lexer;
use crate::syntax::Rule;
use super::FromPair;
use super::Parser;
use self::consumer::Consumer;
use self::consumer::Input as ConsumerInput;
use self::consumer::Output as ConsumerOutput;
mod consumer;
/// Steps of the state machine driven by `ThreadedParser::next`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// Initial state: consumers exist but have not been started yet.
    Idle,
    /// Consumers are running; raw frames are read and sent to them.
    Started,
    /// The stream is exhausted and the end-of-input messages were sent;
    /// consumers are joined on the next step.
    AtEof,
    /// Consumers were joined; remaining results are being drained.
    Waiting,
    /// Iteration is over; `next` returns `None`.
    Finished,
}
/// A multi-threaded OBO parser.
///
/// The header frame is parsed on the thread that builds the parser. The rest
/// of the text is split into raw frames, each starting at a line whose first
/// non-whitespace character is `[`. Those frames are sent to a set of
/// consumers for parsing. Parsed frames are obtained through the `Iterator`
/// implementation, in stream order or in completion order depending on the
/// `ordered` flag.
#[cfg_attr(feature = "_doc", doc(cfg(feature = "threading")))]
pub struct ThreadedParser<B: BufRead> {
    /// The underlying reader the OBO text is read from, line by line.
    stream: B,
    /// Current step of the state machine driven by `next`.
    state: State,
    /// Frame-parsing consumers; started on the first call to `next` and
    /// joined after the end of the stream has been reached.
    consumers: Vec<Consumer>,
    /// Receives parsed results (each tagged with its frame index) from the
    /// consumers; the header result is also sent here with index 0.
    r_item: Receiver<ConsumerOutput>,
    /// Sends raw frame text to the consumers; one `None` per consumer is sent
    /// at end of input (presumably telling each consumer to stop).
    s_text: Sender<Option<ConsumerInput>>,
    /// Last line read from `stream`; between calls to `next` it normally
    /// holds the header line of the next frame to send.
    line: String,
    /// Number of consumers that were created.
    threads: NonZeroUsize,
    /// Line number of the start of the next frame to send, forwarded to the
    /// consumers with each frame.
    line_offset: usize,
    /// Byte offset of the start of the next frame to send, forwarded to the
    /// consumers with each frame.
    offset: usize,
    /// Whether results are yielded in stream order (`true`) or as soon as
    /// they are received (`false`).
    ordered: bool,
    /// Index of the next result to yield in ordered mode.
    read_index: usize,
    /// Index assigned to the next frame sent to the consumers (starts at 1
    /// because the header uses index 0).
    sent_index: usize,
    /// Results received ahead of their turn in ordered mode, keyed by index.
    queue: HashMap<usize, Result<Frame, Error>>,
}
impl<B: BufRead> AsRef<B> for ThreadedParser<B> {
fn as_ref(&self) -> &B {
&self.stream
}
}
impl<B: BufRead> AsRef<B> for Box<ThreadedParser<B>> {
    /// Borrows the stream of the boxed parser.
    fn as_ref(&self) -> &B {
        &self.stream
    }
}
impl<B: BufRead> AsMut<B> for ThreadedParser<B> {
fn as_mut(&mut self) -> &mut B {
&mut self.stream
}
}
impl<B: BufRead> AsMut<B> for Box<ThreadedParser<B>> {
    /// Mutably borrows the stream of the boxed parser.
    fn as_mut(&mut self) -> &mut B {
        &mut self.stream
    }
}
impl<B: BufRead> From<B> for ThreadedParser<B> {
fn from(reader: B) -> Self {
<Self as Parser<B>>::new(reader)
}
}
impl<B: BufRead> From<B> for Box<ThreadedParser<B>> {
fn from(reader: B) -> Self {
Box::new(ThreadedParser::new(reader))
}
}
impl<B: BufRead> Iterator for ThreadedParser<B> {
type Item = Result<Frame, Error>;
fn next(&mut self) -> Option<Self::Item> {
macro_rules! send_or_error {
($channel:expr, $msg:expr) => {
if $channel.send($msg).is_err() {
self.state = State::Finished;
let err = ThreadingError::DisconnectedChannel;
return Some(Err(Error::from(err)));
}
};
}
loop {
if self.ordered {
if let Some(result) = self.queue.remove(&self.read_index) {
self.read_index += 1;
return Some(result);
}
}
match self.r_item.try_recv().map(|i| (i.res, i.index)) {
Ok((Ok(entry), _)) if !self.ordered => return Some(Ok(entry)),
Ok((Err(e), _)) if !self.ordered => {
self.state = State::Finished;
return Some(Err(e));
}
Ok((result, index)) if index == self.read_index => {
self.read_index += 1;
return Some(result);
}
Ok((result, index)) => {
self.queue.insert(index, result);
}
Err(TryRecvError::Empty) if self.state == State::Waiting => {
self.state = State::Finished;
return None;
}
Err(TryRecvError::Empty) => (),
Err(TryRecvError::Disconnected) => {
if self.state != State::Finished {
self.state = State::Finished;
return Some(Err(Error::from(ThreadingError::DisconnectedChannel)));
}
}
}
match self.state {
State::Waiting => (),
State::AtEof => {
self.state = State::Waiting;
for consumer in self.consumers.iter_mut() {
consumer.join().unwrap();
}
}
State::Idle => {
self.state = State::Started;
for consumer in &mut self.consumers {
consumer.start();
}
}
State::Finished => {
return None;
}
State::Started => {
let mut lines = String::new();
let mut l: &str;
let mut local_line_offset = 0;
let mut local_offset = 0;
loop {
lines.push_str(&self.line);
self.line.clear();
if let Err(e) = self.stream.read_line(&mut self.line) {
self.state = State::Finished;
return Some(Err(Error::from(e)));
}
l = self.line.trim_start();
if l.starts_with('[') {
let msg = ConsumerInput::new(
lines,
self.sent_index,
self.line_offset,
self.offset,
);
send_or_error!(self.s_text, Some(msg));
self.sent_index += 1;
self.line_offset += local_line_offset + 1;
self.offset += local_offset + self.line.len();
break;
} else if self.line.is_empty() {
self.state = State::AtEof;
if !lines.chars().all(|c| c.is_whitespace()) {
let msg = ConsumerInput::new(
lines,
self.sent_index,
self.line_offset,
self.offset,
);
send_or_error!(self.s_text, Some(msg));
}
for _ in 0..self.threads.get() {
send_or_error!(self.s_text, None);
}
break;
}
local_line_offset += 1;
local_offset += self.line.len();
}
}
}
}
}
}
impl<B: BufRead> Parser<B> for ThreadedParser<B> {
    /// Creates a parser with one consumer per logical CPU, as reported by
    /// `num_cpus::get`. The CPU count is queried once and cached.
    fn new(stream: B) -> Self {
        lazy_static! {
            static ref THREADS: usize = num_cpus::get();
        }
        // Clamp to 1 rather than calling the `unsafe` `new_unchecked`.
        // `num_cpus::get` should never return 0, but if it did, the unchecked
        // constructor would be undefined behaviour instead of a harmless
        // fallback.
        let threads =
            NonZeroUsize::new(*THREADS).unwrap_or_else(|| NonZeroUsize::new(1).unwrap());
        Self::with_threads(stream, threads)
    }

    /// Creates a parser with `threads` consumers.
    ///
    /// The header frame is parsed eagerly on the calling thread. Lines are
    /// read until the first line whose first non-whitespace character is `[`,
    /// or until the end of the stream. Blank lines are skipped, and every
    /// other line is parsed as a header clause. The first failure becomes
    /// the header result: either an I/O error, or a syntax error annotated
    /// with its line and byte offsets. That result is queued with index 0,
    /// so it is the first item yielded by the iterator. Consumers are created
    /// here but only started on the first call to `next`.
    fn with_threads(mut stream: B, threads: NonZeroUsize) -> Self {
        let mut frame_clauses = Vec::new();
        let mut line = String::new();
        let mut offset = 0;
        let mut line_offset = 0;
        let (s_text, r_text) = crossbeam_channel::unbounded();
        let (s_item, r_item) = crossbeam_channel::unbounded();

        let header = loop {
            line.clear();
            if let Err(e) = stream.read_line(&mut line) {
                break Err(Error::from(e));
            };
            let l = line.trim_start();
            if !l.starts_with('[') && !l.is_empty() {
                let clause = Lexer::tokenize(Rule::HeaderClause, &line)
                    .map_err(SyntaxError::from)
                    .map(|mut p| p.next().unwrap())
                    .and_then(HeaderClause::from_pair)
                    .map_err(SyntaxError::from);
                match clause {
                    Ok(c) => frame_clauses.push(c),
                    Err(e) => {
                        let err = e.with_offsets(line_offset, offset);
                        break Err(Error::from(err));
                    }
                };
            }
            // Stop at the first entity frame header or at the end of the stream;
            // the '[' line stays in `line` and is handed over to `next`.
            if l.starts_with('[') || line.is_empty() {
                break Ok(Frame::from(HeaderFrame::from(frame_clauses)));
            } else {
                line_offset += 1;
                offset += line.len();
            }
        };

        let mut consumers = Vec::with_capacity(threads.get());
        for _ in 0..threads.get() {
            let c = Consumer::new(r_text.clone(), s_item.clone());
            consumers.push(c);
        }
        // The receiver is alive (owned below), so this send cannot fail.
        s_item.send(ConsumerOutput::new(header, 0)).ok();

        Self {
            stream,
            r_item,
            s_text,
            threads,
            consumers,
            line,
            line_offset,
            offset,
            ordered: false,
            read_index: 0,
            sent_index: 1,
            queue: HashMap::new(),
            state: State::Idle,
        }
    }

    /// Sets whether frames are yielded in stream order (`true`) or as soon as
    /// the consumers produce them (`false`, the default).
    fn ordered(&mut self, ordered: bool) -> &mut Self {
        self.ordered = ordered;
        self
    }

    /// Returns the underlying stream and drops the rest of the parser. The
    /// line already buffered in the parser is not given back.
    fn into_inner(self) -> B {
        self.stream
    }
}
impl<B: BufRead> Parser<B> for Box<ThreadedParser<B>> {
fn new(stream: B) -> Self {
Box::new(ThreadedParser::new(stream))
}
fn with_threads(stream: B, threads: NonZeroUsize) -> Self {
Box::new(ThreadedParser::with_threads(stream, threads))
}
fn ordered(&mut self, ordered: bool) -> &mut Self {
(**self).ordered(ordered);
self
}
fn into_inner(self) -> B {
(*self).into_inner()
}
}
impl<B: BufRead> TryFrom<ThreadedParser<B>> for OboDoc {
type Error = Error;
fn try_from(mut reader: ThreadedParser<B>) -> Result<Self, Self::Error> {
OboDoc::try_from(&mut reader)
}
}
impl<B: BufRead> TryFrom<&mut ThreadedParser<B>> for OboDoc {
    type Error = Error;

    /// Drains the parser into an `OboDoc`.
    ///
    /// The first item is taken as the header frame, and every later item as
    /// an entity frame. The first error result is returned as-is.
    ///
    /// # Panics
    /// Panics if the parser yields no item at all, or if `into_header_frame`
    /// / `into_entity_frame` fail on the corresponding frame (presumably when
    /// the frame is of the other kind).
    fn try_from(reader: &mut ThreadedParser<B>) -> Result<Self, Self::Error> {
        let first = reader.next().unwrap()?;
        let header = first.into_header_frame().unwrap();
        let mut entities: Vec<EntityFrame> = Vec::new();
        for item in reader {
            let frame = item?;
            entities.push(frame.into_entity_frame().unwrap());
        }
        Ok(OboDoc::with_header(header).and_entities(entities))
    }
}
impl<B: BufRead> TryFrom<Box<ThreadedParser<B>>> for OboDoc {
type Error = Error;
fn try_from(mut reader: Box<ThreadedParser<B>>) -> Result<Self, Self::Error> {
OboDoc::try_from(&mut (*reader))
}
}
impl From<File> for ThreadedParser<BufReader<File>> {
fn from(f: File) -> Self {
Self::new(BufReader::new(f))
}
}
impl From<File> for Box<ThreadedParser<BufReader<File>>> {
fn from(f: File) -> Self {
Box::new(ThreadedParser::new(BufReader::new(f)))
}
}