1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
use sqlx::{ postgres::{PgArguments, PgQueryResult, PgRow}, PgPool, Postgres, }; sea_query::sea_query_driver_postgres!(); use sea_query_driver_postgres::bind_query; use crate::{debug_print, error::*, executor::*, DatabaseConnection, Statement}; use super::sqlx_common::*; pub struct SqlxPostgresConnector; pub struct SqlxPostgresPoolConnection { pool: PgPool, } impl SqlxPostgresConnector { pub fn accepts(string: &str) -> bool { string.starts_with("postgres://") } pub async fn connect(string: &str) -> Result<DatabaseConnection, DbErr> { if let Ok(pool) = PgPool::connect(string).await { Ok(DatabaseConnection::SqlxPostgresPoolConnection( SqlxPostgresPoolConnection { pool }, )) } else { Err(DbErr::Conn("Failed to connect.".to_owned())) } } } impl SqlxPostgresConnector { pub fn from_sqlx_postgres_pool(pool: PgPool) -> DatabaseConnection { DatabaseConnection::SqlxPostgresPoolConnection(SqlxPostgresPoolConnection { pool }) } } impl SqlxPostgresPoolConnection { pub async fn execute(&self, stmt: Statement) -> Result<ExecResult, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { match query.execute(conn).await { Ok(res) => Ok(res.into()), Err(err) => Err(sqlx_error_to_exec_err(err)), } } else { Err(DbErr::Exec( "Failed to acquire connection from pool.".to_owned(), )) } } pub async fn query_one(&self, stmt: Statement) -> Result<Option<QueryResult>, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { match query.fetch_one(conn).await { Ok(row) => Ok(Some(row.into())), Err(err) => match err { sqlx::Error::RowNotFound => Ok(None), _ => Err(DbErr::Query(err.to_string())), }, } } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), )) } } pub async fn query_all(&self, stmt: Statement) -> Result<Vec<QueryResult>, DbErr> { debug_print!("{}", stmt); let query = sqlx_query(&stmt); if let Ok(conn) = &mut self.pool.acquire().await { match query.fetch_all(conn).await { Ok(rows) => Ok(rows.into_iter().map(|r| r.into()).collect()), Err(err) => Err(sqlx_error_to_query_err(err)), } } else { Err(DbErr::Query( "Failed to acquire connection from pool.".to_owned(), )) } } } impl From<PgRow> for QueryResult { fn from(row: PgRow) -> QueryResult { QueryResult { row: QueryResultRow::SqlxPostgres(row), } } } impl From<PgQueryResult> for ExecResult { fn from(result: PgQueryResult) -> ExecResult { ExecResult { result: ExecResultHolder::SqlxPostgres { last_insert_id: 0, rows_affected: result.rows_affected(), }, } } } pub(crate) fn query_result_into_exec_result(res: QueryResult) -> Result<ExecResult, DbErr> { let last_insert_id: i32 = res.try_get("", "last_insert_id")?; Ok(ExecResult { result: ExecResultHolder::SqlxPostgres { last_insert_id: last_insert_id as u64, rows_affected: 0, }, }) } fn sqlx_query(stmt: &Statement) -> sqlx::query::Query<'_, Postgres, PgArguments> { let mut query = sqlx::query(&stmt.sql); if let Some(values) = &stmt.values { query = bind_query(query, values); } query }