// smt_scope/parsers/mod.rs

1use crate::FResult;
2use crate::FatalError;
3
4pub use self::wrapper_async_parser::*;
5pub use self::wrapper_stream_parser::*;
6use futures::{AsyncBufRead, AsyncBufReadExt, AsyncRead};
7use std::fmt::Debug;
8use std::fs::{File, Metadata};
9use std::io::{BufRead, BufReader};
10use std::path::Path;
11use std::time::Duration;
12use wasm_timer::Instant;
13
14pub mod z3;
15
/// Helper supertrait for [`LogParser`]. Without the `mem_dbg` feature a parser
/// only needs to implement [`Default`]; the blanket impl makes every such type
/// qualify automatically.
#[cfg(not(feature = "mem_dbg"))]
pub trait LogParserHelper: Default {}
#[cfg(not(feature = "mem_dbg"))]
impl<T: Default> LogParserHelper for T {}

/// Helper supertrait for [`LogParser`]. With the `mem_dbg` feature enabled,
/// parsers must additionally support memory-usage introspection via
/// `mem_dbg::MemDbg` and `mem_dbg::MemSize`.
#[cfg(feature = "mem_dbg")]
pub trait LogParserHelper: Default + mem_dbg::MemDbg + mem_dbg::MemSize {}
#[cfg(feature = "mem_dbg")]
impl<T: Default + mem_dbg::MemDbg + mem_dbg::MemSize> LogParserHelper for T {}
25
/// Trait for a generic SMT solver trace parser. Intended to support different
/// solvers or log formats.
pub trait LogParser: LogParserHelper {
    /// Returns `true` if a line beginning with `first_byte` starts a new log
    /// entry. Can be overridden to allow for parsing entries across multiple
    /// lines: returning `false` causes the next physical line to be appended
    /// to the current one before [`process_line`](Self::process_line) is
    /// called.
    fn is_line_start(&mut self, _first_byte: u8) -> bool {
        true
    }

    /// Process a single line of the log file. Return `true` if parsing should
    /// continue, or `false` if parsing should stop.
    fn process_line(&mut self, line: &str, line_no: usize) -> FResult<bool>;

    /// Called once when the input is exhausted or parsing stops, giving the
    /// parser a chance to finalise its state.
    fn end_of_file(&mut self);

    /// Creates a new parser. Only use this if you cannot use the following
    /// convenience methods:
    /// - [`from_file`](Self::from_file) for creating a streaming parser from a file path
    /// - [`from_str`](Self::from_str) or [`from_string`](Self::from_string) for creating a parser from a string
    fn from<'r, R: BufRead + 'r>(reader: R) -> StreamParser<'r, Self> {
        StreamParser::new(reader)
    }
    /// Creates a new async parser from an async buffered reader. The parser
    /// will read from the reader line-by-line.
    fn from_async<'r, R: AsyncBufRead + Unpin + 'r>(reader: R) -> AsyncParser<'r, Self> {
        AsyncParser::new(reader)
    }

    /// Creates a new parser from the contents of a log file. The string
    /// argument must live as long as parsing is ongoing. Release this
    /// constraint by using [`take_parser`](StreamParser::take_parser) to end
    /// parsing. If you want the parser to take ownership of the string instead
    /// (i.e. you are running into lifetime issues), use
    /// [`from_string`](Self::from_string) instead.
    fn from_str(s: &str) -> StreamParser<'_, Self> {
        s.as_bytes().into_parser()
    }

    /// See [`from_str`](LogParser::from_str) and [`process_all`](StreamParser::process_all).
    fn from_str_all(s: &str) -> Result<Self, FatalError> {
        Self::from_str(s).process_all()
    }

    /// Creates a new parser from the contents of a log file. The parser takes
    /// ownership over the string.
    fn from_string(s: String) -> StreamParser<'static, Self> {
        s.into_cursor().into_parser()
    }

    /// See [`from_string`](LogParser::from_string) and [`process_all`](StreamParser::process_all).
    fn from_string_all(s: String) -> Result<Self, FatalError> {
        Self::from_string(s).process_all()
    }

    /// Creates a new streaming parser from a file. Additionally returns the
    /// file metadata so that the progress can be calculated from the file size.
    ///
    /// This method is an alternative to
    /// `from_string(fs::read_to_string(p)?)`. This approach to parsing is
    /// ~5% slower, but should use only ~50% as much memory due to not having
    /// the entire loaded String in memory.
    fn from_file<P: AsRef<Path>>(p: P) -> std::io::Result<(Metadata, StreamParser<'static, Self>)> {
        let (meta, reader) = p.read_open()?;
        Ok((meta, reader.into_parser()))
    }

    /// See [`from_file`](LogParser::from_file) and [`process_all`](StreamParser::process_all).
    fn from_file_all<P: AsRef<Path>>(p: P) -> Result<Self, FatalError> {
        let (_, parser) = Self::from_file(p)?;
        parser.process_all()
    }
}
97
98////////////////////
99// Parser Creation
100////////////////////
101
102pub trait IntoStreamParser<'r> {
103    /// Creates a new parser from a buffered reader.
104    fn into_parser<Parser: LogParser>(self) -> StreamParser<'r, Parser>;
105}
106impl<'r, R: BufRead + 'r> IntoStreamParser<'r> for R {
107    fn into_parser<Parser: LogParser>(self) -> StreamParser<'r, Parser> {
108        Parser::from(self)
109    }
110}
111
/// Extension trait for treating in-memory bytes as a readable stream.
pub trait CursorRead: AsRef<[u8]> + Sized {
    /// Wraps any `[u8]`-like data such as a [`String`] in a
    /// [`Cursor`](std::io::Cursor), which implements
    /// [`BufRead`](std::io::BufRead). Intended to be chained with
    /// [`into_parser`](IntoStreamParser::into_parser).
    fn into_cursor(self) -> std::io::Cursor<Self> {
        use std::io::Cursor;
        Cursor::new(self)
    }
}
impl<T: AsRef<[u8]>> CursorRead for T {}
122
/// Extension trait for opening a path as a buffered file reader.
pub trait FileRead: AsRef<Path> + Sized {
    /// Opens a file and returns a buffered reader and the file's metadata. A
    /// more memory efficient alternative to
    /// `fs::read_to_string(self)?.into_cursor()`.
    fn read_open(self) -> std::io::Result<(Metadata, BufReader<File>)> {
        // Query metadata before handing the handle to the buffered reader.
        let file = File::open(self)?;
        Ok((file.metadata()?, BufReader::new(file)))
    }
}
impl<T: AsRef<Path>> FileRead for T {}
135
136pub trait IntoAsyncParser<'r> {
137    /// Creates a new parser from an async buffered reader.
138    fn into_async_parser<Parser: LogParser>(self) -> AsyncParser<'r, Parser>;
139}
140impl<'r, R: AsyncBufRead + Unpin + 'r> IntoAsyncParser<'r> for R {
141    fn into_async_parser<Parser: LogParser>(self) -> AsyncParser<'r, Parser> {
142        Parser::from_async(self)
143    }
144}
145
146pub trait AsyncBufferRead: AsyncRead + Sized {
147    /// Buffers any [`AsyncRead`](futures::io::AsyncRead) stream into a
148    /// [`BufAsyncRead`](futures::io::BufReader) stream. Do not use this if the
149    /// underlying stream already implements
150    /// [`BufAsyncRead`](futures::io::BufReader).
151    fn buffer(self) -> futures::io::BufReader<Self> {
152        futures::io::BufReader::new(self)
153    }
154}
155impl<T: AsyncRead> AsyncBufferRead for T {}
156
157pub trait AsyncCursorRead: AsRef<[u8]> + Unpin + Sized {
158    /// Turns any `[u8]` data such as a [`String`] into a async
159    /// [`Cursor`](futures::io::Cursor) which implements
160    /// [`AsyncBufRead`](futures::io::AsyncBufRead). Intended to be chained with
161    /// [`into_async_parser`](IntoAsyncParser::into_async_parser).
162    fn into_async_cursor(self) -> futures::io::Cursor<Self> {
163        futures::io::Cursor::new(self)
164    }
165}
166impl<T: AsRef<[u8]> + Unpin> AsyncCursorRead for T {}
167
168////////////////////
169// Parser Execution
170////////////////////
171
172#[derive(Debug, Clone)]
173pub enum ParseState<T> {
174    Paused(T, ReaderState),
175    Completed { end_of_stream: bool },
176    Error(FatalError),
177}
178
179impl<T> ParseState<T> {
180    pub fn is_timeout(&self) -> bool {
181        matches!(self, Self::Paused(..))
182    }
183    pub fn error(&self) -> Option<&FatalError> {
184        match self {
185            Self::Error(err) => Some(err),
186            _ => None,
187        }
188    }
189
190    fn map<U>(self, f: impl FnOnce(T) -> U) -> ParseState<U> {
191        match self {
192            Self::Paused(t, rs) => ParseState::Paused(f(t), rs),
193            Self::Completed { end_of_stream } => ParseState::Completed { end_of_stream },
194            Self::Error(err) => ParseState::Error(err),
195        }
196    }
197}
198
/// Progress information for a parser: how far through the input stream the
/// reader has advanced. Cheap to copy; returned by
/// [`reader_state`](StreamParser::reader_state) and passed to pause
/// predicates.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReaderState {
    /// The number of bytes parsed so far.
    pub bytes_read: usize,
    /// The number of lines parsed so far.
    pub lines_read: usize,
}
207
#[duplicate::duplicate_item(
    EitherParser   ReadBound                   async   add_await(code);
    [StreamParser] [BufRead + 'r]              []      [code];
    [AsyncParser]  [AsyncBufRead + Unpin + 'r] [async] [code.await];
)]
// The `duplicate_item` attribute expands this module twice: once as the
// synchronous `StreamParser` (reading from `BufRead`; the `async` token and
// `add_await` wrapper are erased) and once as the asynchronous `AsyncParser`
// (reading from `AsyncBufRead`; `add_await(code)` becomes `code.await`).
mod wrapper {
    use super::*;
    // Uninhabited type used to instantiate `process_until`'s generic `T` when
    // the predicate can never pause, making the `Paused` arm statically
    // impossible (`match n {}`).
    enum Never {}

    /// Struct which contains both a parser state as well as the stream of lines
    /// which are being parsed. Always use this instead of the raw underlying
    /// parser. Feeds the parser line-by-line with a callback to indicate if or
    /// when to pause. Supports any parser as long as it implements
    /// [`LogParser`].
    pub struct EitherParser<'r, Parser: LogParser> {
        // `None` once parsing has finished and the handle has been released.
        reader: Option<Box<dyn ReadBound>>,
        reader_state: ReaderState,
        parser: Parser,
    }
    impl<'r, Parser: LogParser, R: ReadBound> From<R> for EitherParser<'r, Parser> {
        fn from(reader: R) -> Self {
            Self::new(reader)
        }
    }
    impl<'r, Parser: LogParser> EitherParser<'r, Parser> {
        /// Wraps a reader together with a freshly `Default`-constructed parser.
        pub(super) fn new(reader: impl ReadBound) -> Self {
            Self {
                reader: Some(Box::new(reader)),
                reader_state: ReaderState::default(),
                parser: Parser::default(),
            }
        }

        /// Get the current parser state.
        pub fn parser(&self) -> &Parser {
            &self.parser
        }
        /// Consume `self` and return the parser, signalling end-of-file to it
        /// first if parsing had not already finished.
        pub fn take_parser(mut self) -> Parser {
            if let Some(reader) = self.reader.take() {
                drop(reader);
                self.parser.end_of_file();
            }
            self.parser
        }
        /// Get the current reader progress.
        pub fn reader_state(&self) -> ReaderState {
            self.reader_state
        }
        /// Have we finished parsing?
        pub fn is_done(&self) -> bool {
            self.reader.is_none()
        }

        /// Reads and parses a single logical line (which may span several
        /// physical lines, see [`LogParser::is_line_start`]).
        ///
        /// Returns:
        /// - `Ok(None)` — line parsed, keep going;
        /// - `Ok(Some(end_of_stream))` — stop parsing (`true` when the input
        ///   was exhausted, `false` when the parser requested a stop);
        /// - `Err(_)` — the parser reported a fatal error.
        async fn process_line(
            reader: &mut Box<dyn ReadBound>,
            reader_state: &mut ReaderState,
            parser: &mut Parser,
            buf: &mut String,
        ) -> Result<Option<bool>, FatalError> {
            buf.clear();
            // Read line
            let mut bytes_read = 0;

            loop {
                bytes_read += add_await([reader.read_line(buf)])?;
                // NOTE(review): this increments even when `read_line` returned
                // 0 bytes at end of stream, so `lines_read` appears to
                // over-count by one on the final call — confirm intended.
                reader_state.lines_read += 1;
                let peek = add_await([reader.fill_buf()])?;
                // Stop reading if this is the end or we don't have a multiline.
                if peek.is_empty() || parser.is_line_start(peek[0]) {
                    break;
                }
            }
            if bytes_read == 0 {
                // Nothing left to read: the stream is exhausted.
                return Ok(Some(true));
            }
            // Remove newline from end
            if buf.ends_with('\n') {
                buf.pop();
                if buf.ends_with('\r') {
                    buf.pop();
                }
            } else {
                // Ignore the last line in a file if it doesn't end with a
                // newline, this happens if the process writing to it was
                // killed.
                return Ok(Some(true));
            }

            // Parse line
            reader_state.bytes_read += bytes_read;
            let stop_parsing = !parser.process_line(buf, reader_state.lines_read)?;
            Ok(stop_parsing.then_some(false))
        }

        /// Parse the input while calling the `predicate` callback after
        /// each line. Keep parsing until the callback returns `Some(t)` or we
        /// reach the end of the input. If we stopped due to the callback,
        /// return the current progress as `ParseState::Paused(t, state)`. After
        /// this function returns, use [`parser`](Self::parser) to retrieve the
        /// parser state.
        ///
        /// The `predicate` callback should aim to return quickly since **it is
        /// called between each line!** If heavier processing is required
        /// consider using [`process_check_every`] or [`process_until_every`].
        pub async fn process_until<T>(
            &mut self,
            mut predicate: impl FnMut(&Parser, ReaderState) -> Option<T>,
        ) -> ParseState<T> {
            // Already finished on an earlier call.
            let Some(reader) = self.reader.as_mut() else {
                return ParseState::Completed {
                    end_of_stream: true,
                };
            };
            // Single line buffer reused across all iterations.
            let mut buf = String::new();
            loop {
                // The predicate is consulted before every line, including the
                // first.
                if let Some(t) = predicate(&self.parser, self.reader_state) {
                    return ParseState::Paused(t, self.reader_state);
                }
                let state = match add_await([Self::process_line(
                    reader,
                    &mut self.reader_state,
                    &mut self.parser,
                    &mut buf,
                )]) {
                    Ok(None) => continue,
                    Ok(Some(end_of_stream)) => ParseState::Completed { end_of_stream },
                    Err(err) => ParseState::Error(err),
                };
                drop(self.reader.take()); // Release file handle/free up memory
                self.parser.end_of_file();
                return state;
            }
        }
        /// Identical to [`process_until`] except the predicate is only checked
        /// every `lines_per_check` lines.
        pub async fn process_until_every<T>(
            &mut self,
            mut predicate: impl FnMut(&Parser, ReaderState) -> Option<T>,
            lines_per_check: usize,
        ) -> ParseState<T> {
            assert!(lines_per_check > 0);
            // Countdown until the next time the real predicate is consulted.
            let mut remaining = lines_per_check;
            add_await([self.process_until(move |p, rs| {
                remaining -= 1;
                if remaining == 0 {
                    remaining = lines_per_check;
                    predicate(p, rs)
                } else {
                    None
                }
            })])
        }

        /// Parse the input while calling the `predicate` callback every
        /// `delta` time. Keep parsing until the callback returns `Some(t)` or
        /// we reach the end of the input. If we stopped due to the callback,
        /// return the current progress as `ParseState::Paused(t, state)`. After
        /// this function returns, use [`parser`](Self::parser) to retrieve the
        /// parser state.
        pub async fn process_check_every<T>(
            &mut self,
            delta: Duration,
            mut predicate: impl FnMut(&Parser, ReaderState) -> Option<T>,
        ) -> ParseState<T> {
            // Calling `Instant::now` between each line can get expensive, so
            // we'll try to avoid it. We will try to check every
            // `MAX_LINES_PER_TIME_VARIATION * time_left`, ensuring that we
            // never go over time as long as the lines-per-time of the parser
            // does not drop by more than `MAX_LINES_PER_TIME_VARIATION`
            // between checks. The closer this is to `1`, the fewer checks
            // we'll do.
            const MAX_LINES_PER_TIME_VARIATION: u128 = 50;
            // Initial guess: one line per millisecond of budget, at least 10.
            let initial_lines_per_check =
                delta.as_millis().try_into().unwrap_or(usize::MAX).max(10);
            // How many lines must pass before we check time again?
            let mut lines_per_check = initial_lines_per_check;
            // How many lines until the next time check?
            let mut next_check = lines_per_check;
            // Never grow the check interval beyond twice the initial guess.
            let max_lpc = lines_per_check.saturating_mul(2);
            let mut start = Instant::now();
            let mut last_check_time = start;
            add_await([self.process_until(move |p, rs| {
                next_check -= 1;
                if next_check == 0 {
                    let now = Instant::now();
                    match delta.checked_sub(now - start) {
                        Some(time_left) if !time_left.is_zero() => {
                            let time_left = time_left.as_nanos();
                            let check_delta = (now - last_check_time).as_nanos();
                            last_check_time = now;
                            // Guard the divisions below against a zero/tiny
                            // elapsed delta (e.g. coarse clocks on wasm).
                            // NOTE(review): this compares nanoseconds against
                            // the line-count constant — looks like a unit
                            // mismatch; also forces a time check every line
                            // while it holds. Confirm intended.
                            if check_delta < MAX_LINES_PER_TIME_VARIATION {
                                lines_per_check = 1;
                            } else {
                                let check_delta =
                                    check_delta.saturating_mul(MAX_LINES_PER_TIME_VARIATION);
                                // How much smaller is `lines_per_check` than it
                                // should be?
                                let under_approx = (time_left / check_delta)
                                    .try_into()
                                    .unwrap_or(usize::MAX)
                                    .max(1);
                                // Do rounding up division to make sure
                                // `over_approx > 1` as soon as `check_delta >
                                // time_left`.
                                let over_approx = check_delta.div_ceil(time_left);
                                // How much larger is `lines_per_check` than it
                                // should be?
                                let over_approx =
                                    over_approx.try_into().unwrap_or(usize::MAX).max(1);
                                let new_lpc = (lines_per_check * under_approx) / over_approx;
                                // Clamp to [1, max_lpc].
                                lines_per_check = new_lpc.min(max_lpc).max(1);
                            }
                            next_check = lines_per_check;
                        }
                        _ => {
                            // Out of time, reset to initial values and call
                            // the predicate.
                            lines_per_check = initial_lines_per_check;
                            next_check = lines_per_check;
                            start = now;
                            last_check_time = now;
                            return predicate(p, rs);
                        }
                    }
                }
                None
            })])
        }

        /// Parse the entire file as a stream. Using [`process_all_timeout`]
        /// instead is recommended as this method will cause the process to hang
        /// if given a very large file.
        pub async fn process_all(mut self) -> FResult<Parser> {
            // `Never` is uninhabited, so the `Paused` arm cannot be reached.
            match add_await([self.process_until(|_, _| None::<Never>)]) {
                ParseState::Paused(n, _) => match n {},
                ParseState::Completed { .. } => Ok(self.take_parser()),
                ParseState::Error(err) => Err(err),
            }
        }
        /// Try to parse everything, but stop after a given timeout. The result
        /// tuple contains `ParseState::Paused(read_info)` if the timeout was
        /// reached, and the parser state at the end (i.e. the state is complete
        /// only if `ParseState::Completed` was returned).
        ///
        /// Parsing cannot be resumed if the timeout is reached. If you need
        /// support for resuming, use [`process_check_every`] or
        /// [`process_until`] instead.
        pub async fn process_all_timeout(mut self, timeout: Duration) -> (ParseState<()>, Parser) {
            let result = add_await([self.process_check_every(timeout, |_, _| Some(()))]);
            (result, self.take_parser())
        }
        /// Try to parse everything, but stop after parsing `bytes` bytes or
        /// `lines` lines (whichever comes first). The limit is ignored when set
        /// to `None` (i.e. if both are `None` this is identical to
        /// `process_all`). The result tuple contains
        /// `ParseState::Paused(read_info)` if the limit was reached, and the
        /// parser state at the end (i.e. the state is complete only if
        /// `ParseState::Completed` was returned).
        ///
        /// Parsing cannot be resumed if the limit is reached. If you need
        /// support for resuming, use [`process_until`] instead.
        pub async fn process_all_limit(
            mut self,
            bytes: Option<usize>,
            lines: Option<usize>,
        ) -> (ParseState<()>, Parser) {
            // One specialised predicate per combination of limits so the
            // per-line check stays as cheap as possible.
            let result = match (bytes, lines) {
                (Some(bytes), Some(lines)) => add_await([self.process_until(|_, s| {
                    (s.bytes_read >= bytes || s.lines_read >= lines).then_some(())
                })]),
                (Some(bytes), None) => {
                    add_await([self.process_until(|_, s| (s.bytes_read >= bytes).then_some(()))])
                }
                (None, Some(lines)) => {
                    add_await([self.process_until(|_, s| (s.lines_read >= lines).then_some(()))])
                }
                (None, None) => {
                    // No limits: the `Never` predicate cannot pause.
                    add_await([self.process_until(|_, _| None::<Never>)]).map(|n| match n {})
                }
            };
            (result, self.take_parser())
        }
    }
}