fastobo/parser/threaded/
mod.rs1use std::collections::HashMap;
2use std::convert::TryFrom;
3use std::fs::File;
4use std::io::BufRead;
5use std::io::BufReader;
6use std::iter::Iterator;
7use std::num::NonZeroUsize;
8use std::sync::Arc;
9
10use crossbeam_channel::Receiver;
11use crossbeam_channel::Select;
12use crossbeam_channel::Sender;
13use crossbeam_channel::TryRecvError;
14use lazy_static::lazy_static;
15
16use crate::ast::EntityFrame;
17use crate::ast::Frame;
18use crate::ast::HeaderClause;
19use crate::ast::HeaderFrame;
20use crate::ast::OboDoc;
21use crate::error::Error;
22use crate::error::SyntaxError;
23use crate::error::ThreadingError;
24use crate::syntax::Lexer;
25use crate::syntax::Rule;
26
27use super::Cache;
28use super::FromPair;
29use super::Parser;
30
31use self::consumer::Consumer;
32use self::consumer::Input as ConsumerInput;
33use self::consumer::Output as ConsumerOutput;
34
35mod consumer;
36
/// Stage of the `ThreadedParser` state machine driven by `Iterator::next`.
///
/// The enum is fieldless, so it is `Copy` and `Debug` in addition to
/// being comparable, which makes state checks and diagnostics cheap.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum State {
    /// The header was parsed eagerly; consumer threads are not started yet.
    Idle,
    /// Consumer threads are running; frames are being read and sent to them.
    Started,
    /// The end of the input was reached and the stop messages were sent.
    AtEof,
    /// Consumer threads were joined; remaining results are being drained.
    Waiting,
    /// Iteration is over, either because the input is exhausted or because
    /// an error was reported.
    Finished,
}
48
49#[cfg_attr(feature = "_doc", doc(cfg(feature = "threading")))]
53pub struct ThreadedParser<B: BufRead> {
54 stream: B,
56 state: State,
58 consumers: Vec<Consumer>,
60
61 r_item: Receiver<ConsumerOutput>,
63 s_text: Sender<Option<ConsumerInput>>,
64
65 line: String,
67
68 threads: NonZeroUsize,
70
71 line_offset: usize,
73 offset: usize,
74
75 ordered: bool,
77 read_index: usize,
78 sent_index: usize,
79
80 queue: HashMap<usize, Result<Frame, Error>>,
82}
83
84impl<B: BufRead> AsRef<B> for ThreadedParser<B> {
85 fn as_ref(&self) -> &B {
86 &self.stream
87 }
88}
89
90impl<B: BufRead> AsRef<B> for Box<ThreadedParser<B>> {
91 fn as_ref(&self) -> &B {
92 (**self).as_ref()
93 }
94}
95
96impl<B: BufRead> AsMut<B> for ThreadedParser<B> {
97 fn as_mut(&mut self) -> &mut B {
98 &mut self.stream
99 }
100}
101
102impl<B: BufRead> AsMut<B> for Box<ThreadedParser<B>> {
103 fn as_mut(&mut self) -> &mut B {
104 (**self).as_mut()
105 }
106}
107
108impl<B: BufRead> From<B> for ThreadedParser<B> {
109 fn from(reader: B) -> Self {
110 <Self as Parser<B>>::new(reader)
111 }
112}
113
114impl<B: BufRead> From<B> for Box<ThreadedParser<B>> {
115 fn from(reader: B) -> Self {
116 Box::new(ThreadedParser::new(reader))
117 }
118}
119
120impl<B: BufRead> Iterator for ThreadedParser<B> {
121 type Item = Result<Frame, Error>;
122
123 fn next(&mut self) -> Option<Self::Item> {
124 macro_rules! send_or_error {
125 ($channel:expr, $msg:expr) => {
126 if $channel.send($msg).is_err() {
127 self.state = State::Finished;
128 let err = ThreadingError::DisconnectedChannel;
129 return Some(Err(Error::from(err)));
130 }
131 };
132 }
133
134 loop {
135 if self.ordered {
137 if let Some(result) = self.queue.remove(&self.read_index) {
138 self.read_index += 1;
139 return Some(result);
140 }
141 }
142
143 match self.r_item.try_recv().map(|i| (i.res, i.index)) {
145 Ok((Ok(entry), _)) if !self.ordered => return Some(Ok(entry)),
147 Ok((Err(e), _)) if !self.ordered => {
149 self.state = State::Finished;
150 return Some(Err(e));
151 }
152 Ok((result, index)) if index == self.read_index => {
154 self.read_index += 1;
155 return Some(result);
156 }
157 Ok((result, index)) => {
159 self.queue.insert(index, result);
160 }
161 Err(TryRecvError::Empty) if self.state == State::Waiting => {
163 self.state = State::Finished;
164 return None;
165 }
166 Err(TryRecvError::Empty) => (),
168 Err(TryRecvError::Disconnected) => {
170 if self.state != State::Finished {
171 self.state = State::Finished;
172 return Some(Err(Error::from(ThreadingError::DisconnectedChannel)));
173 }
174 }
175 }
176
177 match self.state {
179 State::Waiting => {
180 let mut select = Select::new();
181 select.recv(&self.r_item);
182 select.ready();
183 }
184 State::AtEof => {
185 self.state = State::Waiting;
186 for consumer in self.consumers.iter_mut() {
187 consumer.join().unwrap();
188 }
189 }
190 State::Idle => {
191 self.state = State::Started;
192 for consumer in &mut self.consumers {
193 consumer.start();
194 }
195 }
196 State::Finished => {
197 return None;
198 }
199 State::Started => {
200 let mut lines = String::new();
202 let mut l: &str;
203 let mut local_line_offset = 0;
204 let mut local_offset = 0;
205
206 loop {
207 lines.push_str(&self.line);
209 self.line.clear();
210
211 if let Err(e) = self.stream.read_line(&mut self.line) {
213 self.state = State::Finished;
214 return Some(Err(Error::from(e)));
215 }
216
217 l = self.line.trim_start();
219 if l.starts_with('[') {
220 let msg = ConsumerInput::new(
222 lines,
223 self.sent_index,
224 self.line_offset,
225 self.offset,
226 );
227 send_or_error!(self.s_text, Some(msg));
228 self.sent_index += 1;
230 self.line_offset += local_line_offset + 1;
231 self.offset += local_offset + self.line.len();
232 break;
233 } else if self.line.is_empty() {
234 self.state = State::AtEof;
236 if !lines.chars().all(|c| c.is_whitespace()) {
238 let msg = ConsumerInput::new(
239 lines,
240 self.sent_index,
241 self.line_offset,
242 self.offset,
243 );
244 send_or_error!(self.s_text, Some(msg));
245 }
246 for _ in 0..self.threads.get() {
248 send_or_error!(self.s_text, None);
249 }
250 break;
251 }
252
253 local_line_offset += 1;
255 local_offset += self.line.len();
256 }
257 }
258 }
259 }
260 }
261}
262
263impl<B: BufRead> Parser<B> for ThreadedParser<B> {
264 fn new(stream: B) -> Self {
269 lazy_static! {
270 static ref THREADS: usize = num_cpus::get();
271 }
272 let threads = unsafe { NonZeroUsize::new_unchecked(*THREADS) };
273 Self::with_threads(stream, threads)
274 }
275
276 fn with_threads(mut stream: B, threads: NonZeroUsize) -> Self {
278 let mut frame_clauses = Vec::new();
280 let mut line = String::new();
281 let mut l: &str;
282 let mut offset = 0;
283 let mut line_offset = 0;
284 let interner = Arc::new(Cache::default());
285
286 let (s_text, r_text) = crossbeam_channel::unbounded();
288 let (s_item, r_item) = crossbeam_channel::unbounded();
289
290 let header = loop {
292 line.clear();
294 if let Err(e) = stream.read_line(&mut line) {
295 break Err(Error::from(e));
296 };
297 l = line.trim_start();
298
299 if !l.starts_with('[') && !l.is_empty() {
301 let clause = Lexer::tokenize(Rule::HeaderClause, &line)
303 .map_err(SyntaxError::from)
304 .map(|mut p| p.next().unwrap())
305 .and_then(|p| HeaderClause::from_pair(p, &interner))
306 .map_err(SyntaxError::from);
307 match clause {
309 Ok(c) => frame_clauses.push(c),
310 Err(e) => {
311 let err = e.with_offsets(line_offset, offset);
312 break Err(Error::from(err));
313 }
314 };
315 }
316
317 if l.starts_with('[') || line.is_empty() {
319 break Ok(Frame::from(HeaderFrame::from(frame_clauses)));
320 } else {
321 line_offset += 1;
322 offset += line.len();
323 }
324 };
325
326 let mut consumers = Vec::with_capacity(threads.get());
328 for _ in 0..threads.get() {
329 let c = Consumer::new(r_text.clone(), s_item.clone(), interner.clone());
330 consumers.push(c);
331 }
332
333 s_item.send(ConsumerOutput::new(header, 0)).ok();
335
336 Self {
338 stream,
339 r_item,
340 s_text,
341 threads,
342 consumers,
343 line,
344 line_offset,
345 offset,
346 ordered: false,
347 read_index: 0,
348 sent_index: 1,
349 queue: HashMap::new(),
350 state: State::Idle,
351 }
352 }
353
354 fn ordered(&mut self, ordered: bool) -> &mut Self {
359 self.ordered = ordered;
360 self
361 }
362
363 fn into_inner(self) -> B {
365 self.stream
366 }
367}
368
369impl<B: BufRead> Parser<B> for Box<ThreadedParser<B>> {
370 fn new(stream: B) -> Self {
371 Box::new(ThreadedParser::new(stream))
372 }
373
374 fn with_threads(stream: B, threads: NonZeroUsize) -> Self {
375 Box::new(ThreadedParser::with_threads(stream, threads))
376 }
377
378 fn ordered(&mut self, ordered: bool) -> &mut Self {
379 (**self).ordered(ordered);
380 self
381 }
382
383 fn into_inner(self) -> B {
384 (*self).into_inner()
385 }
386}
387
388impl<B: BufRead> TryFrom<ThreadedParser<B>> for OboDoc {
389 type Error = Error;
390 fn try_from(mut reader: ThreadedParser<B>) -> Result<Self, Self::Error> {
391 OboDoc::try_from(&mut reader)
392 }
393}
394
395impl<B: BufRead> TryFrom<&mut ThreadedParser<B>> for OboDoc {
396 type Error = Error;
397 fn try_from(reader: &mut ThreadedParser<B>) -> Result<Self, Self::Error> {
398 let header = reader.next().unwrap()?.into_header().unwrap();
400
401 let entities = reader
403 .map(|r| r.map(|f| f.into_entity().unwrap()))
404 .collect::<Result<Vec<EntityFrame>, Error>>()?;
405
406 Ok(OboDoc::with_header(header).and_entities(entities))
408 }
409}
410
411impl<B: BufRead> TryFrom<Box<ThreadedParser<B>>> for OboDoc {
412 type Error = Error;
413 fn try_from(mut reader: Box<ThreadedParser<B>>) -> Result<Self, Self::Error> {
414 OboDoc::try_from(&mut (*reader))
415 }
416}
417
418impl From<File> for ThreadedParser<BufReader<File>> {
419 fn from(f: File) -> Self {
420 Self::new(BufReader::new(f))
421 }
422}
423
424impl From<File> for Box<ThreadedParser<BufReader<File>>> {
425 fn from(f: File) -> Self {
426 Box::new(ThreadedParser::new(BufReader::new(f)))
427 }
428}