1use crate::ObjectKey;
11use asupersync::{Cx, Outcome};
12use sqlmodel_core::{Connection, Error, Model, Value, quote_ident};
13use std::collections::HashMap;
14
15#[derive(Debug, Clone)]
17pub enum PendingOp {
18 Insert {
20 key: ObjectKey,
22 table: &'static str,
24 columns: Vec<&'static str>,
26 values: Vec<Value>,
28 },
29 Update {
31 key: ObjectKey,
33 table: &'static str,
35 pk_columns: Vec<&'static str>,
37 pk_values: Vec<Value>,
39 set_columns: Vec<&'static str>,
41 set_values: Vec<Value>,
43 },
44 Delete {
46 key: ObjectKey,
48 table: &'static str,
50 pk_columns: Vec<&'static str>,
52 pk_values: Vec<Value>,
54 },
55}
56
57#[derive(Debug, Clone)]
59pub enum LinkTableOp {
60 Link {
62 table: String,
64 local_column: String,
66 local_value: Value,
68 remote_column: String,
70 remote_value: Value,
72 },
73 Unlink {
75 table: String,
77 local_column: String,
79 local_value: Value,
81 remote_column: String,
83 remote_value: Value,
85 },
86}
87
88impl LinkTableOp {
89 pub fn link(
91 table: impl Into<String>,
92 local_column: impl Into<String>,
93 local_value: Value,
94 remote_column: impl Into<String>,
95 remote_value: Value,
96 ) -> Self {
97 Self::Link {
98 table: table.into(),
99 local_column: local_column.into(),
100 local_value,
101 remote_column: remote_column.into(),
102 remote_value,
103 }
104 }
105
106 pub fn unlink(
108 table: impl Into<String>,
109 local_column: impl Into<String>,
110 local_value: Value,
111 remote_column: impl Into<String>,
112 remote_value: Value,
113 ) -> Self {
114 Self::Unlink {
115 table: table.into(),
116 local_column: local_column.into(),
117 local_value,
118 remote_column: remote_column.into(),
119 remote_value,
120 }
121 }
122
123 pub fn table(&self) -> &str {
125 match self {
126 LinkTableOp::Link { table, .. } => table,
127 LinkTableOp::Unlink { table, .. } => table,
128 }
129 }
130
131 pub fn is_link(&self) -> bool {
133 matches!(self, LinkTableOp::Link { .. })
134 }
135
136 pub fn is_unlink(&self) -> bool {
138 matches!(self, LinkTableOp::Unlink { .. })
139 }
140
141 pub fn to_sql(&self) -> String {
145 match self {
146 LinkTableOp::Link {
147 table,
148 local_column,
149 remote_column,
150 ..
151 } => format!(
152 "INSERT INTO {} ({}, {}) VALUES ($1, $2)",
153 quote_ident(table),
154 quote_ident(local_column),
155 quote_ident(remote_column)
156 ),
157 LinkTableOp::Unlink {
158 table,
159 local_column,
160 remote_column,
161 ..
162 } => format!(
163 "DELETE FROM {} WHERE {} = $1 AND {} = $2",
164 quote_ident(table),
165 quote_ident(local_column),
166 quote_ident(remote_column)
167 ),
168 }
169 }
170
171 #[tracing::instrument(level = "debug", skip(cx, conn))]
173 pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<(), Error> {
174 match self {
175 LinkTableOp::Link {
176 table,
177 local_column,
178 local_value,
179 remote_column,
180 remote_value,
181 } => {
182 let sql = format!(
183 "INSERT INTO {} ({}, {}) VALUES ($1, $2)",
184 quote_ident(table),
185 quote_ident(local_column),
186 quote_ident(remote_column)
187 );
188 tracing::trace!(sql = %sql, "Executing link INSERT");
189 conn.execute(cx, &sql, &[local_value.clone(), remote_value.clone()])
190 .await
191 .map(|_| ())
192 }
193 LinkTableOp::Unlink {
194 table,
195 local_column,
196 local_value,
197 remote_column,
198 remote_value,
199 } => {
200 let sql = format!(
201 "DELETE FROM {} WHERE {} = $1 AND {} = $2",
202 quote_ident(table),
203 quote_ident(local_column),
204 quote_ident(remote_column)
205 );
206 tracing::trace!(sql = %sql, "Executing link DELETE");
207 conn.execute(cx, &sql, &[local_value.clone(), remote_value.clone()])
208 .await
209 .map(|_| ())
210 }
211 }
212 }
213}
214
215#[tracing::instrument(level = "debug", skip(cx, conn, ops))]
217pub async fn execute_link_table_ops<C: Connection>(
218 cx: &Cx,
219 conn: &C,
220 ops: &[LinkTableOp],
221) -> Outcome<usize, Error> {
222 if ops.is_empty() {
223 return Outcome::Ok(0);
224 }
225
226 tracing::info!(count = ops.len(), "Executing link table operations");
227
228 let mut count = 0;
229 for op in ops {
230 match op.execute(cx, conn).await {
231 Outcome::Ok(()) => count += 1,
232 Outcome::Err(e) => return Outcome::Err(e),
233 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
234 Outcome::Panicked(p) => return Outcome::Panicked(p),
235 }
236 }
237
238 tracing::debug!(executed = count, "Link table operations complete");
239 Outcome::Ok(count)
240}
241
242impl PendingOp {
243 pub fn table(&self) -> &'static str {
245 match self {
246 PendingOp::Insert { table, .. } => table,
247 PendingOp::Update { table, .. } => table,
248 PendingOp::Delete { table, .. } => table,
249 }
250 }
251
252 pub fn key(&self) -> ObjectKey {
254 match self {
255 PendingOp::Insert { key, .. } => *key,
256 PendingOp::Update { key, .. } => *key,
257 PendingOp::Delete { key, .. } => *key,
258 }
259 }
260
261 pub fn is_insert(&self) -> bool {
263 matches!(self, PendingOp::Insert { .. })
264 }
265
266 pub fn is_update(&self) -> bool {
268 matches!(self, PendingOp::Update { .. })
269 }
270
271 pub fn is_delete(&self) -> bool {
273 matches!(self, PendingOp::Delete { .. })
274 }
275
276 pub fn to_sql(&self) -> String {
285 match self {
286 PendingOp::Insert {
287 table,
288 columns,
289 values,
290 ..
291 } => {
292 if columns.is_empty() {
293 return format!(
294 "-- ERROR: INSERT INTO {} with no columns",
295 quote_ident(table)
296 );
297 }
298 let col_list: String = columns
299 .iter()
300 .map(|c| quote_ident(c))
301 .collect::<Vec<_>>()
302 .join(", ");
303 let placeholders: Vec<String> =
304 (1..=values.len()).map(|i| format!("${}", i)).collect();
305 format!(
306 "INSERT INTO {} ({}) VALUES ({})",
307 quote_ident(table),
308 col_list,
309 placeholders.join(", ")
310 )
311 }
312 PendingOp::Delete {
313 table, pk_columns, ..
314 } => {
315 if pk_columns.is_empty() {
316 return format!(
317 "-- ERROR: DELETE FROM {} with no pk_columns",
318 quote_ident(table)
319 );
320 }
321 if pk_columns.len() == 1 {
322 format!(
323 "DELETE FROM {} WHERE {} IN ($1)",
324 quote_ident(table),
325 quote_ident(pk_columns[0])
326 )
327 } else {
328 let where_clause: String = pk_columns
329 .iter()
330 .enumerate()
331 .map(|(i, col)| format!("{} = ${}", quote_ident(col), i + 1))
332 .collect::<Vec<_>>()
333 .join(" AND ");
334 format!("DELETE FROM {} WHERE {}", quote_ident(table), where_clause)
335 }
336 }
337 PendingOp::Update {
338 table,
339 pk_columns,
340 set_columns,
341 ..
342 } => {
343 if pk_columns.is_empty() {
344 return format!("-- ERROR: UPDATE {} with no pk_columns", quote_ident(table));
345 }
346 if set_columns.is_empty() {
347 return format!(
348 "-- ERROR: UPDATE {} with no set_columns",
349 quote_ident(table)
350 );
351 }
352 let mut param_idx = 1;
353 let set_clause: String = set_columns
354 .iter()
355 .map(|col| {
356 let s = format!("{} = ${}", quote_ident(col), param_idx);
357 param_idx += 1;
358 s
359 })
360 .collect::<Vec<_>>()
361 .join(", ");
362 let where_clause: String = pk_columns
363 .iter()
364 .map(|col| {
365 let s = format!("{} = ${}", quote_ident(col), param_idx);
366 param_idx += 1;
367 s
368 })
369 .collect::<Vec<_>>()
370 .join(" AND ");
371 format!(
372 "UPDATE {} SET {} WHERE {}",
373 quote_ident(table),
374 set_clause,
375 where_clause
376 )
377 }
378 }
379 }
380}
381
382#[derive(Debug, Default)]
388pub struct FlushOrderer {
389 dependencies: HashMap<&'static str, Vec<&'static str>>,
391}
392
393impl FlushOrderer {
394 pub fn new() -> Self {
396 Self::default()
397 }
398
399 pub fn register_model<T: Model>(&mut self) {
403 let table = T::TABLE_NAME;
404 let deps: Vec<&'static str> = T::fields()
405 .iter()
406 .filter_map(|f| f.foreign_key)
407 .filter_map(|fk| fk.split('.').next())
408 .collect();
409 self.dependencies.insert(table, deps);
410 }
411
412 pub fn register_table(&mut self, table: &'static str, depends_on: Vec<&'static str>) {
414 self.dependencies.insert(table, depends_on);
415 }
416
417 fn dependency_count(&self, table: &str) -> usize {
419 self.dependencies.get(table).map_or(0, Vec::len)
420 }
421
422 pub fn order(&self, ops: Vec<PendingOp>) -> FlushPlan {
429 let mut deletes = Vec::new();
430 let mut inserts = Vec::new();
431 let mut updates = Vec::new();
432
433 for op in ops {
434 match op {
435 PendingOp::Delete { .. } => deletes.push(op),
436 PendingOp::Insert { .. } => inserts.push(op),
437 PendingOp::Update { .. } => updates.push(op),
438 }
439 }
440
441 deletes.sort_by(|a, b| {
443 let a_deps = self.dependency_count(a.table());
444 let b_deps = self.dependency_count(b.table());
445 b_deps.cmp(&a_deps)
446 });
447
448 inserts.sort_by(|a, b| {
450 let a_deps = self.dependency_count(a.table());
451 let b_deps = self.dependency_count(b.table());
452 a_deps.cmp(&b_deps)
453 });
454
455 FlushPlan {
456 deletes,
457 inserts,
458 updates,
459 }
460 }
461}
462
463#[derive(Debug, Default)]
465pub struct FlushPlan {
466 pub deletes: Vec<PendingOp>,
468 pub inserts: Vec<PendingOp>,
470 pub updates: Vec<PendingOp>,
472}
473
474impl FlushPlan {
475 pub fn new() -> Self {
477 Self::default()
478 }
479
480 pub fn is_empty(&self) -> bool {
482 self.deletes.is_empty() && self.inserts.is_empty() && self.updates.is_empty()
483 }
484
485 pub fn len(&self) -> usize {
487 self.deletes.len() + self.inserts.len() + self.updates.len()
488 }
489
490 #[tracing::instrument(level = "info", skip(self, cx, conn))]
492 pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<FlushResult, Error> {
493 tracing::info!(
494 deletes = self.deletes.len(),
495 inserts = self.inserts.len(),
496 updates = self.updates.len(),
497 "Executing flush plan"
498 );
499
500 let start = std::time::Instant::now();
501 let mut result = FlushResult::default();
502
503 for batch in Self::batch_by_table(&self.deletes) {
505 match Self::execute_delete_batch(cx, conn, &batch).await {
506 Outcome::Ok(count) => result.deleted += count,
507 Outcome::Err(e) => return Outcome::Err(e),
508 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
509 Outcome::Panicked(p) => return Outcome::Panicked(p),
510 }
511 }
512
513 for batch in Self::batch_by_table(&self.inserts) {
515 match Self::execute_insert_batch(cx, conn, &batch).await {
516 Outcome::Ok(count) => result.inserted += count,
517 Outcome::Err(e) => return Outcome::Err(e),
518 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
519 Outcome::Panicked(p) => return Outcome::Panicked(p),
520 }
521 }
522
523 for op in &self.updates {
525 match Self::execute_update(cx, conn, op).await {
526 Outcome::Ok(()) => result.updated += 1,
527 Outcome::Err(e) => return Outcome::Err(e),
528 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
529 Outcome::Panicked(p) => return Outcome::Panicked(p),
530 }
531 }
532
533 tracing::info!(
534 elapsed_ms = start.elapsed().as_millis(),
535 inserted = result.inserted,
536 updated = result.updated,
537 deleted = result.deleted,
538 "Flush complete"
539 );
540
541 Outcome::Ok(result)
542 }
543
544 fn batch_by_table(ops: &[PendingOp]) -> Vec<Vec<&PendingOp>> {
546 if ops.is_empty() {
547 return Vec::new();
548 }
549
550 let mut batches: Vec<Vec<&PendingOp>> = Vec::new();
551 let mut current_table: Option<&'static str> = None;
552 let mut current_batch: Vec<&PendingOp> = Vec::new();
553
554 for op in ops {
555 let table = op.table();
556 if current_table == Some(table) {
557 current_batch.push(op);
558 } else {
559 if !current_batch.is_empty() {
560 batches.push(current_batch);
561 }
562 current_batch = vec![op];
563 current_table = Some(table);
564 }
565 }
566
567 if !current_batch.is_empty() {
568 batches.push(current_batch);
569 }
570
571 batches
572 }
573
574 #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
576 async fn execute_insert_batch<C: Connection>(
577 cx: &Cx,
578 conn: &C,
579 ops: &[&PendingOp],
580 ) -> Outcome<usize, Error> {
581 if ops.is_empty() {
582 return Outcome::Ok(0);
583 }
584
585 let table = ops[0].table();
586 let PendingOp::Insert { columns, .. } = ops[0] else {
587 return Outcome::Ok(0);
588 };
589
590 tracing::debug!(table = table, count = ops.len(), "Executing insert batch");
591
592 let col_list: String = columns
595 .iter()
596 .map(|c| quote_ident(c))
597 .collect::<Vec<_>>()
598 .join(", ");
599
600 let mut sql = format!("INSERT INTO {} ({}) VALUES ", quote_ident(table), col_list);
601 let mut params: Vec<Value> = Vec::new();
602 let mut param_idx = 1;
603
604 for (i, op) in ops.iter().enumerate() {
605 if let PendingOp::Insert { values, .. } = op {
606 if i > 0 {
607 sql.push_str(", ");
608 }
609 let placeholders: Vec<String> = (0..values.len())
610 .map(|_| {
611 let p = format!("${}", param_idx);
612 param_idx += 1;
613 p
614 })
615 .collect();
616 sql.push('(');
617 sql.push_str(&placeholders.join(", "));
618 sql.push(')');
619 params.extend(values.iter().cloned());
620 }
621 }
622
623 match conn.execute(cx, &sql, ¶ms).await {
624 Outcome::Ok(_) => Outcome::Ok(ops.len()),
625 Outcome::Err(e) => Outcome::Err(e),
626 Outcome::Cancelled(r) => Outcome::Cancelled(r),
627 Outcome::Panicked(p) => Outcome::Panicked(p),
628 }
629 }
630
631 #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
633 async fn execute_delete_batch<C: Connection>(
634 cx: &Cx,
635 conn: &C,
636 ops: &[&PendingOp],
637 ) -> Outcome<usize, Error> {
638 if ops.is_empty() {
639 return Outcome::Ok(0);
640 }
641
642 let table = ops[0].table();
643 let PendingOp::Delete { pk_columns, .. } = ops[0] else {
644 return Outcome::Ok(0);
645 };
646
647 if pk_columns.is_empty() {
649 tracing::warn!(
650 table = table,
651 count = ops.len(),
652 "Skipping DELETE batch for table without primary key - cannot identify rows"
653 );
654 return Outcome::Ok(0);
655 }
656
657 tracing::debug!(table = table, count = ops.len(), "Executing delete batch");
658
659 if pk_columns.len() == 1 {
662 let pk_col = pk_columns[0];
663 let mut params: Vec<Value> = Vec::new();
664 let placeholders: Vec<String> = ops
665 .iter()
666 .filter_map(|op| {
667 if let PendingOp::Delete { pk_values, .. } = op {
668 if let Some(pk) = pk_values.first() {
669 params.push(pk.clone());
670 return Some(format!("${}", params.len()));
672 }
673 }
674 None
675 })
676 .collect();
677
678 if placeholders.is_empty() {
679 return Outcome::Ok(0);
680 }
681
682 let actual_count = params.len();
683 let sql = format!(
684 "DELETE FROM {} WHERE {} IN ({})",
685 quote_ident(table),
686 quote_ident(pk_col),
687 placeholders.join(", ")
688 );
689
690 match conn.execute(cx, &sql, ¶ms).await {
691 Outcome::Ok(_) => Outcome::Ok(actual_count),
694 Outcome::Err(e) => Outcome::Err(e),
695 Outcome::Cancelled(r) => Outcome::Cancelled(r),
696 Outcome::Panicked(p) => Outcome::Panicked(p),
697 }
698 } else {
699 let mut deleted = 0;
701 for op in ops {
702 if let PendingOp::Delete {
703 pk_columns,
704 pk_values,
705 ..
706 } = op
707 {
708 if pk_values.is_empty() {
710 tracing::warn!(
711 table = table,
712 "Skipping DELETE for row with empty primary key values"
713 );
714 continue;
715 }
716
717 let where_clause: String = pk_columns
718 .iter()
719 .enumerate()
720 .map(|(i, col)| format!("{} = ${}", quote_ident(col), i + 1))
721 .collect::<Vec<_>>()
722 .join(" AND ");
723
724 let sql = format!("DELETE FROM {} WHERE {}", quote_ident(table), where_clause);
725
726 match conn.execute(cx, &sql, pk_values).await {
727 Outcome::Ok(_) => deleted += 1,
728 Outcome::Err(e) => return Outcome::Err(e),
729 Outcome::Cancelled(r) => return Outcome::Cancelled(r),
730 Outcome::Panicked(p) => return Outcome::Panicked(p),
731 }
732 }
733 }
734 Outcome::Ok(deleted)
735 }
736 }
737
738 #[tracing::instrument(level = "debug", skip(cx, conn, op))]
740 async fn execute_update<C: Connection>(
741 cx: &Cx,
742 conn: &C,
743 op: &PendingOp,
744 ) -> Outcome<(), Error> {
745 let PendingOp::Update {
746 table,
747 pk_columns,
748 pk_values,
749 set_columns,
750 set_values,
751 ..
752 } = op
753 else {
754 return Outcome::Ok(());
755 };
756
757 if pk_columns.is_empty() || pk_values.is_empty() {
759 tracing::warn!(
760 table = *table,
761 "Skipping UPDATE for row without primary key - cannot identify row"
762 );
763 return Outcome::Ok(());
764 }
765
766 if set_columns.is_empty() {
767 return Outcome::Ok(());
768 }
769
770 tracing::debug!(
771 table = *table,
772 columns = ?set_columns,
773 "Executing update"
774 );
775
776 let mut param_idx = 1;
778 let set_clause: String = set_columns
779 .iter()
780 .map(|col| {
781 let clause = format!("{} = ${}", quote_ident(col), param_idx);
782 param_idx += 1;
783 clause
784 })
785 .collect::<Vec<_>>()
786 .join(", ");
787
788 let where_clause: String = pk_columns
789 .iter()
790 .map(|col| {
791 let clause = format!("{} = ${}", quote_ident(col), param_idx);
792 param_idx += 1;
793 clause
794 })
795 .collect::<Vec<_>>()
796 .join(" AND ");
797
798 let sql = format!(
799 "UPDATE {} SET {} WHERE {}",
800 quote_ident(table),
801 set_clause,
802 where_clause
803 );
804
805 let mut params: Vec<Value> = set_values.clone();
806 params.extend(pk_values.iter().cloned());
807
808 match conn.execute(cx, &sql, ¶ms).await {
809 Outcome::Ok(_) => Outcome::Ok(()),
810 Outcome::Err(e) => Outcome::Err(e),
811 Outcome::Cancelled(r) => Outcome::Cancelled(r),
812 Outcome::Panicked(p) => Outcome::Panicked(p),
813 }
814 }
815}
816
817#[derive(Debug, Default, Clone, Copy)]
819pub struct FlushResult {
820 pub inserted: usize,
822 pub updated: usize,
824 pub deleted: usize,
826}
827
828impl FlushResult {
829 pub fn new() -> Self {
831 Self::default()
832 }
833
834 pub fn total(&self) -> usize {
836 self.inserted + self.updated + self.deleted
837 }
838}
839
840#[cfg(test)]
841mod tests {
842 use super::*;
843 use sqlmodel_core::{FieldInfo, Row};
844 use std::any::TypeId;
845
846 struct Team;
848 struct Hero;
849
850 impl Model for Team {
851 const TABLE_NAME: &'static str = "teams";
852 const PRIMARY_KEY: &'static [&'static str] = &["id"];
853
854 fn fields() -> &'static [FieldInfo] {
855 static FIELDS: [FieldInfo; 1] =
856 [FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
857 .primary_key(true)
858 .auto_increment(true)];
859 &FIELDS
860 }
861
862 fn primary_key_value(&self) -> Vec<Value> {
863 vec![]
864 }
865
866 fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
867 Ok(Team)
868 }
869
870 fn to_row(&self) -> Vec<(&'static str, Value)> {
871 vec![]
872 }
873
874 fn is_new(&self) -> bool {
875 true
876 }
877 }
878
879 impl Model for Hero {
880 const TABLE_NAME: &'static str = "heroes";
881 const PRIMARY_KEY: &'static [&'static str] = &["id"];
882
883 fn fields() -> &'static [FieldInfo] {
884 static FIELDS: [FieldInfo; 2] = [
885 FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
886 .primary_key(true)
887 .auto_increment(true),
888 FieldInfo::new("team_id", "team_id", sqlmodel_core::SqlType::BigInt)
889 .nullable(true)
890 .foreign_key("teams.id"),
891 ];
892 &FIELDS
893 }
894
895 fn primary_key_value(&self) -> Vec<Value> {
896 vec![]
897 }
898
899 fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
900 Ok(Hero)
901 }
902
903 fn to_row(&self) -> Vec<(&'static str, Value)> {
904 vec![]
905 }
906
907 fn is_new(&self) -> bool {
908 true
909 }
910 }
911
912 fn make_insert(table: &'static str, pk: i64) -> PendingOp {
913 PendingOp::Insert {
914 key: ObjectKey {
915 type_id: TypeId::of::<()>(),
916 pk_hash: pk as u64,
917 },
918 table,
919 columns: vec!["id", "name"],
920 values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
921 }
922 }
923
924 fn make_delete(table: &'static str, pk: i64) -> PendingOp {
925 PendingOp::Delete {
926 key: ObjectKey {
927 type_id: TypeId::of::<()>(),
928 pk_hash: pk as u64,
929 },
930 table,
931 pk_columns: vec!["id"],
932 pk_values: vec![Value::BigInt(pk)],
933 }
934 }
935
936 fn make_update(table: &'static str, pk: i64) -> PendingOp {
937 PendingOp::Update {
938 key: ObjectKey {
939 type_id: TypeId::of::<()>(),
940 pk_hash: pk as u64,
941 },
942 table,
943 pk_columns: vec!["id"],
944 pk_values: vec![Value::BigInt(pk)],
945 set_columns: vec!["name"],
946 set_values: vec![Value::Text("Updated".to_string())],
947 }
948 }
949
950 #[test]
951 fn test_pending_op_table_accessor() {
952 let insert = make_insert("teams", 1);
953 assert_eq!(insert.table(), "teams");
954
955 let delete = make_delete("heroes", 2);
956 assert_eq!(delete.table(), "heroes");
957
958 let update = make_update("teams", 3);
959 assert_eq!(update.table(), "teams");
960 }
961
962 #[test]
963 fn test_pending_op_type_checks() {
964 let insert = make_insert("teams", 1);
965 assert!(insert.is_insert());
966 assert!(!insert.is_update());
967 assert!(!insert.is_delete());
968
969 let update = make_update("teams", 1);
970 assert!(update.is_update());
971 assert!(!update.is_insert());
972 assert!(!update.is_delete());
973
974 let delete = make_delete("teams", 1);
975 assert!(delete.is_delete());
976 assert!(!delete.is_insert());
977 assert!(!delete.is_update());
978 }
979
980 #[test]
981 fn test_orderer_simple_no_deps() {
982 let orderer = FlushOrderer::new();
983 let ops = vec![
984 make_insert("teams", 1),
985 make_insert("teams", 2),
986 make_delete("teams", 3),
987 ];
988
989 let plan = orderer.order(ops);
990 assert_eq!(plan.inserts.len(), 2);
991 assert_eq!(plan.deletes.len(), 1);
992 assert_eq!(plan.updates.len(), 0);
993 }
994
995 #[test]
996 fn test_orderer_parent_child_inserts() {
997 let mut orderer = FlushOrderer::new();
998 orderer.register_model::<Team>();
999 orderer.register_model::<Hero>();
1000
1001 let ops = vec![
1003 make_insert("heroes", 1), make_insert("teams", 1), ];
1006
1007 let plan = orderer.order(ops);
1008
1009 assert_eq!(plan.inserts[0].table(), "teams");
1011 assert_eq!(plan.inserts[1].table(), "heroes");
1012 }
1013
1014 #[test]
1015 fn test_orderer_parent_child_deletes() {
1016 let mut orderer = FlushOrderer::new();
1017 orderer.register_model::<Team>();
1018 orderer.register_model::<Hero>();
1019
1020 let ops = vec![
1022 make_delete("teams", 1), make_delete("heroes", 1), ];
1025
1026 let plan = orderer.order(ops);
1027
1028 assert_eq!(plan.deletes[0].table(), "heroes");
1030 assert_eq!(plan.deletes[1].table(), "teams");
1031 }
1032
1033 #[test]
1034 fn test_batch_by_table_groups_correctly() {
1035 let ops = vec![
1036 make_insert("teams", 1),
1037 make_insert("teams", 2),
1038 make_insert("heroes", 1),
1039 make_insert("heroes", 2),
1040 make_insert("teams", 3),
1041 ];
1042
1043 let batches = FlushPlan::batch_by_table(&ops);
1044
1045 assert_eq!(batches.len(), 3);
1047 assert_eq!(batches[0].len(), 2); assert_eq!(batches[1].len(), 2); assert_eq!(batches[2].len(), 1); }
1051
1052 #[test]
1053 fn test_batch_empty_returns_empty() {
1054 let ops: Vec<PendingOp> = vec![];
1055 let batches = FlushPlan::batch_by_table(&ops);
1056 assert!(batches.is_empty());
1057 }
1058
1059 #[test]
1060 fn test_flush_plan_is_empty() {
1061 let plan = FlushPlan::new();
1062 assert!(plan.is_empty());
1063 assert_eq!(plan.len(), 0);
1064 }
1065
1066 #[test]
1067 fn test_flush_plan_len() {
1068 let plan = FlushPlan {
1069 deletes: vec![make_delete("teams", 1)],
1070 inserts: vec![make_insert("teams", 1), make_insert("teams", 2)],
1071 updates: vec![make_update("teams", 1)],
1072 };
1073 assert!(!plan.is_empty());
1074 assert_eq!(plan.len(), 4);
1075 }
1076
1077 #[test]
1078 fn test_flush_result_total() {
1079 let result = FlushResult {
1080 inserted: 5,
1081 updated: 3,
1082 deleted: 2,
1083 };
1084 assert_eq!(result.total(), 10);
1085 }
1086
1087 #[test]
1088 fn test_flush_result_default() {
1089 let result = FlushResult::new();
1090 assert_eq!(result.inserted, 0);
1091 assert_eq!(result.updated, 0);
1092 assert_eq!(result.deleted, 0);
1093 assert_eq!(result.total(), 0);
1094 }
1095
1096 #[test]
1101 fn test_link_table_op_link_constructor() {
1102 let op = LinkTableOp::link(
1103 "hero_powers".to_string(),
1104 "hero_id".to_string(),
1105 Value::BigInt(1),
1106 "power_id".to_string(),
1107 Value::BigInt(5),
1108 );
1109
1110 match op {
1111 LinkTableOp::Link {
1112 table,
1113 local_column,
1114 local_value,
1115 remote_column,
1116 remote_value,
1117 } => {
1118 assert_eq!(table, "hero_powers");
1119 assert_eq!(local_column, "hero_id");
1120 assert_eq!(local_value, Value::BigInt(1));
1121 assert_eq!(remote_column, "power_id");
1122 assert_eq!(remote_value, Value::BigInt(5));
1123 }
1124 LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link variant"),
1125 }
1126 }
1127
1128 #[test]
1129 fn test_link_table_op_unlink_constructor() {
1130 let op = LinkTableOp::unlink(
1131 "hero_powers".to_string(),
1132 "hero_id".to_string(),
1133 Value::BigInt(1),
1134 "power_id".to_string(),
1135 Value::BigInt(5),
1136 );
1137
1138 match op {
1139 LinkTableOp::Unlink {
1140 table,
1141 local_column,
1142 local_value,
1143 remote_column,
1144 remote_value,
1145 } => {
1146 assert_eq!(table, "hero_powers");
1147 assert_eq!(local_column, "hero_id");
1148 assert_eq!(local_value, Value::BigInt(1));
1149 assert_eq!(remote_column, "power_id");
1150 assert_eq!(remote_value, Value::BigInt(5));
1151 }
1152 LinkTableOp::Link { .. } => std::panic::panic_any("Expected Unlink variant"),
1153 }
1154 }
1155
1156 #[test]
1157 fn test_link_table_op_is_link() {
1158 let link = LinkTableOp::link(
1159 "t".to_string(),
1160 "a".to_string(),
1161 Value::BigInt(1),
1162 "b".to_string(),
1163 Value::BigInt(2),
1164 );
1165 let unlink = LinkTableOp::unlink(
1166 "t".to_string(),
1167 "a".to_string(),
1168 Value::BigInt(1),
1169 "b".to_string(),
1170 Value::BigInt(2),
1171 );
1172
1173 assert!(matches!(link, LinkTableOp::Link { .. }));
1174 assert!(matches!(unlink, LinkTableOp::Unlink { .. }));
1175 }
1176
1177 #[test]
1178 fn test_link_table_op_debug_format() {
1179 let link = LinkTableOp::link(
1180 "hero_powers".to_string(),
1181 "hero_id".to_string(),
1182 Value::BigInt(1),
1183 "power_id".to_string(),
1184 Value::BigInt(5),
1185 );
1186 let debug_str = format!("{:?}", link);
1187 assert!(debug_str.contains("Link"));
1188 assert!(debug_str.contains("hero_powers"));
1189 }
1190
1191 #[test]
1192 fn test_link_table_op_clone() {
1193 let op = LinkTableOp::link(
1194 "hero_powers".to_string(),
1195 "hero_id".to_string(),
1196 Value::BigInt(1),
1197 "power_id".to_string(),
1198 Value::BigInt(5),
1199 );
1200 let cloned = op.clone();
1201
1202 match (op, cloned) {
1203 (
1204 LinkTableOp::Link {
1205 table: t1,
1206 local_value: lv1,
1207 remote_value: rv1,
1208 ..
1209 },
1210 LinkTableOp::Link {
1211 table: t2,
1212 local_value: lv2,
1213 remote_value: rv2,
1214 ..
1215 },
1216 ) => {
1217 assert_eq!(t1, t2);
1218 assert_eq!(lv1, lv2);
1219 assert_eq!(rv1, rv2);
1220 }
1221 _ => std::panic::panic_any("Clone should preserve variant"),
1222 }
1223 }
1224
1225 #[test]
1226 fn test_link_table_ops_empty_vec() {
1227 let ops: Vec<LinkTableOp> = vec![];
1229 assert!(ops.is_empty());
1230 }
1231
1232 #[test]
1233 fn test_link_table_ops_multiple_operations() {
1234 let ops = [
1235 LinkTableOp::link(
1236 "hero_powers".to_string(),
1237 "hero_id".to_string(),
1238 Value::BigInt(1),
1239 "power_id".to_string(),
1240 Value::BigInt(1),
1241 ),
1242 LinkTableOp::link(
1243 "hero_powers".to_string(),
1244 "hero_id".to_string(),
1245 Value::BigInt(1),
1246 "power_id".to_string(),
1247 Value::BigInt(2),
1248 ),
1249 LinkTableOp::unlink(
1250 "hero_powers".to_string(),
1251 "hero_id".to_string(),
1252 Value::BigInt(1),
1253 "power_id".to_string(),
1254 Value::BigInt(3),
1255 ),
1256 ];
1257
1258 let links: Vec<_> = ops
1259 .iter()
1260 .filter(|o| matches!(o, LinkTableOp::Link { .. }))
1261 .collect();
1262 let unlinks: Vec<_> = ops
1263 .iter()
1264 .filter(|o| matches!(o, LinkTableOp::Unlink { .. }))
1265 .collect();
1266
1267 assert_eq!(links.len(), 2);
1268 assert_eq!(unlinks.len(), 1);
1269 }
1270
1271 #[test]
1272 fn test_link_table_op_with_different_value_types() {
1273 let op_str = LinkTableOp::link(
1275 "tag_items".to_string(),
1276 "tag_id".to_string(),
1277 Value::Text("tag-uuid-123".to_string()),
1278 "item_id".to_string(),
1279 Value::Text("item-uuid-456".to_string()),
1280 );
1281
1282 match op_str {
1283 LinkTableOp::Link {
1284 local_value,
1285 remote_value,
1286 ..
1287 } => {
1288 assert!(matches!(local_value, Value::Text(_)));
1289 assert!(matches!(remote_value, Value::Text(_)));
1290 }
1291 LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
1292 }
1293
1294 let op_int = LinkTableOp::link(
1296 "user_roles".to_string(),
1297 "user_id".to_string(),
1298 Value::Int(42),
1299 "role_id".to_string(),
1300 Value::Int(7),
1301 );
1302
1303 match op_int {
1304 LinkTableOp::Link {
1305 local_value,
1306 remote_value,
1307 ..
1308 } => {
1309 assert!(matches!(local_value, Value::Int(_)));
1310 assert!(matches!(remote_value, Value::Int(_)));
1311 }
1312 LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
1313 }
1314 }
1315
1316 fn make_custom_insert(table: &'static str, columns: Vec<&'static str>, pk: i64) -> PendingOp {
1322 PendingOp::Insert {
1323 key: ObjectKey {
1324 type_id: TypeId::of::<()>(),
1325 pk_hash: pk as u64,
1326 },
1327 table,
1328 columns,
1329 values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
1330 }
1331 }
1332
1333 fn make_custom_delete(
1335 table: &'static str,
1336 pk_columns: Vec<&'static str>,
1337 pk: i64,
1338 ) -> PendingOp {
1339 PendingOp::Delete {
1340 key: ObjectKey {
1341 type_id: TypeId::of::<()>(),
1342 pk_hash: pk as u64,
1343 },
1344 table,
1345 pk_columns,
1346 pk_values: vec![Value::BigInt(pk)],
1347 }
1348 }
1349
1350 fn make_custom_update(
1352 table: &'static str,
1353 pk_columns: Vec<&'static str>,
1354 set_columns: Vec<&'static str>,
1355 pk: i64,
1356 ) -> PendingOp {
1357 PendingOp::Update {
1358 key: ObjectKey {
1359 type_id: TypeId::of::<()>(),
1360 pk_hash: pk as u64,
1361 },
1362 table,
1363 pk_columns,
1364 pk_values: vec![Value::BigInt(pk)],
1365 set_columns,
1366 set_values: vec![Value::Text("Updated".to_string())],
1367 }
1368 }
1369
1370 #[test]
1373 fn test_link_table_op_to_sql_simple() {
1374 let op = LinkTableOp::link(
1375 "hero_powers".to_string(),
1376 "hero_id".to_string(),
1377 Value::BigInt(1),
1378 "power_id".to_string(),
1379 Value::BigInt(5),
1380 );
1381 let sql = op.to_sql();
1382 assert_eq!(
1383 sql,
1384 "INSERT INTO \"hero_powers\" (\"hero_id\", \"power_id\") VALUES ($1, $2)"
1385 );
1386 }
1387
1388 #[test]
1389 fn test_link_table_op_to_sql_with_keywords() {
1390 let op = LinkTableOp::link(
1391 "order".to_string(), "select".to_string(), Value::BigInt(1),
1394 "from".to_string(), Value::BigInt(2),
1396 );
1397 let sql = op.to_sql();
1398 assert_eq!(
1399 sql,
1400 "INSERT INTO \"order\" (\"select\", \"from\") VALUES ($1, $2)"
1401 );
1402 }
1403
1404 #[test]
1405 fn test_link_table_op_to_sql_with_embedded_quotes() {
1406 let op = LinkTableOp::link(
1407 "my\"table".to_string(),
1408 "col\"a".to_string(),
1409 Value::BigInt(1),
1410 "col\"b".to_string(),
1411 Value::BigInt(2),
1412 );
1413 let sql = op.to_sql();
1414 assert_eq!(
1415 sql,
1416 "INSERT INTO \"my\"\"table\" (\"col\"\"a\", \"col\"\"b\") VALUES ($1, $2)"
1417 );
1418 }
1419
1420 #[test]
1421 fn test_link_table_op_unlink_to_sql_with_keywords() {
1422 let op = LinkTableOp::unlink(
1423 "user".to_string(),
1424 "index".to_string(),
1425 Value::BigInt(1),
1426 "key".to_string(),
1427 Value::BigInt(2),
1428 );
1429 let sql = op.to_sql();
1430 assert_eq!(
1431 sql,
1432 "DELETE FROM \"user\" WHERE \"index\" = $1 AND \"key\" = $2"
1433 );
1434 }
1435
1436 #[test]
1437 fn test_link_table_op_to_sql_with_unicode() {
1438 let op = LinkTableOp::link(
1439 "用户表".to_string(),
1440 "用户id".to_string(),
1441 Value::BigInt(1),
1442 "角色id".to_string(),
1443 Value::BigInt(2),
1444 );
1445 let sql = op.to_sql();
1446 assert_eq!(
1447 sql,
1448 "INSERT INTO \"用户表\" (\"用户id\", \"角色id\") VALUES ($1, $2)"
1449 );
1450 }
1451
1452 #[test]
1453 fn test_link_table_op_to_sql_with_spaces() {
1454 let op = LinkTableOp::link(
1455 "link table".to_string(),
1456 "local id".to_string(),
1457 Value::BigInt(1),
1458 "remote id".to_string(),
1459 Value::BigInt(2),
1460 );
1461 let sql = op.to_sql();
1462 assert_eq!(
1463 sql,
1464 "INSERT INTO \"link table\" (\"local id\", \"remote id\") VALUES ($1, $2)"
1465 );
1466 }
1467
1468 #[test]
1471 fn test_pending_op_insert_to_sql_simple() {
1472 let op = make_insert("teams", 1);
1473 let sql = op.to_sql();
1474 assert!(sql.starts_with("INSERT INTO \"teams\""));
1475 assert!(sql.contains("(\"id\", \"name\")"));
1476 assert!(sql.contains("VALUES ($1, $2)"));
1477 }
1478
1479 #[test]
1480 fn test_pending_op_insert_to_sql_with_keyword_table() {
1481 let op = make_custom_insert("order", vec!["id", "select"], 1);
1482 let sql = op.to_sql();
1483 assert_eq!(
1484 sql,
1485 "INSERT INTO \"order\" (\"id\", \"select\") VALUES ($1, $2)"
1486 );
1487 }
1488
1489 #[test]
1490 fn test_pending_op_insert_to_sql_with_quoted_names() {
1491 let op = make_custom_insert("my\"table", vec!["pk\"id", "data\"col"], 1);
1492 let sql = op.to_sql();
1493 assert_eq!(
1494 sql,
1495 "INSERT INTO \"my\"\"table\" (\"pk\"\"id\", \"data\"\"col\") VALUES ($1, $2)"
1496 );
1497 }
1498
1499 #[test]
1502 fn test_pending_op_delete_to_sql_single_pk() {
1503 let op = make_delete("teams", 1);
1504 let sql = op.to_sql();
1505 assert_eq!(sql, "DELETE FROM \"teams\" WHERE \"id\" IN ($1)");
1506 }
1507
1508 #[test]
1509 fn test_pending_op_delete_to_sql_with_keyword_table() {
1510 let op = make_custom_delete("order", vec!["id"], 1);
1511 let sql = op.to_sql();
1512 assert_eq!(sql, "DELETE FROM \"order\" WHERE \"id\" IN ($1)");
1513 }
1514
1515 #[test]
1516 fn test_pending_op_delete_to_sql_composite_pk() {
1517 let op = PendingOp::Delete {
1518 key: ObjectKey {
1519 type_id: TypeId::of::<()>(),
1520 pk_hash: 1,
1521 },
1522 table: "order_items",
1523 pk_columns: vec!["order_id", "item_id"],
1524 pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1525 };
1526 let sql = op.to_sql();
1527 assert_eq!(
1528 sql,
1529 "DELETE FROM \"order_items\" WHERE \"order_id\" = $1 AND \"item_id\" = $2"
1530 );
1531 }
1532
1533 #[test]
1534 fn test_pending_op_delete_to_sql_with_keyword_pk_columns() {
1535 let op = PendingOp::Delete {
1536 key: ObjectKey {
1537 type_id: TypeId::of::<()>(),
1538 pk_hash: 1,
1539 },
1540 table: "user",
1541 pk_columns: vec!["select", "from"],
1542 pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1543 };
1544 let sql = op.to_sql();
1545 assert_eq!(
1546 sql,
1547 "DELETE FROM \"user\" WHERE \"select\" = $1 AND \"from\" = $2"
1548 );
1549 }
1550
1551 #[test]
1554 fn test_pending_op_update_to_sql_simple() {
1555 let op = make_update("teams", 1);
1556 let sql = op.to_sql();
1557 assert_eq!(sql, "UPDATE \"teams\" SET \"name\" = $1 WHERE \"id\" = $2");
1558 }
1559
1560 #[test]
1561 fn test_pending_op_update_to_sql_with_keyword_names() {
1562 let op = make_custom_update("order", vec!["id"], vec!["select", "from"], 1);
1563 let sql = op.to_sql();
1564 assert_eq!(
1565 sql,
1566 "UPDATE \"order\" SET \"select\" = $1, \"from\" = $2 WHERE \"id\" = $3"
1567 );
1568 }
1569
1570 #[test]
1571 fn test_pending_op_update_to_sql_with_quoted_names() {
1572 let op = make_custom_update("my\"table", vec!["pk\"id"], vec!["data\"col"], 1);
1573 let sql = op.to_sql();
1574 assert_eq!(
1575 sql,
1576 "UPDATE \"my\"\"table\" SET \"data\"\"col\" = $1 WHERE \"pk\"\"id\" = $2"
1577 );
1578 }
1579
1580 #[test]
1581 fn test_pending_op_update_to_sql_composite_pk() {
1582 let op = PendingOp::Update {
1583 key: ObjectKey {
1584 type_id: TypeId::of::<()>(),
1585 pk_hash: 1,
1586 },
1587 table: "order_items",
1588 pk_columns: vec!["order_id", "item_id"],
1589 pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1590 set_columns: vec!["quantity"],
1591 set_values: vec![Value::Int(5)],
1592 };
1593 let sql = op.to_sql();
1594 assert_eq!(
1595 sql,
1596 "UPDATE \"order_items\" SET \"quantity\" = $1 WHERE \"order_id\" = $2 AND \"item_id\" = $3"
1597 );
1598 }
1599
1600 #[test]
1603 fn test_link_op_sql_injection_neutralized() {
1604 let op = LinkTableOp::link(
1606 "links\"; DROP TABLE users; --".to_string(),
1607 "col1".to_string(),
1608 Value::BigInt(1),
1609 "col2".to_string(),
1610 Value::BigInt(2),
1611 );
1612 let sql = op.to_sql();
1613 assert!(sql.contains("\"links\"\"; DROP TABLE users; --\""));
1615 assert!(sql.starts_with("INSERT INTO \""));
1617 }
1618
1619 #[test]
1620 fn test_pending_op_insert_sql_injection_neutralized() {
1621 let op = make_custom_insert("users\"; DROP TABLE secrets; --", vec!["id", "name"], 1);
1622 let sql = op.to_sql();
1623 assert!(sql.contains("\"users\"\"; DROP TABLE secrets; --\""));
1625 assert!(sql.starts_with("INSERT INTO \""));
1626 }
1627
1628 #[test]
1629 fn test_pending_op_update_sql_injection_neutralized() {
1630 let op = make_custom_update("data", vec!["id"], vec!["col\"; DROP TABLE data; --"], 1);
1631 let sql = op.to_sql();
1632 assert!(sql.contains("\"col\"\"; DROP TABLE data; --\""));
1634 }
1635
1636 #[test]
1639 fn test_pending_op_insert_many_columns() {
1640 let op = PendingOp::Insert {
1641 key: ObjectKey {
1642 type_id: TypeId::of::<()>(),
1643 pk_hash: 1,
1644 },
1645 table: "wide_table",
1646 columns: vec!["a", "b", "c", "d", "e"],
1647 values: vec![
1648 Value::Int(1),
1649 Value::Int(2),
1650 Value::Int(3),
1651 Value::Int(4),
1652 Value::Int(5),
1653 ],
1654 };
1655 let sql = op.to_sql();
1656 assert_eq!(
1657 sql,
1658 "INSERT INTO \"wide_table\" (\"a\", \"b\", \"c\", \"d\", \"e\") VALUES ($1, $2, $3, $4, $5)"
1659 );
1660 }
1661
1662 #[test]
1663 fn test_pending_op_update_many_set_columns() {
1664 let op = PendingOp::Update {
1665 key: ObjectKey {
1666 type_id: TypeId::of::<()>(),
1667 pk_hash: 1,
1668 },
1669 table: "items",
1670 pk_columns: vec!["id"],
1671 pk_values: vec![Value::BigInt(1)],
1672 set_columns: vec!["a", "b", "c"],
1673 set_values: vec![Value::Int(1), Value::Int(2), Value::Int(3)],
1674 };
1675 let sql = op.to_sql();
1676 assert_eq!(
1677 sql,
1678 "UPDATE \"items\" SET \"a\" = $1, \"b\" = $2, \"c\" = $3 WHERE \"id\" = $4"
1679 );
1680 }
1681
1682 #[test]
1683 fn test_link_table_op_empty_strings() {
1684 let op = LinkTableOp::link(
1686 String::new(),
1687 String::new(),
1688 Value::BigInt(1),
1689 String::new(),
1690 Value::BigInt(2),
1691 );
1692 let sql = op.to_sql();
1693 assert_eq!(sql, "INSERT INTO \"\" (\"\", \"\") VALUES ($1, $2)");
1694 }
1695
1696 #[test]
1699 fn test_pending_op_delete_empty_pk_columns() {
1700 let op = PendingOp::Delete {
1701 key: ObjectKey {
1702 type_id: TypeId::of::<()>(),
1703 pk_hash: 1,
1704 },
1705 table: "orphan_table",
1706 pk_columns: vec![], pk_values: vec![],
1708 };
1709 let sql = op.to_sql();
1710 assert!(sql.starts_with("-- ERROR:"));
1712 assert!(sql.contains("DELETE"));
1713 assert!(sql.contains("no pk_columns"));
1714 }
1715
1716 #[test]
1717 fn test_pending_op_update_empty_pk_columns() {
1718 let op = PendingOp::Update {
1719 key: ObjectKey {
1720 type_id: TypeId::of::<()>(),
1721 pk_hash: 1,
1722 },
1723 table: "orphan_table",
1724 pk_columns: vec![], pk_values: vec![],
1726 set_columns: vec!["name"],
1727 set_values: vec![Value::Text("test".to_string())],
1728 };
1729 let sql = op.to_sql();
1730 assert!(sql.starts_with("-- ERROR:"));
1732 assert!(sql.contains("UPDATE"));
1733 assert!(sql.contains("no pk_columns"));
1734 }
1735
1736 #[test]
1737 fn test_pending_op_update_empty_set_columns() {
1738 let op = PendingOp::Update {
1739 key: ObjectKey {
1740 type_id: TypeId::of::<()>(),
1741 pk_hash: 1,
1742 },
1743 table: "nothing_to_update",
1744 pk_columns: vec!["id"],
1745 pk_values: vec![Value::BigInt(1)],
1746 set_columns: vec![], set_values: vec![],
1748 };
1749 let sql = op.to_sql();
1750 assert!(sql.starts_with("-- ERROR:"));
1752 assert!(sql.contains("UPDATE"));
1753 assert!(sql.contains("no set_columns"));
1754 }
1755
1756 #[test]
1757 fn test_pending_op_insert_empty_columns() {
1758 let op = PendingOp::Insert {
1759 key: ObjectKey {
1760 type_id: TypeId::of::<()>(),
1761 pk_hash: 1,
1762 },
1763 table: "empty_insert",
1764 columns: vec![], values: vec![],
1766 };
1767 let sql = op.to_sql();
1768 assert!(sql.starts_with("-- ERROR:"));
1770 assert!(sql.contains("INSERT"));
1771 assert!(sql.contains("no columns"));
1772 }
1773}