use crate::codec::EntryError::MissingField;
use crate::parser::{
HeaderLines, Stream, admin_command, details_comment, entry_user, log_header, parse_entry_stats,
parse_entry_time, parse_sql, sql_lines, start_timestamp_command, use_database,
};
use crate::types::EntryStatement::SqlStatement;
use crate::types::{Entry, EntryCall, EntrySqlAttributes, EntrySqlStatement, EntryStatement};
use crate::{EntryCodecConfig, SessionLine, SqlStatementContext, StatsLine};
use bytes::{Bytes, BytesMut};
use log::debug;
use std::default::Default;
use std::fmt::{Display, Formatter};
use std::ops::AddAssign;
use thiserror::Error;
use tokio::io;
use tokio_util::codec::Decoder;
use winnow::ModalResult;
use winnow::Parser;
use winnow::ascii::multispace0;
use winnow::combinator::opt;
use winnow::error::ErrMode;
use winnow::stream::AsBytes;
use winnow::stream::Stream as _;
use winnow_datetime::DateTime;
const LENGTH_MAX: usize = 10000000000;
#[derive(Error, Debug)]
pub enum EntryError {
#[error("entry field is missing: {0}")]
MissingField(String),
#[error("duplicate id: {0}")]
DuplicateId(String),
}
#[derive(Debug, Error)]
pub enum CodecError {
#[error("file read error: {0}")]
IO(#[from] io::Error),
#[error("found start of new entry before entry completed at line: {0}")]
IncompleteEntry(EntryError),
}
#[derive(Debug)]
enum CodecExpect {
Header,
Time,
User,
Stats,
UseDatabase,
StartTimeStamp,
Sql,
}
impl Default for CodecExpect {
fn default() -> Self {
Self::Header
}
}
impl Display for CodecExpect {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
let out = match self {
CodecExpect::Header => "header",
CodecExpect::Time => "time",
CodecExpect::User => "user",
CodecExpect::Stats => "stats",
CodecExpect::UseDatabase => "use database",
CodecExpect::StartTimeStamp => "start time stamp statement",
CodecExpect::Sql => "sql statement",
};
write!(f, "{}", out)
}
}
#[derive(Debug, Default)]
struct EntryContext {
expects: CodecExpect,
headers: HeaderLines,
time: Option<DateTime>,
user: Option<SessionLine>,
stats: Option<StatsLine>,
set_timestamp: Option<u32>,
attributes: Option<EntrySqlAttributes>,
}
impl EntryContext {
fn complete(&mut self) -> Result<Entry, EntryError> {
let time = self.time.clone().ok_or(MissingField("time".into()))?;
let session = self.user.clone().ok_or(MissingField("user".into()))?;
let stats = self.stats.clone().ok_or(MissingField("stats".into()))?;
let set_timestamp = self
.set_timestamp
.clone()
.ok_or(MissingField("set timestamp".into()))?;
let attributes = self.attributes.clone().ok_or(MissingField("sql".into()))?;
let e = Entry {
call: EntryCall::new(time, set_timestamp),
session: session.into(),
stats: stats.into(),
sql_attributes: attributes,
};
self.reset();
Ok(e)
}
fn reset(&mut self) {
*self = EntryContext::default();
}
}
#[derive(Debug, Default)]
pub struct EntryCodec {
processed: usize,
context: EntryContext,
config: EntryCodecConfig,
}
impl EntryCodec {
pub fn new(c: EntryCodecConfig) -> Self {
Self {
config: c,
..Default::default()
}
}
fn parse_next<'b>(&mut self, i: &mut Stream<'b>) -> ModalResult<Option<Entry>> {
let entry = match self.context.expects {
CodecExpect::Header => {
let _ = multispace0(i)?;
let res = opt(log_header).parse_next(i)?;
self.context.expects = CodecExpect::Time;
self.context.headers = res.unwrap_or_default();
None
}
CodecExpect::Time => {
let _ = multispace0(i)?;
let dt = parse_entry_time(i)?;
self.context.time = Some(dt);
self.context.expects = CodecExpect::User;
None
}
CodecExpect::User => {
let sl = entry_user(i)?;
self.context.user = Some(sl);
self.context.expects = CodecExpect::Stats;
None
}
CodecExpect::Stats => {
let _ = multispace0(i)?;
let st = parse_entry_stats(i)?;
self.context.stats = Some(st);
self.context.expects = CodecExpect::UseDatabase;
None
}
CodecExpect::UseDatabase => {
let _ = multispace0(i)?;
let _ = opt(use_database).parse_next(i)?;
self.context.expects = CodecExpect::StartTimeStamp;
None
}
CodecExpect::StartTimeStamp => {
let _ = multispace0(i)?;
let st = start_timestamp_command(i)?;
self.context.set_timestamp = Some(st.into());
self.context.expects = CodecExpect::Sql;
None
}
CodecExpect::Sql => {
let _ = multispace0(i)?;
if let Ok(c) = admin_command(i) {
self.context.attributes = Some(EntrySqlAttributes {
sql: (c.command.clone()),
statement: EntryStatement::AdminCommand(c),
});
} else {
let mut details = None;
if let Ok(Some(d)) = opt(details_comment).parse_next(i) {
details = Some(d);
}
let mut sql_lines = sql_lines(i)?;
let s = if let Ok(s) =
parse_sql(&String::from_utf8_lossy(&sql_lines), &self.config.masking)
{
if s.len() == 1 {
let context: Option<SqlStatementContext> = if let Some(d) = details {
if let Some(f) = &self.config.map_comment_context {
f(d)
} else {
None
}
} else {
None
};
let s = EntrySqlStatement {
statement: s[0].clone(),
context,
};
sql_lines = Bytes::from(s.statement.to_string());
SqlStatement(s)
} else {
EntryStatement::InvalidStatement(
String::from_utf8_lossy(&sql_lines).to_string(),
)
}
} else {
EntryStatement::InvalidStatement(
String::from_utf8_lossy(&sql_lines).to_string(),
)
};
self.context.attributes = Some(EntrySqlAttributes {
sql: sql_lines,
statement: s,
});
}
let e = self.context.complete().unwrap();
Some(e)
}
};
return if let Some(e) = entry {
self.processed.add_assign(1);
Ok(Some(e))
} else {
Ok(None)
};
}
}
impl Decoder for EntryCodec {
type Item = Entry;
type Error = CodecError;
fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
if src.len() < 4 {
return Ok(None);
}
let mut length_bytes = [0u8; 4];
length_bytes.copy_from_slice(&src[..4]);
let length = u32::from_le_bytes(length_bytes) as usize;
if length > LENGTH_MAX {
return Err(io::Error::new(
io::ErrorKind::InvalidData,
format!("Frame of length {} is too large.", length),
)
.into());
}
let b = &src.split()[..];
let mut i = Stream::new(&b);
let mut start = i.checkpoint();
loop {
if i.len() == 0 {
return Ok(None);
};
match self.parse_next(&mut i) {
Ok(e) => {
if let Some(e) = e {
self.context = EntryContext::default();
src.extend_from_slice(i.as_bytes());
return Ok(Some(e));
} else {
debug!("preparing input for next parser\n");
start = i.checkpoint();
continue;
}
}
Err(ErrMode::Incomplete(_)) => {
i.reset(&start);
src.extend_from_slice(i.as_bytes());
return Ok(None);
}
Err(ErrMode::Backtrack(e)) => {
panic!(
"unhandled parser backtrack error after {:#?} processed: {}",
e.to_string(),
self.processed
);
}
Err(ErrMode::Cut(e)) => {
panic!(
"unhandled parser cut error after {:#?} processed: {}",
e.to_string(),
self.processed
);
}
}
}
}
fn decode_eof(&mut self, buf: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
match self.decode(buf)? {
Some(frame) => Ok(Some(frame)),
None => {
let p = buf.iter().position(|v| !v.is_ascii_whitespace());
if p.is_none() {
Ok(None)
} else {
let out = format!(
"bytes remaining on stream; {}",
std::str::from_utf8(buf).unwrap()
);
Err(io::Error::new(io::ErrorKind::Other, out).into())
}
}
}
}
}
#[cfg(test)]
mod tests {
use crate::codec::EntryCodec;
use crate::parser::parse_sql;
use crate::types::EntryStatement::SqlStatement;
use crate::types::{
Entry, EntryCall, EntrySession, EntrySqlAttributes, EntrySqlStatement,
EntrySqlStatementObject, EntryStatement, EntryStats,
};
use crate::{EntryCodecConfig, EntryMasking, SqlStatementContext};
use bytes::{Bytes, BytesMut};
use futures::StreamExt;
use std::default::Default;
use std::io::Cursor;
use std::ops::AddAssign;
use tokio::fs::File;
use tokio_util::codec::Framed;
use winnow::error::InputError;
use winnow_iso8601::datetime::datetime;
#[tokio::test]
async fn parses_select_entry() {
let sql_comment = "-- request_id: apLo5wdqkmKw4W7vGfiBc5, file: src/endpoints/original/mod\
.rs, method: notifications(), line: 38";
let sql = "SELECT film.film_id AS FID, film.title AS title, film.description AS \
description, category.name AS category, film.rental_rate AS price FROM category LEFT JOIN \
film_category ON category.category_id = film_category.category_id LEFT JOIN film ON \
film_category.film_id = film.film_id GROUP BY film.film_id, category.name;";
let mut time = "2018-02-05T02:46:47.273Z";
let entry = format!(
"# Time: {}
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
use mysql;
SET timestamp=1517798807;
{}
{},
",
time, sql_comment, sql
);
let mut eb = entry.as_bytes().to_vec();
let config = EntryCodecConfig {
masking: Default::default(),
map_comment_context: Some(|d| {
let acc = SqlStatementContext {
request_id: d
.get(&*BytesMut::from("request_id"))
.and_then(|b| Some(b.clone())),
caller: d
.get(&*BytesMut::from("file"))
.and_then(|b| Some(b.clone())),
function: d
.get(&*BytesMut::from("method"))
.and_then(|b| Some(b.clone())),
line: d
.get(&*BytesMut::from("line"))
.and_then(|b| String::from_utf8_lossy(b).parse().ok()),
};
if acc == SqlStatementContext::default() {
None
} else {
Some(acc)
}
}),
};
let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::new(config));
let e = ff.next().await.unwrap().unwrap();
let stmts = parse_sql(sql, &EntryMasking::None).unwrap();
let expected_stmt = EntrySqlStatement {
statement: stmts.get(0).unwrap().clone(),
context: Some(SqlStatementContext {
request_id: Some("apLo5wdqkmKw4W7vGfiBc5".into()),
caller: Some("src/endpoints/original/mod.rs".into()),
function: Some("notifications()".into()),
line: Some(38),
}),
};
let expected_sql = sql.trim().strip_suffix(";").unwrap();
let expected_entry = Entry {
call: EntryCall::new(
datetime::<_, InputError<_>>(&mut time).unwrap(),
1517798807,
),
session: EntrySession {
user_name: Bytes::from("msandbox"),
sys_user_name: Bytes::from("msandbox"),
host_name: Some(Bytes::from("localhost")),
ip_address: None,
thread_id: 10,
},
stats: EntryStats {
query_time: 0.000352,
lock_time: 0.0,
rows_sent: 0,
rows_examined: 0,
},
sql_attributes: EntrySqlAttributes {
sql: Bytes::from(expected_sql),
statement: SqlStatement(expected_stmt),
},
};
assert_eq!(e, expected_entry);
}
#[tokio::test]
async fn parses_multiple_entries() {
let entries = "# Time: 2018-02-05T02:46:47.273786Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798807;
-- ID: 123, caller: hello_world()
SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT JOIN film ON film_category.film_id = film.film_id
GROUP BY film.film_id, category.name;
# Time: 2018-02-05T02:46:47.273787Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798808;
/*!40101 SET NAMES utf8 */;
# Time: 2018-02-05T02:46:47.273788Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798809;
-- ID: 456, caller: hello_world()
SELECT film2.film_id AS FID, film2.title AS title, film2.description AS description, category.name
AS category, film2.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
JOIN film2 ON film_category.film_id = film2.film_id
GROUP BY film2.film_id, category.name;
";
let mut eb = entries.as_bytes().to_vec();
let mut ff = Framed::with_capacity(Cursor::new(&mut eb), EntryCodec::default(), 4);
let mut found = 0;
let mut invalid = 0;
while let Some(res) = ff.next().await {
let e = res.unwrap();
found.add_assign(1);
if let EntryStatement::InvalidStatement(_) = e.sql_attributes.statement {
invalid.add_assign(1);
}
}
assert_eq!(found, 3, "found");
assert_eq!(invalid, 1, "valid");
}
#[tokio::test]
async fn parses_select_objects() {
let sql = String::from("SELECT film.film_id AS FID, film.title AS title, film.description AS description, category.name AS category, film.rental_rate AS price
FROM category LEFT JOIN film_category ON category.category_id = film_category.category_id LEFT
JOIN film ON film_category.film_id = film.film_id LEFT JOIN film AS dupe_film ON film_category
.film_id = dupe_film.film_id LEFT JOIN other.film AS other_film ON other_film.film_id =
film_category.film_id
GROUP BY film.film_id, category.name;");
let entry = format!(
"# Time: 2018-02-05T02:46:47.273786Z
# User@Host: msandbox[msandbox] @ localhost [] Id: 10
# Query_time: 0.000352 Lock_time: 0.000000 Rows_sent: 0 Rows_examined: 0
SET timestamp=1517798807;
{}",
sql
);
let expected = vec![
EntrySqlStatementObject {
schema_name: None,
object_name: "category".as_bytes().into(),
},
EntrySqlStatementObject {
schema_name: None,
object_name: "film".as_bytes().into(),
},
EntrySqlStatementObject {
schema_name: None,
object_name: "film_category".as_bytes().into(),
},
EntrySqlStatementObject {
schema_name: Some("other".as_bytes().into()),
object_name: "film".as_bytes().into(),
},
];
let mut eb = entry.as_bytes().to_vec();
let mut ff = Framed::new(Cursor::new(&mut eb), EntryCodec::default());
let e = ff.next().await.unwrap().unwrap();
match e.sql_attributes.statement() {
SqlStatement(s) => {
assert_eq!(s.objects(), expected);
assert_eq!(s.sql_type().to_string(), "SELECT".to_string());
}
_ => {
panic!("should have parsed sql as SqlStatement")
}
}
}
#[tokio::test]
async fn parse_log_file() {
let f = File::open("assets/slow-test-queries.log").await.unwrap();
let mut ff = Framed::new(f, EntryCodec::default());
let mut i = 0;
while let Some(res) = ff.next().await {
let _ = res.unwrap();
i.add_assign(1);
}
assert_eq!(i, 310);
}
#[tokio::test]
async fn parse_mysql_log_file_small_capacity() {
let f = File::open("assets/slow-test-queries.log").await.unwrap();
let mut ff = Framed::with_capacity(f, EntryCodec::default(), 4);
let mut i = 0;
while let Some(res) = ff.next().await {
let _ = res.unwrap();
i.add_assign(1);
}
assert_eq!(i, 310);
}
}