Skip to main content

sqlmodel_session/
flush.rs

1//! Flush operation ordering and batching for SQLModel Session.
2//!
3//! This module handles writing pending changes to the database in the correct order:
4//! - DELETE child-first (to respect FK constraints)
5//! - INSERT parent-first (to respect FK constraints)
6//! - UPDATE any order (no circular FK assumed)
7//!
8//! Operations are batched by table for performance.
9
10use crate::ObjectKey;
11use asupersync::{Cx, Outcome};
12use sqlmodel_core::{Connection, Error, Model, Value, quote_ident};
13use std::collections::HashMap;
14
/// A pending database operation.
///
/// Every variant records the identity-map key of the originating object and
/// the table it targets. Column and value vectors are parallel: the value at
/// index `i` is bound to the column at index `i`.
#[derive(Debug, Clone)]
pub enum PendingOp {
    /// Insert a new row.
    Insert {
        /// Object key for identity map.
        key: ObjectKey,
        /// Table name.
        table: &'static str,
        /// Column names, in the order they appear in the `INSERT` column list.
        columns: Vec<&'static str>,
        /// Values to insert; expected to have the same length as `columns`.
        values: Vec<Value>,
    },
    /// Update an existing row.
    Update {
        /// Object key for identity map.
        key: ObjectKey,
        /// Table name.
        table: &'static str,
        /// Primary key column names (used for the `WHERE` clause).
        pk_columns: Vec<&'static str>,
        /// Primary key values; expected to have the same length as `pk_columns`.
        pk_values: Vec<Value>,
        /// Columns to update (only dirty ones).
        set_columns: Vec<&'static str>,
        /// New values for dirty columns; expected to have the same length as
        /// `set_columns`.
        set_values: Vec<Value>,
    },
    /// Delete an existing row.
    Delete {
        /// Object key for identity map.
        key: ObjectKey,
        /// Table name.
        table: &'static str,
        /// Primary key column names (used for the `WHERE` clause).
        pk_columns: Vec<&'static str>,
        /// Primary key values; expected to have the same length as `pk_columns`.
        pk_values: Vec<Value>,
    },
}
56
/// A pending link table operation (for many-to-many relationships).
///
/// Both variants carry the same data: the link table name plus two parallel
/// column/value lists, one for the local (parent) side and one for the remote
/// (child) side. When a statement is built, local columns and values come
/// first, followed by the remote ones.
#[derive(Debug, Clone)]
pub enum LinkTableOp {
    /// Insert a link (relationship).
    Link {
        /// Link table name.
        table: String,
        /// Local (parent) column names.
        local_columns: Vec<String>,
        /// Local (parent) PK values (must match local_columns).
        local_values: Vec<Value>,
        /// Remote (child) column names.
        remote_columns: Vec<String>,
        /// Remote (child) PK values (must match remote_columns).
        remote_values: Vec<Value>,
    },
    /// Delete a link (relationship).
    Unlink {
        /// Link table name.
        table: String,
        /// Local (parent) column names.
        local_columns: Vec<String>,
        /// Local (parent) PK values (must match local_columns).
        local_values: Vec<Value>,
        /// Remote (child) column names.
        remote_columns: Vec<String>,
        /// Remote (child) PK values (must match remote_columns).
        remote_values: Vec<Value>,
    },
}
87
88impl LinkTableOp {
89    /// Create a link operation.
90    pub fn link(
91        table: impl Into<String>,
92        local_column: impl Into<String>,
93        local_value: Value,
94        remote_column: impl Into<String>,
95        remote_value: Value,
96    ) -> Self {
97        Self::link_multi(
98            table,
99            vec![local_column.into()],
100            vec![local_value],
101            vec![remote_column.into()],
102            vec![remote_value],
103        )
104    }
105
106    /// Create an unlink operation.
107    pub fn unlink(
108        table: impl Into<String>,
109        local_column: impl Into<String>,
110        local_value: Value,
111        remote_column: impl Into<String>,
112        remote_value: Value,
113    ) -> Self {
114        Self::unlink_multi(
115            table,
116            vec![local_column.into()],
117            vec![local_value],
118            vec![remote_column.into()],
119            vec![remote_value],
120        )
121    }
122
123    /// Create a link operation for composite keys.
124    pub fn link_multi(
125        table: impl Into<String>,
126        local_columns: Vec<String>,
127        local_values: Vec<Value>,
128        remote_columns: Vec<String>,
129        remote_values: Vec<Value>,
130    ) -> Self {
131        Self::Link {
132            table: table.into(),
133            local_columns,
134            local_values,
135            remote_columns,
136            remote_values,
137        }
138    }
139
140    /// Create an unlink operation for composite keys.
141    pub fn unlink_multi(
142        table: impl Into<String>,
143        local_columns: Vec<String>,
144        local_values: Vec<Value>,
145        remote_columns: Vec<String>,
146        remote_values: Vec<Value>,
147    ) -> Self {
148        Self::Unlink {
149            table: table.into(),
150            local_columns,
151            local_values,
152            remote_columns,
153            remote_values,
154        }
155    }
156
157    /// Get the table name.
158    pub fn table(&self) -> &str {
159        match self {
160            LinkTableOp::Link { table, .. } => table,
161            LinkTableOp::Unlink { table, .. } => table,
162        }
163    }
164
165    /// Check if this is a link (insert) operation.
166    pub fn is_link(&self) -> bool {
167        matches!(self, LinkTableOp::Link { .. })
168    }
169
170    /// Check if this is an unlink (delete) operation.
171    pub fn is_unlink(&self) -> bool {
172        matches!(self, LinkTableOp::Unlink { .. })
173    }
174
175    /// Generate the SQL that would be executed for this operation.
176    ///
177    /// Useful for testing and debugging.
178    pub fn to_sql(&self) -> String {
179        match self {
180            LinkTableOp::Link {
181                table,
182                local_columns,
183                remote_columns,
184                ..
185            } => format!(
186                "INSERT INTO {} ({}) VALUES ({})",
187                quote_ident(table),
188                local_columns
189                    .iter()
190                    .chain(remote_columns.iter())
191                    .map(|c| quote_ident(c))
192                    .collect::<Vec<_>>()
193                    .join(", "),
194                (1..=(local_columns.len() + remote_columns.len()))
195                    .map(|i| format!("${}", i))
196                    .collect::<Vec<_>>()
197                    .join(", ")
198            ),
199            LinkTableOp::Unlink {
200                table,
201                local_columns,
202                remote_columns,
203                ..
204            } => format!(
205                "DELETE FROM {} WHERE {}",
206                quote_ident(table),
207                local_columns
208                    .iter()
209                    .chain(remote_columns.iter())
210                    .enumerate()
211                    .map(|(i, c)| format!("{} = ${}", quote_ident(c), i + 1))
212                    .collect::<Vec<_>>()
213                    .join(" AND ")
214            ),
215        }
216    }
217
218    /// Execute this link table operation.
219    #[tracing::instrument(level = "debug", skip(cx, conn))]
220    pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<(), Error> {
221        let dialect = conn.dialect();
222        match self {
223            LinkTableOp::Link {
224                table,
225                local_columns,
226                local_values,
227                remote_columns,
228                remote_values,
229            } => {
230                if local_columns.len() != local_values.len()
231                    || remote_columns.len() != remote_values.len()
232                {
233                    return Outcome::Err(Error::Custom(
234                        "link op columns/values length mismatch".to_string(),
235                    ));
236                }
237
238                let mut params: Vec<Value> =
239                    Vec::with_capacity(local_values.len() + remote_values.len());
240                params.extend(local_values.iter().cloned());
241                params.extend(remote_values.iter().cloned());
242
243                let col_list = local_columns
244                    .iter()
245                    .chain(remote_columns.iter())
246                    .map(|c| dialect.quote_identifier(c))
247                    .collect::<Vec<_>>()
248                    .join(", ");
249                let placeholders = (1..=params.len())
250                    .map(|i| dialect.placeholder(i))
251                    .collect::<Vec<_>>()
252                    .join(", ");
253                let sql = format!(
254                    "INSERT INTO {} ({}) VALUES ({})",
255                    dialect.quote_identifier(table),
256                    col_list,
257                    placeholders
258                );
259                tracing::trace!(sql = %sql, "Executing link INSERT");
260                conn.execute(cx, &sql, &params).await.map(|_| ())
261            }
262            LinkTableOp::Unlink {
263                table,
264                local_columns,
265                local_values,
266                remote_columns,
267                remote_values,
268            } => {
269                if local_columns.len() != local_values.len()
270                    || remote_columns.len() != remote_values.len()
271                {
272                    return Outcome::Err(Error::Custom(
273                        "unlink op columns/values length mismatch".to_string(),
274                    ));
275                }
276
277                let mut params: Vec<Value> =
278                    Vec::with_capacity(local_values.len() + remote_values.len());
279                params.extend(local_values.iter().cloned());
280                params.extend(remote_values.iter().cloned());
281
282                let where_clause = local_columns
283                    .iter()
284                    .chain(remote_columns.iter())
285                    .enumerate()
286                    .map(|(i, c)| {
287                        format!(
288                            "{} = {}",
289                            dialect.quote_identifier(c),
290                            dialect.placeholder(i + 1)
291                        )
292                    })
293                    .collect::<Vec<_>>()
294                    .join(" AND ");
295                let sql = format!(
296                    "DELETE FROM {} WHERE {}",
297                    dialect.quote_identifier(table),
298                    where_clause
299                );
300                tracing::trace!(sql = %sql, "Executing link DELETE");
301                conn.execute(cx, &sql, &params).await.map(|_| ())
302            }
303        }
304    }
305}
306
307/// Execute a batch of link table operations.
308#[tracing::instrument(level = "debug", skip(cx, conn, ops))]
309pub async fn execute_link_table_ops<C: Connection>(
310    cx: &Cx,
311    conn: &C,
312    ops: &[LinkTableOp],
313) -> Outcome<usize, Error> {
314    if ops.is_empty() {
315        return Outcome::Ok(0);
316    }
317
318    tracing::info!(count = ops.len(), "Executing link table operations");
319
320    let mut count = 0;
321    for op in ops {
322        match op.execute(cx, conn).await {
323            Outcome::Ok(()) => count += 1,
324            Outcome::Err(e) => return Outcome::Err(e),
325            Outcome::Cancelled(r) => return Outcome::Cancelled(r),
326            Outcome::Panicked(p) => return Outcome::Panicked(p),
327        }
328    }
329
330    tracing::debug!(executed = count, "Link table operations complete");
331    Outcome::Ok(count)
332}
333
334impl PendingOp {
335    /// Get the table name for this operation.
336    pub fn table(&self) -> &'static str {
337        match self {
338            PendingOp::Insert { table, .. } => table,
339            PendingOp::Update { table, .. } => table,
340            PendingOp::Delete { table, .. } => table,
341        }
342    }
343
344    /// Get the object key for this operation.
345    pub fn key(&self) -> ObjectKey {
346        match self {
347            PendingOp::Insert { key, .. } => *key,
348            PendingOp::Update { key, .. } => *key,
349            PendingOp::Delete { key, .. } => *key,
350        }
351    }
352
353    /// Check if this is an insert operation.
354    pub fn is_insert(&self) -> bool {
355        matches!(self, PendingOp::Insert { .. })
356    }
357
358    /// Check if this is an update operation.
359    pub fn is_update(&self) -> bool {
360        matches!(self, PendingOp::Update { .. })
361    }
362
363    /// Check if this is a delete operation.
364    pub fn is_delete(&self) -> bool {
365        matches!(self, PendingOp::Delete { .. })
366    }
367
368    /// Generate the SQL that would be executed for this operation.
369    ///
370    /// This is useful for testing and debugging. For INSERT, this generates
371    /// a single-row insert. For DELETE/UPDATE, the SQL matches what would be
372    /// executed for a single operation.
373    ///
374    /// Returns a descriptive error string for invalid operations (e.g., empty
375    /// pk_columns for DELETE/UPDATE, empty set_columns for UPDATE).
376    pub fn to_sql(&self) -> String {
377        match self {
378            PendingOp::Insert {
379                table,
380                columns,
381                values,
382                ..
383            } => {
384                if columns.is_empty() {
385                    return format!(
386                        "-- ERROR: INSERT INTO {} with no columns",
387                        quote_ident(table)
388                    );
389                }
390                let col_list: String = columns
391                    .iter()
392                    .map(|c| quote_ident(c))
393                    .collect::<Vec<_>>()
394                    .join(", ");
395                let placeholders: Vec<String> =
396                    (1..=values.len()).map(|i| format!("${}", i)).collect();
397                format!(
398                    "INSERT INTO {} ({}) VALUES ({})",
399                    quote_ident(table),
400                    col_list,
401                    placeholders.join(", ")
402                )
403            }
404            PendingOp::Delete {
405                table, pk_columns, ..
406            } => {
407                if pk_columns.is_empty() {
408                    return format!(
409                        "-- ERROR: DELETE FROM {} with no pk_columns",
410                        quote_ident(table)
411                    );
412                }
413                if pk_columns.len() == 1 {
414                    format!(
415                        "DELETE FROM {} WHERE {} IN ($1)",
416                        quote_ident(table),
417                        quote_ident(pk_columns[0])
418                    )
419                } else {
420                    let where_clause: String = pk_columns
421                        .iter()
422                        .enumerate()
423                        .map(|(i, col)| format!("{} = ${}", quote_ident(col), i + 1))
424                        .collect::<Vec<_>>()
425                        .join(" AND ");
426                    format!("DELETE FROM {} WHERE {}", quote_ident(table), where_clause)
427                }
428            }
429            PendingOp::Update {
430                table,
431                pk_columns,
432                set_columns,
433                ..
434            } => {
435                if pk_columns.is_empty() {
436                    return format!("-- ERROR: UPDATE {} with no pk_columns", quote_ident(table));
437                }
438                if set_columns.is_empty() {
439                    return format!(
440                        "-- ERROR: UPDATE {} with no set_columns",
441                        quote_ident(table)
442                    );
443                }
444                let mut param_idx = 1;
445                let set_clause: String = set_columns
446                    .iter()
447                    .map(|col| {
448                        let s = format!("{} = ${}", quote_ident(col), param_idx);
449                        param_idx += 1;
450                        s
451                    })
452                    .collect::<Vec<_>>()
453                    .join(", ");
454                let where_clause: String = pk_columns
455                    .iter()
456                    .map(|col| {
457                        let s = format!("{} = ${}", quote_ident(col), param_idx);
458                        param_idx += 1;
459                        s
460                    })
461                    .collect::<Vec<_>>()
462                    .join(" AND ");
463                format!(
464                    "UPDATE {} SET {} WHERE {}",
465                    quote_ident(table),
466                    set_clause,
467                    where_clause
468                )
469            }
470        }
471    }
472}
473
/// Builds a dependency graph and orders operations for flush.
///
/// Uses table foreign key relationships to determine correct ordering:
/// - Parents must be inserted before children
/// - Children must be deleted before parents
///
/// The graph is populated through `register_model` / `register_table`; a
/// table that was never registered is treated as having no dependencies.
#[derive(Debug, Default)]
pub struct FlushOrderer {
    /// Table -> tables it depends on (has FK to).
    dependencies: HashMap<&'static str, Vec<&'static str>>,
}
484
485impl FlushOrderer {
486    /// Create a new flush orderer.
487    pub fn new() -> Self {
488        Self::default()
489    }
490
491    /// Register a model type's dependencies.
492    ///
493    /// Extracts foreign key relationships from the model's field metadata.
494    pub fn register_model<T: Model>(&mut self) {
495        let table = T::TABLE_NAME;
496        let deps: Vec<&'static str> = T::fields()
497            .iter()
498            .filter_map(|f| f.foreign_key)
499            .filter_map(|fk| fk.split('.').next())
500            .collect();
501        self.dependencies.insert(table, deps);
502    }
503
504    /// Register a table's dependencies directly.
505    pub fn register_table(&mut self, table: &'static str, depends_on: Vec<&'static str>) {
506        self.dependencies.insert(table, depends_on);
507    }
508
509    /// Get the dependency count for a table.
510    fn dependency_count(&self, table: &str) -> usize {
511        self.dependencies.get(table).map_or(0, Vec::len)
512    }
513
514    /// Order operations into a flush plan.
515    ///
516    /// Returns operations grouped and sorted:
517    /// - Deletes: child-first (more dependencies = delete first)
518    /// - Inserts: parent-first (fewer dependencies = insert first)
519    /// - Updates: any order
520    pub fn order(&self, ops: Vec<PendingOp>) -> FlushPlan {
521        let mut deletes = Vec::new();
522        let mut inserts = Vec::new();
523        let mut updates = Vec::new();
524
525        for op in ops {
526            match op {
527                PendingOp::Delete { .. } => deletes.push(op),
528                PendingOp::Insert { .. } => inserts.push(op),
529                PendingOp::Update { .. } => updates.push(op),
530            }
531        }
532
533        // Sort deletes: children first (more deps = delete first)
534        deletes.sort_by(|a, b| {
535            let a_deps = self.dependency_count(a.table());
536            let b_deps = self.dependency_count(b.table());
537            b_deps.cmp(&a_deps)
538        });
539
540        // Sort inserts: parents first (fewer deps = insert first)
541        inserts.sort_by(|a, b| {
542            let a_deps = self.dependency_count(a.table());
543            let b_deps = self.dependency_count(b.table());
544            a_deps.cmp(&b_deps)
545        });
546
547        FlushPlan {
548            deletes,
549            inserts,
550            updates,
551        }
552    }
553}
554
/// A plan for executing flush operations.
///
/// Holds the pending operations split by kind. `execute` runs the three
/// lists in the order deletes, inserts, updates.
#[derive(Debug, Default)]
pub struct FlushPlan {
    /// Delete operations (ordered child-first).
    pub deletes: Vec<PendingOp>,
    /// Insert operations (ordered parent-first).
    pub inserts: Vec<PendingOp>,
    /// Update operations (any order).
    pub updates: Vec<PendingOp>,
}
565
566impl FlushPlan {
567    /// Create an empty flush plan.
568    pub fn new() -> Self {
569        Self::default()
570    }
571
572    /// Check if the plan has any operations.
573    pub fn is_empty(&self) -> bool {
574        self.deletes.is_empty() && self.inserts.is_empty() && self.updates.is_empty()
575    }
576
577    /// Total number of operations in the plan.
578    pub fn len(&self) -> usize {
579        self.deletes.len() + self.inserts.len() + self.updates.len()
580    }
581
582    /// Execute the flush plan against the database.
583    #[tracing::instrument(level = "info", skip(self, cx, conn))]
584    pub async fn execute<C: Connection>(&self, cx: &Cx, conn: &C) -> Outcome<FlushResult, Error> {
585        tracing::info!(
586            deletes = self.deletes.len(),
587            inserts = self.inserts.len(),
588            updates = self.updates.len(),
589            "Executing flush plan"
590        );
591
592        let start = std::time::Instant::now();
593        let mut result = FlushResult::default();
594
595        // 1. Execute deletes (batched by table)
596        for batch in Self::batch_by_table(&self.deletes) {
597            match Self::execute_delete_batch(cx, conn, &batch).await {
598                Outcome::Ok(count) => result.deleted += count,
599                Outcome::Err(e) => return Outcome::Err(e),
600                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
601                Outcome::Panicked(p) => return Outcome::Panicked(p),
602            }
603        }
604
605        // 2. Execute inserts (batched by table)
606        for batch in Self::batch_by_table(&self.inserts) {
607            match Self::execute_insert_batch(cx, conn, &batch).await {
608                Outcome::Ok(count) => result.inserted += count,
609                Outcome::Err(e) => return Outcome::Err(e),
610                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
611                Outcome::Panicked(p) => return Outcome::Panicked(p),
612            }
613        }
614
615        // 3. Execute updates (one at a time - different columns may be dirty)
616        for op in &self.updates {
617            match Self::execute_update(cx, conn, op).await {
618                Outcome::Ok(()) => result.updated += 1,
619                Outcome::Err(e) => return Outcome::Err(e),
620                Outcome::Cancelled(r) => return Outcome::Cancelled(r),
621                Outcome::Panicked(p) => return Outcome::Panicked(p),
622            }
623        }
624
625        tracing::info!(
626            elapsed_ms = start.elapsed().as_millis(),
627            inserted = result.inserted,
628            updated = result.updated,
629            deleted = result.deleted,
630            "Flush complete"
631        );
632
633        Outcome::Ok(result)
634    }
635
636    /// Group operations by table name.
637    fn batch_by_table(ops: &[PendingOp]) -> Vec<Vec<&PendingOp>> {
638        if ops.is_empty() {
639            return Vec::new();
640        }
641
642        let mut batches: Vec<Vec<&PendingOp>> = Vec::new();
643        let mut current_table: Option<&'static str> = None;
644        let mut current_batch: Vec<&PendingOp> = Vec::new();
645
646        for op in ops {
647            let table = op.table();
648            if current_table == Some(table) {
649                current_batch.push(op);
650            } else {
651                if !current_batch.is_empty() {
652                    batches.push(current_batch);
653                }
654                current_batch = vec![op];
655                current_table = Some(table);
656            }
657        }
658
659        if !current_batch.is_empty() {
660            batches.push(current_batch);
661        }
662
663        batches
664    }
665
666    #[allow(clippy::result_large_err)]
667    fn build_insert_batch_sql(
668        dialect: sqlmodel_core::Dialect,
669        ops: &[&PendingOp],
670    ) -> Result<(String, Vec<Value>), Error> {
671        let table = ops[0].table();
672        let PendingOp::Insert { columns, .. } = ops[0] else {
673            return Err(Error::Custom("expected insert operation".to_string()));
674        };
675
676        let col_list: String = columns
677            .iter()
678            .map(|c| dialect.quote_identifier(c))
679            .collect::<Vec<_>>()
680            .join(", ");
681
682        let mut sql = format!(
683            "INSERT INTO {} ({}) VALUES ",
684            dialect.quote_identifier(table),
685            col_list
686        );
687        let mut params: Vec<Value> = Vec::new();
688        let mut param_idx = 1;
689
690        for (i, op) in ops.iter().enumerate() {
691            let PendingOp::Insert {
692                columns: row_columns,
693                values,
694                ..
695            } = op
696            else {
697                return Err(Error::Custom(
698                    "mixed operation kinds in insert batch".to_string(),
699                ));
700            };
701
702            if row_columns != columns {
703                return Err(Error::Custom(format!(
704                    "inconsistent insert columns in flush batch for table {table}"
705                )));
706            }
707            if values.len() != columns.len() {
708                return Err(Error::Custom(format!(
709                    "insert column/value length mismatch for table {table}: {} columns vs {} values",
710                    columns.len(),
711                    values.len()
712                )));
713            }
714
715            if i > 0 {
716                sql.push_str(", ");
717            }
718            let placeholders: Vec<String> = (0..values.len())
719                .map(|_| {
720                    let p = dialect.placeholder(param_idx);
721                    param_idx += 1;
722                    p
723                })
724                .collect();
725            sql.push('(');
726            sql.push_str(&placeholders.join(", "));
727            sql.push(')');
728            params.extend(values.iter().cloned());
729        }
730
731        Ok((sql, params))
732    }
733
734    #[allow(clippy::result_large_err)]
735    fn build_delete_batch_sql(
736        dialect: sqlmodel_core::Dialect,
737        ops: &[&PendingOp],
738    ) -> Result<Option<(String, Vec<Value>, usize)>, Error> {
739        let table = ops[0].table();
740        let PendingOp::Delete { pk_columns, .. } = ops[0] else {
741            return Err(Error::Custom("expected delete operation".to_string()));
742        };
743
744        // No PK means no safe WHERE clause.
745        if pk_columns.is_empty() {
746            tracing::warn!(
747                table = table,
748                count = ops.len(),
749                "Skipping DELETE batch for table without primary key - cannot identify rows"
750            );
751            return Ok(None);
752        }
753
754        if pk_columns.len() == 1 {
755            let pk_col = pk_columns[0];
756            let mut params: Vec<Value> = Vec::new();
757            let placeholders: Vec<String> = ops
758                .iter()
759                .filter_map(|op| {
760                    if let PendingOp::Delete {
761                        pk_columns: row_pk_columns,
762                        pk_values,
763                        ..
764                    } = op
765                    {
766                        if row_pk_columns != pk_columns {
767                            return None;
768                        }
769                        if pk_values.len() != 1 {
770                            return None;
771                        }
772                        params.push(pk_values[0].clone());
773                        return Some(dialect.placeholder(params.len()));
774                    }
775                    None
776                })
777                .collect();
778
779            if placeholders.is_empty() {
780                return Ok(None);
781            }
782
783            let actual_count = params.len();
784            let sql = format!(
785                "DELETE FROM {} WHERE {} IN ({})",
786                dialect.quote_identifier(table),
787                dialect.quote_identifier(pk_col),
788                placeholders.join(", ")
789            );
790            return Ok(Some((sql, params, actual_count)));
791        }
792
793        Err(Error::Custom(
794            "composite delete batch must be handled per-row".to_string(),
795        ))
796    }
797
798    #[allow(clippy::result_large_err)]
799    fn build_update_sql(
800        dialect: sqlmodel_core::Dialect,
801        op: &PendingOp,
802    ) -> Result<Option<(String, Vec<Value>)>, Error> {
803        let PendingOp::Update {
804            table,
805            pk_columns,
806            pk_values,
807            set_columns,
808            set_values,
809            ..
810        } = op
811        else {
812            return Ok(None);
813        };
814
815        // No PK means no safe WHERE clause.
816        if pk_columns.is_empty() || pk_values.is_empty() {
817            tracing::warn!(
818                table = *table,
819                "Skipping UPDATE for row without primary key - cannot identify row"
820            );
821            return Ok(None);
822        }
823        if set_columns.is_empty() {
824            return Ok(None);
825        }
826
827        if pk_columns.len() != pk_values.len() {
828            return Err(Error::Custom(format!(
829                "update primary key column/value length mismatch for table {table}: {} columns vs {} values",
830                pk_columns.len(),
831                pk_values.len()
832            )));
833        }
834        if set_columns.len() != set_values.len() {
835            return Err(Error::Custom(format!(
836                "update set column/value length mismatch for table {table}: {} columns vs {} values",
837                set_columns.len(),
838                set_values.len()
839            )));
840        }
841
842        let mut param_idx = 1;
843        let set_clause: String = set_columns
844            .iter()
845            .map(|col| {
846                let clause = format!(
847                    "{} = {}",
848                    dialect.quote_identifier(col),
849                    dialect.placeholder(param_idx)
850                );
851                param_idx += 1;
852                clause
853            })
854            .collect::<Vec<_>>()
855            .join(", ");
856
857        let where_clause: String = pk_columns
858            .iter()
859            .map(|col| {
860                let clause = format!(
861                    "{} = {}",
862                    dialect.quote_identifier(col),
863                    dialect.placeholder(param_idx)
864                );
865                param_idx += 1;
866                clause
867            })
868            .collect::<Vec<_>>()
869            .join(" AND ");
870
871        let sql = format!(
872            "UPDATE {} SET {} WHERE {}",
873            dialect.quote_identifier(table),
874            set_clause,
875            where_clause
876        );
877
878        let mut params: Vec<Value> = set_values.clone();
879        params.extend(pk_values.iter().cloned());
880
881        Ok(Some((sql, params)))
882    }
883
884    /// Execute a batch of insert operations.
885    #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
886    async fn execute_insert_batch<C: Connection>(
887        cx: &Cx,
888        conn: &C,
889        ops: &[&PendingOp],
890    ) -> Outcome<usize, Error> {
891        if ops.is_empty() {
892            return Outcome::Ok(0);
893        }
894
895        let table = ops[0].table();
896        let PendingOp::Insert { .. } = ops[0] else {
897            return Outcome::Ok(0);
898        };
899
900        tracing::debug!(table = table, count = ops.len(), "Executing insert batch");
901        let dialect = conn.dialect();
902        let (sql, params) = match Self::build_insert_batch_sql(dialect, ops) {
903            Ok(v) => v,
904            Err(e) => return Outcome::Err(e),
905        };
906
907        match conn.execute(cx, &sql, &params).await {
908            Outcome::Ok(_) => Outcome::Ok(ops.len()),
909            Outcome::Err(e) => Outcome::Err(e),
910            Outcome::Cancelled(r) => Outcome::Cancelled(r),
911            Outcome::Panicked(p) => Outcome::Panicked(p),
912        }
913    }
914
915    /// Execute a batch of delete operations.
916    #[tracing::instrument(level = "debug", skip(cx, conn, ops))]
917    async fn execute_delete_batch<C: Connection>(
918        cx: &Cx,
919        conn: &C,
920        ops: &[&PendingOp],
921    ) -> Outcome<usize, Error> {
922        if ops.is_empty() {
923            return Outcome::Ok(0);
924        }
925
926        let table = ops[0].table();
927        let PendingOp::Delete { pk_columns, .. } = ops[0] else {
928            return Outcome::Ok(0);
929        };
930
931        // Skip if no primary key columns - cannot safely DELETE without WHERE clause
932        if pk_columns.is_empty() {
933            tracing::warn!(
934                table = table,
935                count = ops.len(),
936                "Skipping DELETE batch for table without primary key - cannot identify rows"
937            );
938            return Outcome::Ok(0);
939        }
940
941        tracing::debug!(table = table, count = ops.len(), "Executing delete batch");
942        let dialect = conn.dialect();
943
944        // For simple single-column PK, use IN clause
945        // DELETE FROM table WHERE pk IN ($1, $2, $3, ...)
946        if pk_columns.len() == 1 {
947            let (sql, params, actual_count) = match Self::build_delete_batch_sql(dialect, ops) {
948                Ok(Some(v)) => v,
949                Ok(None) => return Outcome::Ok(0),
950                Err(e) => return Outcome::Err(e),
951            };
952
953            match conn.execute(cx, &sql, &params).await {
954                // Return actual count of items in IN clause, not ops.len()
955                // (some ops may have been filtered out due to empty pk_values)
956                Outcome::Ok(_) => Outcome::Ok(actual_count),
957                Outcome::Err(e) => Outcome::Err(e),
958                Outcome::Cancelled(r) => Outcome::Cancelled(r),
959                Outcome::Panicked(p) => Outcome::Panicked(p),
960            }
961        } else {
962            // Composite PK: execute individual deletes
963            let mut deleted = 0;
964            for op in ops {
965                if let PendingOp::Delete {
966                    pk_columns,
967                    pk_values,
968                    ..
969                } = op
970                {
971                    // Skip if pk_values is empty - would cause parameter mismatch
972                    if pk_values.is_empty() {
973                        tracing::warn!(
974                            table = table,
975                            "Skipping DELETE for row with empty primary key values"
976                        );
977                        continue;
978                    }
979                    if pk_values.len() != pk_columns.len() {
980                        return Outcome::Err(Error::Custom(format!(
981                            "delete primary key column/value length mismatch for table {table}: {} columns vs {} values",
982                            pk_columns.len(),
983                            pk_values.len()
984                        )));
985                    }
986
987                    let where_clause: String = pk_columns
988                        .iter()
989                        .enumerate()
990                        .map(|(i, col)| {
991                            format!(
992                                "{} = {}",
993                                dialect.quote_identifier(col),
994                                dialect.placeholder(i + 1)
995                            )
996                        })
997                        .collect::<Vec<_>>()
998                        .join(" AND ");
999
1000                    let sql = format!(
1001                        "DELETE FROM {} WHERE {}",
1002                        dialect.quote_identifier(table),
1003                        where_clause
1004                    );
1005
1006                    match conn.execute(cx, &sql, pk_values).await {
1007                        Outcome::Ok(_) => deleted += 1,
1008                        Outcome::Err(e) => return Outcome::Err(e),
1009                        Outcome::Cancelled(r) => return Outcome::Cancelled(r),
1010                        Outcome::Panicked(p) => return Outcome::Panicked(p),
1011                    }
1012                }
1013            }
1014            Outcome::Ok(deleted)
1015        }
1016    }
1017
1018    /// Execute a single update operation.
1019    #[tracing::instrument(level = "debug", skip(cx, conn, op))]
1020    async fn execute_update<C: Connection>(
1021        cx: &Cx,
1022        conn: &C,
1023        op: &PendingOp,
1024    ) -> Outcome<(), Error> {
1025        let PendingOp::Update { table, .. } = op else {
1026            return Outcome::Ok(());
1027        };
1028
1029        tracing::debug!(table = *table, "Executing update");
1030        let dialect = conn.dialect();
1031        let (sql, params) = match Self::build_update_sql(dialect, op) {
1032            Ok(Some(v)) => v,
1033            Ok(None) => return Outcome::Ok(()),
1034            Err(e) => return Outcome::Err(e),
1035        };
1036
1037        match conn.execute(cx, &sql, &params).await {
1038            Outcome::Ok(_) => Outcome::Ok(()),
1039            Outcome::Err(e) => Outcome::Err(e),
1040            Outcome::Cancelled(r) => Outcome::Cancelled(r),
1041            Outcome::Panicked(p) => Outcome::Panicked(p),
1042        }
1043    }
1044}
1045
/// Row counts reported by a flush, broken down by statement kind.
#[derive(Debug, Default, Clone, Copy)]
pub struct FlushResult {
    /// How many rows were inserted.
    pub inserted: usize,
    /// How many rows were updated.
    pub updated: usize,
    /// How many rows were deleted.
    pub deleted: usize,
}

impl FlushResult {
    /// Build a result with every counter at zero.
    pub fn new() -> Self {
        Self {
            inserted: 0,
            updated: 0,
            deleted: 0,
        }
    }

    /// Sum of the inserted, updated and deleted counters.
    pub fn total(&self) -> usize {
        let Self {
            inserted,
            updated,
            deleted,
        } = *self;
        inserted + updated + deleted
    }
}
1068
1069#[cfg(test)]
1070mod tests {
1071    use super::*;
1072    use sqlmodel_core::{FieldInfo, Row};
1073    use std::any::TypeId;
1074
1075    // Mock models for testing
1076    struct Team;
1077    struct Hero;
1078
1079    impl Model for Team {
1080        const TABLE_NAME: &'static str = "teams";
1081        const PRIMARY_KEY: &'static [&'static str] = &["id"];
1082
1083        fn fields() -> &'static [FieldInfo] {
1084            static FIELDS: [FieldInfo; 1] =
1085                [FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
1086                    .primary_key(true)
1087                    .auto_increment(true)];
1088            &FIELDS
1089        }
1090
1091        fn primary_key_value(&self) -> Vec<Value> {
1092            vec![]
1093        }
1094
1095        fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
1096            Ok(Team)
1097        }
1098
1099        fn to_row(&self) -> Vec<(&'static str, Value)> {
1100            vec![]
1101        }
1102
1103        fn is_new(&self) -> bool {
1104            true
1105        }
1106    }
1107
1108    impl Model for Hero {
1109        const TABLE_NAME: &'static str = "heroes";
1110        const PRIMARY_KEY: &'static [&'static str] = &["id"];
1111
1112        fn fields() -> &'static [FieldInfo] {
1113            static FIELDS: [FieldInfo; 2] = [
1114                FieldInfo::new("id", "id", sqlmodel_core::SqlType::BigInt)
1115                    .primary_key(true)
1116                    .auto_increment(true),
1117                FieldInfo::new("team_id", "team_id", sqlmodel_core::SqlType::BigInt)
1118                    .nullable(true)
1119                    .foreign_key("teams.id"),
1120            ];
1121            &FIELDS
1122        }
1123
1124        fn primary_key_value(&self) -> Vec<Value> {
1125            vec![]
1126        }
1127
1128        fn from_row(_row: &Row) -> Result<Self, sqlmodel_core::Error> {
1129            Ok(Hero)
1130        }
1131
1132        fn to_row(&self) -> Vec<(&'static str, Value)> {
1133            vec![]
1134        }
1135
1136        fn is_new(&self) -> bool {
1137            true
1138        }
1139    }
1140
1141    fn make_insert(table: &'static str, pk: i64) -> PendingOp {
1142        PendingOp::Insert {
1143            key: ObjectKey {
1144                type_id: TypeId::of::<()>(),
1145                pk_hash: pk as u64,
1146            },
1147            table,
1148            columns: vec!["id", "name"],
1149            values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
1150        }
1151    }
1152
1153    fn make_delete(table: &'static str, pk: i64) -> PendingOp {
1154        PendingOp::Delete {
1155            key: ObjectKey {
1156                type_id: TypeId::of::<()>(),
1157                pk_hash: pk as u64,
1158            },
1159            table,
1160            pk_columns: vec!["id"],
1161            pk_values: vec![Value::BigInt(pk)],
1162        }
1163    }
1164
1165    fn make_update(table: &'static str, pk: i64) -> PendingOp {
1166        PendingOp::Update {
1167            key: ObjectKey {
1168                type_id: TypeId::of::<()>(),
1169                pk_hash: pk as u64,
1170            },
1171            table,
1172            pk_columns: vec!["id"],
1173            pk_values: vec![Value::BigInt(pk)],
1174            set_columns: vec!["name"],
1175            set_values: vec![Value::Text("Updated".to_string())],
1176        }
1177    }
1178
1179    #[test]
1180    fn test_pending_op_table_accessor() {
1181        let insert = make_insert("teams", 1);
1182        assert_eq!(insert.table(), "teams");
1183
1184        let delete = make_delete("heroes", 2);
1185        assert_eq!(delete.table(), "heroes");
1186
1187        let update = make_update("teams", 3);
1188        assert_eq!(update.table(), "teams");
1189    }
1190
1191    #[test]
1192    fn test_pending_op_type_checks() {
1193        let insert = make_insert("teams", 1);
1194        assert!(insert.is_insert());
1195        assert!(!insert.is_update());
1196        assert!(!insert.is_delete());
1197
1198        let update = make_update("teams", 1);
1199        assert!(update.is_update());
1200        assert!(!update.is_insert());
1201        assert!(!update.is_delete());
1202
1203        let delete = make_delete("teams", 1);
1204        assert!(delete.is_delete());
1205        assert!(!delete.is_insert());
1206        assert!(!delete.is_update());
1207    }
1208
1209    #[test]
1210    fn test_orderer_simple_no_deps() {
1211        let orderer = FlushOrderer::new();
1212        let ops = vec![
1213            make_insert("teams", 1),
1214            make_insert("teams", 2),
1215            make_delete("teams", 3),
1216        ];
1217
1218        let plan = orderer.order(ops);
1219        assert_eq!(plan.inserts.len(), 2);
1220        assert_eq!(plan.deletes.len(), 1);
1221        assert_eq!(plan.updates.len(), 0);
1222    }
1223
1224    #[test]
1225    fn test_orderer_parent_child_inserts() {
1226        let mut orderer = FlushOrderer::new();
1227        orderer.register_model::<Team>();
1228        orderer.register_model::<Hero>();
1229
1230        // Add child insert first, then parent
1231        let ops = vec![
1232            make_insert("heroes", 1), // Has FK to teams
1233            make_insert("teams", 1),  // No FK
1234        ];
1235
1236        let plan = orderer.order(ops);
1237
1238        // Teams should be first (fewer deps)
1239        assert_eq!(plan.inserts[0].table(), "teams");
1240        assert_eq!(plan.inserts[1].table(), "heroes");
1241    }
1242
1243    #[test]
1244    fn test_orderer_parent_child_deletes() {
1245        let mut orderer = FlushOrderer::new();
1246        orderer.register_model::<Team>();
1247        orderer.register_model::<Hero>();
1248
1249        // Add parent delete first, then child
1250        let ops = vec![
1251            make_delete("teams", 1),  // No FK
1252            make_delete("heroes", 1), // Has FK to teams
1253        ];
1254
1255        let plan = orderer.order(ops);
1256
1257        // Heroes should be first (more deps = delete first)
1258        assert_eq!(plan.deletes[0].table(), "heroes");
1259        assert_eq!(plan.deletes[1].table(), "teams");
1260    }
1261
1262    #[test]
1263    fn test_batch_by_table_groups_correctly() {
1264        let ops = vec![
1265            make_insert("teams", 1),
1266            make_insert("teams", 2),
1267            make_insert("heroes", 1),
1268            make_insert("heroes", 2),
1269            make_insert("teams", 3),
1270        ];
1271
1272        let batches = FlushPlan::batch_by_table(&ops);
1273
1274        // Should group consecutive same-table ops
1275        assert_eq!(batches.len(), 3);
1276        assert_eq!(batches[0].len(), 2); // teams 1, 2
1277        assert_eq!(batches[1].len(), 2); // heroes 1, 2
1278        assert_eq!(batches[2].len(), 1); // teams 3
1279    }
1280
1281    #[test]
1282    fn test_batch_empty_returns_empty() {
1283        let ops: Vec<PendingOp> = vec![];
1284        let batches = FlushPlan::batch_by_table(&ops);
1285        assert!(batches.is_empty());
1286    }
1287
1288    #[test]
1289    fn test_flush_plan_is_empty() {
1290        let plan = FlushPlan::new();
1291        assert!(plan.is_empty());
1292        assert_eq!(plan.len(), 0);
1293    }
1294
1295    #[test]
1296    fn test_flush_plan_len() {
1297        let plan = FlushPlan {
1298            deletes: vec![make_delete("teams", 1)],
1299            inserts: vec![make_insert("teams", 1), make_insert("teams", 2)],
1300            updates: vec![make_update("teams", 1)],
1301        };
1302        assert!(!plan.is_empty());
1303        assert_eq!(plan.len(), 4);
1304    }
1305
1306    #[test]
1307    fn test_flush_result_total() {
1308        let result = FlushResult {
1309            inserted: 5,
1310            updated: 3,
1311            deleted: 2,
1312        };
1313        assert_eq!(result.total(), 10);
1314    }
1315
1316    #[test]
1317    fn test_flush_result_default() {
1318        let result = FlushResult::new();
1319        assert_eq!(result.inserted, 0);
1320        assert_eq!(result.updated, 0);
1321        assert_eq!(result.deleted, 0);
1322        assert_eq!(result.total(), 0);
1323    }
1324
1325    // ========================================================================
1326    // Link Table Operation Tests
1327    // ========================================================================
1328
1329    #[test]
1330    fn test_link_table_op_link_constructor() {
1331        let op = LinkTableOp::link(
1332            "hero_powers".to_string(),
1333            "hero_id".to_string(),
1334            Value::BigInt(1),
1335            "power_id".to_string(),
1336            Value::BigInt(5),
1337        );
1338
1339        match op {
1340            LinkTableOp::Link {
1341                table,
1342                local_columns,
1343                local_values,
1344                remote_columns,
1345                remote_values,
1346            } => {
1347                assert_eq!(table, "hero_powers");
1348                assert_eq!(local_columns, vec!["hero_id".to_string()]);
1349                assert_eq!(local_values, vec![Value::BigInt(1)]);
1350                assert_eq!(remote_columns, vec!["power_id".to_string()]);
1351                assert_eq!(remote_values, vec![Value::BigInt(5)]);
1352            }
1353            LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link variant"),
1354        }
1355    }
1356
1357    #[test]
1358    fn test_link_table_op_unlink_constructor() {
1359        let op = LinkTableOp::unlink(
1360            "hero_powers".to_string(),
1361            "hero_id".to_string(),
1362            Value::BigInt(1),
1363            "power_id".to_string(),
1364            Value::BigInt(5),
1365        );
1366
1367        match op {
1368            LinkTableOp::Unlink {
1369                table,
1370                local_columns,
1371                local_values,
1372                remote_columns,
1373                remote_values,
1374            } => {
1375                assert_eq!(table, "hero_powers");
1376                assert_eq!(local_columns, vec!["hero_id".to_string()]);
1377                assert_eq!(local_values, vec![Value::BigInt(1)]);
1378                assert_eq!(remote_columns, vec!["power_id".to_string()]);
1379                assert_eq!(remote_values, vec![Value::BigInt(5)]);
1380            }
1381            LinkTableOp::Link { .. } => std::panic::panic_any("Expected Unlink variant"),
1382        }
1383    }
1384
1385    #[test]
1386    fn test_link_table_op_is_link() {
1387        let link = LinkTableOp::link(
1388            "t".to_string(),
1389            "a".to_string(),
1390            Value::BigInt(1),
1391            "b".to_string(),
1392            Value::BigInt(2),
1393        );
1394        let unlink = LinkTableOp::unlink(
1395            "t".to_string(),
1396            "a".to_string(),
1397            Value::BigInt(1),
1398            "b".to_string(),
1399            Value::BigInt(2),
1400        );
1401
1402        assert!(matches!(link, LinkTableOp::Link { .. }));
1403        assert!(matches!(unlink, LinkTableOp::Unlink { .. }));
1404    }
1405
1406    #[test]
1407    fn test_link_table_op_debug_format() {
1408        let link = LinkTableOp::link(
1409            "hero_powers".to_string(),
1410            "hero_id".to_string(),
1411            Value::BigInt(1),
1412            "power_id".to_string(),
1413            Value::BigInt(5),
1414        );
1415        let debug_str = format!("{:?}", link);
1416        assert!(debug_str.contains("Link"));
1417        assert!(debug_str.contains("hero_powers"));
1418    }
1419
1420    #[test]
1421    fn test_link_table_op_clone() {
1422        let op = LinkTableOp::link(
1423            "hero_powers".to_string(),
1424            "hero_id".to_string(),
1425            Value::BigInt(1),
1426            "power_id".to_string(),
1427            Value::BigInt(5),
1428        );
1429        let cloned = op.clone();
1430
1431        match (op, cloned) {
1432            (
1433                LinkTableOp::Link {
1434                    table: t1,
1435                    local_values: lv1,
1436                    remote_values: rv1,
1437                    ..
1438                },
1439                LinkTableOp::Link {
1440                    table: t2,
1441                    local_values: lv2,
1442                    remote_values: rv2,
1443                    ..
1444                },
1445            ) => {
1446                assert_eq!(t1, t2);
1447                assert_eq!(lv1, lv2);
1448                assert_eq!(rv1, rv2);
1449            }
1450            _ => std::panic::panic_any("Clone should preserve variant"),
1451        }
1452    }
1453
1454    #[test]
1455    fn test_link_table_ops_empty_vec() {
1456        // Test that an empty ops vec handles correctly
1457        let ops: Vec<LinkTableOp> = vec![];
1458        assert!(ops.is_empty());
1459    }
1460
1461    #[test]
1462    fn test_link_table_ops_multiple_operations() {
1463        let ops = [
1464            LinkTableOp::link(
1465                "hero_powers".to_string(),
1466                "hero_id".to_string(),
1467                Value::BigInt(1),
1468                "power_id".to_string(),
1469                Value::BigInt(1),
1470            ),
1471            LinkTableOp::link(
1472                "hero_powers".to_string(),
1473                "hero_id".to_string(),
1474                Value::BigInt(1),
1475                "power_id".to_string(),
1476                Value::BigInt(2),
1477            ),
1478            LinkTableOp::unlink(
1479                "hero_powers".to_string(),
1480                "hero_id".to_string(),
1481                Value::BigInt(1),
1482                "power_id".to_string(),
1483                Value::BigInt(3),
1484            ),
1485        ];
1486
1487        let links: Vec<_> = ops
1488            .iter()
1489            .filter(|o| matches!(o, LinkTableOp::Link { .. }))
1490            .collect();
1491        let unlinks: Vec<_> = ops
1492            .iter()
1493            .filter(|o| matches!(o, LinkTableOp::Unlink { .. }))
1494            .collect();
1495
1496        assert_eq!(links.len(), 2);
1497        assert_eq!(unlinks.len(), 1);
1498    }
1499
1500    #[test]
1501    fn test_link_table_op_with_different_value_types() {
1502        // Test with string values
1503        let op_str = LinkTableOp::link(
1504            "tag_items".to_string(),
1505            "tag_id".to_string(),
1506            Value::Text("tag-uuid-123".to_string()),
1507            "item_id".to_string(),
1508            Value::Text("item-uuid-456".to_string()),
1509        );
1510
1511        match op_str {
1512            LinkTableOp::Link {
1513                local_values,
1514                remote_values,
1515                ..
1516            } => {
1517                assert!(matches!(local_values.first(), Some(Value::Text(_))));
1518                assert!(matches!(remote_values.first(), Some(Value::Text(_))));
1519            }
1520            LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
1521        }
1522
1523        // Test with integer values
1524        let op_int = LinkTableOp::link(
1525            "user_roles".to_string(),
1526            "user_id".to_string(),
1527            Value::Int(42),
1528            "role_id".to_string(),
1529            Value::Int(7),
1530        );
1531
1532        match op_int {
1533            LinkTableOp::Link {
1534                local_values,
1535                remote_values,
1536                ..
1537            } => {
1538                assert!(matches!(local_values.first(), Some(Value::Int(_))));
1539                assert!(matches!(remote_values.first(), Some(Value::Int(_))));
1540            }
1541            LinkTableOp::Unlink { .. } => std::panic::panic_any("Expected Link"),
1542        }
1543    }
1544
1545    // ================================================================================
1546    // DML Identifier Quoting Integration Tests
1547    // ================================================================================
1548
1549    // Helper to create PendingOp::Insert with custom names
1550    fn make_custom_insert(table: &'static str, columns: Vec<&'static str>, pk: i64) -> PendingOp {
1551        PendingOp::Insert {
1552            key: ObjectKey {
1553                type_id: TypeId::of::<()>(),
1554                pk_hash: pk as u64,
1555            },
1556            table,
1557            columns,
1558            values: vec![Value::BigInt(pk), Value::Text("Test".to_string())],
1559        }
1560    }
1561
1562    // Helper to create PendingOp::Delete with custom pk columns
1563    fn make_custom_delete(
1564        table: &'static str,
1565        pk_columns: Vec<&'static str>,
1566        pk: i64,
1567    ) -> PendingOp {
1568        PendingOp::Delete {
1569            key: ObjectKey {
1570                type_id: TypeId::of::<()>(),
1571                pk_hash: pk as u64,
1572            },
1573            table,
1574            pk_columns,
1575            pk_values: vec![Value::BigInt(pk)],
1576        }
1577    }
1578
1579    // Helper to create PendingOp::Update with custom column names
1580    fn make_custom_update(
1581        table: &'static str,
1582        pk_columns: Vec<&'static str>,
1583        set_columns: Vec<&'static str>,
1584        pk: i64,
1585    ) -> PendingOp {
1586        PendingOp::Update {
1587            key: ObjectKey {
1588                type_id: TypeId::of::<()>(),
1589                pk_hash: pk as u64,
1590            },
1591            table,
1592            pk_columns,
1593            pk_values: vec![Value::BigInt(pk)],
1594            set_columns,
1595            set_values: vec![Value::Text("Updated".to_string())],
1596        }
1597    }
1598
1599    // ------ LinkTableOp SQL Generation Tests ------
1600
1601    #[test]
1602    fn test_link_table_op_to_sql_simple() {
1603        let op = LinkTableOp::link(
1604            "hero_powers".to_string(),
1605            "hero_id".to_string(),
1606            Value::BigInt(1),
1607            "power_id".to_string(),
1608            Value::BigInt(5),
1609        );
1610        let sql = op.to_sql();
1611        assert_eq!(
1612            sql,
1613            "INSERT INTO \"hero_powers\" (\"hero_id\", \"power_id\") VALUES ($1, $2)"
1614        );
1615    }
1616
1617    #[test]
1618    fn test_link_table_op_to_sql_with_keywords() {
1619        let op = LinkTableOp::link(
1620            "order".to_string(),  // SQL keyword table
1621            "select".to_string(), // SQL keyword column
1622            Value::BigInt(1),
1623            "from".to_string(), // SQL keyword column
1624            Value::BigInt(2),
1625        );
1626        let sql = op.to_sql();
1627        assert_eq!(
1628            sql,
1629            "INSERT INTO \"order\" (\"select\", \"from\") VALUES ($1, $2)"
1630        );
1631    }
1632
1633    #[test]
1634    fn test_link_table_op_to_sql_with_embedded_quotes() {
1635        let op = LinkTableOp::link(
1636            "my\"table".to_string(),
1637            "col\"a".to_string(),
1638            Value::BigInt(1),
1639            "col\"b".to_string(),
1640            Value::BigInt(2),
1641        );
1642        let sql = op.to_sql();
1643        assert_eq!(
1644            sql,
1645            "INSERT INTO \"my\"\"table\" (\"col\"\"a\", \"col\"\"b\") VALUES ($1, $2)"
1646        );
1647    }
1648
1649    #[test]
1650    fn test_link_table_op_unlink_to_sql_with_keywords() {
1651        let op = LinkTableOp::unlink(
1652            "user".to_string(),
1653            "index".to_string(),
1654            Value::BigInt(1),
1655            "key".to_string(),
1656            Value::BigInt(2),
1657        );
1658        let sql = op.to_sql();
1659        assert_eq!(
1660            sql,
1661            "DELETE FROM \"user\" WHERE \"index\" = $1 AND \"key\" = $2"
1662        );
1663    }
1664
1665    #[test]
1666    fn test_link_table_op_to_sql_with_unicode() {
1667        let op = LinkTableOp::link(
1668            "用户表".to_string(),
1669            "用户id".to_string(),
1670            Value::BigInt(1),
1671            "角色id".to_string(),
1672            Value::BigInt(2),
1673        );
1674        let sql = op.to_sql();
1675        assert_eq!(
1676            sql,
1677            "INSERT INTO \"用户表\" (\"用户id\", \"角色id\") VALUES ($1, $2)"
1678        );
1679    }
1680
1681    #[test]
1682    fn test_link_table_op_to_sql_with_spaces() {
1683        let op = LinkTableOp::link(
1684            "link table".to_string(),
1685            "local id".to_string(),
1686            Value::BigInt(1),
1687            "remote id".to_string(),
1688            Value::BigInt(2),
1689        );
1690        let sql = op.to_sql();
1691        assert_eq!(
1692            sql,
1693            "INSERT INTO \"link table\" (\"local id\", \"remote id\") VALUES ($1, $2)"
1694        );
1695    }
1696
1697    // ------ PendingOp::Insert SQL Generation Tests ------
1698
1699    #[test]
1700    fn test_pending_op_insert_to_sql_simple() {
1701        let op = make_insert("teams", 1);
1702        let sql = op.to_sql();
1703        assert!(sql.starts_with("INSERT INTO \"teams\""));
1704        assert!(sql.contains("(\"id\", \"name\")"));
1705        assert!(sql.contains("VALUES ($1, $2)"));
1706    }
1707
1708    #[test]
1709    fn test_pending_op_insert_to_sql_with_keyword_table() {
1710        let op = make_custom_insert("order", vec!["id", "select"], 1);
1711        let sql = op.to_sql();
1712        assert_eq!(
1713            sql,
1714            "INSERT INTO \"order\" (\"id\", \"select\") VALUES ($1, $2)"
1715        );
1716    }
1717
1718    #[test]
1719    fn test_pending_op_insert_to_sql_with_quoted_names() {
1720        let op = make_custom_insert("my\"table", vec!["pk\"id", "data\"col"], 1);
1721        let sql = op.to_sql();
1722        assert_eq!(
1723            sql,
1724            "INSERT INTO \"my\"\"table\" (\"pk\"\"id\", \"data\"\"col\") VALUES ($1, $2)"
1725        );
1726    }
1727
1728    // ------ PendingOp::Delete SQL Generation Tests ------
1729
1730    #[test]
1731    fn test_pending_op_delete_to_sql_single_pk() {
1732        let op = make_delete("teams", 1);
1733        let sql = op.to_sql();
1734        assert_eq!(sql, "DELETE FROM \"teams\" WHERE \"id\" IN ($1)");
1735    }
1736
1737    #[test]
1738    fn test_pending_op_delete_to_sql_with_keyword_table() {
1739        let op = make_custom_delete("order", vec!["id"], 1);
1740        let sql = op.to_sql();
1741        assert_eq!(sql, "DELETE FROM \"order\" WHERE \"id\" IN ($1)");
1742    }
1743
1744    #[test]
1745    fn test_pending_op_delete_to_sql_composite_pk() {
1746        let op = PendingOp::Delete {
1747            key: ObjectKey {
1748                type_id: TypeId::of::<()>(),
1749                pk_hash: 1,
1750            },
1751            table: "order_items",
1752            pk_columns: vec!["order_id", "item_id"],
1753            pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1754        };
1755        let sql = op.to_sql();
1756        assert_eq!(
1757            sql,
1758            "DELETE FROM \"order_items\" WHERE \"order_id\" = $1 AND \"item_id\" = $2"
1759        );
1760    }
1761
1762    #[test]
1763    fn test_pending_op_delete_to_sql_with_keyword_pk_columns() {
1764        let op = PendingOp::Delete {
1765            key: ObjectKey {
1766                type_id: TypeId::of::<()>(),
1767                pk_hash: 1,
1768            },
1769            table: "user",
1770            pk_columns: vec!["select", "from"],
1771            pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1772        };
1773        let sql = op.to_sql();
1774        assert_eq!(
1775            sql,
1776            "DELETE FROM \"user\" WHERE \"select\" = $1 AND \"from\" = $2"
1777        );
1778    }
1779
1780    // ------ PendingOp::Update SQL Generation Tests ------
1781
1782    #[test]
1783    fn test_pending_op_update_to_sql_simple() {
1784        let op = make_update("teams", 1);
1785        let sql = op.to_sql();
1786        assert_eq!(sql, "UPDATE \"teams\" SET \"name\" = $1 WHERE \"id\" = $2");
1787    }
1788
1789    #[test]
1790    fn test_pending_op_update_to_sql_with_keyword_names() {
1791        let op = make_custom_update("order", vec!["id"], vec!["select", "from"], 1);
1792        let sql = op.to_sql();
1793        assert_eq!(
1794            sql,
1795            "UPDATE \"order\" SET \"select\" = $1, \"from\" = $2 WHERE \"id\" = $3"
1796        );
1797    }
1798
1799    #[test]
1800    fn test_pending_op_update_to_sql_with_quoted_names() {
1801        let op = make_custom_update("my\"table", vec!["pk\"id"], vec!["data\"col"], 1);
1802        let sql = op.to_sql();
1803        assert_eq!(
1804            sql,
1805            "UPDATE \"my\"\"table\" SET \"data\"\"col\" = $1 WHERE \"pk\"\"id\" = $2"
1806        );
1807    }
1808
1809    #[test]
1810    fn test_pending_op_update_to_sql_composite_pk() {
1811        let op = PendingOp::Update {
1812            key: ObjectKey {
1813                type_id: TypeId::of::<()>(),
1814                pk_hash: 1,
1815            },
1816            table: "order_items",
1817            pk_columns: vec!["order_id", "item_id"],
1818            pk_values: vec![Value::BigInt(1), Value::BigInt(2)],
1819            set_columns: vec!["quantity"],
1820            set_values: vec![Value::Int(5)],
1821        };
1822        let sql = op.to_sql();
1823        assert_eq!(
1824            sql,
1825            "UPDATE \"order_items\" SET \"quantity\" = $1 WHERE \"order_id\" = $2 AND \"item_id\" = $3"
1826        );
1827    }
1828
1829    // ------ SQL Injection Neutralization Tests ------
1830
1831    #[test]
1832    fn test_link_op_sql_injection_neutralized() {
1833        // Attempt SQL injection through table name
1834        let op = LinkTableOp::link(
1835            "links\"; DROP TABLE users; --".to_string(),
1836            "col1".to_string(),
1837            Value::BigInt(1),
1838            "col2".to_string(),
1839            Value::BigInt(2),
1840        );
1841        let sql = op.to_sql();
1842        // The embedded quote should be doubled, keeping everything as an identifier
1843        assert!(sql.contains("\"links\"\"; DROP TABLE users; --\""));
1844        // Count quotes - injection is neutralized
1845        assert!(sql.starts_with("INSERT INTO \""));
1846    }
1847
1848    #[test]
1849    fn test_pending_op_insert_sql_injection_neutralized() {
1850        let op = make_custom_insert("users\"; DROP TABLE secrets; --", vec!["id", "name"], 1);
1851        let sql = op.to_sql();
1852        // Injection attempt should be contained within quotes
1853        assert!(sql.contains("\"users\"\"; DROP TABLE secrets; --\""));
1854        assert!(sql.starts_with("INSERT INTO \""));
1855    }
1856
1857    #[test]
1858    fn test_pending_op_update_sql_injection_neutralized() {
1859        let op = make_custom_update("data", vec!["id"], vec!["col\"; DROP TABLE data; --"], 1);
1860        let sql = op.to_sql();
1861        // The malicious column name is safely quoted
1862        assert!(sql.contains("\"col\"\"; DROP TABLE data; --\""));
1863    }
1864
1865    // ------ Edge Cases ------
1866
1867    #[test]
1868    fn test_pending_op_insert_many_columns() {
1869        let op = PendingOp::Insert {
1870            key: ObjectKey {
1871                type_id: TypeId::of::<()>(),
1872                pk_hash: 1,
1873            },
1874            table: "wide_table",
1875            columns: vec!["a", "b", "c", "d", "e"],
1876            values: vec![
1877                Value::Int(1),
1878                Value::Int(2),
1879                Value::Int(3),
1880                Value::Int(4),
1881                Value::Int(5),
1882            ],
1883        };
1884        let sql = op.to_sql();
1885        assert_eq!(
1886            sql,
1887            "INSERT INTO \"wide_table\" (\"a\", \"b\", \"c\", \"d\", \"e\") VALUES ($1, $2, $3, $4, $5)"
1888        );
1889    }
1890
1891    #[test]
1892    fn test_pending_op_update_many_set_columns() {
1893        let op = PendingOp::Update {
1894            key: ObjectKey {
1895                type_id: TypeId::of::<()>(),
1896                pk_hash: 1,
1897            },
1898            table: "items",
1899            pk_columns: vec!["id"],
1900            pk_values: vec![Value::BigInt(1)],
1901            set_columns: vec!["a", "b", "c"],
1902            set_values: vec![Value::Int(1), Value::Int(2), Value::Int(3)],
1903        };
1904        let sql = op.to_sql();
1905        assert_eq!(
1906            sql,
1907            "UPDATE \"items\" SET \"a\" = $1, \"b\" = $2, \"c\" = $3 WHERE \"id\" = $4"
1908        );
1909    }
1910
1911    #[test]
1912    fn test_link_table_op_empty_strings() {
1913        // Edge case: empty identifiers (unusual but should still be quoted)
1914        let op = LinkTableOp::link(
1915            String::new(),
1916            String::new(),
1917            Value::BigInt(1),
1918            String::new(),
1919            Value::BigInt(2),
1920        );
1921        let sql = op.to_sql();
1922        assert_eq!(sql, "INSERT INTO \"\" (\"\", \"\") VALUES ($1, $2)");
1923    }
1924
1925    // ------ Invalid Operation Edge Cases ------
1926
1927    #[test]
1928    fn test_pending_op_delete_empty_pk_columns() {
1929        let op = PendingOp::Delete {
1930            key: ObjectKey {
1931                type_id: TypeId::of::<()>(),
1932                pk_hash: 1,
1933            },
1934            table: "orphan_table",
1935            pk_columns: vec![], // No PK columns - invalid!
1936            pk_values: vec![],
1937        };
1938        let sql = op.to_sql();
1939        // Should return error indicator, not invalid SQL
1940        assert!(sql.starts_with("-- ERROR:"));
1941        assert!(sql.contains("DELETE"));
1942        assert!(sql.contains("no pk_columns"));
1943    }
1944
1945    #[test]
1946    fn test_pending_op_update_empty_pk_columns() {
1947        let op = PendingOp::Update {
1948            key: ObjectKey {
1949                type_id: TypeId::of::<()>(),
1950                pk_hash: 1,
1951            },
1952            table: "orphan_table",
1953            pk_columns: vec![], // No PK columns - invalid!
1954            pk_values: vec![],
1955            set_columns: vec!["name"],
1956            set_values: vec![Value::Text("test".to_string())],
1957        };
1958        let sql = op.to_sql();
1959        // Should return error indicator, not invalid SQL
1960        assert!(sql.starts_with("-- ERROR:"));
1961        assert!(sql.contains("UPDATE"));
1962        assert!(sql.contains("no pk_columns"));
1963    }
1964
1965    #[test]
1966    fn test_pending_op_update_empty_set_columns() {
1967        let op = PendingOp::Update {
1968            key: ObjectKey {
1969                type_id: TypeId::of::<()>(),
1970                pk_hash: 1,
1971            },
1972            table: "nothing_to_update",
1973            pk_columns: vec!["id"],
1974            pk_values: vec![Value::BigInt(1)],
1975            set_columns: vec![], // No columns to set - invalid!
1976            set_values: vec![],
1977        };
1978        let sql = op.to_sql();
1979        // Should return error indicator, not invalid SQL
1980        assert!(sql.starts_with("-- ERROR:"));
1981        assert!(sql.contains("UPDATE"));
1982        assert!(sql.contains("no set_columns"));
1983    }
1984
1985    #[test]
1986    fn test_pending_op_insert_empty_columns() {
1987        let op = PendingOp::Insert {
1988            key: ObjectKey {
1989                type_id: TypeId::of::<()>(),
1990                pk_hash: 1,
1991            },
1992            table: "empty_insert",
1993            columns: vec![], // No columns - invalid!
1994            values: vec![],
1995        };
1996        let sql = op.to_sql();
1997        // Should return error indicator, not invalid SQL
1998        assert!(sql.starts_with("-- ERROR:"));
1999        assert!(sql.contains("INSERT"));
2000        assert!(sql.contains("no columns"));
2001    }
2002
2003    #[test]
2004    fn test_build_insert_batch_sql_mysql_dialect() {
2005        let ops = [make_insert("teams", 1), make_insert("teams", 2)];
2006        let refs: Vec<&PendingOp> = ops.iter().collect();
2007        let (sql, params) = FlushPlan::build_insert_batch_sql(sqlmodel_core::Dialect::Mysql, &refs)
2008            .expect("build insert batch sql");
2009
2010        assert_eq!(
2011            sql,
2012            "INSERT INTO `teams` (`id`, `name`) VALUES (?, ?), (?, ?)"
2013        );
2014        assert_eq!(params.len(), 4);
2015    }
2016
2017    #[test]
2018    fn test_build_delete_batch_sql_sqlite_dialect() {
2019        let ops = [make_delete("heroes", 1), make_delete("heroes", 2)];
2020        let refs: Vec<&PendingOp> = ops.iter().collect();
2021        let built = FlushPlan::build_delete_batch_sql(sqlmodel_core::Dialect::Sqlite, &refs)
2022            .expect("build delete batch sql")
2023            .expect("non-empty delete sql");
2024
2025        assert_eq!(built.0, "DELETE FROM \"heroes\" WHERE \"id\" IN (?1, ?2)");
2026        assert_eq!(built.1.len(), 2);
2027        assert_eq!(built.2, 2);
2028    }
2029
2030    #[test]
2031    fn test_build_update_sql_mysql_dialect() {
2032        let op = make_update("teams", 42);
2033        let (sql, params) = FlushPlan::build_update_sql(sqlmodel_core::Dialect::Mysql, &op)
2034            .expect("build update sql")
2035            .expect("non-empty update sql");
2036
2037        assert_eq!(sql, "UPDATE `teams` SET `name` = ? WHERE `id` = ?");
2038        assert_eq!(params.len(), 2);
2039    }
2040
2041    #[test]
2042    fn test_build_update_sql_rejects_set_mismatch() {
2043        let op = PendingOp::Update {
2044            key: ObjectKey {
2045                type_id: TypeId::of::<()>(),
2046                pk_hash: 1,
2047            },
2048            table: "teams",
2049            pk_columns: vec!["id"],
2050            pk_values: vec![Value::BigInt(1)],
2051            set_columns: vec!["name", "active"],
2052            set_values: vec![Value::Text("A".to_string())],
2053        };
2054
2055        let err = FlushPlan::build_update_sql(sqlmodel_core::Dialect::Postgres, &op)
2056            .expect_err("expected set mismatch error");
2057        assert!(
2058            err.to_string()
2059                .contains("update set column/value length mismatch")
2060        );
2061    }
2062}