1use crate::ObjectKey;
11use asupersync::{Cx, Outcome};
12use sqlmodel_core::{Connection, Error, Model, Value, quote_ident};
13use std::collections::HashMap;
14
15#[derive(Debug, Clone)]
17pub enum PendingOp {
18 Insert {
20 key: ObjectKey,
22 table: &'static str,
24 columns: Vec<&'static str>,
26 values: Vec<Value>,
28 },
29 Update {
31 key: ObjectKey,
33 table: &'static str,
35 pk_columns: Vec<&'static str>,
37 pk_values: Vec<Value>,
39 set_columns: Vec<&'static str>,
41 set_values: Vec<Value>,
43 },
44 Delete {
46 key: ObjectKey,
48 table: &'static str,
50 pk_columns: Vec<&'static str>,
52 pk_values: Vec<Value>,
54 },
55}
56
57#[derive(Debug, Clone)]
59pub enum LinkTableOp {
60 Link {
62 table: String,
64 local_columns: Vec<String>,
66 local_values: Vec<Value>,
68 remote_columns: Vec<String>,
70 remote_values: Vec<Value>,
72 },
73 Unlink {
75 table: String,
77 local_columns: Vec<String>,
79 local_values: Vec<Value>,
81 remote_columns: Vec<String>,
83 remote_values: Vec<Value>,
85 },
86}
87
88impl LinkTableOp {
89 pub fn link(
91 table: impl Into<String>,
92 local_column: impl Into<String>,
93 local_value: Value,
94 remote_column: impl Into<String>,
95 remote_value: Value,
96 ) -> Self {
97 Self::link_multi(
98 table,
99 vec![local_column.into()],
100 vec![local_value],
101 vec![remote_column.into()],
102 vec![remote_value],
103 )
104 }
105
106 pub fn unlink(
108 table: impl Into<String>,
109 local_column: impl Into<String>,
110 local_value: Value,
111 remote_column: impl Into<String>,
112 remote_value: Value,
113 ) -> Self {
114 Self::unlink_multi(
115 table,
116 vec![local_column.into()],
117 vec![local_value],
118 vec![remote_column.into()],
119 vec![remote_value],
120 )
121 }
122
123 pub fn link_multi(
125 table: impl Into<String>,
126 local_columns: Vec<String>,
127 local_values: Vec<Value>,
128 remote_columns: Vec<String>,
129 remote_values: Vec<Value>,
130 ) -> Self {
131 Self::Link {
132 table: table.into(),
133 local_columns,
134 local_values,
135 remote_columns,
136 remote_values,
137 }
138 }
139
140 pub fn unlink_multi(
142 table: impl Into<String>,
143 local_columns: Vec<String>,
144 local_values: Vec<Value>,
145 remote_columns: Vec<String>,
146 remote_values: Vec<Value>,
147 ) -> Self {
148 Self::Unlink {
149 table: table.into(),
150 local_columns,
151 local_values,
152 remote_columns,
153 remote_values,
154 }
155 }
156
157 pub fn table(&self) -> &str {
159 match self {
160 LinkTableOp::Link { table, .. } => table,
161 LinkTableOp::Unlink { table, .. } => table,
162 }
163 }
164
165 pub fn is_link(&self) -> bool {
167 matches!(self, LinkTableOp::Link { .. })
168 }
169
170 pub fn is_unlink(&self) -> bool {
172 matches!(self, LinkTableOp::Unlink { .. })
173 }
174
175 pub fn to_sql(&self) -> String {
179 match self {
180 LinkTableOp::Link {
181 table,
182 local_columns,
183 remote_columns,
184 ..
185 } => format!(
186 "INSERT INTO {} ({}) VALUES ({})",
187 quote_ident(table),
188 local_columns
189 .iter()
190 .chain(remote_columns.iter())
191 .map(|c| quote_ident(c))
192 .collect::<Vec<_>>()
193 .join(", "),
194 (1..=(local_columns.len() + remote_columns.len()))
195 .map(|i| format!("${}", i))
196 .collect::<Vec<_>>()
197 .join(", ")
198 ),
199 LinkTableOp::Unlink {
200 table,
201 local_columns,
202 remote_columns,
203 ..
204 } => format!(
205 "DELETE FROM {} WHERE {}",
206 quote_ident(table),
207 local_columns
208 .iter()
209 .chain(remote_columns.iter())
210 .enumerate()
211 .map(|(i, c)| format!("{} = ${}", quote_ident(c), i + 1))
212 .collect::<Vec<_>>()
213 .join(" AND ")
214 ),
215 }
216 }
217
218 #[tracing::instrument(level = "debug", skip(cx, conn))]
220 pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<(), Error> {
221 let dialect = conn.dialect();
222 match self {
223 LinkTableOp::Link {
224 table,
225 local_columns,
226 local_values,
227 remote_columns,
228 remote_values,
229 } => {
230 if local_columns.len() != local_values.len()
231 || remote_columns.len() != remote_values.len()
232 {
233 return Outcome::Err(Error::Custom(
234 "link op columns/values length mismatch".to_string(),
235 ));
236 }
237
238 let mut params: Vec<Value> =
239 Vec::with_capacity(local_values.len() + remote_values.len());
240 params.extend(local_values.iter().cloned());
241 params.extend(remote_values.iter().cloned());
242
243 let col_list = local_columns
244 .iter()
245 .chain(remote_columns.iter())
246 .map(|c| dialect.quote_identifier(c))
247 .collect::<Vec<_>>()
248 .join(", ");
249 let placeholders = (1..=params.len())
250 .map(|i| dialect.placeholder(i))
251 .collect::<Vec<_>>()
252 .join(", ");
253 let sql = format!(
254 "INSERT INTO {} ({}) VALUES ({})",
255 dialect.quote_identifier(table),
256 col_list,
257 placeholders
258 );
259 tracing::trace!(sql = %sql, "Executing link INSERT");
260 conn.execute(cx, &sql, ¶ms).await.map(|_| ())
261 }
262 LinkTableOp::Unlink {
263 table,
264 local_columns,
265 local_values,
266 remote_columns,
267 remote_values,
268 } => {
269 if local_columns.len() != local_values.len()
270 || remote_columns.len() != remote_values.len()
271 {
272 return Outcome::Err(Error::Custom(
273 "unlink op columns/values length mismatch".to_string(),
274 ));
275 }
276
277 let mut params: Vec<Value> =
278 Vec::with_capacity(local_values.len() + remote_values.len());
279 params.extend(local_values.iter().cloned());
280 params.extend(remote_values.iter().cloned());
281
282 let where_clause = local_columns
283 .iter()
284 .chain(remote_columns.iter())
285 .enumerate()
286 .map(|(i, c)| {
287 format!(
288 "{} = {}",
289 dialect.quote_identifier(c),
290 dialect.placeholder(i + 1)
291 )
292 })
293 .collect::<Vec<_>>()
294 .join(" AND ");
295 let sql = format!(
296 "DELETE FROM {} WHERE {}",
297 dialect.quote_identifier(table),
298 where_clause
299 );
300 tracing::trace!(sql = %sql, "Executing link DELETE");
301 conn.execute(cx, &sql, ¶ms).await.map(|_| ())
302 }
303 }
304 }
305}
306
307#[tracing::instrument(level = "debug", skip(cx, conn, ops))]
309pub async fn execute_link_table_ops<C: Connection>(
310 cx: &Cx,
311 conn: &C,
312 ops: &[LinkTableOp],
313) -> Outcome<usize, Error> {
314 if ops.is_empty() {
315 return Outcome::Ok(0);
316 }
317
318 tracing::info!(count = ops.len(), "Executing link table operations");
319
320 let mut count = 0;
321 for op in ops {
322 match op.execute(cx, conn).await {
323 Outcome::Ok(()) => count += 1,
324 Outcome::Err(e) => return Outcome::Err(e),
325 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
326 Outcome::Panicked(p) => return Outcome::Panicked(p),
327 }
328 }
329
330 tracing::debug!(executed = count, "Link table operations complete");
331 Outcome::Ok(count)
332}
333
334impl PendingOp {
335 pub fn table(&self) -> &'static str {
337 match self {
338 PendingOp::Insert { table, .. } => table,
339 PendingOp::Update { table, .. } => table,
340 PendingOp::Delete { table, .. } => table,
341 }
342 }
343
344 pub fn key(&self) -> ObjectKey {
346 match self {
347 PendingOp::Insert { key, .. } => *key,
348 PendingOp::Update { key, .. } => *key,
349 PendingOp::Delete { key, .. } => *key,
350 }
351 }
352
353 pub fn is_insert(&self) -> bool {
355 matches!(self, PendingOp::Insert { .. })
356 }
357
358 pub fn is_update(&self) -> bool {
360 matches!(self, PendingOp::Update { .. })
361 }
362
363 pub fn is_delete(&self) -> bool {
365 matches!(self, PendingOp::Delete { .. })
366 }
367
368 pub fn to_sql(&self) -> String {
377 match self {
378 PendingOp::Insert {
379 table,
380 columns,
381 values,
382 ..
383 } => {
384 if columns.is_empty() {
385 return format!(
386 "-- ERROR: INSERT INTO {} with no columns",
387 quote_ident(table)
388 );
389 }
390 let col_list: String = columns
391 .iter()
392 .map(|c| quote_ident(c))
393 .collect::<Vec<_>>()
394 .join(", ");
395 let placeholders: Vec<String> =
396 (1..=values.len()).map(|i| format!("${}", i)).collect();
397 format!(
398 "INSERT INTO {} ({}) VALUES ({})",
399 quote_ident(table),
400 col_list,
401 placeholders.join(", ")
402 )
403 }
404 PendingOp::Delete {
405 table, pk_columns, ..
406 } => {
407 if pk_columns.is_empty() {
408 return format!(
409 "-- ERROR: DELETE FROM {} with no pk_columns",
410 quote_ident(table)
411 );
412 }
413 if pk_columns.len() == 1 {
414 format!(
415 "DELETE FROM {} WHERE {} IN ($1)",
416 quote_ident(table),
417 quote_ident(pk_columns[0])
418 )
419 } else {
420 let where_clause: String = pk_columns
421 .iter()
422 .enumerate()
423 .map(|(i, col)| format!("{} = ${}", quote_ident(col), i + 1))
424 .collect::<Vec<_>>()
425 .join(" AND ");
426 format!("DELETE FROM {} WHERE {}", quote_ident(table), where_clause)
427 }
428 }
429 PendingOp::Update {
430 table,
431 pk_columns,
432 set_columns,
433 ..
434 } => {
435 if pk_columns.is_empty() {
436 return format!("-- ERROR: UPDATE {} with no pk_columns", quote_ident(table));
437 }
438 if set_columns.is_empty() {
439 return format!(
440 "-- ERROR: UPDATE {} with no set_columns",
441 quote_ident(table)
442 );
443 }
444 let mut param_idx = 1;
445 let set_clause: String = set_columns
446 .iter()
447 .map(|col| {
448 let s = format!("{} = ${}", quote_ident(col), param_idx);
449 param_idx += 1;
450 s
451 })
452 .collect::<Vec<_>>()
453 .join(", ");
454 let where_clause: String = pk_columns
455 .iter()
456 .map(|col| {
457 let s = format!("{} = ${}", quote_ident(col), param_idx);
458 param_idx += 1;
459 s
460 })
461 .collect::<Vec<_>>()
462 .join(" AND ");
463 format!(
464 "UPDATE {} SET {} WHERE {}",
465 quote_ident(table),
466 set_clause,
467 where_clause
468 )
469 }
470 }
471 }
472}
473
474#[derive(Debug, Default)]
480pub struct FlushOrderer {
481 dependencies: HashMap<&'static str, Vec<&'static str>>,
483}
484
485impl FlushOrderer {
486 pub fn new() -> Self {
488 Self::default()
489 }
490
491 pub fn register_model<T: Model>(&mut self) {
495 let table = T::TABLE_NAME;
496 let deps: Vec<&'static str> = T::fields()
497 .iter()
498 .filter_map(|f| f.foreign_key)
499 .filter_map(|fk| fk.split('.').next())
500 .collect();
501 self.dependencies.insert(table, deps);
502 }
503
504 pub fn register_table(&mut self, table: &'static str, depends_on: Vec<&'static str>) {
506 self.dependencies.insert(table, depends_on);
507 }
508
509 fn dependency_count(&self, table: &str) -> usize {
511 self.dependencies.get(table).map_or(0, Vec::len)
512 }
513
514 pub fn order(&self, ops: Vec<PendingOp>) -> FlushPlan {
521 let mut deletes = Vec::new();
522 let mut inserts = Vec::new();
523 let mut updates = Vec::new();
524
525 for op in ops {
526 match op {
527 PendingOp::Delete { .. } => deletes.push(op),
528 PendingOp::Insert { .. } => inserts.push(op),
529 PendingOp::Update { .. } => updates.push(op),
530 }
531 }
532
533 deletes.sort_by(|a, b| {
535 let a_deps = self.dependency_count(a.table());
536 let b_deps = self.dependency_count(b.table());
537 b_deps.cmp(&a_deps)
538 });
539
540 inserts.sort_by(|a, b| {
542 let a_deps = self.dependency_count(a.table());
543 let b_deps = self.dependency_count(b.table());
544 a_deps.cmp(&b_deps)
545 });
546
547 FlushPlan {
548 deletes,
549 inserts,
550 updates,
551 }
552 }
553}
554
555#[derive(Debug, Default)]
557pub struct FlushPlan {
558 pub deletes: Vec<PendingOp>,
560 pub inserts: Vec<PendingOp>,
562 pub updates: Vec<PendingOp>,
564}
565
566impl FlushPlan {
567 pub fn new() -> Self {
569 Self::default()
570 }
571
572 pub fn is_empty(&self) -> bool {
574 self.deletes.is_empty() && self.inserts.is_empty() && self.updates.is_empty()
575 }
576
577 pub fn len(&self) -> usize {
579 self.deletes.len() + self.inserts.len() + self.updates.len()
580 }
581
582 #[tracing::instrument(level = "info", skip(self, cx, conn))]
584 pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<FlushResult, Error> {
585 tracing::info!(
586 deletes = self.deletes.len(),
587 inserts = self.inserts.len(),
588 updates = self.updates.len(),
589 "Executing flush plan"
590 );
591
592 let start = std::time::Instant::now();
593 let mut result = FlushResult::default();
594
595 for batch in Self::batch_by_table(&self.deletes) {
597 match Self::execute_delete_batch(cx, conn, &batch).await {
598 Outcome::Ok(count) => result.deleted += count,
599 Outcome::Err(e) => return Outcome::Err(e),
600 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
601 Outcome::Panicked(p) => return Outcome::Panicked(p),
602 }
603 }
604
605 for batch in Self::batch_by_table(&self.inserts) {
607 match Self::execute_insert_batch(cx, conn, &batch).await {
608 Outcome::Ok(count) => result.inserted += count,
609 Outcome::Err(e) => return Outcome::Err(e),
610 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
611 Outcome::Panicked(p) => return Outcome::Panicked(p),
612 }
613 }
614
615 for op in &self.updates {
617 match Self::execute_update(cx, conn, op).await {
618 Outcome::Ok(()) => result.updated += 1,
619 Outcome::Err(e) => return Outcome::Err(e),
620 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
621 Outcome::Panicked(p) => return Outcome::Panicked(p),
622 }
623 }
624
625 tracing::info!(
626 elapsed_ms = start.elapsed().as_millis(),
627 inserted = result.inserted,
628 updated = result.updated,
629 deleted = result.deleted,
630 "Flush complete"
631 );
632
633 Outcome::Ok(result)
634 }
635
636 fn batch_by_table(ops: &[PendingOp]) -> Vec<Vec<&PendingOp>> {
638 if ops.is_empty() {
639 return Vec::new();
640 }
641
642 let mut batches: Vec<Vec<&PendingOp>> = Vec::new();
643 let mut current_table: Option<&'static str> = None;
644 let mut current_batch: Vec<&PendingOp> = Vec::new();
645
646 for op in ops {
647 let table = op.table();
648 if current_table == Some(table) {
649 current_batch.push(op);
650 } else {
651 if !current_batch.is_empty() {
652 batches.push(current_batch);
653 }
654 current_batch = vec![op];
655 current_table = Some(table);
656 }
657 }
658
659 if !current_batch.is_empty() {
660 batches.push(current_batch);
661 }
662
663 batches
664 }
665
666 #[allow(clippy::result_large_err)]
667 fn build_insert_batch_sql(
668 dialect: sqlmodel_core::Dialect,
669 ops: &[&PendingOp],
670 ) -> Result<(String, Vec<Value>), Error> {
671 let table = ops[0].table();
672 let PendingOp::Insert { columns, .. } = ops[0] else {
673 return Err(Error::Custom("expected insert operation".to_string()));
674 };
675
676 let col_list: String = columns
677 .iter()
678 .map(|c| dialect.quote_identifier(c))
679 .collect::<Vec<_>>()
680 .join(", ");
681
682 let mut sql = format!(
683 "INSERT INTO {} ({}) VALUES ",
684 dialect.quote_identifier(table),
685 col_list
686 );
687 let mut params: Vec<Value> = Vec::new();
688 let mut param_idx = 1;
689
690 for (i, op) in ops.iter().enumerate() {
691 let PendingOp::Insert {
692 columns: row_columns,
693 values,
694 ..
695 } = op
696 else {
697 return Err(Error::Custom(
698 "mixed operation kinds in insert batch".to_string(),
699 ));
700 };
701
702 if row_columns != columns {
703 return Err(Error::Custom(format!(
704 "inconsistent insert columns in flush batch for table {table}"
705 )));
706 }
707 if values.len() != columns.len() {
708 return Err(Error::Custom(format!(
709 "insert column/value length mismatch for table {table}: {} columns vs {} values",
710 columns.len(),
711 values.len()
712 )));
713 }
714
715 if i > 0 {
716 sql.push_str(", ");
717 }
718 let placeholders: Vec<String> = (0..values.len())
719 .map(|_| {
720 let p = dialect.placeholder(param_idx);
721 param_idx += 1;
722 p
723 })
724 .collect();
725 sql.push('(');
726 sql.push_str(&placeholders.join(", "));
727 sql.push(')');
728 params.extend(values.iter().cloned());
729 }
730
731 Ok((sql, params))
732 }
733
734 #[allow(clippy::result_large_err)]
735 fn build_delete_batch_sql(
736 dialect: sqlmodel_core::Dialect,
737 ops: &[&PendingOp],
738 ) -> Result<Option<(String, Vec<Value>, usize)>, Error> {
739 let table = ops[0].table();
740 let PendingOp::Delete { pk_columns, .. } = ops[0] else {
741 return Err(Error::Custom("expected delete operation".to_string()));
742 };
743
744 if pk_columns.is_empty() {
746 tracing::warn!(
747 table = table,
748 count = ops.len(),
749 "Skipping DELETE batch for table without primary key - cannot identify rows"
750 );
751 return Ok(None);
752 }
753
754 if pk_columns.len() == 1 {
755 let pk_col = pk_columns[0];
756 let mut params: Vec<Value> = Vec::new();
757 let placeholders: Vec<String> = ops
758 .iter()
759 .filter_map(|op| {
760 if let PendingOp::Delete {
761 pk_columns: row_pk_columns,
762 pk_values,
763 ..
764 } = op
765 {
766 if row_pk_columns != pk_columns {
767 return None;
768 }
769 if pk_values.len() != 1 {
770 return None;
771 }
772 params.push(pk_values[0].clone());
773 return Some(dialect.placeholder(params.len()));
774 }
775 None
776 })
777 .collect();
778
779 if placeholders.is_empty() {
780 return Ok(None);
781 }
782
783 let actual_count = params.len();
784 let sql = format!(
785 "DELETE FROM {} WHERE {} IN ({})",
786 dialect.quote_identifier(table),
787 dialect.quote_identifier(pk_col),
788 placeholders.join(", ")
789 );
790 return Ok(Some((sql, params, actual_count)));
791 }
792
793 Err(Error::Custom(
794 "composite delete batch must be handled per-row".to_string(),
795 ))
796 }
797
798 #[allow(clippy::result_large_err)]
799 fn build_update_sql(
800 dialect: sqlmodel_core::Dialect,
801 op: &PendingOp,
802 ) -> Result<Option<(String, Vec<Value>)>, Error> {
803 let PendingOp::Update {
804 table,
805 pk_columns,
806 pk_values,
807 set_columns,
808 set_values,
809 ..
810 } = op
811 else {
812 return Ok(None);
813 };
814
815 if pk_columns.is_empty() || pk_values.is_empty() {
817 tracing::warn!(
818 table = *table,
819 "Skipping UPDATE for row without primary key - cannot identify row"
820 );
821 return Ok(None);
822 }
823 if set_columns.is_empty() {
824 return Ok(None);
825 }
826
827 if pk_columns.len() != pk_values.len() {
828 return Err(Error::Custom(format!(
829 "update primary key column/value length mismatch for table {table}: {} columns vs {} values",
830 pk_columns.len(),
831 pk_values.len()
832 )));
833 }
834 if set_columns.len() != set_values.len() {
835 return Err(Error::Custom(format!(
836 "update set column/value length mismatch for table {table}: {} columns vs {} values",
837 set_columns.len(),
838 set_values.len()
839 )));
840 }
841
842 let mut param_idx = 1;
843 let set_clause: String = set_columns
844 .iter()
845 .map(|col| {
846 let clause = format!(
847 "{} = {}",
848 dialect.quote_identifier(col),
849 dialect.placeholder(param_idx)
850 );
851 param_idx += 1;
852 clause
853 })
854 .collect::<Vec<_>>()
855 .join(", ");
856
857 let where_clause: String = pk_columns
858 .iter()
859 .map(|col| {
860 let clause = format!(
861 "{} = {}",
862 dialect.quote_identifier(col),
863 dialect.placeholder(param_idx)
864 );
865 param_idx += 1;
866 clause
867 })
868 .collect::<Vec<_>>()
869 .join(" AND ");
870
871 let sql = format!(
872 "UPDATE {} SET {} WHERE {}",
873 dialect.quote_identifier(table),
874 set_clause,
875 where_clause
876 );
877
878 let mut params: Vec<Value> = set_values.clone();
879 params.extend(pk_values.iter().cloned());
880
881 Ok(Some((sql, params)))
882 }
883
884 #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
886 async fn execute_insert_batch<C: Connection>(
887 cx: &Cx,
888 conn: &C,
889 ops: &[&PendingOp],
890 ) -> Outcome<usize, Error> {
891 if ops.is_empty() {
892 return Outcome::Ok(0);
893 }
894
895 let table = ops[0].table();
896 let PendingOp::Insert { .. } = ops[0] else {
897 return Outcome::Ok(0);
898 };
899
900 tracing::debug!(table = table, count = ops.len(), "Executing insert batch");
901 let dialect = conn.dialect();
902 let (sql, params) = match Self::build_insert_batch_sql(dialect, ops) {
903 Ok(v) => v,
904 Err(e) => return Outcome::Err(e),
905 };
906
907 match conn.execute(cx, &sql, ¶ms).await {
908 Outcome::Ok(_) => Outcome::Ok(ops.len()),
909 Outcome::Err(e) => Outcome::Err(e),
910 Outcome::Cancelled(r) => Outcome::Cancelled(r),
911 Outcome::Panicked(p) => Outcome::Panicked(p),
912 }
913 }
914
915 #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
917 async fn execute_delete_batch<C: Connection>(
918 cx: &Cx,
919 conn: &C,
920 ops: &[&PendingOp],
921 ) -> Outcome<usize, Error> {
922 if ops.is_empty() {
923 return Outcome::Ok(0);
924 }
925
926 let table = ops[0].table();
927 let PendingOp::Delete { pk_columns, .. } = ops[0] else {
928 return Outcome::Ok(0);
929 };
930
931 if pk_columns.is_empty() {
933 tracing::warn!(
934 table = table,
935 count = ops.len(),
936 "Skipping DELETE batch for table without primary key - cannot identify rows"
937 );
938 return Outcome::Ok(0);
939 }
940
941 tracing::debug!(table = table, count = ops.len(), "Executing delete batch");
942 let dialect = conn.dialect();
943
944 if pk_columns.len() == 1 {
947 let (sql, params, actual_count) = match Self::build_delete_batch_sql(dialect, ops) {
948 Ok(Some(v)) => v,
949 Ok(None) => return Outcome::Ok(0),
950 Err(e) => return Outcome::Err(e),
951 };
952
953 match conn.execute(cx, &sql, ¶ms).await {
954 Outcome::Ok(_) => Outcome::Ok(actual_count),
957 Outcome::Err(e) => Outcome::Err(e),
958 Outcome::Cancelled(r) => Outcome::Cancelled(r),
959 Outcome::Panicked(p) => Outcome::Panicked(p),
960 }
961 } else {
962 let mut deleted = 0;
964 for op in ops {
965 if let PendingOp::Delete {
966 pk_columns,
967 pk_values,
968 ..
969 } = op
970 {
971 if pk_values.is_empty() {
973 tracing::warn!(
974 table = table,
975 "Skipping DELETE for row with empty primary key values"
976 );
977 continue;
978 }
979 if pk_values.len() != pk_columns.len() {
980 return Outcome::Err(Error::Custom(format!(
981 "delete primary key column/value length mismatch for table {table}: {} columns vs {} values",
982 pk_columns.len(),
983 pk_values.len()
984 )));
985 }
986
987 let where_clause: String = pk_columns
988 .iter()
989 .enumerate()
990 .map(|(i, col)| {
991 format!(
992 "{} = {}",
993 dialect.quote_identifier(col),
994 dialect.placeholder(i + 1)
995 )
996 })
997 .collect::<Vec<_>>()
998 .join(" AND ");
999
1000 let sql = format!(
1001 "DELETE FROM {} WHERE {}",
1002 dialect.quote_identifier(table),
1003 where_clause
1004 );
1005
1006 match conn.execute(cx, &sql, pk_values).await {
1007 Outcome::Ok(_) => deleted += 1,
1008 Outcome::Err(e) => return Outcome::Err(e),
1009 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1010 Outcome::Panicked(p) => return Outcome::Panicked(p),
1011 }
1012 }
1013 }
1014 Outcome::Ok(deleted)
1015 }
1016 }
1017
1018 #[tracing::instrument(level = "debug", skip(cx, conn, op))]
1020 async fn execute_update<C: Connection>(
1021 cx: &Cx,
1022 conn: &C,
1023 op: &PendingOp,
1024 ) -> Outcome<(), Error> {
1025 let PendingOp::Update { table, .. } = op else {
1026 return Outcome::Ok(());
1027 };
1028
1029 tracing::debug!(table = *table, "Executing update");
1030 let dialect = conn.dialect();
1031 let (sql, params) = match Self::build_update_sql(dialect, op) {
1032 Ok(Some(v)) => v,
1033 Ok(None) => return Outcome::Ok(()),
1034 Err(e) => return Outcome::Err(e),
1035 };
1036
1037 match conn.execute(cx, &sql, ¶ms).await {
1038 Outcome::Ok(_) => Outcome::Ok(()),
1039 Outcome::Err(e) => Outcome::Err(e),
1040 Outcome::Cancelled(r) => Outcome::Cancelled(r),
1041 Outcome::Panicked(p) => Outcome::Panicked(p),
1042 }
1043 }
1044}
1045
1046#[derive(Debug, Default, Clone, Copy)]
1048pub struct FlushResult {
1049 pub inserted: usize,
1051 pub updated: usize,
1053 pub deleted: usize,
1055}
1056
1057impl FlushResult {
1058 pub fn new() -> Self {
1060 Self::default()
1061 }
1062
1063 pub fn total(&self) -> usize {
1065 self.inserted + self.updated + self.deleted
1066 }
1067}
1068
1069#[cfg(test)]
1070mod tests {
1071 use super::*;
1072 use sqlmodel_core::{FieldInfo, Row};
1073 use std::any::TypeId;
1074
1075 struct Team;
1077 struct Hero;
1078
1079 impl Model for Team {
1080 const TABLE_NAME: &'static str = "teams";
1081 const PRIMARY_KEY: &'static [&'static str] = &["id"];
1082
1083 fn fields() -> &'static [FieldInfo] {
1084 static FIELDS: [FieldInfo; 1] =
1085 [FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
1086 .primary_key(true)
1087 .auto_increment(true)];
1088 &FIELDS
1089 }
1090
1091 fn primary_key_value(&self) -> Vec<Value> {
1092 vec![]
1093 }
1094
1095 fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
1096 Ok(Team)
1097 }
1098
1099 fn to_row(&self) -> Vec<(&'static str, Value)> {
1100 vec![]
1101 }
1102
1103 fn is_new(&self) -> bool {
1104 true
1105 }
1106 }
1107
1108 impl Model for Hero {
1109 const TABLE_NAME: &'static str = "heroes";
1110 const PRIMARY_KEY: &'static [&'static str] = &["id"];
1111
1112 fn fields() -> &'static [FieldInfo] {
1113 static FIELDS: [FieldInfo; 2] = [
1114 FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
1115 .primary_key(true)
1116 .auto_increment(true),
1117 FieldInfo::new("team_id", "team_id", sqlmodel_core::SqlType::BigInt)
1118 .nullable(true)
1119 .foreign_key("teams.id"),
1120 ];
1121 &FIELDS
1122 }
1123
1124 fn primary_key_value(&self) -> Vec<Value> {
1125 vec![]
1126 }
1127
1128 fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
1129 Ok(Hero)
1130 }
1131
1132 fn to_row(&self) -> Vec<(&'static str, Value)> {
1133 vec![]
1134 }
1135
1136 fn is_new(&self) -> bool {
1137 true
1138 }
1139 }
1140
1141 fn make_insert(table: &'static str, pk: i64) -> PendingOp {
1142 PendingOp::Insert {
1143 key: ObjectKey {
1144 type_id: TypeId::of::<()>(),
1145 pk_hash: pk as u64,
1146 },
1147 table,
1148 columns: vec!["id", "name"],
1149 values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
1150 }
1151 }
1152
1153 fn make_delete(table: &'static str, pk: i64) -> PendingOp {
1154 PendingOp::Delete {
1155 key: ObjectKey {
1156 type_id: TypeId::of::<()>(),
1157 pk_hash: pk as u64,
1158 },
1159 table,
1160 pk_columns: vec!["id"],
1161 pk_values: vec![Value::BigInt(pk)],
1162 }
1163 }
1164
1165 fn make_update(table: &'static str, pk: i64) -> PendingOp {
1166 PendingOp::Update {
1167 key: ObjectKey {
1168 type_id: TypeId::of::<()>(),
1169 pk_hash: pk as u64,
1170 },
1171 table,
1172 pk_columns: vec!["id"],
1173 pk_values: vec![Value::BigInt(pk)],
1174 set_columns: vec!["name"],
1175 set_values: vec![Value::Text("Updated".to_string())],
1176 }
1177 }
1178
1179 #[test]
1180 fn test_pending_op_table_accessor() {
1181 let insert = make_insert("teams", 1);
1182 assert_eq!(insert.table(), "teams");
1183
1184 let delete = make_delete("heroes", 2);
1185 assert_eq!(delete.table(), "heroes");
1186
1187 let update = make_update("teams", 3);
1188 assert_eq!(update.table(), "teams");
1189 }
1190
1191 #[test]
1192 fn test_pending_op_type_checks() {
1193 let insert = make_insert("teams", 1);
1194 assert!(insert.is_insert());
1195 assert!(!insert.is_update());
1196 assert!(!insert.is_delete());
1197
1198 let update = make_update("teams", 1);
1199 assert!(update.is_update());
1200 assert!(!update.is_insert());
1201 assert!(!update.is_delete());
1202
1203 let delete = make_delete("teams", 1);
1204 assert!(delete.is_delete());
1205 assert!(!delete.is_insert());
1206 assert!(!delete.is_update());
1207 }
1208
1209 #[test]
1210 fn test_orderer_simple_no_deps() {
1211 let orderer = FlushOrderer::new();
1212 let ops = vec![
1213 make_insert("teams", 1),
1214 make_insert("teams", 2),
1215 make_delete("teams", 3),
1216 ];
1217
1218 let plan = orderer.order(ops);
1219 assert_eq!(plan.inserts.len(), 2);
1220 assert_eq!(plan.deletes.len(), 1);
1221 assert_eq!(plan.updates.len(), 0);
1222 }
1223
1224 #[test]
1225 fn test_orderer_parent_child_inserts() {
1226 let mut orderer = FlushOrderer::new();
1227 orderer.register_model::<Team>();
1228 orderer.register_model::<Hero>();
1229
1230 let ops = vec![
1232 make_insert("heroes", 1), make_insert("teams", 1), ];
1235
1236 let plan = orderer.order(ops);
1237
1238 assert_eq!(plan.inserts[0].table(), "teams");
1240 assert_eq!(plan.inserts[1].table(), "heroes");
1241 }
1242
1243 #[test]
1244 fn test_orderer_parent_child_deletes() {
1245 let mut orderer = FlushOrderer::new();
1246 orderer.register_model::<Team>();
1247 orderer.register_model::<Hero>();
1248
1249 let ops = vec![
1251 make_delete("teams", 1), make_delete("heroes", 1), ];
1254
1255 let plan = orderer.order(ops);
1256
1257 assert_eq!(plan.deletes[0].table(), "heroes");
1259 assert_eq!(plan.deletes[1].table(), "teams");
1260 }
1261
1262 #[test]
1263 fn test_batch_by_table_groups_correctly() {
1264 let ops = vec![
1265 make_insert("teams", 1),
1266 make_insert("teams", 2),
1267 make_insert("heroes", 1),
1268 make_insert("heroes", 2),
1269 make_insert("teams", 3),
1270 ];
1271
1272 let batches = FlushPlan::batch_by_table(&ops);
1273
1274 assert_eq!(batches.len(), 3);
1276 assert_eq!(batches[0].len(), 2); assert_eq!(batches[1].len(), 2); assert_eq!(batches[2].len(), 1); }
1280
1281 #[test]
1282 fn test_batch_empty_returns_empty() {
1283 let ops: Vec<PendingOp> = vec![];
1284 let batches = FlushPlan::batch_by_table(&ops);
1285 assert!(batches.is_empty());
1286 }
1287
1288 #[test]
1289 fn test_flush_plan_is_empty() {
1290 let plan = FlushPlan::new();
1291 assert!(plan.is_empty());
1292 assert_eq!(plan.len(), 0);
1293 }
1294
1295 #[test]
1296 fn test_flush_plan_len() {
1297 let plan = FlushPlan {
1298 deletes: vec![make_delete("teams", 1)],
1299 inserts: vec![make_insert("teams", 1), make_insert("teams", 2)],
1300 updates: vec![make_update("teams", 1)],
1301 };
1302 assert!(!plan.is_empty());
1303 assert_eq!(plan.len(), 4);
1304 }
1305
1306 #[test]
1307 fn test_flush_result_total() {
1308 let result = FlushResult {
1309 inserted: 5,
1310 updated: 3,
1311 deleted: 2,
1312 };
1313 assert_eq!(result.total(), 10);
1314 }
1315
1316 #[test]
1317 fn test_flush_result_default() {
1318 let result = FlushResult::new();
1319 assert_eq!(result.inserted, 0);
1320 assert_eq!(result.updated, 0);
1321 assert_eq!(result.deleted, 0);
1322 assert_eq!(result.total(), 0);
1323 }
1324
1325 #[test]
1330 fn test_link_table_op_link_constructor() {
1331 let op = LinkTableOp::link(
1332 "hero_powers".to_string(),
1333 "hero_id".to_string(),
1334 Value::BigInt(1),
1335 "power_id".to_string(),
1336 Value::BigInt(5),
1337 );
1338
1339 match op {
1340 LinkTableOp::Link {
1341 table,
1342 local_columns,
1343 local_values,
1344 remote_columns,
1345 remote_values,
1346 } => {
1347 assert_eq!(table, "hero_powers");
1348 assert_eq!(local_columns, vec!["hero_id".to_string()]);
1349 assert_eq!(local_values, vec![Value::BigInt(1)]);
1350 assert_eq!(remote_columns, vec!["power_id".to_string()]);
1351 assert_eq!(remote_values, vec![Value::BigInt(5)]);
1352 }
1353 LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link variant"),
1354 }
1355 }
1356
1357 #[test]
1358 fn test_link_table_op_unlink_constructor() {
1359 let op = LinkTableOp::unlink(
1360 "hero_powers".to_string(),
1361 "hero_id".to_string(),
1362 Value::BigInt(1),
1363 "power_id".to_string(),
1364 Value::BigInt(5),
1365 );
1366
1367 match op {
1368 LinkTableOp::Unlink {
1369 table,
1370 local_columns,
1371 local_values,
1372 remote_columns,
1373 remote_values,
1374 } => {
1375 assert_eq!(table, "hero_powers");
1376 assert_eq!(local_columns, vec!["hero_id".to_string()]);
1377 assert_eq!(local_values, vec![Value::BigInt(1)]);
1378 assert_eq!(remote_columns, vec!["power_id".to_string()]);
1379 assert_eq!(remote_values, vec![Value::BigInt(5)]);
1380 }
1381 LinkTableOp::Link { .. } => std::panic::panic_any("Expected Unlink variant"),
1382 }
1383 }
1384
1385 #[test]
1386 fn test_link_table_op_is_link() {
1387 let link = LinkTableOp::link(
1388 "t".to_string(),
1389 "a".to_string(),
1390 Value::BigInt(1),
1391 "b".to_string(),
1392 Value::BigInt(2),
1393 );
1394 let unlink = LinkTableOp::unlink(
1395 "t".to_string(),
1396 "a".to_string(),
1397 Value::BigInt(1),
1398 "b".to_string(),
1399 Value::BigInt(2),
1400 );
1401
1402 assert!(matches!(link, LinkTableOp::Link { .. }));
1403 assert!(matches!(unlink, LinkTableOp::Unlink { .. }));
1404 }
1405
1406 #[test]
1407 fn test_link_table_op_debug_format() {
1408 let link = LinkTableOp::link(
1409 "hero_powers".to_string(),
1410 "hero_id".to_string(),
1411 Value::BigInt(1),
1412 "power_id".to_string(),
1413 Value::BigInt(5),
1414 );
1415 let debug_str = format!("{:?}", link);
1416 assert!(debug_str.contains("Link"));
1417 assert!(debug_str.contains("hero_powers"));
1418 }
1419
1420 #[test]
1421 fn test_link_table_op_clone() {
1422 let op = LinkTableOp::link(
1423 "hero_powers".to_string(),
1424 "hero_id".to_string(),
1425 Value::BigInt(1),
1426 "power_id".to_string(),
1427 Value::BigInt(5),
1428 );
1429 let cloned = op.clone();
1430
1431 match (op, cloned) {
1432 (
1433 LinkTableOp::Link {
1434 table: t1,
1435 local_values: lv1,
1436 remote_values: rv1,
1437 ..
1438 },
1439 LinkTableOp::Link {
1440 table: t2,
1441 local_values: lv2,
1442 remote_values: rv2,
1443 ..
1444 },
1445 ) => {
1446 assert_eq!(t1, t2);
1447 assert_eq!(lv1, lv2);
1448 assert_eq!(rv1, rv2);
1449 }
1450 _ => std::panic::panic_any("Clone should preserve variant"),
1451 }
1452 }
1453
1454 #[test]
1455 fn test_link_table_ops_empty_vec() {
1456 let ops: Vec<LinkTableOp> = vec![];
1458 assert!(ops.is_empty());
1459 }
1460
1461 #[test]
1462 fn test_link_table_ops_multiple_operations() {
1463 let ops = [
1464 LinkTableOp::link(
1465 "hero_powers".to_string(),
1466 "hero_id".to_string(),
1467 Value::BigInt(1),
1468 "power_id".to_string(),
1469 Value::BigInt(1),
1470 ),
1471 LinkTableOp::link(
1472 "hero_powers".to_string(),
1473 "hero_id".to_string(),
1474 Value::BigInt(1),
1475 "power_id".to_string(),
1476 Value::BigInt(2),
1477 ),
1478 LinkTableOp::unlink(
1479 "hero_powers".to_string(),
1480 "hero_id".to_string(),
1481 Value::BigInt(1),
1482 "power_id".to_string(),
1483 Value::BigInt(3),
1484 ),
1485 ];
1486
1487 let links: Vec<_> = ops
1488 .iter()
1489 .filter(|o| matches!(o, LinkTableOp::Link { .. }))
1490 .collect();
1491 let unlinks: Vec<_> = ops
1492 .iter()
1493 .filter(|o| matches!(o, LinkTableOp::Unlink { .. }))
1494 .collect();
1495
1496 assert_eq!(links.len(), 2);
1497 assert_eq!(unlinks.len(), 1);
1498 }
1499
1500 #[test]
1501 fn test_link_table_op_with_different_value_types() {
1502 let op_str = LinkTableOp::link(
1504 "tag_items".to_string(),
1505 "tag_id".to_string(),
1506 Value::Text("tag-uuid-123".to_string()),
1507 "item_id".to_string(),
1508 Value::Text("item-uuid-456".to_string()),
1509 );
1510
1511 match op_str {
1512 LinkTableOp::Link {
1513 local_values,
1514 remote_values,
1515 ..
1516 } => {
1517 assert!(matches!(local_values.first(), Some(Value::Text(_))));
1518 assert!(matches!(remote_values.first(), Some(Value::Text(_))));
1519 }
1520 LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
1521 }
1522
1523 let op_int = LinkTableOp::link(
1525 "user_roles".to_string(),
1526 "user_id".to_string(),
1527 Value::Int(42),
1528 "role_id".to_string(),
1529 Value::Int(7),
1530 );
1531
1532 match op_int {
1533 LinkTableOp::Link {
1534 local_values,
1535 remote_values,
1536 ..
1537 } => {
1538 assert!(matches!(local_values.first(), Some(Value::Int(_))));
1539 assert!(matches!(remote_values.first(), Some(Value::Int(_))));
1540 }
1541 LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
1542 }
1543 }
1544
1545 fn make_custom_insert(table: &'static str, columns: Vec<&'static str>, pk: i64) -> PendingOp {
1551 PendingOp::Insert {
1552 key: ObjectKey {
1553 type_id: TypeId::of::<()>(),
1554 pk_hash: pk as u64,
1555 },
1556 table,
1557 columns,
1558 values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
1559 }
1560 }
1561
1562 fn make_custom_delete(
1564 table: &'static str,
1565 pk_columns: Vec<&'static str>,
1566 pk: i64,
1567 ) -> PendingOp {
1568 PendingOp::Delete {
1569 key: ObjectKey {
1570 type_id: TypeId::of::<()>(),
1571 pk_hash: pk as u64,
1572 },
1573 table,
1574 pk_columns,
1575 pk_values: vec![Value::BigInt(pk)],
1576 }
1577 }
1578
1579 fn make_custom_update(
1581 table: &'static str,
1582 pk_columns: Vec<&'static str>,
1583 set_columns: Vec<&'static str>,
1584 pk: i64,
1585 ) -> PendingOp {
1586 PendingOp::Update {
1587 key: ObjectKey {
1588 type_id: TypeId::of::<()>(),
1589 pk_hash: pk as u64,
1590 },
1591 table,
1592 pk_columns,
1593 pk_values: vec![Value::BigInt(pk)],
1594 set_columns,
1595 set_values: vec![Value::Text("Updated".to_string())],
1596 }
1597 }
1598
1599 #[test]
1602 fn test_link_table_op_to_sql_simple() {
1603 let op = LinkTableOp::link(
1604 "hero_powers".to_string(),
1605 "hero_id".to_string(),
1606 Value::BigInt(1),
1607 "power_id".to_string(),
1608 Value::BigInt(5),
1609 );
1610 let sql = op.to_sql();
1611 assert_eq!(
1612 sql,
1613 "INSERT INTO \"hero_powers\" (\"hero_id\", \"power_id\") VALUES ($1, $2)"
1614 );
1615 }
1616
1617 #[test]
1618 fn test_link_table_op_to_sql_with_keywords() {
1619 let op = LinkTableOp::link(
1620 "order".to_string(), "select".to_string(), Value::BigInt(1),
1623 "from".to_string(), Value::BigInt(2),
1625 );
1626 let sql = op.to_sql();
1627 assert_eq!(
1628 sql,
1629 "INSERT INTO \"order\" (\"select\", \"from\") VALUES ($1, $2)"
1630 );
1631 }
1632
1633 #[test]
1634 fn test_link_table_op_to_sql_with_embedded_quotes() {
1635 let op = LinkTableOp::link(
1636 "my\"table".to_string(),
1637 "col\"a".to_string(),
1638 Value::BigInt(1),
1639 "col\"b".to_string(),
1640 Value::BigInt(2),
1641 );
1642 let sql = op.to_sql();
1643 assert_eq!(
1644 sql,
1645 "INSERT INTO \"my\"\"table\" (\"col\"\"a\", \"col\"\"b\") VALUES ($1, $2)"
1646 );
1647 }
1648
1649 #[test]
1650 fn test_link_table_op_unlink_to_sql_with_keywords() {
1651 let op = LinkTableOp::unlink(
1652 "user".to_string(),
1653 "index".to_string(),
1654 Value::BigInt(1),
1655 "key".to_string(),
1656 Value::BigInt(2),
1657 );
1658 let sql = op.to_sql();
1659 assert_eq!(
1660 sql,
1661 "DELETE FROM \"user\" WHERE \"index\" = $1 AND \"key\" = $2"
1662 );
1663 }
1664
1665 #[test]
1666 fn test_link_table_op_to_sql_with_unicode() {
1667 let op = LinkTableOp::link(
1668 "用户表".to_string(),
1669 "用户id".to_string(),
1670 Value::BigInt(1),
1671 "角色id".to_string(),
1672 Value::BigInt(2),
1673 );
1674 let sql = op.to_sql();
1675 assert_eq!(
1676 sql,
1677 "INSERT INTO \"用户表\" (\"用户id\", \"角色id\") VALUES ($1, $2)"
1678 );
1679 }
1680
1681 #[test]
1682 fn test_link_table_op_to_sql_with_spaces() {
1683 let op = LinkTableOp::link(
1684 "link table".to_string(),
1685 "local id".to_string(),
1686 Value::BigInt(1),
1687 "remote id".to_string(),
1688 Value::BigInt(2),
1689 );
1690 let sql = op.to_sql();
1691 assert_eq!(
1692 sql,
1693 "INSERT INTO \"link table\" (\"local id\", \"remote id\") VALUES ($1, $2)"
1694 );
1695 }
1696
1697 #[test]
1700 fn test_pending_op_insert_to_sql_simple() {
1701 let op = make_insert("teams", 1);
1702 let sql = op.to_sql();
1703 assert!(sql.starts_with("INSERT INTO \"teams\""));
1704 assert!(sql.contains("(\"id\", \"name\")"));
1705 assert!(sql.contains("VALUES ($1, $2)"));
1706 }
1707
1708 #[test]
1709 fn test_pending_op_insert_to_sql_with_keyword_table() {
1710 let op = make_custom_insert("order", vec!["id", "select"], 1);
1711 let sql = op.to_sql();
1712 assert_eq!(
1713 sql,
1714 "INSERT INTO \"order\" (\"id\", \"select\") VALUES ($1, $2)"
1715 );
1716 }
1717
1718 #[test]
1719 fn test_pending_op_insert_to_sql_with_quoted_names() {
1720 let op = make_custom_insert("my\"table", vec!["pk\"id", "data\"col"], 1);
1721 let sql = op.to_sql();
1722 assert_eq!(
1723 sql,
1724 "INSERT INTO \"my\"\"table\" (\"pk\"\"id\", \"data\"\"col\") VALUES ($1, $2)"
1725 );
1726 }
1727
1728 #[test]
1731 fn test_pending_op_delete_to_sql_single_pk() {
1732 let op = make_delete("teams", 1);
1733 let sql = op.to_sql();
1734 assert_eq!(sql, "DELETE FROM \"teams\" WHERE \"id\" IN ($1)");
1735 }
1736
1737 #[test]
1738 fn test_pending_op_delete_to_sql_with_keyword_table() {
1739 let op = make_custom_delete("order", vec!["id"], 1);
1740 let sql = op.to_sql();
1741 assert_eq!(sql, "DELETE FROM \"order\" WHERE \"id\" IN ($1)");
1742 }
1743
1744 #[test]
1745 fn test_pending_op_delete_to_sql_composite_pk() {
1746 let op = PendingOp::Delete {
1747 key: ObjectKey {
1748 type_id: TypeId::of::<()>(),
1749 pk_hash: 1,
1750 },
1751 table: "order_items",
1752 pk_columns: vec!["order_id", "item_id"],
1753 pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1754 };
1755 let sql = op.to_sql();
1756 assert_eq!(
1757 sql,
1758 "DELETE FROM \"order_items\" WHERE \"order_id\" = $1 AND \"item_id\" = $2"
1759 );
1760 }
1761
1762 #[test]
1763 fn test_pending_op_delete_to_sql_with_keyword_pk_columns() {
1764 let op = PendingOp::Delete {
1765 key: ObjectKey {
1766 type_id: TypeId::of::<()>(),
1767 pk_hash: 1,
1768 },
1769 table: "user",
1770 pk_columns: vec!["select", "from"],
1771 pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1772 };
1773 let sql = op.to_sql();
1774 assert_eq!(
1775 sql,
1776 "DELETE FROM \"user\" WHERE \"select\" = $1 AND \"from\" = $2"
1777 );
1778 }
1779
1780 #[test]
1783 fn test_pending_op_update_to_sql_simple() {
1784 let op = make_update("teams", 1);
1785 let sql = op.to_sql();
1786 assert_eq!(sql, "UPDATE \"teams\" SET \"name\" = $1 WHERE \"id\" = $2");
1787 }
1788
1789 #[test]
1790 fn test_pending_op_update_to_sql_with_keyword_names() {
1791 let op = make_custom_update("order", vec!["id"], vec!["select", "from"], 1);
1792 let sql = op.to_sql();
1793 assert_eq!(
1794 sql,
1795 "UPDATE \"order\" SET \"select\" = $1, \"from\" = $2 WHERE \"id\" = $3"
1796 );
1797 }
1798
1799 #[test]
1800 fn test_pending_op_update_to_sql_with_quoted_names() {
1801 let op = make_custom_update("my\"table", vec!["pk\"id"], vec!["data\"col"], 1);
1802 let sql = op.to_sql();
1803 assert_eq!(
1804 sql,
1805 "UPDATE \"my\"\"table\" SET \"data\"\"col\" = $1 WHERE \"pk\"\"id\" = $2"
1806 );
1807 }
1808
1809 #[test]
1810 fn test_pending_op_update_to_sql_composite_pk() {
1811 let op = PendingOp::Update {
1812 key: ObjectKey {
1813 type_id: TypeId::of::<()>(),
1814 pk_hash: 1,
1815 },
1816 table: "order_items",
1817 pk_columns: vec!["order_id", "item_id"],
1818 pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1819 set_columns: vec!["quantity"],
1820 set_values: vec![Value::Int(5)],
1821 };
1822 let sql = op.to_sql();
1823 assert_eq!(
1824 sql,
1825 "UPDATE \"order_items\" SET \"quantity\" = $1 WHERE \"order_id\" = $2 AND \"item_id\" = $3"
1826 );
1827 }
1828
1829 #[test]
1832 fn test_link_op_sql_injection_neutralized() {
1833 let op = LinkTableOp::link(
1835 "links\"; DROP TABLE users; --".to_string(),
1836 "col1".to_string(),
1837 Value::BigInt(1),
1838 "col2".to_string(),
1839 Value::BigInt(2),
1840 );
1841 let sql = op.to_sql();
1842 assert!(sql.contains("\"links\"\"; DROP TABLE users; --\""));
1844 assert!(sql.starts_with("INSERT INTO \""));
1846 }
1847
1848 #[test]
1849 fn test_pending_op_insert_sql_injection_neutralized() {
1850 let op = make_custom_insert("users\"; DROP TABLE secrets; --", vec!["id", "name"], 1);
1851 let sql = op.to_sql();
1852 assert!(sql.contains("\"users\"\"; DROP TABLE secrets; --\""));
1854 assert!(sql.starts_with("INSERT INTO \""));
1855 }
1856
1857 #[test]
1858 fn test_pending_op_update_sql_injection_neutralized() {
1859 let op = make_custom_update("data", vec!["id"], vec!["col\"; DROP TABLE data; --"], 1);
1860 let sql = op.to_sql();
1861 assert!(sql.contains("\"col\"\"; DROP TABLE data; --\""));
1863 }
1864
1865 #[test]
1868 fn test_pending_op_insert_many_columns() {
1869 let op = PendingOp::Insert {
1870 key: ObjectKey {
1871 type_id: TypeId::of::<()>(),
1872 pk_hash: 1,
1873 },
1874 table: "wide_table",
1875 columns: vec!["a", "b", "c", "d", "e"],
1876 values: vec![
1877 Value::Int(1),
1878 Value::Int(2),
1879 Value::Int(3),
1880 Value::Int(4),
1881 Value::Int(5),
1882 ],
1883 };
1884 let sql = op.to_sql();
1885 assert_eq!(
1886 sql,
1887 "INSERT INTO \"wide_table\" (\"a\", \"b\", \"c\", \"d\", \"e\") VALUES ($1, $2, $3, $4, $5)"
1888 );
1889 }
1890
1891 #[test]
1892 fn test_pending_op_update_many_set_columns() {
1893 let op = PendingOp::Update {
1894 key: ObjectKey {
1895 type_id: TypeId::of::<()>(),
1896 pk_hash: 1,
1897 },
1898 table: "items",
1899 pk_columns: vec!["id"],
1900 pk_values: vec![Value::BigInt(1)],
1901 set_columns: vec!["a", "b", "c"],
1902 set_values: vec![Value::Int(1), Value::Int(2), Value::Int(3)],
1903 };
1904 let sql = op.to_sql();
1905 assert_eq!(
1906 sql,
1907 "UPDATE \"items\" SET \"a\" = $1, \"b\" = $2, \"c\" = $3 WHERE \"id\" = $4"
1908 );
1909 }
1910
1911 #[test]
1912 fn test_link_table_op_empty_strings() {
1913 let op = LinkTableOp::link(
1915 String::new(),
1916 String::new(),
1917 Value::BigInt(1),
1918 String::new(),
1919 Value::BigInt(2),
1920 );
1921 let sql = op.to_sql();
1922 assert_eq!(sql, "INSERT INTO \"\" (\"\", \"\") VALUES ($1, $2)");
1923 }
1924
1925 #[test]
1928 fn test_pending_op_delete_empty_pk_columns() {
1929 let op = PendingOp::Delete {
1930 key: ObjectKey {
1931 type_id: TypeId::of::<()>(),
1932 pk_hash: 1,
1933 },
1934 table: "orphan_table",
1935 pk_columns: vec![], pk_values: vec![],
1937 };
1938 let sql = op.to_sql();
1939 assert!(sql.starts_with("-- ERROR:"));
1941 assert!(sql.contains("DELETE"));
1942 assert!(sql.contains("no pk_columns"));
1943 }
1944
1945 #[test]
1946 fn test_pending_op_update_empty_pk_columns() {
1947 let op = PendingOp::Update {
1948 key: ObjectKey {
1949 type_id: TypeId::of::<()>(),
1950 pk_hash: 1,
1951 },
1952 table: "orphan_table",
1953 pk_columns: vec![], pk_values: vec![],
1955 set_columns: vec!["name"],
1956 set_values: vec![Value::Text("test".to_string())],
1957 };
1958 let sql = op.to_sql();
1959 assert!(sql.starts_with("-- ERROR:"));
1961 assert!(sql.contains("UPDATE"));
1962 assert!(sql.contains("no pk_columns"));
1963 }
1964
1965 #[test]
1966 fn test_pending_op_update_empty_set_columns() {
1967 let op = PendingOp::Update {
1968 key: ObjectKey {
1969 type_id: TypeId::of::<()>(),
1970 pk_hash: 1,
1971 },
1972 table: "nothing_to_update",
1973 pk_columns: vec!["id"],
1974 pk_values: vec![Value::BigInt(1)],
1975 set_columns: vec![], set_values: vec![],
1977 };
1978 let sql = op.to_sql();
1979 assert!(sql.starts_with("-- ERROR:"));
1981 assert!(sql.contains("UPDATE"));
1982 assert!(sql.contains("no set_columns"));
1983 }
1984
1985 #[test]
1986 fn test_pending_op_insert_empty_columns() {
1987 let op = PendingOp::Insert {
1988 key: ObjectKey {
1989 type_id: TypeId::of::<()>(),
1990 pk_hash: 1,
1991 },
1992 table: "empty_insert",
1993 columns: vec![], values: vec![],
1995 };
1996 let sql = op.to_sql();
1997 assert!(sql.starts_with("-- ERROR:"));
1999 assert!(sql.contains("INSERT"));
2000 assert!(sql.contains("no columns"));
2001 }
2002
2003 #[test]
2004 fn test_build_insert_batch_sql_mysql_dialect() {
2005 let ops = [make_insert("teams", 1), make_insert("teams", 2)];
2006 let refs: Vec<&PendingOp> = ops.iter().collect();
2007 let (sql, params) = FlushPlan::build_insert_batch_sql(sqlmodel_core::Dialect::Mysql, &refs)
2008 .expect("build insert batch sql");
2009
2010 assert_eq!(
2011 sql,
2012 "INSERT INTO `teams` (`id`, `name`) VALUES (?, ?), (?, ?)"
2013 );
2014 assert_eq!(params.len(), 4);
2015 }
2016
2017 #[test]
2018 fn test_build_delete_batch_sql_sqlite_dialect() {
2019 let ops = [make_delete("heroes", 1), make_delete("heroes", 2)];
2020 let refs: Vec<&PendingOp> = ops.iter().collect();
2021 let built = FlushPlan::build_delete_batch_sql(sqlmodel_core::Dialect::Sqlite, &refs)
2022 .expect("build delete batch sql")
2023 .expect("non-empty delete sql");
2024
2025 assert_eq!(built.0, "DELETE FROM \"heroes\" WHERE \"id\" IN (?1, ?2)");
2026 assert_eq!(built.1.len(), 2);
2027 assert_eq!(built.2, 2);
2028 }
2029
2030 #[test]
2031 fn test_build_update_sql_mysql_dialect() {
2032 let op = make_update("teams", 42);
2033 let (sql, params) = FlushPlan::build_update_sql(sqlmodel_core::Dialect::Mysql, &op)
2034 .expect("build update sql")
2035 .expect("non-empty update sql");
2036
2037 assert_eq!(sql, "UPDATE `teams` SET `name` = ? WHERE `id` = ?");
2038 assert_eq!(params.len(), 2);
2039 }
2040
2041 #[test]
2042 fn test_build_update_sql_rejects_set_mismatch() {
2043 let op = PendingOp::Update {
2044 key: ObjectKey {
2045 type_id: TypeId::of::<()>(),
2046 pk_hash: 1,
2047 },
2048 table: "teams",
2049 pk_columns: vec!["id"],
2050 pk_values: vec![Value::BigInt(1)],
2051 set_columns: vec!["name", "active"],
2052 set_values: vec![Value::Text("A".to_string())],
2053 };
2054
2055 let err = FlushPlan::build_update_sql(sqlmodel_core::Dialect::Postgres, &op)
2056 .expect_err("expected set mismatch error");
2057 assert!(
2058 err.to_string()
2059 .contains("update set column/value length mismatch")
2060 );
2061 }
2062}