1use crate::EntryMasking;
2use bytes::{BufMut, Bytes, BytesMut};
3use sqlparser::ast::Statement;
4use sqlparser::dialect::MySqlDialect;
5use sqlparser::parser::{Parser as SQLParser, ParserError};
6use sqlparser::tokenizer::{Token, Tokenizer};
7use std::borrow::Cow;
8use std::collections::HashMap;
9use std::ops::Not;
10use std::str;
11use std::str::FromStr;
12use winnow::ascii::{
13 Caseless, alpha1, alphanumeric1, digit1, float, line_ending, multispace0, multispace1,
14 till_line_ending,
15};
16use winnow::combinator::repeat;
17use winnow::combinator::{alt, trace};
18use winnow::combinator::{not, opt};
19use winnow::combinator::{preceded, terminated};
20use winnow::error::{ContextError, ErrMode, InputError};
21use winnow::token::{any, literal, take, take_till, take_until};
22use winnow::{ModalResult, Parser, Partial, seq};
23use winnow_datetime::DateTime;
24use winnow_iso8601::datetime::datetime;
25
/// Input type consumed by every parser in this file: a byte slice wrapped in winnow's
/// [`Partial`] stream.
pub type Stream<'i> = Partial<&'i [u8]>;
27
28#[derive(Clone, Copy)]
31pub struct TimeLine {
32 time: DateTime,
33}
34
35impl TimeLine {
36 pub fn time(&self) -> DateTime {
38 self.time.clone()
39 }
40}
41
42pub fn parse_entry_time(i: &mut Stream) -> ModalResult<DateTime> {
45 trace("parse_entry_time", move |input: &mut Stream| {
46 let dt = seq!(
47 _: literal("# Time:"),
48 _: multispace1,
49 datetime,
50 )
51 .parse_next(input)?;
52
53 Ok(dt.0)
54 })
55 .parse_next(i)
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
61pub struct SessionLine {
62 pub(crate) user: Bytes,
63 pub(crate) sys_user: Bytes,
64 pub(crate) host: Option<Bytes>,
65 pub(crate) ip_address: Option<Bytes>,
66 pub(crate) thread_id: u32,
67}
68
69impl SessionLine {
70 pub fn user(&self) -> Bytes {
72 self.user.clone()
73 }
74
75 pub fn sys_user(&self) -> Bytes {
77 self.sys_user.clone()
78 }
79
80 pub fn host(&self) -> Option<Bytes> {
82 self.host.clone()
83 }
84
85 pub fn ip_address(&self) -> Option<Bytes> {
87 self.ip_address.clone()
88 }
89
90 pub fn thread_id(&self) -> u32 {
92 self.thread_id
93 }
94}
95
/// Values extracted from the header at the top of a log file by `log_header`.
#[derive(Debug, PartialEq, Default)]
pub struct HeaderLines {
    /// Text between `, Version: ` and ` started with:`.
    version: Bytes,
    /// Digits following `Tcp port:`, when present.
    tcp_port: Option<usize>,
    /// Text following `Unix socket: ` up to the end of that line, when present.
    socket: Option<Bytes>,
}
102
103pub fn log_header<'a>(i: &mut Stream<'_>) -> ModalResult<HeaderLines> {
104 trace("log_header", move |input: &mut Stream<'_>| {
105 let head = seq!{
107 HeaderLines {
108 _: not(literal("#")),
109 _: take_until(1.., ", Version: "),
110 _: (", Version: "),
111 version: take_until(1.., " started with:").map(|v: &[u8]| v.to_owned().into()),
112 _: literal(" started with:"),
113 _: multispace1,
114 _: literal("Tcp port:"),
115 _: multispace1,
116 tcp_port: opt(digit1).map(|v: Option<&[u8]>| v.and_then(|d| Some(str::from_utf8(d).unwrap().parse().unwrap()))),
117 _: multispace1,
118 _: literal("Unix socket: "),
119 socket: opt(take_till(1.., "\n".as_bytes())).map(|v: Option<&[u8]>| v.and_then(|d| Some(d.to_owned().into()))),
120 _: till_line_ending,
121 _: line_ending,
122 _: till_line_ending,
123 _: line_ending,
124 }
125 }.parse_next(input)?;
126
127 Ok(head)
128 }).parse_next(i)
129}
130
131pub fn sql_lines<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
132 trace("sql_lines", move |input: &mut Stream<'_>| {
133 let mut acc = BytesMut::new();
134
135 let mut escaped = false;
136 let mut quotes = vec![];
137
138 loop {
139 let c = any(input)? as char;
140
141 acc.put_slice(&[c as u8]);
142
143 if escaped.not() && (c == '\'' || c == '\"' || c == '`') {
144 if let Some(q) = quotes.last() {
145 if &c == q {
146 let _ = quotes.pop();
147 } else {
148 quotes.push(c);
149 }
150 } else {
151 quotes.push(c);
152 }
153 }
154
155 if escaped.not() && c == '\\' {
156 escaped = true;
157 } else {
158 escaped = false;
159 }
160
161 if quotes.len() == 0 && c == ';' {
162 return Ok(acc.freeze());
163 }
164 }
165 })
166 .parse_next(i)
167}
168
169pub fn alphanumerichyphen1<'a>(i: &mut Stream<'a>) -> ModalResult<&'a [u8]> {
170 alt((alphanumeric1, literal("_"), literal("-"))).parse_next(i)
171}
172
173pub fn host_name<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
174 trace("host_name", move |input: &mut Stream<'_>| {
175 let (mut first, second): (Vec<&[u8]>, &[u8]) = alt((
176 ((
177 repeat(1.., terminated(alphanumerichyphen1, literal("."))),
178 alpha1,
179 )),
180 ((repeat(1, alphanumerichyphen1), take(0 as usize))),
181 ))
182 .parse_next(input)?;
183
184 if !second.is_empty() {
185 first.push(second);
186 }
187
188 let b = first
189 .iter()
190 .enumerate()
191 .fold(BytesMut::new(), |mut acc, (c, p)| {
192 if c > 0 {
193 acc.put_slice(".".as_bytes());
194 }
195
196 acc.put_slice(p);
197 acc
198 });
199
200 Ok(b.freeze())
201 })
202 .parse_next(i)
203}
204
205pub fn ip_address<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
207 trace("ip_address", move |input: &mut Stream<'_>| {
208 let p = seq!(
209 digit1,
210 preceded(literal("."), digit1),
211 preceded(literal("."), digit1),
212 preceded(literal("."), digit1),
213 )
214 .parse_next(input)?;
215
216 let b = [p.0, p.1, p.2, p.3]
217 .iter()
218 .enumerate()
219 .fold(BytesMut::new(), |mut acc, (c, p)| {
220 if c > 0 {
221 acc.put_slice(".".as_bytes());
222 }
223
224 acc.put_slice(p);
225 acc
226 });
227
228 Ok(b.freeze())
229 })
230 .parse_next(i)
231}
232
233pub fn entry_user_thread_id<'a>(i: &mut Stream<'_>) -> ModalResult<u32> {
235 trace("entry_user_thread_id", move |input: &mut Stream<'_>| {
236 let id = seq!(
237 _: literal("Id:"),
238 _: multispace1,
239 digit1
240 )
241 .parse_next(input)?;
242
243 Ok(u32::from_str(str::from_utf8(id.0).unwrap()).unwrap())
244 })
245 .parse_next(i)
246}
247
248pub fn user_name(i: &mut Stream) -> ModalResult<Bytes> {
249 trace("user_name", move |input: &mut Stream<'_>| {
250 let parts: Vec<&[u8]> =
251 repeat(1.., alt((alphanumeric1, literal("_")))).parse_next(input)?;
252
253 let b = parts.iter().fold(BytesMut::new(), |mut acc, p| {
254 acc.put_slice(p);
255 acc
256 });
257
258 Ok(b.freeze())
259 })
260 .parse_next(i)
261}
262
263pub fn entry_user(i: &mut Stream) -> ModalResult<SessionLine> {
265 trace("entry_user", move |input: &mut Stream<'_>| {
266 let s = seq! { SessionLine {
267 _: multispace0,
268 _: literal("# User@Host:"),
269 _: multispace1,
270 user: user_name,
271 _: literal("["),
272 sys_user: user_name,
273 _: literal("]"),
274 _: multispace1,
275 _: literal("@"),
276 _: multispace1,
277 host: opt(host_name),
278 _: multispace0,
279 _: literal("["),
280 _: multispace0,
281 ip_address: opt(ip_address),
282 _: multispace0,
283 _: literal("]"),
284 _: multispace1,
285 thread_id: entry_user_thread_id,
286 }}
287 .parse_next(input)?;
288
289 Ok(s)
290 })
291 .parse_next(i)
292}
293
294#[derive(Clone, Debug, Default, PartialEq)]
296pub struct SqlStatementContext {
297 pub request_id: Option<Bytes>,
299 pub caller: Option<Bytes>,
301 pub function: Option<Bytes>,
303 pub line: Option<u32>,
305}
306
307impl SqlStatementContext {
308 pub fn request_id(&self) -> Option<Cow<str>> {
310 if let Some(i) = &self.request_id {
311 Some(String::from_utf8_lossy(i.as_ref()))
312 } else {
313 None
314 }
315 }
316
317 pub fn caller(&self) -> Option<Cow<str>> {
319 if let Some(c) = &self.caller {
320 Some(String::from_utf8_lossy(c.as_ref()))
321 } else {
322 None
323 }
324 }
325
326 pub fn function(&self) -> Option<Cow<str>> {
328 if let Some(f) = &self.function {
329 Some(String::from_utf8_lossy(f.as_ref()))
330 } else {
331 None
332 }
333 }
334
335 pub fn line(&self) -> Option<u32> {
337 self.line
338 }
339}
340
341pub fn details_comment<'a>(i: &mut Stream) -> ModalResult<HashMap<Bytes, Bytes>> {
342 trace("details_comment", move |input: &mut Stream<'_>| {
343 let mut name: Option<Bytes> = None;
344
345 let mut res: HashMap<Bytes, BytesMut> = HashMap::new();
346
347 let _ = literal("--").parse_next(input)?;
348
349 loop {
350 if name.is_none() {
351 if let Ok(n) = details_tag(input) {
352 name.replace(n.clone());
353 if let Some(_) = res.insert(n, BytesMut::new()) {
354 return Err(ErrMode::Cut(ContextError::new()));
356 }
357 }
358 }
359
360 if let Ok(c) = any::<Partial<&[u8]>, InputError<_>>(input) {
361 let c = c as char;
362
363 if c == '\n' || c == '\r' {
364 break;
365 }
366
367 if c == ';' || c == ',' {
368 name = None;
369 continue;
370 }
371
372 if let Some(k) = &name {
373 let v = &mut res.get_mut(k).ok_or(ErrMode::Cut(ContextError::new()))?;
375
376 v.put_bytes(c as u8, 1);
377 } else {
378 return Err(ErrMode::Cut(ContextError::new()));
380 }
381
382 continue;
383 } else {
384 break;
385 }
386 }
387
388 Ok(res.into_iter().map(|(k, v)| (k, v.freeze())).collect())
389 })
390 .parse_next(i)
391}
392
393pub fn details_tag<'a>(i: &mut Stream) -> ModalResult<Bytes> {
394 trace("details_tag", move |input: &mut Stream<'_>| {
395 let name = seq!(
396 _: multispace0,
397 user_name,
398 _: multispace0,
399 _: alt((literal(":"), literal("="))),
400 _: multispace0,
401 )
402 .parse_next(input)?;
403
404 Ok(name.0.into())
405 })
406 .parse_next(i)
407}
408
/// Values extracted from a `# Query_time:` line by `parse_entry_stats`.
#[derive(Clone, Copy, Debug, PartialEq)]
pub struct StatsLine {
    /// Number following `Query_time:`.
    pub(crate) query_time: f64,
    /// Number following `Lock_time:`.
    pub(crate) lock_time: f64,
    /// Number following `Rows_sent:`.
    pub(crate) rows_sent: u32,
    /// Number following `Rows_examined:`.
    pub(crate) rows_examined: u32,
}

impl StatsLine {
    /// Returns the `Query_time` value.
    pub fn query_time(&self) -> f64 {
        self.query_time
    }

    /// Returns the `Lock_time` value.
    pub fn lock_time(&self) -> f64 {
        self.lock_time
    }

    /// Returns the `Rows_sent` value.
    pub fn rows_sent(&self) -> u32 {
        self.rows_sent
    }

    /// Returns the `Rows_examined` value.
    pub fn rows_examined(&self) -> u32 {
        self.rows_examined
    }
}
441
442pub fn parse_entry_stats(i: &mut Stream<'_>) -> ModalResult<StatsLine> {
444 trace("parse_entry_stats", move |input: &mut Stream<'_>| {
445 let stats = seq! {StatsLine {
446 _: literal("#"),
447 _: multispace1,
448 _: literal("Query_time:"),
449 _: multispace1,
450 query_time: float,
451 _: multispace1,
452 _: literal("Lock_time:"),
453 _: multispace1,
454 lock_time: float,
455 _: multispace1,
456 _: literal("Rows_sent:"),
457 _: multispace1,
458 rows_sent: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
459 _: multispace1,
460 _: literal("Rows_examined:"),
461 _: multispace1,
462 rows_examined: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
463 }}
464 .parse_next(input)?;
465
466 Ok(stats)
467 })
468 .parse_next(i)
469}
470
/// Command name extracted from a `# administrator command:` line by `admin_command`.
#[derive(Clone, Debug, PartialEq)]
pub struct EntryAdminCommand {
    /// The command text found before the terminating `;`.
    pub command: Bytes,
}
477
478pub fn admin_command<'a>(i: &mut Stream) -> ModalResult<EntryAdminCommand> {
480 trace("admin_command", move |input: &mut Stream<'_>| {
481 let command = seq!(
482 _: literal("# administrator command:"),
483 _: multispace1,
484 alphanumerichyphen1,
485 _: literal(";"),
486 )
487 .parse_next(input)?;
488
489 Ok(EntryAdminCommand {
490 command: command.0.to_owned().into(),
491 })
492 })
493 .parse_next(i)
494}
495
496pub fn use_database(i: &mut Stream) -> ModalResult<Bytes> {
498 trace("use_database", move |input: &mut Stream<'_>| {
499 let db_name = seq!(
500 _: literal(Caseless("USE")),
501 _: multispace1,
502 user_name,
503 _: multispace0,
504 _: literal(";"),
505 )
506 .parse_next(input)?;
507
508 Ok(db_name.0.into())
509 })
510 .parse_next(i)
511}
512
513pub fn start_timestamp_command(i: &mut Stream) -> ModalResult<u32> {
515 trace("start_timestamp_command", move |input: &mut Stream<'_>| {
516 let time = seq!(
517 _: literal("SET timestamp"),
518 _: multispace0,
519 _: literal("="),
520 _: multispace0,
521 digit1,
522 _: multispace0,
523 _: literal(";"),
524 )
525 .parse_next(input)?;
526
527 Ok(u32::from_str(str::from_utf8(time.0).unwrap()).unwrap())
528 })
529 .parse_next(i)
530}
531
532pub fn parse_sql(sql: &str, mask: &EntryMasking) -> Result<Vec<Statement>, ParserError> {
538 let mut tokenizer = Tokenizer::new(&MySqlDialect {}, sql);
539 let mut tokens = tokenizer.tokenize()?;
540
541 tokens = mask_tokens(tokens, mask);
542
543 let mut parser = SQLParser::new(&MySqlDialect {}).with_tokens(tokens);
544
545 parser.parse_statements()
546}
547
548pub fn mask_tokens(tokens: Vec<Token>, mask: &EntryMasking) -> Vec<Token> {
552 let mut acc = vec![];
553
554 if mask == &EntryMasking::None {
555 return tokens;
556 }
557
558 for t in tokens {
559 let mt = if let Token::Number(_, _) = t {
560 Token::Placeholder("?".into())
561 } else if let Token::Number(_, _) = t {
562 Token::Placeholder("?".into())
563 } else if let Token::SingleQuotedString(_) = t {
564 Token::Placeholder("?".into())
565 } else if let Token::DoubleQuotedString(_) = t {
566 Token::Placeholder("?".into())
567 } else if let Token::NationalStringLiteral(_) = t {
568 Token::Placeholder("?".into())
569 } else if let Token::EscapedStringLiteral(_) = t {
570 Token::Placeholder("?".into())
571 } else if let Token::HexStringLiteral(_) = t {
572 Token::Placeholder("?".into())
573 } else {
574 t
575 };
576
577 acc.push(mt);
578 }
579
580 acc
581}
582
// Unit tests for the line parsers above. Each test feeds a literal input through one parser
// and compares the result (and, in some cases, the remaining input) with an expected value.
#[cfg(test)]
mod tests {
    use crate::EntryMasking;
    use crate::parser::{
        EntryAdminCommand, HeaderLines, SessionLine, StatsLine, Stream, admin_command,
        details_comment, entry_user, host_name, ip_address, log_header, parse_entry_stats,
        parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
    };
    use bytes::Bytes;
    use std::assert_eq;
    use std::collections::HashMap;
    use winnow_datetime::{Date, DateTime, Offset, Time};

    // A `# Time:` line with a `+0200` offset parses into the matching `DateTime`.
    #[test]
    fn parses_time_line() {
        let i = "# Time: 2015-06-26T16:43:23+0200";

        let expected = DateTime {
            date: Date::YMD {
                year: 2015,
                month: 6,
                day: 26,
            },
            time: Time {
                hour: 16,
                minute: 43,
                second: 23,
                millisecond: 0,
                offset: Some(Offset {
                    offset_hours: 2,
                    offset_minutes: 0,
                }),
            },
        };

        let mut s = Stream::new(i.as_bytes());

        let dt = parse_entry_time(&mut s).unwrap();
        assert_eq!(expected, dt);
    }

    // The `use` keyword is matched case-insensitively and the whole input is consumed.
    #[test]
    fn parses_use_database() {
        let i = "use mysql;";
        let mut s = Stream::new(i.as_bytes());

        let res = use_database(&mut s).unwrap();
        assert_eq!(
            (s, res),
            (Stream::new("".as_bytes()), "mysql".trim().into())
        );
    }

    // A single-label host name; the trailing space is not part of the result.
    #[test]
    fn parses_localhost_host_name() {
        let i = "localhost ";

        let mut s = Stream::new(i.as_bytes());
        let res = host_name(&mut s).unwrap();

        assert_eq!(res, i.trim());
    }

    // A dotted host name is returned with its dots.
    #[test]
    fn parses_full_host_name() {
        let i = "local.tests.rs ";

        let mut s = Stream::new(i.as_bytes());
        let res = host_name(&mut s).unwrap();

        assert_eq!(res, Bytes::from("local.tests.rs".trim()));
    }

    // A dotted-quad address; the trailing space is not part of the result.
    #[test]
    fn parses_ip_address() {
        let i = "127.0.0.2 ";

        let mut s = Stream::new(i.as_bytes());
        let res = ip_address(&mut s).unwrap();

        assert_eq!(res, Bytes::from(i.trim()));
    }

    // Empty brackets yield `ip_address: None`.
    #[test]
    fn parses_user_line_no_ip() {
        let i = "# User@Host: msandbox[msandbox] @ localhost [] Id: 3\n";

        let expected = SessionLine {
            user: Bytes::from("msandbox"),
            sys_user: Bytes::from("msandbox"),
            host: Some(Bytes::from("localhost")),
            ip_address: None,
            thread_id: 3,
        };

        let mut s = Stream::new(i.as_bytes());
        let res = entry_user(&mut s).unwrap();
        assert_eq!(expected, res);
    }

    // A missing host name yields `host: None`.
    #[test]
    fn parses_user_line_no_host() {
        let i = "# User@Host: lobster[lobster] @ [192.168.56.1] Id: 190\n";
        let mut s = Stream::new(i.as_bytes());
        let expected = SessionLine {
            user: Bytes::from("lobster"),
            sys_user: Bytes::from("lobster"),
            host: None,
            ip_address: Some(Bytes::from("192.168.56.1")),
            thread_id: 190,
        };

        let res = entry_user(&mut s).unwrap();
        assert_eq!(expected, res);
    }

    // All four statistics fields are extracted.
    #[test]
    fn parses_stats_line() {
        let i = "# Query_time: 1.000016 Lock_time: 2.000000 Rows_sent: 3 Rows_examined: 4\n";

        let expected = StatsLine {
            query_time: 1.000016,
            lock_time: 2.0,
            rows_sent: 3,
            rows_examined: 4,
        };

        let mut s = Stream::new(i.as_bytes());
        let res = parse_entry_stats(&mut s).unwrap();
        assert_eq!(expected, res);
    }

    // The command name is returned without the terminating `;`.
    #[test]
    fn parses_admin_command_line() {
        let i = "# administrator command: Quit;\n";

        let expected = EntryAdminCommand {
            command: "Quit".into(),
        };

        let mut s = Stream::new(i.as_bytes());
        let res = admin_command(&mut s).unwrap();
        assert_eq!(expected, res);
    }

    // The same three pairs are recognised with `;` or `,` separators and with `:` or `=`
    // between key and value.
    #[test]
    fn parses_details_comment() {
        let s0 = "-- Id: 123; long: some kind of details here; caller: hello_world()\n";
        let s1 = "-- Id: 123, long: some kind of details here, caller : hello_world()\n";
        let s2 = "-- Id= 123, long = some kind of details here, caller= hello_world()\n";

        let expected = (
            Stream::new("".as_bytes()),
            HashMap::from([
                ("Id".into(), "123".into()),
                ("long".into(), "some kind of details here".into()),
                ("caller".into(), "hello_world()".into()),
            ]),
        );

        let mut s = Stream::new(s0.as_bytes());
        let res = details_comment(&mut s).unwrap();
        assert_eq!((s, res), expected);

        let mut s = Stream::new(s1.as_bytes());
        let res = details_comment(&mut s).unwrap();
        assert_eq!((s, res), expected);

        let mut s = Stream::new(s2.as_bytes());
        let res = details_comment(&mut s).unwrap();

        assert_eq!((s, res), expected);
    }

    // A `:` inside a value stays part of that value and does not start a new key.
    #[test]
    fn parses_details_comment_trailing_key() {
        let i = "-- Id: 123, long: some kind of details here, caller: hello_world():52\n";
        let mut s = Stream::new(i.as_bytes());

        let res = details_comment(&mut s).unwrap();

        let expected = (
            Stream::new("".as_bytes()),
            HashMap::from([
                ("Id".into(), "123".into()),
                ("long".into(), "some kind of details here".into()),
                ("caller".into(), "hello_world():52".into()),
            ]),
        );

        assert_eq!((s, res), expected);

        let i = "-- Id: 123, long: some kind of details here, caller: hello_world(): 52\n";
        let mut s = Stream::new(i.as_bytes());

        let res = details_comment(&mut s).unwrap();
        let expected = (
            Stream::new("".as_bytes()),
            HashMap::from([
                ("Id".into(), "123".into()),
                ("long".into(), "some kind of details here".into()),
                ("caller".into(), "hello_world(): 52".into()),
            ]),
        );

        assert_eq!((s, res), expected);
    }

    // The numeric timestamp is returned and the whole input is consumed.
    #[test]
    fn parses_start_timestamp() {
        let l = "SET timestamp=1517798807;";
        let mut s = Stream::new(l.as_bytes());
        let res = start_timestamp_command(&mut s).unwrap();

        let expected = (Stream::new("".as_bytes()), 1517798807);

        assert_eq!((s, res), expected);
    }

    // Two statements differing only in literal values parse to the same AST once masked.
    #[test]
    fn parses_masked_selects() {
        let sql0 = "SELECT a, b, 123, 'abcd', myfunc(b) \
        FROM table_1 \
        WHERE a > b AND b < 100 \
        ORDER BY a DESC, b";

        let sql1 = "SELECT a, b, 456, 'efg', myfunc(b) \
        FROM table_1 \
        WHERE a > b AND b < 1000 \
        ORDER BY a DESC, b";

        let ast0 = parse_sql(sql0, &EntryMasking::PlaceHolder).unwrap();
        let ast1 = parse_sql(sql1, &EntryMasking::PlaceHolder).unwrap();

        assert_eq!(ast0, ast1);
    }

    // A statement ending in `;` is returned in full, terminator included.
    #[test]
    fn parses_select_sql() {
        let sql = "SELECT a, b, 123, 'abcd', myfunc(b) \
        FROM table_1 \
        WHERE a > b AND b < 100 \
        ORDER BY a DESC, b;";

        let mut s = Stream::new(sql.as_bytes());
        let res = sql_lines(&mut s).unwrap();

        assert_eq!(res, sql);
    }

    // The trailing newline after `;` is not part of the returned statement.
    #[test]
    fn parses_setter_sql() {
        let sql = "/*!40101 SET NAMES utf8 */;\n";

        let mut s = Stream::new(sql.as_bytes());
        let res = sql_lines(&mut s).unwrap();

        assert_eq!(res, sql.trim());
    }

    // A `;` inside a quoted string does not terminate the statement; the final newline is
    // left in the stream.
    #[test]
    fn parses_quoted_terminator_sql() {
        let sql = "SELECT
a.actor_id,
a.first_name,
a.last_name,
GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
 (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
 FROM sakila.film f
 INNER JOIN sakila.film_category fc
 ON f.film_id = fc.film_id
 INNER JOIN sakila.film_actor fa
 ON f.film_id = fa.film_id
 WHERE fc.category_id = c.category_id
 AND fa.actor_id = a.actor_id
 )
 )
 ORDER BY c.name SEPARATOR '; ')
AS film_info
FROM sakila.actor a;
";

        let mut s = Stream::new(sql.as_bytes());
        let res = sql_lines(&mut s).unwrap();

        assert_eq!((s, res), (Stream::new("\n".as_bytes()), sql.trim().into()));
    }

    // Backslash-escaped quotes inside a quoted string do not end the string, so the `;`
    // that follows them is still treated as quoted.
    #[test]
    fn parses_quoted_quoted_terminator_sql() {
        let sql = r#"SELECT
a.actor_id,
a.first_name,
a.last_name,
GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
 (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
 FROM sakila.film f
 INNER JOIN sakila.film_category fc
 ON f.film_id = fc.film_id
 INNER JOIN sakila.film_actor fa
 ON f.film_id = fa.film_id
 WHERE fc.category_id = c.category_id
 AND fa.actor_id = a.actor_id
 )
 )
 ORDER BY c.name SEPARATOR '\'\"; ')
AS film_info
FROM sakila.actor a;
"#;

        let mut s = Stream::new(sql.as_bytes());
        let res = sql_lines(&mut s).unwrap();

        assert_eq!(res, sql.trim());
    }

    // The three-line file header is consumed entirely and its version, port and socket are
    // extracted.
    #[test]
    fn parses_header() {
        let h = "/home/karl/mysql/my-5.7/bin/mysqld, Version: 5.7.20-log (MySQL Community Server (GPL)). started with:
Tcp port: 12345 Unix socket: /tmp/12345/mysql_sandbox12345.sock
Time Id Command Argument\n";

        let mut s = Stream::new(h.as_bytes());

        let res = log_header(&mut s).unwrap();

        assert_eq!(
            (s, res),
            (
                Stream::new("".as_bytes()),
                HeaderLines {
                    version: Bytes::from("5.7.20-log (MySQL Community Server (GPL))."),
                    tcp_port: Some(12345),
                    socket: Some(Bytes::from("/tmp/12345/mysql_sandbox12345.sock")),
                }
            )
        );
    }
}