use async_trait::async_trait;
use serde::{Serialize, de::DeserializeOwned};
use sqlx::{FromRow, Pool, Postgres};
use std::marker::PhantomData;
use crate::error::CoreError;
use crate::domain::database::DatabaseAdapter;
use crate::domain::query::{IntoDbFilter, QueryBuilder};
use crate::infrastructure::database::bindings::traits::{IdResult, PgInsert};
pub struct PostgreSQLAdapter<T: Send + Sync> {
pool: Pool<Postgres>,
table: String,
_phantom: PhantomData<T>,
}
impl<T: Send + Sync> PostgreSQLAdapter<T> {
pub fn new(pool: &Pool<Postgres>, table: &str) -> Self {
PostgreSQLAdapter {
pool: pool.clone(),
table: table.into(),
_phantom: PhantomData,
}
}
}
#[async_trait]
impl<T> DatabaseAdapter<T> for PostgreSQLAdapter<T>
where
T: Send
+ Sync
+ Serialize
+ DeserializeOwned
+ 'static
+ for<'r> FromRow<'r, sqlx::postgres::PgRow>
+ Unpin
+ PgInsert,
{
async fn insert(&self, data: T) -> Result<String, CoreError> {
let columns: String = T::columns()
.iter()
.map(|k| format!("\"{}\"", k))
.collect::<Vec<String>>()
.join(", ");
let values = T::columns()
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 1))
.collect::<Vec<String>>()
.join(", ");
let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) RETURNING id",
self.table, columns, values
);
let query = sqlx::query_as::<_, IdResult>(&sql);
let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
Ok(row.id)
}
async fn insert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
let columns: String = T::columns()
.iter()
.map(|k| format!("\"{}\"", k))
.collect::<Vec<String>>()
.join(", ");
let mut count = 0;
let mut rows: Vec<String> = Vec::new();
for _ in 0..data.len() {
let values = T::columns()
.iter()
.map(|_| {
count += 1;
format!("${}", count)
})
.collect::<Vec<String>>()
.join(", ");
rows.push(format!("({values})"));
}
let placeholders = rows.join(", ");
let sql = format!(
"INSERT INTO {} ({}) VALUES {} RETURNING id",
self.table, columns, placeholders
);
let query = sqlx::query_as::<_, IdResult>(&sql);
let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
Ok(rows.into_iter().map(|r| r.id).collect())
}
async fn upsert(&self, data: T) -> Result<String, CoreError> {
let columns: String = T::columns()
.iter()
.map(|k| format!("\"{}\"", k))
.collect::<Vec<String>>()
.join(", ");
let values = T::columns()
.iter()
.enumerate()
.map(|(i, _)| format!("${}", i + 1))
.collect::<Vec<String>>()
.join(", ");
let conflict_vals = T::uniques().join(", ");
let sql = format!(
"INSERT INTO {} ({}) VALUES ({}) ON CONFLICT ({}) DO UPDATE SET id = {}.id RETURNING id",
self.table, columns, values, conflict_vals, self.table
);
let query = sqlx::query_as::<_, IdResult>(&sql);
let row: IdResult = data.bind_query(query).fetch_one(&self.pool).await?;
Ok(row.id)
}
async fn upsert_many(&self, data: Vec<T>) -> Result<Vec<String>, CoreError> {
let columns: String = T::columns()
.iter()
.map(|k| format!("\"{}\"", k))
.collect::<Vec<String>>()
.join(", ");
let conflict_vals = T::uniques().join(", ");
let mut count = 0;
let mut rows: Vec<String> = Vec::new();
for _ in 0..data.len() {
let row = T::columns()
.iter()
.map(|_| {
count += 1;
format!("${}", count)
})
.collect::<Vec<String>>()
.join(", ");
rows.push(format!("({row})"));
}
let placeholders = rows.join(", ");
let sql = format!(
"INSERT INTO {} ({}) VALUES {} ON CONFLICT ({}) DO UPDATE SET id = {}.id RETURNING id",
self.table, columns, placeholders, conflict_vals, self.table
);
let query = sqlx::query_as::<_, IdResult>(&sql);
let query = data.into_iter().fold(query, |q, d| d.bind_query(q));
let rows: Vec<IdResult> = query.fetch_all(&self.pool).await?;
Ok(rows.into_iter().map(|r| r.id).collect())
}
async fn find_all(&self, filter: QueryBuilder) -> Result<Vec<T>, CoreError> {
let (where_clause, values) = filter.into_postgres_filter(0);
let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
let mut query = sqlx::query_as::<_, T>(&sql);
for v in values {
query = v.bind_pg(query);
}
query.fetch_all(&self.pool).await.map_err(Into::into)
}
async fn find_one(&self, filter: QueryBuilder) -> Result<Option<T>, CoreError> {
let (where_clause, values) = filter.into_postgres_filter(0);
let sql = format!("SELECT * FROM {} WHERE {}", self.table, where_clause);
let mut query = sqlx::query_as::<_, T>(&sql);
for v in values {
query = v.bind_pg(query);
}
query.fetch_optional(&self.pool).await.map_err(Into::into)
}
async fn find_one_and_update(
&self,
filter: QueryBuilder,
update: QueryBuilder,
) -> Result<Option<T>, CoreError> {
let (set_clause, update_values) = update.into_postgres_update();
let offset = update_values.len();
let (where_clause, filter_values) = filter.into_postgres_filter(offset);
let sql = format!(
"UPDATE {} SET {} WHERE {} RETURNING *",
self.table, set_clause, where_clause
);
let mut query = sqlx::query_as::<_, T>(&sql);
for v in update_values.into_iter().chain(filter_values) {
query = v.bind_pg(query);
}
query.fetch_optional(&self.pool).await.map_err(Into::into)
}
async fn update_many(
&self,
filter: QueryBuilder,
update: QueryBuilder,
) -> Result<(), CoreError> {
let (set_clause, update_values) = update.into_postgres_update();
let offset = update_values.len();
let (where_clause, filter_values) = filter.into_postgres_filter(offset);
let sql = format!(
"UPDATE {} SET {} WHERE {} RETURNING *",
self.table, set_clause, where_clause
);
let mut query = sqlx::query_as::<_, T>(&sql);
for v in update_values.into_iter().chain(filter_values) {
query = v.bind_pg(query);
}
query.fetch_optional(&self.pool).await?;
Ok(())
}
async fn delete_one(&self, filter: QueryBuilder) -> Result<(), CoreError> {
let (where_clause, values) = filter.into_postgres_filter(0);
let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
let mut query = sqlx::query_as::<_, T>(&sql);
for v in values {
query = v.bind_pg(query);
}
query.fetch_optional(&self.pool).await?;
Ok(())
}
async fn delete_many(&self, filter: QueryBuilder) -> Result<(), CoreError> {
let (where_clause, values) = filter.into_postgres_filter(0);
let sql = format!("DELETE FROM {} WHERE {}", self.table, where_clause);
let mut query = sqlx::query_as::<_, T>(&sql);
for v in values {
query = v.bind_pg(query);
}
query.fetch_optional(&self.pool).await?;
Ok(())
}
}