mysql_slowlog_parser/
codec.rs

1use crate::codec::EntryError::MissingField;
2use crate::parser::{
3    admin_command, details_comment, entry_user, log_header, parse_entry_stats, parse_entry_time,
4    parse_sql, sql_lines, start_timestamp_command, use_database, HeaderLines, Stream,
5};
6use crate::types::EntryStatement::SqlStatement;
7use crate::types::{Entry, EntryCall, EntrySqlAttributes, EntrySqlStatement, EntryStatement};
8use crate::{EntryCodecConfig, SessionLine, SqlStatementContext, StatsLine};
9use bytes::{Bytes, BytesMut};
10use log::debug;
11use std::default::Default;
12use std::fmt::{Display, Formatter};
13use std::ops::AddAssign;
14use thiserror::Error;
15use tokio::io;
16use tokio_util::codec::Decoder;
17use winnow::ascii::multispace0;
18use winnow::combinator::opt;
19use winnow::error::ErrMode;
20use winnow::stream::AsBytes;
21use winnow::stream::Stream as _;
22use winnow::PResult;
23use winnow::Parser;
24use winnow_iso8601::DateTime;
25
/// Maximum frame length, in bytes, accepted by the decoder before input is rejected as too
/// large.
///
/// The nominal limit is 10 GB. On targets whose `usize` cannot represent that value (32-bit
/// platforms) it saturates to `usize::MAX`, so the constant compiles everywhere instead of
/// tripping the `overflowing_literals` error.
const LENGTH_MAX: usize = if 10_000_000_000u64 > usize::MAX as u64 {
    usize::MAX
} else {
    10_000_000_000u64 as usize
};
27
28/// Error when building an entry
29#[derive(Error, Debug)]
30pub enum EntryError {
31    /// a field is missing from the entry
32    #[error("entry field is missing: {0}")]
33    MissingField(String),
34    /// an entry contains a duplicate id
35    #[error("duplicate id: {0}")]
36    DuplicateId(String),
37}
38
39/// Errors for problems when reading frames from the source
40#[derive(Debug, Error)]
41pub enum CodecError {
42    /// a problem from the IO layer below caused the error
43    #[error("file read error: {0}")]
44    IO(#[from] io::Error),
45    /// a new entry started before the previous one was completed
46    #[error("found start of new entry before entry completed at line: {0}")]
47    IncompleteEntry(EntryError),
48}
49
/// The parser state the codec runs next while assembling an entry.
///
/// A fresh context starts at [`CodecExpect::Header`]; each successfully parsed piece moves
/// the codec on to the following state, and [`CodecExpect::Sql`] finishes the entry.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
enum CodecExpect {
    /// optional log header lines; the initial state
    #[default]
    Header,
    /// the entry's time line
    Time,
    /// the entry's user/host line
    User,
    /// the entry's query statistics line
    Stats,
    /// an optional `use <database>` statement
    UseDatabase,
    /// the `SET timestamp=` statement
    StartTimeStamp,
    /// the SQL statement or admin command that ends the entry
    Sql,
}

impl Display for CodecExpect {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            Self::Header => "header",
            Self::Time => "time",
            Self::User => "user",
            Self::Stats => "stats",
            Self::UseDatabase => "use database",
            Self::StartTimeStamp => "start time stamp statement",
            Self::Sql => "sql statement",
        };
        f.write_str(name)
    }
}
81
82#[derive(Debug, Default)]
83struct EntryContext {
84    expects: CodecExpect,
85    headers: HeaderLines,
86    time: Option<DateTime>,
87    user: Option<SessionLine>,
88    stats: Option<StatsLine>,
89    set_timestamp: Option<u32>,
90    attributes: Option<EntrySqlAttributes>,
91}
92
93impl EntryContext {
94    fn complete(&mut self) -> Result<Entry, EntryError> {
95        let time = self.time.clone().ok_or(MissingField("time".into()))?;
96        let session = self.user.clone().ok_or(MissingField("user".into()))?;
97        let stats = self.stats.clone().ok_or(MissingField("stats".into()))?;
98        let set_timestamp = self
99            .set_timestamp
100            .clone()
101            .ok_or(MissingField("set timestamp".into()))?;
102        let attributes = self.attributes.clone().ok_or(MissingField("sql".into()))?;
103        let e = Entry {
104            call: EntryCall::new(time, set_timestamp),
105            session: session.into(),
106            stats: stats.into(),
107            sql_attributes: attributes,
108        };
109
110        self.reset();
111
112        Ok(e)
113    }
114
115    fn reset(&mut self) {
116        *self = EntryContext::default();
117    }
118}
119
120/// struct holding contextual information used while decoding
121#[derive(Debug, Default)]
122pub struct EntryCodec {
123    processed: usize,
124    context: EntryContext,
125    config: EntryCodecConfig,
126}
127
128impl EntryCodec {
129    /// create a new `EntryCodec` with the specified configuration
130    pub fn new(c: EntryCodecConfig) -> Self {
131        Self {
132            config: c,
133            ..Default::default()
134        }
135    }
136    /// calls the appropriate parser based on the current state held in the Codec context
137    fn parse_next<'b>(&mut self, i: &mut Stream<'b>) -> PResult<Option<Entry>> {
138        let entry = match self.context.expects {
139            CodecExpect::Header => {
140                let _ = multispace0(i)?;
141
142                let res = opt(log_header).parse_next(i)?;
143                self.context.expects = CodecExpect::Time;
144                self.context.headers = res.unwrap_or_default();
145
146                None
147            }
148            CodecExpect::Time => {
149                let _ = multispace0(i)?;
150
151                let dt = parse_entry_time(i)?;
152                self.context.time = Some(dt);
153                self.context.expects = CodecExpect::User;
154                None
155            }
156            CodecExpect::User => {
157                let sl = entry_user(i)?;
158                self.context.user = Some(sl);
159                self.context.expects = CodecExpect::Stats;
160                None
161            }
162            CodecExpect::Stats => {
163                let _ = multispace0(i)?;
164                let st = parse_entry_stats(i)?;
165                self.context.stats = Some(st);
166                self.context.expects = CodecExpect::UseDatabase;
167                None
168            }
169            CodecExpect::UseDatabase => {
170                let _ = multispace0(i)?;
171                let _ = opt(use_database).parse_next(i)?;
172
173                self.context.expects = CodecExpect::StartTimeStamp;
174                None
175            }
176            CodecExpect::StartTimeStamp => {
177                let _ = multispace0(i)?;
178                let st = start_timestamp_command(i)?;
179                self.context.set_timestamp = Some(st.into());
180                self.context.expects = CodecExpect::Sql;
181                None
182            }
183            CodecExpect::Sql => {
184                let _ = multispace0(i)?;
185
186                if let Ok(c) = admin_command(i) {
187                    self.context.attributes = Some(EntrySqlAttributes {
188                        sql: (c.command.clone()),
189                        statement: EntryStatement::AdminCommand(c),
190                    });
191                } else {
192                    let mut details = None;
193
194                    if let Ok(Some(d)) = opt(details_comment).parse_next(i) {
195                        details = Some(d);
196                    }
197
198                    let mut sql_lines = sql_lines(i)?;
199
200                    let s = if let Ok(s) =
201                        parse_sql(&String::from_utf8_lossy(&sql_lines), &self.config.masking)
202                    {
203                        if s.len() == 1 {
204                            let context: Option<SqlStatementContext> = if let Some(d) = details {
205                                if let Some(f) = &self.config.map_comment_context {
206                                    //TODO: map these keys
207                                    f(d)
208                                } else {
209                                    None
210                                }
211                            } else {
212                                None
213                            };
214
215                            let s = EntrySqlStatement {
216                                statement: s[0].clone(),
217                                context,
218                            };
219
220                            sql_lines = Bytes::from(s.statement.to_string());
221                            SqlStatement(s)
222                        } else {
223                            EntryStatement::InvalidStatement(
224                                String::from_utf8_lossy(&sql_lines).to_string(),
225                            )
226                        }
227                    } else {
228                        EntryStatement::InvalidStatement(
229                            String::from_utf8_lossy(&sql_lines).to_string(),
230                        )
231                    };
232
233                    self.context.attributes = Some(EntrySqlAttributes {
234                        sql: sql_lines,
235                        //-- TODO: pull this from the Entry Statement
236                        statement: s,
237                    });
238                }
239
240                let e = self.context.complete().unwrap();
241                Some(e)
242            }
243        };
244
245        return if let Some(e) = entry {
246            self.processed.add_assign(1);
247
248            Ok(Some(e))
249        } else {
250            Ok(None)
251        };
252    }
253}
254
255impl Decoder for EntryCodec {
256    type Item = Entry;
257    type Error = CodecError;
258
259    /// calls `parse_next` and manages state changes and buffer fill
260    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
261        if src.len() < 4 {
262            // Not enough data to read length marker.
263            return Ok(None);
264        }
265
266        // Read length marker.
267        let mut length_bytes = [0u8; 4];
268        length_bytes.copy_from_slice(&src[..4]);
269        let length = u32::from_le_bytes(length_bytes) as usize;
270
271        // Check that the length is not too large to avoid a denial of
272        // service attack where the server runs out of memory.
273        if length > LENGTH_MAX {
274            return Err(io::Error::new(
275                io::ErrorKind::InvalidData,
276                format!("Frame of length {} is too large.", length),
277            )
278            .into());
279        }
280
281        let b = &src.split()[..];
282        let mut i = Stream::new(&b);
283
284        let mut start = i.checkpoint();
285
286        loop {
287            if i.len() == 0 {
288                return Ok(None);
289            };
290
291            match self.parse_next(&mut i) {
292                Ok(e) => {
293                    if let Some(e) = e {
294                        self.context = EntryContext::default();
295
296                        src.extend_from_slice(i.as_bytes());
297
298                        return Ok(Some(e));
299                    } else {
300                        debug!("preparing input for next parser\n");
301
302                        start = i.checkpoint();
303
304                        continue;
305                    }
306                }
307                Err(ErrMode::Incomplete(_)) => {
308                    i.reset(&start);
309                    src.extend_from_slice(i.as_bytes());
310
311                    return Ok(None);
312                }
313                Err(ErrMode::Backtrack(e)) => {
314                    panic!(
315                        "unhandled parser backtrack error after {:#?} processed: {}",
316                        e.to_string(),
317                        self.processed
318                    );
319                }
320                Err(ErrMode::Cut(e)) => {
321                    panic!(
322                        "unhandled parser cut error after {:#?} processed: {}",
323                        e.to_string(),
324                        self.processed
325                    );
326                }
327            }
328        }
329    }
330
331    /// decodes end of file and ensures that there are no unprocessed bytes on the stream.
332    ///
333    /// and `io::Error` of type io::ErrorKind::Other is thrown in the case of remaining data.
334    fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
335        match self.decode(buf)? {
336            Some(frame) => Ok(Some(frame)),
337            None => {
338                let p = buf.iter().position(|v| !v.is_ascii_whitespace());
339
340                if p.is_none() {
341                    Ok(None)
342                } else {
343                    let out = format!(
344                        "bytes remaining on stream; {}",
345                        std::str::from_utf8(buf).unwrap()
346                    );
347                    Err(io::Error::new(io::ErrorKind::Other, out).into())
348                }
349            }
350        }
351    }
352}
353
#[cfg(test)]
mod tests {
    use crate::codec::EntryCodec;
    use crate::parser::parse_sql;
    use crate::types::EntryStatement::SqlStatement;
    use crate::types::{
        Entry, EntryCall, EntrySession, EntrySqlAttributes, EntrySqlStatement,
        EntrySqlStatementObject, EntryStatement, EntryStats,
    };
    use crate::{EntryCodecConfig, EntryMasking, SqlStatementContext};
    use bytes::Bytes;
    use futures::StreamExt;
    use std::default::Default;
    use std::io::Cursor;
    use std::str::FromStr;
    use tokio::fs::File;
    use tokio_util::codec::Framed;
    use winnow_iso8601::DateTime;

    /// Slow log fixture used by the whole-file tests.
    const TEST_LOG: &str = "assets/slow-test-queries.log";

    /// Number of entries contained in `TEST_LOG`.
    const TEST_LOG_ENTRIES: usize = 310;

    /// Drains `ff`, failing the test on any decode error, and returns the number of entries.
    async fn count_entries(mut ff: Framed<File, EntryCodec>) -> usize {
        let mut count = 0;
        while let Some(res) = ff.next().await {
            res.expect("log entry should decode");
            count += 1;
        }
        count
    }

    #[tokio::test]
    async fn parses_select_entry() {
        let sql_comment = "-- request_id: apLo5wdqkmKw4W7vGfiBc5, file: src/endpoints/original/mod\
        .rs, method: notifications(), line: 38";
        let sql = "SELECT film.film_id AS FID, film.title AS title, film.description AS \
        description, category.name AS category, film.rental_rate AS price FROM category LEFT JOIN \
         film_category ON category.category_id = film_category.category_id LEFT JOIN film ON \
         film_category.film_id = film.film_id GROUP BY film.film_id, category.name;";
        //NOTE: decimal places were shortened by parser, so this time is shortened
        let time = "2018-02-05T02:46:47.273Z";

        let entry = format!(
            "# Time: {}
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
use mysql;
SET timestamp=1517798807;
{}
{},
",
            time, sql_comment, sql
        );

        let mut eb = entry.as_bytes().to_vec();

        let config = EntryCodecConfig {
            masking: Default::default(),
            map_comment_context: Some(|d| {
                let acc = SqlStatementContext {
                    request_id: d.get("request_id".as_bytes()).cloned(),
                    caller: d.get("file".as_bytes()).cloned(),
                    function: d.get("method".as_bytes()).cloned(),
                    line: d
                        .get("line".as_bytes())
                        .and_then(|b| String::from_utf8_lossy(b).parse().ok()),
                };

                if acc == SqlStatementContext::default() {
                    None
                } else {
                    Some(acc)
                }
            }),
        };

        let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::new(config));
        let e = ff.next().await.unwrap().unwrap();

        let stmts = parse_sql(sql, &EntryMasking::None).unwrap();

        let expected_stmt = EntrySqlStatement {
            statement: stmts.first().unwrap().clone(),
            context: Some(SqlStatementContext {
                request_id: Some("apLo5wdqkmKw4W7vGfiBc5".into()),
                caller: Some("src/endpoints/original/mod.rs".into()),
                function: Some("notifications()".into()),
                line: Some(38),
            }),
        };

        let expected_sql = sql.trim().strip_suffix(';').unwrap();

        let expected_entry = Entry {
            call: EntryCall::new(DateTime::from_str(time).unwrap(), 1517798807),
            session: EntrySession {
                user_name: Bytes::from("msandbox"),
                sys_user_name: Bytes::from("msandbox"),
                host_name: Some(Bytes::from("localhost")),
                ip_address: None,
                thread_id: 10,
            },
            stats: EntryStats {
                query_time: 0.000352,
                lock_time: 0.0,
                rows_sent: 0,
                rows_examined: 0,
            },
            sql_attributes: EntrySqlAttributes {
                sql: Bytes::from(expected_sql),
                statement: SqlStatement(expected_stmt),
            },
        };

        assert_eq!(e, expected_entry);
    }

    #[tokio::test]
    async fn parses_multiple_entries() {
        let entries = "# Time: 2018-02-05T02:46:47.273786Z
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
SET timestamp=1517798807;
-- ID: 123, caller: hello_world()
SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT JOIN film ON film_category.film_id = film.film_id
GROUP BY film.film_id, category.name;
# Time: 2018-02-05T02:46:47.273787Z
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
SET timestamp=1517798808;
/*!40101 SET NAMES utf8 */;
# Time: 2018-02-05T02:46:47.273788Z
# User@Host: msandbox[msandbox] @ localhost []  Id:    10
# Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
SET timestamp=1517798809;
-- ID: 456, caller: hello_world()
SELECT film2.film_id AS FID, film2.title AS title, film2.description AS description, category.name
AS category, film2.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
JOIN film2 ON film_category.film_id = film2.film_id
GROUP BY film2.film_id, category.name;
";

        let mut eb = entries.as_bytes().to_vec();

        // a tiny read buffer forces entries to be split across many decode calls
        let mut ff = Framed::with_capacity(Cursor::new(&mut eb), EntryCodec::default(), 4);

        let mut found = 0;
        let mut invalid = 0;

        while let Some(res) = ff.next().await {
            let e = res.unwrap();
            found += 1;

            if matches!(
                e.sql_attributes.statement,
                EntryStatement::InvalidStatement(_)
            ) {
                invalid += 1;
            }
        }

        assert_eq!(found, 3, "found");
        assert_eq!(invalid, 1, "invalid");
    }

    #[tokio::test]
    async fn parses_select_objects() {
        let sql = String::from("SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
    FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
    JOIN film ON film_category.film_id = film.film_id LEFT JOIN film AS dupe_film ON film_category
    .film_id = dupe_film.film_id LEFT JOIN other.film AS other_film ON other_film.film_id =
    film_category.film_id
    GROUP BY film.film_id, category.name;");

        let entry = format!(
            "# Time: 2018-02-05T02:46:47.273786Z
    # User@Host: msandbox[msandbox] @ localhost []  Id:    10
    # Query_time: 0.000352  Lock_time: 0.000000 Rows_sent: 0  Rows_examined: 0
    SET timestamp=1517798807;
    {}",
            sql
        );

        let expected = vec![
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "category".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "film".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "film_category".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: Some("other".as_bytes().into()),
                object_name: "film".as_bytes().into(),
            },
        ];

        let mut eb = entry.as_bytes().to_vec();

        let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::default());
        let e = ff.next().await.unwrap().unwrap();

        match e.sql_attributes.statement() {
            SqlStatement(s) => {
                assert_eq!(s.objects(), expected);
                assert_eq!(s.sql_type().to_string(), "SELECT".to_string());
            }
            _ => {
                panic!("should have parsed sql as SqlStatement")
            }
        }
    }

    #[tokio::test]
    async fn parse_log_file() {
        let f = File::open(TEST_LOG).await.unwrap();
        let count = count_entries(Framed::new(f, EntryCodec::default())).await;

        assert_eq!(count, TEST_LOG_ENTRIES);
    }

    #[tokio::test]
    async fn parse_mysql_log_file_small_capacity() {
        let f = File::open(TEST_LOG).await.unwrap();
        let count = count_entries(Framed::with_capacity(f, EntryCodec::default(), 4)).await;

        assert_eq!(count, TEST_LOG_ENTRIES);
    }
}