use std::sync::Arc;
use std::time::Duration;
use crate::transaction::IsolationLevel;
use crate::Error;
pub trait Instrumentation: Send + Sync + 'static {
fn on_event(&self, event: &Event<'_>);
}
pub(crate) struct NoOpInstrumentation;
impl Instrumentation for NoOpInstrumentation {
#[inline]
fn on_event(&self, _: &Event<'_>) {}
}
pub(crate) fn noop() -> Arc<dyn Instrumentation> {
Arc::new(NoOpInstrumentation)
}
#[non_exhaustive]
pub enum Event<'a> {
Connect {
host: &'a str,
port: u16,
},
Authenticated {
user: &'a str,
},
Disconnect {
reason: DisconnectReason,
},
PrepareStart {
name: &'a str,
sql: &'a str,
},
PrepareFinish {
name: &'a str,
param_oids: &'a [u32],
col_count: u16,
duration: Duration,
cache_hit: bool,
},
ExecuteStart {
stmt: StmtRef<'a>,
param_count: usize,
},
ExecuteFinish {
stmt: StmtRef<'a>,
rows: u64,
duration: Duration,
outcome: Outcome<'a>,
},
PipelineStart {
batch_len: usize,
},
PipelineFlush {
batch_len: usize,
total_duration: Duration,
},
TxBegin {
isolation: Option<IsolationLevel>,
},
TxCommit {
duration: Duration,
},
TxRollback {
duration: Duration,
reason: RollbackReason<'a>,
},
PoolAcquireStart {
pending: usize,
},
PoolAcquireFinish {
wait: Duration,
outcome: AcquireOutcome,
},
PoolRelease,
Notice {
severity: &'a str,
code: &'a str,
message: &'a str,
},
Notification {
channel: &'a str,
payload: &'a str,
pid: i32,
},
QueryMacro {
macro_name: &'a str,
query_id: &'a str,
sql: &'a str,
},
ReducerBegin {
name: &'a str,
},
ReducerCommit {
name: &'a str,
duration: Duration,
},
ReducerRollback {
name: &'a str,
error: &'a str,
},
MigrationApply {
version: &'a str,
duration: Duration,
checksum: &'a str,
},
MigrationDrift {
version: &'a str,
recorded: &'a str,
current: &'a str,
},
}
#[non_exhaustive]
pub enum StmtRef<'a> {
Named { name: &'a str },
Inline { sql: &'a str },
}
impl<'a> StmtRef<'a> {
pub fn sql_or_name(&self) -> &'a str {
match self {
StmtRef::Named { name } => name,
StmtRef::Inline { sql } => sql,
}
}
pub fn op_hint(&self) -> &'static str {
let s = self.sql_or_name();
let first = s.split_ascii_whitespace().next().unwrap_or("");
if first.eq_ignore_ascii_case("SELECT") {
"SELECT"
} else if first.eq_ignore_ascii_case("INSERT") {
"INSERT"
} else if first.eq_ignore_ascii_case("UPDATE") {
"UPDATE"
} else if first.eq_ignore_ascii_case("DELETE") {
"DELETE"
} else if first.eq_ignore_ascii_case("BEGIN") {
"BEGIN"
} else if first.eq_ignore_ascii_case("COMMIT") {
"COMMIT"
} else if first.eq_ignore_ascii_case("ROLLBACK") {
"ROLLBACK"
} else if first.eq_ignore_ascii_case("WITH") {
"WITH"
} else {
"OTHER"
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn op_hint_is_case_insensitive() {
assert_eq!(StmtRef::Inline { sql: "SELECT 1" }.op_hint(), "SELECT");
assert_eq!(StmtRef::Inline { sql: "select 1" }.op_hint(), "SELECT");
assert_eq!(
StmtRef::Inline {
sql: "Insert into t values (1)"
}
.op_hint(),
"INSERT"
);
assert_eq!(
StmtRef::Inline {
sql: " WITH cte AS (SELECT 1) SELECT * FROM cte"
}
.op_hint(),
"WITH"
);
assert_eq!(
StmtRef::Inline {
sql: "CREATE TABLE x ()"
}
.op_hint(),
"OTHER"
);
assert_eq!(StmtRef::Inline { sql: "" }.op_hint(), "OTHER");
}
}
#[non_exhaustive]
pub enum Outcome<'a> {
Ok,
Err(&'a Error),
}
#[non_exhaustive]
pub enum DisconnectReason {
Graceful,
BrokenPipe,
Timeout,
ServerKill,
}
#[derive(Debug)]
#[non_exhaustive]
pub enum RollbackReason<'a> {
Explicit,
Drop,
Error(&'a Error),
}
#[derive(Debug)]
#[non_exhaustive]
pub enum AcquireOutcome {
Ok,
Timeout,
PoolClosed,
Error,
}