mysql_slowlog_parser/
parser.rs

1use crate::EntryMasking;
2use bytes::{BufMut, Bytes, BytesMut};
3use sqlparser::ast::Statement;
4use sqlparser::dialect::MySqlDialect;
5use sqlparser::parser::{Parser as SQLParser, ParserError};
6use sqlparser::tokenizer::{Token, Tokenizer};
7use std::borrow::Cow;
8use std::collections::HashMap;
9use std::ops::Not;
10use std::str;
11use std::str::FromStr;
12use winnow::ascii::{
13    Caseless, alpha1, alphanumeric1, digit1, float, line_ending, multispace0, multispace1,
14    till_line_ending,
15};
16use winnow::combinator::repeat;
17use winnow::combinator::{alt, trace};
18use winnow::combinator::{not, opt};
19use winnow::combinator::{preceded, terminated};
20use winnow::error::{ContextError, ErrMode, InputError};
21use winnow::token::{any, literal, take, take_till, take_until};
22use winnow::{ModalResult, Parser, Partial, seq};
23use winnow_datetime::DateTime;
24use winnow_iso8601::datetime::datetime;
25
26pub type Stream<'i> = Partial<&'i [u8]>;
27
28/// A struct holding a `DateTime` parsed from the Time: line of the entry
29/// ex: `# Time: 2018-02-05T02:46:43.015898Z`
30#[derive(Clone)]
31pub struct TimeLine {
32    time: DateTime,
33}
34
35impl TimeLine {
36    /// returns a clone of the DateTime parsed from the Time: line
37    pub fn time(&self) -> DateTime {
38        self.time.clone()
39    }
40}
41
42/// parses "# Time: .... entry line and returns a `DateTime`
43// # Time: 2015-06-26T16:43:23+0200";
44pub fn parse_entry_time(i: &mut Stream) -> ModalResult<DateTime> {
45    trace("parse_entry_time", move |input: &mut Stream| {
46        let dt = seq!(
47            _: literal("# Time:"),
48            _: multispace1,
49            datetime,
50        )
51        .parse_next(input)?;
52
53        Ok(dt.0)
54    })
55    .parse_next(i)
56}
57
58/// values from the User: entry line
59/// ex. # User@Host: msandbox\[msandbox\] @ localhost []  Id:     3
60#[derive(Clone, Debug, Eq, PartialEq)]
61pub struct SessionLine {
62    pub(crate) user: Bytes,
63    pub(crate) sys_user: Bytes,
64    pub(crate) host: Option<Bytes>,
65    pub(crate) ip_address: Option<Bytes>,
66    pub(crate) thread_id: u32,
67}
68
69impl SessionLine {
70    /// returns user as`Bytes`
71    pub fn user(&self) -> Bytes {
72        self.user.clone()
73    }
74
75    /// returns sys_user as`Bytes`
76    pub fn sys_user(&self) -> Bytes {
77        self.sys_user.clone()
78    }
79
80    /// returns possible host as`Option<Bytes>`
81    pub fn host(&self) -> Option<Bytes> {
82        self.host.clone()
83    }
84
85    /// returns possible ip_address as `Option<Bytes>`
86    pub fn ip_address(&self) -> Option<Bytes> {
87        self.ip_address.clone()
88    }
89
90    /// returns thread_id as `Bytes`
91    pub fn thread_id(&self) -> u32 {
92        self.thread_id
93    }
94}
95
96#[derive(Debug, PartialEq, Default)]
97pub struct HeaderLines {
98    version: Bytes,
99    tcp_port: Option<usize>,
100    socket: Option<Bytes>,
101}
102
103pub fn log_header<'a>(i: &mut Stream<'_>) -> ModalResult<HeaderLines> {
104    trace("log_header", move |input: &mut Stream<'_>| {
105        // check for the '#' since the last parser in the set is greedy
106        let head = seq!{
107            HeaderLines {
108                _: not(literal("#")),
109                _: take_until(1.., ", Version: "),
110                _:  (", Version: "),
111                version: take_until(1.., " started with:").map(|v: &[u8]| v.to_owned().into()),
112                _: literal(" started with:"),
113                _: multispace1,
114                _: literal("Tcp port:"),
115                _: multispace1,
116                tcp_port: opt(digit1).map(|v: Option<&[u8]>| v.and_then(|d| Some(str::from_utf8(d).unwrap().parse().unwrap()))),
117                _: multispace1,
118                _: literal("Unix socket: "),
119                socket: opt(take_till(1.., "\n".as_bytes())).map(|v: Option<&[u8]>| v.and_then(|d| Some(d.to_owned().into()))),
120                _: till_line_ending,
121                _: line_ending,
122                _: till_line_ending,
123                _: line_ending,
124            }
125        }.parse_next(input)?;
126
127        Ok(head)
128    }).parse_next(i)
129}
130
131pub fn sql_lines<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
132    trace("sql_lines", move |input: &mut Stream<'_>| {
133        let mut acc = BytesMut::new();
134
135        let mut escaped = false;
136        let mut quotes = vec![];
137
138        loop {
139            let c = any(input)? as char;
140
141            acc.put_slice(&[c as u8]);
142
143            if escaped.not() && (c == '\'' || c == '\"' || c == '`') {
144                if let Some(q) = quotes.last() {
145                    if &c == q {
146                        let _ = quotes.pop();
147                    } else {
148                        quotes.push(c);
149                    }
150                } else {
151                    quotes.push(c);
152                }
153            }
154
155            if escaped.not() && c == '\\' {
156                escaped = true;
157            } else {
158                escaped = false;
159            }
160
161            if quotes.len() == 0 && c == ';' {
162                return Ok(acc.freeze());
163            }
164        }
165    })
166    .parse_next(i)
167}
168
169pub fn alphanumerichyphen1<'a>(i: &mut Stream<'a>) -> ModalResult<&'a [u8]> {
170    alt((alphanumeric1, literal("_"), literal("-"))).parse_next(i)
171}
172
173pub fn host_name<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
174    trace("host_name", move |input: &mut Stream<'_>| {
175        let (mut first, second): (Vec<&[u8]>, &[u8]) = alt((
176            ((
177                repeat(1.., terminated(alphanumerichyphen1, literal("."))),
178                alpha1,
179            )),
180            ((repeat(1, alphanumerichyphen1), take(0 as usize))),
181        ))
182        .parse_next(input)?;
183
184        if !second.is_empty() {
185            first.push(second);
186        }
187
188        let b = first
189            .iter()
190            .enumerate()
191            .fold(BytesMut::new(), |mut acc, (c, p)| {
192                if c > 0 {
193                    acc.put_slice(".".as_bytes());
194                }
195
196                acc.put_slice(p);
197                acc
198            });
199
200        Ok(b.freeze())
201    })
202    .parse_next(i)
203}
204
205/// ip address handler that only handles IP4
206pub fn ip_address<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
207    trace("ip_address", move |input: &mut Stream<'_>| {
208        let p = seq!(
209            digit1,
210            preceded(literal("."), digit1),
211            preceded(literal("."), digit1),
212            preceded(literal("."), digit1),
213        )
214        .parse_next(input)?;
215
216        let b = [p.0, p.1, p.2, p.3]
217            .iter()
218            .enumerate()
219            .fold(BytesMut::new(), |mut acc, (c, p)| {
220                if c > 0 {
221                    acc.put_slice(".".as_bytes());
222                }
223
224                acc.put_slice(p);
225                acc
226            });
227
228        Ok(b.freeze())
229    })
230    .parse_next(i)
231}
232
233/// thread id parser for 'Id: [\d+]'
234pub fn entry_user_thread_id<'a>(i: &mut Stream<'_>) -> ModalResult<u32> {
235    trace("entry_user_thread_id", move |input: &mut Stream<'_>| {
236        let id = seq!(
237            _: literal("Id:"),
238            _: multispace1,
239            digit1
240        )
241        .parse_next(input)?;
242
243        Ok(u32::from_str(str::from_utf8(id.0).unwrap()).unwrap())
244    })
245    .parse_next(i)
246}
247
248pub fn user_name(i: &mut Stream) -> ModalResult<Bytes> {
249    trace("user_name", move |input: &mut Stream<'_>| {
250        let parts: Vec<&[u8]> =
251            repeat(1.., alt((alphanumeric1, literal("_")))).parse_next(input)?;
252
253        let b = parts.iter().fold(BytesMut::new(), |mut acc, p| {
254            acc.put_slice(p);
255            acc
256        });
257
258        Ok(b.freeze())
259    })
260    .parse_next(i)
261}
262
263/// user line parser
264pub fn entry_user(i: &mut Stream) -> ModalResult<SessionLine> {
265    trace("entry_user", move |input: &mut Stream<'_>| {
266        let s = seq! { SessionLine {
267            _: multispace0,
268            _: literal("# User@Host:"),
269            _: multispace1,
270            user: user_name,
271            _: literal("["),
272            sys_user: user_name,
273            _: literal("]"),
274            _: multispace1,
275            _: literal("@"),
276            _: multispace1,
277            host: opt(host_name),
278            _: multispace0,
279            _: literal("["),
280            _: multispace0,
281            ip_address: opt(ip_address),
282            _: multispace0,
283            _: literal("]"),
284            _: multispace1,
285            thread_id: entry_user_thread_id,
286        }}
287        .parse_next(input)?;
288
289        Ok(s)
290    })
291    .parse_next(i)
292}
293
294/// Struct containing information parsed from the initial comment in a SQL query
295#[derive(Clone, Debug, Default, PartialEq)]
296pub struct SqlStatementContext {
297    /// example field, should just be part of a HashMap
298    pub request_id: Option<Bytes>,
299    /// example field, should just be part of a HashMap
300    pub caller: Option<Bytes>,
301    /// example field, should just be part of a HashMap
302    pub function: Option<Bytes>,
303    /// example field, should just be part of a HashMap
304    pub line: Option<u32>,
305}
306
307impl SqlStatementContext {
308    /// example method, should be replaced by generic key lookup
309    pub fn request_id(&self) -> Option<Cow<str>> {
310        if let Some(i) = &self.request_id {
311            Some(String::from_utf8_lossy(i.as_ref()))
312        } else {
313            None
314        }
315    }
316
317    /// example method, should be replaced by generic key lookup
318    pub fn caller(&self) -> Option<Cow<str>> {
319        if let Some(c) = &self.caller {
320            Some(String::from_utf8_lossy(c.as_ref()))
321        } else {
322            None
323        }
324    }
325
326    /// example method, should be replaced by generic key lookup
327    pub fn function(&self) -> Option<Cow<str>> {
328        if let Some(f) = &self.function {
329            Some(String::from_utf8_lossy(f.as_ref()))
330        } else {
331            None
332        }
333    }
334
335    /// example method, should be replaced by generic key lookup
336    pub fn line(&self) -> Option<u32> {
337        self.line
338    }
339}
340
341pub fn details_comment<'a>(i: &mut Stream) -> ModalResult<HashMap<Bytes, Bytes>> {
342    trace("details_comment", move |input: &mut Stream<'_>| {
343        let mut name: Option<Bytes> = None;
344
345        let mut res: HashMap<Bytes, BytesMut> = HashMap::new();
346
347        let _ = literal("--").parse_next(input)?;
348
349        loop {
350            if name.is_none() {
351                if let Ok(n) = details_tag(input) {
352                    name.replace(n.clone());
353                    if let Some(_) = res.insert(n, BytesMut::new()) {
354                        //TODO: see if you need to set the ErrorKind::Assert specifically, like before
355                        return Err(ErrMode::Cut(ContextError::new()));
356                    }
357                }
358            }
359
360            if let Ok(c) = any::<Partial<&[u8]>, InputError<_>>(input) {
361                let c = c as char;
362
363                if c == '\n' || c == '\r' {
364                    break;
365                }
366
367                if c == ';' || c == ',' {
368                    name = None;
369                    continue;
370                }
371
372                if let Some(k) = &name {
373                    // TODO: previously this specified ErrorKind::Assert, figure out if this needs to be specificied still
374                    let v = &mut res.get_mut(k).ok_or(ErrMode::Cut(ContextError::new()))?;
375
376                    v.put_bytes(c as u8, 1);
377                } else {
378                    // TODO: previously this specified ErrorKind::Assert, figure out if this needs to be specificied still
379                    return Err(ErrMode::Cut(ContextError::new()));
380                }
381
382                continue;
383            } else {
384                break;
385            }
386        }
387
388        Ok(res.into_iter().map(|(k, v)| (k, v.freeze())).collect())
389    })
390    .parse_next(i)
391}
392
393pub fn details_tag<'a>(i: &mut Stream) -> ModalResult<Bytes> {
394    trace("details_tag", move |input: &mut Stream<'_>| {
395        let name = seq!(
396            _: multispace0,
397            user_name,
398            _: multispace0,
399            _: alt((literal(":"), literal("="))),
400            _: multispace0,
401        )
402        .parse_next(input)?;
403
404        Ok(name.0.into())
405    })
406    .parse_next(i)
407}
408
409/// values parsed from stats entry line
410#[derive(Clone, Copy, Debug, PartialEq)]
411pub struct StatsLine {
412    /// how long the overall query took
413    pub(crate) query_time: f64,
414    /// how long the query held locks
415    pub(crate) lock_time: f64,
416    /// how many rows were sent
417    pub(crate) rows_sent: u32,
418    /// how many rows were scanned
419    pub(crate) rows_examined: u32,
420}
421
422impl StatsLine {
423    /// how long the overall query took
424    pub fn query_time(&self) -> f64 {
425        self.query_time.clone()
426    }
427    /// how long the query held locks
428    pub fn lock_time(&self) -> f64 {
429        self.lock_time.clone()
430    }
431
432    /// how many rows were sent
433    pub fn rows_sent(&self) -> u32 {
434        self.rows_sent.clone()
435    }
436    /// how many rows were scanned
437    pub fn rows_examined(&self) -> u32 {
438        self.rows_examined.clone()
439    }
440}
441
442/// parse '# Query_time:...' entry line
443pub fn parse_entry_stats(i: &mut Stream<'_>) -> ModalResult<StatsLine> {
444    trace("parse_entry_stats", move |input: &mut Stream<'_>| {
445        let stats = seq! {StatsLine {
446            _: literal("#"),
447            _: multispace1,
448            _: literal("Query_time:"),
449            _: multispace1,
450            query_time: float,
451            _: multispace1,
452            _: literal("Lock_time:"),
453            _: multispace1,
454            lock_time: float,
455            _: multispace1,
456            _: literal("Rows_sent:"),
457            _: multispace1,
458            rows_sent: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
459            _: multispace1,
460            _: literal("Rows_examined:"),
461            _: multispace1,
462            rows_examined: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
463        }}
464        .parse_next(input)?;
465
466        Ok(stats)
467    })
468    .parse_next(i)
469}
470
471/// admin command values parsed from sql lines of an entry
472#[derive(Clone, Debug, PartialEq)]
473pub struct EntryAdminCommand {
474    /// the admin command sent
475    pub command: Bytes,
476}
477
478/// parse "# administrator command: " entry line
479pub fn admin_command<'a>(i: &mut Stream) -> ModalResult<EntryAdminCommand> {
480    trace("admin_command", move |input: &mut Stream<'_>| {
481        let command = seq!(
482            _: literal("# administrator command:"),
483            _: multispace1,
484            alphanumerichyphen1,
485            _: literal(";"),
486        )
487        .parse_next(input)?;
488
489        Ok(EntryAdminCommand {
490            command: command.0.to_owned().into(),
491        })
492    })
493    .parse_next(i)
494}
495
496/// parses 'USE database=\w+;' command which shows up at the start of some entry sql
497pub fn use_database(i: &mut Stream) -> ModalResult<Bytes> {
498    trace("use_database", move |input: &mut Stream<'_>| {
499        let db_name = seq!(
500            _: literal(Caseless("USE")),
501            _: multispace1,
502            user_name,
503            _: multispace0,
504            _: literal(";"),
505        )
506        .parse_next(input)?;
507
508        Ok(db_name.0.into())
509    })
510    .parse_next(i)
511}
512
513/// parses 'SET timestamp=\d{10};' command which starts
514pub fn start_timestamp_command(i: &mut Stream) -> ModalResult<u32> {
515    trace("start_timestamp_command", move |input: &mut Stream<'_>| {
516        let time = seq!(
517            _: literal("SET timestamp"),
518            _: multispace0,
519            _: literal("="),
520            _: multispace0,
521            digit1,
522            _: multispace0,
523            _: literal(";"),
524        )
525        .parse_next(input)?;
526
527        Ok(u32::from_str(str::from_utf8(time.0).unwrap()).unwrap())
528    })
529    .parse_next(i)
530}
531
532/// Parses one or more sql statements using `sqlparser::parse_statements`. This uses the
533/// `sqlparser::Tokenizer` to first tokenize the SQL and replace tokenized values with an
534/// masked value determined by the `&EntryMasking` value passed as an argument. In the case of
535/// `EntryMasking::None` this call is identical to calling `sqlparse::parse_statements`.
536/// command: " entry line
537pub fn parse_sql(sql: &str, mask: &EntryMasking) -> Result<Vec<Statement>, ParserError> {
538    let mut tokenizer = Tokenizer::new(&MySqlDialect {}, sql);
539    let mut tokens = tokenizer.tokenize()?;
540
541    tokens = mask_tokens(tokens, mask);
542
543    let mut parser = SQLParser::new(&MySqlDialect {}).with_tokens(tokens);
544
545    parser.parse_statements()
546}
547
548/// Replaces numbers, strings and literal tokenized by `sql_parser::Tokenizer` and replaces them
549/// with a masking values. Passing a value of `EntryMasking::None` will simply return the
550/// `Vec<Token>` passed in.
551pub fn mask_tokens(tokens: Vec<Token>, mask: &EntryMasking) -> Vec<Token> {
552    let mut acc = vec![];
553
554    if mask == &EntryMasking::None {
555        return tokens;
556    }
557
558    for t in tokens {
559        let mt = if let Token::Number(_, _) = t {
560            Token::Placeholder("?".into())
561        } else if let Token::Number(_, _) = t {
562            Token::Placeholder("?".into())
563        } else if let Token::SingleQuotedString(_) = t {
564            Token::Placeholder("?".into())
565        } else if let Token::DoubleQuotedString(_) = t {
566            Token::Placeholder("?".into())
567        } else if let Token::NationalStringLiteral(_) = t {
568            Token::Placeholder("?".into())
569        } else if let Token::EscapedStringLiteral(_) = t {
570            Token::Placeholder("?".into())
571        } else if let Token::HexStringLiteral(_) = t {
572            Token::Placeholder("?".into())
573        } else {
574            t
575        };
576
577        acc.push(mt);
578    }
579
580    acc
581}
582
583#[cfg(test)]
584mod tests {
585    use crate::EntryMasking;
586    use crate::parser::{
587        EntryAdminCommand, HeaderLines, SessionLine, StatsLine, Stream, admin_command,
588        details_comment, entry_user, host_name, ip_address, log_header, parse_entry_stats,
589        parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
590    };
591    use bytes::Bytes;
592    use std::assert_eq;
593    use std::collections::HashMap;
594    use winnow_datetime::{Date, DateTime, Offset, Time};
595
596    #[test]
597    fn parses_time_line() {
598        let i = "# Time: 2015-06-26T16:43:23+0200";
599
600        let expected = DateTime {
601            date: Date::YMD {
602                year: 2015,
603                month: 6,
604                day: 26,
605            },
606            time: Time {
607                hour: 16,
608                minute: 43,
609                second: 23,
610                millisecond: 0,
611                offset: Some(Offset::Fixed {
612                    hours: 2,
613                    minutes: 0,
614                    critical: false,
615                }),
616                time_zone: None,
617                calendar: None,
618            },
619        };
620
621        let mut s = Stream::new(i.as_bytes());
622
623        //TODO: check for leftovers
624        let dt = parse_entry_time(&mut s).unwrap();
625        assert_eq!(expected, dt);
626    }
627
628    #[test]
629    fn parses_use_database() {
630        let i = "use mysql;";
631        let mut s = Stream::new(i.as_bytes());
632
633        let res = use_database(&mut s).unwrap();
634        assert_eq!(
635            (s, res),
636            (Stream::new("".as_bytes()), "mysql".trim().into())
637        );
638    }
639
640    #[test]
641    fn parses_localhost_host_name() {
642        let i = "localhost ";
643
644        let mut s = Stream::new(i.as_bytes());
645        let res = host_name(&mut s).unwrap();
646
647        assert_eq!(res, i.trim());
648    }
649
650    #[test]
651    fn parses_full_host_name() {
652        let i = "local.tests.rs ";
653
654        let mut s = Stream::new(i.as_bytes());
655        let res = host_name(&mut s).unwrap();
656
657        assert_eq!(res, Bytes::from("local.tests.rs".trim()));
658    }
659
660    #[test]
661    fn parses_ip_address() {
662        let i = "127.0.0.2 ";
663
664        let mut s = Stream::new(i.as_bytes());
665        let res = ip_address(&mut s).unwrap();
666
667        assert_eq!(res, Bytes::from(i.trim()));
668    }
669
670    #[test]
671    fn parses_user_line_no_ip() {
672        let i = "# User@Host: msandbox[msandbox] @ localhost []  Id:     3\n";
673
674        let expected = SessionLine {
675            user: Bytes::from("msandbox"),
676            sys_user: Bytes::from("msandbox"),
677            host: Some(Bytes::from("localhost")),
678            ip_address: None,
679            thread_id: 3,
680        };
681
682        let mut s = Stream::new(i.as_bytes());
683        let res = entry_user(&mut s).unwrap();
684        //TODO: check for left overs
685        assert_eq!(expected, res);
686    }
687
688    #[test]
689    fn parses_user_line_no_host() {
690        let i = "# User@Host: lobster[lobster] @ [192.168.56.1]  Id:   190\n";
691        let mut s = Stream::new(i.as_bytes());
692        let expected = SessionLine {
693            user: Bytes::from("lobster"),
694            sys_user: Bytes::from("lobster"),
695            host: None,
696            ip_address: Some(Bytes::from("192.168.56.1")),
697            thread_id: 190,
698        };
699
700        let res = entry_user(&mut s).unwrap();
701        assert_eq!(expected, res);
702    }
703
704    #[test]
705    fn parses_stats_line() {
706        let i = "# Query_time: 1.000016  Lock_time: 2.000000 Rows_sent: 3  Rows_examined: 4\n";
707
708        let expected = StatsLine {
709            query_time: 1.000016,
710            lock_time: 2.0,
711            rows_sent: 3,
712            rows_examined: 4,
713        };
714
715        let mut s = Stream::new(i.as_bytes());
716        let res = parse_entry_stats(&mut s).unwrap();
717        //TODO: check for leftovers
718        assert_eq!(expected, res);
719    }
720
721    #[test]
722    fn parses_admin_command_line() {
723        let i = "# administrator command: Quit;\n";
724
725        let expected = EntryAdminCommand {
726            command: "Quit".into(),
727        };
728
729        let mut s = Stream::new(i.as_bytes());
730        //TODO: check for leftovers
731        let res = admin_command(&mut s).unwrap();
732        assert_eq!(expected, res);
733    }
734
735    #[test]
736    fn parses_details_comment() {
737        let s0 = "-- Id: 123; long: some kind of details here; caller: hello_world()\n";
738        let s1 = "-- Id: 123, long: some kind of details here, caller : hello_world()\n";
739        let s2 = "-- Id= 123, long = some kind of details here, caller= hello_world()\n";
740
741        let expected = (
742            Stream::new("".as_bytes()),
743            HashMap::from([
744                ("Id".into(), "123".into()),
745                ("long".into(), "some kind of details here".into()),
746                ("caller".into(), "hello_world()".into()),
747            ]),
748        );
749
750        let mut s = Stream::new(s0.as_bytes());
751        let res = details_comment(&mut s).unwrap();
752        //TODO: Stream ToString and ToStr
753        assert_eq!((s, res), expected);
754
755        let mut s = Stream::new(s1.as_bytes());
756        let res = details_comment(&mut s).unwrap();
757        assert_eq!((s, res), expected);
758
759        let mut s = Stream::new(s2.as_bytes());
760        let res = details_comment(&mut s).unwrap();
761
762        assert_eq!((s, res), expected);
763    }
764
765    #[test]
766    fn parses_details_comment_trailing_key() {
767        let i = "-- Id: 123, long: some kind of details here, caller: hello_world():52\n";
768        let mut s = Stream::new(i.as_bytes());
769
770        let res = details_comment(&mut s).unwrap();
771
772        let expected = (
773            Stream::new("".as_bytes()),
774            HashMap::from([
775                ("Id".into(), "123".into()),
776                ("long".into(), "some kind of details here".into()),
777                ("caller".into(), "hello_world():52".into()),
778            ]),
779        );
780
781        assert_eq!((s, res), expected);
782
783        let i = "-- Id: 123, long: some kind of details here, caller: hello_world(): 52\n";
784        let mut s = Stream::new(i.as_bytes());
785
786        let res = details_comment(&mut s).unwrap();
787        let expected = (
788            Stream::new("".as_bytes()),
789            HashMap::from([
790                ("Id".into(), "123".into()),
791                ("long".into(), "some kind of details here".into()),
792                ("caller".into(), "hello_world(): 52".into()),
793            ]),
794        );
795
796        assert_eq!((s, res), expected);
797    }
798
799    #[test]
800    fn parses_start_timestamp() {
801        let l = "SET timestamp=1517798807;";
802        let mut s = Stream::new(l.as_bytes());
803        let res = start_timestamp_command(&mut s).unwrap();
804
805        let expected = (Stream::new("".as_bytes()), 1517798807);
806
807        assert_eq!((s, res), expected);
808    }
809
810    #[test]
811    fn parses_masked_selects() {
812        let sql0 = "SELECT a, b, 123, 'abcd', myfunc(b) \
813           FROM table_1 \
814           WHERE a > b AND b < 100 \
815           ORDER BY a DESC, b";
816
817        let sql1 = "SELECT a, b, 456, 'efg', myfunc(b) \
818           FROM table_1 \
819           WHERE a > b AND b < 1000 \
820           ORDER BY a DESC, b";
821
822        let ast0 = parse_sql(sql0, &EntryMasking::PlaceHolder).unwrap();
823        let ast1 = parse_sql(sql1, &EntryMasking::PlaceHolder).unwrap();
824
825        assert_eq!(ast0, ast1);
826    }
827
828    #[test]
829    fn parses_select_sql() {
830        let sql = "SELECT a, b, 123, 'abcd', myfunc(b) \
831           FROM table_1 \
832           WHERE a > b AND b < 100 \
833           ORDER BY a DESC, b;";
834
835        let mut s = Stream::new(sql.as_bytes());
836        let res = sql_lines(&mut s).unwrap();
837
838        assert_eq!(res, sql);
839    }
840
841    #[test]
842    fn parses_setter_sql() {
843        let sql = "/*!40101 SET NAMES utf8 */;\n";
844
845        let mut s = Stream::new(sql.as_bytes());
846        let res = sql_lines(&mut s).unwrap();
847
848        assert_eq!(res, sql.trim());
849    }
850
851    #[test]
852    fn parses_quoted_terminator_sql() {
853        let sql = "SELECT
854a.actor_id,
855a.first_name,
856a.last_name,
857GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
858                (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
859                    FROM sakila.film f
860                    INNER JOIN sakila.film_category fc
861                      ON f.film_id = fc.film_id
862                    INNER JOIN sakila.film_actor fa
863                      ON f.film_id = fa.film_id
864                    WHERE fc.category_id = c.category_id
865                    AND fa.actor_id = a.actor_id
866                 )
867             )
868             ORDER BY c.name SEPARATOR '; ')
869AS film_info
870FROM sakila.actor a;
871";
872
873        let mut s = Stream::new(sql.as_bytes());
874        let res = sql_lines(&mut s).unwrap();
875
876        assert_eq!((s, res), (Stream::new("\n".as_bytes()), sql.trim().into()));
877    }
878
879    #[test]
880    fn parses_quoted_quoted_terminator_sql() {
881        let sql = r#"SELECT
882a.actor_id,
883a.first_name,
884a.last_name,
885GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
886                (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
887                    FROM sakila.film f
888                    INNER JOIN sakila.film_category fc
889                      ON f.film_id = fc.film_id
890                    INNER JOIN sakila.film_actor fa
891                      ON f.film_id = fa.film_id
892                    WHERE fc.category_id = c.category_id
893                    AND fa.actor_id = a.actor_id
894                 )
895             )
896             ORDER BY c.name SEPARATOR '\'\"; ')
897AS film_info
898FROM sakila.actor a;
899"#;
900
901        let mut s = Stream::new(sql.as_bytes());
902        let res = sql_lines(&mut s).unwrap();
903
904        assert_eq!(res, sql.trim());
905    }
906
907    #[test]
908    fn parses_header() {
909        let h = "/home/karl/mysql/my-5.7/bin/mysqld, Version: 5.7.20-log (MySQL Community Server (GPL)). started with:
910Tcp port: 12345  Unix socket: /tmp/12345/mysql_sandbox12345.sock
911Time                 Id Command    Argument\n";
912
913        let mut s = Stream::new(h.as_bytes());
914
915        let res = log_header(&mut s).unwrap();
916
917        assert_eq!(
918            (s, res),
919            (
920                Stream::new("".as_bytes()),
921                HeaderLines {
922                    version: Bytes::from("5.7.20-log (MySQL Community Server (GPL))."),
923                    tcp_port: Some(12345),
924                    socket: Some(Bytes::from("/tmp/12345/mysql_sandbox12345.sock")),
925                }
926            )
927        );
928    }
929}