1#![allow(dead_code)]
2
3use std::fmt::Debug;
56use std::marker::PhantomData;
57
58use crate::error::QueryResult;
59use crate::filter::{Filter, FilterValue};
60use crate::sql::quote_identifier;
61use crate::traits::{Model, QueryEngine};
62
63#[derive(Debug, Clone)]
65pub enum NestedWrite<T: Model> {
66 Create(Vec<NestedCreateData<T>>),
68 CreateOrConnect(Vec<NestedCreateOrConnectData<T>>),
70 Connect(Vec<Filter>),
72 Disconnect(Vec<Filter>),
74 Set(Vec<Filter>),
76 Delete(Vec<Filter>),
78 Update(Vec<NestedUpdateData<T>>),
80 Upsert(Vec<NestedUpsertData<T>>),
82 UpdateMany(NestedUpdateManyData<T>),
84 DeleteMany(Filter),
86}
87
88impl<T: Model> NestedWrite<T> {
89 pub fn create(data: NestedCreateData<T>) -> Self {
91 Self::Create(vec![data])
92 }
93
94 pub fn create_many(data: Vec<NestedCreateData<T>>) -> Self {
96 Self::Create(data)
97 }
98
99 pub fn connect_one(filter: impl Into<Filter>) -> Self {
101 Self::Connect(vec![filter.into()])
102 }
103
104 pub fn connect(filters: Vec<impl Into<Filter>>) -> Self {
106 Self::Connect(filters.into_iter().map(Into::into).collect())
107 }
108
109 pub fn disconnect_one(filter: impl Into<Filter>) -> Self {
111 Self::Disconnect(vec![filter.into()])
112 }
113
114 pub fn disconnect(filters: Vec<impl Into<Filter>>) -> Self {
116 Self::Disconnect(filters.into_iter().map(Into::into).collect())
117 }
118
119 pub fn set(filters: Vec<impl Into<Filter>>) -> Self {
121 Self::Set(filters.into_iter().map(Into::into).collect())
122 }
123
124 pub fn delete(filters: Vec<impl Into<Filter>>) -> Self {
126 Self::Delete(filters.into_iter().map(Into::into).collect())
127 }
128
129 pub fn delete_many(filter: impl Into<Filter>) -> Self {
131 Self::DeleteMany(filter.into())
132 }
133}
134
135#[derive(Debug, Clone)]
137pub struct NestedCreateData<T: Model> {
138 pub data: Vec<(String, FilterValue)>,
140 _model: PhantomData<T>,
142}
143
144impl<T: Model> NestedCreateData<T> {
145 pub fn new(data: Vec<(String, FilterValue)>) -> Self {
147 Self {
148 data,
149 _model: PhantomData,
150 }
151 }
152
153 pub fn from_pairs(
155 pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<FilterValue>)>,
156 ) -> Self {
157 Self::new(
158 pairs
159 .into_iter()
160 .map(|(k, v)| (k.into(), v.into()))
161 .collect(),
162 )
163 }
164}
165
166impl<T: Model> Default for NestedCreateData<T> {
167 fn default() -> Self {
168 Self::new(Vec::new())
169 }
170}
171
172#[derive(Debug, Clone)]
174pub struct NestedCreateOrConnectData<T: Model> {
175 pub filter: Filter,
177 pub create: NestedCreateData<T>,
179}
180
181impl<T: Model> NestedCreateOrConnectData<T> {
182 pub fn new(filter: impl Into<Filter>, create: NestedCreateData<T>) -> Self {
184 Self {
185 filter: filter.into(),
186 create,
187 }
188 }
189}
190
191#[derive(Debug, Clone)]
193pub struct NestedUpdateData<T: Model> {
194 pub filter: Filter,
196 pub data: Vec<(String, FilterValue)>,
198 _model: PhantomData<T>,
200}
201
202impl<T: Model> NestedUpdateData<T> {
203 pub fn new(filter: impl Into<Filter>, data: Vec<(String, FilterValue)>) -> Self {
205 Self {
206 filter: filter.into(),
207 data,
208 _model: PhantomData,
209 }
210 }
211
212 pub fn from_pairs(
214 filter: impl Into<Filter>,
215 pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<FilterValue>)>,
216 ) -> Self {
217 Self::new(
218 filter,
219 pairs
220 .into_iter()
221 .map(|(k, v)| (k.into(), v.into()))
222 .collect(),
223 )
224 }
225}
226
227#[derive(Debug, Clone)]
229pub struct NestedUpsertData<T: Model> {
230 pub filter: Filter,
232 pub create: NestedCreateData<T>,
234 pub update: Vec<(String, FilterValue)>,
236 _model: PhantomData<T>,
238}
239
240impl<T: Model> NestedUpsertData<T> {
241 pub fn new(
243 filter: impl Into<Filter>,
244 create: NestedCreateData<T>,
245 update: Vec<(String, FilterValue)>,
246 ) -> Self {
247 Self {
248 filter: filter.into(),
249 create,
250 update,
251 _model: PhantomData,
252 }
253 }
254}
255
256#[derive(Debug, Clone)]
258pub struct NestedUpdateManyData<T: Model> {
259 pub filter: Filter,
261 pub data: Vec<(String, FilterValue)>,
263 _model: PhantomData<T>,
265}
266
267impl<T: Model> NestedUpdateManyData<T> {
268 pub fn new(filter: impl Into<Filter>, data: Vec<(String, FilterValue)>) -> Self {
270 Self {
271 filter: filter.into(),
272 data,
273 _model: PhantomData,
274 }
275 }
276}
277
278#[derive(Debug)]
285pub struct NestedWriteBuilder {
286 parent_table: String,
288 parent_pk: Vec<String>,
290 related_table: String,
292 foreign_key: String,
294 is_one_to_many: bool,
296 join_table: Option<JoinTableInfo>,
298}
299
300#[derive(Debug, Clone)]
302pub struct JoinTableInfo {
303 pub table_name: String,
305 pub parent_column: String,
307 pub related_column: String,
309}
310
311impl NestedWriteBuilder {
312 pub fn one_to_many(
314 parent_table: impl Into<String>,
315 parent_pk: Vec<String>,
316 related_table: impl Into<String>,
317 foreign_key: impl Into<String>,
318 ) -> Self {
319 Self {
320 parent_table: parent_table.into(),
321 parent_pk,
322 related_table: related_table.into(),
323 foreign_key: foreign_key.into(),
324 is_one_to_many: true,
325 join_table: None,
326 }
327 }
328
329 pub fn many_to_many(
331 parent_table: impl Into<String>,
332 parent_pk: Vec<String>,
333 related_table: impl Into<String>,
334 join_table: JoinTableInfo,
335 ) -> Self {
336 Self {
337 parent_table: parent_table.into(),
338 parent_pk,
339 related_table: related_table.into(),
340 foreign_key: String::new(), is_one_to_many: false,
342 join_table: Some(join_table),
343 }
344 }
345
346 pub fn build_connect_sql<T: Model>(
348 &self,
349 parent_id: &FilterValue,
350 filters: &[Filter],
351 ) -> Vec<(String, Vec<FilterValue>)> {
352 let mut statements = Vec::new();
353
354 if self.is_one_to_many {
355 for filter in filters {
357 let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
358 let sql = format!(
359 "UPDATE {} SET {} = ${} WHERE {}",
360 quote_identifier(&self.related_table),
361 quote_identifier(&self.foreign_key),
362 params.len() + 1,
363 where_sql
364 );
365 params.push(parent_id.clone());
366 statements.push((sql, params));
367 }
368 } else if let Some(join) = &self.join_table {
369 for filter in filters {
372 let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
373
374 let select_sql = format!(
376 "SELECT {} FROM {} WHERE {}",
377 quote_identifier(T::PRIMARY_KEY.first().unwrap_or(&"id")),
378 quote_identifier(&self.related_table),
379 where_sql
380 );
381
382 let insert_sql = format!(
384 "INSERT INTO {} ({}, {}) SELECT ${}, {} FROM {} WHERE {} ON CONFLICT DO NOTHING",
385 quote_identifier(&join.table_name),
386 quote_identifier(&join.parent_column),
387 quote_identifier(&join.related_column),
388 params.len() + 1,
389 quote_identifier(T::PRIMARY_KEY.first().unwrap_or(&"id")),
390 quote_identifier(&self.related_table),
391 where_sql
392 );
393 params.push(parent_id.clone());
394 statements.push((insert_sql, params));
395 let _ = select_sql;
397 }
398 }
399
400 statements
401 }
402
403 pub fn build_disconnect_sql(
405 &self,
406 parent_id: &FilterValue,
407 filters: &[Filter],
408 ) -> Vec<(String, Vec<FilterValue>)> {
409 let mut statements = Vec::new();
410
411 if self.is_one_to_many {
412 for filter in filters {
414 let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
415 let sql = format!(
416 "UPDATE {} SET {} = NULL WHERE {} AND {} = ${}",
417 quote_identifier(&self.related_table),
418 quote_identifier(&self.foreign_key),
419 where_sql,
420 quote_identifier(&self.foreign_key),
421 params.len() + 1
422 );
423 params.push(parent_id.clone());
424 statements.push((sql, params));
425 }
426 } else if let Some(join) = &self.join_table {
427 for filter in filters {
429 let (where_sql, mut params) = filter.to_sql(2, &crate::dialect::Postgres);
430 let sql = format!(
431 "DELETE FROM {} WHERE {} = $1 AND {} IN (SELECT id FROM {} WHERE {})",
432 quote_identifier(&join.table_name),
433 quote_identifier(&join.parent_column),
434 quote_identifier(&join.related_column),
435 quote_identifier(&self.related_table),
436 where_sql
437 );
438 let mut final_params = vec![parent_id.clone()];
439 final_params.extend(params);
440 params = final_params;
441 statements.push((sql, params));
442 }
443 }
444
445 statements
446 }
447
448 pub fn build_set_sql<T: Model>(
450 &self,
451 parent_id: &FilterValue,
452 filters: &[Filter],
453 ) -> Vec<(String, Vec<FilterValue>)> {
454 let mut statements = Vec::new();
455
456 if self.is_one_to_many {
458 let sql = format!(
459 "UPDATE {} SET {} = NULL WHERE {} = $1",
460 quote_identifier(&self.related_table),
461 quote_identifier(&self.foreign_key),
462 quote_identifier(&self.foreign_key)
463 );
464 statements.push((sql, vec![parent_id.clone()]));
465 } else if let Some(join) = &self.join_table {
466 let sql = format!(
467 "DELETE FROM {} WHERE {} = $1",
468 quote_identifier(&join.table_name),
469 quote_identifier(&join.parent_column)
470 );
471 statements.push((sql, vec![parent_id.clone()]));
472 }
473
474 statements.extend(self.build_connect_sql::<T>(parent_id, filters));
476
477 statements
478 }
479
480 pub fn build_create_sql<T: Model>(
482 &self,
483 parent_id: &FilterValue,
484 creates: &[NestedCreateData<T>],
485 ) -> Vec<(String, Vec<FilterValue>)> {
486 let mut statements = Vec::with_capacity(creates.len());
487 let quoted_table = quote_identifier(&self.related_table);
488
489 for create in creates {
490 let row_len = create.data.len() + 1;
491 let mut columns: Vec<String> = Vec::with_capacity(row_len);
492 let mut values: Vec<FilterValue> = Vec::with_capacity(row_len);
493 for (k, v) in &create.data {
494 columns.push(k.clone());
495 values.push(v.clone());
496 }
497
498 columns.push(self.foreign_key.clone());
499 values.push(parent_id.clone());
500
501 let mut col_list = String::new();
502 let mut placeholders = String::new();
503 for (i, c) in columns.iter().enumerate() {
504 if i > 0 {
505 col_list.push_str(", ");
506 placeholders.push_str(", ");
507 }
508 col_list.push_str("e_identifier(c));
509 use std::fmt::Write;
510 let _ = write!(placeholders, "${}", i + 1);
511 }
512
513 let sql = format!(
514 "INSERT INTO {} ({}) VALUES ({}) RETURNING *",
515 quoted_table, col_list, placeholders,
516 );
517
518 statements.push((sql, values));
519 }
520
521 statements
522 }
523
524 pub fn build_delete_sql(
526 &self,
527 parent_id: &FilterValue,
528 filters: &[Filter],
529 ) -> Vec<(String, Vec<FilterValue>)> {
530 let mut statements = Vec::new();
531
532 for filter in filters {
533 let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
534 let sql = format!(
535 "DELETE FROM {} WHERE {} AND {} = ${}",
536 quote_identifier(&self.related_table),
537 where_sql,
538 quote_identifier(&self.foreign_key),
539 params.len() + 1
540 );
541 params.push(parent_id.clone());
542 statements.push((sql, params));
543 }
544
545 statements
546 }
547}
548
549#[derive(Debug, Default)]
551pub struct NestedWriteOperations {
552 pub pre_statements: Vec<(String, Vec<FilterValue>)>,
554 pub post_statements: Vec<(String, Vec<FilterValue>)>,
556}
557
558impl NestedWriteOperations {
559 pub fn new() -> Self {
561 Self::default()
562 }
563
564 pub fn add_pre(&mut self, sql: String, params: Vec<FilterValue>) {
566 self.pre_statements.push((sql, params));
567 }
568
569 pub fn add_post(&mut self, sql: String, params: Vec<FilterValue>) {
571 self.post_statements.push((sql, params));
572 }
573
574 pub fn extend(&mut self, other: Self) {
576 self.pre_statements.extend(other.pre_statements);
577 self.post_statements.extend(other.post_statements);
578 }
579
580 pub fn is_empty(&self) -> bool {
582 self.pre_statements.is_empty() && self.post_statements.is_empty()
583 }
584
585 pub fn len(&self) -> usize {
587 self.pre_statements.len() + self.post_statements.len()
588 }
589}
590
591#[derive(Debug, Clone)]
609pub enum NestedWriteOp {
610 Create {
615 relation: &'static str,
616 target_table: &'static str,
617 foreign_key: &'static str,
618 payload: Vec<Vec<(String, FilterValue)>>,
621 },
622 Connect {
632 relation: &'static str,
633 target_table: &'static str,
634 foreign_key: &'static str,
635 target_pk: &'static str,
636 pk: FilterValue,
637 },
638 Disconnect {
644 relation: &'static str,
645 target_table: &'static str,
646 foreign_key: &'static str,
647 target_pk: &'static str,
648 pk: FilterValue,
649 },
650 Delete {
656 relation: &'static str,
657 target_table: &'static str,
658 target_pk: &'static str,
659 pk: FilterValue,
660 },
661 DeleteMany {
669 relation: &'static str,
670 target_table: &'static str,
671 foreign_key: &'static str,
672 filter: Filter,
673 },
674 Update {
685 relation: &'static str,
686 target_table: &'static str,
687 target_pk: &'static str,
688 pk: FilterValue,
689 payload: Vec<(String, crate::inputs::WriteOp)>,
690 },
691 UpdateMany {
700 relation: &'static str,
701 target_table: &'static str,
702 foreign_key: &'static str,
703 filter: Filter,
704 payload: Vec<(String, crate::inputs::WriteOp)>,
705 },
706 Upsert {
740 relation: &'static str,
741 target_table: &'static str,
742 foreign_key: &'static str,
743 target_pk: &'static str,
744 pk: FilterValue,
745 create_payload: Vec<(String, FilterValue)>,
746 update_payload: Vec<(String, crate::inputs::WriteOp)>,
747 },
748 ConnectOrCreate {
766 relation: &'static str,
767 target_table: &'static str,
768 foreign_key: &'static str,
769 where_filter: Filter,
770 create_payload: Vec<(String, FilterValue)>,
771 },
772 Set {
790 relation: &'static str,
791 target_table: &'static str,
792 foreign_key: &'static str,
793 target_pk: &'static str,
794 set_pks: Vec<FilterValue>,
795 },
796}
797
798fn build_writeop_set_clause(
806 payload: &[(String, crate::inputs::WriteOp)],
807 dialect: &dyn crate::dialect::SqlDialect,
808 start_ph: usize,
809) -> (String, Vec<FilterValue>) {
810 let mut fragments: Vec<String> = Vec::with_capacity(payload.len());
811 let mut params: Vec<FilterValue> = Vec::with_capacity(payload.len());
812 let mut next_ph = start_ph;
813 for (col, op) in payload {
814 let (frag, maybe_val) =
815 op.to_set_fragment(&dialect.quote_ident(col), &dialect.placeholder(next_ph));
816 fragments.push(frag);
817 if let Some(val) = maybe_val {
818 params.push(val);
819 next_ph += 1;
820 }
821 }
822 (fragments.join(", "), params)
823}
824
825impl NestedWriteOp {
826 pub async fn execute<E>(self, engine: &E, parent_pk: &FilterValue) -> QueryResult<()>
833 where
834 E: QueryEngine,
835 {
836 match self {
837 NestedWriteOp::Connect {
838 relation: _,
839 target_table,
840 foreign_key,
841 target_pk,
842 pk,
843 } => {
844 let dialect = engine.dialect();
845 let sql = format!(
846 "UPDATE {} SET {} = {} WHERE {} = {}",
847 dialect.quote_ident(target_table),
848 dialect.quote_ident(foreign_key),
849 dialect.placeholder(1),
850 dialect.quote_ident(target_pk),
851 dialect.placeholder(2),
852 );
853 engine
854 .execute_raw(&sql, vec![parent_pk.clone(), pk])
855 .await?;
856 Ok(())
857 }
858 NestedWriteOp::Disconnect {
859 relation: _,
860 target_table,
861 foreign_key,
862 target_pk,
863 pk,
864 } => {
865 let dialect = engine.dialect();
866 let sql = format!(
867 "UPDATE {} SET {} = NULL WHERE {} = {}",
868 dialect.quote_ident(target_table),
869 dialect.quote_ident(foreign_key),
870 dialect.quote_ident(target_pk),
871 dialect.placeholder(1),
872 );
873 engine.execute_raw(&sql, vec![pk]).await?;
874 Ok(())
875 }
876 NestedWriteOp::Delete {
877 relation: _,
878 target_table,
879 target_pk,
880 pk,
881 } => {
882 let dialect = engine.dialect();
883 let sql = format!(
884 "DELETE FROM {} WHERE {} = {}",
885 dialect.quote_ident(target_table),
886 dialect.quote_ident(target_pk),
887 dialect.placeholder(1),
888 );
889 let affected = engine.execute_raw(&sql, vec![pk]).await?;
890 if affected != 1 {
891 return Err(crate::error::QueryError::not_found(target_table)
892 .with_context("Nested Delete by PK"));
893 }
894 Ok(())
895 }
896 NestedWriteOp::DeleteMany {
897 relation: _,
898 target_table,
899 foreign_key,
900 filter,
901 } => {
902 let dialect = engine.dialect();
903 let is_unconstrained = matches!(filter, Filter::None);
904 let sql = if is_unconstrained {
905 format!(
906 "DELETE FROM {} WHERE {} = {}",
907 dialect.quote_ident(target_table),
908 dialect.quote_ident(foreign_key),
909 dialect.placeholder(1),
910 )
911 } else {
912 let (filter_sql, params_tail) = filter.to_sql(2, &crate::dialect::Postgres);
913 let sql = format!(
914 "DELETE FROM {} WHERE {} = {} AND ({})",
915 dialect.quote_ident(target_table),
916 dialect.quote_ident(foreign_key),
917 dialect.placeholder(1),
918 filter_sql,
919 );
920 let mut params = Vec::with_capacity(params_tail.len() + 1);
921 params.push(parent_pk.clone());
922 params.extend(params_tail);
923 return engine.execute_raw(&sql, params).await.map(|_| ());
924 };
925 engine.execute_raw(&sql, vec![parent_pk.clone()]).await?;
926 Ok(())
927 }
928 NestedWriteOp::Update {
929 relation: _,
930 target_table,
931 target_pk,
932 pk,
933 payload,
934 } => {
935 if payload.is_empty() {
936 return Ok(());
937 }
938 let dialect = engine.dialect();
939 let (set_text, mut update_params) = build_writeop_set_clause(&payload, dialect, 1);
940 let next_placeholder = update_params.len() + 1;
941 update_params.push(pk);
942 let sql = format!(
943 "UPDATE {} SET {} WHERE {} = {}",
944 dialect.quote_ident(target_table),
945 set_text,
946 dialect.quote_ident(target_pk),
947 dialect.placeholder(next_placeholder),
948 );
949 let affected = engine.execute_raw(&sql, update_params).await?;
950 if affected != 1 {
951 return Err(crate::error::QueryError::not_found(target_table)
952 .with_context("Nested Update by PK"));
953 }
954 Ok(())
955 }
956 NestedWriteOp::UpdateMany {
957 relation: _,
958 target_table,
959 foreign_key,
960 filter,
961 payload,
962 } => {
963 if payload.is_empty() {
964 return Ok(());
965 }
966 let dialect = engine.dialect();
967 let (set_text, mut params) = build_writeop_set_clause(&payload, dialect, 1);
968 let fk_placeholder_idx = params.len() + 1;
969 let fk_placeholder = dialect.placeholder(fk_placeholder_idx);
970 let next_placeholder = fk_placeholder_idx + 1;
971 params.push(parent_pk.clone());
972
973 let is_unconstrained = matches!(filter, Filter::None);
974 let sql = if is_unconstrained {
975 format!(
976 "UPDATE {} SET {} WHERE {} = {}",
977 dialect.quote_ident(target_table),
978 set_text,
979 dialect.quote_ident(foreign_key),
980 fk_placeholder,
981 )
982 } else {
983 let (filter_sql, filter_params) =
984 filter.to_sql(next_placeholder, &crate::dialect::Postgres);
985 params.extend(filter_params);
986 format!(
987 "UPDATE {} SET {} WHERE {} = {} AND ({})",
988 dialect.quote_ident(target_table),
989 set_text,
990 dialect.quote_ident(foreign_key),
991 fk_placeholder,
992 filter_sql,
993 )
994 };
995 engine.execute_raw(&sql, params).await?;
996 Ok(())
997 }
998 NestedWriteOp::Upsert {
999 relation: _,
1000 target_table,
1001 foreign_key,
1002 target_pk,
1003 pk,
1004 create_payload,
1005 update_payload,
1006 } => {
1007 let dialect = engine.dialect();
1008
1009 if !dialect.supports_upsert() {
1012 return Err(crate::error::QueryError::unsupported(format!(
1013 "Nested Upsert is not supported by the `{}` engine",
1014 std::any::type_name::<dyn crate::dialect::SqlDialect>()
1015 )));
1016 }
1017
1018 if update_payload.is_empty() && create_payload.is_empty() {
1020 return Ok(());
1021 }
1022
1023 if create_payload.is_empty() {
1026 return Err(crate::error::QueryError::invalid_input(
1027 "create_payload",
1028 "Nested Upsert requires at least one create column when no row to update",
1029 ));
1030 }
1031
1032 let insert_arity = create_payload.len() + 1; let (probe_set_text, probe_set_params) =
1047 build_writeop_set_clause(&update_payload, dialect, insert_arity + 1);
1048
1049 let build_insert_columns_and_values = || {
1053 let mut columns: Vec<String> =
1054 create_payload.iter().map(|(c, _)| c.clone()).collect();
1055 let mut values: Vec<FilterValue> =
1056 create_payload.iter().map(|(_, v)| v.clone()).collect();
1057 columns.push(foreign_key.to_string());
1058 values.push(parent_pk.clone());
1059 let placeholders: Vec<String> =
1060 (1..=values.len()).map(|i| dialect.placeholder(i)).collect();
1061 let quoted_cols: Vec<String> =
1062 columns.iter().map(|c| dialect.quote_ident(c)).collect();
1063 (columns, values, placeholders, quoted_cols)
1064 };
1065
1066 let conflict_cols = [target_pk];
1069 let upsert_clause_text = dialect.upsert_clause(&conflict_cols, &probe_set_text);
1070
1071 if !upsert_clause_text.is_empty() {
1072 let (_, mut values, placeholders, quoted_cols) =
1076 build_insert_columns_and_values();
1077
1078 let _ = pk;
1083
1084 let effective_upsert_clause = if update_payload.is_empty() {
1085 let do_nothing = dialect.upsert_do_nothing_clause(&conflict_cols);
1090 if do_nothing.is_empty() {
1091 return Ok(());
1094 }
1095 do_nothing
1096 } else {
1097 values.extend(probe_set_params);
1098 upsert_clause_text
1099 };
1100
1101 let sql = format!(
1102 "INSERT INTO {} ({}) VALUES ({}){}",
1103 dialect.quote_ident(target_table),
1104 quoted_cols.join(", "),
1105 placeholders.join(", "),
1106 effective_upsert_clause,
1107 );
1108 engine.execute_raw(&sql, values).await?;
1109 return Ok(());
1110 }
1111
1112 if update_payload.is_empty() {
1122 let (_, values, placeholders, quoted_cols) = build_insert_columns_and_values();
1128 let insert_sql = format!(
1129 "INSERT INTO {} ({}) VALUES ({})",
1130 dialect.quote_ident(target_table),
1131 quoted_cols.join(", "),
1132 placeholders.join(", "),
1133 );
1134 engine.execute_raw(&insert_sql, values).await?;
1135 return Ok(());
1136 }
1137
1138 let (set_text, mut update_params) =
1140 build_writeop_set_clause(&update_payload, dialect, 1);
1141 let next_placeholder = update_params.len() + 1;
1142 update_params.push(pk.clone());
1143 let update_sql = format!(
1144 "UPDATE {} SET {} WHERE {} = {}",
1145 dialect.quote_ident(target_table),
1146 set_text,
1147 dialect.quote_ident(target_pk),
1148 dialect.placeholder(next_placeholder),
1149 );
1150 let affected = engine.execute_raw(&update_sql, update_params).await?;
1151 if affected > 0 {
1152 return Ok(());
1153 }
1154 let (_, values, placeholders, quoted_cols) = build_insert_columns_and_values();
1156 let insert_sql = format!(
1157 "INSERT INTO {} ({}) VALUES ({})",
1158 dialect.quote_ident(target_table),
1159 quoted_cols.join(", "),
1160 placeholders.join(", "),
1161 );
1162 engine.execute_raw(&insert_sql, values).await?;
1163 Ok(())
1164 }
1165 NestedWriteOp::ConnectOrCreate {
1166 relation: _,
1167 target_table,
1168 foreign_key,
1169 where_filter,
1170 create_payload,
1171 } => {
1172 if matches!(where_filter, Filter::None) {
1179 return Err(crate::error::QueryError::not_found(target_table).with_context(
1180 "Nested ConnectOrCreate: empty `where` block would match every row; supply a unique filter",
1181 ));
1182 }
1183 let dialect = engine.dialect();
1184 let (filter_sql, filter_params) = where_filter.to_sql(2, &crate::dialect::Postgres);
1186 let mut update_params: Vec<FilterValue> =
1187 Vec::with_capacity(filter_params.len() + 1);
1188 update_params.push(parent_pk.clone());
1189 update_params.extend(filter_params);
1190 let update_sql = format!(
1191 "UPDATE {} SET {} = {} WHERE {}",
1192 dialect.quote_ident(target_table),
1193 dialect.quote_ident(foreign_key),
1194 dialect.placeholder(1),
1195 filter_sql,
1196 );
1197 let affected = engine.execute_raw(&update_sql, update_params).await?;
1198 if affected > 0 {
1199 return Ok(());
1200 }
1201 if create_payload.is_empty() {
1203 return Err(
1204 crate::error::QueryError::not_found(target_table).with_context(
1205 "Nested ConnectOrCreate: no match and create payload empty",
1206 ),
1207 );
1208 }
1209 let mut columns: Vec<String> =
1210 create_payload.iter().map(|(c, _)| c.clone()).collect();
1211 let mut values: Vec<FilterValue> =
1212 create_payload.into_iter().map(|(_, v)| v).collect();
1213 columns.push(foreign_key.to_string());
1214 values.push(parent_pk.clone());
1215 let placeholders: Vec<String> =
1216 (1..=values.len()).map(|i| dialect.placeholder(i)).collect();
1217 let quoted_cols: Vec<String> =
1218 columns.iter().map(|c| dialect.quote_ident(c)).collect();
1219 let insert_sql = format!(
1220 "INSERT INTO {} ({}) VALUES ({})",
1221 dialect.quote_ident(target_table),
1222 quoted_cols.join(", "),
1223 placeholders.join(", "),
1224 );
1225 engine.execute_raw(&insert_sql, values).await?;
1226 Ok(())
1227 }
1228 NestedWriteOp::Create {
1229 relation: _,
1230 target_table,
1231 foreign_key,
1232 payload,
1233 } => {
1234 if payload.is_empty() {
1235 return Ok(());
1236 }
1237
1238 let dialect = engine.dialect();
1239
1240 let first = &payload[0];
1245 let mut columns: Vec<String> = first.iter().map(|(c, _)| c.clone()).collect();
1246 columns.push(foreign_key.to_string());
1247 let cols_per_row = columns.len();
1248
1249 let quoted_cols: Vec<String> =
1250 columns.iter().map(|c| dialect.quote_ident(c)).collect();
1251
1252 let mut values: Vec<FilterValue> = Vec::with_capacity(payload.len() * cols_per_row);
1253 let mut row_placeholders: Vec<String> = Vec::with_capacity(payload.len());
1254 let mut next_placeholder = 1usize;
1255
1256 for child in payload {
1257 let mut row_phs: Vec<String> = Vec::with_capacity(cols_per_row);
1258 for (_, v) in child {
1259 values.push(v);
1260 row_phs.push(dialect.placeholder(next_placeholder));
1261 next_placeholder += 1;
1262 }
1263 values.push(parent_pk.clone());
1264 row_phs.push(dialect.placeholder(next_placeholder));
1265 next_placeholder += 1;
1266 row_placeholders.push(format!("({})", row_phs.join(", ")));
1267 }
1268
1269 let sql = format!(
1274 "INSERT INTO {} ({}) VALUES {}",
1275 dialect.quote_ident(target_table),
1276 quoted_cols.join(", "),
1277 row_placeholders.join(", "),
1278 );
1279
1280 engine.execute_raw(&sql, values).await?;
1281 Ok(())
1282 }
1283 NestedWriteOp::Set {
1284 relation: _,
1285 target_table,
1286 foreign_key,
1287 target_pk,
1288 set_pks,
1289 } => {
1290 let dialect = engine.dialect();
1291
1292 if set_pks.is_empty() {
1294 let sql = format!(
1296 "UPDATE {} SET {} = NULL WHERE {} = {}",
1297 dialect.quote_ident(target_table),
1298 dialect.quote_ident(foreign_key),
1299 dialect.quote_ident(foreign_key),
1300 dialect.placeholder(1),
1301 );
1302 engine.execute_raw(&sql, vec![parent_pk.clone()]).await?;
1303 return Ok(());
1304 }
1305 let mut disconnect_params: Vec<FilterValue> = Vec::with_capacity(set_pks.len() + 1);
1307 disconnect_params.push(parent_pk.clone());
1308 let mut not_in_placeholders: Vec<String> = Vec::with_capacity(set_pks.len());
1309 for (i, pk) in set_pks.iter().enumerate() {
1310 disconnect_params.push(pk.clone());
1311 not_in_placeholders.push(dialect.placeholder(i + 2));
1312 }
1313 let disconnect_sql = format!(
1314 "UPDATE {} SET {} = NULL WHERE {} = {} AND {} NOT IN ({})",
1315 dialect.quote_ident(target_table),
1316 dialect.quote_ident(foreign_key),
1317 dialect.quote_ident(foreign_key),
1318 dialect.placeholder(1),
1319 dialect.quote_ident(target_pk),
1320 not_in_placeholders.join(", "),
1321 );
1322 engine
1323 .execute_raw(&disconnect_sql, disconnect_params)
1324 .await?;
1325
1326 let mut connect_params: Vec<FilterValue> = Vec::with_capacity(set_pks.len() + 1);
1328 connect_params.push(parent_pk.clone());
1329 let mut in_placeholders: Vec<String> = Vec::with_capacity(set_pks.len());
1330 for (i, pk) in set_pks.iter().enumerate() {
1331 connect_params.push(pk.clone());
1332 in_placeholders.push(dialect.placeholder(i + 2));
1333 }
1334 let connect_sql = format!(
1335 "UPDATE {} SET {} = {} WHERE {} IN ({})",
1336 dialect.quote_ident(target_table),
1337 dialect.quote_ident(foreign_key),
1338 dialect.placeholder(1),
1339 dialect.quote_ident(target_pk),
1340 in_placeholders.join(", "),
1341 );
1342 engine.execute_raw(&connect_sql, connect_params).await?;
1343 Ok(())
1344 }
1345 }
1346 }
1347}
1348
1349#[cfg(test)]
1350mod tests {
1351 use super::*;
1352 use std::sync::{Arc, Mutex};
1353
1354 use crate::error::QueryError;
1355 use crate::traits::BoxFuture;
1356
1357 type StatementLog = Arc<Mutex<Vec<(String, Vec<FilterValue>)>>>;
1359
1360 #[derive(Clone, Copy)]
1361 enum DialectKind {
1362 Postgres,
1363 Mssql,
1364 NotSql,
1365 }
1366
1367 #[derive(Clone)]
1375 struct RecordingEngine {
1376 recorded: StatementLog,
1377 affected: Arc<Mutex<Vec<u64>>>,
1378 dialect_kind: DialectKind,
1379 }
1380
1381 impl RecordingEngine {
1382 fn new() -> Self {
1383 Self {
1384 recorded: Arc::new(Mutex::new(Vec::new())),
1385 affected: Arc::new(Mutex::new(Vec::new())),
1386 dialect_kind: DialectKind::Postgres,
1387 }
1388 }
1389
1390 fn with_affected(seq: Vec<u64>) -> Self {
1393 let mut rev = seq;
1395 rev.reverse();
1396 Self {
1397 recorded: Arc::new(Mutex::new(Vec::new())),
1398 affected: Arc::new(Mutex::new(rev)),
1399 dialect_kind: DialectKind::Postgres,
1400 }
1401 }
1402
1403 fn mssql() -> Self {
1404 Self {
1405 dialect_kind: DialectKind::Mssql,
1406 ..Self::new()
1407 }
1408 }
1409
1410 fn mssql_with_affected(seq: Vec<u64>) -> Self {
1411 Self {
1412 dialect_kind: DialectKind::Mssql,
1413 ..Self::with_affected(seq)
1414 }
1415 }
1416
1417 fn notsql() -> Self {
1418 Self {
1419 dialect_kind: DialectKind::NotSql,
1420 ..Self::new()
1421 }
1422 }
1423
1424 fn statements(&self) -> Vec<(String, Vec<FilterValue>)> {
1425 self.recorded.lock().unwrap().clone()
1426 }
1427 }
1428
1429 impl crate::traits::QueryEngine for RecordingEngine {
1430 fn dialect(&self) -> &dyn crate::dialect::SqlDialect {
1431 match self.dialect_kind {
1432 DialectKind::Postgres => &crate::dialect::Postgres,
1433 DialectKind::Mssql => &crate::dialect::Mssql,
1434 DialectKind::NotSql => &crate::dialect::NotSql,
1435 }
1436 }
1437
1438 fn query_many<T: Model + crate::row::FromRow + Send + 'static>(
1439 &self,
1440 _sql: &str,
1441 _params: Vec<FilterValue>,
1442 ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
1443 Box::pin(async { Ok(Vec::new()) })
1444 }
1445
1446 fn query_one<T: Model + crate::row::FromRow + Send + 'static>(
1447 &self,
1448 _sql: &str,
1449 _params: Vec<FilterValue>,
1450 ) -> BoxFuture<'_, QueryResult<T>> {
1451 Box::pin(async { Err(QueryError::not_found("test")) })
1452 }
1453
1454 fn query_optional<T: Model + crate::row::FromRow + Send + 'static>(
1455 &self,
1456 _sql: &str,
1457 _params: Vec<FilterValue>,
1458 ) -> BoxFuture<'_, QueryResult<Option<T>>> {
1459 Box::pin(async { Ok(None) })
1460 }
1461
1462 fn execute_insert<T: Model + crate::row::FromRow + Send + 'static>(
1463 &self,
1464 _sql: &str,
1465 _params: Vec<FilterValue>,
1466 ) -> BoxFuture<'_, QueryResult<T>> {
1467 Box::pin(async { Err(QueryError::not_found("test")) })
1468 }
1469
1470 fn execute_update<T: Model + crate::row::FromRow + Send + 'static>(
1471 &self,
1472 _sql: &str,
1473 _params: Vec<FilterValue>,
1474 ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
1475 Box::pin(async { Ok(Vec::new()) })
1476 }
1477
1478 fn execute_delete(
1479 &self,
1480 _sql: &str,
1481 _params: Vec<FilterValue>,
1482 ) -> BoxFuture<'_, QueryResult<u64>> {
1483 Box::pin(async { Ok(0) })
1484 }
1485
1486 fn execute_raw(
1487 &self,
1488 sql: &str,
1489 params: Vec<FilterValue>,
1490 ) -> BoxFuture<'_, QueryResult<u64>> {
1491 let recorded = self.recorded.clone();
1492 let affected = self.affected.clone();
1493 let sql = sql.to_string();
1494 Box::pin(async move {
1495 recorded.lock().unwrap().push((sql, params));
1496 let next = affected.lock().unwrap().pop().unwrap_or(1);
1497 Ok(next)
1498 })
1499 }
1500
1501 fn count(&self, _sql: &str, _params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
1502 Box::pin(async { Ok(0) })
1503 }
1504 }
1505
1506 struct TestModel;
1507
1508 impl Model for TestModel {
1509 const MODEL_NAME: &'static str = "Post";
1510 const TABLE_NAME: &'static str = "posts";
1511 const PRIMARY_KEY: &'static [&'static str] = &["id"];
1512 const COLUMNS: &'static [&'static str] = &["id", "title", "user_id"];
1513 }
1514
1515 struct TagModel;
1516
1517 impl Model for TagModel {
1518 const MODEL_NAME: &'static str = "Tag";
1519 const TABLE_NAME: &'static str = "tags";
1520 const PRIMARY_KEY: &'static [&'static str] = &["id"];
1521 const COLUMNS: &'static [&'static str] = &["id", "name"];
1522 }
1523
1524 #[test]
1525 fn test_nested_create_data() {
1526 let data: NestedCreateData<TestModel> =
1527 NestedCreateData::from_pairs([("title", FilterValue::String("Test Post".to_string()))]);
1528
1529 assert_eq!(data.data.len(), 1);
1530 assert_eq!(data.data[0].0, "title");
1531 }
1532
1533 #[test]
1534 fn test_nested_write_create() {
1535 let data: NestedCreateData<TestModel> =
1536 NestedCreateData::from_pairs([("title", FilterValue::String("Test Post".to_string()))]);
1537
1538 let write: NestedWrite<TestModel> = NestedWrite::create(data);
1539
1540 match write {
1541 NestedWrite::Create(creates) => assert_eq!(creates.len(), 1),
1542 _ => panic!("Expected Create variant"),
1543 }
1544 }
1545
1546 #[test]
1547 fn test_nested_write_connect() {
1548 let write: NestedWrite<TestModel> = NestedWrite::connect(vec![
1549 Filter::Equals("id".into(), FilterValue::Int(1)),
1550 Filter::Equals("id".into(), FilterValue::Int(2)),
1551 ]);
1552
1553 match write {
1554 NestedWrite::Connect(filters) => assert_eq!(filters.len(), 2),
1555 _ => panic!("Expected Connect variant"),
1556 }
1557 }
1558
1559 #[test]
1560 fn test_nested_write_disconnect() {
1561 let write: NestedWrite<TestModel> =
1562 NestedWrite::disconnect_one(Filter::Equals("id".into(), FilterValue::Int(1)));
1563
1564 match write {
1565 NestedWrite::Disconnect(filters) => assert_eq!(filters.len(), 1),
1566 _ => panic!("Expected Disconnect variant"),
1567 }
1568 }
1569
1570 #[test]
1571 fn test_nested_write_set() {
1572 let write: NestedWrite<TestModel> =
1573 NestedWrite::set(vec![Filter::Equals("id".into(), FilterValue::Int(1))]);
1574
1575 match write {
1576 NestedWrite::Set(filters) => assert_eq!(filters.len(), 1),
1577 _ => panic!("Expected Set variant"),
1578 }
1579 }
1580
1581 #[test]
1582 fn test_builder_one_to_many_connect() {
1583 let builder =
1584 NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1585
1586 let parent_id = FilterValue::Int(1);
1587 let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1588
1589 let statements = builder.build_connect_sql::<TestModel>(&parent_id, &filters);
1590
1591 assert_eq!(statements.len(), 1);
1592 let (sql, params) = &statements[0];
1593 assert!(sql.contains("UPDATE"));
1594 assert!(sql.contains("posts"));
1595 assert!(sql.contains("user_id"));
1596 assert_eq!(params.len(), 2);
1597 }
1598
1599 #[test]
1600 fn test_builder_one_to_many_disconnect() {
1601 let builder =
1602 NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1603
1604 let parent_id = FilterValue::Int(1);
1605 let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1606
1607 let statements = builder.build_disconnect_sql(&parent_id, &filters);
1608
1609 assert_eq!(statements.len(), 1);
1610 let (sql, params) = &statements[0];
1611 assert!(sql.contains("UPDATE"));
1612 assert!(sql.contains("SET"));
1613 assert!(sql.contains("NULL"));
1614 assert_eq!(params.len(), 2);
1615 }
1616
1617 #[test]
1618 fn test_builder_many_to_many_connect() {
1619 let builder = NestedWriteBuilder::many_to_many(
1620 "posts",
1621 vec!["id".to_string()],
1622 "tags",
1623 JoinTableInfo {
1624 table_name: "post_tags".to_string(),
1625 parent_column: "post_id".to_string(),
1626 related_column: "tag_id".to_string(),
1627 },
1628 );
1629
1630 let parent_id = FilterValue::Int(1);
1631 let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1632
1633 let statements = builder.build_connect_sql::<TagModel>(&parent_id, &filters);
1634
1635 assert_eq!(statements.len(), 1);
1636 let (sql, _params) = &statements[0];
1637 assert!(sql.contains("INSERT INTO"));
1638 assert!(sql.contains("post_tags"));
1639 assert!(sql.contains("ON CONFLICT DO NOTHING"));
1640 }
1641
1642 #[test]
1643 fn test_builder_create() {
1644 let builder =
1645 NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1646
1647 let parent_id = FilterValue::Int(1);
1648 let creates = vec![NestedCreateData::<TestModel>::from_pairs([(
1649 "title",
1650 FilterValue::String("New Post".to_string()),
1651 )])];
1652
1653 let statements = builder.build_create_sql::<TestModel>(&parent_id, &creates);
1654
1655 assert_eq!(statements.len(), 1);
1656 let (sql, params) = &statements[0];
1657 assert!(sql.contains("INSERT INTO"));
1658 assert!(sql.contains("posts"));
1659 assert!(sql.contains("RETURNING"));
1660 assert_eq!(params.len(), 2); }
1662
1663 #[test]
1664 fn test_builder_set() {
1665 let builder =
1666 NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1667
1668 let parent_id = FilterValue::Int(1);
1669 let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1670
1671 let statements = builder.build_set_sql::<TestModel>(&parent_id, &filters);
1672
1673 assert!(statements.len() >= 2);
1675
1676 let (first_sql, _) = &statements[0];
1678 assert!(first_sql.contains("UPDATE"));
1679 assert!(first_sql.contains("NULL"));
1680 }
1681
1682 #[test]
1683 fn test_nested_write_operations() {
1684 let mut ops = NestedWriteOperations::new();
1685 assert!(ops.is_empty());
1686 assert_eq!(ops.len(), 0);
1687
1688 ops.add_pre("SELECT 1".to_string(), vec![]);
1689 ops.add_post("SELECT 2".to_string(), vec![]);
1690
1691 assert!(!ops.is_empty());
1692 assert_eq!(ops.len(), 2);
1693 }
1694
1695 #[test]
1696 fn test_nested_create_or_connect() {
1697 let create_data: NestedCreateData<TestModel> =
1698 NestedCreateData::from_pairs([("title", FilterValue::String("New Post".to_string()))]);
1699
1700 let create_or_connect = NestedCreateOrConnectData::new(
1701 Filter::Equals("title".into(), FilterValue::String("Existing".to_string())),
1702 create_data,
1703 );
1704
1705 assert!(matches!(create_or_connect.filter, Filter::Equals(..)));
1706 assert_eq!(create_or_connect.create.data.len(), 1);
1707 }
1708
1709 #[test]
1710 fn test_nested_update_data() {
1711 let update: NestedUpdateData<TestModel> = NestedUpdateData::from_pairs(
1712 Filter::Equals("id".into(), FilterValue::Int(1)),
1713 [("title", FilterValue::String("Updated".to_string()))],
1714 );
1715
1716 assert!(matches!(update.filter, Filter::Equals(..)));
1717 assert_eq!(update.data.len(), 1);
1718 assert_eq!(update.data[0].0, "title");
1719 }
1720
1721 #[tokio::test]
1722 async fn nested_op_connect_emits_update_set_where() {
1723 let engine = RecordingEngine::new();
1724 let op = NestedWriteOp::Connect {
1725 relation: "posts",
1726 target_table: "posts",
1727 foreign_key: "author_id",
1728 target_pk: "id",
1729 pk: FilterValue::Int(42),
1730 };
1731 let parent_pk = FilterValue::Int(7);
1732 op.execute(&engine, &parent_pk).await.unwrap();
1733
1734 let stmts = engine.statements();
1735 assert_eq!(stmts.len(), 1, "expected one UPDATE statement");
1736 let (sql, params) = &stmts[0];
1737 assert!(sql.contains("UPDATE"), "got: {sql}");
1739 assert!(sql.contains("posts"), "got: {sql}");
1740 assert!(sql.contains("author_id"), "got: {sql}");
1741 assert!(sql.contains("SET"), "got: {sql}");
1742 assert!(sql.contains("WHERE"), "got: {sql}");
1743 assert!(sql.contains("$1"), "got: {sql}");
1744 assert!(sql.contains("$2"), "got: {sql}");
1745 assert_eq!(params, &vec![FilterValue::Int(7), FilterValue::Int(42)]);
1746 }
1747
1748 #[tokio::test]
1749 async fn nested_op_delete_many_with_filter_emits_fk_and_filter_clause() {
1750 let engine = RecordingEngine::new();
1751 let op = NestedWriteOp::DeleteMany {
1752 relation: "posts",
1753 target_table: "posts",
1754 foreign_key: "author_id",
1755 filter: Filter::Equals("published".into(), FilterValue::Bool(false)),
1756 };
1757 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1758
1759 let stmts = engine.statements();
1760 assert_eq!(stmts.len(), 1);
1761 let (sql, params) = &stmts[0];
1762 assert!(sql.contains("DELETE FROM"), "got: {sql}");
1763 assert!(sql.contains("author_id"), "got: {sql}");
1764 assert!(sql.contains("$1"), "got: {sql}");
1765 assert!(sql.contains("AND"), "got: {sql}");
1766 assert!(sql.contains("published"), "got: {sql}");
1767 assert_eq!(params.len(), 2);
1768 assert!(matches!(params[0], FilterValue::Int(7)));
1769 assert!(matches!(params[1], FilterValue::Bool(false)));
1770 }
1771
1772 #[tokio::test]
1773 async fn nested_op_delete_many_with_empty_filter_omits_and_clause() {
1774 let engine = RecordingEngine::new();
1775 let op = NestedWriteOp::DeleteMany {
1776 relation: "posts",
1777 target_table: "posts",
1778 foreign_key: "author_id",
1779 filter: Filter::None,
1780 };
1781 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1782
1783 let stmts = engine.statements();
1784 let (sql, params) = &stmts[0];
1785 assert!(sql.contains("DELETE FROM"), "got: {sql}");
1786 assert!(
1787 !sql.contains("AND"),
1788 "should omit AND when filter empty: {sql}"
1789 );
1790 assert_eq!(params.len(), 1);
1791 }
1792
1793 #[tokio::test]
1794 async fn nested_op_delete_emits_delete_where_pk() {
1795 let engine = RecordingEngine::new();
1796 let op = NestedWriteOp::Delete {
1797 relation: "posts",
1798 target_table: "posts",
1799 target_pk: "id",
1800 pk: FilterValue::Int(42),
1801 };
1802 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1803
1804 let stmts = engine.statements();
1805 assert_eq!(stmts.len(), 1);
1806 let (sql, params) = &stmts[0];
1807 assert!(sql.contains("DELETE FROM"), "got: {sql}");
1808 assert!(sql.contains("posts"), "got: {sql}");
1809 assert!(sql.contains("WHERE"), "got: {sql}");
1810 assert_eq!(params, &vec![FilterValue::Int(42)]);
1811 }
1812
1813 #[tokio::test]
1814 async fn nested_op_disconnect_emits_update_set_null() {
1815 let engine = RecordingEngine::new();
1816 let op = NestedWriteOp::Disconnect {
1817 relation: "posts",
1818 target_table: "posts",
1819 foreign_key: "author_id",
1820 target_pk: "id",
1821 pk: FilterValue::Int(42),
1822 };
1823 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1824
1825 let stmts = engine.statements();
1826 assert_eq!(stmts.len(), 1);
1827 let (sql, params) = &stmts[0];
1828 assert!(sql.contains("UPDATE"), "got: {sql}");
1829 assert!(sql.contains("posts"), "got: {sql}");
1830 assert!(sql.contains("author_id"), "got: {sql}");
1831 assert!(sql.contains("NULL"), "got: {sql}");
1832 assert!(sql.contains("WHERE"), "got: {sql}");
1833 assert_eq!(params, &vec![FilterValue::Int(42)]);
1834 }
1835
1836 #[tokio::test]
1837 async fn nested_op_update_plain_set() {
1838 use crate::inputs::WriteOp;
1839 let engine = RecordingEngine::new();
1840 let op = NestedWriteOp::Update {
1841 relation: "posts",
1842 target_table: "posts",
1843 target_pk: "id",
1844 pk: FilterValue::Int(42),
1845 payload: vec![(
1846 "title".to_string(),
1847 WriteOp::Set(FilterValue::String("renamed".to_string())),
1848 )],
1849 };
1850 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1851
1852 let stmts = engine.statements();
1853 assert_eq!(stmts.len(), 1);
1854 let (sql, params) = &stmts[0];
1855 assert!(sql.contains("UPDATE"), "got: {sql}");
1856 assert!(sql.contains("posts"), "got: {sql}");
1857 assert!(sql.contains("title"), "got: {sql}");
1858 assert!(sql.contains("SET"), "got: {sql}");
1859 assert!(sql.contains("WHERE"), "got: {sql}");
1860 assert!(sql.contains("$1"), "got: {sql}");
1861 assert!(sql.contains("$2"), "got: {sql}");
1862 assert_eq!(params.len(), 2);
1863 assert!(matches!(params[0], FilterValue::String(_)));
1864 assert_eq!(params[1], FilterValue::Int(42));
1865 }
1866
1867 #[tokio::test]
1868 async fn nested_op_update_increment() {
1869 use crate::inputs::WriteOp;
1870 let engine = RecordingEngine::new();
1871 let op = NestedWriteOp::Update {
1872 relation: "posts",
1873 target_table: "posts",
1874 target_pk: "id",
1875 pk: FilterValue::Int(42),
1876 payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
1877 };
1878 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1879
1880 let stmts = engine.statements();
1881 let (sql, _) = &stmts[0];
1882 assert!(sql.contains("+"), "got: {sql}");
1884 assert!(sql.contains("views"), "got: {sql}");
1885 }
1886
1887 #[tokio::test]
1888 async fn nested_op_update_mixed_set_and_increment() {
1889 use crate::inputs::WriteOp;
1890 let engine = RecordingEngine::new();
1891 let op = NestedWriteOp::Update {
1892 relation: "posts",
1893 target_table: "posts",
1894 target_pk: "id",
1895 pk: FilterValue::Int(42),
1896 payload: vec![
1897 (
1898 "title".to_string(),
1899 WriteOp::Set(FilterValue::String("renamed".to_string())),
1900 ),
1901 ("views".to_string(), WriteOp::Increment(FilterValue::Int(1))),
1902 ],
1903 };
1904 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1905
1906 let stmts = engine.statements();
1907 let (sql, params) = &stmts[0];
1908 assert!(sql.contains("title"), "got: {sql}");
1909 assert!(sql.contains("views"), "got: {sql}");
1910 assert!(sql.contains("+"), "got: {sql}");
1911 assert!(sql.contains("$3"), "got: {sql}");
1913 assert_eq!(params.len(), 3);
1914 }
1915
1916 #[tokio::test]
1917 async fn nested_op_update_empty_payload_is_noop() {
1918 let engine = RecordingEngine::new();
1919 let op = NestedWriteOp::Update {
1920 relation: "posts",
1921 target_table: "posts",
1922 target_pk: "id",
1923 pk: FilterValue::Int(42),
1924 payload: vec![],
1925 };
1926 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1927 assert!(
1928 engine.statements().is_empty(),
1929 "empty payload should emit no SQL"
1930 );
1931 }
1932
1933 #[tokio::test]
1934 async fn nested_op_update_many_with_filter() {
1935 use crate::inputs::WriteOp;
1936 let engine = RecordingEngine::new();
1937 let op = NestedWriteOp::UpdateMany {
1938 relation: "posts",
1939 target_table: "posts",
1940 foreign_key: "author_id",
1941 filter: Filter::Equals("published".into(), FilterValue::Bool(false)),
1942 payload: vec![("views".to_string(), WriteOp::Set(FilterValue::Int(0)))],
1943 };
1944 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1945
1946 let stmts = engine.statements();
1947 assert_eq!(stmts.len(), 1);
1948 let (sql, params) = &stmts[0];
1949 assert!(sql.contains("UPDATE"), "got: {sql}");
1950 assert!(sql.contains("posts"), "got: {sql}");
1951 assert!(sql.contains("author_id"), "got: {sql}");
1952 assert!(sql.contains("AND"), "got: {sql}");
1953 assert!(sql.contains("published"), "got: {sql}");
1954 assert_eq!(params.len(), 3);
1956 assert_eq!(params[0], FilterValue::Int(0));
1957 assert_eq!(params[1], FilterValue::Int(7));
1958 assert_eq!(params[2], FilterValue::Bool(false));
1959 }
1960
1961 #[tokio::test]
1962 async fn nested_op_update_many_with_empty_filter() {
1963 use crate::inputs::WriteOp;
1964 let engine = RecordingEngine::new();
1965 let op = NestedWriteOp::UpdateMany {
1966 relation: "posts",
1967 target_table: "posts",
1968 foreign_key: "author_id",
1969 filter: Filter::None,
1970 payload: vec![("views".to_string(), WriteOp::Set(FilterValue::Int(0)))],
1971 };
1972 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1973
1974 let stmts = engine.statements();
1975 let (sql, params) = &stmts[0];
1976 assert!(sql.contains("UPDATE"), "got: {sql}");
1977 assert!(
1978 !sql.contains("AND"),
1979 "should omit AND when filter empty: {sql}"
1980 );
1981 assert_eq!(params.len(), 2);
1983 }
1984
1985 #[test]
1986 fn test_nested_upsert_data() {
1987 let create: NestedCreateData<TestModel> =
1988 NestedCreateData::from_pairs([("title", FilterValue::String("New".to_string()))]);
1989
1990 let upsert: NestedUpsertData<TestModel> = NestedUpsertData::new(
1991 Filter::Equals("id".into(), FilterValue::Int(1)),
1992 create,
1993 vec![(
1994 "title".to_string(),
1995 FilterValue::String("Updated".to_string()),
1996 )],
1997 );
1998
1999 assert!(matches!(upsert.filter, Filter::Equals(..)));
2000 assert_eq!(upsert.create.data.len(), 1);
2001 assert_eq!(upsert.update.len(), 1);
2002 }
2003
2004 #[tokio::test]
2005 async fn nested_op_upsert_single_statement_on_postgres() {
2006 use crate::inputs::WriteOp;
2007 let engine = RecordingEngine::new();
2011 let op = NestedWriteOp::Upsert {
2012 relation: "posts",
2013 target_table: "posts",
2014 foreign_key: "author_id",
2015 target_pk: "id",
2016 pk: FilterValue::Int(99),
2017 create_payload: vec![("title".to_string(), FilterValue::String("new".to_string()))],
2018 update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2019 };
2020 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2021
2022 let stmts = engine.statements();
2023 assert_eq!(
2024 stmts.len(),
2025 1,
2026 "expected a single-statement upsert; got {stmts:#?}"
2027 );
2028 let (sql, params) = &stmts[0];
2029 assert!(sql.contains("INSERT INTO"), "got: {sql}");
2030 assert!(sql.contains("posts"), "got: {sql}");
2031 assert!(sql.contains("ON CONFLICT (\"id\")"), "got: {sql}");
2032 assert!(sql.contains("DO UPDATE SET"), "got: {sql}");
2033 assert!(
2036 sql.contains("VALUES ($1, $2)"),
2037 "INSERT VALUES placeholders: {sql}"
2038 );
2039 assert!(sql.contains("$3"), "got: {sql}");
2040 assert_eq!(params.len(), 3);
2042 assert_eq!(params[0], FilterValue::String("new".to_string()));
2043 assert_eq!(params[1], FilterValue::Int(7));
2044 assert_eq!(params[2], FilterValue::Int(1));
2045 }
2046
2047 #[tokio::test]
2048 async fn nested_op_upsert_two_statement_fallback_on_mssql_update_path() {
2049 use crate::inputs::WriteOp;
2050 let engine = RecordingEngine::mssql();
2055 let op = NestedWriteOp::Upsert {
2056 relation: "posts",
2057 target_table: "posts",
2058 foreign_key: "author_id",
2059 target_pk: "id",
2060 pk: FilterValue::Int(99),
2061 create_payload: vec![("title".to_string(), FilterValue::String("new".to_string()))],
2062 update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2063 };
2064 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2065
2066 let stmts = engine.statements();
2067 assert_eq!(
2068 stmts.len(),
2069 1,
2070 "expected only the UPDATE — INSERT should not have run"
2071 );
2072 let (sql, update_params) = &stmts[0];
2073 assert!(sql.starts_with("UPDATE"), "got: {sql}");
2074 assert!(!sql.contains("ON CONFLICT"), "got: {sql}");
2075 assert!(!sql.contains("ON DUPLICATE"), "got: {sql}");
2076 assert!(sql.contains("[posts]"), "got: {sql}");
2078 assert!(sql.contains("@P1"), "SET clause should use @P1: {sql}");
2079 assert!(sql.contains("@P2"), "WHERE clause should use @P2: {sql}");
2080 assert_eq!(update_params.len(), 2);
2082 assert_eq!(update_params[0], FilterValue::Int(1)); assert_eq!(update_params[1], FilterValue::Int(99)); }
2085
2086 #[tokio::test]
2087 async fn nested_op_upsert_two_statement_fallback_on_mssql_insert_path() {
2088 use crate::inputs::WriteOp;
2089 let engine = RecordingEngine::mssql_with_affected(vec![0, 1]);
2092 let op = NestedWriteOp::Upsert {
2093 relation: "posts",
2094 target_table: "posts",
2095 foreign_key: "author_id",
2096 target_pk: "id",
2097 pk: FilterValue::Int(99),
2098 create_payload: vec![("title".to_string(), FilterValue::String("new".to_string()))],
2099 update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2100 };
2101 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2102
2103 let stmts = engine.statements();
2104 assert_eq!(stmts.len(), 2, "expected UPDATE then INSERT");
2105 let (update_sql, _) = &stmts[0];
2106 assert!(update_sql.starts_with("UPDATE"), "got: {update_sql}");
2107 let (insert_sql, insert_params) = &stmts[1];
2108 assert!(insert_sql.starts_with("INSERT INTO"), "got: {insert_sql}");
2109 assert!(!insert_sql.contains("ON CONFLICT"), "got: {insert_sql}");
2110 assert!(insert_sql.contains("[posts]"), "got: {insert_sql}");
2111 assert!(insert_sql.contains("[author_id]"), "got: {insert_sql}");
2112 assert_eq!(insert_params.len(), 2);
2113 assert_eq!(insert_params[0], FilterValue::String("new".to_string()));
2114 assert_eq!(insert_params[1], FilterValue::Int(7));
2115 }
2116
2117 #[tokio::test]
2118 async fn nested_op_connect_or_create_connect_path_when_affected() {
2119 let engine = RecordingEngine::with_affected(vec![1]);
2123 let op = NestedWriteOp::ConnectOrCreate {
2124 relation: "posts",
2125 target_table: "posts",
2126 foreign_key: "author_id",
2127 where_filter: Filter::Equals("id".into(), FilterValue::Int(42)),
2128 create_payload: vec![(
2129 "title".to_string(),
2130 FilterValue::String("fallback".to_string()),
2131 )],
2132 };
2133 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2134
2135 let stmts = engine.statements();
2136 assert_eq!(
2137 stmts.len(),
2138 1,
2139 "expected only the UPDATE — INSERT should not have run"
2140 );
2141 let (sql, params) = &stmts[0];
2142 assert!(sql.contains("UPDATE"), "got: {sql}");
2143 assert!(!sql.contains("INSERT"), "got: {sql}");
2144 assert!(sql.contains("posts"), "got: {sql}");
2145 assert!(sql.contains("author_id"), "got: {sql}");
2146 assert_eq!(params.len(), 2);
2148 assert_eq!(params[0], FilterValue::Int(7));
2149 assert_eq!(params[1], FilterValue::Int(42));
2150 }
2151
2152 #[tokio::test]
2153 async fn nested_op_connect_or_create_create_path_when_zero_affected() {
2154 let engine = RecordingEngine::with_affected(vec![0, 1]);
2158 let op = NestedWriteOp::ConnectOrCreate {
2159 relation: "posts",
2160 target_table: "posts",
2161 foreign_key: "author_id",
2162 where_filter: Filter::Equals("id".into(), FilterValue::Int(42)),
2163 create_payload: vec![(
2164 "title".to_string(),
2165 FilterValue::String("fallback".to_string()),
2166 )],
2167 };
2168 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2169
2170 let stmts = engine.statements();
2171 assert_eq!(stmts.len(), 2);
2172 let (update_sql, _) = &stmts[0];
2173 assert!(update_sql.contains("UPDATE"), "got: {update_sql}");
2174 let (insert_sql, insert_params) = &stmts[1];
2175 assert!(insert_sql.contains("INSERT INTO"), "got: {insert_sql}");
2176 assert!(insert_sql.contains("posts"), "got: {insert_sql}");
2177 assert!(insert_sql.contains("title"), "got: {insert_sql}");
2178 assert!(insert_sql.contains("author_id"), "got: {insert_sql}");
2179 assert_eq!(insert_params.len(), 2);
2182 assert_eq!(
2183 insert_params[0],
2184 FilterValue::String("fallback".to_string())
2185 );
2186 assert_eq!(insert_params[1], FilterValue::Int(7));
2187 }
2188
2189 #[tokio::test]
2190 async fn nested_op_set_with_empty_list_clears_all_children() {
2191 let engine = RecordingEngine::new();
2194 let op = NestedWriteOp::Set {
2195 relation: "posts",
2196 target_table: "posts",
2197 foreign_key: "author_id",
2198 target_pk: "id",
2199 set_pks: vec![],
2200 };
2201 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2202
2203 let stmts = engine.statements();
2204 assert_eq!(stmts.len(), 1, "expected only the disconnect-all UPDATE");
2205 let (sql, params) = &stmts[0];
2206 assert!(sql.contains("UPDATE"), "got: {sql}");
2207 assert!(sql.contains("posts"), "got: {sql}");
2208 assert!(sql.contains("author_id"), "got: {sql}");
2209 assert!(sql.contains("= NULL"), "got: {sql}");
2210 assert!(!sql.contains("NOT IN"), "got: {sql}");
2211 assert!(!sql.contains(" IN ("), "got: {sql}");
2212 assert_eq!(params.len(), 1);
2214 assert_eq!(params[0], FilterValue::Int(7));
2215 }
2216
2217 #[tokio::test]
2218 async fn nested_op_set_with_non_empty_list_emits_disconnect_then_connect() {
2219 let engine = RecordingEngine::new();
2220 let op = NestedWriteOp::Set {
2221 relation: "posts",
2222 target_table: "posts",
2223 foreign_key: "author_id",
2224 target_pk: "id",
2225 set_pks: vec![
2226 FilterValue::Int(1),
2227 FilterValue::Int(2),
2228 FilterValue::Int(3),
2229 ],
2230 };
2231 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2232
2233 let stmts = engine.statements();
2234 assert_eq!(stmts.len(), 2);
2235 let (disconnect_sql, disconnect_params) = &stmts[0];
2236 assert!(disconnect_sql.contains("UPDATE"), "got: {disconnect_sql}");
2237 assert!(disconnect_sql.contains("posts"), "got: {disconnect_sql}");
2238 assert!(
2239 disconnect_sql.contains("author_id"),
2240 "got: {disconnect_sql}"
2241 );
2242 assert!(disconnect_sql.contains("= NULL"), "got: {disconnect_sql}");
2243 assert!(disconnect_sql.contains("NOT IN"), "got: {disconnect_sql}");
2244 assert_eq!(disconnect_params.len(), 4);
2246 assert_eq!(disconnect_params[0], FilterValue::Int(7));
2247 assert_eq!(disconnect_params[1], FilterValue::Int(1));
2248 assert_eq!(disconnect_params[2], FilterValue::Int(2));
2249 assert_eq!(disconnect_params[3], FilterValue::Int(3));
2250
2251 let (connect_sql, connect_params) = &stmts[1];
2252 assert!(connect_sql.contains("UPDATE"), "got: {connect_sql}");
2253 assert!(connect_sql.contains("posts"), "got: {connect_sql}");
2254 assert!(connect_sql.contains("author_id"), "got: {connect_sql}");
2255 assert!(connect_sql.contains(" IN ("), "got: {connect_sql}");
2256 assert!(!connect_sql.contains("NOT IN"), "got: {connect_sql}");
2257 assert_eq!(connect_params.len(), 4);
2259 assert_eq!(connect_params[0], FilterValue::Int(7));
2260 assert_eq!(connect_params[1], FilterValue::Int(1));
2261 assert_eq!(connect_params[2], FilterValue::Int(2));
2262 assert_eq!(connect_params[3], FilterValue::Int(3));
2263 }
2264
2265 #[tokio::test]
2266 async fn nested_op_set_with_single_element_uses_single_placeholder_in_lists() {
2267 let engine = RecordingEngine::new();
2268 let op = NestedWriteOp::Set {
2269 relation: "posts",
2270 target_table: "posts",
2271 foreign_key: "author_id",
2272 target_pk: "id",
2273 set_pks: vec![FilterValue::Int(5)],
2274 };
2275 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2276
2277 let stmts = engine.statements();
2278 assert_eq!(stmts.len(), 2);
2279 let (disconnect_sql, _) = &stmts[0];
2280 assert!(
2282 disconnect_sql.contains("NOT IN ($2)"),
2283 "got: {disconnect_sql}"
2284 );
2285 let (connect_sql, _) = &stmts[1];
2286 assert!(connect_sql.contains(" IN ($2)"), "got: {connect_sql}");
2287 assert!(!connect_sql.contains("NOT IN"), "got: {connect_sql}");
2288 }
2289
2290 #[tokio::test]
2291 async fn nested_op_set_disconnect_clears_only_current_parents_children() {
2292 let engine = RecordingEngine::new();
2295 let op = NestedWriteOp::Set {
2296 relation: "posts",
2297 target_table: "posts",
2298 foreign_key: "author_id",
2299 target_pk: "id",
2300 set_pks: vec![FilterValue::Int(1)],
2301 };
2302 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2303
2304 let stmts = engine.statements();
2305 let (disconnect_sql, _) = &stmts[0];
2306 assert!(
2308 disconnect_sql.contains("author_id\" = $1"),
2309 "expected `author_id = $1` clause; got: {disconnect_sql}"
2310 );
2311 }
2312
2313 #[tokio::test]
2314 async fn nested_op_connect_or_create_rejects_empty_where() {
2315 let engine = RecordingEngine::new();
2318 let op = NestedWriteOp::ConnectOrCreate {
2319 relation: "posts",
2320 target_table: "posts",
2321 foreign_key: "author_id",
2322 where_filter: Filter::None,
2323 create_payload: vec![(
2324 "title".to_string(),
2325 FilterValue::String("fallback".to_string()),
2326 )],
2327 };
2328 let err = op
2329 .execute(&engine, &FilterValue::Int(7))
2330 .await
2331 .expect_err("empty where must be rejected");
2332 let op_ctx = err.context.operation.clone().unwrap_or_default();
2333 assert!(op_ctx.contains("ConnectOrCreate"), "got: {op_ctx}");
2334 assert!(engine.statements().is_empty());
2336 }
2337
2338 #[tokio::test]
2339 async fn nested_op_upsert_single_statement_empty_update_payload_emits_do_nothing() {
2340 let engine = RecordingEngine::new();
2344 let op = NestedWriteOp::Upsert {
2345 relation: "posts",
2346 target_table: "posts",
2347 foreign_key: "author_id",
2348 target_pk: "id",
2349 pk: FilterValue::Int(99),
2350 create_payload: vec![("title".to_string(), FilterValue::String("new".into()))],
2351 update_payload: vec![],
2352 };
2353 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2354
2355 let stmts = engine.statements();
2356 assert_eq!(
2357 stmts.len(),
2358 1,
2359 "expected one INSERT ... DO NOTHING; got {stmts:#?}"
2360 );
2361 let (sql, params) = &stmts[0];
2362 assert!(sql.starts_with("INSERT INTO"), "got: {sql}");
2363 assert!(sql.contains("posts"), "got: {sql}");
2364 assert!(sql.contains("ON CONFLICT (\"id\")"), "got: {sql}");
2365 assert!(sql.contains("DO NOTHING"), "got: {sql}");
2366 assert_eq!(params.len(), 2);
2368 assert_eq!(params[0], FilterValue::String("new".into()));
2369 assert_eq!(params[1], FilterValue::Int(7));
2370 }
2371
2372 #[tokio::test]
2373 async fn nested_op_upsert_two_statement_fallback_empty_update_payload_emits_bare_insert() {
2374 let engine = RecordingEngine::mssql();
2381 let op = NestedWriteOp::Upsert {
2382 relation: "posts",
2383 target_table: "posts",
2384 foreign_key: "author_id",
2385 target_pk: "id",
2386 pk: FilterValue::Int(99),
2387 create_payload: vec![("title".to_string(), FilterValue::String("new".into()))],
2388 update_payload: vec![],
2389 };
2390 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2391
2392 let stmts = engine.statements();
2393 assert_eq!(stmts.len(), 1, "expected one bare INSERT; got {stmts:#?}");
2394 let (sql, params) = &stmts[0];
2395 assert!(sql.starts_with("INSERT INTO"), "got: {sql}");
2396 assert!(sql.contains("[posts]"), "got: {sql}");
2397 assert!(!sql.contains("ON CONFLICT"), "got: {sql}");
2398 assert_eq!(params.len(), 2);
2399 assert_eq!(params[0], FilterValue::String("new".into()));
2400 assert_eq!(params[1], FilterValue::Int(7));
2401 }
2402
2403 #[tokio::test]
2404 async fn nested_op_upsert_single_statement_all_unset_update_payload() {
2405 use crate::inputs::WriteOp;
2406 let engine = RecordingEngine::new();
2410 let op = NestedWriteOp::Upsert {
2411 relation: "posts",
2412 target_table: "posts",
2413 foreign_key: "author_id",
2414 target_pk: "id",
2415 pk: FilterValue::Int(99),
2416 create_payload: vec![("title".to_string(), FilterValue::String("new".into()))],
2417 update_payload: vec![("deleted_at".to_string(), WriteOp::Unset)],
2418 };
2419 op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2420
2421 let stmts = engine.statements();
2422 assert_eq!(
2423 stmts.len(),
2424 1,
2425 "expected one INSERT...ON CONFLICT statement; got {stmts:#?}"
2426 );
2427 let (sql, params) = &stmts[0];
2428 assert!(sql.starts_with("INSERT INTO"), "got: {sql}");
2429 assert!(
2430 sql.contains("ON CONFLICT (\"id\") DO UPDATE SET"),
2431 "got: {sql}"
2432 );
2433 assert!(sql.contains("\"deleted_at\" = NULL"), "got: {sql}");
2434 assert_eq!(params.len(), 2);
2436 assert_eq!(params[0], FilterValue::String("new".into()));
2437 assert_eq!(params[1], FilterValue::Int(7));
2438 }
2439
2440 #[tokio::test]
2441 async fn nested_op_upsert_empty_create_payload_returns_invalid_input() {
2442 use crate::inputs::WriteOp;
2443 let engine = RecordingEngine::new();
2444 let op = NestedWriteOp::Upsert {
2445 relation: "posts",
2446 target_table: "posts",
2447 foreign_key: "author_id",
2448 target_pk: "id",
2449 pk: FilterValue::Int(99),
2450 create_payload: vec![],
2451 update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2452 };
2453 let err = op.execute(&engine, &FilterValue::Int(7)).await.unwrap_err();
2454 assert_eq!(
2455 err.code,
2456 crate::error::ErrorCode::InvalidParameter,
2457 "expected InvalidParameter (invalid_input), got: {err:?}"
2458 );
2459 let msg = format!("{err}");
2460 assert!(
2461 msg.contains("create_payload") || msg.contains("create column"),
2462 "msg: {msg}"
2463 );
2464 }
2465
2466 #[tokio::test]
2467 async fn nested_op_upsert_on_notsql_returns_unsupported_not_panic() {
2468 use crate::inputs::WriteOp;
2469 let engine = RecordingEngine::notsql();
2470 let op = NestedWriteOp::Upsert {
2471 relation: "posts",
2472 target_table: "posts",
2473 foreign_key: "author_id",
2474 target_pk: "id",
2475 pk: FilterValue::Int(99),
2476 create_payload: vec![("title".to_string(), FilterValue::String("x".into()))],
2477 update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2478 };
2479 let err = op.execute(&engine, &FilterValue::Int(7)).await.unwrap_err();
2480 let msg = format!("{err}");
2481 assert!(
2482 msg.contains("Upsert is not supported")
2483 || msg.contains("unsupported")
2484 || msg.contains("Unsupported"),
2485 "msg: {msg}"
2486 );
2487 assert_eq!(
2488 engine.statements().len(),
2489 0,
2490 "no SQL should have been emitted"
2491 );
2492 }
2493}