1use crate::error::Result;
14use crate::row::Row;
15use crate::value::Value;
16use asupersync::{Cx, Outcome};
17
/// Transaction isolation level that can be requested when a transaction is opened.
///
/// [`IsolationLevel::ReadCommitted`] is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum IsolationLevel {
    /// Rendered by [`IsolationLevel::as_sql`] as `READ UNCOMMITTED`.
    ReadUncommitted,

    /// Rendered by [`IsolationLevel::as_sql`] as `READ COMMITTED` (the default).
    #[default]
    ReadCommitted,

    /// Rendered by [`IsolationLevel::as_sql`] as `REPEATABLE READ`.
    RepeatableRead,

    /// Rendered by [`IsolationLevel::as_sql`] as `SERIALIZABLE`.
    Serializable,
}

impl IsolationLevel {
    /// Returns the upper-case SQL keywords naming this isolation level.
    ///
    /// The result is a static string, so no allocation takes place.
    #[must_use]
    pub const fn as_sql(&self) -> &'static str {
        match *self {
            Self::Serializable => "SERIALIZABLE",
            Self::RepeatableRead => "REPEATABLE READ",
            Self::ReadCommitted => "READ COMMITTED",
            Self::ReadUncommitted => "READ UNCOMMITTED",
        }
    }
}
58
59#[derive(Debug, Clone)]
65pub struct PreparedStatement {
66 id: u64,
68 sql: String,
70 param_count: usize,
72 columns: Option<Vec<String>>,
74}
75
76impl PreparedStatement {
77 #[must_use]
81 pub fn new(id: u64, sql: String, param_count: usize) -> Self {
82 Self {
83 id,
84 sql,
85 param_count,
86 columns: None,
87 }
88 }
89
90 #[must_use]
92 pub fn with_columns(id: u64, sql: String, param_count: usize, columns: Vec<String>) -> Self {
93 Self {
94 id,
95 sql,
96 param_count,
97 columns: Some(columns),
98 }
99 }
100
101 #[must_use]
103 pub const fn id(&self) -> u64 {
104 self.id
105 }
106
107 #[must_use]
109 pub fn sql(&self) -> &str {
110 &self.sql
111 }
112
113 #[must_use]
115 pub const fn param_count(&self) -> usize {
116 self.param_count
117 }
118
119 #[must_use]
121 pub fn columns(&self) -> Option<&[String]> {
122 self.columns.as_deref()
123 }
124
125 #[must_use]
127 pub fn validate_params(&self, params: &[Value]) -> bool {
128 params.len() == self.param_count
129 }
130}
131
/// SQL dialect selector used to vary generated SQL text.
///
/// [`Dialect::Postgres`] is the `Default` variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Dialect {
    /// PostgreSQL-style syntax (`$N` placeholders, double-quoted identifiers).
    #[default]
    Postgres,
    /// SQLite-style syntax (`?N` placeholders, double-quoted identifiers).
    Sqlite,
    /// MySQL-style syntax (bare `?` placeholders, backtick-quoted identifiers).
    Mysql,
}

impl Dialect {
    /// Renders the bind-parameter placeholder for position `index`.
    ///
    /// Postgres yields `$index`, SQLite yields `?index`, and MySQL always yields
    /// a bare `?` (the index is ignored). The index is formatted verbatim; no
    /// range check is performed.
    pub fn placeholder(self, index: usize) -> String {
        match self {
            Self::Mysql => String::from("?"),
            Self::Sqlite => format!("?{index}"),
            Self::Postgres => format!("${index}"),
        }
    }

    /// Returns the infix string-concatenation operator for this dialect.
    ///
    /// Postgres and SQLite report `||`. MySQL reports an empty string, so
    /// callers must be prepared for `""`.
    // NOTE(review): presumably MySQL callers are expected to emit `CONCAT(...)`
    // instead of an infix operator — confirm against the query builder.
    pub const fn concat_op(self) -> &'static str {
        match self {
            Self::Mysql => "",
            Self::Postgres | Self::Sqlite => "||",
        }
    }

    /// Reports whether the dialect is treated as supporting `ILIKE`.
    ///
    /// Only [`Dialect::Postgres`] returns `true`.
    pub const fn supports_ilike(self) -> bool {
        match self {
            Self::Postgres => true,
            Self::Sqlite | Self::Mysql => false,
        }
    }

    /// Wraps `name` in the dialect's identifier quotes, doubling any embedded
    /// quote character.
    ///
    /// Postgres and SQLite use `"`, MySQL uses a backtick.
    pub fn quote_identifier(self, name: &str) -> String {
        let quote = match self {
            Self::Postgres | Self::Sqlite => '"',
            Self::Mysql => '`',
        };

        // Two extra bytes for the surrounding quotes; embedded quotes grow it further.
        let mut quoted = String::with_capacity(name.len() + 2);
        quoted.push(quote);
        for ch in name.chars() {
            if ch == quote {
                // Escape by doubling the quote character.
                quoted.push(quote);
            }
            quoted.push(ch);
        }
        quoted.push(quote);
        quoted
    }
}
206
207pub trait Connection: Send + Sync {
208 type Tx<'conn>: TransactionOps
210 where
211 Self: 'conn;
212
213 fn dialect(&self) -> Dialect {
218 Dialect::Postgres
219 }
220
221 fn query(
223 &self,
224 cx: &Cx,
225 sql: &str,
226 params: &[Value],
227 ) -> impl Future<Output = Outcome<Vec<Row>, crate::Error>> + Send;
228
229 fn query_one(
231 &self,
232 cx: &Cx,
233 sql: &str,
234 params: &[Value],
235 ) -> impl Future<Output = Outcome<Option<Row>, crate::Error>> + Send;
236
237 fn execute(
239 &self,
240 cx: &Cx,
241 sql: &str,
242 params: &[Value],
243 ) -> impl Future<Output = Outcome<u64, crate::Error>> + Send;
244
245 fn insert(
250 &self,
251 cx: &Cx,
252 sql: &str,
253 params: &[Value],
254 ) -> impl Future<Output = Outcome<i64, crate::Error>> + Send;
255
256 fn batch(
262 &self,
263 cx: &Cx,
264 statements: &[(String, Vec<Value>)],
265 ) -> impl Future<Output = Outcome<Vec<u64>, crate::Error>> + Send;
266
267 fn begin(&self, cx: &Cx) -> impl Future<Output = Outcome<Self::Tx<'_>, crate::Error>> + Send;
269
270 fn begin_with(
272 &self,
273 cx: &Cx,
274 isolation: IsolationLevel,
275 ) -> impl Future<Output = Outcome<Self::Tx<'_>, crate::Error>> + Send;
276
277 fn prepare(
282 &self,
283 cx: &Cx,
284 sql: &str,
285 ) -> impl Future<Output = Outcome<PreparedStatement, crate::Error>> + Send;
286
287 fn query_prepared(
289 &self,
290 cx: &Cx,
291 stmt: &PreparedStatement,
292 params: &[Value],
293 ) -> impl Future<Output = Outcome<Vec<Row>, crate::Error>> + Send;
294
295 fn execute_prepared(
297 &self,
298 cx: &Cx,
299 stmt: &PreparedStatement,
300 params: &[Value],
301 ) -> impl Future<Output = Outcome<u64, crate::Error>> + Send;
302
303 fn ping(&self, cx: &Cx) -> impl Future<Output = Outcome<(), crate::Error>> + Send;
305
306 fn is_valid(&self, cx: &Cx) -> impl Future<Output = bool> + Send {
308 async {
309 match self.ping(cx).await {
310 Outcome::Ok(()) => true,
311 Outcome::Err(_) | Outcome::Cancelled(_) | Outcome::Panicked(_) => false,
312 }
313 }
314 }
315
316 fn close(self, cx: &Cx) -> impl Future<Output = Result<()>> + Send;
318}
319
320pub trait TransactionOps: Send {
339 fn query(
341 &self,
342 cx: &Cx,
343 sql: &str,
344 params: &[Value],
345 ) -> impl Future<Output = Outcome<Vec<Row>, crate::Error>> + Send;
346
347 fn query_one(
349 &self,
350 cx: &Cx,
351 sql: &str,
352 params: &[Value],
353 ) -> impl Future<Output = Outcome<Option<Row>, crate::Error>> + Send;
354
355 fn execute(
357 &self,
358 cx: &Cx,
359 sql: &str,
360 params: &[Value],
361 ) -> impl Future<Output = Outcome<u64, crate::Error>> + Send;
362
363 fn savepoint(
367 &self,
368 cx: &Cx,
369 name: &str,
370 ) -> impl Future<Output = Outcome<(), crate::Error>> + Send;
371
372 fn rollback_to(
377 &self,
378 cx: &Cx,
379 name: &str,
380 ) -> impl Future<Output = Outcome<(), crate::Error>> + Send;
381
382 fn release(
387 &self,
388 cx: &Cx,
389 name: &str,
390 ) -> impl Future<Output = Outcome<(), crate::Error>> + Send;
391
392 fn commit(self, cx: &Cx) -> impl Future<Output = Outcome<(), crate::Error>> + Send;
394
395 fn rollback(self, cx: &Cx) -> impl Future<Output = Outcome<(), crate::Error>> + Send;
397}
398
399pub struct Transaction<'conn> {
408 conn: &'conn dyn TransactionInternal,
410 finalized: bool,
412}
413
414pub trait TransactionInternal: Send + Sync {
419 fn query_internal(
421 &self,
422 cx: &Cx,
423 sql: &str,
424 params: &[Value],
425 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<Vec<Row>, crate::Error>> + Send + '_>>;
426
427 fn query_one_internal(
429 &self,
430 cx: &Cx,
431 sql: &str,
432 params: &[Value],
433 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<Option<Row>, crate::Error>> + Send + '_>>;
434
435 fn execute_internal(
437 &self,
438 cx: &Cx,
439 sql: &str,
440 params: &[Value],
441 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<u64, crate::Error>> + Send + '_>>;
442
443 fn savepoint_internal(
445 &self,
446 cx: &Cx,
447 name: &str,
448 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<(), crate::Error>> + Send + '_>>;
449
450 fn rollback_to_internal(
452 &self,
453 cx: &Cx,
454 name: &str,
455 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<(), crate::Error>> + Send + '_>>;
456
457 fn release_internal(
459 &self,
460 cx: &Cx,
461 name: &str,
462 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<(), crate::Error>> + Send + '_>>;
463
464 fn commit_internal(
466 &self,
467 cx: &Cx,
468 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<(), crate::Error>> + Send + '_>>;
469
470 fn rollback_internal(
472 &self,
473 cx: &Cx,
474 ) -> std::pin::Pin<Box<dyn Future<Output = Outcome<(), crate::Error>> + Send + '_>>;
475}
476
477impl<'conn> Transaction<'conn> {
478 pub fn new(conn: &'conn dyn TransactionInternal) -> Self {
482 Self {
483 conn,
484 finalized: false,
485 }
486 }
487
488 #[must_use]
490 pub const fn is_finalized(&self) -> bool {
491 self.finalized
492 }
493}
494
495impl TransactionOps for Transaction<'_> {
496 fn query(
497 &self,
498 cx: &Cx,
499 sql: &str,
500 params: &[Value],
501 ) -> impl Future<Output = Outcome<Vec<Row>, crate::Error>> + Send {
502 self.conn.query_internal(cx, sql, params)
503 }
504
505 fn query_one(
506 &self,
507 cx: &Cx,
508 sql: &str,
509 params: &[Value],
510 ) -> impl Future<Output = Outcome<Option<Row>, crate::Error>> + Send {
511 self.conn.query_one_internal(cx, sql, params)
512 }
513
514 fn execute(
515 &self,
516 cx: &Cx,
517 sql: &str,
518 params: &[Value],
519 ) -> impl Future<Output = Outcome<u64, crate::Error>> + Send {
520 self.conn.execute_internal(cx, sql, params)
521 }
522
523 fn savepoint(
524 &self,
525 cx: &Cx,
526 name: &str,
527 ) -> impl Future<Output = Outcome<(), crate::Error>> + Send {
528 self.conn.savepoint_internal(cx, name)
529 }
530
531 fn rollback_to(
532 &self,
533 cx: &Cx,
534 name: &str,
535 ) -> impl Future<Output = Outcome<(), crate::Error>> + Send {
536 self.conn.rollback_to_internal(cx, name)
537 }
538
539 fn release(
540 &self,
541 cx: &Cx,
542 name: &str,
543 ) -> impl Future<Output = Outcome<(), crate::Error>> + Send {
544 self.conn.release_internal(cx, name)
545 }
546
547 async fn commit(mut self, cx: &Cx) -> Outcome<(), crate::Error> {
548 self.finalized = true;
549 self.conn.commit_internal(cx).await
550 }
551
552 async fn rollback(mut self, cx: &Cx) -> Outcome<(), crate::Error> {
553 self.finalized = true;
554 self.conn.rollback_internal(cx).await
555 }
556}
557
558use std::future::Future;
559
560impl Drop for Transaction<'_> {
561 fn drop(&mut self) {
562 if !self.finalized {
563 }
568 }
569}
570
/// Settings used when opening a database connection.
///
/// Start from [`ConnectionConfig::new`] (or `Default`) and refine the value with
/// the chained setters; each setter consumes the config and returns the updated
/// one. Two configs compare equal when all of their fields are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ConnectionConfig {
    /// Connection URL, stored exactly as given.
    pub url: String,
    /// Connect timeout in milliseconds; defaults to `30_000`.
    pub connect_timeout_ms: u64,
    /// Query timeout in milliseconds; defaults to `30_000`.
    pub query_timeout_ms: u64,
    /// Requested TLS policy; defaults to [`SslMode::Prefer`].
    pub ssl_mode: SslMode,
    /// Optional application name; defaults to `None`.
    pub application_name: Option<String>,
}

/// TLS policy requested for a connection.
///
/// This file only stores the choice; enforcing it is left to the code that
/// opens the connection.
// NOTE(review): the variant names look like PostgreSQL's `sslmode` settings
// (disable / prefer / require / verify-ca / verify-full) — confirm that drivers
// interpret them that way.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum SslMode {
    /// Do not use TLS.
    Disable,
    /// The `Default` variant.
    #[default]
    Prefer,
    /// Require TLS.
    Require,
    /// Require TLS and, presumably, verify the certificate authority.
    VerifyCa,
    /// Require TLS and, presumably, verify both the CA and the host name.
    VerifyFull,
}

impl Default for ConnectionConfig {
    /// Empty URL, 30-second connect and query timeouts, the default
    /// [`SslMode`], and no application name.
    fn default() -> Self {
        Self {
            url: String::new(),
            connect_timeout_ms: 30_000,
            query_timeout_ms: 30_000,
            ssl_mode: SslMode::default(),
            application_name: None,
        }
    }
}

impl ConnectionConfig {
    /// Creates a config for `url`, with every other field at its default.
    #[must_use]
    pub fn new(url: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            ..Default::default()
        }
    }

    /// Sets the connect timeout, in milliseconds.
    #[must_use]
    pub fn connect_timeout(mut self, ms: u64) -> Self {
        self.connect_timeout_ms = ms;
        self
    }

    /// Sets the query timeout, in milliseconds.
    #[must_use]
    pub fn query_timeout(mut self, ms: u64) -> Self {
        self.query_timeout_ms = ms;
        self
    }

    /// Sets the requested TLS policy.
    #[must_use]
    pub fn ssl_mode(mut self, mode: SslMode) -> Self {
        self.ssl_mode = mode;
        self
    }

    /// Sets the application name.
    #[must_use]
    pub fn application_name(mut self, name: impl Into<String>) -> Self {
        self.application_name = Some(name.into());
        self
    }
}
647
/// Unit tests for the plain-data types in this module, including `Dialect`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_isolation_level_default() {
        let level = IsolationLevel::default();
        assert_eq!(level, IsolationLevel::ReadCommitted);
    }

    #[test]
    fn test_isolation_level_as_sql() {
        assert_eq!(IsolationLevel::ReadUncommitted.as_sql(), "READ UNCOMMITTED");
        assert_eq!(IsolationLevel::ReadCommitted.as_sql(), "READ COMMITTED");
        assert_eq!(IsolationLevel::RepeatableRead.as_sql(), "REPEATABLE READ");
        assert_eq!(IsolationLevel::Serializable.as_sql(), "SERIALIZABLE");
    }

    #[test]
    fn test_prepared_statement_new() {
        let stmt = PreparedStatement::new(1, "SELECT * FROM users WHERE id = $1".to_string(), 1);
        assert_eq!(stmt.id(), 1);
        assert_eq!(stmt.sql(), "SELECT * FROM users WHERE id = $1");
        assert_eq!(stmt.param_count(), 1);
        assert!(stmt.columns().is_none());
    }

    #[test]
    fn test_prepared_statement_with_columns() {
        let stmt = PreparedStatement::with_columns(
            2,
            "SELECT id, name FROM users".to_string(),
            0,
            vec!["id".to_string(), "name".to_string()],
        );
        assert_eq!(stmt.id(), 2);
        assert_eq!(stmt.param_count(), 0);
        assert_eq!(
            stmt.columns(),
            Some(&["id".to_string(), "name".to_string()][..])
        );
    }

    #[test]
    fn test_prepared_statement_validate_params() {
        let stmt = PreparedStatement::new(1, "SELECT $1, $2".to_string(), 2);

        assert!(!stmt.validate_params(&[]));
        assert!(!stmt.validate_params(&[Value::Int(1)]));
        assert!(stmt.validate_params(&[Value::Int(1), Value::Int(2)]));
        assert!(!stmt.validate_params(&[Value::Int(1), Value::Int(2), Value::Int(3)]));
    }

    #[test]
    fn test_dialect_default() {
        assert_eq!(Dialect::default(), Dialect::Postgres);
    }

    #[test]
    fn test_dialect_placeholder() {
        assert_eq!(Dialect::Postgres.placeholder(1), "$1");
        assert_eq!(Dialect::Postgres.placeholder(12), "$12");
        assert_eq!(Dialect::Sqlite.placeholder(3), "?3");
        // MySQL placeholders are positional and ignore the index.
        assert_eq!(Dialect::Mysql.placeholder(1), "?");
        assert_eq!(Dialect::Mysql.placeholder(7), "?");
    }

    #[test]
    fn test_dialect_concat_op() {
        assert_eq!(Dialect::Postgres.concat_op(), "||");
        assert_eq!(Dialect::Sqlite.concat_op(), "||");
        assert_eq!(Dialect::Mysql.concat_op(), "");
    }

    #[test]
    fn test_dialect_supports_ilike() {
        assert!(Dialect::Postgres.supports_ilike());
        assert!(!Dialect::Sqlite.supports_ilike());
        assert!(!Dialect::Mysql.supports_ilike());
    }

    #[test]
    fn test_dialect_quote_identifier() {
        assert_eq!(Dialect::Postgres.quote_identifier("users"), "\"users\"");
        assert_eq!(Dialect::Sqlite.quote_identifier("users"), "\"users\"");
        assert_eq!(Dialect::Mysql.quote_identifier("users"), "`users`");
    }

    #[test]
    fn test_dialect_quote_identifier_escapes_embedded_quotes() {
        assert_eq!(
            Dialect::Postgres.quote_identifier("we\"ird"),
            "\"we\"\"ird\""
        );
        assert_eq!(Dialect::Mysql.quote_identifier("we`ird"), "`we``ird`");
        // The other dialect's quote character passes through untouched.
        assert_eq!(Dialect::Postgres.quote_identifier("a`b"), "\"a`b\"");
        assert_eq!(Dialect::Mysql.quote_identifier("a\"b"), "`a\"b`");
    }

    #[test]
    fn test_ssl_mode_default() {
        let mode = SslMode::default();
        assert!(matches!(mode, SslMode::Prefer));
    }

    #[test]
    fn test_connection_config_builder() {
        let config = ConnectionConfig::new("postgres://localhost/test")
            .connect_timeout(5000)
            .query_timeout(10000)
            .ssl_mode(SslMode::Require)
            .application_name("test_app");

        assert_eq!(config.url, "postgres://localhost/test");
        assert_eq!(config.connect_timeout_ms, 5000);
        assert_eq!(config.query_timeout_ms, 10000);
        assert!(matches!(config.ssl_mode, SslMode::Require));
        assert_eq!(config.application_name, Some("test_app".to_string()));
    }

    #[test]
    fn test_connection_config_default() {
        let config = ConnectionConfig::default();
        assert_eq!(config.url, "");
        assert_eq!(config.connect_timeout_ms, 30_000);
        assert_eq!(config.query_timeout_ms, 30_000);
        assert!(matches!(config.ssl_mode, SslMode::Prefer));
        assert!(config.application_name.is_none());
    }
}