Skip to main content

reinhardt_db/migrations/
operations.rs

1//! Migration operations
2//!
3//! This module provides various migration operations inspired by Django's migration system.
4//! Operations are organized into three categories:
5//!
6//! - **Model operations** (`models`): Create, delete, and rename models (tables)
7//! - **Field operations** (`fields`): Add, remove, alter, and rename fields (columns)
8//! - **Special operations** (`special`): Run raw SQL or custom code
9//!
10//! # Example
11//!
12//! ```rust
13//! use reinhardt_db::migrations::operations::{
14//!     models::{CreateModel, DeleteModel},
15//!     fields::{AddField, RemoveField},
16//!     special::RunSQL,
17//!     FieldDefinition,
18//! };
19//! use reinhardt_db::migrations::{ProjectState, FieldType};
20//!
21//! let mut state = ProjectState::new();
22//!
23//! // Create a model
24//! let create = CreateModel::new(
25//!     "User",
26//!     vec![
27//!         FieldDefinition::new("id", FieldType::Integer, true, false, Option::<&str>::None),
28//!         FieldDefinition::new("name", FieldType::VarChar(100), false, false, Option::<&str>::None),
29//!     ],
30//! );
31//! create.state_forwards("myapp", &mut state);
32//!
33//! // Add a field
34//! let add = AddField::new("User", FieldDefinition::new("email", FieldType::VarChar(255), false, false, Option::<&str>::None));
35//! add.state_forwards("myapp", &mut state);
36//!
37//! // Run custom SQL
38//! let sql = RunSQL::new("CREATE INDEX idx_email ON myapp_user(email)");
39//! ```
40
41pub mod fields;
42pub mod models;
43pub mod postgres;
44pub mod special;
45mod to_tokens;
46
47// Re-export commonly used types for convenience
48pub use fields::{AddField, AlterField, RemoveField, RenameField};
49pub use models::{CreateModel, DeleteModel, FieldDefinition, MoveModel, RenameModel};
50pub use postgres::{CreateCollation, CreateExtension, DropExtension};
51pub use special::{RunCode, RunSQL, StateOperation};
52
53// Legacy types for backward compatibility
54// These are maintained from the original operations.rs
55use super::{FieldState, FieldType, ModelState, ProjectState};
56use pg_escape::{quote_identifier, quote_literal};
57use reinhardt_query::prelude::{
58	Alias, AlterTableStatement, ColumnDef, CreateIndexStatement, CreateTableStatement,
59	DropIndexStatement, DropTableStatement, Query, SimpleExpr, Value,
60};
61use serde::{Deserialize, Serialize};
62
63/// Index type for database indexes
64///
65/// Specifies the type of index to create. Different index types have different
66/// performance characteristics and support different operators.
67///
68/// # Examples
69///
70/// ```rust
71/// use reinhardt_db::migrations::operations::IndexType;
72///
73/// // B-Tree is the default, best for equality and range queries
74/// let btree = IndexType::BTree;
75///
76/// // Hash is best for simple equality comparisons
77/// let hash = IndexType::Hash;
78///
79/// // GIN is best for containment operators (arrays, JSONB)
80/// let gin = IndexType::Gin;
81/// ```
82#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
83#[serde(rename_all = "lowercase")]
84pub enum IndexType {
85	/// B-tree index (default)
86	///
87	/// Best for: equality and range queries (=, <, >, <=, >=, BETWEEN)
88	/// Supported by: All databases
89	#[default]
90	BTree,
91
92	/// Hash index
93	///
94	/// Best for: simple equality comparisons (=)
95	/// Supported by: PostgreSQL, MySQL
96	Hash,
97
98	/// GIN (Generalized Inverted Index)
99	///
100	/// Best for: composite values (arrays, JSONB, full-text search)
101	/// Supported by: PostgreSQL
102	Gin,
103
104	/// GiST (Generalized Search Tree)
105	///
106	/// Best for: geometric data, full-text search, range types
107	/// Supported by: PostgreSQL
108	Gist,
109
110	/// BRIN (Block Range Index)
111	///
112	/// Best for: very large tables with naturally ordered data
113	/// Supported by: PostgreSQL
114	Brin,
115
116	/// Full-text index
117	///
118	/// Best for: full-text search on text columns
119	/// Supported by: MySQL
120	Fulltext,
121
122	/// Spatial index
123	///
124	/// Best for: geometric/geographic data
125	/// Supported by: MySQL
126	Spatial,
127}
128
129impl std::fmt::Display for IndexType {
130	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131		match self {
132			IndexType::BTree => write!(f, "btree"),
133			IndexType::Hash => write!(f, "hash"),
134			IndexType::Gin => write!(f, "gin"),
135			IndexType::Gist => write!(f, "gist"),
136			IndexType::Brin => write!(f, "brin"),
137			IndexType::Fulltext => write!(f, "fulltext"),
138			IndexType::Spatial => write!(f, "spatial"),
139		}
140	}
141}
142// ============================================================================
143// MySQL-Specific ALTER TABLE Options
144// ============================================================================
145
146/// MySQL ALTER TABLE algorithm types
147#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
148#[serde(rename_all = "UPPERCASE")]
149pub enum MySqlAlgorithm {
150	/// Instant variant.
151	Instant,
152	/// Inplace variant.
153	Inplace,
154	/// Copy variant.
155	Copy,
156	#[default]
157	/// Default variant.
158	Default,
159}
160
161impl std::fmt::Display for MySqlAlgorithm {
162	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163		match self {
164			MySqlAlgorithm::Instant => write!(f, "INSTANT"),
165			MySqlAlgorithm::Inplace => write!(f, "INPLACE"),
166			MySqlAlgorithm::Copy => write!(f, "COPY"),
167			MySqlAlgorithm::Default => write!(f, "DEFAULT"),
168		}
169	}
170}
171
172/// MySQL ALTER TABLE lock types
173#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
174#[serde(rename_all = "UPPERCASE")]
175pub enum MySqlLock {
176	/// None variant.
177	None,
178	/// Shared variant.
179	Shared,
180	/// Exclusive variant.
181	Exclusive,
182	#[default]
183	/// Default variant.
184	Default,
185}
186
187impl std::fmt::Display for MySqlLock {
188	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189		match self {
190			MySqlLock::None => write!(f, "NONE"),
191			MySqlLock::Shared => write!(f, "SHARED"),
192			MySqlLock::Exclusive => write!(f, "EXCLUSIVE"),
193			MySqlLock::Default => write!(f, "DEFAULT"),
194		}
195	}
196}
197
198/// MySQL ALTER TABLE options
199#[non_exhaustive]
200#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
201pub struct AlterTableOptions {
202	#[serde(default, skip_serializing_if = "Option::is_none")]
203	/// The algorithm.
204	pub algorithm: Option<MySqlAlgorithm>,
205	#[serde(default, skip_serializing_if = "Option::is_none")]
206	/// The lock.
207	pub lock: Option<MySqlLock>,
208}
209
210impl AlterTableOptions {
211	/// Creates a new instance.
212	pub fn new() -> Self {
213		Self::default()
214	}
215	/// Sets the algorithm and returns self for chaining.
216	pub fn with_algorithm(mut self, algorithm: MySqlAlgorithm) -> Self {
217		self.algorithm = Some(algorithm);
218		self
219	}
220	/// Sets the lock and returns self for chaining.
221	pub fn with_lock(mut self, lock: MySqlLock) -> Self {
222		self.lock = Some(lock);
223		self
224	}
225	/// Returns the mpty.
226	pub fn is_empty(&self) -> bool {
227		self.algorithm.is_none() && self.lock.is_none()
228	}
229	/// Converts to sql suffix.
230	pub fn to_sql_suffix(&self) -> String {
231		let mut parts = Vec::new();
232		if let Some(algo) = &self.algorithm
233			&& *algo != MySqlAlgorithm::Default
234		{
235			parts.push(format!("ALGORITHM={}", algo));
236		}
237		if let Some(lock) = &self.lock
238			&& *lock != MySqlLock::Default
239		{
240			parts.push(format!("LOCK={}", lock));
241		}
242		if parts.is_empty() {
243			String::new()
244		} else {
245			format!(", {}", parts.join(", "))
246		}
247	}
248}
249
250// ============================================================================
251// MySQL Table Partitioning
252// ============================================================================
253
254/// Partition type for MySQL table partitioning
255#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
256#[serde(rename_all = "UPPERCASE")]
257pub enum PartitionType {
258	/// Range variant.
259	Range,
260	/// List variant.
261	List,
262	/// Hash variant.
263	Hash,
264	/// Key variant.
265	Key,
266}
267
268impl std::fmt::Display for PartitionType {
269	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270		match self {
271			PartitionType::Range => write!(f, "RANGE"),
272			PartitionType::List => write!(f, "LIST"),
273			PartitionType::Hash => write!(f, "HASH"),
274			PartitionType::Key => write!(f, "KEY"),
275		}
276	}
277}
278
279/// Partition value definition
280#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
281#[serde(tag = "type")]
282pub enum PartitionValues {
283	/// LessThan variant.
284	LessThan(String),
285	/// In variant.
286	In(Vec<String>),
287	/// ModuloCount variant.
288	ModuloCount(u32),
289}
290
291/// Individual partition definition
292#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
293pub struct PartitionDef {
294	/// The name.
295	pub name: String,
296	/// The values.
297	pub values: PartitionValues,
298}
299
300impl PartitionDef {
301	/// Creates a new instance.
302	pub fn new(name: impl Into<String>, values: PartitionValues) -> Self {
303		Self {
304			name: name.into(),
305			values,
306		}
307	}
308	/// Performs the less than operation.
309	pub fn less_than(name: impl Into<String>, value: impl Into<String>) -> Self {
310		Self::new(name, PartitionValues::LessThan(value.into()))
311	}
312	/// Performs the maxvalue operation.
313	pub fn maxvalue(name: impl Into<String>) -> Self {
314		Self::new(name, PartitionValues::LessThan("MAXVALUE".to_string()))
315	}
316	/// Performs the list in operation.
317	pub fn list_in(name: impl Into<String>, values: Vec<String>) -> Self {
318		Self::new(name, PartitionValues::In(values))
319	}
320}
321
322/// CockroachDB INTERLEAVE IN PARENT specification
323///
324/// Used to co-locate child table rows with parent table rows,
325/// improving join performance for hierarchical data.
326///
327/// **CockroachDB only**: This is ignored for other databases.
328#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
329pub struct InterleaveSpec {
330	/// Parent table name
331	pub parent_table: String,
332	/// Columns in the parent table to interleave with
333	pub parent_columns: Vec<String>,
334}
335
336/// Table partitioning options
337#[non_exhaustive]
338#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
339pub struct PartitionOptions {
340	/// The partition type.
341	pub partition_type: PartitionType,
342	/// The column.
343	pub column: String,
344	/// The partitions.
345	pub partitions: Vec<PartitionDef>,
346}
347
348impl PartitionOptions {
349	/// Creates a new instance.
350	pub fn new(
351		partition_type: PartitionType,
352		column: impl Into<String>,
353		partitions: Vec<PartitionDef>,
354	) -> Self {
355		Self {
356			partition_type,
357			column: column.into(),
358			partitions,
359		}
360	}
361	/// Performs the range operation.
362	pub fn range(column: impl Into<String>, partitions: Vec<PartitionDef>) -> Self {
363		Self::new(PartitionType::Range, column, partitions)
364	}
365	/// Performs the list operation.
366	pub fn list(column: impl Into<String>, partitions: Vec<PartitionDef>) -> Self {
367		Self::new(PartitionType::List, column, partitions)
368	}
369	/// Performs the hash operation.
370	pub fn hash(column: impl Into<String>, num_partitions: u32) -> Self {
371		Self::new(
372			PartitionType::Hash,
373			column,
374			vec![PartitionDef::new(
375				"",
376				PartitionValues::ModuloCount(num_partitions),
377			)],
378		)
379	}
380	/// Performs the key operation.
381	pub fn key(column: impl Into<String>, num_partitions: u32) -> Self {
382		Self::new(
383			PartitionType::Key,
384			column,
385			vec![PartitionDef::new(
386				"",
387				PartitionValues::ModuloCount(num_partitions),
388			)],
389		)
390	}
391	/// Converts to sql.
392	pub fn to_sql(&self) -> String {
393		let mut sql = format!("PARTITION BY {}({})", self.partition_type, self.column);
394		match self.partition_type {
395			PartitionType::Hash | PartitionType::Key => {
396				if let Some(p) = self.partitions.first()
397					&& let PartitionValues::ModuloCount(n) = &p.values
398				{
399					sql.push_str(&format!(" PARTITIONS {}", n));
400				}
401			}
402			PartitionType::Range | PartitionType::List => {
403				sql.push_str(" (");
404				let defs: Vec<String> = self
405					.partitions
406					.iter()
407					.map(|p| {
408						let vals = match &p.values {
409							PartitionValues::LessThan(v) => {
410								if v == "MAXVALUE" {
411									"VALUES LESS THAN MAXVALUE".to_string()
412								} else {
413									format!("VALUES LESS THAN ('{}')", v)
414								}
415							}
416							PartitionValues::In(v) => format!(
417								"VALUES IN ({})",
418								v.iter()
419									.map(|x| format!("'{}'", x))
420									.collect::<Vec<_>>()
421									.join(", ")
422							),
423							PartitionValues::ModuloCount(_) => String::new(),
424						};
425						format!("PARTITION {} {}", p.name, vals)
426					})
427					.collect();
428				sql.push_str(&defs.join(", "));
429				sql.push(')');
430			}
431		}
432		sql
433	}
434}
435
436/// Deferrable constraint option for PostgreSQL
437///
438/// Controls when constraint checking is performed during a transaction.
439#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
440#[serde(rename_all = "lowercase")]
441pub enum DeferrableOption {
442	/// DEFERRABLE INITIALLY IMMEDIATE
443	Immediate,
444	/// DEFERRABLE INITIALLY DEFERRED
445	Deferred,
446}
447
448impl std::fmt::Display for DeferrableOption {
449	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
450		match self {
451			DeferrableOption::Immediate => write!(f, "DEFERRABLE INITIALLY IMMEDIATE"),
452			DeferrableOption::Deferred => write!(f, "DEFERRABLE INITIALLY DEFERRED"),
453		}
454	}
455}
456
457/// Constraint definition for tables
458#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
459#[serde(tag = "type")]
460pub enum Constraint {
461	/// PrimaryKey constraint
462	///
463	/// Used for composite primary keys defined at the table level.
464	/// Single-column primary keys are typically defined directly on the column.
465	PrimaryKey {
466		/// The constraint name.
467		name: String,
468		/// The columns that form the primary key.
469		columns: Vec<String>,
470	},
471	/// ForeignKey constraint
472	ForeignKey {
473		/// The constraint name.
474		name: String,
475		/// The columns in the referencing table.
476		columns: Vec<String>,
477		/// The referenced table name.
478		referenced_table: String,
479		/// The columns in the referenced table.
480		referenced_columns: Vec<String>,
481		/// Action on delete of the referenced row.
482		on_delete: super::ForeignKeyAction,
483		/// Action on update of the referenced row.
484		on_update: super::ForeignKeyAction,
485		/// Optional deferrable constraint option.
486		#[serde(default, skip_serializing_if = "Option::is_none")]
487		deferrable: Option<DeferrableOption>,
488	},
489	/// Unique constraint
490	Unique {
491		/// The constraint name.
492		name: String,
493		/// The columns that must be unique together.
494		columns: Vec<String>,
495	},
496	/// Check constraint
497	Check {
498		/// The constraint name.
499		name: String,
500		/// The SQL check expression.
501		expression: String,
502	},
503	/// OneToOne constraint (ForeignKey + Unique combination)
504	OneToOne {
505		/// The constraint name.
506		name: String,
507		/// The column in the referencing table.
508		column: String,
509		/// The referenced table name.
510		referenced_table: String,
511		/// The referenced column name.
512		referenced_column: String,
513		/// Action on delete of the referenced row.
514		on_delete: super::ForeignKeyAction,
515		/// Action on update of the referenced row.
516		on_update: super::ForeignKeyAction,
517		/// Optional deferrable constraint option.
518		#[serde(default, skip_serializing_if = "Option::is_none")]
519		deferrable: Option<DeferrableOption>,
520	},
521	/// ManyToMany relationship metadata (intermediate table reference)
522	ManyToMany {
523		/// The relationship name.
524		name: String,
525		/// The intermediate (through) table name.
526		through_table: String,
527		/// The column referencing the source table.
528		source_column: String,
529		/// The column referencing the target table.
530		target_column: String,
531		/// The target table name.
532		target_table: String,
533	},
534	/// Exclude constraint (PostgreSQL only)
535	Exclude {
536		/// The name.
537		name: String,
538		/// The elements.
539		elements: Vec<(String, String)>,
540		#[serde(default, skip_serializing_if = "Option::is_none")]
541		/// The using.
542		using: Option<String>,
543		#[serde(default, skip_serializing_if = "Option::is_none")]
544		/// The where clause.
545		where_clause: Option<String>,
546	},
547}
548
549impl std::fmt::Display for Constraint {
550	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
551		match self {
552			Constraint::PrimaryKey { name, columns } => {
553				write!(
554					f,
555					"CONSTRAINT {} PRIMARY KEY ({})",
556					name,
557					columns.join(", ")
558				)
559			}
560			Constraint::ForeignKey {
561				name,
562				columns,
563				referenced_table,
564				referenced_columns,
565				on_delete,
566				on_update,
567				deferrable,
568			} => {
569				write!(
570					f,
571					"CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {}({}) ON DELETE {} ON UPDATE {}",
572					name,
573					columns.join(", "),
574					referenced_table,
575					referenced_columns.join(", "),
576					on_delete.to_sql_keyword(),
577					on_update.to_sql_keyword()
578				)?;
579				if let Some(defer_opt) = deferrable {
580					write!(f, " {}", defer_opt)?;
581				}
582				Ok(())
583			}
584			Constraint::Unique { name, columns } => {
585				write!(f, "CONSTRAINT {} UNIQUE ({})", name, columns.join(", "))
586			}
587			Constraint::Check { name, expression } => {
588				write!(f, "CONSTRAINT {} CHECK ({})", name, expression)
589			}
590			Constraint::OneToOne {
591				name,
592				column,
593				referenced_table,
594				referenced_column,
595				on_delete,
596				on_update,
597				deferrable,
598			} => {
599				write!(
600					f,
601					"CONSTRAINT {} FOREIGN KEY ({}) REFERENCES {}({}) ON DELETE {} ON UPDATE {}",
602					name,
603					column,
604					referenced_table,
605					referenced_column,
606					on_delete.to_sql_keyword(),
607					on_update.to_sql_keyword()
608				)?;
609				if let Some(defer_opt) = deferrable {
610					write!(f, " {}", defer_opt)?;
611				}
612				write!(f, ", CONSTRAINT {}_unique UNIQUE ({})", name, column)
613			}
614			Constraint::ManyToMany { through_table, .. } => {
615				write!(f, "-- ManyToMany via {}", through_table)
616			}
617			Constraint::Exclude {
618				name,
619				elements,
620				using,
621				where_clause,
622			} => {
623				let elements_str: Vec<String> = elements
624					.iter()
625					.map(|(col, op)| format!("{} WITH {}", col, op))
626					.collect();
627				let using_str = using.as_deref().unwrap_or("gist");
628				if let Some(where_cl) = where_clause {
629					write!(
630						f,
631						"CONSTRAINT {} EXCLUDE USING {} ({}) WHERE ({})",
632						name,
633						using_str,
634						elements_str.join(", "),
635						where_cl
636					)
637				} else {
638					write!(
639						f,
640						"CONSTRAINT {} EXCLUDE USING {} ({})",
641						name,
642						using_str,
643						elements_str.join(", ")
644					)
645				}
646			}
647		}
648	}
649}
650
651/// Source for bulk data loading
652///
653/// Specifies where the data for bulk loading comes from.
654#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
655#[serde(tag = "type", content = "value")]
656pub enum BulkLoadSource {
657	/// Load from a file path
658	File(String),
659	/// Load from standard input (STDIN)
660	Stdin,
661	/// Load from a program's output (e.g., "gunzip -c file.csv.gz")
662	Program(String),
663}
664
665/// Format for bulk data loading
666///
667/// Specifies the format of the data being loaded.
668#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
669#[serde(rename_all = "lowercase")]
670pub enum BulkLoadFormat {
671	/// Plain text format (PostgreSQL default)
672	#[default]
673	Text,
674	/// CSV format
675	Csv,
676	/// Binary format (PostgreSQL-specific)
677	Binary,
678}
679
680impl std::fmt::Display for BulkLoadFormat {
681	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
682		match self {
683			BulkLoadFormat::Text => write!(f, "TEXT"),
684			BulkLoadFormat::Csv => write!(f, "CSV"),
685			BulkLoadFormat::Binary => write!(f, "BINARY"),
686		}
687	}
688}
689
690/// Options for bulk data loading
691///
692/// Provides fine-grained control over how data is parsed during bulk loading.
693#[non_exhaustive]
694#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
695pub struct BulkLoadOptions {
696	/// Field delimiter character (default: ',' for CSV, '\t' for TEXT)
697	#[serde(default, skip_serializing_if = "Option::is_none")]
698	pub delimiter: Option<char>,
699	/// String to represent NULL values
700	#[serde(default, skip_serializing_if = "Option::is_none")]
701	pub null_string: Option<String>,
702	/// Whether the file has a header row (CSV format)
703	#[serde(default)]
704	pub header: bool,
705	/// Columns to load into (if not all columns)
706	#[serde(default, skip_serializing_if = "Option::is_none")]
707	pub columns: Option<Vec<String>>,
708	/// Use LOCAL keyword (MySQL LOAD DATA LOCAL INFILE)
709	#[serde(default)]
710	pub local: bool,
711	/// Quote character for CSV (default: '"')
712	#[serde(default, skip_serializing_if = "Option::is_none")]
713	pub quote: Option<char>,
714	/// Escape character for CSV
715	#[serde(default, skip_serializing_if = "Option::is_none")]
716	pub escape: Option<char>,
717	/// Line terminator (default: '\n')
718	#[serde(default, skip_serializing_if = "Option::is_none")]
719	pub line_terminator: Option<String>,
720	/// Encoding of the file (MySQL-specific)
721	#[serde(default, skip_serializing_if = "Option::is_none")]
722	pub encoding: Option<String>,
723}
724
725impl BulkLoadOptions {
726	/// Create new BulkLoadOptions with default values
727	pub fn new() -> Self {
728		Self::default()
729	}
730
731	/// Set the field delimiter
732	pub fn with_delimiter(mut self, delimiter: char) -> Self {
733		self.delimiter = Some(delimiter);
734		self
735	}
736
737	/// Set the NULL string representation
738	pub fn with_null_string(mut self, null_string: impl Into<String>) -> Self {
739		self.null_string = Some(null_string.into());
740		self
741	}
742
743	/// Enable or disable header row
744	pub fn with_header(mut self, header: bool) -> Self {
745		self.header = header;
746		self
747	}
748
749	/// Set specific columns to load
750	pub fn with_columns(mut self, columns: Vec<String>) -> Self {
751		self.columns = Some(columns);
752		self
753	}
754
755	/// Enable LOCAL keyword for MySQL
756	pub fn with_local(mut self, local: bool) -> Self {
757		self.local = local;
758		self
759	}
760
761	/// Set the quote character for CSV
762	pub fn with_quote(mut self, quote: char) -> Self {
763		self.quote = Some(quote);
764		self
765	}
766
767	/// Set the escape character
768	pub fn with_escape(mut self, escape: char) -> Self {
769		self.escape = Some(escape);
770		self
771	}
772
773	/// Set the line terminator
774	pub fn with_line_terminator(mut self, terminator: impl Into<String>) -> Self {
775		self.line_terminator = Some(terminator.into());
776		self
777	}
778
779	/// Set the file encoding (MySQL-specific)
780	pub fn with_encoding(mut self, encoding: impl Into<String>) -> Self {
781		self.encoding = Some(encoding.into());
782		self
783	}
784}
785
786/// A migration operation (legacy enum for backward compatibility)
787///
788/// This enum is maintained for backward compatibility with existing code.
789/// New code should use the specific operation types from the `models`, `fields`,
790/// and `special` modules instead.
791#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
792#[serde(tag = "type")]
793pub enum Operation {
794	/// CreateTable variant.
795	CreateTable {
796		/// The name.
797		name: String,
798		/// The columns.
799		columns: Vec<ColumnDefinition>,
800		#[serde(default)]
801		/// The constraints.
802		constraints: Vec<Constraint>,
803		#[serde(default, skip_serializing_if = "Option::is_none")]
804		/// The without rowid.
805		without_rowid: Option<bool>,
806		#[serde(default, skip_serializing_if = "Option::is_none")]
807		/// The interleave in parent.
808		interleave_in_parent: Option<InterleaveSpec>,
809		#[serde(default, skip_serializing_if = "Option::is_none")]
810		/// The partition.
811		partition: Option<PartitionOptions>,
812	},
813	/// DropTable variant.
814	DropTable {
815		/// The name.
816		name: String,
817	},
818	/// AddColumn variant.
819	AddColumn {
820		/// The table.
821		table: String,
822		/// The column.
823		column: ColumnDefinition,
824		#[serde(default, skip_serializing_if = "Option::is_none")]
825		/// The mysql options.
826		mysql_options: Option<AlterTableOptions>,
827	},
828	/// DropColumn variant.
829	DropColumn {
830		/// The table.
831		table: String,
832		/// The column.
833		column: String,
834	},
835	/// AlterColumn variant.
836	AlterColumn {
837		/// The table.
838		table: String,
839		/// The column.
840		column: String,
841		/// Original column definition (before alteration).
842		/// This is required for generating accurate rollback SQL.
843		/// If None, rollback will attempt to reconstruct from ProjectState.
844		#[serde(default, skip_serializing_if = "Option::is_none")]
845		old_definition: Option<ColumnDefinition>,
846		/// The new definition.
847		new_definition: ColumnDefinition,
848		#[serde(default, skip_serializing_if = "Option::is_none")]
849		/// The mysql options.
850		mysql_options: Option<AlterTableOptions>,
851	},
852	/// RenameTable variant.
853	RenameTable {
854		/// The old name.
855		old_name: String,
856		/// The new name.
857		new_name: String,
858	},
859	/// RenameColumn variant.
860	RenameColumn {
861		/// The table.
862		table: String,
863		/// The old name.
864		old_name: String,
865		/// The new name.
866		new_name: String,
867	},
868	/// AddConstraint variant.
869	AddConstraint {
870		/// The table.
871		table: String,
872		/// The constraint sql.
873		constraint_sql: String,
874	},
875	/// DropConstraint variant.
876	DropConstraint {
877		/// The table.
878		table: String,
879		/// The constraint name.
880		constraint_name: String,
881	},
882	/// CreateIndex variant.
883	CreateIndex {
884		/// The table.
885		table: String,
886		/// The columns.
887		columns: Vec<String>,
888		/// The unique.
889		unique: bool,
890		/// Index type (B-Tree, Hash, GIN, GiST, etc.)
891		///
892		/// If not specified, the database will use its default index type (typically B-Tree).
893		#[serde(default, skip_serializing_if = "Option::is_none")]
894		index_type: Option<IndexType>,
895		/// Partial index condition (WHERE clause)
896		///
897		/// Creates a partial index that only indexes rows matching this condition.
898		/// Example: "status = 'active'" creates an index only for active rows.
899		#[serde(default, skip_serializing_if = "Option::is_none")]
900		where_clause: Option<String>,
901		/// Create index concurrently (PostgreSQL-specific)
902		///
903		/// When true, creates the index without locking the table for writes.
904		/// This is slower but allows concurrent operations during index creation.
905		#[serde(default)]
906		concurrently: bool,
907		/// Expression index (PostgreSQL, SQLite, MySQL 8.0+)
908		///
909		/// Index on computed expressions rather than simple column references.
910		/// When specified, these expressions are used instead of `columns`.
911		///
912		/// # Examples
913		///
914		/// ```rust,ignore
915		/// // Index on lowercase email for case-insensitive lookups
916		/// expressions: Some(vec!["LOWER(email)"]),
917		/// ```
918		///
919		/// **Note**: When `expressions` is Some, `columns` is ignored for SQL generation.
920		#[serde(default, skip_serializing_if = "Option::is_none")]
921		expressions: Option<Vec<String>>,
922		/// MySQL ALTER TABLE options (ALGORITHM, LOCK)
923		#[serde(default, skip_serializing_if = "Option::is_none")]
924		mysql_options: Option<AlterTableOptions>,
925		/// Operator class for index columns (PostgreSQL-specific)
926		///
927		/// Specifies a non-default operator class for the index.
928		/// Commonly used with extension-provided operator classes like `gin_trgm_ops`
929		/// for trigram similarity search with the pg_trgm extension.
930		///
931		/// # Examples
932		///
933		/// ```rust,ignore
934		/// // GIN index with trigram operator class for fuzzy text search
935		/// CreateIndex {
936		///     table: "products".to_string(),
937		///     columns: vec!["name".to_string()],
938		///     index_type: Some(IndexType::Gin),
939		///     operator_class: Some("gin_trgm_ops"),
940		///     ...
941		/// }
942		/// ```
943		#[serde(default, skip_serializing_if = "Option::is_none")]
944		operator_class: Option<String>,
945	},
946	/// DropIndex variant.
947	DropIndex {
948		/// The table.
949		table: String,
950		/// The columns.
951		columns: Vec<String>,
952	},
953	/// RunSQL variant.
954	RunSQL {
955		/// The sql.
956		sql: String,
957		/// The reverse sql.
958		reverse_sql: Option<String>,
959	},
960	/// RunRust variant.
961	RunRust {
962		/// The code.
963		code: String,
964		/// The reverse code.
965		reverse_code: Option<String>,
966	},
967	/// AlterTableComment variant.
968	AlterTableComment {
969		/// The table.
970		table: String,
971		/// The comment.
972		comment: Option<String>,
973	},
974	/// AlterUniqueTogether variant.
975	AlterUniqueTogether {
976		/// The table.
977		table: String,
978		/// The unique together.
979		unique_together: Vec<Vec<String>>,
980	},
981	/// AlterModelOptions variant.
982	AlterModelOptions {
983		/// The table.
984		table: String,
985		/// The options.
986		options: std::collections::HashMap<String, String>,
987	},
988	/// CreateInheritedTable variant.
989	CreateInheritedTable {
990		/// The name.
991		name: String,
992		/// The columns.
993		columns: Vec<ColumnDefinition>,
994		/// The base table.
995		base_table: String,
996		/// The join column.
997		join_column: String,
998	},
999	/// AddDiscriminatorColumn variant.
1000	AddDiscriminatorColumn {
1001		/// The table.
1002		table: String,
1003		/// The column name.
1004		column_name: String,
1005		/// The default value.
1006		default_value: String,
1007	},
1008	/// Move a model from one app to another
1009	///
1010	/// This operation handles cross-app model moves by:
1011	/// 1. Optionally renaming the table (if naming convention changes between apps)
1012	/// 2. Updating FK references to use the new table name
1013	///
1014	/// Note: This generates a RenameTable SQL if table name changes.
1015	/// The state tracking (from_app -> to_app) is handled at the ProjectState level.
1016	MoveModel {
1017		/// Name of the model being moved
1018		model_name: String,
1019		/// Source app label
1020		from_app: String,
1021		/// Target app label
1022		to_app: String,
1023		/// Whether to rename the underlying table
1024		rename_table: bool,
1025		/// Old table name (if rename_table is true)
1026		old_table_name: Option<String>,
1027		/// New table name (if rename_table is true)
1028		new_table_name: Option<String>,
1029	},
1030	/// Create a database schema (PostgreSQL, MySQL 5.0.2+)
1031	///
1032	/// Creates a new database schema namespace. In MySQL, this is equivalent to creating a database.
1033	CreateSchema {
1034		/// Name of the schema to create
1035		name: String,
1036		/// Whether to add IF NOT EXISTS clause
1037		#[serde(default)]
1038		if_not_exists: bool,
1039	},
1040	/// Drop a database schema
1041	///
1042	/// Drops an existing database schema. Use with caution as this will drop all objects in the schema.
1043	DropSchema {
1044		/// Name of the schema to drop
1045		name: String,
1046		/// Whether to add CASCADE clause (drops all contained objects)
1047		#[serde(default)]
1048		cascade: bool,
1049		/// Whether to add IF EXISTS clause
1050		#[serde(default = "default_true")]
1051		if_exists: bool,
1052	},
1053	/// Create a PostgreSQL extension (PostgreSQL-specific)
1054	///
1055	/// Creates a PostgreSQL extension like PostGIS, uuid-ossp, etc.
1056	/// This operation is only executed on PostgreSQL databases.
1057	CreateExtension {
1058		/// Name of the extension to create
1059		name: String,
1060		/// Whether to add IF NOT EXISTS clause
1061		#[serde(default = "default_true")]
1062		if_not_exists: bool,
1063		/// Optional schema to install the extension in
1064		#[serde(default)]
1065		schema: Option<String>,
1066	},
1067	/// Bulk data loading operation
1068	///
1069	/// Loads large amounts of data efficiently using database-native bulk loading commands:
1070	/// - PostgreSQL: `COPY table FROM source WITH (FORMAT csv, ...)`
1071	/// - MySQL: `LOAD DATA [LOCAL] INFILE 'path' INTO TABLE table ...`
1072	/// - SQLite: Not supported (falls back to INSERT statements)
1073	///
1074	/// # Performance
1075	///
1076	/// Bulk loading is typically 10-100x faster than individual INSERT statements
1077	/// for large datasets.
1078	///
1079	/// # Examples
1080	///
1081	/// ```rust,ignore
1082	/// use reinhardt_db::migrations::{Operation, BulkLoadSource, BulkLoadFormat, BulkLoadOptions};
1083	///
1084	/// // PostgreSQL COPY FROM file
1085	/// let op = Operation::BulkLoad {
1086	///     table: "events".to_string(),
1087	///     source: BulkLoadSource::File("/tmp/events.csv"),
1088	///     format: BulkLoadFormat::Csv,
1089	///     options: BulkLoadOptions::new()
1090	///         .with_header(true)
1091	///         .with_delimiter(','),
1092	/// };
1093	///
1094	/// // MySQL LOAD DATA LOCAL INFILE
1095	/// let op = Operation::BulkLoad {
1096	///     table: "events".to_string(),
1097	///     source: BulkLoadSource::File("/tmp/events.csv"),
1098	///     format: BulkLoadFormat::Csv,
1099	///     options: BulkLoadOptions::new()
1100	///         .with_local(true)
1101	///         .with_delimiter(','),
1102	/// };
1103	/// ```
1104	BulkLoad {
1105		/// Target table name
1106		table: String,
1107		/// Source of the data
1108		source: BulkLoadSource,
1109		/// Format of the data
1110		#[serde(default)]
1111		format: BulkLoadFormat,
1112		/// Additional loading options
1113		#[serde(default)]
1114		options: BulkLoadOptions,
1115	},
1116	/// Reset the auto-increment counter for a table
1117	///
1118	/// Sets the next value produced by the table's auto-increment mechanism.
1119	/// Typical uses include seeding IDs after a bulk import or shifting the
1120	/// sequence above a range reserved for historical data.
1121	///
1122	/// # Backend Behavior
1123	///
1124	/// - **PostgreSQL / CockroachDB**: `SELECT setval(pg_get_serial_sequence('{table}', '{column}'), {value}, false)`
1125	///   (resolves the sequence dynamically so both default `SERIAL` conventions
1126	///   and user-defined sequence names work; `false` makes the NEXT generated
1127	///   value equal `{value}`).
1128	/// - **MySQL**: `ALTER TABLE {table} AUTO_INCREMENT = {value}`.
1129	/// - **SQLite**: `INSERT OR REPLACE INTO sqlite_sequence(name, seq) VALUES (...)`
1130	///   (robust against tables that have not yet inserted any rows, where a
1131	///   simple `UPDATE` would silently no-op).
1132	SetAutoIncrementValue {
1133		/// The table whose auto-increment counter should be set.
1134		table: String,
1135		/// The auto-increment column (used to resolve the backing sequence
1136		/// on PostgreSQL / CockroachDB).
1137		column: String,
1138		/// The next value the counter should produce.
1139		value: i64,
1140	},
1141	/// Create a composite (multi-column) PRIMARY KEY constraint on an existing table
1142	///
1143	/// Emits `ALTER TABLE {table} ADD CONSTRAINT {name} PRIMARY KEY ({cols})`
1144	/// on every supported backend. When `constraint_name` is `None` the name
1145	/// defaults to `{table}_pkey`, matching PostgreSQL's conventional
1146	/// auto-generated identifier.
1147	///
1148	/// `columns` must be non-empty; emitting an empty column list would
1149	/// produce invalid SQL and is rejected as an `InvalidMigration` error at
1150	/// SQL generation time.
1151	CreateCompositePrimaryKey {
1152		/// The table to add the composite primary key to.
1153		table: String,
1154		/// The ordered list of columns participating in the primary key.
1155		columns: Vec<String>,
1156		/// Optional explicit constraint name. Defaults to `{table}_pkey`
1157		/// when `None`.
1158		#[serde(default, skip_serializing_if = "Option::is_none")]
1159		constraint_name: Option<String>,
1160	},
1161}
1162
/// Serde default hook for boolean fields that should be `true` when the key is absent.
///
/// Referenced by name from `#[serde(default = "default_true")]` attributes.
const fn default_true() -> bool {
	true
}
1167
1168impl Operation {
1169	/// Apply this operation to the project state (forward)
1170	pub fn state_forwards(&self, app_label: &str, state: &mut ProjectState) {
1171		match self {
1172			Operation::CreateTable { name, columns, .. } => {
1173				let mut model = ModelState::new(app_label, name.clone());
1174				for column in columns {
1175					let field = FieldState::new(
1176						column.name.to_string(),
1177						column.type_definition.clone(),
1178						false,
1179					);
1180					model.add_field(field);
1181				}
1182				state.add_model(model);
1183			}
1184			Operation::DropTable { name } => {
1185				state.remove_model(app_label, name);
1186			}
1187			Operation::AddColumn { table, column, .. } => {
1188				if let Some(model) = state.get_model_mut(app_label, table) {
1189					let field = FieldState::new(
1190						column.name.to_string(),
1191						column.type_definition.clone(),
1192						false,
1193					);
1194					model.add_field(field);
1195				}
1196			}
1197			Operation::DropColumn { table, column } => {
1198				if let Some(model) = state.get_model_mut(app_label, table) {
1199					model.remove_field(column);
1200				}
1201			}
1202			Operation::AlterColumn {
1203				table,
1204				column,
1205				new_definition,
1206				..
1207			} => {
1208				if let Some(model) = state.get_model_mut(app_label, table) {
1209					let field = FieldState::new(
1210						column.to_string(),
1211						new_definition.type_definition.clone(),
1212						false,
1213					);
1214					model.alter_field(column, field);
1215				}
1216			}
1217			Operation::RenameTable { old_name, new_name } => {
1218				state.rename_model(app_label, old_name, new_name.to_string());
1219			}
1220			Operation::RenameColumn {
1221				table,
1222				old_name,
1223				new_name,
1224			} => {
1225				if let Some(model) = state.get_model_mut(app_label, table) {
1226					model.rename_field(old_name, new_name.to_string());
1227				}
1228			}
1229			Operation::CreateInheritedTable {
1230				name,
1231				columns,
1232				base_table,
1233				join_column,
1234			} => {
1235				let mut model = ModelState::new(app_label, name.clone());
1236				model.base_model = Some(base_table.to_string());
1237				model.inheritance_type = Some("joined_table".to_string());
1238
1239				let join_field = FieldState::new(
1240					join_column.to_string(),
1241					FieldType::Custom(format!("INTEGER REFERENCES {}(id)", base_table)),
1242					false,
1243				);
1244				model.add_field(join_field);
1245
1246				for column in columns {
1247					let field = FieldState::new(
1248						column.name.to_string(),
1249						column.type_definition.clone(),
1250						false,
1251					);
1252					model.add_field(field);
1253				}
1254				state.add_model(model);
1255			}
1256			Operation::AddDiscriminatorColumn {
1257				table,
1258				column_name,
1259				default_value,
1260			} => {
1261				if let Some(model) = state.get_model_mut(app_label, table) {
1262					model.discriminator_column = Some(column_name.to_string());
1263					model.inheritance_type = Some("single_table".to_string());
1264					let field = FieldState::new(
1265						column_name.to_string(),
1266						FieldType::Custom(format!("VARCHAR(50) DEFAULT '{}'", default_value)),
1267						false,
1268					);
1269					model.add_field(field);
1270				}
1271			}
1272			Operation::AddConstraint { .. }
1273			| Operation::DropConstraint { .. }
1274			| Operation::CreateIndex { .. }
1275			| Operation::DropIndex { .. }
1276			| Operation::RunSQL { .. }
1277			| Operation::RunRust { .. }
1278			| Operation::AlterTableComment { .. }
1279			| Operation::AlterUniqueTogether { .. }
1280			| Operation::AlterModelOptions { .. }
1281			| Operation::SetAutoIncrementValue { .. }
1282			| Operation::CreateCompositePrimaryKey { .. } => {
1283				// Counter/constraint-level ops do not affect ProjectState
1284				// (they track model-level structure only).
1285			}
1286			Operation::MoveModel {
1287				model_name,
1288				from_app,
1289				to_app,
1290				rename_table,
1291				old_table_name,
1292				new_table_name,
1293			} => {
1294				// Move the model from one app to another in the project state
1295				// First get the model, then remove it from the old location
1296				if let Some(model) = state.get_model(from_app, model_name).cloned() {
1297					state.remove_model(from_app, model_name);
1298
1299					// Create a new model with updated app label
1300					let mut new_model = model;
1301					new_model.app_label = to_app.to_string();
1302
1303					// Update table name if rename_table is true
1304					if *rename_table
1305						&& let (Some(_old_name), Some(new_name)) = (old_table_name, new_table_name)
1306					{
1307						new_model.table_name = new_name.to_string();
1308					}
1309
1310					state.add_model(new_model);
1311				}
1312			}
1313			// Schema operations don't affect ProjectState (models/fields only)
1314			Operation::CreateSchema { .. }
1315			| Operation::DropSchema { .. }
1316			| Operation::CreateExtension { .. } => {
1317				// No state changes for schema/extension operations
1318			}
1319			// BulkLoad is a data operation that doesn't affect model structure
1320			Operation::BulkLoad { .. } => {
1321				// No state changes for bulk data loading
1322			}
1323		}
1324	}
1325
1326	/// Generate column SQL without PRIMARY KEY constraint (for composite primary keys)
1327	///
1328	/// This function is used when the table has a composite primary key defined at the table level.
1329	/// It generates column definitions without individual PRIMARY KEY keywords to avoid conflicts.
1330	fn column_to_sql_without_pk(col: &ColumnDefinition, dialect: &SqlDialect) -> String {
1331		let mut parts = Vec::new();
1332
1333		// Column name
1334		parts.push(quote_identifier(&col.name));
1335
1336		// Column type
1337		if col.auto_increment {
1338			match dialect {
1339				SqlDialect::Postgres | SqlDialect::Cockroachdb => {
1340					// PostgreSQL 10+ uses GENERATED BY DEFAULT AS IDENTITY
1341					match &col.type_definition {
1342						FieldType::BigInteger => {
1343							parts
1344								.push("BIGINT GENERATED BY DEFAULT AS IDENTITY".to_string().into());
1345						}
1346						FieldType::Integer => {
1347							parts.push(
1348								"INTEGER GENERATED BY DEFAULT AS IDENTITY"
1349									.to_string()
1350									.into(),
1351							);
1352						}
1353						FieldType::SmallInteger => {
1354							parts.push(
1355								"SMALLINT GENERATED BY DEFAULT AS IDENTITY"
1356									.to_string()
1357									.into(),
1358							);
1359						}
1360						_ => {
1361							// Fallback for other types
1362							parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1363						}
1364					}
1365				}
1366				SqlDialect::Mysql => {
1367					parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1368					parts.push("AUTO_INCREMENT".to_string().into());
1369				}
1370				SqlDialect::Sqlite => {
1371					// SQLite requires the literal token `INTEGER` (not `BIGINT`/`SMALLINT`)
1372					// for AUTOINCREMENT columns. Widen any integer width to `INTEGER`
1373					// because SQLite's storage classes do not distinguish integer widths.
1374					match &col.type_definition {
1375						FieldType::BigInteger | FieldType::Integer | FieldType::SmallInteger => {
1376							parts.push("INTEGER".to_string().into());
1377						}
1378						_ => {
1379							// Non-integer auto_increment is invalid for SQLite; emit the
1380							// original type and let SQLite surface the error rather than
1381							// silently mis-emitting.
1382							parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1383						}
1384					}
1385					// For SQLite, if part of composite PK, we don't add AUTOINCREMENT here
1386					// It will be handled by the table-level PRIMARY KEY constraint
1387				}
1388			}
1389		} else {
1390			parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1391		}
1392
1393		// NOT NULL constraint
1394		if col.not_null {
1395			parts.push("NOT NULL".to_string().into());
1396		}
1397
1398		// UNIQUE constraint (but NOT PRIMARY KEY)
1399		if col.unique {
1400			parts.push("UNIQUE".to_string().into());
1401		}
1402
1403		// DEFAULT value
1404		if let Some(default) = &col.default {
1405			parts.push(format!("DEFAULT {}", default).into());
1406		}
1407
1408		parts.join(" ")
1409	}
1410
1411	/// Generate column SQL with all constraints
1412	fn column_to_sql(col: &ColumnDefinition, dialect: &SqlDialect) -> String {
1413		let mut parts = Vec::new();
1414
1415		// Column name
1416		parts.push(quote_identifier(&col.name));
1417
1418		// Column type (with auto_increment handling for PostgreSQL)
1419		if col.auto_increment {
1420			match dialect {
1421				SqlDialect::Postgres | SqlDialect::Cockroachdb => {
1422					// PostgreSQL 10+ uses GENERATED BY DEFAULT AS IDENTITY
1423					match &col.type_definition {
1424						FieldType::BigInteger => {
1425							parts
1426								.push("BIGINT GENERATED BY DEFAULT AS IDENTITY".to_string().into());
1427						}
1428						FieldType::Integer => {
1429							parts.push(
1430								"INTEGER GENERATED BY DEFAULT AS IDENTITY"
1431									.to_string()
1432									.into(),
1433							);
1434						}
1435						FieldType::SmallInteger => {
1436							parts.push(
1437								"SMALLINT GENERATED BY DEFAULT AS IDENTITY"
1438									.to_string()
1439									.into(),
1440							);
1441						}
1442						_ => {
1443							// Fallback for other types
1444							parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1445						}
1446					}
1447				}
1448				SqlDialect::Mysql => {
1449					parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1450					parts.push("AUTO_INCREMENT".to_string().into());
1451				}
1452				SqlDialect::Sqlite => {
1453					// SQLite requires the literal token `INTEGER` (not `BIGINT`/`SMALLINT`)
1454					// for AUTOINCREMENT columns. Widen any integer width to `INTEGER`
1455					// because SQLite's storage classes do not distinguish integer widths,
1456					// and `BIGINT PRIMARY KEY AUTOINCREMENT` is rejected at apply time
1457					// with: "AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY".
1458					//
1459					// For non-integer column types (e.g. `Uuid`), `auto_increment = true`
1460					// is meaningless in SQLite — the AUTOINCREMENT keyword would be
1461					// rejected. Emit the column type as a plain PRIMARY KEY (no
1462					// AUTOINCREMENT). See reinhardt-web#4378.
1463					let widened_to_integer = matches!(
1464						&col.type_definition,
1465						FieldType::BigInteger | FieldType::Integer | FieldType::SmallInteger
1466					);
1467					if widened_to_integer {
1468						parts.push("INTEGER".to_string().into());
1469					} else {
1470						parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1471					}
1472					// SQLite: AUTOINCREMENT requires `INTEGER PRIMARY KEY AUTOINCREMENT`.
1473					// Note that `INTEGER PRIMARY KEY` alone only enables rowid auto-assignment
1474					// (alias for the rowid); the explicit AUTOINCREMENT keyword is required to
1475					// guarantee monotonic, non-reused IDs (backed by sqlite_sequence).
1476					if col.primary_key {
1477						if widened_to_integer {
1478							parts.push("PRIMARY KEY AUTOINCREMENT".to_string().into());
1479						} else {
1480							parts.push("PRIMARY KEY".to_string().into());
1481						}
1482						// Return early to avoid duplicate PRIMARY KEY
1483						if col.unique {
1484							parts.push("UNIQUE".to_string().into());
1485						}
1486						if let Some(default) = &col.default {
1487							parts.push(format!("DEFAULT {}", default).into());
1488						}
1489						return parts.join(" ");
1490					}
1491				}
1492			}
1493		} else {
1494			parts.push(col.type_definition.to_sql_for_dialect(dialect).into());
1495		}
1496
1497		// NOT NULL constraint
1498		if col.not_null {
1499			parts.push("NOT NULL".to_string().into());
1500		}
1501
1502		// PRIMARY KEY constraint
1503		if col.primary_key {
1504			parts.push("PRIMARY KEY".to_string().into());
1505		}
1506
1507		// UNIQUE constraint
1508		if col.unique {
1509			parts.push("UNIQUE".to_string().into());
1510		}
1511
1512		// DEFAULT value
1513		if let Some(default) = &col.default {
1514			parts.push(format!("DEFAULT {}", default).into());
1515		}
1516
1517		parts.join(" ")
1518	}
1519
1520	/// Generate forward SQL
1521	pub fn to_sql(&self, dialect: &SqlDialect) -> String {
1522		match self {
1523			Operation::CreateTable {
1524				name,
1525				columns,
1526				constraints,
1527				without_rowid,
1528				interleave_in_parent,
1529				partition,
1530			} => {
1531				// Detect composite primary key
1532				let pk_columns: Vec<&String> = columns
1533					.iter()
1534					.filter(|col| col.primary_key)
1535					.map(|col| &col.name)
1536					.collect();
1537				let has_composite_pk = pk_columns.len() > 1;
1538
1539				let mut parts = Vec::new();
1540				for col in columns {
1541					// Use column_to_sql_without_pk for composite PKs to avoid duplicate PRIMARY KEY
1542					if has_composite_pk {
1543						parts.push(format!(
1544							"  {}",
1545							Self::column_to_sql_without_pk(col, dialect)
1546						));
1547					} else {
1548						parts.push(format!("  {}", Self::column_to_sql(col, dialect)));
1549					}
1550				}
1551
1552				// Add composite primary key constraint if detected
1553				if has_composite_pk {
1554					let pk_constraint_name = format!("{}_pkey", name);
1555					let quoted_pk_columns = pk_columns
1556						.iter()
1557						.map(|s| quote_identifier(s))
1558						.collect::<Vec<_>>()
1559						.join(", ");
1560					let pk_constraint = format!(
1561						"  CONSTRAINT {} PRIMARY KEY ({})",
1562						quote_identifier(&pk_constraint_name),
1563						quoted_pk_columns
1564					);
1565					parts.push(pk_constraint);
1566				}
1567
1568				for constraint in constraints {
1569					parts.push(format!("  {}", constraint));
1570				}
1571				let mut sql = format!(
1572					"CREATE TABLE {} (\n{}\n)",
1573					quote_identifier(name),
1574					parts.join(",\n")
1575				);
1576
1577				// SQLite: WITHOUT ROWID optimization for tables with explicit PRIMARY KEY
1578				if matches!(dialect, SqlDialect::Sqlite)
1579					&& let Some(true) = without_rowid
1580				{
1581					sql.push_str(" WITHOUT ROWID");
1582				}
1583
1584				// MySQL: Table partitioning
1585				if matches!(dialect, SqlDialect::Mysql)
1586					&& let Some(partition_opts) = partition
1587				{
1588					sql.push(' ');
1589					sql.push_str(&partition_opts.to_sql());
1590				}
1591
1592				// CockroachDB: INTERLEAVE IN PARENT for co-locating child rows with parent
1593				if matches!(dialect, SqlDialect::Cockroachdb)
1594					&& let Some(interleave) = interleave_in_parent
1595				{
1596					let quoted_columns = interleave
1597						.parent_columns
1598						.iter()
1599						.map(|col| quote_identifier(col))
1600						.collect::<Vec<_>>()
1601						.join(", ");
1602					sql.push_str(&format!(
1603						" INTERLEAVE IN PARENT {} ({})",
1604						quote_identifier(&interleave.parent_table),
1605						quoted_columns
1606					));
1607				}
1608
1609				sql.push(';');
1610				sql
1611			}
1612			Operation::DropTable { name } => format!("DROP TABLE {};", quote_identifier(name)),
1613			Operation::AddColumn {
1614				table,
1615				column,
1616				mysql_options,
1617			} => {
1618				let base_sql = format!(
1619					"ALTER TABLE {} ADD COLUMN {}",
1620					quote_identifier(table),
1621					Self::column_to_sql(column, dialect)
1622				);
1623
1624				// MySQL: Add ALGORITHM/LOCK options
1625				if matches!(dialect, SqlDialect::Mysql)
1626					&& let Some(opts) = mysql_options
1627				{
1628					let suffix = opts.to_sql_suffix();
1629					if !suffix.is_empty() {
1630						return format!("{}{};", base_sql, suffix);
1631					}
1632				}
1633
1634				format!("{};", base_sql)
1635			}
1636			Operation::DropColumn { table, column } => {
1637				format!(
1638					"ALTER TABLE {} DROP COLUMN {};",
1639					quote_identifier(table),
1640					quote_identifier(column)
1641				)
1642			}
1643			Operation::AlterColumn {
1644				table,
1645				column,
1646				new_definition,
1647				mysql_options,
1648				..
1649			} => {
1650				let sql_type = new_definition.type_definition.to_sql_for_dialect(dialect);
1651				match dialect {
1652					SqlDialect::Postgres | SqlDialect::Cockroachdb => {
1653						format!(
1654							"ALTER TABLE {} ALTER COLUMN {} TYPE {};",
1655							quote_identifier(table),
1656							quote_identifier(column),
1657							sql_type
1658						)
1659					}
1660					SqlDialect::Mysql => {
1661						let base_sql = format!(
1662							"ALTER TABLE {} MODIFY COLUMN {} {}",
1663							quote_identifier(table),
1664							quote_identifier(column),
1665							sql_type
1666						);
1667
1668						// MySQL: Add ALGORITHM/LOCK options
1669						if let Some(opts) = mysql_options {
1670							let suffix = opts.to_sql_suffix();
1671							if !suffix.is_empty() {
1672								return format!("{}{};", base_sql, suffix);
1673							}
1674						}
1675
1676						format!("{};", base_sql)
1677					}
1678					SqlDialect::Sqlite => {
1679						format!(
1680							"-- SQLite does not support ALTER COLUMN, table recreation required for {}",
1681							quote_identifier(table)
1682						)
1683					}
1684				}
1685			}
1686			Operation::RenameColumn {
1687				table,
1688				old_name,
1689				new_name,
1690			} => {
1691				format!(
1692					"ALTER TABLE {} RENAME COLUMN {} TO {};",
1693					quote_identifier(table),
1694					quote_identifier(old_name),
1695					quote_identifier(new_name)
1696				)
1697			}
1698			Operation::RenameTable { old_name, new_name } => {
1699				format!(
1700					"ALTER TABLE {} RENAME TO {};",
1701					quote_identifier(old_name),
1702					quote_identifier(new_name)
1703				)
1704			}
1705			Operation::AddConstraint {
1706				table,
1707				constraint_sql,
1708			} => {
1709				format!(
1710					"ALTER TABLE {} ADD {};",
1711					quote_identifier(table),
1712					constraint_sql
1713				)
1714			}
1715			Operation::DropConstraint {
1716				table,
1717				constraint_name,
1718			} => {
1719				format!(
1720					"ALTER TABLE {} DROP CONSTRAINT {};",
1721					quote_identifier(table),
1722					quote_identifier(constraint_name)
1723				)
1724			}
1725			Operation::CreateIndex {
1726				table,
1727				columns,
1728				unique,
1729				index_type,
1730				where_clause,
1731				concurrently,
1732				expressions,
1733				mysql_options,
1734				operator_class,
1735			} => {
1736				let unique_str = if *unique { "UNIQUE " } else { "" };
1737
1738				// PostgreSQL: CONCURRENTLY keyword (must come before UNIQUE)
1739				let concurrent_str = if *concurrently && matches!(dialect, SqlDialect::Postgres) {
1740					"CONCURRENTLY "
1741				} else {
1742					""
1743				};
1744
1745				// MySQL: FULLTEXT/SPATIAL prefix (replaces UNIQUE for these types)
1746				let (mysql_prefix, effective_unique) = match (index_type, dialect) {
1747					(Some(IndexType::Fulltext), SqlDialect::Mysql) => ("FULLTEXT ", ""),
1748					(Some(IndexType::Spatial), SqlDialect::Mysql) => ("SPATIAL ", ""),
1749					_ => ("", unique_str),
1750				};
1751
1752				// Determine what to index: expressions or columns
1753				let (index_content, name_suffix) =
1754					if let Some(exprs) = expressions.as_ref().filter(|e| !e.is_empty()) {
1755						// For expression indexes, use expressions and generate a hash-based suffix
1756						// Expressions are assumed to be properly formatted, no additional quoting needed
1757						let content = exprs.join(", ");
1758						let suffix = "expr";
1759						(content, suffix.to_string())
1760					} else {
1761						// Use columns with optional operator class
1762						let content = if let Some(op_class) = operator_class {
1763							// Apply operator class to each column (PostgreSQL-specific)
1764							if matches!(dialect, SqlDialect::Postgres) {
1765								columns
1766									.iter()
1767									.map(|c| format!("{} {}", quote_identifier(c), op_class))
1768									.collect::<Vec<_>>()
1769									.join(", ")
1770							} else {
1771								// Quote column names for safety (reserved words, special chars)
1772								columns
1773									.iter()
1774									.map(|c| quote_identifier(c).to_string())
1775									.collect::<Vec<_>>()
1776									.join(", ")
1777							}
1778						} else {
1779							// Quote column names for safety (reserved words, special chars)
1780							columns
1781								.iter()
1782								.map(|c| quote_identifier(c).to_string())
1783								.collect::<Vec<_>>()
1784								.join(", ")
1785						};
1786						(content, columns.join("_"))
1787					};
1788
1789				let idx_name = format!("idx_{}_{}", table, name_suffix);
1790
1791				// Index type clause (USING type) - PostgreSQL, CockroachDB
1792				let using_clause = match (index_type, dialect) {
1793					(Some(IndexType::BTree), _) => String::new(), // Default, no need to specify
1794					(Some(idx_type), SqlDialect::Postgres | SqlDialect::Cockroachdb) => {
1795						format!(" USING {}", idx_type)
1796					}
1797					// MySQL FULLTEXT/SPATIAL handled via prefix, not USING
1798					(Some(IndexType::Fulltext | IndexType::Spatial), SqlDialect::Mysql) => {
1799						String::new()
1800					}
1801					_ => String::new(),
1802				};
1803
1804				// Build base SQL with correct syntax per dialect
1805				// PostgreSQL: CREATE [UNIQUE] INDEX [CONCURRENTLY] name [USING type] ON table (cols)
1806				// MySQL: CREATE [FULLTEXT|SPATIAL|UNIQUE] INDEX name ON table (cols)
1807				// SQLite: CREATE [UNIQUE] INDEX name ON table (cols)
1808				let mut sql = match dialect {
1809					SqlDialect::Postgres | SqlDialect::Cockroachdb => {
1810						// CONCURRENTLY goes between INDEX and index_name
1811						format!(
1812							"CREATE {}INDEX {}{}",
1813							effective_unique,
1814							concurrent_str,
1815							quote_identifier(&idx_name)
1816						)
1817					}
1818					SqlDialect::Mysql => {
1819						// MySQL doesn't support CONCURRENTLY or USING (except for FULLTEXT/SPATIAL prefix)
1820						format!(
1821							"CREATE {}{}INDEX {}",
1822							mysql_prefix,
1823							effective_unique,
1824							quote_identifier(&idx_name)
1825						)
1826					}
1827					SqlDialect::Sqlite => {
1828						// SQLite doesn't support CONCURRENTLY or USING
1829						format!(
1830							"CREATE {}INDEX {}",
1831							effective_unique,
1832							quote_identifier(&idx_name)
1833						)
1834					}
1835				};
1836				// PostgreSQL: ON table USING method (columns)
1837				// MySQL/SQLite: ON table (columns)
1838				// Quote table name for safety (reserved words, special chars)
1839				sql.push_str(&format!(
1840					" ON {}{} ({})",
1841					quote_identifier(table),
1842					using_clause,
1843					index_content
1844				));
1845
1846				// Add WHERE clause for partial indexes (PostgreSQL, SQLite, CockroachDB - not MySQL)
1847				if let Some(where_cond) = where_clause
1848					&& !matches!(dialect, SqlDialect::Mysql)
1849				{
1850					sql.push_str(&format!(" WHERE {}", where_cond));
1851				}
1852
1853				// MySQL: Add ALGORITHM/LOCK options
1854				if matches!(dialect, SqlDialect::Mysql)
1855					&& let Some(opts) = mysql_options
1856				{
1857					let suffix = opts.to_sql_suffix();
1858					if !suffix.is_empty() {
1859						sql.push_str(&suffix);
1860					}
1861				}
1862
1863				sql.push(';');
1864				sql
1865			}
1866			Operation::DropIndex { table, columns } => {
1867				let idx_name = format!("idx_{}_{}", table, columns.join("_"));
1868				match dialect {
1869					SqlDialect::Mysql => {
1870						format!(
1871							"DROP INDEX {} ON {};",
1872							quote_identifier(&idx_name),
1873							quote_identifier(table)
1874						)
1875					}
1876					SqlDialect::Postgres | SqlDialect::Sqlite | SqlDialect::Cockroachdb => {
1877						format!("DROP INDEX {};", quote_identifier(&idx_name))
1878					}
1879				}
1880			}
1881			Operation::RunSQL { sql, .. } => sql.to_string(),
1882			Operation::RunRust { code, .. } => {
1883				// For SQL generation, RunRust is a no-op comment
1884				format!("-- RunRust: {}", code.lines().next().unwrap_or(""))
1885			}
1886			Operation::AlterTableComment { table, comment } => match dialect {
1887				SqlDialect::Postgres | SqlDialect::Cockroachdb => {
1888					if let Some(comment_text) = comment {
1889						format!(
1890							"COMMENT ON TABLE {} IS '{}';",
1891							quote_identifier(table),
1892							comment_text
1893						)
1894					} else {
1895						format!("COMMENT ON TABLE {} IS NULL;", quote_identifier(table))
1896					}
1897				}
1898				SqlDialect::Mysql => {
1899					if let Some(comment_text) = comment {
1900						format!(
1901							"ALTER TABLE {} COMMENT='{}';",
1902							quote_identifier(table),
1903							comment_text
1904						)
1905					} else {
1906						format!("ALTER TABLE {} COMMENT='';", quote_identifier(table))
1907					}
1908				}
1909				SqlDialect::Sqlite => String::new(),
1910			},
1911			Operation::AlterUniqueTogether {
1912				table,
1913				unique_together,
1914			} => {
1915				let mut sql = Vec::new();
1916				for (idx, fields) in unique_together.iter().enumerate() {
1917					let constraint_name = format!("{}_{}_uniq", table, idx);
1918					let fields_str = fields
1919						.iter()
1920						.map(|f| quote_identifier(f))
1921						.collect::<Vec<_>>()
1922						.join(", ");
1923					sql.push(format!(
1924						"ALTER TABLE {} ADD CONSTRAINT {} UNIQUE ({});",
1925						quote_identifier(table),
1926						quote_identifier(&constraint_name),
1927						fields_str
1928					));
1929				}
1930				sql.join("\n")
1931			}
1932			Operation::AlterModelOptions { .. } => String::new(),
1933			Operation::CreateInheritedTable {
1934				name,
1935				columns,
1936				base_table,
1937				join_column,
1938			} => {
1939				let mut parts = Vec::new();
1940				parts.push(format!(
1941					"  {} INTEGER REFERENCES {}(id)",
1942					quote_identifier(join_column),
1943					quote_identifier(base_table)
1944				));
1945				for col in columns {
1946					parts.push(format!("  {}", Self::column_to_sql(col, dialect)));
1947				}
1948				format!(
1949					"CREATE TABLE {} (\n{}\n);",
1950					quote_identifier(name),
1951					parts.join(",\n")
1952				)
1953			}
1954			Operation::AddDiscriminatorColumn {
1955				table,
1956				column_name,
1957				default_value,
1958			} => {
1959				format!(
1960					"ALTER TABLE {} ADD COLUMN {} VARCHAR(50) DEFAULT '{}';",
1961					quote_identifier(table),
1962					quote_identifier(column_name),
1963					default_value
1964				)
1965			}
1966			Operation::MoveModel {
1967				rename_table,
1968				old_table_name,
1969				new_table_name,
1970				..
1971			} => {
1972				// MoveModel generates a RenameTable SQL if table name changes
1973				// Otherwise it's a state-only operation (no SQL needed)
1974				if *rename_table {
1975					if let (Some(old_name), Some(new_name)) = (old_table_name, new_table_name) {
1976						match dialect {
1977							SqlDialect::Postgres | SqlDialect::Sqlite | SqlDialect::Cockroachdb => {
1978								format!(
1979									"ALTER TABLE {} RENAME TO {};",
1980									quote_identifier(old_name),
1981									quote_identifier(new_name)
1982								)
1983							}
1984							SqlDialect::Mysql => {
1985								format!(
1986									"RENAME TABLE {} TO {};",
1987									quote_identifier(old_name),
1988									quote_identifier(new_name)
1989								)
1990							}
1991						}
1992					} else {
1993						"-- MoveModel: No table rename specified".to_string()
1994					}
1995				} else {
1996					// State-only operation, no SQL needed
1997					"-- MoveModel: State-only operation (no table rename)".to_string()
1998				}
1999			}
2000			Operation::CreateSchema {
2001				name,
2002				if_not_exists,
2003			} => {
2004				let if_not_exists_clause = if *if_not_exists { " IF NOT EXISTS" } else { "" };
2005				format!(
2006					"CREATE SCHEMA{} {};",
2007					if_not_exists_clause,
2008					quote_identifier(name)
2009				)
2010			}
2011			Operation::DropSchema {
2012				name,
2013				cascade,
2014				if_exists,
2015			} => {
2016				let if_exists_clause = if *if_exists { " IF EXISTS" } else { "" };
2017				let cascade_clause = if *cascade { " CASCADE" } else { "" };
2018				format!(
2019					"DROP SCHEMA{} {}{};",
2020					if_exists_clause,
2021					quote_identifier(name),
2022					cascade_clause
2023				)
2024			}
2025			Operation::CreateExtension {
2026				name,
2027				if_not_exists,
2028				schema,
2029			} => {
2030				// PostgreSQL-specific
2031				let if_not_exists_clause = if *if_not_exists { " IF NOT EXISTS" } else { "" };
2032				let schema_clause = if let Some(s) = schema {
2033					format!(" SCHEMA {}", quote_identifier(s))
2034				} else {
2035					String::new()
2036				};
2037				format!(
2038					"CREATE EXTENSION{} {}{};",
2039					if_not_exists_clause,
2040					quote_identifier(name),
2041					schema_clause
2042				)
2043			}
2044			Operation::BulkLoad {
2045				table,
2046				source,
2047				format,
2048				options,
2049			} => Self::bulk_load_to_sql(table, source, format, options, dialect),
2050			Operation::SetAutoIncrementValue {
2051				table,
2052				column,
2053				value,
2054			} => Self::set_auto_increment_to_sql(table, column, *value, dialect),
2055			Operation::CreateCompositePrimaryKey {
2056				table,
2057				columns,
2058				constraint_name,
2059			} => Self::create_composite_pk_to_sql(table, columns, constraint_name.as_deref()),
2060		}
2061	}
2062
2063	/// Generate `SetAutoIncrementValue` SQL for each dialect
2064	///
2065	/// PostgreSQL / CockroachDB resolve the backing sequence via
2066	/// `pg_get_serial_sequence(...)` so that both the default
2067	/// `{table}_{column}_seq` naming and user-customized sequences work without
2068	/// the caller having to know the sequence name.
2069	fn set_auto_increment_to_sql(
2070		table: &str,
2071		column: &str,
2072		value: i64,
2073		dialect: &SqlDialect,
2074	) -> String {
2075		match dialect {
2076			SqlDialect::Postgres | SqlDialect::Cockroachdb => {
2077				// pg_get_serial_sequence takes a regclass literal for the table
2078				// and a text literal for the column. `setval(..., value, false)`
2079				// makes the NEXT generated value equal `value`, matching the
2080				// intent of "set the auto-increment to <value>".
2081				format!(
2082					"SELECT setval(pg_get_serial_sequence({}, {}), {}, false);",
2083					quote_literal(table),
2084					quote_literal(column),
2085					value
2086				)
2087			}
2088			SqlDialect::Mysql => {
2089				format!(
2090					"ALTER TABLE {} AUTO_INCREMENT = {};",
2091					quote_identifier(table),
2092					value
2093				)
2094			}
2095			SqlDialect::Sqlite => {
2096				// INSERT OR REPLACE so the statement works whether or not a
2097				// sqlite_sequence row already exists for the table. UPDATE
2098				// would silently no-op on fresh tables that have never had
2099				// a row inserted.
2100				format!(
2101					"INSERT OR REPLACE INTO sqlite_sequence(name, seq) VALUES ({}, {});",
2102					quote_literal(table),
2103					value
2104				)
2105			}
2106		}
2107	}
2108
2109	/// Generate `CreateCompositePrimaryKey` SQL
2110	///
2111	/// Produces `ALTER TABLE ... ADD CONSTRAINT ... PRIMARY KEY (...)` for
2112	/// every supported backend. Emits guaranteed-fail SQL if the column list
2113	/// is empty so the migration aborts at execution time instead of silently
2114	/// succeeding.
2115	///
2116	/// Workaround for the shared infallible `String` return type used by every
2117	/// `to_sql` arm. Converting the entire pipeline to `Result` would cascade
2118	/// through dozens of call sites, so this arm instead emits a deliberately
2119	/// invalid SQL statement (a bare identifier) that every supported backend's
2120	/// parser rejects before execution. This replaces the earlier `SELECT 1/0`
2121	/// fallback, which silently returned `NULL` on SQLite and lax-mode MySQL
2122	/// (reinhardt-web#4325). The identifier text encodes the diagnostic so it
2123	/// surfaces in the parser error message.
2124	///
2125	/// The long-term fix — migrating the `to_sql` family to `Result` and
2126	/// returning a structured `MigrationError::EmptyCompositePrimaryKey` — is
2127	/// shown in the ideal implementation below.
2128	///
2129	/// Remove this workaround once the `to_sql` family is migrated to a
2130	/// fallible signature.
2131	///
2132	/// Ideal implementation (without workaround):
2133	///   fn create_composite_pk_to_sql(
2134	///       table: &str,
2135	///       columns: &[String],
2136	///       constraint_name: Option<&str>,
2137	///   ) -> Result<String, MigrationError> {
2138	///       if columns.is_empty() {
2139	///           return Err(MigrationError::EmptyCompositePrimaryKey {
2140	///               table: table.to_owned(),
2141	///           });
2142	///       }
2143	///       // ... build the ALTER TABLE statement ...
2144	///   }
2145	fn create_composite_pk_to_sql(
2146		table: &str,
2147		columns: &[String],
2148		constraint_name: Option<&str>,
2149	) -> String {
2150		if columns.is_empty() {
2151			// Deliberately invalid SQL: a bare identifier is not a valid
2152			// statement in PostgreSQL, MySQL, or SQLite grammar, so every
2153			// backend's parser rejects it before execution. This avoids the
2154			// lax-mode MySQL / SQLite silent-pass that the previous
2155			// `SELECT 1/0` fallback was prone to (reinhardt-web#4325). The
2156			// identifier text preserves the diagnostic in the parser error.
2157			return format!(
2158				"SYNTAX_ERROR_create_composite_pk_on_{}_requires_at_least_one_column;",
2159				table.replace(|c: char| !c.is_ascii_alphanumeric(), "_")
2160			);
2161		}
2162
2163		let default_name;
2164		let name: &str = match constraint_name {
2165			Some(n) => n,
2166			None => {
2167				default_name = format!("{}_pkey", table);
2168				&default_name
2169			}
2170		};
2171
2172		let quoted_columns = columns
2173			.iter()
2174			.map(|c| quote_identifier(c).to_string())
2175			.collect::<Vec<_>>()
2176			.join(", ");
2177
2178		format!(
2179			"ALTER TABLE {} ADD CONSTRAINT {} PRIMARY KEY ({});",
2180			quote_identifier(table),
2181			quote_identifier(name),
2182			quoted_columns
2183		)
2184	}
2185
2186	/// Generate bulk load SQL for different dialects
2187	fn bulk_load_to_sql(
2188		table: &str,
2189		source: &BulkLoadSource,
2190		format: &BulkLoadFormat,
2191		options: &BulkLoadOptions,
2192		dialect: &SqlDialect,
2193	) -> String {
2194		match dialect {
2195			SqlDialect::Postgres | SqlDialect::Cockroachdb => {
2196				Self::postgres_copy_from_sql(table, source, format, options)
2197			}
2198			SqlDialect::Mysql => Self::mysql_load_data_sql(table, source, format, options),
2199			SqlDialect::Sqlite => {
2200				// SQLite does not support bulk loading natively
2201				format!(
2202					"-- SQLite does not support bulk loading. Use INSERT statements instead for table {}",
2203					quote_identifier(table)
2204				)
2205			}
2206		}
2207	}
2208
2209	/// Generate PostgreSQL COPY FROM SQL
2210	fn postgres_copy_from_sql(
2211		table: &str,
2212		source: &BulkLoadSource,
2213		format: &BulkLoadFormat,
2214		options: &BulkLoadOptions,
2215	) -> String {
2216		let source_clause = match source {
2217			BulkLoadSource::File(path) => format!("'{}'", path),
2218			BulkLoadSource::Stdin => "STDIN".to_string(),
2219			BulkLoadSource::Program(cmd) => format!("PROGRAM '{}'", cmd),
2220		};
2221
2222		let columns_clause = if let Some(cols) = &options.columns {
2223			let quoted_cols = cols
2224				.iter()
2225				.map(|c| quote_identifier(c))
2226				.collect::<Vec<_>>()
2227				.join(", ");
2228			format!(" ({})", quoted_cols)
2229		} else {
2230			String::new()
2231		};
2232
2233		let mut with_options = Vec::new();
2234
2235		// Format
2236		with_options.push(format!("FORMAT {}", format));
2237
2238		// Delimiter
2239		if let Some(delim) = options.delimiter {
2240			with_options.push(format!("DELIMITER '{}'", delim));
2241		}
2242
2243		// NULL string
2244		if let Some(null_str) = &options.null_string {
2245			with_options.push(format!("NULL '{}'", null_str));
2246		}
2247
2248		// Header
2249		if options.header {
2250			with_options.push("HEADER true".to_string());
2251		}
2252
2253		// Quote character
2254		if let Some(quote) = options.quote {
2255			with_options.push(format!("QUOTE '{}'", quote));
2256		}
2257
2258		// Escape character
2259		if let Some(escape) = options.escape {
2260			with_options.push(format!("ESCAPE '{}'", escape));
2261		}
2262
2263		format!(
2264			"COPY {}{} FROM {} WITH ({});",
2265			quote_identifier(table),
2266			columns_clause,
2267			source_clause,
2268			with_options.join(", ")
2269		)
2270	}
2271
2272	/// Generate MySQL LOAD DATA SQL
2273	fn mysql_load_data_sql(
2274		table: &str,
2275		source: &BulkLoadSource,
2276		format: &BulkLoadFormat,
2277		options: &BulkLoadOptions,
2278	) -> String {
2279		let local_clause = if options.local { " LOCAL" } else { "" };
2280
2281		let file_path = match source {
2282			BulkLoadSource::File(path) => path.clone(),
2283			BulkLoadSource::Stdin => {
2284				return format!(
2285					"-- MySQL does not support LOAD DATA from STDIN directly for table {}",
2286					quote_identifier(table)
2287				);
2288			}
2289			BulkLoadSource::Program(_) => {
2290				return format!(
2291					"-- MySQL does not support LOAD DATA from PROGRAM directly for table {}",
2292					quote_identifier(table)
2293				);
2294			}
2295		};
2296
2297		let columns_clause = if let Some(cols) = &options.columns {
2298			let quoted_cols = cols
2299				.iter()
2300				.map(|c| quote_identifier(c))
2301				.collect::<Vec<_>>()
2302				.join(", ");
2303			format!(" ({})", quoted_cols)
2304		} else {
2305			String::new()
2306		};
2307
2308		// Field terminator (delimiter)
2309		let delimiter = options.delimiter.unwrap_or(match format {
2310			BulkLoadFormat::Csv => ',',
2311			BulkLoadFormat::Text | BulkLoadFormat::Binary => '\t',
2312		});
2313
2314		let mut field_options = Vec::new();
2315		field_options.push(format!("TERMINATED BY '{}'", delimiter));
2316
2317		// Quote character for CSV
2318		if *format == BulkLoadFormat::Csv {
2319			let quote = options.quote.unwrap_or('"');
2320			field_options.push(format!("ENCLOSED BY '{}'", quote));
2321		}
2322
2323		// Escape character
2324		if let Some(escape) = options.escape {
2325			field_options.push(format!("ESCAPED BY '{}'", escape));
2326		}
2327
2328		// Line terminator
2329		let line_terminator = options
2330			.line_terminator
2331			.clone()
2332			.unwrap_or_else(|| "\\n".to_string());
2333
2334		// Encoding
2335		let encoding_clause = if let Some(enc) = &options.encoding {
2336			format!(" CHARACTER SET {}", enc)
2337		} else {
2338			String::new()
2339		};
2340
2341		// Header handling (skip first line)
2342		let ignore_clause = if options.header {
2343			" IGNORE 1 LINES"
2344		} else {
2345			""
2346		};
2347
2348		format!(
2349			"LOAD DATA{} INFILE '{}'{} INTO TABLE {} FIELDS {} LINES TERMINATED BY '{}'{}{};",
2350			local_clause,
2351			file_path,
2352			encoding_clause,
2353			quote_identifier(table),
2354			field_options.join(" "),
2355			line_terminator,
2356			ignore_clause,
2357			columns_clause
2358		)
2359	}
2360
2361	/// Generate reverse SQL (for rollback)
2362	///
2363	/// # Arguments
2364	///
2365	/// * `dialect` - SQL dialect for generating database-specific SQL
2366	/// * `project_state` - Project state for accessing model definitions (needed for DropTable, etc.)
2367	///
2368	/// # Returns
2369	///
2370	/// * `Ok(Some(sql))` - Reverse SQL generated successfully
2371	/// * `Ok(None)` - Operation is not reversible (see Design Limitation below)
2372	/// * `Err(_)` - Error generating reverse SQL
2373	///
2374	/// # Design Limitation
2375	///
2376	/// Destructive operations (`DropTable`, `DropColumn`, `DropConstraint`, `AlterColumn`)
2377	/// require a pre-operation `ProjectState` snapshot to generate reverse SQL. When the
2378	/// `project_state` parameter does not contain the necessary model/column/constraint
2379	/// definition, this method returns `Ok(None)` instead of failing.
2380	///
2381	/// This is an intentional design decision: the migration system cannot reconstruct
2382	/// lost schema information. Callers must provide the state from before the operation
2383	/// was applied to enable proper rollback. This matches Django's migration behavior
2384	/// where `state_forwards` must be called before operations are reversed.
2385	pub fn to_reverse_sql(
2386		&self,
2387		dialect: &SqlDialect,
2388		project_state: &ProjectState,
2389	) -> super::Result<Option<String>> {
2390		match self {
2391			Operation::CreateTable { name, .. } => {
2392				Ok(Some(format!("DROP TABLE {};", quote_identifier(name))))
2393			}
2394			Operation::AddColumn { table, column, .. } => Ok(Some(format!(
2395				"ALTER TABLE {} DROP COLUMN {};",
2396				quote_identifier(table),
2397				quote_identifier(&column.name)
2398			))),
2399			Operation::RunSQL { reverse_sql, .. } => {
2400				Ok(reverse_sql.as_ref().map(|s| s.to_string()))
2401			}
2402			Operation::RunRust { reverse_code, .. } => Ok(reverse_code.as_ref().map(|code| {
2403				format!(
2404					"-- RunRust (reverse): {}",
2405					code.lines().next().unwrap_or("")
2406				)
2407			})),
2408			// Phase 1: Simple reverse operations
2409			Operation::RenameTable { old_name, new_name } => Ok(Some(format!(
2410				"ALTER TABLE {} RENAME TO {};",
2411				quote_identifier(new_name),
2412				quote_identifier(old_name)
2413			))),
2414			Operation::RenameColumn {
2415				table,
2416				old_name,
2417				new_name,
2418			} => Ok(Some(format!(
2419				"ALTER TABLE {} RENAME COLUMN {} TO {};",
2420				quote_identifier(table),
2421				quote_identifier(new_name),
2422				quote_identifier(old_name)
2423			))),
2424			Operation::CreateIndex { table, columns, .. } => {
2425				// Use the same naming convention as to_sql(): idx_{table}_{columns_joined}
2426				// This ensures the rollback DROP INDEX targets the correct index name
2427				let columns_joined = columns.join("_");
2428				let index_name = format!("idx_{}_{}", table, columns_joined);
2429				// MySQL requires `DROP INDEX <name> ON <table>`; PostgreSQL/SQLite/CockroachDB
2430				// only need the index name. Mirror the dialect dispatch used by the forward
2431				// `Operation::DropIndex` SQL generator above.
2432				Ok(Some(match dialect {
2433					SqlDialect::Mysql => format!(
2434						"DROP INDEX {} ON {};",
2435						quote_identifier(&index_name),
2436						quote_identifier(table)
2437					),
2438					SqlDialect::Postgres | SqlDialect::Sqlite | SqlDialect::Cockroachdb => {
2439						format!("DROP INDEX {};", quote_identifier(&index_name))
2440					}
2441				}))
2442			}
2443			Operation::AddConstraint {
2444				table,
2445				constraint_sql,
2446			} => {
2447				// Extract constraint name from SQL
2448				// Expects format: "CONSTRAINT <name> ..." or "ADD CONSTRAINT <name> ..."
2449				let constraint_name =
2450					Self::extract_constraint_name(constraint_sql).ok_or_else(|| {
2451						super::MigrationError::InvalidMigration(format!(
2452							"Cannot extract constraint name from: {}",
2453							constraint_sql
2454						))
2455					})?;
2456				Ok(Some(format!(
2457					"ALTER TABLE {} DROP CONSTRAINT {};",
2458					quote_identifier(table),
2459					quote_identifier(&constraint_name)
2460				)))
2461			}
2462			// Phase 2: Complex reverse operations using ProjectState
2463			Operation::DropColumn { table, column } => {
2464				// Retrieve original column definition from ProjectState
2465				if let Some(model) = project_state.find_model_by_table(table)
2466					&& let Some(field) = model.get_field(column)
2467				{
2468					let col_def = ColumnDefinition::from_field_state(column.clone(), field);
2469					let col_sql = Self::column_to_sql(&col_def, dialect);
2470					return Ok(Some(format!(
2471						"ALTER TABLE {} ADD COLUMN {};",
2472						quote_identifier(table),
2473						col_sql
2474					)));
2475				}
2476				// Cannot reconstruct without state
2477				Ok(None)
2478			}
2479			Operation::AlterColumn {
2480				table,
2481				column,
2482				old_definition,
2483				new_definition: _,
2484				..
2485			} => {
2486				// Resolve the original column definition to revert to.
2487				// Prioritize the explicit `old_definition` over ProjectState lookup.
2488				let resolved_old_def = old_definition.clone().or_else(|| {
2489					project_state
2490						.find_model_by_table(table)
2491						.and_then(|model| model.get_field(column))
2492						.map(|field| ColumnDefinition::from_field_state(column.clone(), field))
2493				});
2494
2495				let Some(old_def) = resolved_old_def else {
2496					// Cannot reconstruct without state
2497					return Ok(None);
2498				};
2499
2500				let type_sql = old_def.type_definition.to_sql_for_dialect(dialect);
2501				let null_clause = if old_def.not_null { " NOT NULL" } else { "" };
2502
2503				// Dispatch reverse SQL per dialect.
2504				// SQLite is handled via the SQLite-recreation path before reaching
2505				// here (see executor::rollback_migration and
2506				// `Operation::reverse_requires_sqlite_recreation`). The placeholder
2507				// comment below is a defensive fallback: emitting executable
2508				// ALTER COLUMN syntax on SQLite would always error (#4582).
2509				let sql = match dialect {
2510					SqlDialect::Postgres => {
2511						// PostgreSQL allows multiple `ALTER COLUMN` clauses to
2512						// be combined into a single `ALTER TABLE` statement via
2513						// comma separation. Use this form (rather than two
2514						// independent statements joined by `\n`) so the payload
2515						// remains a single statement and round-trips through
2516						// `SchemaEditor::execute()`, which is backed by
2517						// `sqlx::query(sql).execute(...)` and uses the Extended
2518						// Query protocol that rejects multi-statement payloads.
2519						// Fixes #4630.
2520						let nullability_clause = if old_def.not_null {
2521							"SET NOT NULL"
2522						} else {
2523							"DROP NOT NULL"
2524						};
2525						format!(
2526							"ALTER TABLE {table} \
2527							 ALTER COLUMN {column} TYPE {type_sql}, \
2528							 ALTER COLUMN {column} {nullability_clause};",
2529							table = quote_identifier(table),
2530							column = quote_identifier(column),
2531							type_sql = type_sql,
2532							nullability_clause = nullability_clause,
2533						)
2534					}
2535					SqlDialect::Cockroachdb => {
2536						// CockroachDB does NOT accept a combined `ALTER TABLE
2537						// ... ALTER COLUMN c TYPE T, ALTER COLUMN c {SET|DROP}
2538						// NOT NULL` in a single statement (it rejects mixing
2539						// `ALTER COLUMN ... TYPE` with sibling `ALTER COLUMN`
2540						// clauses), and `SchemaEditor::execute()` is a
2541						// single-statement dispatcher (sqlx Extended Query),
2542						// so the two clauses cannot share a payload either.
2543						//
2544						// Emit the column-type reversion only; nullability
2545						// rollback for CockroachDB is intentionally
2546						// best-effort here. The full fix requires returning
2547						// `Vec<String>` from `to_reverse_sql` so multiple
2548						// single-statement payloads can be dispatched — that
2549						// change is tracked separately so this PR can unblock
2550						// the rc.30 release without cascading into ~25 call
2551						// sites and tests. The stop-gap output shape of this
2552						// branch (single statement, `ALTER COLUMN ... TYPE`,
2553						// no nullability clause, no comma-combined form) is
2554						// pinned by the inline unit test
2555						// `test_to_reverse_sql_alter_column_cockroachdb` in
2556						// the `#[cfg(test)] mod tests` block of this file,
2557						// added under reinhardt-web#4639. An end-to-end
2558						// integration test against a live CockroachDB
2559						// testcontainer is blocked on reinhardt-web#4642
2560						// (`DatabaseMigrationExecutor` calls
2561						// `pg_advisory_lock()`, which CockroachDB does not
2562						// implement). The full `Vec<String>` API refactor
2563						// that restores NOT NULL rollback fidelity is
2564						// tracked in reinhardt-web#4640 and targets
2565						// `develop/0.2.0`. Refs #4630, #4639, #4640, #4642.
2566						format!(
2567							"ALTER TABLE {} ALTER COLUMN {} TYPE {};",
2568							quote_identifier(table),
2569							quote_identifier(column),
2570							type_sql
2571						)
2572					}
2573					SqlDialect::Mysql => format!(
2574						"ALTER TABLE {} MODIFY COLUMN {} {}{};",
2575						quote_identifier(table),
2576						quote_identifier(column),
2577						type_sql,
2578						null_clause
2579					),
2580					SqlDialect::Sqlite => format!(
2581						"-- SQLite does not support ALTER COLUMN, table recreation required for {}",
2582						quote_identifier(table)
2583					),
2584				};
2585				Ok(Some(sql))
2586			}
2587			Operation::DropIndex { table, columns } => {
2588				// Enhancement opportunity: Full index reconstruction would preserve
2589				// index_type, where_clause, operator_class, and other advanced properties.
2590				// The current implementation generates a basic CREATE INDEX statement.
2591				let columns_joined = columns.join("_");
2592				let index_name = format!("idx_{}_{}", table, columns_joined);
2593				let columns_list = columns
2594					.iter()
2595					.map(|c| quote_identifier(c).to_string())
2596					.collect::<Vec<_>>()
2597					.join(", ");
2598				Ok(Some(format!(
2599					"CREATE INDEX {} ON {} ({});",
2600					quote_identifier(&index_name),
2601					quote_identifier(table),
2602					columns_list
2603				)))
2604			}
2605			Operation::DropConstraint {
2606				table,
2607				constraint_name,
2608			} => {
2609				// Retrieve constraint definition from ProjectState
2610				if let Some(model) = project_state.find_model_by_table(table)
2611					&& let Some(constraint_def) = model
2612						.constraints
2613						.iter()
2614						.find(|c| c.name == *constraint_name)
2615				{
2616					let constraint = constraint_def.to_constraint();
2617					return Ok(Some(format!(
2618						"ALTER TABLE {} ADD {};",
2619						quote_identifier(table),
2620						constraint
2621					)));
2622				}
2623				// Cannot reconstruct without state
2624				Ok(None)
2625			}
2626			Operation::DropTable { name } => {
2627				// Retrieve table definition from ProjectState and reconstruct CREATE TABLE
2628				if let Some(model) = project_state.find_model_by_table(name) {
2629					let mut parts = Vec::new();
2630
2631					// Convert fields to column definitions
2632					for (field_name, field) in &model.fields {
2633						let col_def = ColumnDefinition::from_field_state(field_name.clone(), field);
2634						parts.push(format!("  {}", Self::column_to_sql(&col_def, dialect)));
2635					}
2636
2637					// Add constraints
2638					for constraint_def in &model.constraints {
2639						let constraint = constraint_def.to_constraint();
2640						parts.push(format!("  {}", constraint));
2641					}
2642
2643					return Ok(Some(format!(
2644						"CREATE TABLE {} (\n{}\n);",
2645						quote_identifier(name),
2646						parts.join(",\n")
2647					)));
2648				}
2649				// Cannot reconstruct without state
2650				Ok(None)
2651			}
2652			Operation::BulkLoad { table, .. } => {
2653				// Reverse of bulk load is to truncate the table (remove loaded data)
2654				// Note: This removes ALL data, not just the data loaded by this operation
2655				Ok(Some(format!("TRUNCATE TABLE {};", quote_identifier(table))))
2656			}
2657			_ => Ok(None),
2658		}
2659	}
2660
2661	/// Apply operation to project state (backward/reverse)
2662	///
2663	/// This method updates the ProjectState to reflect the reverse of this operation.
2664	/// Used during migration rollback to track state changes.
2665	///
2666	/// # Arguments
2667	///
2668	/// * `app_label` - Application label for the model being modified
2669	/// * `state` - Mutable reference to the ProjectState to update
2670	///
2671	/// # Limitations
2672	///
2673	/// Some operations cannot fully reverse state without additional snapshot information:
2674	/// - `DropTable`: Cannot recreate model structure (columns, constraints) without snapshot
2675	/// - `DropColumn`: Cannot recreate column definition without snapshot
2676	/// - `AlterColumn`: Cannot restore original column definition without snapshot
2677	///
2678	/// For these operations, use `to_reverse_sql` with ProjectState before the operation
2679	/// is applied to generate proper reverse SQL.
2680	pub fn state_backwards(&self, app_label: &str, state: &mut ProjectState) {
2681		match self {
2682			Operation::CreateTable { name, .. } => {
2683				// Reverse: Remove the model from state
2684				state
2685					.models
2686					.remove(&(app_label.to_string(), name.to_string()));
2687			}
2688			Operation::DropTable { name: _ } => {
2689				// Cannot reconstruct ModelState without snapshot.
2690				// For proper rollback, use to_reverse_sql with pre-operation ProjectState.
2691			}
2692			Operation::RenameTable { old_name, new_name } => {
2693				// Reverse: Rename back from new_name to old_name
2694				if let Some(mut model) = state
2695					.models
2696					.remove(&(app_label.to_string(), new_name.to_string()))
2697				{
2698					model.table_name = old_name.to_string();
2699					state
2700						.models
2701						.insert((app_label.to_string(), old_name.to_string()), model);
2702				}
2703			}
2704			Operation::AddColumn { table, column, .. } => {
2705				// Reverse: Remove the column from the model
2706				if let Some(model) = state.find_model_by_table_mut(table) {
2707					model.remove_field(&column.name);
2708				}
2709			}
2710			Operation::DropColumn {
2711				table: _,
2712				column: _,
2713			} => {
2714				// Cannot reconstruct column definition without snapshot.
2715				// For proper rollback, use to_reverse_sql with pre-operation ProjectState.
2716			}
2717			Operation::AlterColumn {
2718				table: _,
2719				column: _,
2720				..
2721			} => {
2722				// Cannot restore original column definition without snapshot.
2723				// For proper rollback, use to_reverse_sql with pre-operation ProjectState.
2724			}
2725			Operation::RenameColumn {
2726				table,
2727				old_name,
2728				new_name,
2729			} => {
2730				// Reverse: Rename field back from new_name to old_name
2731				if let Some(model) = state.find_model_by_table_mut(table) {
2732					model.rename_field(new_name, old_name.to_string());
2733				}
2734			}
2735			Operation::AddConstraint { table, .. } => {
2736				// Reverse: Would need to remove the constraint
2737				// This requires parsing constraint_sql to get the name
2738				if let Some(model) = state.find_model_by_table_mut(table) {
2739					// Cannot reliably remove without constraint name extraction
2740					// Constraints vector remains unchanged
2741					let _ = model;
2742				}
2743			}
2744			Operation::DropConstraint {
2745				table: _,
2746				constraint_name: _,
2747			} => {
2748				// Cannot reconstruct constraint definition without snapshot.
2749				// For proper rollback, use to_reverse_sql with pre-operation ProjectState.
2750			}
2751			_ => {
2752				// Other operations don't affect schema state
2753			}
2754		}
2755	}
2756
2757	/// Extract constraint name from constraint SQL
2758	///
2759	/// Supports patterns:
2760	/// - "CONSTRAINT name CHECK ..."
2761	/// - "ADD CONSTRAINT name ..."
2762	fn extract_constraint_name(constraint_sql: &str) -> Option<String> {
2763		let sql = constraint_sql.trim();
2764
2765		// Pattern 1: "CONSTRAINT name ..."
2766		if sql.starts_with("CONSTRAINT ") || sql.contains(" CONSTRAINT ") {
2767			let parts: Vec<&str> = sql.split_whitespace().collect();
2768			if let Some(pos) = parts.iter().position(|&s| s == "CONSTRAINT")
2769				&& pos + 1 < parts.len()
2770			{
2771				return Some(parts[pos + 1].to_string());
2772			}
2773		}
2774
2775		None
2776	}
2777}
2778
/// Column definition for legacy operations
///
/// Describes a single table column: its name, type, and the constraint
/// flags used when a column is rendered to SQL. All fields except `name`
/// and `type_definition` are marked `#[serde(default)]`, so they may be
/// omitted from serialized input and then take their type's default
/// (`false` / `None`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct ColumnDefinition {
	/// Column name.
	pub name: String,
	/// Column type; converted to dialect-specific SQL when the column is rendered.
	pub type_definition: FieldType,
	#[serde(default)]
	/// Whether the column is `NOT NULL`.
	pub not_null: bool,
	#[serde(default)]
	/// Whether the column is flagged as unique.
	pub unique: bool,
	#[serde(default)]
	/// Whether the column is (part of) the primary key.
	pub primary_key: bool,
	#[serde(default)]
	/// Whether the column is flagged as auto-incrementing.
	pub auto_increment: bool,
	#[serde(default)]
	/// Default value expression for the column, if any.
	pub default: Option<String>,
}
2802
2803impl ColumnDefinition {
2804	/// Create a new column definition
2805	pub fn new(name: impl Into<String>, type_def: FieldType) -> Self {
2806		Self {
2807			name: name.into(),
2808			type_definition: type_def,
2809			not_null: false,
2810			unique: false,
2811			primary_key: false,
2812			auto_increment: false,
2813			default: None,
2814		}
2815	}
2816
2817	/// Create a ColumnDefinition from FieldState with attribute parsing
2818	///
2819	/// Reads boolean attributes (primary_key, unique, auto_increment) and the
2820	/// default expression from `FieldState.params`, and derives the NOT NULL
2821	/// constraint from `FieldState.nullable` (the single source of truth
2822	/// populated by `FieldMetadata::is_nullable()` in
2823	/// `ModelMetadata::to_model_state()`).
2824	///
2825	/// # Arguments
2826	///
2827	/// * `name` - Column name
2828	/// * `field_state` - FieldState containing field metadata and params
2829	///
2830	/// # Notes
2831	///
2832	/// - If `primary_key` is true, `not_null` is forced to true regardless of
2833	///   `FieldState.nullable` — primary keys cannot accept NULL.
2834	/// - Default values are false/None for unspecified attributes
2835	pub fn from_field_state(name: impl Into<String>, field_state: &FieldState) -> Self {
2836		let name_str = name.into();
2837		let params = &field_state.params;
2838
2839		// Parse attributes from params HashMap
2840		let primary_key = params
2841			.get("primary_key")
2842			.and_then(|v| v.parse::<bool>().ok())
2843			.unwrap_or(false);
2844
2845		// Derive `not_null` from FieldState's nullability field (single source
2846		// of truth set in `ModelMetadata::to_model_state()` from
2847		// `FieldMetadata::is_nullable()`). Primary keys are always NOT NULL
2848		// regardless of the nullability flag.
2849		//
2850		// Fixes #4573: the previous implementation derived `not_null` from a
2851		// `params["not_null"]` key, which the `#[model]` proc-macro only
2852		// emits conditionally (when its `is_not_null` calculation returns
2853		// `true`). The same macro also always emits `params["null"]`, which
2854		// `is_nullable()` reads into `field_state.nullable`. Keeping the two
2855		// keys as parallel sources of truth for nullability was the root
2856		// cause of the drift: any code path that bypassed or mis-routed the
2857		// `not_null` emission (e.g., offline state reconstruction, future
2858		// macro refactors, or hand-built `FieldState` values in tests) would
2859		// produce NULLABLE columns for non-Optional fields. Consolidating on
2860		// `field_state.nullable` removes that class of regression.
2861		let not_null = !field_state.nullable || primary_key;
2862
2863		let unique = params
2864			.get("unique")
2865			.and_then(|v| v.parse::<bool>().ok())
2866			.unwrap_or(false);
2867
2868		let auto_increment = params
2869			.get("auto_increment")
2870			.and_then(|v| v.parse::<bool>().ok())
2871			.unwrap_or(false);
2872
2873		let default = params.get("default").cloned();
2874
2875		// Resolve ForeignKey column type from the referenced model's primary
2876		// key in the global `ModelRegistry`. This addresses the macro-level
2877		// limitation that the target model's PK type is not knowable at
2878		// macro-expansion time (see issue #4430). The macro emits a
2879		// placeholder `FieldType::Uuid` for `ForeignKeyField<T>` `_id`
2880		// columns and tags the field with the `fk_target` parameter; here
2881		// we look up the referenced model and adopt its PK column type.
2882		//
2883		// If the lookup fails (e.g., the target model has not been
2884		// registered yet), we fall back to the placeholder field type so
2885		// existing behavior is preserved and the caller can surface a
2886		// downstream error rather than crash here.
2887		let type_definition = resolve_foreign_key_column_type(field_state)
2888			.unwrap_or_else(|| field_state.field_type.clone());
2889
2890		Self {
2891			name: name_str,
2892			type_definition,
2893			not_null,
2894			unique,
2895			primary_key,
2896			auto_increment,
2897			default,
2898		}
2899	}
2900}
2901
2902/// Resolve the column type of a `ForeignKeyField<T>` `_id` column by
2903/// looking up the target model's primary key in the global
2904/// `ModelRegistry`. Returns `None` if `field_state` is not tagged as a
2905/// foreign key column or if the target model / its PK cannot be
2906/// resolved.
2907///
2908/// This indirection exists because the `#[model]` macro cannot resolve
2909/// the target model's PK type at macro-expansion time (the registry is
2910/// populated at process startup via `#[ctor::ctor]`). See issue #4430.
2911///
2912/// # Lookup Strategy
2913///
2914/// The resolver coordinates two lookup paths against the
2915/// `ModelRegistry`:
2916///
2917/// 1. **Qualified `(fk_target_app, fk_target)` lookup.** The
2918///    `#[model]` macro emits `fk_target_app` for every
2919///    `ForeignKeyField<T>` field by reading the target type's *own*
2920///    `<T as Model>::app_label()` at registration time. That value is
2921///    authoritative — it respects `#[app_label = "..."]` overrides
2922///    and matches whatever key the target was registered under,
2923///    regardless of how the user spelled the type (bare ident,
2924///    `use`-imported ident, absolute path, or crate-relative path).
2925///    The qualified lookup is therefore trusted as the primary
2926///    resolution path.
2927/// 2. **By-name lookup.** Used as a defensive fallback for cases
2928///    where `fk_target_app` is absent (e.g. manually-constructed
2929///    `FieldState` outside the macro path) or the qualified lookup
2930///    misses (e.g. the target model isn't registered yet during
2931///    partial registry population at startup). The by-name lookup
2932///    returns `Some` only when *exactly one* model is registered
2933///    under the name; on ambiguity it returns `None`.
2934///
2935/// When both paths return `None` and the name is ambiguous across two
2936/// or more apps (`ModelRegistry::count_models_by_name > 1`), the
2937/// resolver emits a `tracing::warn!` so operators see a targeted
2938/// diagnostic. A genuinely missing name returns `None` silently —
2939/// that case is normal during partial registry population at startup.
2940///
2941/// See issue #4436 and PR #4440 review threads on `model_derive.rs`
2942/// line 2863 and `operations.rs` line 2836.
2943fn resolve_foreign_key_column_type(field_state: &FieldState) -> Option<FieldType> {
2944	resolve_foreign_key_column_type_with(field_state, super::model_registry::global_registry())
2945}
2946
2947/// Registry-injected variant of [`resolve_foreign_key_column_type`].
2948///
2949/// Exists so unit tests can exercise the qualified-hit / by-name
2950/// fallback / ambiguous-miss branches against a local
2951/// [`super::model_registry::ModelRegistry`] without touching global
2952/// state. Production code paths go through
2953/// [`resolve_foreign_key_column_type`].
2954fn resolve_foreign_key_column_type_with(
2955	field_state: &FieldState,
2956	registry: &super::model_registry::ModelRegistry,
2957) -> Option<FieldType> {
2958	let target_model = field_state.params.get("fk_target")?;
2959	// `fk_target_app` is sourced from the target type's own
2960	// `Model::app_label()` (see `model_derive.rs`), so the qualified
2961	// lookup is authoritative. The by-name fallback is defensive: it
2962	// covers manually-constructed `FieldState`s and partial-registry
2963	// init races where the target isn't registered yet.
2964	let target = match field_state.params.get("fk_target_app") {
2965		Some(app) => registry
2966			.find_model_qualified(app, target_model)
2967			.or_else(|| registry.find_model_by_name(target_model)),
2968		None => registry.find_model_by_name(target_model),
2969	};
2970	let target = match target {
2971		Some(t) => t,
2972		None => {
2973			// `find_model_by_name` returns `None` for both "missing"
2974			// and "ambiguous". Warn only on ambiguity so operators see
2975			// a targeted message; silent on genuinely missing targets
2976			// (normal during partial registry population at startup).
2977			if registry.count_models_by_name(target_model) > 1 {
2978				tracing::warn!(
2979					model_name = %target_model,
2980					fk_target_app = ?field_state.params.get("fk_target_app"),
2981					"FK target name is ambiguous across apps and the qualified \
2982					 lookup did not resolve a unique target. Refusing to resolve \
2983					 to avoid silent wrong-target resolution. Ensure the FK \
2984					 target type is registered and that its `Model::app_label()` \
2985					 matches one of the registered apps.",
2986				);
2987			}
2988			return None;
2989		}
2990	};
2991	// Find the primary key field of the target model.
2992	let pk_field = target
2993		.fields
2994		.values()
2995		.find(|f| f.params.get("primary_key").map(String::as_str) == Some("true"))?;
2996	Some(pk_field.field_type.clone())
2997}
2998
2999/// Convert a field type string (e.g., "reinhardt.orm.models.CharField") to FieldType.
3000///
3001/// This function parses the field type path generated by the `#[model(...)]` macro
3002/// and converts it to the corresponding `FieldType` enum variant.
3003///
3004/// # Arguments
3005///
3006/// * `field_type` - The field type path string (e.g., "reinhardt.orm.models.CharField")
3007/// * `attributes` - Field attributes containing parameters like max_length, max_digits, etc.
3008///
3009/// # Returns
3010///
3011/// * `Ok(FieldType)` - The converted FieldType
3012/// * `Err(String)` - Error message if the field type is unsupported
3013///
3014/// # Examples
3015///
3016/// ```rust,ignore
3017/// use reinhardt_db::migrations::operations::field_type_string_to_field_type;
3018/// use std::collections::HashMap;
3019///
3020/// let mut attrs = HashMap::new();
3021/// attrs.insert("max_length".to_string(), "100".to_string());
3022///
3023/// let field_type = field_type_string_to_field_type("reinhardt.orm.models.CharField", &attrs);
3024/// assert!(field_type.is_ok());
3025/// ```
3026pub fn field_type_string_to_field_type(
3027	field_type: &str,
3028	attributes: &std::collections::HashMap<String, String>,
3029) -> Result<FieldType, String> {
3030	// Extract the type name from the full path
3031	let type_name = field_type.split('.').next_back().unwrap_or(field_type);
3032
3033	match type_name {
3034		// Integer types
3035		"IntegerField"
3036		| "PositiveIntegerField"
3037		| "SmallIntegerField"
3038		| "PositiveSmallIntegerField" => Ok(FieldType::Integer),
3039		"BigIntegerField" | "PositiveBigIntegerField" => Ok(FieldType::BigInteger),
3040		"AutoField" => Ok(FieldType::Integer),
3041		"BigAutoField" => Ok(FieldType::BigInteger),
3042		"SmallAutoField" => Ok(FieldType::SmallInteger),
3043
3044		// String types
3045		"CharField" => {
3046			let max_length = attributes
3047				.get("max_length")
3048				.and_then(|v| v.parse::<u32>().ok())
3049				.ok_or_else(|| "CharField requires max_length attribute".to_string())?;
3050			Ok(FieldType::VarChar(max_length))
3051		}
3052		"TextField" => Ok(FieldType::Text),
3053		"SlugField" => {
3054			let max_length = attributes
3055				.get("max_length")
3056				.and_then(|v| v.parse::<u32>().ok())
3057				.unwrap_or(50);
3058			Ok(FieldType::VarChar(max_length))
3059		}
3060		"EmailField" => {
3061			let max_length = attributes
3062				.get("max_length")
3063				.and_then(|v| v.parse::<u32>().ok())
3064				.unwrap_or(254);
3065			Ok(FieldType::VarChar(max_length))
3066		}
3067		"URLField" => {
3068			let max_length = attributes
3069				.get("max_length")
3070				.and_then(|v| v.parse::<u32>().ok())
3071				.unwrap_or(200);
3072			Ok(FieldType::VarChar(max_length))
3073		}
3074
3075		// Boolean type
3076		"BooleanField" => Ok(FieldType::Boolean),
3077		"NullBooleanField" => Ok(FieldType::Boolean),
3078
3079		// Date/time types
3080		"DateField" => Ok(FieldType::Date),
3081		"TimeField" => Ok(FieldType::Time),
3082		"DateTimeField" => Ok(FieldType::DateTime),
3083		"DurationField" => Ok(FieldType::BigInteger), // Stored as microseconds
3084
3085		// Numeric types
3086		"FloatField" => Ok(FieldType::Float),
3087		"DecimalField" => {
3088			let precision = attributes
3089				.get("max_digits")
3090				.and_then(|v| v.parse::<u32>().ok())
3091				.unwrap_or(10);
3092			let scale = attributes
3093				.get("decimal_places")
3094				.and_then(|v| v.parse::<u32>().ok())
3095				.unwrap_or(2);
3096			Ok(FieldType::Decimal { precision, scale })
3097		}
3098
3099		// Binary types
3100		"BinaryField" => Ok(FieldType::Binary),
3101
3102		// UUID type
3103		"UUIDField" => Ok(FieldType::Uuid),
3104
3105		// JSON types
3106		"JSONField" => Ok(FieldType::Json),
3107
3108		// File fields (stored as path strings)
3109		"FileField" | "ImageField" => {
3110			let max_length = attributes
3111				.get("max_length")
3112				.and_then(|v| v.parse::<u32>().ok())
3113				.unwrap_or(100);
3114			Ok(FieldType::VarChar(max_length))
3115		}
3116
3117		// IP Address fields
3118		"GenericIPAddressField" | "IPAddressField" => {
3119			// PostgreSQL uses INET, others use VARCHAR
3120			Ok(FieldType::VarChar(39)) // Max length for IPv6
3121		}
3122
3123		// Relationship fields (stored as foreign key reference)
3124		"ForeignKey" => {
3125			// ForeignKey is typically stored as integer ID
3126			Ok(FieldType::BigInteger)
3127		}
3128		"OneToOneField" => Ok(FieldType::BigInteger),
3129
3130		// Unknown type
3131		other => Err(format!("Unsupported field type: {}", other)),
3132	}
3133}
3134
/// SQL dialect for generating database-specific SQL
///
/// Passed to SQL generators such as `Operation::column_to_sql`, which
/// `SqliteTableRecreation::to_sql_statements` calls with
/// `SqlDialect::Sqlite`.
#[derive(Debug, Clone, Copy)]
pub enum SqlDialect {
	/// SQLite.
	Sqlite,
	/// PostgreSQL.
	Postgres,
	/// MySQL.
	Mysql,
	/// CockroachDB.
	Cockroachdb,
}
3147
3148// ============================================================================
3149// SQLite Table Recreation Support
3150// ============================================================================
3151
/// Represents a SQLite table recreation operation
///
/// SQLite has limited ALTER TABLE support - operations like DROP COLUMN,
/// ALTER COLUMN TYPE, and constraint modifications require recreating the table.
///
/// This struct generates the 4-step SQL pattern:
/// 1. CREATE TABLE temp_table (with new schema)
/// 2. INSERT INTO temp_table SELECT columns FROM old_table
/// 3. DROP TABLE old_table
/// 4. ALTER TABLE temp_table RENAME TO old_table
///
/// The temporary table is named `<table_name>_new` (see `to_sql_statements`).
///
/// This type is integrated into `DatabaseMigrationExecutor` which automatically
/// detects SQLite operations requiring recreation and applies the 4-step process
/// within the migration's transaction context.
#[derive(Debug, Clone)]
pub struct SqliteTableRecreation {
	/// Original table name; the rebuilt table is renamed back to this name
	pub table_name: String,
	/// New column definitions (after modification), emitted in this order in
	/// the generated CREATE TABLE
	pub new_columns: Vec<ColumnDefinition>,
	/// Columns to copy from old table (in order matching new_columns).
	/// The generated INSERT has no explicit target column list, so these
	/// names are matched to the new table's columns by position.
	pub columns_to_copy: Vec<String>,
	/// Constraints for the new table (parsed from introspection); rendered
	/// through their `to_string()` output after the column definitions
	pub constraints: Vec<Constraint>,
	/// Raw constraint SQL strings (for AddConstraint operations); appended
	/// verbatim after `constraints` in the generated CREATE TABLE
	pub raw_constraint_sqls: Vec<String>,
	/// WITHOUT ROWID option; when `true`, ` WITHOUT ROWID` is appended to the
	/// generated CREATE TABLE. The `for_*` constructors set it to `false`.
	pub without_rowid: bool,
}
3181
3182impl SqliteTableRecreation {
3183	/// Create a new table recreation for dropping a column
3184	pub fn for_drop_column(
3185		table_name: impl Into<String>,
3186		current_columns: Vec<ColumnDefinition>,
3187		column_to_drop: &str,
3188		current_constraints: Vec<Constraint>,
3189	) -> Self {
3190		let table_name = table_name.into();
3191		let new_columns: Vec<_> = current_columns
3192			.into_iter()
3193			.filter(|c| c.name != column_to_drop)
3194			.collect();
3195		let columns_to_copy: Vec<_> = new_columns.iter().map(|c| c.name.to_string()).collect();
3196
3197		// Filter out constraints that reference the dropped column
3198		let constraints: Vec<_> = current_constraints
3199			.into_iter()
3200			.filter(|c| !Self::constraint_references_column(c, column_to_drop))
3201			.collect();
3202
3203		Self {
3204			table_name,
3205			new_columns,
3206			columns_to_copy,
3207			constraints,
3208			raw_constraint_sqls: Vec::new(),
3209			without_rowid: false,
3210		}
3211	}
3212
3213	/// Create a new table recreation for altering a column type
3214	pub fn for_alter_column(
3215		table_name: impl Into<String>,
3216		current_columns: Vec<ColumnDefinition>,
3217		column_name: &str,
3218		new_definition: ColumnDefinition,
3219		current_constraints: Vec<Constraint>,
3220	) -> Self {
3221		let table_name = table_name.into();
3222		let new_columns: Vec<_> = current_columns
3223			.into_iter()
3224			.map(|c| {
3225				if c.name == column_name {
3226					new_definition.clone()
3227				} else {
3228					c
3229				}
3230			})
3231			.collect();
3232		let columns_to_copy: Vec<_> = new_columns.iter().map(|c| c.name.to_string()).collect();
3233
3234		Self {
3235			table_name,
3236			new_columns,
3237			columns_to_copy,
3238			constraints: current_constraints,
3239			raw_constraint_sqls: Vec::new(),
3240			without_rowid: false,
3241		}
3242	}
3243
3244	/// Create a new table recreation for adding a constraint
3245	///
3246	/// Since SQLite doesn't support `ALTER TABLE ADD CONSTRAINT`, we need to
3247	/// recreate the table with the new constraint included.
3248	pub fn for_add_constraint(
3249		table_name: impl Into<String>,
3250		current_columns: Vec<ColumnDefinition>,
3251		current_constraints: Vec<Constraint>,
3252		constraint_sql: String,
3253	) -> Self {
3254		let table_name = table_name.into();
3255		let columns_to_copy: Vec<_> = current_columns.iter().map(|c| c.name.to_string()).collect();
3256
3257		Self {
3258			table_name,
3259			new_columns: current_columns,
3260			columns_to_copy,
3261			constraints: current_constraints,
3262			raw_constraint_sqls: vec![constraint_sql],
3263			without_rowid: false,
3264		}
3265	}
3266
3267	/// Create a new table recreation for dropping a constraint
3268	///
3269	/// Since SQLite doesn't support `ALTER TABLE DROP CONSTRAINT`, we need to
3270	/// recreate the table without the specified constraint.
3271	pub fn for_drop_constraint(
3272		table_name: impl Into<String>,
3273		current_columns: Vec<ColumnDefinition>,
3274		current_constraints: Vec<Constraint>,
3275		constraint_name: &str,
3276	) -> Self {
3277		let table_name = table_name.into();
3278		let columns_to_copy: Vec<_> = current_columns.iter().map(|c| c.name.to_string()).collect();
3279
3280		// Filter out the constraint by name
3281		let constraints: Vec<_> = current_constraints
3282			.into_iter()
3283			.filter(|c| !Self::constraint_has_name(c, constraint_name))
3284			.collect();
3285
3286		Self {
3287			table_name,
3288			new_columns: current_columns,
3289			columns_to_copy,
3290			constraints,
3291			raw_constraint_sqls: Vec::new(),
3292			without_rowid: false,
3293		}
3294	}
3295
3296	/// Generate the 4-step SQL statements for table recreation
3297	pub fn to_sql_statements(&self) -> Vec<String> {
3298		let temp_table = format!("{}_new", self.table_name);
3299
3300		// Step 1: CREATE TABLE with new schema
3301		let column_defs: Vec<String> = self
3302			.new_columns
3303			.iter()
3304			.map(|c| Operation::column_to_sql(c, &SqlDialect::Sqlite))
3305			.collect();
3306
3307		let constraint_defs: Vec<String> = self.constraints.iter().map(|c| c.to_string()).collect();
3308
3309		let mut create_parts = column_defs;
3310		create_parts.extend(constraint_defs);
3311		// Include raw constraint SQLs (from AddConstraint operations)
3312		create_parts.extend(self.raw_constraint_sqls.clone());
3313
3314		let mut create_sql = format!(
3315			"CREATE TABLE \"{}\" (\n  {}\n)",
3316			temp_table,
3317			create_parts.join(",\n  ")
3318		);
3319		if self.without_rowid {
3320			create_sql.push_str(" WITHOUT ROWID");
3321		}
3322		create_sql.push(';');
3323
3324		// Step 2: Copy data
3325		let columns_list = self
3326			.columns_to_copy
3327			.iter()
3328			.map(|c| format!("\"{}\"", c))
3329			.collect::<Vec<_>>()
3330			.join(", ");
3331		let insert_sql = format!(
3332			"INSERT INTO \"{}\" SELECT {} FROM \"{}\";",
3333			temp_table, columns_list, self.table_name
3334		);
3335
3336		// Step 3: Drop old table
3337		let drop_sql = format!("DROP TABLE \"{}\";", self.table_name);
3338
3339		// Step 4: Rename new table
3340		let rename_sql = format!(
3341			"ALTER TABLE \"{}\" RENAME TO \"{}\";",
3342			temp_table, self.table_name
3343		);
3344
3345		vec![create_sql, insert_sql, drop_sql, rename_sql]
3346	}
3347
3348	/// Check if a constraint references a specific column
3349	fn constraint_references_column(constraint: &Constraint, column_name: &str) -> bool {
3350		match constraint {
3351			Constraint::PrimaryKey { columns, .. } => columns.iter().any(|c| c == column_name),
3352			Constraint::ForeignKey { columns, .. } => columns.iter().any(|c| c == column_name),
3353			Constraint::Unique { columns, .. } => columns.iter().any(|c| c == column_name),
3354			Constraint::Check { expression, .. } => expression.contains(column_name),
3355			Constraint::OneToOne { column, .. } => column == column_name,
3356			Constraint::ManyToMany { source_column, .. } => source_column == column_name,
3357			Constraint::Exclude { elements, .. } => {
3358				elements.iter().any(|(col, _)| col == column_name)
3359			}
3360		}
3361	}
3362
3363	/// Check if a constraint has the specified name
3364	fn constraint_has_name(constraint: &Constraint, constraint_name: &str) -> bool {
3365		match constraint {
3366			Constraint::PrimaryKey { name, .. } => name == constraint_name,
3367			Constraint::ForeignKey { name, .. } => name == constraint_name,
3368			Constraint::Unique { name, .. } => name == constraint_name,
3369			Constraint::Check { name, .. } => name == constraint_name,
3370			Constraint::OneToOne { name, .. } => name == constraint_name,
3371			Constraint::ManyToMany { name, .. } => name == constraint_name,
3372			Constraint::Exclude { name, .. } => name == constraint_name,
3373		}
3374	}
3375}
3376
3377impl Operation {
3378	/// Check if this operation requires SQLite table recreation
3379	pub fn requires_sqlite_recreation(&self) -> bool {
3380		matches!(
3381			self,
3382			Operation::DropColumn { .. }
3383				| Operation::AlterColumn { .. }
3384				| Operation::AddConstraint { .. }
3385				| Operation::DropConstraint { .. }
3386		)
3387	}
3388
3389	/// Check if the reverse of this operation requires SQLite table recreation
3390	///
3391	/// When rolling back a migration on SQLite, some reverse operations also require
3392	/// table recreation. This method identifies those cases.
3393	///
3394	/// | Forward Operation | Reverse Operation | Requires Recreation |
3395	/// |-------------------|-------------------|---------------------|
3396	/// | AddColumn         | DropColumn        | Yes                 |
3397	/// | AlterColumn       | AlterColumn       | Yes                 |
3398	/// | AddConstraint     | DropConstraint    | Yes                 |
3399	/// | DropConstraint    | AddConstraint     | Yes                 |
3400	pub fn reverse_requires_sqlite_recreation(&self) -> bool {
3401		matches!(
3402			self,
3403			// AddColumn → Reverse DropColumn (requires recreation)
3404			Operation::AddColumn { .. }
3405				// AlterColumn → Reverse AlterColumn (requires recreation)
3406				| Operation::AlterColumn { .. }
3407				// AddConstraint → Reverse DropConstraint (requires recreation)
3408				| Operation::AddConstraint { .. }
3409				// DropConstraint → Reverse AddConstraint (requires recreation)
3410				| Operation::DropConstraint { .. }
3411		)
3412	}
3413
3414	/// Generate the reverse operation (for rollback on SQLite)
3415	///
3416	/// This method returns the conceptual reverse `Operation`, which can be used
3417	/// with `handle_sqlite_recreation()` for databases that don't support direct
3418	/// ALTER TABLE operations.
3419	///
3420	/// # Arguments
3421	///
3422	/// * `project_state` - Project state for accessing model definitions
3423	///
3424	/// # Returns
3425	///
3426	/// * `Ok(Some(op))` - Reverse operation generated successfully
3427	/// * `Ok(None)` - Operation is not reversible or state information is missing
3428	/// * `Err(_)` - Error generating reverse operation
3429	pub fn to_reverse_operation(
3430		&self,
3431		project_state: &ProjectState,
3432	) -> super::Result<Option<Operation>> {
3433		match self {
3434			Operation::CreateTable { name, .. } => {
3435				Ok(Some(Operation::DropTable { name: name.clone() }))
3436			}
3437			Operation::DropTable { name } => {
3438				// Reconstruct CreateTable from ProjectState
3439				if let Some(model) = project_state.find_model_by_table(name) {
3440					let columns: Vec<ColumnDefinition> = model
3441						.fields
3442						.iter()
3443						.map(|(field_name, field)| {
3444							ColumnDefinition::from_field_state(field_name.clone(), field)
3445						})
3446						.collect();
3447					let constraints: Vec<Constraint> = model
3448						.constraints
3449						.iter()
3450						.map(|c| c.to_constraint())
3451						.collect();
3452					return Ok(Some(Operation::CreateTable {
3453						name: name.clone(),
3454						columns,
3455						constraints,
3456						without_rowid: None,
3457						interleave_in_parent: None,
3458						partition: None,
3459					}));
3460				}
3461				Ok(None)
3462			}
3463			Operation::AddColumn { table, column, .. } => Ok(Some(Operation::DropColumn {
3464				table: table.clone(),
3465				column: column.name.clone(),
3466			})),
3467			Operation::DropColumn { table, column } => {
3468				// Reconstruct AddColumn from ProjectState
3469				if let Some(model) = project_state.find_model_by_table(table)
3470					&& let Some(field) = model.get_field(column)
3471				{
3472					let col_def = ColumnDefinition::from_field_state(column.clone(), field);
3473					return Ok(Some(Operation::AddColumn {
3474						table: table.clone(),
3475						column: col_def,
3476						mysql_options: None,
3477					}));
3478				}
3479				Ok(None)
3480			}
3481			Operation::AlterColumn {
3482				table,
3483				column,
3484				old_definition,
3485				new_definition: _,
3486				..
3487			} => {
3488				// Reconstruct AlterColumn with the original definition. Prefer the
3489				// explicit `old_definition` carried by the forward operation; fall
3490				// back to ProjectState lookup only if `old_definition` is absent.
3491				let resolved_old_def = old_definition.clone().or_else(|| {
3492					project_state
3493						.find_model_by_table(table)
3494						.and_then(|model| model.get_field(column))
3495						.map(|field| ColumnDefinition::from_field_state(column.clone(), field))
3496				});
3497
3498				if let Some(col_def) = resolved_old_def {
3499					return Ok(Some(Operation::AlterColumn {
3500						table: table.clone(),
3501						column: column.clone(),
3502						old_definition: None,
3503						new_definition: col_def,
3504						mysql_options: None,
3505					}));
3506				}
3507				Ok(None)
3508			}
3509			Operation::AddConstraint {
3510				table,
3511				constraint_sql,
3512			} => {
3513				// Extract constraint name to create DropConstraint
3514				if let Some(constraint_name) = Self::extract_constraint_name(constraint_sql) {
3515					return Ok(Some(Operation::DropConstraint {
3516						table: table.clone(),
3517						constraint_name,
3518					}));
3519				}
3520				Err(super::MigrationError::InvalidMigration(format!(
3521					"Cannot extract constraint name from: {}",
3522					constraint_sql
3523				)))
3524			}
3525			Operation::DropConstraint {
3526				table,
3527				constraint_name,
3528			} => {
3529				// Reconstruct AddConstraint from ProjectState
3530				if let Some(model) = project_state.find_model_by_table(table)
3531					&& let Some(constraint_def) = model
3532						.constraints
3533						.iter()
3534						.find(|c| c.name == *constraint_name)
3535				{
3536					let constraint = constraint_def.to_constraint();
3537					return Ok(Some(Operation::AddConstraint {
3538						table: table.clone(),
3539						constraint_sql: format!("{}", constraint),
3540					}));
3541				}
3542				Ok(None)
3543			}
3544			Operation::RenameTable { old_name, new_name } => Ok(Some(Operation::RenameTable {
3545				old_name: new_name.clone(),
3546				new_name: old_name.clone(),
3547			})),
3548			Operation::RenameColumn {
3549				table,
3550				old_name,
3551				new_name,
3552			} => Ok(Some(Operation::RenameColumn {
3553				table: table.clone(),
3554				old_name: new_name.clone(),
3555				new_name: old_name.clone(),
3556			})),
3557			Operation::CreateIndex { table, columns, .. } => Ok(Some(Operation::DropIndex {
3558				table: table.clone(),
3559				columns: columns.clone(),
3560			})),
3561			Operation::DropIndex { table, columns } => {
3562				// Basic index recreation (without advanced properties)
3563				// Note: Cannot determine if the original index was unique from DropIndex alone
3564				Ok(Some(Operation::CreateIndex {
3565					table: table.clone(),
3566					columns: columns.clone(),
3567					unique: false,
3568					index_type: None,
3569					where_clause: None,
3570					concurrently: false,
3571					expressions: None,
3572					mysql_options: None,
3573					operator_class: None,
3574				}))
3575			}
3576			// Operations that are not reversible as Operations
3577			Operation::RunSQL { .. } | Operation::RunRust { .. } | Operation::BulkLoad { .. } => {
3578				Ok(None)
3579			}
3580			// Other operations - not reversible via to_reverse_operation
3581			_ => Ok(None),
3582		}
3583	}
3584}
3585
3586// Re-export for convenience (legacy)
3587pub use Operation::{AddColumn, AlterColumn, CreateTable, DropColumn};
3588
/// Operation statement types (reinhardt-query or sanitized raw SQL)
///
/// Structured variants wrap a statement value that is turned into SQL text
/// by the matching `crate::backends::sql_build_helpers::build_*_sql` helper
/// (see `execute` and `to_sql_string`); `RawSql` already holds SQL text.
pub enum OperationStatement {
	/// Create-table statement, rendered by `build_create_table_sql`.
	TableCreate(CreateTableStatement),
	/// Drop-table statement, rendered by `build_drop_table_sql`.
	TableDrop(DropTableStatement),
	/// Alter-table statement, rendered by `build_alter_table_sql`.
	TableAlter(AlterTableStatement),
	/// Table rename, carried as an `AlterTableStatement` and rendered by the
	/// same `build_alter_table_sql` helper as `TableAlter`.
	TableRename(AlterTableStatement),
	/// Create-index statement, rendered by `build_create_index_sql`.
	IndexCreate(CreateIndexStatement),
	/// Drop-index statement, rendered by `build_drop_index_sql`.
	IndexDrop(DropIndexStatement),
	/// Raw SQL text, executed as-is. Identifiers interpolated by
	/// `Operation::to_statement` are escaped with `quote_identifier`, but
	/// caller-supplied SQL (e.g. `RunSQL` bodies and `AddConstraint`
	/// constraint SQL) is passed through without validation.
	RawSql(String),
}
3606
3607impl OperationStatement {
3608	/// Execute the operation statement
3609	pub async fn execute<'c, E>(&self, executor: E) -> Result<(), sqlx::Error>
3610	where
3611		E: sqlx::Executor<'c, Database = sqlx::Postgres>,
3612	{
3613		use crate::backends::sql_build_helpers;
3614		use crate::backends::types::DatabaseType;
3615		let db_type = DatabaseType::Postgres;
3616		match self {
3617			OperationStatement::TableCreate(stmt) => {
3618				let sql = sql_build_helpers::build_create_table_sql(db_type, stmt);
3619				sqlx::query(&sql).execute(executor).await?;
3620			}
3621			OperationStatement::TableDrop(stmt) => {
3622				let sql = sql_build_helpers::build_drop_table_sql(db_type, stmt);
3623				sqlx::query(&sql).execute(executor).await?;
3624			}
3625			OperationStatement::TableAlter(stmt) => {
3626				let sql = sql_build_helpers::build_alter_table_sql(db_type, stmt);
3627				sqlx::query(&sql).execute(executor).await?;
3628			}
3629			OperationStatement::TableRename(stmt) => {
3630				let sql = sql_build_helpers::build_alter_table_sql(db_type, stmt);
3631				sqlx::query(&sql).execute(executor).await?;
3632			}
3633			OperationStatement::IndexCreate(stmt) => {
3634				let sql = sql_build_helpers::build_create_index_sql(db_type, stmt);
3635				sqlx::query(&sql).execute(executor).await?;
3636			}
3637			OperationStatement::IndexDrop(stmt) => {
3638				let sql = sql_build_helpers::build_drop_index_sql(db_type, stmt);
3639				sqlx::query(&sql).execute(executor).await?;
3640			}
3641			OperationStatement::RawSql(sql) => {
3642				// Already sanitized with pg_escape::quote_identifier
3643				sqlx::query(sql).execute(executor).await?;
3644			}
3645		}
3646		Ok(())
3647	}
3648
3649	/// Convert to SQL string for logging/debugging
3650	///
3651	/// # Arguments
3652	///
3653	/// * `db_type` - Database type to generate SQL for (PostgreSQL, MySQL, SQLite)
3654	pub fn to_sql_string(&self, db_type: crate::backends::types::DatabaseType) -> String {
3655		use crate::backends::sql_build_helpers;
3656
3657		match self {
3658			OperationStatement::TableCreate(stmt) => {
3659				sql_build_helpers::build_create_table_sql(db_type, stmt)
3660			}
3661			OperationStatement::TableDrop(stmt) => {
3662				sql_build_helpers::build_drop_table_sql(db_type, stmt)
3663			}
3664			OperationStatement::TableAlter(stmt) => {
3665				sql_build_helpers::build_alter_table_sql(db_type, stmt)
3666			}
3667			OperationStatement::TableRename(stmt) => {
3668				sql_build_helpers::build_alter_table_sql(db_type, stmt)
3669			}
3670			OperationStatement::IndexCreate(stmt) => {
3671				sql_build_helpers::build_create_index_sql(db_type, stmt)
3672			}
3673			OperationStatement::IndexDrop(stmt) => {
3674				sql_build_helpers::build_drop_index_sql(db_type, stmt)
3675			}
3676			OperationStatement::RawSql(sql) => sql.clone(),
3677		}
3678	}
3679}
3680
3681impl Operation {
	/// Convert Operation to reinhardt-query statement or sanitized raw SQL
	///
	/// Variants that have a matching builder (`build_create_table`,
	/// `build_add_column`, ...) are returned as typed statements. Every other
	/// variant is rendered into an `OperationStatement::RawSql` string, with
	/// table/column/schema identifiers passed through `quote_identifier`.
	///
	/// This method has no dialect parameter, so a few variants make a fixed
	/// choice:
	///
	/// - `BulkLoad` always renders via `postgres_copy_from_sql`.
	/// - `SetAutoIncrementValue` renders SQL that is meant to fail at
	///   execution time (see the comment on that arm).
	/// - `AlterModelOptions` renders an empty string.
	/// - `RunRust` and state-only `MoveModel` render an SQL comment.
	pub fn to_statement(&self) -> OperationStatement {
		match self {
			Operation::CreateTable {
				name,
				columns,
				constraints,
				..
			} => {
				OperationStatement::TableCreate(self.build_create_table(name, columns, constraints))
			}
			Operation::DropTable { name } => {
				OperationStatement::TableDrop(self.build_drop_table(name))
			}
			Operation::AddColumn { table, column, .. } => {
				OperationStatement::TableAlter(self.build_add_column(table, column))
			}
			Operation::DropColumn { table, column } => {
				OperationStatement::TableAlter(self.build_drop_column(table, column))
			}
			Operation::AlterColumn {
				table,
				column,
				new_definition,
				..
			} => OperationStatement::TableAlter(self.build_alter_column(
				table,
				column,
				new_definition,
			)),
			Operation::RenameTable { old_name, new_name } => {
				OperationStatement::TableRename(self.build_rename_table(old_name, new_name))
			}
			// reinhardt-query does not support RENAME COLUMN, use sanitized raw SQL
			Operation::RenameColumn {
				table,
				old_name,
				new_name,
			} => OperationStatement::RawSql(format!(
				"ALTER TABLE {} RENAME COLUMN {} TO {}",
				quote_identifier(table),
				quote_identifier(old_name),
				quote_identifier(new_name)
			)),
			Operation::AddConstraint {
				table,
				constraint_sql,
			} => {
				// NOTE: constraint_sql validation is the caller's responsibility
				// (it is interpolated verbatim; only the table name is quoted)
				OperationStatement::RawSql(format!(
					"ALTER TABLE {} ADD {}",
					quote_identifier(table),
					constraint_sql
				))
			}
			Operation::DropConstraint {
				table,
				constraint_name,
			} => OperationStatement::RawSql(format!(
				"ALTER TABLE {} DROP CONSTRAINT {}",
				quote_identifier(table),
				quote_identifier(constraint_name)
			)),
			Operation::CreateIndex {
				table,
				columns,
				unique,
				..
			} => {
				// Index name is derived as `idx_<table>_<col1>_<col2>...`;
				// the remaining CreateIndex fields are not used here.
				let idx_name = format!("idx_{}_{}", table, columns.join("_"));
				OperationStatement::IndexCreate(
					self.build_create_index(&idx_name, table, columns, *unique),
				)
			}
			Operation::DropIndex { table, columns } => {
				// Must derive the same name as the CreateIndex arm above
				let idx_name = format!("idx_{}_{}", table, columns.join("_"));
				OperationStatement::IndexDrop(self.build_drop_index(&idx_name))
			}
			// User-supplied SQL is passed through unchanged
			Operation::RunSQL { sql, .. } => OperationStatement::RawSql(sql.to_string()),
			Operation::RunRust { code, .. } => {
				// RunRust operations don't produce SQL; emit an SQL comment
				// containing only the first line of the code
				OperationStatement::RawSql(format!(
					"-- RunRust: {}",
					code.lines().next().unwrap_or("")
				))
			}
			Operation::AlterTableComment { table, comment } => {
				// PostgreSQL-specific COMMENT ON TABLE
				OperationStatement::RawSql(if let Some(comment_text) = comment {
					format!(
						"COMMENT ON TABLE {} IS '{}'",
						quote_identifier(table),
						comment_text.replace('\'', "''") // Escape single quotes
					)
				} else {
					format!("COMMENT ON TABLE {} IS NULL", quote_identifier(table))
				})
			}
			Operation::AlterUniqueTogether {
				table,
				unique_together,
			} => {
				// One ADD CONSTRAINT statement per field group, named
				// `<table>_<position>_uniq`; statements are joined with ";\n"
				let mut sqls = Vec::new();
				for (idx, fields) in unique_together.iter().enumerate() {
					let constraint_name = format!("{}_{}_uniq", table, idx);
					let fields_str: Vec<String> = fields
						.iter()
						.map(|f| quote_identifier(f).to_string())
						.collect();
					sqls.push(format!(
						"ALTER TABLE {} ADD CONSTRAINT {} UNIQUE ({})",
						quote_identifier(table),
						quote_identifier(&constraint_name),
						fields_str.join(", ")
					));
				}
				OperationStatement::RawSql(sqls.join(";\n"))
			}
			// No SQL is produced for model options: the statement is an empty string
			Operation::AlterModelOptions { .. } => OperationStatement::RawSql(String::new()),
			Operation::CreateInheritedTable {
				name,
				columns,
				base_table,
				join_column,
			} => {
				let mut stmt = Query::create_table();
				stmt.table(Alias::new(name.as_str())).if_not_exists();

				// Add join column (foreign key to base table); always typed as integer
				let join_col = ColumnDef::new(Alias::new(join_column.as_str()));
				let join_col = join_col.integer();
				stmt.col(join_col);

				// Add other columns (type only; no NOT NULL/default/etc. is applied here)
				for col in columns {
					let mut column = ColumnDef::new(Alias::new(col.name.as_str()));
					column = self.apply_column_type(column, &col.type_definition);
					stmt.col(column);
				}

				// Add foreign key; the referenced column is hard-coded to "id"
				let mut fk = reinhardt_query::prelude::ForeignKey::create();
				fk.from_tbl(Alias::new(name.as_str()))
					.from_col(Alias::new(join_column.as_str()))
					.to_tbl(Alias::new(base_table.as_str()))
					.to_col(Alias::new("id"));
				stmt.foreign_key_from_builder(&mut fk);

				OperationStatement::TableCreate(stmt.to_owned())
			}
			Operation::AddDiscriminatorColumn {
				table,
				column_name,
				default_value,
			} => {
				let mut stmt = Query::alter_table();
				stmt.table(Alias::new(table.as_str()));

				// Discriminator is a string column of length 50 with the given default
				let mut col = ColumnDef::new(Alias::new(column_name.as_str()));
				col = col
					.string_len(50)
					.default(SimpleExpr::from(default_value.to_string()));
				stmt.add_column(col);

				OperationStatement::TableAlter(stmt.to_owned())
			}
			Operation::MoveModel {
				rename_table,
				old_table_name,
				new_table_name,
				..
			} => {
				// MoveModel generates a table rename if table name changes
				if *rename_table {
					if let (Some(old_name), Some(new_name)) = (old_table_name, new_table_name) {
						OperationStatement::TableRename(self.build_rename_table(old_name, new_name))
					} else {
						// Rename requested but a table name is missing: fall back to
						// the state-only marker
						OperationStatement::RawSql("-- MoveModel: State-only operation".to_string())
					}
				} else {
					// State-only operation, no SQL
					OperationStatement::RawSql("-- MoveModel: State-only operation".to_string())
				}
			}
			Operation::CreateSchema {
				name,
				if_not_exists,
			} => {
				// Rendered inline as raw SQL (reinhardt-query doesn't support CREATE SCHEMA)
				let sql = if *if_not_exists {
					format!("CREATE SCHEMA IF NOT EXISTS {}", quote_identifier(name))
				} else {
					format!("CREATE SCHEMA {}", quote_identifier(name))
				};
				OperationStatement::RawSql(sql)
			}
			Operation::DropSchema {
				name,
				cascade,
				if_exists,
			} => {
				// Rendered inline as raw SQL (reinhardt-query doesn't support DROP SCHEMA)
				let if_exists_clause = if *if_exists { " IF EXISTS" } else { "" };
				let cascade_clause = if *cascade { " CASCADE" } else { "" };
				let sql = format!(
					"DROP SCHEMA{} {}{}",
					if_exists_clause,
					quote_identifier(name),
					cascade_clause
				);
				OperationStatement::RawSql(sql)
			}
			Operation::CreateExtension {
				name,
				if_not_exists,
				schema,
			} => {
				// PostgreSQL-specific; rendered inline as raw SQL
				let if_not_exists_clause = if *if_not_exists { " IF NOT EXISTS" } else { "" };
				let schema_clause = if let Some(s) = schema {
					format!(" SCHEMA {}", quote_identifier(s))
				} else {
					String::new()
				};
				let sql = format!(
					"CREATE EXTENSION{} {}{}",
					if_not_exists_clause,
					quote_identifier(name),
					schema_clause
				);
				OperationStatement::RawSql(sql)
			}
			Operation::BulkLoad {
				table,
				source,
				format,
				options,
			} => {
				// BulkLoad uses dialect-specific raw SQL
				// Default to PostgreSQL COPY FROM syntax for to_statement()
				OperationStatement::RawSql(Self::postgres_copy_from_sql(
					table, source, format, options,
				))
			}
			Operation::SetAutoIncrementValue { table, .. } => {
				// `to_statement` has no dialect context, but `SetAutoIncrementValue`
				// renders fundamentally different SQL per backend (PostgreSQL
				// `setval`, MySQL `ALTER TABLE AUTO_INCREMENT`, SQLite
				// `sqlite_sequence` upsert). Silently emitting PostgreSQL-only
				// SQL here would break MySQL/SQLite migrations.
				//
				// Emit guaranteed-fail SQL that aborts execution with a visible
				// diagnostic pointing callers at the dialect-aware `to_sql`
				// path. Converting the signature to
				// `Result<OperationStatement, MigrationError>` would cascade
				// through dozens of call sites.
				//
				// Double quotes in the table name are doubled so the name cannot
				// terminate the quoted column alias that carries the message.
				OperationStatement::RawSql(format!(
					"SELECT 1/0 AS \"SetAutoIncrementValue on {} requires dialect-aware rendering; call Operation::to_sql(&dialect) instead of to_statement()\";",
					table.replace('"', "\"\"")
				))
			}
			Operation::CreateCompositePrimaryKey {
				table,
				columns,
				constraint_name,
			} => OperationStatement::RawSql(Self::create_composite_pk_to_sql(
				table,
				columns,
				constraint_name.as_deref(),
			)),
		}
	}
3955
3956	/// Build CREATE TABLE statement
3957	fn build_create_table(
3958		&self,
3959		name: &str,
3960		columns: &[ColumnDefinition],
3961		constraints: &[Constraint],
3962	) -> CreateTableStatement {
3963		let mut stmt = Query::create_table();
3964		stmt.table(Alias::new(name)).if_not_exists();
3965
3966		for col in columns {
3967			let mut column = ColumnDef::new(Alias::new(col.name.as_str()));
3968			column = self.apply_column_type(column, &col.type_definition);
3969
3970			if col.not_null {
3971				column = column.not_null(true);
3972			}
3973			if col.unique {
3974				column = column.unique(true);
3975			}
3976			if col.primary_key {
3977				column = column.primary_key(true);
3978			}
3979			if col.auto_increment {
3980				column = column.auto_increment(true);
3981			}
3982			if let Some(default) = &col.default {
3983				column = column.default(SimpleExpr::from(self.convert_default_value(default)));
3984			}
3985
3986			stmt.col(column);
3987		}
3988
3989		// Add table-level constraints
3990		for constraint in constraints {
3991			match constraint {
3992				Constraint::PrimaryKey { columns, .. } => {
3993					let col_idens: Vec<Alias> =
3994						columns.iter().map(|c| Alias::new(c.as_str())).collect();
3995					stmt.primary_key(col_idens);
3996				}
3997				Constraint::ForeignKey {
3998					name,
3999					columns,
4000					referenced_table,
4001					referenced_columns,
4002					on_delete,
4003					on_update,
4004					..
4005				} => {
4006					let mut fk = reinhardt_query::prelude::ForeignKey::create();
4007					fk.name(Alias::new(name.as_str()))
4008						.from_tbl(Alias::new(name.as_str()))
4009						.to_tbl(Alias::new(referenced_table.as_str()));
4010
4011					for col in columns {
4012						fk.from_col(Alias::new(col.as_str()));
4013					}
4014					for col in referenced_columns {
4015						fk.to_col(Alias::new(col.as_str()));
4016					}
4017
4018					fk.on_delete((*on_delete).into());
4019					fk.on_update((*on_update).into());
4020
4021					stmt.foreign_key_from_builder(&mut fk);
4022				}
4023				Constraint::Unique { columns, .. } => {
4024					let col_idens: Vec<Alias> =
4025						columns.iter().map(|c| Alias::new(c.as_str())).collect();
4026					stmt.unique(col_idens);
4027				}
4028				Constraint::Check { name, expression } => {
4029					// Note: reinhardt-query doesn't have direct CHECK constraint support
4030					// This would need to be handled with raw SQL if needed
4031					let _ = (name, expression); // Suppress unused warnings
4032				}
4033				Constraint::OneToOne {
4034					name,
4035					column,
4036					referenced_table,
4037					referenced_column,
4038					on_delete,
4039					on_update,
4040					..
4041				} => {
4042					// OneToOne is ForeignKey + Unique
4043					let mut fk = reinhardt_query::prelude::ForeignKey::create();
4044					fk.name(Alias::new(name.as_str()))
4045						.from_tbl(Alias::new(name.as_str()))
4046						.to_tbl(Alias::new(referenced_table.as_str()))
4047						.from_col(Alias::new(column.as_str()))
4048						.to_col(Alias::new(referenced_column.as_str()))
4049						.on_delete((*on_delete).into())
4050						.on_update((*on_update).into());
4051
4052					stmt.foreign_key_from_builder(&mut fk);
4053
4054					// Add UNIQUE constraint separately if needed
4055					// Note: This should ideally be handled via UNIQUE column definition
4056				}
4057				Constraint::ManyToMany { .. } => {
4058					// ManyToMany is metadata only, no actual constraint in this table
4059					// The intermediate table handles the relationship
4060				}
4061				Constraint::Exclude { .. } => {
4062					// Exclude constraints are PostgreSQL-specific and not directly supported by reinhardt-query
4063					// They need to be handled with raw SQL if needed
4064				}
4065			}
4066		}
4067
4068		stmt.to_owned()
4069	}
4070
4071	/// Build DROP TABLE statement
4072	fn build_drop_table(&self, name: &str) -> DropTableStatement {
4073		Query::drop_table()
4074			.table(Alias::new(name))
4075			.if_exists()
4076			.cascade()
4077			.to_owned()
4078	}
4079
4080	/// Build ALTER TABLE ADD COLUMN statement
4081	fn build_add_column(&self, table: &str, column: &ColumnDefinition) -> AlterTableStatement {
4082		let mut stmt = Query::alter_table();
4083		stmt.table(Alias::new(table));
4084
4085		let mut col_def = ColumnDef::new(Alias::new(column.name.as_str()));
4086		col_def = self.apply_column_type(col_def, &column.type_definition);
4087
4088		if column.not_null {
4089			col_def = col_def.not_null(true);
4090		}
4091		if let Some(default) = &column.default {
4092			col_def = col_def.default(SimpleExpr::from(self.convert_default_value(default)));
4093		}
4094
4095		stmt.add_column(col_def);
4096		stmt.to_owned()
4097	}
4098
4099	/// Build ALTER TABLE DROP COLUMN statement
4100	fn build_drop_column(&self, table: &str, column: &str) -> AlterTableStatement {
4101		Query::alter_table()
4102			.table(Alias::new(table))
4103			.drop_column(Alias::new(column))
4104			.to_owned()
4105	}
4106
4107	/// Build ALTER TABLE ALTER COLUMN statement
4108	fn build_alter_column(
4109		&self,
4110		table: &str,
4111		column: &str,
4112		new_definition: &ColumnDefinition,
4113	) -> AlterTableStatement {
4114		let mut stmt = Query::alter_table();
4115		stmt.table(Alias::new(table));
4116
4117		let mut col_def = ColumnDef::new(Alias::new(column));
4118		col_def = self.apply_column_type(col_def, &new_definition.type_definition);
4119
4120		if new_definition.not_null {
4121			col_def = col_def.not_null(true);
4122		}
4123
4124		stmt.modify_column(col_def);
4125		stmt.to_owned()
4126	}
4127
4128	/// Build ALTER TABLE RENAME statement
4129	fn build_rename_table(&self, old_name: &str, new_name: &str) -> AlterTableStatement {
4130		Query::alter_table()
4131			.table(Alias::new(old_name))
4132			.rename_table(Alias::new(new_name))
4133			.to_owned()
4134	}
4135
4136	/// Build CREATE INDEX statement
4137	fn build_create_index(
4138		&self,
4139		name: &str,
4140		table: &str,
4141		columns: &[String],
4142		unique: bool,
4143	) -> CreateIndexStatement {
4144		let mut stmt = Query::create_index();
4145		stmt.name(Alias::new(name)).table(Alias::new(table));
4146
4147		for col in columns {
4148			stmt.col(Alias::new(col));
4149		}
4150
4151		if unique {
4152			stmt.unique();
4153		}
4154
4155		stmt.to_owned()
4156	}
4157
4158	/// Build DROP INDEX statement
4159	fn build_drop_index(&self, name: &str) -> DropIndexStatement {
4160		Query::drop_index().name(Alias::new(name)).to_owned()
4161	}
4162
4163	/// Apply column type to ColumnDef using `reinhardt_query`'s fluent API
4164	fn apply_column_type(&self, col_def: ColumnDef, field_type: &FieldType) -> ColumnDef {
4165		use FieldType;
4166		match field_type {
4167			FieldType::Integer => col_def.integer(),
4168			FieldType::BigInteger => col_def.big_integer(),
4169			FieldType::SmallInteger => col_def.small_integer(),
4170			FieldType::TinyInt => col_def.tiny_integer(),
4171			FieldType::VarChar(max_length) => col_def.string_len(*max_length),
4172			FieldType::Char(max_length) => col_def.char_len(*max_length),
4173			FieldType::Text | FieldType::TinyText | FieldType::MediumText | FieldType::LongText => {
4174				col_def.text()
4175			}
4176			// Use custom "BOOLEAN" type name instead of col_def.boolean() to ensure
4177			// consistent type naming across all databases. This is important for SQLite
4178			// where col_def.boolean() would generate "INTEGER", but we need "BOOLEAN"
4179			// so that sqlx's type_info().name() returns "BOOLEAN" and our convert_row
4180			// can properly detect boolean columns and convert integer 0/1 to bool values.
4181			FieldType::Boolean => col_def.custom(Alias::new("BOOLEAN")),
4182			FieldType::DateTime => col_def.timestamp(),
4183			FieldType::TimestampTz => col_def.timestamp_with_time_zone(),
4184			FieldType::Date => col_def.date(),
4185			FieldType::Time => col_def.time(),
4186			FieldType::Decimal { precision, scale } => col_def.decimal(*precision, *scale),
4187			FieldType::Float => col_def.float(),
4188			FieldType::Double | FieldType::Real => col_def.double(),
4189			FieldType::Json => col_def.json(),
4190			FieldType::JsonBinary => col_def.json_binary(),
4191			FieldType::Uuid => col_def.uuid(),
4192			FieldType::Binary | FieldType::Bytea => col_def.binary(0),
4193			FieldType::Blob | FieldType::TinyBlob | FieldType::MediumBlob | FieldType::LongBlob => {
4194				col_def.binary(0)
4195			}
4196			FieldType::MediumInt => col_def.integer(),
4197			FieldType::Year => col_def.small_integer(),
4198			FieldType::Enum { values } => {
4199				col_def.custom(Alias::new(format!("ENUM({})", values.join(","))))
4200			}
4201			FieldType::Set { values } => {
4202				col_def.custom(Alias::new(format!("SET({})", values.join(","))))
4203			}
4204			FieldType::ForeignKey { .. } => {
4205				// ForeignKey is a relationship, the actual column is typically an integer
4206				col_def.integer()
4207			}
4208			FieldType::OneToOne { .. } => {
4209				// OneToOne is a relationship, not a column type
4210				// The actual column will be a foreign key (typically BigInteger)
4211				col_def.big_integer()
4212			}
4213			FieldType::ManyToMany { .. } => {
4214				// ManyToMany is a relationship, not a column type
4215				// No column is created in the model table (uses intermediate table)
4216				col_def.big_integer()
4217			}
4218			// PostgreSQL-specific types
4219			FieldType::Array(inner) => {
4220				// PostgreSQL array type: use custom with array notation
4221				let inner_sql = inner.to_sql_string();
4222				col_def.custom(Alias::new(format!("{}[]", inner_sql)))
4223			}
4224			FieldType::HStore => col_def.custom(Alias::new("HSTORE")),
4225			FieldType::CIText => col_def.custom(Alias::new("CITEXT")),
4226			FieldType::Int4Range => col_def.custom(Alias::new("INT4RANGE")),
4227			FieldType::Int8Range => col_def.custom(Alias::new("INT8RANGE")),
4228			FieldType::NumRange => col_def.custom(Alias::new("NUMRANGE")),
4229			FieldType::DateRange => col_def.custom(Alias::new("DATERANGE")),
4230			FieldType::TsRange => col_def.custom(Alias::new("TSRANGE")),
4231			FieldType::TsTzRange => col_def.custom(Alias::new("TSTZRANGE")),
4232			FieldType::TsVector => col_def.custom(Alias::new("TSVECTOR")),
4233			FieldType::TsQuery => col_def.custom(Alias::new("TSQUERY")),
4234			FieldType::Custom(custom_type) => col_def.custom(Alias::new(custom_type)),
4235		}
4236	}
4237
4238	/// Convert default value string to `reinhardt_query::prelude::Value`
4239	fn convert_default_value(&self, default: &str) -> Value {
4240		let trimmed = default.trim();
4241
4242		// NULL
4243		if trimmed.eq_ignore_ascii_case("null") {
4244			return Value::String(None);
4245		}
4246
4247		// Boolean
4248		if trimmed.eq_ignore_ascii_case("true") {
4249			return Value::Bool(Some(true));
4250		}
4251		if trimmed.eq_ignore_ascii_case("false") {
4252			return Value::Bool(Some(false));
4253		}
4254
4255		// Integer
4256		if let Ok(i) = trimmed.parse::<i64>() {
4257			return Value::BigInt(Some(i));
4258		}
4259
4260		// Float
4261		if let Ok(f) = trimmed.parse::<f64>() {
4262			return Value::Double(Some(f));
4263		}
4264
4265		// String (quoted)
4266		if (trimmed.starts_with('"') && trimmed.ends_with('"'))
4267			|| (trimmed.starts_with('\'') && trimmed.ends_with('\''))
4268		{
4269			let unquoted = &trimmed[1..trimmed.len() - 1];
4270			return Value::String(Some(Box::new(unquoted.to_string())));
4271		}
4272
4273		// JSON array/object
4274		if ((trimmed.starts_with('[') && trimmed.ends_with(']'))
4275			|| (trimmed.starts_with('{') && trimmed.ends_with('}')))
4276			&& let Ok(json) = serde_json::from_str::<serde_json::Value>(trimmed)
4277		{
4278			return json_to_sea_value(&json);
4279		}
4280
4281		// SQL constants that should remain unquoted
4282		const SQL_CONSTANTS: &[&str] = &[
4283			"CURRENT_TIMESTAMP",
4284			"CURRENT_DATE",
4285			"CURRENT_TIME",
4286			"CURRENT_USER",
4287			"SESSION_USER",
4288			"LOCALTIME",
4289			"LOCALTIMESTAMP",
4290		];
4291
4292		// SQL function calls (e.g., NOW(), CURRENT_TIMESTAMP()) - keep unquoted
4293		if trimmed.ends_with("()") || trimmed.contains('(') {
4294			return Value::String(Some(Box::new(trimmed.to_string())));
4295		}
4296
4297		// SQL constants - keep unquoted
4298		if SQL_CONSTANTS
4299			.iter()
4300			.any(|c| trimmed.eq_ignore_ascii_case(c))
4301		{
4302			return Value::String(Some(Box::new(trimmed.to_string())));
4303		}
4304
4305		// Default: plain string - auto-quote as SQL string literal
4306		Value::String(Some(Box::new(format!("'{}'", trimmed.replace('\'', "''")))))
4307	}
4308}
4309
4310/// Helper function to convert `serde_json::Value` to `reinhardt_query::prelude::Value`
4311fn json_to_sea_value(json: &serde_json::Value) -> Value {
4312	match json {
4313		serde_json::Value::Null => Value::String(None),
4314		serde_json::Value::Bool(b) => Value::Bool(Some(*b)),
4315		serde_json::Value::Number(n) => {
4316			if let Some(i) = n.as_i64() {
4317				Value::BigInt(Some(i))
4318			} else if let Some(f) = n.as_f64() {
4319				Value::Double(Some(f))
4320			} else {
4321				Value::String(Some(Box::new(n.to_string())))
4322			}
4323		}
4324		serde_json::Value::String(s) => Value::String(Some(Box::new(s.clone()))),
4325		serde_json::Value::Array(_) | serde_json::Value::Object(_) => {
4326			// Store as JSON string
4327			Value::String(Some(Box::new(json.to_string())))
4328		}
4329	}
4330}
4331
4332// MigrationOperation trait implementation for legacy Operation enum
4333use super::operation_trait::MigrationOperation;
4334
4335impl MigrationOperation for Operation {
4336	fn migration_name_fragment(&self) -> Option<String> {
4337		match self {
4338			Operation::CreateTable { name, .. } => Some(name.to_lowercase()),
4339			Operation::DropTable { name } => Some(format!("delete_{}", name.to_lowercase())),
4340			Operation::AddColumn { table, column, .. } => Some(format!(
4341				"{}_{}",
4342				table.to_lowercase(),
4343				column.name.to_lowercase()
4344			)),
4345			Operation::DropColumn { table, column } => Some(format!(
4346				"remove_{}_{}",
4347				table.to_lowercase(),
4348				column.to_lowercase()
4349			)),
4350			Operation::AlterColumn { table, column, .. } => Some(format!(
4351				"alter_{}_{}",
4352				table.to_lowercase(),
4353				column.to_lowercase()
4354			)),
4355			Operation::RenameTable { old_name, new_name } => Some(format!(
4356				"rename_{}_to_{}",
4357				old_name.to_lowercase(),
4358				new_name.to_lowercase()
4359			)),
4360			Operation::RenameColumn {
4361				table, new_name, ..
4362			} => Some(format!(
4363				"rename_{}_{}",
4364				table.to_lowercase(),
4365				new_name.to_lowercase()
4366			)),
4367			Operation::AddConstraint { table, .. } => {
4368				Some(format!("add_constraint_{}", table.to_lowercase()))
4369			}
4370			Operation::DropConstraint {
4371				table: _,
4372				constraint_name,
4373			} => Some(format!(
4374				"drop_constraint_{}",
4375				constraint_name.to_lowercase()
4376			)),
4377			Operation::CreateIndex { table, unique, .. } => {
4378				if *unique {
4379					Some(format!("create_unique_index_{}", table.to_lowercase()))
4380				} else {
4381					Some(format!("create_index_{}", table.to_lowercase()))
4382				}
4383			}
4384			Operation::DropIndex { table, .. } => {
4385				Some(format!("drop_index_{}", table.to_lowercase()))
4386			}
4387			Operation::RunSQL { .. } => None,  // Triggers auto-naming
4388			Operation::RunRust { .. } => None, // Triggers auto-naming
4389			Operation::AlterTableComment { table, .. } => {
4390				Some(format!("alter_comment_{}", table.to_lowercase()))
4391			}
4392			Operation::AlterUniqueTogether { table, .. } => {
4393				Some(format!("alter_unique_{}", table.to_lowercase()))
4394			}
4395			Operation::AlterModelOptions { table, .. } => {
4396				Some(format!("alter_options_{}", table.to_lowercase()))
4397			}
4398			Operation::CreateInheritedTable { name, .. } => {
4399				Some(format!("create_inherited_{}", name.to_lowercase()))
4400			}
4401			Operation::AddDiscriminatorColumn { table, .. } => {
4402				Some(format!("add_discriminator_{}", table.to_lowercase()))
4403			}
4404			Operation::MoveModel {
4405				model_name,
4406				from_app,
4407				to_app,
4408				..
4409			} => Some(format!(
4410				"move_{}_{}_{}_{}",
4411				from_app.to_lowercase(),
4412				model_name.to_lowercase(),
4413				to_app.to_lowercase(),
4414				model_name.to_lowercase()
4415			)),
4416			Operation::CreateSchema { name, .. } => {
4417				Some(format!("create_schema_{}", name.to_lowercase()))
4418			}
4419			Operation::DropSchema { name, .. } => {
4420				Some(format!("drop_schema_{}", name.to_lowercase()))
4421			}
4422			Operation::CreateExtension { name, .. } => {
4423				Some(format!("create_extension_{}", name.to_lowercase()))
4424			}
4425			Operation::BulkLoad { table, .. } => {
4426				Some(format!("bulk_load_{}", table.to_lowercase()))
4427			}
4428			Operation::SetAutoIncrementValue { table, column, .. } => Some(format!(
4429				"set_auto_increment_{}_{}",
4430				table.to_lowercase(),
4431				column.to_lowercase()
4432			)),
4433			Operation::CreateCompositePrimaryKey { table, .. } => {
4434				Some(format!("composite_pk_{}", table.to_lowercase()))
4435			}
4436		}
4437	}
4438
4439	fn describe(&self) -> String {
4440		match self {
4441			Operation::CreateTable { name, .. } => format!("Create table {}", name),
4442			Operation::DropTable { name } => format!("Drop table {}", name),
4443			Operation::AddColumn { table, column, .. } => {
4444				format!("Add column {} to {}", column.name, table)
4445			}
4446			Operation::DropColumn { table, column } => {
4447				format!("Drop column {} from {}", column, table)
4448			}
4449			Operation::AlterColumn { table, column, .. } => {
4450				format!("Alter column {} on {}", column, table)
4451			}
4452			Operation::RenameTable { old_name, new_name } => {
4453				format!("Rename table {} to {}", old_name, new_name)
4454			}
4455			Operation::RenameColumn {
4456				table,
4457				old_name,
4458				new_name,
4459			} => format!("Rename column {} to {} on {}", old_name, new_name, table),
4460			Operation::AddConstraint { table, .. } => format!("Add constraint on {}", table),
4461			Operation::DropConstraint {
4462				table,
4463				constraint_name,
4464			} => format!("Drop constraint {} from {}", constraint_name, table),
4465			Operation::CreateIndex { table, unique, .. } => {
4466				if *unique {
4467					format!("Create unique index on {}", table)
4468				} else {
4469					format!("Create index on {}", table)
4470				}
4471			}
4472			Operation::DropIndex { table, .. } => format!("Drop index on {}", table),
4473			Operation::RunSQL { sql, .. } => {
4474				let preview = if sql.len() > 50 {
4475					format!("{}...", &sql[..50])
4476				} else {
4477					(*sql).to_string()
4478				};
4479				format!("RunSQL: {}", preview)
4480			}
4481			Operation::RunRust { code, .. } => {
4482				let preview = if code.len() > 50 {
4483					format!("{}...", &code[..50])
4484				} else {
4485					(*code).to_string()
4486				};
4487				format!("RunRust: {}", preview)
4488			}
4489			Operation::AlterTableComment { table, comment } => match comment {
4490				Some(c) => format!("Set comment on {} to '{}'", table, c),
4491				None => format!("Remove comment from {}", table),
4492			},
4493			Operation::AlterUniqueTogether { table, .. } => {
4494				format!("Alter unique_together on {}", table)
4495			}
4496			Operation::AlterModelOptions { table, .. } => {
4497				format!("Alter model options on {}", table)
4498			}
4499			Operation::CreateInheritedTable {
4500				name, base_table, ..
4501			} => {
4502				format!("Create inherited table {} from {}", name, base_table)
4503			}
4504			Operation::AddDiscriminatorColumn {
4505				table, column_name, ..
4506			} => format!("Add discriminator column {} to {}", column_name, table),
4507			Operation::MoveModel {
4508				model_name,
4509				from_app,
4510				to_app,
4511				..
4512			} => format!("Move model {} from {} to {}", model_name, from_app, to_app),
4513			Operation::CreateSchema { name, .. } => format!("Create schema {}", name),
4514			Operation::DropSchema { name, .. } => format!("Drop schema {}", name),
4515			Operation::CreateExtension { name, .. } => format!("Create extension {}", name),
4516			Operation::BulkLoad { table, source, .. } => {
4517				let source_desc = match source {
4518					BulkLoadSource::File(path) => format!("file '{}'", path),
4519					BulkLoadSource::Stdin => "STDIN".to_string(),
4520					BulkLoadSource::Program(cmd) => format!("program '{}'", cmd),
4521				};
4522				format!("Bulk load data into {} from {}", table, source_desc)
4523			}
4524			Operation::SetAutoIncrementValue {
4525				table,
4526				column,
4527				value,
4528			} => format!("Set auto-increment of {}.{} to {}", table, column, value),
4529			Operation::CreateCompositePrimaryKey { table, columns, .. } => format!(
4530				"Create composite primary key on {} ({})",
4531				table,
4532				columns.join(", ")
4533			),
4534		}
4535	}
4536
4537	/// Normalize operation for semantic comparison
4538	///
4539	/// Returns a normalized version where order-independent elements are sorted.
4540	/// This enables detection of semantically equivalent operations regardless of element ordering.
4541	fn normalize(&self) -> Self
4542	where
4543		Self: Sized + Clone,
4544	{
4545		match self {
4546			// CreateTable: Sort columns and constraints
4547			Operation::CreateTable {
4548				name,
4549				columns,
4550				constraints,
4551				without_rowid,
4552				interleave_in_parent,
4553				partition,
4554			} => {
4555				let mut sorted_columns = columns.clone();
4556				sorted_columns.sort_by(|a, b| a.name.cmp(&b.name));
4557
4558				let mut sorted_constraints = constraints.clone();
4559				sorted_constraints.sort();
4560
4561				Operation::CreateTable {
4562					name: name.clone(),
4563					columns: sorted_columns,
4564					constraints: sorted_constraints,
4565					without_rowid: *without_rowid,
4566					interleave_in_parent: interleave_in_parent.clone(),
4567					partition: partition.clone(),
4568				}
4569			}
4570			// CreateIndex: Sort columns
4571			Operation::CreateIndex {
4572				table,
4573				columns,
4574				unique,
4575				index_type,
4576				where_clause,
4577				concurrently,
4578				expressions,
4579				mysql_options,
4580				operator_class,
4581			} => {
4582				let mut sorted_columns = columns.clone();
4583				sorted_columns.sort();
4584
4585				Operation::CreateIndex {
4586					table: table.clone(),
4587					columns: sorted_columns,
4588					unique: *unique,
4589					index_type: *index_type,
4590					where_clause: where_clause.clone(),
4591					concurrently: *concurrently,
4592					expressions: expressions.clone(),
4593					mysql_options: *mysql_options,
4594					operator_class: operator_class.clone(),
4595				}
4596			}
4597			// DropIndex: Sort columns
4598			Operation::DropIndex { table, columns } => {
4599				let mut sorted_columns = columns.clone();
4600				sorted_columns.sort();
4601
4602				Operation::DropIndex {
4603					table: table.clone(),
4604					columns: sorted_columns,
4605				}
4606			}
4607			// AlterUniqueTogether: Sort field lists and sort within each list
4608			Operation::AlterUniqueTogether {
4609				table,
4610				unique_together,
4611			} => {
4612				let mut sorted_unique_together: Vec<Vec<String>> = unique_together
4613					.iter()
4614					.map(|field_list| {
4615						let mut sorted = field_list.clone();
4616						sorted.sort();
4617						sorted
4618					})
4619					.collect();
4620				sorted_unique_together.sort();
4621
4622				Operation::AlterUniqueTogether {
4623					table: table.clone(),
4624					unique_together: sorted_unique_together,
4625				}
4626			}
4627			// AlterModelOptions: HashMap cannot be sorted, but we can normalize by converting to sorted Vec
4628			// However, since HashMap doesn't guarantee order and the operation uses HashMap,
4629			// we'll just clone it as-is. For true semantic equality, this would need to be changed
4630			// to a BTreeMap at the type level.
4631			Operation::AlterModelOptions { table, options } => Operation::AlterModelOptions {
4632				table: table.clone(),
4633				options: options.clone(),
4634			},
4635			// All other operations: Return clone (order doesn't matter or not applicable)
4636			_ => self.clone(),
4637		}
4638	}
4639}
4640
4641#[cfg(test)]
4642mod tests {
4643	use super::*;
4644	use FieldType;
4645	use rstest::rstest;
4646
4647	#[test]
4648	fn test_create_table_to_statement() {
4649		let op = Operation::CreateTable {
4650			name: "users".to_string(),
4651			columns: vec![
4652				ColumnDefinition {
4653					name: "id".to_string(),
4654					type_definition: FieldType::Integer,
4655					not_null: false,
4656					unique: false,
4657					primary_key: true,
4658					auto_increment: true,
4659					default: None,
4660				},
4661				ColumnDefinition {
4662					name: "name".to_string(),
4663					type_definition: FieldType::VarChar(100),
4664					not_null: true,
4665					unique: false,
4666					primary_key: false,
4667					auto_increment: false,
4668					default: None,
4669				},
4670			],
4671			constraints: vec![],
4672			without_rowid: None,
4673			partition: None,
4674			interleave_in_parent: None,
4675		};
4676
4677		let stmt = op.to_statement();
4678		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4679		assert!(
4680			sql.contains("CREATE TABLE"),
4681			"SQL should contain CREATE TABLE keyword, got: {}",
4682			sql
4683		);
4684		assert!(
4685			sql.contains("users"),
4686			"SQL should reference 'users' table, got: {}",
4687			sql
4688		);
4689		assert!(
4690			sql.contains("id") && sql.contains("name"),
4691			"SQL should contain both 'id' and 'name' columns, got: {}",
4692			sql
4693		);
4694	}
4695
4696	#[test]
4697	fn test_drop_table_to_statement() {
4698		let op = Operation::DropTable {
4699			name: "users".to_string(),
4700		};
4701
4702		let stmt = op.to_statement();
4703		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4704		assert!(
4705			sql.contains("DROP TABLE"),
4706			"SQL should contain DROP TABLE keyword, got: {}",
4707			sql
4708		);
4709		assert!(
4710			sql.contains("users"),
4711			"SQL should reference 'users' table, got: {}",
4712			sql
4713		);
4714		assert!(
4715			sql.contains("CASCADE"),
4716			"SQL should include CASCADE option, got: {}",
4717			sql
4718		);
4719	}
4720
4721	#[test]
4722	fn test_add_column_to_statement() {
4723		let op = Operation::AddColumn {
4724			table: "users".to_string(),
4725			column: ColumnDefinition {
4726				name: "email".to_string(),
4727				type_definition: FieldType::VarChar(255),
4728				not_null: true,
4729				unique: false,
4730				primary_key: false,
4731				auto_increment: false,
4732				default: Some("''".to_string()),
4733			},
4734			mysql_options: None,
4735		};
4736
4737		let stmt = op.to_statement();
4738		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4739		assert!(
4740			sql.contains("ALTER TABLE"),
4741			"SQL should contain ALTER TABLE keyword, got: {}",
4742			sql
4743		);
4744		assert!(
4745			sql.contains("users"),
4746			"SQL should reference 'users' table, got: {}",
4747			sql
4748		);
4749		assert!(
4750			sql.contains("ADD COLUMN"),
4751			"SQL should contain ADD COLUMN clause, got: {}",
4752			sql
4753		);
4754		assert!(
4755			sql.contains("email"),
4756			"SQL should reference 'email' column, got: {}",
4757			sql
4758		);
4759	}
4760
4761	#[test]
4762	fn test_drop_column_to_statement() {
4763		let op = Operation::DropColumn {
4764			table: "users".to_string(),
4765			column: "email".to_string(),
4766		};
4767
4768		let stmt = op.to_statement();
4769		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4770		assert!(
4771			sql.contains("ALTER TABLE"),
4772			"SQL should contain ALTER TABLE keyword, got: {}",
4773			sql
4774		);
4775		assert!(
4776			sql.contains("users"),
4777			"SQL should reference 'users' table, got: {}",
4778			sql
4779		);
4780		assert!(
4781			sql.contains("DROP COLUMN"),
4782			"SQL should contain DROP COLUMN clause, got: {}",
4783			sql
4784		);
4785		assert!(
4786			sql.contains("email"),
4787			"SQL should reference 'email' column, got: {}",
4788			sql
4789		);
4790	}
4791
4792	#[test]
4793	fn test_alter_column_to_statement() {
4794		let op = Operation::AlterColumn {
4795			table: "users".to_string(),
4796			column: "age".to_string(),
4797			old_definition: None,
4798			new_definition: ColumnDefinition {
4799				name: "age".to_string(),
4800				type_definition: FieldType::BigInteger,
4801				not_null: true,
4802				unique: false,
4803				primary_key: false,
4804				auto_increment: false,
4805				default: None,
4806			},
4807			mysql_options: None,
4808		};
4809
4810		let stmt = op.to_statement();
4811		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4812		assert!(
4813			sql.contains("ALTER TABLE"),
4814			"SQL should contain ALTER TABLE keyword, got: {}",
4815			sql
4816		);
4817		assert!(
4818			sql.contains("users"),
4819			"SQL should reference 'users' table, got: {}",
4820			sql
4821		);
4822		assert!(
4823			sql.contains("age"),
4824			"SQL should reference 'age' column, got: {}",
4825			sql
4826		);
4827	}
4828
4829	#[test]
4830	fn test_rename_table_to_statement() {
4831		let op = Operation::RenameTable {
4832			old_name: "users".to_string(),
4833			new_name: "accounts".to_string(),
4834		};
4835
4836		let stmt = op.to_statement();
4837		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4838		assert!(
4839			sql.contains("users"),
4840			"SQL should reference old table name 'users', got: {}",
4841			sql
4842		);
4843		assert!(
4844			sql.contains("accounts"),
4845			"SQL should reference new table name 'accounts', got: {}",
4846			sql
4847		);
4848	}
4849
4850	#[test]
4851	fn test_rename_column_to_statement() {
4852		let op = Operation::RenameColumn {
4853			table: "users".to_string(),
4854			old_name: "name".to_string(),
4855			new_name: "full_name".to_string(),
4856		};
4857
4858		let stmt = op.to_statement();
4859		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4860		assert!(
4861			sql.contains("ALTER TABLE"),
4862			"SQL should contain ALTER TABLE keyword, got: {}",
4863			sql
4864		);
4865		assert!(
4866			sql.contains("users"),
4867			"SQL should reference 'users' table, got: {}",
4868			sql
4869		);
4870		assert!(
4871			sql.contains("RENAME COLUMN"),
4872			"SQL should contain RENAME COLUMN clause, got: {}",
4873			sql
4874		);
4875		assert!(
4876			sql.contains("name"),
4877			"SQL should reference old column name 'name', got: {}",
4878			sql
4879		);
4880		assert!(
4881			sql.contains("full_name"),
4882			"SQL should reference new column name 'full_name', got: {}",
4883			sql
4884		);
4885	}
4886
4887	#[test]
4888	fn test_add_constraint_to_statement() {
4889		let op = Operation::AddConstraint {
4890			table: "users".to_string(),
4891			constraint_sql: "CONSTRAINT age_check CHECK (age >= 0)".to_string(),
4892		};
4893
4894		let stmt = op.to_statement();
4895		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4896		assert!(
4897			sql.contains("ALTER TABLE"),
4898			"SQL should contain ALTER TABLE keyword, got: {}",
4899			sql
4900		);
4901		assert!(
4902			sql.contains("users"),
4903			"SQL should reference 'users' table, got: {}",
4904			sql
4905		);
4906		assert!(
4907			sql.contains("ADD"),
4908			"SQL should contain ADD keyword, got: {}",
4909			sql
4910		);
4911		assert!(
4912			sql.contains("age_check"),
4913			"SQL should contain constraint name 'age_check', got: {}",
4914			sql
4915		);
4916	}
4917
4918	#[test]
4919	fn test_drop_constraint_to_statement() {
4920		let op = Operation::DropConstraint {
4921			table: "users".to_string(),
4922			constraint_name: "age_check".to_string(),
4923		};
4924
4925		let stmt = op.to_statement();
4926		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4927		assert!(
4928			sql.contains("ALTER TABLE"),
4929			"SQL should contain ALTER TABLE keyword, got: {}",
4930			sql
4931		);
4932		assert!(
4933			sql.contains("users"),
4934			"SQL should reference 'users' table, got: {}",
4935			sql
4936		);
4937		assert!(
4938			sql.contains("DROP CONSTRAINT"),
4939			"SQL should contain DROP CONSTRAINT clause, got: {}",
4940			sql
4941		);
4942		assert!(
4943			sql.contains("age_check"),
4944			"SQL should reference constraint 'age_check', got: {}",
4945			sql
4946		);
4947	}
4948
4949	#[test]
4950	fn test_create_index_to_statement() {
4951		let op = Operation::CreateIndex {
4952			table: "users".to_string(),
4953			columns: vec!["email".to_string()],
4954			unique: false,
4955			index_type: None,
4956			where_clause: None,
4957			concurrently: false,
4958			expressions: None,
4959			mysql_options: None,
4960			operator_class: None,
4961		};
4962
4963		let stmt = op.to_statement();
4964		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4965		assert!(
4966			sql.contains("CREATE INDEX"),
4967			"SQL should contain CREATE INDEX keywords, got: {}",
4968			sql
4969		);
4970		assert!(
4971			sql.contains("users"),
4972			"SQL should reference 'users' table, got: {}",
4973			sql
4974		);
4975		assert!(
4976			sql.contains("email"),
4977			"SQL should reference 'email' column, got: {}",
4978			sql
4979		);
4980	}
4981
4982	#[test]
4983	fn test_create_unique_index_to_statement() {
4984		let op = Operation::CreateIndex {
4985			table: "users".to_string(),
4986			columns: vec!["email".to_string()],
4987			unique: true,
4988			index_type: None,
4989			where_clause: None,
4990			concurrently: false,
4991			expressions: None,
4992			mysql_options: None,
4993			operator_class: None,
4994		};
4995
4996		let stmt = op.to_statement();
4997		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
4998		assert!(
4999			sql.contains("CREATE UNIQUE INDEX"),
5000			"SQL should contain CREATE UNIQUE INDEX keywords, got: {}",
5001			sql
5002		);
5003		assert!(
5004			sql.contains("users"),
5005			"SQL should reference 'users' table, got: {}",
5006			sql
5007		);
5008		assert!(
5009			sql.contains("email"),
5010			"SQL should reference 'email' column, got: {}",
5011			sql
5012		);
5013	}
5014
5015	#[test]
5016	fn test_drop_index_to_statement() {
5017		let op = Operation::DropIndex {
5018			table: "users".to_string(),
5019			columns: vec!["email".to_string()],
5020		};
5021
5022		let stmt = op.to_statement();
5023		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5024		assert!(
5025			sql.contains("DROP INDEX"),
5026			"SQL should contain DROP INDEX keywords, got: {}",
5027			sql
5028		);
5029		assert!(
5030			sql.contains("idx_users_email"),
5031			"SQL should contain generated index name 'idx_users_email', got: {}",
5032			sql
5033		);
5034	}
5035
5036	#[test]
5037	fn test_run_sql_to_statement() {
5038		let op = Operation::RunSQL {
5039			sql: "CREATE EXTENSION IF NOT EXISTS \"uuid-ossp\"".to_string(),
5040			reverse_sql: Some("DROP EXTENSION \"uuid-ossp\"".to_string()),
5041		};
5042
5043		let stmt = op.to_statement();
5044		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5045		assert!(
5046			sql.contains("CREATE EXTENSION"),
5047			"SQL should contain CREATE EXTENSION keywords, got: {}",
5048			sql
5049		);
5050		assert!(
5051			sql.contains("uuid-ossp"),
5052			"SQL should reference 'uuid-ossp' extension, got: {}",
5053			sql
5054		);
5055	}
5056
5057	#[test]
5058	fn test_alter_table_comment_to_statement() {
5059		let op = Operation::AlterTableComment {
5060			table: "users".to_string(),
5061			comment: Some("User accounts table".to_string()),
5062		};
5063
5064		let stmt = op.to_statement();
5065		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5066		assert!(
5067			sql.contains("COMMENT ON TABLE"),
5068			"SQL should contain COMMENT ON TABLE keywords, got: {}",
5069			sql
5070		);
5071		assert!(
5072			sql.contains("users"),
5073			"SQL should reference 'users' table, got: {}",
5074			sql
5075		);
5076		assert!(
5077			sql.contains("User accounts table"),
5078			"SQL should include comment text 'User accounts table', got: {}",
5079			sql
5080		);
5081	}
5082
5083	#[test]
5084	fn test_alter_table_comment_null_to_statement() {
5085		let op = Operation::AlterTableComment {
5086			table: "users".to_string(),
5087			comment: None,
5088		};
5089
5090		let stmt = op.to_statement();
5091		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5092		assert!(
5093			sql.contains("COMMENT ON TABLE"),
5094			"SQL should contain COMMENT ON TABLE keywords, got: {}",
5095			sql
5096		);
5097		assert!(
5098			sql.contains("users"),
5099			"SQL should reference 'users' table, got: {}",
5100			sql
5101		);
5102		assert!(
5103			sql.contains("NULL"),
5104			"SQL should include NULL for null comment, got: {}",
5105			sql
5106		);
5107	}
5108
5109	#[test]
5110	fn test_alter_unique_together_to_statement() {
5111		let op = Operation::AlterUniqueTogether {
5112			table: "users".to_string(),
5113			unique_together: vec![vec!["email".to_string(), "username".to_string()]],
5114		};
5115
5116		let stmt = op.to_statement();
5117		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5118		assert!(
5119			sql.contains("ALTER TABLE"),
5120			"SQL should contain ALTER TABLE keyword, got: {}",
5121			sql
5122		);
5123		assert!(
5124			sql.contains("users"),
5125			"SQL should reference 'users' table, got: {}",
5126			sql
5127		);
5128		assert!(
5129			sql.contains("ADD CONSTRAINT"),
5130			"SQL should contain ADD CONSTRAINT clause, got: {}",
5131			sql
5132		);
5133		assert!(
5134			sql.contains("UNIQUE"),
5135			"SQL should contain UNIQUE keyword, got: {}",
5136			sql
5137		);
5138		assert!(
5139			sql.contains("email") && sql.contains("username"),
5140			"SQL should reference both 'email' and 'username' columns, got: {}",
5141			sql
5142		);
5143	}
5144
5145	#[test]
5146	fn test_alter_unique_together_empty() {
5147		let op = Operation::AlterUniqueTogether {
5148			table: "users".to_string(),
5149			unique_together: vec![],
5150		};
5151
5152		let stmt = op.to_statement();
5153		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5154		assert_eq!(
5155			sql, "",
5156			"SQL should be empty for empty unique_together constraint"
5157		);
5158	}
5159
5160	#[test]
5161	fn test_alter_model_options_to_statement() {
5162		let mut options = std::collections::HashMap::new();
5163		options.insert("db_table".to_string(), "custom_users".to_string());
5164
5165		let op = Operation::AlterModelOptions {
5166			table: "users".to_string(),
5167			options,
5168		};
5169
5170		let stmt = op.to_statement();
5171		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5172		assert_eq!(sql, "", "SQL should be empty for model options operation");
5173	}
5174
5175	#[test]
5176	fn test_create_inherited_table_to_statement() {
5177		let op = Operation::CreateInheritedTable {
5178			name: "admin_users".to_string(),
5179			columns: vec![ColumnDefinition {
5180				name: "admin_level".to_string(),
5181				type_definition: FieldType::Integer,
5182				not_null: true,
5183				unique: false,
5184				primary_key: false,
5185				auto_increment: false,
5186				default: Some("1".to_string()),
5187			}],
5188			base_table: "users".to_string(),
5189			join_column: "user_id".to_string(),
5190		};
5191
5192		let stmt = op.to_statement();
5193		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5194		assert!(
5195			sql.contains("CREATE TABLE"),
5196			"SQL should contain CREATE TABLE keywords, got: {}",
5197			sql
5198		);
5199		assert!(
5200			sql.contains("admin_users"),
5201			"SQL should reference 'admin_users' table, got: {}",
5202			sql
5203		);
5204		assert!(
5205			sql.contains("user_id"),
5206			"SQL should include join column 'user_id', got: {}",
5207			sql
5208		);
5209	}
5210
5211	#[test]
5212	fn test_add_discriminator_column_to_statement() {
5213		let op = Operation::AddDiscriminatorColumn {
5214			table: "users".to_string(),
5215			column_name: "user_type".to_string(),
5216			default_value: "regular".to_string(),
5217		};
5218
5219		let stmt = op.to_statement();
5220		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
5221		assert!(
5222			sql.contains("ALTER TABLE"),
5223			"SQL should contain ALTER TABLE keyword, got: {}",
5224			sql
5225		);
5226		assert!(
5227			sql.contains("users"),
5228			"SQL should reference 'users' table, got: {}",
5229			sql
5230		);
5231		assert!(
5232			sql.contains("ADD COLUMN"),
5233			"SQL should contain ADD COLUMN clause, got: {}",
5234			sql
5235		);
5236		assert!(
5237			sql.contains("user_type"),
5238			"SQL should reference 'user_type' column, got: {}",
5239			sql
5240		);
5241	}
5242
5243	#[test]
5244	fn test_state_forwards_create_table() {
5245		let mut state = ProjectState::new();
5246		let op = Operation::CreateTable {
5247			name: "users".to_string(),
5248			columns: vec![
5249				ColumnDefinition {
5250					name: "id".to_string(),
5251					type_definition: FieldType::Integer,
5252					not_null: false,
5253					unique: false,
5254					primary_key: true,
5255					auto_increment: true,
5256					default: None,
5257				},
5258				ColumnDefinition {
5259					name: "name".to_string(),
5260					type_definition: FieldType::VarChar(100),
5261					not_null: true,
5262					unique: false,
5263					primary_key: false,
5264					auto_increment: false,
5265					default: None,
5266				},
5267			],
5268			constraints: vec![],
5269			without_rowid: None,
5270			partition: None,
5271			interleave_in_parent: None,
5272		};
5273
5274		op.state_forwards("myapp", &mut state);
5275		let model = state.get_model("myapp", "users");
5276		assert!(model.is_some(), "Model 'users' should exist in state");
5277		let model = model.unwrap();
5278		assert_eq!(
5279			model.fields.len(),
5280			2,
5281			"Model should have exactly 2 fields, got: {}",
5282			model.fields.len()
5283		);
5284		assert!(
5285			model.fields.contains_key("id"),
5286			"Model should contain 'id' field"
5287		);
5288		assert!(
5289			model.fields.contains_key("name"),
5290			"Model should contain 'name' field"
5291		);
5292	}
5293
5294	#[test]
5295	fn test_state_forwards_drop_table() {
5296		let mut state = ProjectState::new();
5297		let mut model = ModelState::new("myapp", "users");
5298		model.add_field(FieldState::new("id".to_string(), FieldType::Integer, false));
5299		state.add_model(model);
5300
5301		let op = Operation::DropTable {
5302			name: "users".to_string(),
5303		};
5304
5305		op.state_forwards("myapp", &mut state);
5306		assert!(
5307			state.get_model("myapp", "users").is_none(),
5308			"Model 'users' should be removed from state after drop"
5309		);
5310	}
5311
5312	#[test]
5313	fn test_state_forwards_add_column() {
5314		let mut state = ProjectState::new();
5315		let mut model = ModelState::new("myapp", "users");
5316		model.add_field(FieldState::new("id".to_string(), FieldType::Integer, false));
5317		state.add_model(model);
5318
5319		let op = Operation::AddColumn {
5320			table: "users".to_string(),
5321			column: ColumnDefinition {
5322				name: "email".to_string(),
5323				type_definition: FieldType::VarChar(255),
5324				not_null: true,
5325				unique: false,
5326				primary_key: false,
5327				auto_increment: false,
5328				default: None,
5329			},
5330			mysql_options: None,
5331		};
5332
5333		op.state_forwards("myapp", &mut state);
5334		let model = state.get_model("myapp", "users").unwrap();
5335		assert_eq!(
5336			model.fields.len(),
5337			2,
5338			"Model should have 2 fields after adding 'email', got: {}",
5339			model.fields.len()
5340		);
5341		assert!(
5342			model.fields.contains_key("email"),
5343			"Model should contain newly added 'email' field"
5344		);
5345	}
5346
5347	#[test]
5348	fn test_state_forwards_drop_column() {
5349		let mut state = ProjectState::new();
5350		let mut model = ModelState::new("myapp", "users");
5351		model.add_field(FieldState::new("id".to_string(), FieldType::Integer, false));
5352		model.add_field(FieldState::new(
5353			"email".to_string(),
5354			FieldType::VarChar(255),
5355			false,
5356		));
5357		state.add_model(model);
5358
5359		let op = Operation::DropColumn {
5360			table: "users".to_string(),
5361			column: "email".to_string(),
5362		};
5363
5364		op.state_forwards("myapp", &mut state);
5365		let model = state.get_model("myapp", "users").unwrap();
5366		assert_eq!(
5367			model.fields.len(),
5368			1,
5369			"Model should have 1 field after dropping 'email', got: {}",
5370			model.fields.len()
5371		);
5372		assert!(
5373			!model.fields.contains_key("email"),
5374			"Model should not contain dropped 'email' field"
5375		);
5376	}
5377
5378	#[test]
5379	fn test_state_forwards_rename_table() {
5380		let mut state = ProjectState::new();
5381		let mut model = ModelState::new("myapp", "users");
5382		model.add_field(FieldState::new("id".to_string(), FieldType::Integer, false));
5383		state.add_model(model);
5384
5385		let op = Operation::RenameTable {
5386			old_name: "users".to_string(),
5387			new_name: "accounts".to_string(),
5388		};
5389
5390		op.state_forwards("myapp", &mut state);
5391		assert!(
5392			state.get_model("myapp", "users").is_none(),
5393			"Old model name 'users' should not exist after rename"
5394		);
5395		assert!(
5396			state.get_model("myapp", "accounts").is_some(),
5397			"New model name 'accounts' should exist after rename"
5398		);
5399	}
5400
5401	#[test]
5402	fn test_state_forwards_rename_column() {
5403		let mut state = ProjectState::new();
5404		let mut model = ModelState::new("myapp", "users");
5405		model.add_field(FieldState::new(
5406			"name".to_string(),
5407			FieldType::VarChar(255),
5408			false,
5409		));
5410		state.add_model(model);
5411
5412		let op = Operation::RenameColumn {
5413			table: "users".to_string(),
5414			old_name: "name".to_string(),
5415			new_name: "full_name".to_string(),
5416		};
5417
5418		op.state_forwards("myapp", &mut state);
5419		let model = state.get_model("myapp", "users").unwrap();
5420		assert!(
5421			!model.fields.contains_key("name"),
5422			"Old field name 'name' should not exist after rename"
5423		);
5424		assert!(
5425			model.fields.contains_key("full_name"),
5426			"New field name 'full_name' should exist after rename"
5427		);
5428	}
5429
5430	#[test]
5431	fn test_to_reverse_sql_create_table() {
5432		let op = Operation::CreateTable {
5433			name: "users".to_string(),
5434			columns: vec![],
5435			constraints: vec![],
5436			without_rowid: None,
5437			partition: None,
5438			interleave_in_parent: None,
5439		};
5440
5441		let state = ProjectState::default();
5442		let reverse = op.to_reverse_sql(&SqlDialect::Postgres, &state);
5443		assert!(
5444			reverse.is_ok() && reverse.as_ref().ok().unwrap().is_some(),
5445			"CreateTable should have reverse SQL operation"
5446		);
5447		let sql = reverse.unwrap().unwrap();
5448		assert!(
5449			sql.contains("DROP TABLE"),
5450			"Reverse SQL should contain DROP TABLE, got: {}",
5451			sql
5452		);
5453		assert!(
5454			sql.contains("users"),
5455			"Reverse SQL should reference 'users' table, got: {}",
5456			sql
5457		);
5458	}
5459
5460	#[test]
5461	fn test_to_reverse_sql_drop_table() {
5462		let op = Operation::DropTable {
5463			name: "users".to_string(),
5464		};
5465
5466		let state = ProjectState::default();
5467		let reverse = op.to_reverse_sql(&SqlDialect::Postgres, &state);
5468		assert!(
5469			reverse.is_ok() && reverse.as_ref().ok().unwrap().is_none(),
5470			"DropTable should not have reverse SQL (cannot recreate table structure)"
5471		);
5472	}
5473
5474	#[test]
5475	fn test_to_reverse_sql_add_column() {
5476		let op = Operation::AddColumn {
5477			table: "users".to_string(),
5478			column: ColumnDefinition {
5479				name: "email".to_string(),
5480				type_definition: FieldType::VarChar(255),
5481				not_null: false,
5482				unique: false,
5483				primary_key: false,
5484				auto_increment: false,
5485				default: None,
5486			},
5487			mysql_options: None,
5488		};
5489
5490		let state = ProjectState::default();
5491		let reverse = op.to_reverse_sql(&SqlDialect::Postgres, &state);
5492		assert!(
5493			reverse.is_ok() && reverse.as_ref().ok().unwrap().is_some(),
5494			"AddColumn should have reverse SQL operation"
5495		);
5496		let sql = reverse.unwrap().unwrap();
5497		assert!(
5498			sql.contains("DROP COLUMN"),
5499			"Reverse SQL should contain DROP COLUMN, got: {}",
5500			sql
5501		);
5502		assert!(
5503			sql.contains("email"),
5504			"Reverse SQL should reference 'email' column, got: {}",
5505			sql
5506		);
5507	}
5508
5509	/// Build an [`Operation::AlterColumn`] that carries an `old_definition`,
5510	/// so reverse-SQL generation has all the inputs it needs without a
5511	/// populated `ProjectState`. Used by the dialect-dispatch regression
5512	/// tests for reinhardt-web#4582.
5513	fn alter_column_with_old_def() -> Operation {
5514		Operation::AlterColumn {
5515			table: "products".to_string(),
5516			column: "name".to_string(),
5517			old_definition: Some(ColumnDefinition {
5518				name: "name".to_string(),
5519				type_definition: FieldType::VarChar(50),
5520				not_null: false,
5521				unique: false,
5522				primary_key: false,
5523				auto_increment: false,
5524				default: None,
5525			}),
5526			new_definition: ColumnDefinition {
5527				name: "name".to_string(),
5528				type_definition: FieldType::Text,
5529				not_null: false,
5530				unique: false,
5531				primary_key: false,
5532				auto_increment: false,
5533				default: None,
5534			},
5535			mysql_options: None,
5536		}
5537	}
5538
5539	/// Reverse SQL for Postgres must use `ALTER COLUMN ... TYPE` syntax
5540	/// (regression coverage for reinhardt-web#4582).
5541	#[test]
5542	fn test_to_reverse_sql_alter_column_postgres() {
5543		// Arrange
5544		let op = alter_column_with_old_def();
5545		let state = ProjectState::default();
5546
5547		// Act
5548		let sql = op
5549			.to_reverse_sql(&SqlDialect::Postgres, &state)
5550			.expect("reverse SQL should succeed")
5551			.expect("reverse SQL should be present");
5552
5553		// Assert
5554		assert!(
5555			sql.contains("ALTER COLUMN") && sql.contains("TYPE"),
5556			"Postgres reverse SQL should use ALTER COLUMN ... TYPE syntax, got: {}",
5557			sql
5558		);
5559		assert!(
5560			sql.contains("VARCHAR(50)"),
5561			"Postgres reverse SQL should restore VARCHAR(50), got: {}",
5562			sql
5563		);
5564	}
5565
5566	/// Reverse SQL for MySQL must use `MODIFY COLUMN` syntax — the previous
5567	/// implementation emitted Postgres `ALTER COLUMN ... TYPE` which MySQL
5568	/// rejects with error 1064 (reinhardt-web#4582).
5569	#[test]
5570	fn test_to_reverse_sql_alter_column_mysql() {
5571		// Arrange
5572		let op = alter_column_with_old_def();
5573		let state = ProjectState::default();
5574
5575		// Act
5576		let sql = op
5577			.to_reverse_sql(&SqlDialect::Mysql, &state)
5578			.expect("reverse SQL should succeed")
5579			.expect("reverse SQL should be present");
5580
5581		// Assert
5582		assert!(
5583			sql.contains("MODIFY COLUMN"),
5584			"MySQL reverse SQL should use MODIFY COLUMN syntax, got: {}",
5585			sql
5586		);
5587		assert!(
5588			!sql.contains("ALTER COLUMN"),
5589			"MySQL reverse SQL must not emit Postgres ALTER COLUMN syntax, got: {}",
5590			sql
5591		);
5592		assert!(
5593			!sql.contains(" TYPE "),
5594			"MySQL reverse SQL must not contain Postgres ' TYPE ' token, got: {}",
5595			sql
5596		);
5597		assert!(
5598			sql.contains("VARCHAR(50)"),
5599			"MySQL reverse SQL should restore VARCHAR(50), got: {}",
5600			sql
5601		);
5602	}
5603
5604	/// Reverse SQL for CockroachDB must emit the column-type reversion as a
5605	/// single, dialect-isolated statement.
5606	///
5607	/// PR #4633 split CockroachDB off from PostgreSQL because CockroachDB
5608	/// rejects the comma-combined `ALTER COLUMN ... TYPE T, ALTER COLUMN
5609	/// ... {SET|DROP} NOT NULL` form that PostgreSQL accepts. The current
5610	/// stop-gap emits only the column-type reversion and intentionally
5611	/// drops nullability rollback fidelity (the full fix is tracked in
5612	/// reinhardt-web#4640 and depends on `to_reverse_sql` returning
5613	/// `Vec<String>`).
5614	///
5615	/// This unit test guards the stop-gap output shape so that a future
5616	/// regression — e.g. re-folding CockroachDB into the Postgres comma form,
5617	/// or accidentally appending the nullability clause as a second `\n`-joined
5618	/// statement that `SchemaEditor::execute()` would reject — surfaces at
5619	/// `cargo test` time instead of only at downstream CockroachDB deployments
5620	/// (reinhardt-web#4639).
5621	#[test]
5622	fn test_to_reverse_sql_alter_column_cockroachdb() {
5623		// Arrange
5624		let op = alter_column_with_old_def();
5625		let state = ProjectState::default();
5626
5627		// Act
5628		let sql = op
5629			.to_reverse_sql(&SqlDialect::Cockroachdb, &state)
5630			.expect("reverse SQL should succeed")
5631			.expect("reverse SQL should be present");
5632
5633		// Assert: shape must match the expected single-statement form
5634		// exactly. Asserting `==` (not `contains`) is intentional: the
5635		// stop-gap is supposed to emit *only* `ALTER TABLE products ALTER
5636		// COLUMN name TYPE VARCHAR(50);` with no extra clauses (no
5637		// nullability, no USING, no comma-combined sibling). A `contains`
5638		// check would silently allow `, ALTER COLUMN ... SET NOT NULL`
5639		// suffixes — exactly the regression this test is meant to pin.
5640		//
5641		// Identifier quoting note: `quote_identifier` here is
5642		// `pg_escape::quote_identifier`, which only quotes identifiers
5643		// that fall outside PostgreSQL's unquoted-identifier grammar
5644		// (anything outside `[a-z_][a-z0-9_]*`, or a reserved keyword).
5645		// `products` and `name` are plain lowercase ASCII matching that
5646		// grammar, so the expected output is unquoted. This matches the
5647		// identical convention documented on
5648		// `test_to_reverse_sql_drop_index_includes_on_table_for_mysql`.
5649		// Identifier-quoting-by-default is tracked separately in #4674.
5650		let expected = "ALTER TABLE products ALTER COLUMN name TYPE VARCHAR(50);";
5651		assert_eq!(
5652			sql, expected,
5653			"CockroachDB reverse SQL must match the pinned single-statement \
5654			 form exactly (no extra clauses), got: {}",
5655			sql
5656		);
5657
5658		// Assert: must be exactly one SQL statement. CockroachDB rejects the
5659		// Postgres comma-combined form, and `SchemaEditor::execute()` is a
5660		// single-statement dispatcher (sqlx Extended Query), so the stop-gap
5661		// must not return a `;\n`-joined multi-statement payload either.
5662		// Trim whitespace first so a trailing `;\n` or `; ` still has its
5663		// terminating semicolon recognised by `trim_end_matches(';')`.
5664		let trimmed = sql.trim().trim_end_matches(';').trim();
5665		assert!(
5666			!trimmed.contains(';'),
5667			"CockroachDB reverse SQL must be exactly one statement (no internal \
5668			 `;`), got: {}",
5669			sql
5670		);
5671		assert!(
5672			!sql.contains(",\n") && !sql.contains(", ALTER COLUMN"),
5673			"CockroachDB reverse SQL must not emit the Postgres comma-combined \
5674			 form (CockroachDB rejects it), got: {}",
5675			sql
5676		);
5677
5678		// Assert: nullability rollback is intentionally dropped under the
5679		// stop-gap. Regressing toward emitting `SET NOT NULL` / `DROP NOT
5680		// NULL` either as a second statement or inline in the same `ALTER
5681		// TABLE` payload would re-break CockroachDB rollback — the
5682		// `Vec<String>` API refactor (reinhardt-web#4640) is the supported
5683		// path for restoring NOT NULL fidelity.
5684		assert!(
5685			!sql.contains("SET NOT NULL") && !sql.contains("DROP NOT NULL"),
5686			"CockroachDB reverse SQL must not emit nullability clause under \
5687			 the current stop-gap (tracked in #4640), got: {}",
5688			sql
5689		);
5690	}
5691
5692	/// SQLite has no general `ALTER COLUMN`; the rollback path is handled
5693	/// via table recreation in the executor. `to_reverse_sql` therefore
5694	/// returns an inert comment so that any accidental fall-through does
5695	/// not produce executable SQL that SQLite would reject
5696	/// (reinhardt-web#4582).
5697	#[test]
5698	fn test_to_reverse_sql_alter_column_sqlite() {
5699		// Arrange
5700		let op = alter_column_with_old_def();
5701		let state = ProjectState::default();
5702
5703		// Act
5704		let sql = op
5705			.to_reverse_sql(&SqlDialect::Sqlite, &state)
5706			.expect("reverse SQL should succeed")
5707			.expect("reverse SQL should be present");
5708
5709		// Assert
5710		assert!(
5711			sql.trim_start().starts_with("--"),
5712			"SQLite reverse SQL should be a SQL comment (recreation handled by executor), got: {}",
5713			sql
5714		);
5715		// Strip leading `--` (and any whitespace) before checking; the comment
5716		// body itself is allowed to mention ALTER COLUMN as English prose.
5717		let body = sql.trim_start_matches("--").trim_start();
5718		assert!(
5719			!body.to_uppercase().contains("ALTER TABLE"),
5720			"SQLite reverse SQL body must not emit executable ALTER TABLE statement, got: {}",
5721			sql
5722		);
5723	}
5724
5725	/// `to_reverse_operation` for `AlterColumn` must prefer the explicit
5726	/// `old_definition` carried by the forward operation, instead of
5727	/// always falling back to an (often empty) `ProjectState` lookup
5728	/// (reinhardt-web#4582).
5729	#[test]
5730	fn test_to_reverse_operation_alter_column_uses_old_definition() {
5731		// Arrange
5732		let op = alter_column_with_old_def();
5733		let state = ProjectState::default();
5734
5735		// Act
5736		let reverse = op
5737			.to_reverse_operation(&state)
5738			.expect("reverse operation should succeed")
5739			.expect("reverse operation should be present (old_definition is supplied)");
5740
5741		// Assert
5742		match reverse {
5743			Operation::AlterColumn { new_definition, .. } => {
5744				assert!(
5745					matches!(new_definition.type_definition, FieldType::VarChar(50)),
5746					"reverse AlterColumn should restore VARCHAR(50), got: {:?}",
5747					new_definition.type_definition
5748				);
5749			}
5750			other => panic!("reverse operation should be AlterColumn, got: {:?}", other),
5751		}
5752	}
5753
5754	#[test]
5755	fn test_to_reverse_sql_run_sql_with_reverse() {
5756		let op = Operation::RunSQL {
5757			sql: "CREATE INDEX idx_name ON users(name)".to_string(),
5758			reverse_sql: Some("DROP INDEX idx_name".to_string()),
5759		};
5760
5761		let state = ProjectState::default();
5762		let reverse = op.to_reverse_sql(&SqlDialect::Postgres, &state);
5763		assert!(
5764			reverse.is_ok() && reverse.as_ref().ok().unwrap().is_some(),
5765			"RunSQL with reverse_sql should have reverse SQL"
5766		);
5767		let sql = reverse.unwrap().unwrap();
5768		assert!(
5769			sql.contains("DROP INDEX"),
5770			"Reverse SQL should contain provided reverse_sql, got: {}",
5771			sql
5772		);
5773	}
5774
5775	#[test]
5776	fn test_to_reverse_sql_run_sql_without_reverse() {
5777		let op = Operation::RunSQL {
5778			sql: "CREATE INDEX idx_name ON users(name)".to_string(),
5779			reverse_sql: None,
5780		};
5781
5782		let state = ProjectState::default();
5783		let reverse = op.to_reverse_sql(&SqlDialect::Postgres, &state);
5784		assert!(
5785			reverse.is_ok() && reverse.as_ref().ok().unwrap().is_none(),
5786			"RunSQL without reverse_sql should not have reverse SQL"
5787		);
5788	}
5789
5790	#[test]
5791	fn test_column_definition_new() {
5792		let col = ColumnDefinition::new("id", FieldType::Integer);
5793		assert_eq!(col.name, "id", "Column name should be 'id'");
5794		assert_eq!(
5795			col.type_definition,
5796			FieldType::Integer,
5797			"Column type should be Integer"
5798		);
5799		assert!(!col.not_null, "not_null should default to false");
5800		assert!(!col.unique, "unique should default to false");
5801		assert!(!col.primary_key, "primary_key should default to false");
5802		assert!(
5803			!col.auto_increment,
5804			"auto_increment should default to false"
5805		);
5806		assert!(col.default.is_none(), "default should be None");
5807	}
5808
5809	// -----------------------------------------------------------------------
5810	// Regression tests for issue #4573:
5811	// `ColumnDefinition::from_field_state` previously read a non-existent
5812	// `params["not_null"]` key and silently emitted NULLABLE columns for
5813	// every non-PK field, violating the non-Optional Rust type ↔ NOT NULL
5814	// schema contract. These tests pin the corrected behavior:
5815	//   not_null = !field_state.nullable || primary_key
5816	// -----------------------------------------------------------------------
5817
5818	#[rstest]
5819	fn from_field_state_non_optional_bool_with_true_default() {
5820		// Arrange
5821		// Models a Rust field declared as `pub is_active: bool` with
5822		// `#[field(default = true)]`. The proc-macro sets
5823		// `field_state.nullable = false` via `FieldMetadata::with_nullable`,
5824		// and emits the default value as `params["default"] = "true"`.
5825		let mut field_state = FieldState::new("is_active", FieldType::Boolean, false);
5826		field_state
5827			.params
5828			.insert("default".to_string(), "true".to_string());
5829
5830		// Act
5831		let col = ColumnDefinition::from_field_state("is_active", &field_state);
5832
5833		// Assert
5834		assert_eq!(col.name, "is_active", "Column name should round-trip");
5835		assert_eq!(
5836			col.type_definition,
5837			FieldType::Boolean,
5838			"Boolean field type should round-trip"
5839		);
5840		assert!(
5841			col.not_null,
5842			"Non-Optional bool must emit NOT NULL (regression #4573)"
5843		);
5844		assert_eq!(
5845			col.default,
5846			Some("true".to_string()),
5847			"`#[field(default = true)]` must propagate as Some(\"true\")"
5848		);
5849		assert!(!col.primary_key, "Non-PK field must not be primary_key");
5850	}
5851
5852	#[rstest]
5853	fn from_field_state_non_optional_bool_with_false_default() {
5854		// Arrange
5855		// Models a Rust field declared as `pub is_superuser: bool` with
5856		// `#[field(default = false)]` — the symptom previously hand-patched
5857		// in PR #4513.
5858		let mut field_state = FieldState::new("is_superuser", FieldType::Boolean, false);
5859		field_state
5860			.params
5861			.insert("default".to_string(), "false".to_string());
5862
5863		// Act
5864		let col = ColumnDefinition::from_field_state("is_superuser", &field_state);
5865
5866		// Assert
5867		assert!(
5868			col.not_null,
5869			"Non-Optional bool with default=false must emit NOT NULL"
5870		);
5871		assert_eq!(
5872			col.default,
5873			Some("false".to_string()),
5874			"default=false must propagate as Some(\"false\")"
5875		);
5876	}
5877
5878	#[rstest]
5879	fn from_field_state_optional_bool_with_default() {
5880		// Arrange
5881		// Models a Rust field declared as `pub maybe_flag: Option<bool>` with
5882		// `#[field(default = true)]`. Existing behavior must be preserved:
5883		// nullable column with default still set.
5884		let mut field_state = FieldState::new("maybe_flag", FieldType::Boolean, true);
5885		field_state
5886			.params
5887			.insert("default".to_string(), "true".to_string());
5888
5889		// Act
5890		let col = ColumnDefinition::from_field_state("maybe_flag", &field_state);
5891
5892		// Assert
5893		assert!(
5894			!col.not_null,
5895			"Optional bool must remain NULLABLE — no regression on Option<T>"
5896		);
5897		assert_eq!(
5898			col.default,
5899			Some("true".to_string()),
5900			"Default propagation must work for Optional fields too"
5901		);
5902	}
5903
5904	#[rstest]
5905	fn from_field_state_non_optional_non_bool() {
5906		// Arrange
5907		// Models a Rust field declared as `pub username: String` with no
5908		// default. This confirms the fix is type-agnostic (not bool-only) —
5909		// the broader symptom in `examples-twitter/migrations/auth/0001_initial.rs`
5910		// where `username`, `bio`, `email`, etc. were all silently NULLABLE.
5911		let field_state = FieldState::new("username", FieldType::VarChar(150), false);
5912
5913		// Act
5914		let col = ColumnDefinition::from_field_state("username", &field_state);
5915
5916		// Assert
5917		assert!(
5918			col.not_null,
5919			"Non-Optional String must emit NOT NULL (regression #4573 — bug \
5920			 affected all field types, not just bool)"
5921		);
5922		assert!(
5923			col.default.is_none(),
5924			"No default annotation → default = None"
5925		);
5926	}
5927
5928	#[rstest]
5929	fn from_field_state_primary_key_is_always_not_null() {
5930		// Arrange
5931		// A primary key column must be NOT NULL even when the nullability
5932		// flag would otherwise allow NULL. This pins the `|| primary_key`
5933		// short-circuit in the corrected expression.
5934		let mut field_state = FieldState::new("id", FieldType::Uuid, true);
5935		field_state
5936			.params
5937			.insert("primary_key".to_string(), "true".to_string());
5938
5939		// Act
5940		let col = ColumnDefinition::from_field_state("id", &field_state);
5941
5942		// Assert
5943		assert!(
5944			col.primary_key,
5945			"primary_key param must propagate to ColumnDefinition"
5946		);
5947		assert!(
5948			col.not_null,
5949			"Primary key must be NOT NULL regardless of nullable flag"
5950		);
5951	}
5952
5953	#[rstest]
5954	fn from_field_state_optional_field_remains_nullable() {
5955		// Arrange
5956		// Models a Rust field declared as `pub last_login: Option<DateTime<Utc>>`
5957		// with no default. Existing nullable-field behavior must be preserved.
5958		let field_state = FieldState::new("last_login", FieldType::TimestampTz, true);
5959
5960		// Act
5961		let col = ColumnDefinition::from_field_state("last_login", &field_state);
5962
5963		// Assert
5964		assert!(
5965			!col.not_null,
5966			"Optional field with no default must remain NULLABLE"
5967		);
5968		assert!(col.default.is_none(), "No default → default = None");
5969		assert!(!col.primary_key, "Non-PK field must not be primary_key");
5970	}
5971
5972	#[test]
5973	fn test_convert_default_value_null() {
5974		let op = Operation::CreateTable {
5975			name: "test".to_string(),
5976			columns: vec![],
5977			constraints: vec![],
5978			without_rowid: None,
5979			partition: None,
5980			interleave_in_parent: None,
5981		};
5982		let value = op.convert_default_value("null");
5983		assert!(
5984			matches!(value, Value::String(None)),
5985			"NULL value should be converted to Value::String(None)"
5986		);
5987	}
5988
5989	#[test]
5990	fn test_convert_default_value_bool() {
5991		let op = Operation::CreateTable {
5992			name: "test".to_string(),
5993			columns: vec![],
5994			constraints: vec![],
5995			without_rowid: None,
5996			partition: None,
5997			interleave_in_parent: None,
5998		};
5999		let value = op.convert_default_value("true");
6000		assert!(
6001			matches!(value, Value::Bool(Some(true))),
6002			"'true' should be converted to Value::Bool(Some(true))"
6003		);
6004
6005		let value = op.convert_default_value("false");
6006		assert!(
6007			matches!(value, Value::Bool(Some(false))),
6008			"'false' should be converted to Value::Bool(Some(false))"
6009		);
6010	}
6011
6012	#[test]
6013	fn test_convert_default_value_integer() {
6014		let op = Operation::CreateTable {
6015			name: "test".to_string(),
6016			columns: vec![],
6017			constraints: vec![],
6018			without_rowid: None,
6019			partition: None,
6020			interleave_in_parent: None,
6021		};
6022		let value = op.convert_default_value("42");
6023		assert!(
6024			matches!(value, Value::BigInt(Some(42))),
6025			"Integer '42' should be converted to Value::BigInt(Some(42))"
6026		);
6027	}
6028
6029	#[test]
6030	fn test_convert_default_value_float() {
6031		let op = Operation::CreateTable {
6032			name: "test".to_string(),
6033			columns: vec![],
6034			constraints: vec![],
6035			without_rowid: None,
6036			partition: None,
6037			interleave_in_parent: None,
6038		};
6039		let value = op.convert_default_value("3.15");
6040		assert!(
6041			matches!(value, Value::Double(_)),
6042			"Float '3.15' should be converted to Value::Double"
6043		);
6044	}
6045
6046	#[test]
6047	fn test_convert_default_value_string() {
6048		let op = Operation::CreateTable {
6049			name: "test".to_string(),
6050			columns: vec![],
6051			constraints: vec![],
6052			without_rowid: None,
6053			partition: None,
6054			interleave_in_parent: None,
6055		};
6056		let value = op.convert_default_value("'hello'");
6057		match value {
6058			Value::String(Some(s)) => assert_eq!(
6059				*s, "hello",
6060				"Quoted string should be unquoted and stored as 'hello'"
6061			),
6062			_ => {
6063				panic!("Expected Value::String(Some(\"hello\")), got different variant")
6064			}
6065		}
6066	}
6067
6068	#[rstest]
6069	#[case("pending", "'pending'")]
6070	#[case("active", "'active'")]
6071	#[case("hello world", "'hello world'")]
6072	#[case("it's", "'it''s'")]
6073	fn test_convert_default_value_plain_string(#[case] input: &str, #[case] expected: &str) {
6074		// Arrange
6075		let op = Operation::CreateTable {
6076			name: "test".to_string(),
6077			columns: vec![],
6078			constraints: vec![],
6079			without_rowid: None,
6080			partition: None,
6081			interleave_in_parent: None,
6082		};
6083
6084		// Act
6085		let value = op.convert_default_value(input);
6086
6087		// Assert
6088		match value {
6089			Value::String(Some(s)) => assert_eq!(
6090				*s, expected,
6091				"Plain string '{input}' should be auto-quoted as SQL string literal"
6092			),
6093			_ => {
6094				panic!("Expected Value::String(Some(\"{expected}\")), got {value:?}")
6095			}
6096		}
6097	}
6098
6099	#[rstest]
6100	#[case("CURRENT_TIMESTAMP")]
6101	#[case("current_timestamp")]
6102	#[case("CURRENT_DATE")]
6103	#[case("CURRENT_TIME")]
6104	#[case("CURRENT_USER")]
6105	#[case("SESSION_USER")]
6106	#[case("LOCALTIME")]
6107	#[case("LOCALTIMESTAMP")]
6108	fn test_convert_default_value_sql_constant(#[case] input: &str) {
6109		// Arrange
6110		let op = Operation::CreateTable {
6111			name: "test".to_string(),
6112			columns: vec![],
6113			constraints: vec![],
6114			without_rowid: None,
6115			partition: None,
6116			interleave_in_parent: None,
6117		};
6118
6119		// Act
6120		let value = op.convert_default_value(input);
6121
6122		// Assert
6123		match value {
6124			Value::String(Some(s)) => {
6125				assert_eq!(*s, input, "SQL constant '{input}' should remain unquoted")
6126			}
6127			_ => {
6128				panic!("Expected Value::String(Some(\"{input}\")), got {value:?}")
6129			}
6130		}
6131	}
6132
6133	#[rstest]
6134	#[case("NOW()")]
6135	#[case("uuid_generate_v4()")]
6136	#[case("gen_random_uuid()")]
6137	fn test_convert_default_value_sql_function(#[case] input: &str) {
6138		// Arrange
6139		let op = Operation::CreateTable {
6140			name: "test".to_string(),
6141			columns: vec![],
6142			constraints: vec![],
6143			without_rowid: None,
6144			partition: None,
6145			interleave_in_parent: None,
6146		};
6147
6148		// Act
6149		let value = op.convert_default_value(input);
6150
6151		// Assert
6152		match value {
6153			Value::String(Some(s)) => {
6154				assert_eq!(*s, input, "SQL function '{input}' should remain unquoted")
6155			}
6156			_ => {
6157				panic!("Expected Value::String(Some(\"{input}\")), got {value:?}")
6158			}
6159		}
6160	}
6161
6162	#[test]
6163	fn test_apply_column_type_integer() {
6164		let op = Operation::CreateTable {
6165			name: "test".to_string(),
6166			columns: vec![],
6167			constraints: vec![],
6168			without_rowid: None,
6169			partition: None,
6170			interleave_in_parent: None,
6171		};
6172		let col = ColumnDef::new(Alias::new("id"));
6173		let _col = op.apply_column_type(col, &FieldType::Integer);
6174		// This test verifies that INTEGER type application doesn't panic
6175		// Internal state cannot be easily asserted with reinhardt_query's ColumnDef API
6176	}
6177
6178	#[test]
6179	fn test_apply_column_type_varchar_with_length() {
6180		let op = Operation::CreateTable {
6181			name: "test".to_string(),
6182			columns: vec![],
6183			constraints: vec![],
6184			without_rowid: None,
6185			partition: None,
6186			interleave_in_parent: None,
6187		};
6188		let col = ColumnDef::new(Alias::new("name"));
6189		let _col = op.apply_column_type(col, &FieldType::VarChar(100));
6190		// This test verifies that VARCHAR(100) type application doesn't panic
6191		// Internal state cannot be easily asserted with reinhardt_query's ColumnDef API
6192	}
6193
6194	#[test]
6195	fn test_apply_column_type_custom() {
6196		let op = Operation::CreateTable {
6197			name: "test".to_string(),
6198			columns: vec![],
6199			constraints: vec![],
6200			without_rowid: None,
6201			partition: None,
6202			interleave_in_parent: None,
6203		};
6204		let col = ColumnDef::new(Alias::new("data"));
6205		let _col = op.apply_column_type(col, &FieldType::Custom("CUSTOM_TYPE".to_string()));
6206		// This test verifies that custom type application doesn't panic
6207		// Internal state cannot be easily asserted with reinhardt_query's ColumnDef API
6208	}
6209
6210	#[test]
6211	fn test_create_index_composite() {
6212		let op = Operation::CreateIndex {
6213			table: "users".to_string(),
6214			columns: vec!["first_name".to_string(), "last_name".to_string()],
6215			unique: false,
6216			index_type: None,
6217			where_clause: None,
6218			concurrently: false,
6219			expressions: None,
6220			mysql_options: None,
6221			operator_class: None,
6222		};
6223
6224		let sql = op.to_sql(&SqlDialect::Postgres);
6225		assert!(
6226			sql.contains("first_name"),
6227			"SQL should include 'first_name' column, got: {}",
6228			sql
6229		);
6230		assert!(
6231			sql.contains("last_name"),
6232			"SQL should include 'last_name' column, got: {}",
6233			sql
6234		);
6235		assert!(
6236			sql.contains("idx_users_first_name_last_name"),
6237			"SQL should include composite index name, got: {}",
6238			sql
6239		);
6240	}
6241
6242	#[test]
6243	fn test_alter_table_comment_with_quotes() {
6244		let op = Operation::AlterTableComment {
6245			table: "users".to_string(),
6246			comment: Some("User's account table".to_string()),
6247		};
6248
6249		let stmt = op.to_statement();
6250		let sql = stmt.to_sql_string(crate::backends::types::DatabaseType::Postgres);
6251		assert!(
6252			sql.contains("COMMENT ON TABLE"),
6253			"SQL should contain COMMENT ON TABLE keywords, got: {}",
6254			sql
6255		);
6256		assert!(
6257			sql.contains("User''s account table"),
6258			"SQL should properly escape single quotes in comment, got: {}",
6259			sql
6260		);
6261	}
6262
6263	#[test]
6264	fn test_state_forwards_alter_column() {
6265		let mut state = ProjectState::new();
6266		let mut model = ModelState::new("myapp", "users");
6267		model.add_field(FieldState::new(
6268			"age".to_string(),
6269			FieldType::Integer,
6270			false,
6271		));
6272		state.add_model(model);
6273
6274		let op = Operation::AlterColumn {
6275			table: "users".to_string(),
6276			column: "age".to_string(),
6277			old_definition: None,
6278			new_definition: ColumnDefinition {
6279				name: "age".to_string(),
6280				type_definition: FieldType::BigInteger,
6281				not_null: true,
6282				unique: false,
6283				primary_key: false,
6284				auto_increment: false,
6285				default: None,
6286			},
6287			mysql_options: None,
6288		};
6289
6290		op.state_forwards("myapp", &mut state);
6291		let model = state.get_model("myapp", "users").unwrap();
6292		let field = model.fields.get("age").unwrap();
6293		assert_eq!(
6294			field.field_type,
6295			FieldType::BigInteger,
6296			"Field type should be updated to BigInteger, got: {}",
6297			field.field_type
6298		);
6299	}
6300
6301	#[test]
6302	fn test_state_forwards_create_inherited_table() {
6303		let mut state = ProjectState::new();
6304		let op = Operation::CreateInheritedTable {
6305			name: "admin_users".to_string(),
6306			columns: vec![ColumnDefinition {
6307				name: "admin_level".to_string(),
6308				type_definition: FieldType::Integer,
6309				not_null: true,
6310				unique: false,
6311				primary_key: false,
6312				auto_increment: false,
6313				default: None,
6314			}],
6315			base_table: "users".to_string(),
6316			join_column: "user_id".to_string(),
6317		};
6318
6319		op.state_forwards("myapp", &mut state);
6320		let model = state.get_model("myapp", "admin_users");
6321		assert!(
6322			model.is_some(),
6323			"Inherited table 'admin_users' should exist in state"
6324		);
6325		let model = model.unwrap();
6326		assert_eq!(
6327			model.base_model,
6328			Some("users".to_string()),
6329			"base_model should be set to 'users'"
6330		);
6331		assert_eq!(
6332			model.inheritance_type,
6333			Some("joined_table".to_string()),
6334			"inheritance_type should be 'joined_table'"
6335		);
6336	}
6337
6338	#[test]
6339	fn test_state_forwards_add_discriminator_column() {
6340		let mut state = ProjectState::new();
6341		let mut model = ModelState::new("myapp", "users");
6342		model.add_field(FieldState::new("id".to_string(), FieldType::Integer, false));
6343		state.add_model(model);
6344
6345		let op = Operation::AddDiscriminatorColumn {
6346			table: "users".to_string(),
6347			column_name: "user_type".to_string(),
6348			default_value: "regular".to_string(),
6349		};
6350
6351		op.state_forwards("myapp", &mut state);
6352		let model = state.get_model("myapp", "users").unwrap();
6353		assert_eq!(
6354			model.discriminator_column,
6355			Some("user_type".to_string()),
6356			"discriminator_column should be set to 'user_type'"
6357		);
6358		assert_eq!(
6359			model.inheritance_type,
6360			Some("single_table".to_string()),
6361			"inheritance_type should be 'single_table'"
6362		);
6363	}
6364
6365	#[rstest]
6366	fn test_to_reverse_sql_create_table_quotes_identifiers() {
6367		// Arrange
6368		let op = Operation::CreateTable {
6369			name: "user-data".to_string(),
6370			columns: vec![],
6371			constraints: vec![],
6372			without_rowid: None,
6373			partition: None,
6374			interleave_in_parent: None,
6375		};
6376		let state = ProjectState::default();
6377
6378		// Act
6379		let sql = op
6380			.to_reverse_sql(&SqlDialect::Postgres, &state)
6381			.unwrap()
6382			.unwrap();
6383
6384		// Assert
6385		assert_eq!(
6386			sql, "DROP TABLE \"user-data\";",
6387			"Identifiers with special characters must be quoted"
6388		);
6389	}
6390
6391	#[rstest]
6392	fn test_to_reverse_sql_add_column_quotes_identifiers() {
6393		// Arrange
6394		let op = Operation::AddColumn {
6395			table: "my table".to_string(),
6396			column: ColumnDefinition {
6397				name: "my column".to_string(),
6398				type_definition: FieldType::VarChar(255),
6399				not_null: false,
6400				unique: false,
6401				primary_key: false,
6402				auto_increment: false,
6403				default: None,
6404			},
6405			mysql_options: None,
6406		};
6407		let state = ProjectState::default();
6408
6409		// Act
6410		let sql = op
6411			.to_reverse_sql(&SqlDialect::Postgres, &state)
6412			.unwrap()
6413			.unwrap();
6414
6415		// Assert
6416		assert_eq!(
6417			sql, "ALTER TABLE \"my table\" DROP COLUMN \"my column\";",
6418			"Table and column names with spaces must be quoted"
6419		);
6420	}
6421
6422	#[rstest]
6423	fn test_to_reverse_sql_rename_table_quotes_identifiers() {
6424		// Arrange: both names contain special characters requiring quoting
6425		let op = Operation::RenameTable {
6426			old_name: "old; DROP TABLE users;--".to_string(),
6427			new_name: "new-name".to_string(),
6428		};
6429		let state = ProjectState::default();
6430
6431		// Act
6432		let sql = op
6433			.to_reverse_sql(&SqlDialect::Postgres, &state)
6434			.unwrap()
6435			.unwrap();
6436
6437		// Assert
6438		assert_eq!(
6439			sql, "ALTER TABLE \"new-name\" RENAME TO \"old; DROP TABLE users;--\";",
6440			"SQL injection attempt must be quoted as identifier"
6441		);
6442	}
6443
6444	#[rstest]
6445	fn test_to_reverse_sql_rename_column_quotes_identifiers() {
6446		// Arrange: use identifiers with special characters to verify quoting
6447		let op = Operation::RenameColumn {
6448			table: "my table".to_string(),
6449			old_name: "old col".to_string(),
6450			new_name: "new col".to_string(),
6451		};
6452		let state = ProjectState::default();
6453
6454		// Act
6455		let sql = op
6456			.to_reverse_sql(&SqlDialect::Postgres, &state)
6457			.unwrap()
6458			.unwrap();
6459
6460		// Assert
6461		assert_eq!(
6462			sql, "ALTER TABLE \"my table\" RENAME COLUMN \"new col\" TO \"old col\";",
6463			"Identifiers with spaces must be quoted"
6464		);
6465	}
6466
6467	#[rstest]
6468	fn test_to_reverse_sql_create_index_quotes_identifiers() {
6469		// Arrange
6470		let op = Operation::CreateIndex {
6471			table: "my-table".to_string(),
6472			columns: vec!["col a".to_string()],
6473			unique: false,
6474			index_type: None,
6475			where_clause: None,
6476			concurrently: false,
6477			expressions: None,
6478			mysql_options: None,
6479			operator_class: None,
6480		};
6481		let state = ProjectState::default();
6482
6483		// Act
6484		let sql = op
6485			.to_reverse_sql(&SqlDialect::Postgres, &state)
6486			.unwrap()
6487			.unwrap();
6488
6489		// Assert
6490		assert!(
6491			sql.contains("DROP INDEX \"idx_my-table_col a\""),
6492			"Index name must be quoted, got: {}",
6493			sql
6494		);
6495	}
6496
6497	/// Regression test for kent8192/reinhardt-web#4583.
6498	///
6499	/// MySQL requires `DROP INDEX <name> ON <table>` when reversing a `CreateIndex`
6500	/// operation. The previous implementation emitted `DROP INDEX <name>;` for every
6501	/// dialect, which produced malformed SQL on MySQL (1064 syntax error). This test
6502	/// pins the MySQL-specific `ON <table>` clause to prevent regressions.
6503	#[rstest]
6504	fn test_to_reverse_sql_create_index_emits_on_table_clause_for_mysql() {
6505		// Arrange
6506		let op = Operation::CreateIndex {
6507			table: "users".to_string(),
6508			columns: vec!["email".to_string()],
6509			unique: false,
6510			index_type: None,
6511			where_clause: None,
6512			concurrently: false,
6513			expressions: None,
6514			mysql_options: None,
6515			operator_class: None,
6516		};
6517		let state = ProjectState::default();
6518
6519		// Act
6520		let sql = op
6521			.to_reverse_sql(&SqlDialect::Mysql, &state)
6522			.unwrap()
6523			.unwrap();
6524
6525		// Assert: `quote_identifier` is `pg_escape::quote_identifier`, which only
6526		// adds quotes when the identifier contains reserved or non-lowercase
6527		// characters. For plain ASCII names, the output is unquoted. What matters
6528		// for regression is the presence of the `ON <table>` suffix.
6529		assert_eq!(
6530			sql, "DROP INDEX idx_users_email ON users;",
6531			"MySQL reverse SQL must include `ON <table>` clause"
6532		);
6533	}
6534
6535	/// Verify Postgres / SQLite / CockroachDB continue to emit the bare
6536	/// `DROP INDEX <name>;` form without an `ON <table>` clause.
6537	#[rstest]
6538	#[case(SqlDialect::Postgres, "DROP INDEX idx_users_email;")]
6539	#[case(SqlDialect::Sqlite, "DROP INDEX idx_users_email;")]
6540	#[case(SqlDialect::Cockroachdb, "DROP INDEX idx_users_email;")]
6541	fn test_to_reverse_sql_create_index_omits_on_table_for_non_mysql(
6542		#[case] dialect: SqlDialect,
6543		#[case] expected: &str,
6544	) {
6545		// Arrange
6546		let op = Operation::CreateIndex {
6547			table: "users".to_string(),
6548			columns: vec!["email".to_string()],
6549			unique: false,
6550			index_type: None,
6551			where_clause: None,
6552			concurrently: false,
6553			expressions: None,
6554			mysql_options: None,
6555			operator_class: None,
6556		};
6557		let state = ProjectState::default();
6558
6559		// Act
6560		let sql = op.to_reverse_sql(&dialect, &state).unwrap().unwrap();
6561
6562		// Assert
6563		assert_eq!(
6564			sql, expected,
6565			"Non-MySQL reverse SQL must remain unchanged for dialect {:?}",
6566			dialect
6567		);
6568	}
6569
6570	#[rstest]
6571	fn test_to_reverse_sql_add_constraint_quotes_identifiers() {
6572		// Arrange: table name with special characters triggers quoting
6573		let op = Operation::AddConstraint {
6574			table: "my-table".to_string(),
6575			constraint_sql: "CONSTRAINT chk_positive CHECK (x > 0)".to_string(),
6576		};
6577		let state = ProjectState::default();
6578
6579		// Act
6580		let sql = op
6581			.to_reverse_sql(&SqlDialect::Postgres, &state)
6582			.unwrap()
6583			.unwrap();
6584
6585		// Assert
6586		assert!(
6587			sql.contains("ALTER TABLE \"my-table\""),
6588			"Table name with special characters must be quoted, got: {}",
6589			sql
6590		);
6591		assert!(
6592			sql.contains("DROP CONSTRAINT"),
6593			"Should contain DROP CONSTRAINT, got: {}",
6594			sql
6595		);
6596	}
6597
6598	#[rstest]
6599	fn test_to_reverse_sql_bulk_load_quotes_identifiers() {
6600		// Arrange
6601		let op = Operation::BulkLoad {
6602			table: "user-data".to_string(),
6603			source: BulkLoadSource::Stdin,
6604			format: BulkLoadFormat::default(),
6605			options: BulkLoadOptions::default(),
6606		};
6607		let state = ProjectState::default();
6608
6609		// Act
6610		let sql = op
6611			.to_reverse_sql(&SqlDialect::Postgres, &state)
6612			.unwrap()
6613			.unwrap();
6614
6615		// Assert
6616		assert_eq!(
6617			sql, "TRUNCATE TABLE \"user-data\";",
6618			"Table name must be quoted"
6619		);
6620	}
6621
6622	// ========================================================================
6623	// SetAutoIncrementValue — per-backend SQL rendering
6624	// ========================================================================
6625
6626	#[rstest]
6627	#[case::postgres(SqlDialect::Postgres)]
6628	#[case::cockroachdb(SqlDialect::Cockroachdb)]
6629	fn test_set_auto_increment_postgres_uses_setval(#[case] dialect: SqlDialect) {
6630		// Arrange
6631		let op = Operation::SetAutoIncrementValue {
6632			table: "users".to_string(),
6633			column: "id".to_string(),
6634			value: 1000,
6635		};
6636
6637		// Act
6638		let sql = op.to_sql(&dialect);
6639
6640		// Assert
6641		assert_eq!(
6642			sql,
6643			"SELECT setval(pg_get_serial_sequence('users', 'id'), 1000, false);"
6644		);
6645	}
6646
6647	#[test]
6648	fn test_set_auto_increment_mysql_alters_table() {
6649		// Arrange
6650		let op = Operation::SetAutoIncrementValue {
6651			table: "users".to_string(),
6652			column: "id".to_string(),
6653			value: 1000,
6654		};
6655
6656		// Act
6657		let sql = op.to_sql(&SqlDialect::Mysql);
6658
6659		// Assert: identifier quoting uses pg_escape's `quote_identifier`
6660		// uniformly across dialects (matches convention used elsewhere in
6661		// this module, e.g. AlterTableComment). pg_escape omits quotes when
6662		// the identifier needs no escaping.
6663		assert_eq!(sql, "ALTER TABLE users AUTO_INCREMENT = 1000;");
6664	}
6665
6666	#[test]
6667	fn test_set_auto_increment_sqlite_upserts_sqlite_sequence() {
6668		// Arrange
6669		let op = Operation::SetAutoIncrementValue {
6670			table: "users".to_string(),
6671			column: "id".to_string(),
6672			value: 1000,
6673		};
6674
6675		// Act
6676		let sql = op.to_sql(&SqlDialect::Sqlite);
6677
6678		// Assert: INSERT OR REPLACE is robust vs. UPDATE which no-ops when
6679		// the sqlite_sequence row does not yet exist.
6680		assert_eq!(
6681			sql,
6682			"INSERT OR REPLACE INTO sqlite_sequence(name, seq) VALUES ('users', 1000);"
6683		);
6684	}
6685
6686	#[test]
6687	fn test_set_auto_increment_postgres_escapes_literals() {
6688		// Arrange: embedded single quote must be doubled to avoid injection.
6689		let op = Operation::SetAutoIncrementValue {
6690			table: "user's".to_string(),
6691			column: "id".to_string(),
6692			value: 42,
6693		};
6694
6695		// Act
6696		let sql = op.to_sql(&SqlDialect::Postgres);
6697
6698		// Assert
6699		assert!(
6700			sql.contains("'user''s'"),
6701			"single quote in table name must be escaped: {}",
6702			sql
6703		);
6704	}
6705
6706	// ========================================================================
6707	// CreateCompositePrimaryKey — SQL rendering and edge cases
6708	// ========================================================================
6709
6710	#[rstest]
6711	#[case::postgres(SqlDialect::Postgres)]
6712	#[case::mysql(SqlDialect::Mysql)]
6713	#[case::sqlite(SqlDialect::Sqlite)]
6714	#[case::cockroachdb(SqlDialect::Cockroachdb)]
6715	fn test_composite_pk_default_name(#[case] dialect: SqlDialect) {
6716		// Arrange
6717		let op = Operation::CreateCompositePrimaryKey {
6718			table: "order_items".to_string(),
6719			columns: vec!["order_id".to_string(), "line_number".to_string()],
6720			constraint_name: None,
6721		};
6722
6723		// Act
6724		let sql = op.to_sql(&dialect);
6725
6726		// Assert
6727		assert!(
6728			sql.contains("ALTER TABLE"),
6729			"SQL should use ALTER TABLE: {}",
6730			sql
6731		);
6732		assert!(
6733			sql.contains("ADD CONSTRAINT"),
6734			"SQL should add a named constraint: {}",
6735			sql
6736		);
6737		assert!(
6738			sql.contains("PRIMARY KEY"),
6739			"SQL should add PRIMARY KEY: {}",
6740			sql
6741		);
6742		assert!(
6743			sql.contains("order_items_pkey"),
6744			"Default constraint name should be table_pkey: {}",
6745			sql
6746		);
6747		assert!(
6748			sql.contains("order_id") && sql.contains("line_number"),
6749			"Both PK columns must appear: {}",
6750			sql
6751		);
6752	}
6753
6754	#[test]
6755	fn test_composite_pk_custom_name_and_quoting() {
6756		// Arrange
6757		let op = Operation::CreateCompositePrimaryKey {
6758			table: "tbl".to_string(),
6759			columns: vec!["a".to_string(), "b".to_string()],
6760			constraint_name: Some("my_pk".to_string()),
6761		};
6762
6763		// Act
6764		let sql = op.to_sql(&SqlDialect::Postgres);
6765
6766		// Assert: pg_escape omits quotes for identifiers that need no escaping.
6767		assert_eq!(
6768			sql,
6769			"ALTER TABLE tbl ADD CONSTRAINT my_pk PRIMARY KEY (a, b);"
6770		);
6771	}
6772
6773	#[test]
6774	fn test_composite_pk_empty_columns_produces_failing_sql() {
6775		// Arrange: empty column list is invalid; we emit a deliberately
6776		// invalid SQL statement (a bare identifier) so every backend's
6777		// parser rejects it before execution, replacing the earlier
6778		// `SELECT 1/0` fallback that silently passed on SQLite and
6779		// lax-mode MySQL (reinhardt-web#4325).
6780		let op = Operation::CreateCompositePrimaryKey {
6781			table: "tbl".to_string(),
6782			columns: vec![],
6783			constraint_name: None,
6784		};
6785
6786		// Act: verify behavior on every supported dialect.
6787		for dialect in [SqlDialect::Postgres, SqlDialect::Mysql, SqlDialect::Sqlite] {
6788			let sql = op.to_sql(&dialect);
6789
6790			// Assert: the emitted statement encodes the diagnostic and is
6791			// not a syntactically valid SELECT/DDL on any backend.
6792			assert!(
6793				sql.starts_with("SYNTAX_ERROR_create_composite_pk_on_")
6794					&& sql.contains("requires_at_least_one_column"),
6795				"Empty column list must emit a syntax-error statement with diagnostic ({:?}): {}",
6796				dialect,
6797				sql
6798			);
6799			assert!(
6800				!sql.contains("SELECT 1/0"),
6801				"Must not fall back to SELECT 1/0 (silently passes on SQLite / lax MySQL): {}",
6802				sql
6803			);
6804		}
6805	}
6806
6807	// ========================================================================
6808	// column_to_sql — SQLite AUTOINCREMENT type widening (Issue #4184)
6809	//
6810	// SQLite rejects `BIGINT PRIMARY KEY AUTOINCREMENT` at apply time with:
6811	//   "AUTOINCREMENT is only allowed on an INTEGER PRIMARY KEY"
6812	// The default `BigAutoField` from CoreSettings produces FieldType::BigInteger
6813	// + auto_increment, so the SQLite emitter must widen integer widths to the
6814	// literal `INTEGER` token.
6815	// ========================================================================
6816
6817	#[rstest]
6818	#[case::big_integer(FieldType::BigInteger)]
6819	#[case::integer(FieldType::Integer)]
6820	#[case::small_integer(FieldType::SmallInteger)]
6821	fn test_column_to_sql_sqlite_auto_increment_pk_emits_integer(#[case] field_type: FieldType) {
6822		// Arrange: BigAutoField/AutoField/SmallAutoField PK with auto_increment.
6823		let mut col = ColumnDefinition::new("id", field_type);
6824		col.primary_key = true;
6825		col.auto_increment = true;
6826		col.not_null = true;
6827
6828		// Act
6829		let sql = Operation::column_to_sql(&col, &SqlDialect::Sqlite);
6830
6831		// Assert: must use the literal `INTEGER` token (not BIGINT/SMALLINT)
6832		// to satisfy SQLite's AUTOINCREMENT constraint.
6833		assert!(
6834			sql.contains("INTEGER PRIMARY KEY AUTOINCREMENT"),
6835			"SQLite auto_increment PK must emit `INTEGER PRIMARY KEY AUTOINCREMENT`: {}",
6836			sql
6837		);
6838		assert!(
6839			!sql.contains("BIGINT"),
6840			"SQLite auto_increment must not emit BIGINT (rejected by SQLite): {}",
6841			sql
6842		);
6843		assert!(
6844			!sql.contains("SMALLINT"),
6845			"SQLite auto_increment must not emit SMALLINT (rejected by SQLite): {}",
6846			sql
6847		);
6848	}
6849
6850	#[test]
6851	fn test_column_to_sql_sqlite_big_integer_without_auto_increment_no_autoincrement() {
6852		// Arrange: plain BigInteger column without auto_increment must not emit
6853		// the AUTOINCREMENT keyword. SQLite represents all integer widths as
6854		// INTEGER (storage class), so emitting INTEGER (per to_sql_for_dialect)
6855		// is correct even without auto_increment.
6856		let mut col = ColumnDefinition::new("count", FieldType::BigInteger);
6857		col.not_null = true;
6858
6859		// Act
6860		let sql = Operation::column_to_sql(&col, &SqlDialect::Sqlite);
6861
6862		// Assert
6863		assert!(
6864			!sql.contains("AUTOINCREMENT"),
6865			"Non-auto_increment column must not emit AUTOINCREMENT: {}",
6866			sql
6867		);
6868		// SQLite accepts `BIGINT` declarations via type affinity, but our emitter
6869		// normalizes integer widths to `INTEGER` for consistency with the
6870		// auto_increment path. This assertion guards that normalization, not a
6871		// SQLite-level prohibition on BIGINT.
6872		assert!(
6873			!sql.contains("BIGINT"),
6874			"emitter is expected to normalize BigInteger to INTEGER for SQLite: {}",
6875			sql
6876		);
6877	}
6878
6879	#[test]
6880	fn test_column_to_sql_postgres_big_integer_auto_increment_unchanged() {
6881		// Arrange: regression guard — Postgres path must remain GENERATED AS IDENTITY.
6882		let mut col = ColumnDefinition::new("id", FieldType::BigInteger);
6883		col.primary_key = true;
6884		col.auto_increment = true;
6885		col.not_null = true;
6886
6887		// Act
6888		let sql = Operation::column_to_sql(&col, &SqlDialect::Postgres);
6889
6890		// Assert
6891		assert!(
6892			sql.contains("BIGINT GENERATED BY DEFAULT AS IDENTITY"),
6893			"Postgres auto_increment BigInteger must emit identity syntax: {}",
6894			sql
6895		);
6896	}
6897
6898	#[test]
6899	fn test_column_to_sql_sqlite_auto_increment_uuid_pk_omits_autoincrement() {
6900		// Arrange: a UUID primary key with auto_increment=true. The `#[model]`
6901		// macro previously emitted `auto_increment="true"` for every PK
6902		// regardless of type, which produced `"id" UUID PRIMARY KEY AUTOINCREMENT`
6903		// — rejected by SQLite with "AUTOINCREMENT is only allowed on an
6904		// INTEGER PRIMARY KEY". The emitter must defend against this combination
6905		// by omitting AUTOINCREMENT when the column type was not widened to
6906		// INTEGER. See reinhardt-web#4378.
6907		let mut col = ColumnDefinition::new("id", FieldType::Uuid);
6908		col.primary_key = true;
6909		col.auto_increment = true;
6910		col.not_null = true;
6911
6912		// Act
6913		let sql = Operation::column_to_sql(&col, &SqlDialect::Sqlite);
6914
6915		// Assert: PRIMARY KEY must be emitted, AUTOINCREMENT must NOT.
6916		assert!(
6917			sql.contains("PRIMARY KEY"),
6918			"UUID PK must still emit PRIMARY KEY: {}",
6919			sql
6920		);
6921		assert!(
6922			!sql.contains("AUTOINCREMENT"),
6923			"non-integer auto_increment PK must not emit AUTOINCREMENT (SQLite rejects it): {}",
6924			sql
6925		);
6926		// SQLite's `to_sql_for_dialect(Uuid)` returns `TEXT` (the storage class
6927		// SQLite actually uses for UUIDs); the important guarantee is that the
6928		// type was NOT silently widened to `INTEGER`, which would change the
6929		// column's semantics.
6930		assert!(
6931			!sql.contains("INTEGER"),
6932			"UUID column type must not be widened to INTEGER: {}",
6933			sql
6934		);
6935	}
6936
6937	#[test]
6938	fn test_column_to_sql_without_pk_sqlite_auto_increment_emits_integer() {
6939		// Arrange: composite PK path also widens to INTEGER for SQLite.
6940		let mut col = ColumnDefinition::new("id", FieldType::BigInteger);
6941		col.auto_increment = true;
6942		col.not_null = true;
6943
6944		// Act
6945		let sql = Operation::column_to_sql_without_pk(&col, &SqlDialect::Sqlite);
6946
6947		// Assert
6948		assert!(
6949			sql.contains("INTEGER"),
6950			"SQLite auto_increment column (composite PK path) must emit INTEGER: {}",
6951			sql
6952		);
6953		assert!(
6954			!sql.contains("BIGINT"),
6955			"SQLite auto_increment must not emit BIGINT in composite PK path: {}",
6956			sql
6957		);
6958	}
6959
6960	mod resolve_foreign_key_column_type_tests {
6961		use super::super::resolve_foreign_key_column_type_with;
6962		use super::FieldType;
6963		use crate::migrations::autodetector::FieldState;
6964		use crate::migrations::model_registry::{FieldMetadata, ModelMetadata, ModelRegistry};
6965
6966		/// Helper: build a target model registered under `(app, name)`
6967		/// whose PK column is of `pk_type`.
6968		fn target_model(app: &str, name: &str, table: &str, pk_type: FieldType) -> ModelMetadata {
6969			let mut meta = ModelMetadata::new(app, name, table);
6970			meta.add_field(
6971				"id".to_string(),
6972				FieldMetadata::new(pk_type).with_param("primary_key", "true"),
6973			);
6974			meta
6975		}
6976
6977		/// Helper: build a `ForeignKeyField`-style FieldState whose
6978		/// `fk_target` (and optionally `fk_target_app`) drive the
6979		/// resolver.
6980		fn fk_field_state(target_model: &str, target_app: Option<&str>) -> FieldState {
6981			let mut fs = FieldState::new("owner_id", FieldType::Uuid, false);
6982			fs.params
6983				.insert("fk_target".to_string(), target_model.to_string());
6984			if let Some(app) = target_app {
6985				fs.params
6986					.insert("fk_target_app".to_string(), app.to_string());
6987			}
6988			fs
6989		}
6990
6991		#[test]
6992		fn qualified_hit_resolves_to_target_pk_type() {
6993			// Arrange
6994			let registry = ModelRegistry::new();
6995			registry.register_model(target_model(
6996				"auth",
6997				"User",
6998				"auth_user",
6999				FieldType::BigInteger,
7000			));
7001			let fs = fk_field_state("User", Some("auth"));
7002
7003			// Act
7004			let resolved = resolve_foreign_key_column_type_with(&fs, &registry);
7005
7006			// Assert: qualified lookup hits and returns the target's PK type.
7007			assert_eq!(resolved, Some(FieldType::BigInteger));
7008		}
7009
7010		#[test]
7011		fn qualified_miss_falls_back_to_by_name_when_unambiguous() {
7012			// Arrange: target registered under a different app than the
7013			// macro emitted (simulates the `use`-import edge case).
7014			let registry = ModelRegistry::new();
7015			registry.register_model(target_model(
7016				"reinhardt_auth",
7017				"User",
7018				"auth_user",
7019				FieldType::Uuid,
7020			));
7021			// Macro emitted the current crate's app, which is wrong here.
7022			let fs = fk_field_state("User", Some("blog"));
7023
7024			// Act
7025			let resolved = resolve_foreign_key_column_type_with(&fs, &registry);
7026
7027			// Assert: by-name fallback resolves to the only registered
7028			// `User` model, preserving the pre-#4436 resolution path.
7029			assert_eq!(resolved, Some(FieldType::Uuid));
7030		}
7031
7032		#[test]
7033		fn ambiguous_by_name_returns_none() {
7034			// Arrange: two apps register the same model name.
7035			let registry = ModelRegistry::new();
7036			registry.register_model(target_model(
7037				"auth",
7038				"User",
7039				"auth_user",
7040				FieldType::BigInteger,
7041			));
7042			registry.register_model(target_model(
7043				"billing",
7044				"User",
7045				"billing_user",
7046				FieldType::Uuid,
7047			));
7048			// No `fk_target_app` -> straight to by-name lookup.
7049			let fs = fk_field_state("User", None);
7050
7051			// Act
7052			let resolved = resolve_foreign_key_column_type_with(&fs, &registry);
7053
7054			// Assert: conservative `None` rather than silently picking
7055			// one of the two `User` models.
7056			assert_eq!(resolved, None);
7057		}
7058
7059		#[test]
7060		fn path_typed_disambiguates_ambiguous_name() {
7061			// Arrange: two apps register `User`. The user wrote a
7062			// path-typed FK target (`ForeignKeyField<reinhardt_auth::User>`),
7063			// so the macro emits `fk_target_app="reinhardt_auth"` —
7064			// trusted as a user-explicit qualifier. The resolver must
7065			// use it to pick the correct `User`, not the unrelated
7066			// `blog.User`.
7067			let registry = ModelRegistry::new();
7068			registry.register_model(target_model(
7069				"blog",
7070				"User",
7071				"blog_user",
7072				FieldType::BigInteger,
7073			));
7074			registry.register_model(target_model(
7075				"reinhardt_auth",
7076				"User",
7077				"reinhardt_auth_user",
7078				FieldType::Uuid,
7079			));
7080			let fs = fk_field_state("User", Some("reinhardt_auth"));
7081
7082			// Act
7083			let resolved = resolve_foreign_key_column_type_with(&fs, &registry);
7084
7085			// Assert: qualified hit picks `reinhardt_auth.User`
7086			// (FieldType::Uuid), not `blog.User` (FieldType::BigInteger).
7087			assert_eq!(resolved, Some(FieldType::Uuid));
7088		}
7089
7090		#[test]
7091		fn qualified_miss_with_ambiguous_by_name_returns_none() {
7092			// Arrange: qualified lookup misses AND the by-name fallback
7093			// is itself ambiguous. The resolver must still refuse to
7094			// guess.
7095			let registry = ModelRegistry::new();
7096			registry.register_model(target_model(
7097				"auth",
7098				"User",
7099				"auth_user",
7100				FieldType::BigInteger,
7101			));
7102			registry.register_model(target_model(
7103				"billing",
7104				"User",
7105				"billing_user",
7106				FieldType::Uuid,
7107			));
7108			let fs = fk_field_state("User", Some("blog")); // misses; falls back; ambiguous.
7109
7110			// Act
7111			let resolved = resolve_foreign_key_column_type_with(&fs, &registry);
7112
7113			// Assert
7114			assert_eq!(resolved, None);
7115		}
7116
7117		#[test]
7118		fn no_fk_target_param_returns_none() {
7119			// Arrange: a non-FK field has no `fk_target` param.
7120			let registry = ModelRegistry::new();
7121			registry.register_model(target_model(
7122				"auth",
7123				"User",
7124				"auth_user",
7125				FieldType::BigInteger,
7126			));
7127			let fs = FieldState::new("name", FieldType::VarChar(64), false);
7128
7129			// Act
7130			let resolved = resolve_foreign_key_column_type_with(&fs, &registry);
7131
7132			// Assert
7133			assert_eq!(resolved, None);
7134		}
7135	}
7136}