Skip to main content

sentinel_driver/
tracing_adapter.rs

1//! `tracing`-based `Instrumentation` impl.
2//!
3//! Records OTel-conformant `db.*` fields onto the current `tracing::Span`.
4//! Wire sites must have opened a span via `tracing::info_span!("db.query")`
5//! and entered it (`Span::in_scope` or `Instrument::instrument`); this
6//! adapter only records onto whatever span is current. It never creates
7//! new spans itself, so it works correctly across async await points.
8
9use std::time::Duration;
10
11use crate::{Event, Instrumentation, Outcome};
12
13/// Always-built tracing adapter.
14#[derive(Clone)]
15pub struct TracingInstrumentation {
16    /// Truncate `db.statement` past this many bytes. Default 1024.
17    pub max_sql_len: usize,
18    /// Emit a WARN-level event when a query's duration exceeds this.
19    /// `None` disables slow-query warnings.
20    pub slow_threshold: Option<Duration>,
21}
22
23impl Default for TracingInstrumentation {
24    fn default() -> Self {
25        Self {
26            max_sql_len: 1024,
27            slow_threshold: None,
28        }
29    }
30}
31
32impl Instrumentation for TracingInstrumentation {
33    fn on_event(&self, ev: &Event<'_>) {
34        let span = tracing::Span::current();
35        match ev {
36            Event::ExecuteStart { stmt, param_count } => {
37                let sql = truncate(stmt.sql_or_name(), self.max_sql_len);
38                span.record("db.system", "postgresql");
39                span.record("db.statement", tracing::field::display(&sql));
40                span.record("db.operation", stmt.op_hint());
41                span.record("sntl.param_count", *param_count);
42            }
43            Event::ExecuteFinish {
44                rows,
45                duration,
46                outcome,
47                ..
48            } => {
49                span.record("db.rows_affected", *rows);
50                span.record("sntl.duration_us", duration.as_micros() as i64);
51                if let Outcome::Err(e) = outcome {
52                    span.record("error", true);
53                    tracing::error!(error = %e, "query failed");
54                }
55                if matches!(self.slow_threshold, Some(t) if *duration > t) {
56                    tracing::warn!(slow = true, "slow query");
57                }
58            }
59            Event::PrepareFinish {
60                cache_hit,
61                duration,
62                ..
63            } => {
64                span.record("sntl.cache_hit", *cache_hit);
65                span.record("sntl.prepare_us", duration.as_micros() as i64);
66            }
67            Event::TxBegin { isolation } => {
68                tracing::info!(isolation = ?isolation, "tx begin");
69            }
70            Event::TxCommit { duration } => {
71                tracing::info!(duration_us = duration.as_micros() as i64, "tx commit");
72            }
73            Event::TxRollback { duration, reason } => {
74                tracing::warn!(
75                    duration_us = duration.as_micros() as i64,
76                    reason = ?reason,
77                    "tx rollback"
78                );
79            }
80            Event::PipelineFlush {
81                batch_len,
82                total_duration,
83            } => {
84                span.record("sntl.pipeline_batch_len", *batch_len);
85                span.record("sntl.duration_us", total_duration.as_micros() as i64);
86            }
87            Event::PoolAcquireFinish { wait, outcome } => {
88                tracing::debug!(
89                    wait_us = wait.as_micros() as i64,
90                    outcome = ?outcome,
91                    "pool acquire"
92                );
93            }
94            Event::Notice {
95                severity,
96                code,
97                message,
98            } => {
99                tracing::warn!(severity = %severity, code = %code, "{}", message);
100            }
101            Event::Notification { channel, pid, .. } => {
102                tracing::info!(channel = %channel, pid = pid, "notification");
103            }
104            // Other events (Connect, Disconnect, PoolRelease, PoolAcquireStart,
105            // PipelineStart, PrepareStart, ExecuteStart-already-handled,
106            // Authenticated, sntl-level events) are deliberately not echoed
107            // here — the sntl bridge will handle Sentinel-level ones, and
108            // the rest are pure correlation events without OTel mapping.
109            _ => {}
110        }
111    }
112}
113
114fn truncate(s: &str, max: usize) -> &str {
115    if s.len() <= max {
116        return s;
117    }
118    // Walk back to nearest char boundary to avoid panic on multi-byte
119    let mut idx = max;
120    while idx > 0 && !s.is_char_boundary(idx) {
121        idx -= 1;
122    }
123    &s[..idx]
124}
125
126#[cfg(test)]
127mod tests {
128    use super::*;
129
130    #[test]
131    fn truncate_short_returns_input() {
132        assert_eq!(truncate("hello", 1024), "hello");
133    }
134
135    #[test]
136    fn truncate_long_cuts_at_boundary() {
137        let s = "a".repeat(2000);
138        let t = truncate(&s, 1024);
139        assert_eq!(t.len(), 1024);
140    }
141
142    #[test]
143    fn truncate_respects_multibyte_boundary() {
144        // 'é' is 2 bytes in UTF-8; truncating mid-char must not panic
145        let s = "abcdéé";
146        let t = truncate(s, 5);
147        // max is 5; index 5 falls inside the 2-byte 'é' starting at index 4.
148        // The function should walk back to 4 (char boundary).
149        assert_eq!(t, "abcd");
150    }
151
152    #[test]
153    fn default_max_sql_len_is_1024() {
154        assert_eq!(TracingInstrumentation::default().max_sql_len, 1024);
155    }
156
157    #[test]
158    fn default_slow_threshold_is_none() {
159        assert!(TracingInstrumentation::default().slow_threshold.is_none());
160    }
161}