reinhardt-db 0.1.1

Django-style database layer for Reinhardt framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
1263
1264
1265
1266
1267
1268
1269
1270
1271
1272
1273
1274
1275
1276
1277
1278
1279
1280
1281
1282
1283
1284
1285
1286
1287
1288
1289
1290
1291
1292
1293
1294
1295
1296
1297
1298
1299
1300
1301
1302
1303
1304
1305
1306
1307
1308
1309
1310
1311
1312
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323
1324
1325
1326
1327
1328
1329
1330
1331
1332
1333
1334
1335
1336
1337
1338
1339
1340
1341
1342
1343
1344
1345
1346
1347
1348
1349
1350
1351
1352
1353
1354
1355
1356
1357
1358
1359
1360
1361
1362
1363
1364
1365
1366
1367
1368
1369
1370
1371
1372
1373
1374
1375
1376
1377
1378
1379
1380
1381
1382
1383
1384
1385
1386
1387
1388
1389
1390
1391
1392
1393
1394
1395
1396
1397
1398
1399
1400
1401
1402
1403
1404
1405
1406
1407
1408
1409
1410
1411
1412
1413
1414
1415
1416
1417
1418
1419
1420
1421
1422
1423
1424
1425
1426
1427
1428
1429
1430
1431
1432
1433
1434
1435
1436
1437
1438
1439
1440
1441
1442
1443
1444
1445
1446
1447
1448
1449
1450
1451
1452
1453
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473
1474
1475
1476
1477
1478
1479
1480
1481
1482
1483
1484
1485
1486
1487
1488
1489
1490
1491
1492
1493
1494
1495
1496
1497
1498
1499
1500
1501
1502
1503
1504
1505
1506
1507
1508
1509
1510
1511
1512
1513
1514
1515
1516
1517
1518
1519
1520
1521
1522
1523
1524
1525
1526
1527
1528
1529
1530
1531
1532
1533
1534
1535
1536
1537
1538
1539
1540
1541
1542
1543
1544
1545
1546
1547
1548
1549
1550
1551
1552
1553
1554
1555
1556
1557
1558
1559
1560
1561
1562
1563
1564
1565
1566
1567
1568
1569
1570
1571
1572
1573
1574
1575
1576
1577
1578
1579
1580
1581
1582
1583
1584
1585
1586
1587
1588
1589
1590
1591
1592
1593
1594
1595
1596
1597
1598
1599
1600
1601
1602
1603
1604
1605
1606
1607
1608
1609
1610
1611
1612
1613
1614
1615
1616
1617
1618
1619
1620
1621
1622
1623
1624
1625
1626
1627
1628
1629
1630
1631
1632
1633
1634
1635
1636
1637
1638
1639
1640
1641
1642
1643
1644
1645
1646
1647
1648
1649
1650
1651
1652
1653
1654
1655
1656
1657
1658
1659
1660
1661
1662
1663
1664
1665
1666
1667
1668
1669
1670
1671
1672
1673
1674
1675
1676
1677
1678
1679
1680
1681
1682
1683
1684
1685
1686
1687
1688
1689
1690
1691
1692
1693
1694
1695
1696
1697
1698
1699
1700
1701
1702
1703
1704
1705
1706
1707
1708
1709
1710
1711
1712
1713
1714
1715
1716
1717
1718
1719
1720
1721
1722
1723
1724
1725
1726
1727
1728
1729
1730
1731
1732
1733
1734
1735
1736
1737
1738
1739
1740
1741
1742
1743
1744
1745
1746
1747
1748
1749
1750
1751
1752
1753
1754
1755
1756
1757
1758
1759
1760
1761
1762
1763
1764
1765
1766
1767
1768
1769
1770
1771
1772
1773
1774
1775
1776
1777
1778
1779
1780
1781
1782
1783
1784
1785
1786
1787
1788
1789
1790
1791
1792
1793
1794
1795
1796
1797
1798
1799
1800
1801
1802
1803
1804
1805
1806
1807
1808
1809
1810
1811
1812
1813
1814
1815
1816
1817
1818
1819
1820
1821
1822
1823
1824
1825
1826
1827
1828
1829
1830
1831
1832
1833
1834
1835
1836
1837
1838
1839
1840
1841
1842
1843
1844
1845
1846
1847
1848
1849
1850
1851
1852
1853
1854
1855
1856
1857
1858
1859
1860
1861
1862
1863
1864
1865
1866
1867
1868
1869
1870
1871
1872
1873
1874
1875
1876
1877
1878
1879
1880
1881
1882
1883
1884
1885
1886
1887
1888
1889
1890
1891
1892
1893
1894
1895
1896
1897
1898
1899
1900
1901
1902
1903
1904
1905
1906
1907
1908
1909
1910
1911
1912
1913
1914
1915
1916
1917
1918
1919
1920
1921
1922
1923
1924
1925
1926
1927
1928
1929
1930
1931
1932
1933
1934
1935
1936
1937
1938
1939
1940
1941
1942
1943
1944
1945
1946
1947
1948
1949
1950
1951
1952
1953
1954
1955
1956
1957
1958
1959
1960
1961
1962
1963
1964
1965
1966
1967
1968
1969
1970
1971
1972
1973
1974
1975
1976
1977
1978
1979
1980
1981
1982
1983
1984
1985
1986
1987
1988
1989
1990
1991
1992
1993
1994
1995
1996
1997
1998
1999
2000
2001
2002
2003
2004
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
2089
2090
2091
2092
2093
2094
2095
2096
2097
2098
2099
2100
2101
2102
2103
2104
2105
2106
2107
2108
2109
2110
2111
2112
2113
2114
2115
2116
2117
2118
2119
2120
2121
2122
2123
2124
2125
2126
2127
2128
2129
2130
2131
2132
2133
2134
2135
2136
2137
2138
2139
2140
2141
2142
2143
2144
2145
2146
2147
2148
2149
2150
2151
2152
2153
2154
2155
2156
2157
2158
2159
2160
2161
2162
2163
2164
2165
2166
2167
2168
//! Migration executor
//!
//! Translated from Django's db/migrations/executor.py

// Allow unused_imports: ForeignKeyAction is used in database-specific code
// that may be conditionally compiled based on feature flags
#[allow(unused_imports)]
use super::{
	DatabaseMigrationRecorder, ForeignKeyAction, Migration, MigrationError, MigrationPlan,
	MigrationService, Operation, Result, SchemaEditor, operations::SqlDialect,
};
use crate::backends::{connection::DatabaseConnection, types::DatabaseType};
use indexmap::IndexMap;
use std::collections::HashSet;

#[cfg(feature = "sqlite")]
use super::introspection::SQLiteIntrospector;

/// Split SQL string into individual statements while handling:
/// - String literals (single/double quotes)
/// - Comments (line and block)
/// - PostgreSQL dollar-quotes ($$...$$)
fn split_sql_statements(sql: &str) -> Vec<String> {
	let mut statements = Vec::new();
	let mut current = String::new();
	let mut chars = sql.chars().peekable();

	#[derive(Debug, PartialEq)]
	enum State {
		Normal,
		SingleQuote,
		DoubleQuote,
		LineComment,
		BlockComment,
		DollarQuote(String),
	}

	let mut state = State::Normal;

	while let Some(ch) = chars.next() {
		match state {
			State::Normal => {
				if ch == '\'' {
					current.push(ch);
					state = State::SingleQuote;
				} else if ch == '"' {
					current.push(ch);
					state = State::DoubleQuote;
				} else if ch == '-' && chars.peek() == Some(&'-') {
					current.push(ch);
					current.push(chars.next().unwrap());
					state = State::LineComment;
				} else if ch == '/' && chars.peek() == Some(&'*') {
					current.push(ch);
					current.push(chars.next().unwrap());
					state = State::BlockComment;
				} else if ch == '$' {
					// Potential dollar-quote start
					let mut tag = String::from("$");
					current.push(ch);

					// Collect tag until next $
					while let Some(&next_ch) = chars.peek() {
						if next_ch == '$' {
							tag.push(chars.next().unwrap());
							current.push('$');
							state = State::DollarQuote(tag);
							break;
						} else if next_ch.is_alphanumeric() || next_ch == '_' {
							tag.push(chars.next().unwrap());
							current.push(next_ch);
						} else {
							// Not a valid dollar-quote tag
							break;
						}
					}
				} else if ch == ';' {
					// Statement separator - save current statement if non-empty
					let trimmed = current.trim();
					if !trimmed.is_empty() {
						statements.push(trimmed.to_string());
					}
					current.clear();
				} else {
					current.push(ch);
				}
			}
			State::SingleQuote => {
				current.push(ch);
				if ch == '\'' {
					// Check for escaped quote ''
					if chars.peek() == Some(&'\'') {
						current.push(chars.next().unwrap());
					} else {
						state = State::Normal;
					}
				} else if ch == '\\' && chars.peek().is_some() {
					// Escaped character
					current.push(chars.next().unwrap());
				}
			}
			State::DoubleQuote => {
				current.push(ch);
				if ch == '"' {
					state = State::Normal;
				} else if ch == '\\' && chars.peek().is_some() {
					// Escaped character
					current.push(chars.next().unwrap());
				}
			}
			State::LineComment => {
				current.push(ch);
				if ch == '\n' {
					state = State::Normal;
				}
			}
			State::BlockComment => {
				current.push(ch);
				if ch == '*' && chars.peek() == Some(&'/') {
					current.push(chars.next().unwrap());
					state = State::Normal;
				}
			}
			State::DollarQuote(ref tag) => {
				current.push(ch);
				// Check if we're at the closing tag
				if ch == '$' {
					let mut potential_close = String::from("$");
					let mut temp_chars = vec![];

					// Collect potential closing tag
					while let Some(&next_ch) = chars.peek() {
						if next_ch == '$' {
							potential_close.push(chars.next().unwrap());
							temp_chars.push('$');
							break;
						} else if potential_close.len() < tag.len()
							&& (next_ch.is_alphanumeric() || next_ch == '_')
						{
							potential_close.push(chars.next().unwrap());
							temp_chars.push(next_ch);
						} else {
							break;
						}
					}

					// Add collected characters to current
					for temp_ch in &temp_chars {
						current.push(*temp_ch);
					}

					// Check if it matches the opening tag
					if potential_close == *tag {
						state = State::Normal;
					}
				}
			}
		}
	}

	// Add final statement if non-empty
	let trimmed = current.trim();
	if !trimmed.is_empty() {
		statements.push(trimmed.to_string());
	}

	statements
}

#[derive(Debug)]
/// Represents a execution result.
pub struct ExecutionResult {
	/// The applied.
	pub applied: Vec<String>,
	/// The failed.
	pub failed: Option<String>,
}

/// Migration executor using DatabaseConnection (supports multiple database types)
pub struct DatabaseMigrationExecutor {
	connection: DatabaseConnection,
	recorder: DatabaseMigrationRecorder,
	db_type: DatabaseType,
}

impl DatabaseMigrationExecutor {
	/// Create a new migration executor with DatabaseConnection
	///
	/// The database type is automatically detected from the connection.
	///
	/// # Examples
	///
	/// ```rust,ignore
	/// use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
	/// use reinhardt_db::backends::DatabaseConnection;
	///
	/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
	/// // Example: connecting to a PostgreSQL database
	/// let db = DatabaseConnection::connect_postgres("postgres://localhost/mydb").await.unwrap();
	/// let executor = DatabaseMigrationExecutor::new(db.clone());
	/// // Database type is automatically detected as PostgreSQL
	/// # });
	/// ```
	pub fn new(connection: DatabaseConnection) -> Self {
		let db_type = connection.database_type();
		let recorder = DatabaseMigrationRecorder::new(connection.clone());
		Self {
			connection,
			recorder,
			db_type,
		}
	}

	/// Get a reference to the database connection
	pub fn connection(&self) -> &DatabaseConnection {
		&self.connection
	}

	/// Get the database type
	pub fn database_type(&self) -> DatabaseType {
		self.db_type
	}

	/// Check if a table exists in the database
	///
	/// # Examples
	///
	/// ```ignore
	/// use reinhardt_db::migrations::executor::DatabaseMigrationExecutor;
	/// use reinhardt_db::backends::DatabaseConnection;
	///
	/// # async fn example() {
	/// let db = DatabaseConnection::connect_postgres("postgres://localhost/mydb").await.unwrap();
	/// let executor = DatabaseMigrationExecutor::new(db);
	/// let exists = executor.table_exists("users").await.unwrap();
	/// # }
	/// ```
	async fn table_exists(&self, table_name: &str) -> Result<bool> {
		use reinhardt_query::prelude::{
			Alias, Cond, Expr, ExprTrait, MySqlQueryBuilder, PostgresQueryBuilder, Query,
			QueryStatementBuilder, SqliteQueryBuilder,
		};

		match self.db_type {
			DatabaseType::Postgres => {
				// Build parameterized query using reinhardt-query
				let subquery = Query::select()
					.expr(Expr::asterisk())
					.from((Alias::new("information_schema"), Alias::new("tables")))
					.cond_where(
						Cond::all()
							.add(Expr::col(Alias::new("table_schema")).eq("public"))
							.add(Expr::col(Alias::new("table_name")).eq(table_name)),
					)
					.to_owned();

				let query_str = format!(
					"SELECT EXISTS ({})",
					subquery.to_string(PostgresQueryBuilder)
				);

				// For PostgreSQL, EXISTS returns a boolean value
				let result = self.connection.fetch_one(&query_str, vec![]).await?;
				match result.data.get("exists") {
					Some(crate::backends::types::QueryValue::Bool(b)) => Ok(*b),
					_ => Ok(false),
				}
			}
			DatabaseType::Sqlite => {
				// Build parameterized query using reinhardt-query
				let query = Query::select()
					.column(Alias::new("name"))
					.from(Alias::new("sqlite_master"))
					.cond_where(
						Cond::all()
							.add(Expr::col(Alias::new("type")).eq("table"))
							.add(Expr::col(Alias::new("name")).eq(table_name)),
					)
					.to_owned();

				let query_str = query.to_string(SqliteQueryBuilder);

				// For SQLite, check if any row is returned
				let result = self.connection.fetch_optional(&query_str, vec![]).await?;
				Ok(result.is_some())
			}
			DatabaseType::Mysql => {
				// Build parameterized query using reinhardt-query
				let query = Query::select()
					.column(Alias::new("TABLE_NAME"))
					.from((Alias::new("information_schema"), Alias::new("tables")))
					.cond_where(
						Cond::all()
							.add(Expr::col(Alias::new("table_schema")).eq(Expr::cust("DATABASE()")))
							.add(Expr::col(Alias::new("table_name")).eq(table_name)),
					)
					.to_owned();

				let query_str = query.to_string(MySqlQueryBuilder);

				// For MySQL, check if any row is returned
				let result = self.connection.fetch_optional(&query_str, vec![]).await?;
				Ok(result.is_some())
			}
		}
	}

	/// Performs the apply migrations operation.
	pub async fn apply_migrations(&mut self, migrations: &[Migration]) -> Result<ExecutionResult> {
		let mut applied = Vec::new();

		// Ensure the migration recorder table exists
		self.recorder.ensure_schema_table().await?;

		// Build MigrationGraph for dependency resolution
		let mut graph = super::graph::MigrationGraph::new();

		for migration in migrations {
			let key = super::graph::MigrationKey::new(
				migration.app_label.clone(),
				migration.name.clone(),
			);
			let deps: Vec<super::graph::MigrationKey> = migration
				.dependencies
				.iter()
				.map(|(app, name)| super::graph::MigrationKey::new(app.clone(), name.clone()))
				.collect();

			graph.add_migration(key, deps);
		}

		// Perform topological sort (automatically detects circular dependencies)
		let sorted_keys = graph.topological_sort()?;

		// Apply migrations in dependency-resolved order
		for key in sorted_keys {
			// Find the migration corresponding to this key
			let migration = migrations
				.iter()
				.find(|m| m.app_label == key.app_label && m.name == key.name)
				.ok_or_else(|| {
					MigrationError::DependencyError(format!("Migration not found: {}", key.id()))
				})?;

			// Check if already applied
			if self
				.recorder
				.is_applied(&migration.app_label, &migration.name)
				.await?
			{
				continue;
			}

			// Apply migration operations
			self.apply_migration(migration).await?;

			// Record migration as applied
			self.recorder
				.record_applied(&migration.app_label, &migration.name)
				.await?;

			applied.push(migration.id());
		}

		Ok(ExecutionResult {
			applied,
			failed: None,
		})
	}

	/// Rollback (unapply) a list of migrations
	///
	/// Migrations are rolled back in reverse order (newest first).
	///
	/// # Examples
	///
	/// ```no_run
	/// use reinhardt_db::migrations::{Migration, executor::DatabaseMigrationExecutor};
	/// use reinhardt_db::backends::DatabaseConnection;
	///
	/// # async fn example() {
	/// let connection = DatabaseConnection::connect_postgres("postgres://localhost/mydb").await.unwrap();
	/// let mut executor = DatabaseMigrationExecutor::new(connection);
	///
	/// let migrations = vec![Migration::new("0001_initial", "myapp")];
	/// let result = executor.rollback_migrations(&migrations).await.unwrap();
	/// # }
	/// # tokio::runtime::Runtime::new().unwrap().block_on(example());
	/// ```
	pub async fn rollback_migrations(
		&mut self,
		migrations: &[Migration],
	) -> Result<ExecutionResult> {
		let mut rolledback = Vec::new();

		// Ensure the migration recorder table exists
		self.recorder.ensure_schema_table().await?;

		// Process migrations in reverse order (newest first)
		for migration in migrations.iter().rev() {
			// Check if migration is actually applied
			let is_applied = self
				.recorder
				.is_applied(&migration.app_label, &migration.name)
				.await?;

			if !is_applied {
				continue;
			}

			// Rollback the migration
			self.rollback_migration(migration).await?;

			// Remove from recorder
			self.recorder
				.unapply(&migration.app_label, &migration.name)
				.await?;

			rolledback.push(migration.id());
		}

		Ok(ExecutionResult {
			applied: rolledback,
			failed: None,
		})
	}

	/// Rollback a single migration
	async fn rollback_migration(&mut self, migration: &Migration) -> Result<()> {
		// Skip database operations if state_only flag is set
		if migration.state_only {
			tracing::debug!(
				"Skipping database operations for migration '{}' (state_only=true)",
				migration.id()
			);
			return Ok(());
		}

		// Determine SQL dialect
		let dialect = match self.connection.database_type() {
			crate::backends::types::DatabaseType::Postgres => SqlDialect::Postgres,
			crate::backends::types::DatabaseType::Mysql => SqlDialect::Mysql,
			crate::backends::types::DatabaseType::Sqlite => SqlDialect::Sqlite,
		};

		// Create SchemaEditor for atomic operations
		let mut editor = SchemaEditor::new(
			self.connection.clone(),
			migration.atomic,
			self.connection.database_type(),
		)
		.await?;

		// Process operations in reverse order
		let project_state = super::ProjectState::default();

		for operation in migration.operations.iter().rev() {
			// Check if SQLite and reverse operation requires recreation
			#[cfg(feature = "sqlite")]
			if matches!(dialect, SqlDialect::Sqlite)
				&& operation.reverse_requires_sqlite_recreation()
			{
				// Get the reverse operation and use table recreation
				if let Some(reverse_op) = operation.to_reverse_operation(&project_state)? {
					tracing::debug!("=== SQLite Recreation for reverse of {:?} ===", operation);
					self.handle_sqlite_recreation(&reverse_op, &mut editor)
						.await?;
					tracing::debug!("✅ SQLite recreation for reverse operation completed");
					continue;
				} else {
					tracing::warn!(
						"Cannot generate reverse operation for SQLite recreation: {:?}",
						operation
					);
					// Fall through to standard SQL execution
				}
			}

			// Standard reverse SQL execution
			let reverse_sql = operation.to_reverse_sql(&dialect, &project_state)?;

			if let Some(sql) = reverse_sql {
				tracing::debug!("=== Reverse SQL for {:?} ===", operation);
				tracing::debug!("{}", sql);

				// Execute reverse SQL using SchemaEditor
				editor.execute(&sql).await?;

				tracing::debug!("✅ Reverse operation executed successfully");
			} else {
				tracing::warn!(
					"No reverse SQL available for operation in migration '{}': {:?}",
					migration.id(),
					operation
				);
			}
		}

		// Commit SchemaEditor changes
		editor.finish().await?;

		Ok(())
	}

	/// Apply a single migration with atomic transaction support
	///
	/// If the migration's `atomic` flag is true and the database supports
	/// transactional DDL (PostgreSQL, SQLite), all operations are wrapped
	/// in a transaction that can be rolled back on failure.
	///
	/// For databases that don't support transactional DDL (MySQL), operations
	/// are executed directly without transaction wrapping, and a warning is logged.
	async fn apply_migration(&self, migration: &Migration) -> Result<()> {
		// Skip database operations if state_only flag is set
		// (Django's SeparateDatabaseAndState equivalent with state_operations only)
		if migration.state_only {
			tracing::debug!(
				"Skipping database operations for migration '{}' (state_only=true)",
				migration.id()
			);
			return Ok(());
		}

		let dialect = match self.db_type {
			DatabaseType::Postgres => SqlDialect::Postgres,
			DatabaseType::Sqlite => SqlDialect::Sqlite,
			DatabaseType::Mysql => SqlDialect::Mysql,
		};

		// Create schema editor with atomic support based on migration's atomic flag
		let mut editor =
			SchemaEditor::new(self.connection.clone(), migration.atomic, self.db_type).await?;

		// Log if database_only flag is set
		// Note: ProjectState tracking during migration execution is a planned enhancement.
		// Currently, state is not tracked during apply_migration. For rollback operations,
		// use to_reverse_sql with a pre-operation ProjectState snapshot.
		if migration.database_only {
			tracing::debug!(
				"Skipping ProjectState updates for migration '{}' (database_only=true)",
				migration.id()
			);
		}

		tracing::debug!(
			"Applying migration '{}' (atomic={}, effective_atomic={})",
			migration.id(),
			migration.atomic,
			editor.is_atomic()
		);

		// Execute operations through schema editor
		for operation in &migration.operations {
			// Handle SQLite table recreation for incompatible operations
			#[cfg(feature = "sqlite")]
			if matches!(dialect, SqlDialect::Sqlite) && operation.requires_sqlite_recreation() {
				self.handle_sqlite_recreation(operation, &mut editor)
					.await?;
				continue;
			}

			// Check if this is a CreateTable operation and if the table already
			// exists. The check MUST run through the schema editor (and thus
			// through the editor's open transaction, if any) — otherwise the
			// pool may pick a different physical connection whose schema cache
			// is stale w.r.t. DDL committed earlier in this same migration.
			//
			// On SQLite, an introspection read on a separate pool connection
			// after a DDL on the transaction's connection raises SQLITE_SCHEMA
			// (code 262, "database schema is locked") because the schema
			// cookie has been bumped on the writer but the reader's prepared
			// statement cache is stale. See reinhardt-web#4584.
			if let Operation::CreateTable { name, .. } = operation {
				let table_exists = editor.table_exists(name).await?;
				if table_exists {
					tracing::info!(
						"Table '{}' already exists, skipping CREATE TABLE operation",
						name
					);
					continue;
				}
			}

			let sql = operation.to_sql(&dialect);

			tracing::debug!(
				"Executing migration SQL (length={}, semicolons={})",
				sql.len(),
				sql.matches(';').count()
			);

			// Split SQL into individual statements to handle PostgreSQL's
			// prepared statement limitation (cannot execute multiple commands)
			let statements = split_sql_statements(&sql);

			tracing::debug!("Split into {} statements", statements.len());

			for (i, statement) in statements.iter().enumerate() {
				if !statement.trim().is_empty() {
					tracing::debug!(
						"Statement {} (length: {} chars): {}",
						i + 1,
						statement.len(),
						&statement[..statement.len().min(100)]
					);

					editor.execute(statement).await.map_err(|e| {
						tracing::error!(
							"Migration operation failed: {}. SQL: {}",
							e,
							&statement[..statement.len().min(200)]
						);
						e
					})?;

					tracing::debug!("Statement {} executed successfully", i + 1);
				}
			}
		}

		// Finish (commits if atomic)
		editor.finish().await?;

		tracing::debug!("Migration '{}' applied successfully", migration.id());

		Ok(())
	}

	/// Apply migrations from a MigrationPlan
	///
	/// # Examples
	///
	/// ```no_run
	/// use reinhardt_db::migrations::{MigrationPlan, executor::DatabaseMigrationExecutor};
	/// use reinhardt_db::backends::DatabaseConnection;
	///
	/// # tokio::runtime::Runtime::new().unwrap().block_on(async {
	/// // Example: connecting to a PostgreSQL database
	/// let db = DatabaseConnection::connect_postgres("postgres://localhost/mydb").await.unwrap();
	/// let mut executor = DatabaseMigrationExecutor::new(db);
	///
	/// let plan = MigrationPlan::new();
	/// let result = executor.apply(&plan).await.unwrap();
	/// # });
	/// ```
	pub async fn apply(&mut self, plan: &MigrationPlan) -> Result<ExecutionResult> {
		let mut applied = Vec::new();

		// Ensure the migration recorder table exists
		self.recorder.ensure_schema_table().await?;

		let dialect = match self.db_type {
			DatabaseType::Postgres => SqlDialect::Postgres,
			DatabaseType::Sqlite => SqlDialect::Sqlite,
			DatabaseType::Mysql => SqlDialect::Mysql,
		};

		for migration in &plan.migrations {
			// Check if already applied
			if self
				.recorder
				.is_applied(&migration.app_label, &migration.name)
				.await?
			{
				continue;
			}

			// Apply migration
			for operation in &migration.operations {
				// Check if this is a CreateTable operation and if the table already exists
				if let Operation::CreateTable { name, .. } = operation {
					let table_exists = self.table_exists(name).await?;
					if table_exists {
						eprintln!(
							"⏭️  Table '{}' already exists, skipping CREATE TABLE operation",
							name
						);
						continue;
					}
				}

				let sql = operation.to_sql(&dialect);

				// Split SQL into individual statements to handle PostgreSQL's
				// prepared statement limitation (cannot execute multiple commands)
				for statement in split_sql_statements(&sql) {
					if !statement.trim().is_empty() {
						self.connection.execute(&statement, vec![]).await?;
					}
				}
			}

			// Record migration as applied
			self.recorder
				.record_applied(&migration.app_label, &migration.name)
				.await?;

			applied.push(migration.id());
		}

		Ok(ExecutionResult {
			applied,
			failed: None,
		})
	}

	/// Build migration plan - returns list of migrations to apply
	///
	/// Returns (app_label, migration_name) tuples in dependency order
	// Allow dead_code: public API for migration CLI tooling to preview pending migrations
	#[allow(dead_code)]
	pub async fn build_plan(&self, service: &MigrationService) -> Result<Vec<(String, String)>> {
		let graph = service.build_dependency_graph().await?;
		let mut plan = Vec::new();

		for migration in graph {
			let is_applied = self
				.recorder
				.is_applied(&migration.app_label, &migration.name)
				.await?;

			if !is_applied {
				plan.push((migration.app_label.to_string(), migration.name.to_string()));
			}
		}

		Ok(plan)
	}

	/// Read columns + constraints for a SQLite table using the SchemaEditor's
	/// currently-open transaction (if any), falling back to the live pool when
	/// the editor is non-atomic.
	///
	/// This exists because `SQLiteIntrospector` issues every query through the
	/// underlying `sqlx::SqlitePool`, which transparently picks a *different*
	/// physical connection than the one holding the editor's open transaction.
	/// That second connection cannot see uncommitted DDL — so a recreation
	/// triggered later in the same migration would rebuild the table from a
	/// stale column list and silently discard the just-`ALTER`'d column.
	/// See reinhardt-web#4447.
	#[cfg(feature = "sqlite")]
	async fn read_sqlite_table_via_editor(
		editor: &mut SchemaEditor,
		table_name: &str,
	) -> Result<(Vec<super::ColumnDefinition>, Vec<super::Constraint>)> {
		// 1. PRAGMA table_info(<table>) → columns. Identifier interpolation
		//    via the shared `sqlite_pragma` helper. See issue #4454.
		// nosemgrep: rust.actix.sql.sqlx-taint.sqlx-taint
		let table_info_sql = format!(
			"PRAGMA table_info({})",
			super::sqlite_pragma::quote_pragma_identifier(table_name)
		);
		let info_rows = editor.fetch_all(&table_info_sql, vec![]).await?;

		// Collect rows into typed records first so we can detect AUTOINCREMENT.
		struct ColRow {
			name: String,
			type_str: String,
			notnull: i64,
			default: Option<String>,
			pk: i64,
		}
		let mut col_rows: Vec<ColRow> = Vec::with_capacity(info_rows.len());
		for row in &info_rows {
			let name: String = row
				.get("name")
				.map_err(|e| MigrationError::IntrospectionError(format!("table_info name: {e}")))?;
			let type_str: String = row.get("type").unwrap_or_default();
			let notnull: i64 = row.get("notnull").unwrap_or(0);
			let default: Option<String> = row.get("dflt_value").ok();
			let pk: i64 = row.get("pk").unwrap_or(0);
			col_rows.push(ColRow {
				name,
				type_str,
				notnull,
				default,
				pk,
			});
		}

		// 2. CREATE TABLE SQL → detect AUTOINCREMENT and parse named
		//    constraint metadata (FK names + CHECK constraints).
		let create_sql_row = editor
			.fetch_optional(
				"SELECT sql FROM sqlite_master WHERE type='table' AND name=?",
				vec![table_name.into()],
			)
			.await?;
		let create_sql: Option<String> = create_sql_row.and_then(|r| r.get("sql").ok());
		let has_autoincrement = create_sql
			.as_ref()
			.map(|sql| sql.to_uppercase().contains("AUTOINCREMENT"))
			.unwrap_or(false);

		// 3. Build ColumnDefinition list, mirroring the introspector's
		//    semantics (PK columns are implicitly NOT NULL; AUTOINCREMENT is
		//    only meaningful on PK columns).
		let mut columns: Vec<super::ColumnDefinition> = col_rows
			.iter()
			.map(|c| {
				let is_pk = c.pk > 0;
				let is_auto = is_pk && has_autoincrement;
				let nullable = if is_pk { false } else { c.notnull == 0 };
				// Preserve `dflt_value` verbatim as the raw SQL fragment
				// (e.g. `'pending'` including surrounding quotes). The
				// downstream `format!("DEFAULT {}", default)` paths in
				// `operations.rs` then emit valid DDL (`DEFAULT 'pending'`,
				// not the previously broken `DEFAULT pending`). See
				// `super::sqlite_pragma` and issue #4454.
				let default = c
					.default
					.as_ref()
					.map(|v| super::sqlite_pragma::normalize_default_value(v));
				super::ColumnDefinition {
					name: c.name.clone(),
					type_definition: SQLiteIntrospector::parse_sqlite_type(&c.type_str),
					not_null: !nullable,
					unique: false,
					primary_key: is_pk,
					auto_increment: is_auto,
					default,
				}
			})
			.collect();

		// Preserve the introspector's ordering: PK first, then by name. This
		// matters because `SqliteTableRecreation` uses column order to emit
		// the new CREATE TABLE and the INSERT SELECT.
		columns.sort_by(|a, b| {
			if a.primary_key && !b.primary_key {
				std::cmp::Ordering::Less
			} else if !a.primary_key && b.primary_key {
				std::cmp::Ordering::Greater
			} else {
				a.name.cmp(&b.name)
			}
		});

		// 4. Foreign keys via PRAGMA foreign_key_list(<table>), grouped by
		//    id. Identifier interpolation via the shared `sqlite_pragma`
		//    helper. See issue #4454.
		// nosemgrep: rust.actix.sql.sqlx-taint.sqlx-taint
		let fk_sql = format!(
			"PRAGMA foreign_key_list({})",
			super::sqlite_pragma::quote_pragma_identifier(table_name)
		);
		let fk_rows = editor.fetch_all(&fk_sql, vec![]).await?;

		struct FkRow {
			id: i64,
			seq: i64,
			table: String,
			from: String,
			to: String,
			on_update: String,
			on_delete: String,
		}
		let parsed_fks: Vec<FkRow> = fk_rows
			.iter()
			.map(|row| FkRow {
				id: row.get("id").unwrap_or(0),
				seq: row.get("seq").unwrap_or(0),
				table: row.get("table").unwrap_or_default(),
				from: row.get("from").unwrap_or_default(),
				to: row.get("to").unwrap_or_default(),
				on_update: row.get("on_update").unwrap_or_default(),
				on_delete: row.get("on_delete").unwrap_or_default(),
			})
			.collect();

		let named_fks = create_sql
			.as_ref()
			.map(|sql| SQLiteIntrospector::parse_fk_constraint_names(sql))
			.unwrap_or_default();

		let mut fk_groups: std::collections::HashMap<i64, Vec<FkRow>> =
			std::collections::HashMap::new();
		for r in parsed_fks {
			fk_groups.entry(r.id).or_default().push(r);
		}

		fn fk_action(s: &str) -> super::ForeignKeyAction {
			match s {
				"CASCADE" => super::ForeignKeyAction::Cascade,
				"SET NULL" => super::ForeignKeyAction::SetNull,
				"SET DEFAULT" => super::ForeignKeyAction::SetDefault,
				"NO ACTION" => super::ForeignKeyAction::NoAction,
				_ => super::ForeignKeyAction::Restrict,
			}
		}

		let mut constraints: Vec<super::Constraint> = Vec::new();
		for (fk_id, mut group) in fk_groups {
			group.sort_by_key(|r| r.seq);
			let referenced_table = group[0].table.clone();
			let columns_from: Vec<String> = group.iter().map(|r| r.from.clone()).collect();
			let columns_to: Vec<String> = group.iter().map(|r| r.to.clone()).collect();
			let signature = (columns_from.clone(), referenced_table.clone());
			let name = named_fks
				.get(&signature)
				.cloned()
				.unwrap_or_else(|| format!("fk_{}_{}", table_name, fk_id));
			constraints.push(super::Constraint::ForeignKey {
				name,
				columns: columns_from,
				referenced_table,
				referenced_columns: columns_to,
				on_delete: fk_action(&group[0].on_delete),
				on_update: fk_action(&group[0].on_update),
				deferrable: None,
			});
		}

		// 5. Unique constraints via PRAGMA index_list / index_info where
		//    origin = 'u' (i.e. declared with the UNIQUE keyword or as a
		//    named CONSTRAINT … UNIQUE). Identifier interpolation via the
		//    shared `sqlite_pragma` helper. See issue #4454.
		// nosemgrep: rust.actix.sql.sqlx-taint.sqlx-taint
		let idx_list_sql = format!(
			"PRAGMA index_list({})",
			super::sqlite_pragma::quote_pragma_identifier(table_name)
		);
		let idx_rows = editor.fetch_all(&idx_list_sql, vec![]).await?;
		for row in &idx_rows {
			let origin: String = row.get("origin").unwrap_or_default();
			let unique: i64 = row.get("unique").unwrap_or(0);
			if origin != "u" || unique != 1 {
				continue;
			}
			let idx_name: String = row.get("name").unwrap_or_default();
			// nosemgrep: rust.actix.sql.sqlx-taint.sqlx-taint
			let info_sql = format!(
				"PRAGMA index_info({})",
				super::sqlite_pragma::quote_pragma_identifier(&idx_name)
			);
			let info_rows = editor.fetch_all(&info_sql, vec![]).await?;
			let cols: Vec<String> = info_rows
				.iter()
				.filter_map(|r| r.get::<String>("name").ok())
				.collect();
			constraints.push(super::Constraint::Unique {
				name: idx_name,
				columns: cols,
			});
		}

		// 6. CHECK constraints parsed from CREATE TABLE SQL (SQLite has no
		//    PRAGMA for these).
		if let Some(ref sql) = create_sql {
			for (idx, check) in SQLiteIntrospector::parse_check_constraints(sql)?
				.into_iter()
				.enumerate()
			{
				constraints.push(super::Constraint::Check {
					name: check.name.unwrap_or_else(|| format!("check_{}", idx)),
					expression: check.expression,
				});
			}
		}

		Ok((columns, constraints))
	}

	/// Handle SQLite table recreation for operations that require it
	///
	/// SQLite has limited ALTER TABLE support. Operations like DropColumn and AlterColumn
	/// require table recreation (CREATE new table → COPY data → DROP old → RENAME).
	///
	/// This method handles foreign key constraints by:
	/// 1. Disabling FK checks before recreation
	/// 2. Executing the table recreation
	/// 3. Re-enabling FK checks
	/// 4. Checking for FK integrity violations
	#[cfg(feature = "sqlite")]
	async fn handle_sqlite_recreation(
		&self,
		operation: &Operation,
		editor: &mut SchemaEditor,
	) -> Result<()> {
		use super::operations::SqliteTableRecreation;

		// Disable foreign key checks before table recreation
		// This prevents FK violations during the temporary DROP TABLE phase
		editor.disable_foreign_keys().await?;

		// Build the recreation plan based on operation type.
		//
		// Critical: introspection must run via the editor's open transaction so
		// that DDL applied earlier in the same migration (e.g. a preceding
		// `AddColumn`) is visible. Reading via the pool would land on a
		// different connection that cannot see the uncommitted schema and
		// would silently rebuild the table from a stale column set — the
		// root cause of reinhardt-web#4447.
		let recreation = match operation {
			Operation::DropColumn { table, column } => {
				tracing::debug!(
					"Handling SQLite table recreation for DropColumn: table={}, column={}",
					table,
					column
				);
				let (columns, constraints) =
					Self::read_sqlite_table_via_editor(editor, table).await?;
				SqliteTableRecreation::for_drop_column(table, columns, column, constraints)
			}
			Operation::AlterColumn {
				table,
				column,
				new_definition,
				..
			} => {
				tracing::debug!(
					"Handling SQLite table recreation for AlterColumn: table={}, column={}",
					table,
					column
				);
				let (columns, constraints) =
					Self::read_sqlite_table_via_editor(editor, table).await?;
				SqliteTableRecreation::for_alter_column(
					table,
					columns,
					column,
					new_definition.clone(),
					constraints,
				)
			}
			Operation::AddConstraint {
				table,
				constraint_sql,
			} => {
				tracing::debug!(
					"Handling SQLite table recreation for AddConstraint: table={}",
					table
				);
				let (columns, constraints) =
					Self::read_sqlite_table_via_editor(editor, table).await?;
				SqliteTableRecreation::for_add_constraint(
					table,
					columns,
					constraints,
					constraint_sql.clone(),
				)
			}
			Operation::DropConstraint {
				table,
				constraint_name,
			} => {
				tracing::debug!(
					"Handling SQLite table recreation for DropConstraint: table={}, constraint={}",
					table,
					constraint_name
				);
				let (columns, constraints) =
					Self::read_sqlite_table_via_editor(editor, table).await?;
				SqliteTableRecreation::for_drop_constraint(
					table,
					columns,
					constraints,
					constraint_name,
				)
			}
			_ => {
				// This branch should not be reached if requires_sqlite_recreation() is correct
				tracing::warn!(
					"Operation {:?} was passed to handle_sqlite_recreation but is not handled. \
					Attempting to execute as-is, which may fail.",
					std::mem::discriminant(operation)
				);
				// Re-enable FK checks and fall back to normal SQL execution
				editor.enable_foreign_keys().await?;
				let sql = operation.to_sql(&super::operations::SqlDialect::Sqlite);
				editor.execute(&sql).await?;
				return Ok(());
			}
		};

		// Execute recreation steps
		for stmt in recreation.to_sql_statements() {
			tracing::debug!("Executing recreation SQL: {}", &stmt[..stmt.len().min(100)]);
			editor.execute(&stmt).await?;
		}

		// Re-enable foreign key checks
		editor.enable_foreign_keys().await?;

		// Check for FK integrity violations (logs warning if any found)
		let violations = editor.check_foreign_key_integrity().await?;
		if !violations.is_empty() {
			return Err(MigrationError::ForeignKeyViolation(format!(
				"Foreign key violations detected after table recreation: {}",
				violations.join("; ")
			)));
		}

		tracing::debug!(
			"SQLite table recreation completed for {:?}",
			std::mem::discriminant(operation)
		);

		Ok(())
	}

	/// Record a migration as applied without actually running it
	pub async fn record_migration(&mut self, app_label: &str, migration_name: &str) -> Result<()> {
		self.recorder
			.record_applied(app_label, migration_name)
			.await?;
		Ok(())
	}

	/// Execute a migration by loading it from the service
	// Allow dead_code: public API for programmatic single-migration execution
	#[allow(dead_code)]
	pub async fn execute_migration(
		&mut self,
		app_label: &str,
		migration_name: &str,
		service: &MigrationService,
	) -> Result<()> {
		let migration = service.load_migration(app_label, migration_name).await?;

		// Apply operations
		self.apply_migration(&migration).await?;

		// Record as applied
		self.recorder
			.record_applied(&migration.app_label, &migration.name)
			.await?;

		Ok(())
	}
}

/// Operation optimizer for migration execution
///
/// Reorders and optimizes operations for better performance and safety.
///
/// # Example
///
/// ```rust
/// use reinhardt_db::migrations::executor::OperationOptimizer;
/// use reinhardt_db::migrations::{Operation, ColumnDefinition, FieldType};
///
/// let ops = vec![
///     Operation::AddColumn {
///         table: "users".to_string(),
///         column: ColumnDefinition::new("name", FieldType::VarChar(100)),
///         mysql_options: None,
///     },
///     Operation::CreateTable {
///         name: "users".to_string(),
///         columns: vec![],
///         constraints: vec![],
///         without_rowid: None,
///         interleave_in_parent: None,
///         partition: None,
///     },
/// ];
///
/// let optimizer = OperationOptimizer::new();
/// let optimized = optimizer.optimize(ops);
/// // CreateTable should come before AddColumn
/// ```
pub struct OperationOptimizer {
	_private: (),
}

impl OperationOptimizer {
	/// Create a new operation optimizer
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_db::migrations::executor::OperationOptimizer;
	///
	/// let optimizer = OperationOptimizer::new();
	/// ```
	pub fn new() -> Self {
		Self { _private: () }
	}

	/// Optimize and reorder operations
	///
	/// # Example
	///
	/// ```rust
	/// use reinhardt_db::migrations::executor::OperationOptimizer;
	/// use reinhardt_db::migrations::{Operation, ColumnDefinition};
	///
	/// let ops = vec![
	///     Operation::CreateTable {
	///         name: "users".to_string(),
	///         columns: vec![],
	///         constraints: vec![],
	///         without_rowid: None,
	///         interleave_in_parent: None,
	///         partition: None,
	///     },
	/// ];
	///
	/// let optimizer = OperationOptimizer::new();
	/// let optimized = optimizer.optimize(ops);
	/// assert_eq!(optimized.len(), 1);
	/// ```
	pub fn optimize(&self, operations: Vec<Operation>) -> Vec<Operation> {
		let mut optimized = operations;

		// Step 1: Reorder operations by dependency
		optimized = self.reorder_by_dependency(optimized);

		// Step 2: Group similar operations
		optimized = self.group_similar_operations(optimized);

		// Step 3: Remove redundant operations
		optimized = self.remove_redundant_operations(optimized);

		optimized
	}

	/// Reorder operations to respect dependencies
	fn reorder_by_dependency(&self, operations: Vec<Operation>) -> Vec<Operation> {
		let mut ordered = Vec::new();
		let mut remaining = operations;
		let mut created_tables = HashSet::new();

		// Priority order:
		// 1. CreateTable (sorted by foreign key dependencies)
		// 2. AddColumn
		// 3. AlterColumn
		// 4. CreateIndex
		// 5. AddConstraint
		// 6. RunSQL
		// 7. RenameColumn
		// 8. DropColumn
		// 9. DropTable

		// First pass: Create tables (respecting foreign key dependencies)
		// Extract all CreateTable operations
		let mut create_table_ops = Vec::new();
		let mut i = 0;
		while i < remaining.len() {
			if matches!(&remaining[i], Operation::CreateTable { .. }) {
				create_table_ops.push(remaining.remove(i));
			} else {
				i += 1;
			}
		}

		// Sort CreateTable operations by dependencies using topological sort
		while !create_table_ops.is_empty() {
			let mut found_independent = false;

			for i in 0..create_table_ops.len() {
				if let Operation::CreateTable {
					name, constraints, ..
				} = &create_table_ops[i]
				{
					// Extract foreign key references from constraints
					let mut depends_on_uncreated = false;
					for constraint in constraints {
						if let Some(referenced_table) =
							self.extract_foreign_key_reference(constraint)
						{
							// Check if the referenced table has been created
							if !created_tables.contains(&referenced_table)
								&& referenced_table != *name
							{
								depends_on_uncreated = true;
								break;
							}
						}
					}

					// If this table doesn't depend on any uncreated table, we can create it now
					if !depends_on_uncreated {
						// Clone the name before removing the operation
						let name_copy = name.clone();
						let op = create_table_ops.remove(i);
						created_tables.insert(name_copy);
						ordered.push(op);
						found_independent = true;
						break;
					}
				}
			}

			// If we couldn't find any independent table, just add the remaining tables
			// (this handles circular dependencies or malformed constraints)
			if !found_independent {
				for op in create_table_ops.drain(..) {
					if let Operation::CreateTable { ref name, .. } = op {
						created_tables.insert(name.clone());
					}
					ordered.push(op);
				}
				break;
			}
		}

		// Second pass: Add columns (for all tables)
		i = 0;
		while i < remaining.len() {
			if let Operation::AddColumn { .. } = &remaining[i] {
				ordered.push(remaining.remove(i));
			} else {
				i += 1;
			}
		}

		// Third pass: Other operations
		ordered.extend(remaining);

		ordered
	}

	/// Extract the referenced table name from a FOREIGN KEY constraint
	/// Returns the referenced table name if the constraint is a ForeignKey
	fn extract_foreign_key_reference(&self, constraint: &super::Constraint) -> Option<String> {
		match constraint {
			super::Constraint::ForeignKey {
				referenced_table, ..
			} => Some(referenced_table.clone()),
			_ => None,
		}
	}

	/// Extract constraint name from SQL definition
	fn extract_constraint_name(&self, constraint_sql: &str) -> Option<String> {
		let trimmed = constraint_sql.trim();

		// Check if starts with "CONSTRAINT"
		if !trimmed.starts_with("CONSTRAINT") {
			return None;
		}

		// Skip "CONSTRAINT" and whitespace
		let after_keyword = trimmed["CONSTRAINT".len()..].trim_start();

		// Extract identifier (alphanumeric + underscore)
		let name: String = after_keyword
			.chars()
			.take_while(|c| c.is_alphanumeric() || *c == '_')
			.collect();

		if name.is_empty() { None } else { Some(name) }
	}

	/// Group similar operations together
	fn group_similar_operations(&self, operations: Vec<Operation>) -> Vec<Operation> {
		let mut by_table: IndexMap<String, Vec<Operation>> = IndexMap::new();
		let mut create_ops = Vec::new();
		let mut other_ops = Vec::new();

		for op in operations {
			match &op {
				Operation::CreateTable { .. } => {
					// CreateTable operations go first
					create_ops.push(op);
				}
				Operation::AddColumn { table, .. }
				| Operation::DropColumn { table, .. }
				| Operation::AlterColumn { table, .. } => {
					by_table.entry(table.to_string()).or_default().push(op);
				}
				_ => {
					other_ops.push(op);
				}
			}
		}

		let mut grouped = Vec::new();

		// Add create table operations first
		grouped.extend(create_ops);

		// Add table-specific operations grouped by table
		for (_, ops) in by_table {
			grouped.extend(ops);
		}

		// Add other operations
		grouped.extend(other_ops);

		grouped
	}

	/// Remove redundant operations by detecting cancellations and merging similar operations
	fn remove_redundant_operations(&self, operations: Vec<Operation>) -> Vec<Operation> {
		let mut optimized = Vec::new();
		let mut removed_indices = HashSet::new();

		// Pass 1: Detect and remove operation cancellations
		for i in 0..operations.len() {
			if removed_indices.contains(&i) {
				continue;
			}

			let op = &operations[i];
			let mut found_cancellation = false;

			// Search forward for cancelling operations
			for (j, next_op) in operations.iter().enumerate().skip(i + 1) {
				if removed_indices.contains(&j) {
					continue;
				}

				// Check for cancellation patterns
				let cancels = match (op, next_op) {
					// CreateTable + DropTable
					(
						Operation::CreateTable { name: n1, .. },
						Operation::DropTable { name: n2 },
					) if n1 == n2 => true,
					// AddColumn + DropColumn
					(
						Operation::AddColumn {
							table: t1,
							column: col1,
							..
						},
						Operation::DropColumn {
							table: t2,
							column: col2,
						},
					) if t1 == t2 && col1.name == *col2 => true,
					// CreateIndex + DropIndex
					(
						Operation::CreateIndex {
							table: t1,
							columns: c1,
							..
						},
						Operation::DropIndex {
							table: t2,
							columns: c2,
						},
					) if t1 == t2 && c1 == c2 => true,
					// AddConstraint + DropConstraint
					(
						Operation::AddConstraint {
							table: t1,
							constraint_sql,
						},
						Operation::DropConstraint {
							table: t2,
							constraint_name,
						},
					) if t1 == t2 => {
						// Try to extract constraint name from SQL for exact matching
						if let Some(extracted_name) = self.extract_constraint_name(constraint_sql) {
							// Perfect match: compare extracted name with drop target
							extracted_name == *constraint_name
						} else {
							// Fallback: approximate match by table only
							true
						}
					}
					_ => false,
				};

				if cancels {
					removed_indices.insert(i);
					removed_indices.insert(j);
					found_cancellation = true;
					break;
				}
			}

			if !found_cancellation {
				optimized.push(op.clone());
			}
		}

		// Pass 1.5: Remove duplicate CreateTable operations (keep last occurrence)
		let mut deduped = Vec::new();
		let mut create_table_map: IndexMap<String, Operation> = IndexMap::new();

		for operation in optimized {
			match &operation {
				Operation::CreateTable { name, .. } => {
					// Last CreateTable for same table wins
					create_table_map.insert(name.to_string(), operation.clone());
				}
				_ => {
					// Flush accumulated CreateTable operations before non-CreateTable operation
					for (_, create_op) in create_table_map.drain(..) {
						deduped.push(create_op);
					}
					deduped.push(operation);
				}
			}
		}

		// Flush remaining CreateTable operations
		for (_, create_op) in create_table_map {
			deduped.push(create_op);
		}

		// Pass 2: Merge consecutive AlterColumn operations on same column
		let mut merged = Vec::new();
		let mut alter_column_map: IndexMap<(String, String), Operation> = IndexMap::new();

		for operation in deduped {
			match &operation {
				Operation::AlterColumn {
					table,
					column,
					new_definition: _,
					..
				} => {
					let key = (table.to_string(), column.to_string());
					// Last AlterColumn wins (overwrites previous)
					alter_column_map.insert(key, operation.clone());
				}
				_ => {
					// Flush accumulated AlterColumn operations before non-AlterColumn operation
					for (_, alter_op) in alter_column_map.drain(..) {
						merged.push(alter_op);
					}
					merged.push(operation);
				}
			}
		}

		// Flush remaining AlterColumn operations
		for (_, alter_op) in alter_column_map {
			merged.push(alter_op);
		}

		// Pass 3: Chain consecutive RenameTable operations
		let mut chained = Vec::new();
		let mut rename_chain: IndexMap<String, String> = IndexMap::new(); // original_name -> current_name

		for operation in merged {
			match &operation {
				Operation::RenameTable { old_name, new_name } => {
					// Find if any existing chain ends with this old_name
					let mut found_chain = None;
					for (original, current) in &rename_chain {
						if current == old_name {
							found_chain = Some(original.clone());
							break;
						}
					}

					if let Some(original) = found_chain {
						// Extend existing chain: original -> new_name
						rename_chain.insert(original, new_name.clone());
					} else {
						// Start new chain: old_name -> new_name
						rename_chain.insert(old_name.clone(), new_name.clone());
					}
				}
				_ => {
					// Flush accumulated RenameTable chains before non-RenameTable operation
					for (original_name, final_name) in rename_chain.drain(..) {
						chained.push(Operation::RenameTable {
							old_name: original_name,
							new_name: final_name,
						});
					}
					chained.push(operation);
				}
			}
		}

		// Flush remaining RenameTable chains
		for (original_name, final_name) in rename_chain {
			chained.push(Operation::RenameTable {
				old_name: original_name,
				new_name: final_name,
			});
		}

		chained
	}
}

impl Default for OperationOptimizer {
	fn default() -> Self {
		Self::new()
	}
}

#[cfg(test)]
mod optimizer_tests {
	use super::*;
	use crate::migrations::{ColumnDefinition, FieldType};

	#[test]
	fn test_optimizer_creation() {
		let optimizer = OperationOptimizer::new();
		let ops = vec![];
		let optimized = optimizer.optimize(ops);
		assert_eq!(optimized.len(), 0);
	}

	#[test]
	fn test_reorder_create_before_add() {
		let optimizer = OperationOptimizer::new();

		let ops = vec![
			Operation::AddColumn {
				table: "users".to_string(),
				column: ColumnDefinition::new("name", FieldType::VarChar(100)),
				mysql_options: None,
			},
			Operation::CreateTable {
				name: "users".to_string(),
				columns: vec![],
				constraints: vec![],
				without_rowid: None,
				partition: None,
				interleave_in_parent: None,
			},
		];

		let optimized = optimizer.optimize(ops);

		// CreateTable should come before AddColumn
		assert!(matches!(optimized[0], Operation::CreateTable { .. }));
		assert!(matches!(optimized[1], Operation::AddColumn { .. }));
	}

	#[test]
	fn test_remove_duplicate_create_table() {
		let optimizer = OperationOptimizer::new();

		let ops = vec![
			Operation::CreateTable {
				name: "users".to_string(),
				columns: vec![],
				constraints: vec![],
				without_rowid: None,
				partition: None,
				interleave_in_parent: None,
			},
			Operation::CreateTable {
				name: "users".to_string(),
				columns: vec![],
				constraints: vec![],
				without_rowid: None,
				partition: None,
				interleave_in_parent: None,
			},
		];

		let optimized = optimizer.optimize(ops);
		assert_eq!(optimized.len(), 1);
	}

	#[test]
	fn test_group_operations_by_table() {
		let optimizer = OperationOptimizer::new();

		let ops = vec![
			Operation::AddColumn {
				table: "users".to_string(),
				column: ColumnDefinition::new("name", FieldType::VarChar(100)),
				mysql_options: None,
			},
			Operation::CreateTable {
				name: "posts".to_string(),
				columns: vec![],
				constraints: vec![],
				without_rowid: None,
				partition: None,
				interleave_in_parent: None,
			},
			Operation::AddColumn {
				table: "users".to_string(),
				column: ColumnDefinition::new("email", FieldType::VarChar(255)),
				mysql_options: None,
			},
		];

		let optimized = optimizer.optimize(ops);
		assert_eq!(optimized.len(), 3);
	}

	#[cfg(test)]
	mod split_sql_tests {
		use crate::migrations::executor::split_sql_statements;

		#[test]
		fn test_split_simple_statements() {
			let sql = "CREATE TABLE t1 (id INT); CREATE TABLE t2 (id INT);";
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert_eq!(result[0], "CREATE TABLE t1 (id INT)");
			assert_eq!(result[1], "CREATE TABLE t2 (id INT)");
		}

		#[test]
		fn test_split_with_string_literals() {
			let sql = r#"INSERT INTO t VALUES ('a;b'); INSERT INTO t VALUES ('c;d');"#;
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert_eq!(result[0], "INSERT INTO t VALUES ('a;b')");
			assert_eq!(result[1], "INSERT INTO t VALUES ('c;d')");
		}

		#[test]
		fn test_split_with_line_comments() {
			// Line comment after semicolon becomes part of next statement
			let sql =
				"CREATE TABLE t1 (id INT); -- comment; with semicolon\nCREATE TABLE t2 (id INT);";
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert_eq!(result[0], "CREATE TABLE t1 (id INT)");
			assert!(result[1].contains("-- comment"));
			assert!(result[1].contains("CREATE TABLE t2"));
		}

		#[test]
		fn test_split_with_block_comments() {
			let sql =
				"CREATE TABLE t1 (id INT); /* comment; with semicolon */ CREATE TABLE t2 (id INT);";
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert!(result[0].contains("CREATE TABLE t1"));
			assert!(result[1].contains("CREATE TABLE t2"));
		}

		#[test]
		fn test_split_with_dollar_quotes() {
			let sql = r#"CREATE FUNCTION f() RETURNS text AS $$SELECT 'value; with semicolon';$$ LANGUAGE sql; CREATE TABLE t1 (id INT);"#;
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert!(result[0].contains("CREATE FUNCTION"));
			assert!(result[0].contains("value; with semicolon"));
			assert!(result[1].contains("CREATE TABLE t1"));
		}

		#[test]
		fn test_split_with_escaped_quotes() {
			let sql = r#"INSERT INTO t VALUES ('it''s a test; value'); INSERT INTO t VALUES ('another');"#;
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert!(result[0].contains("it''s a test; value"));
			assert!(result[1].contains("another"));
		}

		#[test]
		fn test_split_empty_statements() {
			let sql = ";;; CREATE TABLE t1 (id INT); ;";
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 1);
			assert_eq!(result[0], "CREATE TABLE t1 (id INT)");
		}

		#[test]
		fn test_split_no_semicolon() {
			let sql = "CREATE TABLE t1 (id INT)";
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 1);
			assert_eq!(result[0], "CREATE TABLE t1 (id INT)");
		}

		#[test]
		fn test_split_whitespace_handling() {
			let sql = "  CREATE TABLE t1 (id INT)  ;  \n\n  CREATE TABLE t2 (id INT)  ;  ";
			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);
			assert_eq!(result[0], "CREATE TABLE t1 (id INT)");
			assert_eq!(result[1], "CREATE TABLE t2 (id INT)");
		}

		#[test]
		fn test_split_reinhardt_query_migration_sql() {
			// Actual SQL generated by reinhardt-query for polls migration (from diagnostic test)
			let sql = r###"CREATE TABLE "questions_table" ( "id" bigint GENERATED BY DEFAULT AS IDENTITY NOT NULL PRIMARY KEY, "question_text" text NOT NULL, "pub_date" timestamp with time zone NOT NULL );

CREATE TABLE "choices_table" ( "id" bigint GENERATED BY DEFAULT AS IDENTITY NOT NULL PRIMARY KEY, "question_id" bigint NOT NULL, "choice_text" text NOT NULL, "votes" integer NOT NULL DEFAULT 0, FOREIGN KEY ("question_id") REFERENCES "questions_table" ("id") ON DELETE CASCADE );"###;
			let result = split_sql_statements(sql);

			// Should split into exactly 2 statements (not 3 with empty string)
			assert_eq!(
				result.len(),
				2,
				"Expected 2 statements, got {}",
				result.len()
			);

			// First statement should be CREATE TABLE questions_table
			assert!(
				result[0].contains("questions_table"),
				"First statement should contain 'questions_table'"
			);
			assert!(
				result[0].contains("question_text"),
				"First statement should contain 'question_text'"
			);
			assert!(
				!result[0].contains("choices_table"),
				"First statement should not contain 'choices_table'"
			);

			// Second statement should be CREATE TABLE choices_table
			assert!(
				result[1].contains("choices_table"),
				"Second statement should contain 'choices_table'"
			);
			assert!(
				result[1].contains("choice_text"),
				"Second statement should contain 'choice_text'"
			);
			// Verify reference to questions_table, as FOREIGN KEY constraint contains referenced table name
			assert!(
				result[1].contains("FOREIGN KEY"),
				"Second statement should contain FOREIGN KEY constraint"
			);
			assert!(
				result[1].contains("REFERENCES \"questions_table\""),
				"FOREIGN KEY should reference questions_table"
			);
		}

		#[test]
		fn test_split_multiple_foreign_keys() {
			// Case where table has multiple FOREIGN KEY constraints
			let sql = r###"CREATE TABLE "posts" ("id" bigint PRIMARY KEY);
CREATE TABLE "users" ("id" bigint PRIMARY KEY);
CREATE TABLE "comments" (
	"id" bigint PRIMARY KEY,
	"post_id" bigint,
	"user_id" bigint,
	FOREIGN KEY ("post_id") REFERENCES "posts" ("id"),
	FOREIGN KEY ("user_id") REFERENCES "users" ("id")
);"###;

			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 3, "Expected 3 statements");

			// Third statement contains 2 FOREIGN KEY constraints
			assert_eq!(
				result[2].matches("FOREIGN KEY").count(),
				2,
				"Third statement should contain 2 FOREIGN KEY constraints"
			);
			assert!(
				result[2].contains("REFERENCES \"posts\""),
				"Should reference posts table"
			);
			assert!(
				result[2].contains("REFERENCES \"users\""),
				"Should reference users table"
			);
		}

		#[test]
		fn test_split_mixed_constraints() {
			// Case with mixed CHECK constraint and FOREIGN KEY
			let sql = r###"CREATE TABLE "tasks" ("id" bigint PRIMARY KEY);
CREATE TABLE "task_status" (
	"id" bigint PRIMARY KEY,
	"task_id" bigint,
	"status" text CHECK (status IN ('pending', 'completed')),
	FOREIGN KEY ("task_id") REFERENCES "tasks" ("id")
);"###;

			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 2);

			// Second statement contains both CHECK constraint and FOREIGN KEY constraint
			assert!(
				result[1].contains("CHECK"),
				"Second statement should contain CHECK constraint"
			);
			assert!(
				result[1].contains("FOREIGN KEY"),
				"Second statement should contain FOREIGN KEY constraint"
			);
		}

		#[test]
		fn test_split_self_referencing_foreign_key() {
			// Case with self-referencing FOREIGN KEY
			let sql = r###"CREATE TABLE "categories" (
	"id" bigint PRIMARY KEY,
	"parent_id" bigint,
	FOREIGN KEY ("parent_id") REFERENCES "categories" ("id")
);"###;

			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 1);

			// FOREIGN KEY referencing the same table
			assert!(
				result[0].contains("REFERENCES \"categories\""),
				"Should self-reference categories table"
			);
		}

		#[test]
		fn test_split_create_index_statements() {
			// Splitting CREATE INDEX statements
			let sql = r###"CREATE TABLE "products" ("id" bigint PRIMARY KEY, "name" text);
CREATE INDEX "idx_products_name" ON "products" ("name");
CREATE UNIQUE INDEX "idx_products_id" ON "products" ("id");"###;

			let result = split_sql_statements(sql);
			assert_eq!(result.len(), 3);

			assert!(
				result[0].contains("CREATE TABLE"),
				"First statement should be CREATE TABLE"
			);
			assert!(
				result[1].contains("CREATE INDEX"),
				"Second statement should be CREATE INDEX"
			);
			assert!(
				result[2].contains("CREATE UNIQUE INDEX"),
				"Third statement should be CREATE UNIQUE INDEX"
			);
		}
	}
}

#[cfg(all(test, feature = "sqlite"))]
mod rollback_orchestration_tests {
	//! In-crate tests for [`DatabaseMigrationExecutor::rollback_migrations`] and
	//! the private [`DatabaseMigrationExecutor::rollback_migration`].
	//!
	//! These exercise the orchestration layer — reverse iteration of the input
	//! slice, recorder synchronisation, the `state_only` short-circuit, and the
	//! warn-and-continue behaviour for `RunSQL` operations that lack a
	//! `reverse_sql`. Per-`Operation` SQL generation is already covered by the
	//! `to_reverse_sql` tests in `operations.rs`; this module focuses on what
	//! sits above those primitives in `executor.rs`.
	//!
	//! A real SQLite `:memory:` connection is used rather than test doubles
	//! because [`DatabaseConnection`], [`DatabaseMigrationRecorder`], and
	//! [`SchemaEditor`] are concrete types with no trait abstraction today.
	//! `:memory:` is the lightest available substitute — sub-second per case,
	//! no external services, and gated behind the `sqlite` feature that the
	//! rollback path itself already references.
	//!
	//! Not covered here (require failure injection, which needs a trait
	//! refactor to land first): non-atomic partial-rollback bookkeeping when
	//! an operation fails mid-run, atomic-mode rollback-on-failure, and
	//! foreign-key violation behaviour during rollback. Those paths remain
	//! exercised by the container-backed suite in
	//! `tests/integration/tests/migrations/migration_rollback_integration.rs`.

	use super::*;
	use crate::backends::DatabaseConnection;
	use crate::migrations::recorder::DatabaseMigrationRecorder;
	use crate::migrations::{ColumnDefinition, FieldType, Migration};
	use rstest::*;

	/// Open a fresh SQLite `:memory:` database and wrap it in a
	/// [`DatabaseMigrationExecutor`]. Each call returns an isolated database.
	async fn make_executor() -> DatabaseMigrationExecutor {
		let connection = DatabaseConnection::connect_sqlite("sqlite::memory:")
			.await
			.expect("failed to open sqlite :memory: connection");
		DatabaseMigrationExecutor::new(connection)
	}

	/// Build a single-operation `CreateTable` migration with one integer
	/// primary key column.
	fn make_create_table_migration(name: &str, table: &str) -> Migration {
		let mut migration = Migration::new(name, "rolltest");
		migration.operations.push(Operation::CreateTable {
			name: table.to_string(),
			columns: vec![ColumnDefinition::new("id", FieldType::Integer)],
			constraints: vec![],
			without_rowid: None,
			interleave_in_parent: None,
			partition: None,
		});
		migration
	}

	#[rstest]
	#[tokio::test]
	async fn rollback_with_empty_input_returns_empty_result() {
		// Arrange
		let mut executor = make_executor().await;

		// Act
		let result = executor
			.rollback_migrations(&[])
			.await
			.expect("rollback of empty input should not fail");

		// Assert
		assert!(
			result.applied.is_empty(),
			"no migrations should be reported as rolled back"
		);
		assert!(result.failed.is_none(), "no failure should be reported");
	}

	#[rstest]
	#[tokio::test]
	async fn rollback_iterates_input_slice_in_reverse_order() {
		// Arrange - apply three independent migrations.
		let m1 = make_create_table_migration("0001_a", "rolltest_a");
		let m2 = make_create_table_migration("0002_b", "rolltest_b");
		let m3 = make_create_table_migration("0003_c", "rolltest_c");
		let mut executor = make_executor().await;
		executor
			.apply_migrations(&[m1.clone(), m2.clone(), m3.clone()])
			.await
			.expect("apply m1..m3");

		// Act - pass the slice in declaration order; rollback must consume in reverse.
		let result = executor
			.rollback_migrations(&[m1.clone(), m2.clone(), m3.clone()])
			.await
			.expect("rollback m1..m3");

		// Assert - newest-first order, independent of how apply_migrations
		// ordered them via the topological sort.
		assert_eq!(
			result.applied,
			vec![m3.id(), m2.id(), m1.id()],
			"rollback_migrations must iterate input in reverse"
		);
	}

	#[rstest]
	#[tokio::test]
	async fn rollback_skips_migrations_that_were_never_applied() {
		// Arrange - apply only m1, leave m2 unrecorded.
		let m1 = make_create_table_migration("0001_first", "rolltest_first");
		let m2 = make_create_table_migration("0002_second", "rolltest_second");
		let mut executor = make_executor().await;
		executor
			.apply_migrations(std::slice::from_ref(&m1))
			.await
			.expect("apply m1");

		// Act - ask to roll both back.
		let result = executor
			.rollback_migrations(&[m1.clone(), m2.clone()])
			.await
			.expect("rollback should succeed even with an unapplied entry");

		// Assert - only the actually-applied migration is reported as rolled back.
		assert_eq!(
			result.applied,
			vec![m1.id()],
			"unapplied migrations must be silently skipped, not rolled back"
		);
	}

	#[rstest]
	#[tokio::test]
	async fn rollback_clears_recorder_state_for_rolled_back_migration() {
		// Arrange
		let migration = make_create_table_migration("0001_only", "rolltest_only");
		let mut executor = make_executor().await;
		executor
			.apply_migrations(std::slice::from_ref(&migration))
			.await
			.expect("apply migration");

		let recorder_before = DatabaseMigrationRecorder::new(executor.connection().clone());
		assert!(
			recorder_before
				.is_applied(&migration.app_label, &migration.name)
				.await
				.expect("query recorder before"),
			"sanity: migration should be applied before rollback"
		);

		// Act
		executor
			.rollback_migrations(std::slice::from_ref(&migration))
			.await
			.expect("rollback");

		// Assert - recorder no longer marks the migration as applied.
		let recorder_after = DatabaseMigrationRecorder::new(executor.connection().clone());
		assert!(
			!recorder_after
				.is_applied(&migration.app_label, &migration.name)
				.await
				.expect("query recorder after"),
			"recorder must report unapplied after successful rollback"
		);
	}

	#[rstest]
	#[tokio::test]
	async fn rollback_with_state_only_flag_skips_schema_changes() {
		// Arrange - apply a normal migration so the table really exists.
		let mut migration = make_create_table_migration("0001_state", "rolltest_state");
		let mut executor = make_executor().await;
		executor
			.apply_migrations(std::slice::from_ref(&migration))
			.await
			.expect("apply migration to create the table");

		// Re-mark the migration as state_only for the rollback request,
		// which must hit the `state_only` short-circuit in
		// `rollback_migration` and skip schema operations entirely.
		migration.state_only = true;

		// Act
		let result = executor
			.rollback_migrations(std::slice::from_ref(&migration))
			.await
			.expect("state_only rollback should succeed without DB ops");

		// Assert - rollback is recorded, but the table is intentionally still
		// present because no DROP TABLE was issued.
		assert_eq!(result.applied, vec![migration.id()]);

		let table_still_present = executor
			.connection()
			.fetch_optional(
				"SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
				vec!["rolltest_state".into()],
			)
			.await
			.expect("introspect sqlite_master")
			.is_some();
		assert!(
			table_still_present,
			"state_only rollback must not execute schema operations"
		);
	}

	#[rstest]
	#[tokio::test]
	async fn rollback_run_sql_without_reverse_sql_completes_without_error() {
		// Pins the current contract of the `Operation::RunSQL` arm in
		// `rollback_migration` when `reverse_sql` is `None`: the rollback
		// logs a warning and continues. It does NOT return an error. If a
		// future change promotes that to a typed error, this test will fail
		// and force the change to be explicit rather than silent.
		let mut migration = Migration::new("0001_run_sql_noop", "rolltest");
		migration.operations.push(Operation::RunSQL {
			sql: "CREATE TABLE rolltest_runsql (id INTEGER PRIMARY KEY)".to_string(),
			reverse_sql: None,
		});

		let mut executor = make_executor().await;
		executor
			.apply_migrations(std::slice::from_ref(&migration))
			.await
			.expect("apply RunSQL migration");

		// Act
		let result = executor
			.rollback_migrations(std::slice::from_ref(&migration))
			.await
			.expect("rollback should succeed even when reverse_sql is missing");

		// Assert - the rollback is reported in `result.applied` and the
		// recorder is cleared even though no reverse SQL ran (the
		// warn-and-skip contract still updates bookkeeping).
		assert_eq!(result.applied, vec![migration.id()]);

		let recorder = DatabaseMigrationRecorder::new(executor.connection().clone());
		assert!(
			!recorder
				.is_applied(&migration.app_label, &migration.name)
				.await
				.expect("query recorder after"),
			"recorder must reflect unapplied state after warn-and-skip rollback"
		);
	}
}