1use crate::{EntryAdminCommand, SessionLine, SqlStatementContext, StatsLine};
2use bytes::{BufMut, Bytes, BytesMut};
3use sqlparser::ast::{Statement, visit_relations};
4use std::borrow::Cow;
5use std::collections::BTreeSet;
6use std::fmt::{Display, Formatter};
7use std::ops::ControlFlow;
8use winnow_datetime::DateTime;
9
10#[derive(Clone, Debug, PartialEq)]
12pub struct Entry {
13 pub call: EntryCall,
15 pub session: EntrySession,
17 pub stats: EntryStats,
19 pub sql_attributes: EntrySqlAttributes,
21}
22
23impl Entry {
24 pub fn log_time(&self) -> DateTime {
26 self.call.log_time.clone()
27 }
28
29 pub fn user_name(&self) -> Cow<str> {
31 String::from_utf8_lossy(&self.session.user_name)
32 }
33
34 pub fn user_name_bytes(&self) -> Bytes {
36 self.session.user_name.clone()
37 }
38
39 pub fn sys_user_name(&self) -> Cow<str> {
41 String::from_utf8_lossy(&self.session.sys_user_name)
42 }
43
44 pub fn sys_user_name_bytes(&self) -> Bytes {
46 self.session.sys_user_name.clone()
47 }
48
49 pub fn host_name(&self) -> Option<Cow<str>> {
51 if let Some(v) = &self.session.host_name {
52 Some(String::from_utf8_lossy(v.as_ref()))
53 } else {
54 None
55 }
56 }
57
58 pub fn host_name_bytes(&self) -> Option<Bytes> {
60 self.session.host_name_bytes()
61 }
62
63 pub fn ip_address(&self) -> Option<Cow<'_, str>> {
65 self.session.ip_address()
66 }
67
68 pub fn ip_address_bytes(&self) -> Option<Bytes> {
70 self.session.ip_address_bytes()
71 }
72
73 pub fn thread_id(&self) -> u32 {
75 self.session.thread_id()
76 }
77
78 pub fn stats(&self) -> &EntryStats {
80 &self.stats
81 }
82
83 pub fn query_time(&self) -> f64 {
85 self.stats.query_time()
86 }
87
88 pub fn lock_time(&self) -> f64 {
90 self.stats.lock_time()
91 }
92
93 pub fn rows_sent(&self) -> u32 {
95 self.stats.rows_sent()
96 }
97
98 pub fn rows_examined(&self) -> u32 {
100 self.stats.rows_examined()
101 }
102}
103
104#[derive(Clone, Debug, PartialEq)]
105pub struct EntrySqlStatement {
107 pub statement: Statement,
109 pub context: Option<SqlStatementContext>,
110}
111
112impl EntrySqlStatement {
113 pub fn sql_context(&self) -> Option<SqlStatementContext> {
114 self.context.clone()
115 }
116
117 pub fn objects(&self) -> Vec<EntrySqlStatementObject> {
118 let mut visited = BTreeSet::new();
119
120 let _ = visit_relations(&self.statement, |relation| {
121 let ident = &relation.0;
122
123 let _ = visited.insert(if ident.len() == 2 {
124 EntrySqlStatementObject {
125 schema_name: Some(Bytes::from(ident[0].to_string())),
126 object_name: Bytes::from(ident[1].to_string()),
127 }
128 } else {
129 EntrySqlStatementObject {
130 schema_name: None,
131 object_name: ident.last().unwrap().to_string().to_owned().into(),
132 }
133 });
134
135 ControlFlow::<()>::Continue(())
136 });
137 visited.into_iter().collect()
138 }
139
140 pub fn sql_type(&self) -> EntrySqlType {
141 match self.statement {
142 Statement::Query(_) => EntrySqlType::Query,
143 Statement::Insert { .. } => EntrySqlType::Insert,
144 Statement::Update { .. } => EntrySqlType::Update,
145 Statement::Delete { .. } => EntrySqlType::Delete,
146 Statement::CreateTable { .. } => EntrySqlType::CreateTable,
147 Statement::CreateIndex { .. } => EntrySqlType::CreateIndex,
148 Statement::CreateView { .. } => EntrySqlType::CreateView,
149 Statement::AlterTable { .. } => EntrySqlType::AlterTable,
150 Statement::AlterIndex { .. } => EntrySqlType::AlterIndex,
151 Statement::Drop { .. } => EntrySqlType::Drop,
152 Statement::DropFunction { .. } => EntrySqlType::DropFunction,
153 Statement::Set { .. } => EntrySqlType::Set,
154 Statement::ShowVariable { .. } => EntrySqlType::ShowVariable,
155 Statement::ShowVariables { .. } => EntrySqlType::ShowVariables,
156 Statement::ShowCreate { .. } => EntrySqlType::ShowCreate,
157 Statement::ShowColumns { .. } => EntrySqlType::ShowColumns,
158 Statement::ShowTables { .. } => EntrySqlType::ShowTables,
159 Statement::ShowCollation { .. } => EntrySqlType::ShowCollation,
160 Statement::Use { .. } => EntrySqlType::Use,
161 Statement::StartTransaction { .. } => EntrySqlType::StartTransaction,
162 Statement::Commit { .. } => EntrySqlType::Commit,
163 Statement::Rollback { .. } => EntrySqlType::Rollback,
164 Statement::CreateSchema { .. } => EntrySqlType::CreateSchema,
165 Statement::CreateDatabase { .. } => EntrySqlType::CreateDatabase,
166 Statement::Grant { .. } => EntrySqlType::Grant,
167 Statement::Revoke { .. } => EntrySqlType::Revoke,
168 Statement::Kill { .. } => EntrySqlType::Kill,
169 Statement::ExplainTable { .. } => EntrySqlType::ExplainTable,
170 Statement::Explain { .. } => EntrySqlType::Explain,
171 Statement::Savepoint { .. } => EntrySqlType::Savepoint,
172 Statement::LockTables { .. } => EntrySqlType::LockTables,
173 Statement::UnlockTables { .. } => EntrySqlType::LockTables,
174 Statement::Flush { .. } => EntrySqlType::Flush,
175 _ => EntrySqlType::Unknown,
176 }
177 }
178}
179
180impl From<Statement> for EntrySqlStatement {
181 fn from(statement: Statement) -> Self {
182 EntrySqlStatement {
183 statement,
184 context: None,
185 }
186 }
187}
188
189#[derive(Clone, Debug, Ord, PartialOrd, PartialEq, Eq)]
191pub struct EntrySqlStatementObject {
192 pub schema_name: Option<Bytes>,
194 pub object_name: Bytes,
196}
197
198impl EntrySqlStatementObject {
199 pub fn schema_name(&self) -> Option<Cow<str>> {
201 if let Some(v) = &self.schema_name {
202 Some(String::from_utf8_lossy(v.as_ref()))
203 } else {
204 None
205 }
206 }
207
208 pub fn schema_name_bytes(&self) -> Option<Bytes> {
210 self.schema_name.clone()
211 }
212
213 pub fn object_name(&self) -> Cow<str> {
215 String::from_utf8_lossy(self.object_name.as_ref())
216 }
217
218 pub fn object_name_bytes(&self) -> Bytes {
220 self.object_name.clone()
221 }
222
223 pub fn full_object_name_bytes(&self) -> Bytes {
225 let mut s = if let Some(n) = self.schema_name.clone() {
226 let mut s = BytesMut::from(n.as_ref());
227 s.put_slice(b".");
228 s
229 } else {
230 BytesMut::new()
231 };
232
233 s.put_slice(self.object_name.as_ref());
234 s.freeze()
235 }
236
237 pub fn full_object_name(&self) -> Cow<'_, str> {
239 String::from_utf8_lossy(self.full_object_name_bytes().as_ref())
240 .to_string()
241 .into()
242 }
243}
244
245#[derive(Clone, Debug, PartialEq)]
250pub enum EntryStatement {
251 AdminCommand(EntryAdminCommand),
253 SqlStatement(EntrySqlStatement),
255 InvalidStatement(String),
257}
258
259impl EntryStatement {
260 pub fn objects(&self) -> Option<Vec<EntrySqlStatementObject>> {
262 match self {
263 Self::SqlStatement(s) => Some(s.objects().clone()),
264 _ => None,
265 }
266 }
267
268 pub fn sql_type(&self) -> Option<EntrySqlType> {
270 match self {
271 Self::SqlStatement(s) => Some(s.sql_type().clone()),
272 _ => None,
273 }
274 }
275
276 pub fn sql_context(&self) -> Option<SqlStatementContext> {
278 match self {
279 Self::SqlStatement(s) => s.sql_context().clone(),
280 _ => None,
281 }
282 }
283}
284
285#[derive(Copy, Clone, Debug, Eq, Hash, PartialEq)]
290pub enum EntrySqlType {
291 Query,
293 Insert,
295 Update,
297 Delete,
299 CreateTable,
301 CreateIndex,
303 CreateView,
305 AlterTable,
307 AlterIndex,
309 Drop,
311 DropFunction,
313 Set,
315 ShowVariable,
317 ShowVariables,
319 ShowCreate,
321 ShowColumns,
323 ShowTables,
325 ShowCollation,
327 Use,
329 StartTransaction,
331 SetTransaction,
333 Commit,
335 Rollback,
337 CreateSchema,
339 CreateDatabase,
341 Grant,
343 Revoke,
345 Kill,
347 ExplainTable,
349 Explain,
351 Savepoint,
353 LockTables,
355 UnlockTables,
357 Flush,
359 Unknown,
361}
362
363impl Display for EntrySqlType {
364 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
365 let out = match self {
366 Self::Query => "SELECT",
367 Self::Insert => "INSERT",
368 Self::Update => "UPDATE",
369 Self::Delete => "DELETE",
370 Self::CreateTable => "CREATE TABLE",
371 Self::CreateIndex => "CREATE INDEX",
372 Self::CreateView => "CREATE VIEW",
373 Self::AlterTable => "ALTER TABLE",
374 Self::AlterIndex => "ALTER INDEX",
375 Self::Drop => "DROP TABLE",
376 Self::DropFunction => "DROP FUNCTION",
377 Self::Set => "SET",
378 Self::ShowVariable => "SHOW VARIABLE",
379 Self::ShowVariables => "SHOW VARIABLES",
380 Self::ShowCreate => "SHOW CREATE TABLE",
381 Self::ShowColumns => "SHOW COLUMNS",
382 Self::ShowTables => "SHOW TABLES",
383 Self::ShowCollation => "SHOW COLLATION",
384 Self::Use => "USE",
385 Self::StartTransaction => "BEGIN TRANSACTION",
386 Self::SetTransaction => "SET TRANSACTION",
387 Self::Commit => "COMMIT TRANSACTION",
388 Self::Rollback => "ROLLBACK TRANSACTION",
389 Self::CreateSchema => "CREATE SCHEMA",
390 Self::CreateDatabase => "CREATE DATABASE",
391 Self::Grant => "GRANT",
392 Self::Revoke => "REVOKE",
393 Self::Kill => "KILL",
394 Self::ExplainTable => "EXPLAIN TABLE",
395 Self::Explain => "EXPLAIN",
396 Self::Savepoint => "SAVEPOINT",
397 Self::LockTables => "LOCK TABLES",
398 Self::UnlockTables => "UNLOCK TABLES",
399 Self::Flush => "FLUSH",
400 Self::Unknown => "NULL",
401 };
402
403 write!(f, "{}", out)
404 }
405}
406
407#[derive(Clone, Debug, PartialEq)]
409pub struct EntrySession {
410 pub user_name: Bytes,
412 pub sys_user_name: Bytes,
414 pub host_name: Option<Bytes>,
416 pub ip_address: Option<Bytes>,
418 pub thread_id: u32,
420}
421
422impl From<SessionLine> for EntrySession {
423 fn from(line: SessionLine) -> Self {
424 Self {
425 user_name: line.user,
426 sys_user_name: line.sys_user,
427 host_name: line.host,
428 ip_address: line.ip_address,
429 thread_id: line.thread_id,
430 }
431 }
432}
433
434impl EntrySession {
435 pub fn user_name(&self) -> Cow<str> {
437 String::from_utf8_lossy(&self.user_name)
438 }
439
440 pub fn user_name_bytes(&self) -> Bytes {
442 self.user_name.clone()
443 }
444
445 pub fn sys_user_name(&self) -> Cow<str> {
447 String::from_utf8_lossy(&self.sys_user_name)
448 }
449
450 pub fn sys_user_name_bytes(&self) -> Bytes {
452 self.sys_user_name.clone()
453 }
454
455 pub fn host_name(&self) -> Option<Cow<str>> {
457 if let Some(v) = &self.host_name {
458 Some(String::from_utf8_lossy(v.as_ref()))
459 } else {
460 None
461 }
462 }
463
464 pub fn host_name_bytes(&self) -> Option<Bytes> {
466 self.host_name.clone()
467 }
468
469 pub fn ip_address(&self) -> Option<Cow<'_, str>> {
471 if let Some(v) = &self.ip_address {
472 Some(String::from_utf8_lossy(v.as_ref()))
473 } else {
474 None
475 }
476 }
477
478 pub fn ip_address_bytes(&self) -> Option<Bytes> {
480 self.ip_address.clone()
481 }
482
483 pub fn thread_id(&self) -> u32 {
485 self.thread_id
486 }
487}
488
489#[derive(Clone, Debug, PartialEq)]
491pub struct EntrySqlAttributes {
492 pub sql: Bytes,
494 pub statement: EntryStatement,
496}
497
498impl EntrySqlAttributes {
499 pub fn sql_bytes(&self) -> Bytes {
501 self.sql.clone()
502 }
503
504 pub fn sql(&self) -> Cow<'_, str> {
506 String::from_utf8_lossy(self.sql.as_ref())
507 }
508
509 pub fn sql_type(&self) -> Option<EntrySqlType> {
511 self.statement.sql_type()
512 }
513
514 pub fn objects(&self) -> Option<Vec<EntrySqlStatementObject>> {
516 self.statement.objects()
517 }
518
519 pub fn statement(&self) -> &EntryStatement {
521 &self.statement
522 }
523}
524
525#[derive(Clone, Debug, PartialEq)]
527pub struct EntryCall {
528 pub log_time: DateTime,
530 pub set_timestamp: u32,
532}
533
534impl EntryCall {
535 pub fn new(log_time: DateTime, set_timestamp: u32) -> Self {
537 Self {
538 log_time,
539 set_timestamp,
540 }
541 }
542
543 pub fn log_time(&self) -> DateTime {
545 self.log_time.clone()
546 }
547
548 pub fn set_timestamp(&self) -> u32 {
550 self.set_timestamp
551 }
552}
553
554#[derive(Clone, Copy, Debug, PartialEq, PartialOrd)]
556pub struct EntryStats {
557 pub query_time: f64,
559 pub lock_time: f64,
561 pub rows_sent: u32,
563 pub rows_examined: u32,
565}
566
567impl EntryStats {
568 pub fn query_time(&self) -> f64 {
570 self.query_time
571 }
572
573 pub fn lock_time(&self) -> f64 {
575 self.lock_time
576 }
577
578 pub fn rows_sent(&self) -> u32 {
580 self.rows_sent
581 }
582
583 pub fn rows_examined(&self) -> u32 {
585 self.rows_examined
586 }
587}
588
589impl From<StatsLine> for EntryStats {
590 fn from(line: StatsLine) -> Self {
591 Self {
592 query_time: line.query_time,
593 lock_time: line.lock_time,
594 rows_sent: line.rows_sent,
595 rows_examined: line.rows_examined,
596 }
597 }
598}
599
600#[derive(Clone, Debug, PartialEq)]
602pub struct EntryContext {
603 pub request_id: Option<Bytes>,
605 pub caller: Option<Bytes>,
607 pub function: Option<Bytes>,
609 pub line: Option<u32>,
611}
612
613impl EntryContext {
614 pub fn request_id(&self) -> Option<Cow<str>> {
616 if let Some(v) = &self.request_id {
617 Some(String::from_utf8_lossy(v.as_ref()))
618 } else {
619 None
620 }
621 }
622
623 pub fn request_id_bytes(&self) -> Option<Bytes> {
625 self.request_id.clone()
626 }
627
628 pub fn caller(&self) -> Option<Cow<str>> {
630 if let Some(v) = &self.caller {
631 Some(String::from_utf8_lossy(v.as_ref()))
632 } else {
633 None
634 }
635 }
636
637 pub fn caller_bytes(&self) -> Option<Bytes> {
639 self.caller.clone()
640 }
641
642 pub fn function(&self) -> Option<Cow<str>> {
644 if let Some(v) = &self.function {
645 Some(String::from_utf8_lossy(v.as_ref()))
646 } else {
647 None
648 }
649 }
650
651 pub fn function_bytes(&self) -> Option<Bytes> {
653 self.function.clone()
654 }
655
656 pub fn line(&self) -> Option<u32> {
658 self.line
659 }
660}