use modql::field::{HasSeaFields, SeaField, SeaFields};
use modql::filter::{FilterGroups, ListOptions};
use sea_query::{Condition, Expr, PostgresQueryBuilder, Query};
use sea_query_binder::SqlxBinder;
use sqlx::postgres::PgRow;
use sqlx::FromRow;
use sqlx::Row;
use crate::ctx::Session;
use crate::model::base::{
prep_fields_for_create, prep_fields_for_update, CommonIden, DbBmc, LIST_LIMIT_DEFAULT, LIST_LIMIT_MAX,
};
use crate::model::ModelManager;
use crate::model::{Error, Result};
use super::{check_number_of_affected, Id};
/// Inserts one row built from `data` and returns the value of its `id` column.
///
/// Only the non-`None` fields of `data` are written. Before the statement is
/// built they are passed through `prep_fields_for_create` together with the
/// current `session`.
///
/// # Errors
/// Returns an error if the insert statement cannot be assembled from the
/// columns/values, or if the database call fails.
pub async fn create<MC, E>(session: &Session, mm: &ModelManager, data: E) -> Result<i64>
where
    MC: DbBmc,
    E: HasSeaFields,
{
    // -- Columns and values to write
    let prepared = prep_fields_for_create::<MC>(data.not_none_sea_fields(), session);
    let (columns, sea_values) = prepared.for_sea_insert();

    // -- `INSERT ... RETURNING id`
    let mut stmt = Query::insert();
    stmt.into_table(MC::table_ref()).columns(columns).values(sea_values)?;
    stmt.returning(Query::returning().columns([CommonIden::Id]));

    // -- Execute and unpack the single returned column
    let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
    let returning_id = sqlx::query_as_with::<_, (i64,), _>(&sql, values);
    let (id,) = mm.dbx().fetch_one(returning_id).await?;

    Ok(id)
}
/// Inserts every item of `data` with a single multi-row `INSERT ... RETURNING id`
/// and returns the ids reported by the database.
///
/// Each item contributes its non-`None` fields, passed through
/// `prep_fields_for_create` with the current `session`. The statement is
/// executed between `begin_txn` and `commit_txn` on the manager's `dbx`.
///
/// An empty `data` is a no-op and yields an empty vector without touching the
/// database.
///
/// # Errors
/// Returns an error if a row's values cannot be added to the statement, or if
/// beginning the transaction, executing the statement, or committing fails.
pub async fn create_many<MC, E>(session: &Session, mm: &ModelManager, data: Vec<E>) -> Result<Vec<i64>>
where
    MC: DbBmc,
    E: HasSeaFields,
{
    // With no items the statement would have neither table nor values, which
    // is not valid SQL; short-circuit the same way `delete_many` does.
    if data.is_empty() {
        return Ok(Vec::new());
    }

    // -- Build one multi-row insert
    let mut query = Query::insert();
    for item in data {
        let fields = prep_fields_for_create::<MC>(item.not_none_sea_fields(), session);
        let (columns, sea_values) = fields.for_sea_insert();
        // `columns` is rebuilt for every item, so it can be moved as-is.
        query.into_table(MC::table_ref()).columns(columns).values(sea_values)?;
    }
    query.returning(Query::returning().columns([CommonIden::Id]));

    let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
    let sqlx_query = sqlx::query_as_with::<_, (i64,), _>(&sql, values);

    // -- Execute inside a transaction
    mm.dbx().begin_txn().await?;
    // NOTE(review): if `fetch_all` fails, `?` returns before `commit_txn` and
    // nothing here undoes the `begin_txn` above -- confirm how `dbx` cleans up
    // an unmatched transaction.
    let rows = mm.dbx().fetch_all(sqlx_query).await?;
    mm.dbx().commit_txn().await?;

    Ok(rows.into_iter().map(|(id,): (i64,)| id).collect())
}
/// Inserts one row built from `data` without returning anything from it.
///
/// Only the non-`None` fields of `data` are written, after being passed through
/// `prep_fields_for_create` with the current `session`.
///
/// # Errors
/// - `Error::CountFail` if the database reports anything other than exactly one
///   affected row.
/// - Any error raised while assembling the statement or executing it.
pub async fn insert<MC, E>(session: &Session, mm: &ModelManager, data: E) -> Result<()>
where
    MC: DbBmc,
    E: HasSeaFields,
{
    // -- Columns and values to write
    let fields = prep_fields_for_create::<MC>(data.not_none_sea_fields(), session);
    let (columns, sea_values) = fields.for_sea_insert();

    // -- Build the insert
    let mut query = Query::insert();
    query.into_table(MC::table_ref()).columns(columns).values(sea_values)?;

    // -- Execute
    let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
    let sqlx_query = sqlx::query_with(&sql, values);
    let count = mm.dbx().execute(sqlx_query).await?;

    // A single-row insert must affect exactly one row. The previous check
    // treated `0` as success, so every successful insert was reported as
    // `CountFail`.
    if count == 1 {
        Ok(())
    } else {
        Err(Error::CountFail)
    }
}
/// Inserts every item of `data` with a single multi-row `INSERT` and returns the
/// number of affected rows reported by the database.
///
/// Each item contributes its non-`None` fields, passed through
/// `prep_fields_for_create` with the current `session`.
///
/// An empty `data` is a no-op and yields `Ok(0)` without touching the database.
///
/// # Errors
/// Returns an error if a row's values cannot be added to the statement, or if
/// executing the statement fails.
pub async fn insert_many<MC, E>(session: &Session, mm: &ModelManager, data: impl IntoIterator<Item = E>) -> Result<u64>
where
    MC: DbBmc,
    E: HasSeaFields,
{
    // With no items the statement would have neither table nor values, which
    // is not valid SQL; short-circuit the same way `delete_many` does.
    let mut items = data.into_iter().peekable();
    if items.peek().is_none() {
        return Ok(0);
    }

    // -- Build one multi-row insert
    let mut query = Query::insert();
    for item in items {
        let fields = prep_fields_for_create::<MC>(item.not_none_sea_fields(), session);
        let (columns, sea_values) = fields.for_sea_insert();
        query.into_table(MC::table_ref()).columns(columns).values(sea_values)?;
    }

    // -- Execute
    let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
    let sqlx_query = sqlx::query_with(&sql, values);
    let rows = mm.dbx().execute(sqlx_query).await?;

    Ok(rows)
}
pub async fn get<MC, E>(mm: &ModelManager, id: Id) -> Result<E>
where
MC: DbBmc,
E: for<'r> FromRow<'r, PgRow> + Unpin + Send,
E: HasSeaFields,
{
let mut query = Query::select();
query.from(MC::table_ref()).columns(E::sea_column_refs()).and_where(Expr::col(CommonIden::Id).eq(id.clone()));
let (sql, values) = query.build_sqlx(PostgresQueryBuilder);
let sqlx_query = sqlx::query_as_with::<_, E, _>(&sql, values);
let entity = mm.dbx().fetch_optional(sqlx_query).await?.ok_or(Error::EntityNotFound {
schema: MC::SCHEMA,
entity: MC::TABLE,
id,
})?;
Ok(entity)
}
/// Returns the first entity matching `filter`, or `None` when nothing matches.
///
/// Delegates to [`list`] with the options forced to `limit = 1` and no offset.
/// The caller's `order_bys` is kept when present; otherwise rows are ordered
/// by `"id"`.
///
/// # Errors
/// Propagates any error returned by [`list`].
pub async fn first<MC, E, F>(
    session: &Session,
    mm: &ModelManager,
    filter: Option<F>,
    list_options: Option<ListOptions>,
) -> Result<Option<E>>
where
    MC: DbBmc,
    F: Into<FilterGroups>,
    E: for<'r> FromRow<'r, PgRow> + Unpin + Send,
    E: HasSeaFields,
{
    // Only the caller's ordering survives; limit and offset are always overridden.
    let order_bys = list_options
        .and_then(|options| options.order_bys)
        .or_else(|| Some("id".into()));
    let single_row = ListOptions {
        limit: Some(1),
        offset: None,
        order_bys,
    };

    let entities = list::<MC, E, F>(session, mm, filter, Some(single_row)).await?;
    Ok(entities.into_iter().next())
}
/// Lists the entities matching `filter`, honouring `list_options`.
///
/// The selected columns are those declared by `E::sea_column_refs()`. The
/// options are normalised by `compute_list_options` before being applied to
/// the query. `_session` is currently unused.
///
/// # Errors
/// Returns an error if the filter cannot be converted into a condition, if
/// `compute_list_options` rejects the options, or if the database call fails.
pub async fn list<MC, E, F>(
    _session: &Session,
    mm: &ModelManager,
    filter: Option<F>,
    list_options: Option<ListOptions>,
) -> Result<Vec<E>>
where
    MC: DbBmc,
    F: Into<FilterGroups>,
    E: for<'r> FromRow<'r, PgRow> + Unpin + Send,
    E: HasSeaFields,
{
    // -- Base select
    let mut select = Query::select();
    select.from(MC::table_ref()).columns(E::sea_column_refs());

    // -- Optional WHERE clause derived from the filter
    if let Some(filter) = filter {
        let groups: FilterGroups = filter.into();
        let condition: Condition = groups.try_into()?;
        select.cond_where(condition);
    }

    // -- Limit / offset / ordering
    let options = compute_list_options(list_options)?;
    options.apply_to_sea_query(&mut select);

    // -- Execute
    let (sql, values) = select.build_sqlx(PostgresQueryBuilder);
    let sqlx_query = sqlx::query_as_with::<_, E, _>(&sql, values);

    Ok(mm.dbx().fetch_all(sqlx_query).await?)
}
pub async fn count<MC, F>(_session: &Session, mm: &ModelManager, filter: Option<F>) -> Result<i64>
where
MC: DbBmc,
F: Into<FilterGroups>,
{
let db = mm.dbx().db();
let mut query = Query::select().from(MC::table_ref()).expr(Expr::col(sea_query::Asterisk).count()).to_owned();
if let Some(filter) = filter {
let filters: FilterGroups = filter.into();
let cond: Condition = filters.try_into()?;
query.cond_where(cond);
}
let query_str = query.to_string(PostgresQueryBuilder);
let result = sqlx::query(&query_str).fetch_one(db).await.map_err(|_| Error::CountFail)?;
let count: i64 = result.try_get("count").map_err(|_| Error::CountFail)?;
Ok(count)
}
/// Updates the row whose `id` column equals `id` with the non-`None` fields of
/// `data`.
///
/// When `MC::has_modification_timestamps()` is true the fields are first passed
/// through `prep_fields_for_update` with the current `session`.
///
/// # Errors
/// - `Error::EntityNotFound` when no row was affected.
/// - Any error raised by the database call.
pub async fn update<MC, E>(session: &Session, mm: &ModelManager, id: Id, data: E) -> Result<()>
where
    MC: DbBmc,
    E: HasSeaFields,
{
    // -- Fields to write
    let changes = data.not_none_sea_fields();
    let changes = if MC::has_modification_timestamps() {
        prep_fields_for_update::<MC>(changes, session)
    } else {
        changes
    };

    // -- Build `UPDATE <table> SET ... WHERE id = ?`
    let mut stmt = Query::update();
    stmt.table(MC::table_ref())
        .values(changes.for_sea_update())
        .and_where(Expr::col(CommonIden::Id).eq(id.clone()));

    // -- Execute
    let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
    let affected = mm.dbx().execute(sqlx::query_with(&sql, values)).await?;

    _check_result::<MC>(affected, id)
}
/// Deletes the row whose `id` column equals `id`.
///
/// When `MC::use_logic_delete()` is true the row is kept and its
/// `CommonIden::LogiscalDeletion` column is set to `true` via an `UPDATE`
/// (with `prep_fields_for_update` applied when `MC` has modification
/// timestamps). Otherwise a `DELETE` is issued.
///
/// # Errors
/// - `Error::EntityNotFound` when no row was affected.
/// - Any error raised by the database call.
pub async fn delete<MC>(session: &Session, mm: &ModelManager, id: Id) -> Result<()>
where
    MC: DbBmc,
{
    let (sql, values) = if MC::use_logic_delete() {
        // -- Logical delete: flag the row instead of removing it
        let flag = SeaFields::new(vec![SeaField::new(CommonIden::LogiscalDeletion, true)]);
        let flag = if MC::has_modification_timestamps() {
            prep_fields_for_update::<MC>(flag, session)
        } else {
            flag
        };

        let mut stmt = Query::update();
        stmt.table(MC::table_ref())
            .values(flag.for_sea_update())
            .and_where(Expr::col(CommonIden::Id).eq(id.clone()));
        stmt.build_sqlx(PostgresQueryBuilder)
    } else {
        // -- Physical delete
        let mut stmt = Query::delete();
        stmt.from_table(MC::table_ref())
            .and_where(Expr::col(CommonIden::Id).eq(id.clone()));
        stmt.build_sqlx(PostgresQueryBuilder)
    };

    // -- Execute
    let affected = mm.dbx().execute(sqlx::query_with(&sql, values)).await?;

    _check_result::<MC>(affected, id)
}
fn _check_result<MC>(count: u64, id: Id) -> Result<()>
where
MC: DbBmc,
{
if count == 0 {
Err(Error::EntityNotFound { schema: MC::SCHEMA, entity: MC::SCHEMA, id })
} else {
Ok(())
}
}
/// Deletes every row whose `id` column is one of `ids`.
///
/// An empty `ids` is a no-op and yields `Ok(0)`. Otherwise the affected-row
/// count is handed to `check_number_of_affected` together with `ids.len()`,
/// and that function's result is returned. `_session` is currently unused.
///
/// NOTE(review): unlike `delete`, this always issues a physical `DELETE` and
/// never consults `MC::use_logic_delete()` -- confirm this is intended.
///
/// # Errors
/// Any error raised by the database call or by `check_number_of_affected`.
pub async fn delete_many<MC>(_session: &Session, mm: &ModelManager, ids: Vec<Id>) -> Result<u64>
where
    MC: DbBmc,
{
    if ids.is_empty() {
        return Ok(0);
    }

    // Remember how many rows were requested before `ids` is moved into the query.
    let expected = ids.len();

    // -- Build `DELETE FROM <table> WHERE id IN (...)`
    let mut stmt = Query::delete();
    stmt.from_table(MC::table_ref())
        .and_where(Expr::col(CommonIden::Id).is_in(ids));

    // -- Execute
    let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
    let affected = mm.dbx().execute(sqlx::query_with(&sql, values)).await?;

    check_number_of_affected::<MC>(expected, affected)
}
pub fn compute_list_options(list_options: Option<ListOptions>) -> Result<ListOptions> {
if let Some(mut list_options) = list_options {
if let Some(limit) = list_options.limit {
if limit > LIST_LIMIT_MAX {
return Err(Error::ListLimitOverMax { max: LIST_LIMIT_MAX, actual: limit });
}
}
else {
list_options.limit = Some(LIST_LIMIT_DEFAULT);
}
Ok(list_options)
}
else {
Ok(ListOptions { limit: Some(LIST_LIMIT_DEFAULT), offset: None, order_bys: Some("id".into()) })
}
}