Skip to main content

prax_query/
nested.rs

1#![allow(dead_code)]
2
3//! Nested write operations for managing relations in a single mutation.
4//!
5//! This module provides support for creating, connecting, disconnecting, and updating
6//! related records within a single create or update operation.
7//!
8//! # Example
9//!
10//! ```rust,ignore
11//! use prax_query::nested::*;
12//!
13//! // Create a user with nested posts
14//! let user = client
15//!     .user()
16//!     .create(user::create::Data {
17//!         email: "user@example.com".into(),
18//!         name: Some("John Doe".into()),
19//!         posts: Some(NestedWrite::create_many(vec![
20//!             post::create::Data { title: "First Post".into(), content: None },
21//!             post::create::Data { title: "Second Post".into(), content: None },
22//!         ])),
23//!     })
24//!     .exec()
25//!     .await?;
26//!
27//! // Connect existing posts to a user
28//! let user = client
29//!     .user()
30//!     .update(user::id::equals(1))
31//!     .data(user::update::Data {
32//!         posts: Some(NestedWrite::connect(vec![
33//!             post::id::equals(10),
34//!             post::id::equals(20),
35//!         ])),
36//!         ..Default::default()
37//!     })
38//!     .exec()
39//!     .await?;
40//!
41//! // Disconnect posts from a user
42//! let user = client
43//!     .user()
44//!     .update(user::id::equals(1))
45//!     .data(user::update::Data {
46//!         posts: Some(NestedWrite::disconnect(vec![
47//!             post::id::equals(10),
48//!         ])),
49//!         ..Default::default()
50//!     })
51//!     .exec()
52//!     .await?;
53//! ```
54
55use std::fmt::Debug;
56use std::marker::PhantomData;
57
58use crate::error::QueryResult;
59use crate::filter::{Filter, FilterValue};
60use crate::sql::quote_identifier;
61use crate::traits::{Model, QueryEngine};
62
63/// Represents a nested write operation for relations.
64#[derive(Debug, Clone)]
65pub enum NestedWrite<T: Model> {
66    /// Create new related records.
67    Create(Vec<NestedCreateData<T>>),
68    /// Create new records or connect existing ones.
69    CreateOrConnect(Vec<NestedCreateOrConnectData<T>>),
70    /// Connect existing records by their unique identifier.
71    Connect(Vec<Filter>),
72    /// Disconnect records from the relation.
73    Disconnect(Vec<Filter>),
74    /// Set the relation to exactly these records (disconnect all others).
75    Set(Vec<Filter>),
76    /// Delete related records.
77    Delete(Vec<Filter>),
78    /// Update related records.
79    Update(Vec<NestedUpdateData<T>>),
80    /// Update or create related records.
81    Upsert(Vec<NestedUpsertData<T>>),
82    /// Update many related records matching a filter.
83    UpdateMany(NestedUpdateManyData<T>),
84    /// Delete many related records matching a filter.
85    DeleteMany(Filter),
86}
87
88impl<T: Model> NestedWrite<T> {
89    /// Create a new related record.
90    pub fn create(data: NestedCreateData<T>) -> Self {
91        Self::Create(vec![data])
92    }
93
94    /// Create multiple new related records.
95    pub fn create_many(data: Vec<NestedCreateData<T>>) -> Self {
96        Self::Create(data)
97    }
98
99    /// Connect an existing record by filter.
100    pub fn connect_one(filter: impl Into<Filter>) -> Self {
101        Self::Connect(vec![filter.into()])
102    }
103
104    /// Connect multiple existing records by filters.
105    pub fn connect(filters: Vec<impl Into<Filter>>) -> Self {
106        Self::Connect(filters.into_iter().map(Into::into).collect())
107    }
108
109    /// Disconnect a record by filter.
110    pub fn disconnect_one(filter: impl Into<Filter>) -> Self {
111        Self::Disconnect(vec![filter.into()])
112    }
113
114    /// Disconnect multiple records by filters.
115    pub fn disconnect(filters: Vec<impl Into<Filter>>) -> Self {
116        Self::Disconnect(filters.into_iter().map(Into::into).collect())
117    }
118
119    /// Set the relation to exactly these records.
120    pub fn set(filters: Vec<impl Into<Filter>>) -> Self {
121        Self::Set(filters.into_iter().map(Into::into).collect())
122    }
123
124    /// Delete related records.
125    pub fn delete(filters: Vec<impl Into<Filter>>) -> Self {
126        Self::Delete(filters.into_iter().map(Into::into).collect())
127    }
128
129    /// Delete many related records matching a filter.
130    pub fn delete_many(filter: impl Into<Filter>) -> Self {
131        Self::DeleteMany(filter.into())
132    }
133}
134
135/// Data for creating a nested record.
136#[derive(Debug, Clone)]
137pub struct NestedCreateData<T: Model> {
138    /// The create data fields.
139    pub data: Vec<(String, FilterValue)>,
140    /// Marker for the model type.
141    _model: PhantomData<T>,
142}
143
144impl<T: Model> NestedCreateData<T> {
145    /// Create new nested create data.
146    pub fn new(data: Vec<(String, FilterValue)>) -> Self {
147        Self {
148            data,
149            _model: PhantomData,
150        }
151    }
152
153    /// Create from field-value pairs.
154    pub fn from_pairs(
155        pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<FilterValue>)>,
156    ) -> Self {
157        Self::new(
158            pairs
159                .into_iter()
160                .map(|(k, v)| (k.into(), v.into()))
161                .collect(),
162        )
163    }
164}
165
166impl<T: Model> Default for NestedCreateData<T> {
167    fn default() -> Self {
168        Self::new(Vec::new())
169    }
170}
171
172/// Data for creating or connecting a nested record.
173#[derive(Debug, Clone)]
174pub struct NestedCreateOrConnectData<T: Model> {
175    /// Filter to find existing record.
176    pub filter: Filter,
177    /// Data to create if not found.
178    pub create: NestedCreateData<T>,
179}
180
181impl<T: Model> NestedCreateOrConnectData<T> {
182    /// Create new create-or-connect data.
183    pub fn new(filter: impl Into<Filter>, create: NestedCreateData<T>) -> Self {
184        Self {
185            filter: filter.into(),
186            create,
187        }
188    }
189}
190
191/// Data for updating a nested record.
192#[derive(Debug, Clone)]
193pub struct NestedUpdateData<T: Model> {
194    /// Filter to find the record to update.
195    pub filter: Filter,
196    /// The update data fields.
197    pub data: Vec<(String, FilterValue)>,
198    /// Marker for the model type.
199    _model: PhantomData<T>,
200}
201
202impl<T: Model> NestedUpdateData<T> {
203    /// Create new nested update data.
204    pub fn new(filter: impl Into<Filter>, data: Vec<(String, FilterValue)>) -> Self {
205        Self {
206            filter: filter.into(),
207            data,
208            _model: PhantomData,
209        }
210    }
211
212    /// Create from filter and field-value pairs.
213    pub fn from_pairs(
214        filter: impl Into<Filter>,
215        pairs: impl IntoIterator<Item = (impl Into<String>, impl Into<FilterValue>)>,
216    ) -> Self {
217        Self::new(
218            filter,
219            pairs
220                .into_iter()
221                .map(|(k, v)| (k.into(), v.into()))
222                .collect(),
223        )
224    }
225}
226
227/// Data for upserting a nested record.
228#[derive(Debug, Clone)]
229pub struct NestedUpsertData<T: Model> {
230    /// Filter to find existing record.
231    pub filter: Filter,
232    /// Data to create if not found.
233    pub create: NestedCreateData<T>,
234    /// Data to update if found.
235    pub update: Vec<(String, FilterValue)>,
236    /// Marker for the model type.
237    _model: PhantomData<T>,
238}
239
240impl<T: Model> NestedUpsertData<T> {
241    /// Create new nested upsert data.
242    pub fn new(
243        filter: impl Into<Filter>,
244        create: NestedCreateData<T>,
245        update: Vec<(String, FilterValue)>,
246    ) -> Self {
247        Self {
248            filter: filter.into(),
249            create,
250            update,
251            _model: PhantomData,
252        }
253    }
254}
255
256/// Data for updating many nested records.
257#[derive(Debug, Clone)]
258pub struct NestedUpdateManyData<T: Model> {
259    /// Filter to match records.
260    pub filter: Filter,
261    /// The update data fields.
262    pub data: Vec<(String, FilterValue)>,
263    /// Marker for the model type.
264    _model: PhantomData<T>,
265}
266
267impl<T: Model> NestedUpdateManyData<T> {
268    /// Create new nested update-many data.
269    pub fn new(filter: impl Into<Filter>, data: Vec<(String, FilterValue)>) -> Self {
270        Self {
271            filter: filter.into(),
272            data,
273            _model: PhantomData,
274        }
275    }
276}
277
278/// Builder for nested write SQL operations.
279///
280/// The SQL emitters here currently bake in [`crate::dialect::Postgres`] —
281/// nested writes are not yet wired into a live client, and the placeholder
282/// syntax (`$N`) is Postgres-shaped. When this builder lands on the live
283/// client path the dialect should thread through from the engine.
284#[derive(Debug)]
285pub struct NestedWriteBuilder {
286    /// The parent table name.
287    parent_table: String,
288    /// The parent primary key column(s).
289    parent_pk: Vec<String>,
290    /// The related table name.
291    related_table: String,
292    /// The foreign key column on the related table.
293    foreign_key: String,
294    /// Whether this is a one-to-many (true) or many-to-many (false) relation.
295    is_one_to_many: bool,
296    /// Join table for many-to-many relations.
297    join_table: Option<JoinTableInfo>,
298}
299
/// Describes the join table backing a many-to-many relation.
#[derive(Debug, Clone)]
pub struct JoinTableInfo {
    /// Name of the join table.
    pub table_name: String,
    /// Join-table column that references the parent table.
    pub parent_column: String,
    /// Join-table column that references the related table.
    pub related_column: String,
}
310
311impl NestedWriteBuilder {
312    /// Create a builder for a one-to-many relation.
313    pub fn one_to_many(
314        parent_table: impl Into<String>,
315        parent_pk: Vec<String>,
316        related_table: impl Into<String>,
317        foreign_key: impl Into<String>,
318    ) -> Self {
319        Self {
320            parent_table: parent_table.into(),
321            parent_pk,
322            related_table: related_table.into(),
323            foreign_key: foreign_key.into(),
324            is_one_to_many: true,
325            join_table: None,
326        }
327    }
328
329    /// Create a builder for a many-to-many relation.
330    pub fn many_to_many(
331        parent_table: impl Into<String>,
332        parent_pk: Vec<String>,
333        related_table: impl Into<String>,
334        join_table: JoinTableInfo,
335    ) -> Self {
336        Self {
337            parent_table: parent_table.into(),
338            parent_pk,
339            related_table: related_table.into(),
340            foreign_key: String::new(), // Not used for many-to-many
341            is_one_to_many: false,
342            join_table: Some(join_table),
343        }
344    }
345
346    /// Build SQL for connecting records.
347    pub fn build_connect_sql<T: Model>(
348        &self,
349        parent_id: &FilterValue,
350        filters: &[Filter],
351    ) -> Vec<(String, Vec<FilterValue>)> {
352        let mut statements = Vec::new();
353
354        if self.is_one_to_many {
355            // For one-to-many, update the foreign key on related records
356            for filter in filters {
357                let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
358                let sql = format!(
359                    "UPDATE {} SET {} = ${} WHERE {}",
360                    quote_identifier(&self.related_table),
361                    quote_identifier(&self.foreign_key),
362                    params.len() + 1,
363                    where_sql
364                );
365                params.push(parent_id.clone());
366                statements.push((sql, params));
367            }
368        } else if let Some(join) = &self.join_table {
369            // For many-to-many, insert into join table
370            // First, we need to get the IDs of the related records
371            for filter in filters {
372                let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
373
374                // Get the related record ID (assuming single-column PK for now)
375                let select_sql = format!(
376                    "SELECT {} FROM {} WHERE {}",
377                    quote_identifier(T::PRIMARY_KEY.first().unwrap_or(&"id")),
378                    quote_identifier(&self.related_table),
379                    where_sql
380                );
381
382                // Insert into join table
383                let insert_sql = format!(
384                    "INSERT INTO {} ({}, {}) SELECT ${}, {} FROM {} WHERE {} ON CONFLICT DO NOTHING",
385                    quote_identifier(&join.table_name),
386                    quote_identifier(&join.parent_column),
387                    quote_identifier(&join.related_column),
388                    params.len() + 1,
389                    quote_identifier(T::PRIMARY_KEY.first().unwrap_or(&"id")),
390                    quote_identifier(&self.related_table),
391                    where_sql
392                );
393                params.push(parent_id.clone());
394                statements.push((insert_sql, params));
395                // Keep select_sql for potential subquery use
396                let _ = select_sql;
397            }
398        }
399
400        statements
401    }
402
403    /// Build SQL for disconnecting records.
404    pub fn build_disconnect_sql(
405        &self,
406        parent_id: &FilterValue,
407        filters: &[Filter],
408    ) -> Vec<(String, Vec<FilterValue>)> {
409        let mut statements = Vec::new();
410
411        if self.is_one_to_many {
412            // For one-to-many, set the foreign key to NULL
413            for filter in filters {
414                let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
415                let sql = format!(
416                    "UPDATE {} SET {} = NULL WHERE {} AND {} = ${}",
417                    quote_identifier(&self.related_table),
418                    quote_identifier(&self.foreign_key),
419                    where_sql,
420                    quote_identifier(&self.foreign_key),
421                    params.len() + 1
422                );
423                params.push(parent_id.clone());
424                statements.push((sql, params));
425            }
426        } else if let Some(join) = &self.join_table {
427            // For many-to-many, delete from join table
428            for filter in filters {
429                let (where_sql, mut params) = filter.to_sql(2, &crate::dialect::Postgres);
430                let sql = format!(
431                    "DELETE FROM {} WHERE {} = $1 AND {} IN (SELECT id FROM {} WHERE {})",
432                    quote_identifier(&join.table_name),
433                    quote_identifier(&join.parent_column),
434                    quote_identifier(&join.related_column),
435                    quote_identifier(&self.related_table),
436                    where_sql
437                );
438                let mut final_params = vec![parent_id.clone()];
439                final_params.extend(params);
440                params = final_params;
441                statements.push((sql, params));
442            }
443        }
444
445        statements
446    }
447
448    /// Build SQL for setting the relation (disconnect all, then connect specified).
449    pub fn build_set_sql<T: Model>(
450        &self,
451        parent_id: &FilterValue,
452        filters: &[Filter],
453    ) -> Vec<(String, Vec<FilterValue>)> {
454        let mut statements = Vec::new();
455
456        // First, disconnect all existing relations
457        if self.is_one_to_many {
458            let sql = format!(
459                "UPDATE {} SET {} = NULL WHERE {} = $1",
460                quote_identifier(&self.related_table),
461                quote_identifier(&self.foreign_key),
462                quote_identifier(&self.foreign_key)
463            );
464            statements.push((sql, vec![parent_id.clone()]));
465        } else if let Some(join) = &self.join_table {
466            let sql = format!(
467                "DELETE FROM {} WHERE {} = $1",
468                quote_identifier(&join.table_name),
469                quote_identifier(&join.parent_column)
470            );
471            statements.push((sql, vec![parent_id.clone()]));
472        }
473
474        // Then connect the specified records
475        statements.extend(self.build_connect_sql::<T>(parent_id, filters));
476
477        statements
478    }
479
480    /// Build SQL for creating nested records.
481    pub fn build_create_sql<T: Model>(
482        &self,
483        parent_id: &FilterValue,
484        creates: &[NestedCreateData<T>],
485    ) -> Vec<(String, Vec<FilterValue>)> {
486        let mut statements = Vec::with_capacity(creates.len());
487        let quoted_table = quote_identifier(&self.related_table);
488
489        for create in creates {
490            let row_len = create.data.len() + 1;
491            let mut columns: Vec<String> = Vec::with_capacity(row_len);
492            let mut values: Vec<FilterValue> = Vec::with_capacity(row_len);
493            for (k, v) in &create.data {
494                columns.push(k.clone());
495                values.push(v.clone());
496            }
497
498            columns.push(self.foreign_key.clone());
499            values.push(parent_id.clone());
500
501            let mut col_list = String::new();
502            let mut placeholders = String::new();
503            for (i, c) in columns.iter().enumerate() {
504                if i > 0 {
505                    col_list.push_str(", ");
506                    placeholders.push_str(", ");
507                }
508                col_list.push_str(&quote_identifier(c));
509                use std::fmt::Write;
510                let _ = write!(placeholders, "${}", i + 1);
511            }
512
513            let sql = format!(
514                "INSERT INTO {} ({}) VALUES ({}) RETURNING *",
515                quoted_table, col_list, placeholders,
516            );
517
518            statements.push((sql, values));
519        }
520
521        statements
522    }
523
524    /// Build SQL for deleting nested records.
525    pub fn build_delete_sql(
526        &self,
527        parent_id: &FilterValue,
528        filters: &[Filter],
529    ) -> Vec<(String, Vec<FilterValue>)> {
530        let mut statements = Vec::new();
531
532        for filter in filters {
533            let (where_sql, mut params) = filter.to_sql(1, &crate::dialect::Postgres);
534            let sql = format!(
535                "DELETE FROM {} WHERE {} AND {} = ${}",
536                quote_identifier(&self.related_table),
537                where_sql,
538                quote_identifier(&self.foreign_key),
539                params.len() + 1
540            );
541            params.push(parent_id.clone());
542            statements.push((sql, params));
543        }
544
545        statements
546    }
547}
548
549/// A container for collecting all nested write operations to execute.
550#[derive(Debug, Default)]
551pub struct NestedWriteOperations {
552    /// SQL statements to execute before the main operation.
553    pub pre_statements: Vec<(String, Vec<FilterValue>)>,
554    /// SQL statements to execute after the main operation.
555    pub post_statements: Vec<(String, Vec<FilterValue>)>,
556}
557
558impl NestedWriteOperations {
559    /// Create a new empty container.
560    pub fn new() -> Self {
561        Self::default()
562    }
563
564    /// Add a pre-operation statement.
565    pub fn add_pre(&mut self, sql: String, params: Vec<FilterValue>) {
566        self.pre_statements.push((sql, params));
567    }
568
569    /// Add a post-operation statement.
570    pub fn add_post(&mut self, sql: String, params: Vec<FilterValue>) {
571        self.post_statements.push((sql, params));
572    }
573
574    /// Extend with statements from another container.
575    pub fn extend(&mut self, other: Self) {
576        self.pre_statements.extend(other.pre_statements);
577        self.post_statements.extend(other.post_statements);
578    }
579
580    /// Check if there are any operations.
581    pub fn is_empty(&self) -> bool {
582        self.pre_statements.is_empty() && self.post_statements.is_empty()
583    }
584
585    /// Get total number of statements.
586    pub fn len(&self) -> usize {
587        self.pre_statements.len() + self.post_statements.len()
588    }
589}
590
591/// Model-erased nested write op used by `CreateOperation::with(...)`.
592///
593/// The type-parameterized [`NestedWrite`] above is keyed on the parent
594/// model and doesn't compose across heterogeneous child types — a
595/// `CreateOperation<E, User>.with(posts_write)` needs to carry child
596/// writes for a different model (`Post`) than the parent, so `User`'s
597/// `NestedWrite<User>` can't encode them. This sibling enum drops the
598/// model type parameter and carries only the runtime metadata the
599/// execution path actually needs: the target table, the foreign-key
600/// column on that table, and the raw child-column payload.
601///
602/// Emitted by the codegen's per-relation `create()` / `connect()`
603/// helpers on `user::posts::*`. Payloads are a nested
604/// `Vec<Vec<(String, FilterValue)>>` rather than a strongly-typed
605/// `CreateInput` because the derive path doesn't currently emit a
606/// `CreateInput` struct per model — see the task docs for the trade-off
607/// and the upgrade path.
608#[derive(Debug, Clone)]
609pub enum NestedWriteOp {
610    /// Create children whose FK column points at the parent's PK.
611    ///
612    /// `relation` is retained for diagnostics/debugging; the executor
613    /// only needs `target_table`, `foreign_key`, and `payload`.
614    Create {
615        relation: &'static str,
616        target_table: &'static str,
617        foreign_key: &'static str,
618        /// One `Vec<(column, value)>` per child row. The FK column +
619        /// parent PK are appended by [`NestedWriteOp::execute`].
620        payload: Vec<Vec<(String, FilterValue)>>,
621    },
622    /// Connect an existing child row by its primary-key value.
623    ///
624    /// Lowers to
625    /// `UPDATE <target_table> SET <foreign_key> = <parent_pk> WHERE <target_pk> = <pk>`
626    /// at execute time. The identifier fields are `&'static str` because
627    /// they come from codegen-emitted constants on the per-relation
628    /// `RelationMeta` / `Model` types — the type itself enforces the
629    /// SQL-safety boundary (see `.cursor/rules/sql-safety.mdc`). Only
630    /// `parent_pk` and `pk` flow as `$N`-bound parameters.
631    Connect {
632        relation: &'static str,
633        target_table: &'static str,
634        foreign_key: &'static str,
635        target_pk: &'static str,
636        pk: FilterValue,
637    },
638    /// Disconnect a child row by clearing its FK column to `NULL`.
639    ///
640    /// Lowers to `UPDATE <target_table> SET <foreign_key> = NULL WHERE <target_pk> = <pk>`.
641    /// The child row persists; only the FK is cleared. Use
642    /// [`NestedWriteOp::Delete`] to remove the row entirely.
643    Disconnect {
644        relation: &'static str,
645        target_table: &'static str,
646        foreign_key: &'static str,
647        target_pk: &'static str,
648        pk: FilterValue,
649    },
650    /// Delete a child row by its primary key.
651    ///
652    /// Lowers to `DELETE FROM <target_table> WHERE <target_pk> = <pk>`.
653    /// Returns `QueryError::not_found` when the PK doesn't match any row,
654    /// matching the Connect-batch affected-rows contract.
655    Delete {
656        relation: &'static str,
657        target_table: &'static str,
658        target_pk: &'static str,
659        pk: FilterValue,
660    },
661    /// Delete many child rows matching a scalar filter, scoped to the
662    /// parent's children only.
663    ///
664    /// Lowers to `DELETE FROM <target_table> WHERE <foreign_key> = <parent_pk> AND <filter>`.
665    /// The AND-with-parent-FK clause is a safety bound enforced at SQL
666    /// emit time — the user-supplied filter cannot remove rows belonging
667    /// to other parents.
668    DeleteMany {
669        relation: &'static str,
670        target_table: &'static str,
671        foreign_key: &'static str,
672        filter: Filter,
673    },
674    /// Update a child row by its primary key.
675    ///
676    /// Lowers to
677    /// `UPDATE <target_table> SET <writeop-fragments> WHERE <target_pk> = $1`.
678    /// Each entry in `payload` contributes one column assignment whose
679    /// shape is determined by the [`crate::inputs::WriteOp`] variant
680    /// (plain set, atomic increment/decrement/multiply/divide, or
681    /// null-out via Unset). Returns `QueryError::not_found` when the PK
682    /// doesn't match any row, mirroring [`NestedWriteOp::Delete`]'s
683    /// affected-rows contract.
684    Update {
685        relation: &'static str,
686        target_table: &'static str,
687        target_pk: &'static str,
688        pk: FilterValue,
689        payload: Vec<(String, crate::inputs::WriteOp)>,
690    },
691    /// Update many child rows matching a filter, scoped to the parent's
692    /// children only.
693    ///
694    /// Lowers to
695    /// `UPDATE <target_table> SET <writeop-fragments> WHERE <foreign_key> = $1 AND <filter>`.
696    /// The AND-with-parent-FK clause is a safety bound enforced at SQL
697    /// emit time — the user-supplied filter cannot reach rows belonging
698    /// to other parents.
699    UpdateMany {
700        relation: &'static str,
701        target_table: &'static str,
702        foreign_key: &'static str,
703        filter: Filter,
704        payload: Vec<(String, crate::inputs::WriteOp)>,
705    },
706    /// Upsert: update if a row matches `pk`, else insert.
707    ///
708    /// On dialects with native single-statement upsert (Postgres, SQLite,
709    /// DuckDB, MySQL), emits one
710    /// `INSERT INTO <target_table> (<create_cols + fk>) VALUES (...)
711    /// ON CONFLICT (<target_pk>) DO UPDATE SET <update_writeops>`
712    /// (or `ON DUPLICATE KEY UPDATE ...` on MySQL). The `pk` field is
713    /// unused on this path — conflict detection comes from the inserted
714    /// row's PK column, which the codegen guarantees is present in
715    /// `create_payload`.
716    ///
717    /// On MSSQL and CQL, falls back to the two-statement form:
718    /// 1. `UPDATE <target_table> SET <update_writeops> WHERE <target_pk> = $1`
719    /// 2. If `affected_rows == 0`, `INSERT INTO <target_table>
720    ///    (<create_cols + fk>) VALUES (<...>)`.
721    ///
722    /// **Limitations of the fallback path** (MSSQL/CQL only):
723    /// - The `affected_rows == 0` check cannot distinguish "row absent"
724    ///   from "row exists but no columns changed" — a UPDATE that hits an
725    ///   identical row produces a spurious INSERT attempt and likely a PK
726    ///   unique-violation. Wrap the fallback in a transaction (or use a
727    ///   single-statement dialect) for strongest semantics.
728    /// - There is a TOCTOU race between the UPDATE returning 0 and the
729    ///   subsequent INSERT; a concurrent writer can insert the same row
730    ///   first.
731    ///
732    /// Document-store engines (`NotSql` dialect) are rejected at the top
733    /// of `execute` with `QueryError::unsupported(...)`.
734    ///
735    /// **Empty payloads**: an empty `update_payload` on a single-statement
736    /// dialect lowers to `ON CONFLICT (...) DO NOTHING` (PG/SQLite/DuckDB)
737    /// or `INSERT IGNORE INTO` (MySQL — idempotent no-op).
738    /// An empty `create_payload` errors with `QueryError::invalid_input`.
739    Upsert {
740        relation: &'static str,
741        target_table: &'static str,
742        foreign_key: &'static str,
743        target_pk: &'static str,
744        pk: FilterValue,
745        create_payload: Vec<(String, FilterValue)>,
746        update_payload: Vec<(String, crate::inputs::WriteOp)>,
747    },
748    /// Connect an existing child row if a `where` filter matches, else
749    /// insert a new one with the parent's FK spliced in.
750    ///
751    /// Two-statement engine-agnostic lowering:
752    /// 1. `UPDATE <target_table> SET <foreign_key> = $1 WHERE <filter>`
753    ///    (the connect path — points any matching row at the parent).
754    /// 2. If `affected_rows == 0`, emit
755    ///    `INSERT INTO <target_table> (<create_cols + foreign_key>) VALUES (<...>)`
756    ///    (the create path).
757    ///
758    /// If the filter matches multiple rows, every match has its FK pointed
759    /// at the parent — `connect_or_create` is typically used with a unique
760    /// where, but this is not enforced at runtime.
761    ///
762    /// As a safety measure, an empty (`Filter::None`) `where_filter` is
763    /// rejected at execute time — without this guard, the UPDATE would
764    /// lower to `... WHERE TRUE`, rewriting every row in the table.
765    ConnectOrCreate {
766        relation: &'static str,
767        target_table: &'static str,
768        foreign_key: &'static str,
769        where_filter: Filter,
770        create_payload: Vec<(String, FilterValue)>,
771    },
772    /// Replace the relation contents — after execution, exactly the
773    /// listed child rows are connected to the parent. Rows currently
774    /// connected that aren't in `set_pks` get their FK cleared; rows in
775    /// `set_pks` that aren't currently connected (or are connected to a
776    /// different parent) get their FK pointed at this parent.
777    ///
778    /// Two-statement engine-agnostic lowering:
779    /// 1. `UPDATE <target_table> SET <foreign_key> = NULL WHERE <foreign_key> = $parent AND <target_pk> NOT IN (set_pks)`
780    /// 2. `UPDATE <target_table> SET <foreign_key> = $parent WHERE <target_pk> IN (set_pks)`
781    ///
782    /// When `set_pks` is empty, step 1's `NOT IN ()` is invalid SQL —
783    /// the executor special-cases this to `UPDATE <child> SET <fk> = NULL
784    /// WHERE <fk> = $parent` (no NOT IN clause), then skips step 2.
785    ///
786    /// `set:` claims rows for this parent regardless of who they belonged
787    /// to before — pre-existing FK values get overwritten. This matches
788    /// Prisma's relation-replacement semantics.
789    Set {
790        relation: &'static str,
791        target_table: &'static str,
792        foreign_key: &'static str,
793        target_pk: &'static str,
794        set_pks: Vec<FilterValue>,
795    },
796}
797
798/// Shared SET-clause builder for the `Upsert` single-statement and
799/// two-statement fallback paths. `start_ph` lets the caller position
800/// placeholders after any preceding INSERT values; PG-family dialects
801/// use this index numerically, MySQL ignores it (`?` placeholders are
802/// positional via param push order).
803///
804/// `Unset` ops emit `col = NULL` and consume no placeholder slot.
805fn build_writeop_set_clause(
806    payload: &[(String, crate::inputs::WriteOp)],
807    dialect: &dyn crate::dialect::SqlDialect,
808    start_ph: usize,
809) -> (String, Vec<FilterValue>) {
810    let mut fragments: Vec<String> = Vec::with_capacity(payload.len());
811    let mut params: Vec<FilterValue> = Vec::with_capacity(payload.len());
812    let mut next_ph = start_ph;
813    for (col, op) in payload {
814        let (frag, maybe_val) =
815            op.to_set_fragment(&dialect.quote_ident(col), &dialect.placeholder(next_ph));
816        fragments.push(frag);
817        if let Some(val) = maybe_val {
818            params.push(val);
819            next_ph += 1;
820        }
821    }
822    (fragments.join(", "), params)
823}
824
825impl NestedWriteOp {
826    /// Execute this nested write inside `engine`, using `parent_pk`
827    /// as the foreign-key value to splice into each child row.
828    ///
829    /// For `Create`, this emits one `INSERT INTO <target_table> (...)`
830    /// per child, appending the FK column + parent PK to whatever
831    /// columns/values the caller supplied.
832    pub async fn execute<E>(self, engine: &E, parent_pk: &FilterValue) -> QueryResult<()>
833    where
834        E: QueryEngine,
835    {
836        match self {
837            NestedWriteOp::Connect {
838                relation: _,
839                target_table,
840                foreign_key,
841                target_pk,
842                pk,
843            } => {
844                let dialect = engine.dialect();
845                let sql = format!(
846                    "UPDATE {} SET {} = {} WHERE {} = {}",
847                    dialect.quote_ident(target_table),
848                    dialect.quote_ident(foreign_key),
849                    dialect.placeholder(1),
850                    dialect.quote_ident(target_pk),
851                    dialect.placeholder(2),
852                );
853                engine
854                    .execute_raw(&sql, vec![parent_pk.clone(), pk])
855                    .await?;
856                Ok(())
857            }
858            NestedWriteOp::Disconnect {
859                relation: _,
860                target_table,
861                foreign_key,
862                target_pk,
863                pk,
864            } => {
865                let dialect = engine.dialect();
866                let sql = format!(
867                    "UPDATE {} SET {} = NULL WHERE {} = {}",
868                    dialect.quote_ident(target_table),
869                    dialect.quote_ident(foreign_key),
870                    dialect.quote_ident(target_pk),
871                    dialect.placeholder(1),
872                );
873                engine.execute_raw(&sql, vec![pk]).await?;
874                Ok(())
875            }
876            NestedWriteOp::Delete {
877                relation: _,
878                target_table,
879                target_pk,
880                pk,
881            } => {
882                let dialect = engine.dialect();
883                let sql = format!(
884                    "DELETE FROM {} WHERE {} = {}",
885                    dialect.quote_ident(target_table),
886                    dialect.quote_ident(target_pk),
887                    dialect.placeholder(1),
888                );
889                let affected = engine.execute_raw(&sql, vec![pk]).await?;
890                if affected != 1 {
891                    return Err(crate::error::QueryError::not_found(target_table)
892                        .with_context("Nested Delete by PK"));
893                }
894                Ok(())
895            }
896            NestedWriteOp::DeleteMany {
897                relation: _,
898                target_table,
899                foreign_key,
900                filter,
901            } => {
902                let dialect = engine.dialect();
903                let is_unconstrained = matches!(filter, Filter::None);
904                let sql = if is_unconstrained {
905                    format!(
906                        "DELETE FROM {} WHERE {} = {}",
907                        dialect.quote_ident(target_table),
908                        dialect.quote_ident(foreign_key),
909                        dialect.placeholder(1),
910                    )
911                } else {
912                    let (filter_sql, params_tail) = filter.to_sql(2, &crate::dialect::Postgres);
913                    let sql = format!(
914                        "DELETE FROM {} WHERE {} = {} AND ({})",
915                        dialect.quote_ident(target_table),
916                        dialect.quote_ident(foreign_key),
917                        dialect.placeholder(1),
918                        filter_sql,
919                    );
920                    let mut params = Vec::with_capacity(params_tail.len() + 1);
921                    params.push(parent_pk.clone());
922                    params.extend(params_tail);
923                    return engine.execute_raw(&sql, params).await.map(|_| ());
924                };
925                engine.execute_raw(&sql, vec![parent_pk.clone()]).await?;
926                Ok(())
927            }
928            NestedWriteOp::Update {
929                relation: _,
930                target_table,
931                target_pk,
932                pk,
933                payload,
934            } => {
935                if payload.is_empty() {
936                    return Ok(());
937                }
938                let dialect = engine.dialect();
939                let (set_text, mut update_params) = build_writeop_set_clause(&payload, dialect, 1);
940                let next_placeholder = update_params.len() + 1;
941                update_params.push(pk);
942                let sql = format!(
943                    "UPDATE {} SET {} WHERE {} = {}",
944                    dialect.quote_ident(target_table),
945                    set_text,
946                    dialect.quote_ident(target_pk),
947                    dialect.placeholder(next_placeholder),
948                );
949                let affected = engine.execute_raw(&sql, update_params).await?;
950                if affected != 1 {
951                    return Err(crate::error::QueryError::not_found(target_table)
952                        .with_context("Nested Update by PK"));
953                }
954                Ok(())
955            }
956            NestedWriteOp::UpdateMany {
957                relation: _,
958                target_table,
959                foreign_key,
960                filter,
961                payload,
962            } => {
963                if payload.is_empty() {
964                    return Ok(());
965                }
966                let dialect = engine.dialect();
967                let (set_text, mut params) = build_writeop_set_clause(&payload, dialect, 1);
968                let fk_placeholder_idx = params.len() + 1;
969                let fk_placeholder = dialect.placeholder(fk_placeholder_idx);
970                let next_placeholder = fk_placeholder_idx + 1;
971                params.push(parent_pk.clone());
972
973                let is_unconstrained = matches!(filter, Filter::None);
974                let sql = if is_unconstrained {
975                    format!(
976                        "UPDATE {} SET {} WHERE {} = {}",
977                        dialect.quote_ident(target_table),
978                        set_text,
979                        dialect.quote_ident(foreign_key),
980                        fk_placeholder,
981                    )
982                } else {
983                    let (filter_sql, filter_params) =
984                        filter.to_sql(next_placeholder, &crate::dialect::Postgres);
985                    params.extend(filter_params);
986                    format!(
987                        "UPDATE {} SET {} WHERE {} = {} AND ({})",
988                        dialect.quote_ident(target_table),
989                        set_text,
990                        dialect.quote_ident(foreign_key),
991                        fk_placeholder,
992                        filter_sql,
993                    )
994                };
995                engine.execute_raw(&sql, params).await?;
996                Ok(())
997            }
998            NestedWriteOp::Upsert {
999                relation: _,
1000                target_table,
1001                foreign_key,
1002                target_pk,
1003                pk,
1004                create_payload,
1005                update_payload,
1006            } => {
1007                let dialect = engine.dialect();
1008
1009                // Guard: document-store engines (NotSql) panic on any SQL
1010                // emission helper. Short-circuit before touching the dialect.
1011                if !dialect.supports_upsert() {
1012                    return Err(crate::error::QueryError::unsupported(format!(
1013                        "Nested Upsert is not supported by the `{}` engine",
1014                        std::any::type_name::<dyn crate::dialect::SqlDialect>()
1015                    )));
1016                }
1017
1018                // Guard: an upsert with no payloads is a no-op.
1019                if update_payload.is_empty() && create_payload.is_empty() {
1020                    return Ok(());
1021                }
1022
1023                // Guard: create_payload must not be empty — we can't INSERT
1024                // a row with no columns.
1025                if create_payload.is_empty() {
1026                    return Err(crate::error::QueryError::invalid_input(
1027                        "create_payload",
1028                        "Nested Upsert requires at least one create column when no row to update",
1029                    ));
1030                }
1031
1032                // Probe the dialect for single-statement upsert support by
1033                // checking whether upsert_clause returns a non-empty string.
1034                //
1035                // For the normal path (non-empty update_payload): build the
1036                // SET fragment with placeholders positioned AFTER the INSERT's
1037                // column values (insert occupies $1..$N where N =
1038                // create_payload.len() + 1 for the FK).
1039                //
1040                // On PG-family dialects the `start_ph` arg positions SET-clause
1041                // placeholders after the INSERT VALUES placeholders ($N+1, $N+2, ...).
1042                // On MySQL the placeholder text is always `?` — the SET-vs-VALUES split
1043                // is enforced solely by the param-push order: INSERT values are pushed
1044                // first below, then SET params via `values.extend(probe_set_params)`.
1045                let insert_arity = create_payload.len() + 1; // cols + FK
1046                let (probe_set_text, probe_set_params) =
1047                    build_writeop_set_clause(&update_payload, dialect, insert_arity + 1);
1048
1049                // Builds columns + values + placeholders + quoted_cols from
1050                // create_payload + foreign_key + parent_pk. Used by both the
1051                // single-statement and two-statement INSERT phases below.
1052                let build_insert_columns_and_values = || {
1053                    let mut columns: Vec<String> =
1054                        create_payload.iter().map(|(c, _)| c.clone()).collect();
1055                    let mut values: Vec<FilterValue> =
1056                        create_payload.iter().map(|(_, v)| v.clone()).collect();
1057                    columns.push(foreign_key.to_string());
1058                    values.push(parent_pk.clone());
1059                    let placeholders: Vec<String> =
1060                        (1..=values.len()).map(|i| dialect.placeholder(i)).collect();
1061                    let quoted_cols: Vec<String> =
1062                        columns.iter().map(|c| dialect.quote_ident(c)).collect();
1063                    (columns, values, placeholders, quoted_cols)
1064                };
1065
1066                // Pass raw (unquoted) target_pk — upsert_clause implementations
1067                // are responsible for quoting identifiers internally.
1068                let conflict_cols = [target_pk];
1069                let upsert_clause_text = dialect.upsert_clause(&conflict_cols, &probe_set_text);
1070
1071                if !upsert_clause_text.is_empty() {
1072                    // Single-statement path:
1073                    //   INSERT INTO child (cols + fk) VALUES ($1..$N) ON CONFLICT (...) DO UPDATE SET ...
1074                    //   or (empty update_payload): INSERT … ON CONFLICT DO NOTHING / INSERT IGNORE
1075                    let (_, mut values, placeholders, quoted_cols) =
1076                        build_insert_columns_and_values();
1077
1078                    // Drop the `pk` value — the single-statement form never
1079                    // references it; the conflict target is derived from the
1080                    // inserted row's PK column (which the codegen guarantees
1081                    // is present when create_payload is populated by the macro).
1082                    let _ = pk;
1083
1084                    let effective_upsert_clause = if update_payload.is_empty() {
1085                        // Pure-insert semantics: collapse to conditional INSERT.
1086                        // Use upsert_do_nothing_clause for dialect-specific DO NOTHING syntax.
1087                        // MySQL returns the idempotent self-assign form; PG/SQLite/DuckDB
1088                        // return ON CONFLICT (...) DO NOTHING.
1089                        let do_nothing = dialect.upsert_do_nothing_clause(&conflict_cols);
1090                        if do_nothing.is_empty() {
1091                            // Fallback dialect (MSSQL/CQL) with empty upsert_do_nothing_clause
1092                            // but non-empty upsert_clause_text: very rare. Treat as no-op.
1093                            return Ok(());
1094                        }
1095                        do_nothing
1096                    } else {
1097                        values.extend(probe_set_params);
1098                        upsert_clause_text
1099                    };
1100
1101                    let sql = format!(
1102                        "INSERT INTO {} ({}) VALUES ({}){}",
1103                        dialect.quote_ident(target_table),
1104                        quoted_cols.join(", "),
1105                        placeholders.join(", "),
1106                        effective_upsert_clause,
1107                    );
1108                    engine.execute_raw(&sql, values).await?;
1109                    return Ok(());
1110                }
1111
1112                // Two-statement fallback for dialects without native upsert syntax
1113                // (MSSQL, CQL). NotSql engines short-circuit above with QueryError::unsupported.
1114                //
1115                // TODO(upsert-toctou): `affected_rows == 0` cannot distinguish
1116                // "no row matched" from "row matched but no columns changed"
1117                // (MSSQL `@@ROWCOUNT`, CQL row-count semantics). A no-op UPDATE
1118                // would incorrectly trigger the INSERT, producing a PK unique
1119                // violation. Wrapping in an explicit transaction requires plumbing
1120                // not yet available here; tracked for a follow-up PR.
1121                if update_payload.is_empty() {
1122                    // Pure-insert semantics on fallback path: skip the UPDATE
1123                    // entirely and emit a bare INSERT. If the row already exists
1124                    // the engine will surface a PK/unique violation, which is
1125                    // the correct behaviour for an insert-or-fail path on dialects
1126                    // without native upsert syntax.
1127                    let (_, values, placeholders, quoted_cols) = build_insert_columns_and_values();
1128                    let insert_sql = format!(
1129                        "INSERT INTO {} ({}) VALUES ({})",
1130                        dialect.quote_ident(target_table),
1131                        quoted_cols.join(", "),
1132                        placeholders.join(", "),
1133                    );
1134                    engine.execute_raw(&insert_sql, values).await?;
1135                    return Ok(());
1136                }
1137
1138                // Phase 1: attempt UPDATE.
1139                let (set_text, mut update_params) =
1140                    build_writeop_set_clause(&update_payload, dialect, 1);
1141                let next_placeholder = update_params.len() + 1;
1142                update_params.push(pk.clone());
1143                let update_sql = format!(
1144                    "UPDATE {} SET {} WHERE {} = {}",
1145                    dialect.quote_ident(target_table),
1146                    set_text,
1147                    dialect.quote_ident(target_pk),
1148                    dialect.placeholder(next_placeholder),
1149                );
1150                let affected = engine.execute_raw(&update_sql, update_params).await?;
1151                if affected > 0 {
1152                    return Ok(());
1153                }
1154                // Phase 2: row didn't exist — INSERT it with the FK spliced in.
1155                let (_, values, placeholders, quoted_cols) = build_insert_columns_and_values();
1156                let insert_sql = format!(
1157                    "INSERT INTO {} ({}) VALUES ({})",
1158                    dialect.quote_ident(target_table),
1159                    quoted_cols.join(", "),
1160                    placeholders.join(", "),
1161                );
1162                engine.execute_raw(&insert_sql, values).await?;
1163                Ok(())
1164            }
1165            NestedWriteOp::ConnectOrCreate {
1166                relation: _,
1167                target_table,
1168                foreign_key,
1169                where_filter,
1170                create_payload,
1171            } => {
1172                // Safety: reject an empty (`Filter::None`) where. Without
1173                // this guard the UPDATE would lower to `... WHERE TRUE`
1174                // (per `Filter::None::to_sql`), rewriting every row in
1175                // the child table. `connect_or_create` semantically
1176                // requires a unique-where lookup; an empty where is a
1177                // user error, not an "always match every row" command.
1178                if matches!(where_filter, Filter::None) {
1179                    return Err(crate::error::QueryError::not_found(target_table).with_context(
1180                        "Nested ConnectOrCreate: empty `where` block would match every row; supply a unique filter",
1181                    ));
1182                }
1183                let dialect = engine.dialect();
1184                // Phase 1: attempt UPDATE to connect existing row(s).
1185                let (filter_sql, filter_params) = where_filter.to_sql(2, &crate::dialect::Postgres);
1186                let mut update_params: Vec<FilterValue> =
1187                    Vec::with_capacity(filter_params.len() + 1);
1188                update_params.push(parent_pk.clone());
1189                update_params.extend(filter_params);
1190                let update_sql = format!(
1191                    "UPDATE {} SET {} = {} WHERE {}",
1192                    dialect.quote_ident(target_table),
1193                    dialect.quote_ident(foreign_key),
1194                    dialect.placeholder(1),
1195                    filter_sql,
1196                );
1197                let affected = engine.execute_raw(&update_sql, update_params).await?;
1198                if affected > 0 {
1199                    return Ok(());
1200                }
1201                // Phase 2: no row matched — INSERT with FK spliced in.
1202                if create_payload.is_empty() {
1203                    return Err(
1204                        crate::error::QueryError::not_found(target_table).with_context(
1205                            "Nested ConnectOrCreate: no match and create payload empty",
1206                        ),
1207                    );
1208                }
1209                let mut columns: Vec<String> =
1210                    create_payload.iter().map(|(c, _)| c.clone()).collect();
1211                let mut values: Vec<FilterValue> =
1212                    create_payload.into_iter().map(|(_, v)| v).collect();
1213                columns.push(foreign_key.to_string());
1214                values.push(parent_pk.clone());
1215                let placeholders: Vec<String> =
1216                    (1..=values.len()).map(|i| dialect.placeholder(i)).collect();
1217                let quoted_cols: Vec<String> =
1218                    columns.iter().map(|c| dialect.quote_ident(c)).collect();
1219                let insert_sql = format!(
1220                    "INSERT INTO {} ({}) VALUES ({})",
1221                    dialect.quote_ident(target_table),
1222                    quoted_cols.join(", "),
1223                    placeholders.join(", "),
1224                );
1225                engine.execute_raw(&insert_sql, values).await?;
1226                Ok(())
1227            }
1228            NestedWriteOp::Create {
1229                relation: _,
1230                target_table,
1231                foreign_key,
1232                payload,
1233            } => {
1234                if payload.is_empty() {
1235                    return Ok(());
1236                }
1237
1238                let dialect = engine.dialect();
1239
1240                // All rows in a single `Create` op share the same column
1241                // set (codegen guarantee). Derive columns from the first
1242                // row, then append the FK column once. Each row
1243                // contributes its values + the parent PK.
1244                let first = &payload[0];
1245                let mut columns: Vec<String> = first.iter().map(|(c, _)| c.clone()).collect();
1246                columns.push(foreign_key.to_string());
1247                let cols_per_row = columns.len();
1248
1249                let quoted_cols: Vec<String> =
1250                    columns.iter().map(|c| dialect.quote_ident(c)).collect();
1251
1252                let mut values: Vec<FilterValue> = Vec::with_capacity(payload.len() * cols_per_row);
1253                let mut row_placeholders: Vec<String> = Vec::with_capacity(payload.len());
1254                let mut next_placeholder = 1usize;
1255
1256                for child in payload {
1257                    let mut row_phs: Vec<String> = Vec::with_capacity(cols_per_row);
1258                    for (_, v) in child {
1259                        values.push(v);
1260                        row_phs.push(dialect.placeholder(next_placeholder));
1261                        next_placeholder += 1;
1262                    }
1263                    values.push(parent_pk.clone());
1264                    row_phs.push(dialect.placeholder(next_placeholder));
1265                    next_placeholder += 1;
1266                    row_placeholders.push(format!("({})", row_phs.join(", ")));
1267                }
1268
1269                // NOTE: Combining all rows into a single multi-VALUES
1270                // INSERT means any constraint failure rolls back the
1271                // entire batch, not just the failing row. This matches
1272                // typical Prisma semantics for nested writes.
1273                let sql = format!(
1274                    "INSERT INTO {} ({}) VALUES {}",
1275                    dialect.quote_ident(target_table),
1276                    quoted_cols.join(", "),
1277                    row_placeholders.join(", "),
1278                );
1279
1280                engine.execute_raw(&sql, values).await?;
1281                Ok(())
1282            }
1283            NestedWriteOp::Set {
1284                relation: _,
1285                target_table,
1286                foreign_key,
1287                target_pk,
1288                set_pks,
1289            } => {
1290                let dialect = engine.dialect();
1291
1292                // Phase 1: disconnect current children not in set_pks.
1293                if set_pks.is_empty() {
1294                    // No NOT IN clause needed — clear every child of this parent.
1295                    let sql = format!(
1296                        "UPDATE {} SET {} = NULL WHERE {} = {}",
1297                        dialect.quote_ident(target_table),
1298                        dialect.quote_ident(foreign_key),
1299                        dialect.quote_ident(foreign_key),
1300                        dialect.placeholder(1),
1301                    );
1302                    engine.execute_raw(&sql, vec![parent_pk.clone()]).await?;
1303                    return Ok(());
1304                }
1305                // set_pks is non-empty — emit disconnect with NOT IN clause + connect.
1306                let mut disconnect_params: Vec<FilterValue> = Vec::with_capacity(set_pks.len() + 1);
1307                disconnect_params.push(parent_pk.clone());
1308                let mut not_in_placeholders: Vec<String> = Vec::with_capacity(set_pks.len());
1309                for (i, pk) in set_pks.iter().enumerate() {
1310                    disconnect_params.push(pk.clone());
1311                    not_in_placeholders.push(dialect.placeholder(i + 2));
1312                }
1313                let disconnect_sql = format!(
1314                    "UPDATE {} SET {} = NULL WHERE {} = {} AND {} NOT IN ({})",
1315                    dialect.quote_ident(target_table),
1316                    dialect.quote_ident(foreign_key),
1317                    dialect.quote_ident(foreign_key),
1318                    dialect.placeholder(1),
1319                    dialect.quote_ident(target_pk),
1320                    not_in_placeholders.join(", "),
1321                );
1322                engine
1323                    .execute_raw(&disconnect_sql, disconnect_params)
1324                    .await?;
1325
1326                // Phase 2: connect every row in set_pks (idempotent for already-connected).
1327                let mut connect_params: Vec<FilterValue> = Vec::with_capacity(set_pks.len() + 1);
1328                connect_params.push(parent_pk.clone());
1329                let mut in_placeholders: Vec<String> = Vec::with_capacity(set_pks.len());
1330                for (i, pk) in set_pks.iter().enumerate() {
1331                    connect_params.push(pk.clone());
1332                    in_placeholders.push(dialect.placeholder(i + 2));
1333                }
1334                let connect_sql = format!(
1335                    "UPDATE {} SET {} = {} WHERE {} IN ({})",
1336                    dialect.quote_ident(target_table),
1337                    dialect.quote_ident(foreign_key),
1338                    dialect.placeholder(1),
1339                    dialect.quote_ident(target_pk),
1340                    in_placeholders.join(", "),
1341                );
1342                engine.execute_raw(&connect_sql, connect_params).await?;
1343                Ok(())
1344            }
1345        }
1346    }
1347}
1348
1349#[cfg(test)]
1350mod tests {
1351    use super::*;
1352    use std::sync::{Arc, Mutex};
1353
1354    use crate::error::QueryError;
1355    use crate::traits::BoxFuture;
1356
    /// Shared, mutex-guarded log of the `(sql, params)` pairs captured by
    /// the mock engine, in the order they were executed.
    type StatementLog = Arc<Mutex<Vec<(String, Vec<FilterValue>)>>>;
1359
    /// Selects which SQL dialect the recording mock engine reports from
    /// `QueryEngine::dialect`.
    #[derive(Clone, Copy)]
    enum DialectKind {
        /// Maps to `crate::dialect::Postgres`.
        Postgres,
        /// Maps to `crate::dialect::Mssql`.
        Mssql,
        /// Maps to `crate::dialect::NotSql`.
        NotSql,
    }
1366
1367    /// Recording mock engine for [`NestedWriteOp::execute`] tests.
1368    ///
1369    /// Captures the (sql, params) of every [`QueryEngine::execute_raw`]
1370    /// call so tests can assert the lowered shape. Returns 1 from
1371    /// `execute_raw` by default; tests that need to exercise the
1372    /// zero-affected-rows path (e.g. upsert's insert phase) can supply a
1373    /// sequence of affected-row counts via [`RecordingEngine::with_affected`].
1374    #[derive(Clone)]
1375    struct RecordingEngine {
1376        recorded: StatementLog,
1377        affected: Arc<Mutex<Vec<u64>>>,
1378        dialect_kind: DialectKind,
1379    }
1380
1381    impl RecordingEngine {
1382        fn new() -> Self {
1383            Self {
1384                recorded: Arc::new(Mutex::new(Vec::new())),
1385                affected: Arc::new(Mutex::new(Vec::new())),
1386                dialect_kind: DialectKind::Postgres,
1387            }
1388        }
1389
1390        /// Build an engine that returns each entry of `seq` from successive
1391        /// `execute_raw` calls, in order. Once exhausted, falls back to 1.
1392        fn with_affected(seq: Vec<u64>) -> Self {
1393            // Stored in reverse so we can pop in O(1).
1394            let mut rev = seq;
1395            rev.reverse();
1396            Self {
1397                recorded: Arc::new(Mutex::new(Vec::new())),
1398                affected: Arc::new(Mutex::new(rev)),
1399                dialect_kind: DialectKind::Postgres,
1400            }
1401        }
1402
1403        fn mssql() -> Self {
1404            Self {
1405                dialect_kind: DialectKind::Mssql,
1406                ..Self::new()
1407            }
1408        }
1409
1410        fn mssql_with_affected(seq: Vec<u64>) -> Self {
1411            Self {
1412                dialect_kind: DialectKind::Mssql,
1413                ..Self::with_affected(seq)
1414            }
1415        }
1416
1417        fn notsql() -> Self {
1418            Self {
1419                dialect_kind: DialectKind::NotSql,
1420                ..Self::new()
1421            }
1422        }
1423
1424        fn statements(&self) -> Vec<(String, Vec<FilterValue>)> {
1425            self.recorded.lock().unwrap().clone()
1426        }
1427    }
1428
1429    impl crate::traits::QueryEngine for RecordingEngine {
1430        fn dialect(&self) -> &dyn crate::dialect::SqlDialect {
1431            match self.dialect_kind {
1432                DialectKind::Postgres => &crate::dialect::Postgres,
1433                DialectKind::Mssql => &crate::dialect::Mssql,
1434                DialectKind::NotSql => &crate::dialect::NotSql,
1435            }
1436        }
1437
1438        fn query_many<T: Model + crate::row::FromRow + Send + 'static>(
1439            &self,
1440            _sql: &str,
1441            _params: Vec<FilterValue>,
1442        ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
1443            Box::pin(async { Ok(Vec::new()) })
1444        }
1445
1446        fn query_one<T: Model + crate::row::FromRow + Send + 'static>(
1447            &self,
1448            _sql: &str,
1449            _params: Vec<FilterValue>,
1450        ) -> BoxFuture<'_, QueryResult<T>> {
1451            Box::pin(async { Err(QueryError::not_found("test")) })
1452        }
1453
1454        fn query_optional<T: Model + crate::row::FromRow + Send + 'static>(
1455            &self,
1456            _sql: &str,
1457            _params: Vec<FilterValue>,
1458        ) -> BoxFuture<'_, QueryResult<Option<T>>> {
1459            Box::pin(async { Ok(None) })
1460        }
1461
1462        fn execute_insert<T: Model + crate::row::FromRow + Send + 'static>(
1463            &self,
1464            _sql: &str,
1465            _params: Vec<FilterValue>,
1466        ) -> BoxFuture<'_, QueryResult<T>> {
1467            Box::pin(async { Err(QueryError::not_found("test")) })
1468        }
1469
1470        fn execute_update<T: Model + crate::row::FromRow + Send + 'static>(
1471            &self,
1472            _sql: &str,
1473            _params: Vec<FilterValue>,
1474        ) -> BoxFuture<'_, QueryResult<Vec<T>>> {
1475            Box::pin(async { Ok(Vec::new()) })
1476        }
1477
1478        fn execute_delete(
1479            &self,
1480            _sql: &str,
1481            _params: Vec<FilterValue>,
1482        ) -> BoxFuture<'_, QueryResult<u64>> {
1483            Box::pin(async { Ok(0) })
1484        }
1485
1486        fn execute_raw(
1487            &self,
1488            sql: &str,
1489            params: Vec<FilterValue>,
1490        ) -> BoxFuture<'_, QueryResult<u64>> {
1491            let recorded = self.recorded.clone();
1492            let affected = self.affected.clone();
1493            let sql = sql.to_string();
1494            Box::pin(async move {
1495                recorded.lock().unwrap().push((sql, params));
1496                let next = affected.lock().unwrap().pop().unwrap_or(1);
1497                Ok(next)
1498            })
1499        }
1500
1501        fn count(&self, _sql: &str, _params: Vec<FilterValue>) -> BoxFuture<'_, QueryResult<u64>> {
1502            Box::pin(async { Ok(0) })
1503        }
1504    }
1505
1506    struct TestModel;
1507
1508    impl Model for TestModel {
1509        const MODEL_NAME: &'static str = "Post";
1510        const TABLE_NAME: &'static str = "posts";
1511        const PRIMARY_KEY: &'static [&'static str] = &["id"];
1512        const COLUMNS: &'static [&'static str] = &["id", "title", "user_id"];
1513    }
1514
1515    struct TagModel;
1516
1517    impl Model for TagModel {
1518        const MODEL_NAME: &'static str = "Tag";
1519        const TABLE_NAME: &'static str = "tags";
1520        const PRIMARY_KEY: &'static [&'static str] = &["id"];
1521        const COLUMNS: &'static [&'static str] = &["id", "name"];
1522    }
1523
1524    #[test]
1525    fn test_nested_create_data() {
1526        let data: NestedCreateData<TestModel> =
1527            NestedCreateData::from_pairs([("title", FilterValue::String("Test Post".to_string()))]);
1528
1529        assert_eq!(data.data.len(), 1);
1530        assert_eq!(data.data[0].0, "title");
1531    }
1532
1533    #[test]
1534    fn test_nested_write_create() {
1535        let data: NestedCreateData<TestModel> =
1536            NestedCreateData::from_pairs([("title", FilterValue::String("Test Post".to_string()))]);
1537
1538        let write: NestedWrite<TestModel> = NestedWrite::create(data);
1539
1540        match write {
1541            NestedWrite::Create(creates) => assert_eq!(creates.len(), 1),
1542            _ => panic!("Expected Create variant"),
1543        }
1544    }
1545
1546    #[test]
1547    fn test_nested_write_connect() {
1548        let write: NestedWrite<TestModel> = NestedWrite::connect(vec![
1549            Filter::Equals("id".into(), FilterValue::Int(1)),
1550            Filter::Equals("id".into(), FilterValue::Int(2)),
1551        ]);
1552
1553        match write {
1554            NestedWrite::Connect(filters) => assert_eq!(filters.len(), 2),
1555            _ => panic!("Expected Connect variant"),
1556        }
1557    }
1558
1559    #[test]
1560    fn test_nested_write_disconnect() {
1561        let write: NestedWrite<TestModel> =
1562            NestedWrite::disconnect_one(Filter::Equals("id".into(), FilterValue::Int(1)));
1563
1564        match write {
1565            NestedWrite::Disconnect(filters) => assert_eq!(filters.len(), 1),
1566            _ => panic!("Expected Disconnect variant"),
1567        }
1568    }
1569
1570    #[test]
1571    fn test_nested_write_set() {
1572        let write: NestedWrite<TestModel> =
1573            NestedWrite::set(vec![Filter::Equals("id".into(), FilterValue::Int(1))]);
1574
1575        match write {
1576            NestedWrite::Set(filters) => assert_eq!(filters.len(), 1),
1577            _ => panic!("Expected Set variant"),
1578        }
1579    }
1580
1581    #[test]
1582    fn test_builder_one_to_many_connect() {
1583        let builder =
1584            NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1585
1586        let parent_id = FilterValue::Int(1);
1587        let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1588
1589        let statements = builder.build_connect_sql::<TestModel>(&parent_id, &filters);
1590
1591        assert_eq!(statements.len(), 1);
1592        let (sql, params) = &statements[0];
1593        assert!(sql.contains("UPDATE"));
1594        assert!(sql.contains("posts"));
1595        assert!(sql.contains("user_id"));
1596        assert_eq!(params.len(), 2);
1597    }
1598
1599    #[test]
1600    fn test_builder_one_to_many_disconnect() {
1601        let builder =
1602            NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1603
1604        let parent_id = FilterValue::Int(1);
1605        let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1606
1607        let statements = builder.build_disconnect_sql(&parent_id, &filters);
1608
1609        assert_eq!(statements.len(), 1);
1610        let (sql, params) = &statements[0];
1611        assert!(sql.contains("UPDATE"));
1612        assert!(sql.contains("SET"));
1613        assert!(sql.contains("NULL"));
1614        assert_eq!(params.len(), 2);
1615    }
1616
1617    #[test]
1618    fn test_builder_many_to_many_connect() {
1619        let builder = NestedWriteBuilder::many_to_many(
1620            "posts",
1621            vec!["id".to_string()],
1622            "tags",
1623            JoinTableInfo {
1624                table_name: "post_tags".to_string(),
1625                parent_column: "post_id".to_string(),
1626                related_column: "tag_id".to_string(),
1627            },
1628        );
1629
1630        let parent_id = FilterValue::Int(1);
1631        let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1632
1633        let statements = builder.build_connect_sql::<TagModel>(&parent_id, &filters);
1634
1635        assert_eq!(statements.len(), 1);
1636        let (sql, _params) = &statements[0];
1637        assert!(sql.contains("INSERT INTO"));
1638        assert!(sql.contains("post_tags"));
1639        assert!(sql.contains("ON CONFLICT DO NOTHING"));
1640    }
1641
1642    #[test]
1643    fn test_builder_create() {
1644        let builder =
1645            NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1646
1647        let parent_id = FilterValue::Int(1);
1648        let creates = vec![NestedCreateData::<TestModel>::from_pairs([(
1649            "title",
1650            FilterValue::String("New Post".to_string()),
1651        )])];
1652
1653        let statements = builder.build_create_sql::<TestModel>(&parent_id, &creates);
1654
1655        assert_eq!(statements.len(), 1);
1656        let (sql, params) = &statements[0];
1657        assert!(sql.contains("INSERT INTO"));
1658        assert!(sql.contains("posts"));
1659        assert!(sql.contains("RETURNING"));
1660        assert_eq!(params.len(), 2); // title + user_id
1661    }
1662
1663    #[test]
1664    fn test_builder_set() {
1665        let builder =
1666            NestedWriteBuilder::one_to_many("users", vec!["id".to_string()], "posts", "user_id");
1667
1668        let parent_id = FilterValue::Int(1);
1669        let filters = vec![Filter::Equals("id".into(), FilterValue::Int(10))];
1670
1671        let statements = builder.build_set_sql::<TestModel>(&parent_id, &filters);
1672
1673        // Should have disconnect all + connect statements
1674        assert!(statements.len() >= 2);
1675
1676        // First statement should disconnect all
1677        let (first_sql, _) = &statements[0];
1678        assert!(first_sql.contains("UPDATE"));
1679        assert!(first_sql.contains("NULL"));
1680    }
1681
1682    #[test]
1683    fn test_nested_write_operations() {
1684        let mut ops = NestedWriteOperations::new();
1685        assert!(ops.is_empty());
1686        assert_eq!(ops.len(), 0);
1687
1688        ops.add_pre("SELECT 1".to_string(), vec![]);
1689        ops.add_post("SELECT 2".to_string(), vec![]);
1690
1691        assert!(!ops.is_empty());
1692        assert_eq!(ops.len(), 2);
1693    }
1694
1695    #[test]
1696    fn test_nested_create_or_connect() {
1697        let create_data: NestedCreateData<TestModel> =
1698            NestedCreateData::from_pairs([("title", FilterValue::String("New Post".to_string()))]);
1699
1700        let create_or_connect = NestedCreateOrConnectData::new(
1701            Filter::Equals("title".into(), FilterValue::String("Existing".to_string())),
1702            create_data,
1703        );
1704
1705        assert!(matches!(create_or_connect.filter, Filter::Equals(..)));
1706        assert_eq!(create_or_connect.create.data.len(), 1);
1707    }
1708
1709    #[test]
1710    fn test_nested_update_data() {
1711        let update: NestedUpdateData<TestModel> = NestedUpdateData::from_pairs(
1712            Filter::Equals("id".into(), FilterValue::Int(1)),
1713            [("title", FilterValue::String("Updated".to_string()))],
1714        );
1715
1716        assert!(matches!(update.filter, Filter::Equals(..)));
1717        assert_eq!(update.data.len(), 1);
1718        assert_eq!(update.data[0].0, "title");
1719    }
1720
1721    #[tokio::test]
1722    async fn nested_op_connect_emits_update_set_where() {
1723        let engine = RecordingEngine::new();
1724        let op = NestedWriteOp::Connect {
1725            relation: "posts",
1726            target_table: "posts",
1727            foreign_key: "author_id",
1728            target_pk: "id",
1729            pk: FilterValue::Int(42),
1730        };
1731        let parent_pk = FilterValue::Int(7);
1732        op.execute(&engine, &parent_pk).await.unwrap();
1733
1734        let stmts = engine.statements();
1735        assert_eq!(stmts.len(), 1, "expected one UPDATE statement");
1736        let (sql, params) = &stmts[0];
1737        // Postgres dialect quotes idents with double quotes.
1738        assert!(sql.contains("UPDATE"), "got: {sql}");
1739        assert!(sql.contains("posts"), "got: {sql}");
1740        assert!(sql.contains("author_id"), "got: {sql}");
1741        assert!(sql.contains("SET"), "got: {sql}");
1742        assert!(sql.contains("WHERE"), "got: {sql}");
1743        assert!(sql.contains("$1"), "got: {sql}");
1744        assert!(sql.contains("$2"), "got: {sql}");
1745        assert_eq!(params, &vec![FilterValue::Int(7), FilterValue::Int(42)]);
1746    }
1747
1748    #[tokio::test]
1749    async fn nested_op_delete_many_with_filter_emits_fk_and_filter_clause() {
1750        let engine = RecordingEngine::new();
1751        let op = NestedWriteOp::DeleteMany {
1752            relation: "posts",
1753            target_table: "posts",
1754            foreign_key: "author_id",
1755            filter: Filter::Equals("published".into(), FilterValue::Bool(false)),
1756        };
1757        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1758
1759        let stmts = engine.statements();
1760        assert_eq!(stmts.len(), 1);
1761        let (sql, params) = &stmts[0];
1762        assert!(sql.contains("DELETE FROM"), "got: {sql}");
1763        assert!(sql.contains("author_id"), "got: {sql}");
1764        assert!(sql.contains("$1"), "got: {sql}");
1765        assert!(sql.contains("AND"), "got: {sql}");
1766        assert!(sql.contains("published"), "got: {sql}");
1767        assert_eq!(params.len(), 2);
1768        assert!(matches!(params[0], FilterValue::Int(7)));
1769        assert!(matches!(params[1], FilterValue::Bool(false)));
1770    }
1771
1772    #[tokio::test]
1773    async fn nested_op_delete_many_with_empty_filter_omits_and_clause() {
1774        let engine = RecordingEngine::new();
1775        let op = NestedWriteOp::DeleteMany {
1776            relation: "posts",
1777            target_table: "posts",
1778            foreign_key: "author_id",
1779            filter: Filter::None,
1780        };
1781        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1782
1783        let stmts = engine.statements();
1784        let (sql, params) = &stmts[0];
1785        assert!(sql.contains("DELETE FROM"), "got: {sql}");
1786        assert!(
1787            !sql.contains("AND"),
1788            "should omit AND when filter empty: {sql}"
1789        );
1790        assert_eq!(params.len(), 1);
1791    }
1792
1793    #[tokio::test]
1794    async fn nested_op_delete_emits_delete_where_pk() {
1795        let engine = RecordingEngine::new();
1796        let op = NestedWriteOp::Delete {
1797            relation: "posts",
1798            target_table: "posts",
1799            target_pk: "id",
1800            pk: FilterValue::Int(42),
1801        };
1802        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1803
1804        let stmts = engine.statements();
1805        assert_eq!(stmts.len(), 1);
1806        let (sql, params) = &stmts[0];
1807        assert!(sql.contains("DELETE FROM"), "got: {sql}");
1808        assert!(sql.contains("posts"), "got: {sql}");
1809        assert!(sql.contains("WHERE"), "got: {sql}");
1810        assert_eq!(params, &vec![FilterValue::Int(42)]);
1811    }
1812
1813    #[tokio::test]
1814    async fn nested_op_disconnect_emits_update_set_null() {
1815        let engine = RecordingEngine::new();
1816        let op = NestedWriteOp::Disconnect {
1817            relation: "posts",
1818            target_table: "posts",
1819            foreign_key: "author_id",
1820            target_pk: "id",
1821            pk: FilterValue::Int(42),
1822        };
1823        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1824
1825        let stmts = engine.statements();
1826        assert_eq!(stmts.len(), 1);
1827        let (sql, params) = &stmts[0];
1828        assert!(sql.contains("UPDATE"), "got: {sql}");
1829        assert!(sql.contains("posts"), "got: {sql}");
1830        assert!(sql.contains("author_id"), "got: {sql}");
1831        assert!(sql.contains("NULL"), "got: {sql}");
1832        assert!(sql.contains("WHERE"), "got: {sql}");
1833        assert_eq!(params, &vec![FilterValue::Int(42)]);
1834    }
1835
1836    #[tokio::test]
1837    async fn nested_op_update_plain_set() {
1838        use crate::inputs::WriteOp;
1839        let engine = RecordingEngine::new();
1840        let op = NestedWriteOp::Update {
1841            relation: "posts",
1842            target_table: "posts",
1843            target_pk: "id",
1844            pk: FilterValue::Int(42),
1845            payload: vec![(
1846                "title".to_string(),
1847                WriteOp::Set(FilterValue::String("renamed".to_string())),
1848            )],
1849        };
1850        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1851
1852        let stmts = engine.statements();
1853        assert_eq!(stmts.len(), 1);
1854        let (sql, params) = &stmts[0];
1855        assert!(sql.contains("UPDATE"), "got: {sql}");
1856        assert!(sql.contains("posts"), "got: {sql}");
1857        assert!(sql.contains("title"), "got: {sql}");
1858        assert!(sql.contains("SET"), "got: {sql}");
1859        assert!(sql.contains("WHERE"), "got: {sql}");
1860        assert!(sql.contains("$1"), "got: {sql}");
1861        assert!(sql.contains("$2"), "got: {sql}");
1862        assert_eq!(params.len(), 2);
1863        assert!(matches!(params[0], FilterValue::String(_)));
1864        assert_eq!(params[1], FilterValue::Int(42));
1865    }
1866
1867    #[tokio::test]
1868    async fn nested_op_update_increment() {
1869        use crate::inputs::WriteOp;
1870        let engine = RecordingEngine::new();
1871        let op = NestedWriteOp::Update {
1872            relation: "posts",
1873            target_table: "posts",
1874            target_pk: "id",
1875            pk: FilterValue::Int(42),
1876            payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
1877        };
1878        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1879
1880        let stmts = engine.statements();
1881        let (sql, _) = &stmts[0];
1882        // Postgres dialect quotes idents — fragment will read `"views" = "views" + $1`.
1883        assert!(sql.contains("+"), "got: {sql}");
1884        assert!(sql.contains("views"), "got: {sql}");
1885    }
1886
1887    #[tokio::test]
1888    async fn nested_op_update_mixed_set_and_increment() {
1889        use crate::inputs::WriteOp;
1890        let engine = RecordingEngine::new();
1891        let op = NestedWriteOp::Update {
1892            relation: "posts",
1893            target_table: "posts",
1894            target_pk: "id",
1895            pk: FilterValue::Int(42),
1896            payload: vec![
1897                (
1898                    "title".to_string(),
1899                    WriteOp::Set(FilterValue::String("renamed".to_string())),
1900                ),
1901                ("views".to_string(), WriteOp::Increment(FilterValue::Int(1))),
1902            ],
1903        };
1904        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1905
1906        let stmts = engine.statements();
1907        let (sql, params) = &stmts[0];
1908        assert!(sql.contains("title"), "got: {sql}");
1909        assert!(sql.contains("views"), "got: {sql}");
1910        assert!(sql.contains("+"), "got: {sql}");
1911        // 2 SET params + 1 PK = 3 placeholders.
1912        assert!(sql.contains("$3"), "got: {sql}");
1913        assert_eq!(params.len(), 3);
1914    }
1915
1916    #[tokio::test]
1917    async fn nested_op_update_empty_payload_is_noop() {
1918        let engine = RecordingEngine::new();
1919        let op = NestedWriteOp::Update {
1920            relation: "posts",
1921            target_table: "posts",
1922            target_pk: "id",
1923            pk: FilterValue::Int(42),
1924            payload: vec![],
1925        };
1926        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1927        assert!(
1928            engine.statements().is_empty(),
1929            "empty payload should emit no SQL"
1930        );
1931    }
1932
1933    #[tokio::test]
1934    async fn nested_op_update_many_with_filter() {
1935        use crate::inputs::WriteOp;
1936        let engine = RecordingEngine::new();
1937        let op = NestedWriteOp::UpdateMany {
1938            relation: "posts",
1939            target_table: "posts",
1940            foreign_key: "author_id",
1941            filter: Filter::Equals("published".into(), FilterValue::Bool(false)),
1942            payload: vec![("views".to_string(), WriteOp::Set(FilterValue::Int(0)))],
1943        };
1944        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1945
1946        let stmts = engine.statements();
1947        assert_eq!(stmts.len(), 1);
1948        let (sql, params) = &stmts[0];
1949        assert!(sql.contains("UPDATE"), "got: {sql}");
1950        assert!(sql.contains("posts"), "got: {sql}");
1951        assert!(sql.contains("author_id"), "got: {sql}");
1952        assert!(sql.contains("AND"), "got: {sql}");
1953        assert!(sql.contains("published"), "got: {sql}");
1954        // payload value + FK + filter value = 3 params
1955        assert_eq!(params.len(), 3);
1956        assert_eq!(params[0], FilterValue::Int(0));
1957        assert_eq!(params[1], FilterValue::Int(7));
1958        assert_eq!(params[2], FilterValue::Bool(false));
1959    }
1960
1961    #[tokio::test]
1962    async fn nested_op_update_many_with_empty_filter() {
1963        use crate::inputs::WriteOp;
1964        let engine = RecordingEngine::new();
1965        let op = NestedWriteOp::UpdateMany {
1966            relation: "posts",
1967            target_table: "posts",
1968            foreign_key: "author_id",
1969            filter: Filter::None,
1970            payload: vec![("views".to_string(), WriteOp::Set(FilterValue::Int(0)))],
1971        };
1972        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
1973
1974        let stmts = engine.statements();
1975        let (sql, params) = &stmts[0];
1976        assert!(sql.contains("UPDATE"), "got: {sql}");
1977        assert!(
1978            !sql.contains("AND"),
1979            "should omit AND when filter empty: {sql}"
1980        );
1981        // payload value + FK = 2 params
1982        assert_eq!(params.len(), 2);
1983    }
1984
1985    #[test]
1986    fn test_nested_upsert_data() {
1987        let create: NestedCreateData<TestModel> =
1988            NestedCreateData::from_pairs([("title", FilterValue::String("New".to_string()))]);
1989
1990        let upsert: NestedUpsertData<TestModel> = NestedUpsertData::new(
1991            Filter::Equals("id".into(), FilterValue::Int(1)),
1992            create,
1993            vec![(
1994                "title".to_string(),
1995                FilterValue::String("Updated".to_string()),
1996            )],
1997        );
1998
1999        assert!(matches!(upsert.filter, Filter::Equals(..)));
2000        assert_eq!(upsert.create.data.len(), 1);
2001        assert_eq!(upsert.update.len(), 1);
2002    }
2003
2004    #[tokio::test]
2005    async fn nested_op_upsert_single_statement_on_postgres() {
2006        use crate::inputs::WriteOp;
2007        // Postgres dialect supports `ON CONFLICT (...) DO UPDATE SET ...`,
2008        // so the executor must collapse the two-statement form into a
2009        // single `INSERT ... ON CONFLICT` statement.
2010        let engine = RecordingEngine::new();
2011        let op = NestedWriteOp::Upsert {
2012            relation: "posts",
2013            target_table: "posts",
2014            foreign_key: "author_id",
2015            target_pk: "id",
2016            pk: FilterValue::Int(99),
2017            create_payload: vec![("title".to_string(), FilterValue::String("new".to_string()))],
2018            update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2019        };
2020        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2021
2022        let stmts = engine.statements();
2023        assert_eq!(
2024            stmts.len(),
2025            1,
2026            "expected a single-statement upsert; got {stmts:#?}"
2027        );
2028        let (sql, params) = &stmts[0];
2029        assert!(sql.contains("INSERT INTO"), "got: {sql}");
2030        assert!(sql.contains("posts"), "got: {sql}");
2031        assert!(sql.contains("ON CONFLICT (\"id\")"), "got: {sql}");
2032        assert!(sql.contains("DO UPDATE SET"), "got: {sql}");
2033        // INSERT supplies $1 (title), $2 (author_id=parent_pk);
2034        // SET fragment uses $3 (views increment).
2035        assert!(
2036            sql.contains("VALUES ($1, $2)"),
2037            "INSERT VALUES placeholders: {sql}"
2038        );
2039        assert!(sql.contains("$3"), "got: {sql}");
2040        // Two insert values + one SET value.
2041        assert_eq!(params.len(), 3);
2042        assert_eq!(params[0], FilterValue::String("new".to_string()));
2043        assert_eq!(params[1], FilterValue::Int(7));
2044        assert_eq!(params[2], FilterValue::Int(1));
2045    }
2046
2047    #[tokio::test]
2048    async fn nested_op_upsert_two_statement_fallback_on_mssql_update_path() {
2049        use crate::inputs::WriteOp;
2050        // MSSQL's `upsert_clause` returns empty → the executor must
2051        // fall back to the existing two-statement UPDATE-then-INSERT
2052        // form. Default affected-rows = 1 means UPDATE matches a row
2053        // and the INSERT phase must not run.
2054        let engine = RecordingEngine::mssql();
2055        let op = NestedWriteOp::Upsert {
2056            relation: "posts",
2057            target_table: "posts",
2058            foreign_key: "author_id",
2059            target_pk: "id",
2060            pk: FilterValue::Int(99),
2061            create_payload: vec![("title".to_string(), FilterValue::String("new".to_string()))],
2062            update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2063        };
2064        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2065
2066        let stmts = engine.statements();
2067        assert_eq!(
2068            stmts.len(),
2069            1,
2070            "expected only the UPDATE — INSERT should not have run"
2071        );
2072        let (sql, update_params) = &stmts[0];
2073        assert!(sql.starts_with("UPDATE"), "got: {sql}");
2074        assert!(!sql.contains("ON CONFLICT"), "got: {sql}");
2075        assert!(!sql.contains("ON DUPLICATE"), "got: {sql}");
2076        // MSSQL uses bracket-quoted identifiers and @P-prefixed placeholders.
2077        assert!(sql.contains("[posts]"), "got: {sql}");
2078        assert!(sql.contains("@P1"), "SET clause should use @P1: {sql}");
2079        assert!(sql.contains("@P2"), "WHERE clause should use @P2: {sql}");
2080        // Two params: the increment value ($1) and the pk ($2).
2081        assert_eq!(update_params.len(), 2);
2082        assert_eq!(update_params[0], FilterValue::Int(1)); // increment value
2083        assert_eq!(update_params[1], FilterValue::Int(99)); // pk
2084    }
2085
2086    #[tokio::test]
2087    async fn nested_op_upsert_two_statement_fallback_on_mssql_insert_path() {
2088        use crate::inputs::WriteOp;
2089        // First execute_raw (UPDATE) returns 0; the executor must
2090        // proceed to emit a separate INSERT.
2091        let engine = RecordingEngine::mssql_with_affected(vec![0, 1]);
2092        let op = NestedWriteOp::Upsert {
2093            relation: "posts",
2094            target_table: "posts",
2095            foreign_key: "author_id",
2096            target_pk: "id",
2097            pk: FilterValue::Int(99),
2098            create_payload: vec![("title".to_string(), FilterValue::String("new".to_string()))],
2099            update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2100        };
2101        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2102
2103        let stmts = engine.statements();
2104        assert_eq!(stmts.len(), 2, "expected UPDATE then INSERT");
2105        let (update_sql, _) = &stmts[0];
2106        assert!(update_sql.starts_with("UPDATE"), "got: {update_sql}");
2107        let (insert_sql, insert_params) = &stmts[1];
2108        assert!(insert_sql.starts_with("INSERT INTO"), "got: {insert_sql}");
2109        assert!(!insert_sql.contains("ON CONFLICT"), "got: {insert_sql}");
2110        assert!(insert_sql.contains("[posts]"), "got: {insert_sql}");
2111        assert!(insert_sql.contains("[author_id]"), "got: {insert_sql}");
2112        assert_eq!(insert_params.len(), 2);
2113        assert_eq!(insert_params[0], FilterValue::String("new".to_string()));
2114        assert_eq!(insert_params[1], FilterValue::Int(7));
2115    }
2116
2117    #[tokio::test]
2118    async fn nested_op_connect_or_create_connect_path_when_affected() {
2119        // Default RecordingEngine returns 1 from execute_raw, so the
2120        // UPDATE matches at least one row and the INSERT phase must not
2121        // run.
2122        let engine = RecordingEngine::with_affected(vec![1]);
2123        let op = NestedWriteOp::ConnectOrCreate {
2124            relation: "posts",
2125            target_table: "posts",
2126            foreign_key: "author_id",
2127            where_filter: Filter::Equals("id".into(), FilterValue::Int(42)),
2128            create_payload: vec![(
2129                "title".to_string(),
2130                FilterValue::String("fallback".to_string()),
2131            )],
2132        };
2133        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2134
2135        let stmts = engine.statements();
2136        assert_eq!(
2137            stmts.len(),
2138            1,
2139            "expected only the UPDATE — INSERT should not have run"
2140        );
2141        let (sql, params) = &stmts[0];
2142        assert!(sql.contains("UPDATE"), "got: {sql}");
2143        assert!(!sql.contains("INSERT"), "got: {sql}");
2144        assert!(sql.contains("posts"), "got: {sql}");
2145        assert!(sql.contains("author_id"), "got: {sql}");
2146        // params: parent_pk ($1) + filter value ($2)
2147        assert_eq!(params.len(), 2);
2148        assert_eq!(params[0], FilterValue::Int(7));
2149        assert_eq!(params[1], FilterValue::Int(42));
2150    }
2151
2152    #[tokio::test]
2153    async fn nested_op_connect_or_create_create_path_when_zero_affected() {
2154        // UPDATE returns 0 (no matching row) → executor must emit the
2155        // INSERT with the FK spliced in. Second call (the INSERT)
2156        // returns 1.
2157        let engine = RecordingEngine::with_affected(vec![0, 1]);
2158        let op = NestedWriteOp::ConnectOrCreate {
2159            relation: "posts",
2160            target_table: "posts",
2161            foreign_key: "author_id",
2162            where_filter: Filter::Equals("id".into(), FilterValue::Int(42)),
2163            create_payload: vec![(
2164                "title".to_string(),
2165                FilterValue::String("fallback".to_string()),
2166            )],
2167        };
2168        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2169
2170        let stmts = engine.statements();
2171        assert_eq!(stmts.len(), 2);
2172        let (update_sql, _) = &stmts[0];
2173        assert!(update_sql.contains("UPDATE"), "got: {update_sql}");
2174        let (insert_sql, insert_params) = &stmts[1];
2175        assert!(insert_sql.contains("INSERT INTO"), "got: {insert_sql}");
2176        assert!(insert_sql.contains("posts"), "got: {insert_sql}");
2177        assert!(insert_sql.contains("title"), "got: {insert_sql}");
2178        assert!(insert_sql.contains("author_id"), "got: {insert_sql}");
2179        // create payload value + FK (parent_pk) = 2 params; parent_pk
2180        // must be last so the FK column lines up with $2.
2181        assert_eq!(insert_params.len(), 2);
2182        assert_eq!(
2183            insert_params[0],
2184            FilterValue::String("fallback".to_string())
2185        );
2186        assert_eq!(insert_params[1], FilterValue::Int(7));
2187    }
2188
2189    #[tokio::test]
2190    async fn nested_op_set_with_empty_list_clears_all_children() {
2191        // Empty set_pks → one UPDATE with `WHERE fk = $1`, no NOT IN clause,
2192        // no connect step.
2193        let engine = RecordingEngine::new();
2194        let op = NestedWriteOp::Set {
2195            relation: "posts",
2196            target_table: "posts",
2197            foreign_key: "author_id",
2198            target_pk: "id",
2199            set_pks: vec![],
2200        };
2201        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2202
2203        let stmts = engine.statements();
2204        assert_eq!(stmts.len(), 1, "expected only the disconnect-all UPDATE");
2205        let (sql, params) = &stmts[0];
2206        assert!(sql.contains("UPDATE"), "got: {sql}");
2207        assert!(sql.contains("posts"), "got: {sql}");
2208        assert!(sql.contains("author_id"), "got: {sql}");
2209        assert!(sql.contains("= NULL"), "got: {sql}");
2210        assert!(!sql.contains("NOT IN"), "got: {sql}");
2211        assert!(!sql.contains(" IN ("), "got: {sql}");
2212        // Only the parent_pk param.
2213        assert_eq!(params.len(), 1);
2214        assert_eq!(params[0], FilterValue::Int(7));
2215    }
2216
2217    #[tokio::test]
2218    async fn nested_op_set_with_non_empty_list_emits_disconnect_then_connect() {
2219        let engine = RecordingEngine::new();
2220        let op = NestedWriteOp::Set {
2221            relation: "posts",
2222            target_table: "posts",
2223            foreign_key: "author_id",
2224            target_pk: "id",
2225            set_pks: vec![
2226                FilterValue::Int(1),
2227                FilterValue::Int(2),
2228                FilterValue::Int(3),
2229            ],
2230        };
2231        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2232
2233        let stmts = engine.statements();
2234        assert_eq!(stmts.len(), 2);
2235        let (disconnect_sql, disconnect_params) = &stmts[0];
2236        assert!(disconnect_sql.contains("UPDATE"), "got: {disconnect_sql}");
2237        assert!(disconnect_sql.contains("posts"), "got: {disconnect_sql}");
2238        assert!(
2239            disconnect_sql.contains("author_id"),
2240            "got: {disconnect_sql}"
2241        );
2242        assert!(disconnect_sql.contains("= NULL"), "got: {disconnect_sql}");
2243        assert!(disconnect_sql.contains("NOT IN"), "got: {disconnect_sql}");
2244        // params: parent_pk ($1) + 3 set pks ($2..$4)
2245        assert_eq!(disconnect_params.len(), 4);
2246        assert_eq!(disconnect_params[0], FilterValue::Int(7));
2247        assert_eq!(disconnect_params[1], FilterValue::Int(1));
2248        assert_eq!(disconnect_params[2], FilterValue::Int(2));
2249        assert_eq!(disconnect_params[3], FilterValue::Int(3));
2250
2251        let (connect_sql, connect_params) = &stmts[1];
2252        assert!(connect_sql.contains("UPDATE"), "got: {connect_sql}");
2253        assert!(connect_sql.contains("posts"), "got: {connect_sql}");
2254        assert!(connect_sql.contains("author_id"), "got: {connect_sql}");
2255        assert!(connect_sql.contains(" IN ("), "got: {connect_sql}");
2256        assert!(!connect_sql.contains("NOT IN"), "got: {connect_sql}");
2257        // params: parent_pk ($1) + 3 set pks ($2..$4)
2258        assert_eq!(connect_params.len(), 4);
2259        assert_eq!(connect_params[0], FilterValue::Int(7));
2260        assert_eq!(connect_params[1], FilterValue::Int(1));
2261        assert_eq!(connect_params[2], FilterValue::Int(2));
2262        assert_eq!(connect_params[3], FilterValue::Int(3));
2263    }
2264
2265    #[tokio::test]
2266    async fn nested_op_set_with_single_element_uses_single_placeholder_in_lists() {
2267        let engine = RecordingEngine::new();
2268        let op = NestedWriteOp::Set {
2269            relation: "posts",
2270            target_table: "posts",
2271            foreign_key: "author_id",
2272            target_pk: "id",
2273            set_pks: vec![FilterValue::Int(5)],
2274        };
2275        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2276
2277        let stmts = engine.statements();
2278        assert_eq!(stmts.len(), 2);
2279        let (disconnect_sql, _) = &stmts[0];
2280        // The NOT IN list has one placeholder.
2281        assert!(
2282            disconnect_sql.contains("NOT IN ($2)"),
2283            "got: {disconnect_sql}"
2284        );
2285        let (connect_sql, _) = &stmts[1];
2286        assert!(connect_sql.contains(" IN ($2)"), "got: {connect_sql}");
2287        assert!(!connect_sql.contains("NOT IN"), "got: {connect_sql}");
2288    }
2289
2290    #[tokio::test]
2291    async fn nested_op_set_disconnect_clears_only_current_parents_children() {
2292        // The disconnect UPDATE must be scoped to `WHERE fk = $parent AND ...`
2293        // — without this clause, we'd clear children belonging to other parents.
2294        let engine = RecordingEngine::new();
2295        let op = NestedWriteOp::Set {
2296            relation: "posts",
2297            target_table: "posts",
2298            foreign_key: "author_id",
2299            target_pk: "id",
2300            set_pks: vec![FilterValue::Int(1)],
2301        };
2302        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2303
2304        let stmts = engine.statements();
2305        let (disconnect_sql, _) = &stmts[0];
2306        // The disconnect must scope to the parent's children.
2307        assert!(
2308            disconnect_sql.contains("author_id\" = $1"),
2309            "expected `author_id = $1` clause; got: {disconnect_sql}"
2310        );
2311    }
2312
2313    #[tokio::test]
2314    async fn nested_op_connect_or_create_rejects_empty_where() {
2315        // Filter::None would lower to `WHERE TRUE` and rewrite every row.
2316        // The executor must reject this defensively before issuing SQL.
2317        let engine = RecordingEngine::new();
2318        let op = NestedWriteOp::ConnectOrCreate {
2319            relation: "posts",
2320            target_table: "posts",
2321            foreign_key: "author_id",
2322            where_filter: Filter::None,
2323            create_payload: vec![(
2324                "title".to_string(),
2325                FilterValue::String("fallback".to_string()),
2326            )],
2327        };
2328        let err = op
2329            .execute(&engine, &FilterValue::Int(7))
2330            .await
2331            .expect_err("empty where must be rejected");
2332        let op_ctx = err.context.operation.clone().unwrap_or_default();
2333        assert!(op_ctx.contains("ConnectOrCreate"), "got: {op_ctx}");
2334        // No SQL should have been emitted.
2335        assert!(engine.statements().is_empty());
2336    }
2337
2338    #[tokio::test]
2339    async fn nested_op_upsert_single_statement_empty_update_payload_emits_do_nothing() {
2340        // PG dialect → single-statement path. Empty update_payload should emit
2341        // INSERT INTO ... VALUES (...) ON CONFLICT (target_pk) DO NOTHING
2342        // (per the upsert_do_nothing_clause helper added in 8f01cab).
2343        let engine = RecordingEngine::new();
2344        let op = NestedWriteOp::Upsert {
2345            relation: "posts",
2346            target_table: "posts",
2347            foreign_key: "author_id",
2348            target_pk: "id",
2349            pk: FilterValue::Int(99),
2350            create_payload: vec![("title".to_string(), FilterValue::String("new".into()))],
2351            update_payload: vec![],
2352        };
2353        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2354
2355        let stmts = engine.statements();
2356        assert_eq!(
2357            stmts.len(),
2358            1,
2359            "expected one INSERT ... DO NOTHING; got {stmts:#?}"
2360        );
2361        let (sql, params) = &stmts[0];
2362        assert!(sql.starts_with("INSERT INTO"), "got: {sql}");
2363        assert!(sql.contains("posts"), "got: {sql}");
2364        assert!(sql.contains("ON CONFLICT (\"id\")"), "got: {sql}");
2365        assert!(sql.contains("DO NOTHING"), "got: {sql}");
2366        // Two insert values (title + FK); no SET params.
2367        assert_eq!(params.len(), 2);
2368        assert_eq!(params[0], FilterValue::String("new".into()));
2369        assert_eq!(params[1], FilterValue::Int(7));
2370    }
2371
2372    #[tokio::test]
2373    async fn nested_op_upsert_two_statement_fallback_empty_update_payload_emits_bare_insert() {
2374        // MSSQL dialect → two-statement fallback. On the fallback path,
2375        // empty update_payload skips the UPDATE entirely and emits a bare INSERT
2376        // (if the row already exists the engine surfaces a PK unique violation,
2377        // which is correct insert-or-fail semantics on dialects without native
2378        // upsert syntax). See production code comment at the
2379        // `if update_payload.is_empty()` guard in the two-statement fallback path.
2380        let engine = RecordingEngine::mssql();
2381        let op = NestedWriteOp::Upsert {
2382            relation: "posts",
2383            target_table: "posts",
2384            foreign_key: "author_id",
2385            target_pk: "id",
2386            pk: FilterValue::Int(99),
2387            create_payload: vec![("title".to_string(), FilterValue::String("new".into()))],
2388            update_payload: vec![],
2389        };
2390        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2391
2392        let stmts = engine.statements();
2393        assert_eq!(stmts.len(), 1, "expected one bare INSERT; got {stmts:#?}");
2394        let (sql, params) = &stmts[0];
2395        assert!(sql.starts_with("INSERT INTO"), "got: {sql}");
2396        assert!(sql.contains("[posts]"), "got: {sql}");
2397        assert!(!sql.contains("ON CONFLICT"), "got: {sql}");
2398        assert_eq!(params.len(), 2);
2399        assert_eq!(params[0], FilterValue::String("new".into()));
2400        assert_eq!(params[1], FilterValue::Int(7));
2401    }
2402
2403    #[tokio::test]
2404    async fn nested_op_upsert_single_statement_all_unset_update_payload() {
2405        use crate::inputs::WriteOp;
2406        // Unset emits `col = NULL` literal (no placeholder consumed).
2407        // The placeholder accounting must remain consistent: only INSERT
2408        // values consume placeholders.
2409        let engine = RecordingEngine::new();
2410        let op = NestedWriteOp::Upsert {
2411            relation: "posts",
2412            target_table: "posts",
2413            foreign_key: "author_id",
2414            target_pk: "id",
2415            pk: FilterValue::Int(99),
2416            create_payload: vec![("title".to_string(), FilterValue::String("new".into()))],
2417            update_payload: vec![("deleted_at".to_string(), WriteOp::Unset)],
2418        };
2419        op.execute(&engine, &FilterValue::Int(7)).await.unwrap();
2420
2421        let stmts = engine.statements();
2422        assert_eq!(
2423            stmts.len(),
2424            1,
2425            "expected one INSERT...ON CONFLICT statement; got {stmts:#?}"
2426        );
2427        let (sql, params) = &stmts[0];
2428        assert!(sql.starts_with("INSERT INTO"), "got: {sql}");
2429        assert!(
2430            sql.contains("ON CONFLICT (\"id\") DO UPDATE SET"),
2431            "got: {sql}"
2432        );
2433        assert!(sql.contains("\"deleted_at\" = NULL"), "got: {sql}");
2434        // Only INSERT params: title + FK. No SET param (Unset consumes none).
2435        assert_eq!(params.len(), 2);
2436        assert_eq!(params[0], FilterValue::String("new".into()));
2437        assert_eq!(params[1], FilterValue::Int(7));
2438    }
2439
2440    #[tokio::test]
2441    async fn nested_op_upsert_empty_create_payload_returns_invalid_input() {
2442        use crate::inputs::WriteOp;
2443        let engine = RecordingEngine::new();
2444        let op = NestedWriteOp::Upsert {
2445            relation: "posts",
2446            target_table: "posts",
2447            foreign_key: "author_id",
2448            target_pk: "id",
2449            pk: FilterValue::Int(99),
2450            create_payload: vec![],
2451            update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2452        };
2453        let err = op.execute(&engine, &FilterValue::Int(7)).await.unwrap_err();
2454        assert_eq!(
2455            err.code,
2456            crate::error::ErrorCode::InvalidParameter,
2457            "expected InvalidParameter (invalid_input), got: {err:?}"
2458        );
2459        let msg = format!("{err}");
2460        assert!(
2461            msg.contains("create_payload") || msg.contains("create column"),
2462            "msg: {msg}"
2463        );
2464    }
2465
2466    #[tokio::test]
2467    async fn nested_op_upsert_on_notsql_returns_unsupported_not_panic() {
2468        use crate::inputs::WriteOp;
2469        let engine = RecordingEngine::notsql();
2470        let op = NestedWriteOp::Upsert {
2471            relation: "posts",
2472            target_table: "posts",
2473            foreign_key: "author_id",
2474            target_pk: "id",
2475            pk: FilterValue::Int(99),
2476            create_payload: vec![("title".to_string(), FilterValue::String("x".into()))],
2477            update_payload: vec![("views".to_string(), WriteOp::Increment(FilterValue::Int(1)))],
2478        };
2479        let err = op.execute(&engine, &FilterValue::Int(7)).await.unwrap_err();
2480        let msg = format!("{err}");
2481        assert!(
2482            msg.contains("Upsert is not supported")
2483                || msg.contains("unsupported")
2484                || msg.contains("Unsupported"),
2485            "msg: {msg}"
2486        );
2487        assert_eq!(
2488            engine.statements().len(),
2489            0,
2490            "no SQL should have been emitted"
2491        );
2492    }
2493}