1use std::sync::atomic::{AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Instant;
10
11use async_trait::async_trait;
12
13use crate::schema::{ColumnInfo, ForeignKeyInfo, IndexInfo, TableInfo};
14use crate::{Connection, OxiSqlError, PreparedStatement, Row, ToSqlValue, Transaction};
15
16pub struct LoggingConnection<C> {
36 inner: C,
37 prefix: String,
38}
39
40impl<C: Connection> LoggingConnection<C> {
41 pub fn new(inner: C) -> Self {
43 Self {
44 inner,
45 prefix: String::new(),
46 }
47 }
48
49 pub fn with_prefix(inner: C, prefix: impl Into<String>) -> Self {
51 Self {
52 inner,
53 prefix: prefix.into(),
54 }
55 }
56
57 pub fn into_inner(self) -> C {
59 self.inner
60 }
61
62 pub fn prefix(&self) -> &str {
64 &self.prefix
65 }
66
67 fn fmt_prefix(&self) -> String {
68 if self.prefix.is_empty() {
69 String::new()
70 } else {
71 format!("{} ", self.prefix)
72 }
73 }
74}
75
76#[async_trait]
77impl<C: Connection + Send + Sync> Connection for LoggingConnection<C> {
78 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
79 let t = Instant::now();
80 let result = self.inner.execute(sql, params).await;
81 let elapsed = t.elapsed();
82 match &result {
83 Ok(n) => log::debug!(
84 "[{}execute] {} row(s) affected — {:.3}ms{}",
85 self.fmt_prefix(),
86 n,
87 elapsed.as_secs_f64() * 1000.0,
88 truncate_sql(sql),
89 ),
90 Err(e) => log::warn!(
91 "[{}execute] ERROR {} — {:.3}ms{}",
92 self.fmt_prefix(),
93 e,
94 elapsed.as_secs_f64() * 1000.0,
95 truncate_sql(sql),
96 ),
97 }
98 result
99 }
100
101 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
102 let t = Instant::now();
103 let result = self.inner.query(sql, params).await;
104 let elapsed = t.elapsed();
105 match &result {
106 Ok(rows) => log::debug!(
107 "[{}query] {} row(s) — {:.3}ms{}",
108 self.fmt_prefix(),
109 rows.len(),
110 elapsed.as_secs_f64() * 1000.0,
111 truncate_sql(sql),
112 ),
113 Err(e) => log::warn!(
114 "[{}query] ERROR {} — {:.3}ms{}",
115 self.fmt_prefix(),
116 e,
117 elapsed.as_secs_f64() * 1000.0,
118 truncate_sql(sql),
119 ),
120 }
121 result
122 }
123
124 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
125 log::debug!("[{}transaction] BEGIN", self.fmt_prefix());
126 self.inner.transaction().await
127 }
128
129 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
130 let t = Instant::now();
131 let result = self.inner.execute_batch(sql).await;
132 log::debug!(
133 "[{}execute_batch] {:.3}ms{}",
134 self.fmt_prefix(),
135 t.elapsed().as_secs_f64() * 1000.0,
136 truncate_sql(sql),
137 );
138 result
139 }
140
141 async fn ping(&self) -> Result<(), OxiSqlError> {
142 log::debug!("[{}ping]", self.fmt_prefix());
143 self.inner.ping().await
144 }
145
146 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
147 log::debug!("[{}prepare]{}", self.fmt_prefix(), truncate_sql(sql));
148 self.inner.prepare(sql).await
149 }
150
151 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
152 self.inner.tables().await
153 }
154
155 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
156 self.inner.columns(table).await
157 }
158
159 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
160 self.inner.indexes(table).await
161 }
162
163 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
164 self.inner.foreign_keys(table).await
165 }
166}
167
168#[cfg(feature = "tracing")]
191pub struct TracingConnection<C> {
192 inner: C,
193 prefix: String,
194}
195
196#[cfg(feature = "tracing")]
197impl<C: Connection> TracingConnection<C> {
198 pub fn new(inner: C) -> Self {
200 Self {
201 inner,
202 prefix: String::new(),
203 }
204 }
205
206 pub fn with_prefix(inner: C, prefix: impl Into<String>) -> Self {
208 Self {
209 inner,
210 prefix: prefix.into(),
211 }
212 }
213
214 pub fn into_inner(self) -> C {
216 self.inner
217 }
218
219 pub fn prefix(&self) -> &str {
221 &self.prefix
222 }
223
224 fn fmt_prefix(&self) -> String {
225 if self.prefix.is_empty() {
226 String::new()
227 } else {
228 format!("{} ", self.prefix)
229 }
230 }
231}
232
233#[cfg(feature = "tracing")]
234#[async_trait]
235impl<C: Connection + Send + Sync> Connection for TracingConnection<C> {
236 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
237 let t = Instant::now();
238 let result = self.inner.execute(sql, params).await;
239 let elapsed = t.elapsed();
240 match &result {
241 Ok(n) => tracing::debug!(
242 "[{}execute] {} row(s) affected — {:.3}ms{}",
243 self.fmt_prefix(),
244 n,
245 elapsed.as_secs_f64() * 1000.0,
246 truncate_sql(sql),
247 ),
248 Err(e) => tracing::warn!(
249 "[{}execute] ERROR {} — {:.3}ms{}",
250 self.fmt_prefix(),
251 e,
252 elapsed.as_secs_f64() * 1000.0,
253 truncate_sql(sql),
254 ),
255 }
256 result
257 }
258
259 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
260 let t = Instant::now();
261 let result = self.inner.query(sql, params).await;
262 let elapsed = t.elapsed();
263 match &result {
264 Ok(rows) => tracing::debug!(
265 "[{}query] {} row(s) — {:.3}ms{}",
266 self.fmt_prefix(),
267 rows.len(),
268 elapsed.as_secs_f64() * 1000.0,
269 truncate_sql(sql),
270 ),
271 Err(e) => tracing::warn!(
272 "[{}query] ERROR {} — {:.3}ms{}",
273 self.fmt_prefix(),
274 e,
275 elapsed.as_secs_f64() * 1000.0,
276 truncate_sql(sql),
277 ),
278 }
279 result
280 }
281
282 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
283 tracing::debug!("[{}transaction] BEGIN", self.fmt_prefix());
284 self.inner.transaction().await
285 }
286
287 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
288 let t = Instant::now();
289 let result = self.inner.execute_batch(sql).await;
290 tracing::debug!(
291 "[{}execute_batch] {:.3}ms{}",
292 self.fmt_prefix(),
293 t.elapsed().as_secs_f64() * 1000.0,
294 truncate_sql(sql),
295 );
296 result
297 }
298
299 async fn ping(&self) -> Result<(), OxiSqlError> {
300 tracing::debug!("[{}ping]", self.fmt_prefix());
301 self.inner.ping().await
302 }
303
304 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
305 tracing::debug!("[{}prepare]{}", self.fmt_prefix(), truncate_sql(sql));
306 self.inner.prepare(sql).await
307 }
308
309 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
310 self.inner.tables().await
311 }
312
313 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
314 self.inner.columns(table).await
315 }
316
317 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
318 self.inner.indexes(table).await
319 }
320
321 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
322 self.inner.foreign_keys(table).await
323 }
324}
325
326#[cfg(all(test, feature = "tracing"))]
327mod tracing_tests {
328 use std::sync::{Arc, Mutex};
329
330 use tracing_subscriber::fmt::MakeWriter;
331
332 use crate::{Connection, OxiSqlError, Row, ToSqlValue, Transaction, Value};
333
334 use super::TracingConnection;
335
336 #[derive(Clone, Default)]
339 struct MemWriter(Arc<Mutex<Vec<u8>>>);
340
341 impl MemWriter {
342 fn contents(&self) -> String {
343 let data = self.0.lock().expect("lock");
344 String::from_utf8_lossy(&data).into_owned()
345 }
346 }
347
348 impl std::io::Write for MemWriter {
349 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
350 self.0.lock().expect("lock").extend_from_slice(buf);
351 Ok(buf.len())
352 }
353 fn flush(&mut self) -> std::io::Result<()> {
354 Ok(())
355 }
356 }
357
358 impl<'a> MakeWriter<'a> for MemWriter {
359 type Writer = MemWriter;
360 fn make_writer(&'a self) -> Self::Writer {
361 self.clone()
362 }
363 }
364
365 struct MockConn;
368
369 #[async_trait::async_trait]
370 impl Connection for MockConn {
371 async fn execute(
372 &self,
373 _sql: &str,
374 _params: &[&dyn ToSqlValue],
375 ) -> Result<u64, OxiSqlError> {
376 Ok(1)
377 }
378
379 async fn query(
380 &self,
381 _sql: &str,
382 _params: &[&dyn ToSqlValue],
383 ) -> Result<Vec<Row>, OxiSqlError> {
384 Ok(vec![Row::new(vec!["n".into()], vec![Value::I64(42)])])
385 }
386
387 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
388 Err(OxiSqlError::Other("no txn".into()))
389 }
390 }
391
392 #[tokio::test]
395 async fn tracing_execute_emits_event() {
396 let writer = MemWriter::default();
397 let subscriber = tracing_subscriber::fmt()
398 .with_writer(writer.clone())
399 .with_ansi(false)
400 .with_max_level(tracing::Level::DEBUG)
401 .finish();
402 let _guard = tracing::subscriber::set_default(subscriber);
403
404 let conn = TracingConnection::new(MockConn);
405 let rows_affected = conn
406 .execute("INSERT INTO t VALUES ($1)", &[&42i64])
407 .await
408 .expect("execute ok");
409 assert_eq!(rows_affected, 1);
410
411 let output = writer.contents();
412 assert!(
413 output.contains("execute"),
414 "expected 'execute' in: {output}"
415 );
416 }
417
418 #[tokio::test]
419 async fn tracing_query_emits_event() {
420 let writer = MemWriter::default();
421 let subscriber = tracing_subscriber::fmt()
422 .with_writer(writer.clone())
423 .with_ansi(false)
424 .with_max_level(tracing::Level::DEBUG)
425 .finish();
426 let _guard = tracing::subscriber::set_default(subscriber);
427
428 let conn = TracingConnection::new(MockConn);
429 let rows = conn.query("SELECT n FROM t", &[]).await.expect("query ok");
430 assert_eq!(rows.len(), 1);
431
432 let output = writer.contents();
433 assert!(output.contains("query"), "expected 'query' in: {output}");
434 }
435
436 #[test]
437 fn tracing_conn_prefix_and_accessors() {
438 let conn = TracingConnection::with_prefix(MockConn, "mydb");
439 assert_eq!(conn.prefix(), "mydb");
440 let _inner: MockConn = conn.into_inner();
442 }
443}
444
445#[derive(Debug, Default)]
449pub struct ConnectionMetrics {
450 pub executes: AtomicU64,
452 pub queries: AtomicU64,
454 pub errors: AtomicU64,
456 pub execute_us: AtomicU64,
458 pub query_us: AtomicU64,
460}
461
462impl ConnectionMetrics {
463 pub fn snapshot(&self) -> MetricsSnapshot {
465 MetricsSnapshot {
466 executes: self.executes.load(Ordering::Relaxed),
467 queries: self.queries.load(Ordering::Relaxed),
468 errors: self.errors.load(Ordering::Relaxed),
469 execute_us: self.execute_us.load(Ordering::Relaxed),
470 query_us: self.query_us.load(Ordering::Relaxed),
471 }
472 }
473}
474
475#[derive(Debug, Clone, PartialEq, Eq)]
477pub struct MetricsSnapshot {
478 pub executes: u64,
480 pub queries: u64,
482 pub errors: u64,
484 pub execute_us: u64,
486 pub query_us: u64,
488}
489
490pub struct MetricsConnection<C> {
509 inner: C,
510 metrics: Arc<ConnectionMetrics>,
511}
512
513impl<C: Connection> MetricsConnection<C> {
514 pub fn new(inner: C, metrics: Arc<ConnectionMetrics>) -> Self {
516 Self { inner, metrics }
517 }
518
519 pub fn metrics(&self) -> &Arc<ConnectionMetrics> {
521 &self.metrics
522 }
523
524 pub fn into_inner(self) -> C {
526 self.inner
527 }
528}
529
530#[async_trait]
531impl<C: Connection + Send + Sync> Connection for MetricsConnection<C> {
532 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
533 let t = Instant::now();
534 let result = self.inner.execute(sql, params).await;
535 let us = t.elapsed().as_micros() as u64;
536 self.metrics.executes.fetch_add(1, Ordering::Relaxed);
537 self.metrics.execute_us.fetch_add(us, Ordering::Relaxed);
538 if result.is_err() {
539 self.metrics.errors.fetch_add(1, Ordering::Relaxed);
540 }
541 result
542 }
543
544 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
545 let t = Instant::now();
546 let result = self.inner.query(sql, params).await;
547 let us = t.elapsed().as_micros() as u64;
548 self.metrics.queries.fetch_add(1, Ordering::Relaxed);
549 self.metrics.query_us.fetch_add(us, Ordering::Relaxed);
550 if result.is_err() {
551 self.metrics.errors.fetch_add(1, Ordering::Relaxed);
552 }
553 result
554 }
555
556 async fn transaction(&self) -> Result<Box<dyn Transaction + '_>, OxiSqlError> {
557 self.inner.transaction().await
558 }
559
560 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
561 self.inner.execute_batch(sql).await
562 }
563
564 async fn ping(&self) -> Result<(), OxiSqlError> {
565 self.inner.ping().await
566 }
567
568 async fn prepare(&self, sql: &str) -> Result<Box<dyn PreparedStatement + '_>, OxiSqlError> {
569 self.inner.prepare(sql).await
570 }
571
572 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
573 self.inner.tables().await
574 }
575
576 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
577 self.inner.columns(table).await
578 }
579
580 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
581 self.inner.indexes(table).await
582 }
583
584 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
585 self.inner.foreign_keys(table).await
586 }
587}
588
589pub type RetryPredicate = Arc<dyn Fn(&OxiSqlError) -> bool + Send + Sync>;
594
595#[derive(Clone)]
600pub struct RetryPolicy {
601 pub max_retries: u32,
603 pub initial_delay_ms: u64,
605 pub backoff_factor: f64,
607 pub max_delay_ms: u64,
609 pub predicate: RetryPredicate,
611}
612
613fn default_retry_predicate() -> RetryPredicate {
614 Arc::new(|e: &OxiSqlError| match e {
615 OxiSqlError::Timeout(_) => true,
616 OxiSqlError::Execution(msg) => {
617 msg.contains("connection reset")
618 || msg.contains("broken pipe")
619 || msg.contains("connection refused")
620 || msg.contains("timed out")
621 || msg.contains("temporarily unavailable")
622 }
623 _ => false,
624 })
625}
626
627impl Default for RetryPolicy {
628 fn default() -> Self {
629 Self {
630 max_retries: 3,
631 initial_delay_ms: 100,
632 backoff_factor: 2.0,
633 max_delay_ms: 5_000,
634 predicate: default_retry_predicate(),
635 }
636 }
637}
638
639impl std::fmt::Debug for RetryPolicy {
640 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
641 f.debug_struct("RetryPolicy")
642 .field("max_retries", &self.max_retries)
643 .field("initial_delay_ms", &self.initial_delay_ms)
644 .field("backoff_factor", &self.backoff_factor)
645 .field("max_delay_ms", &self.max_delay_ms)
646 .finish()
647 }
648}
649
650pub struct RetryConnection<C> {
671 inner: C,
672 policy: RetryPolicy,
673}
674
675impl<C: Connection> RetryConnection<C> {
676 pub fn new(inner: C, policy: RetryPolicy) -> Self {
678 Self { inner, policy }
679 }
680
681 pub fn inner(&self) -> &C {
683 &self.inner
684 }
685
686 pub fn into_inner(self) -> C {
688 self.inner
689 }
690
691 pub(crate) fn delay_ms(&self, attempt: u32) -> u64 {
693 let delay =
694 self.policy.initial_delay_ms as f64 * self.policy.backoff_factor.powi(attempt as i32);
695 (delay as u64).min(self.policy.max_delay_ms)
696 }
697}
698
699#[async_trait]
700impl<C: Connection + Send + Sync> Connection for RetryConnection<C> {
701 async fn execute(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<u64, OxiSqlError> {
702 let mut last_err: Option<OxiSqlError> = None;
703 for attempt in 0..=self.policy.max_retries {
704 match self.inner.execute(sql, params).await {
705 Ok(n) => return Ok(n),
706 Err(e) => {
707 if attempt < self.policy.max_retries && (self.policy.predicate)(&e) {
708 let delay = self.delay_ms(attempt);
709 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
710 last_err = Some(e);
711 } else {
712 return Err(e);
713 }
714 }
715 }
716 }
717 Err(last_err.unwrap_or_else(|| OxiSqlError::Other("retry exhausted".into())))
718 }
719
720 async fn query(&self, sql: &str, params: &[&dyn ToSqlValue]) -> Result<Vec<Row>, OxiSqlError> {
721 let mut last_err: Option<OxiSqlError> = None;
722 for attempt in 0..=self.policy.max_retries {
723 match self.inner.query(sql, params).await {
724 Ok(rows) => return Ok(rows),
725 Err(e) => {
726 if attempt < self.policy.max_retries && (self.policy.predicate)(&e) {
727 let delay = self.delay_ms(attempt);
728 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
729 last_err = Some(e);
730 } else {
731 return Err(e);
732 }
733 }
734 }
735 }
736 Err(last_err.unwrap_or_else(|| OxiSqlError::Other("retry exhausted".into())))
737 }
738
739 async fn transaction(&self) -> Result<Box<dyn crate::traits::Transaction + '_>, OxiSqlError> {
740 self.inner.transaction().await
742 }
743
744 async fn execute_batch(&self, sql: &str) -> Result<u64, OxiSqlError> {
745 self.inner.execute_batch(sql).await
748 }
749
750 async fn ping(&self) -> Result<(), OxiSqlError> {
751 let mut last_err: Option<OxiSqlError> = None;
752 for attempt in 0..=self.policy.max_retries {
753 match self.inner.ping().await {
754 Ok(()) => return Ok(()),
755 Err(e) => {
756 if attempt < self.policy.max_retries && (self.policy.predicate)(&e) {
757 let delay = self.delay_ms(attempt);
758 tokio::time::sleep(tokio::time::Duration::from_millis(delay)).await;
759 last_err = Some(e);
760 } else {
761 return Err(e);
762 }
763 }
764 }
765 }
766 Err(last_err.unwrap_or_else(|| OxiSqlError::Other("retry exhausted".into())))
767 }
768
769 async fn prepare(
770 &self,
771 sql: &str,
772 ) -> Result<Box<dyn crate::PreparedStatement + '_>, OxiSqlError> {
773 self.inner.prepare(sql).await
774 }
775
776 async fn tables(&self) -> Result<Vec<TableInfo>, OxiSqlError> {
777 self.inner.tables().await
778 }
779
780 async fn columns(&self, table: &str) -> Result<Vec<ColumnInfo>, OxiSqlError> {
781 self.inner.columns(table).await
782 }
783
784 async fn indexes(&self, table: &str) -> Result<Vec<IndexInfo>, OxiSqlError> {
785 self.inner.indexes(table).await
786 }
787
788 async fn foreign_keys(&self, table: &str) -> Result<Vec<ForeignKeyInfo>, OxiSqlError> {
789 self.inner.foreign_keys(table).await
790 }
791}
792
793fn truncate_sql(sql: &str) -> String {
797 const MAX: usize = 80;
798 let trimmed = sql.trim();
799 if trimmed.len() <= MAX {
800 format!(" | {trimmed}")
801 } else {
802 let cut = trimmed
804 .char_indices()
805 .nth(MAX)
806 .map(|(i, _)| i)
807 .unwrap_or(trimmed.len());
808 format!(" | {}…", &trimmed[..cut])
809 }
810}