mysql_slowlog_parser/
parser.rs

1use crate::EntryMasking;
2use bytes::{BufMut, Bytes, BytesMut};
3use sqlparser::ast::Statement;
4use sqlparser::dialect::MySqlDialect;
5use sqlparser::parser::{Parser as SQLParser, ParserError};
6use sqlparser::tokenizer::{Token, Tokenizer};
7use std::borrow::Cow;
8use std::collections::HashMap;
9use std::ops::Not;
10use std::str;
11use std::str::FromStr;
12use winnow::ascii::{
13    Caseless, alpha1, alphanumeric1, digit1, float, line_ending, multispace0, multispace1,
14    till_line_ending,
15};
16use winnow::combinator::repeat;
17use winnow::combinator::{alt, trace};
18use winnow::combinator::{not, opt};
19use winnow::combinator::{preceded, terminated};
20use winnow::error::{ContextError, ErrMode, InputError};
21use winnow::token::{any, literal, take, take_till, take_until};
22use winnow::{ModalResult, Parser, Partial, seq};
23use winnow_datetime::DateTime;
24use winnow_iso8601::datetime::datetime;
25
26pub type Stream<'i> = Partial<&'i [u8]>;
27
28/// A struct holding a `DateTime` parsed from the Time: line of the entry
29/// ex: `# Time: 2018-02-05T02:46:43.015898Z`
30#[derive(Clone, Copy)]
31pub struct TimeLine {
32    time: DateTime,
33}
34
35impl TimeLine {
36    /// returns a clone of the DateTime parsed from the Time: line
37    pub fn time(&self) -> DateTime {
38        self.time.clone()
39    }
40}
41
42/// parses "# Time: .... entry line and returns a `DateTime`
43// # Time: 2015-06-26T16:43:23+0200";
44pub fn parse_entry_time(i: &mut Stream) -> ModalResult<DateTime> {
45    trace("parse_entry_time", move |input: &mut Stream| {
46        let dt = seq!(
47            _: literal("# Time:"),
48            _: multispace1,
49            datetime,
50        )
51        .parse_next(input)?;
52
53        Ok(dt.0)
54    })
55    .parse_next(i)
56}
57
58/// values from the User: entry line
59/// ex. # User@Host: msandbox\[msandbox\] @ localhost []  Id:     3
60#[derive(Clone, Debug, Eq, PartialEq)]
61pub struct SessionLine {
62    pub(crate) user: Bytes,
63    pub(crate) sys_user: Bytes,
64    pub(crate) host: Option<Bytes>,
65    pub(crate) ip_address: Option<Bytes>,
66    pub(crate) thread_id: u32,
67}
68
69impl SessionLine {
70    /// returns user as`Bytes`
71    pub fn user(&self) -> Bytes {
72        self.user.clone()
73    }
74
75    /// returns sys_user as`Bytes`
76    pub fn sys_user(&self) -> Bytes {
77        self.sys_user.clone()
78    }
79
80    /// returns possible host as`Option<Bytes>`
81    pub fn host(&self) -> Option<Bytes> {
82        self.host.clone()
83    }
84
85    /// returns possible ip_address as `Option<Bytes>`
86    pub fn ip_address(&self) -> Option<Bytes> {
87        self.ip_address.clone()
88    }
89
90    /// returns thread_id as `Bytes`
91    pub fn thread_id(&self) -> u32 {
92        self.thread_id
93    }
94}
95
96#[derive(Debug, PartialEq, Default)]
97pub struct HeaderLines {
98    version: Bytes,
99    tcp_port: Option<usize>,
100    socket: Option<Bytes>,
101}
102
103pub fn log_header<'a>(i: &mut Stream<'_>) -> ModalResult<HeaderLines> {
104    trace("log_header", move |input: &mut Stream<'_>| {
105        // check for the '#' since the last parser in the set is greedy
106        let head = seq!{
107            HeaderLines {
108                _: not(literal("#")),
109                _: take_until(1.., ", Version: "),
110                _:  (", Version: "),
111                version: take_until(1.., " started with:").map(|v: &[u8]| v.to_owned().into()),
112                _: literal(" started with:"),
113                _: multispace1,
114                _: literal("Tcp port:"),
115                _: multispace1,
116                tcp_port: opt(digit1).map(|v: Option<&[u8]>| v.and_then(|d| Some(str::from_utf8(d).unwrap().parse().unwrap()))),
117                _: multispace1,
118                _: literal("Unix socket: "),
119                socket: opt(take_till(1.., "\n".as_bytes())).map(|v: Option<&[u8]>| v.and_then(|d| Some(d.to_owned().into()))),
120                _: till_line_ending,
121                _: line_ending,
122                _: till_line_ending,
123                _: line_ending,
124            }
125        }.parse_next(input)?;
126
127        Ok(head)
128    }).parse_next(i)
129}
130
131pub fn sql_lines<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
132    trace("sql_lines", move |input: &mut Stream<'_>| {
133        let mut acc = BytesMut::new();
134
135        let mut escaped = false;
136        let mut quotes = vec![];
137
138        loop {
139            let c = any(input)? as char;
140
141            acc.put_slice(&[c as u8]);
142
143            if escaped.not() && (c == '\'' || c == '\"' || c == '`') {
144                if let Some(q) = quotes.last() {
145                    if &c == q {
146                        let _ = quotes.pop();
147                    } else {
148                        quotes.push(c);
149                    }
150                } else {
151                    quotes.push(c);
152                }
153            }
154
155            if escaped.not() && c == '\\' {
156                escaped = true;
157            } else {
158                escaped = false;
159            }
160
161            if quotes.len() == 0 && c == ';' {
162                return Ok(acc.freeze());
163            }
164        }
165    })
166    .parse_next(i)
167}
168
169pub fn alphanumerichyphen1<'a>(i: &mut Stream<'a>) -> ModalResult<&'a [u8]> {
170    alt((alphanumeric1, literal("_"), literal("-"))).parse_next(i)
171}
172
173pub fn host_name<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
174    trace("host_name", move |input: &mut Stream<'_>| {
175        let (mut first, second): (Vec<&[u8]>, &[u8]) = alt((
176            ((
177                repeat(1.., terminated(alphanumerichyphen1, literal("."))),
178                alpha1,
179            )),
180            ((repeat(1, alphanumerichyphen1), take(0 as usize))),
181        ))
182        .parse_next(input)?;
183
184        if !second.is_empty() {
185            first.push(second);
186        }
187
188        let b = first
189            .iter()
190            .enumerate()
191            .fold(BytesMut::new(), |mut acc, (c, p)| {
192                if c > 0 {
193                    acc.put_slice(".".as_bytes());
194                }
195
196                acc.put_slice(p);
197                acc
198            });
199
200        Ok(b.freeze())
201    })
202    .parse_next(i)
203}
204
205/// ip address handler that only handles IP4
206pub fn ip_address<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
207    trace("ip_address", move |input: &mut Stream<'_>| {
208        let p = seq!(
209            digit1,
210            preceded(literal("."), digit1),
211            preceded(literal("."), digit1),
212            preceded(literal("."), digit1),
213        )
214        .parse_next(input)?;
215
216        let b = [p.0, p.1, p.2, p.3]
217            .iter()
218            .enumerate()
219            .fold(BytesMut::new(), |mut acc, (c, p)| {
220                if c > 0 {
221                    acc.put_slice(".".as_bytes());
222                }
223
224                acc.put_slice(p);
225                acc
226            });
227
228        Ok(b.freeze())
229    })
230    .parse_next(i)
231}
232
233/// thread id parser for 'Id: [\d+]'
234pub fn entry_user_thread_id<'a>(i: &mut Stream<'_>) -> ModalResult<u32> {
235    trace("entry_user_thread_id", move |input: &mut Stream<'_>| {
236        let id = seq!(
237            _: literal("Id:"),
238            _: multispace1,
239            digit1
240        )
241        .parse_next(input)?;
242
243        Ok(u32::from_str(str::from_utf8(id.0).unwrap()).unwrap())
244    })
245    .parse_next(i)
246}
247
248pub fn user_name(i: &mut Stream) -> ModalResult<Bytes> {
249    trace("user_name", move |input: &mut Stream<'_>| {
250        let parts: Vec<&[u8]> =
251            repeat(1.., alt((alphanumeric1, literal("_")))).parse_next(input)?;
252
253        let b = parts.iter().fold(BytesMut::new(), |mut acc, p| {
254            acc.put_slice(p);
255            acc
256        });
257
258        Ok(b.freeze())
259    })
260    .parse_next(i)
261}
262
263/// user line parser
264pub fn entry_user(i: &mut Stream) -> ModalResult<SessionLine> {
265    trace("entry_user", move |input: &mut Stream<'_>| {
266        let s = seq! { SessionLine {
267            _: multispace0,
268            _: literal("# User@Host:"),
269            _: multispace1,
270            user: user_name,
271            _: literal("["),
272            sys_user: user_name,
273            _: literal("]"),
274            _: multispace1,
275            _: literal("@"),
276            _: multispace1,
277            host: opt(host_name),
278            _: multispace0,
279            _: literal("["),
280            _: multispace0,
281            ip_address: opt(ip_address),
282            _: multispace0,
283            _: literal("]"),
284            _: multispace1,
285            thread_id: entry_user_thread_id,
286        }}
287        .parse_next(input)?;
288
289        Ok(s)
290    })
291    .parse_next(i)
292}
293
294/// Struct containing information parsed from the initial comment in a SQL query
295#[derive(Clone, Debug, Default, PartialEq)]
296pub struct SqlStatementContext {
297    /// example field, should just be part of a HashMap
298    pub request_id: Option<Bytes>,
299    /// example field, should just be part of a HashMap
300    pub caller: Option<Bytes>,
301    /// example field, should just be part of a HashMap
302    pub function: Option<Bytes>,
303    /// example field, should just be part of a HashMap
304    pub line: Option<u32>,
305}
306
307impl SqlStatementContext {
308    /// example method, should be replaced by generic key lookup
309    pub fn request_id(&self) -> Option<Cow<str>> {
310        if let Some(i) = &self.request_id {
311            Some(String::from_utf8_lossy(i.as_ref()))
312        } else {
313            None
314        }
315    }
316
317    /// example method, should be replaced by generic key lookup
318    pub fn caller(&self) -> Option<Cow<str>> {
319        if let Some(c) = &self.caller {
320            Some(String::from_utf8_lossy(c.as_ref()))
321        } else {
322            None
323        }
324    }
325
326    /// example method, should be replaced by generic key lookup
327    pub fn function(&self) -> Option<Cow<str>> {
328        if let Some(f) = &self.function {
329            Some(String::from_utf8_lossy(f.as_ref()))
330        } else {
331            None
332        }
333    }
334
335    /// example method, should be replaced by generic key lookup
336    pub fn line(&self) -> Option<u32> {
337        self.line
338    }
339}
340
341pub fn details_comment<'a>(i: &mut Stream) -> ModalResult<HashMap<Bytes, Bytes>> {
342    trace("details_comment", move |input: &mut Stream<'_>| {
343        let mut name: Option<Bytes> = None;
344
345        let mut res: HashMap<Bytes, BytesMut> = HashMap::new();
346
347        let _ = literal("--").parse_next(input)?;
348
349        loop {
350            if name.is_none() {
351                if let Ok(n) = details_tag(input) {
352                    name.replace(n.clone());
353                    if let Some(_) = res.insert(n, BytesMut::new()) {
354                        //TODO: see if you need to set the ErrorKind::Assert specifically, like before
355                        return Err(ErrMode::Cut(ContextError::new()));
356                    }
357                }
358            }
359
360            if let Ok(c) = any::<Partial<&[u8]>, InputError<_>>(input) {
361                let c = c as char;
362
363                if c == '\n' || c == '\r' {
364                    break;
365                }
366
367                if c == ';' || c == ',' {
368                    name = None;
369                    continue;
370                }
371
372                if let Some(k) = &name {
373                    // TODO: previously this specified ErrorKind::Assert, figure out if this needs to be specificied still
374                    let v = &mut res.get_mut(k).ok_or(ErrMode::Cut(ContextError::new()))?;
375
376                    v.put_bytes(c as u8, 1);
377                } else {
378                    // TODO: previously this specified ErrorKind::Assert, figure out if this needs to be specificied still
379                    return Err(ErrMode::Cut(ContextError::new()));
380                }
381
382                continue;
383            } else {
384                break;
385            }
386        }
387
388        Ok(res.into_iter().map(|(k, v)| (k, v.freeze())).collect())
389    })
390    .parse_next(i)
391}
392
393pub fn details_tag<'a>(i: &mut Stream) -> ModalResult<Bytes> {
394    trace("details_tag", move |input: &mut Stream<'_>| {
395        let name = seq!(
396            _: multispace0,
397            user_name,
398            _: multispace0,
399            _: alt((literal(":"), literal("="))),
400            _: multispace0,
401        )
402        .parse_next(input)?;
403
404        Ok(name.0.into())
405    })
406    .parse_next(i)
407}
408
409/// values parsed from stats entry line
410#[derive(Clone, Copy, Debug, PartialEq)]
411pub struct StatsLine {
412    /// how long the overall query took
413    pub(crate) query_time: f64,
414    /// how long the query held locks
415    pub(crate) lock_time: f64,
416    /// how many rows were sent
417    pub(crate) rows_sent: u32,
418    /// how many rows were scanned
419    pub(crate) rows_examined: u32,
420}
421
422impl StatsLine {
423    /// how long the overall query took
424    pub fn query_time(&self) -> f64 {
425        self.query_time.clone()
426    }
427    /// how long the query held locks
428    pub fn lock_time(&self) -> f64 {
429        self.lock_time.clone()
430    }
431
432    /// how many rows were sent
433    pub fn rows_sent(&self) -> u32 {
434        self.rows_sent.clone()
435    }
436    /// how many rows were scanned
437    pub fn rows_examined(&self) -> u32 {
438        self.rows_examined.clone()
439    }
440}
441
442/// parse '# Query_time:...' entry line
443pub fn parse_entry_stats(i: &mut Stream<'_>) -> ModalResult<StatsLine> {
444    trace("parse_entry_stats", move |input: &mut Stream<'_>| {
445        let stats = seq! {StatsLine {
446            _: literal("#"),
447            _: multispace1,
448            _: literal("Query_time:"),
449            _: multispace1,
450            query_time: float,
451            _: multispace1,
452            _: literal("Lock_time:"),
453            _: multispace1,
454            lock_time: float,
455            _: multispace1,
456            _: literal("Rows_sent:"),
457            _: multispace1,
458            rows_sent: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
459            _: multispace1,
460            _: literal("Rows_examined:"),
461            _: multispace1,
462            rows_examined: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
463        }}
464        .parse_next(input)?;
465
466        Ok(stats)
467    })
468    .parse_next(i)
469}
470
471/// admin command values parsed from sql lines of an entry
472#[derive(Clone, Debug, PartialEq)]
473pub struct EntryAdminCommand {
474    /// the admin command sent
475    pub command: Bytes,
476}
477
478/// parse "# administrator command: " entry line
479pub fn admin_command<'a>(i: &mut Stream) -> ModalResult<EntryAdminCommand> {
480    trace("admin_command", move |input: &mut Stream<'_>| {
481        let command = seq!(
482            _: literal("# administrator command:"),
483            _: multispace1,
484            alphanumerichyphen1,
485            _: literal(";"),
486        )
487        .parse_next(input)?;
488
489        Ok(EntryAdminCommand {
490            command: command.0.to_owned().into(),
491        })
492    })
493    .parse_next(i)
494}
495
496/// parses 'USE database=\w+;' command which shows up at the start of some entry sql
497pub fn use_database(i: &mut Stream) -> ModalResult<Bytes> {
498    trace("use_database", move |input: &mut Stream<'_>| {
499        let db_name = seq!(
500            _: literal(Caseless("USE")),
501            _: multispace1,
502            user_name,
503            _: multispace0,
504            _: literal(";"),
505        )
506        .parse_next(input)?;
507
508        Ok(db_name.0.into())
509    })
510    .parse_next(i)
511}
512
513/// parses 'SET timestamp=\d{10};' command which starts
514pub fn start_timestamp_command(i: &mut Stream) -> ModalResult<u32> {
515    trace("start_timestamp_command", move |input: &mut Stream<'_>| {
516        let time = seq!(
517            _: literal("SET timestamp"),
518            _: multispace0,
519            _: literal("="),
520            _: multispace0,
521            digit1,
522            _: multispace0,
523            _: literal(";"),
524        )
525        .parse_next(input)?;
526
527        Ok(u32::from_str(str::from_utf8(time.0).unwrap()).unwrap())
528    })
529    .parse_next(i)
530}
531
532/// Parses one or more sql statements using `sqlparser::parse_statements`. This uses the
533/// `sqlparser::Tokenizer` to first tokenize the SQL and replace tokenized values with an
534/// masked value determined by the `&EntryMasking` value passed as an argument. In the case of
535/// `EntryMasking::None` this call is identical to calling `sqlparse::parse_statements`.
536/// command: " entry line
537pub fn parse_sql(sql: &str, mask: &EntryMasking) -> Result<Vec<Statement>, ParserError> {
538    let mut tokenizer = Tokenizer::new(&MySqlDialect {}, sql);
539    let mut tokens = tokenizer.tokenize()?;
540
541    tokens = mask_tokens(tokens, mask);
542
543    let mut parser = SQLParser::new(&MySqlDialect {}).with_tokens(tokens);
544
545    parser.parse_statements()
546}
547
548/// Replaces numbers, strings and literal tokenized by `sql_parser::Tokenizer` and replaces them
549/// with a masking values. Passing a value of `EntryMasking::None` will simply return the
550/// `Vec<Token>` passed in.
551pub fn mask_tokens(tokens: Vec<Token>, mask: &EntryMasking) -> Vec<Token> {
552    let mut acc = vec![];
553
554    if mask == &EntryMasking::None {
555        return tokens;
556    }
557
558    for t in tokens {
559        let mt = if let Token::Number(_, _) = t {
560            Token::Placeholder("?".into())
561        } else if let Token::Number(_, _) = t {
562            Token::Placeholder("?".into())
563        } else if let Token::SingleQuotedString(_) = t {
564            Token::Placeholder("?".into())
565        } else if let Token::DoubleQuotedString(_) = t {
566            Token::Placeholder("?".into())
567        } else if let Token::NationalStringLiteral(_) = t {
568            Token::Placeholder("?".into())
569        } else if let Token::EscapedStringLiteral(_) = t {
570            Token::Placeholder("?".into())
571        } else if let Token::HexStringLiteral(_) = t {
572            Token::Placeholder("?".into())
573        } else {
574            t
575        };
576
577        acc.push(mt);
578    }
579
580    acc
581}
582
583#[cfg(test)]
584mod tests {
585    use crate::EntryMasking;
586    use crate::parser::{
587        EntryAdminCommand, HeaderLines, SessionLine, StatsLine, Stream, admin_command,
588        details_comment, entry_user, host_name, ip_address, log_header, parse_entry_stats,
589        parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
590    };
591    use bytes::Bytes;
592    use std::assert_eq;
593    use std::collections::HashMap;
594    use winnow_datetime::{Date, DateTime, Offset, Time};
595
596    #[test]
597    fn parses_time_line() {
598        let i = "# Time: 2015-06-26T16:43:23+0200";
599
600        let expected = DateTime {
601            date: Date::YMD {
602                year: 2015,
603                month: 6,
604                day: 26,
605            },
606            time: Time {
607                hour: 16,
608                minute: 43,
609                second: 23,
610                millisecond: 0,
611                offset: Some(Offset {
612                    offset_hours: 2,
613                    offset_minutes: 0,
614                }),
615            },
616        };
617
618        let mut s = Stream::new(i.as_bytes());
619
620        //TODO: check for leftovers
621        let dt = parse_entry_time(&mut s).unwrap();
622        assert_eq!(expected, dt);
623    }
624
625    #[test]
626    fn parses_use_database() {
627        let i = "use mysql;";
628        let mut s = Stream::new(i.as_bytes());
629
630        let res = use_database(&mut s).unwrap();
631        assert_eq!(
632            (s, res),
633            (Stream::new("".as_bytes()), "mysql".trim().into())
634        );
635    }
636
637    #[test]
638    fn parses_localhost_host_name() {
639        let i = "localhost ";
640
641        let mut s = Stream::new(i.as_bytes());
642        let res = host_name(&mut s).unwrap();
643
644        assert_eq!(res, i.trim());
645    }
646
647    #[test]
648    fn parses_full_host_name() {
649        let i = "local.tests.rs ";
650
651        let mut s = Stream::new(i.as_bytes());
652        let res = host_name(&mut s).unwrap();
653
654        assert_eq!(res, Bytes::from("local.tests.rs".trim()));
655    }
656
657    #[test]
658    fn parses_ip_address() {
659        let i = "127.0.0.2 ";
660
661        let mut s = Stream::new(i.as_bytes());
662        let res = ip_address(&mut s).unwrap();
663
664        assert_eq!(res, Bytes::from(i.trim()));
665    }
666
667    #[test]
668    fn parses_user_line_no_ip() {
669        let i = "# User@Host: msandbox[msandbox] @ localhost []  Id:     3\n";
670
671        let expected = SessionLine {
672            user: Bytes::from("msandbox"),
673            sys_user: Bytes::from("msandbox"),
674            host: Some(Bytes::from("localhost")),
675            ip_address: None,
676            thread_id: 3,
677        };
678
679        let mut s = Stream::new(i.as_bytes());
680        let res = entry_user(&mut s).unwrap();
681        //TODO: check for left overs
682        assert_eq!(expected, res);
683    }
684
685    #[test]
686    fn parses_user_line_no_host() {
687        let i = "# User@Host: lobster[lobster] @ [192.168.56.1]  Id:   190\n";
688        let mut s = Stream::new(i.as_bytes());
689        let expected = SessionLine {
690            user: Bytes::from("lobster"),
691            sys_user: Bytes::from("lobster"),
692            host: None,
693            ip_address: Some(Bytes::from("192.168.56.1")),
694            thread_id: 190,
695        };
696
697        let res = entry_user(&mut s).unwrap();
698        assert_eq!(expected, res);
699    }
700
701    #[test]
702    fn parses_stats_line() {
703        let i = "# Query_time: 1.000016  Lock_time: 2.000000 Rows_sent: 3  Rows_examined: 4\n";
704
705        let expected = StatsLine {
706            query_time: 1.000016,
707            lock_time: 2.0,
708            rows_sent: 3,
709            rows_examined: 4,
710        };
711
712        let mut s = Stream::new(i.as_bytes());
713        let res = parse_entry_stats(&mut s).unwrap();
714        //TODO: check for leftovers
715        assert_eq!(expected, res);
716    }
717
718    #[test]
719    fn parses_admin_command_line() {
720        let i = "# administrator command: Quit;\n";
721
722        let expected = EntryAdminCommand {
723            command: "Quit".into(),
724        };
725
726        let mut s = Stream::new(i.as_bytes());
727        //TODO: check for leftovers
728        let res = admin_command(&mut s).unwrap();
729        assert_eq!(expected, res);
730    }
731
732    #[test]
733    fn parses_details_comment() {
734        let s0 = "-- Id: 123; long: some kind of details here; caller: hello_world()\n";
735        let s1 = "-- Id: 123, long: some kind of details here, caller : hello_world()\n";
736        let s2 = "-- Id= 123, long = some kind of details here, caller= hello_world()\n";
737
738        let expected = (
739            Stream::new("".as_bytes()),
740            HashMap::from([
741                ("Id".into(), "123".into()),
742                ("long".into(), "some kind of details here".into()),
743                ("caller".into(), "hello_world()".into()),
744            ]),
745        );
746
747        let mut s = Stream::new(s0.as_bytes());
748        let res = details_comment(&mut s).unwrap();
749        //TODO: Stream ToString and ToStr
750        assert_eq!((s, res), expected);
751
752        let mut s = Stream::new(s1.as_bytes());
753        let res = details_comment(&mut s).unwrap();
754        assert_eq!((s, res), expected);
755
756        let mut s = Stream::new(s2.as_bytes());
757        let res = details_comment(&mut s).unwrap();
758
759        assert_eq!((s, res), expected);
760    }
761
762    #[test]
763    fn parses_details_comment_trailing_key() {
764        let i = "-- Id: 123, long: some kind of details here, caller: hello_world():52\n";
765        let mut s = Stream::new(i.as_bytes());
766
767        let res = details_comment(&mut s).unwrap();
768
769        let expected = (
770            Stream::new("".as_bytes()),
771            HashMap::from([
772                ("Id".into(), "123".into()),
773                ("long".into(), "some kind of details here".into()),
774                ("caller".into(), "hello_world():52".into()),
775            ]),
776        );
777
778        assert_eq!((s, res), expected);
779
780        let i = "-- Id: 123, long: some kind of details here, caller: hello_world(): 52\n";
781        let mut s = Stream::new(i.as_bytes());
782
783        let res = details_comment(&mut s).unwrap();
784        let expected = (
785            Stream::new("".as_bytes()),
786            HashMap::from([
787                ("Id".into(), "123".into()),
788                ("long".into(), "some kind of details here".into()),
789                ("caller".into(), "hello_world(): 52".into()),
790            ]),
791        );
792
793        assert_eq!((s, res), expected);
794    }
795
796    #[test]
797    fn parses_start_timestamp() {
798        let l = "SET timestamp=1517798807;";
799        let mut s = Stream::new(l.as_bytes());
800        let res = start_timestamp_command(&mut s).unwrap();
801
802        let expected = (Stream::new("".as_bytes()), 1517798807);
803
804        assert_eq!((s, res), expected);
805    }
806
807    #[test]
808    fn parses_masked_selects() {
809        let sql0 = "SELECT a, b, 123, 'abcd', myfunc(b) \
810           FROM table_1 \
811           WHERE a > b AND b < 100 \
812           ORDER BY a DESC, b";
813
814        let sql1 = "SELECT a, b, 456, 'efg', myfunc(b) \
815           FROM table_1 \
816           WHERE a > b AND b < 1000 \
817           ORDER BY a DESC, b";
818
819        let ast0 = parse_sql(sql0, &EntryMasking::PlaceHolder).unwrap();
820        let ast1 = parse_sql(sql1, &EntryMasking::PlaceHolder).unwrap();
821
822        assert_eq!(ast0, ast1);
823    }
824
825    #[test]
826    fn parses_select_sql() {
827        let sql = "SELECT a, b, 123, 'abcd', myfunc(b) \
828           FROM table_1 \
829           WHERE a > b AND b < 100 \
830           ORDER BY a DESC, b;";
831
832        let mut s = Stream::new(sql.as_bytes());
833        let res = sql_lines(&mut s).unwrap();
834
835        assert_eq!(res, sql);
836    }
837
838    #[test]
839    fn parses_setter_sql() {
840        let sql = "/*!40101 SET NAMES utf8 */;\n";
841
842        let mut s = Stream::new(sql.as_bytes());
843        let res = sql_lines(&mut s).unwrap();
844
845        assert_eq!(res, sql.trim());
846    }
847
848    #[test]
849    fn parses_quoted_terminator_sql() {
850        let sql = "SELECT
851a.actor_id,
852a.first_name,
853a.last_name,
854GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
855                (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
856                    FROM sakila.film f
857                    INNER JOIN sakila.film_category fc
858                      ON f.film_id = fc.film_id
859                    INNER JOIN sakila.film_actor fa
860                      ON f.film_id = fa.film_id
861                    WHERE fc.category_id = c.category_id
862                    AND fa.actor_id = a.actor_id
863                 )
864             )
865             ORDER BY c.name SEPARATOR '; ')
866AS film_info
867FROM sakila.actor a;
868";
869
870        let mut s = Stream::new(sql.as_bytes());
871        let res = sql_lines(&mut s).unwrap();
872
873        assert_eq!((s, res), (Stream::new("\n".as_bytes()), sql.trim().into()));
874    }
875
876    #[test]
877    fn parses_quoted_quoted_terminator_sql() {
878        let sql = r#"SELECT
879a.actor_id,
880a.first_name,
881a.last_name,
882GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
883                (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
884                    FROM sakila.film f
885                    INNER JOIN sakila.film_category fc
886                      ON f.film_id = fc.film_id
887                    INNER JOIN sakila.film_actor fa
888                      ON f.film_id = fa.film_id
889                    WHERE fc.category_id = c.category_id
890                    AND fa.actor_id = a.actor_id
891                 )
892             )
893             ORDER BY c.name SEPARATOR '\'\"; ')
894AS film_info
895FROM sakila.actor a;
896"#;
897
898        let mut s = Stream::new(sql.as_bytes());
899        let res = sql_lines(&mut s).unwrap();
900
901        assert_eq!(res, sql.trim());
902    }
903
904    #[test]
905    fn parses_header() {
906        let h = "/home/karl/mysql/my-5.7/bin/mysqld, Version: 5.7.20-log (MySQL Community Server (GPL)). started with:
907Tcp port: 12345  Unix socket: /tmp/12345/mysql_sandbox12345.sock
908Time                 Id Command    Argument\n";
909
910        let mut s = Stream::new(h.as_bytes());
911
912        let res = log_header(&mut s).unwrap();
913
914        assert_eq!(
915            (s, res),
916            (
917                Stream::new("".as_bytes()),
918                HeaderLines {
919                    version: Bytes::from("5.7.20-log (MySQL Community Server (GPL))."),
920                    tcp_port: Some(12345),
921                    socket: Some(Bytes::from("/tmp/12345/mysql_sandbox12345.sock")),
922                }
923            )
924        );
925    }
926}