1use crate::clause::Where;
11use crate::expr::{Dialect, Expr};
12use asupersync::{Cx, Outcome};
13use sqlmodel_core::{
14 Connection, FieldInfo, InheritanceStrategy, Model, Row, TransactionOps, Value,
15};
16use std::collections::HashSet;
17use std::marker::PhantomData;
18
19fn is_joined_inheritance_child<M: Model>() -> bool {
20 let inh = M::inheritance();
21 inh.strategy == InheritanceStrategy::Joined && inh.parent.is_some()
22}
23
24#[derive(Debug, Clone, Copy, PartialEq, Eq)]
25enum JoinedTableTarget {
26 Parent,
27 Child,
28}
29
30type JoinedSetPairs = Vec<(&'static str, Value)>;
31
32#[allow(clippy::result_large_err)]
33fn joined_parent_meta<M: Model>()
34-> Result<(&'static str, &'static [FieldInfo]), sqlmodel_core::Error> {
35 let inh = M::inheritance();
36 let Some(parent_table) = inh.parent else {
37 return Err(sqlmodel_core::Error::Custom(
38 "joined-table inheritance child missing parent table metadata".to_string(),
39 ));
40 };
41 let Some(parent_fields_fn) = inh.parent_fields_fn else {
42 return Err(sqlmodel_core::Error::Custom(
43 "joined-table inheritance child missing parent_fields_fn metadata".to_string(),
44 ));
45 };
46 Ok((parent_table, parent_fields_fn()))
47}
48
49#[allow(clippy::result_large_err)]
50fn classify_joined_column<M: Model>(
51 column: &str,
52 parent_table: &'static str,
53 parent_fields: &'static [FieldInfo],
54) -> Result<(JoinedTableTarget, &'static str), sqlmodel_core::Error> {
55 let child_fields = M::fields();
56
57 let child_lookup = |name: &str| -> Option<&'static str> {
58 child_fields
59 .iter()
60 .find(|f| f.column_name == name)
61 .map(|f| f.column_name)
62 };
63 let parent_lookup = |name: &str| -> Option<&'static str> {
64 parent_fields
65 .iter()
66 .find(|f| f.column_name == name)
67 .map(|f| f.column_name)
68 };
69
70 if let Some((table, col)) = column.split_once('.') {
71 if table == parent_table {
72 return parent_lookup(col)
73 .map(|c| (JoinedTableTarget::Parent, c))
74 .ok_or_else(|| {
75 sqlmodel_core::Error::Custom(format!(
76 "unknown parent column '{col}' for joined-table inheritance child"
77 ))
78 });
79 }
80 if table == M::TABLE_NAME {
81 return child_lookup(col)
82 .map(|c| (JoinedTableTarget::Child, c))
83 .ok_or_else(|| {
84 sqlmodel_core::Error::Custom(format!(
85 "unknown child column '{col}' for joined-table inheritance child"
86 ))
87 });
88 }
89 return Err(sqlmodel_core::Error::Custom(format!(
90 "unknown table qualifier '{table}' for joined-table inheritance DML; expected '{}' or '{}'",
91 parent_table,
92 M::TABLE_NAME
93 )));
94 }
95
96 let in_parent = parent_lookup(column);
97 let in_child = child_lookup(column);
98 match (in_parent, in_child) {
99 (Some(c), None) => Ok((JoinedTableTarget::Parent, c)),
100 (None, Some(c)) => Ok((JoinedTableTarget::Child, c)),
101 (Some(_), Some(_)) => Err(sqlmodel_core::Error::Custom(format!(
102 "ambiguous joined-table inheritance column '{column}' exists in both parent and child tables; qualify as '{parent_table}.{column}' or '{}.{column}'",
103 M::TABLE_NAME
104 ))),
105 (None, None) => Err(sqlmodel_core::Error::Custom(format!(
106 "unknown joined-table inheritance column '{column}'"
107 ))),
108 }
109}
110
111#[allow(clippy::result_large_err)]
112fn build_joined_pk_select_sql<M: Model>(
113 dialect: Dialect,
114 where_clause: Option<&Where>,
115 param_offset: usize,
116) -> Result<(String, Vec<Value>), sqlmodel_core::Error> {
117 let (parent_table, _parent_fields) = joined_parent_meta::<M>()?;
118 let pk_cols = M::PRIMARY_KEY;
119 if pk_cols.is_empty() {
120 return Err(sqlmodel_core::Error::Custom(
121 "joined-table inheritance DML requires a primary key".to_string(),
122 ));
123 }
124
125 let mut sql = String::new();
126 sql.push_str("SELECT ");
127 sql.push_str(
129 &pk_cols
130 .iter()
131 .map(|c| format!("{}.{}", M::TABLE_NAME, c))
132 .collect::<Vec<_>>()
133 .join(", "),
134 );
135 sql.push_str(" FROM ");
136 sql.push_str(M::TABLE_NAME);
137 sql.push_str(" JOIN ");
138 sql.push_str(parent_table);
139 sql.push_str(" ON ");
140 sql.push_str(
141 &pk_cols
142 .iter()
143 .map(|c| format!("{}.{} = {}.{}", M::TABLE_NAME, c, parent_table, c))
144 .collect::<Vec<_>>()
145 .join(" AND "),
146 );
147
148 let mut params = Vec::new();
149 if let Some(w) = where_clause {
150 let (where_sql, where_params) = w.build_with_dialect(dialect, param_offset);
151 sql.push_str(" WHERE ");
152 sql.push_str(&where_sql);
153 params.extend(where_params);
154 }
155
156 Ok((sql, params))
157}
158
159#[allow(clippy::result_large_err)]
160fn extract_pk_values_from_rows(
161 rows: Vec<Row>,
162 pk_col_count: usize,
163) -> Result<Vec<Vec<Value>>, sqlmodel_core::Error> {
164 let mut pk_values = Vec::with_capacity(rows.len());
165 for row in rows {
166 if row.len() < pk_col_count {
167 return Err(sqlmodel_core::Error::Custom(format!(
168 "joined-table inheritance PK lookup returned {} columns; expected at least {}",
169 row.len(),
170 pk_col_count
171 )));
172 }
173 let mut vals = Vec::with_capacity(pk_col_count);
174 for i in 0..pk_col_count {
175 let Some(v) = row.get(i) else {
176 return Err(sqlmodel_core::Error::Custom(format!(
177 "joined-table inheritance PK lookup missing column index {i}"
178 )));
179 };
180 vals.push(v.clone());
181 }
182 pk_values.push(vals);
183 }
184 Ok(pk_values)
185}
186
187async fn select_joined_pk_values_in_tx<Tx: TransactionOps, M: Model>(
188 tx: &Tx,
189 cx: &Cx,
190 dialect: Dialect,
191 where_clause: Option<&Where>,
192) -> Outcome<Vec<Vec<Value>>, sqlmodel_core::Error> {
193 let pk_cols = M::PRIMARY_KEY;
194 let (pk_sql, pk_params) = match build_joined_pk_select_sql::<M>(dialect, where_clause, 0) {
195 Ok(v) => v,
196 Err(e) => return Outcome::Err(e),
197 };
198 match tx.query(cx, &pk_sql, &pk_params).await {
199 Outcome::Ok(rows) => match extract_pk_values_from_rows(rows, pk_cols.len()) {
200 Ok(vals) => Outcome::Ok(vals),
201 Err(e) => Outcome::Err(e),
202 },
203 Outcome::Err(e) => Outcome::Err(e),
204 Outcome::Cancelled(r) => Outcome::Cancelled(r),
205 Outcome::Panicked(p) => Outcome::Panicked(p),
206 }
207}
208
209#[allow(clippy::result_large_err)]
210fn split_explicit_joined_sets<M: Model>(
211 explicit_sets: &[SetClause],
212 parent_table: &'static str,
213 parent_fields: &'static [FieldInfo],
214) -> Result<(JoinedSetPairs, JoinedSetPairs), sqlmodel_core::Error> {
215 let mut parent_sets = Vec::new();
216 let mut child_sets = Vec::new();
217
218 for set in explicit_sets {
219 let (target, col) = classify_joined_column::<M>(&set.column, parent_table, parent_fields)?;
220 if M::PRIMARY_KEY.contains(&col) {
221 return Err(sqlmodel_core::Error::Custom(format!(
222 "joined-table inheritance update does not support setting primary key column '{col}'"
223 )));
224 }
225 match target {
226 JoinedTableTarget::Parent => parent_sets.push((col, set.value.clone())),
227 JoinedTableTarget::Child => child_sets.push((col, set.value.clone())),
228 }
229 }
230
231 Ok((parent_sets, child_sets))
232}
233
234fn build_pk_in_where(
235 dialect: Dialect,
236 pk_cols: &[&'static str],
237 pk_values: &[Vec<Value>],
238 param_offset: usize,
239) -> (String, Vec<Value>) {
240 let mut params: Vec<Value> = Vec::new();
241
242 if pk_cols.is_empty() || pk_values.is_empty() {
243 return (String::new(), params);
244 }
245
246 if pk_cols.len() == 1 {
247 let col = pk_cols[0];
248 let mut placeholders = Vec::new();
249 for vals in pk_values {
250 if vals.len() != 1 {
251 continue;
252 }
253 params.push(vals[0].clone());
254 placeholders.push(dialect.placeholder(param_offset + params.len()));
255 }
256 return (format!("{col} IN ({})", placeholders.join(", ")), params);
257 }
258
259 let cols_tuple = format!("({})", pk_cols.join(", "));
261 let mut groups = Vec::new();
262 for vals in pk_values {
263 if vals.len() != pk_cols.len() {
264 continue;
265 }
266 let mut ph = Vec::new();
267 for v in vals {
268 params.push(v.clone());
269 ph.push(dialect.placeholder(param_offset + params.len()));
270 }
271 groups.push(format!("({})", ph.join(", ")));
272 }
273
274 (format!("{cols_tuple} IN ({})", groups.join(", ")), params)
275}
276
277fn build_pk_in_where_qualified(
278 dialect: Dialect,
279 table: &str,
280 pk_cols: &[&'static str],
281 pk_values: &[Vec<Value>],
282 param_offset: usize,
283) -> (String, Vec<Value>) {
284 let qualified_cols: Vec<String> = pk_cols.iter().map(|c| format!("{table}.{c}")).collect();
285
286 let mut params: Vec<Value> = Vec::new();
287 if qualified_cols.is_empty() || pk_values.is_empty() {
288 return (String::new(), params);
289 }
290
291 if qualified_cols.len() == 1 {
292 let col = &qualified_cols[0];
293 let mut placeholders = Vec::new();
294 for vals in pk_values {
295 if vals.len() != 1 {
296 continue;
297 }
298 params.push(vals[0].clone());
299 placeholders.push(dialect.placeholder(param_offset + params.len()));
300 }
301 return (format!("{col} IN ({})", placeholders.join(", ")), params);
302 }
303
304 let cols_tuple = format!("({})", qualified_cols.join(", "));
305 let mut groups = Vec::new();
306 for vals in pk_values {
307 if vals.len() != qualified_cols.len() {
308 continue;
309 }
310 let mut ph = Vec::new();
311 for v in vals {
312 params.push(v.clone());
313 ph.push(dialect.placeholder(param_offset + params.len()));
314 }
315 groups.push(format!("({})", ph.join(", ")));
316 }
317
318 (format!("{cols_tuple} IN ({})", groups.join(", ")), params)
319}
320
321fn build_update_sql_for_table_pk_in(
322 dialect: Dialect,
323 table: &str,
324 pk_cols: &[&'static str],
325 pk_values: &[Vec<Value>],
326 set_pairs: &[(&'static str, Value)],
327) -> (String, Vec<Value>) {
328 let mut params = Vec::new();
329 let mut set_clauses = Vec::new();
330 for (col, value) in set_pairs {
331 set_clauses.push(format!(
332 "{} = {}",
333 col,
334 dialect.placeholder(params.len() + 1)
335 ));
336 params.push(value.clone());
337 }
338 if set_clauses.is_empty() {
339 return (String::new(), Vec::new());
340 }
341
342 let (pk_where, pk_params) = build_pk_in_where(dialect, pk_cols, pk_values, params.len());
343 if pk_where.is_empty() {
344 return (String::new(), Vec::new());
345 }
346
347 let sql = format!(
348 "UPDATE {} SET {} WHERE {}",
349 table,
350 set_clauses.join(", "),
351 pk_where
352 );
353 params.extend(pk_params);
354 (sql, params)
355}
356
357fn build_delete_sql_for_table_pk_in(
358 dialect: Dialect,
359 table: &str,
360 pk_cols: &[&'static str],
361 pk_values: &[Vec<Value>],
362) -> (String, Vec<Value>) {
363 let (pk_where, pk_params) = build_pk_in_where(dialect, pk_cols, pk_values, 0);
364 if pk_where.is_empty() {
365 return (String::new(), Vec::new());
366 }
367 (format!("DELETE FROM {table} WHERE {pk_where}"), pk_params)
368}
369
370#[allow(clippy::result_large_err)]
371fn build_joined_child_select_sql_by_pk_in<M: Model>(
372 dialect: Dialect,
373 pk_cols: &[&'static str],
374 pk_values: &[Vec<Value>],
375) -> Result<(String, Vec<Value>), sqlmodel_core::Error> {
376 let (parent_table, parent_fields) = joined_parent_meta::<M>()?;
377 if pk_cols.is_empty() {
378 return Err(sqlmodel_core::Error::Custom(
379 "joined-table inheritance returning requires a primary key".to_string(),
380 ));
381 }
382
383 let child_cols: Vec<&'static str> = M::fields().iter().map(|f| f.column_name).collect();
384 let parent_cols: Vec<&'static str> = parent_fields.iter().map(|f| f.column_name).collect();
385
386 let mut col_parts = Vec::new();
387 for col in &child_cols {
388 col_parts.push(format!(
389 "{}.{} AS {}__{}",
390 M::TABLE_NAME,
391 col,
392 M::TABLE_NAME,
393 col
394 ));
395 }
396 for col in &parent_cols {
397 col_parts.push(format!(
398 "{}.{} AS {}__{}",
399 parent_table, col, parent_table, col
400 ));
401 }
402
403 let mut sql = String::new();
404 sql.push_str("SELECT ");
405 sql.push_str(&col_parts.join(", "));
406 sql.push_str(" FROM ");
407 sql.push_str(M::TABLE_NAME);
408 sql.push_str(" JOIN ");
409 sql.push_str(parent_table);
410 sql.push_str(" ON ");
411 sql.push_str(
412 &pk_cols
413 .iter()
414 .map(|c| format!("{}.{} = {}.{}", M::TABLE_NAME, c, parent_table, c))
415 .collect::<Vec<_>>()
416 .join(" AND "),
417 );
418
419 let (pk_where, pk_params) =
420 build_pk_in_where_qualified(dialect, M::TABLE_NAME, pk_cols, pk_values, 0);
421 if pk_where.is_empty() {
422 return Ok((String::new(), Vec::new()));
423 }
424 sql.push_str(" WHERE ");
425 sql.push_str(&pk_where);
426
427 Ok((sql, pk_params))
428}
429
430fn rewrite_insert_as_ignore(sql: &mut String) {
431 if let Some(rest) = sql.strip_prefix("INSERT INTO ") {
432 *sql = format!("INSERT IGNORE INTO {rest}");
433 }
434}
435
436fn append_on_conflict_clause(
437 dialect: Dialect,
438 sql: &mut String,
439 pk_cols: &[&'static str],
440 insert_columns: &[&'static str],
441 on_conflict: &OnConflict,
442) {
443 if dialect == Dialect::Mysql {
444 match on_conflict {
445 OnConflict::DoNothing => {
446 rewrite_insert_as_ignore(sql);
447 return;
448 }
449 OnConflict::DoUpdate { columns, .. } => {
450 let update_cols: Vec<String> = if columns.is_empty() {
451 insert_columns
452 .iter()
453 .filter(|c| !pk_cols.contains(c))
454 .map(|c| (*c).to_string())
455 .collect()
456 } else {
457 columns.clone()
458 };
459
460 if update_cols.is_empty() {
461 rewrite_insert_as_ignore(sql);
462 return;
463 }
464
465 sql.push_str(" ON DUPLICATE KEY UPDATE ");
466 sql.push_str(
467 &update_cols
468 .iter()
469 .map(|c| format!("{c} = VALUES({c})"))
470 .collect::<Vec<_>>()
471 .join(", "),
472 );
473 return;
474 }
475 }
476 }
477
478 match on_conflict {
479 OnConflict::DoNothing => {
480 sql.push_str(" ON CONFLICT DO NOTHING");
481 }
482 OnConflict::DoUpdate { columns, target } => {
483 sql.push_str(" ON CONFLICT");
484
485 let effective_target: Vec<String> = if target.is_empty() {
486 pk_cols.iter().map(|c| (*c).to_string()).collect()
487 } else {
488 target.clone()
489 };
490
491 if effective_target.is_empty() {
492 sql.push_str(" DO NOTHING");
493 return;
494 }
495
496 sql.push_str(" (");
497 sql.push_str(&effective_target.join(", "));
498 sql.push(')');
499
500 let update_cols: Vec<String> = if columns.is_empty() {
501 insert_columns
502 .iter()
503 .filter(|c| !pk_cols.contains(c))
504 .map(|c| (*c).to_string())
505 .collect()
506 } else {
507 columns.clone()
508 };
509
510 if update_cols.is_empty() {
511 sql.push_str(" DO NOTHING");
512 return;
513 }
514
515 sql.push_str(" DO UPDATE SET ");
516 sql.push_str(
517 &update_cols
518 .iter()
519 .map(|c| format!("{c} = EXCLUDED.{c}"))
520 .collect::<Vec<_>>()
521 .join(", "),
522 );
523 }
524 }
525}
526
527fn build_insert_sql_for_table_with_columns(
528 dialect: Dialect,
529 table: &str,
530 fields: &[FieldInfo],
531 row: &[(&'static str, Value)],
532 returning: Option<&str>,
533) -> (String, Vec<Value>, Vec<&'static str>) {
534 let insert_fields: Vec<_> = row
535 .iter()
536 .map(|(name, value)| {
537 let field = fields.iter().find(|f| f.column_name == *name);
538 if let Some(f) = field {
539 if f.auto_increment && matches!(value, Value::Null) {
540 return (*name, Value::Default);
541 }
542 }
543 (*name, value.clone())
544 })
545 .collect();
546
547 let mut columns = Vec::new();
548 let mut placeholders = Vec::new();
549 let mut params = Vec::new();
550
551 for (name, value) in insert_fields {
552 if matches!(value, Value::Default) && dialect == Dialect::Sqlite {
553 continue;
555 }
556
557 columns.push(name);
558
559 if matches!(value, Value::Default) {
560 placeholders.push("DEFAULT".to_string());
561 } else {
562 params.push(value);
563 placeholders.push(dialect.placeholder(params.len()));
564 }
565 }
566
567 let mut sql = if columns.is_empty() {
568 format!("INSERT INTO {} DEFAULT VALUES", table)
569 } else {
570 format!(
571 "INSERT INTO {} ({}) VALUES ({})",
572 table,
573 columns.join(", "),
574 placeholders.join(", ")
575 )
576 };
577
578 if let Some(ret) = returning {
579 sql.push_str(" RETURNING ");
580 sql.push_str(ret);
581 }
582
583 (sql, params, columns)
584}
585
586fn build_insert_sql_for_table(
587 dialect: Dialect,
588 table: &str,
589 fields: &[FieldInfo],
590 row: &[(&'static str, Value)],
591 returning: Option<&str>,
592) -> (String, Vec<Value>) {
593 let (sql, params, _cols) =
594 build_insert_sql_for_table_with_columns(dialect, table, fields, row, returning);
595 (sql, params)
596}
597
598fn build_update_sql_for_table(
599 dialect: Dialect,
600 table: &str,
601 pk_cols: &[&'static str],
602 pk_vals: &[Value],
603 set_pairs: &[(&'static str, Value)],
604) -> (String, Vec<Value>) {
605 let mut params = Vec::new();
606 let mut set_clauses = Vec::new();
607 for (col, value) in set_pairs {
608 set_clauses.push(format!(
609 "{} = {}",
610 col,
611 dialect.placeholder(params.len() + 1)
612 ));
613 params.push(value.clone());
614 }
615
616 if set_clauses.is_empty() {
617 return (String::new(), Vec::new());
618 }
619
620 let mut sql = format!("UPDATE {} SET {}", table, set_clauses.join(", "));
621 if !pk_cols.is_empty() && pk_cols.len() == pk_vals.len() {
622 let where_parts: Vec<String> = pk_cols
623 .iter()
624 .enumerate()
625 .map(|(i, col)| format!("{} = {}", col, dialect.placeholder(params.len() + i + 1)))
626 .collect();
627 sql.push_str(" WHERE ");
628 sql.push_str(&where_parts.join(" AND "));
629 params.extend_from_slice(pk_vals);
630 }
631
632 (sql, params)
633}
634
635fn extract_single_pk_i64(pk_vals: &[Value]) -> Option<i64> {
636 if pk_vals.len() != 1 {
637 return None;
638 }
639 match &pk_vals[0] {
640 Value::BigInt(v) => Some(*v),
641 Value::Int(v) => Some(i64::from(*v)),
642 _ => None,
643 }
644}
645
646async fn insert_joined_model_in_tx<Tx: TransactionOps, M: Model>(
647 tx: &Tx,
648 cx: &Cx,
649 dialect: Dialect,
650 model: &M,
651 parent_table: &'static str,
652 parent_fields: &'static [FieldInfo],
653) -> Outcome<(u64, Vec<Value>), sqlmodel_core::Error> {
654 let Some(parent_row) = model.joined_parent_row() else {
655 return Outcome::Err(sqlmodel_core::Error::Custom(
656 "joined-table inheritance child missing joined_parent_row() implementation".to_string(),
657 ));
658 };
659
660 let pk_cols = M::PRIMARY_KEY;
661 if pk_cols.is_empty() {
662 return Outcome::Err(sqlmodel_core::Error::Custom(
663 "joined-table inheritance insert requires a primary key column".to_string(),
664 ));
665 }
666 let mut effective_pk_vals = model.primary_key_value();
667 let pk_col = pk_cols.first().copied();
668 let needs_generated_id = pk_col.is_some()
669 && effective_pk_vals.len() == 1
670 && parent_fields
671 .iter()
672 .find(|f| f.column_name == pk_col.unwrap_or("") && f.primary_key)
673 .is_some_and(|f| f.auto_increment)
674 && effective_pk_vals[0].is_null();
675
676 let mut inserted_id: Option<i64> = None;
677 if dialect == Dialect::Postgres && needs_generated_id {
678 let Some(pk_col) = pk_col else {
679 return Outcome::Err(sqlmodel_core::Error::Custom(
680 "joined-table inheritance insert requires a primary key column".to_string(),
681 ));
682 };
683 let (sql, params, _cols) = build_insert_sql_for_table_with_columns(
684 dialect,
685 parent_table,
686 parent_fields,
687 &parent_row,
688 Some(pk_col),
689 );
690 match tx.query_one(cx, &sql, ¶ms).await {
691 Outcome::Ok(Some(row)) => match row.get_as::<i64>(0) {
692 Ok(v) => inserted_id = Some(v),
693 Err(e) => return Outcome::Err(e),
694 },
695 Outcome::Ok(None) => {
696 return Outcome::Err(sqlmodel_core::Error::Custom(
697 "base insert returned no row".to_string(),
698 ));
699 }
700 Outcome::Err(e) => return Outcome::Err(e),
701 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
702 Outcome::Panicked(p) => return Outcome::Panicked(p),
703 }
704 } else {
705 let (sql, params, _cols) = build_insert_sql_for_table_with_columns(
706 dialect,
707 parent_table,
708 parent_fields,
709 &parent_row,
710 None,
711 );
712 match tx.execute(cx, &sql, ¶ms).await {
713 Outcome::Ok(_) => {}
714 Outcome::Err(e) => return Outcome::Err(e),
715 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
716 Outcome::Panicked(p) => return Outcome::Panicked(p),
717 }
718
719 if needs_generated_id {
720 let id_sql = match dialect {
721 Dialect::Sqlite => "SELECT last_insert_rowid()",
722 Dialect::Mysql => "SELECT LAST_INSERT_ID()",
723 Dialect::Postgres => unreachable!(),
724 };
725 match tx.query_one(cx, id_sql, &[]).await {
726 Outcome::Ok(Some(row)) => match row.get_as::<i64>(0) {
727 Ok(v) => inserted_id = Some(v),
728 Err(e) => return Outcome::Err(e),
729 },
730 Outcome::Ok(None) => {
731 return Outcome::Err(sqlmodel_core::Error::Custom(
732 "failed to fetch last insert id".to_string(),
733 ));
734 }
735 Outcome::Err(e) => return Outcome::Err(e),
736 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
737 Outcome::Panicked(p) => return Outcome::Panicked(p),
738 }
739 }
740 }
741
742 let mut child_row = model.to_row();
743 if let (Some(pk_col), Some(id)) = (pk_col, inserted_id) {
744 if pk_cols.len() != 1 {
745 return Outcome::Err(sqlmodel_core::Error::Custom(
746 "joined-table inheritance auto-increment insert currently requires a single-column primary key"
747 .to_string(),
748 ));
749 }
750 for (name, value) in &mut child_row {
751 if *name == pk_col && value.is_null() {
752 *value = Value::BigInt(id);
753 }
754 }
755 if effective_pk_vals.len() == 1 && effective_pk_vals[0].is_null() {
756 effective_pk_vals[0] = Value::BigInt(id);
757 }
758 }
759
760 let (child_sql, child_params, _cols) = build_insert_sql_for_table_with_columns(
761 dialect,
762 M::TABLE_NAME,
763 M::fields(),
764 &child_row,
765 None,
766 );
767 match tx.execute(cx, &child_sql, &child_params).await {
768 Outcome::Ok(count) => Outcome::Ok((count, effective_pk_vals)),
769 Outcome::Err(e) => Outcome::Err(e),
770 Outcome::Cancelled(r) => Outcome::Cancelled(r),
771 Outcome::Panicked(p) => Outcome::Panicked(p),
772 }
773}
774
775async fn tx_rollback_best_effort<Tx: TransactionOps>(tx: Tx, cx: &Cx) {
776 let _ = tx.rollback(cx).await;
777}
778
779#[derive(Debug, Clone)]
783pub enum OnConflict {
784 DoNothing,
786 DoUpdate {
788 columns: Vec<String>,
790 target: Vec<String>,
792 },
793}
794
795#[derive(Debug)]
812pub struct InsertBuilder<'a, M: Model> {
813 model: &'a M,
814 returning: bool,
815 on_conflict: Option<OnConflict>,
816}
817
818impl<'a, M: Model> InsertBuilder<'a, M> {
819 pub fn new(model: &'a M) -> Self {
821 Self {
822 model,
823 returning: false,
824 on_conflict: None,
825 }
826 }
827
828 pub fn returning(mut self) -> Self {
832 self.returning = true;
833 self
834 }
835
836 pub fn on_conflict_do_nothing(mut self) -> Self {
841 self.on_conflict = Some(OnConflict::DoNothing);
842 self
843 }
844
845 pub fn on_conflict_do_update(mut self, columns: &[&str]) -> Self {
859 self.on_conflict = Some(OnConflict::DoUpdate {
860 columns: columns.iter().map(|s| s.to_string()).collect(),
861 target: Vec::new(), });
863 self
864 }
865
866 pub fn on_conflict_target_do_update(mut self, target: &[&str], columns: &[&str]) -> Self {
873 self.on_conflict = Some(OnConflict::DoUpdate {
874 columns: columns.iter().map(|s| s.to_string()).collect(),
875 target: target.iter().map(|s| s.to_string()).collect(),
876 });
877 self
878 }
879
880 pub fn build(&self) -> (String, Vec<Value>) {
882 self.build_with_dialect(Dialect::default())
883 }
884
885 pub fn build_with_dialect(&self, dialect: Dialect) -> (String, Vec<Value>) {
887 let row = self.model.to_row();
888 let fields = M::fields();
889
890 let insert_fields: Vec<_> = row
891 .iter()
892 .map(|(name, value)| {
893 let field = fields.iter().find(|f| f.column_name == *name);
894 if let Some(f) = field {
895 if f.auto_increment && matches!(value, Value::Null) {
896 return (*name, Value::Default);
897 }
898 }
899 (*name, value.clone())
900 })
901 .collect();
902
903 let mut columns = Vec::new();
904 let mut placeholders = Vec::new();
905 let mut params = Vec::new();
906
907 for (name, value) in insert_fields {
908 if matches!(value, Value::Default) && dialect == Dialect::Sqlite {
909 continue;
911 }
912
913 columns.push(name);
914
915 if matches!(value, Value::Default) {
916 placeholders.push("DEFAULT".to_string());
917 } else {
918 params.push(value);
919 placeholders.push(dialect.placeholder(params.len()));
920 }
921 }
922
923 let mut sql = if columns.is_empty() {
924 format!("INSERT INTO {} DEFAULT VALUES", M::TABLE_NAME)
925 } else {
926 format!(
927 "INSERT INTO {} ({}) VALUES ({})",
928 M::TABLE_NAME,
929 columns.join(", "),
930 placeholders.join(", ")
931 )
932 };
933
934 if let Some(on_conflict) = &self.on_conflict {
936 append_on_conflict_clause(dialect, &mut sql, M::PRIMARY_KEY, &columns, on_conflict);
937 }
938
939 if self.returning {
941 sql.push_str(" RETURNING *");
942 }
943
944 (sql, params)
945 }
946
947 pub async fn execute<C: Connection>(
949 self,
950 cx: &Cx,
951 conn: &C,
952 ) -> Outcome<i64, sqlmodel_core::Error> {
953 if is_joined_inheritance_child::<M>() {
954 let dialect = conn.dialect();
955 let on_conflict = self.on_conflict.clone();
956 let (parent_table, parent_fields) = match joined_parent_meta::<M>() {
957 Ok(v) => v,
958 Err(e) => return Outcome::Err(e),
959 };
960
961 let Some(parent_row) = self.model.joined_parent_row() else {
962 return Outcome::Err(sqlmodel_core::Error::Custom(
963 "joined-table inheritance child missing joined_parent_row() implementation"
964 .to_string(),
965 ));
966 };
967
968 let pk_vals = self.model.primary_key_value();
969 let pk_col = M::PRIMARY_KEY.first().copied();
970 let needs_generated_id = pk_col.is_some()
971 && pk_vals.len() == 1
972 && parent_fields
973 .iter()
974 .find(|f| f.column_name == pk_col.unwrap_or("") && f.primary_key)
975 .is_some_and(|f| f.auto_increment)
976 && pk_vals[0].is_null();
977
978 if on_conflict.is_some() {
979 if needs_generated_id || pk_vals.iter().any(|v| v.is_null()) {
980 return Outcome::Err(sqlmodel_core::Error::Custom(
981 "joined-table inheritance insert ON CONFLICT requires explicit primary key values (auto-increment upsert is not supported yet)"
982 .to_string(),
983 ));
984 }
985 if let Some(OnConflict::DoUpdate { target, .. }) = &on_conflict {
988 let pk_target: Vec<String> =
989 M::PRIMARY_KEY.iter().map(|c| (*c).to_string()).collect();
990 if !target.is_empty() && target != &pk_target {
991 return Outcome::Err(sqlmodel_core::Error::Custom(
992 "joined-table inheritance insert ON CONFLICT currently only supports the primary key as conflict target"
993 .to_string(),
994 ));
995 }
996 }
997 }
998
999 let parent_allowed: HashSet<&'static str> =
1000 parent_fields.iter().map(|f| f.column_name).collect();
1001 let child_allowed: HashSet<&'static str> =
1002 M::fields().iter().map(|f| f.column_name).collect();
1003
1004 let (parent_on_conflict, child_on_conflict) = match &on_conflict {
1005 None => (None, None),
1006 Some(OnConflict::DoNothing) => {
1007 (Some(OnConflict::DoNothing), Some(OnConflict::DoNothing))
1008 }
1009 Some(OnConflict::DoUpdate { columns, target }) => {
1010 for c in columns {
1012 if !parent_allowed.contains(c.as_str())
1013 && !child_allowed.contains(c.as_str())
1014 {
1015 return Outcome::Err(sqlmodel_core::Error::Custom(format!(
1016 "unknown joined-table inheritance ON CONFLICT update column '{c}'"
1017 )));
1018 }
1019 }
1020
1021 let parent_cols: Vec<String> = columns
1022 .iter()
1023 .filter(|c| parent_allowed.contains(c.as_str()))
1024 .cloned()
1025 .collect();
1026 let child_cols: Vec<String> = columns
1027 .iter()
1028 .filter(|c| child_allowed.contains(c.as_str()))
1029 .cloned()
1030 .collect();
1031
1032 (
1033 Some(OnConflict::DoUpdate {
1034 columns: parent_cols,
1035 target: target.clone(),
1036 }),
1037 Some(OnConflict::DoUpdate {
1038 columns: child_cols,
1039 target: target.clone(),
1040 }),
1041 )
1042 }
1043 };
1044
1045 let tx_out = conn.begin(cx).await;
1046 let tx = match tx_out {
1047 Outcome::Ok(t) => t,
1048 Outcome::Err(e) => return Outcome::Err(e),
1049 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1050 Outcome::Panicked(p) => return Outcome::Panicked(p),
1051 };
1052
1053 let mut inserted_id: Option<i64> = None;
1055 if dialect == Dialect::Postgres {
1056 let Some(pk_col) = pk_col else {
1057 tx_rollback_best_effort(tx, cx).await;
1058 return Outcome::Err(sqlmodel_core::Error::Custom(
1059 "joined-table inheritance insert requires a primary key column".to_string(),
1060 ));
1061 };
1062
1063 if needs_generated_id {
1064 let (sql, params, _cols) = build_insert_sql_for_table_with_columns(
1065 dialect,
1066 parent_table,
1067 parent_fields,
1068 &parent_row,
1069 Some(pk_col),
1070 );
1071 match tx.query_one(cx, &sql, ¶ms).await {
1072 Outcome::Ok(Some(row)) => match row.get_as::<i64>(0) {
1073 Ok(v) => inserted_id = Some(v),
1074 Err(e) => {
1075 tx_rollback_best_effort(tx, cx).await;
1076 return Outcome::Err(e);
1077 }
1078 },
1079 Outcome::Ok(None) => {
1080 tx_rollback_best_effort(tx, cx).await;
1081 return Outcome::Err(sqlmodel_core::Error::Custom(
1082 "base insert returned no row".to_string(),
1083 ));
1084 }
1085 Outcome::Err(e) => {
1086 tx_rollback_best_effort(tx, cx).await;
1087 return Outcome::Err(e);
1088 }
1089 Outcome::Cancelled(r) => {
1090 tx_rollback_best_effort(tx, cx).await;
1091 return Outcome::Cancelled(r);
1092 }
1093 Outcome::Panicked(p) => {
1094 tx_rollback_best_effort(tx, cx).await;
1095 return Outcome::Panicked(p);
1096 }
1097 }
1098 } else {
1099 let (mut sql, params, cols) = build_insert_sql_for_table_with_columns(
1100 dialect,
1101 parent_table,
1102 parent_fields,
1103 &parent_row,
1104 None,
1105 );
1106 if let Some(oc) = &parent_on_conflict {
1107 append_on_conflict_clause(dialect, &mut sql, M::PRIMARY_KEY, &cols, oc);
1108 }
1109 match tx.execute(cx, &sql, ¶ms).await {
1110 Outcome::Ok(_) => {}
1111 Outcome::Err(e) => {
1112 tx_rollback_best_effort(tx, cx).await;
1113 return Outcome::Err(e);
1114 }
1115 Outcome::Cancelled(r) => {
1116 tx_rollback_best_effort(tx, cx).await;
1117 return Outcome::Cancelled(r);
1118 }
1119 Outcome::Panicked(p) => {
1120 tx_rollback_best_effort(tx, cx).await;
1121 return Outcome::Panicked(p);
1122 }
1123 }
1124 }
1125 } else {
1126 let (mut sql, params, cols) = build_insert_sql_for_table_with_columns(
1127 dialect,
1128 parent_table,
1129 parent_fields,
1130 &parent_row,
1131 None,
1132 );
1133 if let Some(oc) = &parent_on_conflict {
1134 append_on_conflict_clause(dialect, &mut sql, M::PRIMARY_KEY, &cols, oc);
1135 }
1136 match tx.execute(cx, &sql, ¶ms).await {
1137 Outcome::Ok(_) => {}
1138 Outcome::Err(e) => {
1139 tx_rollback_best_effort(tx, cx).await;
1140 return Outcome::Err(e);
1141 }
1142 Outcome::Cancelled(r) => {
1143 tx_rollback_best_effort(tx, cx).await;
1144 return Outcome::Cancelled(r);
1145 }
1146 Outcome::Panicked(p) => {
1147 tx_rollback_best_effort(tx, cx).await;
1148 return Outcome::Panicked(p);
1149 }
1150 }
1151
1152 if needs_generated_id {
1153 let id_sql = match dialect {
1154 Dialect::Sqlite => "SELECT last_insert_rowid()",
1155 Dialect::Mysql => "SELECT LAST_INSERT_ID()",
1156 Dialect::Postgres => unreachable!(),
1157 };
1158 match tx.query_one(cx, id_sql, &[]).await {
1159 Outcome::Ok(Some(row)) => match row.get_as::<i64>(0) {
1160 Ok(v) => inserted_id = Some(v),
1161 Err(e) => {
1162 tx_rollback_best_effort(tx, cx).await;
1163 return Outcome::Err(e);
1164 }
1165 },
1166 Outcome::Ok(None) => {
1167 tx_rollback_best_effort(tx, cx).await;
1168 return Outcome::Err(sqlmodel_core::Error::Custom(
1169 "failed to fetch last insert id".to_string(),
1170 ));
1171 }
1172 Outcome::Err(e) => {
1173 tx_rollback_best_effort(tx, cx).await;
1174 return Outcome::Err(e);
1175 }
1176 Outcome::Cancelled(r) => {
1177 tx_rollback_best_effort(tx, cx).await;
1178 return Outcome::Cancelled(r);
1179 }
1180 Outcome::Panicked(p) => {
1181 tx_rollback_best_effort(tx, cx).await;
1182 return Outcome::Panicked(p);
1183 }
1184 }
1185 }
1186 }
1187
1188 let mut child_row = self.model.to_row();
1190 if let (Some(pk_col), Some(id)) = (pk_col, inserted_id) {
1191 if M::PRIMARY_KEY.len() != 1 {
1192 tx_rollback_best_effort(tx, cx).await;
1193 return Outcome::Err(sqlmodel_core::Error::Custom(
1194 "joined-table inheritance auto-increment insert currently requires a single-column primary key"
1195 .to_string(),
1196 ));
1197 }
1198
1199 for (name, value) in &mut child_row {
1200 if *name == pk_col && value.is_null() {
1201 *value = Value::BigInt(id);
1202 }
1203 }
1204 }
1205
1206 let (mut child_sql, child_params, child_cols) = build_insert_sql_for_table_with_columns(
1207 dialect,
1208 M::TABLE_NAME,
1209 M::fields(),
1210 &child_row,
1211 None,
1212 );
1213 if let Some(oc) = &child_on_conflict {
1214 append_on_conflict_clause(dialect, &mut child_sql, M::PRIMARY_KEY, &child_cols, oc);
1215 }
1216
1217 match tx.execute(cx, &child_sql, &child_params).await {
1218 Outcome::Ok(_) => {}
1219 Outcome::Err(e) => {
1220 tx_rollback_best_effort(tx, cx).await;
1221 return Outcome::Err(e);
1222 }
1223 Outcome::Cancelled(r) => {
1224 tx_rollback_best_effort(tx, cx).await;
1225 return Outcome::Cancelled(r);
1226 }
1227 Outcome::Panicked(p) => {
1228 tx_rollback_best_effort(tx, cx).await;
1229 return Outcome::Panicked(p);
1230 }
1231 }
1232
1233 match tx.commit(cx).await {
1234 Outcome::Ok(()) => {}
1235 Outcome::Err(e) => return Outcome::Err(e),
1236 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1237 Outcome::Panicked(p) => return Outcome::Panicked(p),
1238 }
1239
1240 let id = inserted_id
1241 .or_else(|| extract_single_pk_i64(&pk_vals))
1242 .unwrap_or(0);
1243 return Outcome::Ok(id);
1244 }
1245
1246 let (sql, params) = self.build_with_dialect(conn.dialect());
1247 conn.insert(cx, &sql, ¶ms).await
1248 }
1249
1250 pub async fn execute_returning<C: Connection>(
1254 mut self,
1255 cx: &Cx,
1256 conn: &C,
1257 ) -> Outcome<Option<Row>, sqlmodel_core::Error> {
1258 self.returning = true;
1259 if is_joined_inheritance_child::<M>() {
1260 if self.on_conflict.is_some() {
1261 return Outcome::Err(sqlmodel_core::Error::Custom(
1262 "joined-table inheritance insert_returning does not support ON CONFLICT; use execute() for ON CONFLICT semantics"
1263 .to_string(),
1264 ));
1265 }
1266
1267 let dialect = conn.dialect();
1268 let inh = M::inheritance();
1269 let Some(parent_table) = inh.parent else {
1270 return Outcome::Err(sqlmodel_core::Error::Custom(
1271 "joined-table inheritance child missing parent table metadata".to_string(),
1272 ));
1273 };
1274 let Some(parent_fields_fn) = inh.parent_fields_fn else {
1275 return Outcome::Err(sqlmodel_core::Error::Custom(
1276 "joined-table inheritance child missing parent_fields_fn metadata".to_string(),
1277 ));
1278 };
1279 let parent_fields = parent_fields_fn();
1280
1281 let Some(parent_row) = self.model.joined_parent_row() else {
1282 return Outcome::Err(sqlmodel_core::Error::Custom(
1283 "joined-table inheritance child missing joined_parent_row() implementation"
1284 .to_string(),
1285 ));
1286 };
1287
1288 let pk_vals = self.model.primary_key_value();
1289 let pk_col = M::PRIMARY_KEY.first().copied();
1290 let needs_generated_id = pk_col.is_some()
1291 && pk_vals.len() == 1
1292 && parent_fields
1293 .iter()
1294 .find(|f| f.column_name == pk_col.unwrap_or("") && f.primary_key)
1295 .is_some_and(|f| f.auto_increment)
1296 && pk_vals[0].is_null();
1297
1298 let tx_out = conn.begin(cx).await;
1299 let tx = match tx_out {
1300 Outcome::Ok(t) => t,
1301 Outcome::Err(e) => return Outcome::Err(e),
1302 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1303 Outcome::Panicked(p) => return Outcome::Panicked(p),
1304 };
1305
1306 let mut inserted_id: Option<i64> = None;
1307 if dialect == Dialect::Postgres {
1308 let Some(pk_col) = pk_col else {
1309 tx_rollback_best_effort(tx, cx).await;
1310 return Outcome::Err(sqlmodel_core::Error::Custom(
1311 "joined-table inheritance insert requires a primary key column".to_string(),
1312 ));
1313 };
1314
1315 let (sql, params) = build_insert_sql_for_table(
1316 dialect,
1317 parent_table,
1318 parent_fields,
1319 &parent_row,
1320 Some(pk_col),
1321 );
1322 match tx.query_one(cx, &sql, ¶ms).await {
1323 Outcome::Ok(Some(row)) => match row.get_as::<i64>(0) {
1324 Ok(v) => inserted_id = Some(v),
1325 Err(e) => {
1326 tx_rollback_best_effort(tx, cx).await;
1327 return Outcome::Err(e);
1328 }
1329 },
1330 Outcome::Ok(None) => {
1331 tx_rollback_best_effort(tx, cx).await;
1332 return Outcome::Err(sqlmodel_core::Error::Custom(
1333 "base insert returned no row".to_string(),
1334 ));
1335 }
1336 Outcome::Err(e) => {
1337 tx_rollback_best_effort(tx, cx).await;
1338 return Outcome::Err(e);
1339 }
1340 Outcome::Cancelled(r) => {
1341 tx_rollback_best_effort(tx, cx).await;
1342 return Outcome::Cancelled(r);
1343 }
1344 Outcome::Panicked(p) => {
1345 tx_rollback_best_effort(tx, cx).await;
1346 return Outcome::Panicked(p);
1347 }
1348 }
1349 } else {
1350 let (sql, params) = build_insert_sql_for_table(
1351 dialect,
1352 parent_table,
1353 parent_fields,
1354 &parent_row,
1355 None,
1356 );
1357 match tx.execute(cx, &sql, ¶ms).await {
1358 Outcome::Ok(_) => {}
1359 Outcome::Err(e) => {
1360 tx_rollback_best_effort(tx, cx).await;
1361 return Outcome::Err(e);
1362 }
1363 Outcome::Cancelled(r) => {
1364 tx_rollback_best_effort(tx, cx).await;
1365 return Outcome::Cancelled(r);
1366 }
1367 Outcome::Panicked(p) => {
1368 tx_rollback_best_effort(tx, cx).await;
1369 return Outcome::Panicked(p);
1370 }
1371 }
1372
1373 if needs_generated_id {
1374 let id_sql = match dialect {
1375 Dialect::Sqlite => "SELECT last_insert_rowid()",
1376 Dialect::Mysql => "SELECT LAST_INSERT_ID()",
1377 Dialect::Postgres => unreachable!(),
1378 };
1379 match tx.query_one(cx, id_sql, &[]).await {
1380 Outcome::Ok(Some(row)) => match row.get_as::<i64>(0) {
1381 Ok(v) => inserted_id = Some(v),
1382 Err(e) => {
1383 tx_rollback_best_effort(tx, cx).await;
1384 return Outcome::Err(e);
1385 }
1386 },
1387 Outcome::Ok(None) => {
1388 tx_rollback_best_effort(tx, cx).await;
1389 return Outcome::Err(sqlmodel_core::Error::Custom(
1390 "failed to fetch last insert id".to_string(),
1391 ));
1392 }
1393 Outcome::Err(e) => {
1394 tx_rollback_best_effort(tx, cx).await;
1395 return Outcome::Err(e);
1396 }
1397 Outcome::Cancelled(r) => {
1398 tx_rollback_best_effort(tx, cx).await;
1399 return Outcome::Cancelled(r);
1400 }
1401 Outcome::Panicked(p) => {
1402 tx_rollback_best_effort(tx, cx).await;
1403 return Outcome::Panicked(p);
1404 }
1405 }
1406 }
1407 }
1408
1409 let mut child_row = self.model.to_row();
1410 if let (Some(pk_col), Some(id)) = (pk_col, inserted_id) {
1411 if M::PRIMARY_KEY.len() != 1 {
1412 tx_rollback_best_effort(tx, cx).await;
1413 return Outcome::Err(sqlmodel_core::Error::Custom(
1414 "joined-table inheritance auto-increment insert currently requires a single-column primary key"
1415 .to_string(),
1416 ));
1417 }
1418
1419 for (name, value) in &mut child_row {
1420 if *name == pk_col && value.is_null() {
1421 *value = Value::BigInt(id);
1422 }
1423 }
1424 }
1425
1426 let (child_sql, child_params) = build_insert_sql_for_table(
1427 dialect,
1428 M::TABLE_NAME,
1429 M::fields(),
1430 &child_row,
1431 Some("*"),
1432 );
1433 let row_out = match tx.query_one(cx, &child_sql, &child_params).await {
1434 Outcome::Ok(row) => Outcome::Ok(row),
1435 Outcome::Err(e) => {
1436 tx_rollback_best_effort(tx, cx).await;
1437 return Outcome::Err(e);
1438 }
1439 Outcome::Cancelled(r) => {
1440 tx_rollback_best_effort(tx, cx).await;
1441 return Outcome::Cancelled(r);
1442 }
1443 Outcome::Panicked(p) => {
1444 tx_rollback_best_effort(tx, cx).await;
1445 return Outcome::Panicked(p);
1446 }
1447 };
1448
1449 match tx.commit(cx).await {
1450 Outcome::Ok(()) => {}
1451 Outcome::Err(e) => return Outcome::Err(e),
1452 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1453 Outcome::Panicked(p) => return Outcome::Panicked(p),
1454 }
1455
1456 return row_out;
1457 }
1458
1459 let (sql, params) = self.build_with_dialect(conn.dialect());
1460 conn.query_one(cx, &sql, ¶ms).await
1461 }
1462}
1463
1464#[derive(Debug)]
1474pub struct InsertManyBuilder<'a, M: Model> {
1475 models: &'a [M],
1476 returning: bool,
1477 on_conflict: Option<OnConflict>,
1478}
1479
1480impl<'a, M: Model> InsertManyBuilder<'a, M> {
1481 pub fn new(models: &'a [M]) -> Self {
1483 Self {
1484 models,
1485 returning: false,
1486 on_conflict: None,
1487 }
1488 }
1489
1490 pub fn returning(mut self) -> Self {
1492 self.returning = true;
1493 self
1494 }
1495
1496 pub fn on_conflict_do_nothing(mut self) -> Self {
1498 self.on_conflict = Some(OnConflict::DoNothing);
1499 self
1500 }
1501
1502 pub fn on_conflict_do_update(mut self, columns: &[&str]) -> Self {
1504 self.on_conflict = Some(OnConflict::DoUpdate {
1505 columns: columns.iter().map(|s| s.to_string()).collect(),
1506 target: Vec::new(),
1507 });
1508 self
1509 }
1510
1511 pub fn build(&self) -> (String, Vec<Value>) {
1513 self.build_with_dialect(Dialect::default())
1514 }
1515
1516 pub fn build_with_dialect(&self, dialect: Dialect) -> (String, Vec<Value>) {
1518 let batches = self.build_batches_with_dialect(dialect);
1519 match batches.len() {
1520 0 => (String::new(), Vec::new()),
1521 1 => batches.into_iter().next().unwrap(),
1522 _ => {
1523 tracing::warn!(
1524 table = M::TABLE_NAME,
1525 "Bulk insert requires multiple statements for this dialect. \
1526 Use build_batches_with_dialect or execute() instead of build_with_dialect."
1527 );
1528 (String::new(), Vec::new())
1529 }
1530 }
1531 }
1532
1533 pub fn build_batches_with_dialect(&self, dialect: Dialect) -> Vec<(String, Vec<Value>)> {
1538 enum Batch {
1539 Values {
1540 columns: Vec<&'static str>,
1541 rows: Vec<Vec<Value>>,
1542 },
1543 DefaultValues,
1544 }
1545
1546 if self.models.is_empty() {
1547 return Vec::new();
1548 }
1549
1550 if is_joined_inheritance_child::<M>() {
1551 tracing::warn!(
1552 table = M::TABLE_NAME,
1553 "build_batches_with_dialect is not available for joined-table inheritance; use execute()/execute_returning()"
1554 );
1555 return Vec::new();
1556 }
1557
1558 if dialect != Dialect::Sqlite {
1559 return vec![self.build_single_with_dialect(dialect)];
1560 }
1561
1562 let fields = M::fields();
1563 let rows: Vec<Vec<(&'static str, Value)>> =
1564 self.models.iter().map(|model| model.to_row()).collect();
1565
1566 let insert_columns: Vec<_> = fields
1568 .iter()
1569 .filter_map(|field| {
1570 if field.auto_increment {
1571 return Some(field.column_name);
1572 }
1573 let has_value = rows.iter().any(|row| {
1574 row.iter()
1575 .find(|(name, _)| name == &field.column_name)
1576 .is_some_and(|(_, v)| !matches!(v, Value::Null))
1577 });
1578 if has_value {
1579 Some(field.column_name)
1580 } else {
1581 None
1582 }
1583 })
1584 .collect();
1585
1586 let mut batches: Vec<Batch> = Vec::new();
1587
1588 for row in &rows {
1589 let mut columns_for_row = Vec::new();
1590 let mut values_for_row = Vec::new();
1591
1592 for col in &insert_columns {
1593 let mut val = row
1594 .iter()
1595 .find(|(name, _)| name == col)
1596 .map_or(Value::Null, |(_, v)| v.clone());
1597
1598 if let Some(f) = fields.iter().find(|f| f.column_name == *col) {
1600 if f.auto_increment && matches!(val, Value::Null) {
1601 val = Value::Default;
1602 }
1603 }
1604
1605 if matches!(val, Value::Default) {
1606 continue;
1607 }
1608
1609 columns_for_row.push(*col);
1610 values_for_row.push(val);
1611 }
1612
1613 if columns_for_row.is_empty() {
1614 batches.push(Batch::DefaultValues);
1615 continue;
1616 }
1617
1618 match batches.last_mut() {
1619 Some(Batch::Values { columns, rows }) if *columns == columns_for_row => {
1620 rows.push(values_for_row);
1621 }
1622 _ => batches.push(Batch::Values {
1623 columns: columns_for_row,
1624 rows: vec![values_for_row],
1625 }),
1626 }
1627 }
1628
1629 let mut statements = Vec::new();
1630
1631 for batch in batches {
1632 match batch {
1633 Batch::DefaultValues => {
1634 let mut sql = format!("INSERT INTO {} DEFAULT VALUES", M::TABLE_NAME);
1635 self.append_on_conflict(dialect, &mut sql, &[]);
1636 self.append_returning(&mut sql);
1637 statements.push((sql, Vec::new()));
1638 }
1639 Batch::Values { columns, rows } => {
1640 let (sql, params) = self.build_values_batch_sql(dialect, &columns, &rows);
1641 statements.push((sql, params));
1642 }
1643 }
1644 }
1645
1646 statements
1647 }
1648
1649 fn build_single_with_dialect(&self, dialect: Dialect) -> (String, Vec<Value>) {
1650 let fields = M::fields();
1651 let rows: Vec<Vec<(&'static str, Value)>> =
1652 self.models.iter().map(|model| model.to_row()).collect();
1653
1654 let insert_columns: Vec<_> = fields
1657 .iter()
1658 .filter_map(|field| {
1659 if field.auto_increment {
1660 return Some(field.column_name);
1661 }
1662 let has_value = rows.iter().any(|row| {
1663 row.iter()
1664 .find(|(name, _)| name == &field.column_name)
1665 .is_some_and(|(_, v)| !matches!(v, Value::Null))
1666 });
1667 if has_value {
1668 Some(field.column_name)
1669 } else {
1670 None
1671 }
1672 })
1673 .collect();
1674
1675 let mut all_values = Vec::new();
1676 let mut value_groups = Vec::new();
1677
1678 for row in &rows {
1679 let values: Vec<_> = insert_columns
1680 .iter()
1681 .map(|col| {
1682 let val = row
1683 .iter()
1684 .find(|(name, _)| name == col)
1685 .map_or(Value::Null, |(_, v)| v.clone());
1686
1687 let field = fields.iter().find(|f| f.column_name == *col);
1689 if let Some(f) = field {
1690 if f.auto_increment && matches!(val, Value::Null) {
1691 return Value::Default;
1692 }
1693 }
1694 val
1695 })
1696 .collect();
1697
1698 let mut placeholders = Vec::new();
1699 for v in &values {
1700 if matches!(v, Value::Default) {
1701 placeholders.push("DEFAULT".to_string());
1702 } else {
1703 all_values.push(v.clone());
1704 placeholders.push(dialect.placeholder(all_values.len()));
1705 }
1706 }
1707
1708 value_groups.push(format!("({})", placeholders.join(", ")));
1709 }
1710
1711 let mut sql = format!(
1712 "INSERT INTO {} ({}) VALUES {}",
1713 M::TABLE_NAME,
1714 insert_columns.join(", "),
1715 value_groups.join(", ")
1716 );
1717
1718 self.append_on_conflict(dialect, &mut sql, &insert_columns);
1719 self.append_returning(&mut sql);
1720
1721 (sql, all_values)
1722 }
1723
1724 fn build_values_batch_sql(
1725 &self,
1726 dialect: Dialect,
1727 columns: &[&'static str],
1728 rows: &[Vec<Value>],
1729 ) -> (String, Vec<Value>) {
1730 let mut params = Vec::new();
1731 let mut value_groups = Vec::new();
1732
1733 for row in rows {
1734 let mut placeholders = Vec::new();
1735 for value in row {
1736 if matches!(value, Value::Default) {
1737 placeholders.push("DEFAULT".to_string());
1738 } else {
1739 params.push(value.clone());
1740 placeholders.push(dialect.placeholder(params.len()));
1741 }
1742 }
1743 value_groups.push(format!("({})", placeholders.join(", ")));
1744 }
1745
1746 let mut sql = if columns.is_empty() {
1747 format!("INSERT INTO {} DEFAULT VALUES", M::TABLE_NAME)
1748 } else {
1749 format!(
1750 "INSERT INTO {} ({}) VALUES {}",
1751 M::TABLE_NAME,
1752 columns.join(", "),
1753 value_groups.join(", ")
1754 )
1755 };
1756
1757 self.append_on_conflict(dialect, &mut sql, columns);
1758 self.append_returning(&mut sql);
1759
1760 (sql, params)
1761 }
1762
1763 fn append_on_conflict(
1764 &self,
1765 dialect: Dialect,
1766 sql: &mut String,
1767 insert_columns: &[&'static str],
1768 ) {
1769 if let Some(on_conflict) = &self.on_conflict {
1770 append_on_conflict_clause(dialect, sql, M::PRIMARY_KEY, insert_columns, on_conflict);
1771 }
1772 }
1773
1774 fn append_returning(&self, sql: &mut String) {
1775 if self.returning {
1776 sql.push_str(" RETURNING *");
1777 }
1778 }
1779
1780 pub async fn execute<C: Connection>(
1782 self,
1783 cx: &Cx,
1784 conn: &C,
1785 ) -> Outcome<u64, sqlmodel_core::Error> {
1786 if is_joined_inheritance_child::<M>() {
1787 if self.on_conflict.is_some() {
1788 return Outcome::Err(sqlmodel_core::Error::Custom(
1789 "joined-table inheritance bulk insert does not support ON CONFLICT yet"
1790 .to_string(),
1791 ));
1792 }
1793
1794 let dialect = conn.dialect();
1795 let (parent_table, parent_fields) = match joined_parent_meta::<M>() {
1796 Ok(v) => v,
1797 Err(e) => return Outcome::Err(e),
1798 };
1799 let tx_out = conn.begin(cx).await;
1800 let tx = match tx_out {
1801 Outcome::Ok(t) => t,
1802 Outcome::Err(e) => return Outcome::Err(e),
1803 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1804 Outcome::Panicked(p) => return Outcome::Panicked(p),
1805 };
1806
1807 let mut total_inserted: u64 = 0;
1808 for model in self.models {
1809 match insert_joined_model_in_tx::<_, M>(
1810 &tx,
1811 cx,
1812 dialect,
1813 model,
1814 parent_table,
1815 parent_fields,
1816 )
1817 .await
1818 {
1819 Outcome::Ok((count, _)) => {
1820 total_inserted = total_inserted.saturating_add(count);
1821 }
1822 Outcome::Err(e) => {
1823 tx_rollback_best_effort(tx, cx).await;
1824 return Outcome::Err(e);
1825 }
1826 Outcome::Cancelled(r) => {
1827 tx_rollback_best_effort(tx, cx).await;
1828 return Outcome::Cancelled(r);
1829 }
1830 Outcome::Panicked(p) => {
1831 tx_rollback_best_effort(tx, cx).await;
1832 return Outcome::Panicked(p);
1833 }
1834 }
1835 }
1836
1837 return match tx.commit(cx).await {
1838 Outcome::Ok(()) => Outcome::Ok(total_inserted),
1839 Outcome::Err(e) => Outcome::Err(e),
1840 Outcome::Cancelled(r) => Outcome::Cancelled(r),
1841 Outcome::Panicked(p) => Outcome::Panicked(p),
1842 };
1843 }
1844
1845 let batches = self.build_batches_with_dialect(conn.dialect());
1846 if batches.is_empty() {
1847 return Outcome::Ok(0);
1848 }
1849
1850 if batches.len() == 1 {
1851 let (sql, params) = &batches[0];
1852 return conn.execute(cx, sql, params).await;
1853 }
1854
1855 let outcome = conn.batch(cx, &batches).await;
1856 outcome.map(|counts| counts.into_iter().sum())
1857 }
1858
1859 pub async fn execute_returning<C: Connection>(
1861 mut self,
1862 cx: &Cx,
1863 conn: &C,
1864 ) -> Outcome<Vec<Row>, sqlmodel_core::Error> {
1865 self.returning = true;
1866 if is_joined_inheritance_child::<M>() {
1867 if self.on_conflict.is_some() {
1868 return Outcome::Err(sqlmodel_core::Error::Custom(
1869 "joined-table inheritance bulk insert does not support ON CONFLICT yet"
1870 .to_string(),
1871 ));
1872 }
1873
1874 let dialect = conn.dialect();
1875 let (parent_table, parent_fields) = match joined_parent_meta::<M>() {
1876 Ok(v) => v,
1877 Err(e) => return Outcome::Err(e),
1878 };
1879 let pk_cols = M::PRIMARY_KEY;
1880 if pk_cols.is_empty() {
1881 return Outcome::Err(sqlmodel_core::Error::Custom(
1882 "joined-table inheritance returning requires a primary key".to_string(),
1883 ));
1884 }
1885
1886 let tx_out = conn.begin(cx).await;
1887 let tx = match tx_out {
1888 Outcome::Ok(t) => t,
1889 Outcome::Err(e) => return Outcome::Err(e),
1890 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1891 Outcome::Panicked(p) => return Outcome::Panicked(p),
1892 };
1893
1894 let mut inserted_pk_values: Vec<Vec<Value>> = Vec::with_capacity(self.models.len());
1895 for model in self.models {
1896 match insert_joined_model_in_tx::<_, M>(
1897 &tx,
1898 cx,
1899 dialect,
1900 model,
1901 parent_table,
1902 parent_fields,
1903 )
1904 .await
1905 {
1906 Outcome::Ok((_count, pk_vals)) => {
1907 if pk_vals.len() != pk_cols.len() || pk_vals.iter().any(Value::is_null) {
1908 tx_rollback_best_effort(tx, cx).await;
1909 return Outcome::Err(sqlmodel_core::Error::Custom(
1910 "joined-table inheritance bulk insert returning requires non-null primary key values"
1911 .to_string(),
1912 ));
1913 }
1914 inserted_pk_values.push(pk_vals);
1915 }
1916 Outcome::Err(e) => {
1917 tx_rollback_best_effort(tx, cx).await;
1918 return Outcome::Err(e);
1919 }
1920 Outcome::Cancelled(r) => {
1921 tx_rollback_best_effort(tx, cx).await;
1922 return Outcome::Cancelled(r);
1923 }
1924 Outcome::Panicked(p) => {
1925 tx_rollback_best_effort(tx, cx).await;
1926 return Outcome::Panicked(p);
1927 }
1928 }
1929 }
1930
1931 if inserted_pk_values.is_empty() {
1932 return match tx.commit(cx).await {
1933 Outcome::Ok(()) => Outcome::Ok(Vec::new()),
1934 Outcome::Err(e) => Outcome::Err(e),
1935 Outcome::Cancelled(r) => Outcome::Cancelled(r),
1936 Outcome::Panicked(p) => Outcome::Panicked(p),
1937 };
1938 }
1939
1940 let (select_sql, select_params) = match build_joined_child_select_sql_by_pk_in::<M>(
1941 dialect,
1942 pk_cols,
1943 &inserted_pk_values,
1944 ) {
1945 Ok(v) => v,
1946 Err(e) => {
1947 tx_rollback_best_effort(tx, cx).await;
1948 return Outcome::Err(e);
1949 }
1950 };
1951 if select_sql.is_empty() {
1952 tx_rollback_best_effort(tx, cx).await;
1953 return Outcome::Ok(Vec::new());
1954 }
1955 let rows = match tx.query(cx, &select_sql, &select_params).await {
1956 Outcome::Ok(rows) => rows,
1957 Outcome::Err(e) => {
1958 tx_rollback_best_effort(tx, cx).await;
1959 return Outcome::Err(e);
1960 }
1961 Outcome::Cancelled(r) => {
1962 tx_rollback_best_effort(tx, cx).await;
1963 return Outcome::Cancelled(r);
1964 }
1965 Outcome::Panicked(p) => {
1966 tx_rollback_best_effort(tx, cx).await;
1967 return Outcome::Panicked(p);
1968 }
1969 };
1970
1971 return match tx.commit(cx).await {
1972 Outcome::Ok(()) => Outcome::Ok(rows),
1973 Outcome::Err(e) => Outcome::Err(e),
1974 Outcome::Cancelled(r) => Outcome::Cancelled(r),
1975 Outcome::Panicked(p) => Outcome::Panicked(p),
1976 };
1977 }
1978
1979 let batches = self.build_batches_with_dialect(conn.dialect());
1980 if batches.is_empty() {
1981 return Outcome::Ok(Vec::new());
1982 }
1983
1984 let mut all_rows = Vec::new();
1985 for (sql, params) in batches {
1986 match conn.query(cx, &sql, ¶ms).await {
1987 Outcome::Ok(mut rows) => all_rows.append(&mut rows),
1988 Outcome::Err(e) => return Outcome::Err(e),
1989 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1990 Outcome::Panicked(p) => return Outcome::Panicked(p),
1991 }
1992 }
1993
1994 Outcome::Ok(all_rows)
1995 }
1996}
1997
1998#[derive(Debug, Clone)]
2000pub struct SetClause {
2001 column: String,
2002 value: Value,
2003}
2004
2005#[derive(Debug)]
2024pub struct UpdateBuilder<'a, M: Model> {
2025 model: Option<&'a M>,
2026 where_clause: Option<Where>,
2027 set_fields: Option<Vec<&'static str>>,
2028 explicit_sets: Vec<SetClause>,
2029 returning: bool,
2030}
2031
2032impl<'a, M: Model> UpdateBuilder<'a, M> {
2033 pub fn new(model: &'a M) -> Self {
2035 Self {
2036 model: Some(model),
2037 where_clause: None,
2038 set_fields: None,
2039 explicit_sets: Vec::new(),
2040 returning: false,
2041 }
2042 }
2043
2044 pub fn empty() -> Self {
2048 Self {
2049 model: None,
2050 where_clause: None,
2051 set_fields: None,
2052 explicit_sets: Vec::new(),
2053 returning: false,
2054 }
2055 }
2056
2057 pub fn set<V: Into<Value>>(mut self, column: &str, value: V) -> Self {
2062 self.explicit_sets.push(SetClause {
2063 column: column.to_string(),
2064 value: value.into(),
2065 });
2066 self
2067 }
2068
2069 pub fn set_only(mut self, fields: &[&'static str]) -> Self {
2071 self.set_fields = Some(fields.to_vec());
2072 self
2073 }
2074
2075 pub fn filter(mut self, expr: Expr) -> Self {
2077 self.where_clause = Some(match self.where_clause {
2078 Some(existing) => existing.and(expr),
2079 None => Where::new(expr),
2080 });
2081 self
2082 }
2083
2084 pub fn returning(mut self) -> Self {
2086 self.returning = true;
2087 self
2088 }
2089
2090 pub fn build(&self) -> (String, Vec<Value>) {
2092 self.build_with_dialect(Dialect::default())
2093 }
2094
2095 pub fn build_with_dialect(&self, dialect: Dialect) -> (String, Vec<Value>) {
2097 let pk = M::PRIMARY_KEY;
2098 let mut params = Vec::new();
2099 let mut set_clauses = Vec::new();
2100
2101 for set in &self.explicit_sets {
2103 set_clauses.push(format!(
2104 "{} = {}",
2105 set.column,
2106 dialect.placeholder(params.len() + 1)
2107 ));
2108 params.push(set.value.clone());
2109 }
2110
2111 if let Some(model) = &self.model {
2113 let row = model.to_row();
2114
2115 let update_fields: Vec<_> = row
2117 .iter()
2118 .filter(|(name, _)| {
2119 if pk.contains(name) {
2121 return false;
2122 }
2123 if self.explicit_sets.iter().any(|s| s.column == *name) {
2125 return false;
2126 }
2127 if let Some(fields) = &self.set_fields {
2129 return fields.contains(name);
2130 }
2131 true
2132 })
2133 .collect();
2134
2135 for (name, value) in update_fields {
2136 set_clauses.push(format!(
2137 "{} = {}",
2138 name,
2139 dialect.placeholder(params.len() + 1)
2140 ));
2141 params.push(value.clone());
2142 }
2143 }
2144
2145 if set_clauses.is_empty() {
2146 return (String::new(), Vec::new());
2148 }
2149
2150 let mut sql = format!("UPDATE {} SET {}", M::TABLE_NAME, set_clauses.join(", "));
2151
2152 if let Some(where_clause) = &self.where_clause {
2154 let (where_sql, where_params) = where_clause.build_with_dialect(dialect, params.len());
2155 sql.push_str(" WHERE ");
2156 sql.push_str(&where_sql);
2157 params.extend(where_params);
2158 } else if let Some(model) = &self.model {
2159 let pk_values = model.primary_key_value();
2161 let pk_conditions: Vec<_> = pk
2162 .iter()
2163 .zip(pk_values.iter())
2164 .enumerate()
2165 .map(|(i, (col, _))| {
2166 format!("{} = {}", col, dialect.placeholder(params.len() + i + 1))
2167 })
2168 .collect();
2169
2170 if !pk_conditions.is_empty() {
2171 sql.push_str(" WHERE ");
2172 sql.push_str(&pk_conditions.join(" AND "));
2173 params.extend(pk_values);
2174 }
2175 }
2176
2177 if self.returning {
2179 sql.push_str(" RETURNING *");
2180 }
2181
2182 (sql, params)
2183 }
2184
2185 pub async fn execute<C: Connection>(
2192 self,
2193 cx: &Cx,
2194 conn: &C,
2195 ) -> Outcome<u64, sqlmodel_core::Error> {
2196 if is_joined_inheritance_child::<M>() {
2197 if self.model.is_none() {
2198 if self.explicit_sets.is_empty() {
2199 return Outcome::Err(sqlmodel_core::Error::Custom(
2200 "joined-table inheritance explicit update requires at least one SET clause"
2201 .to_string(),
2202 ));
2203 }
2204
2205 let dialect = conn.dialect();
2206 let (parent_table, parent_fields) = match joined_parent_meta::<M>() {
2207 Ok(v) => v,
2208 Err(e) => return Outcome::Err(e),
2209 };
2210 let (parent_sets, child_sets) = match split_explicit_joined_sets::<M>(
2211 &self.explicit_sets,
2212 parent_table,
2213 parent_fields,
2214 ) {
2215 Ok(v) => v,
2216 Err(e) => return Outcome::Err(e),
2217 };
2218
2219 let tx_out = conn.begin(cx).await;
2220 let tx = match tx_out {
2221 Outcome::Ok(t) => t,
2222 Outcome::Err(e) => return Outcome::Err(e),
2223 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
2224 Outcome::Panicked(p) => return Outcome::Panicked(p),
2225 };
2226
2227 let pk_values = match select_joined_pk_values_in_tx::<_, M>(
2228 &tx,
2229 cx,
2230 dialect,
2231 self.where_clause.as_ref(),
2232 )
2233 .await
2234 {
2235 Outcome::Ok(v) => v,
2236 Outcome::Err(e) => {
2237 tx_rollback_best_effort(tx, cx).await;
2238 return Outcome::Err(e);
2239 }
2240 Outcome::Cancelled(r) => {
2241 tx_rollback_best_effort(tx, cx).await;
2242 return Outcome::Cancelled(r);
2243 }
2244 Outcome::Panicked(p) => {
2245 tx_rollback_best_effort(tx, cx).await;
2246 return Outcome::Panicked(p);
2247 }
2248 };
2249
2250 if pk_values.is_empty() {
2251 return match tx.commit(cx).await {
2252 Outcome::Ok(()) => Outcome::Ok(0),
2253 Outcome::Err(e) => Outcome::Err(e),
2254 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2255 Outcome::Panicked(p) => Outcome::Panicked(p),
2256 };
2257 }
2258
2259 let mut total = 0_u64;
2260
2261 if !parent_sets.is_empty() {
2262 let (parent_sql, parent_params) = build_update_sql_for_table_pk_in(
2263 dialect,
2264 parent_table,
2265 M::PRIMARY_KEY,
2266 &pk_values,
2267 &parent_sets,
2268 );
2269 if !parent_sql.is_empty() {
2270 match tx.execute(cx, &parent_sql, &parent_params).await {
2271 Outcome::Ok(n) => total = total.saturating_add(n),
2272 Outcome::Err(e) => {
2273 tx_rollback_best_effort(tx, cx).await;
2274 return Outcome::Err(e);
2275 }
2276 Outcome::Cancelled(r) => {
2277 tx_rollback_best_effort(tx, cx).await;
2278 return Outcome::Cancelled(r);
2279 }
2280 Outcome::Panicked(p) => {
2281 tx_rollback_best_effort(tx, cx).await;
2282 return Outcome::Panicked(p);
2283 }
2284 }
2285 }
2286 }
2287
2288 if !child_sets.is_empty() {
2289 let (child_sql, child_params) = build_update_sql_for_table_pk_in(
2290 dialect,
2291 M::TABLE_NAME,
2292 M::PRIMARY_KEY,
2293 &pk_values,
2294 &child_sets,
2295 );
2296 if !child_sql.is_empty() {
2297 match tx.execute(cx, &child_sql, &child_params).await {
2298 Outcome::Ok(n) => total = total.saturating_add(n),
2299 Outcome::Err(e) => {
2300 tx_rollback_best_effort(tx, cx).await;
2301 return Outcome::Err(e);
2302 }
2303 Outcome::Cancelled(r) => {
2304 tx_rollback_best_effort(tx, cx).await;
2305 return Outcome::Cancelled(r);
2306 }
2307 Outcome::Panicked(p) => {
2308 tx_rollback_best_effort(tx, cx).await;
2309 return Outcome::Panicked(p);
2310 }
2311 }
2312 }
2313 }
2314
2315 return match tx.commit(cx).await {
2316 Outcome::Ok(()) => Outcome::Ok(total),
2317 Outcome::Err(e) => Outcome::Err(e),
2318 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2319 Outcome::Panicked(p) => Outcome::Panicked(p),
2320 };
2321 }
2322
2323 if self.where_clause.is_some() || !self.explicit_sets.is_empty() {
2324 return Outcome::Err(sqlmodel_core::Error::Custom(
2325 "joined-table inheritance update with a model supports model-based updates only; use UpdateBuilder::empty().set(...).filter(...) for explicit WHERE/SET"
2326 .to_string(),
2327 ));
2328 }
2329
2330 let dialect = conn.dialect();
2331 let Some(model) = self.model else {
2332 return Outcome::Err(sqlmodel_core::Error::Custom(
2333 "update called without model".to_string(),
2334 ));
2335 };
2336 let inh = M::inheritance();
2337 let Some(parent_table) = inh.parent else {
2338 return Outcome::Err(sqlmodel_core::Error::Custom(
2339 "joined-table inheritance child missing parent table metadata".to_string(),
2340 ));
2341 };
2342 let Some(parent_fields_fn) = inh.parent_fields_fn else {
2343 return Outcome::Err(sqlmodel_core::Error::Custom(
2344 "joined-table inheritance child missing parent_fields_fn metadata".to_string(),
2345 ));
2346 };
2347 let parent_fields = parent_fields_fn();
2348 let Some(parent_row) = model.joined_parent_row() else {
2349 return Outcome::Err(sqlmodel_core::Error::Custom(
2350 "joined-table inheritance child missing joined_parent_row() implementation"
2351 .to_string(),
2352 ));
2353 };
2354
2355 let pk_cols = M::PRIMARY_KEY;
2356 let pk_vals = model.primary_key_value();
2357
2358 let tx_out = conn.begin(cx).await;
2359 let tx = match tx_out {
2360 Outcome::Ok(t) => t,
2361 Outcome::Err(e) => return Outcome::Err(e),
2362 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
2363 Outcome::Panicked(p) => return Outcome::Panicked(p),
2364 };
2365
2366 let mut total = 0_u64;
2367
2368 let mut parent_sets: Vec<(&'static str, Value)> = Vec::new();
2370 for f in parent_fields {
2371 if f.primary_key || pk_cols.contains(&f.column_name) {
2372 continue;
2373 }
2374 if let Some((_, v)) = parent_row.iter().find(|(k, _)| *k == f.column_name) {
2375 parent_sets.push((f.column_name, v.clone()));
2376 }
2377 }
2378 let (parent_sql, parent_params) =
2379 build_update_sql_for_table(dialect, parent_table, pk_cols, &pk_vals, &parent_sets);
2380 if !parent_sql.is_empty() {
2381 match tx.execute(cx, &parent_sql, &parent_params).await {
2382 Outcome::Ok(n) => total = total.saturating_add(n),
2383 Outcome::Err(e) => {
2384 tx_rollback_best_effort(tx, cx).await;
2385 return Outcome::Err(e);
2386 }
2387 Outcome::Cancelled(r) => {
2388 tx_rollback_best_effort(tx, cx).await;
2389 return Outcome::Cancelled(r);
2390 }
2391 Outcome::Panicked(p) => {
2392 tx_rollback_best_effort(tx, cx).await;
2393 return Outcome::Panicked(p);
2394 }
2395 }
2396 }
2397
2398 let row = model.to_row();
2400 let mut child_sets: Vec<(&'static str, Value)> = Vec::new();
2401 for (name, value) in row {
2402 if pk_cols.contains(&name) {
2403 continue;
2404 }
2405 if let Some(fields) = &self.set_fields {
2406 if !fields.contains(&name) {
2407 continue;
2408 }
2409 }
2410 child_sets.push((name, value));
2411 }
2412 let (child_sql, child_params) =
2413 build_update_sql_for_table(dialect, M::TABLE_NAME, pk_cols, &pk_vals, &child_sets);
2414 if !child_sql.is_empty() {
2415 match tx.execute(cx, &child_sql, &child_params).await {
2416 Outcome::Ok(n) => total = total.saturating_add(n),
2417 Outcome::Err(e) => {
2418 tx_rollback_best_effort(tx, cx).await;
2419 return Outcome::Err(e);
2420 }
2421 Outcome::Cancelled(r) => {
2422 tx_rollback_best_effort(tx, cx).await;
2423 return Outcome::Cancelled(r);
2424 }
2425 Outcome::Panicked(p) => {
2426 tx_rollback_best_effort(tx, cx).await;
2427 return Outcome::Panicked(p);
2428 }
2429 }
2430 }
2431
2432 match tx.commit(cx).await {
2433 Outcome::Ok(()) => Outcome::Ok(total),
2434 Outcome::Err(e) => Outcome::Err(e),
2435 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2436 Outcome::Panicked(p) => Outcome::Panicked(p),
2437 }
2438 } else {
2439 let (sql, params) = self.build_with_dialect(conn.dialect());
2440 if sql.is_empty() {
2441 return Outcome::Ok(0);
2442 }
2443 conn.execute(cx, &sql, ¶ms).await
2444 }
2445 }
2446
2447 pub async fn execute_returning<C: Connection>(
2449 mut self,
2450 cx: &Cx,
2451 conn: &C,
2452 ) -> Outcome<Vec<Row>, sqlmodel_core::Error> {
2453 self.returning = true;
2454 if is_joined_inheritance_child::<M>() {
2455 if self.model.is_none() {
2456 if self.explicit_sets.is_empty() {
2457 return Outcome::Err(sqlmodel_core::Error::Custom(
2458 "joined-table inheritance explicit update_returning requires at least one SET clause"
2459 .to_string(),
2460 ));
2461 }
2462
2463 let dialect = conn.dialect();
2464 let (parent_table, parent_fields) = match joined_parent_meta::<M>() {
2465 Ok(v) => v,
2466 Err(e) => return Outcome::Err(e),
2467 };
2468 let (parent_sets, child_sets) = match split_explicit_joined_sets::<M>(
2469 &self.explicit_sets,
2470 parent_table,
2471 parent_fields,
2472 ) {
2473 Ok(v) => v,
2474 Err(e) => return Outcome::Err(e),
2475 };
2476
2477 let tx_out = conn.begin(cx).await;
2478 let tx = match tx_out {
2479 Outcome::Ok(t) => t,
2480 Outcome::Err(e) => return Outcome::Err(e),
2481 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
2482 Outcome::Panicked(p) => return Outcome::Panicked(p),
2483 };
2484
2485 let pk_values = match select_joined_pk_values_in_tx::<_, M>(
2486 &tx,
2487 cx,
2488 dialect,
2489 self.where_clause.as_ref(),
2490 )
2491 .await
2492 {
2493 Outcome::Ok(v) => v,
2494 Outcome::Err(e) => {
2495 tx_rollback_best_effort(tx, cx).await;
2496 return Outcome::Err(e);
2497 }
2498 Outcome::Cancelled(r) => {
2499 tx_rollback_best_effort(tx, cx).await;
2500 return Outcome::Cancelled(r);
2501 }
2502 Outcome::Panicked(p) => {
2503 tx_rollback_best_effort(tx, cx).await;
2504 return Outcome::Panicked(p);
2505 }
2506 };
2507
2508 if pk_values.is_empty() {
2509 return match tx.commit(cx).await {
2510 Outcome::Ok(()) => Outcome::Ok(Vec::new()),
2511 Outcome::Err(e) => Outcome::Err(e),
2512 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2513 Outcome::Panicked(p) => Outcome::Panicked(p),
2514 };
2515 }
2516
2517 if !parent_sets.is_empty() {
2518 let (parent_sql, parent_params) = build_update_sql_for_table_pk_in(
2519 dialect,
2520 parent_table,
2521 M::PRIMARY_KEY,
2522 &pk_values,
2523 &parent_sets,
2524 );
2525 if !parent_sql.is_empty() {
2526 match tx.execute(cx, &parent_sql, &parent_params).await {
2527 Outcome::Ok(_) => {}
2528 Outcome::Err(e) => {
2529 tx_rollback_best_effort(tx, cx).await;
2530 return Outcome::Err(e);
2531 }
2532 Outcome::Cancelled(r) => {
2533 tx_rollback_best_effort(tx, cx).await;
2534 return Outcome::Cancelled(r);
2535 }
2536 Outcome::Panicked(p) => {
2537 tx_rollback_best_effort(tx, cx).await;
2538 return Outcome::Panicked(p);
2539 }
2540 }
2541 }
2542 }
2543
2544 if !child_sets.is_empty() {
2545 let (child_sql, child_params) = build_update_sql_for_table_pk_in(
2546 dialect,
2547 M::TABLE_NAME,
2548 M::PRIMARY_KEY,
2549 &pk_values,
2550 &child_sets,
2551 );
2552 if !child_sql.is_empty() {
2553 match tx.execute(cx, &child_sql, &child_params).await {
2554 Outcome::Ok(_) => {}
2555 Outcome::Err(e) => {
2556 tx_rollback_best_effort(tx, cx).await;
2557 return Outcome::Err(e);
2558 }
2559 Outcome::Cancelled(r) => {
2560 tx_rollback_best_effort(tx, cx).await;
2561 return Outcome::Cancelled(r);
2562 }
2563 Outcome::Panicked(p) => {
2564 tx_rollback_best_effort(tx, cx).await;
2565 return Outcome::Panicked(p);
2566 }
2567 }
2568 }
2569 }
2570
2571 let (select_sql, select_params) = match build_joined_child_select_sql_by_pk_in::<M>(
2572 dialect,
2573 M::PRIMARY_KEY,
2574 &pk_values,
2575 ) {
2576 Ok(v) => v,
2577 Err(e) => {
2578 tx_rollback_best_effort(tx, cx).await;
2579 return Outcome::Err(e);
2580 }
2581 };
2582 let rows = if select_sql.is_empty() {
2583 Vec::new()
2584 } else {
2585 match tx.query(cx, &select_sql, &select_params).await {
2586 Outcome::Ok(rows) => rows,
2587 Outcome::Err(e) => {
2588 tx_rollback_best_effort(tx, cx).await;
2589 return Outcome::Err(e);
2590 }
2591 Outcome::Cancelled(r) => {
2592 tx_rollback_best_effort(tx, cx).await;
2593 return Outcome::Cancelled(r);
2594 }
2595 Outcome::Panicked(p) => {
2596 tx_rollback_best_effort(tx, cx).await;
2597 return Outcome::Panicked(p);
2598 }
2599 }
2600 };
2601
2602 return match tx.commit(cx).await {
2603 Outcome::Ok(()) => Outcome::Ok(rows),
2604 Outcome::Err(e) => Outcome::Err(e),
2605 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2606 Outcome::Panicked(p) => Outcome::Panicked(p),
2607 };
2608 }
2609
2610 if self.where_clause.is_some() || !self.explicit_sets.is_empty() {
2611 return Outcome::Err(sqlmodel_core::Error::Custom(
2612 "joined-table inheritance update_returning with a model supports model-based updates only; use UpdateBuilder::empty().set(...).filter(...) for explicit WHERE/SET"
2613 .to_string(),
2614 ));
2615 }
2616
2617 let dialect = conn.dialect();
2618 let Some(model) = self.model else {
2619 return Outcome::Err(sqlmodel_core::Error::Custom(
2620 "update_returning called without model".to_string(),
2621 ));
2622 };
2623 let inh = M::inheritance();
2624 let Some(parent_table) = inh.parent else {
2625 return Outcome::Err(sqlmodel_core::Error::Custom(
2626 "joined-table inheritance child missing parent table metadata".to_string(),
2627 ));
2628 };
2629 let Some(parent_fields_fn) = inh.parent_fields_fn else {
2630 return Outcome::Err(sqlmodel_core::Error::Custom(
2631 "joined-table inheritance child missing parent_fields_fn metadata".to_string(),
2632 ));
2633 };
2634 let parent_fields = parent_fields_fn();
2635 let Some(parent_row) = model.joined_parent_row() else {
2636 return Outcome::Err(sqlmodel_core::Error::Custom(
2637 "joined-table inheritance child missing joined_parent_row() implementation"
2638 .to_string(),
2639 ));
2640 };
2641
2642 let pk_cols = M::PRIMARY_KEY;
2643 let pk_vals = model.primary_key_value();
2644
2645 let tx_out = conn.begin(cx).await;
2646 let tx = match tx_out {
2647 Outcome::Ok(t) => t,
2648 Outcome::Err(e) => return Outcome::Err(e),
2649 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
2650 Outcome::Panicked(p) => return Outcome::Panicked(p),
2651 };
2652
2653 let mut parent_sets: Vec<(&'static str, Value)> = Vec::new();
2655 for f in parent_fields {
2656 if f.primary_key || pk_cols.contains(&f.column_name) {
2657 continue;
2658 }
2659 if let Some((_, v)) = parent_row.iter().find(|(k, _)| *k == f.column_name) {
2660 parent_sets.push((f.column_name, v.clone()));
2661 }
2662 }
2663 let (parent_sql, parent_params) =
2664 build_update_sql_for_table(dialect, parent_table, pk_cols, &pk_vals, &parent_sets);
2665 if !parent_sql.is_empty() {
2666 match tx.execute(cx, &parent_sql, &parent_params).await {
2667 Outcome::Ok(_) => {}
2668 Outcome::Err(e) => {
2669 tx_rollback_best_effort(tx, cx).await;
2670 return Outcome::Err(e);
2671 }
2672 Outcome::Cancelled(r) => {
2673 tx_rollback_best_effort(tx, cx).await;
2674 return Outcome::Cancelled(r);
2675 }
2676 Outcome::Panicked(p) => {
2677 tx_rollback_best_effort(tx, cx).await;
2678 return Outcome::Panicked(p);
2679 }
2680 }
2681 }
2682
2683 let row = model.to_row();
2685 let mut child_sets: Vec<(&'static str, Value)> = Vec::new();
2686 for (name, value) in row {
2687 if pk_cols.contains(&name) {
2688 continue;
2689 }
2690 if let Some(fields) = &self.set_fields {
2691 if !fields.contains(&name) {
2692 continue;
2693 }
2694 }
2695 child_sets.push((name, value));
2696 }
2697 let (mut child_sql, child_params) =
2698 build_update_sql_for_table(dialect, M::TABLE_NAME, pk_cols, &pk_vals, &child_sets);
2699 if child_sql.is_empty() {
2700 tx_rollback_best_effort(tx, cx).await;
2701 return Outcome::Ok(Vec::new());
2702 }
2703 child_sql.push_str(" RETURNING *");
2704
2705 let rows = match tx.query(cx, &child_sql, &child_params).await {
2706 Outcome::Ok(rows) => rows,
2707 Outcome::Err(e) => {
2708 tx_rollback_best_effort(tx, cx).await;
2709 return Outcome::Err(e);
2710 }
2711 Outcome::Cancelled(r) => {
2712 tx_rollback_best_effort(tx, cx).await;
2713 return Outcome::Cancelled(r);
2714 }
2715 Outcome::Panicked(p) => {
2716 tx_rollback_best_effort(tx, cx).await;
2717 return Outcome::Panicked(p);
2718 }
2719 };
2720
2721 match tx.commit(cx).await {
2722 Outcome::Ok(()) => Outcome::Ok(rows),
2723 Outcome::Err(e) => Outcome::Err(e),
2724 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2725 Outcome::Panicked(p) => Outcome::Panicked(p),
2726 }
2727 } else {
2728 let (sql, params) = self.build_with_dialect(conn.dialect());
2729 if sql.is_empty() {
2730 return Outcome::Ok(Vec::new());
2731 }
2732 conn.query(cx, &sql, ¶ms).await
2733 }
2734 }
2735}
2736
2737#[derive(Debug)]
2758pub struct DeleteBuilder<'a, M: Model> {
2759 model: Option<&'a M>,
2760 where_clause: Option<Where>,
2761 returning: bool,
2762 _marker: PhantomData<M>,
2763}
2764
2765impl<'a, M: Model> DeleteBuilder<'a, M> {
2766 pub fn new() -> Self {
2768 Self {
2769 model: None,
2770 where_clause: None,
2771 returning: false,
2772 _marker: PhantomData,
2773 }
2774 }
2775
2776 pub fn from_model(model: &'a M) -> Self {
2780 Self {
2781 model: Some(model),
2782 where_clause: None,
2783 returning: false,
2784 _marker: PhantomData,
2785 }
2786 }
2787
2788 pub fn filter(mut self, expr: Expr) -> Self {
2790 self.where_clause = Some(match self.where_clause {
2791 Some(existing) => existing.and(expr),
2792 None => Where::new(expr),
2793 });
2794 self
2795 }
2796
2797 pub fn returning(mut self) -> Self {
2799 self.returning = true;
2800 self
2801 }
2802
2803 pub fn build(&self) -> (String, Vec<Value>) {
2805 self.build_with_dialect(Dialect::default())
2806 }
2807
2808 pub fn build_with_dialect(&self, dialect: Dialect) -> (String, Vec<Value>) {
2810 let mut sql = format!("DELETE FROM {}", M::TABLE_NAME);
2811 let mut params = Vec::new();
2812
2813 if let Some(where_clause) = &self.where_clause {
2814 let (where_sql, where_params) = where_clause.build_with_dialect(dialect, 0);
2815 sql.push_str(" WHERE ");
2816 sql.push_str(&where_sql);
2817 params = where_params;
2818 } else if let Some(model) = &self.model {
2819 let pk = M::PRIMARY_KEY;
2821 let pk_values = model.primary_key_value();
2822 let pk_conditions: Vec<_> = pk
2823 .iter()
2824 .zip(pk_values.iter())
2825 .enumerate()
2826 .map(|(i, (col, _))| format!("{} = {}", col, dialect.placeholder(i + 1)))
2827 .collect();
2828
2829 if !pk_conditions.is_empty() {
2830 sql.push_str(" WHERE ");
2831 sql.push_str(&pk_conditions.join(" AND "));
2832 params.extend(pk_values);
2833 }
2834 }
2835
2836 if self.returning {
2838 sql.push_str(" RETURNING *");
2839 }
2840
2841 (sql, params)
2842 }
2843
2844 pub async fn execute<C: Connection>(
2850 self,
2851 cx: &Cx,
2852 conn: &C,
2853 ) -> Outcome<u64, sqlmodel_core::Error> {
2854 if is_joined_inheritance_child::<M>() {
2855 let dialect = conn.dialect();
2856 let (parent_table, _parent_fields) = match joined_parent_meta::<M>() {
2857 Ok(v) => v,
2858 Err(e) => return Outcome::Err(e),
2859 };
2860
2861 let tx_out = conn.begin(cx).await;
2862 let tx = match tx_out {
2863 Outcome::Ok(t) => t,
2864 Outcome::Err(e) => return Outcome::Err(e),
2865 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
2866 Outcome::Panicked(p) => return Outcome::Panicked(p),
2867 };
2868
2869 let pk_values = if let Some(where_clause) = self.where_clause.as_ref() {
2870 match select_joined_pk_values_in_tx::<_, M>(&tx, cx, dialect, Some(where_clause))
2871 .await
2872 {
2873 Outcome::Ok(v) => v,
2874 Outcome::Err(e) => {
2875 tx_rollback_best_effort(tx, cx).await;
2876 return Outcome::Err(e);
2877 }
2878 Outcome::Cancelled(r) => {
2879 tx_rollback_best_effort(tx, cx).await;
2880 return Outcome::Cancelled(r);
2881 }
2882 Outcome::Panicked(p) => {
2883 tx_rollback_best_effort(tx, cx).await;
2884 return Outcome::Panicked(p);
2885 }
2886 }
2887 } else if let Some(model) = self.model {
2888 vec![model.primary_key_value()]
2889 } else {
2890 match select_joined_pk_values_in_tx::<_, M>(&tx, cx, dialect, None).await {
2892 Outcome::Ok(v) => v,
2893 Outcome::Err(e) => {
2894 tx_rollback_best_effort(tx, cx).await;
2895 return Outcome::Err(e);
2896 }
2897 Outcome::Cancelled(r) => {
2898 tx_rollback_best_effort(tx, cx).await;
2899 return Outcome::Cancelled(r);
2900 }
2901 Outcome::Panicked(p) => {
2902 tx_rollback_best_effort(tx, cx).await;
2903 return Outcome::Panicked(p);
2904 }
2905 }
2906 };
2907
2908 if pk_values.is_empty() {
2909 return match tx.commit(cx).await {
2910 Outcome::Ok(()) => Outcome::Ok(0),
2911 Outcome::Err(e) => Outcome::Err(e),
2912 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2913 Outcome::Panicked(p) => Outcome::Panicked(p),
2914 };
2915 }
2916
2917 let (child_sql, child_params) = build_delete_sql_for_table_pk_in(
2918 dialect,
2919 M::TABLE_NAME,
2920 M::PRIMARY_KEY,
2921 &pk_values,
2922 );
2923 let (parent_sql, parent_params) =
2924 build_delete_sql_for_table_pk_in(dialect, parent_table, M::PRIMARY_KEY, &pk_values);
2925
2926 let mut total = 0_u64;
2927
2928 if !child_sql.is_empty() {
2929 match tx.execute(cx, &child_sql, &child_params).await {
2930 Outcome::Ok(n) => total = total.saturating_add(n),
2931 Outcome::Err(e) => {
2932 tx_rollback_best_effort(tx, cx).await;
2933 return Outcome::Err(e);
2934 }
2935 Outcome::Cancelled(r) => {
2936 tx_rollback_best_effort(tx, cx).await;
2937 return Outcome::Cancelled(r);
2938 }
2939 Outcome::Panicked(p) => {
2940 tx_rollback_best_effort(tx, cx).await;
2941 return Outcome::Panicked(p);
2942 }
2943 }
2944 }
2945
2946 if !parent_sql.is_empty() {
2947 match tx.execute(cx, &parent_sql, &parent_params).await {
2948 Outcome::Ok(n) => total = total.saturating_add(n),
2949 Outcome::Err(e) => {
2950 tx_rollback_best_effort(tx, cx).await;
2951 return Outcome::Err(e);
2952 }
2953 Outcome::Cancelled(r) => {
2954 tx_rollback_best_effort(tx, cx).await;
2955 return Outcome::Cancelled(r);
2956 }
2957 Outcome::Panicked(p) => {
2958 tx_rollback_best_effort(tx, cx).await;
2959 return Outcome::Panicked(p);
2960 }
2961 }
2962 }
2963
2964 match tx.commit(cx).await {
2965 Outcome::Ok(()) => Outcome::Ok(total),
2966 Outcome::Err(e) => Outcome::Err(e),
2967 Outcome::Cancelled(r) => Outcome::Cancelled(r),
2968 Outcome::Panicked(p) => Outcome::Panicked(p),
2969 }
2970 } else {
2971 let (sql, params) = self.build_with_dialect(conn.dialect());
2972 conn.execute(cx, &sql, ¶ms).await
2973 }
2974 }
2975
2976 pub async fn execute_returning<C: Connection>(
2981 mut self,
2982 cx: &Cx,
2983 conn: &C,
2984 ) -> Outcome<Vec<Row>, sqlmodel_core::Error> {
2985 self.returning = true;
2986 if is_joined_inheritance_child::<M>() {
2987 let dialect = conn.dialect();
2988 let (parent_table, _parent_fields) = match joined_parent_meta::<M>() {
2989 Ok(v) => v,
2990 Err(e) => return Outcome::Err(e),
2991 };
2992
2993 let tx_out = conn.begin(cx).await;
2994 let tx = match tx_out {
2995 Outcome::Ok(t) => t,
2996 Outcome::Err(e) => return Outcome::Err(e),
2997 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
2998 Outcome::Panicked(p) => return Outcome::Panicked(p),
2999 };
3000
3001 let pk_values = if let Some(where_clause) = self.where_clause.as_ref() {
3002 match select_joined_pk_values_in_tx::<_, M>(&tx, cx, dialect, Some(where_clause))
3003 .await
3004 {
3005 Outcome::Ok(v) => v,
3006 Outcome::Err(e) => {
3007 tx_rollback_best_effort(tx, cx).await;
3008 return Outcome::Err(e);
3009 }
3010 Outcome::Cancelled(r) => {
3011 tx_rollback_best_effort(tx, cx).await;
3012 return Outcome::Cancelled(r);
3013 }
3014 Outcome::Panicked(p) => {
3015 tx_rollback_best_effort(tx, cx).await;
3016 return Outcome::Panicked(p);
3017 }
3018 }
3019 } else if let Some(model) = self.model {
3020 vec![model.primary_key_value()]
3021 } else {
3022 match select_joined_pk_values_in_tx::<_, M>(&tx, cx, dialect, None).await {
3023 Outcome::Ok(v) => v,
3024 Outcome::Err(e) => {
3025 tx_rollback_best_effort(tx, cx).await;
3026 return Outcome::Err(e);
3027 }
3028 Outcome::Cancelled(r) => {
3029 tx_rollback_best_effort(tx, cx).await;
3030 return Outcome::Cancelled(r);
3031 }
3032 Outcome::Panicked(p) => {
3033 tx_rollback_best_effort(tx, cx).await;
3034 return Outcome::Panicked(p);
3035 }
3036 }
3037 };
3038
3039 if pk_values.is_empty() {
3040 return match tx.commit(cx).await {
3041 Outcome::Ok(()) => Outcome::Ok(Vec::new()),
3042 Outcome::Err(e) => Outcome::Err(e),
3043 Outcome::Cancelled(r) => Outcome::Cancelled(r),
3044 Outcome::Panicked(p) => Outcome::Panicked(p),
3045 };
3046 }
3047
3048 let (select_sql, select_params) = match build_joined_child_select_sql_by_pk_in::<M>(
3049 dialect,
3050 M::PRIMARY_KEY,
3051 &pk_values,
3052 ) {
3053 Ok(v) => v,
3054 Err(e) => {
3055 tx_rollback_best_effort(tx, cx).await;
3056 return Outcome::Err(e);
3057 }
3058 };
3059 let rows = if select_sql.is_empty() {
3060 Vec::new()
3061 } else {
3062 match tx.query(cx, &select_sql, &select_params).await {
3063 Outcome::Ok(rows) => rows,
3064 Outcome::Err(e) => {
3065 tx_rollback_best_effort(tx, cx).await;
3066 return Outcome::Err(e);
3067 }
3068 Outcome::Cancelled(r) => {
3069 tx_rollback_best_effort(tx, cx).await;
3070 return Outcome::Cancelled(r);
3071 }
3072 Outcome::Panicked(p) => {
3073 tx_rollback_best_effort(tx, cx).await;
3074 return Outcome::Panicked(p);
3075 }
3076 }
3077 };
3078
3079 let (child_sql, child_params) = build_delete_sql_for_table_pk_in(
3080 dialect,
3081 M::TABLE_NAME,
3082 M::PRIMARY_KEY,
3083 &pk_values,
3084 );
3085 let (parent_sql, parent_params) =
3086 build_delete_sql_for_table_pk_in(dialect, parent_table, M::PRIMARY_KEY, &pk_values);
3087
3088 if !child_sql.is_empty() {
3089 match tx.execute(cx, &child_sql, &child_params).await {
3090 Outcome::Ok(_) => {}
3091 Outcome::Err(e) => {
3092 tx_rollback_best_effort(tx, cx).await;
3093 return Outcome::Err(e);
3094 }
3095 Outcome::Cancelled(r) => {
3096 tx_rollback_best_effort(tx, cx).await;
3097 return Outcome::Cancelled(r);
3098 }
3099 Outcome::Panicked(p) => {
3100 tx_rollback_best_effort(tx, cx).await;
3101 return Outcome::Panicked(p);
3102 }
3103 }
3104 }
3105
3106 if !parent_sql.is_empty() {
3107 match tx.execute(cx, &parent_sql, &parent_params).await {
3108 Outcome::Ok(_) => {}
3109 Outcome::Err(e) => {
3110 tx_rollback_best_effort(tx, cx).await;
3111 return Outcome::Err(e);
3112 }
3113 Outcome::Cancelled(r) => {
3114 tx_rollback_best_effort(tx, cx).await;
3115 return Outcome::Cancelled(r);
3116 }
3117 Outcome::Panicked(p) => {
3118 tx_rollback_best_effort(tx, cx).await;
3119 return Outcome::Panicked(p);
3120 }
3121 }
3122 }
3123
3124 return match tx.commit(cx).await {
3125 Outcome::Ok(()) => Outcome::Ok(rows),
3126 Outcome::Err(e) => Outcome::Err(e),
3127 Outcome::Cancelled(r) => Outcome::Cancelled(r),
3128 Outcome::Panicked(p) => Outcome::Panicked(p),
3129 };
3130 }
3131 let (sql, params) = self.build_with_dialect(conn.dialect());
3132 conn.query(cx, &sql, ¶ms).await
3133 }
3134}
3135
3136impl<M: Model> Default for DeleteBuilder<'_, M> {
3137 fn default() -> Self {
3138 Self::new()
3139 }
3140}
3141
3142#[derive(Debug)]
3144pub struct QueryBuilder {
3145 sql: String,
3146 params: Vec<Value>,
3147}
3148
3149impl QueryBuilder {
3150 pub fn new(sql: impl Into<String>) -> Self {
3152 Self {
3153 sql: sql.into(),
3154 params: Vec::new(),
3155 }
3156 }
3157
3158 pub fn bind(mut self, value: impl Into<Value>) -> Self {
3160 self.params.push(value.into());
3161 self
3162 }
3163
3164 pub fn bind_all(mut self, values: impl IntoIterator<Item = Value>) -> Self {
3166 self.params.extend(values);
3167 self
3168 }
3169
3170 pub fn build(self) -> (String, Vec<Value>) {
3172 (self.sql, self.params)
3173 }
3174}
3175
3176#[cfg(test)]
3177mod tests {
3178 use super::*;
3179 use crate::expr::Dialect;
3180 use sqlmodel_core::field::FieldInfo;
3181 use sqlmodel_core::types::SqlType;
3182
3183 struct TestHero {
3185 id: Option<i64>,
3186 name: String,
3187 age: i32,
3188 }
3189
3190 impl Model for TestHero {
3191 const TABLE_NAME: &'static str = "heroes";
3192 const PRIMARY_KEY: &'static [&'static str] = &["id"];
3193
3194 fn fields() -> &'static [FieldInfo] {
3195 static FIELDS: &[FieldInfo] = &[
3196 FieldInfo::new("id", "id", SqlType::BigInt)
3197 .primary_key(true)
3198 .auto_increment(true)
3199 .nullable(true),
3200 FieldInfo::new("name", "name", SqlType::Text),
3201 FieldInfo::new("age", "age", SqlType::Integer),
3202 ];
3203 FIELDS
3204 }
3205
3206 fn to_row(&self) -> Vec<(&'static str, Value)> {
3207 vec![
3208 ("id", self.id.map_or(Value::Null, Value::BigInt)),
3209 ("name", Value::Text(self.name.clone())),
3210 ("age", Value::Int(self.age)),
3211 ]
3212 }
3213
3214 fn from_row(_row: &Row) -> sqlmodel_core::Result<Self> {
3215 Err(sqlmodel_core::Error::Custom(
3216 "from_row not used in tests".to_string(),
3217 ))
3218 }
3219
3220 fn primary_key_value(&self) -> Vec<Value> {
3221 vec![self.id.map_or(Value::Null, Value::BigInt)]
3222 }
3223
3224 fn is_new(&self) -> bool {
3225 self.id.is_none()
3226 }
3227 }
3228
3229 struct TestOnlyId {
3230 id: Option<i64>,
3231 }
3232
3233 impl Model for TestOnlyId {
3234 const TABLE_NAME: &'static str = "only_ids";
3235 const PRIMARY_KEY: &'static [&'static str] = &["id"];
3236
3237 fn fields() -> &'static [FieldInfo] {
3238 static FIELDS: &[FieldInfo] = &[FieldInfo::new("id", "id", SqlType::BigInt)
3239 .primary_key(true)
3240 .auto_increment(true)
3241 .nullable(true)];
3242 FIELDS
3243 }
3244
3245 fn to_row(&self) -> Vec<(&'static str, Value)> {
3246 vec![("id", self.id.map_or(Value::Null, Value::BigInt))]
3247 }
3248
3249 fn from_row(_row: &Row) -> sqlmodel_core::Result<Self> {
3250 Err(sqlmodel_core::Error::Custom(
3251 "from_row not used in tests".to_string(),
3252 ))
3253 }
3254
3255 fn primary_key_value(&self) -> Vec<Value> {
3256 vec![self.id.map_or(Value::Null, Value::BigInt)]
3257 }
3258
3259 fn is_new(&self) -> bool {
3260 self.id.is_none()
3261 }
3262 }
3263
3264 #[test]
3265 fn test_insert_basic() {
3266 let hero = TestHero {
3267 id: None,
3268 name: "Spider-Man".to_string(),
3269 age: 25,
3270 };
3271 let (sql, params) = InsertBuilder::new(&hero).build();
3272
3273 assert_eq!(
3275 sql,
3276 "INSERT INTO heroes (id, name, age) VALUES (DEFAULT, $1, $2)"
3277 );
3278 assert_eq!(params.len(), 2);
3279 }
3280
3281 #[test]
3282 fn test_insert_returning() {
3283 let hero = TestHero {
3284 id: None,
3285 name: "Spider-Man".to_string(),
3286 age: 25,
3287 };
3288 let (sql, _) = InsertBuilder::new(&hero).returning().build();
3289
3290 assert!(sql.ends_with(" RETURNING *"));
3291 }
3292
3293 #[test]
3294 fn test_insert_on_conflict_do_nothing() {
3295 let hero = TestHero {
3296 id: None,
3297 name: "Spider-Man".to_string(),
3298 age: 25,
3299 };
3300 let (sql, _) = InsertBuilder::new(&hero).on_conflict_do_nothing().build();
3301
3302 assert!(sql.contains("ON CONFLICT DO NOTHING"));
3303 }
3304
3305 #[test]
3306 fn test_insert_on_conflict_do_update() {
3307 let hero = TestHero {
3308 id: None,
3309 name: "Spider-Man".to_string(),
3310 age: 25,
3311 };
3312 let (sql, _) = InsertBuilder::new(&hero)
3313 .on_conflict_do_update(&["name", "age"])
3314 .build();
3315
3316 assert!(sql.contains("ON CONFLICT (id) DO UPDATE SET"));
3317 assert!(sql.contains("name = EXCLUDED.name"));
3318 assert!(sql.contains("age = EXCLUDED.age"));
3319 }
3320
3321 #[test]
3322 fn test_insert_mysql_on_conflict_do_nothing() {
3323 let hero = TestHero {
3324 id: None,
3325 name: "Spider-Man".to_string(),
3326 age: 25,
3327 };
3328 let (sql, _) = InsertBuilder::new(&hero)
3329 .on_conflict_do_nothing()
3330 .build_with_dialect(Dialect::Mysql);
3331
3332 assert!(sql.starts_with("INSERT IGNORE INTO heroes"));
3333 assert!(!sql.contains("ON CONFLICT"));
3334 }
3335
3336 #[test]
3337 fn test_insert_mysql_on_conflict_do_update() {
3338 let hero = TestHero {
3339 id: None,
3340 name: "Spider-Man".to_string(),
3341 age: 25,
3342 };
3343 let (sql, _) = InsertBuilder::new(&hero)
3344 .on_conflict_do_update(&["name", "age"])
3345 .build_with_dialect(Dialect::Mysql);
3346
3347 assert!(sql.contains("ON DUPLICATE KEY UPDATE"));
3348 assert!(sql.contains("name = VALUES(name)"));
3349 assert!(sql.contains("age = VALUES(age)"));
3350 assert!(!sql.contains("ON CONFLICT"));
3351 }
3352
3353 #[test]
3354 fn test_insert_many_mysql_on_conflict_do_update() {
3355 let heroes = vec![
3356 TestHero {
3357 id: None,
3358 name: "Spider-Man".to_string(),
3359 age: 25,
3360 },
3361 TestHero {
3362 id: None,
3363 name: "Iron Man".to_string(),
3364 age: 45,
3365 },
3366 ];
3367 let (sql, params) = InsertManyBuilder::new(&heroes)
3368 .on_conflict_do_update(&["name"])
3369 .build_with_dialect(Dialect::Mysql);
3370
3371 assert!(sql.contains("ON DUPLICATE KEY UPDATE"));
3372 assert!(sql.contains("name = VALUES(name)"));
3373 assert!(!sql.contains("ON CONFLICT"));
3374 assert_eq!(params.len(), 4);
3375 }
3376
3377 #[test]
3378 fn test_insert_many() {
3379 let heroes = vec![
3380 TestHero {
3381 id: None,
3382 name: "Spider-Man".to_string(),
3383 age: 25,
3384 },
3385 TestHero {
3386 id: None,
3387 name: "Iron Man".to_string(),
3388 age: 45,
3389 },
3390 ];
3391 let (sql, params) = InsertManyBuilder::new(&heroes).build();
3392
3393 assert!(sql.starts_with("INSERT INTO heroes (id, name, age) VALUES"));
3395 assert!(sql.contains("(DEFAULT, $1, $2), (DEFAULT, $3, $4)"));
3396 assert_eq!(params.len(), 4);
3397 }
3398
3399 #[test]
3400 fn test_insert_sqlite_omits_default_columns() {
3401 let hero = TestHero {
3402 id: None,
3403 name: "Spider-Man".to_string(),
3404 age: 25,
3405 };
3406 let (sql, params) = InsertBuilder::new(&hero).build_with_dialect(Dialect::Sqlite);
3407
3408 assert_eq!(sql, "INSERT INTO heroes (name, age) VALUES (?1, ?2)");
3409 assert_eq!(params.len(), 2);
3410 }
3411
3412 #[test]
3413 fn test_insert_sqlite_default_values_only() {
3414 let model = TestOnlyId { id: None };
3415 let (sql, params) = InsertBuilder::new(&model).build_with_dialect(Dialect::Sqlite);
3416
3417 assert_eq!(sql, "INSERT INTO only_ids DEFAULT VALUES");
3418 assert!(params.is_empty());
3419 }
3420
3421 #[test]
3422 fn test_insert_many_sqlite_omits_auto_increment() {
3423 let heroes = vec![
3424 TestHero {
3425 id: None,
3426 name: "Spider-Man".to_string(),
3427 age: 25,
3428 },
3429 TestHero {
3430 id: None,
3431 name: "Iron Man".to_string(),
3432 age: 45,
3433 },
3434 ];
3435 let batches = InsertManyBuilder::new(&heroes).build_batches_with_dialect(Dialect::Sqlite);
3436
3437 assert_eq!(batches.len(), 1);
3438 let (sql, params) = &batches[0];
3439 assert!(sql.starts_with("INSERT INTO heroes (name, age) VALUES"));
3440 assert!(sql.contains("(?1, ?2), (?3, ?4)"));
3441 assert_eq!(params.len(), 4);
3442 }
3443
3444 #[test]
3445 fn test_insert_many_sqlite_mixed_defaults_split() {
3446 let heroes = vec![
3447 TestHero {
3448 id: Some(1),
3449 name: "Spider-Man".to_string(),
3450 age: 25,
3451 },
3452 TestHero {
3453 id: None,
3454 name: "Iron Man".to_string(),
3455 age: 45,
3456 },
3457 ];
3458 let batches = InsertManyBuilder::new(&heroes).build_batches_with_dialect(Dialect::Sqlite);
3459
3460 assert_eq!(batches.len(), 2);
3461 assert_eq!(
3462 batches[0].0,
3463 "INSERT INTO heroes (id, name, age) VALUES (?1, ?2, ?3)"
3464 );
3465 assert_eq!(
3466 batches[1].0,
3467 "INSERT INTO heroes (name, age) VALUES (?1, ?2)"
3468 );
3469 assert_eq!(batches[0].1.len(), 3);
3470 assert_eq!(batches[1].1.len(), 2);
3471 }
3472
3473 #[test]
3474 fn test_insert_many_sqlite_default_values_only() {
3475 let rows = vec![TestOnlyId { id: None }, TestOnlyId { id: None }];
3476 let batches = InsertManyBuilder::new(&rows).build_batches_with_dialect(Dialect::Sqlite);
3477
3478 assert_eq!(batches.len(), 2);
3479 assert_eq!(batches[0].0, "INSERT INTO only_ids DEFAULT VALUES");
3480 assert_eq!(batches[1].0, "INSERT INTO only_ids DEFAULT VALUES");
3481 assert!(batches[0].1.is_empty());
3482 assert!(batches[1].1.is_empty());
3483 }
3484
3485 #[test]
3486 fn test_update_basic() {
3487 let hero = TestHero {
3488 id: Some(1),
3489 name: "Spider-Man".to_string(),
3490 age: 26,
3491 };
3492 let (sql, params) = UpdateBuilder::new(&hero).build();
3493
3494 assert!(sql.starts_with("UPDATE heroes SET"));
3495 assert!(sql.contains("WHERE id = "));
3496 assert!(params.len() >= 2); }
3498
3499 #[test]
3500 fn test_update_explicit_set() {
3501 let (sql, params) = UpdateBuilder::<TestHero>::empty()
3502 .set("age", 30)
3503 .filter(Expr::col("id").eq(1))
3504 .build_with_dialect(Dialect::Postgres);
3505
3506 assert_eq!(sql, "UPDATE heroes SET age = $1 WHERE \"id\" = $2");
3507 assert_eq!(params.len(), 2);
3508 }
3509
3510 #[test]
3511 fn test_update_returning() {
3512 let hero = TestHero {
3513 id: Some(1),
3514 name: "Spider-Man".to_string(),
3515 age: 26,
3516 };
3517 let (sql, _) = UpdateBuilder::new(&hero).returning().build();
3518
3519 assert!(sql.ends_with(" RETURNING *"));
3520 }
3521
3522 #[test]
3523 fn test_delete_basic() {
3524 let (sql, _) = DeleteBuilder::<TestHero>::new()
3525 .filter(Expr::col("age").lt(18))
3526 .build_with_dialect(Dialect::Postgres);
3527
3528 assert_eq!(sql, "DELETE FROM heroes WHERE \"age\" < $1");
3529 }
3530
3531 #[test]
3532 fn test_delete_from_model() {
3533 let hero = TestHero {
3534 id: Some(42),
3535 name: "Spider-Man".to_string(),
3536 age: 25,
3537 };
3538 let (sql, params) = DeleteBuilder::from_model(&hero).build();
3539
3540 assert!(sql.contains("WHERE id = $1"));
3541 assert_eq!(params.len(), 1);
3542 }
3543
3544 #[test]
3545 fn test_delete_returning() {
3546 let (sql, _) = DeleteBuilder::<TestHero>::new()
3547 .filter(Expr::col("status").eq("inactive"))
3548 .returning()
3549 .build_with_dialect(Dialect::Postgres);
3550
3551 assert!(sql.ends_with(" RETURNING *"));
3552 }
3553
3554 #[test]
3555 fn test_dialect_sqlite() {
3556 let hero = TestHero {
3557 id: None,
3558 name: "Spider-Man".to_string(),
3559 age: 25,
3560 };
3561 let (sql, _) = InsertBuilder::new(&hero).build_with_dialect(Dialect::Sqlite);
3562
3563 assert!(sql.contains("?1"));
3564 assert!(sql.contains("?2"));
3565 }
3566
3567 #[test]
3568 fn test_dialect_mysql() {
3569 let hero = TestHero {
3570 id: None,
3571 name: "Spider-Man".to_string(),
3572 age: 25,
3573 };
3574 let (sql, _) = InsertBuilder::new(&hero).build_with_dialect(Dialect::Mysql);
3575
3576 assert!(sql.contains('?'));
3578 assert!(!sql.contains("$1"));
3579 }
3580}