use crate::error::{Error, Result};
use crate::internal::sql_safety::quote_ident_for_backend;
use crate::soft_delete::{SoftDeleteScope, query_scope_for};
pub(crate) mod sql_safety;
#[allow(unused_imports)]
pub use crate::orm::{
ActiveModelBehavior, ActiveModelTrait, ActiveValue, ColumnTrait, ColumnType, Condition,
ConnectOptions, ConnectionTrait, Database as OrmDatabase, DatabaseConnection as OrmConnection,
DatabaseTransaction as OrmTransaction, DbBackend as OrmBackend, DbErr as OrmError, DeleteMany,
DeriveEntityModel, DeriveRelation, EntityTrait, EnumIter, ExecResult, FromQueryResult, Iden,
IntoActiveModel, Iterable, LoaderTrait, ModelTrait, PaginatorTrait, QueryFilter, QueryOrder,
QuerySelect, QueryTrait, Related, RelationDef, RelationTrait, Statement as OrmStatement,
TransactionTrait, TryGetable, Value,
entity::prelude::*,
schema::{Schema, SchemaBuilder},
sea_query::{
Alias, Asterisk, ColumnDef as OrmColumnDef, ColumnType as OrmColumnType, Expr, ExprTrait,
Index, MysqlQueryBuilder, OnConflict, PostgresQueryBuilder, Query, SimpleExpr,
SqliteQueryBuilder, Table, extension::postgres::PgBinOper,
},
sqlx,
};
/// Database backend selector used by the helpers in this module.
///
/// Converts to and from the ORM's own backend enum (`OrmBackend`) through the
/// `From` implementations in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Backend {
/// PostgreSQL.
Postgres,
/// MySQL. A `MariaDB` configuration is also checked against this variant
/// when deciding whether batch `RETURNING` inserts are available.
MySql,
/// SQLite.
Sqlite,
}
impl Backend {
pub(crate) fn as_database_type(self) -> crate::config::DatabaseType {
match self {
Self::Postgres => crate::config::DatabaseType::Postgres,
Self::MySql => crate::config::DatabaseType::MySQL,
Self::Sqlite => crate::config::DatabaseType::SQLite,
}
}
}
impl From<OrmBackend> for Backend {
fn from(backend: OrmBackend) -> Self {
match backend {
OrmBackend::Postgres => Self::Postgres,
OrmBackend::MySql => Self::MySql,
OrmBackend::Sqlite => Self::Sqlite,
_ => Self::Postgres,
}
}
}
impl From<Backend> for OrmBackend {
    /// Converts this crate's [`Backend`] into the ORM's backend enum.
    ///
    /// The match is exhaustive, so every [`Backend`] variant has a direct ORM
    /// counterpart.
    fn from(backend: Backend) -> Self {
        match backend {
            Backend::Sqlite => Self::Sqlite,
            Backend::MySql => Self::MySql,
            Backend::Postgres => Self::Postgres,
        }
    }
}
/// A value that can name the ORM backend a statement should be built for.
///
/// Implemented in this file for both [`Backend`] and `OrmBackend`, so the
/// statement builders accept either type.
pub(crate) trait StatementBackend {
/// Consumes `self` and returns the corresponding ORM backend value.
fn into_statement_backend(self) -> OrmBackend;
}
impl StatementBackend for Backend {
fn into_statement_backend(self) -> OrmBackend {
self.into()
}
}
impl StatementBackend for OrmBackend {
/// Identity conversion: the value already is an ORM backend.
fn into_statement_backend(self) -> OrmBackend {
self
}
}
/// Builds an ORM statement from raw SQL text with no bound values.
///
/// `backend` may be either this crate's [`Backend`] or the ORM's backend enum.
pub(crate) fn build_statement<B: StatementBackend>(
    backend: B,
    sql: impl Into<String>,
) -> OrmStatement {
    let orm_backend = backend.into_statement_backend();
    let sql_text: String = sql.into();
    OrmStatement::from_string(orm_backend, sql_text)
}
/// Builds an ORM statement from SQL text plus the values to bind to it.
///
/// `backend` may be either this crate's [`Backend`] or the ORM's backend enum;
/// `params` is handed to the ORM unchanged.
pub(crate) fn build_statement_with_values<B: StatementBackend>(
    backend: B,
    sql: &str,
    params: Vec<Value>,
) -> OrmStatement {
    let orm_backend = backend.into_statement_backend();
    OrmStatement::from_sql_and_values(orm_backend, sql, params)
}
/// Bridge between a public model type and its ORM entity / active-model types.
///
/// Hidden from rendered documentation.
/// NOTE(review): presumably implemented by generated code rather than by hand — confirm.
#[doc(hidden)]
pub trait InternalModel: crate::model::ModelMeta + Sized + Send + Sync + Clone {
/// ORM entity that backs this model.
type Entity: EntityTrait;
/// Active-model type belonging to [`Self::Entity`].
type ActiveModel: ActiveModelTrait<Entity = Self::Entity> + ActiveModelBehavior + Send;
/// Consumes the model and produces its active-model form.
fn into_active_model(self) -> Self::ActiveModel;
/// Builds the model from the entity's row model.
fn from_entity_model(model: <Self::Entity as EntityTrait>::Model) -> Self;
/// Produces the entity's row model from a borrowed model.
fn to_entity_model(&self) -> <Self::Entity as EntityTrait>::Model;
/// Resolves `name` to an entity column; `None` when no column is resolved.
fn column_from_str(name: &str) -> Option<<Self::Entity as EntityTrait>::Column>;
/// Primary-key columns of the entity. The default implementation reports none.
fn primary_key_columns() -> Vec<<Self::Entity as EntityTrait>::Column> {
Vec::new()
}
/// Builds the `Condition` corresponding to the given primary-key value.
fn primary_key_condition(
primary_key: &<Self as crate::model::ModelMeta>::PrimaryKey,
) -> Condition;
/// First entry of [`Self::primary_key_columns`], or `None` when that list is empty.
fn primary_key_column() -> Option<<Self::Entity as EntityTrait>::Column> {
Self::primary_key_columns().into_iter().next()
}
/// Hook that receives the previous instance of the model; the default does nothing.
fn refresh_runtime_relations_from(&mut self, _previous: &Self) {}
/// JSON value of the named field; the default implementation always returns `Ok(None)`.
fn field_json_value(&self, _field: &str) -> Result<Option<serde_json::Value>> {
Ok(None)
}
}
/// Thin owner of an ORM database connection.
#[doc(hidden)]
pub struct InternalConnection {
// The wrapped ORM connection; exposed read-only through `connection()`.
pub(crate) conn: OrmConnection,
}
impl InternalConnection {
pub async fn connect(url: &str) -> Result<Self> {
let conn = OrmDatabase::connect(url)
.await
.map_err(|e| Error::connection(e.to_string()))?;
Ok(Self { conn })
}
pub fn connection(&self) -> &OrmConnection {
&self.conn
}
}
pub(crate) fn translate_error(err: OrmError) -> Error {
match err {
OrmError::RecordNotFound(msg) => Error::not_found(msg),
OrmError::ConnectionAcquire(e) => Error::connection(e.to_string()),
OrmError::Conn(e) => Error::connection(e.to_string()),
OrmError::Exec(e) => Error::query(e.to_string()),
OrmError::Query(e) => Error::query(e.to_string()),
OrmError::ConvertFromU64(msg) => Error::conversion(msg),
OrmError::UnpackInsertId => Error::query("Failed to get insert ID".to_string()),
OrmError::UpdateGetPrimaryKey => {
Error::query("Failed to get primary key after update".to_string())
}
OrmError::Custom(msg) => Error::internal(msg),
_ => Error::internal(err.to_string()),
}
}
fn model_error_context<M>(query: impl Into<String>) -> crate::error::ErrorContext
where
M: crate::model::Model,
{
crate::error::ErrorContext::new()
.table(M::table_name())
.query(query.into())
}
fn supports_batch_insert_returning(
configured_db_type: Option<crate::config::DatabaseType>,
backend: Backend,
) -> bool {
if let Some(db_type) = configured_db_type {
return match db_type {
crate::config::DatabaseType::Postgres => matches!(backend, Backend::Postgres),
crate::config::DatabaseType::MariaDB => matches!(backend, Backend::MySql),
crate::config::DatabaseType::MySQL | crate::config::DatabaseType::SQLite => false,
};
}
matches!(backend, Backend::Postgres)
}
pub(crate) fn count_to_u64(count: i64, context: &str) -> Result<u64> {
u64::try_from(count).map_err(|_| {
Error::query(format!(
"Database returned a negative count ({count}) for {context}"
))
})
}
/// Builds a `COUNT(*)` select (aliased as `count`) over `M`'s soft-delete-scoped
/// query, optionally narrowed by `condition`.
fn build_count_select<M>(condition: Option<Condition>) -> Select<M::Entity>
where
    M: InternalModel + crate::model::Model,
{
    let counting = scoped_find::<M>()
        .select_only()
        .column_as(Expr::col(Asterisk).count(), "count");

    match condition {
        Some(extra_filter) => counting.filter(extra_filter),
        None => counting,
    }
}
/// Builds the raw `SELECT EXISTS(...)` statement used to check whether `M`'s
/// table has at least one row.
///
/// When the model's default query scope is "active only", the inner select is
/// restricted to rows whose deleted-at column is `NULL`. Identifiers are quoted
/// for `backend`.
fn build_exists_any_statement<M>(backend: Backend) -> OrmStatement
where
    M: InternalModel + crate::model::Model,
{
    let table = quote_ident_for_backend(backend, M::table_name());
    let active_only = matches!(
        query_scope_for::<M>(false, false),
        SoftDeleteScope::ActiveOnly
    );

    let sql = if active_only {
        let deleted_at = quote_ident_for_backend(backend, M::deleted_at_column());
        format!("SELECT EXISTS(SELECT 1 FROM {table} WHERE {table}.{deleted_at} IS NULL)")
    } else {
        format!("SELECT EXISTS(SELECT 1 FROM {table})")
    };

    build_statement(backend, sql)
}
/// Starts a select over `M`'s entity with the default soft-delete scope applied.
///
/// The "deleted-at IS NULL" filter is added only when the scope is "active
/// only" *and* the deleted-at column name resolves to an entity column;
/// otherwise the plain select is returned.
fn scoped_find<M>() -> Select<M::Entity>
where
    M: InternalModel + crate::model::Model,
{
    let select = M::Entity::find();

    let active_only = matches!(
        query_scope_for::<M>(false, false),
        SoftDeleteScope::ActiveOnly
    );
    if !active_only {
        return select;
    }

    match M::column_from_str(M::deleted_at_column()) {
        Some(deleted_at_column) => select.filter(deleted_at_column.is_null()),
        None => select,
    }
}
/// Field-less type whose associated functions run model queries against a
/// caller-supplied connection.
#[doc(hidden)]
pub struct QueryExecutor;
impl QueryExecutor {
pub async fn find_all<M, C>(conn: &C) -> Result<Vec<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let results = scoped_find::<M>().all(conn);
let results = crate::profiling::__profile_future(results)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("find_all()")))?;
Ok(results.into_iter().map(M::from_entity_model).collect())
}
pub async fn first<M, C>(conn: &C) -> Result<Option<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let result = scoped_find::<M>().one(conn);
let result = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("first()")))?;
Ok(result.map(M::from_entity_model))
}
pub async fn last<M, C>(conn: &C) -> Result<Option<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let mut select = scoped_find::<M>();
let mut query_label = String::from("last()");
let pk_columns = M::primary_key_columns();
if !pk_columns.is_empty() {
for pk_col in pk_columns {
select = select.order_by_desc(pk_col);
}
query_label = format!("last(order_by={} desc)", M::primary_key_names().join(", "));
}
let result = select.one(conn);
let result = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>(query_label)))?;
Ok(result.map(M::from_entity_model))
}
pub async fn count<M, C>(conn: &C, condition: Option<Condition>) -> Result<u64>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
#[derive(Debug, FromQueryResult)]
struct CountResult {
count: i64,
}
let result = build_count_select::<M>(condition)
.into_model::<CountResult>()
.one(conn);
let result: Option<CountResult> = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("count(*)")))?;
result
.map(|r| count_to_u64(r.count, "count(*)"))
.transpose()
.map(|count| count.unwrap_or(0))
}
/// Reports whether `M`'s table holds at least one row visible under the default
/// soft-delete scope, using a raw `SELECT EXISTS(...)` statement.
///
/// Returns `Ok(false)` when the statement yields no row at all.
///
/// # Errors
///
/// ORM failures while running the statement, and failures to decode the single
/// result column, are translated and tagged with `M`'s table and the label
/// `exists_any()`. Decode failures used to be swallowed and reported as
/// "nothing exists"; they are now surfaced so a driver/type mismatch cannot
/// masquerade as an empty table.
pub async fn exists_any<M, C>(conn: &C) -> Result<bool>
where
    M: InternalModel + crate::model::Model,
    C: ConnectionTrait,
{
    let backend = Backend::from(conn.get_database_backend());
    let statement = build_exists_any_statement::<M>(backend);
    let row = crate::profiling::__profile_future(conn.query_one_raw(statement))
        .await
        .map_err(translate_error)
        .map_err(|err| err.with_context(model_error_context::<M>("exists_any()")))?;

    let Some(row) = row else {
        return Ok(false);
    };

    // Postgres is read as a boolean; the other backends are read as an integer
    // flag. Listing every variant keeps this match exhaustive if `Backend`
    // ever grows.
    let decoded = match backend {
        Backend::Postgres => row.try_get_by_index::<bool>(0),
        Backend::MySql | Backend::Sqlite => row.try_get_by_index::<i32>(0).map(|flag| flag > 0),
    };

    decoded
        .map_err(translate_error)
        .map_err(|err| err.with_context(model_error_context::<M>("exists_any()")))
}
pub async fn paginate<M, C>(conn: &C, limit: u64, offset: u64) -> Result<Vec<M>>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let results = scoped_find::<M>().offset(offset).limit(limit).all(conn);
let results = crate::profiling::__profile_future(results)
.await
.map_err(translate_error)
.map_err(|err| {
err.with_context(model_error_context::<M>(format!(
"paginate(limit={}, offset={})",
limit, offset
)))
})?;
Ok(results.into_iter().map(M::from_entity_model).collect())
}
pub async fn delete<M, C>(conn: &C, model: M) -> Result<u64>
where
M: InternalModel + crate::model::Model,
C: ConnectionTrait,
{
let active = model.into_active_model();
let result = active.delete(conn);
let result = crate::profiling::__profile_future(result)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(model_error_context::<M>("delete(model)")))?;
Ok(result.rows_affected)
}
pub async fn insert_many<M, C>(conn: &C, models: Vec<M>) -> Result<Vec<M>>
where
M: InternalModel + crate::model::Model,
<<M as InternalModel>::Entity as EntityTrait>::Model: IntoActiveModel<M::ActiveModel>,
C: ConnectionTrait,
{
if models.is_empty() {
return Ok(Vec::new());
}
let batch_size = models.len();
let error_context =
model_error_context::<M>(format!("insert_many(batch_size={})", batch_size));
if models.len() == 1 {
let active = models.into_iter().next().unwrap().into_active_model();
let result =
crate::profiling::__profile_future(async move { active.insert(conn).await })
.await
.map_err(translate_error)
.map_err(|err| err.with_context(error_context.clone()))?;
return Ok(vec![M::from_entity_model(result)]);
}
let backend = Backend::from(conn.get_database_backend());
let supports_returning = supports_batch_insert_returning(
crate::config::TideConfig::get_database_type(),
backend,
);
if supports_returning {
let active_models: Vec<_> = models.into_iter().map(|m| m.into_active_model()).collect();
let results = M::Entity::insert_many(active_models).exec_with_returning(conn);
let results = crate::profiling::__profile_future(results)
.await
.map_err(translate_error)
.map_err(|err| err.with_context(error_context.clone()))?;
Ok(results.into_iter().map(M::from_entity_model).collect())
} else {
let mut results = Vec::with_capacity(models.len());
for model in models {
let active = model.into_active_model();
let result =
crate::profiling::__profile_future(async move { active.insert(conn).await })
.await
.map_err(translate_error)
.map_err(|err| err.with_context(error_context.clone()))?;
results.push(M::from_entity_model(result));
}
Ok(results)
}
}
}
#[cfg(test)]
#[path = "../testing/internal_tests.rs"]
mod tests;