1use crate::codec::EntryError::MissingField;
2use crate::parser::{
3 HeaderLines, Stream, admin_command, details_comment, entry_user, log_header, parse_entry_stats,
4 parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
5};
6use crate::types::EntryStatement::SqlStatement;
7use crate::types::{Entry, EntryCall, EntrySqlAttributes, EntrySqlStatement, EntryStatement};
8use crate::{EntryCodecConfig, SessionLine, SqlStatementContext, StatsLine};
9use bytes::{Bytes, BytesMut};
10use log::debug;
11use std::default::Default;
12use std::fmt::{Display, Formatter};
13use std::ops::AddAssign;
14use thiserror::Error;
15use tokio::io;
16use tokio_util::codec::Decoder;
17use winnow::ModalResult;
18use winnow::Parser;
19use winnow::ascii::multispace0;
20use winnow::combinator::opt;
21use winnow::error::ErrMode;
22use winnow::stream::AsBytes;
23use winnow::stream::Stream as _;
24use winnow_datetime::DateTime;
25
/// Maximum frame length, in bytes, accepted by the decoder; anything larger
/// is rejected with an `InvalidData` error.
const LENGTH_MAX: usize = 10_000_000_000;
27
28#[derive(Error, Debug)]
30pub enum EntryError {
31 #[error("entry field is missing: {0}")]
33 MissingField(String),
34 #[error("duplicate id: {0}")]
36 DuplicateId(String),
37}
38
39#[derive(Debug, Error)]
41pub enum CodecError {
42 #[error("file read error: {0}")]
44 IO(#[from] io::Error),
45 #[error("found start of new entry before entry completed at line: {0}")]
47 IncompleteEntry(EntryError),
48}
49
/// The part of a log entry the codec expects to parse next.
///
/// `EntryCodec::parse_next` walks these stages in declaration order, starting
/// from [`CodecExpect::Header`] (the default).
#[derive(Debug, Default)]
enum CodecExpect {
    /// Optional header lines preceding the entry.
    #[default]
    Header,
    /// The entry's time line.
    Time,
    /// The user/host line.
    User,
    /// The statistics line.
    Stats,
    /// An optional `use <database>` statement.
    UseDatabase,
    /// The `SET timestamp=...` statement.
    StartTimeStamp,
    /// The SQL (or administrator command) itself.
    Sql,
}

impl Display for CodecExpect {
    /// Writes a short, human-readable name for the stage.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let out = match self {
            CodecExpect::Header => "header",
            CodecExpect::Time => "time",
            CodecExpect::User => "user",
            CodecExpect::Stats => "stats",
            CodecExpect::UseDatabase => "use database",
            CodecExpect::StartTimeStamp => "start time stamp statement",
            CodecExpect::Sql => "sql statement",
        };
        // Like the previous `write!(f, "{}", out)`, this ignores width/fill flags.
        f.write_str(out)
    }
}
81
82#[derive(Debug, Default)]
83struct EntryContext {
84 expects: CodecExpect,
85 headers: HeaderLines,
86 time: Option<DateTime>,
87 user: Option<SessionLine>,
88 stats: Option<StatsLine>,
89 set_timestamp: Option<u32>,
90 attributes: Option<EntrySqlAttributes>,
91}
92
93impl EntryContext {
94 fn complete(&mut self) -> Result<Entry, EntryError> {
95 let time = self.time.clone().ok_or(MissingField("time".into()))?;
96 let session = self.user.clone().ok_or(MissingField("user".into()))?;
97 let stats = self.stats.clone().ok_or(MissingField("stats".into()))?;
98 let set_timestamp = self
99 .set_timestamp
100 .clone()
101 .ok_or(MissingField("set timestamp".into()))?;
102 let attributes = self.attributes.clone().ok_or(MissingField("sql".into()))?;
103 let e = Entry {
104 call: EntryCall::new(time, set_timestamp),
105 session: session.into(),
106 stats: stats.into(),
107 sql_attributes: attributes,
108 };
109
110 self.reset();
111
112 Ok(e)
113 }
114
115 fn reset(&mut self) {
116 *self = EntryContext::default();
117 }
118}
119
120#[derive(Debug, Default)]
122pub struct EntryCodec {
123 processed: usize,
124 context: EntryContext,
125 config: EntryCodecConfig,
126}
127
128impl EntryCodec {
129 pub fn new(c: EntryCodecConfig) -> Self {
131 Self {
132 config: c,
133 ..Default::default()
134 }
135 }
136 fn parse_next<'b>(&mut self, i: &mut Stream<'b>) -> ModalResult<Option<Entry>> {
138 let entry = match self.context.expects {
139 CodecExpect::Header => {
140 let _ = multispace0(i)?;
141
142 let res = opt(log_header).parse_next(i)?;
143 self.context.expects = CodecExpect::Time;
144 self.context.headers = res.unwrap_or_default();
145
146 None
147 }
148 CodecExpect::Time => {
149 let _ = multispace0(i)?;
150
151 let dt = parse_entry_time(i)?;
152 self.context.time = Some(dt);
153 self.context.expects = CodecExpect::User;
154 None
155 }
156 CodecExpect::User => {
157 let sl = entry_user(i)?;
158 self.context.user = Some(sl);
159 self.context.expects = CodecExpect::Stats;
160 None
161 }
162 CodecExpect::Stats => {
163 let _ = multispace0(i)?;
164 let st = parse_entry_stats(i)?;
165 self.context.stats = Some(st);
166 self.context.expects = CodecExpect::UseDatabase;
167 None
168 }
169 CodecExpect::UseDatabase => {
170 let _ = multispace0(i)?;
171 let _ = opt(use_database).parse_next(i)?;
172
173 self.context.expects = CodecExpect::StartTimeStamp;
174 None
175 }
176 CodecExpect::StartTimeStamp => {
177 let _ = multispace0(i)?;
178 let st = start_timestamp_command(i)?;
179 self.context.set_timestamp = Some(st.into());
180 self.context.expects = CodecExpect::Sql;
181 None
182 }
183 CodecExpect::Sql => {
184 let _ = multispace0(i)?;
185
186 if let Ok(c) = admin_command(i) {
187 self.context.attributes = Some(EntrySqlAttributes {
188 sql: (c.command.clone()),
189 statement: EntryStatement::AdminCommand(c),
190 });
191 } else {
192 let mut details = None;
193
194 if let Ok(Some(d)) = opt(details_comment).parse_next(i) {
195 details = Some(d);
196 }
197
198 let mut sql_lines = sql_lines(i)?;
199
200 let s = if let Ok(s) =
201 parse_sql(&String::from_utf8_lossy(&sql_lines), &self.config.masking)
202 {
203 if s.len() == 1 {
204 let context: Option<SqlStatementContext> = if let Some(d) = details {
205 if let Some(f) = &self.config.map_comment_context {
206 f(d)
208 } else {
209 None
210 }
211 } else {
212 None
213 };
214
215 let s = EntrySqlStatement {
216 statement: s[0].clone(),
217 context,
218 };
219
220 sql_lines = Bytes::from(s.statement.to_string());
221 SqlStatement(s)
222 } else {
223 EntryStatement::InvalidStatement(
224 String::from_utf8_lossy(&sql_lines).to_string(),
225 )
226 }
227 } else {
228 EntryStatement::InvalidStatement(
229 String::from_utf8_lossy(&sql_lines).to_string(),
230 )
231 };
232
233 self.context.attributes = Some(EntrySqlAttributes {
234 sql: sql_lines,
235 statement: s,
237 });
238 }
239
240 let e = self.context.complete().unwrap();
241 Some(e)
242 }
243 };
244
245 return if let Some(e) = entry {
246 self.processed.add_assign(1);
247
248 Ok(Some(e))
249 } else {
250 Ok(None)
251 };
252 }
253}
254
255impl Decoder for EntryCodec {
256 type Item = Entry;
257 type Error = CodecError;
258
259 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
261 if src.len() < 4 {
262 return Ok(None);
264 }
265
266 let mut length_bytes = [0u8; 4];
268 length_bytes.copy_from_slice(&src[..4]);
269 let length = u32::from_le_bytes(length_bytes) as usize;
270
271 if length > LENGTH_MAX {
274 return Err(io::Error::new(
275 io::ErrorKind::InvalidData,
276 format!("Frame of length {} is too large.", length),
277 )
278 .into());
279 }
280
281 let b = &src.split()[..];
282 let mut i = Stream::new(&b);
283
284 let mut start = i.checkpoint();
285
286 loop {
287 if i.len() == 0 {
288 return Ok(None);
289 };
290
291 match self.parse_next(&mut i) {
292 Ok(e) => {
293 if let Some(e) = e {
294 self.context = EntryContext::default();
295
296 src.extend_from_slice(i.as_bytes());
297
298 return Ok(Some(e));
299 } else {
300 debug!("preparing input for next parser\n");
301
302 start = i.checkpoint();
303
304 continue;
305 }
306 }
307 Err(ErrMode::Incomplete(_)) => {
308 i.reset(&start);
309 src.extend_from_slice(i.as_bytes());
310
311 return Ok(None);
312 }
313 Err(ErrMode::Backtrack(e)) => {
314 panic!(
315 "unhandled parser backtrack error after {:#?} processed: {}",
316 e.to_string(),
317 self.processed
318 );
319 }
320 Err(ErrMode::Cut(e)) => {
321 panic!(
322 "unhandled parser cut error after {:#?} processed: {}",
323 e.to_string(),
324 self.processed
325 );
326 }
327 }
328 }
329 }
330
331 fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
335 match self.decode(buf)? {
336 Some(frame) => Ok(Some(frame)),
337 None => {
338 let p = buf.iter().position(|v| !v.is_ascii_whitespace());
339
340 if p.is_none() {
341 Ok(None)
342 } else {
343 let out = format!(
344 "bytes remaining on stream; {}",
345 std::str::from_utf8(buf).unwrap()
346 );
347 Err(io::Error::new(io::ErrorKind::Other, out).into())
348 }
349 }
350 }
351 }
352}
353
/// Integration tests that drive `EntryCodec` through `tokio_util::codec::Framed`.
#[cfg(test)]
mod tests {
    use crate::codec::EntryCodec;
    use crate::parser::parse_sql;
    use crate::types::EntryStatement::SqlStatement;
    use crate::types::{
        Entry, EntryCall, EntrySession, EntrySqlAttributes, EntrySqlStatement,
        EntrySqlStatementObject, EntryStatement, EntryStats,
    };
    use crate::{EntryCodecConfig, EntryMasking, SqlStatementContext};
    use bytes::{Bytes, BytesMut};
    use futures::StreamExt;
    use std::default::Default;
    use std::io::Cursor;
    use std::ops::AddAssign;

    use tokio::fs::File;
    use tokio_util::codec::Framed;
    use winnow::error::InputError;
    use winnow_iso8601::datetime::datetime;

    // Decodes one entry that has a `use` statement and a details comment.
    // `map_comment_context` maps the comment's key/value pairs into a
    // `SqlStatementContext`. The test then compares every field of the
    // decoded `Entry`.
    #[tokio::test]
    async fn parses_select_entry() {
        let sql_comment = "-- request_id: apLo5wdqkmKw4W7vGfiBc5, file: src/endpoints/original/mod\
        .rs, method: notifications(), line: 38";
        let sql = "SELECT film.film_id AS FID, film.title AS title, film.description AS \
        description, category.name AS category, film.rental_rate AS price FROM category LEFT JOIN \
        film_category ON category.category_id = film_category.category_id LEFT JOIN film ON \
        film_category.film_id = film.film_id GROUP BY film.film_id, category.name;";
        let mut time = "2018-02-05T02:46:47.273Z";

        // Raw log text for a single entry; continuation lines start at column 0.
        let entry = format!(
            "# Time: {}
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
use mysql;
SET timestamp=1517798807;
{}
{},
",
            time, sql_comment, sql
        );

        let mut eb = entry.as_bytes().to_vec();

        // Maps the details comment into a context; returns None when no
        // recognised key was present.
        let config = EntryCodecConfig {
            masking: Default::default(),
            map_comment_context: Some(|d| {
                let acc = SqlStatementContext {
                    request_id: d
                        .get(&*BytesMut::from("request_id"))
                        .and_then(|b| Some(b.clone())),
                    caller: d
                        .get(&*BytesMut::from("file"))
                        .and_then(|b| Some(b.clone())),
                    function: d
                        .get(&*BytesMut::from("method"))
                        .and_then(|b| Some(b.clone())),
                    line: d
                        .get(&*BytesMut::from("line"))
                        .and_then(|b| String::from_utf8_lossy(b).parse().ok()),
                };

                if acc == SqlStatementContext::default() {
                    None
                } else {
                    Some(acc)
                }
            }),
        };

        let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::new(config));
        let e = ff.next().await.unwrap().unwrap();

        let stmts = parse_sql(sql, &EntryMasking::None).unwrap();

        let expected_stmt = EntrySqlStatement {
            statement: stmts.get(0).unwrap().clone(),
            context: Some(SqlStatementContext {
                request_id: Some("apLo5wdqkmKw4W7vGfiBc5".into()),
                caller: Some("src/endpoints/original/mod.rs".into()),
                function: Some("notifications()".into()),
                line: Some(38),
            }),
        };

        // The decoded `sql` is expected to equal the input without its trailing `;`.
        let expected_sql = sql.trim().strip_suffix(";").unwrap();

        let expected_entry = Entry {
            call: EntryCall::new(
                datetime::<_, InputError<_>>(&mut time).unwrap(),
                1517798807,
            ),
            session: EntrySession {
                user_name: Bytes::from("msandbox"),
                sys_user_name: Bytes::from("msandbox"),
                host_name: Some(Bytes::from("localhost")),
                ip_address: None,
                thread_id: 10,
            },
            stats: EntryStats {
                query_time: 0.000352,
                lock_time: 0.0,
                rows_sent: 0,
                rows_examined: 0,
            },
            sql_attributes: EntrySqlAttributes {
                sql: Bytes::from(expected_sql),
                statement: SqlStatement(expected_stmt),
            },
        };

        assert_eq!(e, expected_entry);
    }

    // Decodes three consecutive entries through a 4-byte initial buffer, so
    // entries are split across many reads. Exactly one entry (presumably the
    // `/*!40101 ... */` one) is expected to be an InvalidStatement.
    #[tokio::test]
    async fn parses_multiple_entries() {
        let entries = "# Time: 2018-02-05T02:46:47.273786Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798807;
-- ID: 123, caller: hello_world()
SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT JOIN film ON film_category.film_id = film.film_id
GROUP BY film.film_id, category.name;
# Time: 2018-02-05T02:46:47.273787Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798808;
/*!40101 SET NAMES utf8 */;
# Time: 2018-02-05T02:46:47.273788Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798809;
-- ID: 456, caller: hello_world()
SELECT film2.film_id AS FID, film2.title AS title, film2.description AS description, category.name
AS category, film2.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
JOIN film2 ON film_category.film_id = film2.film_id
GROUP BY film2.film_id, category.name;
";

        let mut eb = entries.as_bytes().to_vec();

        let mut ff = Framed::with_capacity(Cursor::new(&mut eb), EntryCodec::default(), 4);

        let mut found = 0;
        let mut invalid = 0;

        while let Some(res) = ff.next().await {
            let e = res.unwrap();
            found.add_assign(1);

            if let EntryStatement::InvalidStatement(_) = e.sql_attributes.statement {
                invalid.add_assign(1);
            }
        }

        assert_eq!(found, 3, "found");
        assert_eq!(invalid, 1, "valid");
    }

    // Checks the tables referenced by a multi-join SELECT, including a
    // schema-qualified one, and the reported statement type.
    #[tokio::test]
    async fn parses_select_objects() {
        let sql = String::from("SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
 FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
 JOIN film ON film_category.film_id = film.film_id LEFT JOIN film AS dupe_film ON film_category
 .film_id = dupe_film.film_id LEFT JOIN other.film AS other_film ON other_film.film_id =
 film_category.film_id
 GROUP BY film.film_id, category.name;");

        let entry = format!(
            "# Time: 2018-02-05T02:46:47.273786Z
 # User@Host: msandbox[msandbox] @ localhost [] Id: 10
 # Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
 SET timestamp=1517798807;
 {}",
            sql
        );

        // Expected objects; the aliased duplicate `film` does not appear twice.
        let expected = vec![
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "category".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "film".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: None,
                object_name: "film_category".as_bytes().into(),
            },
            EntrySqlStatementObject {
                schema_name: Some("other".as_bytes().into()),
                object_name: "film".as_bytes().into(),
            },
        ];

        let mut eb = entry.as_bytes().to_vec();

        let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::default());
        let e = ff.next().await.unwrap().unwrap();

        match e.sql_attributes.statement() {
            SqlStatement(s) => {
                assert_eq!(s.objects(), expected);
                assert_eq!(s.sql_type().to_string(), "SELECT".to_string());
            }
            _ => {
                panic!("should have parsed sql as SqlStatement")
            }
        }
    }

    // Decodes the fixture file with the default buffer capacity and expects
    // 310 entries.
    #[tokio::test]
    async fn parse_log_file() {
        let f = File::open("assets/slow-test-queries.log").await.unwrap();
        let mut ff = Framed::new(f, EntryCodec::default());

        let mut i = 0;

        while let Some(res) = ff.next().await {
            let _ = res.unwrap();
            i.add_assign(1);
        }

        assert_eq!(i, 310);
    }

    // Same fixture as above, but with a 4-byte initial buffer so that entries
    // arrive in many small pieces and exercise the incremental (Incomplete) path.
    #[tokio::test]
    async fn parse_mysql_log_file_small_capacity() {
        let f = File::open("assets/slow-test-queries.log").await.unwrap();
        let mut ff = Framed::with_capacity(f, EntryCodec::default(), 4);

        let mut i = 0;

        while let Some(res) = ff.next().await {
            let _ = res.unwrap();
            i.add_assign(1);
        }

        assert_eq!(i, 310);
    }
}