use std::collections::HashMap;
use sea_query::{
Alias, Asterisk, Condition, Expr, Func, Order, PostgresQueryBuilder, Query, SqliteQueryBuilder,
Value as SeaValue,
};
use sea_query_binder::SqlxBinder;
use sqlx::Row;
use crate::db::{DbPool, pool_for_dispatched};
use crate::migrate::{Column, ModelMeta};
use crate::orm::SqlType;
use crate::orm::write::{WriteError, json_to_sea_value, null_for};
fn resolve_pool_dyn(meta: &crate::migrate::ModelMeta, op: crate::db::RouteOp) -> crate::db::DbPool {
let ctx = crate::db::route_context();
let r = crate::db::router::router();
let alias = match op {
crate::db::RouteOp::Read => r.db_for_read(meta, &ctx),
crate::db::RouteOp::Write => r.db_for_write(meta, &ctx),
};
pool_for_dispatched(alias.as_str()).clone()
}
/// Error type returned by the fallible `DynQuerySet` operations in this file.
#[derive(Debug)]
pub enum DynError {
/// A write-layer failure (`WriteError`), e.g. a form value that could not be
/// converted for its column.
Write(WriteError),
/// An error reported by `sqlx` while executing a statement or decoding a row.
Sqlx(sqlx::Error),
}
/// Displays the wrapped error's own message, whichever variant is held.
impl std::fmt::Display for DynError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let inner: &dyn std::fmt::Display = match self {
            Self::Write(err) => err,
            Self::Sqlx(err) => err,
        };
        write!(f, "{inner}")
    }
}
/// Exposes the wrapped error as the `source` of a `DynError`.
impl std::error::Error for DynError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let cause: &(dyn std::error::Error + 'static) = match self {
            Self::Write(inner) => inner,
            Self::Sqlx(inner) => inner,
        };
        Some(cause)
    }
}
impl From<sqlx::Error> for DynError {
fn from(e: sqlx::Error) -> Self {
Self::Sqlx(e)
}
}
impl From<WriteError> for DynError {
fn from(e: WriteError) -> Self {
Self::Write(e)
}
}
/// Query builder driven by runtime model metadata (`ModelMeta`) rather than a
/// compile-time model type. Builder methods consume and return the set; the
/// async methods build a statement and run it against the routed pool or a
/// caller-supplied transaction.
pub struct DynQuerySet<'a> {
/// Metadata of the model (table, fields, relations) this set operates on.
meta: &'a ModelMeta,
/// Conditions accumulated by the `filter_*` / `search` builders; each one is
/// applied to the statement with `cond_where`.
where_clauses: Vec<Condition>,
/// Ordering as `(column name, descending)` pairs, in the order they were added.
order: Vec<(String, bool)>,
/// Optional row limit.
limit: Option<u64>,
/// Optional row offset.
offset: Option<u64>,
/// Column names to select; initialised to every field of `meta`.
select_cols: Vec<String>,
/// For soft-delete models: when set, no `deleted_at` filter is added
/// (unless `only_deleted` is also set).
with_deleted: bool,
/// For soft-delete models: when set, only rows with a non-null `deleted_at` match.
only_deleted: bool,
/// When set, `delete` issues a real `DELETE` even on a soft-delete model.
hard_delete: bool,
/// Validated, normalised select-related chains to hydrate in `fetch_as_json`.
select_related: Vec<String>,
}
impl<'a> DynQuerySet<'a> {
/// Creates an unfiltered query set for `meta` that selects every field of the
/// model, with no ordering, limit or offset and all soft-delete flags off.
pub fn for_meta(meta: &'a ModelMeta) -> Self {
    let every_column: Vec<String> = meta
        .fields
        .iter()
        .map(|field| field.name.clone())
        .collect();
    Self {
        meta,
        select_cols: every_column,
        where_clauses: Vec::new(),
        order: Vec::new(),
        select_related: Vec::new(),
        limit: None,
        offset: None,
        with_deleted: false,
        only_deleted: false,
        hard_delete: false,
    }
}
/// Includes soft-deleted rows: the default `deleted_at IS NULL` filter is no
/// longer added for soft-delete models.
pub fn with_deleted(self) -> Self {
    let mut qs = self;
    qs.with_deleted = true;
    qs
}
/// Restricts a soft-delete model to rows whose `deleted_at` is set.
pub fn only_deleted(self) -> Self {
    let mut qs = self;
    qs.only_deleted = true;
    qs
}
/// Makes a later `delete` issue a real `DELETE` even when the model is soft-delete.
pub fn hard_delete(self) -> Self {
    let mut qs = self;
    qs.hard_delete = true;
    qs
}
/// Returns the accumulated conditions plus the soft-delete visibility filter.
///
/// For soft-delete models: `only_deleted` adds `deleted_at IS NOT NULL`;
/// otherwise, unless `with_deleted` is set, `deleted_at IS NULL` is added.
/// Models without soft delete get the accumulated conditions unchanged.
fn effective_where_clauses(&self) -> Vec<Condition> {
    let mut conditions = self.where_clauses.clone();
    if !self.meta.soft_delete {
        return conditions;
    }
    let deleted_at = Expr::col(Alias::new("deleted_at"));
    let visibility = match (self.only_deleted, self.with_deleted) {
        (true, _) => Some(deleted_at.is_not_null()),
        (false, false) => Some(deleted_at.is_null()),
        (false, true) => None,
    };
    if let Some(predicate) = visibility {
        conditions.push(Condition::all().add(predicate));
    }
    conditions
}
/// Returns the accumulated conditions, always restricted to not-yet-deleted rows
/// (`deleted_at IS NULL`) when the model is soft-delete, regardless of the
/// `with_deleted` / `only_deleted` flags.
fn live_where_clauses(&self) -> Vec<Condition> {
    let not_deleted = self
        .meta
        .soft_delete
        .then(|| Condition::all().add(Expr::col(Alias::new("deleted_at")).is_null()));
    self.where_clauses
        .iter()
        .cloned()
        .chain(not_deleted)
        .collect()
}
/// Narrows the selected columns to those names in `cols` that exist on the model.
///
/// Unknown names are ignored; if none of the names is known the current
/// selection is kept as it is.
pub fn select_cols(mut self, cols: &[String]) -> Self {
    let mut kept: Vec<String> = Vec::new();
    for requested in cols {
        let known = self.meta.fields.iter().any(|field| field.name == *requested);
        if known {
            kept.push(requested.clone());
        }
    }
    if kept.is_empty() {
        return self;
    }
    self.select_cols = kept;
    self
}
/// Registers select-related chains to hydrate when fetching JSON rows.
///
/// Each entry is normalised with `normalize_sr_token`; entries for which
/// `validate_sr_chain` returns `None`, and entries already registered, are skipped.
pub fn select_related_dyn(mut self, fields: &[String]) -> Self {
    for raw in fields {
        let token = normalize_sr_token(raw);
        let valid = validate_sr_chain(self.meta, &token).is_some();
        if valid && !self.select_related.contains(&token) {
            self.select_related.push(token);
        }
    }
    self
}
/// Returns the select-related chains registered so far, in insertion order.
#[doc(hidden)]
pub fn select_related_fields(&self) -> &[String] {
    self.select_related.as_slice()
}
/// Adds a free-text search over the model's columns.
///
/// `term` is trimmed; a blank term leaves the query set untouched. When `fields`
/// is non-empty only the named columns are searched, otherwise every column is
/// eligible. Per column type the term is matched as:
/// - text: `UPPER(col) LIKE '%TERM%'` with the term LIKE-escaped and upper-cased
///   (`\` is the escape character);
/// - small/int/big integer and foreign key: equality, only if the term parses as `i64`;
/// - real / double: equality, only if the term parses as `f64`;
/// - boolean: equality, only if the term is `true` / `false` (ASCII case-insensitive).
///
/// The per-column predicates are OR-ed into a single condition; nothing is added
/// when no column produced a predicate.
pub fn search(mut self, fields: &[String], term: &str) -> Self {
    let needle = term.trim();
    if needle.is_empty() {
        return self;
    }

    let int_needle = needle.parse::<i64>().ok();
    let float_needle = needle.parse::<f64>().ok();
    let bool_needle = if needle.eq_ignore_ascii_case("true") {
        Some(true)
    } else if needle.eq_ignore_ascii_case("false") {
        Some(false)
    } else {
        None
    };
    let pattern = format!("%{}%", crate::orm::escape_like_literal(needle)).to_uppercase();

    // An empty `fields` list means "search everything".
    let in_scope = |name: &String| fields.is_empty() || fields.iter().any(|f| f == name);

    let predicates: Vec<sea_query::SimpleExpr> = self
        .meta
        .fields
        .iter()
        .filter(|col| in_scope(&col.name))
        .filter_map(|col| {
            let column = || Expr::col(Alias::new(&col.name));
            match col.ty {
                SqlType::Text => Some(
                    Expr::expr(Func::upper(column()))
                        .like(sea_query::LikeExpr::new(pattern.clone()).escape('\\')),
                ),
                SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey => {
                    int_needle.map(|n| column().eq(n))
                }
                SqlType::Real | SqlType::Double => float_needle.map(|n| column().eq(n)),
                SqlType::Boolean => bool_needle.map(|b| column().eq(b)),
                _ => None,
            }
        })
        .collect();

    if !predicates.is_empty() {
        let any_match = predicates
            .into_iter()
            .fold(Condition::any(), |acc, predicate| acc.add(predicate));
        self.where_clauses.push(any_match);
    }
    self
}
/// Appends an arbitrary caller-built condition to the query set.
pub fn filter_condition(self, cond: sea_query::Condition) -> Self {
    let mut qs = self;
    qs.where_clauses.push(cond);
    qs
}
/// Adds `col IN (vals…)` for integer values.
///
/// The query set is returned unchanged when `vals` is empty or `col` is not a
/// field of the model.
pub fn filter_in_i64(mut self, col: &str, vals: &[i64]) -> Self {
    let known_column = self.meta.fields.iter().any(|field| field.name == col);
    if known_column && !vals.is_empty() {
        let membership = Expr::col(Alias::new(col)).is_in(vals.iter().copied());
        self.where_clauses.push(Condition::all().add(membership));
    }
    self
}
/// Keeps only parent rows linked, through the many-to-many relation
/// `field_name`, to at least one of `child_ids`.
///
/// Builds `pk IN (SELECT parent_id FROM <table>_<field> WHERE child_id IN (…))`.
/// The ids are bound as strings when the target table's primary key is text or
/// UUID, and as `i64` otherwise (`BigInt` is assumed when the target's key type
/// cannot be looked up).
///
/// Every id is trimmed first and blank entries are dropped, for both key
/// kinds, so a list split from `"1, 2, 3"` matches all three ids. Ids that do
/// not parse as `i64` are ignored in the numeric case.
///
/// The query set is returned unchanged when `child_ids` is empty, the relation
/// or the model's primary key is unknown, or no usable id remains.
pub fn filter_m2m_contains_any(mut self, field_name: &str, child_ids: &[String]) -> Self {
    if child_ids.is_empty() {
        return self;
    }
    let Some(rel) = self
        .meta
        .m2m_relations
        .iter()
        .find(|r| r.field_name == field_name)
    else {
        return self;
    };
    let Some(pk_col) = self.meta.pk_column() else {
        return self;
    };
    let target_pk_ty = crate::migrate::pk_meta_for_table(&rel.target_table)
        .map(|(_, ty)| ty)
        .unwrap_or(SqlType::BigInt);
    let junction_table = format!("{}_{}", self.meta.table, rel.field_name);

    // Normalise once so the string and numeric branches treat padding alike.
    let cleaned = child_ids
        .iter()
        .map(|s| s.trim())
        .filter(|s| !s.is_empty());

    let child_id_expr = Expr::col(Alias::new("child_id"));
    let in_clause: sea_query::SimpleExpr = match target_pk_ty {
        SqlType::Text | SqlType::Uuid => {
            let bound: Vec<String> = cleaned.map(str::to_owned).collect();
            if bound.is_empty() {
                return self;
            }
            child_id_expr.is_in(bound)
        }
        _ => {
            let parsed: Vec<i64> = cleaned.filter_map(|s| s.parse().ok()).collect();
            if parsed.is_empty() {
                return self;
            }
            child_id_expr.is_in(parsed)
        }
    };
    let subq = Query::select()
        .column(Alias::new("parent_id"))
        .from(crate::db::router::schema_qualified_table(&junction_table))
        .and_where(in_clause)
        .to_owned();
    let cond =
        Condition::all().add(Expr::col(Alias::new(pk_col.name.clone())).in_subquery(subq));
    self.where_clauses.push(cond);
    self
}
pub fn filter_in_strings(mut self, col: &str, vals: &[String]) -> Self {
let Some(meta_col) = self.meta.fields.iter().find(|c| c.name == col) else {
return self;
};
if vals.is_empty() {
return self;
}
let expr = Expr::col(Alias::new(col));
let cond = match crate::migrate::fk_effective_type(meta_col) {
SqlType::SmallInt | SqlType::Integer => {
let parsed: Vec<i32> = vals.iter().filter_map(|s| s.parse().ok()).collect();
if parsed.is_empty() {
return self;
}
Condition::all().add(expr.is_in(parsed))
}
SqlType::BigInt | SqlType::ForeignKey => {
let parsed: Vec<i64> = vals.iter().filter_map(|s| s.parse().ok()).collect();
if parsed.is_empty() {
return self;
}
Condition::all().add(expr.is_in(parsed))
}
SqlType::Real | SqlType::Double => {
let parsed: Vec<f64> = vals.iter().filter_map(|s| s.parse().ok()).collect();
if parsed.is_empty() {
return self;
}
Condition::all().add(expr.is_in(parsed))
}
SqlType::Boolean => {
let parsed: Vec<bool> = vals
.iter()
.map(|s| matches!(s.as_str(), "true" | "on" | "1"))
.collect();
Condition::all().add(expr.is_in(parsed))
}
SqlType::Uuid => {
let parsed: Vec<uuid::Uuid> = vals
.iter()
.filter_map(|s| uuid::Uuid::parse_str(s).ok())
.collect();
if parsed.is_empty() {
return self;
}
Condition::all().add(expr.is_in(parsed))
}
_ => Condition::all().add(expr.is_in(vals.iter().map(|s| s.to_string()))),
};
self.where_clauses.push(cond);
self
}
/// Adds `col = value`, converting the string to the column's effective type
/// (`crate::migrate::fk_effective_type`).
///
/// Numeric and UUID columns add no filter when `value` does not parse. Boolean
/// columns treat `"true"`, `"on"` and `"1"` as `true` and anything else as
/// `false`. Other types compare against the string itself. Unknown columns
/// leave the query set unchanged.
pub fn filter_eq_string(mut self, col: &str, value: &str) -> Self {
    let meta_col = match self.meta.fields.iter().find(|field| field.name == col) {
        Some(found) => found,
        None => return self,
    };
    let column = Expr::col(Alias::new(col));
    let comparison = match crate::migrate::fk_effective_type(meta_col) {
        SqlType::SmallInt | SqlType::Integer => match value.parse::<i32>() {
            Ok(n) => Some(column.eq(n)),
            Err(_) => None,
        },
        SqlType::BigInt | SqlType::ForeignKey => match value.parse::<i64>() {
            Ok(n) => Some(column.eq(n)),
            Err(_) => None,
        },
        SqlType::Real | SqlType::Double => match value.parse::<f64>() {
            Ok(n) => Some(column.eq(n)),
            Err(_) => None,
        },
        SqlType::Boolean => Some(column.eq(matches!(value, "true" | "on" | "1"))),
        SqlType::Uuid => match uuid::Uuid::parse_str(value) {
            Ok(id) => Some(column.eq(id)),
            Err(_) => None,
        },
        _ => Some(column.eq(value.to_string())),
    };
    self.where_clauses
        .extend(comparison.map(|expr| Condition::all().add(expr)));
    self
}
/// Appends an ordering on `col` (descending when `descending` is true).
/// Names that are not fields of the model are ignored.
pub fn order_by_col(mut self, col: &str, descending: bool) -> Self {
    let exists = self.meta.fields.iter().any(|field| field.name == col);
    if !exists {
        return self;
    }
    self.order.push((col.to_owned(), descending));
    self
}
/// Caps the number of rows at `n`, replacing any earlier limit.
pub fn limit(self, n: u64) -> Self {
    let mut qs = self;
    qs.limit = Some(n);
    qs
}
/// Skips the first `n` rows, replacing any earlier offset.
pub fn offset(self, n: u64) -> Self {
    let mut qs = self;
    qs.offset = Some(n);
    qs
}
/// Runs `SELECT COUNT(*)` with the effective conditions (including the
/// soft-delete filter) on the read pool. Ordering, limit and offset are not applied.
///
/// # Errors
/// Returns `DynError::Sqlx` when the query or the decoding of the count fails.
pub async fn count(self) -> Result<i64, DynError> {
    let mut stmt = Query::select();
    stmt.from(crate::db::router::schema_qualified_table(&self.meta.table))
        .expr(Func::count(Expr::col(Asterisk)));
    for cond in self.effective_where_clauses() {
        stmt.cond_where(cond);
    }

    let total = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            let row = sqlx::query_with(&sql, args).fetch_one(&pool).await?;
            row.try_get::<i64, _>(0)?
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            let row = sqlx::query_with(&sql, args).fetch_one(&pool).await?;
            row.try_get::<i64, _>(0)?
        }
    };
    Ok(total)
}
/// Fetches the distinct values of `col`, each rendered as a string.
///
/// The effective conditions and the limit (if any) are applied; ordering and
/// offset are not. An unknown column yields an empty list without touching the
/// database.
///
/// # Errors
/// Fails when the query or the decoding of a value fails.
pub async fn fetch_distinct_strings(self, col: &str) -> Result<Vec<String>, DynError> {
    let col_meta = match self.meta.fields.iter().find(|field| field.name == col) {
        Some(found) => found,
        None => return Ok(Vec::new()),
    };

    let mut stmt = Query::select();
    stmt.distinct();
    stmt.from(crate::db::router::schema_qualified_table(&self.meta.table));
    stmt.column(Alias::new(col));
    for cond in self.effective_where_clauses() {
        stmt.cond_where(cond);
    }
    if let Some(max_rows) = self.limit {
        stmt.limit(max_rows);
    }

    let values = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            let rows = sqlx::query_with(&sql, args).fetch_all(&pool).await?;
            rows.iter()
                .map(|row| decode_to_string(row, col_meta))
                .collect::<Result<Vec<String>, _>>()?
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            let rows = sqlx::query_with(&sql, args).fetch_all(&pool).await?;
            rows.iter()
                .map(|row| decode_pg_to_string(row, col_meta))
                .collect::<Result<Vec<String>, _>>()?
        }
    };
    Ok(values)
}
/// Deletes the matching rows and returns how many were affected.
///
/// Soft-delete models are routed to `soft_delete_update` unless `hard_delete`
/// was requested. Otherwise the primary keys of the matching rows are collected
/// first (a failure there is tolerated and yields an empty list), the `DELETE`
/// runs on the write pool, and the bulk post-delete signal is emitted with the
/// collected keys.
///
/// # Errors
/// Returns `DynError::Sqlx` when the `DELETE` statement fails.
pub async fn delete(self) -> Result<u64, DynError> {
    let soft = self.meta.soft_delete && !self.hard_delete;
    if soft {
        return self.soft_delete_update().await;
    }

    let conditions = self.effective_where_clauses();
    let affected_pks: Vec<serde_json::Value> = if let Some(pk_col) = self.meta.pk_column() {
        collect_parent_pks(self.meta, pk_col, &conditions)
            .await
            .unwrap_or_default()
    } else {
        Vec::new()
    };

    let mut stmt = Query::delete();
    stmt.from_table(crate::db::router::schema_qualified_table(&self.meta.table));
    for cond in &conditions {
        stmt.cond_where(cond.clone());
    }

    let deleted = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
    };

    crate::signals::emit_bulk_post_delete_by_table(&self.meta.table, affected_pks).await;
    Ok(deleted)
}
/// Soft-deletes the matching live rows by stamping `deleted_at` with the current
/// UTC time, and returns how many rows were updated.
///
/// Only rows that are not already deleted are touched (`live_where_clauses`).
/// Their primary keys are collected beforehand (a failure there is tolerated and
/// yields an empty list) and passed to the bulk post-delete signal afterwards.
///
/// # Errors
/// Returns `DynError::Sqlx` when the `UPDATE` statement fails.
async fn soft_delete_update(self) -> Result<u64, DynError> {
    let conditions = self.live_where_clauses();
    let affected_pks: Vec<serde_json::Value> = if let Some(pk_col) = self.meta.pk_column() {
        collect_parent_pks(self.meta, pk_col, &conditions)
            .await
            .unwrap_or_default()
    } else {
        Vec::new()
    };

    let mut stmt = Query::update();
    stmt.table(crate::db::router::schema_qualified_table(&self.meta.table));
    let stamp = sea_query::Value::ChronoDateTimeUtc(Some(Box::new(chrono::Utc::now())));
    stmt.value(Alias::new("deleted_at"), stamp);
    for cond in &conditions {
        stmt.cond_where(cond.clone());
    }

    let updated = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
    };

    crate::signals::emit_bulk_post_delete_by_table(&self.meta.table, affected_pks).await;
    Ok(updated)
}
pub async fn restore(self) -> Result<u64, DynError> {
if !self.meta.soft_delete {
return Ok(0);
}
let mut where_clauses = self.where_clauses.clone();
where_clauses
.push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_not_null()));
let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
Some(pk_col) => collect_parent_pks(self.meta, pk_col, &where_clauses)
.await
.unwrap_or_default(),
None => Vec::new(),
};
let mut q = Query::update();
q.table(crate::db::router::schema_qualified_table(&self.meta.table));
q.value(
Alias::new("deleted_at"),
sea_query::Value::ChronoDateTimeUtc(None),
);
for cond in &where_clauses {
q.cond_where(cond.clone());
}
let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
DbPool::Sqlite(pool) => {
let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
let res = sqlx::query_with(&sql, values).execute(&pool).await?;
res.rows_affected()
}
DbPool::Postgres(pool) => {
let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
let res = sqlx::query_with(&sql, values).execute(&pool).await?;
res.rows_affected()
}
};
crate::signals::emit_bulk_post_save_by_table(&self.meta.table, parent_pks, false).await;
Ok(rows_affected)
}
/// Sets a single column to `value` (given as a form string) on every matching
/// row and returns the number of rows affected.
///
/// An unknown column returns `Ok(0)` without touching the database.
///
/// # Errors
/// - `DynError::Write(WriteError::Validator { .. })` when `value` cannot be
///   converted for the column;
/// - `DynError::Sqlx` when the `UPDATE` statement fails.
pub async fn update_one(self, col: &str, value: &str) -> Result<u64, DynError> {
    let col_meta = match self.meta.fields.iter().find(|field| field.name == col) {
        Some(found) => found,
        None => return Ok(0),
    };
    let new_value = form_str_to_sea_value(col_meta, value).map_err(|e| {
        DynError::Write(WriteError::Validator {
            field: col_meta.name.clone(),
            message: e.to_string(),
        })
    })?;

    let mut stmt = Query::update();
    stmt.table(crate::db::router::schema_qualified_table(&self.meta.table));
    stmt.value(Alias::new(col), new_value);
    for cond in self.effective_where_clauses() {
        stmt.cond_where(cond);
    }

    let updated = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
    };
    Ok(updated)
}
/// Updates the matching rows from submitted form values on the write pool and
/// returns the number of rows affected.
///
/// The statement comes from `build_update_form_query`; when that yields no
/// statement (nothing to set) the result is `Ok(0)` and no query runs.
///
/// # Errors
/// Propagates conversion errors from the builder and `DynError::Sqlx` from the
/// statement.
pub async fn update_form(
    self,
    form: &HashMap<String, String>,
    skip: &[String],
) -> Result<u64, DynError> {
    let stmt = match self.build_update_form_query(form, skip)? {
        Some(stmt) => stmt,
        None => return Ok(0),
    };
    let updated = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            sqlx::query_with(&sql, args)
                .execute(&pool)
                .await?
                .rows_affected()
        }
    };
    Ok(updated)
}
/// Builds the `UPDATE` statement for a form submission, or `None` when there is
/// nothing to set.
///
/// Primary-key columns and columns named in `skip` are never written. `auto_now`
/// columns are always set to the current time (the form value is ignored). Every
/// other column is written only when the form carries a value for it. The
/// effective conditions are attached as the `WHERE` clause.
///
/// # Errors
/// `DynError::Write(WriteError::Validator { .. })` when a form value cannot be
/// converted for its column.
fn build_update_form_query(
    &self,
    form: &HashMap<String, String>,
    skip: &[String],
) -> Result<Option<sea_query::UpdateStatement>, DynError> {
    let mut stmt = Query::update();
    stmt.table(crate::db::router::schema_qualified_table(&self.meta.table));

    let writable = self
        .meta
        .fields
        .iter()
        .filter(|col| !col.primary_key && !skip.contains(&col.name));

    let mut assignments = 0usize;
    for col in writable {
        let value = if col.auto_now {
            crate::orm::write::now_for_column(col.ty)
        } else if let Some(raw) = form.get(&col.name) {
            form_str_to_sea_value(col, raw).map_err(|e| {
                DynError::Write(WriteError::Validator {
                    field: col.name.clone(),
                    message: e.to_string(),
                })
            })?
        } else {
            continue;
        };
        stmt.value(Alias::new(&col.name), value);
        assignments += 1;
    }
    if assignments == 0 {
        return Ok(None);
    }

    for cond in self.effective_where_clauses() {
        stmt.cond_where(cond);
    }
    Ok(Some(stmt))
}
/// Same as `update_form`, but the statement runs inside the caller's transaction.
///
/// The SQL dialect follows `tx.backend_name()`: `"sqlite"` uses the SQLite
/// builder, anything else is treated as Postgres.
///
/// # Errors
/// Propagates conversion errors from the builder and `DynError::Sqlx` from the
/// statement.
///
/// # Panics
/// Panics if the transaction handle for the reported backend is unavailable.
pub async fn update_form_in_tx(
    self,
    tx: &mut crate::db::Transaction,
    form: &HashMap<String, String>,
    skip: &[String],
) -> Result<u64, DynError> {
    let stmt = match self.build_update_form_query(form, skip)? {
        Some(stmt) => stmt,
        None => return Ok(0),
    };
    let updated = if tx.backend_name() == "sqlite" {
        let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
        let conn = tx.as_sqlite_mut().expect("sqlite backend_name");
        sqlx::query_with(&sql, args)
            .execute(&mut **conn)
            .await?
            .rows_affected()
    } else {
        let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
        let conn = tx.as_pg_mut().expect("postgres backend_name");
        sqlx::query_with(&sql, args)
            .execute(&mut **conn)
            .await?
            .rows_affected()
    };
    Ok(updated)
}
/// Inserts one row built from submitted form values on the write pool and
/// returns the new row's integer key.
///
/// The statement comes from `build_insert_form_query`; when that yields no
/// statement the result is `Ok(0)` and no query runs.
///
/// Return value:
/// - SQLite: `last_insert_rowid()`.
/// - Postgres: the primary key read back through `RETURNING`. sqlx only decodes
///   a column into a Rust integer of the matching width, so the key is tried as
///   `i64`, then `i32`, then `i16`; a key that is not an integer at all (or a
///   model without a primary key) yields `0`.
///
/// # Errors
/// Propagates conversion errors from the builder and `DynError::Sqlx` from the
/// statement.
pub async fn insert_form(
    self,
    form: &HashMap<String, String>,
    skip: &[String],
) -> Result<i64, DynError> {
    let Some(mut q) = self.build_insert_form_query(form, skip)? else {
        return Ok(0);
    };
    match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
        DbPool::Sqlite(pool) => {
            let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
            let res = sqlx::query_with(&sql, vals).execute(&pool).await?;
            Ok(res.last_insert_rowid())
        }
        DbPool::Postgres(pool) => {
            let pk_name = self
                .meta
                .fields
                .iter()
                .find(|c| c.primary_key)
                .map(|c| c.name.clone());
            if let Some(pk) = pk_name {
                q.returning_col(Alias::new(&pk));
                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
                let row = sqlx::query_with(&sql, vals).fetch_one(&pool).await?;
                let key = pk.as_str();
                // INT4 / INT2 keys do not decode as i64, so widen them explicitly
                // instead of silently reporting 0.
                let id = row
                    .try_get::<i64, _>(key)
                    .or_else(|_| row.try_get::<i32, _>(key).map(i64::from))
                    .or_else(|_| row.try_get::<i16, _>(key).map(i64::from))
                    .unwrap_or(0);
                Ok(id)
            } else {
                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
                let _ = sqlx::query_with(&sql, vals).execute(&pool).await?;
                Ok(0)
            }
        }
    }
}
fn build_insert_form_query(
&self,
form: &HashMap<String, String>,
skip: &[String],
) -> Result<Option<sea_query::InsertStatement>, DynError> {
let mut cols: Vec<&str> = Vec::new();
let mut values: Vec<SeaValue> = Vec::new();
for col in &self.meta.fields {
if skip.iter().any(|s| s == &col.name) {
continue;
}
if col.primary_key
&& matches!(
col.ty,
SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
)
&& form.get(&col.name).is_none_or(|v| v.is_empty())
{
continue;
}
if (col.auto_now_add || col.auto_now)
&& form.get(&col.name).is_none_or(|v| v.is_empty())
{
cols.push(&col.name);
values.push(crate::orm::write::now_for_column(col.ty));
continue;
}
let raw = form.get(&col.name).map(|s| s.as_str()).unwrap_or("");
let sea_value = match form_str_to_sea_value(col, raw) {
Ok(v) => v,
Err(e) => {
return Err(DynError::Write(WriteError::Validator {
field: col.name.clone(),
message: e.to_string(),
}));
}
};
cols.push(&col.name);
values.push(sea_value);
}
if cols.is_empty() {
return Ok(None);
}
let mut q = Query::insert();
q.into_table(crate::db::router::schema_qualified_table(&self.meta.table));
q.columns(cols.iter().map(|c| Alias::new(*c)).collect::<Vec<_>>());
let exprs: Vec<sea_query::SimpleExpr> = values.into_iter().map(Into::into).collect();
q.values_panic(exprs);
Ok(Some(q))
}
/// Inserts one row built from submitted form values inside the caller's
/// transaction and returns the new row's integer key.
///
/// The statement comes from `build_insert_form_query`; when that yields no
/// statement the result is `Ok(0)` and no query runs. The SQL dialect follows
/// `tx.backend_name()`: `"sqlite"` uses the SQLite builder, anything else is
/// treated as Postgres.
///
/// Return value:
/// - SQLite: `last_insert_rowid()`.
/// - Postgres: the primary key read back through `RETURNING`. sqlx only decodes
///   a column into a Rust integer of the matching width, so the key is tried as
///   `i64`, then `i32`, then `i16`; a key that is not an integer at all (or a
///   model without a primary key) yields `0`.
///
/// # Errors
/// Propagates conversion errors from the builder and `DynError::Sqlx` from the
/// statement.
///
/// # Panics
/// Panics if the transaction handle for the reported backend is unavailable.
pub async fn insert_form_in_tx(
    self,
    tx: &mut crate::db::Transaction,
    form: &HashMap<String, String>,
    skip: &[String],
) -> Result<i64, DynError> {
    let Some(mut q) = self.build_insert_form_query(form, skip)? else {
        return Ok(0);
    };
    match tx.backend_name() {
        "sqlite" => {
            let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
            let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
            let res = sqlx::query_with(&sql, vals).execute(&mut **inner).await?;
            Ok(res.last_insert_rowid())
        }
        _ => {
            let pk_name = self
                .meta
                .fields
                .iter()
                .find(|c| c.primary_key)
                .map(|c| c.name.clone());
            let inner = tx.as_pg_mut().expect("postgres backend_name");
            if let Some(pk) = pk_name {
                q.returning_col(Alias::new(&pk));
                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
                let row = sqlx::query_with(&sql, vals).fetch_one(&mut **inner).await?;
                let key = pk.as_str();
                // INT4 / INT2 keys do not decode as i64, so widen them explicitly
                // instead of silently reporting 0.
                let id = row
                    .try_get::<i64, _>(key)
                    .or_else(|_| row.try_get::<i32, _>(key).map(i64::from))
                    .or_else(|_| row.try_get::<i16, _>(key).map(i64::from))
                    .unwrap_or(0);
                Ok(id)
            } else {
                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
                let _ = sqlx::query_with(&sql, vals).execute(&mut **inner).await?;
                Ok(0)
            }
        }
    }
}
pub async fn fetch_as_strings(self) -> Result<Vec<HashMap<String, String>>, DynError> {
let mut q = Query::select();
q.from(crate::db::router::schema_qualified_table(&self.meta.table));
for c in &self.select_cols {
q.column(Alias::new(c));
}
let where_clauses = self.effective_where_clauses();
for cond in &where_clauses {
q.cond_where(cond.clone());
}
for (col, descending) in &self.order {
q.order_by(
Alias::new(col),
if *descending { Order::Desc } else { Order::Asc },
);
}
if let Some(n) = self.limit {
q.limit(n);
}
if let Some(n) = self.offset {
q.offset(n);
}
match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
DbPool::Sqlite(pool) => {
let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
let mut out: Vec<HashMap<String, String>> = Vec::with_capacity(rows.len());
for row in rows {
let mut entry = HashMap::new();
for col_name in &self.select_cols {
if let Some(col_meta) =
self.meta.fields.iter().find(|c| &c.name == col_name)
{
let v = decode_to_string(&row, col_meta)?;
entry.insert(col_name.clone(), v);
}
}
out.push(entry);
}
Ok(out)
}
DbPool::Postgres(pool) => {
let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
let mut out: Vec<HashMap<String, String>> = Vec::with_capacity(rows.len());
for row in rows {
let mut entry = HashMap::new();
for col_name in &self.select_cols {
if let Some(col_meta) =
self.meta.fields.iter().find(|c| &c.name == col_name)
{
let v = decode_pg_to_string(&row, col_meta)?;
entry.insert(col_name.clone(), v);
}
}
out.push(entry);
}
Ok(out)
}
}
}
/// Fetches the matching rows from the read pool as JSON objects keyed by column
/// name.
///
/// Applies the effective conditions, ordering, limit and offset. When at least
/// one row came back, many-to-many relations (if the model has any) are hydrated
/// via `hydrate_m2m_batched`, then registered select-related chains via
/// `hydrate_select_related_into`.
///
/// # Errors
/// Fails when the query, the decoding of a value, or a hydration step fails.
pub async fn fetch_as_json(
    self,
) -> Result<Vec<serde_json::Map<String, serde_json::Value>>, DynError> {
    let pk_name = self
        .meta
        .pk_column()
        .map(|c| c.name.clone())
        .unwrap_or_default();
    // Pair each selected name with its column metadata once, not once per row.
    let selected: Vec<(&String, &Column)> = self
        .select_cols
        .iter()
        .filter_map(|name| {
            let found = self.meta.fields.iter().find(|field| &field.name == name)?;
            Some((name, found))
        })
        .collect();

    let mut stmt = Query::select();
    stmt.from(crate::db::router::schema_qualified_table(&self.meta.table));
    for name in &self.select_cols {
        stmt.column(Alias::new(name));
    }
    for cond in self.effective_where_clauses() {
        stmt.cond_where(cond);
    }
    for (name, descending) in &self.order {
        let direction = if *descending { Order::Desc } else { Order::Asc };
        stmt.order_by(Alias::new(name), direction);
    }
    if let Some(max_rows) = self.limit {
        stmt.limit(max_rows);
    }
    if let Some(skip_rows) = self.offset {
        stmt.offset(skip_rows);
    }

    let mut records: Vec<serde_json::Map<String, serde_json::Value>> = Vec::new();
    match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
        DbPool::Sqlite(pool) => {
            let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
            let rows = sqlx::query_with(&sql, args).fetch_all(&pool).await?;
            records.reserve(rows.len());
            for row in rows {
                let mut record = serde_json::Map::new();
                for (name, column) in &selected {
                    record.insert((*name).clone(), decode_to_json(&row, column)?);
                }
                records.push(record);
            }
        }
        DbPool::Postgres(pool) => {
            let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
            let rows = sqlx::query_with(&sql, args).fetch_all(&pool).await?;
            records.reserve(rows.len());
            for row in rows {
                let mut record = serde_json::Map::new();
                for (name, column) in &selected {
                    record.insert((*name).clone(), decode_pg_to_json(&row, column)?);
                }
                records.push(record);
            }
        }
    }

    let has_rows = !records.is_empty();
    if has_rows && !self.meta.m2m_relations.is_empty() {
        hydrate_m2m_batched(&self.meta, &pk_name, &mut records).await?;
    }
    if has_rows && !self.select_related.is_empty() {
        hydrate_select_related_into(&self.meta, &self.select_related, &mut records).await?;
    }
    Ok(records)
}
/// Fetches at most one matching row as a JSON object by forcing the limit to 1
/// and delegating to `fetch_as_json`. Returns `None` when nothing matches.
pub async fn first_as_json(
    self,
) -> Result<Option<serde_json::Map<String, serde_json::Value>>, DynError> {
    let mut limited = self;
    limited.limit = Some(1);
    let mut fetched = limited.fetch_as_json().await?;
    Ok(fetched.pop())
}
/// Fetches at most one matching row inside the caller's transaction and returns
/// it as a JSON object, or `None` when nothing matches.
///
/// Every field of the model is selected (the column selection of the query set
/// is not consulted); only the effective conditions and `LIMIT 1` are applied.
/// The SQL dialect follows `tx.backend_name()`: `"sqlite"` uses the SQLite
/// builder, anything else is treated as Postgres.
///
/// # Errors
/// Fails when the query or the decoding of a value fails.
///
/// # Panics
/// Panics if the transaction handle for the reported backend is unavailable.
pub async fn fetch_one_json_in_tx(
    self,
    tx: &mut crate::db::Transaction,
) -> Result<Option<serde_json::Map<String, serde_json::Value>>, DynError> {
    let mut stmt = Query::select();
    stmt.from(crate::db::router::schema_qualified_table(&self.meta.table));
    for field in &self.meta.fields {
        stmt.column(Alias::new(&field.name));
    }
    for cond in self.effective_where_clauses() {
        stmt.cond_where(cond);
    }
    stmt.limit(1);

    let mut record = serde_json::Map::new();
    if tx.backend_name() == "sqlite" {
        let (sql, args) = stmt.build_sqlx(SqliteQueryBuilder);
        let conn = tx.as_sqlite_mut().expect("sqlite backend_name");
        let Some(row) = sqlx::query_with(&sql, args)
            .fetch_optional(&mut **conn)
            .await?
        else {
            return Ok(None);
        };
        for field in &self.meta.fields {
            record.insert(field.name.clone(), decode_to_json(&row, field)?);
        }
    } else {
        let (sql, args) = stmt.build_sqlx(PostgresQueryBuilder);
        let conn = tx.as_pg_mut().expect("postgres backend_name");
        let Some(row) = sqlx::query_with(&sql, args)
            .fetch_optional(&mut **conn)
            .await?
        else {
            return Ok(None);
        };
        for field in &self.meta.fields {
            record.insert(field.name.clone(), decode_pg_to_json(&row, field)?);
        }
    }
    Ok(Some(record))
}
/// Inserts one row from a JSON body on the write pool and returns the stored
/// row as a JSON object.
///
/// Steps, in order:
/// 1. `normalise_insert_body` may produce a rewritten copy of `body`; if it
///    returns `None` the caller's body is used as it is.
/// 2. `validate_on_create` runs; any validation errors abort with
///    `WriteError::Multiple`.
/// 3. `build_insert_plan` produces the `INSERT` statement plus the primary
///    key's name and type.
/// 4. The pre-save signal is emitted with the body (before the statement runs).
/// 5. The row is inserted and read back (see the per-backend comments below).
/// 6. Many-to-many junction rows are written from the body, the relations are
///    hydrated into the result, and the post-save signal is emitted with it.
///
/// # Errors
/// Returns a `WriteError` when validation, plan building, the insert itself
/// (mapped through `classify_or_sqlx`), the read-back, decoding, or the
/// many-to-many steps fail.
pub async fn insert_json(
self,
body: &serde_json::Map<String, serde_json::Value>,
) -> Result<serde_json::Map<String, serde_json::Value>, crate::orm::write::WriteError> {
use crate::orm::write::WriteError;
// `body_owned` only exists to keep a normalised copy alive; `body` is
// re-bound to point at either that copy or the caller's original map.
let body_owned: serde_json::Map<String, serde_json::Value>;
let body: &serde_json::Map<String, serde_json::Value> =
match normalise_insert_body(self.meta, body) {
Some(owned) => {
body_owned = owned;
&body_owned
}
None => body,
};
let validation_errors = crate::orm::validation::validate_on_create(self.meta, body).await;
if !validation_errors.is_empty() {
return Err(WriteError::Multiple {
errors: validation_errors,
});
}
let InsertPlan {
mut q,
pk_name,
pk_ty,
} = build_insert_plan(self.meta, body)?;
// The pre-save signal fires after validation and planning succeeded but
// before anything is written.
crate::signals::emit_pre_save_by_table(
&self.meta.table,
serde_json::Value::Object(body.clone()),
true,
)
.await;
match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
DbPool::Sqlite(pool) => {
// SQLite path: plain INSERT followed by a separate SELECT of the new row.
let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
let res = sqlx::query_with(&sql, vals)
.execute(&pool)
.await
.map_err(|e| classify_or_sqlx(e, body))?;
// Locate the new row: integer keys use the connection's last insert
// rowid, every other key type uses the value supplied in the body
// (JSON null if the body has none).
let pk_pred = match pk_ty {
SqlType::Integer | SqlType::BigInt | SqlType::SmallInt => {
Expr::col(Alias::new(&pk_name)).eq(res.last_insert_rowid())
}
_ => {
let supplied = body
.get(&pk_name)
.cloned()
.unwrap_or(serde_json::Value::Null);
let sea_value = crate::orm::write::json_to_sea_value(
pk_ty, &supplied, false, &pk_name, None,
)?;
Expr::col(Alias::new(&pk_name)).eq(sea_value)
}
};
let mut sel = Query::select();
sel.from(crate::db::router::schema_qualified_table(&self.meta.table));
for c in &self.meta.fields {
sel.column(Alias::new(&c.name));
}
sel.cond_where(Condition::all().add(pk_pred));
let (sel_sql, sel_vals) = sel.build_sqlx(SqliteQueryBuilder);
let row = sqlx::query_with(&sel_sql, sel_vals)
.fetch_one(&pool)
.await?;
let mut out = serde_json::Map::new();
for col in &self.meta.fields {
out.insert(col.name.clone(), decode_to_json(&row, col)?);
}
// The stored key (as decoded from the row) drives the junction writes
// and the relation hydration.
let pk_value = out.get(&pk_name).cloned();
write_m2m_junctions(&self.meta, pk_value.as_ref(), body).await?;
hydrate_m2m_into(&self.meta, pk_value.as_ref(), &mut out).await?;
crate::signals::emit_post_save_by_table(
&self.meta.table,
serde_json::Value::Object(out.clone()),
true,
)
.await;
Ok(out)
}
DbPool::Postgres(pool) => {
// Postgres path: a single INSERT ... RETURNING * yields the stored row.
q.returning_all();
let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
let row = sqlx::query_with(&sql, vals)
.fetch_one(&pool)
.await
.map_err(|e| classify_or_sqlx(e, body))?;
let mut out = serde_json::Map::new();
for col in &self.meta.fields {
out.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
}
let pk_value = out.get(&pk_name).cloned();
write_m2m_junctions(&self.meta, pk_value.as_ref(), body).await?;
hydrate_m2m_into(&self.meta, pk_value.as_ref(), &mut out).await?;
crate::signals::emit_post_save_by_table(
&self.meta.table,
serde_json::Value::Object(out.clone()),
true,
)
.await;
Ok(out)
}
}
}
/// Inserts one row from a JSON body inside the caller's transaction and returns
/// the stored row as a JSON object.
///
/// Mirrors `insert_json`, with these visible differences: validation uses
/// `validate_on_create_in_tx`, every statement runs on `tx`, the many-to-many
/// steps use their `_in_tx` variants, and this function itself emits no
/// pre-/post-save signals.
///
/// The SQL dialect follows `tx.backend_name()`: `"sqlite"` uses the SQLite
/// builder, anything else is treated as Postgres.
///
/// # Errors
/// Returns a `WriteError` when validation, plan building, the insert itself
/// (mapped through `classify_or_sqlx`), the read-back, decoding, or the
/// many-to-many steps fail.
///
/// # Panics
/// Panics if the transaction handle for the reported backend is unavailable.
pub async fn insert_json_in_tx(
self,
body: &serde_json::Map<String, serde_json::Value>,
tx: &mut crate::db::Transaction,
) -> Result<serde_json::Map<String, serde_json::Value>, crate::orm::write::WriteError> {
use crate::orm::write::WriteError;
// `body_owned` only exists to keep a normalised copy alive; `body` is
// re-bound to point at either that copy or the caller's original map.
let body_owned: serde_json::Map<String, serde_json::Value>;
let body: &serde_json::Map<String, serde_json::Value> =
match normalise_insert_body(self.meta, body) {
Some(owned) => {
body_owned = owned;
&body_owned
}
None => body,
};
let validation_errors =
crate::orm::validation::validate_on_create_in_tx(self.meta, body, tx).await;
if !validation_errors.is_empty() {
return Err(WriteError::Multiple {
errors: validation_errors,
});
}
let InsertPlan {
mut q,
pk_name,
pk_ty,
} = build_insert_plan(self.meta, body)?;
match tx.backend_name() {
"sqlite" => {
// SQLite path: plain INSERT followed by a separate SELECT of the new row.
let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
// The inner block ends the mutable borrow of `tx` as soon as the
// statement has run, so `tx` can be borrowed again below.
let res = {
let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
sqlx::query_with(&sql, vals)
.execute(&mut **inner)
.await
.map_err(|e| classify_or_sqlx(e, body))?
};
// Locate the new row: integer keys use the last insert rowid, every
// other key type uses the value supplied in the body (JSON null if
// the body has none).
let pk_pred = match pk_ty {
SqlType::Integer | SqlType::BigInt | SqlType::SmallInt => {
Expr::col(Alias::new(&pk_name)).eq(res.last_insert_rowid())
}
_ => {
let supplied = body
.get(&pk_name)
.cloned()
.unwrap_or(serde_json::Value::Null);
let sea_value = crate::orm::write::json_to_sea_value(
pk_ty, &supplied, false, &pk_name, None,
)?;
Expr::col(Alias::new(&pk_name)).eq(sea_value)
}
};
let mut sel = Query::select();
sel.from(crate::db::router::schema_qualified_table(&self.meta.table));
for c in &self.meta.fields {
sel.column(Alias::new(&c.name));
}
sel.cond_where(Condition::all().add(pk_pred));
let (sel_sql, sel_vals) = sel.build_sqlx(SqliteQueryBuilder);
let mut out = serde_json::Map::new();
// Scoped again so the borrow of `tx` is released before the
// many-to-many helpers take `tx` themselves.
{
let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
let row = sqlx::query_with(&sel_sql, sel_vals)
.fetch_one(&mut **inner)
.await?;
for col in &self.meta.fields {
out.insert(col.name.clone(), decode_to_json(&row, col)?);
}
}
let pk_value = out.get(&pk_name).cloned();
write_m2m_junctions_in_tx(self.meta, pk_value.as_ref(), body, tx).await?;
hydrate_m2m_into_tx(self.meta, pk_value.as_ref(), &mut out, tx).await?;
Ok(out)
}
_ => {
// Postgres path: a single INSERT ... RETURNING * yields the stored row.
q.returning_all();
let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
let mut out = serde_json::Map::new();
// Scoped so the borrow of `tx` is released before the many-to-many
// helpers take `tx` themselves.
{
let inner = tx.as_pg_mut().expect("postgres backend_name");
let row = sqlx::query_with(&sql, vals)
.fetch_one(&mut **inner)
.await
.map_err(|e| classify_or_sqlx(e, body))?;
for col in &self.meta.fields {
out.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
}
}
let pk_value = out.get(&pk_name).cloned();
write_m2m_junctions_in_tx(self.meta, pk_value.as_ref(), body, tx).await?;
hydrate_m2m_into_tx(self.meta, pk_value.as_ref(), &mut out, tx).await?;
Ok(out)
}
}
}
/// Applies a JSON patch to every row matched by this queryset, running all
/// statements on the caller's transaction `tx`.
///
/// Processing order:
/// 1. If any column is `noform` or has `slug_from`, the patch is copied,
///    `noform` keys are removed from the copy and `apply_slug_from` is run
///    on it; otherwise `body` is used as-is.
/// 2. `validate_on_update_in_tx` runs; a non-empty error list aborts with
///    [`WriteError::Multiple`].
/// 3. Every non-primary-key column present in the patch is bounds-checked,
///    text-format-checked and converted to a bound value. Columns absent
///    from the patch are skipped, except `auto_now` columns, which are set
///    to the current time.
/// 4. Parent primary keys are collected *before* the `UPDATE` executes, then
///    the `UPDATE` runs (only if it sets at least one column), then the M2M
///    junction rows are rewritten for each collected primary key.
///
/// # Returns
///
/// `Ok(0)` when the patch touches neither a column nor an M2M relation.
/// Otherwise the number of collected parent primary keys, but at least `1`
/// when an `UPDATE` was issued. This is not the driver's `rows_affected`.
///
/// # Errors
///
/// Returns the validation, conversion, or (classified) SQL error that
/// stopped the update.
pub async fn update_json_in_tx(
    self,
    body: &serde_json::Map<String, serde_json::Value>,
    tx: &mut crate::db::Transaction,
) -> Result<u64, crate::orm::write::WriteError> {
    use crate::orm::write::WriteError;

    // Only pay for a copy of the patch when some column forces a rewrite.
    let must_rewrite = self
        .meta
        .fields
        .iter()
        .any(|c| c.noform || c.slug_from.is_some());
    let sanitized: Option<serde_json::Map<String, serde_json::Value>> = if must_rewrite {
        let mut copy = body.clone();
        for col in self.meta.fields.iter().filter(|c| c.noform) {
            copy.remove(&col.name);
        }
        crate::orm::write::apply_slug_from(&self.meta.fields, &mut copy, true);
        Some(copy)
    } else {
        None
    };
    let body: &serde_json::Map<String, serde_json::Value> = sanitized.as_ref().unwrap_or(body);

    let problems = crate::orm::validation::validate_on_update_in_tx(self.meta, body, tx).await;
    if !problems.is_empty() {
        return Err(WriteError::Multiple { errors: problems });
    }

    let mut stmt = Query::update();
    stmt.table(crate::db::router::schema_qualified_table(&self.meta.table));
    let mut sets_column = false;
    for col in self.meta.fields.iter().filter(|c| !c.primary_key) {
        match body.get(&col.name) {
            None => {
                // Absent from the patch: only `auto_now` columns are written.
                if col.auto_now {
                    stmt.value(
                        Alias::new(&col.name),
                        crate::orm::write::now_for_column(col.ty),
                    );
                    sets_column = true;
                }
            }
            Some(json) => {
                validate_numeric_bounds(col, json)?;
                if let Some(fmt) = col.text_format.as_deref() {
                    if let Some(text) = json.as_str() {
                        crate::orm::validators::validate_text_format(fmt, text).map_err(|e| {
                            WriteError::Validator {
                                field: col.name.clone(),
                                message: e.to_string(),
                            }
                        })?;
                    }
                }
                let bound = crate::orm::write::json_to_sea_value(
                    col.ty,
                    json,
                    col.nullable,
                    &col.name,
                    fk_target_pk_sql_type(col),
                )?;
                stmt.value(Alias::new(&col.name), bound);
                sets_column = true;
            }
        }
    }

    let writes_m2m = self
        .meta
        .m2m_relations
        .iter()
        .any(|rel| body.contains_key(&rel.field_name));
    if !(sets_column || writes_m2m) {
        return Ok(0);
    }

    let where_clauses = self.effective_where_clauses();
    for cond in &where_clauses {
        stmt.cond_where(cond.clone());
    }

    // Collected before the UPDATE executes, using the same filter.
    let parent_pks: Vec<serde_json::Value> = if let Some(pk_col) = self.meta.pk_column() {
        collect_parent_pks_in_tx(self.meta, pk_col, &where_clauses, tx).await?
    } else {
        Vec::new()
    };

    if sets_column {
        if tx.backend_name() == "sqlite" {
            let (sql, values) = stmt.build_sqlx(SqliteQueryBuilder);
            let conn = tx.as_sqlite_mut().expect("sqlite backend_name");
            sqlx::query_with(&sql, values)
                .execute(&mut **conn)
                .await
                .map_err(|e| classify_or_sqlx(e, body))?;
        } else {
            let (sql, values) = stmt.build_sqlx(PostgresQueryBuilder);
            let conn = tx.as_pg_mut().expect("postgres backend_name");
            sqlx::query_with(&sql, values)
                .execute(&mut **conn)
                .await
                .map_err(|e| classify_or_sqlx(e, body))?;
        }
    }

    for pk in &parent_pks {
        write_m2m_junctions_in_tx(self.meta, Some(pk), body, tx).await?;
    }

    let floor = usize::from(sets_column);
    Ok(parent_pks.len().max(floor) as u64)
}
/// Deletes the rows matched by this queryset on the caller's transaction.
///
/// When the model has `soft_delete` set and `hard_delete` was not requested,
/// an `UPDATE` stamps `deleted_at` with the current UTC time on the rows
/// selected by `live_where_clauses()`. Otherwise a `DELETE` is issued for
/// the rows selected by `effective_where_clauses()`.
///
/// # Returns
///
/// The `rows_affected` count reported by the driver.
///
/// # Errors
///
/// Any `sqlx::Error` from executing the statement, wrapped as [`DynError`].
pub async fn delete_in_tx(self, tx: &mut crate::db::Transaction) -> Result<u64, DynError> {
    let soft = self.meta.soft_delete && !self.hard_delete;
    let filters = if soft {
        self.live_where_clauses()
    } else {
        self.effective_where_clauses()
    };
    let table = crate::db::router::schema_qualified_table(&self.meta.table);
    let on_sqlite = tx.backend_name() == "sqlite";

    // Render the statement for the active backend before touching the
    // connection.
    let (sql, values) = if soft {
        let mut stmt = Query::update();
        stmt.table(table);
        stmt.value(
            Alias::new("deleted_at"),
            sea_query::Value::ChronoDateTimeUtc(Some(Box::new(chrono::Utc::now()))),
        );
        for cond in &filters {
            stmt.cond_where(cond.clone());
        }
        if on_sqlite {
            stmt.build_sqlx(SqliteQueryBuilder)
        } else {
            stmt.build_sqlx(PostgresQueryBuilder)
        }
    } else {
        let mut stmt = Query::delete();
        stmt.from_table(table);
        for cond in &filters {
            stmt.cond_where(cond.clone());
        }
        if on_sqlite {
            stmt.build_sqlx(SqliteQueryBuilder)
        } else {
            stmt.build_sqlx(PostgresQueryBuilder)
        }
    };

    let rows_affected = if on_sqlite {
        let conn = tx.as_sqlite_mut().expect("sqlite backend_name");
        sqlx::query_with(&sql, values)
            .execute(&mut **conn)
            .await?
            .rows_affected()
    } else {
        let conn = tx.as_pg_mut().expect("postgres backend_name");
        sqlx::query_with(&sql, values)
            .execute(&mut **conn)
            .await?
            .rows_affected()
    };
    Ok(rows_affected)
}
pub async fn update_json(
self,
body: &serde_json::Map<String, serde_json::Value>,
) -> Result<u64, crate::orm::write::WriteError> {
use crate::orm::write::WriteError;
let needs_owned = self
.meta
.fields
.iter()
.any(|c| c.noform || c.slug_from.is_some());
let mut body_owned: serde_json::Map<String, serde_json::Value>;
let body: &serde_json::Map<String, serde_json::Value> = if needs_owned {
body_owned = body.clone();
for col in &self.meta.fields {
if col.noform {
body_owned.remove(&col.name);
}
}
crate::orm::write::apply_slug_from(&self.meta.fields, &mut body_owned, true);
&body_owned
} else {
body
};
let validation_errors = crate::orm::validation::validate_on_update(&self.meta, body).await;
if !validation_errors.is_empty() {
return Err(WriteError::Multiple {
errors: validation_errors,
});
}
let mut q = Query::update();
q.table(crate::db::router::schema_qualified_table(&self.meta.table));
let mut any = false;
for col in &self.meta.fields {
if col.primary_key {
continue;
}
let Some(json) = body.get(&col.name) else {
if col.auto_now {
let now_value = crate::orm::write::now_for_column(col.ty);
q.value(Alias::new(&col.name), now_value);
any = true;
}
continue;
};
validate_numeric_bounds(col, json)?;
if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
return Err(WriteError::Validator {
field: col.name.clone(),
message: e.to_string(),
});
}
}
let sea_value = crate::orm::write::json_to_sea_value(
col.ty,
json,
col.nullable,
&col.name,
fk_target_pk_sql_type(col),
)?;
q.value(Alias::new(&col.name), sea_value);
any = true;
}
let touches_m2m = self
.meta
.m2m_relations
.iter()
.any(|r| body.contains_key(&r.field_name));
if !any && !touches_m2m {
return Ok(0);
}
let where_clauses = self.effective_where_clauses();
for cond in &where_clauses {
q.cond_where(cond.clone());
}
let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
Some(pk_col) => collect_parent_pks(&self.meta, pk_col, &self.where_clauses).await?,
None => Vec::new(),
};
match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
DbPool::Sqlite(pool) => {
if any {
let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
sqlx::query_with(&sql, values)
.execute(&pool)
.await
.map_err(|e| classify_or_sqlx(e, body))?;
}
for pk in &parent_pks {
write_m2m_junctions(&self.meta, Some(pk), body).await?;
}
crate::signals::emit_bulk_post_save_by_table(
&self.meta.table,
parent_pks.clone(),
false,
)
.await;
Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
}
DbPool::Postgres(pool) => {
if any {
let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
sqlx::query_with(&sql, values)
.execute(&pool)
.await
.map_err(|e| classify_or_sqlx(e, body))?;
}
for pk in &parent_pks {
write_m2m_junctions(&self.meta, Some(pk), body).await?;
}
crate::signals::emit_bulk_post_save_by_table(
&self.meta.table,
parent_pks.clone(),
false,
)
.await;
Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
}
}
}
}
/// Decodes one column of a SQLite row into its display string.
///
/// Nullable columns are read as `Option<T>`, and SQL NULL renders as the
/// empty string. Non-nullable columns are read as `T` directly, so whatever
/// error `try_get` reports is returned unchanged.
///
/// Rendering: numbers, booleans, dates, times, UUIDs and JSON use their
/// `to_string()` form; `Timestamptz` uses RFC 3339; `Bytes` is hex-encoded;
/// a `ForeignKey` is read as text, UUID or `i64` depending on
/// `fk_target_pk_sql_type`.
///
/// # Panics
///
/// Panics for `Array`, `Inet`, `Cidr`, `MacAddr`, `Xml`, `Ltree`, `Bit`,
/// `FullText` and `Decimal` columns.
pub fn decode_to_string(
    row: &sqlx::sqlite::SqliteRow,
    col: &Column,
) -> Result<String, sqlx::Error> {
    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
    use serde_json::Value;
    use uuid::Uuid;

    /// `Some(v)` renders via `ToString`; `None` (SQL NULL) is the empty string.
    fn shown<T: ToString>(cell: Option<T>) -> String {
        cell.map(|v| v.to_string()).unwrap_or_default()
    }

    let key = col.name.as_str();

    if !col.nullable {
        let text = match col.ty {
            SqlType::SmallInt | SqlType::Integer => row.try_get::<i32, _>(key)?.to_string(),
            SqlType::BigInt => row.try_get::<i64, _>(key)?.to_string(),
            SqlType::Real => row.try_get::<f32, _>(key)?.to_string(),
            SqlType::Double => row.try_get::<f64, _>(key)?.to_string(),
            SqlType::Boolean => row.try_get::<bool, _>(key)?.to_string(),
            SqlType::Text => row.try_get::<String, _>(key)?,
            SqlType::Date => row.try_get::<NaiveDate, _>(key)?.to_string(),
            SqlType::Time => row.try_get::<NaiveTime, _>(key)?.to_string(),
            SqlType::Timestamptz => row.try_get::<DateTime<Utc>, _>(key)?.to_rfc3339(),
            SqlType::Uuid => row.try_get::<Uuid, _>(key)?.to_string(),
            SqlType::Json => row.try_get::<Value, _>(key)?.to_string(),
            SqlType::Bytes => hex_encode(&row.try_get::<Vec<u8>, _>(key)?),
            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
                Some(SqlType::Text) => row.try_get::<String, _>(key)?,
                Some(SqlType::Uuid) => row.try_get::<Uuid, _>(key)?.to_string(),
                _ => row.try_get::<i64, _>(key)?.to_string(),
            },
            SqlType::Array(_) => panic_array_unsupported(&col.name),
            SqlType::Inet
            | SqlType::Cidr
            | SqlType::MacAddr
            | SqlType::Xml
            | SqlType::Ltree
            | SqlType::Bit
            | SqlType::FullText
            | SqlType::Decimal => panic_pg_only_unsupported(&col.name),
        };
        return Ok(text);
    }

    let text = match col.ty {
        SqlType::SmallInt | SqlType::Integer => shown(row.try_get::<Option<i32>, _>(key)?),
        SqlType::BigInt => shown(row.try_get::<Option<i64>, _>(key)?),
        SqlType::Real => shown(row.try_get::<Option<f32>, _>(key)?),
        SqlType::Double => shown(row.try_get::<Option<f64>, _>(key)?),
        SqlType::Boolean => shown(row.try_get::<Option<bool>, _>(key)?),
        SqlType::Text => row.try_get::<Option<String>, _>(key)?.unwrap_or_default(),
        SqlType::Date => shown(row.try_get::<Option<NaiveDate>, _>(key)?),
        SqlType::Time => shown(row.try_get::<Option<NaiveTime>, _>(key)?),
        SqlType::Timestamptz => row
            .try_get::<Option<DateTime<Utc>>, _>(key)?
            .map(|ts| ts.to_rfc3339())
            .unwrap_or_default(),
        SqlType::Uuid => shown(row.try_get::<Option<Uuid>, _>(key)?),
        SqlType::Json => shown(row.try_get::<Option<Value>, _>(key)?),
        SqlType::Bytes => row
            .try_get::<Option<Vec<u8>>, _>(key)?
            .map(|raw| hex_encode(&raw))
            .unwrap_or_default(),
        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
            Some(SqlType::Text) => row.try_get::<Option<String>, _>(key)?.unwrap_or_default(),
            Some(SqlType::Uuid) => shown(row.try_get::<Option<Uuid>, _>(key)?),
            _ => shown(row.try_get::<Option<i64>, _>(key)?),
        },
        SqlType::Array(_) => panic_array_unsupported(&col.name),
        SqlType::Inet
        | SqlType::Cidr
        | SqlType::MacAddr
        | SqlType::Xml
        | SqlType::Ltree
        | SqlType::Bit
        | SqlType::FullText
        | SqlType::Decimal => panic_pg_only_unsupported(&col.name),
    };
    Ok(text)
}
pub fn decode_pg_to_string(
row: &sqlx::postgres::PgRow,
col: &Column,
) -> Result<String, sqlx::Error> {
use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
use serde_json::Value;
use uuid::Uuid;
let name = col.name.as_str();
if col.nullable {
return Ok(match col.ty {
SqlType::SmallInt => row
.try_get::<Option<i16>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Integer => row
.try_get::<Option<i32>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::BigInt => row
.try_get::<Option<i64>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Real => row
.try_get::<Option<f32>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Double => row
.try_get::<Option<f64>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Boolean => row
.try_get::<Option<bool>, _>(name)?
.map_or(String::new(), |v| {
if v { "true" } else { "false" }.to_string()
}),
SqlType::Text => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
SqlType::Date => row
.try_get::<Option<NaiveDate>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Time => row
.try_get::<Option<NaiveTime>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Timestamptz => row
.try_get::<Option<DateTime<Utc>>, _>(name)?
.map_or(String::new(), |v| v.to_rfc3339()),
SqlType::Uuid => row
.try_get::<Option<Uuid>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Json => row
.try_get::<Option<Value>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
SqlType::Array(_)
| SqlType::Inet
| SqlType::Cidr
| SqlType::MacAddr
| SqlType::Xml
| SqlType::Ltree
| SqlType::Bit
| SqlType::FullText => row
.try_get::<Option<String>, _>(name)
.ok()
.flatten()
.unwrap_or_default(),
SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
Some(SqlType::Text) => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
Some(SqlType::Uuid) => row
.try_get::<Option<Uuid>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
_ => row
.try_get::<Option<i64>, _>(name)?
.map_or(String::new(), |v| v.to_string()),
},
SqlType::Bytes => row
.try_get::<Option<Vec<u8>>, _>(name)?
.map_or(String::new(), |b| hex_encode(&b)),
SqlType::Decimal => panic_pg_only_unsupported(&col.name),
});
}
Ok(match col.ty {
SqlType::SmallInt => row.try_get::<i16, _>(name)?.to_string(),
SqlType::Integer => row.try_get::<i32, _>(name)?.to_string(),
SqlType::BigInt => row.try_get::<i64, _>(name)?.to_string(),
SqlType::Real => row.try_get::<f32, _>(name)?.to_string(),
SqlType::Double => row.try_get::<f64, _>(name)?.to_string(),
SqlType::Boolean => if row.try_get::<bool, _>(name)? {
"true"
} else {
"false"
}
.to_string(),
SqlType::Text => row.try_get::<String, _>(name)?,
SqlType::Date => row.try_get::<NaiveDate, _>(name)?.to_string(),
SqlType::Time => row.try_get::<NaiveTime, _>(name)?.to_string(),
SqlType::Timestamptz => row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339(),
SqlType::Uuid => row.try_get::<Uuid, _>(name)?.to_string(),
SqlType::Json => row.try_get::<Value, _>(name)?.to_string(),
SqlType::Array(_)
| SqlType::Inet
| SqlType::Cidr
| SqlType::MacAddr
| SqlType::Xml
| SqlType::Ltree
| SqlType::Bit
| SqlType::FullText => row.try_get::<String, _>(name).unwrap_or_default(),
SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
Some(SqlType::Text) => row.try_get::<String, _>(name)?,
Some(SqlType::Uuid) => row.try_get::<Uuid, _>(name)?.to_string(),
_ => row.try_get::<i64, _>(name)?.to_string(),
},
SqlType::Bytes => hex_encode(&row.try_get::<Vec<u8>, _>(name)?),
SqlType::Decimal => panic_pg_only_unsupported(&col.name),
})
}
/// Decodes a SQLite column that was selected under `alias` rather than
/// under its own name.
///
/// Works on a clone of `col` whose `name` is replaced by `alias`, then
/// defers to [`decode_to_json`], so type and nullability handling are
/// exactly those of the original column.
pub fn decode_to_json_aliased(
    row: &sqlx::sqlite::SqliteRow,
    col: &Column,
    alias: &str,
) -> Result<serde_json::Value, sqlx::Error> {
    let renamed = {
        let mut copy = col.clone();
        copy.name = alias.to_string();
        copy
    };
    decode_to_json(row, &renamed)
}
/// Decodes a Postgres column that was selected under `alias` rather than
/// under its own name.
///
/// Works on a clone of `col` whose `name` is replaced by `alias`, then
/// defers to [`decode_pg_to_json`], so type and nullability handling are
/// exactly those of the original column.
pub fn decode_pg_to_json_aliased(
    row: &sqlx::postgres::PgRow,
    col: &Column,
    alias: &str,
) -> Result<serde_json::Value, sqlx::Error> {
    let renamed = {
        let mut copy = col.clone();
        copy.name = alias.to_string();
        copy
    };
    decode_pg_to_json(row, &renamed)
}
/// Returns the SQL type of the primary key that a foreign-key column
/// points at.
///
/// Yields `None` when `col` is not a `ForeignKey`, when it has no
/// `fk_target`, or when `pk_meta_for_table` has no entry for the target
/// table.
fn fk_target_pk_sql_type(col: &Column) -> Option<SqlType> {
    match col.ty {
        SqlType::ForeignKey => {
            let table = col.fk_target.as_deref()?;
            let (_, pk_ty) = crate::migrate::pk_meta_for_table(table)?;
            Some(pk_ty)
        }
        _ => None,
    }
}
/// Decodes one column of a SQLite row into a JSON value.
///
/// Nullable columns are read as `Option<T>`, and SQL NULL becomes
/// `Value::Null`. Non-nullable columns are read as `T` directly, so whatever
/// error `try_get` reports is returned unchanged.
///
/// Integers, floats, booleans and text map to the matching JSON scalar
/// (`Real` is widened to `f64` first). Dates, times and UUIDs become JSON
/// strings via `to_string()`; `Timestamptz` becomes an RFC 3339 string;
/// `Json` is passed through; `Bytes` goes through `bytes_to_json`; a
/// `ForeignKey` is read as text, UUID or `i64` depending on
/// `fk_target_pk_sql_type`.
///
/// # Panics
///
/// Panics for `Array`, `Inet`, `Cidr`, `MacAddr`, `Xml`, `Ltree`, `Bit`,
/// `FullText` and `Decimal` columns.
pub fn decode_to_json(
    row: &sqlx::sqlite::SqliteRow,
    col: &Column,
) -> Result<serde_json::Value, sqlx::Error> {
    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
    use serde_json::Value;
    use uuid::Uuid;

    /// Converts a present cell with `Value::from`; NULL becomes `Value::Null`.
    fn or_null<T: Into<serde_json::Value>>(cell: Option<T>) -> serde_json::Value {
        cell.map_or(serde_json::Value::Null, Into::into)
    }

    /// Renders a present cell as a JSON string; NULL becomes `Value::Null`.
    fn text_or_null<T: ToString>(cell: Option<T>) -> serde_json::Value {
        cell.map_or(serde_json::Value::Null, |v| {
            serde_json::Value::from(v.to_string())
        })
    }

    let key = col.name.as_str();

    if !col.nullable {
        let json = match col.ty {
            SqlType::SmallInt | SqlType::Integer => Value::from(row.try_get::<i32, _>(key)?),
            SqlType::BigInt => Value::from(row.try_get::<i64, _>(key)?),
            SqlType::Real => Value::from(f64::from(row.try_get::<f32, _>(key)?)),
            SqlType::Double => Value::from(row.try_get::<f64, _>(key)?),
            SqlType::Boolean => Value::from(row.try_get::<bool, _>(key)?),
            SqlType::Text => Value::from(row.try_get::<String, _>(key)?),
            SqlType::Date => Value::from(row.try_get::<NaiveDate, _>(key)?.to_string()),
            SqlType::Time => Value::from(row.try_get::<NaiveTime, _>(key)?.to_string()),
            SqlType::Timestamptz => {
                Value::from(row.try_get::<DateTime<Utc>, _>(key)?.to_rfc3339())
            }
            SqlType::Uuid => Value::from(row.try_get::<Uuid, _>(key)?.to_string()),
            SqlType::Json => row.try_get::<Value, _>(key)?,
            SqlType::Bytes => bytes_to_json(&row.try_get::<Vec<u8>, _>(key)?),
            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
                Some(SqlType::Text) => Value::from(row.try_get::<String, _>(key)?),
                Some(SqlType::Uuid) => Value::from(row.try_get::<Uuid, _>(key)?.to_string()),
                _ => Value::from(row.try_get::<i64, _>(key)?),
            },
            SqlType::Array(_) => panic_array_unsupported(&col.name),
            SqlType::Inet
            | SqlType::Cidr
            | SqlType::MacAddr
            | SqlType::Xml
            | SqlType::Ltree
            | SqlType::Bit
            | SqlType::FullText
            | SqlType::Decimal => panic_pg_only_unsupported(&col.name),
        };
        return Ok(json);
    }

    let json = match col.ty {
        SqlType::SmallInt | SqlType::Integer => or_null(row.try_get::<Option<i32>, _>(key)?),
        SqlType::BigInt => or_null(row.try_get::<Option<i64>, _>(key)?),
        SqlType::Real => or_null(row.try_get::<Option<f32>, _>(key)?.map(f64::from)),
        SqlType::Double => or_null(row.try_get::<Option<f64>, _>(key)?),
        SqlType::Boolean => or_null(row.try_get::<Option<bool>, _>(key)?),
        SqlType::Text => or_null(row.try_get::<Option<String>, _>(key)?),
        SqlType::Date => text_or_null(row.try_get::<Option<NaiveDate>, _>(key)?),
        SqlType::Time => text_or_null(row.try_get::<Option<NaiveTime>, _>(key)?),
        SqlType::Timestamptz => or_null(
            row.try_get::<Option<DateTime<Utc>>, _>(key)?
                .map(|ts| ts.to_rfc3339()),
        ),
        SqlType::Uuid => text_or_null(row.try_get::<Option<Uuid>, _>(key)?),
        SqlType::Json => row
            .try_get::<Option<Value>, _>(key)?
            .unwrap_or(Value::Null),
        SqlType::Bytes => match row.try_get::<Option<Vec<u8>>, _>(key)? {
            Some(raw) => bytes_to_json(&raw),
            None => Value::Null,
        },
        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
            Some(SqlType::Text) => or_null(row.try_get::<Option<String>, _>(key)?),
            Some(SqlType::Uuid) => text_or_null(row.try_get::<Option<Uuid>, _>(key)?),
            _ => or_null(row.try_get::<Option<i64>, _>(key)?),
        },
        SqlType::Array(_) => panic_array_unsupported(&col.name),
        SqlType::Inet
        | SqlType::Cidr
        | SqlType::MacAddr
        | SqlType::Xml
        | SqlType::Ltree
        | SqlType::Bit
        | SqlType::FullText
        | SqlType::Decimal => panic_pg_only_unsupported(&col.name),
    };
    Ok(json)
}
/// Decodes one column of a Postgres row into a JSON value.
///
/// Nullable columns are read as `Option<T>`, and SQL NULL becomes
/// `Value::Null`. Non-nullable columns are read as `T` directly, so whatever
/// error `try_get` reports is returned unchanged.
///
/// `Array`, `Inet`, `Cidr`, `MacAddr`, `Xml`, `Ltree`, `Bit` and `FullText`
/// columns are read as text on a best-effort basis: a failed decode is
/// swallowed and yields `Value::Null` rather than an error.
///
/// # Panics
///
/// Panics for `Decimal` columns.
pub fn decode_pg_to_json(
    row: &sqlx::postgres::PgRow,
    col: &Column,
) -> Result<serde_json::Value, sqlx::Error> {
    use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
    use serde_json::Value;
    use uuid::Uuid;

    /// Converts a present cell with `Value::from`; NULL becomes `Value::Null`.
    fn or_null<T: Into<serde_json::Value>>(cell: Option<T>) -> serde_json::Value {
        cell.map_or(serde_json::Value::Null, Into::into)
    }

    /// Renders a present cell as a JSON string; NULL becomes `Value::Null`.
    fn text_or_null<T: ToString>(cell: Option<T>) -> serde_json::Value {
        cell.map_or(serde_json::Value::Null, |v| {
            serde_json::Value::from(v.to_string())
        })
    }

    let key = col.name.as_str();

    if !col.nullable {
        let json = match col.ty {
            SqlType::SmallInt => Value::from(row.try_get::<i16, _>(key)?),
            SqlType::Integer => Value::from(row.try_get::<i32, _>(key)?),
            SqlType::BigInt => Value::from(row.try_get::<i64, _>(key)?),
            SqlType::Real => Value::from(f64::from(row.try_get::<f32, _>(key)?)),
            SqlType::Double => Value::from(row.try_get::<f64, _>(key)?),
            SqlType::Boolean => Value::from(row.try_get::<bool, _>(key)?),
            SqlType::Text => Value::from(row.try_get::<String, _>(key)?),
            SqlType::Date => Value::from(row.try_get::<NaiveDate, _>(key)?.to_string()),
            SqlType::Time => Value::from(row.try_get::<NaiveTime, _>(key)?.to_string()),
            SqlType::Timestamptz => {
                Value::from(row.try_get::<DateTime<Utc>, _>(key)?.to_rfc3339())
            }
            SqlType::Uuid => Value::from(row.try_get::<Uuid, _>(key)?.to_string()),
            SqlType::Json => row.try_get::<Value, _>(key)?,
            SqlType::Bytes => bytes_to_json(&row.try_get::<Vec<u8>, _>(key)?),
            SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
                Some(SqlType::Text) => Value::from(row.try_get::<String, _>(key)?),
                Some(SqlType::Uuid) => Value::from(row.try_get::<Uuid, _>(key)?.to_string()),
                _ => Value::from(row.try_get::<i64, _>(key)?),
            },
            // Best effort: a decode failure becomes JSON null.
            SqlType::Array(_)
            | SqlType::Inet
            | SqlType::Cidr
            | SqlType::MacAddr
            | SqlType::Xml
            | SqlType::Ltree
            | SqlType::Bit
            | SqlType::FullText => match row.try_get::<String, _>(key) {
                Ok(text) => Value::from(text),
                Err(_) => Value::Null,
            },
            SqlType::Decimal => panic_pg_only_unsupported(&col.name),
        };
        return Ok(json);
    }

    let json = match col.ty {
        SqlType::SmallInt => or_null(row.try_get::<Option<i16>, _>(key)?),
        SqlType::Integer => or_null(row.try_get::<Option<i32>, _>(key)?),
        SqlType::BigInt => or_null(row.try_get::<Option<i64>, _>(key)?),
        SqlType::Real => or_null(row.try_get::<Option<f32>, _>(key)?.map(f64::from)),
        SqlType::Double => or_null(row.try_get::<Option<f64>, _>(key)?),
        SqlType::Boolean => or_null(row.try_get::<Option<bool>, _>(key)?),
        SqlType::Text => or_null(row.try_get::<Option<String>, _>(key)?),
        SqlType::Date => text_or_null(row.try_get::<Option<NaiveDate>, _>(key)?),
        SqlType::Time => text_or_null(row.try_get::<Option<NaiveTime>, _>(key)?),
        SqlType::Timestamptz => or_null(
            row.try_get::<Option<DateTime<Utc>>, _>(key)?
                .map(|ts| ts.to_rfc3339()),
        ),
        SqlType::Uuid => text_or_null(row.try_get::<Option<Uuid>, _>(key)?),
        SqlType::Json => row
            .try_get::<Option<Value>, _>(key)?
            .unwrap_or(Value::Null),
        SqlType::Bytes => match row.try_get::<Option<Vec<u8>>, _>(key)? {
            Some(raw) => bytes_to_json(&raw),
            None => Value::Null,
        },
        SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
            Some(SqlType::Text) => or_null(row.try_get::<Option<String>, _>(key)?),
            Some(SqlType::Uuid) => text_or_null(row.try_get::<Option<Uuid>, _>(key)?),
            _ => or_null(row.try_get::<Option<i64>, _>(key)?),
        },
        // Best effort: both NULL and a decode failure become JSON null.
        SqlType::Array(_)
        | SqlType::Inet
        | SqlType::Cidr
        | SqlType::MacAddr
        | SqlType::Xml
        | SqlType::Ltree
        | SqlType::Bit
        | SqlType::FullText => match row.try_get::<Option<String>, _>(key) {
            Ok(Some(text)) => Value::from(text),
            Ok(None) | Err(_) => Value::Null,
        },
        SqlType::Decimal => panic_pg_only_unsupported(&col.name),
    };
    Ok(json)
}
fn form_str_to_sea_value(col: &Column, raw: &str) -> Result<SeaValue, WriteError> {
if raw.is_empty() {
if col.ty == SqlType::Boolean {
return Ok(SeaValue::Bool(Some(false)));
}
if col.nullable {
return Ok(null_for(col.ty));
}
return Err(WriteError::RequiredFieldMissing {
field: col.name.clone(),
});
}
if matches!(col.ty, SqlType::Json | SqlType::Array(_)) {
let parsed: serde_json::Value =
serde_json::from_str(raw).map_err(|e| WriteError::Validator {
field: col.name.clone(),
message: format!("Not valid JSON: {e}"),
})?;
return json_to_sea_value(col.ty, &parsed, col.nullable, &col.name, None);
}
if matches!(col.ty, SqlType::ForeignKey) {
return match fk_target_pk_sql_type(col) {
Some(SqlType::Text) => Ok(SeaValue::String(Some(Box::new(raw.to_string())))),
Some(SqlType::Uuid) => uuid::Uuid::parse_str(raw)
.map(|v| SeaValue::Uuid(Some(Box::new(v))))
.map_err(|_| WriteError::TypeMismatch {
field: col.name.clone(),
expected: SqlType::Uuid,
got: raw.to_string(),
}),
_ => raw
.parse::<i64>()
.map(|v| SeaValue::BigInt(Some(v)))
.map_err(|_| WriteError::TypeMismatch {
field: col.name.clone(),
expected: SqlType::BigInt,
got: raw.to_string(),
}),
};
}
let json = serde_json::Value::String(raw.to_string());
json_to_sea_value(col.ty, &json, col.nullable, &col.name, None)
}
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string.
fn hex_encode(bytes: &[u8]) -> String {
    // A nibble lookup avoids allocating a temporary `String` per byte, which
    // is what `push_str(&format!(..))` did.
    const DIGITS: &[u8; 16] = b"0123456789abcdef";
    let mut out = String::with_capacity(bytes.len() * 2);
    for &byte in bytes {
        out.push(char::from(DIGITS[usize::from(byte >> 4)]));
        out.push(char::from(DIGITS[usize::from(byte & 0x0f)]));
    }
    out
}
/// Represents a byte slice as a JSON array with one number per byte.
fn bytes_to_json(bytes: &[u8]) -> serde_json::Value {
    let mut items = Vec::with_capacity(bytes.len());
    for &byte in bytes {
        items.push(serde_json::Value::from(byte));
    }
    serde_json::Value::Array(items)
}
/// Aborts decoding because `column` is an `Array` column reached on a
/// decode path that does not handle arrays.
///
/// # Panics
///
/// Always panics; the message names the offending column.
fn panic_array_unsupported(column: &str) -> ! {
    let message = format!(
        "DynQuerySet: column `{column}` is a Postgres-only Array; the \
         field/backend system check should have failed boot."
    );
    panic!("{message}")
}
/// Aborts decoding because `column` has a type the dynamic decoders do not
/// handle on the current backend.
///
/// Callers route `Inet`, `Cidr`, `MacAddr`, `Xml`, `Ltree`, `Bit`,
/// `FullText` and `Decimal` columns here, so the message lists all of them
/// instead of claiming the column is a network type.
///
/// # Panics
///
/// Always panics; the message names the offending column.
fn panic_pg_only_unsupported(column: &str) -> ! {
    panic!(
        "DynQuerySet: column `{column}` has a type \
         (Inet/Cidr/MacAddr/Xml/Ltree/Bit/FullText/Decimal) that the dynamic \
         decoder does not support on this backend; the field/backend system \
         check should have failed boot."
    )
}
/// Turns a raw `sqlx::Error` into a `WriteError`.
///
/// If `classify_sql_error` recognises the failure (given the request
/// `body`), its classified error is returned; otherwise the original error
/// is wrapped unchanged in `WriteError::Sqlx`.
fn classify_or_sqlx(
    e: sqlx::Error,
    body: &serde_json::Map<String, serde_json::Value>,
) -> crate::orm::write::WriteError {
    match crate::orm::validation::classify_sql_error(&e, body) {
        Some(classified) => classified,
        None => crate::orm::write::WriteError::Sqlx(e),
    }
}
/// Checks a JSON number against the column's optional `min` / `max` bounds.
///
/// Values that `as_f64` cannot read as a number pass unchecked. The lower
/// bound is tested before the upper bound, and the comparison is done in
/// `f64`.
///
/// # Errors
///
/// [`WriteError::Validator`](crate::orm::write::WriteError::Validator) naming
/// the column when the value is below `min` or above `max`.
fn validate_numeric_bounds(
    col: &Column,
    json: &serde_json::Value,
) -> Result<(), crate::orm::write::WriteError> {
    let n = match json.as_f64() {
        Some(number) => number,
        None => return Ok(()),
    };
    let violation = match (col.min, col.max) {
        (Some(min), _) if n < (min as f64) => Some(format!("must be >= {min} (got {n}).")),
        (_, Some(max)) if n > (max as f64) => Some(format!("must be <= {max} (got {n}).")),
        _ => None,
    };
    match violation {
        Some(message) => Err(crate::orm::write::WriteError::Validator {
            field: col.name.clone(),
            message,
        }),
        None => Ok(()),
    }
}
/// Converts a JSON primary-key value into a bindable query value.
///
/// Strings become `String` values and numbers representable as `i64` become
/// `BigInt` values. Every other input (null, bool, array, object, or a
/// number that does not fit `i64`) yields `None`.
fn json_pk_to_sea(v: &serde_json::Value) -> Option<sea_query::Value> {
    if let Some(text) = v.as_str() {
        return Some(sea_query::Value::String(Some(Box::new(text.to_owned()))));
    }
    let int = v.as_i64()?;
    Some(sea_query::Value::BigInt(Some(int)))
}
/// Rewrites a `__`-separated relation path into its `.`-separated form.
///
/// Each non-overlapping `__`, scanning left to right, becomes a single `.`;
/// a lone `_` is left untouched.
fn normalize_sr_token(name: &str) -> String {
    let segments: Vec<&str> = name.split("__").collect();
    segments.join(".")
}
/// Resolves a dot-separated select-related chain into the table reached by
/// each hop.
///
/// Starting at `root_meta`, every hop must name a column of the current
/// model that has an `fk_target`; that target becomes the current table for
/// the next hop. Models after the root are looked up in
/// `registered_models()` by table name, and only when a further hop needs
/// them.
///
/// Returns `None` when the chain has no non-empty hop, a hop names no
/// column, the column has no `fk_target`, or an intermediate table is not
/// registered. Otherwise returns one target table per hop, in chain order.
fn validate_sr_chain(root_meta: &crate::migrate::ModelMeta, chain: &str) -> Option<Vec<String>> {
    let hops: Vec<&str> = chain.split('.').filter(|s| !s.is_empty()).collect();
    if hops.is_empty() {
        return None;
    }
    let registered = crate::migrate::registered_models();
    let mut targets: Vec<String> = Vec::with_capacity(hops.len());
    let mut table: &str = root_meta.table.as_str();
    // Stays `None` until the walk first has to consult the registry.
    let mut from_registry: Option<&crate::migrate::ModelMeta> = None;
    for hop in &hops {
        // While still on the root table and before any registry lookup, the
        // caller-supplied metadata is authoritative.
        let on_root = table == root_meta.table && from_registry.is_none();
        let model: &crate::migrate::ModelMeta = if on_root {
            root_meta
        } else {
            from_registry = registered.iter().find(|m| m.table == table);
            from_registry?
        };
        let fk_col = model.fields.iter().find(|c| &c.name == hop)?;
        let target = fk_col.fk_target.as_deref()?;
        targets.push(target.to_owned());
        table = target;
    }
    Some(targets)
}
/// Replaces foreign-key values in `rows` with the JSON objects of the rows
/// they reference, for every chain in `sr_fields`.
///
/// Each chain is a dot-separated list of FK column names ("hops"). For a
/// multi-hop chain the related objects are nested: the object placed in a
/// row under the first hop has its own FK field replaced by the object for
/// the second hop, and so on.
///
/// Chains are skipped silently (the rows are left untouched for that chain)
/// when they are empty, fail `validate_sr_chain`, reference a target model
/// without a primary-key column, or when no row carries a non-null value for
/// the first hop.
///
/// # Errors
///
/// Propagates any `sqlx::Error` from `fetch_related_as_json_by_pk`.
async fn hydrate_select_related_into(
meta: &crate::migrate::ModelMeta,
sr_fields: &[String],
rows: &mut [serde_json::Map<String, serde_json::Value>],
) -> Result<(), sqlx::Error> {
// One read pool, resolved for the root model, serves every hop.
let pool = resolve_pool_dyn(meta, crate::db::RouteOp::Read);
for chain in sr_fields {
let hops: Vec<&str> = chain.split('.').filter(|s| !s.is_empty()).collect();
if hops.is_empty() {
continue;
}
// `targets[i]` is the table reached by `hops[i]`.
let Some(targets) = validate_sr_chain(meta, chain) else {
continue;
};
let registered = crate::migrate::registered_models();
// Primary-key column name and type for each hop's target table.
let hop_target_pk: Vec<(String, SqlType)> = targets
.iter()
.filter_map(|t| {
registered
.iter()
.find(|m| &m.table == t)
.and_then(|m| m.pk_column().map(|c| (c.name.clone(), c.ty)))
})
.collect();
// `filter_map` drops targets that are unregistered or lack a PK; a
// shorter list means the chain cannot be resolved.
if hop_target_pk.len() != hops.len() {
continue;
}
// Soft-delete flag per target (false when the target is not registered),
// passed through to the fetch helper.
let hop_target_soft_delete: Vec<bool> = targets
.iter()
.map(|t| {
registered
.iter()
.find(|m| &m.table == t)
.is_some_and(|m| m.soft_delete)
})
.collect();
let first_field = hops[0];
// Non-null first-hop FK values found in the input rows.
let mut ids: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
for row in rows.iter() {
let Some(v) = row.get(first_field) else {
continue;
};
if v.is_null() {
continue;
}
ids.push(v.clone());
}
if ids.is_empty() {
continue;
}
dedup_by_pk_key(&mut ids);
// `levels[i]` holds the related rows fetched for hop `i`.
let mut levels: Vec<Vec<serde_json::Value>> = Vec::with_capacity(hops.len());
levels.push(
crate::orm::queryset::hydration::fetch_related_as_json_by_pk(
&targets[0],
&hop_target_pk[0].0,
hop_target_pk[0].1,
hop_target_soft_delete[0],
&ids,
&pool,
)
.await?,
);
// Each further hop fetches by the FK values found in the previous level.
for hop_idx in 1..hops.len() {
let hop_field = hops[hop_idx];
let hop_target = &targets[hop_idx];
let prev_lvl = &levels[hop_idx - 1];
let mut next_ids: Vec<serde_json::Value> = prev_lvl
.iter()
.filter_map(|r| {
let v = r.as_object()?.get(hop_field)?;
if v.is_null() { None } else { Some(v.clone()) }
})
.collect();
// Nothing to follow: stop descending; `levels` stays shorter than `hops`.
if next_ids.is_empty() {
break;
}
dedup_by_pk_key(&mut next_ids);
levels.push(
crate::orm::queryset::hydration::fetch_related_as_json_by_pk(
hop_target,
&hop_target_pk[hop_idx].0,
hop_target_pk[hop_idx].1,
hop_target_soft_delete[hop_idx],
&next_ids,
&pool,
)
.await?,
);
}
// Stitch deepest-first so each level already contains its nested objects
// by the time it is copied into the level above.
if levels.len() > 1 {
for i in (0..levels.len() - 1).rev() {
let next_pk_col = &hop_target_pk[i + 1].0;
// Index level i+1 by primary key.
let next_by_pk: HashMap<String, serde_json::Value> = levels[i + 1]
.iter()
.filter_map(|obj| {
let map = obj.as_object()?;
let pk_val = map.get(next_pk_col.as_str())?;
Some((pk_json_key(pk_val), obj.clone()))
})
.collect();
let hop_field = hops[i + 1];
for row in levels[i].iter_mut() {
let Some(map) = row.as_object_mut() else {
continue;
};
let Some(fk_val) = map.get(hop_field) else {
continue;
};
if fk_val.is_null() {
continue;
}
let key = pk_json_key(fk_val);
// An FK with no matching fetched row keeps its raw value.
if let Some(next_json) = next_by_pk.get(&key) {
map.insert(hop_field.to_string(), next_json.clone());
}
}
}
}
let first_pk_col = &hop_target_pk[0].0;
// Index the (now fully nested) first level by primary key.
let first_by_pk: HashMap<String, serde_json::Value> = levels
.into_iter()
.next()
.unwrap_or_default()
.into_iter()
.filter_map(|obj| {
let map = obj.as_object()?;
let pk_val = map.get(first_pk_col.as_str())?;
Some((pk_json_key(pk_val), obj.clone()))
})
.collect();
// Finally swap each input row's first-hop FK value for its related object.
for row in rows.iter_mut() {
let Some(fk_val) = row.get(first_field) else {
continue;
};
if fk_val.is_null() {
continue;
}
let key = pk_json_key(fk_val);
// An FK with no matching fetched row keeps its raw value.
if let Some(resolved) = first_by_pk.get(&key) {
row.insert(first_field.to_string(), resolved.clone());
}
}
}
Ok(())
}
/// Removes duplicate primary-key values from `ids` in place.
///
/// Two values are duplicates when `pk_json_key` gives them the same key.
/// The first occurrence is kept and the relative order of survivors is
/// unchanged.
fn dedup_by_pk_key(ids: &mut Vec<serde_json::Value>) {
    let mut seen: std::collections::HashSet<String> =
        std::collections::HashSet::with_capacity(ids.len());
    let mut kept: Vec<serde_json::Value> = Vec::with_capacity(ids.len());
    for id in ids.drain(..) {
        if seen.insert(pk_json_key(&id)) {
            kept.push(id);
        }
    }
    *ids = kept;
}
/// Fills every many-to-many relation field of `rows` with the list of child
/// ids from the junction table, using one query per relation.
///
/// Every row first receives an empty array for each relation, so the field
/// is always present. Parent primary keys are read from `pk_name`,
/// de-duplicated, and used in a single `parent_id IN (...)` query against
/// the junction table named `{meta.table}_{rel.field_name}`.
///
/// Rows whose primary key is missing, or is not a string or an
/// `i64`-representable number, keep their empty arrays.
///
/// # Errors
///
/// Propagates any `sqlx::Error` from the junction queries or id decoding.
async fn hydrate_m2m_batched(
    meta: &crate::migrate::ModelMeta,
    pk_name: &str,
    rows: &mut [serde_json::Map<String, serde_json::Value>],
) -> Result<(), sqlx::Error> {
    if meta.m2m_relations.is_empty() || rows.is_empty() {
        return Ok(());
    }
    // Default every relation to `[]` so parents without children still
    // expose the field.
    for row in rows.iter_mut() {
        for rel in &meta.m2m_relations {
            row.insert(rel.field_name.clone(), serde_json::Value::Array(Vec::new()));
        }
    }
    // Distinct, bindable parent primary keys.
    let mut parent_sea_vals: Vec<sea_query::Value> = Vec::with_capacity(rows.len());
    let mut seen_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
    for row in rows.iter() {
        let Some(pk_json) = row.get(pk_name) else {
            continue;
        };
        let Some(sea_val) = json_pk_to_sea(pk_json) else {
            continue;
        };
        let key = pk_json_key(pk_json);
        if seen_keys.insert(key) {
            parent_sea_vals.push(sea_val);
        }
    }
    if parent_sea_vals.is_empty() {
        return Ok(());
    }
    for rel in &meta.m2m_relations {
        let junction_table = format!("{}_{}", meta.table, rel.field_name);
        let mut sel = Query::select();
        sel.from(crate::db::router::schema_qualified_table(&junction_table));
        sel.column(Alias::new("parent_id"));
        sel.column(Alias::new("child_id"));
        sel.and_where(Expr::col(Alias::new("parent_id")).is_in(parent_sea_vals.clone()));
        // Child ids grouped by the parent's `pk_json_key`.
        let mut children_by_parent: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
        match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
            DbPool::Sqlite(pool) => {
                let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
                let db_rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
                for r in &db_rows {
                    let parent = read_junction_id_sqlite(r, "parent_id")?;
                    let child = read_junction_id_sqlite(r, "child_id")?;
                    children_by_parent
                        .entry(pk_json_key(&parent))
                        .or_default()
                        .push(child);
                }
            }
            DbPool::Postgres(pool) => {
                let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
                let db_rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
                for r in &db_rows {
                    let parent = read_junction_id_pg(r, "parent_id")?;
                    let child = read_junction_id_pg(r, "child_id")?;
                    children_by_parent
                        .entry(pk_json_key(&parent))
                        .or_default()
                        .push(child);
                }
            }
        }
        for row in rows.iter_mut() {
            let Some(pk_json) = row.get(pk_name) else {
                continue;
            };
            let key = pk_json_key(pk_json);
            // Look up rather than `remove`: several rows may share a primary
            // key (the de-duplication above allows for that), and each of
            // them must receive the children, not just the first one.
            if let Some(children) = children_by_parent.get(&key) {
                row.insert(
                    rel.field_name.clone(),
                    serde_json::Value::Array(children.clone()),
                );
            }
        }
    }
    Ok(())
}
fn pk_json_key(v: &serde_json::Value) -> String {
match v {
serde_json::Value::Number(n) => format!("n:{n}"),
serde_json::Value::String(s) => format!("s:{s}"),
other => format!("o:{other}"),
}
}
/// Reads a junction-table id column from a SQLite row as JSON.
///
/// The column is tried as `i64` first (giving a JSON number). If that
/// decode fails for any reason, it is read as `String` instead, and only
/// the error of that second attempt is reported.
fn read_junction_id_sqlite(
    row: &sqlx::sqlite::SqliteRow,
    col: &str,
) -> Result<serde_json::Value, sqlx::Error> {
    match row.try_get::<i64, _>(col) {
        Ok(int) => Ok(serde_json::Value::Number(int.into())),
        Err(_) => row.try_get::<String, _>(col).map(serde_json::Value::String),
    }
}
/// Reads the junction-table id column `col` from a Postgres row as JSON.
///
/// The column is decoded as `i64` first (yielding a JSON number). If that
/// decode fails for any reason it is decoded as `String` instead, and only
/// the error from that second attempt is reported to the caller.
fn read_junction_id_pg(
    row: &sqlx::postgres::PgRow,
    col: &str,
) -> Result<serde_json::Value, sqlx::Error> {
    match row.try_get::<i64, _>(col) {
        Ok(int_id) => Ok(serde_json::Value::Number(int_id.into())),
        Err(_) => row
            .try_get::<String, _>(col)
            .map(serde_json::Value::String),
    }
}
async fn hydrate_m2m_into(
meta: &crate::migrate::ModelMeta,
parent_pk_json: Option<&serde_json::Value>,
out: &mut serde_json::Map<String, serde_json::Value>,
) -> Result<(), sqlx::Error> {
if meta.m2m_relations.is_empty() {
return Ok(());
}
let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
return Ok(());
};
for rel in &meta.m2m_relations {
let junction_table = format!("{}_{}", meta.table, rel.field_name);
let mut sel = Query::select();
sel.from(crate::db::router::schema_qualified_table(&junction_table));
sel.column(Alias::new("child_id"));
sel.and_where(Expr::col(Alias::new("parent_id")).eq(parent_pk_value.clone()));
let children: Vec<serde_json::Value> =
match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
DbPool::Sqlite(pool) => {
let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
rows.iter()
.map(|r| {
r.try_get::<i64, _>("child_id")
.map(|i| serde_json::Value::Number(i.into()))
.or_else(|_| {
r.try_get::<String, _>("child_id")
.map(serde_json::Value::String)
})
})
.collect::<Result<Vec<_>, _>>()?
}
DbPool::Postgres(pool) => {
let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
rows.iter()
.map(|r| {
r.try_get::<i64, _>("child_id")
.map(|i| serde_json::Value::Number(i.into()))
.or_else(|_| {
r.try_get::<String, _>("child_id")
.map(serde_json::Value::String)
})
})
.collect::<Result<Vec<_>, _>>()?
}
};
out.insert(rel.field_name.clone(), serde_json::Value::Array(children));
}
Ok(())
}
async fn collect_parent_pks(
meta: &crate::migrate::ModelMeta,
pk_col: &crate::migrate::Column,
where_clauses: &[Condition],
) -> Result<Vec<serde_json::Value>, crate::orm::write::WriteError> {
let mut sel = Query::select();
sel.from(crate::db::router::schema_qualified_table(&meta.table));
sel.column(Alias::new(&pk_col.name));
for cond in where_clauses {
sel.cond_where(cond.clone());
}
match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
DbPool::Sqlite(pool) => {
let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
rows.iter()
.map(|row| decode_to_json(row, pk_col))
.collect::<Result<Vec<_>, _>>()
.map_err(crate::orm::write::WriteError::Sqlx)
}
DbPool::Postgres(pool) => {
let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
rows.iter()
.map(|row| decode_pg_to_json(row, pk_col))
.collect::<Result<Vec<_>, _>>()
.map_err(crate::orm::write::WriteError::Sqlx)
}
}
}
async fn collect_parent_pks_in_tx(
meta: &crate::migrate::ModelMeta,
pk_col: &crate::migrate::Column,
where_clauses: &[Condition],
tx: &mut crate::db::Transaction,
) -> Result<Vec<serde_json::Value>, crate::orm::write::WriteError> {
let mut sel = Query::select();
sel.from(crate::db::router::schema_qualified_table(&meta.table));
sel.column(Alias::new(&pk_col.name));
for cond in where_clauses {
sel.cond_where(cond.clone());
}
match tx.backend_name() {
"sqlite" => {
let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
let rows = sqlx::query_with(&sql, values)
.fetch_all(&mut **inner)
.await?;
rows.iter()
.map(|row| decode_to_json(row, pk_col))
.collect::<Result<Vec<_>, _>>()
.map_err(crate::orm::write::WriteError::Sqlx)
}
_ => {
let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
let inner = tx.as_pg_mut().expect("postgres backend_name");
let rows = sqlx::query_with(&sql, values)
.fetch_all(&mut **inner)
.await?;
rows.iter()
.map(|row| decode_pg_to_json(row, pk_col))
.collect::<Result<Vec<_>, _>>()
.map_err(crate::orm::write::WriteError::Sqlx)
}
}
}
/// Produces an adjusted copy of an insert body when the model requires one.
///
/// Returns `None` when no column of `meta` is flagged `noform` or has a
/// `slug_from` source, so the caller can keep using `body` as is. Otherwise
/// returns a clone of `body` with every `noform` column's key removed and
/// `apply_slug_from` applied on top.
fn normalise_insert_body(
    meta: &crate::migrate::ModelMeta,
    body: &serde_json::Map<String, serde_json::Value>,
) -> Option<serde_json::Map<String, serde_json::Value>> {
    let must_rewrite = meta
        .fields
        .iter()
        .any(|field| field.slug_from.is_some() || field.noform);

    must_rewrite.then(|| {
        let mut adjusted = body.clone();
        for field in meta.fields.iter().filter(|field| field.noform) {
            adjusted.remove(&field.name);
        }
        crate::orm::write::apply_slug_from(&meta.fields, &mut adjusted, false);
        adjusted
    })
}
/// Result of `build_insert_plan`: a prepared INSERT statement together with
/// the identity of the model's primary-key column.
struct InsertPlan {
/// The INSERT statement, with target table, column list and values already set.
q: sea_query::InsertStatement,
/// Name of the model's primary-key column.
pk_name: String,
/// SQL type of the model's primary-key column.
pk_ty: SqlType,
}
fn build_insert_plan(
meta: &crate::migrate::ModelMeta,
body: &serde_json::Map<String, serde_json::Value>,
) -> Result<InsertPlan, crate::orm::write::WriteError> {
use crate::orm::write::{WriteError, is_default_pk};
let mut cols: Vec<&str> = Vec::new();
let mut values: Vec<SeaValue> = Vec::new();
for col in &meta.fields {
if col.primary_key {
let supplied = body.get(&col.name);
let is_sentinel = match supplied {
None | Some(serde_json::Value::Null) => true,
Some(v) => is_default_pk(col.ty, v),
};
if matches!(
col.ty,
SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
) && is_sentinel
{
continue;
}
}
let Some(json) = body.get(&col.name) else {
if col.auto_now_add || col.auto_now {
let now_value = crate::orm::write::now_for_column(col.ty);
cols.push(&col.name);
values.push(now_value);
continue;
}
continue;
};
if json.is_null() {
continue;
}
validate_numeric_bounds(col, json)?;
if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
return Err(WriteError::Validator {
field: col.name.clone(),
message: e.to_string(),
});
}
}
let sea_value = crate::orm::write::json_to_sea_value(
col.ty,
json,
col.nullable,
&col.name,
fk_target_pk_sql_type(col),
)?;
cols.push(&col.name);
values.push(sea_value);
}
let pk_col = meta.fields.iter().find(|c| c.primary_key).ok_or_else(|| {
WriteError::Sqlx(sqlx::Error::Protocol(
"insert_json: model has no PK".to_string(),
))
})?;
let pk_name = pk_col.name.clone();
let pk_ty = pk_col.ty;
let mut q = Query::insert();
q.into_table(crate::db::router::schema_qualified_table(&meta.table));
q.columns(cols.iter().map(|c| Alias::new(*c)).collect::<Vec<_>>());
let exprs: Vec<sea_query::SimpleExpr> = values.into_iter().map(Into::into).collect();
q.values_panic(exprs);
Ok(InsertPlan { q, pk_name, pk_ty })
}
/// Writes the many-to-many junction rows for one parent inside `tx`.
///
/// For every relation of `meta` whose field is present in `body` as a JSON
/// array, the array items are converted with `json_pk_to_sea` (nulls and
/// unconvertible items are dropped) and handed to
/// `set_junction_dynamic_in_tx` for the junction table
/// `{meta.table}_{rel.field_name}`. Relations missing from `body`, or whose
/// value is not an array, are left alone.
///
/// Does nothing when the model has no many-to-many relations, or when
/// `parent_pk_json` is `None` or cannot be converted by `json_pk_to_sea`.
///
/// # Errors
///
/// A failure from `set_junction_dynamic_in_tx` is wrapped in
/// `WriteError::Sqlx`.
async fn write_m2m_junctions_in_tx(
    meta: &crate::migrate::ModelMeta,
    parent_pk_json: Option<&serde_json::Value>,
    body: &serde_json::Map<String, serde_json::Value>,
    tx: &mut crate::db::Transaction,
) -> Result<(), crate::orm::write::WriteError> {
    if meta.m2m_relations.is_empty() {
        return Ok(());
    }
    let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
        return Ok(());
    };

    for rel in &meta.m2m_relations {
        let Some(items) = body
            .get(&rel.field_name)
            .and_then(serde_json::Value::as_array)
        else {
            continue;
        };

        let child_ids: Vec<sea_query::Value> = items
            .iter()
            .filter(|item| !item.is_null())
            .filter_map(json_pk_to_sea)
            .collect();

        let junction_table = format!("{}_{}", meta.table, rel.field_name);
        let outcome = crate::orm::m2m::set_junction_dynamic_in_tx(
            &junction_table,
            parent_pk_value.clone(),
            child_ids,
            tx,
        )
        .await;
        outcome.map_err(crate::orm::write::WriteError::Sqlx)?;
    }
    Ok(())
}
async fn hydrate_m2m_into_tx(
meta: &crate::migrate::ModelMeta,
parent_pk_json: Option<&serde_json::Value>,
out: &mut serde_json::Map<String, serde_json::Value>,
tx: &mut crate::db::Transaction,
) -> Result<(), sqlx::Error> {
if meta.m2m_relations.is_empty() {
return Ok(());
}
let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
return Ok(());
};
for rel in &meta.m2m_relations {
let junction_table = format!("{}_{}", meta.table, rel.field_name);
let mut sel = Query::select();
sel.from(crate::db::router::schema_qualified_table(&junction_table));
sel.column(Alias::new("child_id"));
sel.and_where(Expr::col(Alias::new("parent_id")).eq(parent_pk_value.clone()));
let children: Vec<serde_json::Value> = match tx.backend_name() {
"sqlite" => {
let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
let rows = sqlx::query_with(&sql, values)
.fetch_all(&mut **inner)
.await?;
rows.iter()
.map(|r| {
r.try_get::<i64, _>("child_id")
.map(|i| serde_json::Value::Number(i.into()))
.or_else(|_| {
r.try_get::<String, _>("child_id")
.map(serde_json::Value::String)
})
})
.collect::<Result<Vec<_>, _>>()?
}
_ => {
let inner = tx.as_pg_mut().expect("postgres backend_name");
let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
let rows = sqlx::query_with(&sql, values)
.fetch_all(&mut **inner)
.await?;
rows.iter()
.map(|r| {
r.try_get::<i64, _>("child_id")
.map(|i| serde_json::Value::Number(i.into()))
.or_else(|_| {
r.try_get::<String, _>("child_id")
.map(serde_json::Value::String)
})
})
.collect::<Result<Vec<_>, _>>()?
}
};
out.insert(rel.field_name.clone(), serde_json::Value::Array(children));
}
Ok(())
}
/// Writes the many-to-many junction rows for one parent (non-transactional
/// entry point).
///
/// For every relation of `meta` whose field is present in `body` as a JSON
/// array, the array items are converted with `json_pk_to_sea` (nulls and
/// unconvertible items are dropped) and handed to `set_junction_dynamic` for
/// the junction table `{meta.table}_{rel.field_name}`, together with the
/// model name. Relations missing from `body`, or whose value is not an
/// array, are left alone.
///
/// Does nothing when the model has no many-to-many relations, or when
/// `parent_pk_json` is `None` or cannot be converted by `json_pk_to_sea`.
///
/// # Errors
///
/// A failure from `set_junction_dynamic` is wrapped in `WriteError::Sqlx`.
async fn write_m2m_junctions(
    meta: &crate::migrate::ModelMeta,
    parent_pk_json: Option<&serde_json::Value>,
    body: &serde_json::Map<String, serde_json::Value>,
) -> Result<(), crate::orm::write::WriteError> {
    if meta.m2m_relations.is_empty() {
        return Ok(());
    }
    let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
        return Ok(());
    };

    for rel in &meta.m2m_relations {
        let submitted = match body.get(&rel.field_name) {
            Some(serde_json::Value::Array(list)) => list,
            // Field absent or not an array: nothing to write for this relation.
            _ => continue,
        };

        let child_ids: Vec<sea_query::Value> = submitted
            .iter()
            .filter(|entry| !entry.is_null())
            .filter_map(|entry| json_pk_to_sea(entry))
            .collect();

        let junction_table = format!("{}_{}", meta.table, rel.field_name);
        let outcome = crate::orm::m2m::set_junction_dynamic(
            &junction_table,
            parent_pk_value.clone(),
            child_ids,
            Some(&meta.name),
        )
        .await;
        outcome.map_err(crate::orm::write::WriteError::Sqlx)?;
    }
    Ok(())
}
/// Converts one raw CSV cell into the JSON value used for insertion.
///
/// * An empty cell in a nullable column becomes `null`.
/// * Integer-like columns (`SmallInt`, `Integer`, `BigInt`, `ForeignKey`)
///   are parsed as `i64`; `Real` / `Double` columns as `f64`.
/// * `Boolean` columns accept `true/1/t/yes/y` and `false/0/f/no/n`,
///   case-insensitively.
/// * `Json` columns are parsed as JSON.
/// * Every other type is passed through as a string.
///
/// Surrounding whitespace is ignored when parsing numbers and booleans: the
/// standard numeric parsers reject padded input, so a cell such as `" 42"`
/// would otherwise never be recognised as a number.
///
/// Whenever a typed parse fails (including floats that
/// `serde_json::Number::from_f64` rejects), the cell is returned unchanged
/// as a JSON string so the later conversion step can report the problem.
fn coerce_csv_cell(ty: SqlType, nullable: bool, raw: &str) -> serde_json::Value {
    use serde_json::Value;
    if raw.is_empty() && nullable {
        return Value::Null;
    }
    match ty {
        SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey => raw
            .trim()
            .parse::<i64>()
            .map(Value::from)
            // Fall back to the original, untrimmed text.
            .unwrap_or_else(|_| Value::String(raw.to_string())),
        SqlType::Real | SqlType::Double => raw
            .trim()
            .parse::<f64>()
            .ok()
            .and_then(serde_json::Number::from_f64)
            .map(Value::Number)
            .unwrap_or_else(|| Value::String(raw.to_string())),
        SqlType::Boolean => match raw.trim().to_ascii_lowercase().as_str() {
            "true" | "1" | "t" | "yes" | "y" => Value::Bool(true),
            "false" | "0" | "f" | "no" | "n" => Value::Bool(false),
            _ => Value::String(raw.to_string()),
        },
        SqlType::Json => {
            serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_string()))
        }
        _ => Value::String(raw.to_string()),
    }
}
/// Outcome of a bulk row import, as returned by `import_table_rows`.
#[derive(Debug, Default)]
pub struct CsvImportReport {
/// Number of rows whose insert succeeded.
pub inserted: usize,
/// One `(row number, error message)` entry per failed row.
/// `import_table_rows` records the zero-based row index plus 2, presumably
/// the 1-based line number in a file with a single header line (TODO confirm).
pub errors: Vec<(usize, String)>,
}
/// Inserts tabular rows (for example parsed CSV) into the model described by
/// `meta`, one row at a time.
///
/// Each cell is paired positionally with its header. Cells whose header does
/// not name a column of the model are ignored; the rest are converted with
/// `coerce_csv_cell` and the resulting object is inserted via
/// `DynQuerySet::insert_json`. A failing row does not stop the import: its
/// error is recorded and processing continues with the next row.
///
/// Returns a report with the number of inserted rows and, for each failure,
/// the row's zero-based index plus 2 together with the error message.
pub async fn import_table_rows(
    meta: &ModelMeta,
    headers: &[String],
    rows: &[Vec<String>],
) -> CsvImportReport {
    let mut columns_by_name: HashMap<&str, &Column> = HashMap::with_capacity(meta.fields.len());
    for field in &meta.fields {
        columns_by_name.insert(field.name.as_str(), field);
    }

    let mut report = CsvImportReport::default();
    for (idx, cells) in rows.iter().enumerate() {
        let record: serde_json::Map<String, serde_json::Value> = headers
            .iter()
            .zip(cells)
            .filter_map(|(header, cell)| {
                let column = columns_by_name.get(header.as_str())?;
                let value = coerce_csv_cell(column.ty, column.nullable, cell);
                Some((header.clone(), value))
            })
            .collect();

        let outcome = DynQuerySet::for_meta(meta).insert_json(&record).await;
        match outcome {
            Err(err) => report.errors.push((idx + 2, err.to_string())),
            Ok(_) => report.inserted += 1,
        }
    }
    report
}
#[cfg(test)]
mod tests {
    use super::form_str_to_sea_value;
    use crate::migrate::Column;
    use crate::orm::{FkAction, SqlType};
    use sea_query::Value as SeaValue;

    /// Builds a minimal `Column` fixture: only the name, type and nullability
    /// vary, every other attribute is switched off or left empty.
    fn col(name: &str, ty: SqlType, nullable: bool) -> Column {
        Column {
            name: name.to_string(),
            ty,
            primary_key: false,
            nullable,
            fk_target: None,
            noform: false,
            db_constraint: true,
            noedit: false,
            is_string_repr: false,
            max_length: 0,
            choices: Vec::new(),
            choice_labels: Vec::new(),
            default: String::new(),
            is_multichoice: false,
            unique: false,
            on_delete: FkAction::NoAction,
            on_update: FkAction::NoAction,
            index: false,
            auto_now_add: false,
            auto_now: false,
            help: String::new(),
            example: String::new(),
            widget: None,
            supported_backends: Vec::new(),
            min: None,
            max: None,
            text_format: None,
            slug_from: None,
        }
    }

    /// Foreign-key fixture: a `ForeignKey` column named `name` that points at
    /// the model `target`.
    fn fk_col(name: &str, target: &str, nullable: bool) -> Column {
        let mut column = col(name, SqlType::ForeignKey, nullable);
        column.fk_target = Some(target.to_string());
        column
    }

    #[test]
    fn form_fk_numeric_string_binds_as_bigint() {
        let plugin = fk_col("plugin", "plugin", false);

        let bound = form_str_to_sea_value(&plugin, "1").expect("coerce FK id");

        assert_eq!(
            bound,
            SeaValue::BigInt(Some(1)),
            "integer-backed FK form values must bind as bigint, not text"
        );
    }

    #[test]
    fn nullable_form_fk_blank_binds_as_null_bigint() {
        let parent = fk_col("parent", "plugin_comment", true);

        let bound = form_str_to_sea_value(&parent, "").expect("blank nullable FK");

        assert_eq!(
            bound,
            SeaValue::BigInt(None),
            "blank nullable integer-backed FK should bind SQL NULL"
        );
    }
}