mysql_slowlog_parser/
codec.rs

1use crate::codec::EntryError::MissingField;
2use crate::parser::{
3    HeaderLines, Stream, admin_command, details_comment, entry_user, log_header, parse_entry_stats,
4    parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
5};
6use crate::types::EntryStatement::SqlStatement;
7use crate::types::{Entry, EntryCall, EntrySqlAttributes, EntrySqlStatement, EntryStatement};
8use crate::{EntryCodecConfig, SessionLine, SqlStatementContext, StatsLine};
9use bytes::{Bytes, BytesMut};
10use log::debug;
11use std::default::Default;
12use std::fmt::{Display, Formatter};
13use std::ops::AddAssign;
14use thiserror::Error;
15use tokio::io;
16use tokio_util::codec::Decoder;
17use winnow::ModalResult;
18use winnow::Parser;
19use winnow::ascii::multispace0;
20use winnow::combinator::opt;
21use winnow::error::ErrMode;
22use winnow::stream::AsBytes;
23use winnow::stream::Stream as _;
24use winnow_datetime::DateTime;
25
/// Largest value `decode` accepts for the number it reads from the first four bytes of a buffer.
const LENGTH_MAX: usize = 10_000_000_000;
27
28/// Error when building an entry
29#[derive(Error, Debug)]
30pub enum EntryError {
31    /// a field is missing from the entry
32    #[error("entry field is missing: {0}")]
33    MissingField(String),
34    /// an entry contains a duplicate id
35    #[error("duplicate id: {0}")]
36    DuplicateId(String),
37}
38
39/// Errors for problems when reading frames from the source
40#[derive(Debug, Error)]
41pub enum CodecError {
42    /// a problem from the IO layer below caused the error
43    #[error("file read error: {0}")]
44    IO(#[from] io::Error),
45    /// a new entry started before the previous one was completed
46    #[error("found start of new entry before entry completed at line: {0}")]
47    IncompleteEntry(EntryError),
48}
49
/// The part of a slow-log entry the codec will try to parse next.
///
/// `EntryCodec::parse_next` walks through the variants in declaration order; a freshly created
/// or reset context starts at [`CodecExpect::Header`].
#[derive(Debug, Default)]
enum CodecExpect {
    /// Optional log header preceding an entry.
    #[default]
    Header,
    /// The entry's time line.
    Time,
    /// The entry's user line.
    User,
    /// The entry's stats line.
    Stats,
    /// Optional `use <database>` statement.
    UseDatabase,
    /// The start time stamp statement.
    StartTimeStamp,
    /// The SQL statement (or admin command) that finishes the entry.
    Sql,
}

impl Display for CodecExpect {
    /// Writes the lower-case, human-readable name of the expected part.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            CodecExpect::Header => "header",
            CodecExpect::Time => "time",
            CodecExpect::User => "user",
            CodecExpect::Stats => "stats",
            CodecExpect::UseDatabase => "use database",
            CodecExpect::StartTimeStamp => "start time stamp statement",
            CodecExpect::Sql => "sql statement",
        })
    }
}
81
82#[derive(Debug, Default)]
83struct EntryContext {
84    expects: CodecExpect,
85    headers: HeaderLines,
86    time: Option<DateTime>,
87    user: Option<SessionLine>,
88    stats: Option<StatsLine>,
89    set_timestamp: Option<u32>,
90    attributes: Option<EntrySqlAttributes>,
91}
92
93impl EntryContext {
94    fn complete(&mut self) -> Result<Entry, EntryError> {
95        let time = self.time.clone().ok_or(MissingField("time".into()))?;
96        let session = self.user.clone().ok_or(MissingField("user".into()))?;
97        let stats = self.stats.clone().ok_or(MissingField("stats".into()))?;
98        let set_timestamp = self
99            .set_timestamp
100            .clone()
101            .ok_or(MissingField("set timestamp".into()))?;
102        let attributes = self.attributes.clone().ok_or(MissingField("sql".into()))?;
103        let e = Entry {
104            call: EntryCall::new(time, set_timestamp),
105            session: session.into(),
106            stats: stats.into(),
107            sql_attributes: attributes,
108        };
109
110        self.reset();
111
112        Ok(e)
113    }
114
115    fn reset(&mut self) {
116        *self = EntryContext::default();
117    }
118}
119
120/// struct holding contextual information used while decoding
121#[derive(Debug, Default)]
122pub struct EntryCodec {
123    processed: usize,
124    context: EntryContext,
125    config: EntryCodecConfig,
126}
127
128impl EntryCodec {
129    /// create a new `EntryCodec` with the specified configuration
130    pub fn new(c: EntryCodecConfig) -> Self {
131        Self {
132            config: c,
133            ..Default::default()
134        }
135    }
136    /// calls the appropriate parser based on the current state held in the Codec context
137    fn parse_next<'b>(&mut self, i: &mut Stream<'b>) -> ModalResult<Option<Entry>> {
138        let entry = match self.context.expects {
139            CodecExpect::Header => {
140                let _ = multispace0(i)?;
141
142                let res = opt(log_header).parse_next(i)?;
143                self.context.expects = CodecExpect::Time;
144                self.context.headers = res.unwrap_or_default();
145
146                None
147            }
148            CodecExpect::Time => {
149                let _ = multispace0(i)?;
150
151                let dt = parse_entry_time(i)?;
152                self.context.time = Some(dt);
153                self.context.expects = CodecExpect::User;
154                None
155            }
156            CodecExpect::User => {
157                let sl = entry_user(i)?;
158                self.context.user = Some(sl);
159                self.context.expects = CodecExpect::Stats;
160                None
161            }
162            CodecExpect::Stats => {
163                let _ = multispace0(i)?;
164                let st = parse_entry_stats(i)?;
165                self.context.stats = Some(st);
166                self.context.expects = CodecExpect::UseDatabase;
167                None
168            }
169            CodecExpect::UseDatabase => {
170                let _ = multispace0(i)?;
171                let _ = opt(use_database).parse_next(i)?;
172
173                self.context.expects = CodecExpect::StartTimeStamp;
174                None
175            }
176            CodecExpect::StartTimeStamp => {
177                let _ = multispace0(i)?;
178                let st = start_timestamp_command(i)?;
179                self.context.set_timestamp = Some(st.into());
180                self.context.expects = CodecExpect::Sql;
181                None
182            }
183            CodecExpect::Sql => {
184                let _ = multispace0(i)?;
185
186                if let Ok(c) = admin_command(i) {
187                    self.context.attributes = Some(EntrySqlAttributes {
188                        sql: (c.command.clone()),
189                        statement: EntryStatement::AdminCommand(c),
190                    });
191                } else {
192                    let mut details = None;
193
194                    if let Ok(Some(d)) = opt(details_comment).parse_next(i) {
195                        details = Some(d);
196                    }
197
198                    let mut sql_lines = sql_lines(i)?;
199
200                    let s = if let Ok(s) =
201                        parse_sql(&String::from_utf8_lossy(&sql_lines), &self.config.masking)
202                    {
203                        if s.len() == 1 {
204                            let context: Option<SqlStatementContext> = if let Some(d) = details {
205                                if let Some(f) = &self.config.map_comment_context {
206                                    //TODO: map these keys
207                                    f(d)
208                                } else {
209                                    None
210                                }
211                            } else {
212                                None
213                            };
214
215                            let s = EntrySqlStatement {
216                                statement: s[0].clone(),
217                                context,
218                            };
219
220                            sql_lines = Bytes::from(s.statement.to_string());
221                            SqlStatement(s)
222                        } else {
223                            EntryStatement::InvalidStatement(
224                                String::from_utf8_lossy(&sql_lines).to_string(),
225                            )
226                        }
227                    } else {
228                        EntryStatement::InvalidStatement(
229                            String::from_utf8_lossy(&sql_lines).to_string(),
230                        )
231                    };
232
233                    self.context.attributes = Some(EntrySqlAttributes {
234                        sql: sql_lines,
235                        //-- TODO: pull this from the Entry Statement
236                        statement: s,
237                    });
238                }
239
240                let e = self.context.complete().unwrap();
241                Some(e)
242            }
243        };
244
245        return if let Some(e) = entry {
246            self.processed.add_assign(1);
247
248            Ok(Some(e))
249        } else {
250            Ok(None)
251        };
252    }
253}
254
255impl Decoder for EntryCodec {
256    type Item = Entry;
257    type Error = CodecError;
258
259    /// calls `parse_next` and manages state changes and buffer fill
260    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
261        if src.len() < 4 {
262            // Not enough data to read length marker.
263            return Ok(None);
264        }
265
266        // Read length marker.
267        let mut length_bytes = [0u8; 4];
268        length_bytes.copy_from_slice(&src[..4]);
269        let length = u32::from_le_bytes(length_bytes) as usize;
270
271        // Check that the length is not too large to avoid a denial of
272        // service attack where the server runs out of memory.
273        if length > LENGTH_MAX {
274            return Err(io::Error::new(
275                io::ErrorKind::InvalidData,
276                format!("Frame of length {} is too large.", length),
277            )
278            .into());
279        }
280
281        let b = &src.split()[..];
282        let mut i = Stream::new(&b);
283
284        let mut start = i.checkpoint();
285
286        loop {
287            if i.len() == 0 {
288                return Ok(None);
289            };
290
291            match self.parse_next(&mut i) {
292                Ok(e) => {
293                    if let Some(e) = e {
294                        self.context = EntryContext::default();
295
296                        src.extend_from_slice(i.as_bytes());
297
298                        return Ok(Some(e));
299                    } else {
300                        debug!("preparing input for next parser\n");
301
302                        start = i.checkpoint();
303
304                        continue;
305                    }
306                }
307                Err(ErrMode::Incomplete(_)) => {
308                    i.reset(&start);
309                    src.extend_from_slice(i.as_bytes());
310
311                    return Ok(None);
312                }
313                Err(ErrMode::Backtrack(e)) => {
314                    panic!(
315                        "unhandled parser backtrack error after {:#?} processed: {}",
316                        e.to_string(),
317                        self.processed
318                    );
319                }
320                Err(ErrMode::Cut(e)) => {
321                    panic!(
322                        "unhandled parser cut error after {:#?} processed: {}",
323                        e.to_string(),
324                        self.processed
325                    );
326                }
327            }
328        }
329    }
330
331    /// decodes end of file and ensures that there are no unprocessed bytes on the stream.
332    ///
333    /// and `io::Error` of type io::ErrorKind::Other is thrown in the case of remaining data.
334    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
335        match self.decode(buf)? {
336            Some(frame) => Ok(Some(frame)),
337            None => {
338                let p = buf.iter().position(|v| !v.is_ascii_whitespace());
339
340                if p.is_none() {
341                    Ok(None)
342                } else {
343                    let out = format!(
344                        "bytes remaining on stream; {}",
345                        std::str::from_utf8(buf).unwrap()
346                    );
347                    Err(io::Error::new(io::ErrorKind::Other, out).into())
348                }
349            }
350        }
351    }
352}
353
#[cfg(test)]
mod tests {
    use crate::codec::EntryCodec;
    use crate::parser::parse_sql;
    use crate::types::EntryStatement::SqlStatement;
    use crate::types::{
        Entry, EntryCall, EntrySession, EntrySqlAttributes, EntrySqlStatement,
        EntrySqlStatementObject, EntryStatement, EntryStats,
    };
    use crate::{EntryCodecConfig, EntryMasking, SqlStatementContext};
    use bytes::{Bytes, BytesMut};
    use futures::StreamExt;
    use std::default::Default;
    use std::io::Cursor;

    use tokio::fs::File;
    use tokio_util::codec::Framed;
    use winnow::error::InputError;
    use winnow_iso8601::datetime::datetime;

    /// A single entry with a details comment decodes into the fully populated `Entry`.
    #[tokio::test]
    async fn parses_select_entry() {
        let sql_comment = "-- request_id: apLo5wdqkmKw4W7vGfiBc5, file: src/endpoints/original/mod\
        .rs, method: notifications(), line: 38";
        let sql = "SELECT film.film_id AS FID, film.title AS title, film.description AS \
        description, category.name AS category, film.rental_rate AS price FROM category LEFT JOIN \
         film_category ON category.category_id = film_category.category_id LEFT JOIN film ON \
         film_category.film_id = film.film_id GROUP BY film.film_id, category.name;";
        //NOTE: decimal places were shortened by parser, so this time is shortened
        let mut time = "2018-02-05T02:46:47.273Z";

        let entry = format!(
            "# Time: {}
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
use mysql;
SET timestamp=1517798807;
{}
{},
",
            time, sql_comment, sql
        );

        let mut eb = entry.as_bytes().to_vec();

        let config = EntryCodecConfig {
            masking: Default::default(),
            map_comment_context: Some(|d| {
                let acc = SqlStatementContext {
                    request_id: d.get(&*BytesMut::from("request_id")).cloned(),
                    caller: d.get(&*BytesMut::from("file")).cloned(),
                    function: d.get(&*BytesMut::from("method")).cloned(),
                    line: d
                        .get(&*BytesMut::from("line"))
                        .and_then(|b| String::from_utf8_lossy(b).parse().ok()),
                };

                // An all-default context means none of the keys were present.
                if acc == SqlStatementContext::default() {
                    None
                } else {
                    Some(acc)
                }
            }),
        };

        let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::new(config));
        let e = ff.next().await.unwrap().unwrap();

        let stmts = parse_sql(sql, &EntryMasking::None).unwrap();

        let expected_stmt = EntrySqlStatement {
            statement: stmts[0].clone(),
            context: Some(SqlStatementContext {
                request_id: Some("apLo5wdqkmKw4W7vGfiBc5".into()),
                caller: Some("src/endpoints/original/mod.rs".into()),
                function: Some("notifications()".into()),
                line: Some(38),
            }),
        };

        let expected_sql = sql.trim().strip_suffix(";").unwrap();

        let expected_entry = Entry {
            call: EntryCall::new(
                //TODO: handle error
                datetime::<_, InputError<_>>(&mut time).unwrap(),
                1517798807,
            ),
            session: EntrySession {
                user_name: Bytes::from("msandbox"),
                sys_user_name: Bytes::from("msandbox"),
                host_name: Some(Bytes::from("localhost")),
                ip_address: None,
                thread_id: 10,
            },
            stats: EntryStats {
                query_time: 0.000352,
                lock_time: 0.0,
                rows_sent: 0,
                rows_examined: 0,
            },
            sql_attributes: EntrySqlAttributes {
                sql: Bytes::from(expected_sql),
                statement: SqlStatement(expected_stmt),
            },
        };

        assert_eq!(e, expected_entry);
    }

    /// Three consecutive entries decode with a tiny read buffer; one of them is invalid SQL.
    #[tokio::test]
    async fn parses_multiple_entries() {
        let entries = "# Time: 2018-02-05T02:46:47.273786Z
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
SET timestamp=1517798807;
-- ID: 123, caller: hello_world()
SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT JOIN film ON film_category.film_id = film.film_id
GROUP BY film.film_id, category.name;
# Time: 2018-02-05T02:46:47.273787Z
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
SET timestamp=1517798808;
/*!40101 SET NAMES utf8 */;
# Time: 2018-02-05T02:46:47.273788Z
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
SET timestamp=1517798809;
-- ID: 456, caller: hello_world()
SELECT film2.film_id AS FID, film2.title AS title, film2.description AS description, category.name
AS category, film2.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
JOIN film2 ON film_category.film_id = film2.film_id
GROUP BY film2.film_id, category.name;
";

        let mut eb = entries.as_bytes().to_vec();

        let mut ff = Framed::with_capacity(Cursor::new(&mut eb), EntryCodec::default(), 4);

        let mut found = 0;
        let mut invalid = 0;

        while let Some(res) = ff.next().await {
            let e = res.unwrap();
            found += 1;

            if let EntryStatement::InvalidStatement(_) = e.sql_attributes.statement {
                invalid += 1;
            }
        }

        assert_eq!(found, 3, "found");
        assert_eq!(invalid, 1, "invalid");
    }

    /// The objects referenced by a SELECT are reported once each, with their schema if given.
    #[tokio::test]
    async fn parses_select_objects() {
        let sql = String::from("SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
    FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
    JOIN film ON film_category.film_id = film.film_id LEFT JOIN film AS dupe_film ON film_category
    .film_id = dupe_film.film_id LEFT JOIN other.film AS other_film ON other_film.film_id =
    film_category.film_id
    GROUP BY film.film_id, category.name;");

        let entry = format!(
            "# Time: 2018-02-05T02:46:47.273786Z
    # User@Host: msandbox[msandbox] @ localhost []  Id:    10
    # Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
    SET timestamp=1517798807;
    {}",
            sql
        );

        let expected = vec![
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "category".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "film".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "film_category".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: Some("other".as_bytes().into()),
                object_name: "film".as_bytes().into(),
            },
        ];

        let mut eb = entry.as_bytes().to_vec();

        let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::default());
        let e = ff.next().await.unwrap().unwrap();

        match e.sql_attributes.statement() {
            SqlStatement(s) => {
                assert_eq!(s.objects(), expected);
                assert_eq!(s.sql_type().to_string(), "SELECT".to_string());
            }
            _ => {
                panic!("should have parsed sql as SqlStatement")
            }
        }
    }

    /// Every entry of the sample log decodes with the default read buffer.
    #[tokio::test]
    async fn parse_log_file() {
        let f = File::open("assets/slow-test-queries.log").await.unwrap();
        let mut ff = Framed::new(f, EntryCodec::default());

        let mut i = 0;

        while let Some(res) = ff.next().await {
            let _ = res.unwrap();
            i += 1;
        }

        assert_eq!(i, 310);
    }

    /// Same sample log, read through a four-byte initial buffer to exercise partial input.
    #[tokio::test]
    async fn parse_mysql_log_file_small_capacity() {
        let f = File::open("assets/slow-test-queries.log").await.unwrap();
        let mut ff = Framed::with_capacity(f, EntryCodec::default(), 4);

        let mut i = 0;

        while let Some(res) = ff.next().await {
            let _ = res.unwrap();
            i += 1;
        }

        assert_eq!(i, 310);
    }
}