use crate::error::{Error, Result};
use crate::soft_delete::{SoftDeleteScope, query_scope_for};
#[allow(unused_imports)]
pub use sea_orm::{
ActiveModelBehavior, ActiveModelTrait, ActiveValue, ColumnTrait, ColumnType, Condition,
ConnectOptions, ConnectionTrait, Database as SeaDatabase, DatabaseConnection,
DatabaseTransaction, DbBackend, DbErr, DeleteMany, DeriveEntityModel, DeriveRelation,
EntityTrait, EnumIter, ExecResult, FromQueryResult, Iden, IntoActiveModel, Iterable,
LoaderTrait, ModelTrait, PaginatorTrait, QueryFilter, QueryOrder, QuerySelect, QueryTrait,
Related, RelationDef, RelationTrait, Statement, TransactionTrait, TryGetable, Value,
entity::prelude::*,
sea_query::{Alias, Asterisk, Expr, ExprTrait},
};
#[doc(hidden)]
pub trait InternalModel: crate::model::ModelMeta + Sized + Send + Sync + Clone {
type Entity: EntityTrait;
type ActiveModel: ActiveModelTrait<Entity = Self::Entity> + ActiveModelBehavior + Send;
fn into_active_model(self) -> Self::ActiveModel;
fn from_sea_model(model: <Self::Entity as EntityTrait>::Model) -> Self;
fn to_sea_model(&self) -> <Self::Entity as EntityTrait>::Model;
fn column_from_str(name: &str) -> Option<<Self::Entity as EntityTrait>::Column>;
fn primary_key_columns() -> Vec<<Self::Entity as EntityTrait>::Column> {
Vec::new()
}
fn primary_key_condition(
primary_key: &<Self as crate::model::ModelMeta>::PrimaryKey,
) -> Condition;
fn primary_key_column() -> Option<<Self::Entity as EntityTrait>::Column> {
Self::primary_key_columns().into_iter().next()
}
fn refresh_runtime_relations_from(&mut self, _previous: &Self) {}
}
#[doc(hidden)]
pub struct InternalConnection {
pub(crate) conn: DatabaseConnection,
}
impl InternalConnection {
pub async fn connect(url: &str) -> Result<Self> {
let conn = SeaDatabase::connect(url)
.await
.map_err(|e| Error::connection(e.to_string()))?;
Ok(Self { conn })
}
pub fn connection(&self) -> &DatabaseConnection {
&self.conn
}
}
pub(crate) fn translate_error(err: DbErr) -> Error {
match err {
DbErr::RecordNotFound(msg) => Error::not_found(msg),
DbErr::ConnectionAcquire(e) => Error::connection(e.to_string()),
DbErr::Conn(e) => Error::connection(e.to_string()),
DbErr::Exec(e) => Error::query(e.to_string()),
DbErr::Query(e) => Error::query(e.to_string()),
DbErr::ConvertFromU64(msg) => Error::conversion(msg),
DbErr::UnpackInsertId => Error::query("Failed to get insert ID".to_string()),
DbErr::UpdateGetPrimaryKey => {
Error::query("Failed to get primary key after update".to_string())
}
DbErr::Custom(msg) => Error::internal(msg),
_ => Error::internal(err.to_string()),
}
}
fn model_error_context<M>(query: impl Into<String>) -> crate::error::ErrorContext
where
M: crate::model::Model,
{
crate::error::ErrorContext::new()
.table(M::table_name())
.query(query.into())
}
fn supports_batch_insert_returning(
configured_db_type: Option<crate::config::DatabaseType>,
backend: DbBackend,
) -> bool {
if let Some(db_type) = configured_db_type {
return match db_type {
crate::config::DatabaseType::Postgres => matches!(backend, DbBackend::Postgres),
crate::config::DatabaseType::MariaDB => matches!(backend, DbBackend::MySql),
crate::config::DatabaseType::MySQL | crate::config::DatabaseType::SQLite => false,
};
}
matches!(backend, DbBackend::Postgres)
}
pub(crate) fn count_to_u64(count: i64, context: &str) -> Result<u64> {
u64::try_from(count).map_err(|_| {
Error::query(format!(
"Database returned a negative count ({count}) for {context}"
))
})
}
fn build_count_select<M>(condition: Option<Condition>) -> Select<M::Entity>
where
M: InternalModel + crate::model::Model,
{
let mut select = scoped_find::<M>()
.select_only()
.column_as(Expr::col(Asterisk).count(), "count");
if let Some(condition) = condition {
select = select.filter(condition);
}
select
}
fn build_exists_any_select<M>() -> Select<M::Entity>
where
M: InternalModel + crate::model::Model,
{
scoped_find::<M>()
.select_only()
.column_as(Expr::val(1), "exists_result")
.limit(1)
}
fn scoped_find<M>() -> Select<M::Entity>
where
M: InternalModel + crate::model::Model,
{
let mut select = M::Entity::find();
if matches!(
query_scope_for::<M>(false, false),
SoftDeleteScope::ActiveOnly
) && let Some(deleted_at_column) = M::column_from_str(M::deleted_at_column())
{
select = select.filter(deleted_at_column.is_null());
}
select
}
#[doc(hidden)]
pub struct QueryExecutor;
impl QueryExecutor {
pub async fn find_all<M, C>(conn: &C) -> Result<Vec<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let results = scoped_find::<M>().all(conn);
let results = crate::profiling::__profile_future(results)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("find_all()")))?;
Ok(results.into_iter().map(M::from_sea_model).collect())
}
pub async fn first<M, C>(conn: &C) -> Result<Option<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let result = scoped_find::<M>().one(conn);
let result = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("first()")))?;
Ok(result.map(M::from_sea_model))
}
pub async fn last<M, C>(conn: &C) -> Result<Option<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let mut select = scoped_find::<M>();
let mut query_label = String::from("last()");
let pk_columns = M::primary_key_columns();
if !pk_columns.is_empty() {
for pk_col in pk_columns {
select = select.order_by_desc(pk_col);
}
query_label = format!("last(order_by={} desc)", M::primary_key_names().join(", "));
}
let result = select.one(conn);
let result = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>(query_label)))?;
Ok(result.map(M::from_sea_model))
}
pub async fn count<M, C>(conn: &C, condition: Option<Condition>) -> Result<u64>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
#[derive(Debug, FromQueryResult)]
struct CountResult {
count: i64,
}
let result = build_count_select::<M>(condition)
.into_model::<CountResult>()
.one(conn);
let result: Option<CountResult> = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("count(*)")))?;
result
.map(|r| count_to_u64(r.count, "count(*)"))
.transpose()
.map(|count| count.unwrap_or(0))
}
pub async fn exists_any<M, C>(conn: &C) -> Result<bool>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
#[derive(Debug, FromQueryResult)]
struct ExistsResult {
exists_result: i32,
}
let result = build_exists_any_select::<M>()
.into_model::<ExistsResult>()
.one(conn);
let result: Option<ExistsResult> = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("exists_any()")))?;
let _ = result.as_ref().map(|row| row.exists_result);
Ok(result.is_some())
}
pub async fn paginate<M, C>(conn: &C, limit: u64, offset: u64) -> Result<Vec<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let results = scoped_find::<M>().offset(offset).limit(limit).all(conn);
let results = crate::profiling::__profile_future(results)
.await
.map_err(translate_error)
.map_err(|err| {
err.with_context(model_error_context::<M>(format!(
"paginate(limit={}, offset={})",
limit, offset
)))
})?;
Ok(results.into_iter().map(M::from_sea_model).collect())
}
pub async fn delete<M, C>(conn: &C, model: M) -> Result<u64>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let active = model.into_active_model();
let result = active.delete(conn);
let result = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("delete(model)")))?;
Ok(result.rows_affected)
}
pub async fn insert_many<M, C>(conn: &C, models: Vec<M>) -> Result<Vec<M>>
where
M: InternalModel + crate::model::Model,
<<M as InternalModel>::Entity as EntityTrait>::Model: IntoActiveModel<M::ActiveModel>,
C: ConnectionTrait,
{
if models.is_empty() {
return Ok(Vec::new());
}
let batch_size = models.len();
let error_context =
model_error_context::<M>(format!("insert_many(batch_size={})", batch_size));
if models.len() == 1 {
let active = models.into_iter().next().unwrap().into_active_model();
let result =
crate::profiling::__profile_future(async move { active.insert(conn).await })
.await
.map_err(translate_error)
.map_err(|err| err.with_context(error_context.clone()))?;
return Ok(vec![M::from_sea_model(result)]);
}
let backend = conn.get_database_backend();
let supports_returning = supports_batch_insert_returning(
crate::config::TideConfig::get_database_type(),
backend,
);
if supports_returning {
let active_models: Vec<_> = models.into_iter().map(|m| m.into_active_model()).collect();
let results = M::Entity::insert_many(active_models).exec_with_returning(conn);
let results = crate::profiling::__profile_future(results)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(error_context.clone()))?;
Ok(results.into_iter().map(M::from_sea_model).collect())
} else {
let mut results = Vec::with_capacity(models.len());
for model in models {
let active = model.into_active_model();
let result =
crate::profiling::__profile_future(async move { active.insert(conn).await })
.await
.map_err(translate_error)
.map_err(|err| err.with_context(error_context.clone()))?;
results.push(M::from_sea_model(result));
}
Ok(results)
}
}
}
#[cfg(test)]
#[path = "../testing/internal_tests.rs"]
mod tests;