1use std::sync::Arc;
7use std::time::Duration;
8
9use crate::transaction::IsolationLevel;
10use crate::Error;
11
12pub trait Instrumentation: Send + Sync + 'static {
17 fn on_event(&self, event: &Event<'_>);
18}
19
20pub(crate) struct NoOpInstrumentation;
22
23impl Instrumentation for NoOpInstrumentation {
24 #[inline]
25 fn on_event(&self, _: &Event<'_>) {}
26}
27
28pub(crate) fn noop() -> Arc<dyn Instrumentation> {
29 Arc::new(NoOpInstrumentation)
30}
31
32#[non_exhaustive]
33pub enum Event<'a> {
34 Connect {
36 host: &'a str,
37 port: u16,
38 },
39 Authenticated {
40 user: &'a str,
41 },
42 Disconnect {
43 reason: DisconnectReason,
44 },
45
46 PrepareStart {
48 name: &'a str,
49 sql: &'a str,
50 },
51 PrepareFinish {
52 name: &'a str,
53 param_oids: &'a [u32],
54 col_count: u16,
55 duration: Duration,
56 cache_hit: bool,
57 },
58
59 ExecuteStart {
61 stmt: StmtRef<'a>,
62 param_count: usize,
63 },
64 ExecuteFinish {
65 stmt: StmtRef<'a>,
66 rows: u64,
67 duration: Duration,
68 outcome: Outcome<'a>,
69 },
70
71 PipelineStart {
73 batch_len: usize,
74 },
75 PipelineFlush {
76 batch_len: usize,
77 total_duration: Duration,
78 },
79
80 TxBegin {
82 isolation: Option<IsolationLevel>,
83 },
84 TxCommit {
85 duration: Duration,
86 },
87 TxRollback {
88 duration: Duration,
89 reason: RollbackReason<'a>,
90 },
91
92 PoolAcquireStart {
94 pending: usize,
95 },
96 PoolAcquireFinish {
97 wait: Duration,
98 outcome: AcquireOutcome,
99 },
100 PoolRelease,
101
102 Notice {
104 severity: &'a str,
105 code: &'a str,
106 message: &'a str,
107 },
108 Notification {
109 channel: &'a str,
110 payload: &'a str,
111 pid: i32,
112 },
113
114 QueryMacro {
116 macro_name: &'a str,
117 query_id: &'a str,
118 sql: &'a str,
119 },
120 ReducerBegin {
121 name: &'a str,
122 },
123 ReducerCommit {
124 name: &'a str,
125 duration: Duration,
126 },
127 ReducerRollback {
128 name: &'a str,
129 error: &'a str,
130 },
131 MigrationApply {
132 version: &'a str,
133 duration: Duration,
134 checksum: &'a str,
135 },
136 MigrationDrift {
137 version: &'a str,
138 recorded: &'a str,
139 current: &'a str,
140 },
141}
142
143#[non_exhaustive]
144pub enum StmtRef<'a> {
145 Named { name: &'a str },
146 Inline { sql: &'a str },
147}
148
149impl<'a> StmtRef<'a> {
150 pub fn sql_or_name(&self) -> &'a str {
152 match self {
153 StmtRef::Named { name } => name,
154 StmtRef::Inline { sql } => sql,
155 }
156 }
157
158 pub fn op_hint(&self) -> &'static str {
163 let s = self.sql_or_name();
164 let first = s.split_ascii_whitespace().next().unwrap_or("");
165 if first.eq_ignore_ascii_case("SELECT") {
166 "SELECT"
167 } else if first.eq_ignore_ascii_case("INSERT") {
168 "INSERT"
169 } else if first.eq_ignore_ascii_case("UPDATE") {
170 "UPDATE"
171 } else if first.eq_ignore_ascii_case("DELETE") {
172 "DELETE"
173 } else if first.eq_ignore_ascii_case("BEGIN") {
174 "BEGIN"
175 } else if first.eq_ignore_ascii_case("COMMIT") {
176 "COMMIT"
177 } else if first.eq_ignore_ascii_case("ROLLBACK") {
178 "ROLLBACK"
179 } else if first.eq_ignore_ascii_case("WITH") {
180 "WITH"
181 } else {
182 "OTHER"
183 }
184 }
185}
186
187#[cfg(test)]
188mod tests {
189 use super::*;
190
191 #[test]
192 fn op_hint_is_case_insensitive() {
193 assert_eq!(StmtRef::Inline { sql: "SELECT 1" }.op_hint(), "SELECT");
194 assert_eq!(StmtRef::Inline { sql: "select 1" }.op_hint(), "SELECT");
195 assert_eq!(
196 StmtRef::Inline {
197 sql: "Insert into t values (1)"
198 }
199 .op_hint(),
200 "INSERT"
201 );
202 assert_eq!(
203 StmtRef::Inline {
204 sql: " WITH cte AS (SELECT 1) SELECT * FROM cte"
205 }
206 .op_hint(),
207 "WITH"
208 );
209 assert_eq!(
210 StmtRef::Inline {
211 sql: "CREATE TABLE x ()"
212 }
213 .op_hint(),
214 "OTHER"
215 );
216 assert_eq!(StmtRef::Inline { sql: "" }.op_hint(), "OTHER");
217 }
218}
219
220#[non_exhaustive]
221pub enum Outcome<'a> {
222 Ok,
223 Err(&'a Error),
224}
225
226#[non_exhaustive]
227pub enum DisconnectReason {
228 Graceful,
229 BrokenPipe,
230 Timeout,
231 ServerKill,
232}
233
234#[derive(Debug)]
235#[non_exhaustive]
236pub enum RollbackReason<'a> {
237 Explicit,
238 Drop,
239 Error(&'a Error),
240}
241
242#[derive(Debug)]
243#[non_exhaustive]
244pub enum AcquireOutcome {
245 Ok,
246 Timeout,
247 PoolClosed,
248 Error,
251}