1use std::collections::HashMap;
30
31use sea_query::{
32 Alias, Asterisk, Condition, Expr, Func, Order, PostgresQueryBuilder, Query, SqliteQueryBuilder,
33 Value as SeaValue,
34};
35use sea_query_binder::SqlxBinder;
36use sqlx::Row;
37
38use crate::db::{DbPool, pool_for_dispatched};
39use crate::migrate::{Column, ModelMeta};
40use crate::orm::SqlType;
41use crate::orm::write::{WriteError, json_to_sea_value, null_for};
42
43fn resolve_pool_dyn(meta: &crate::migrate::ModelMeta, op: crate::db::RouteOp) -> crate::db::DbPool {
46 let ctx = crate::db::route_context();
47 let r = crate::db::router::router();
48 let alias = match op {
49 crate::db::RouteOp::Read => r.db_for_read(meta, &ctx),
50 crate::db::RouteOp::Write => r.db_for_write(meta, &ctx),
51 };
52 pool_for_dispatched(alias.as_str()).clone()
53}
54
/// Error type returned by the dynamic query-set operations in this module.
#[derive(Debug)]
pub enum DynError {
    /// A failure reported by the write layer, e.g. a form value that could not
    /// be converted for its column (`WriteError::Validator`).
    Write(WriteError),
    /// An error surfaced by `sqlx` while executing a statement or decoding a row.
    Sqlx(sqlx::Error),
}
89
90impl std::fmt::Display for DynError {
91 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 match self {
93 Self::Write(e) => write!(f, "{e}"),
94 Self::Sqlx(e) => write!(f, "{e}"),
95 }
96 }
97}
98
99impl std::error::Error for DynError {
100 fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
101 match self {
102 Self::Write(e) => Some(e),
103 Self::Sqlx(e) => Some(e),
104 }
105 }
106}
107
108impl From<sqlx::Error> for DynError {
109 fn from(e: sqlx::Error) -> Self {
110 Self::Sqlx(e)
111 }
112}
113
114impl From<WriteError> for DynError {
115 fn from(e: WriteError) -> Self {
116 Self::Write(e)
117 }
118}
119
/// A query builder driven by runtime model metadata instead of a typed model.
///
/// Filters, ordering and paging are accumulated by the builder-style methods
/// and turned into SQL when one of the async terminal methods is awaited.
pub struct DynQuerySet<'a> {
    /// Metadata of the model (table name, columns, relations) being queried.
    meta: &'a ModelMeta,
    /// Conditions accumulated by the filter methods; each is applied to the
    /// statement with its own `cond_where` call.
    where_clauses: Vec<Condition>,
    /// Ordering terms as `(column name, descending)` pairs, in insertion order.
    order: Vec<(String, bool)>,
    /// Optional row limit.
    limit: Option<u64>,
    /// Optional row offset.
    offset: Option<u64>,
    /// Column names to select; starts as every column in `meta.fields`.
    select_cols: Vec<String>,
    /// When set, the default `deleted_at IS NULL` filter on soft-delete models
    /// is not added.
    with_deleted: bool,
    /// When set, soft-delete models are filtered to `deleted_at IS NOT NULL`;
    /// checked before `with_deleted`.
    only_deleted: bool,
    /// When set, `delete` issues a real `DELETE` even on soft-delete models.
    hard_delete: bool,
    /// Validated relation tokens whose rows `fetch_as_json` hydrates.
    select_related: Vec<String>,
}
149
150impl<'a> DynQuerySet<'a> {
151 pub fn for_meta(meta: &'a ModelMeta) -> Self {
155 let select_cols = meta.fields.iter().map(|c| c.name.clone()).collect();
156 Self {
157 meta,
158 where_clauses: Vec::new(),
159 order: Vec::new(),
160 limit: None,
161 offset: None,
162 select_cols,
163 with_deleted: false,
164 only_deleted: false,
165 hard_delete: false,
166 select_related: Vec::new(),
167 }
168 }
169
170 pub fn with_deleted(mut self) -> Self {
173 self.with_deleted = true;
174 self
175 }
176
177 pub fn only_deleted(mut self) -> Self {
180 self.only_deleted = true;
181 self
182 }
183
184 pub fn hard_delete(mut self) -> Self {
186 self.hard_delete = true;
187 self
188 }
189
190 fn effective_where_clauses(&self) -> Vec<Condition> {
191 let mut clauses = self.where_clauses.clone();
192 if self.meta.soft_delete {
193 if self.only_deleted {
194 clauses
195 .push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_not_null()));
196 } else if !self.with_deleted {
197 clauses.push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_null()));
198 }
199 }
200 clauses
201 }
202
203 fn live_where_clauses(&self) -> Vec<Condition> {
204 let mut clauses = self.where_clauses.clone();
205 if self.meta.soft_delete {
206 clauses.push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_null()));
207 }
208 clauses
209 }
210
211 pub fn select_cols(mut self, cols: &[String]) -> Self {
215 let valid: Vec<String> = cols
216 .iter()
217 .filter(|n| self.meta.fields.iter().any(|c| &c.name == *n))
218 .cloned()
219 .collect();
220 if !valid.is_empty() {
221 self.select_cols = valid;
222 }
223 self
224 }
225
226 pub fn select_related_dyn(mut self, fields: &[String]) -> Self {
253 for name in fields {
254 let canonical = normalize_sr_token(name);
255 if validate_sr_chain(self.meta, &canonical).is_none() {
256 continue;
257 }
258 if !self.select_related.iter().any(|n| n == &canonical) {
259 self.select_related.push(canonical);
260 }
261 }
262 self
263 }
264
265 #[doc(hidden)]
268 pub fn select_related_fields(&self) -> &[String] {
269 &self.select_related
270 }
271
272 pub fn search(mut self, fields: &[String], term: &str) -> Self {
296 let term = term.trim();
297 if term.is_empty() {
298 return self;
299 }
300
301 let restricted = !fields.is_empty();
302 let as_int = term.parse::<i64>().ok();
303 let as_float = term.parse::<f64>().ok();
304 let as_bool = match term.to_ascii_lowercase().as_str() {
305 "true" => Some(true),
306 "false" => Some(false),
307 _ => None,
308 };
309 let like_pat = format!("%{}%", crate::orm::escape_like_literal(term)).to_uppercase();
312
313 let mut cond = Condition::any();
314 let mut added = 0;
315 for col in &self.meta.fields {
316 if restricted && !fields.iter().any(|f| f == &col.name) {
317 continue;
318 }
319 let predicate: Option<sea_query::SimpleExpr> = match col.ty {
320 SqlType::Text => Some(
321 Expr::expr(Func::upper(Expr::col(Alias::new(&col.name))))
322 .like(sea_query::LikeExpr::new(like_pat.clone()).escape('\\')),
323 ),
324 SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey => {
325 as_int.map(|n| Expr::col(Alias::new(&col.name)).eq(n))
326 }
327 SqlType::Real | SqlType::Double => {
328 as_float.map(|n| Expr::col(Alias::new(&col.name)).eq(n))
329 }
330 SqlType::Boolean => as_bool.map(|b| Expr::col(Alias::new(&col.name)).eq(b)),
331 _ => None,
332 };
333 if let Some(p) = predicate {
334 cond = cond.add(p);
335 added += 1;
336 }
337 }
338 if added > 0 {
339 self.where_clauses.push(cond);
340 }
341 self
342 }
343
344 pub fn filter_condition(mut self, cond: sea_query::Condition) -> Self {
350 self.where_clauses.push(cond);
351 self
352 }
353
354 pub fn filter_in_i64(mut self, col: &str, vals: &[i64]) -> Self {
357 if vals.is_empty() || !self.meta.fields.iter().any(|c| c.name == col) {
358 return self;
359 }
360 let cond = Condition::all().add(Expr::col(Alias::new(col)).is_in(vals.iter().copied()));
361 self.where_clauses.push(cond);
362 self
363 }
364
365 pub fn filter_m2m_contains_any(mut self, field_name: &str, child_ids: &[String]) -> Self {
389 if child_ids.is_empty() {
390 return self;
391 }
392 let Some(rel) = self
393 .meta
394 .m2m_relations
395 .iter()
396 .find(|r| r.field_name == field_name)
397 else {
398 return self;
399 };
400 let Some(pk_col) = self.meta.pk_column() else {
401 return self;
402 };
403 let target_pk_ty = crate::migrate::pk_meta_for_table(&rel.target_table)
414 .map(|(_, ty)| ty)
415 .unwrap_or(SqlType::BigInt);
416 let junction_table = format!("{}_{}", self.meta.table, rel.field_name);
417 let child_id_expr = Expr::col(Alias::new("child_id"));
418 let in_clause: sea_query::SimpleExpr = match target_pk_ty {
419 SqlType::Text | SqlType::Uuid => {
420 let bound: Vec<String> = child_ids
424 .iter()
425 .filter_map(|s| {
426 let s = s.trim();
427 if s.is_empty() {
428 None
429 } else {
430 Some(s.to_string())
431 }
432 })
433 .collect();
434 if bound.is_empty() {
435 return self;
436 }
437 child_id_expr.is_in(bound)
438 }
439 _ => {
440 let parsed: Vec<i64> = child_ids.iter().filter_map(|s| s.parse().ok()).collect();
444 if parsed.is_empty() {
445 return self;
446 }
447 child_id_expr.is_in(parsed)
448 }
449 };
450 let subq = Query::select()
451 .column(Alias::new("parent_id"))
452 .from(crate::db::router::schema_qualified_table(&junction_table))
453 .and_where(in_clause)
454 .to_owned();
455 let cond =
456 Condition::all().add(Expr::col(Alias::new(pk_col.name.clone())).in_subquery(subq));
457 self.where_clauses.push(cond);
458 self
459 }
460
461 pub fn filter_in_strings(mut self, col: &str, vals: &[String]) -> Self {
473 let Some(meta_col) = self.meta.fields.iter().find(|c| c.name == col) else {
474 return self;
475 };
476 if vals.is_empty() {
477 return self;
478 }
479 let expr = Expr::col(Alias::new(col));
480 let cond = match crate::migrate::fk_effective_type(meta_col) {
487 SqlType::SmallInt | SqlType::Integer => {
488 let parsed: Vec<i32> = vals.iter().filter_map(|s| s.parse().ok()).collect();
489 if parsed.is_empty() {
490 return self;
491 }
492 Condition::all().add(expr.is_in(parsed))
493 }
494 SqlType::BigInt | SqlType::ForeignKey => {
495 let parsed: Vec<i64> = vals.iter().filter_map(|s| s.parse().ok()).collect();
496 if parsed.is_empty() {
497 return self;
498 }
499 Condition::all().add(expr.is_in(parsed))
500 }
501 SqlType::Real | SqlType::Double => {
502 let parsed: Vec<f64> = vals.iter().filter_map(|s| s.parse().ok()).collect();
503 if parsed.is_empty() {
504 return self;
505 }
506 Condition::all().add(expr.is_in(parsed))
507 }
508 SqlType::Boolean => {
509 let parsed: Vec<bool> = vals
510 .iter()
511 .map(|s| matches!(s.as_str(), "true" | "on" | "1"))
512 .collect();
513 Condition::all().add(expr.is_in(parsed))
514 }
515 SqlType::Uuid => {
520 let parsed: Vec<uuid::Uuid> = vals
521 .iter()
522 .filter_map(|s| uuid::Uuid::parse_str(s).ok())
523 .collect();
524 if parsed.is_empty() {
525 return self;
526 }
527 Condition::all().add(expr.is_in(parsed))
528 }
529 _ => Condition::all().add(expr.is_in(vals.iter().map(|s| s.to_string()))),
530 };
531 self.where_clauses.push(cond);
532 self
533 }
534
535 pub fn filter_eq_string(mut self, col: &str, value: &str) -> Self {
539 let Some(meta_col) = self.meta.fields.iter().find(|c| c.name == col) else {
540 return self;
541 };
542 let expr = Expr::col(Alias::new(col));
543 let predicate = match crate::migrate::fk_effective_type(meta_col) {
546 SqlType::SmallInt | SqlType::Integer => value.parse::<i32>().ok().map(|v| expr.eq(v)),
547 SqlType::BigInt | SqlType::ForeignKey => value.parse::<i64>().ok().map(|v| expr.eq(v)),
548 SqlType::Real | SqlType::Double => value.parse::<f64>().ok().map(|v| expr.eq(v)),
549 SqlType::Boolean => {
550 let v = matches!(value, "true" | "on" | "1");
551 Some(expr.eq(v))
552 }
553 SqlType::Uuid => uuid::Uuid::parse_str(value).ok().map(|u| expr.eq(u)),
556 _ => Some(expr.eq(value.to_string())),
557 };
558 if let Some(p) = predicate {
559 self.where_clauses.push(Condition::all().add(p));
560 }
561 self
562 }
563
564 pub fn order_by_col(mut self, col: &str, descending: bool) -> Self {
567 if self.meta.fields.iter().any(|c| c.name == col) {
568 self.order.push((col.to_string(), descending));
569 }
570 self
571 }
572
573 pub fn limit(mut self, n: u64) -> Self {
575 self.limit = Some(n);
576 self
577 }
578
579 pub fn offset(mut self, n: u64) -> Self {
581 self.offset = Some(n);
582 self
583 }
584
585 pub async fn count(self) -> Result<i64, DynError> {
589 let mut q = Query::select();
590 q.from(crate::db::router::schema_qualified_table(&self.meta.table));
591 q.expr(Func::count(Expr::col(Asterisk)));
592 let where_clauses = self.effective_where_clauses();
593 for cond in &where_clauses {
594 q.cond_where(cond.clone());
595 }
596
597 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
598 DbPool::Sqlite(pool) => {
599 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
600 let row = sqlx::query_with(&sql, values).fetch_one(&pool).await?;
601 Ok(row.try_get::<i64, _>(0)?)
602 }
603 DbPool::Postgres(pool) => {
604 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
605 let row = sqlx::query_with(&sql, values).fetch_one(&pool).await?;
606 Ok(row.try_get::<i64, _>(0)?)
607 }
608 }
609 }
610
611 pub async fn fetch_distinct_strings(self, col: &str) -> Result<Vec<String>, DynError> {
616 let Some(col_meta) = self.meta.fields.iter().find(|c| c.name == col) else {
617 return Ok(Vec::new());
618 };
619 let mut q = Query::select();
620 q.distinct();
621 q.from(crate::db::router::schema_qualified_table(&self.meta.table));
622 q.column(Alias::new(col));
623 let where_clauses = self.effective_where_clauses();
624 for cond in &where_clauses {
625 q.cond_where(cond.clone());
626 }
627 if let Some(n) = self.limit {
628 q.limit(n);
629 }
630
631 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
632 DbPool::Sqlite(pool) => {
633 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
634 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
635 let mut out = Vec::with_capacity(rows.len());
636 for row in rows {
637 out.push(decode_to_string(&row, col_meta)?);
638 }
639 Ok(out)
640 }
641 DbPool::Postgres(pool) => {
642 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
643 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
644 let mut out = Vec::with_capacity(rows.len());
645 for row in rows {
646 out.push(decode_pg_to_string(&row, col_meta)?);
647 }
648 Ok(out)
649 }
650 }
651 }
652
653 pub async fn delete(self) -> Result<u64, DynError> {
662 if self.meta.soft_delete && !self.hard_delete {
663 return self.soft_delete_update().await;
664 }
665 let where_clauses = self.effective_where_clauses();
666 let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
670 Some(pk_col) => collect_parent_pks(&self.meta, pk_col, &where_clauses)
671 .await
672 .unwrap_or_default(),
673 None => Vec::new(),
674 };
675
676 let mut q = Query::delete();
677 q.from_table(crate::db::router::schema_qualified_table(&self.meta.table));
678 for cond in &where_clauses {
679 q.cond_where(cond.clone());
680 }
681
682 let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
683 DbPool::Sqlite(pool) => {
684 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
685 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
686 res.rows_affected()
687 }
688 DbPool::Postgres(pool) => {
689 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
690 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
691 res.rows_affected()
692 }
693 };
694
695 crate::signals::emit_bulk_post_delete_by_table(&self.meta.table, parent_pks).await;
700 Ok(rows_affected)
701 }
702
703 async fn soft_delete_update(self) -> Result<u64, DynError> {
704 let where_clauses = self.live_where_clauses();
705 let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
706 Some(pk_col) => collect_parent_pks(self.meta, pk_col, &where_clauses)
707 .await
708 .unwrap_or_default(),
709 None => Vec::new(),
710 };
711
712 let mut q = Query::update();
713 q.table(crate::db::router::schema_qualified_table(&self.meta.table));
714 q.value(
715 Alias::new("deleted_at"),
716 sea_query::Value::ChronoDateTimeUtc(Some(Box::new(chrono::Utc::now()))),
717 );
718 for cond in &where_clauses {
719 q.cond_where(cond.clone());
720 }
721
722 let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
723 DbPool::Sqlite(pool) => {
724 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
725 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
726 res.rows_affected()
727 }
728 DbPool::Postgres(pool) => {
729 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
730 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
731 res.rows_affected()
732 }
733 };
734
735 crate::signals::emit_bulk_post_delete_by_table(&self.meta.table, parent_pks).await;
736 Ok(rows_affected)
737 }
738
739 pub async fn restore(self) -> Result<u64, DynError> {
750 if !self.meta.soft_delete {
751 return Ok(0);
752 }
753 let mut where_clauses = self.where_clauses.clone();
757 where_clauses
758 .push(Condition::all().add(Expr::col(Alias::new("deleted_at")).is_not_null()));
759
760 let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
761 Some(pk_col) => collect_parent_pks(self.meta, pk_col, &where_clauses)
762 .await
763 .unwrap_or_default(),
764 None => Vec::new(),
765 };
766
767 let mut q = Query::update();
768 q.table(crate::db::router::schema_qualified_table(&self.meta.table));
769 q.value(
770 Alias::new("deleted_at"),
771 sea_query::Value::ChronoDateTimeUtc(None),
772 );
773 for cond in &where_clauses {
774 q.cond_where(cond.clone());
775 }
776
777 let rows_affected = match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
778 DbPool::Sqlite(pool) => {
779 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
780 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
781 res.rows_affected()
782 }
783 DbPool::Postgres(pool) => {
784 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
785 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
786 res.rows_affected()
787 }
788 };
789
790 crate::signals::emit_bulk_post_save_by_table(&self.meta.table, parent_pks, false).await;
794 Ok(rows_affected)
795 }
796
797 pub async fn update_one(self, col: &str, value: &str) -> Result<u64, DynError> {
802 let Some(col_meta) = self.meta.fields.iter().find(|c| c.name == col) else {
803 return Ok(0);
804 };
805 let sea_value = match form_str_to_sea_value(col_meta, value) {
806 Ok(v) => v,
807 Err(e) => {
809 return Err(DynError::Write(WriteError::Validator {
810 field: col_meta.name.clone(),
811 message: e.to_string(),
812 }));
813 }
814 };
815
816 let mut q = Query::update();
817 q.table(crate::db::router::schema_qualified_table(&self.meta.table));
818 q.value(Alias::new(col), sea_value);
819 let where_clauses = self.effective_where_clauses();
820 for cond in &where_clauses {
821 q.cond_where(cond.clone());
822 }
823
824 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
825 DbPool::Sqlite(pool) => {
826 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
827 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
828 Ok(res.rows_affected())
829 }
830 DbPool::Postgres(pool) => {
831 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
832 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
833 Ok(res.rows_affected())
834 }
835 }
836 }
837
838 pub async fn update_form(
845 self,
846 form: &HashMap<String, String>,
847 skip: &[String],
848 ) -> Result<u64, DynError> {
849 let Some(q) = self.build_update_form_query(form, skip)? else {
850 return Ok(0);
851 };
852
853 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
854 DbPool::Sqlite(pool) => {
855 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
856 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
857 Ok(res.rows_affected())
858 }
859 DbPool::Postgres(pool) => {
860 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
861 let res = sqlx::query_with(&sql, values).execute(&pool).await?;
862 Ok(res.rows_affected())
863 }
864 }
865 }
866
867 fn build_update_form_query(
875 &self,
876 form: &HashMap<String, String>,
877 skip: &[String],
878 ) -> Result<Option<sea_query::UpdateStatement>, DynError> {
879 let mut q = Query::update();
880 q.table(crate::db::router::schema_qualified_table(&self.meta.table));
881 let mut any = false;
882 for col in &self.meta.fields {
883 if col.primary_key || skip.iter().any(|s| s == &col.name) {
884 continue;
885 }
886 if col.auto_now {
894 q.value(
895 Alias::new(&col.name),
896 crate::orm::write::now_for_column(col.ty),
897 );
898 any = true;
899 continue;
900 }
901 let Some(raw) = form.get(&col.name) else {
902 continue;
903 };
904 let sea_value = match form_str_to_sea_value(col, raw) {
905 Ok(v) => v,
906 Err(e) => {
912 return Err(DynError::Write(WriteError::Validator {
913 field: col.name.clone(),
914 message: e.to_string(),
915 }));
916 }
917 };
918 q.value(Alias::new(&col.name), sea_value);
919 any = true;
920 }
921 if !any {
922 return Ok(None);
923 }
924 let where_clauses = self.effective_where_clauses();
925 for cond in &where_clauses {
926 q.cond_where(cond.clone());
927 }
928 Ok(Some(q))
929 }
930
931 pub async fn update_form_in_tx(
939 self,
940 tx: &mut crate::db::Transaction,
941 form: &HashMap<String, String>,
942 skip: &[String],
943 ) -> Result<u64, DynError> {
944 let Some(q) = self.build_update_form_query(form, skip)? else {
945 return Ok(0);
946 };
947
948 match tx.backend_name() {
949 "sqlite" => {
950 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
951 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
952 let res = sqlx::query_with(&sql, values).execute(&mut **inner).await?;
953 Ok(res.rows_affected())
954 }
955 _ => {
956 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
957 let inner = tx.as_pg_mut().expect("postgres backend_name");
958 let res = sqlx::query_with(&sql, values).execute(&mut **inner).await?;
959 Ok(res.rows_affected())
960 }
961 }
962 }
963
964 pub async fn insert_form(
970 self,
971 form: &HashMap<String, String>,
972 skip: &[String],
973 ) -> Result<i64, DynError> {
974 let Some(mut q) = self.build_insert_form_query(form, skip)? else {
975 return Ok(0);
976 };
977
978 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
979 DbPool::Sqlite(pool) => {
980 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
981 let res = sqlx::query_with(&sql, vals).execute(&pool).await?;
982 Ok(res.last_insert_rowid())
983 }
984 DbPool::Postgres(pool) => {
985 let pk_name = self
991 .meta
992 .fields
993 .iter()
994 .find(|c| c.primary_key)
995 .map(|c| c.name.clone());
996 if let Some(pk) = pk_name {
997 q.returning_col(Alias::new(&pk));
998 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
999 let row = sqlx::query_with(&sql, vals).fetch_one(&pool).await?;
1000 Ok(row.try_get::<i64, _>(pk.as_str()).unwrap_or(0))
1001 } else {
1002 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1003 let _ = sqlx::query_with(&sql, vals).execute(&pool).await?;
1004 Ok(0)
1005 }
1006 }
1007 }
1008 }
1009
1010 fn build_insert_form_query(
1019 &self,
1020 form: &HashMap<String, String>,
1021 skip: &[String],
1022 ) -> Result<Option<sea_query::InsertStatement>, DynError> {
1023 let mut cols: Vec<&str> = Vec::new();
1024 let mut values: Vec<SeaValue> = Vec::new();
1025 for col in &self.meta.fields {
1026 if skip.iter().any(|s| s == &col.name) {
1027 continue;
1028 }
1029 if col.primary_key
1032 && matches!(
1033 col.ty,
1034 SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
1035 )
1036 && form.get(&col.name).is_none_or(|v| v.is_empty())
1037 {
1038 continue;
1039 }
1040 if (col.auto_now_add || col.auto_now)
1048 && form.get(&col.name).is_none_or(|v| v.is_empty())
1049 {
1050 cols.push(&col.name);
1051 values.push(crate::orm::write::now_for_column(col.ty));
1052 continue;
1053 }
1054 let raw = form.get(&col.name).map(|s| s.as_str()).unwrap_or("");
1055 let sea_value = match form_str_to_sea_value(col, raw) {
1056 Ok(v) => v,
1057 Err(e) => {
1060 return Err(DynError::Write(WriteError::Validator {
1061 field: col.name.clone(),
1062 message: e.to_string(),
1063 }));
1064 }
1065 };
1066 cols.push(&col.name);
1067 values.push(sea_value);
1068 }
1069 if cols.is_empty() {
1070 return Ok(None);
1071 }
1072
1073 let mut q = Query::insert();
1074 q.into_table(crate::db::router::schema_qualified_table(&self.meta.table));
1075 q.columns(cols.iter().map(|c| Alias::new(*c)).collect::<Vec<_>>());
1076 let exprs: Vec<sea_query::SimpleExpr> = values.into_iter().map(Into::into).collect();
1077 q.values_panic(exprs);
1078 Ok(Some(q))
1079 }
1080
1081 pub async fn insert_form_in_tx(
1090 self,
1091 tx: &mut crate::db::Transaction,
1092 form: &HashMap<String, String>,
1093 skip: &[String],
1094 ) -> Result<i64, DynError> {
1095 let Some(mut q) = self.build_insert_form_query(form, skip)? else {
1096 return Ok(0);
1097 };
1098
1099 match tx.backend_name() {
1100 "sqlite" => {
1101 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1102 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1103 let res = sqlx::query_with(&sql, vals).execute(&mut **inner).await?;
1104 Ok(res.last_insert_rowid())
1105 }
1106 _ => {
1107 let pk_name = self
1111 .meta
1112 .fields
1113 .iter()
1114 .find(|c| c.primary_key)
1115 .map(|c| c.name.clone());
1116 let inner = tx.as_pg_mut().expect("postgres backend_name");
1117 if let Some(pk) = pk_name {
1118 q.returning_col(Alias::new(&pk));
1119 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1120 let row = sqlx::query_with(&sql, vals).fetch_one(&mut **inner).await?;
1121 Ok(row.try_get::<i64, _>(pk.as_str()).unwrap_or(0))
1122 } else {
1123 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1124 let _ = sqlx::query_with(&sql, vals).execute(&mut **inner).await?;
1125 Ok(0)
1126 }
1127 }
1128 }
1129 }
1130
1131 pub async fn fetch_as_strings(self) -> Result<Vec<HashMap<String, String>>, DynError> {
1136 let mut q = Query::select();
1137 q.from(crate::db::router::schema_qualified_table(&self.meta.table));
1138 for c in &self.select_cols {
1139 q.column(Alias::new(c));
1140 }
1141 let where_clauses = self.effective_where_clauses();
1142 for cond in &where_clauses {
1143 q.cond_where(cond.clone());
1144 }
1145 for (col, descending) in &self.order {
1146 q.order_by(
1147 Alias::new(col),
1148 if *descending { Order::Desc } else { Order::Asc },
1149 );
1150 }
1151 if let Some(n) = self.limit {
1152 q.limit(n);
1153 }
1154 if let Some(n) = self.offset {
1155 q.offset(n);
1156 }
1157
1158 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
1159 DbPool::Sqlite(pool) => {
1160 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1161 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1162 let mut out: Vec<HashMap<String, String>> = Vec::with_capacity(rows.len());
1163 for row in rows {
1164 let mut entry = HashMap::new();
1165 for col_name in &self.select_cols {
1166 if let Some(col_meta) =
1167 self.meta.fields.iter().find(|c| &c.name == col_name)
1168 {
1169 let v = decode_to_string(&row, col_meta)?;
1170 entry.insert(col_name.clone(), v);
1171 }
1172 }
1173 out.push(entry);
1174 }
1175 Ok(out)
1176 }
1177 DbPool::Postgres(pool) => {
1178 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1179 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1180 let mut out: Vec<HashMap<String, String>> = Vec::with_capacity(rows.len());
1181 for row in rows {
1182 let mut entry = HashMap::new();
1183 for col_name in &self.select_cols {
1184 if let Some(col_meta) =
1185 self.meta.fields.iter().find(|c| &c.name == col_name)
1186 {
1187 let v = decode_pg_to_string(&row, col_meta)?;
1188 entry.insert(col_name.clone(), v);
1189 }
1190 }
1191 out.push(entry);
1192 }
1193 Ok(out)
1194 }
1195 }
1196 }
1197
1198 pub async fn fetch_as_json(
1204 self,
1205 ) -> Result<Vec<serde_json::Map<String, serde_json::Value>>, DynError> {
1206 let mut q = Query::select();
1207 q.from(crate::db::router::schema_qualified_table(&self.meta.table));
1208 for c in &self.select_cols {
1209 q.column(Alias::new(c));
1210 }
1211 let where_clauses = self.effective_where_clauses();
1212 for cond in &where_clauses {
1213 q.cond_where(cond.clone());
1214 }
1215 for (col, descending) in &self.order {
1216 q.order_by(
1217 Alias::new(col),
1218 if *descending { Order::Desc } else { Order::Asc },
1219 );
1220 }
1221 if let Some(n) = self.limit {
1222 q.limit(n);
1223 }
1224 if let Some(n) = self.offset {
1225 q.offset(n);
1226 }
1227
1228 let pk_name = self
1229 .meta
1230 .pk_column()
1231 .map(|c| c.name.clone())
1232 .unwrap_or_default();
1233 let selected_cols: Vec<(&String, &Column)> = self
1234 .select_cols
1235 .iter()
1236 .filter_map(|col_name| {
1237 self.meta
1238 .fields
1239 .iter()
1240 .find(|c| &c.name == col_name)
1241 .map(|col| (col_name, col))
1242 })
1243 .collect();
1244 let mut out: Vec<serde_json::Map<String, serde_json::Value>> =
1245 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Read) {
1246 DbPool::Sqlite(pool) => {
1247 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1248 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1249 let mut out: Vec<serde_json::Map<String, serde_json::Value>> =
1250 Vec::with_capacity(rows.len());
1251 for row in rows {
1252 let mut entry = serde_json::Map::new();
1253 for (col_name, col_meta) in &selected_cols {
1254 entry.insert((*col_name).clone(), decode_to_json(&row, col_meta)?);
1255 }
1256 out.push(entry);
1257 }
1258 out
1259 }
1260 DbPool::Postgres(pool) => {
1261 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1262 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
1263 let mut out: Vec<serde_json::Map<String, serde_json::Value>> =
1264 Vec::with_capacity(rows.len());
1265 for row in rows {
1266 let mut entry = serde_json::Map::new();
1267 for (col_name, col_meta) in &selected_cols {
1268 entry.insert((*col_name).clone(), decode_pg_to_json(&row, col_meta)?);
1269 }
1270 out.push(entry);
1271 }
1272 out
1273 }
1274 };
1275
1276 if !self.meta.m2m_relations.is_empty() && !out.is_empty() {
1286 hydrate_m2m_batched(&self.meta, &pk_name, &mut out).await?;
1287 }
1288
1289 if !self.select_related.is_empty() && !out.is_empty() {
1298 hydrate_select_related_into(&self.meta, &self.select_related, &mut out).await?;
1299 }
1300 Ok(out)
1301 }
1302
1303 pub async fn first_as_json(
1306 mut self,
1307 ) -> Result<Option<serde_json::Map<String, serde_json::Value>>, DynError> {
1308 self.limit = Some(1);
1309 let mut rows = self.fetch_as_json().await?;
1310 Ok(rows.pop())
1311 }
1312
1313 pub async fn fetch_one_json_in_tx(
1323 self,
1324 tx: &mut crate::db::Transaction,
1325 ) -> Result<Option<serde_json::Map<String, serde_json::Value>>, DynError> {
1326 let mut q = Query::select();
1327 q.from(crate::db::router::schema_qualified_table(&self.meta.table));
1328 for c in &self.meta.fields {
1329 q.column(Alias::new(&c.name));
1330 }
1331 let where_clauses = self.effective_where_clauses();
1332 for cond in &where_clauses {
1333 q.cond_where(cond.clone());
1334 }
1335 q.limit(1);
1336
1337 let out = match tx.backend_name() {
1338 "sqlite" => {
1339 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1340 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1341 let row = sqlx::query_with(&sql, values)
1342 .fetch_optional(&mut **inner)
1343 .await?;
1344 match row {
1345 Some(row) => {
1346 let mut entry = serde_json::Map::new();
1347 for col in &self.meta.fields {
1348 entry.insert(col.name.clone(), decode_to_json(&row, col)?);
1349 }
1350 Some(entry)
1351 }
1352 None => None,
1353 }
1354 }
1355 _ => {
1356 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1357 let inner = tx.as_pg_mut().expect("postgres backend_name");
1358 let row = sqlx::query_with(&sql, values)
1359 .fetch_optional(&mut **inner)
1360 .await?;
1361 match row {
1362 Some(row) => {
1363 let mut entry = serde_json::Map::new();
1364 for col in &self.meta.fields {
1365 entry.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
1366 }
1367 Some(entry)
1368 }
1369 None => None,
1370 }
1371 }
1372 };
1373 Ok(out)
1374 }
1375
    /// Inserts one row from a JSON body and returns the stored row as a JSON
    /// object.
    ///
    /// Steps, in order:
    /// 1. the body is passed through `normalise_insert_body`, which may
    ///    return a rewritten copy to use instead;
    /// 2. `validate_on_create` runs; any errors abort with
    ///    `WriteError::Multiple`;
    /// 3. `build_insert_plan` produces the `INSERT` plus the primary key's
    ///    name and type;
    /// 4. the pre-save signal is emitted with the (normalised) body;
    /// 5. the row is inserted and read back (re-`SELECT` on SQLite,
    ///    `RETURNING *` on Postgres);
    /// 6. `write_m2m_junctions` and `hydrate_m2m_into` run for the new row;
    /// 7. the post-save signal is emitted with the returned object.
    ///
    /// NOTE(review): the insert, the junction writes and the signals are
    /// separate steps with no transaction visible in this function — confirm
    /// whether callers needing atomicity should use the `_in_tx` variant.
    ///
    /// # Errors
    /// Returns `WriteError` from validation, plan building, statement
    /// execution (mapped through `classify_or_sqlx`), row decoding or the
    /// M2M helpers.
    pub async fn insert_json(
        self,
        body: &serde_json::Map<String, serde_json::Value>,
    ) -> Result<serde_json::Map<String, serde_json::Value>, crate::orm::write::WriteError> {
        use crate::orm::write::WriteError;

        // `body_owned` only exists to keep a normalised copy alive for the
        // rest of the function; `body` is rebound to it when one is produced.
        let body_owned: serde_json::Map<String, serde_json::Value>;
        let body: &serde_json::Map<String, serde_json::Value> =
            match normalise_insert_body(self.meta, body) {
                Some(owned) => {
                    body_owned = owned;
                    &body_owned
                }
                None => body,
            };

        let validation_errors = crate::orm::validation::validate_on_create(self.meta, body).await;
        if !validation_errors.is_empty() {
            return Err(WriteError::Multiple {
                errors: validation_errors,
            });
        }

        let InsertPlan {
            mut q,
            pk_name,
            pk_ty,
        } = build_insert_plan(self.meta, body)?;

        // Emitted before the statement runs. The trailing `true` presumably
        // marks this as a create rather than an update — confirm against the
        // signal's definition.
        crate::signals::emit_pre_save_by_table(
            &self.meta.table,
            serde_json::Value::Object(body.clone()),
            true,
        )
        .await;

        match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
            DbPool::Sqlite(pool) => {
                let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
                let res = sqlx::query_with(&sql, vals)
                    .execute(&pool)
                    .await
                    .map_err(|e| classify_or_sqlx(e, body))?;
                // No `RETURNING` is used on this path, so the new row is
                // located again by primary key: the rowid for integer keys,
                // otherwise the key value supplied in the body.
                let pk_pred = match pk_ty {
                    SqlType::Integer | SqlType::BigInt | SqlType::SmallInt => {
                        Expr::col(Alias::new(&pk_name)).eq(res.last_insert_rowid())
                    }
                    _ => {
                        let supplied = body
                            .get(&pk_name)
                            .cloned()
                            .unwrap_or(serde_json::Value::Null);
                        let sea_value = crate::orm::write::json_to_sea_value(
                            pk_ty, &supplied, false, &pk_name, None,
                        )?;
                        Expr::col(Alias::new(&pk_name)).eq(sea_value)
                    }
                };
                // Read the full row back so the caller sees stored values.
                let mut sel = Query::select();
                sel.from(crate::db::router::schema_qualified_table(&self.meta.table));
                for c in &self.meta.fields {
                    sel.column(Alias::new(&c.name));
                }
                sel.cond_where(Condition::all().add(pk_pred));
                let (sel_sql, sel_vals) = sel.build_sqlx(SqliteQueryBuilder);
                let row = sqlx::query_with(&sel_sql, sel_vals)
                    .fetch_one(&pool)
                    .await?;
                let mut out = serde_json::Map::new();
                for col in &self.meta.fields {
                    out.insert(col.name.clone(), decode_to_json(&row, col)?);
                }
                // The M2M helpers need the stored primary key, so they run
                // only after the row has been read back.
                let pk_value = out.get(&pk_name).cloned();
                write_m2m_junctions(&self.meta, pk_value.as_ref(), body).await?;
                hydrate_m2m_into(&self.meta, pk_value.as_ref(), &mut out).await?;
                crate::signals::emit_post_save_by_table(
                    &self.meta.table,
                    serde_json::Value::Object(out.clone()),
                    true,
                )
                .await;
                Ok(out)
            }
            DbPool::Postgres(pool) => {
                // `RETURNING *` hands back the stored row in the same round
                // trip, so no follow-up SELECT is needed on this path.
                q.returning_all();
                let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
                let row = sqlx::query_with(&sql, vals)
                    .fetch_one(&pool)
                    .await
                    .map_err(|e| classify_or_sqlx(e, body))?;
                let mut out = serde_json::Map::new();
                for col in &self.meta.fields {
                    out.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
                }
                let pk_value = out.get(&pk_name).cloned();
                write_m2m_junctions(&self.meta, pk_value.as_ref(), body).await?;
                hydrate_m2m_into(&self.meta, pk_value.as_ref(), &mut out).await?;
                crate::signals::emit_post_save_by_table(
                    &self.meta.table,
                    serde_json::Value::Object(out.clone()),
                    true,
                )
                .await;
                Ok(out)
            }
        }
    }
1519
1520 pub async fn insert_json_in_tx(
1544 self,
1545 body: &serde_json::Map<String, serde_json::Value>,
1546 tx: &mut crate::db::Transaction,
1547 ) -> Result<serde_json::Map<String, serde_json::Value>, crate::orm::write::WriteError> {
1548 use crate::orm::write::WriteError;
1549
1550 let body_owned: serde_json::Map<String, serde_json::Value>;
1552 let body: &serde_json::Map<String, serde_json::Value> =
1553 match normalise_insert_body(self.meta, body) {
1554 Some(owned) => {
1555 body_owned = owned;
1556 &body_owned
1557 }
1558 None => body,
1559 };
1560
1561 let validation_errors =
1564 crate::orm::validation::validate_on_create_in_tx(self.meta, body, tx).await;
1565 if !validation_errors.is_empty() {
1566 return Err(WriteError::Multiple {
1567 errors: validation_errors,
1568 });
1569 }
1570
1571 let InsertPlan {
1573 mut q,
1574 pk_name,
1575 pk_ty,
1576 } = build_insert_plan(self.meta, body)?;
1577
1578 match tx.backend_name() {
1579 "sqlite" => {
1580 let (sql, vals) = q.build_sqlx(SqliteQueryBuilder);
1581 let res = {
1582 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1583 sqlx::query_with(&sql, vals)
1584 .execute(&mut **inner)
1585 .await
1586 .map_err(|e| classify_or_sqlx(e, body))?
1587 };
1588 let pk_pred = match pk_ty {
1591 SqlType::Integer | SqlType::BigInt | SqlType::SmallInt => {
1592 Expr::col(Alias::new(&pk_name)).eq(res.last_insert_rowid())
1593 }
1594 _ => {
1595 let supplied = body
1596 .get(&pk_name)
1597 .cloned()
1598 .unwrap_or(serde_json::Value::Null);
1599 let sea_value = crate::orm::write::json_to_sea_value(
1600 pk_ty, &supplied, false, &pk_name, None,
1601 )?;
1602 Expr::col(Alias::new(&pk_name)).eq(sea_value)
1603 }
1604 };
1605 let mut sel = Query::select();
1606 sel.from(crate::db::router::schema_qualified_table(&self.meta.table));
1607 for c in &self.meta.fields {
1608 sel.column(Alias::new(&c.name));
1609 }
1610 sel.cond_where(Condition::all().add(pk_pred));
1611 let (sel_sql, sel_vals) = sel.build_sqlx(SqliteQueryBuilder);
1612 let mut out = serde_json::Map::new();
1613 {
1614 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1615 let row = sqlx::query_with(&sel_sql, sel_vals)
1616 .fetch_one(&mut **inner)
1617 .await?;
1618 for col in &self.meta.fields {
1619 out.insert(col.name.clone(), decode_to_json(&row, col)?);
1620 }
1621 }
1622 let pk_value = out.get(&pk_name).cloned();
1624 write_m2m_junctions_in_tx(self.meta, pk_value.as_ref(), body, tx).await?;
1625 hydrate_m2m_into_tx(self.meta, pk_value.as_ref(), &mut out, tx).await?;
1626 Ok(out)
1627 }
1628 _ => {
1629 q.returning_all();
1630 let (sql, vals) = q.build_sqlx(PostgresQueryBuilder);
1631 let mut out = serde_json::Map::new();
1632 {
1633 let inner = tx.as_pg_mut().expect("postgres backend_name");
1634 let row = sqlx::query_with(&sql, vals)
1635 .fetch_one(&mut **inner)
1636 .await
1637 .map_err(|e| classify_or_sqlx(e, body))?;
1638 for col in &self.meta.fields {
1639 out.insert(col.name.clone(), decode_pg_to_json(&row, col)?);
1640 }
1641 }
1642 let pk_value = out.get(&pk_name).cloned();
1643 write_m2m_junctions_in_tx(self.meta, pk_value.as_ref(), body, tx).await?;
1644 hydrate_m2m_into_tx(self.meta, pk_value.as_ref(), &mut out, tx).await?;
1645 Ok(out)
1646 }
1647 }
1648 }
1649
1650 pub async fn update_json_in_tx(
1661 self,
1662 body: &serde_json::Map<String, serde_json::Value>,
1663 tx: &mut crate::db::Transaction,
1664 ) -> Result<u64, crate::orm::write::WriteError> {
1665 use crate::orm::write::WriteError;
1666
1667 let needs_owned = self
1670 .meta
1671 .fields
1672 .iter()
1673 .any(|c| c.noform || c.slug_from.is_some());
1674 let mut body_owned: serde_json::Map<String, serde_json::Value>;
1675 let body: &serde_json::Map<String, serde_json::Value> = if needs_owned {
1676 body_owned = body.clone();
1677 for col in &self.meta.fields {
1678 if col.noform {
1679 body_owned.remove(&col.name);
1680 }
1681 }
1682 crate::orm::write::apply_slug_from(&self.meta.fields, &mut body_owned, true);
1683 &body_owned
1684 } else {
1685 body
1686 };
1687
1688 let validation_errors =
1692 crate::orm::validation::validate_on_update_in_tx(self.meta, body, tx).await;
1693 if !validation_errors.is_empty() {
1694 return Err(WriteError::Multiple {
1695 errors: validation_errors,
1696 });
1697 }
1698
1699 let mut q = Query::update();
1700 q.table(crate::db::router::schema_qualified_table(&self.meta.table));
1701 let mut any = false;
1702 for col in &self.meta.fields {
1703 if col.primary_key {
1704 continue;
1705 }
1706 let Some(json) = body.get(&col.name) else {
1707 if col.auto_now {
1708 let now_value = crate::orm::write::now_for_column(col.ty);
1709 q.value(Alias::new(&col.name), now_value);
1710 any = true;
1711 }
1712 continue;
1713 };
1714 validate_numeric_bounds(col, json)?;
1715 if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
1716 if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
1717 return Err(WriteError::Validator {
1718 field: col.name.clone(),
1719 message: e.to_string(),
1720 });
1721 }
1722 }
1723 let sea_value = crate::orm::write::json_to_sea_value(
1724 col.ty,
1725 json,
1726 col.nullable,
1727 &col.name,
1728 fk_target_pk_sql_type(col),
1729 )?;
1730 q.value(Alias::new(&col.name), sea_value);
1731 any = true;
1732 }
1733 let touches_m2m = self
1734 .meta
1735 .m2m_relations
1736 .iter()
1737 .any(|r| body.contains_key(&r.field_name));
1738 if !any && !touches_m2m {
1739 return Ok(0);
1740 }
1741 let where_clauses = self.effective_where_clauses();
1742 for cond in &where_clauses {
1743 q.cond_where(cond.clone());
1744 }
1745
1746 let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
1750 Some(pk_col) => {
1751 collect_parent_pks_in_tx(self.meta, pk_col, &where_clauses, tx).await?
1752 }
1753 None => Vec::new(),
1754 };
1755
1756 if any {
1757 match tx.backend_name() {
1758 "sqlite" => {
1759 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1760 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1761 sqlx::query_with(&sql, values)
1762 .execute(&mut **inner)
1763 .await
1764 .map_err(|e| classify_or_sqlx(e, body))?;
1765 }
1766 _ => {
1767 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1768 let inner = tx.as_pg_mut().expect("postgres backend_name");
1769 sqlx::query_with(&sql, values)
1770 .execute(&mut **inner)
1771 .await
1772 .map_err(|e| classify_or_sqlx(e, body))?;
1773 }
1774 }
1775 }
1776 for pk in &parent_pks {
1777 write_m2m_junctions_in_tx(self.meta, Some(pk), body, tx).await?;
1778 }
1779 Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
1780 }
1781
1782 pub async fn delete_in_tx(self, tx: &mut crate::db::Transaction) -> Result<u64, DynError> {
1790 let soft = self.meta.soft_delete && !self.hard_delete;
1791 let where_clauses = if soft {
1792 self.live_where_clauses()
1793 } else {
1794 self.effective_where_clauses()
1795 };
1796
1797 let table = crate::db::router::schema_qualified_table(&self.meta.table);
1801 let build = |is_sqlite: bool| {
1802 if soft {
1803 let mut u = Query::update();
1804 u.table(table.clone());
1805 u.value(
1806 Alias::new("deleted_at"),
1807 sea_query::Value::ChronoDateTimeUtc(Some(Box::new(chrono::Utc::now()))),
1808 );
1809 for cond in &where_clauses {
1810 u.cond_where(cond.clone());
1811 }
1812 if is_sqlite {
1813 u.build_sqlx(SqliteQueryBuilder)
1814 } else {
1815 u.build_sqlx(PostgresQueryBuilder)
1816 }
1817 } else {
1818 let mut d = Query::delete();
1819 d.from_table(table.clone());
1820 for cond in &where_clauses {
1821 d.cond_where(cond.clone());
1822 }
1823 if is_sqlite {
1824 d.build_sqlx(SqliteQueryBuilder)
1825 } else {
1826 d.build_sqlx(PostgresQueryBuilder)
1827 }
1828 }
1829 };
1830
1831 let rows_affected = match tx.backend_name() {
1832 "sqlite" => {
1833 let (sql, values) = build(true);
1834 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
1835 sqlx::query_with(&sql, values)
1836 .execute(&mut **inner)
1837 .await?
1838 .rows_affected()
1839 }
1840 _ => {
1841 let (sql, values) = build(false);
1842 let inner = tx.as_pg_mut().expect("postgres backend_name");
1843 sqlx::query_with(&sql, values)
1844 .execute(&mut **inner)
1845 .await?
1846 .rows_affected()
1847 }
1848 };
1849 Ok(rows_affected)
1850 }
1851
1852 pub async fn update_json(
1856 self,
1857 body: &serde_json::Map<String, serde_json::Value>,
1858 ) -> Result<u64, crate::orm::write::WriteError> {
1859 use crate::orm::write::WriteError;
1860
1861 let needs_owned = self
1868 .meta
1869 .fields
1870 .iter()
1871 .any(|c| c.noform || c.slug_from.is_some());
1872 let mut body_owned: serde_json::Map<String, serde_json::Value>;
1873 let body: &serde_json::Map<String, serde_json::Value> = if needs_owned {
1874 body_owned = body.clone();
1875 for col in &self.meta.fields {
1876 if col.noform {
1877 body_owned.remove(&col.name);
1878 }
1879 }
1880 crate::orm::write::apply_slug_from(&self.meta.fields, &mut body_owned, true);
1881 &body_owned
1882 } else {
1883 body
1884 };
1885
1886 let validation_errors = crate::orm::validation::validate_on_update(&self.meta, body).await;
1892 if !validation_errors.is_empty() {
1893 return Err(WriteError::Multiple {
1894 errors: validation_errors,
1895 });
1896 }
1897
1898 let mut q = Query::update();
1899 q.table(crate::db::router::schema_qualified_table(&self.meta.table));
1900 let mut any = false;
1901 for col in &self.meta.fields {
1902 if col.primary_key {
1903 continue;
1904 }
1905 let Some(json) = body.get(&col.name) else {
1906 if col.auto_now {
1911 let now_value = crate::orm::write::now_for_column(col.ty);
1912 q.value(Alias::new(&col.name), now_value);
1913 any = true;
1914 }
1915 continue;
1916 };
1917 validate_numeric_bounds(col, json)?;
1918 if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
1921 if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
1922 return Err(WriteError::Validator {
1923 field: col.name.clone(),
1924 message: e.to_string(),
1925 });
1926 }
1927 }
1928 let sea_value = crate::orm::write::json_to_sea_value(
1929 col.ty,
1930 json,
1931 col.nullable,
1932 &col.name,
1933 fk_target_pk_sql_type(col),
1934 )?;
1935 q.value(Alias::new(&col.name), sea_value);
1936 any = true;
1937 }
1938 let touches_m2m = self
1943 .meta
1944 .m2m_relations
1945 .iter()
1946 .any(|r| body.contains_key(&r.field_name));
1947 if !any && !touches_m2m {
1948 return Ok(0);
1949 }
1950 let where_clauses = self.effective_where_clauses();
1951 for cond in &where_clauses {
1952 q.cond_where(cond.clone());
1953 }
1954 let parent_pks: Vec<serde_json::Value> = match self.meta.pk_column() {
1966 Some(pk_col) => collect_parent_pks(&self.meta, pk_col, &self.where_clauses).await?,
1967 None => Vec::new(),
1968 };
1969
1970 match resolve_pool_dyn(self.meta, crate::db::RouteOp::Write) {
1971 DbPool::Sqlite(pool) => {
1972 if any {
1973 let (sql, values) = q.build_sqlx(SqliteQueryBuilder);
1974 sqlx::query_with(&sql, values)
1975 .execute(&pool)
1976 .await
1977 .map_err(|e| classify_or_sqlx(e, body))?;
1978 }
1979 for pk in &parent_pks {
1980 write_m2m_junctions(&self.meta, Some(pk), body).await?;
1981 }
1982 crate::signals::emit_bulk_post_save_by_table(
1988 &self.meta.table,
1989 parent_pks.clone(),
1990 false,
1991 )
1992 .await;
1993 Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
1994 }
1995 DbPool::Postgres(pool) => {
1996 if any {
1997 let (sql, values) = q.build_sqlx(PostgresQueryBuilder);
1998 sqlx::query_with(&sql, values)
1999 .execute(&pool)
2000 .await
2001 .map_err(|e| classify_or_sqlx(e, body))?;
2002 }
2003 for pk in &parent_pks {
2004 write_m2m_junctions(&self.meta, Some(pk), body).await?;
2005 }
2006 crate::signals::emit_bulk_post_save_by_table(
2007 &self.meta.table,
2008 parent_pks.clone(),
2009 false,
2010 )
2011 .await;
2012 Ok(parent_pks.len().max(if any { 1 } else { 0 }) as u64)
2013 }
2014 }
2015 }
2016}
2017
2018pub fn decode_to_string(
2024 row: &sqlx::sqlite::SqliteRow,
2025 col: &Column,
2026) -> Result<String, sqlx::Error> {
2027 use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2028 use serde_json::Value;
2029 use uuid::Uuid;
2030
2031 let name = col.name.as_str();
2032 if col.nullable {
2033 return Ok(match col.ty {
2034 SqlType::SmallInt | SqlType::Integer => row
2035 .try_get::<Option<i32>, _>(name)?
2036 .map_or(String::new(), |v| v.to_string()),
2037 SqlType::BigInt => row
2038 .try_get::<Option<i64>, _>(name)?
2039 .map_or(String::new(), |v| v.to_string()),
2040 SqlType::Real => row
2041 .try_get::<Option<f32>, _>(name)?
2042 .map_or(String::new(), |v| v.to_string()),
2043 SqlType::Double => row
2044 .try_get::<Option<f64>, _>(name)?
2045 .map_or(String::new(), |v| v.to_string()),
2046 SqlType::Boolean => row
2047 .try_get::<Option<bool>, _>(name)?
2048 .map_or(String::new(), |v| {
2049 if v { "true" } else { "false" }.to_string()
2050 }),
2051 SqlType::Text => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2052 SqlType::Date => row
2053 .try_get::<Option<NaiveDate>, _>(name)?
2054 .map_or(String::new(), |v| v.to_string()),
2055 SqlType::Time => row
2056 .try_get::<Option<NaiveTime>, _>(name)?
2057 .map_or(String::new(), |v| v.to_string()),
2058 SqlType::Timestamptz => row
2059 .try_get::<Option<DateTime<Utc>>, _>(name)?
2060 .map_or(String::new(), |v| v.to_rfc3339()),
2061 SqlType::Uuid => row
2062 .try_get::<Option<Uuid>, _>(name)?
2063 .map_or(String::new(), |v| v.to_string()),
2064 SqlType::Json => row
2065 .try_get::<Option<Value>, _>(name)?
2066 .map_or(String::new(), |v| v.to_string()),
2067 SqlType::Array(_) => panic_array_unsupported(&col.name),
2068 SqlType::Inet
2069 | SqlType::Cidr
2070 | SqlType::MacAddr
2071 | SqlType::Xml
2072 | SqlType::Ltree
2073 | SqlType::Bit
2074 | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2075 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2079 Some(SqlType::Text) => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2080 Some(SqlType::Uuid) => row
2081 .try_get::<Option<Uuid>, _>(name)?
2082 .map_or(String::new(), |v| v.to_string()),
2083 _ => row
2084 .try_get::<Option<i64>, _>(name)?
2085 .map_or(String::new(), |v| v.to_string()),
2086 },
2087 SqlType::Bytes => row
2088 .try_get::<Option<Vec<u8>>, _>(name)?
2089 .map_or(String::new(), |b| hex_encode(&b)),
2090 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2091 });
2092 }
2093 Ok(match col.ty {
2094 SqlType::SmallInt | SqlType::Integer => row.try_get::<i32, _>(name)?.to_string(),
2095 SqlType::BigInt => row.try_get::<i64, _>(name)?.to_string(),
2096 SqlType::Real => row.try_get::<f32, _>(name)?.to_string(),
2097 SqlType::Double => row.try_get::<f64, _>(name)?.to_string(),
2098 SqlType::Boolean => if row.try_get::<bool, _>(name)? {
2099 "true"
2100 } else {
2101 "false"
2102 }
2103 .to_string(),
2104 SqlType::Text => row.try_get::<String, _>(name)?,
2105 SqlType::Date => row.try_get::<NaiveDate, _>(name)?.to_string(),
2106 SqlType::Time => row.try_get::<NaiveTime, _>(name)?.to_string(),
2107 SqlType::Timestamptz => row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339(),
2108 SqlType::Uuid => row.try_get::<Uuid, _>(name)?.to_string(),
2109 SqlType::Json => row.try_get::<Value, _>(name)?.to_string(),
2110 SqlType::Array(_) => panic_array_unsupported(&col.name),
2111 SqlType::Inet
2112 | SqlType::Cidr
2113 | SqlType::MacAddr
2114 | SqlType::Xml
2115 | SqlType::Ltree
2116 | SqlType::Bit
2117 | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2118 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2119 Some(SqlType::Text) => row.try_get::<String, _>(name)?,
2120 Some(SqlType::Uuid) => row.try_get::<Uuid, _>(name)?.to_string(),
2121 _ => row.try_get::<i64, _>(name)?.to_string(),
2122 },
2123 SqlType::Bytes => hex_encode(&row.try_get::<Vec<u8>, _>(name)?),
2124 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2125 })
2126}
2127
2128pub fn decode_pg_to_string(
2139 row: &sqlx::postgres::PgRow,
2140 col: &Column,
2141) -> Result<String, sqlx::Error> {
2142 use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2143 use serde_json::Value;
2144 use uuid::Uuid;
2145
2146 let name = col.name.as_str();
2147 if col.nullable {
2148 return Ok(match col.ty {
2149 SqlType::SmallInt => row
2150 .try_get::<Option<i16>, _>(name)?
2151 .map_or(String::new(), |v| v.to_string()),
2152 SqlType::Integer => row
2153 .try_get::<Option<i32>, _>(name)?
2154 .map_or(String::new(), |v| v.to_string()),
2155 SqlType::BigInt => row
2156 .try_get::<Option<i64>, _>(name)?
2157 .map_or(String::new(), |v| v.to_string()),
2158 SqlType::Real => row
2159 .try_get::<Option<f32>, _>(name)?
2160 .map_or(String::new(), |v| v.to_string()),
2161 SqlType::Double => row
2162 .try_get::<Option<f64>, _>(name)?
2163 .map_or(String::new(), |v| v.to_string()),
2164 SqlType::Boolean => row
2165 .try_get::<Option<bool>, _>(name)?
2166 .map_or(String::new(), |v| {
2167 if v { "true" } else { "false" }.to_string()
2168 }),
2169 SqlType::Text => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2170 SqlType::Date => row
2171 .try_get::<Option<NaiveDate>, _>(name)?
2172 .map_or(String::new(), |v| v.to_string()),
2173 SqlType::Time => row
2174 .try_get::<Option<NaiveTime>, _>(name)?
2175 .map_or(String::new(), |v| v.to_string()),
2176 SqlType::Timestamptz => row
2177 .try_get::<Option<DateTime<Utc>>, _>(name)?
2178 .map_or(String::new(), |v| v.to_rfc3339()),
2179 SqlType::Uuid => row
2180 .try_get::<Option<Uuid>, _>(name)?
2181 .map_or(String::new(), |v| v.to_string()),
2182 SqlType::Json => row
2183 .try_get::<Option<Value>, _>(name)?
2184 .map_or(String::new(), |v| v.to_string()),
2185 SqlType::Array(_)
2191 | SqlType::Inet
2192 | SqlType::Cidr
2193 | SqlType::MacAddr
2194 | SqlType::Xml
2195 | SqlType::Ltree
2196 | SqlType::Bit
2197 | SqlType::FullText => row
2198 .try_get::<Option<String>, _>(name)
2199 .ok()
2200 .flatten()
2201 .unwrap_or_default(),
2202 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2205 Some(SqlType::Text) => row.try_get::<Option<String>, _>(name)?.unwrap_or_default(),
2206 Some(SqlType::Uuid) => row
2207 .try_get::<Option<Uuid>, _>(name)?
2208 .map_or(String::new(), |v| v.to_string()),
2209 _ => row
2210 .try_get::<Option<i64>, _>(name)?
2211 .map_or(String::new(), |v| v.to_string()),
2212 },
2213 SqlType::Bytes => row
2214 .try_get::<Option<Vec<u8>>, _>(name)?
2215 .map_or(String::new(), |b| hex_encode(&b)),
2216 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2217 });
2218 }
2219 Ok(match col.ty {
2220 SqlType::SmallInt => row.try_get::<i16, _>(name)?.to_string(),
2221 SqlType::Integer => row.try_get::<i32, _>(name)?.to_string(),
2222 SqlType::BigInt => row.try_get::<i64, _>(name)?.to_string(),
2223 SqlType::Real => row.try_get::<f32, _>(name)?.to_string(),
2224 SqlType::Double => row.try_get::<f64, _>(name)?.to_string(),
2225 SqlType::Boolean => if row.try_get::<bool, _>(name)? {
2226 "true"
2227 } else {
2228 "false"
2229 }
2230 .to_string(),
2231 SqlType::Text => row.try_get::<String, _>(name)?,
2232 SqlType::Date => row.try_get::<NaiveDate, _>(name)?.to_string(),
2233 SqlType::Time => row.try_get::<NaiveTime, _>(name)?.to_string(),
2234 SqlType::Timestamptz => row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339(),
2235 SqlType::Uuid => row.try_get::<Uuid, _>(name)?.to_string(),
2236 SqlType::Json => row.try_get::<Value, _>(name)?.to_string(),
2237 SqlType::Array(_)
2239 | SqlType::Inet
2240 | SqlType::Cidr
2241 | SqlType::MacAddr
2242 | SqlType::Xml
2243 | SqlType::Ltree
2244 | SqlType::Bit
2245 | SqlType::FullText => row.try_get::<String, _>(name).unwrap_or_default(),
2246 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2247 Some(SqlType::Text) => row.try_get::<String, _>(name)?,
2248 Some(SqlType::Uuid) => row.try_get::<Uuid, _>(name)?.to_string(),
2249 _ => row.try_get::<i64, _>(name)?.to_string(),
2250 },
2251 SqlType::Bytes => hex_encode(&row.try_get::<Vec<u8>, _>(name)?),
2252 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2253 })
2254}
2255
2256pub fn decode_to_json_aliased(
2269 row: &sqlx::sqlite::SqliteRow,
2270 col: &Column,
2271 alias: &str,
2272) -> Result<serde_json::Value, sqlx::Error> {
2273 let mut aliased = col.clone();
2274 aliased.name = alias.to_string();
2275 decode_to_json(row, &aliased)
2276}
2277
2278pub fn decode_pg_to_json_aliased(
2280 row: &sqlx::postgres::PgRow,
2281 col: &Column,
2282 alias: &str,
2283) -> Result<serde_json::Value, sqlx::Error> {
2284 let mut aliased = col.clone();
2285 aliased.name = alias.to_string();
2286 decode_pg_to_json(row, &aliased)
2287}
2288
2289fn fk_target_pk_sql_type(col: &Column) -> Option<SqlType> {
2313 if !matches!(col.ty, SqlType::ForeignKey) {
2314 return None;
2315 }
2316 let target_table = col.fk_target.as_deref()?;
2317 crate::migrate::pk_meta_for_table(target_table).map(|(_, ty)| ty)
2318}
2319
2320pub fn decode_to_json(
2321 row: &sqlx::sqlite::SqliteRow,
2322 col: &Column,
2323) -> Result<serde_json::Value, sqlx::Error> {
2324 use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2325 use serde_json::Value;
2326 use uuid::Uuid;
2327
2328 let name = col.name.as_str();
2329 if col.nullable {
2330 return Ok(match col.ty {
2331 SqlType::SmallInt | SqlType::Integer => row
2332 .try_get::<Option<i32>, _>(name)?
2333 .map_or(Value::Null, Value::from),
2334 SqlType::BigInt => row
2335 .try_get::<Option<i64>, _>(name)?
2336 .map_or(Value::Null, Value::from),
2337 SqlType::Real => row
2338 .try_get::<Option<f32>, _>(name)?
2339 .map_or(Value::Null, |v| Value::from(v as f64)),
2340 SqlType::Double => row
2341 .try_get::<Option<f64>, _>(name)?
2342 .map_or(Value::Null, Value::from),
2343 SqlType::Boolean => row
2344 .try_get::<Option<bool>, _>(name)?
2345 .map_or(Value::Null, Value::from),
2346 SqlType::Text => row
2347 .try_get::<Option<String>, _>(name)?
2348 .map_or(Value::Null, Value::from),
2349 SqlType::Date => row
2350 .try_get::<Option<NaiveDate>, _>(name)?
2351 .map_or(Value::Null, |v| Value::from(v.to_string())),
2352 SqlType::Time => row
2353 .try_get::<Option<NaiveTime>, _>(name)?
2354 .map_or(Value::Null, |v| Value::from(v.to_string())),
2355 SqlType::Timestamptz => row
2356 .try_get::<Option<DateTime<Utc>>, _>(name)?
2357 .map_or(Value::Null, |v| Value::from(v.to_rfc3339())),
2358 SqlType::Uuid => row
2359 .try_get::<Option<Uuid>, _>(name)?
2360 .map_or(Value::Null, |v| Value::from(v.to_string())),
2361 SqlType::Json => row
2362 .try_get::<Option<Value>, _>(name)?
2363 .unwrap_or(Value::Null),
2364 SqlType::Array(_) => panic_array_unsupported(&col.name),
2365 SqlType::Inet
2366 | SqlType::Cidr
2367 | SqlType::MacAddr
2368 | SqlType::Xml
2369 | SqlType::Ltree
2370 | SqlType::Bit
2371 | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2372 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2377 Some(SqlType::Text) => row
2378 .try_get::<Option<String>, _>(name)?
2379 .map_or(Value::Null, Value::from),
2380 Some(SqlType::Uuid) => row
2381 .try_get::<Option<Uuid>, _>(name)?
2382 .map_or(Value::Null, |v| Value::from(v.to_string())),
2383 _ => row
2384 .try_get::<Option<i64>, _>(name)?
2385 .map_or(Value::Null, Value::from),
2386 },
2387 SqlType::Bytes => row
2388 .try_get::<Option<Vec<u8>>, _>(name)?
2389 .map_or(Value::Null, |b| bytes_to_json(&b)),
2390 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2391 });
2392 }
2393 Ok(match col.ty {
2394 SqlType::SmallInt | SqlType::Integer => Value::from(row.try_get::<i32, _>(name)?),
2395 SqlType::BigInt => Value::from(row.try_get::<i64, _>(name)?),
2396 SqlType::Real => Value::from(row.try_get::<f32, _>(name)? as f64),
2397 SqlType::Double => Value::from(row.try_get::<f64, _>(name)?),
2398 SqlType::Boolean => Value::from(row.try_get::<bool, _>(name)?),
2399 SqlType::Text => Value::from(row.try_get::<String, _>(name)?),
2400 SqlType::Date => Value::from(row.try_get::<NaiveDate, _>(name)?.to_string()),
2401 SqlType::Time => Value::from(row.try_get::<NaiveTime, _>(name)?.to_string()),
2402 SqlType::Timestamptz => Value::from(row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339()),
2403 SqlType::Uuid => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2404 SqlType::Json => row.try_get::<Value, _>(name)?,
2405 SqlType::Array(_) => panic_array_unsupported(&col.name),
2406 SqlType::Inet
2407 | SqlType::Cidr
2408 | SqlType::MacAddr
2409 | SqlType::Xml
2410 | SqlType::Ltree
2411 | SqlType::Bit
2412 | SqlType::FullText => panic_pg_only_unsupported(&col.name),
2413 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2416 Some(SqlType::Text) => Value::from(row.try_get::<String, _>(name)?),
2417 Some(SqlType::Uuid) => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2418 _ => Value::from(row.try_get::<i64, _>(name)?),
2419 },
2420 SqlType::Bytes => bytes_to_json(&row.try_get::<Vec<u8>, _>(name)?),
2421 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2422 })
2423}
2424
2425pub fn decode_pg_to_json(
2429 row: &sqlx::postgres::PgRow,
2430 col: &Column,
2431) -> Result<serde_json::Value, sqlx::Error> {
2432 use chrono::{DateTime, NaiveDate, NaiveTime, Utc};
2433 use serde_json::Value;
2434 use uuid::Uuid;
2435
2436 let name = col.name.as_str();
2437 if col.nullable {
2438 return Ok(match col.ty {
2439 SqlType::SmallInt => row
2440 .try_get::<Option<i16>, _>(name)?
2441 .map_or(Value::Null, Value::from),
2442 SqlType::Integer => row
2443 .try_get::<Option<i32>, _>(name)?
2444 .map_or(Value::Null, Value::from),
2445 SqlType::BigInt => row
2446 .try_get::<Option<i64>, _>(name)?
2447 .map_or(Value::Null, Value::from),
2448 SqlType::Real => row
2449 .try_get::<Option<f32>, _>(name)?
2450 .map_or(Value::Null, |v| Value::from(v as f64)),
2451 SqlType::Double => row
2452 .try_get::<Option<f64>, _>(name)?
2453 .map_or(Value::Null, Value::from),
2454 SqlType::Boolean => row
2455 .try_get::<Option<bool>, _>(name)?
2456 .map_or(Value::Null, Value::from),
2457 SqlType::Text => row
2458 .try_get::<Option<String>, _>(name)?
2459 .map_or(Value::Null, Value::from),
2460 SqlType::Date => row
2461 .try_get::<Option<NaiveDate>, _>(name)?
2462 .map_or(Value::Null, |v| Value::from(v.to_string())),
2463 SqlType::Time => row
2464 .try_get::<Option<NaiveTime>, _>(name)?
2465 .map_or(Value::Null, |v| Value::from(v.to_string())),
2466 SqlType::Timestamptz => row
2467 .try_get::<Option<DateTime<Utc>>, _>(name)?
2468 .map_or(Value::Null, |v| Value::from(v.to_rfc3339())),
2469 SqlType::Uuid => row
2470 .try_get::<Option<Uuid>, _>(name)?
2471 .map_or(Value::Null, |v| Value::from(v.to_string())),
2472 SqlType::Json => row
2473 .try_get::<Option<Value>, _>(name)?
2474 .unwrap_or(Value::Null),
2475 SqlType::Array(_)
2476 | SqlType::Inet
2477 | SqlType::Cidr
2478 | SqlType::MacAddr
2479 | SqlType::Xml
2480 | SqlType::Ltree
2481 | SqlType::Bit
2482 | SqlType::FullText => row
2483 .try_get::<Option<String>, _>(name)
2484 .ok()
2485 .flatten()
2486 .map_or(Value::Null, Value::from),
2487 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2490 Some(SqlType::Text) => row
2491 .try_get::<Option<String>, _>(name)?
2492 .map_or(Value::Null, Value::from),
2493 Some(SqlType::Uuid) => row
2494 .try_get::<Option<Uuid>, _>(name)?
2495 .map_or(Value::Null, |v| Value::from(v.to_string())),
2496 _ => row
2497 .try_get::<Option<i64>, _>(name)?
2498 .map_or(Value::Null, Value::from),
2499 },
2500 SqlType::Bytes => row
2501 .try_get::<Option<Vec<u8>>, _>(name)?
2502 .map_or(Value::Null, |b| bytes_to_json(&b)),
2503 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2504 });
2505 }
2506 Ok(match col.ty {
2507 SqlType::SmallInt => Value::from(row.try_get::<i16, _>(name)?),
2508 SqlType::Integer => Value::from(row.try_get::<i32, _>(name)?),
2509 SqlType::BigInt => Value::from(row.try_get::<i64, _>(name)?),
2510 SqlType::Real => Value::from(row.try_get::<f32, _>(name)? as f64),
2511 SqlType::Double => Value::from(row.try_get::<f64, _>(name)?),
2512 SqlType::Boolean => Value::from(row.try_get::<bool, _>(name)?),
2513 SqlType::Text => Value::from(row.try_get::<String, _>(name)?),
2514 SqlType::Date => Value::from(row.try_get::<NaiveDate, _>(name)?.to_string()),
2515 SqlType::Time => Value::from(row.try_get::<NaiveTime, _>(name)?.to_string()),
2516 SqlType::Timestamptz => Value::from(row.try_get::<DateTime<Utc>, _>(name)?.to_rfc3339()),
2517 SqlType::Uuid => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2518 SqlType::Json => row.try_get::<Value, _>(name)?,
2519 SqlType::Array(_)
2520 | SqlType::Inet
2521 | SqlType::Cidr
2522 | SqlType::MacAddr
2523 | SqlType::Xml
2524 | SqlType::Ltree
2525 | SqlType::Bit
2526 | SqlType::FullText => row
2527 .try_get::<String, _>(name)
2528 .map(Value::from)
2529 .unwrap_or(Value::Null),
2530 SqlType::ForeignKey => match fk_target_pk_sql_type(col) {
2533 Some(SqlType::Text) => Value::from(row.try_get::<String, _>(name)?),
2534 Some(SqlType::Uuid) => Value::from(row.try_get::<Uuid, _>(name)?.to_string()),
2535 _ => Value::from(row.try_get::<i64, _>(name)?),
2536 },
2537 SqlType::Bytes => bytes_to_json(&row.try_get::<Vec<u8>, _>(name)?),
2538 SqlType::Decimal => panic_pg_only_unsupported(&col.name),
2539 })
2540}
2541
2542fn form_str_to_sea_value(col: &Column, raw: &str) -> Result<SeaValue, WriteError> {
2550 if raw.is_empty() {
2551 if col.ty == SqlType::Boolean {
2552 return Ok(SeaValue::Bool(Some(false)));
2554 }
2555 if col.nullable {
2556 return Ok(null_for(col.ty));
2557 }
2558 return Err(WriteError::RequiredFieldMissing {
2559 field: col.name.clone(),
2560 });
2561 }
2562 if matches!(col.ty, SqlType::Json | SqlType::Array(_)) {
2574 let parsed: serde_json::Value =
2575 serde_json::from_str(raw).map_err(|e| WriteError::Validator {
2576 field: col.name.clone(),
2577 message: format!("Not valid JSON: {e}"),
2578 })?;
2579 return json_to_sea_value(col.ty, &parsed, col.nullable, &col.name, None);
2580 }
2581 if matches!(col.ty, SqlType::ForeignKey) {
2582 return match fk_target_pk_sql_type(col) {
2583 Some(SqlType::Text) => Ok(SeaValue::String(Some(Box::new(raw.to_string())))),
2584 Some(SqlType::Uuid) => uuid::Uuid::parse_str(raw)
2585 .map(|v| SeaValue::Uuid(Some(Box::new(v))))
2586 .map_err(|_| WriteError::TypeMismatch {
2587 field: col.name.clone(),
2588 expected: SqlType::Uuid,
2589 got: raw.to_string(),
2590 }),
2591 _ => raw
2592 .parse::<i64>()
2593 .map(|v| SeaValue::BigInt(Some(v)))
2594 .map_err(|_| WriteError::TypeMismatch {
2595 field: col.name.clone(),
2596 expected: SqlType::BigInt,
2597 got: raw.to_string(),
2598 }),
2599 };
2600 }
2601 let json = serde_json::Value::String(raw.to_string());
2602 json_to_sea_value(col.ty, &json, col.nullable, &col.name, None)
2603}
2604
/// Encodes `bytes` as a lowercase hexadecimal string, two digits per byte.
///
/// An empty slice yields an empty string. Digits come from a lookup table so
/// no temporary `String` is allocated per byte.
fn hex_encode(bytes: &[u8]) -> String {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    let mut out = String::with_capacity(bytes.len() * 2);
    for &b in bytes {
        out.push(char::from(HEX_DIGITS[usize::from(b >> 4)]));
        out.push(char::from(HEX_DIGITS[usize::from(b & 0x0f)]));
    }
    out
}
2615
2616fn bytes_to_json(bytes: &[u8]) -> serde_json::Value {
2619 serde_json::Value::Array(bytes.iter().map(|b| serde_json::Value::from(*b)).collect())
2620}
2621
/// Aborts with a diagnostic naming `column` as a Postgres-only Array column.
///
/// # Panics
///
/// Always panics; the function never returns.
fn panic_array_unsupported(column: &str) -> ! {
    let message = format!(
        "DynQuerySet: column `{column}` is a Postgres-only Array; \
         the field/backend system check should have failed boot."
    );
    panic!("{}", message)
}
2628
/// Aborts with a diagnostic naming `column` as a Postgres-only column type.
///
/// NOTE(review): the message text speaks of network types
/// (Inet/Cidr/MacAddr), yet at least one caller in this file passes a
/// `Decimal` column; confirm whether the wording should be broadened.
///
/// # Panics
///
/// Always panics; the function never returns.
fn panic_pg_only_unsupported(column: &str) -> ! {
    let message = format!(
        "DynQuerySet: column `{column}` is a Postgres-only network type (Inet/Cidr/MacAddr); \
         the field/backend system check should have failed boot."
    );
    panic!("{}", message)
}
2636
2637fn classify_or_sqlx(
2643 e: sqlx::Error,
2644 body: &serde_json::Map<String, serde_json::Value>,
2645) -> crate::orm::write::WriteError {
2646 if let Some(classified) = crate::orm::validation::classify_sql_error(&e, body) {
2647 return classified;
2648 }
2649 crate::orm::write::WriteError::Sqlx(e)
2650}
2651
2652fn validate_numeric_bounds(
2653 col: &Column,
2654 json: &serde_json::Value,
2655) -> Result<(), crate::orm::write::WriteError> {
2656 let Some(n) = json.as_f64() else {
2657 return Ok(());
2658 };
2659 if let Some(min) = col.min {
2660 if n < min as f64 {
2661 return Err(crate::orm::write::WriteError::Validator {
2662 field: col.name.clone(),
2663 message: format!("must be >= {min} (got {n})."),
2664 });
2665 }
2666 }
2667 if let Some(max) = col.max {
2668 if n > max as f64 {
2669 return Err(crate::orm::write::WriteError::Validator {
2670 field: col.name.clone(),
2671 message: format!("must be <= {max} (got {n})."),
2672 });
2673 }
2674 }
2675 Ok(())
2676}
2677
2678fn json_pk_to_sea(v: &serde_json::Value) -> Option<sea_query::Value> {
2684 match v {
2685 serde_json::Value::Number(n) => n.as_i64().map(|i| sea_query::Value::BigInt(Some(i))),
2686 serde_json::Value::String(s) => Some(sea_query::Value::String(Some(Box::new(s.clone())))),
2687 _ => None,
2688 }
2689}
2690
/// Rewrites a `select_related` token from double-underscore form
/// (`a__b`) to dotted form (`a.b`).
///
/// Separators are matched left to right without overlap, so `a___b`
/// becomes `a._b`.
fn normalize_sr_token(name: &str) -> String {
    let segments: Vec<&str> = name.split("__").collect();
    segments.join(".")
}
2710
2711fn validate_sr_chain(root_meta: &crate::migrate::ModelMeta, chain: &str) -> Option<Vec<String>> {
2721 let hops: Vec<&str> = chain.split('.').filter(|s| !s.is_empty()).collect();
2722 if hops.is_empty() {
2723 return None;
2724 }
2725 let registered = crate::migrate::registered_models();
2726 let mut targets: Vec<String> = Vec::with_capacity(hops.len());
2727 let mut current_table: String = root_meta.table.clone();
2728 let mut current_meta: Option<crate::migrate::ModelMeta> = None;
2729 for hop in &hops {
2730 let meta_ref: &crate::migrate::ModelMeta =
2731 if current_table == root_meta.table && current_meta.is_none() {
2732 root_meta
2733 } else {
2734 current_meta = registered
2735 .iter()
2736 .find(|m| m.table == current_table)
2737 .cloned();
2738 current_meta.as_ref()?
2739 };
2740 let col = meta_ref.fields.iter().find(|c| &c.name == hop)?;
2741 let target = col.fk_target.clone()?;
2742 targets.push(target.clone());
2743 current_table = target;
2744 }
2745 Some(targets)
2746}
2747
2748async fn hydrate_select_related_into(
2765 meta: &crate::migrate::ModelMeta,
2766 sr_fields: &[String],
2767 rows: &mut [serde_json::Map<String, serde_json::Value>],
2768) -> Result<(), sqlx::Error> {
2769 let pool = resolve_pool_dyn(meta, crate::db::RouteOp::Read);
2770 for chain in sr_fields {
2771 let hops: Vec<&str> = chain.split('.').filter(|s| !s.is_empty()).collect();
2772 if hops.is_empty() {
2773 continue;
2774 }
2775 let Some(targets) = validate_sr_chain(meta, chain) else {
2776 continue;
2781 };
2782
2783 let registered = crate::migrate::registered_models();
2792 let hop_target_pk: Vec<(String, SqlType)> = targets
2793 .iter()
2794 .filter_map(|t| {
2795 registered
2796 .iter()
2797 .find(|m| &m.table == t)
2798 .and_then(|m| m.pk_column().map(|c| (c.name.clone(), c.ty)))
2799 })
2800 .collect();
2801 if hop_target_pk.len() != hops.len() {
2802 continue;
2806 }
2807 let hop_target_soft_delete: Vec<bool> = targets
2808 .iter()
2809 .map(|t| {
2810 registered
2811 .iter()
2812 .find(|m| &m.table == t)
2813 .is_some_and(|m| m.soft_delete)
2814 })
2815 .collect();
2816
2817 let first_field = hops[0];
2821 let mut ids: Vec<serde_json::Value> = Vec::with_capacity(rows.len());
2822 for row in rows.iter() {
2823 let Some(v) = row.get(first_field) else {
2824 continue;
2825 };
2826 if v.is_null() {
2827 continue;
2828 }
2829 ids.push(v.clone());
2830 }
2831 if ids.is_empty() {
2832 continue;
2833 }
2834 dedup_by_pk_key(&mut ids);
2835 let mut levels: Vec<Vec<serde_json::Value>> = Vec::with_capacity(hops.len());
2836 levels.push(
2837 crate::orm::queryset::hydration::fetch_related_as_json_by_pk(
2838 &targets[0],
2839 &hop_target_pk[0].0,
2840 hop_target_pk[0].1,
2841 hop_target_soft_delete[0],
2842 &ids,
2843 &pool,
2844 )
2845 .await?,
2846 );
2847
2848 for hop_idx in 1..hops.len() {
2849 let hop_field = hops[hop_idx];
2850 let hop_target = &targets[hop_idx];
2851 let prev_lvl = &levels[hop_idx - 1];
2852 let mut next_ids: Vec<serde_json::Value> = prev_lvl
2853 .iter()
2854 .filter_map(|r| {
2855 let v = r.as_object()?.get(hop_field)?;
2856 if v.is_null() { None } else { Some(v.clone()) }
2857 })
2858 .collect();
2859 if next_ids.is_empty() {
2860 break;
2865 }
2866 dedup_by_pk_key(&mut next_ids);
2867 levels.push(
2868 crate::orm::queryset::hydration::fetch_related_as_json_by_pk(
2869 hop_target,
2870 &hop_target_pk[hop_idx].0,
2871 hop_target_pk[hop_idx].1,
2872 hop_target_soft_delete[hop_idx],
2873 &next_ids,
2874 &pool,
2875 )
2876 .await?,
2877 );
2878 }
2879
2880 if levels.len() > 1 {
2885 for i in (0..levels.len() - 1).rev() {
2886 let next_pk_col = &hop_target_pk[i + 1].0;
2887 let next_by_pk: HashMap<String, serde_json::Value> = levels[i + 1]
2888 .iter()
2889 .filter_map(|obj| {
2890 let map = obj.as_object()?;
2891 let pk_val = map.get(next_pk_col.as_str())?;
2892 Some((pk_json_key(pk_val), obj.clone()))
2893 })
2894 .collect();
2895 let hop_field = hops[i + 1];
2896 for row in levels[i].iter_mut() {
2897 let Some(map) = row.as_object_mut() else {
2898 continue;
2899 };
2900 let Some(fk_val) = map.get(hop_field) else {
2901 continue;
2902 };
2903 if fk_val.is_null() {
2904 continue;
2905 }
2906 let key = pk_json_key(fk_val);
2907 if let Some(next_json) = next_by_pk.get(&key) {
2908 map.insert(hop_field.to_string(), next_json.clone());
2909 }
2910 }
2911 }
2912 }
2913
2914 let first_pk_col = &hop_target_pk[0].0;
2921 let first_by_pk: HashMap<String, serde_json::Value> = levels
2922 .into_iter()
2923 .next()
2924 .unwrap_or_default()
2925 .into_iter()
2926 .filter_map(|obj| {
2927 let map = obj.as_object()?;
2928 let pk_val = map.get(first_pk_col.as_str())?;
2929 Some((pk_json_key(pk_val), obj.clone()))
2930 })
2931 .collect();
2932 for row in rows.iter_mut() {
2933 let Some(fk_val) = row.get(first_field) else {
2934 continue;
2935 };
2936 if fk_val.is_null() {
2937 continue;
2938 }
2939 let key = pk_json_key(fk_val);
2940 if let Some(resolved) = first_by_pk.get(&key) {
2941 row.insert(first_field.to_string(), resolved.clone());
2942 }
2943 }
2944 }
2945 Ok(())
2946}
2947
2948fn dedup_by_pk_key(ids: &mut Vec<serde_json::Value>) {
2953 let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
2954 ids.retain(|v| seen.insert(pk_json_key(v)));
2955}
2956
2957async fn hydrate_m2m_batched(
2971 meta: &crate::migrate::ModelMeta,
2972 pk_name: &str,
2973 rows: &mut [serde_json::Map<String, serde_json::Value>],
2974) -> Result<(), sqlx::Error> {
2975 if meta.m2m_relations.is_empty() || rows.is_empty() {
2976 return Ok(());
2977 }
2978
2979 for row in rows.iter_mut() {
2984 for rel in &meta.m2m_relations {
2985 row.insert(rel.field_name.clone(), serde_json::Value::Array(Vec::new()));
2986 }
2987 }
2988
2989 let mut parent_sea_vals: Vec<sea_query::Value> = Vec::with_capacity(rows.len());
2993 let mut seen_keys: std::collections::HashSet<String> = std::collections::HashSet::new();
2994 for row in rows.iter() {
2995 let Some(pk_json) = row.get(pk_name) else {
2996 continue;
2997 };
2998 let Some(sea_val) = json_pk_to_sea(pk_json) else {
2999 continue;
3000 };
3001 let key = pk_json_key(pk_json);
3002 if seen_keys.insert(key) {
3003 parent_sea_vals.push(sea_val);
3004 }
3005 }
3006 if parent_sea_vals.is_empty() {
3007 return Ok(());
3008 }
3009
3010 for rel in &meta.m2m_relations {
3011 let junction_table = format!("{}_{}", meta.table, rel.field_name);
3012 let mut sel = Query::select();
3013 sel.from(crate::db::router::schema_qualified_table(&junction_table));
3014 sel.column(Alias::new("parent_id"));
3015 sel.column(Alias::new("child_id"));
3016 sel.and_where(Expr::col(Alias::new("parent_id")).is_in(parent_sea_vals.clone()));
3017
3018 let mut children_by_parent: HashMap<String, Vec<serde_json::Value>> = HashMap::new();
3019 match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
3020 DbPool::Sqlite(pool) => {
3021 let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3022 let db_rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3023 for r in &db_rows {
3024 let parent = read_junction_id_sqlite(r, "parent_id")?;
3025 let child = read_junction_id_sqlite(r, "child_id")?;
3026 children_by_parent
3027 .entry(pk_json_key(&parent))
3028 .or_default()
3029 .push(child);
3030 }
3031 }
3032 DbPool::Postgres(pool) => {
3033 let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3034 let db_rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3035 for r in &db_rows {
3036 let parent = read_junction_id_pg(r, "parent_id")?;
3037 let child = read_junction_id_pg(r, "child_id")?;
3038 children_by_parent
3039 .entry(pk_json_key(&parent))
3040 .or_default()
3041 .push(child);
3042 }
3043 }
3044 }
3045
3046 for row in rows.iter_mut() {
3047 let Some(pk_json) = row.get(pk_name) else {
3048 continue;
3049 };
3050 let key = pk_json_key(pk_json);
3051 if let Some(children) = children_by_parent.remove(&key) {
3052 row.insert(rel.field_name.clone(), serde_json::Value::Array(children));
3053 }
3054 }
3055 }
3056 Ok(())
3057}
3058
3059fn pk_json_key(v: &serde_json::Value) -> String {
3065 match v {
3066 serde_json::Value::Number(n) => format!("n:{n}"),
3067 serde_json::Value::String(s) => format!("s:{s}"),
3068 other => format!("o:{other}"),
3069 }
3070}
3071
3072fn read_junction_id_sqlite(
3077 row: &sqlx::sqlite::SqliteRow,
3078 col: &str,
3079) -> Result<serde_json::Value, sqlx::Error> {
3080 if let Ok(i) = row.try_get::<i64, _>(col) {
3081 return Ok(serde_json::Value::Number(i.into()));
3082 }
3083 let s = row.try_get::<String, _>(col)?;
3084 Ok(serde_json::Value::String(s))
3085}
3086
3087fn read_junction_id_pg(
3088 row: &sqlx::postgres::PgRow,
3089 col: &str,
3090) -> Result<serde_json::Value, sqlx::Error> {
3091 if let Ok(i) = row.try_get::<i64, _>(col) {
3092 return Ok(serde_json::Value::Number(i.into()));
3093 }
3094 let s = row.try_get::<String, _>(col)?;
3095 Ok(serde_json::Value::String(s))
3096}
3097
3098async fn hydrate_m2m_into(
3099 meta: &crate::migrate::ModelMeta,
3100 parent_pk_json: Option<&serde_json::Value>,
3101 out: &mut serde_json::Map<String, serde_json::Value>,
3102) -> Result<(), sqlx::Error> {
3103 if meta.m2m_relations.is_empty() {
3104 return Ok(());
3105 }
3106 let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3107 return Ok(());
3108 };
3109 for rel in &meta.m2m_relations {
3110 let junction_table = format!("{}_{}", meta.table, rel.field_name);
3111 let mut sel = Query::select();
3112 sel.from(crate::db::router::schema_qualified_table(&junction_table));
3113 sel.column(Alias::new("child_id"));
3114 sel.and_where(Expr::col(Alias::new("parent_id")).eq(parent_pk_value.clone()));
3115 let children: Vec<serde_json::Value> =
3116 match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
3117 DbPool::Sqlite(pool) => {
3118 let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3119 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3120 rows.iter()
3121 .map(|r| {
3122 r.try_get::<i64, _>("child_id")
3123 .map(|i| serde_json::Value::Number(i.into()))
3124 .or_else(|_| {
3125 r.try_get::<String, _>("child_id")
3126 .map(serde_json::Value::String)
3127 })
3128 })
3129 .collect::<Result<Vec<_>, _>>()?
3130 }
3131 DbPool::Postgres(pool) => {
3132 let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3133 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3134 rows.iter()
3135 .map(|r| {
3136 r.try_get::<i64, _>("child_id")
3137 .map(|i| serde_json::Value::Number(i.into()))
3138 .or_else(|_| {
3139 r.try_get::<String, _>("child_id")
3140 .map(serde_json::Value::String)
3141 })
3142 })
3143 .collect::<Result<Vec<_>, _>>()?
3144 }
3145 };
3146 out.insert(rel.field_name.clone(), serde_json::Value::Array(children));
3147 }
3148 Ok(())
3149}
3150
3151async fn collect_parent_pks(
3158 meta: &crate::migrate::ModelMeta,
3159 pk_col: &crate::migrate::Column,
3160 where_clauses: &[Condition],
3161) -> Result<Vec<serde_json::Value>, crate::orm::write::WriteError> {
3162 let mut sel = Query::select();
3163 sel.from(crate::db::router::schema_qualified_table(&meta.table));
3164 sel.column(Alias::new(&pk_col.name));
3165 for cond in where_clauses {
3166 sel.cond_where(cond.clone());
3167 }
3168 match resolve_pool_dyn(meta, crate::db::RouteOp::Read) {
3169 DbPool::Sqlite(pool) => {
3170 let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3171 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3172 rows.iter()
3173 .map(|row| decode_to_json(row, pk_col))
3174 .collect::<Result<Vec<_>, _>>()
3175 .map_err(crate::orm::write::WriteError::Sqlx)
3176 }
3177 DbPool::Postgres(pool) => {
3178 let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3179 let rows = sqlx::query_with(&sql, values).fetch_all(&pool).await?;
3180 rows.iter()
3181 .map(|row| decode_pg_to_json(row, pk_col))
3182 .collect::<Result<Vec<_>, _>>()
3183 .map_err(crate::orm::write::WriteError::Sqlx)
3184 }
3185 }
3186}
3187
3188async fn collect_parent_pks_in_tx(
3192 meta: &crate::migrate::ModelMeta,
3193 pk_col: &crate::migrate::Column,
3194 where_clauses: &[Condition],
3195 tx: &mut crate::db::Transaction,
3196) -> Result<Vec<serde_json::Value>, crate::orm::write::WriteError> {
3197 let mut sel = Query::select();
3198 sel.from(crate::db::router::schema_qualified_table(&meta.table));
3199 sel.column(Alias::new(&pk_col.name));
3200 for cond in where_clauses {
3201 sel.cond_where(cond.clone());
3202 }
3203 match tx.backend_name() {
3204 "sqlite" => {
3205 let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3206 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
3207 let rows = sqlx::query_with(&sql, values)
3208 .fetch_all(&mut **inner)
3209 .await?;
3210 rows.iter()
3211 .map(|row| decode_to_json(row, pk_col))
3212 .collect::<Result<Vec<_>, _>>()
3213 .map_err(crate::orm::write::WriteError::Sqlx)
3214 }
3215 _ => {
3216 let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3217 let inner = tx.as_pg_mut().expect("postgres backend_name");
3218 let rows = sqlx::query_with(&sql, values)
3219 .fetch_all(&mut **inner)
3220 .await?;
3221 rows.iter()
3222 .map(|row| decode_pg_to_json(row, pk_col))
3223 .collect::<Result<Vec<_>, _>>()
3224 .map_err(crate::orm::write::WriteError::Sqlx)
3225 }
3226 }
3227}
3228
3229fn normalise_insert_body(
3246 meta: &crate::migrate::ModelMeta,
3247 body: &serde_json::Map<String, serde_json::Value>,
3248) -> Option<serde_json::Map<String, serde_json::Value>> {
3249 let needs_owned = meta
3250 .fields
3251 .iter()
3252 .any(|c| c.noform || c.slug_from.is_some());
3253 if !needs_owned {
3254 return None;
3255 }
3256 let mut owned = body.clone();
3257 for col in &meta.fields {
3258 if col.noform {
3259 owned.remove(&col.name);
3260 }
3261 }
3262 crate::orm::write::apply_slug_from(&meta.fields, &mut owned, false);
3263 Some(owned)
3264}
3265
/// Result of `build_insert_plan`: the prepared INSERT statement plus the
/// primary-key metadata of the model it targets.
struct InsertPlan {
    /// The INSERT statement with table, columns and values already set.
    q: sea_query::InsertStatement,
    /// Name of the model's primary-key column.
    pk_name: String,
    /// SQL type of the model's primary-key column.
    pk_ty: SqlType,
}
3272
3273fn build_insert_plan(
3282 meta: &crate::migrate::ModelMeta,
3283 body: &serde_json::Map<String, serde_json::Value>,
3284) -> Result<InsertPlan, crate::orm::write::WriteError> {
3285 use crate::orm::write::{WriteError, is_default_pk};
3286
3287 let mut cols: Vec<&str> = Vec::new();
3288 let mut values: Vec<SeaValue> = Vec::new();
3289 for col in &meta.fields {
3290 if col.primary_key {
3291 let supplied = body.get(&col.name);
3292 let is_sentinel = match supplied {
3293 None | Some(serde_json::Value::Null) => true,
3294 Some(v) => is_default_pk(col.ty, v),
3295 };
3296 if matches!(
3297 col.ty,
3298 SqlType::Integer | SqlType::BigInt | SqlType::SmallInt
3299 ) && is_sentinel
3300 {
3301 continue;
3302 }
3303 }
3304 let Some(json) = body.get(&col.name) else {
3305 if col.auto_now_add || col.auto_now {
3306 let now_value = crate::orm::write::now_for_column(col.ty);
3307 cols.push(&col.name);
3308 values.push(now_value);
3309 continue;
3310 }
3311 continue;
3312 };
3313 if json.is_null() {
3314 continue;
3315 }
3316 validate_numeric_bounds(col, json)?;
3317 if let (Some(fmt), Some(s)) = (col.text_format.as_deref(), json.as_str()) {
3318 if let Err(e) = crate::orm::validators::validate_text_format(fmt, s) {
3319 return Err(WriteError::Validator {
3320 field: col.name.clone(),
3321 message: e.to_string(),
3322 });
3323 }
3324 }
3325 let sea_value = crate::orm::write::json_to_sea_value(
3326 col.ty,
3327 json,
3328 col.nullable,
3329 &col.name,
3330 fk_target_pk_sql_type(col),
3331 )?;
3332 cols.push(&col.name);
3333 values.push(sea_value);
3334 }
3335
3336 let pk_col = meta.fields.iter().find(|c| c.primary_key).ok_or_else(|| {
3337 WriteError::Sqlx(sqlx::Error::Protocol(
3338 "insert_json: model has no PK".to_string(),
3339 ))
3340 })?;
3341 let pk_name = pk_col.name.clone();
3342 let pk_ty = pk_col.ty;
3343
3344 let mut q = Query::insert();
3345 q.into_table(crate::db::router::schema_qualified_table(&meta.table));
3346 q.columns(cols.iter().map(|c| Alias::new(*c)).collect::<Vec<_>>());
3347 let exprs: Vec<sea_query::SimpleExpr> = values.into_iter().map(Into::into).collect();
3348 q.values_panic(exprs);
3349
3350 Ok(InsertPlan { q, pk_name, pk_ty })
3351}
3352
3353async fn write_m2m_junctions_in_tx(
3357 meta: &crate::migrate::ModelMeta,
3358 parent_pk_json: Option<&serde_json::Value>,
3359 body: &serde_json::Map<String, serde_json::Value>,
3360 tx: &mut crate::db::Transaction,
3361) -> Result<(), crate::orm::write::WriteError> {
3362 if meta.m2m_relations.is_empty() {
3363 return Ok(());
3364 }
3365 let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3366 return Ok(());
3367 };
3368 for rel in &meta.m2m_relations {
3369 let Some(value) = body.get(&rel.field_name) else {
3370 continue;
3371 };
3372 let Some(items) = value.as_array() else {
3373 continue;
3374 };
3375 let mut child_ids: Vec<sea_query::Value> = Vec::with_capacity(items.len());
3376 for item in items {
3377 if item.is_null() {
3378 continue;
3379 }
3380 if let Some(v) = json_pk_to_sea(item) {
3381 child_ids.push(v);
3382 }
3383 }
3384 let junction_table = format!("{}_{}", meta.table, rel.field_name);
3385 crate::orm::m2m::set_junction_dynamic_in_tx(
3386 &junction_table,
3387 parent_pk_value.clone(),
3388 child_ids,
3389 tx,
3390 )
3391 .await
3392 .map_err(crate::orm::write::WriteError::Sqlx)?;
3393 }
3394 Ok(())
3395}
3396
3397async fn hydrate_m2m_into_tx(
3402 meta: &crate::migrate::ModelMeta,
3403 parent_pk_json: Option<&serde_json::Value>,
3404 out: &mut serde_json::Map<String, serde_json::Value>,
3405 tx: &mut crate::db::Transaction,
3406) -> Result<(), sqlx::Error> {
3407 if meta.m2m_relations.is_empty() {
3408 return Ok(());
3409 }
3410 let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3411 return Ok(());
3412 };
3413 for rel in &meta.m2m_relations {
3414 let junction_table = format!("{}_{}", meta.table, rel.field_name);
3415 let mut sel = Query::select();
3416 sel.from(crate::db::router::schema_qualified_table(&junction_table));
3417 sel.column(Alias::new("child_id"));
3418 sel.and_where(Expr::col(Alias::new("parent_id")).eq(parent_pk_value.clone()));
3419 let children: Vec<serde_json::Value> = match tx.backend_name() {
3420 "sqlite" => {
3421 let inner = tx.as_sqlite_mut().expect("sqlite backend_name");
3422 let (sql, values) = sel.build_sqlx(SqliteQueryBuilder);
3423 let rows = sqlx::query_with(&sql, values)
3424 .fetch_all(&mut **inner)
3425 .await?;
3426 rows.iter()
3427 .map(|r| {
3428 r.try_get::<i64, _>("child_id")
3429 .map(|i| serde_json::Value::Number(i.into()))
3430 .or_else(|_| {
3431 r.try_get::<String, _>("child_id")
3432 .map(serde_json::Value::String)
3433 })
3434 })
3435 .collect::<Result<Vec<_>, _>>()?
3436 }
3437 _ => {
3438 let inner = tx.as_pg_mut().expect("postgres backend_name");
3439 let (sql, values) = sel.build_sqlx(PostgresQueryBuilder);
3440 let rows = sqlx::query_with(&sql, values)
3441 .fetch_all(&mut **inner)
3442 .await?;
3443 rows.iter()
3444 .map(|r| {
3445 r.try_get::<i64, _>("child_id")
3446 .map(|i| serde_json::Value::Number(i.into()))
3447 .or_else(|_| {
3448 r.try_get::<String, _>("child_id")
3449 .map(serde_json::Value::String)
3450 })
3451 })
3452 .collect::<Result<Vec<_>, _>>()?
3453 }
3454 };
3455 out.insert(rel.field_name.clone(), serde_json::Value::Array(children));
3456 }
3457 Ok(())
3458}
3459
3460async fn write_m2m_junctions(
3461 meta: &crate::migrate::ModelMeta,
3462 parent_pk_json: Option<&serde_json::Value>,
3463 body: &serde_json::Map<String, serde_json::Value>,
3464) -> Result<(), crate::orm::write::WriteError> {
3465 if meta.m2m_relations.is_empty() {
3466 return Ok(());
3467 }
3468 let Some(parent_pk_value) = parent_pk_json.and_then(json_pk_to_sea) else {
3469 return Ok(());
3470 };
3471 for rel in &meta.m2m_relations {
3472 let Some(value) = body.get(&rel.field_name) else {
3473 continue;
3474 };
3475 let Some(items) = value.as_array() else {
3476 continue; };
3478 let mut child_ids: Vec<sea_query::Value> = Vec::with_capacity(items.len());
3479 for item in items {
3480 if item.is_null() {
3481 continue;
3482 }
3483 if let Some(v) = json_pk_to_sea(item) {
3484 child_ids.push(v);
3485 }
3486 }
3487 let junction_table = format!("{}_{}", meta.table, rel.field_name);
3488 crate::orm::m2m::set_junction_dynamic(
3489 &junction_table,
3490 parent_pk_value.clone(),
3491 child_ids,
3492 Some(&meta.name),
3493 )
3494 .await
3495 .map_err(crate::orm::write::WriteError::Sqlx)?;
3496 }
3497 Ok(())
3498}
3499
3500fn coerce_csv_cell(ty: SqlType, nullable: bool, raw: &str) -> serde_json::Value {
3517 use serde_json::Value;
3518 if raw.is_empty() && nullable {
3519 return Value::Null;
3520 }
3521 match ty {
3522 SqlType::SmallInt | SqlType::Integer | SqlType::BigInt | SqlType::ForeignKey => raw
3523 .parse::<i64>()
3524 .map(Value::from)
3525 .unwrap_or_else(|_| Value::String(raw.to_string())),
3526 SqlType::Real | SqlType::Double => raw
3527 .parse::<f64>()
3528 .ok()
3529 .and_then(serde_json::Number::from_f64)
3530 .map(Value::Number)
3531 .unwrap_or_else(|| Value::String(raw.to_string())),
3532 SqlType::Boolean => match raw.trim().to_ascii_lowercase().as_str() {
3533 "true" | "1" | "t" | "yes" | "y" => Value::Bool(true),
3534 "false" | "0" | "f" | "no" | "n" => Value::Bool(false),
3535 _ => Value::String(raw.to_string()),
3536 },
3537 SqlType::Json => {
3538 serde_json::from_str(raw).unwrap_or_else(|_| Value::String(raw.to_string()))
3539 }
3540 _ => Value::String(raw.to_string()),
3541 }
3542}
3543
/// Outcome of a bulk row import, as produced by `import_table_rows`.
#[derive(Debug, Default)]
pub struct CsvImportReport {
    /// Number of rows that were inserted successfully.
    pub inserted: usize,
    /// One `(row number, error message)` pair per failed row. As filled in by
    /// `import_table_rows`, the number is the zero-based row index plus 2.
    pub errors: Vec<(usize, String)>,
}
3555
3556pub async fn import_table_rows(
3567 meta: &ModelMeta,
3568 headers: &[String],
3569 rows: &[Vec<String>],
3570) -> CsvImportReport {
3571 let col_for: HashMap<&str, &Column> =
3572 meta.fields.iter().map(|c| (c.name.as_str(), c)).collect();
3573
3574 let mut report = CsvImportReport::default();
3575 for (i, row) in rows.iter().enumerate() {
3576 let mut obj = serde_json::Map::new();
3577 for (header, cell) in headers.iter().zip(row.iter()) {
3578 if let Some(col) = col_for.get(header.as_str()) {
3579 obj.insert(header.clone(), coerce_csv_cell(col.ty, col.nullable, cell));
3580 }
3581 }
3582 match DynQuerySet::for_meta(meta).insert_json(&obj).await {
3583 Ok(_) => report.inserted += 1,
3584 Err(e) => report.errors.push((i + 2, e.to_string())),
3585 }
3586 }
3587 report
3588}
3589
// Unit tests for `form_str_to_sea_value`, covering how foreign-key form
// values are bound.
#[cfg(test)]
mod tests {
    use super::form_str_to_sea_value;
    use crate::migrate::Column;
    use crate::orm::{FkAction, SqlType};
    use sea_query::Value as SeaValue;

    /// Builds a minimal `Column` with the given name, type and nullability;
    /// every other attribute is set to an empty / `false` / `None` value
    /// (except `db_constraint`, which is `true`).
    fn col(name: &str, ty: SqlType, nullable: bool) -> Column {
        Column {
            name: name.to_string(),
            ty,
            primary_key: false,
            nullable,
            fk_target: None,
            noform: false,
            db_constraint: true,
            noedit: false,
            is_string_repr: false,
            max_length: 0,
            choices: Vec::new(),
            choice_labels: Vec::new(),
            default: String::new(),
            is_multichoice: false,
            unique: false,
            on_delete: FkAction::NoAction,
            on_update: FkAction::NoAction,
            index: false,
            auto_now_add: false,
            auto_now: false,
            help: String::new(),
            example: String::new(),
            widget: None,
            supported_backends: Vec::new(),
            min: None,
            max: None,
            text_format: None,
            slug_from: None,
        }
    }

    /// A numeric form string for a foreign key must bind as a bigint rather
    /// than as text.
    #[test]
    fn form_fk_numeric_string_binds_as_bigint() {
        let mut plugin = col("plugin", SqlType::ForeignKey, false);
        plugin.fk_target = Some("plugin".to_string());

        let value = form_str_to_sea_value(&plugin, "1").expect("coerce FK id");

        assert_eq!(
            value,
            SeaValue::BigInt(Some(1)),
            "integer-backed FK form values must bind as bigint, not text"
        );
    }

    /// A blank form value for a nullable foreign key must bind as a bigint
    /// NULL.
    #[test]
    fn nullable_form_fk_blank_binds_as_null_bigint() {
        let mut parent = col("parent", SqlType::ForeignKey, true);
        parent.fk_target = Some("plugin_comment".to_string());

        let value = form_str_to_sea_value(&parent, "").expect("blank nullable FK");

        assert_eq!(
            value,
            SeaValue::BigInt(None),
            "blank nullable integer-backed FK should bind SQL NULL"
        );
    }
}