Skip to main content

oxisql_core/
middleware.rs

1//! Middleware wrappers for [`Connection`] — logging and metrics.
2//!
3//! These are zero-cost wrappers in the sense that they delegate all operations
4//! to the inner connection; the only overhead is the middleware logic itself
5//! (timing, log formatting, counter increments).
6
7use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Instant;
10
11use async_trait::async_trait;
12
13use crate::schema::{ColumnInfo, ForeignKeyInfo, IndexInfo, TableInfo};
14use crate::{Connection, OxiSqlError, PreparedStatement, Row, ToSqlValue, Transaction};
15
16// ── LoggingConnection ─────────────────────────────────────────────────────────
17
18/// A [`Connection`] wrapper that logs every SQL operation.
19///
20/// Uses the [`log`] crate at `DEBUG` level for successful operations and
21/// `WARN` level for errors.  Enable logging in your application with any
22/// `log`-compatible backend (e.g. `env_logger`, `tracing`).
23///
24/// # Example
25///
26/// ```rust,no_run
27/// # async fn example() -> Result<(), oxisql_core::OxiSqlError> {
28/// // use oxisql_core::middleware::LoggingConnection;
29/// // let conn = LoggingConnection::new(backend_conn);
30/// // conn.execute("INSERT INTO t VALUES ($1)", &[&42i64]).await?;
31/// // Logs: [execute] INSERT INTO t VALUES ($1) — 123µs
32/// # Ok(())
33/// # }
34/// ```
35pub struct LoggingConnection<C> {
36    inner: C,
37    prefix: String,
38}
39
40impl<C: Connection> LoggingConnection<C> {
41    /// Wrap `inner` with logging.  Log lines have no prefix.
42    pub fn new(inner: C) -> Self {
43        Self {
44            inner,
45            prefix: String::new(),
46        }
47    }
48
49    /// Wrap `inner` with a label prepended to log lines.
50    pub fn with_prefix(inner: C, prefix: impl Into<String>) -> Self {
51        Self {
52            inner,
53            prefix: prefix.into(),
54        }
55    }
56
57    /// Consume the wrapper and return the inner connection.
58    pub fn into_inner(self) -> C {
59        self.inner
60    }
61
62    /// Return a reference to the prefix string.
63    pub fn prefix(&self) -> &str {
64        &self.prefix
65    }
66
67    fn fmt_prefix(&self) -> String {
68        if self.prefix.is_empty() {
69            String::new()
70        } else {
71            format!("{} ", self.prefix)
72        }
73    }
74}
75
76#[async_trait]
77impl<C: Connection + Send + Sync> Connection for LoggingConnection<C> {
78    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
79        let t = Instant::now();
80        let result = self.inner.execute(sql, params).await;
81        let elapsed = t.elapsed();
82        match &result {
83            Ok(n) => log::debug!(
84                "[{}execute] {} row(s) affected — {:.3}ms{}",
85                self.fmt_prefix(),
86                n,
87                elapsed.as_secs_f64() * 1000.0,
88                truncate_sql(sql),
89            ),
90            Err(e) => log::warn!(
91                "[{}execute] ERROR {} — {:.3}ms{}",
92                self.fmt_prefix(),
93                e,
94                elapsed.as_secs_f64() * 1000.0,
95                truncate_sql(sql),
96            ),
97        }
98        result
99    }
100
101    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
102        let t = Instant::now();
103        let result = self.inner.query(sql, params).await;
104        let elapsed = t.elapsed();
105        match &result {
106            Ok(rows) => log::debug!(
107                "[{}query] {} row(s) — {:.3}ms{}",
108                self.fmt_prefix(),
109                rows.len(),
110                elapsed.as_secs_f64() * 1000.0,
111                truncate_sql(sql),
112            ),
113            Err(e) => log::warn!(
114                "[{}query] ERROR {} — {:.3}ms{}",
115                self.fmt_prefix(),
116                e,
117                elapsed.as_secs_f64() * 1000.0,
118                truncate_sql(sql),
119            ),
120        }
121        result
122    }
123
124    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
125        log::debug!("[{}transaction] BEGIN", self.fmt_prefix());
126        self.inner.transaction().await
127    }
128
129    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
130        let t = Instant::now();
131        let result = self.inner.execute_batch(sql).await;
132        log::debug!(
133            "[{}execute_batch] {:.3}ms{}",
134            self.fmt_prefix(),
135            t.elapsed().as_secs_f64() * 1000.0,
136            truncate_sql(sql),
137        );
138        result
139    }
140
141    async fn ping(&self) -> Result<(), OxiSqlError> {
142        log::debug!("[{}ping]", self.fmt_prefix());
143        self.inner.ping().await
144    }
145
146    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
147        log::debug!("[{}prepare]{}", self.fmt_prefix(), truncate_sql(sql));
148        self.inner.prepare(sql).await
149    }
150
151    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
152        self.inner.tables().await
153    }
154
155    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
156        self.inner.columns(table).await
157    }
158
159    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
160        self.inner.indexes(table).await
161    }
162
163    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
164        self.inner.foreign_keys(table).await
165    }
166}
167
168// ── TracingConnection ─────────────────────────────────────────────────────────
169
170/// A [`Connection`] wrapper that emits [`tracing`](https://docs.rs/tracing) spans
171/// and events for every SQL operation.
172///
173/// Mirrors [`LoggingConnection`] exactly but uses `tracing::debug!` /
174/// `tracing::warn!` instead of `log::debug!` / `log::warn!`.
175///
176/// Available only when the `tracing` feature is enabled.
177///
178/// # Example
179///
180/// ```rust,no_run
181/// # #[cfg(feature = "tracing")]
182/// # async fn example() -> Result<(), oxisql_core::OxiSqlError> {
183/// // use oxisql_core::TracingConnection;
184/// // let conn = TracingConnection::new(backend_conn);
185/// // conn.execute("INSERT INTO t VALUES ($1)", &[&42i64]).await?;
186/// // Emits a tracing DEBUG event: [execute] 1 row(s) affected — …ms | …
187/// # Ok(())
188/// # }
189/// ```
190#[cfg(feature = "tracing")]
191pub struct TracingConnection<C> {
192    inner: C,
193    prefix: String,
194}
195
196#[cfg(feature = "tracing")]
197impl<C: Connection> TracingConnection<C> {
198    /// Wrap `inner` with tracing.  Events have no prefix.
199    pub fn new(inner: C) -> Self {
200        Self {
201            inner,
202            prefix: String::new(),
203        }
204    }
205
206    /// Wrap `inner` with a label prepended to tracing event fields.
207    pub fn with_prefix(inner: C, prefix: impl Into<String>) -> Self {
208        Self {
209            inner,
210            prefix: prefix.into(),
211        }
212    }
213
214    /// Consume the wrapper and return the inner connection.
215    pub fn into_inner(self) -> C {
216        self.inner
217    }
218
219    /// Return a reference to the prefix string.
220    pub fn prefix(&self) -> &str {
221        &self.prefix
222    }
223
224    fn fmt_prefix(&self) -> String {
225        if self.prefix.is_empty() {
226            String::new()
227        } else {
228            format!("{} ", self.prefix)
229        }
230    }
231}
232
233#[cfg(feature = "tracing")]
234#[async_trait]
235impl<C: Connection + Send + Sync> Connection for TracingConnection<C> {
236    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
237        let t = Instant::now();
238        let result = self.inner.execute(sql, params).await;
239        let elapsed = t.elapsed();
240        match &result {
241            Ok(n) => tracing::debug!(
242                "[{}execute] {} row(s) affected — {:.3}ms{}",
243                self.fmt_prefix(),
244                n,
245                elapsed.as_secs_f64() * 1000.0,
246                truncate_sql(sql),
247            ),
248            Err(e) => tracing::warn!(
249                "[{}execute] ERROR {} — {:.3}ms{}",
250                self.fmt_prefix(),
251                e,
252                elapsed.as_secs_f64() * 1000.0,
253                truncate_sql(sql),
254            ),
255        }
256        result
257    }
258
259    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
260        let t = Instant::now();
261        let result = self.inner.query(sql, params).await;
262        let elapsed = t.elapsed();
263        match &result {
264            Ok(rows) => tracing::debug!(
265                "[{}query] {} row(s) — {:.3}ms{}",
266                self.fmt_prefix(),
267                rows.len(),
268                elapsed.as_secs_f64() * 1000.0,
269                truncate_sql(sql),
270            ),
271            Err(e) => tracing::warn!(
272                "[{}query] ERROR {} — {:.3}ms{}",
273                self.fmt_prefix(),
274                e,
275                elapsed.as_secs_f64() * 1000.0,
276                truncate_sql(sql),
277            ),
278        }
279        result
280    }
281
282    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
283        tracing::debug!("[{}transaction] BEGIN", self.fmt_prefix());
284        self.inner.transaction().await
285    }
286
287    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
288        let t = Instant::now();
289        let result = self.inner.execute_batch(sql).await;
290        tracing::debug!(
291            "[{}execute_batch] {:.3}ms{}",
292            self.fmt_prefix(),
293            t.elapsed().as_secs_f64() * 1000.0,
294            truncate_sql(sql),
295        );
296        result
297    }
298
299    async fn ping(&self) -> Result<(), OxiSqlError> {
300        tracing::debug!("[{}ping]", self.fmt_prefix());
301        self.inner.ping().await
302    }
303
304    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
305        tracing::debug!("[{}prepare]{}", self.fmt_prefix(), truncate_sql(sql));
306        self.inner.prepare(sql).await
307    }
308
309    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
310        self.inner.tables().await
311    }
312
313    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
314        self.inner.columns(table).await
315    }
316
317    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
318        self.inner.indexes(table).await
319    }
320
321    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
322        self.inner.foreign_keys(table).await
323    }
324}
325
326#[cfg(all(test, feature = "tracing"))]
327mod tracing_tests {
328    use std::sync::{Arc, Mutex};
329
330    use tracing_subscriber::fmt::MakeWriter;
331
332    use crate::{Connection, OxiSqlError, Row, ToSqlValue, Transaction, Value};
333
334    use super::TracingConnection;
335
336    // ── Minimal in-memory writer for tracing_subscriber ──────────────────────
337
338    #[derive(Clone, Default)]
339    struct MemWriter(Arc<Mutex<Vec<u8>>>);
340
341    impl MemWriter {
342        fn contents(&self) -> String {
343            let data = self.0.lock().expect("lock");
344            String::from_utf8_lossy(&data).into_owned()
345        }
346    }
347
348    impl std::io::Write for MemWriter {
349        fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
350            self.0.lock().expect("lock").extend_from_slice(buf);
351            Ok(buf.len())
352        }
353        fn flush(&mut self) -> std::io::Result<()> {
354            Ok(())
355        }
356    }
357
358    impl<'a> MakeWriter<'a> for MemWriter {
359        type Writer = MemWriter;
360        fn make_writer(&'a self) -> Self::Writer {
361            self.clone()
362        }
363    }
364
365    // ── Mock Connection ───────────────────────────────────────────────────────
366
367    struct MockConn;
368
369    #[async_trait::async_trait]
370    impl Connection for MockConn {
371        async fn execute(
372            &self,
373            _sql: &str,
374            _params: &[&dyn ToSqlValue],
375        ) -> Result<u64, OxiSqlError> {
376            Ok(1)
377        }
378
379        async fn query(
380            &self,
381            _sql: &str,
382            _params: &[&dyn ToSqlValue],
383        ) -> Result<Vec<Row>, OxiSqlError> {
384            Ok(vec![Row::new(vec!["n".into()], vec![Value::I64(42)])])
385        }
386
387        async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
388            Err(OxiSqlError::Other("no txn".into()))
389        }
390    }
391
392    // ── Tests ─────────────────────────────────────────────────────────────────
393
394    #[tokio::test]
395    async fn tracing_execute_emits_event() {
396        let writer = MemWriter::default();
397        let subscriber = tracing_subscriber::fmt()
398            .with_writer(writer.clone())
399            .with_ansi(false)
400            .with_max_level(tracing::Level::DEBUG)
401            .finish();
402        let _guard = tracing::subscriber::set_default(subscriber);
403
404        let conn = TracingConnection::new(MockConn);
405        let rows_affected = conn
406            .execute("INSERT INTO t VALUES ($1)", &[&42i64])
407            .await
408            .expect("execute ok");
409        assert_eq!(rows_affected, 1);
410
411        let output = writer.contents();
412        assert!(
413            output.contains("execute"),
414            "expected 'execute' in: {output}"
415        );
416    }
417
418    #[tokio::test]
419    async fn tracing_query_emits_event() {
420        let writer = MemWriter::default();
421        let subscriber = tracing_subscriber::fmt()
422            .with_writer(writer.clone())
423            .with_ansi(false)
424            .with_max_level(tracing::Level::DEBUG)
425            .finish();
426        let _guard = tracing::subscriber::set_default(subscriber);
427
428        let conn = TracingConnection::new(MockConn);
429        let rows = conn.query("SELECT n FROM t", &[]).await.expect("query ok");
430        assert_eq!(rows.len(), 1);
431
432        let output = writer.contents();
433        assert!(output.contains("query"), "expected 'query' in: {output}");
434    }
435
436    #[test]
437    fn tracing_conn_prefix_and_accessors() {
438        let conn = TracingConnection::with_prefix(MockConn, "mydb");
439        assert_eq!(conn.prefix(), "mydb");
440        // into_inner recovers the wrapped conn without panic
441        let _inner: MockConn = conn.into_inner();
442    }
443}
444
445// ── MetricsConnection ─────────────────────────────────────────────────────────
446
447/// Counters tracked by [`MetricsConnection`].
448#[derive(Debug, Default)]
449pub struct ConnectionMetrics {
450    /// Total number of `execute` calls.
451    pub executes: AtomicU64,
452    /// Total number of `query` calls.
453    pub queries: AtomicU64,
454    /// Total number of errors (execute + query).
455    pub errors: AtomicU64,
456    /// Total microseconds spent in `execute` calls.
457    pub execute_us: AtomicU64,
458    /// Total microseconds spent in `query` calls.
459    pub query_us: AtomicU64,
460}
461
462impl ConnectionMetrics {
463    /// Return a snapshot of the metrics as plain integers.
464    pub fn snapshot(&self) -> MetricsSnapshot {
465        MetricsSnapshot {
466            executes: self.executes.load(Ordering::Relaxed),
467            queries: self.queries.load(Ordering::Relaxed),
468            errors: self.errors.load(Ordering::Relaxed),
469            execute_us: self.execute_us.load(Ordering::Relaxed),
470            query_us: self.query_us.load(Ordering::Relaxed),
471        }
472    }
473}
474
475/// A point-in-time snapshot of [`ConnectionMetrics`].
476#[derive(Debug, Clone, PartialEq, Eq)]
477pub struct MetricsSnapshot {
478    /// Total number of `execute` calls recorded so far.
479    pub executes: u64,
480    /// Total number of `query` calls recorded so far.
481    pub queries: u64,
482    /// Total number of errors (execute + query) recorded so far.
483    pub errors: u64,
484    /// Total microseconds spent in `execute` calls.
485    pub execute_us: u64,
486    /// Total microseconds spent in `query` calls.
487    pub query_us: u64,
488}
489
490/// A [`Connection`] wrapper that counts operations and measures latency.
491///
492/// Access metrics through [`MetricsConnection::metrics`].  The metrics
493/// counters are `AtomicU64` so they are safe to read concurrently.
494///
495/// # Example
496///
497/// ```rust,no_run
498/// # async fn example() -> Result<(), oxisql_core::OxiSqlError> {
499/// // use oxisql_core::middleware::MetricsConnection;
500/// // use std::sync::Arc;
501/// // let metrics = Arc::new(oxisql_core::middleware::ConnectionMetrics::default());
502/// // let conn = MetricsConnection::new(backend_conn, Arc::clone(&metrics));
503/// // conn.execute("INSERT INTO t VALUES ($1)", &[&42i64]).await?;
504/// // println!("{:?}", metrics.snapshot());
505/// # Ok(())
506/// # }
507/// ```
508pub struct MetricsConnection<C> {
509    inner: C,
510    metrics: Arc<ConnectionMetrics>,
511}
512
513impl<C: Connection> MetricsConnection<C> {
514    /// Wrap `inner` with the given shared metrics store.
515    pub fn new(inner: C, metrics: Arc<ConnectionMetrics>) -> Self {
516        Self { inner, metrics }
517    }
518
519    /// Return a reference to the shared metrics.
520    pub fn metrics(&self) -> &Arc<ConnectionMetrics> {
521        &self.metrics
522    }
523
524    /// Consume the wrapper and return the inner connection.
525    pub fn into_inner(self) -> C {
526        self.inner
527    }
528}
529
530#[async_trait]
531impl<C: Connection + Send + Sync> Connection for MetricsConnection<C> {
532    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
533        let t = Instant::now();
534        let result = self.inner.execute(sql, params).await;
535        let us = t.elapsed().as_micros() as u64;
536        self.metrics.executes.fetch_add(1, Ordering::Relaxed);
537        self.metrics.execute_us.fetch_add(us, Ordering::Relaxed);
538        if result.is_err() {
539            self.metrics.errors.fetch_add(1, Ordering::Relaxed);
540        }
541        result
542    }
543
544    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
545        let t = Instant::now();
546        let result = self.inner.query(sql, params).await;
547        let us = t.elapsed().as_micros() as u64;
548        self.metrics.queries.fetch_add(1, Ordering::Relaxed);
549        self.metrics.query_us.fetch_add(us, Ordering::Relaxed);
550        if result.is_err() {
551            self.metrics.errors.fetch_add(1, Ordering::Relaxed);
552        }
553        result
554    }
555
556    async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
557        self.inner.transaction().await
558    }
559
560    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
561        self.inner.execute_batch(sql).await
562    }
563
564    async fn ping(&self) -> Result<(), OxiSqlError> {
565        self.inner.ping().await
566    }
567
568    async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
569        self.inner.prepare(sql).await
570    }
571
572    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
573        self.inner.tables().await
574    }
575
576    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
577        self.inner.columns(table).await
578    }
579
580    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
581        self.inner.indexes(table).await
582    }
583
584    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
585        self.inner.foreign_keys(table).await
586    }
587}
588
589// ── RetryConnection ──────────────────────────────────────────────────────────
590
591/// A retry predicate function type: takes an error, returns `true` if it is
592/// considered transient (i.e., worth retrying).
593pub type RetryPredicate = Arc<dyn Fn(&OxiSqlError) -> bool + Send + Sync>;
594
595/// Policy governing how [`RetryConnection`] retries failed operations.
596///
597/// The delay sequence for retries is:
598/// `initial_delay_ms * backoff_factor^attempt`, capped at `max_delay_ms`.
599#[derive(Clone)]
600pub struct RetryPolicy {
601    /// Maximum number of retry attempts after the first failure.
602    pub max_retries: u32,
603    /// Initial delay in milliseconds before the first retry.
604    pub initial_delay_ms: u64,
605    /// Multiplicative backoff factor applied to each successive delay.
606    pub backoff_factor: f64,
607    /// Upper bound on the delay between retries in milliseconds.
608    pub max_delay_ms: u64,
609    /// Predicate that returns `true` for transient errors worth retrying.
610    pub predicate: RetryPredicate,
611}
612
613fn default_retry_predicate() -> RetryPredicate {
614    Arc::new(|e: &OxiSqlError| match e {
615        OxiSqlError::Timeout(_) => true,
616        OxiSqlError::Execution(msg) => {
617            msg.contains("connection reset")
618                || msg.contains("broken pipe")
619                || msg.contains("connection refused")
620                || msg.contains("timed out")
621                || msg.contains("temporarily unavailable")
622        }
623        _ => false,
624    })
625}
626
627impl Default for RetryPolicy {
628    fn default() -> Self {
629        Self {
630            max_retries: 3,
631            initial_delay_ms: 100,
632            backoff_factor: 2.0,
633            max_delay_ms: 5_000,
634            predicate: default_retry_predicate(),
635        }
636    }
637}
638
639impl std::fmt::Debug for RetryPolicy {
640    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641        f.debug_struct("RetryPolicy")
642            .field("max_retries", &self.max_retries)
643            .field("initial_delay_ms", &self.initial_delay_ms)
644            .field("backoff_factor", &self.backoff_factor)
645            .field("max_delay_ms", &self.max_delay_ms)
646            .finish()
647    }
648}
649
650/// A [`Connection`] wrapper that retries transient failures according to a
651/// [`RetryPolicy`].
652///
653/// On each operation, if the inner connection returns an error that the policy
654/// predicate considers transient, the wrapper waits an exponentially increasing
655/// delay and retries up to `max_retries` times.
656///
657/// Introspection methods (`tables`, `columns`, `indexes`, `foreign_keys`) and
658/// `prepare` are delegated directly to the inner connection without retrying,
659/// as they are either non-transient or stateful operations.
660///
661/// # Example
662///
663/// ```rust,no_run
664/// # use oxisql_core::middleware::{RetryConnection, RetryPolicy};
665/// # use oxisql_core::Connection;
666/// # async fn example<C: Connection>(inner: C) {
667/// let conn = RetryConnection::new(inner, RetryPolicy::default());
668/// # }
669/// ```
670pub struct RetryConnection<C> {
671    inner: C,
672    policy: RetryPolicy,
673}
674
675impl<C: Connection> RetryConnection<C> {
676    /// Wrap `inner` with the given retry `policy`.
677    pub fn new(inner: C, policy: RetryPolicy) -> Self {
678        Self { inner, policy }
679    }
680
681    /// Return a reference to the inner connection.
682    pub fn inner(&self) -> &C {
683        &self.inner
684    }
685
686    /// Consume the wrapper and return the inner connection.
687    pub fn into_inner(self) -> C {
688        self.inner
689    }
690
691    /// Compute the delay in milliseconds for a given retry attempt (0-indexed).
692    pub(crate) fn delay_ms(&self, attempt: u32) -> u64 {
693        let delay =
694            self.policy.initial_delay_ms as f64 * self.policy.backoff_factor.powi(attempt as i32);
695        (delay as u64).min(self.policy.max_delay_ms)
696    }
697}
698
699#[async_trait]
700impl<C: Connection + Send + Sync> Connection for RetryConnection<C> {
701    async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
702        let mut last_err: Option<OxiSqlError> = None;
703        for attempt in 0..=self.policy.max_retries {
704            match self.inner.execute(sql, params).await {
705                Ok(n) => return Ok(n),
706                Err(e) => {
707                    if attempt < self.policy.max_retries && (self.policy.predicate)(&e) {
708                        let delay = self.delay_ms(attempt);
709                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
710                        last_err = Some(e);
711                    } else {
712                        return Err(e);
713                    }
714                }
715            }
716        }
717        Err(last_err.unwrap_or_else(|| OxiSqlError::Other("retry exhausted".into())))
718    }
719
720    async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
721        let mut last_err: Option<OxiSqlError> = None;
722        for attempt in 0..=self.policy.max_retries {
723            match self.inner.query(sql, params).await {
724                Ok(rows) => return Ok(rows),
725                Err(e) => {
726                    if attempt < self.policy.max_retries && (self.policy.predicate)(&e) {
727                        let delay = self.delay_ms(attempt);
728                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
729                        last_err = Some(e);
730                    } else {
731                        return Err(e);
732                    }
733                }
734            }
735        }
736        Err(last_err.unwrap_or_else(|| OxiSqlError::Other("retry exhausted".into())))
737    }
738
739    async fn transaction(&self) -> Result<Box<dyn crate::traits::Transaction + '_>, OxiSqlError> {
740        // Transactions involve state that cannot be safely replayed; no retry.
741        self.inner.transaction().await
742    }
743
744    async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
745        // Delegate to inner; individual statements within the batch are
746        // not safe to retry automatically as a unit.
747        self.inner.execute_batch(sql).await
748    }
749
750    async fn ping(&self) -> Result<(), OxiSqlError> {
751        let mut last_err: Option<OxiSqlError> = None;
752        for attempt in 0..=self.policy.max_retries {
753            match self.inner.ping().await {
754                Ok(()) => return Ok(()),
755                Err(e) => {
756                    if attempt < self.policy.max_retries && (self.policy.predicate)(&e) {
757                        let delay = self.delay_ms(attempt);
758                        tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
759                        last_err = Some(e);
760                    } else {
761                        return Err(e);
762                    }
763                }
764            }
765        }
766        Err(last_err.unwrap_or_else(|| OxiSqlError::Other("retry exhausted".into())))
767    }
768
769    async fn prepare(
770        &self,
771        sql: &str,
772    ) -> Result<Box<dyn crate::PreparedStatement + '_>, OxiSqlError> {
773        self.inner.prepare(sql).await
774    }
775
776    async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
777        self.inner.tables().await
778    }
779
780    async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
781        self.inner.columns(table).await
782    }
783
784    async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
785        self.inner.indexes(table).await
786    }
787
788    async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
789        self.inner.foreign_keys(table).await
790    }
791}
792
793// ── helpers ───────────────────────────────────────────────────────────────────
794
795/// Truncate SQL for display, taking care not to split on a UTF-8 char boundary.
796fn truncate_sql(sql: &str) -> String {
797    const MAX: usize = 80;
798    let trimmed = sql.trim();
799    if trimmed.len() <= MAX {
800        format!(" | {trimmed}")
801    } else {
802        // Find the char boundary at or before MAX bytes
803        let cut = trimmed
804            .char_indices()
805            .nth(MAX)
806            .map(|(i, _)| i)
807            .unwrap_or(trimmed.len());
808        format!(" | {}…", &trimmed[..cut])
809    }
810}