// sentinel_driver/instrumentation.rs

//! Instrumentation surface for `sentinel-driver`.
//!
//! Install via `Config::with_instrumentation`, `Pool::with_instrumentation`,
//! or `Connection::set_instrumentation`. The default is a no-op.
5
6use std::sync::Arc;
7use std::time::Duration;
8
9use crate::transaction::IsolationLevel;
10use crate::Error;
11
12/// A driver consumer's hook into every operation Sentinel performs.
13///
14/// Events are passed by borrow — the implementation MUST NOT retain
15/// the `Event` past the call. Clone data inside the handler if needed.
16pub trait Instrumentation: Send + Sync + 'static {
17    fn on_event(&self, event: &Event<'_>);
18}
19
20/// Default no-op. Returns immediately via vtable dispatch.
21pub(crate) struct NoOpInstrumentation;
22
23impl Instrumentation for NoOpInstrumentation {
24    #[inline]
25    fn on_event(&self, _: &Event<'_>) {}
26}
27
28pub(crate) fn noop() -> Arc<dyn Instrumentation> {
29    Arc::new(NoOpInstrumentation)
30}
31
32#[non_exhaustive]
33pub enum Event<'a> {
34    // Connection lifecycle
35    Connect {
36        host: &'a str,
37        port: u16,
38    },
39    Authenticated {
40        user: &'a str,
41    },
42    Disconnect {
43        reason: DisconnectReason,
44    },
45
46    // Prepare
47    PrepareStart {
48        name: &'a str,
49        sql: &'a str,
50    },
51    PrepareFinish {
52        name: &'a str,
53        param_oids: &'a [u32],
54        col_count: u16,
55        duration: Duration,
56        cache_hit: bool,
57    },
58
59    // Execute (covers query / query_one / query_opt / execute / query_typed*)
60    ExecuteStart {
61        stmt: StmtRef<'a>,
62        param_count: usize,
63    },
64    ExecuteFinish {
65        stmt: StmtRef<'a>,
66        rows: u64,
67        duration: Duration,
68        outcome: Outcome<'a>,
69    },
70
71    // Pipeline
72    PipelineStart {
73        batch_len: usize,
74    },
75    PipelineFlush {
76        batch_len: usize,
77        total_duration: Duration,
78    },
79
80    // Transaction
81    TxBegin {
82        isolation: Option<IsolationLevel>,
83    },
84    TxCommit {
85        duration: Duration,
86    },
87    TxRollback {
88        duration: Duration,
89        reason: RollbackReason<'a>,
90    },
91
92    // Pool
93    PoolAcquireStart {
94        pending: usize,
95    },
96    PoolAcquireFinish {
97        wait: Duration,
98        outcome: AcquireOutcome,
99    },
100    PoolRelease,
101
102    // PG async messages
103    Notice {
104        severity: &'a str,
105        code: &'a str,
106        message: &'a str,
107    },
108    Notification {
109        channel: &'a str,
110        payload: &'a str,
111        pid: i32,
112    },
113
114    // Sentinel-level (sntl crate emits these; driver itself never does)
115    QueryMacro {
116        macro_name: &'a str,
117        query_id: &'a str,
118        sql: &'a str,
119    },
120    ReducerBegin {
121        name: &'a str,
122    },
123    ReducerCommit {
124        name: &'a str,
125        duration: Duration,
126    },
127    ReducerRollback {
128        name: &'a str,
129        error: &'a str,
130    },
131    MigrationApply {
132        version: &'a str,
133        duration: Duration,
134        checksum: &'a str,
135    },
136    MigrationDrift {
137        version: &'a str,
138        recorded: &'a str,
139        current: &'a str,
140    },
141}
142
/// Identifies the statement an execute event refers to.
///
/// Cheap to copy: both variants hold a single borrowed string.
#[derive(Debug, Clone, Copy)]
#[non_exhaustive]
pub enum StmtRef<'a> {
    /// A prepared statement, referenced by its name.
    Named { name: &'a str },
    /// A statement given directly as SQL text.
    Inline { sql: &'a str },
}

impl<'a> StmtRef<'a> {
    /// SQL text if available (Inline), else the prepared name (Named).
    pub fn sql_or_name(&self) -> &'a str {
        match *self {
            StmtRef::Named { name } => name,
            StmtRef::Inline { sql } => sql,
        }
    }

    /// Leading SQL keyword, uppercased. "OTHER" if no known keyword leads.
    ///
    /// The keyword is the leading run of identifier characters after any
    /// ASCII whitespace, so punctuation directly after the keyword does not
    /// hide it: `COMMIT;`, `select*from t` and `SELECT(1)` are classified
    /// like their whitespace-separated forms. For `Named` there is no SQL
    /// text, so the statement name is inspected instead.
    ///
    /// Allocation-free: uses `eq_ignore_ascii_case` rather than uppercasing
    /// the input, so it is safe to call on every `ExecuteStart` event.
    pub fn op_hint(&self) -> &'static str {
        const KEYWORDS: [&str; 8] = [
            "SELECT", "INSERT", "UPDATE", "DELETE", "BEGIN", "COMMIT", "ROLLBACK", "WITH",
        ];

        let text = self
            .sql_or_name()
            .trim_start_matches(|c: char| c.is_ascii_whitespace());

        // The token ends at the first ASCII character that cannot continue an
        // SQL identifier. `_`, `$`, digits and non-ASCII characters extend the
        // token, so `SELECT_x` or `select$1` is not mistaken for `SELECT`.
        let first = text
            .split(|c: char| {
                c.is_ascii() && !(c.is_ascii_alphanumeric() || c == '_' || c == '$')
            })
            .next()
            .unwrap_or("");

        KEYWORDS
            .iter()
            .copied()
            .find(|&kw| first.eq_ignore_ascii_case(kw))
            .unwrap_or("OTHER")
    }
}
186
#[cfg(test)]
mod tests {
    use super::*;

    /// `op_hint` classifies by the first keyword regardless of letter case
    /// or leading whitespace, and falls back to "OTHER" for anything else.
    #[test]
    fn op_hint_is_case_insensitive() {
        // (input SQL, expected hint)
        let cases: [(&str, &str); 6] = [
            ("SELECT 1", "SELECT"),
            ("select 1", "SELECT"),
            ("Insert into t values (1)", "INSERT"),
            ("   WITH cte AS (SELECT 1) SELECT * FROM cte", "WITH"),
            ("CREATE TABLE x ()", "OTHER"),
            ("", "OTHER"),
        ];

        for &(sql, expected) in cases.iter() {
            // Include the input so a failure identifies the offending row.
            assert_eq!(
                StmtRef::Inline { sql }.op_hint(),
                expected,
                "input: {:?}",
                sql
            );
        }
    }
}
219
220#[non_exhaustive]
221pub enum Outcome<'a> {
222    Ok,
223    Err(&'a Error),
224}
225
/// Why a connection ended, as carried by `Event::Disconnect`.
///
/// NOTE(review): the exact condition behind each variant is decided at the
/// emit site, which is not visible here; the notes below follow the names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum DisconnectReason {
    /// Orderly shutdown.
    Graceful,
    /// The underlying transport broke.
    BrokenPipe,
    /// A timeout ended the connection.
    Timeout,
    /// The server terminated the connection.
    ServerKill,
}
233
234#[derive(Debug)]
235#[non_exhaustive]
236pub enum RollbackReason<'a> {
237    Explicit,
238    Drop,
239    Error(&'a Error),
240}
241
/// How a pool acquire ended, as carried by `Event::PoolAcquireFinish`.
///
/// Field-less and `Copy`, so handlers can compare it directly or use it as
/// a metric label.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[non_exhaustive]
pub enum AcquireOutcome {
    /// A connection was acquired.
    Ok,
    /// The acquire timed out.
    Timeout,
    /// The pool was closed.
    PoolClosed,
    /// Any other failure: TLS handshake, DNS, auth, or connection setup.
    /// Use the error message in adjacent log context to disambiguate.
    Error,
}