1use crate::EntryMasking;
2use bytes::{BufMut, Bytes, BytesMut};
3use sqlparser::ast::Statement;
4use sqlparser::dialect::MySqlDialect;
5use sqlparser::parser::{Parser as SQLParser, ParserError};
6use sqlparser::tokenizer::{Token, Tokenizer};
7use std::borrow::Cow;
8use std::collections::HashMap;
9use std::ops::Not;
10use std::str;
11use std::str::FromStr;
12use winnow::ascii::{
13 Caseless, alpha1, alphanumeric1, digit1, float, line_ending, multispace0, multispace1,
14 till_line_ending,
15};
16use winnow::combinator::repeat;
17use winnow::combinator::{alt, trace};
18use winnow::combinator::{not, opt};
19use winnow::combinator::{preceded, terminated};
20use winnow::error::{ContextError, ErrMode, InputError};
21use winnow::token::{any, literal, take, take_till, take_until};
22use winnow::{ModalResult, Parser, Partial, seq};
23use winnow_datetime::DateTime;
24use winnow_iso8601::datetime::datetime;
25
26pub type Stream<'i> = Partial<&'i [u8]>;
27
28#[derive(Clone)]
31pub struct TimeLine {
32 time: DateTime,
33}
34
35impl TimeLine {
36 pub fn time(&self) -> DateTime {
38 self.time.clone()
39 }
40}
41
42pub fn parse_entry_time(i: &mut Stream) -> ModalResult<DateTime> {
45 trace("parse_entry_time", move |input: &mut Stream| {
46 let dt = seq!(
47 _: literal("# Time:"),
48 _: multispace1,
49 datetime,
50 )
51 .parse_next(input)?;
52
53 Ok(dt.0)
54 })
55 .parse_next(i)
56}
57
58#[derive(Clone, Debug, Eq, PartialEq)]
61pub struct SessionLine {
62 pub(crate) user: Bytes,
63 pub(crate) sys_user: Bytes,
64 pub(crate) host: Option<Bytes>,
65 pub(crate) ip_address: Option<Bytes>,
66 pub(crate) thread_id: u32,
67}
68
69impl SessionLine {
70 pub fn user(&self) -> Bytes {
72 self.user.clone()
73 }
74
75 pub fn sys_user(&self) -> Bytes {
77 self.sys_user.clone()
78 }
79
80 pub fn host(&self) -> Option<Bytes> {
82 self.host.clone()
83 }
84
85 pub fn ip_address(&self) -> Option<Bytes> {
87 self.ip_address.clone()
88 }
89
90 pub fn thread_id(&self) -> u32 {
92 self.thread_id
93 }
94}
95
96#[derive(Debug, PartialEq, Default)]
97pub struct HeaderLines {
98 version: Bytes,
99 tcp_port: Option<usize>,
100 socket: Option<Bytes>,
101}
102
103pub fn log_header<'a>(i: &mut Stream<'_>) -> ModalResult<HeaderLines> {
104 trace("log_header", move |input: &mut Stream<'_>| {
105 let head = seq!{
107 HeaderLines {
108 _: not(literal("#")),
109 _: take_until(1.., ", Version: "),
110 _: (", Version: "),
111 version: take_until(1.., " started with:").map(|v: &[u8]| v.to_owned().into()),
112 _: literal(" started with:"),
113 _: multispace1,
114 _: literal("Tcp port:"),
115 _: multispace1,
116 tcp_port: opt(digit1).map(|v: Option<&[u8]>| v.and_then(|d| Some(str::from_utf8(d).unwrap().parse().unwrap()))),
117 _: multispace1,
118 _: literal("Unix socket: "),
119 socket: opt(take_till(1.., "\n".as_bytes())).map(|v: Option<&[u8]>| v.and_then(|d| Some(d.to_owned().into()))),
120 _: till_line_ending,
121 _: line_ending,
122 _: till_line_ending,
123 _: line_ending,
124 }
125 }.parse_next(input)?;
126
127 Ok(head)
128 }).parse_next(i)
129}
130
131pub fn sql_lines<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
132 trace("sql_lines", move |input: &mut Stream<'_>| {
133 let mut acc = BytesMut::new();
134
135 let mut escaped = false;
136 let mut quotes = vec![];
137
138 loop {
139 let c = any(input)? as char;
140
141 acc.put_slice(&[c as u8]);
142
143 if escaped.not() && (c == '\'' || c == '\"' || c == '`') {
144 if let Some(q) = quotes.last() {
145 if &c == q {
146 let _ = quotes.pop();
147 } else {
148 quotes.push(c);
149 }
150 } else {
151 quotes.push(c);
152 }
153 }
154
155 if escaped.not() && c == '\\' {
156 escaped = true;
157 } else {
158 escaped = false;
159 }
160
161 if quotes.len() == 0 && c == ';' {
162 return Ok(acc.freeze());
163 }
164 }
165 })
166 .parse_next(i)
167}
168
169pub fn alphanumerichyphen1<'a>(i: &mut Stream<'a>) -> ModalResult<&'a [u8]> {
170 alt((alphanumeric1, literal("_"), literal("-"))).parse_next(i)
171}
172
173pub fn host_name<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
174 trace("host_name", move |input: &mut Stream<'_>| {
175 let (mut first, second): (Vec<&[u8]>, &[u8]) = alt((
176 ((
177 repeat(1.., terminated(alphanumerichyphen1, literal("."))),
178 alpha1,
179 )),
180 ((repeat(1, alphanumerichyphen1), take(0 as usize))),
181 ))
182 .parse_next(input)?;
183
184 if !second.is_empty() {
185 first.push(second);
186 }
187
188 let b = first
189 .iter()
190 .enumerate()
191 .fold(BytesMut::new(), |mut acc, (c, p)| {
192 if c > 0 {
193 acc.put_slice(".".as_bytes());
194 }
195
196 acc.put_slice(p);
197 acc
198 });
199
200 Ok(b.freeze())
201 })
202 .parse_next(i)
203}
204
205pub fn ip_address<'a>(i: &mut Stream<'_>) -> ModalResult<Bytes> {
207 trace("ip_address", move |input: &mut Stream<'_>| {
208 let p = seq!(
209 digit1,
210 preceded(literal("."), digit1),
211 preceded(literal("."), digit1),
212 preceded(literal("."), digit1),
213 )
214 .parse_next(input)?;
215
216 let b = [p.0, p.1, p.2, p.3]
217 .iter()
218 .enumerate()
219 .fold(BytesMut::new(), |mut acc, (c, p)| {
220 if c > 0 {
221 acc.put_slice(".".as_bytes());
222 }
223
224 acc.put_slice(p);
225 acc
226 });
227
228 Ok(b.freeze())
229 })
230 .parse_next(i)
231}
232
233pub fn entry_user_thread_id<'a>(i: &mut Stream<'_>) -> ModalResult<u32> {
235 trace("entry_user_thread_id", move |input: &mut Stream<'_>| {
236 let id = seq!(
237 _: literal("Id:"),
238 _: multispace1,
239 digit1
240 )
241 .parse_next(input)?;
242
243 Ok(u32::from_str(str::from_utf8(id.0).unwrap()).unwrap())
244 })
245 .parse_next(i)
246}
247
248pub fn user_name(i: &mut Stream) -> ModalResult<Bytes> {
249 trace("user_name", move |input: &mut Stream<'_>| {
250 let parts: Vec<&[u8]> =
251 repeat(1.., alt((alphanumeric1, literal("_")))).parse_next(input)?;
252
253 let b = parts.iter().fold(BytesMut::new(), |mut acc, p| {
254 acc.put_slice(p);
255 acc
256 });
257
258 Ok(b.freeze())
259 })
260 .parse_next(i)
261}
262
263pub fn entry_user(i: &mut Stream) -> ModalResult<SessionLine> {
265 trace("entry_user", move |input: &mut Stream<'_>| {
266 let s = seq! { SessionLine {
267 _: multispace0,
268 _: literal("# User@Host:"),
269 _: multispace1,
270 user: user_name,
271 _: literal("["),
272 sys_user: user_name,
273 _: literal("]"),
274 _: multispace1,
275 _: literal("@"),
276 _: multispace1,
277 host: opt(host_name),
278 _: multispace0,
279 _: literal("["),
280 _: multispace0,
281 ip_address: opt(ip_address),
282 _: multispace0,
283 _: literal("]"),
284 _: multispace1,
285 thread_id: entry_user_thread_id,
286 }}
287 .parse_next(input)?;
288
289 Ok(s)
290 })
291 .parse_next(i)
292}
293
294#[derive(Clone, Debug, Default, PartialEq)]
296pub struct SqlStatementContext {
297 pub request_id: Option<Bytes>,
299 pub caller: Option<Bytes>,
301 pub function: Option<Bytes>,
303 pub line: Option<u32>,
305}
306
307impl SqlStatementContext {
308 pub fn request_id(&self) -> Option<Cow<str>> {
310 if let Some(i) = &self.request_id {
311 Some(String::from_utf8_lossy(i.as_ref()))
312 } else {
313 None
314 }
315 }
316
317 pub fn caller(&self) -> Option<Cow<str>> {
319 if let Some(c) = &self.caller {
320 Some(String::from_utf8_lossy(c.as_ref()))
321 } else {
322 None
323 }
324 }
325
326 pub fn function(&self) -> Option<Cow<str>> {
328 if let Some(f) = &self.function {
329 Some(String::from_utf8_lossy(f.as_ref()))
330 } else {
331 None
332 }
333 }
334
335 pub fn line(&self) -> Option<u32> {
337 self.line
338 }
339}
340
341pub fn details_comment<'a>(i: &mut Stream) -> ModalResult<HashMap<Bytes, Bytes>> {
342 trace("details_comment", move |input: &mut Stream<'_>| {
343 let mut name: Option<Bytes> = None;
344
345 let mut res: HashMap<Bytes, BytesMut> = HashMap::new();
346
347 let _ = literal("--").parse_next(input)?;
348
349 loop {
350 if name.is_none() {
351 if let Ok(n) = details_tag(input) {
352 name.replace(n.clone());
353 if let Some(_) = res.insert(n, BytesMut::new()) {
354 return Err(ErrMode::Cut(ContextError::new()));
356 }
357 }
358 }
359
360 if let Ok(c) = any::<Partial<&[u8]>, InputError<_>>(input) {
361 let c = c as char;
362
363 if c == '\n' || c == '\r' {
364 break;
365 }
366
367 if c == ';' || c == ',' {
368 name = None;
369 continue;
370 }
371
372 if let Some(k) = &name {
373 let v = &mut res.get_mut(k).ok_or(ErrMode::Cut(ContextError::new()))?;
375
376 v.put_bytes(c as u8, 1);
377 } else {
378 return Err(ErrMode::Cut(ContextError::new()));
380 }
381
382 continue;
383 } else {
384 break;
385 }
386 }
387
388 Ok(res.into_iter().map(|(k, v)| (k, v.freeze())).collect())
389 })
390 .parse_next(i)
391}
392
393pub fn details_tag<'a>(i: &mut Stream) -> ModalResult<Bytes> {
394 trace("details_tag", move |input: &mut Stream<'_>| {
395 let name = seq!(
396 _: multispace0,
397 user_name,
398 _: multispace0,
399 _: alt((literal(":"), literal("="))),
400 _: multispace0,
401 )
402 .parse_next(input)?;
403
404 Ok(name.0.into())
405 })
406 .parse_next(i)
407}
408
409#[derive(Clone, Copy, Debug, PartialEq)]
411pub struct StatsLine {
412 pub(crate) query_time: f64,
414 pub(crate) lock_time: f64,
416 pub(crate) rows_sent: u32,
418 pub(crate) rows_examined: u32,
420}
421
422impl StatsLine {
423 pub fn query_time(&self) -> f64 {
425 self.query_time.clone()
426 }
427 pub fn lock_time(&self) -> f64 {
429 self.lock_time.clone()
430 }
431
432 pub fn rows_sent(&self) -> u32 {
434 self.rows_sent.clone()
435 }
436 pub fn rows_examined(&self) -> u32 {
438 self.rows_examined.clone()
439 }
440}
441
442pub fn parse_entry_stats(i: &mut Stream<'_>) -> ModalResult<StatsLine> {
444 trace("parse_entry_stats", move |input: &mut Stream<'_>| {
445 let stats = seq! {StatsLine {
446 _: literal("#"),
447 _: multispace1,
448 _: literal("Query_time:"),
449 _: multispace1,
450 query_time: float,
451 _: multispace1,
452 _: literal("Lock_time:"),
453 _: multispace1,
454 lock_time: float,
455 _: multispace1,
456 _: literal("Rows_sent:"),
457 _: multispace1,
458 rows_sent: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
459 _: multispace1,
460 _: literal("Rows_examined:"),
461 _: multispace1,
462 rows_examined: digit1.map(|d| str::from_utf8(d).unwrap().parse().unwrap()),
463 }}
464 .parse_next(input)?;
465
466 Ok(stats)
467 })
468 .parse_next(i)
469}
470
471#[derive(Clone, Debug, PartialEq)]
473pub struct EntryAdminCommand {
474 pub command: Bytes,
476}
477
478pub fn admin_command<'a>(i: &mut Stream) -> ModalResult<EntryAdminCommand> {
480 trace("admin_command", move |input: &mut Stream<'_>| {
481 let command = seq!(
482 _: literal("# administrator command:"),
483 _: multispace1,
484 alphanumerichyphen1,
485 _: literal(";"),
486 )
487 .parse_next(input)?;
488
489 Ok(EntryAdminCommand {
490 command: command.0.to_owned().into(),
491 })
492 })
493 .parse_next(i)
494}
495
496pub fn use_database(i: &mut Stream) -> ModalResult<Bytes> {
498 trace("use_database", move |input: &mut Stream<'_>| {
499 let db_name = seq!(
500 _: literal(Caseless("USE")),
501 _: multispace1,
502 user_name,
503 _: multispace0,
504 _: literal(";"),
505 )
506 .parse_next(input)?;
507
508 Ok(db_name.0.into())
509 })
510 .parse_next(i)
511}
512
513pub fn start_timestamp_command(i: &mut Stream) -> ModalResult<u32> {
515 trace("start_timestamp_command", move |input: &mut Stream<'_>| {
516 let time = seq!(
517 _: literal("SET timestamp"),
518 _: multispace0,
519 _: literal("="),
520 _: multispace0,
521 digit1,
522 _: multispace0,
523 _: literal(";"),
524 )
525 .parse_next(input)?;
526
527 Ok(u32::from_str(str::from_utf8(time.0).unwrap()).unwrap())
528 })
529 .parse_next(i)
530}
531
532pub fn parse_sql(sql: &str, mask: &EntryMasking) -> Result<Vec<Statement>, ParserError> {
538 let mut tokenizer = Tokenizer::new(&MySqlDialect {}, sql);
539 let mut tokens = tokenizer.tokenize()?;
540
541 tokens = mask_tokens(tokens, mask);
542
543 let mut parser = SQLParser::new(&MySqlDialect {}).with_tokens(tokens);
544
545 parser.parse_statements()
546}
547
548pub fn mask_tokens(tokens: Vec<Token>, mask: &EntryMasking) -> Vec<Token> {
552 let mut acc = vec![];
553
554 if mask == &EntryMasking::None {
555 return tokens;
556 }
557
558 for t in tokens {
559 let mt = if let Token::Number(_, _) = t {
560 Token::Placeholder("?".into())
561 } else if let Token::Number(_, _) = t {
562 Token::Placeholder("?".into())
563 } else if let Token::SingleQuotedString(_) = t {
564 Token::Placeholder("?".into())
565 } else if let Token::DoubleQuotedString(_) = t {
566 Token::Placeholder("?".into())
567 } else if let Token::NationalStringLiteral(_) = t {
568 Token::Placeholder("?".into())
569 } else if let Token::EscapedStringLiteral(_) = t {
570 Token::Placeholder("?".into())
571 } else if let Token::HexStringLiteral(_) = t {
572 Token::Placeholder("?".into())
573 } else {
574 t
575 };
576
577 acc.push(mt);
578 }
579
580 acc
581}
582
583#[cfg(test)]
584mod tests {
585 use crate::EntryMasking;
586 use crate::parser::{
587 EntryAdminCommand, HeaderLines, SessionLine, StatsLine, Stream, admin_command,
588 details_comment, entry_user, host_name, ip_address, log_header, parse_entry_stats,
589 parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
590 };
591 use bytes::Bytes;
592 use std::assert_eq;
593 use std::collections::HashMap;
594 use winnow_datetime::{Date, DateTime, Offset, Time};
595
596 #[test]
597 fn parses_time_line() {
598 let i = "# Time: 2015-06-26T16:43:23+0200";
599
600 let expected = DateTime {
601 date: Date::YMD {
602 year: 2015,
603 month: 6,
604 day: 26,
605 },
606 time: Time {
607 hour: 16,
608 minute: 43,
609 second: 23,
610 millisecond: 0,
611 offset: Some(Offset::Fixed {
612 hours: 2,
613 minutes: 0,
614 critical: false,
615 }),
616 time_zone: None,
617 calendar: None,
618 },
619 };
620
621 let mut s = Stream::new(i.as_bytes());
622
623 let dt = parse_entry_time(&mut s).unwrap();
625 assert_eq!(expected, dt);
626 }
627
628 #[test]
629 fn parses_use_database() {
630 let i = "use mysql;";
631 let mut s = Stream::new(i.as_bytes());
632
633 let res = use_database(&mut s).unwrap();
634 assert_eq!(
635 (s, res),
636 (Stream::new("".as_bytes()), "mysql".trim().into())
637 );
638 }
639
640 #[test]
641 fn parses_localhost_host_name() {
642 let i = "localhost ";
643
644 let mut s = Stream::new(i.as_bytes());
645 let res = host_name(&mut s).unwrap();
646
647 assert_eq!(res, i.trim());
648 }
649
650 #[test]
651 fn parses_full_host_name() {
652 let i = "local.tests.rs ";
653
654 let mut s = Stream::new(i.as_bytes());
655 let res = host_name(&mut s).unwrap();
656
657 assert_eq!(res, Bytes::from("local.tests.rs".trim()));
658 }
659
660 #[test]
661 fn parses_ip_address() {
662 let i = "127.0.0.2 ";
663
664 let mut s = Stream::new(i.as_bytes());
665 let res = ip_address(&mut s).unwrap();
666
667 assert_eq!(res, Bytes::from(i.trim()));
668 }
669
670 #[test]
671 fn parses_user_line_no_ip() {
672 let i = "# User@Host: msandbox[msandbox] @ localhost [] Id: 3\n";
673
674 let expected = SessionLine {
675 user: Bytes::from("msandbox"),
676 sys_user: Bytes::from("msandbox"),
677 host: Some(Bytes::from("localhost")),
678 ip_address: None,
679 thread_id: 3,
680 };
681
682 let mut s = Stream::new(i.as_bytes());
683 let res = entry_user(&mut s).unwrap();
684 assert_eq!(expected, res);
686 }
687
688 #[test]
689 fn parses_user_line_no_host() {
690 let i = "# User@Host: lobster[lobster] @ [192.168.56.1] Id: 190\n";
691 let mut s = Stream::new(i.as_bytes());
692 let expected = SessionLine {
693 user: Bytes::from("lobster"),
694 sys_user: Bytes::from("lobster"),
695 host: None,
696 ip_address: Some(Bytes::from("192.168.56.1")),
697 thread_id: 190,
698 };
699
700 let res = entry_user(&mut s).unwrap();
701 assert_eq!(expected, res);
702 }
703
704 #[test]
705 fn parses_stats_line() {
706 let i = "# Query_time: 1.000016 Lock_time: 2.000000 Rows_sent: 3 Rows_examined: 4\n";
707
708 let expected = StatsLine {
709 query_time: 1.000016,
710 lock_time: 2.0,
711 rows_sent: 3,
712 rows_examined: 4,
713 };
714
715 let mut s = Stream::new(i.as_bytes());
716 let res = parse_entry_stats(&mut s).unwrap();
717 assert_eq!(expected, res);
719 }
720
721 #[test]
722 fn parses_admin_command_line() {
723 let i = "# administrator command: Quit;\n";
724
725 let expected = EntryAdminCommand {
726 command: "Quit".into(),
727 };
728
729 let mut s = Stream::new(i.as_bytes());
730 let res = admin_command(&mut s).unwrap();
732 assert_eq!(expected, res);
733 }
734
735 #[test]
736 fn parses_details_comment() {
737 let s0 = "-- Id: 123; long: some kind of details here; caller: hello_world()\n";
738 let s1 = "-- Id: 123, long: some kind of details here, caller : hello_world()\n";
739 let s2 = "-- Id= 123, long = some kind of details here, caller= hello_world()\n";
740
741 let expected = (
742 Stream::new("".as_bytes()),
743 HashMap::from([
744 ("Id".into(), "123".into()),
745 ("long".into(), "some kind of details here".into()),
746 ("caller".into(), "hello_world()".into()),
747 ]),
748 );
749
750 let mut s = Stream::new(s0.as_bytes());
751 let res = details_comment(&mut s).unwrap();
752 assert_eq!((s, res), expected);
754
755 let mut s = Stream::new(s1.as_bytes());
756 let res = details_comment(&mut s).unwrap();
757 assert_eq!((s, res), expected);
758
759 let mut s = Stream::new(s2.as_bytes());
760 let res = details_comment(&mut s).unwrap();
761
762 assert_eq!((s, res), expected);
763 }
764
765 #[test]
766 fn parses_details_comment_trailing_key() {
767 let i = "-- Id: 123, long: some kind of details here, caller: hello_world():52\n";
768 let mut s = Stream::new(i.as_bytes());
769
770 let res = details_comment(&mut s).unwrap();
771
772 let expected = (
773 Stream::new("".as_bytes()),
774 HashMap::from([
775 ("Id".into(), "123".into()),
776 ("long".into(), "some kind of details here".into()),
777 ("caller".into(), "hello_world():52".into()),
778 ]),
779 );
780
781 assert_eq!((s, res), expected);
782
783 let i = "-- Id: 123, long: some kind of details here, caller: hello_world(): 52\n";
784 let mut s = Stream::new(i.as_bytes());
785
786 let res = details_comment(&mut s).unwrap();
787 let expected = (
788 Stream::new("".as_bytes()),
789 HashMap::from([
790 ("Id".into(), "123".into()),
791 ("long".into(), "some kind of details here".into()),
792 ("caller".into(), "hello_world(): 52".into()),
793 ]),
794 );
795
796 assert_eq!((s, res), expected);
797 }
798
799 #[test]
800 fn parses_start_timestamp() {
801 let l = "SET timestamp=1517798807;";
802 let mut s = Stream::new(l.as_bytes());
803 let res = start_timestamp_command(&mut s).unwrap();
804
805 let expected = (Stream::new("".as_bytes()), 1517798807);
806
807 assert_eq!((s, res), expected);
808 }
809
810 #[test]
811 fn parses_masked_selects() {
812 let sql0 = "SELECT a, b, 123, 'abcd', myfunc(b) \
813 FROM table_1 \
814 WHERE a > b AND b < 100 \
815 ORDER BY a DESC, b";
816
817 let sql1 = "SELECT a, b, 456, 'efg', myfunc(b) \
818 FROM table_1 \
819 WHERE a > b AND b < 1000 \
820 ORDER BY a DESC, b";
821
822 let ast0 = parse_sql(sql0, &EntryMasking::PlaceHolder).unwrap();
823 let ast1 = parse_sql(sql1, &EntryMasking::PlaceHolder).unwrap();
824
825 assert_eq!(ast0, ast1);
826 }
827
828 #[test]
829 fn parses_select_sql() {
830 let sql = "SELECT a, b, 123, 'abcd', myfunc(b) \
831 FROM table_1 \
832 WHERE a > b AND b < 100 \
833 ORDER BY a DESC, b;";
834
835 let mut s = Stream::new(sql.as_bytes());
836 let res = sql_lines(&mut s).unwrap();
837
838 assert_eq!(res, sql);
839 }
840
841 #[test]
842 fn parses_setter_sql() {
843 let sql = "/*!40101 SET NAMES utf8 */;\n";
844
845 let mut s = Stream::new(sql.as_bytes());
846 let res = sql_lines(&mut s).unwrap();
847
848 assert_eq!(res, sql.trim());
849 }
850
851 #[test]
852 fn parses_quoted_terminator_sql() {
853 let sql = "SELECT
854a.actor_id,
855a.first_name,
856a.last_name,
857GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
858 (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
859 FROM sakila.film f
860 INNER JOIN sakila.film_category fc
861 ON f.film_id = fc.film_id
862 INNER JOIN sakila.film_actor fa
863 ON f.film_id = fa.film_id
864 WHERE fc.category_id = c.category_id
865 AND fa.actor_id = a.actor_id
866 )
867 )
868 ORDER BY c.name SEPARATOR '; ')
869AS film_info
870FROM sakila.actor a;
871";
872
873 let mut s = Stream::new(sql.as_bytes());
874 let res = sql_lines(&mut s).unwrap();
875
876 assert_eq!((s, res), (Stream::new("\n".as_bytes()), sql.trim().into()));
877 }
878
879 #[test]
880 fn parses_quoted_quoted_terminator_sql() {
881 let sql = r#"SELECT
882a.actor_id,
883a.first_name,
884a.last_name,
885GROUP_CONCAT(DISTINCT CONCAT(c.name, ': ',
886 (SELECT GROUP_CONCAT(f.title ORDER BY f.title SEPARATOR ', ')
887 FROM sakila.film f
888 INNER JOIN sakila.film_category fc
889 ON f.film_id = fc.film_id
890 INNER JOIN sakila.film_actor fa
891 ON f.film_id = fa.film_id
892 WHERE fc.category_id = c.category_id
893 AND fa.actor_id = a.actor_id
894 )
895 )
896 ORDER BY c.name SEPARATOR '\'\"; ')
897AS film_info
898FROM sakila.actor a;
899"#;
900
901 let mut s = Stream::new(sql.as_bytes());
902 let res = sql_lines(&mut s).unwrap();
903
904 assert_eq!(res, sql.trim());
905 }
906
907 #[test]
908 fn parses_header() {
909 let h = "/home/karl/mysql/my-5.7/bin/mysqld, Version: 5.7.20-log (MySQL Community Server (GPL)). started with:
910Tcp port: 12345 Unix socket: /tmp/12345/mysql_sandbox12345.sock
911Time Id Command Argument\n";
912
913 let mut s = Stream::new(h.as_bytes());
914
915 let res = log_header(&mut s).unwrap();
916
917 assert_eq!(
918 (s, res),
919 (
920 Stream::new("".as_bytes()),
921 HeaderLines {
922 version: Bytes::from("5.7.20-log (MySQL Community Server (GPL))."),
923 tcp_port: Some(12345),
924 socket: Some(Bytes::from("/tmp/12345/mysql_sandbox12345.sock")),
925 }
926 )
927 );
928 }
929}