use crate::{EntryAdminCommand, SessionLine, SqlStatementContext, StatsLine};
use bytes::{BufMut, Bytes, BytesMut};
use sqlparser::ast::{Statement, visit_relations};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::fmt::{Display, Formatter};
use std::ops::ControlFlow;
use winnow_datetime::DateTime;
/// A single log entry, grouping the call timing, the session that issued it,
/// its execution statistics and the SQL that was run.
#[derive(Clone, Debug, PartialEq)]
pub struct Entry {
/// Timing information for the call (see `EntryCall`).
pub call: EntryCall,
/// User, host and thread details of the session (see `EntrySession`).
pub session: EntrySession,
/// Query/lock times and row counters (see `EntryStats`).
pub stats: EntryStats,
/// The raw SQL bytes together with its classified statement.
pub sql_attributes: EntrySqlAttributes,
}
/// Convenience accessors that forward to the entry's `call`, `session` and
/// `stats` components.
impl Entry {
    /// Returns a copy of the time recorded in the entry's call information.
    pub fn log_time(&self) -> DateTime {
        self.call.log_time()
    }

    /// Returns the session's user name, lossily decoded as UTF-8.
    pub fn user_name(&self) -> Cow<str> {
        self.session.user_name()
    }

    /// Returns the session's user name as raw bytes.
    pub fn user_name_bytes(&self) -> Bytes {
        self.session.user_name_bytes()
    }

    /// Returns the session's system user name, lossily decoded as UTF-8.
    pub fn sys_user_name(&self) -> Cow<str> {
        self.session.sys_user_name()
    }

    /// Returns the session's system user name as raw bytes.
    pub fn sys_user_name_bytes(&self) -> Bytes {
        self.session.sys_user_name_bytes()
    }

    /// Returns the session's host name, lossily decoded as UTF-8, if one is set.
    pub fn host_name(&self) -> Option<Cow<str>> {
        self.session.host_name()
    }

    /// Returns the session's host name as raw bytes, if one is set.
    pub fn host_name_bytes(&self) -> Option<Bytes> {
        self.session.host_name_bytes()
    }

    /// Returns the session's IP address, lossily decoded as UTF-8, if one is set.
    pub fn ip_address(&self) -> Option<Cow<'_, str>> {
        self.session.ip_address()
    }

    /// Returns the session's IP address as raw bytes, if one is set.
    pub fn ip_address_bytes(&self) -> Option<Bytes> {
        self.session.ip_address_bytes()
    }

    /// Returns the session's thread id.
    pub fn thread_id(&self) -> u32 {
        self.session.thread_id()
    }

    /// Borrows the entry's statistics.
    pub fn stats(&self) -> &EntryStats {
        &self.stats
    }

    /// Returns the `query_time` value from the entry's statistics.
    pub fn query_time(&self) -> f64 {
        self.stats.query_time()
    }

    /// Returns the `lock_time` value from the entry's statistics.
    pub fn lock_time(&self) -> f64 {
        self.stats.lock_time()
    }

    /// Returns the `rows_sent` counter from the entry's statistics.
    pub fn rows_sent(&self) -> u32 {
        self.stats.rows_sent()
    }

    /// Returns the `rows_examined` counter from the entry's statistics.
    pub fn rows_examined(&self) -> u32 {
        self.stats.rows_examined()
    }
}
/// A parsed SQL statement plus an optional context value attached to it.
#[derive(Clone, Debug, PartialEq)]
pub struct EntrySqlStatement {
/// The parsed statement (`sqlparser::ast::Statement`).
pub statement: Statement,
/// Optional context associated with the statement; `None` when built via
/// `From<Statement>`.
pub context: Option<SqlStatementContext>,
}
impl EntrySqlStatement {
    /// Returns a clone of the context attached to this statement, if any.
    pub fn sql_context(&self) -> Option<SqlStatementContext> {
        self.context.clone()
    }

    /// Collects the distinct relations referenced by the statement.
    ///
    /// A relation name with exactly two parts is split into schema and object
    /// name. For any other non-empty name only the last part is kept, as the
    /// object name with no schema. An empty name is skipped rather than
    /// causing a panic.
    ///
    /// The result contains no duplicates and is ordered by
    /// `EntrySqlStatementObject`'s derived ordering.
    pub fn objects(&self) -> Vec<EntrySqlStatementObject> {
        let mut seen = BTreeSet::new();
        // The visitor never breaks, so its `ControlFlow` result carries no information.
        let _ = visit_relations(&self.statement, |relation| {
            let object = match relation.0.as_slice() {
                [schema, name] => Some(EntrySqlStatementObject {
                    schema_name: Some(Bytes::from(schema.to_string())),
                    object_name: Bytes::from(name.to_string()),
                }),
                [.., name] => Some(EntrySqlStatementObject {
                    schema_name: None,
                    object_name: Bytes::from(name.to_string()),
                }),
                [] => None,
            };
            seen.extend(object);
            ControlFlow::<()>::Continue(())
        });
        seen.into_iter().collect()
    }

    /// Classifies the statement into an `EntrySqlType`.
    ///
    /// Statement kinds without a dedicated mapping yield `EntrySqlType::Unknown`.
    pub fn sql_type(&self) -> EntrySqlType {
        match &self.statement {
            Statement::Query(_) => EntrySqlType::Query,
            Statement::Insert { .. } => EntrySqlType::Insert,
            Statement::Update { .. } => EntrySqlType::Update,
            Statement::Delete { .. } => EntrySqlType::Delete,
            Statement::CreateTable { .. } => EntrySqlType::CreateTable,
            Statement::CreateIndex { .. } => EntrySqlType::CreateIndex,
            Statement::CreateView { .. } => EntrySqlType::CreateView,
            Statement::AlterTable { .. } => EntrySqlType::AlterTable,
            Statement::AlterIndex { .. } => EntrySqlType::AlterIndex,
            Statement::Drop { .. } => EntrySqlType::Drop,
            Statement::DropFunction { .. } => EntrySqlType::DropFunction,
            Statement::Set { .. } => EntrySqlType::Set,
            Statement::ShowVariable { .. } => EntrySqlType::ShowVariable,
            Statement::ShowVariables { .. } => EntrySqlType::ShowVariables,
            Statement::ShowCreate { .. } => EntrySqlType::ShowCreate,
            Statement::ShowColumns { .. } => EntrySqlType::ShowColumns,
            Statement::ShowTables { .. } => EntrySqlType::ShowTables,
            Statement::ShowCollation { .. } => EntrySqlType::ShowCollation,
            Statement::Use { .. } => EntrySqlType::Use,
            Statement::StartTransaction { .. } => EntrySqlType::StartTransaction,
            Statement::Commit { .. } => EntrySqlType::Commit,
            Statement::Rollback { .. } => EntrySqlType::Rollback,
            Statement::CreateSchema { .. } => EntrySqlType::CreateSchema,
            Statement::CreateDatabase { .. } => EntrySqlType::CreateDatabase,
            Statement::Grant { .. } => EntrySqlType::Grant,
            Statement::Revoke { .. } => EntrySqlType::Revoke,
            Statement::Kill { .. } => EntrySqlType::Kill,
            Statement::ExplainTable { .. } => EntrySqlType::ExplainTable,
            Statement::Explain { .. } => EntrySqlType::Explain,
            Statement::Savepoint { .. } => EntrySqlType::Savepoint,
            Statement::LockTables { .. } => EntrySqlType::LockTables,
            // Previously reported as `LockTables`, which made UNLOCK TABLES
            // indistinguishable from LOCK TABLES.
            Statement::UnlockTables { .. } => EntrySqlType::UnlockTables,
            Statement::Flush { .. } => EntrySqlType::Flush,
            _ => EntrySqlType::Unknown,
        }
    }
}
impl From<Statement> for EntrySqlStatement {
fn from(statement: Statement) -> Self {
EntrySqlStatement {
statement,
context: None,
}
}
}
/// A database object referenced by a statement, with an optional schema qualifier.
///
/// The derived ordering compares `schema_name` first and `object_name` second,
/// so the field order here is significant.
#[derive(Clone, Debug, Ord, PartialOrd, PartialEq, Eq)]
pub struct EntrySqlStatementObject {
/// Schema qualifier as raw bytes, when the name was schema-qualified.
pub schema_name: Option<Bytes>,
/// Unqualified object name as raw bytes.
pub object_name: Bytes,
}
impl EntrySqlStatementObject {
pub fn schema_name(&self) -> Option<Cow<str>> {
if let Some(v) = &self.schema_name {
Some(String::from_utf8_lossy(v.as_ref()))
} else {
None
}
}
pub fn schema_name_bytes(&self) -> Option<Bytes> {
self.schema_name.clone()
}
pub fn object_name(&self) -> Cow<str> {
String::from_utf8_lossy(self.object_name.as_ref())
}
pub fn object_name_bytes(&self) -> Bytes {
self.object_name.clone()
}
pub fn full_object_name_bytes(&self) -> Bytes {
let mut s = if let Some(n) = self.schema_name.clone() {
let mut s = BytesMut::from(n.as_ref());
s.put_slice(b".");
s
} else {
BytesMut::new()
};
s.put_slice(self.object_name.as_ref());
s.freeze()
}
pub fn full_object_name(&self) -> Cow<'_, str> {
String::from_utf8_lossy(self.full_object_name_bytes().as_ref())
.to_string()
.into()
}
}
/// The classified form of an entry's SQL text.
#[derive(Clone, Debug, PartialEq)]
pub enum EntryStatement {
/// An administrative command rather than a SQL statement.
AdminCommand(EntryAdminCommand),
/// A successfully parsed SQL statement.
SqlStatement(EntrySqlStatement),
/// A statement that is not represented as parsed SQL; the `String` payload
/// presumably carries the text or reason — TODO confirm against the producer.
InvalidStatement(String),
}
/// Accessors that only yield a value for the `SqlStatement` variant.
impl EntryStatement {
    /// Returns the objects referenced by the statement, or `None` when this is
    /// not a `SqlStatement`.
    pub fn objects(&self) -> Option<Vec<EntrySqlStatementObject>> {
        if let Self::SqlStatement(sql) = self {
            Some(sql.objects())
        } else {
            None
        }
    }

    /// Returns the statement's type, or `None` when this is not a `SqlStatement`.
    pub fn sql_type(&self) -> Option<EntrySqlType> {
        if let Self::SqlStatement(sql) = self {
            Some(sql.sql_type())
        } else {
            None
        }
    }

    /// Returns the statement's context, or `None` when this is not a
    /// `SqlStatement` or the statement carries no context.
    pub fn sql_context(&self) -> Option<SqlStatementContext> {
        if let Self::SqlStatement(sql) = self {
            sql.sql_context()
        } else {
            None
        }
    }
}
/// The kinds of SQL statement an entry can be classified as.
///
/// `Unknown` covers every statement without a dedicated variant.
#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
pub enum EntrySqlType {
    Query,
    Insert,
    Update,
    Delete,
    CreateTable,
    CreateIndex,
    CreateView,
    AlterTable,
    AlterIndex,
    Drop,
    DropFunction,
    Set,
    ShowVariable,
    ShowVariables,
    ShowCreate,
    ShowColumns,
    ShowTables,
    ShowCollation,
    Use,
    StartTransaction,
    SetTransaction,
    Commit,
    Rollback,
    CreateSchema,
    CreateDatabase,
    Grant,
    Revoke,
    Kill,
    ExplainTable,
    Explain,
    Savepoint,
    LockTables,
    UnlockTables,
    Flush,
    Unknown,
}

/// Renders each variant as a fixed upper-case SQL label.
///
/// Width and alignment flags on the caller's format spec are not applied.
impl Display for EntrySqlType {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.write_str(match self {
            Self::Query => "SELECT",
            Self::Insert => "INSERT",
            Self::Update => "UPDATE",
            Self::Delete => "DELETE",
            Self::CreateTable => "CREATE TABLE",
            Self::CreateIndex => "CREATE INDEX",
            Self::CreateView => "CREATE VIEW",
            Self::AlterTable => "ALTER TABLE",
            Self::AlterIndex => "ALTER INDEX",
            // NOTE(review): always rendered as "DROP TABLE", whatever object kind
            // the DROP statement targeted.
            Self::Drop => "DROP TABLE",
            Self::DropFunction => "DROP FUNCTION",
            Self::Set => "SET",
            Self::ShowVariable => "SHOW VARIABLE",
            Self::ShowVariables => "SHOW VARIABLES",
            Self::ShowCreate => "SHOW CREATE TABLE",
            Self::ShowColumns => "SHOW COLUMNS",
            Self::ShowTables => "SHOW TABLES",
            Self::ShowCollation => "SHOW COLLATION",
            Self::Use => "USE",
            Self::StartTransaction => "BEGIN TRANSACTION",
            Self::SetTransaction => "SET TRANSACTION",
            Self::Commit => "COMMIT TRANSACTION",
            Self::Rollback => "ROLLBACK TRANSACTION",
            Self::CreateSchema => "CREATE SCHEMA",
            Self::CreateDatabase => "CREATE DATABASE",
            Self::Grant => "GRANT",
            Self::Revoke => "REVOKE",
            Self::Kill => "KILL",
            Self::ExplainTable => "EXPLAIN TABLE",
            Self::Explain => "EXPLAIN",
            Self::Savepoint => "SAVEPOINT",
            Self::LockTables => "LOCK TABLES",
            Self::UnlockTables => "UNLOCK TABLES",
            Self::Flush => "FLUSH",
            Self::Unknown => "NULL",
        })
    }
}
/// Session details attached to an entry; textual values are kept as raw bytes.
#[derive(Clone, Debug, PartialEq)]
pub struct EntrySession {
/// User name as raw bytes.
pub user_name: Bytes,
/// System user name as raw bytes.
pub sys_user_name: Bytes,
/// Host name as raw bytes, when one is present.
pub host_name: Option<Bytes>,
/// IP address as raw bytes, when one is present.
pub ip_address: Option<Bytes>,
/// Thread id of the session.
pub thread_id: u32,
}
impl From<SessionLine> for EntrySession {
fn from(line: SessionLine) -> Self {
Self {
user_name: line.user,
sys_user_name: line.sys_user,
host_name: line.host,
ip_address: line.ip_address,
thread_id: line.thread_id,
}
}
}
impl EntrySession {
pub fn user_name(&self) -> Cow<str> {
String::from_utf8_lossy(&self.user_name)
}
pub fn user_name_bytes(&self) -> Bytes {
self.user_name.clone()
}
pub fn sys_user_name(&self) -> Cow<str> {
String::from_utf8_lossy(&self.sys_user_name)
}
pub fn sys_user_name_bytes(&self) -> Bytes {
self.sys_user_name.clone()
}
pub fn host_name(&self) -> Option<Cow<str>> {
if let Some(v) = &self.host_name {
Some(String::from_utf8_lossy(v.as_ref()))
} else {
None
}
}
pub fn host_name_bytes(&self) -> Option<Bytes> {
self.host_name.clone()
}
pub fn ip_address(&self) -> Option<Cow<'_, str>> {
if let Some(v) = &self.ip_address {
Some(String::from_utf8_lossy(v.as_ref()))
} else {
None
}
}
pub fn ip_address_bytes(&self) -> Option<Bytes> {
self.ip_address.clone()
}
pub fn thread_id(&self) -> u32 {
self.thread_id
}
}
/// The SQL text of an entry alongside its classified statement.
#[derive(Clone, Debug, PartialEq)]
pub struct EntrySqlAttributes {
/// The SQL text as raw bytes.
pub sql: Bytes,
/// The classified form of the SQL (see `EntryStatement`).
pub statement: EntryStatement,
}
impl EntrySqlAttributes {
pub fn sql_bytes(&self) -> Bytes {
self.sql.clone()
}
pub fn sql(&self) -> Cow<'_, str> {
String::from_utf8_lossy(self.sql.as_ref())
}
pub fn sql_type(&self) -> Option<EntrySqlType> {
self.statement.sql_type()
}
pub fn objects(&self) -> Option<Vec<EntrySqlStatementObject>> {
self.statement.objects()
}
pub fn statement(&self) -> &EntryStatement {
&self.statement
}
}
/// Timing information recorded for an entry.
#[derive(Clone, Debug, PartialEq)]
pub struct EntryCall {
/// The time recorded for the entry.
pub log_time: DateTime,
/// A `u32` timestamp value; presumably taken from a `SET timestamp`
/// statement in the log — TODO confirm against the parser.
pub set_timestamp: u32,
}
impl EntryCall {
pub fn new(log_time: DateTime, set_timestamp: u32) -> Self {
Self {
log_time,
set_timestamp,
}
}
pub fn log_time(&self) -> DateTime {
self.log_time.clone()
}
pub fn set_timestamp(&self) -> u32 {
self.set_timestamp
}
}
/// Execution statistics recorded for an entry.
///
/// The derived `PartialOrd` compares fields in declaration order.
#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
pub struct EntryStats {
    /// Recorded query time.
    pub query_time: f64,
    /// Recorded lock time.
    pub lock_time: f64,
    /// Recorded rows-sent counter.
    pub rows_sent: u32,
    /// Recorded rows-examined counter.
    pub rows_examined: u32,
}

/// By-value getters for each statistic.
impl EntryStats {
    /// Returns the stored `query_time`.
    pub fn query_time(&self) -> f64 {
        let Self { query_time, .. } = *self;
        query_time
    }

    /// Returns the stored `lock_time`.
    pub fn lock_time(&self) -> f64 {
        let Self { lock_time, .. } = *self;
        lock_time
    }

    /// Returns the stored `rows_sent`.
    pub fn rows_sent(&self) -> u32 {
        let Self { rows_sent, .. } = *self;
        rows_sent
    }

    /// Returns the stored `rows_examined`.
    pub fn rows_examined(&self) -> u32 {
        let Self { rows_examined, .. } = *self;
        rows_examined
    }
}
impl From<StatsLine> for EntryStats {
fn from(line: StatsLine) -> Self {
Self {
query_time: line.query_time,
lock_time: line.lock_time,
rows_sent: line.rows_sent,
rows_examined: line.rows_examined,
}
}
}
/// Optional context values for an entry; every field may be absent.
// NOTE(review): presumably describes where in the calling application the query
// originated — confirm against the code that populates it.
#[derive(Clone, Debug, PartialEq)]
pub struct EntryContext {
/// Request identifier as raw bytes, if present.
pub request_id: Option<Bytes>,
/// Caller as raw bytes, if present.
pub caller: Option<Bytes>,
/// Function as raw bytes, if present.
pub function: Option<Bytes>,
/// Line number, if present.
pub line: Option<u32>,
}
impl EntryContext {
    /// Returns the request id, lossily decoded as UTF-8, if one is set.
    pub fn request_id(&self) -> Option<Cow<str>> {
        self.request_id.as_deref().map(String::from_utf8_lossy)
    }

    /// Returns the request id as raw bytes, if one is set.
    pub fn request_id_bytes(&self) -> Option<Bytes> {
        self.request_id.as_ref().cloned()
    }

    /// Returns the caller, lossily decoded as UTF-8, if one is set.
    pub fn caller(&self) -> Option<Cow<str>> {
        self.caller.as_deref().map(String::from_utf8_lossy)
    }

    /// Returns the caller as raw bytes, if one is set.
    pub fn caller_bytes(&self) -> Option<Bytes> {
        self.caller.as_ref().cloned()
    }

    /// Returns the function, lossily decoded as UTF-8, if one is set.
    pub fn function(&self) -> Option<Cow<str>> {
        self.function.as_deref().map(String::from_utf8_lossy)
    }

    /// Returns the function as raw bytes, if one is set.
    pub fn function_bytes(&self) -> Option<Bytes> {
        self.function.as_ref().cloned()
    }

    /// Returns the line number, if one is set.
    pub fn line(&self) -> Option<u32> {
        let Self { line, .. } = self;
        *line
    }
}