sentinel_driver/
tracing_adapter.rs1use std::time::Duration;
10
11use crate::{Event, Instrumentation, Outcome};
12
/// [`Instrumentation`] implementation that forwards driver events to the
/// `tracing` crate.
///
/// Both knobs are public so the value can be built with struct-update syntax
/// on top of [`Default::default`].
#[derive(Clone, Debug)]
pub struct TracingInstrumentation {
    /// Upper bound, in bytes, on the SQL text recorded in the `db.statement`
    /// span field; longer text is cut back to a UTF-8 character boundary.
    pub max_sql_len: usize,
    /// When `Some`, a "slow query" warning is logged for every finished
    /// execution whose duration is strictly greater than this value.
    /// `None` disables the warning.
    pub slow_threshold: Option<Duration>,
}
22
23impl Default for TracingInstrumentation {
24 fn default() -> Self {
25 Self {
26 max_sql_len: 1024,
27 slow_threshold: None,
28 }
29 }
30}
31
32impl Instrumentation for TracingInstrumentation {
33 fn on_event(&self, ev: &Event<'_>) {
34 let span = tracing::Span::current();
35 match ev {
36 Event::ExecuteStart { stmt, param_count } => {
37 let sql = truncate(stmt.sql_or_name(), self.max_sql_len);
38 span.record("db.system", "postgresql");
39 span.record("db.statement", tracing::field::display(&sql));
40 span.record("db.operation", stmt.op_hint());
41 span.record("sntl.param_count", *param_count);
42 }
43 Event::ExecuteFinish {
44 rows,
45 duration,
46 outcome,
47 ..
48 } => {
49 span.record("db.rows_affected", *rows);
50 span.record("sntl.duration_us", duration.as_micros() as i64);
51 if let Outcome::Err(e) = outcome {
52 span.record("error", true);
53 tracing::error!(error = %e, "query failed");
54 }
55 if matches!(self.slow_threshold, Some(t) if *duration > t) {
56 tracing::warn!(slow = true, "slow query");
57 }
58 }
59 Event::PrepareFinish {
60 cache_hit,
61 duration,
62 ..
63 } => {
64 span.record("sntl.cache_hit", *cache_hit);
65 span.record("sntl.prepare_us", duration.as_micros() as i64);
66 }
67 Event::TxBegin { isolation } => {
68 tracing::info!(isolation = ?isolation, "tx begin");
69 }
70 Event::TxCommit { duration } => {
71 tracing::info!(duration_us = duration.as_micros() as i64, "tx commit");
72 }
73 Event::TxRollback { duration, reason } => {
74 tracing::warn!(
75 duration_us = duration.as_micros() as i64,
76 reason = ?reason,
77 "tx rollback"
78 );
79 }
80 Event::PipelineFlush {
81 batch_len,
82 total_duration,
83 } => {
84 span.record("sntl.pipeline_batch_len", *batch_len);
85 span.record("sntl.duration_us", total_duration.as_micros() as i64);
86 }
87 Event::PoolAcquireFinish { wait, outcome } => {
88 tracing::debug!(
89 wait_us = wait.as_micros() as i64,
90 outcome = ?outcome,
91 "pool acquire"
92 );
93 }
94 Event::Notice {
95 severity,
96 code,
97 message,
98 } => {
99 tracing::warn!(severity = %severity, code = %code, "{}", message);
100 }
101 Event::Notification { channel, pid, .. } => {
102 tracing::info!(channel = %channel, pid = pid, "notification");
103 }
104 _ => {}
110 }
111 }
112}
113
/// Returns the longest prefix of `s` that is at most `max` bytes long and ends
/// on a UTF-8 character boundary.
///
/// `s` is handed back unchanged when it already fits. Otherwise the cut point
/// is moved backwards from `max` until it no longer splits a character, so
/// the result can be shorter than `max` bytes (and is empty when `max` lands
/// inside the first character).
fn truncate(s: &str, max: usize) -> &str {
    if max >= s.len() {
        s
    } else {
        // Offset 0 is always a character boundary, so the search always
        // finds a cut point; the fallback only satisfies the type checker.
        let cut = (0..=max)
            .rev()
            .find(|&at| s.is_char_boundary(at))
            .unwrap_or(0);
        &s[..cut]
    }
}
125
#[cfg(test)]
mod tests {
    use super::{truncate, TracingInstrumentation};

    /// Input that already fits within the limit comes back untouched.
    #[test]
    fn truncate_short_returns_input() {
        let input = "hello";
        assert_eq!(truncate(input, 1024), input);
    }

    /// Single-byte text longer than the limit is cut to exactly the limit.
    #[test]
    fn truncate_long_cuts_at_boundary() {
        let long = "a".repeat(2000);
        assert_eq!(truncate(&long, 1024).len(), 1024);
    }

    /// A limit that lands inside a multi-byte character backs off to the
    /// start of that character.
    #[test]
    fn truncate_respects_multibyte_boundary() {
        // 'é' occupies two bytes, so byte offset 5 falls in the middle of the
        // first one and the cut has to retreat to offset 4.
        assert_eq!(truncate("abcdéé", 5), "abcd");
    }

    #[test]
    fn default_max_sql_len_is_1024() {
        let cfg = TracingInstrumentation::default();
        assert_eq!(cfg.max_sql_len, 1024);
    }

    #[test]
    fn default_slow_threshold_is_none() {
        let cfg = TracingInstrumentation::default();
        assert!(cfg.slow_threshold.is_none());
    }
}