// smt_scope/parsers/mod.rs
1use crate::FResult;
2use crate::FatalError;
3
4pub use self::wrapper_async_parser::*;
5pub use self::wrapper_stream_parser::*;
6use futures::{AsyncBufRead, AsyncBufReadExt, AsyncRead};
7use std::fmt::Debug;
8use std::fs::{File, Metadata};
9use std::io::{BufRead, BufReader};
10use std::path::Path;
11use std::time::Duration;
12use wasm_timer::Instant;
13
14pub mod z3;
15
/// Helper trait bundling the auxiliary bounds every parser state must
/// satisfy. With the `mem_dbg` feature enabled, parser states must also
/// support memory-size introspection via [`mem_dbg`].
#[cfg(feature = "mem_dbg")]
pub trait LogParserHelper: Default + mem_dbg::MemDbg + mem_dbg::MemSize {}
#[cfg(feature = "mem_dbg")]
impl<T: Default + mem_dbg::MemDbg + mem_dbg::MemSize> LogParserHelper for T {}

/// Helper trait bundling the auxiliary bounds every parser state must
/// satisfy. Without the `mem_dbg` feature this is simply an alias for
/// [`Default`], blanket-implemented for all such types.
#[cfg(not(feature = "mem_dbg"))]
pub trait LogParserHelper: Default {}
#[cfg(not(feature = "mem_dbg"))]
impl<T: Default> LogParserHelper for T {}
25
26/// Trait for a generic SMT solver trace parser. Intended to support different
27/// solvers or log formats.
28pub trait LogParser: LogParserHelper {
29 /// Can be used to allow for parsing entries across multiple lines.
30 fn is_line_start(&mut self, _first_byte: u8) -> bool {
31 true
32 }
33
34 /// Process a single line of the log file. Return `true` if parsing should
35 /// continue, or `false` if parsing should stop.
36 fn process_line(&mut self, line: &str, line_no: usize) -> FResult<bool>;
37
38 fn end_of_file(&mut self);
39
40 /// Creates a new parser. Only use this if you cannot use the following
41 /// convenience methods:
42 /// - [`new_file`] for creating a streaming parser from a file path
43 /// - [`new_str`] or [`new_string`] for creating a parser from a strings
44 fn from<'r, R: BufRead + 'r>(reader: R) -> StreamParser<'r, Self> {
45 StreamParser::new(reader)
46 }
47 /// Creates a new async parser from an async buffered reader. The parser
48 /// will read rom the reader line-by-line.
49 fn from_async<'r, R: AsyncBufRead + Unpin + 'r>(reader: R) -> AsyncParser<'r, Self> {
50 AsyncParser::new(reader)
51 }
52
53 /// Creates a new parser from the contents of a log file. The string
54 /// argument must live as long as parsing is ongoing. Release this
55 /// constraint by using [`take_parser`](StreamParser::take_parser) to end
56 /// parsing. If you want the parser to take ownership of the string instead
57 /// (i.e. you are running into lifetime issues), use
58 /// [`from_string`](Self::from_string) instead.
59 fn from_str(s: &str) -> StreamParser<'_, Self> {
60 s.as_bytes().into_parser()
61 }
62
63 /// See [`from_str`](LogParser::from_str) and [`process_all`](StreamParser::process_all).
64 fn from_str_all(s: &str) -> Result<Self, FatalError> {
65 Self::from_str(s).process_all()
66 }
67
68 /// Creates a new parser from the contents of a log file. The parser takes
69 /// ownership over the string.
70 fn from_string(s: String) -> StreamParser<'static, Self> {
71 s.into_cursor().into_parser()
72 }
73
74 /// See [`from_string`](LogParser::from_string) and [`process_all`](StreamParser::process_all).
75 fn from_string_all(s: String) -> Result<Self, FatalError> {
76 Self::from_string(s).process_all()
77 }
78
79 /// Creates a new streaming parser from a file. Additionally returns the
80 /// file metadata so that the progress can be calculated from the file size.
81 ///
82 /// This method is an alternative to
83 /// `from_string(fs::read_to_string(self)?)`. This approach to parsing is
84 /// ~5% slower, but should use only ~50% as much memory due to not having
85 /// the entire loaded String in memory.
86 fn from_file<P: AsRef<Path>>(p: P) -> std::io::Result<(Metadata, StreamParser<'static, Self>)> {
87 let (meta, reader) = p.read_open()?;
88 Ok((meta, reader.into_parser()))
89 }
90
91 /// See [`from_file`](LogParser::from_file) and [`process_all`](StreamParser::process_all).
92 fn from_file_all<P: AsRef<Path>>(p: P) -> Result<Self, FatalError> {
93 let (_, parser) = Self::from_file(p)?;
94 parser.process_all()
95 }
96}
97
98////////////////////
99// Parser Creation
100////////////////////
101
102pub trait IntoStreamParser<'r> {
103 /// Creates a new parser from a buffered reader.
104 fn into_parser<Parser: LogParser>(self) -> StreamParser<'r, Parser>;
105}
106impl<'r, R: BufRead + 'r> IntoStreamParser<'r> for R {
107 fn into_parser<Parser: LogParser>(self) -> StreamParser<'r, Parser> {
108 Parser::from(self)
109 }
110}
111
/// Extension trait for wrapping in-memory byte data in a readable cursor.
pub trait CursorRead: Sized + AsRef<[u8]> {
    /// Wraps any `[u8]`-like data, such as a [`String`], in a
    /// [`Cursor`](std::io::Cursor), which implements
    /// [`BufRead`](std::io::BufRead). Intended to be chained with
    /// [`into_parser`](IntoStreamParser::into_parser).
    fn into_cursor(self) -> std::io::Cursor<Self> {
        std::io::Cursor::new(self)
    }
}
impl<T> CursorRead for T where T: AsRef<[u8]> {}
122
/// Extension trait for opening a path as a buffered file reader.
pub trait FileRead: Sized + AsRef<Path> {
    /// Opens a file, returning a buffered reader along with the file's
    /// metadata. A more memory efficient alternative to
    /// `fs::read_to_string(self)?.into_cursor()`.
    fn read_open(self) -> std::io::Result<(Metadata, BufReader<File>)> {
        let file = File::open(self)?;
        let metadata = file.metadata()?;
        Ok((metadata, BufReader::new(file)))
    }
}
impl<T> FileRead for T where T: AsRef<Path> {}
135
136pub trait IntoAsyncParser<'r> {
137 /// Creates a new parser from an async buffered reader.
138 fn into_async_parser<Parser: LogParser>(self) -> AsyncParser<'r, Parser>;
139}
140impl<'r, R: AsyncBufRead + Unpin + 'r> IntoAsyncParser<'r> for R {
141 fn into_async_parser<Parser: LogParser>(self) -> AsyncParser<'r, Parser> {
142 Parser::from_async(self)
143 }
144}
145
146pub trait AsyncBufferRead: AsyncRead + Sized {
147 /// Buffers any [`AsyncRead`](futures::io::AsyncRead) stream into a
148 /// [`BufAsyncRead`](futures::io::BufReader) stream. Do not use this if the
149 /// underlying stream already implements
150 /// [`BufAsyncRead`](futures::io::BufReader).
151 fn buffer(self) -> futures::io::BufReader<Self> {
152 futures::io::BufReader::new(self)
153 }
154}
155impl<T: AsyncRead> AsyncBufferRead for T {}
156
157pub trait AsyncCursorRead: AsRef<[u8]> + Unpin + Sized {
158 /// Turns any `[u8]` data such as a [`String`] into a async
159 /// [`Cursor`](futures::io::Cursor) which implements
160 /// [`AsyncBufRead`](futures::io::AsyncBufRead). Intended to be chained with
161 /// [`into_async_parser`](IntoAsyncParser::into_async_parser).
162 fn into_async_cursor(self) -> futures::io::Cursor<Self> {
163 futures::io::Cursor::new(self)
164 }
165}
166impl<T: AsRef<[u8]> + Unpin> AsyncCursorRead for T {}
167
168////////////////////
169// Parser Execution
170////////////////////
171
/// Outcome of driving a parser: paused by the caller's predicate, finished,
/// or aborted with a fatal error.
#[derive(Debug, Clone)]
pub enum ParseState<T> {
    /// The pause predicate returned `Some(T)`; carries the predicate's value
    /// and the reader progress at the pause point.
    Paused(T, ReaderState),
    /// Parsing finished: `end_of_stream` is `true` when the input was
    /// exhausted, `false` when the parser itself asked to stop (its
    /// `process_line` returned `Ok(false)`).
    Completed { end_of_stream: bool },
    /// Parsing aborted due to a fatal error.
    Error(FatalError),
}
178
179impl<T> ParseState<T> {
180 pub fn is_timeout(&self) -> bool {
181 matches!(self, Self::Paused(..))
182 }
183 pub fn error(&self) -> Option<&FatalError> {
184 match self {
185 Self::Error(err) => Some(err),
186 _ => None,
187 }
188 }
189
190 fn map<U>(self, f: impl FnOnce(T) -> U) -> ParseState<U> {
191 match self {
192 Self::Paused(t, rs) => ParseState::Paused(f(t), rs),
193 Self::Completed { end_of_stream } => ParseState::Completed { end_of_stream },
194 Self::Error(err) => ParseState::Error(err),
195 }
196 }
197}
198
/// Progress information for a parser. Passed to pause predicates so callers
/// can track how far parsing has advanced.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ReaderState {
    /// The number of bytes parsed so far.
    pub bytes_read: usize,
    /// The number of lines parsed so far (counts each physical line read,
    /// including continuation lines of a multi-line entry).
    pub lines_read: usize,
}
207
// Generates both parser front-ends from one implementation: the synchronous
// `StreamParser` (over `BufRead`) and the asynchronous `AsyncParser` (over
// `AsyncBufRead`). The `async` and `add_await` substitutions erase the
// `async`/`.await` syntax for the synchronous copy.
#[duplicate::duplicate_item(
    EitherParser ReadBound async add_await(code);
    [StreamParser] [BufRead + 'r] [] [code];
    [AsyncParser] [AsyncBufRead + Unpin + 'r] [async] [code.await];
)]
mod wrapper {
    use super::*;
    // Uninhabited type used to instantiate `process_until` with a predicate
    // that can never pause; `match n {}` then proves the arm is unreachable.
    enum Never {}

    /// Struct which contains both a parser state as well as the stream of lines
    /// which are being parsed. Always use this instead of the raw underlying
    /// parser. Feeds the parser line-by-line with a callback to indicate if or
    /// when to pause. Supports any parser as long as it implements
    /// [`LogParser`].
    pub struct EitherParser<'r, Parser: LogParser> {
        /// `None` once parsing has finished or was aborted; dropping the
        /// reader releases the underlying file handle / buffer.
        reader: Option<Box<dyn ReadBound>>,
        /// Progress (bytes/lines read) so far.
        reader_state: ReaderState,
        parser: Parser,
    }
    impl<'r, Parser: LogParser, R: ReadBound> From<R> for EitherParser<'r, Parser> {
        fn from(reader: R) -> Self {
            Self::new(reader)
        }
    }
    impl<'r, Parser: LogParser> EitherParser<'r, Parser> {
        pub(super) fn new(reader: impl ReadBound) -> Self {
            Self {
                reader: Some(Box::new(reader)),
                reader_state: ReaderState::default(),
                parser: Parser::default(),
            }
        }

        /// Get the current parser state.
        pub fn parser(&self) -> &Parser {
            &self.parser
        }
        /// Consume the wrapper and return the parser state. If parsing was
        /// still in progress, the reader is dropped and the parser's
        /// `end_of_file` is called first.
        pub fn take_parser(mut self) -> Parser {
            if let Some(reader) = self.reader.take() {
                drop(reader);
                self.parser.end_of_file();
            }
            self.parser
        }
        /// Get the current reader progress.
        pub fn reader_state(&self) -> ReaderState {
            self.reader_state
        }
        /// Have we finished parsing?
        pub fn is_done(&self) -> bool {
            self.reader.is_none()
        }

        /// Reads and parses one (possibly multi-line) entry into `buf`.
        /// Returns `Ok(None)` to keep going, `Ok(Some(end_of_stream))` to
        /// stop (`true` = input exhausted or unterminated final line,
        /// `false` = the parser requested to stop), or `Err` on a fatal error.
        async fn process_line(
            reader: &mut Box<dyn ReadBound>,
            reader_state: &mut ReaderState,
            parser: &mut Parser,
            buf: &mut String,
        ) -> Result<Option<bool>, FatalError> {
            buf.clear();
            // Read line
            let mut bytes_read = 0;

            loop {
                bytes_read += add_await([reader.read_line(buf)])?;
                reader_state.lines_read += 1;
                let peek = add_await([reader.fill_buf()])?;
                // Stop reading if this is the end or we don't have a multiline.
                if peek.is_empty() || parser.is_line_start(peek[0]) {
                    break;
                }
            }
            if bytes_read == 0 {
                return Ok(Some(true));
            }
            // Remove newline from end
            if buf.ends_with('\n') {
                buf.pop();
                if buf.ends_with('\r') {
                    buf.pop();
                }
            } else {
                // Ignore the last line in a file if it doesn't end with a
                // newline, this happens if the process writing to it was
                // killed.
                return Ok(Some(true));
            }

            // Parse line
            reader_state.bytes_read += bytes_read;
            let stop_parsing = !parser.process_line(buf, reader_state.lines_read)?;
            Ok(stop_parsing.then_some(false))
        }

        /// Parse the input while calling the `predicate` callback after
        /// each line. Keep parsing until the callback returns `Some(t)` or we
        /// reach the end of the input. If we stopped due to the callback,
        /// return the current progress as `ParseState::Paused(t, state)`. After
        /// this function returns, use [`parser`](Self::parser) to retrieve the
        /// parser state.
        ///
        /// The `predicate` callback should aim to return quickly since **it is
        /// called between each line!** If heavier processing is required
        /// consider using [`process_check_every`] or [`process_until_every`].
        pub async fn process_until<T>(
            &mut self,
            mut predicate: impl FnMut(&Parser, ReaderState) -> Option<T>,
        ) -> ParseState<T> {
            let Some(reader) = self.reader.as_mut() else {
                return ParseState::Completed {
                    end_of_stream: true,
                };
            };
            let mut buf = String::new();
            loop {
                if let Some(t) = predicate(&self.parser, self.reader_state) {
                    return ParseState::Paused(t, self.reader_state);
                }
                let state = match add_await([Self::process_line(
                    reader,
                    &mut self.reader_state,
                    &mut self.parser,
                    &mut buf,
                )]) {
                    Ok(None) => continue,
                    Ok(Some(end_of_stream)) => ParseState::Completed { end_of_stream },
                    Err(err) => ParseState::Error(err),
                };
                drop(self.reader.take()); // Release file handle/free up memory
                self.parser.end_of_file();
                return state;
            }
        }
        /// Identical to [`process_until`] except the predicate is only checked
        /// every `lines_per_check` lines.
        pub async fn process_until_every<T>(
            &mut self,
            mut predicate: impl FnMut(&Parser, ReaderState) -> Option<T>,
            lines_per_check: usize,
        ) -> ParseState<T> {
            assert!(lines_per_check > 0);
            let mut remaining = lines_per_check;
            add_await([self.process_until(move |p, rs| {
                remaining -= 1;
                if remaining == 0 {
                    remaining = lines_per_check;
                    predicate(p, rs)
                } else {
                    None
                }
            })])
        }

        /// Parse the input while calling the `predicate` callback every
        /// `delta` time. Keep parsing until the callback returns `Some(t)` or
        /// we reach the end of the input. If we stopped due to the callback,
        /// return the current progress as `ParseState::Paused(t, state)`. After
        /// this function returns, use [`parser`](Self::parser) to retrieve the
        /// parser state.
        pub async fn process_check_every<T>(
            &mut self,
            delta: Duration,
            mut predicate: impl FnMut(&Parser, ReaderState) -> Option<T>,
        ) -> ParseState<T> {
            // Calling `Instant::now` between each line can get expensive, so
            // we'll try to avoid it. We will try to check every
            // `MAX_LINES_PER_TIME_VARIATION * time_left`, ensuring that we
            // never go over time as long as the lines-per-time of the parser
            // does not drop by more than `MAX_TIME_OVER_APPROX` between checks.
            // The closer this is to `1`, the fewer checks we'll do.
            const MAX_LINES_PER_TIME_VARIATION: u128 = 50;
            let initial_lines_per_check =
                delta.as_millis().try_into().unwrap_or(usize::MAX).max(10);
            // How many lines must pass before we check time again?
            let mut lines_per_check = initial_lines_per_check;
            // How many lines until the next time check?
            let mut next_check = lines_per_check;
            let max_lpc = lines_per_check.saturating_mul(2);
            let mut start = Instant::now();
            let mut last_check_time = start;
            add_await([self.process_until(move |p, rs| {
                next_check -= 1;
                if next_check == 0 {
                    let now = Instant::now();
                    match delta.checked_sub(now - start) {
                        Some(time_left) if !time_left.is_zero() => {
                            let time_left = time_left.as_nanos();
                            let check_delta = (now - last_check_time).as_nanos();
                            last_check_time = now;
                            if check_delta < MAX_LINES_PER_TIME_VARIATION {
                                lines_per_check = 1;
                            } else {
                                let check_delta =
                                    check_delta.saturating_mul(MAX_LINES_PER_TIME_VARIATION);
                                // How much smaller is `lines_per_check` than it
                                // should be?
                                let under_approx = (time_left / check_delta)
                                    .try_into()
                                    .unwrap_or(usize::MAX)
                                    .max(1);
                                // Do rounding up division to make sure
                                // `over_approx > 1` as soon as `check_delta >
                                // time_left`.
                                let over_approx = check_delta.div_ceil(time_left);
                                // How much larger is `lines_per_check` than it
                                // should be?
                                let over_approx =
                                    over_approx.try_into().unwrap_or(usize::MAX).max(1);
                                let new_lpc = (lines_per_check * under_approx) / over_approx;
                                lines_per_check = new_lpc.min(max_lpc).max(1);
                            }
                            next_check = lines_per_check;
                        }
                        _ => {
                            // Out of time, reset to initial values and call
                            // the predicate.
                            lines_per_check = initial_lines_per_check;
                            next_check = lines_per_check;
                            start = now;
                            last_check_time = now;
                            return predicate(p, rs);
                        }
                    }
                }
                None
            })])
        }

        /// Parse the entire file as a stream. Using [`process_all_timeout`]
        /// instead is recommended as this method will cause the process to hang
        /// if given a very large file.
        pub async fn process_all(mut self) -> FResult<Parser> {
            match add_await([self.process_until(|_, _| None::<Never>)]) {
                ParseState::Paused(n, _) => match n {},
                ParseState::Completed { .. } => Ok(self.take_parser()),
                ParseState::Error(err) => Err(err),
            }
        }
        /// Try to parse everything, but stop after a given timeout. The result
        /// tuple contains `ParseState::Paused(read_info)` if the timeout was
        /// reached, and the parser state at the end (i.e. the state is complete
        /// only if `ParseState::Completed` was returned).
        ///
        /// Parsing cannot be resumed if the timeout is reached. If you need
        /// support for resuming, use [`process_check_every`] or
        /// [`process_until`] instead.
        pub async fn process_all_timeout(mut self, timeout: Duration) -> (ParseState<()>, Parser) {
            let result = add_await([self.process_check_every(timeout, |_, _| Some(()))]);
            (result, self.take_parser())
        }
        /// Try to parse everything, but stop after parsing `bytes` bytes or
        /// `lines` lines (whichever comes first). The limit is ignored when set
        /// to `None` (i.e. if both are `None` this is identical to
        /// `process_all`). The result tuple contains
        /// `ParseState::Paused(read_info)` if the limit was reached, and the
        /// parser state at the end (i.e. the state is complete only if
        /// `ParseState::Completed` was returned).
        ///
        /// Parsing cannot be resumed if the limit is reached. If you need
        /// support for resuming, use [`process_until`] instead.
        pub async fn process_all_limit(
            mut self,
            bytes: Option<usize>,
            lines: Option<usize>,
        ) -> (ParseState<()>, Parser) {
            let result = match (bytes, lines) {
                (Some(bytes), Some(lines)) => add_await([self.process_until(|_, s| {
                    (s.bytes_read >= bytes || s.lines_read >= lines).then_some(())
                })]),
                (Some(bytes), None) => {
                    add_await([self.process_until(|_, s| (s.bytes_read >= bytes).then_some(()))])
                }
                (None, Some(lines)) => {
                    add_await([self.process_until(|_, s| (s.lines_read >= lines).then_some(()))])
                }
                (None, None) => {
                    add_await([self.process_until(|_, _| None::<Never>)]).map(|n| match n {})
                }
            };
            (result, self.take_parser())
        }
    }
}