fastobo/parser/threaded/
mod.rs

1use std::collections::HashMap;
2use std::convert::TryFrom;
3use std::fs::File;
4use std::io::BufRead;
5use std::io::BufReader;
6use std::iter::Iterator;
7use std::num::NonZeroUsize;
8use std::sync::Arc;
9
10use crossbeam_channel::Receiver;
11use crossbeam_channel::Select;
12use crossbeam_channel::Sender;
13use crossbeam_channel::TryRecvError;
14use lazy_static::lazy_static;
15
16use crate::ast::EntityFrame;
17use crate::ast::Frame;
18use crate::ast::HeaderClause;
19use crate::ast::HeaderFrame;
20use crate::ast::OboDoc;
21use crate::error::Error;
22use crate::error::SyntaxError;
23use crate::error::ThreadingError;
24use crate::syntax::Lexer;
25use crate::syntax::Rule;
26
27use super::Cache;
28use super::FromPair;
29use super::Parser;
30
31use self::consumer::Consumer;
32use self::consumer::Input as ConsumerInput;
33use self::consumer::Output as ConsumerOutput;
34
35mod consumer;
36
37// ---
38
/// The state of a `ThreadedParser` instance.
///
/// The parser moves through these states in order while it is iterated:
/// `Idle` → `Started` → `AtEof` → `Waiting` → `Finished`. It may jump
/// directly to `Finished` when an error is encountered.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    /// The parser was created but the consumers were not started yet.
    Idle,
    /// The consumers are running and frames are being read from the stream.
    Started,
    /// The end of the stream was reached; the consumers must now be joined.
    AtEof,
    /// The consumers were joined; only already-produced results remain.
    Waiting,
    /// Iteration is over: nothing more will be read from the stream.
    Finished,
}
48
49// ---
50
/// An iterator reading entity frames contained in an OBO stream in parallel.
///
/// The header frame is parsed eagerly when the parser is created. Entity
/// frames are then read from the stream as raw text, one frame at a time,
/// sent to the consumers over a channel, and the parsed results are
/// collected from a second channel when the iterator is polled.
#[cfg_attr(feature = "_doc", doc(cfg(feature = "threading")))]
pub struct ThreadedParser<B: BufRead> {
    /// The reader from which the document is read line by line.
    stream: B,
    /// The current state of the parser (see `State`).
    state: State,
    /// The consumers parsing frames; started on the first poll and joined
    /// once the end of the stream is reached.
    consumers: Vec<Consumer>,

    /// Receiving end of the channel on which parsed results arrive.
    r_item: Receiver<ConsumerOutput>,
    /// Sending end of the channel carrying raw frame text to the consumers;
    /// `None` is sent once per thread at end of input as a stop signal.
    s_text: Sender<Option<ConsumerInput>>,

    /// Buffer for the last line that was read.
    ///
    /// Between two polls it holds the first line of the next frame to send.
    line: String,

    /// Number of threads requested by the user
    threads: NonZeroUsize,

    /// Line offset of the next frame to send, forwarded with the frame text
    /// (used to report proper error positions).
    line_offset: usize,
    /// Character offset counterpart of `line_offset`, accumulated from
    /// `String::len` (i.e. counted in bytes).
    offset: usize,

    /// Whether frames must be yielded in the order of the document.
    ordered: bool,
    /// Index of the next result to yield when in ordered mode.
    read_index: usize,
    /// Index given to the next frame sent to the consumers (index 0 is
    /// used for the header frame).
    sent_index: usize,

    /// Result queue to maintain frame order if in ordered mode.
    ///
    /// Results received ahead of `read_index` are parked here, keyed by
    /// their index, until their turn comes.
    queue: HashMap<usize, Result<Frame, Error>>,
}
83
84impl<B: BufRead> AsRef<B> for ThreadedParser<B> {
85    fn as_ref(&self) -> &B {
86        &self.stream
87    }
88}
89
90impl<B: BufRead> AsRef<B> for Box<ThreadedParser<B>> {
91    fn as_ref(&self) -> &B {
92        (**self).as_ref()
93    }
94}
95
96impl<B: BufRead> AsMut<B> for ThreadedParser<B> {
97    fn as_mut(&mut self) -> &mut B {
98        &mut self.stream
99    }
100}
101
102impl<B: BufRead> AsMut<B> for Box<ThreadedParser<B>> {
103    fn as_mut(&mut self) -> &mut B {
104        (**self).as_mut()
105    }
106}
107
108impl<B: BufRead> From<B> for ThreadedParser<B> {
109    fn from(reader: B) -> Self {
110        <Self as Parser<B>>::new(reader)
111    }
112}
113
114impl<B: BufRead> From<B> for Box<ThreadedParser<B>> {
115    fn from(reader: B) -> Self {
116        Box::new(ThreadedParser::new(reader))
117    }
118}
119
120impl<B: BufRead> Iterator for ThreadedParser<B> {
121    type Item = Result<Frame, Error>;
122
123    fn next(&mut self) -> Option<Self::Item> {
124        macro_rules! send_or_error {
125            ($channel:expr, $msg:expr) => {
126                if $channel.send($msg).is_err() {
127                    self.state = State::Finished;
128                    let err = ThreadingError::DisconnectedChannel;
129                    return Some(Err(Error::from(err)));
130                }
131            };
132        }
133
134        loop {
135            // return and item from the queue if in ordered mode
136            if self.ordered {
137                if let Some(result) = self.queue.remove(&self.read_index) {
138                    self.read_index += 1;
139                    return Some(result);
140                }
141            }
142
143            // poll for parsed frames to return
144            match self.r_item.try_recv().map(|i| (i.res, i.index)) {
145                // item is found, don't care about order: simply return it
146                Ok((Ok(entry), _)) if !self.ordered => return Some(Ok(entry)),
147                // error is found: finalize and return it
148                Ok((Err(e), _)) if !self.ordered => {
149                    self.state = State::Finished;
150                    return Some(Err(e));
151                }
152                // item is found and is the right index: return it
153                Ok((result, index)) if index == self.read_index => {
154                    self.read_index += 1;
155                    return Some(result);
156                }
157                // item is found but is not the right index: store it
158                Ok((result, index)) => {
159                    self.queue.insert(index, result);
160                }
161                // empty queue after all the threads were joined: we are done
162                Err(TryRecvError::Empty) if self.state == State::Waiting => {
163                    self.state = State::Finished;
164                    return None;
165                }
166                // empty queue in any other state: just do something else
167                Err(TryRecvError::Empty) => (),
168                // queue was disconnected: stop and return an error
169                Err(TryRecvError::Disconnected) => {
170                    if self.state != State::Finished {
171                        self.state = State::Finished;
172                        return Some(Err(Error::from(ThreadingError::DisconnectedChannel)));
173                    }
174                }
175            }
176
177            // depending on the state, do something before polling
178            match self.state {
179                State::Waiting => {
180                    let mut select = Select::new();
181                    select.recv(&self.r_item);
182                    select.ready();
183                }
184                State::AtEof => {
185                    self.state = State::Waiting;
186                    for consumer in self.consumers.iter_mut() {
187                        consumer.join().unwrap();
188                    }
189                }
190                State::Idle => {
191                    self.state = State::Started;
192                    for consumer in &mut self.consumers {
193                        consumer.start();
194                    }
195                }
196                State::Finished => {
197                    return None;
198                }
199                State::Started => {
200                    //
201                    let mut lines = String::new();
202                    let mut l: &str;
203                    let mut local_line_offset = 0;
204                    let mut local_offset = 0;
205
206                    loop {
207                        // store the previous line and process the next line
208                        lines.push_str(&self.line);
209                        self.line.clear();
210
211                        // read the next line
212                        if let Err(e) = self.stream.read_line(&mut self.line) {
213                            self.state = State::Finished;
214                            return Some(Err(Error::from(e)));
215                        }
216
217                        // check if we reached the end of the frame
218                        l = self.line.trim_start();
219                        if l.starts_with('[') {
220                            // send the entire frame with the location offsets
221                            let msg = ConsumerInput::new(
222                                lines,
223                                self.sent_index,
224                                self.line_offset,
225                                self.offset,
226                            );
227                            send_or_error!(self.s_text, Some(msg));
228                            // update the local offsets and bail out
229                            self.sent_index += 1;
230                            self.line_offset += local_line_offset + 1;
231                            self.offset += local_offset + self.line.len();
232                            break;
233                        } else if self.line.is_empty() {
234                            // change the state to wait for workers to finish
235                            self.state = State::AtEof;
236                            // if some lines remain, send them as text
237                            if !lines.chars().all(|c| c.is_whitespace()) {
238                                let msg = ConsumerInput::new(
239                                    lines,
240                                    self.sent_index,
241                                    self.line_offset,
242                                    self.offset,
243                                );
244                                send_or_error!(self.s_text, Some(msg));
245                            }
246                            // poison-pill the remaining workers and bail out
247                            for _ in 0..self.threads.get() {
248                                send_or_error!(self.s_text, None);
249                            }
250                            break;
251                        }
252
253                        // Update local offsets
254                        local_line_offset += 1;
255                        local_offset += self.line.len();
256                    }
257                }
258            }
259        }
260    }
261}
262
263impl<B: BufRead> Parser<B> for ThreadedParser<B> {
264    /// Create a new `ThreadedParser` with all available CPUs.
265    ///
266    /// The number of available CPUs will be polled at runtime and then the
267    /// right number of threads will be spawned accordingly.
268    fn new(stream: B) -> Self {
269        lazy_static! {
270            static ref THREADS: usize = num_cpus::get();
271        }
272        let threads = unsafe { NonZeroUsize::new_unchecked(*THREADS) };
273        Self::with_threads(stream, threads)
274    }
275
276    /// Create a new `ThreadedParser` with the given number of threads.
277    fn with_threads(mut stream: B, threads: NonZeroUsize) -> Self {
278        // create the buffers and counters
279        let mut frame_clauses = Vec::new();
280        let mut line = String::new();
281        let mut l: &str;
282        let mut offset = 0;
283        let mut line_offset = 0;
284        let interner = Arc::new(Cache::default());
285
286        // create the communication channels
287        let (s_text, r_text) = crossbeam_channel::unbounded();
288        let (s_item, r_item) = crossbeam_channel::unbounded();
289
290        // read until we reach the first entity frame
291        let header = loop {
292            // Read the next line
293            line.clear();
294            if let Err(e) = stream.read_line(&mut line) {
295                break Err(Error::from(e));
296            };
297            l = line.trim_start();
298
299            // if the line is not empty, parse it
300            if !l.starts_with('[') && !l.is_empty() {
301                // parse the header clause
302                let clause = Lexer::tokenize(Rule::HeaderClause, &line)
303                    .map_err(SyntaxError::from)
304                    .map(|mut p| p.next().unwrap())
305                    .and_then(|p| HeaderClause::from_pair(p, &interner))
306                    .map_err(SyntaxError::from);
307                // check if the clause was parsed properly or not
308                match clause {
309                    Ok(c) => frame_clauses.push(c),
310                    Err(e) => {
311                        let err = e.with_offsets(line_offset, offset);
312                        break Err(Error::from(err));
313                    }
314                };
315            }
316
317            // if the line is the beginning of an entity frame, stop
318            if l.starts_with('[') || line.is_empty() {
319                break Ok(Frame::from(HeaderFrame::from(frame_clauses)));
320            } else {
321                line_offset += 1;
322                offset += line.len();
323            }
324        };
325
326        // create the consumers
327        let mut consumers = Vec::with_capacity(threads.get());
328        for _ in 0..threads.get() {
329            let c = Consumer::new(r_text.clone(), s_item.clone(), interner.clone());
330            consumers.push(c);
331        }
332
333        // send the header to the channel (to get it back immediately after)
334        s_item.send(ConsumerOutput::new(header, 0)).ok();
335
336        // return the parser
337        Self {
338            stream,
339            r_item,
340            s_text,
341            threads,
342            consumers,
343            line,
344            line_offset,
345            offset,
346            ordered: false,
347            read_index: 0,
348            sent_index: 1,
349            queue: HashMap::new(),
350            state: State::Idle,
351        }
352    }
353
354    /// Make the parser yield frames in the order they appear in the document.
355    ///
356    /// Note that this has a small performance impact, so this is disabled
357    /// by default.
358    fn ordered(&mut self, ordered: bool) -> &mut Self {
359        self.ordered = ordered;
360        self
361    }
362
363    /// Consume the reader and extract the internal reader.
364    fn into_inner(self) -> B {
365        self.stream
366    }
367}
368
369impl<B: BufRead> Parser<B> for Box<ThreadedParser<B>> {
370    fn new(stream: B) -> Self {
371        Box::new(ThreadedParser::new(stream))
372    }
373
374    fn with_threads(stream: B, threads: NonZeroUsize) -> Self {
375        Box::new(ThreadedParser::with_threads(stream, threads))
376    }
377
378    fn ordered(&mut self, ordered: bool) -> &mut Self {
379        (**self).ordered(ordered);
380        self
381    }
382
383    fn into_inner(self) -> B {
384        (*self).into_inner()
385    }
386}
387
388impl<B: BufRead> TryFrom<ThreadedParser<B>> for OboDoc {
389    type Error = Error;
390    fn try_from(mut reader: ThreadedParser<B>) -> Result<Self, Self::Error> {
391        OboDoc::try_from(&mut reader)
392    }
393}
394
395impl<B: BufRead> TryFrom<&mut ThreadedParser<B>> for OboDoc {
396    type Error = Error;
397    fn try_from(reader: &mut ThreadedParser<B>) -> Result<Self, Self::Error> {
398        // extract the header and create the doc
399        let header = reader.next().unwrap()?.into_header().unwrap();
400
401        // extract the remaining entities
402        let entities = reader
403            .map(|r| r.map(|f| f.into_entity().unwrap()))
404            .collect::<Result<Vec<EntityFrame>, Error>>()?;
405
406        // return the doc
407        Ok(OboDoc::with_header(header).and_entities(entities))
408    }
409}
410
411impl<B: BufRead> TryFrom<Box<ThreadedParser<B>>> for OboDoc {
412    type Error = Error;
413    fn try_from(mut reader: Box<ThreadedParser<B>>) -> Result<Self, Self::Error> {
414        OboDoc::try_from(&mut (*reader))
415    }
416}
417
418impl From<File> for ThreadedParser<BufReader<File>> {
419    fn from(f: File) -> Self {
420        Self::new(BufReader::new(f))
421    }
422}
423
424impl From<File> for Box<ThreadedParser<BufReader<File>>> {
425    fn from(f: File) -> Self {
426        Box::new(ThreadedParser::new(BufReader::new(f)))
427    }
428}