Skip to main content

sqlmodel_session/
flush.rs

1//! Flush operation ordering and batching for SQLModel Session.
2//!
3//! This module handles writing pending changes to the database in the correct order:
4//! - DELETE child-first (to respect FK constraints)
5//! - INSERT parent-first (to respect FK constraints)
6//! - UPDATE any order (no circular FK assumed)
7//!
8//! Operations are batched by table for performance.
9
10use crate::ObjectKey;
11use asupersync::{Cx, Outcome};
12use sqlmodel_core::{Connection, Error, Model, Value, quote_ident};
13use std::collections::HashMap;
14
15/// A pending database operation.
16#[derive(Debug, Clone)]
17pub enum PendingOp {
18    /// Insert a new row.
19    Insert {
20        /// Object key for identity map.
21        key: ObjectKey,
22        /// Table name.
23        table: &'static str,
24        /// Column names.
25        columns: Vec<&'static str>,
26        /// Values to insert.
27        values: Vec<Value>,
28    },
29    /// Update an existing row.
30    Update {
31        /// Object key for identity map.
32        key: ObjectKey,
33        /// Table name.
34        table: &'static str,
35        /// Primary key column names.
36        pk_columns: Vec<&'static str>,
37        /// Primary key values.
38        pk_values: Vec<Value>,
39        /// Columns to update (only dirty ones).
40        set_columns: Vec<&'static str>,
41        /// New values for dirty columns.
42        set_values: Vec<Value>,
43    },
44    /// Delete an existing row.
45    Delete {
46        /// Object key for identity map.
47        key: ObjectKey,
48        /// Table name.
49        table: &'static str,
50        /// Primary key column names.
51        pk_columns: Vec<&'static str>,
52        /// Primary key values.
53        pk_values: Vec<Value>,
54    },
55}
56
57/// A pending link table operation (for many-to-many relationships).
58#[derive(Debug, Clone)]
59pub enum LinkTableOp {
60    /// Insert a link (relationship).
61    Link {
62        /// Link table name.
63        table: String,
64        /// Local (parent) column name.
65        local_column: String,
66        /// Local (parent) PK value.
67        local_value: Value,
68        /// Remote (child) column name.
69        remote_column: String,
70        /// Remote (child) PK value.
71        remote_value: Value,
72    },
73    /// Delete a link (relationship).
74    Unlink {
75        /// Link table name.
76        table: String,
77        /// Local (parent) column name.
78        local_column: String,
79        /// Local (parent) PK value.
80        local_value: Value,
81        /// Remote (child) column name.
82        remote_column: String,
83        /// Remote (child) PK value.
84        remote_value: Value,
85    },
86}
87
88impl LinkTableOp {
89    /// Create a link operation.
90    pub fn link(
91        table: impl Into<String>,
92        local_column: impl Into<String>,
93        local_value: Value,
94        remote_column: impl Into<String>,
95        remote_value: Value,
96    ) -> Self {
97        Self::Link {
98            table: table.into(),
99            local_column: local_column.into(),
100            local_value,
101            remote_column: remote_column.into(),
102            remote_value,
103        }
104    }
105
106    /// Create an unlink operation.
107    pub fn unlink(
108        table: impl Into<String>,
109        local_column: impl Into<String>,
110        local_value: Value,
111        remote_column: impl Into<String>,
112        remote_value: Value,
113    ) -> Self {
114        Self::Unlink {
115            table: table.into(),
116            local_column: local_column.into(),
117            local_value,
118            remote_column: remote_column.into(),
119            remote_value,
120        }
121    }
122
123    /// Get the table name.
124    pub fn table(&self) -> &str {
125        match self {
126            LinkTableOp::Link { table, .. } => table,
127            LinkTableOp::Unlink { table, .. } => table,
128        }
129    }
130
131    /// Check if this is a link (insert) operation.
132    pub fn is_link(&self) -> bool {
133        matches!(self, LinkTableOp::Link { .. })
134    }
135
136    /// Check if this is an unlink (delete) operation.
137    pub fn is_unlink(&self) -> bool {
138        matches!(self, LinkTableOp::Unlink { .. })
139    }
140
141    /// Generate the SQL that would be executed for this operation.
142    ///
143    /// Useful for testing and debugging.
144    pub fn to_sql(&self) -> String {
145        match self {
146            LinkTableOp::Link {
147                table,
148                local_column,
149                remote_column,
150                ..
151            } => format!(
152                "INSERT INTO {} ({}, {}) VALUES ($1, $2)",
153                quote_ident(table),
154                quote_ident(local_column),
155                quote_ident(remote_column)
156            ),
157            LinkTableOp::Unlink {
158                table,
159                local_column,
160                remote_column,
161                ..
162            } => format!(
163                "DELETE FROM {} WHERE {} = $1 AND {} = $2",
164                quote_ident(table),
165                quote_ident(local_column),
166                quote_ident(remote_column)
167            ),
168        }
169    }
170
171    /// Execute this link table operation.
172    #[tracing::instrument(level = "debug", skip(cx, conn))]
173    pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<(), Error> {
174        match self {
175            LinkTableOp::Link {
176                table,
177                local_column,
178                local_value,
179                remote_column,
180                remote_value,
181            } => {
182                let sql = format!(
183                    "INSERT INTO {} ({}, {}) VALUES ($1, $2)",
184                    quote_ident(table),
185                    quote_ident(local_column),
186                    quote_ident(remote_column)
187                );
188                tracing::trace!(sql = %sql, "Executing link INSERT");
189                conn.execute(cx, &sql, &[local_value.clone(), remote_value.clone()])
190                    .await
191                    .map(|_| ())
192            }
193            LinkTableOp::Unlink {
194                table,
195                local_column,
196                local_value,
197                remote_column,
198                remote_value,
199            } => {
200                let sql = format!(
201                    "DELETE FROM {} WHERE {} = $1 AND {} = $2",
202                    quote_ident(table),
203                    quote_ident(local_column),
204                    quote_ident(remote_column)
205                );
206                tracing::trace!(sql = %sql, "Executing link DELETE");
207                conn.execute(cx, &sql, &[local_value.clone(), remote_value.clone()])
208                    .await
209                    .map(|_| ())
210            }
211        }
212    }
213}
214
215/// Execute a batch of link table operations.
216#[tracing::instrument(level = "debug", skip(cx, conn, ops))]
217pub async fn execute_link_table_ops<C: Connection>(
218    cx: &Cx,
219    conn: &C,
220    ops: &[LinkTableOp],
221) -> Outcome<usize, Error> {
222    if ops.is_empty() {
223        return Outcome::Ok(0);
224    }
225
226    tracing::info!(count = ops.len(), "Executing link table operations");
227
228    let mut count = 0;
229    for op in ops {
230        match op.execute(cx, conn).await {
231            Outcome::Ok(()) => count += 1,
232            Outcome::Err(e) => return Outcome::Err(e),
233            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
234            Outcome::Panicked(p) => return Outcome::Panicked(p),
235        }
236    }
237
238    tracing::debug!(executed = count, "Link table operations complete");
239    Outcome::Ok(count)
240}
241
242impl PendingOp {
243    /// Get the table name for this operation.
244    pub fn table(&self) -> &'static str {
245        match self {
246            PendingOp::Insert { table, .. } => table,
247            PendingOp::Update { table, .. } => table,
248            PendingOp::Delete { table, .. } => table,
249        }
250    }
251
252    /// Get the object key for this operation.
253    pub fn key(&self) -> ObjectKey {
254        match self {
255            PendingOp::Insert { key, .. } => *key,
256            PendingOp::Update { key, .. } => *key,
257            PendingOp::Delete { key, .. } => *key,
258        }
259    }
260
261    /// Check if this is an insert operation.
262    pub fn is_insert(&self) -> bool {
263        matches!(self, PendingOp::Insert { .. })
264    }
265
266    /// Check if this is an update operation.
267    pub fn is_update(&self) -> bool {
268        matches!(self, PendingOp::Update { .. })
269    }
270
271    /// Check if this is a delete operation.
272    pub fn is_delete(&self) -> bool {
273        matches!(self, PendingOp::Delete { .. })
274    }
275
276    /// Generate the SQL that would be executed for this operation.
277    ///
278    /// This is useful for testing and debugging. For INSERT, this generates
279    /// a single-row insert. For DELETE/UPDATE, the SQL matches what would be
280    /// executed for a single operation.
281    ///
282    /// Returns a descriptive error string for invalid operations (e.g., empty
283    /// pk_columns for DELETE/UPDATE, empty set_columns for UPDATE).
284    pub fn to_sql(&self) -> String {
285        match self {
286            PendingOp::Insert {
287                table,
288                columns,
289                values,
290                ..
291            } => {
292                if columns.is_empty() {
293                    return format!(
294                        "-- ERROR: INSERT INTO {} with no columns",
295                        quote_ident(table)
296                    );
297                }
298                let col_list: String = columns
299                    .iter()
300                    .map(|c| quote_ident(c))
301                    .collect::<Vec<_>>()
302                    .join(", ");
303                let placeholders: Vec<String> =
304                    (1..=values.len()).map(|i| format!("${}", i)).collect();
305                format!(
306                    "INSERT INTO {} ({}) VALUES ({})",
307                    quote_ident(table),
308                    col_list,
309                    placeholders.join(", ")
310                )
311            }
312            PendingOp::Delete {
313                table, pk_columns, ..
314            } => {
315                if pk_columns.is_empty() {
316                    return format!(
317                        "-- ERROR: DELETE FROM {} with no pk_columns",
318                        quote_ident(table)
319                    );
320                }
321                if pk_columns.len() == 1 {
322                    format!(
323                        "DELETE FROM {} WHERE {} IN ($1)",
324                        quote_ident(table),
325                        quote_ident(pk_columns[0])
326                    )
327                } else {
328                    let where_clause: String = pk_columns
329                        .iter()
330                        .enumerate()
331                        .map(|(i, col)| format!("{} = ${}", quote_ident(col), i + 1))
332                        .collect::<Vec<_>>()
333                        .join(" AND ");
334                    format!("DELETE FROM {} WHERE {}", quote_ident(table), where_clause)
335                }
336            }
337            PendingOp::Update {
338                table,
339                pk_columns,
340                set_columns,
341                ..
342            } => {
343                if pk_columns.is_empty() {
344                    return format!("-- ERROR: UPDATE {} with no pk_columns", quote_ident(table));
345                }
346                if set_columns.is_empty() {
347                    return format!(
348                        "-- ERROR: UPDATE {} with no set_columns",
349                        quote_ident(table)
350                    );
351                }
352                let mut param_idx = 1;
353                let set_clause: String = set_columns
354                    .iter()
355                    .map(|col| {
356                        let s = format!("{} = ${}", quote_ident(col), param_idx);
357                        param_idx += 1;
358                        s
359                    })
360                    .collect::<Vec<_>>()
361                    .join(", ");
362                let where_clause: String = pk_columns
363                    .iter()
364                    .map(|col| {
365                        let s = format!("{} = ${}", quote_ident(col), param_idx);
366                        param_idx += 1;
367                        s
368                    })
369                    .collect::<Vec<_>>()
370                    .join(" AND ");
371                format!(
372                    "UPDATE {} SET {} WHERE {}",
373                    quote_ident(table),
374                    set_clause,
375                    where_clause
376                )
377            }
378        }
379    }
380}
381
382/// Builds a dependency graph and orders operations for flush.
383///
384/// Uses table foreign key relationships to determine correct ordering:
385/// - Parents must be inserted before children
386/// - Children must be deleted before parents
387#[derive(Debug, Default)]
388pub struct FlushOrderer {
389    /// Table -> tables it depends on (has FK to).
390    dependencies: HashMap<&'static str, Vec<&'static str>>,
391}
392
393impl FlushOrderer {
394    /// Create a new flush orderer.
395    pub fn new() -> Self {
396        Self::default()
397    }
398
399    /// Register a model type's dependencies.
400    ///
401    /// Extracts foreign key relationships from the model's field metadata.
402    pub fn register_model<T: Model>(&mut self) {
403        let table = T::TABLE_NAME;
404        let deps: Vec<&'static str> = T::fields()
405            .iter()
406            .filter_map(|f| f.foreign_key)
407            .filter_map(|fk| fk.split('.').next())
408            .collect();
409        self.dependencies.insert(table, deps);
410    }
411
412    /// Register a table's dependencies directly.
413    pub fn register_table(&mut self, table: &'static str, depends_on: Vec<&'static str>) {
414        self.dependencies.insert(table, depends_on);
415    }
416
417    /// Get the dependency count for a table.
418    fn dependency_count(&self, table: &str) -> usize {
419        self.dependencies.get(table).map_or(0, Vec::len)
420    }
421
422    /// Order operations into a flush plan.
423    ///
424    /// Returns operations grouped and sorted:
425    /// - Deletes: child-first (more dependencies = delete first)
426    /// - Inserts: parent-first (fewer dependencies = insert first)
427    /// - Updates: any order
428    pub fn order(&self, ops: Vec<PendingOp>) -> FlushPlan {
429        let mut deletes = Vec::new();
430        let mut inserts = Vec::new();
431        let mut updates = Vec::new();
432
433        for op in ops {
434            match op {
435                PendingOp::Delete { .. } => deletes.push(op),
436                PendingOp::Insert { .. } => inserts.push(op),
437                PendingOp::Update { .. } => updates.push(op),
438            }
439        }
440
441        // Sort deletes: children first (more deps = delete first)
442        deletes.sort_by(|a, b| {
443            let a_deps = self.dependency_count(a.table());
444            let b_deps = self.dependency_count(b.table());
445            b_deps.cmp(&a_deps)
446        });
447
448        // Sort inserts: parents first (fewer deps = insert first)
449        inserts.sort_by(|a, b| {
450            let a_deps = self.dependency_count(a.table());
451            let b_deps = self.dependency_count(b.table());
452            a_deps.cmp(&b_deps)
453        });
454
455        FlushPlan {
456            deletes,
457            inserts,
458            updates,
459        }
460    }
461}
462
463/// A plan for executing flush operations.
464#[derive(Debug, Default)]
465pub struct FlushPlan {
466    /// Delete operations (ordered child-first).
467    pub deletes: Vec<PendingOp>,
468    /// Insert operations (ordered parent-first).
469    pub inserts: Vec<PendingOp>,
470    /// Update operations (any order).
471    pub updates: Vec<PendingOp>,
472}
473
474impl FlushPlan {
475    /// Create an empty flush plan.
476    pub fn new() -> Self {
477        Self::default()
478    }
479
480    /// Check if the plan has any operations.
481    pub fn is_empty(&self) -> bool {
482        self.deletes.is_empty() && self.inserts.is_empty() && self.updates.is_empty()
483    }
484
485    /// Total number of operations in the plan.
486    pub fn len(&self) -> usize {
487        self.deletes.len() + self.inserts.len() + self.updates.len()
488    }
489
490    /// Execute the flush plan against the database.
491    #[tracing::instrument(level = "info", skip(self, cx, conn))]
492    pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<FlushResult, Error> {
493        tracing::info!(
494            deletes = self.deletes.len(),
495            inserts = self.inserts.len(),
496            updates = self.updates.len(),
497            "Executing flush plan"
498        );
499
500        let start = std::time::Instant::now();
501        let mut result = FlushResult::default();
502
503        // 1. Execute deletes (batched by table)
504        for batch in Self::batch_by_table(&self.deletes) {
505            match Self::execute_delete_batch(cx, conn, &batch).await {
506                Outcome::Ok(count) => result.deleted += count,
507                Outcome::Err(e) => return Outcome::Err(e),
508                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
509                Outcome::Panicked(p) => return Outcome::Panicked(p),
510            }
511        }
512
513        // 2. Execute inserts (batched by table)
514        for batch in Self::batch_by_table(&self.inserts) {
515            match Self::execute_insert_batch(cx, conn, &batch).await {
516                Outcome::Ok(count) => result.inserted += count,
517                Outcome::Err(e) => return Outcome::Err(e),
518                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
519                Outcome::Panicked(p) => return Outcome::Panicked(p),
520            }
521        }
522
523        // 3. Execute updates (one at a time - different columns may be dirty)
524        for op in &self.updates {
525            match Self::execute_update(cx, conn, op).await {
526                Outcome::Ok(()) => result.updated += 1,
527                Outcome::Err(e) => return Outcome::Err(e),
528                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
529                Outcome::Panicked(p) => return Outcome::Panicked(p),
530            }
531        }
532
533        tracing::info!(
534            elapsed_ms = start.elapsed().as_millis(),
535            inserted = result.inserted,
536            updated = result.updated,
537            deleted = result.deleted,
538            "Flush complete"
539        );
540
541        Outcome::Ok(result)
542    }
543
544    /// Group operations by table name.
545    fn batch_by_table(ops: &[PendingOp]) -> Vec<Vec<&PendingOp>> {
546        if ops.is_empty() {
547            return Vec::new();
548        }
549
550        let mut batches: Vec<Vec<&PendingOp>> = Vec::new();
551        let mut current_table: Option<&'static str> = None;
552        let mut current_batch: Vec<&PendingOp> = Vec::new();
553
554        for op in ops {
555            let table = op.table();
556            if current_table == Some(table) {
557                current_batch.push(op);
558            } else {
559                if !current_batch.is_empty() {
560                    batches.push(current_batch);
561                }
562                current_batch = vec![op];
563                current_table = Some(table);
564            }
565        }
566
567        if !current_batch.is_empty() {
568            batches.push(current_batch);
569        }
570
571        batches
572    }
573
574    /// Execute a batch of insert operations.
575    #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
576    async fn execute_insert_batch<C: Connection>(
577        cx: &Cx,
578        conn: &C,
579        ops: &[&PendingOp],
580    ) -> Outcome<usize, Error> {
581        if ops.is_empty() {
582            return Outcome::Ok(0);
583        }
584
585        let table = ops[0].table();
586        let PendingOp::Insert { columns, .. } = ops[0] else {
587            return Outcome::Ok(0);
588        };
589
590        tracing::debug!(table = table, count = ops.len(), "Executing insert batch");
591
592        // Build multi-row INSERT SQL
593        // INSERT INTO table ("col1", "col2") VALUES ($1, $2), ($3, $4), ...
594        let col_list: String = columns
595            .iter()
596            .map(|c| quote_ident(c))
597            .collect::<Vec<_>>()
598            .join(", ");
599
600        let mut sql = format!("INSERT INTO {} ({}) VALUES ", quote_ident(table), col_list);
601        let mut params: Vec<Value> = Vec::new();
602        let mut param_idx = 1;
603
604        for (i, op) in ops.iter().enumerate() {
605            if let PendingOp::Insert { values, .. } = op {
606                if i > 0 {
607                    sql.push_str(", ");
608                }
609                let placeholders: Vec<String> = (0..values.len())
610                    .map(|_| {
611                        let p = format!("${}", param_idx);
612                        param_idx += 1;
613                        p
614                    })
615                    .collect();
616                sql.push('(');
617                sql.push_str(&placeholders.join(", "));
618                sql.push(')');
619                params.extend(values.iter().cloned());
620            }
621        }
622
623        match conn.execute(cx, &sql, &params).await {
624            Outcome::Ok(_) => Outcome::Ok(ops.len()),
625            Outcome::Err(e) => Outcome::Err(e),
626            Outcome::Cancelled(r) => Outcome::Cancelled(r),
627            Outcome::Panicked(p) => Outcome::Panicked(p),
628        }
629    }
630
631    /// Execute a batch of delete operations.
632    #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
633    async fn execute_delete_batch<C: Connection>(
634        cx: &Cx,
635        conn: &C,
636        ops: &[&PendingOp],
637    ) -> Outcome<usize, Error> {
638        if ops.is_empty() {
639            return Outcome::Ok(0);
640        }
641
642        let table = ops[0].table();
643        let PendingOp::Delete { pk_columns, .. } = ops[0] else {
644            return Outcome::Ok(0);
645        };
646
647        // Skip if no primary key columns - cannot safely DELETE without WHERE clause
648        if pk_columns.is_empty() {
649            tracing::warn!(
650                table = table,
651                count = ops.len(),
652                "Skipping DELETE batch for table without primary key - cannot identify rows"
653            );
654            return Outcome::Ok(0);
655        }
656
657        tracing::debug!(table = table, count = ops.len(), "Executing delete batch");
658
659        // For simple single-column PK, use IN clause
660        // DELETE FROM table WHERE pk IN ($1, $2, $3, ...)
661        if pk_columns.len() == 1 {
662            let pk_col = pk_columns[0];
663            let mut params: Vec<Value> = Vec::new();
664            let placeholders: Vec<String> = ops
665                .iter()
666                .filter_map(|op| {
667                    if let PendingOp::Delete { pk_values, .. } = op {
668                        if let Some(pk) = pk_values.first() {
669                            params.push(pk.clone());
670                            // Use params.len() for correct placeholder index after push
671                            return Some(format!("${}", params.len()));
672                        }
673                    }
674                    None
675                })
676                .collect();
677
678            if placeholders.is_empty() {
679                return Outcome::Ok(0);
680            }
681
682            let actual_count = params.len();
683            let sql = format!(
684                "DELETE FROM {} WHERE {} IN ({})",
685                quote_ident(table),
686                quote_ident(pk_col),
687                placeholders.join(", ")
688            );
689
690            match conn.execute(cx, &sql, &params).await {
691                // Return actual count of items in IN clause, not ops.len()
692                // (some ops may have been filtered out due to empty pk_values)
693                Outcome::Ok(_) => Outcome::Ok(actual_count),
694                Outcome::Err(e) => Outcome::Err(e),
695                Outcome::Cancelled(r) => Outcome::Cancelled(r),
696                Outcome::Panicked(p) => Outcome::Panicked(p),
697            }
698        } else {
699            // Composite PK: execute individual deletes
700            let mut deleted = 0;
701            for op in ops {
702                if let PendingOp::Delete {
703                    pk_columns,
704                    pk_values,
705                    ..
706                } = op
707                {
708                    // Skip if pk_values is empty - would cause parameter mismatch
709                    if pk_values.is_empty() {
710                        tracing::warn!(
711                            table = table,
712                            "Skipping DELETE for row with empty primary key values"
713                        );
714                        continue;
715                    }
716
717                    let where_clause: String = pk_columns
718                        .iter()
719                        .enumerate()
720                        .map(|(i, col)| format!("{} = ${}", quote_ident(col), i + 1))
721                        .collect::<Vec<_>>()
722                        .join(" AND ");
723
724                    let sql = format!("DELETE FROM {} WHERE {}", quote_ident(table), where_clause);
725
726                    match conn.execute(cx, &sql, pk_values).await {
727                        Outcome::Ok(_) => deleted += 1,
728                        Outcome::Err(e) => return Outcome::Err(e),
729                        Outcome::Cancelled(r) => return Outcome::Cancelled(r),
730                        Outcome::Panicked(p) => return Outcome::Panicked(p),
731                    }
732                }
733            }
734            Outcome::Ok(deleted)
735        }
736    }
737
738    /// Execute a single update operation.
739    #[tracing::instrument(level = "debug", skip(cx, conn, op))]
740    async fn execute_update<C: Connection>(
741        cx: &Cx,
742        conn: &C,
743        op: &PendingOp,
744    ) -> Outcome<(), Error> {
745        let PendingOp::Update {
746            table,
747            pk_columns,
748            pk_values,
749            set_columns,
750            set_values,
751            ..
752        } = op
753        else {
754            return Outcome::Ok(());
755        };
756
757        // Skip if no primary key columns/values - cannot safely UPDATE without WHERE clause
758        if pk_columns.is_empty() || pk_values.is_empty() {
759            tracing::warn!(
760                table = *table,
761                "Skipping UPDATE for row without primary key - cannot identify row"
762            );
763            return Outcome::Ok(());
764        }
765
766        if set_columns.is_empty() {
767            return Outcome::Ok(());
768        }
769
770        tracing::debug!(
771            table = *table,
772            columns = ?set_columns,
773            "Executing update"
774        );
775
776        // UPDATE table SET col1 = $1, col2 = $2 WHERE pk = $3
777        let mut param_idx = 1;
778        let set_clause: String = set_columns
779            .iter()
780            .map(|col| {
781                let clause = format!("{} = ${}", quote_ident(col), param_idx);
782                param_idx += 1;
783                clause
784            })
785            .collect::<Vec<_>>()
786            .join(", ");
787
788        let where_clause: String = pk_columns
789            .iter()
790            .map(|col| {
791                let clause = format!("{} = ${}", quote_ident(col), param_idx);
792                param_idx += 1;
793                clause
794            })
795            .collect::<Vec<_>>()
796            .join(" AND ");
797
798        let sql = format!(
799            "UPDATE {} SET {} WHERE {}",
800            quote_ident(table),
801            set_clause,
802            where_clause
803        );
804
805        let mut params: Vec<Value> = set_values.clone();
806        params.extend(pk_values.iter().cloned());
807
808        match conn.execute(cx, &sql, &params).await {
809            Outcome::Ok(_) => Outcome::Ok(()),
810            Outcome::Err(e) => Outcome::Err(e),
811            Outcome::Cancelled(r) => Outcome::Cancelled(r),
812            Outcome::Panicked(p) => Outcome::Panicked(p),
813        }
814    }
815}
816
/// Counters describing what a flush did.
#[derive(Clone, Copy, Debug, Default)]
pub struct FlushResult {
    /// How many rows were inserted.
    pub inserted: usize,
    /// How many rows were updated.
    pub updated: usize,
    /// How many rows were deleted.
    pub deleted: usize,
}

impl FlushResult {
    /// Create a result with every counter at zero.
    pub fn new() -> Self {
        Self {
            inserted: 0,
            updated: 0,
            deleted: 0,
        }
    }

    /// Sum of the insert, update and delete counters.
    pub fn total(&self) -> usize {
        let Self {
            inserted,
            updated,
            deleted,
        } = *self;
        inserted + updated + deleted
    }
}
839
#[cfg(test)]
mod tests {
    //! Unit tests for flush ordering, batching, and DML generation.
    //!
    //! None of these tests touch a database: they build `PendingOp` /
    //! `LinkTableOp` values by hand and inspect the resulting plan or SQL text.

    use super::*;
    use sqlmodel_core::{FieldInfo, Row};
    use std::any::TypeId;

    // ========================================================================
    // Fixtures
    // ========================================================================

    /// Mock parent model mapped to the `teams` table (declares no foreign keys).
    struct Team;
    /// Mock child model mapped to the `heroes` table (declares a FK to `teams.id`).
    struct Hero;

    impl Model for Team {
        const TABLE_NAME: &'static str = "teams";
        const PRIMARY_KEY: &'static [&'static str] = &["id"];

        fn fields() -> &'static [FieldInfo] {
            static FIELDS: [FieldInfo; 1] =
                [FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
                    .primary_key(true)
                    .auto_increment(true)];
            &FIELDS
        }

        // The remaining methods are stubs: these tests only rely on the
        // table name and field metadata declared above.
        fn primary_key_value(&self) -> Vec<Value> {
            vec![]
        }

        fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
            Ok(Team)
        }

        fn to_row(&self) -> Vec<(&'static str, Value)> {
            vec![]
        }

        fn is_new(&self) -> bool {
            true
        }
    }

    impl Model for Hero {
        const TABLE_NAME: &'static str = "heroes";
        const PRIMARY_KEY: &'static [&'static str] = &["id"];

        fn fields() -> &'static [FieldInfo] {
            static FIELDS: [FieldInfo; 2] = [
                FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
                    .primary_key(true)
                    .auto_increment(true),
                // The foreign key is what makes `heroes` depend on `teams`
                // in the ordering tests below.
                FieldInfo::new("team_id", "team_id", sqlmodel_core::SqlType::BigInt)
                    .nullable(true)
                    .foreign_key("teams.id"),
            ];
            &FIELDS
        }

        // Stubs, as for `Team`.
        fn primary_key_value(&self) -> Vec<Value> {
            vec![]
        }

        fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
            Ok(Hero)
        }

        fn to_row(&self) -> Vec<(&'static str, Value)> {
            vec![]
        }

        fn is_new(&self) -> bool {
            true
        }
    }

    // ========================================================================
    // Helpers
    // ========================================================================

    /// Builds an `ObjectKey` for a hand-made operation.
    ///
    /// The unit type stands in for a model type; only `pk_hash` varies
    /// between the keys used in this module.
    fn test_key(pk_hash: u64) -> ObjectKey {
        ObjectKey {
            type_id: TypeId::of::<()>(),
            pk_hash,
        }
    }

    /// Builds a `PendingOp::Insert` with caller-chosen table and column names.
    ///
    /// The values are always `[BigInt(pk), Text("Test")]`, so callers are
    /// expected to pass exactly two column names.
    fn make_custom_insert(table: &'static str, columns: Vec<&'static str>, pk: i64) -> PendingOp {
        PendingOp::Insert {
            // `as` is fine here: the tests only use small non-negative keys.
            key: test_key(pk as u64),
            table,
            columns,
            values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
        }
    }

    /// Builds a `PendingOp::Delete` with caller-chosen primary-key column names.
    ///
    /// Only a single pk value (`BigInt(pk)`) is supplied; composite-key tests
    /// construct the variant directly instead.
    fn make_custom_delete(
        table: &'static str,
        pk_columns: Vec<&'static str>,
        pk: i64,
    ) -> PendingOp {
        PendingOp::Delete {
            key: test_key(pk as u64),
            table,
            pk_columns,
            pk_values: vec![Value::BigInt(pk)],
        }
    }

    /// Builds a `PendingOp::Update` with caller-chosen pk and SET column names.
    ///
    /// Supplies one pk value (`BigInt(pk)`) and one set value
    /// (`Text("Updated")`) regardless of how many column names are passed.
    fn make_custom_update(
        table: &'static str,
        pk_columns: Vec<&'static str>,
        set_columns: Vec<&'static str>,
        pk: i64,
    ) -> PendingOp {
        PendingOp::Update {
            key: test_key(pk as u64),
            table,
            pk_columns,
            pk_values: vec![Value::BigInt(pk)],
            set_columns,
            set_values: vec![Value::Text("Updated".to_string())],
        }
    }

    /// Insert into `table` with the default `id` / `name` columns.
    fn make_insert(table: &'static str, pk: i64) -> PendingOp {
        make_custom_insert(table, vec!["id", "name"], pk)
    }

    /// Delete from `table` keyed on the default `id` column.
    fn make_delete(table: &'static str, pk: i64) -> PendingOp {
        make_custom_delete(table, vec!["id"], pk)
    }

    /// Update of the `name` column in `table`, keyed on the default `id` column.
    fn make_update(table: &'static str, pk: i64) -> PendingOp {
        make_custom_update(table, vec!["id"], vec!["name"], pk)
    }

    /// Shorthand for `LinkTableOp::link` that takes string slices.
    fn make_link(
        table: &str,
        local_column: &str,
        local_value: Value,
        remote_column: &str,
        remote_value: Value,
    ) -> LinkTableOp {
        LinkTableOp::link(
            table.to_string(),
            local_column.to_string(),
            local_value,
            remote_column.to_string(),
            remote_value,
        )
    }

    /// Shorthand for `LinkTableOp::unlink` that takes string slices.
    fn make_unlink(
        table: &str,
        local_column: &str,
        local_value: Value,
        remote_column: &str,
        remote_value: Value,
    ) -> LinkTableOp {
        LinkTableOp::unlink(
            table.to_string(),
            local_column.to_string(),
            local_value,
            remote_column.to_string(),
            remote_value,
        )
    }

    // ========================================================================
    // PendingOp Accessor Tests
    // ========================================================================

    #[test]
    fn test_pending_op_table_accessor() {
        let insert = make_insert("teams", 1);
        assert_eq!(insert.table(), "teams");

        let delete = make_delete("heroes", 2);
        assert_eq!(delete.table(), "heroes");

        let update = make_update("teams", 3);
        assert_eq!(update.table(), "teams");
    }

    #[test]
    fn test_pending_op_type_checks() {
        let insert = make_insert("teams", 1);
        assert!(insert.is_insert());
        assert!(!insert.is_update());
        assert!(!insert.is_delete());

        let update = make_update("teams", 1);
        assert!(update.is_update());
        assert!(!update.is_insert());
        assert!(!update.is_delete());

        let delete = make_delete("teams", 1);
        assert!(delete.is_delete());
        assert!(!delete.is_insert());
        assert!(!delete.is_update());
    }

    // ========================================================================
    // Flush Ordering / Batching Tests
    // ========================================================================

    #[test]
    fn test_orderer_simple_no_deps() {
        // No models registered: ops are only split by kind.
        let orderer = FlushOrderer::new();
        let ops = vec![
            make_insert("teams", 1),
            make_insert("teams", 2),
            make_delete("teams", 3),
        ];

        let plan = orderer.order(ops);
        assert_eq!(plan.inserts.len(), 2);
        assert_eq!(plan.deletes.len(), 1);
        assert_eq!(plan.updates.len(), 0);
    }

    #[test]
    fn test_orderer_parent_child_inserts() {
        let mut orderer = FlushOrderer::new();
        orderer.register_model::<Team>();
        orderer.register_model::<Hero>();

        // Add child insert first, then parent
        let ops = vec![
            make_insert("heroes", 1), // Has FK to teams
            make_insert("teams", 1),  // No FK
        ];

        let plan = orderer.order(ops);

        // Teams should be first (fewer deps)
        assert_eq!(plan.inserts[0].table(), "teams");
        assert_eq!(plan.inserts[1].table(), "heroes");
    }

    #[test]
    fn test_orderer_parent_child_deletes() {
        let mut orderer = FlushOrderer::new();
        orderer.register_model::<Team>();
        orderer.register_model::<Hero>();

        // Add parent delete first, then child
        let ops = vec![
            make_delete("teams", 1),  // No FK
            make_delete("heroes", 1), // Has FK to teams
        ];

        let plan = orderer.order(ops);

        // Heroes should be first (more deps = delete first)
        assert_eq!(plan.deletes[0].table(), "heroes");
        assert_eq!(plan.deletes[1].table(), "teams");
    }

    #[test]
    fn test_batch_by_table_groups_correctly() {
        let ops = vec![
            make_insert("teams", 1),
            make_insert("teams", 2),
            make_insert("heroes", 1),
            make_insert("heroes", 2),
            make_insert("teams", 3),
        ];

        let batches = FlushPlan::batch_by_table(&ops);

        // Should group consecutive same-table ops
        assert_eq!(batches.len(), 3);
        assert_eq!(batches[0].len(), 2); // teams 1, 2
        assert_eq!(batches[1].len(), 2); // heroes 1, 2
        assert_eq!(batches[2].len(), 1); // teams 3
    }

    #[test]
    fn test_batch_empty_returns_empty() {
        let ops: Vec<PendingOp> = vec![];
        let batches = FlushPlan::batch_by_table(&ops);
        assert!(batches.is_empty());
    }

    #[test]
    fn test_flush_plan_is_empty() {
        let plan = FlushPlan::new();
        assert!(plan.is_empty());
        assert_eq!(plan.len(), 0);
    }

    #[test]
    fn test_flush_plan_len() {
        let plan = FlushPlan {
            deletes: vec![make_delete("teams", 1)],
            inserts: vec![make_insert("teams", 1), make_insert("teams", 2)],
            updates: vec![make_update("teams", 1)],
        };
        assert!(!plan.is_empty());
        assert_eq!(plan.len(), 4);
    }

    #[test]
    fn test_flush_result_total() {
        let result = FlushResult {
            inserted: 5,
            updated: 3,
            deleted: 2,
        };
        assert_eq!(result.total(), 10);
    }

    #[test]
    fn test_flush_result_default() {
        let result = FlushResult::new();
        assert_eq!(result.inserted, 0);
        assert_eq!(result.updated, 0);
        assert_eq!(result.deleted, 0);
        assert_eq!(result.total(), 0);
    }

    // ========================================================================
    // Link Table Operation Tests
    // ========================================================================

    #[test]
    fn test_link_table_op_link_constructor() {
        // Calls the constructor directly (not via `make_link`) because the
        // constructor itself is the subject of this test.
        let op = LinkTableOp::link(
            "hero_powers".to_string(),
            "hero_id".to_string(),
            Value::BigInt(1),
            "power_id".to_string(),
            Value::BigInt(5),
        );

        match op {
            LinkTableOp::Link {
                table,
                local_column,
                local_value,
                remote_column,
                remote_value,
            } => {
                assert_eq!(table, "hero_powers");
                assert_eq!(local_column, "hero_id");
                assert_eq!(local_value, Value::BigInt(1));
                assert_eq!(remote_column, "power_id");
                assert_eq!(remote_value, Value::BigInt(5));
            }
            LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link variant"),
        }
    }

    #[test]
    fn test_link_table_op_unlink_constructor() {
        // Calls the constructor directly (not via `make_unlink`) because the
        // constructor itself is the subject of this test.
        let op = LinkTableOp::unlink(
            "hero_powers".to_string(),
            "hero_id".to_string(),
            Value::BigInt(1),
            "power_id".to_string(),
            Value::BigInt(5),
        );

        match op {
            LinkTableOp::Unlink {
                table,
                local_column,
                local_value,
                remote_column,
                remote_value,
            } => {
                assert_eq!(table, "hero_powers");
                assert_eq!(local_column, "hero_id");
                assert_eq!(local_value, Value::BigInt(1));
                assert_eq!(remote_column, "power_id");
                assert_eq!(remote_value, Value::BigInt(5));
            }
            LinkTableOp::Link { .. } => std::panic::panic_any("Expected Unlink variant"),
        }
    }

    #[test]
    fn test_link_table_op_is_link() {
        let link = make_link("t", "a", Value::BigInt(1), "b", Value::BigInt(2));
        let unlink = make_unlink("t", "a", Value::BigInt(1), "b", Value::BigInt(2));

        assert!(matches!(link, LinkTableOp::Link { .. }));
        assert!(matches!(unlink, LinkTableOp::Unlink { .. }));
    }

    #[test]
    fn test_link_table_op_debug_format() {
        let link = make_link(
            "hero_powers",
            "hero_id",
            Value::BigInt(1),
            "power_id",
            Value::BigInt(5),
        );
        let debug_str = format!("{link:?}");
        assert!(debug_str.contains("Link"));
        assert!(debug_str.contains("hero_powers"));
    }

    #[test]
    fn test_link_table_op_clone() {
        let op = make_link(
            "hero_powers",
            "hero_id",
            Value::BigInt(1),
            "power_id",
            Value::BigInt(5),
        );
        let cloned = op.clone();

        match (op, cloned) {
            (
                LinkTableOp::Link {
                    table: t1,
                    local_value: lv1,
                    remote_value: rv1,
                    ..
                },
                LinkTableOp::Link {
                    table: t2,
                    local_value: lv2,
                    remote_value: rv2,
                    ..
                },
            ) => {
                assert_eq!(t1, t2);
                assert_eq!(lv1, lv2);
                assert_eq!(rv1, rv2);
            }
            _ => std::panic::panic_any("Clone should preserve variant"),
        }
    }

    #[test]
    fn test_link_table_ops_empty_vec() {
        // Test that an empty ops vec handles correctly
        let ops: Vec<LinkTableOp> = vec![];
        assert!(ops.is_empty());
    }

    #[test]
    fn test_link_table_ops_multiple_operations() {
        let ops = [
            make_link(
                "hero_powers",
                "hero_id",
                Value::BigInt(1),
                "power_id",
                Value::BigInt(1),
            ),
            make_link(
                "hero_powers",
                "hero_id",
                Value::BigInt(1),
                "power_id",
                Value::BigInt(2),
            ),
            make_unlink(
                "hero_powers",
                "hero_id",
                Value::BigInt(1),
                "power_id",
                Value::BigInt(3),
            ),
        ];

        let links: Vec<_> = ops
            .iter()
            .filter(|o| matches!(o, LinkTableOp::Link { .. }))
            .collect();
        let unlinks: Vec<_> = ops
            .iter()
            .filter(|o| matches!(o, LinkTableOp::Unlink { .. }))
            .collect();

        assert_eq!(links.len(), 2);
        assert_eq!(unlinks.len(), 1);
    }

    #[test]
    fn test_link_table_op_with_different_value_types() {
        // Test with string values
        let op_str = make_link(
            "tag_items",
            "tag_id",
            Value::Text("tag-uuid-123".to_string()),
            "item_id",
            Value::Text("item-uuid-456".to_string()),
        );

        match op_str {
            LinkTableOp::Link {
                local_value,
                remote_value,
                ..
            } => {
                assert!(matches!(local_value, Value::Text(_)));
                assert!(matches!(remote_value, Value::Text(_)));
            }
            LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
        }

        // Test with integer values
        let op_int = make_link("user_roles", "user_id", Value::Int(42), "role_id", Value::Int(7));

        match op_int {
            LinkTableOp::Link {
                local_value,
                remote_value,
                ..
            } => {
                assert!(matches!(local_value, Value::Int(_)));
                assert!(matches!(remote_value, Value::Int(_)));
            }
            LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
        }
    }

    // ================================================================================
    // DML Identifier Quoting Integration Tests
    // ================================================================================

    // ------ LinkTableOp SQL Generation Tests ------

    #[test]
    fn test_link_table_op_to_sql_simple() {
        let op = make_link(
            "hero_powers",
            "hero_id",
            Value::BigInt(1),
            "power_id",
            Value::BigInt(5),
        );
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"hero_powers\" (\"hero_id\", \"power_id\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_link_table_op_to_sql_with_keywords() {
        let op = make_link(
            "order",  // SQL keyword table
            "select", // SQL keyword column
            Value::BigInt(1),
            "from", // SQL keyword column
            Value::BigInt(2),
        );
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"order\" (\"select\", \"from\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_link_table_op_to_sql_with_embedded_quotes() {
        let op = make_link(
            "my\"table",
            "col\"a",
            Value::BigInt(1),
            "col\"b",
            Value::BigInt(2),
        );
        let sql = op.to_sql();
        // Each embedded double quote is expected to come back doubled.
        assert_eq!(
            sql,
            "INSERT INTO \"my\"\"table\" (\"col\"\"a\", \"col\"\"b\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_link_table_op_unlink_to_sql_with_keywords() {
        let op = make_unlink("user", "index", Value::BigInt(1), "key", Value::BigInt(2));
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "DELETE FROM \"user\" WHERE \"index\" = $1 AND \"key\" = $2"
        );
    }

    #[test]
    fn test_link_table_op_to_sql_with_unicode() {
        let op = make_link(
            "用户表",
            "用户id",
            Value::BigInt(1),
            "角色id",
            Value::BigInt(2),
        );
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"用户表\" (\"用户id\", \"角色id\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_link_table_op_to_sql_with_spaces() {
        let op = make_link(
            "link table",
            "local id",
            Value::BigInt(1),
            "remote id",
            Value::BigInt(2),
        );
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"link table\" (\"local id\", \"remote id\") VALUES ($1, $2)"
        );
    }

    // ------ PendingOp::Insert SQL Generation Tests ------

    #[test]
    fn test_pending_op_insert_to_sql_simple() {
        let op = make_insert("teams", 1);
        let sql = op.to_sql();
        assert!(sql.starts_with("INSERT INTO \"teams\""));
        assert!(sql.contains("(\"id\", \"name\")"));
        assert!(sql.contains("VALUES ($1, $2)"));
    }

    #[test]
    fn test_pending_op_insert_to_sql_with_keyword_table() {
        let op = make_custom_insert("order", vec!["id", "select"], 1);
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"order\" (\"id\", \"select\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_pending_op_insert_to_sql_with_quoted_names() {
        let op = make_custom_insert("my\"table", vec!["pk\"id", "data\"col"], 1);
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"my\"\"table\" (\"pk\"\"id\", \"data\"\"col\") VALUES ($1, $2)"
        );
    }

    // ------ PendingOp::Delete SQL Generation Tests ------

    #[test]
    fn test_pending_op_delete_to_sql_single_pk() {
        let op = make_delete("teams", 1);
        let sql = op.to_sql();
        // Single-column keys are expected in `IN (...)` form.
        assert_eq!(sql, "DELETE FROM \"teams\" WHERE \"id\" IN ($1)");
    }

    #[test]
    fn test_pending_op_delete_to_sql_with_keyword_table() {
        let op = make_custom_delete("order", vec!["id"], 1);
        let sql = op.to_sql();
        assert_eq!(sql, "DELETE FROM \"order\" WHERE \"id\" IN ($1)");
    }

    #[test]
    fn test_pending_op_delete_to_sql_composite_pk() {
        let op = PendingOp::Delete {
            key: test_key(1),
            table: "order_items",
            pk_columns: vec!["order_id", "item_id"],
            pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
        };
        let sql = op.to_sql();
        // Composite keys are expected as an `AND`-joined equality chain.
        assert_eq!(
            sql,
            "DELETE FROM \"order_items\" WHERE \"order_id\" = $1 AND \"item_id\" = $2"
        );
    }

    #[test]
    fn test_pending_op_delete_to_sql_with_keyword_pk_columns() {
        let op = PendingOp::Delete {
            key: test_key(1),
            table: "user",
            pk_columns: vec!["select", "from"],
            pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
        };
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "DELETE FROM \"user\" WHERE \"select\" = $1 AND \"from\" = $2"
        );
    }

    // ------ PendingOp::Update SQL Generation Tests ------

    #[test]
    fn test_pending_op_update_to_sql_simple() {
        let op = make_update("teams", 1);
        let sql = op.to_sql();
        // SET placeholders are numbered first; the pk placeholder follows.
        assert_eq!(sql, "UPDATE \"teams\" SET \"name\" = $1 WHERE \"id\" = $2");
    }

    #[test]
    fn test_pending_op_update_to_sql_with_keyword_names() {
        let op = make_custom_update("order", vec!["id"], vec!["select", "from"], 1);
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "UPDATE \"order\" SET \"select\" = $1, \"from\" = $2 WHERE \"id\" = $3"
        );
    }

    #[test]
    fn test_pending_op_update_to_sql_with_quoted_names() {
        let op = make_custom_update("my\"table", vec!["pk\"id"], vec!["data\"col"], 1);
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "UPDATE \"my\"\"table\" SET \"data\"\"col\" = $1 WHERE \"pk\"\"id\" = $2"
        );
    }

    #[test]
    fn test_pending_op_update_to_sql_composite_pk() {
        let op = PendingOp::Update {
            key: test_key(1),
            table: "order_items",
            pk_columns: vec!["order_id", "item_id"],
            pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
            set_columns: vec!["quantity"],
            set_values: vec![Value::Int(5)],
        };
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "UPDATE \"order_items\" SET \"quantity\" = $1 WHERE \"order_id\" = $2 AND \"item_id\" = $3"
        );
    }

    // ------ SQL Injection Neutralization Tests ------
    //
    // Each test checks the quoted identifier on its own and then the complete
    // statement, so stray text outside the identifier cannot go unnoticed.

    #[test]
    fn test_link_op_sql_injection_neutralized() {
        // Attempt SQL injection through table name
        let op = make_link(
            "links\"; DROP TABLE users; --",
            "col1",
            Value::BigInt(1),
            "col2",
            Value::BigInt(2),
        );
        let sql = op.to_sql();
        // The embedded quote should be doubled, keeping everything as an identifier
        assert!(sql.contains("\"links\"\"; DROP TABLE users; --\""));
        // The whole statement must be a single INSERT with the payload confined
        // to the quoted table name.
        assert_eq!(
            sql,
            "INSERT INTO \"links\"\"; DROP TABLE users; --\" (\"col1\", \"col2\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_pending_op_insert_sql_injection_neutralized() {
        let op = make_custom_insert("users\"; DROP TABLE secrets; --", vec!["id", "name"], 1);
        let sql = op.to_sql();
        // Injection attempt should be contained within quotes
        assert!(sql.contains("\"users\"\"; DROP TABLE secrets; --\""));
        assert_eq!(
            sql,
            "INSERT INTO \"users\"\"; DROP TABLE secrets; --\" (\"id\", \"name\") VALUES ($1, $2)"
        );
    }

    #[test]
    fn test_pending_op_update_sql_injection_neutralized() {
        let op = make_custom_update("data", vec!["id"], vec!["col\"; DROP TABLE data; --"], 1);
        let sql = op.to_sql();
        // The malicious column name is safely quoted
        assert!(sql.contains("\"col\"\"; DROP TABLE data; --\""));
        assert_eq!(
            sql,
            "UPDATE \"data\" SET \"col\"\"; DROP TABLE data; --\" = $1 WHERE \"id\" = $2"
        );
    }

    // ------ Edge Cases ------

    #[test]
    fn test_pending_op_insert_many_columns() {
        let op = PendingOp::Insert {
            key: test_key(1),
            table: "wide_table",
            columns: vec!["a", "b", "c", "d", "e"],
            values: vec![
                Value::Int(1),
                Value::Int(2),
                Value::Int(3),
                Value::Int(4),
                Value::Int(5),
            ],
        };
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "INSERT INTO \"wide_table\" (\"a\", \"b\", \"c\", \"d\", \"e\") VALUES ($1, $2, $3, $4, $5)"
        );
    }

    #[test]
    fn test_pending_op_update_many_set_columns() {
        let op = PendingOp::Update {
            key: test_key(1),
            table: "items",
            pk_columns: vec!["id"],
            pk_values: vec![Value::BigInt(1)],
            set_columns: vec!["a", "b", "c"],
            set_values: vec![Value::Int(1), Value::Int(2), Value::Int(3)],
        };
        let sql = op.to_sql();
        assert_eq!(
            sql,
            "UPDATE \"items\" SET \"a\" = $1, \"b\" = $2, \"c\" = $3 WHERE \"id\" = $4"
        );
    }

    #[test]
    fn test_link_table_op_empty_strings() {
        // Edge case: empty identifiers (unusual but should still be quoted)
        let op = make_link("", "", Value::BigInt(1), "", Value::BigInt(2));
        let sql = op.to_sql();
        assert_eq!(sql, "INSERT INTO \"\" (\"\", \"\") VALUES ($1, $2)");
    }

    // ------ Invalid Operation Edge Cases ------
    //
    // For structurally invalid operations `to_sql` is expected to return a
    // string starting with `-- ERROR:` rather than malformed SQL.

    #[test]
    fn test_pending_op_delete_empty_pk_columns() {
        let op = PendingOp::Delete {
            key: test_key(1),
            table: "orphan_table",
            pk_columns: vec![], // No PK columns - invalid!
            pk_values: vec![],
        };
        let sql = op.to_sql();
        // Should return error indicator, not invalid SQL
        assert!(sql.starts_with("-- ERROR:"));
        assert!(sql.contains("DELETE"));
        assert!(sql.contains("no pk_columns"));
    }

    #[test]
    fn test_pending_op_update_empty_pk_columns() {
        let op = PendingOp::Update {
            key: test_key(1),
            table: "orphan_table",
            pk_columns: vec![], // No PK columns - invalid!
            pk_values: vec![],
            set_columns: vec!["name"],
            set_values: vec![Value::Text("test".to_string())],
        };
        let sql = op.to_sql();
        // Should return error indicator, not invalid SQL
        assert!(sql.starts_with("-- ERROR:"));
        assert!(sql.contains("UPDATE"));
        assert!(sql.contains("no pk_columns"));
    }

    #[test]
    fn test_pending_op_update_empty_set_columns() {
        let op = PendingOp::Update {
            key: test_key(1),
            table: "nothing_to_update",
            pk_columns: vec!["id"],
            pk_values: vec![Value::BigInt(1)],
            set_columns: vec![], // No columns to set - invalid!
            set_values: vec![],
        };
        let sql = op.to_sql();
        // Should return error indicator, not invalid SQL
        assert!(sql.starts_with("-- ERROR:"));
        assert!(sql.contains("UPDATE"));
        assert!(sql.contains("no set_columns"));
    }

    #[test]
    fn test_pending_op_insert_empty_columns() {
        let op = PendingOp::Insert {
            key: test_key(1),
            table: "empty_insert",
            columns: vec![], // No columns - invalid!
            values: vec![],
        };
        let sql = op.to_sql();
        // Should return error indicator, not invalid SQL
        assert!(sql.starts_with("-- ERROR:"));
        assert!(sql.contains("INSERT"));
        assert!(sql.contains("no columns"));
    }
}