sql_splitter/convert/
mod.rs

1//! Convert command for translating SQL dumps between dialects.
2//!
3//! Supports conversion between MySQL, PostgreSQL, and SQLite dialects with:
4//! - Identifier quoting conversion (backticks ↔ double quotes)
5//! - String escape normalization (\' ↔ '')
6//! - Data type mapping (AUTO_INCREMENT ↔ SERIAL ↔ INTEGER PRIMARY KEY)
7//! - COPY FROM stdin → INSERT conversion
8//! - Session header conversion
9//! - Warning system for unsupported features
10
11mod copy_to_insert;
12mod types;
13mod warnings;
14
15#[allow(unused_imports)]
16pub use copy_to_insert::{
17    copy_to_inserts, parse_copy_data, parse_copy_header, CopyHeader, CopyValue,
18};
19
20use crate::parser::{Parser, SqlDialect, StatementType};
21use crate::splitter::Compression;
22use indicatif::{ProgressBar, ProgressStyle};
23use std::fs::File;
24use std::io::{BufWriter, Read, Write};
25use std::path::PathBuf;
26
27pub use types::TypeMapper;
28pub use warnings::{ConvertWarning, WarningCollector};
29
30/// Configuration for the convert command
31#[derive(Debug)]
32pub struct ConvertConfig {
33    /// Input SQL file
34    pub input: PathBuf,
35    /// Output SQL file (None for stdout)
36    pub output: Option<PathBuf>,
37    /// Source dialect (auto-detected if None)
38    pub from_dialect: Option<SqlDialect>,
39    /// Target dialect
40    pub to_dialect: SqlDialect,
41    /// Dry run mode
42    pub dry_run: bool,
43    /// Show progress
44    pub progress: bool,
45    /// Strict mode (fail on any unsupported feature)
46    pub strict: bool,
47}
48
49impl Default for ConvertConfig {
50    fn default() -> Self {
51        Self {
52            input: PathBuf::new(),
53            output: None,
54            from_dialect: None,
55            to_dialect: SqlDialect::Postgres,
56            dry_run: false,
57            progress: false,
58            strict: false,
59        }
60    }
61}
62
63/// Statistics from convert operation
64#[derive(Debug, Default)]
65pub struct ConvertStats {
66    /// Total statements processed
67    pub statements_processed: u64,
68    /// Statements converted
69    pub statements_converted: u64,
70    /// Statements passed through unchanged
71    pub statements_unchanged: u64,
72    /// Statements skipped (unsupported)
73    pub statements_skipped: u64,
74    /// Warnings generated
75    pub warnings: Vec<ConvertWarning>,
76}
77
/// Main converter that dispatches to specific dialect converters.
///
/// Holds the source/target dialect pair, the warnings gathered so far, and
/// the state needed to pair a COPY header with the data block that follows it.
pub struct Converter {
    /// Dialect the incoming statements are parsed as.
    from: SqlDialect,
    /// Dialect the statements are rewritten for.
    to: SqlDialect,
    /// Collector that receives every warning raised during conversion.
    warnings: WarningCollector,
    /// When true, unsupported features are returned as errors in addition to
    /// being recorded as warnings.
    strict: bool,
    /// Pending COPY header for data block processing
    pending_copy_header: Option<CopyHeader>,
}
87
88impl Converter {
89    pub fn new(from: SqlDialect, to: SqlDialect) -> Self {
90        Self {
91            from,
92            to,
93            warnings: WarningCollector::new(),
94            strict: false,
95            pending_copy_header: None,
96        }
97    }
98
99    pub fn with_strict(mut self, strict: bool) -> Self {
100        self.strict = strict;
101        self
102    }
103
104    /// Check if we have a pending COPY header (waiting for data block)
105    pub fn has_pending_copy(&self) -> bool {
106        self.pending_copy_header.is_some()
107    }
108
109    /// Process a COPY data block using the pending header
110    pub fn process_copy_data(&mut self, data: &[u8]) -> Result<Vec<Vec<u8>>, ConvertWarning> {
111        if let Some(header) = self.pending_copy_header.take() {
112            if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
113                // Convert COPY data to INSERT statements
114                let inserts = copy_to_inserts(&header, data, self.to);
115                return Ok(inserts);
116            }
117        }
118        // Pass through if same dialect or no pending header
119        Ok(vec![data.to_vec()])
120    }
121
122    /// Convert a single statement
123    pub fn convert_statement(&mut self, stmt: &[u8]) -> Result<Vec<u8>, ConvertWarning> {
124        let (stmt_type, table_name) =
125            Parser::<&[u8]>::parse_statement_with_dialect(stmt, self.from);
126
127        let table = if table_name.is_empty() {
128            None
129        } else {
130            Some(table_name.as_str())
131        };
132
133        match stmt_type {
134            StatementType::CreateTable => self.convert_create_table(stmt, table),
135            StatementType::Insert => self.convert_insert(stmt, table),
136            StatementType::CreateIndex => self.convert_create_index(stmt),
137            StatementType::AlterTable => self.convert_alter_table(stmt),
138            StatementType::DropTable => self.convert_drop_table(stmt),
139            StatementType::Copy => self.convert_copy(stmt, table),
140            StatementType::Unknown => self.convert_other(stmt),
141        }
142    }
143
144    /// Convert CREATE TABLE statement
145    fn convert_create_table(
146        &mut self,
147        stmt: &[u8],
148        table_name: Option<&str>,
149    ) -> Result<Vec<u8>, ConvertWarning> {
150        let stmt_str = String::from_utf8_lossy(stmt);
151        let mut result = stmt_str.to_string();
152
153        // Detect unsupported features BEFORE conversion (so we see original types)
154        self.detect_unsupported_features(&result, table_name)?;
155
156        // Convert identifier quoting
157        result = self.convert_identifiers(&result);
158
159        // Convert data types
160        result = self.convert_data_types(&result);
161
162        // Convert AUTO_INCREMENT
163        result = self.convert_auto_increment(&result, table_name);
164
165        // Convert PostgreSQL-specific syntax
166        if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
167            result = self.strip_postgres_casts(&result);
168            result = self.convert_nextval(&result);
169            result = self.convert_default_now(&result);
170            result = self.strip_schema_prefix(&result);
171        }
172
173        // Convert string escapes
174        result = self.convert_string_escapes(&result);
175
176        // Strip MySQL conditional comments
177        result = self.strip_conditional_comments(&result);
178
179        // Convert ENGINE clause
180        result = self.strip_engine_clause(&result);
181
182        // Convert CHARSET/COLLATE
183        result = self.strip_charset_clauses(&result);
184
185        Ok(result.into_bytes())
186    }
187
188    /// Convert INSERT statement
189    fn convert_insert(
190        &mut self,
191        stmt: &[u8],
192        _table_name: Option<&str>,
193    ) -> Result<Vec<u8>, ConvertWarning> {
194        let stmt_str = String::from_utf8_lossy(stmt);
195        let mut result = stmt_str.to_string();
196
197        // Convert identifier quoting
198        result = self.convert_identifiers(&result);
199
200        // Convert PostgreSQL-specific syntax
201        if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
202            result = self.strip_postgres_casts(&result);
203            result = self.strip_schema_prefix(&result);
204        }
205
206        // Convert string escapes (careful with data!)
207        result = self.convert_string_escapes(&result);
208
209        Ok(result.into_bytes())
210    }
211
212    /// Convert CREATE INDEX statement
213    fn convert_create_index(&mut self, stmt: &[u8]) -> Result<Vec<u8>, ConvertWarning> {
214        let stmt_str = String::from_utf8_lossy(stmt);
215        let mut result = stmt_str.to_string();
216
217        // Convert identifier quoting
218        result = self.convert_identifiers(&result);
219
220        // Convert PostgreSQL-specific syntax
221        if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
222            result = self.strip_postgres_casts(&result);
223            result = self.strip_schema_prefix(&result);
224        }
225
226        // Detect FULLTEXT/SPATIAL
227        if result.contains("FULLTEXT") || result.contains("fulltext") {
228            self.warnings.add(ConvertWarning::UnsupportedFeature {
229                feature: "FULLTEXT INDEX".to_string(),
230                suggestion: Some("Use PostgreSQL GIN index or skip".to_string()),
231            });
232            if self.strict {
233                return Err(ConvertWarning::UnsupportedFeature {
234                    feature: "FULLTEXT INDEX".to_string(),
235                    suggestion: None,
236                });
237            }
238        }
239
240        Ok(result.into_bytes())
241    }
242
243    /// Convert ALTER TABLE statement
244    fn convert_alter_table(&mut self, stmt: &[u8]) -> Result<Vec<u8>, ConvertWarning> {
245        let stmt_str = String::from_utf8_lossy(stmt);
246        let mut result = stmt_str.to_string();
247
248        result = self.convert_identifiers(&result);
249        result = self.convert_data_types(&result);
250
251        // Convert PostgreSQL-specific syntax
252        if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
253            result = self.strip_postgres_casts(&result);
254            result = self.convert_nextval(&result);
255            result = self.convert_default_now(&result);
256            result = self.strip_schema_prefix(&result);
257        }
258
259        Ok(result.into_bytes())
260    }
261
262    /// Convert DROP TABLE statement
263    fn convert_drop_table(&mut self, stmt: &[u8]) -> Result<Vec<u8>, ConvertWarning> {
264        let stmt_str = String::from_utf8_lossy(stmt);
265        let mut result = stmt_str.to_string();
266
267        result = self.convert_identifiers(&result);
268
269        // Strip PostgreSQL schema prefix
270        if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
271            result = self.strip_schema_prefix(&result);
272        }
273
274        Ok(result.into_bytes())
275    }
276
277    /// Convert COPY statement (PostgreSQL-specific)
278    ///
279    /// This handles the COPY header. The data block is processed separately
280    /// via process_copy_data() when called from the run() function.
281    fn convert_copy(
282        &mut self,
283        stmt: &[u8],
284        _table_name: Option<&str>,
285    ) -> Result<Vec<u8>, ConvertWarning> {
286        let stmt_str = String::from_utf8_lossy(stmt);
287
288        // Check if this contains "FROM stdin" (COPY header) or is data
289        let upper = stmt_str.to_uppercase();
290        if upper.contains("FROM STDIN") {
291            // This is a COPY header - parse it and store for later
292            if let Some(header) = parse_copy_header(&stmt_str) {
293                if self.from == SqlDialect::Postgres && self.to != SqlDialect::Postgres {
294                    // Store the header, will convert data block in process_copy_data
295                    self.pending_copy_header = Some(header);
296                    // Return empty - the actual INSERT will be generated from data
297                    return Ok(Vec::new());
298                }
299            }
300        }
301
302        // If same dialect or couldn't parse, pass through
303        Ok(stmt.to_vec())
304    }
305
306    /// Convert other statements (comments, session settings, etc.)
307    fn convert_other(&mut self, stmt: &[u8]) -> Result<Vec<u8>, ConvertWarning> {
308        let stmt_str = String::from_utf8_lossy(stmt);
309        let result = stmt_str.to_string();
310        let trimmed = result.trim();
311
312        // Skip MySQL session commands when converting to other dialects
313        if self.from == SqlDialect::MySql
314            && self.to != SqlDialect::MySql
315            && self.is_mysql_session_command(&result)
316        {
317            return Ok(Vec::new()); // Skip
318        }
319
320        // Skip PostgreSQL session commands and unsupported features when converting to other dialects
321        if self.from == SqlDialect::Postgres
322            && self.to != SqlDialect::Postgres
323            && self.is_postgres_session_command(&result)
324        {
325            return Ok(Vec::new()); // Skip
326        }
327        if self.from == SqlDialect::Postgres
328            && self.to != SqlDialect::Postgres
329            && self.is_postgres_only_feature(trimmed)
330        {
331            self.warnings.add(ConvertWarning::SkippedStatement {
332                reason: "PostgreSQL-only feature".to_string(),
333                statement_preview: trimmed.chars().take(60).collect(),
334            });
335            return Ok(Vec::new()); // Skip
336        }
337
338        // Skip SQLite pragmas when converting to other dialects
339        if self.from == SqlDialect::Sqlite
340            && self.to != SqlDialect::Sqlite
341            && self.is_sqlite_pragma(&result)
342        {
343            return Ok(Vec::new()); // Skip
344        }
345
346        // Strip conditional comments
347        if result.contains("/*!") {
348            let stripped = self.strip_conditional_comments(&result);
349            return Ok(stripped.into_bytes());
350        }
351
352        Ok(stmt.to_vec())
353    }
354
355    /// Check if statement is a MySQL session command
356    fn is_mysql_session_command(&self, stmt: &str) -> bool {
357        let upper = stmt.to_uppercase();
358        upper.contains("SET NAMES")
359            || upper.contains("SET CHARACTER")
360            || upper.contains("SET SQL_MODE")
361            || upper.contains("SET TIME_ZONE")
362            || upper.contains("SET FOREIGN_KEY_CHECKS")
363            || upper.contains("LOCK TABLES")
364            || upper.contains("UNLOCK TABLES")
365    }
366
367    /// Check if statement is a PostgreSQL session command or unsupported statement
368    fn is_postgres_session_command(&self, stmt: &str) -> bool {
369        let upper = stmt.to_uppercase();
370        // Session/transaction settings
371        upper.contains("SET CLIENT_ENCODING")
372            || upper.contains("SET STANDARD_CONFORMING_STRINGS")
373            || upper.contains("SET CHECK_FUNCTION_BODIES")
374            || upper.contains("SET SEARCH_PATH")
375            || upper.contains("SET DEFAULT_TABLESPACE")
376            || upper.contains("SET LOCK_TIMEOUT")
377            || upper.contains("SET IDLE_IN_TRANSACTION_SESSION_TIMEOUT")
378            || upper.contains("SET ROW_SECURITY")
379            || upper.contains("SET STATEMENT_TIMEOUT")
380            || upper.contains("SET XMLOPTION")
381            || upper.contains("SET CLIENT_MIN_MESSAGES")
382            || upper.contains("SET DEFAULT_TABLE_ACCESS_METHOD")
383            || upper.contains("SELECT PG_CATALOG")
384            // Ownership/permission statements
385            || upper.contains("OWNER TO")
386            || upper.contains("GRANT ")
387            || upper.contains("REVOKE ")
388    }
389
390    /// Check if statement is a PostgreSQL-only feature that should be skipped
391    fn is_postgres_only_feature(&self, stmt: &str) -> bool {
392        // Strip leading comments to find the actual statement
393        let stripped = self.strip_leading_sql_comments(stmt);
394        let upper = stripped.to_uppercase();
395
396        // These PostgreSQL features have no MySQL/SQLite equivalent
397        upper.starts_with("CREATE DOMAIN")
398            || upper.starts_with("CREATE TYPE")
399            || upper.starts_with("CREATE FUNCTION")
400            || upper.starts_with("CREATE PROCEDURE")
401            || upper.starts_with("CREATE AGGREGATE")
402            || upper.starts_with("CREATE OPERATOR")
403            || upper.starts_with("CREATE SEQUENCE")
404            || upper.starts_with("CREATE EXTENSION")
405            || upper.starts_with("CREATE SCHEMA")
406            || upper.starts_with("CREATE TRIGGER")
407            || upper.starts_with("ALTER DOMAIN")
408            || upper.starts_with("ALTER TYPE")
409            || upper.starts_with("ALTER FUNCTION")
410            || upper.starts_with("ALTER SEQUENCE")
411            || upper.starts_with("ALTER SCHEMA")
412            || upper.starts_with("COMMENT ON")
413    }
414
415    /// Strip leading SQL comments (-- and /* */) from a string
416    fn strip_leading_sql_comments(&self, stmt: &str) -> String {
417        let mut result = stmt.trim();
418        loop {
419            // Strip -- comments
420            if result.starts_with("--") {
421                if let Some(pos) = result.find('\n') {
422                    result = result[pos + 1..].trim();
423                    continue;
424                } else {
425                    return String::new();
426                }
427            }
428            // Strip /* */ comments
429            if result.starts_with("/*") {
430                if let Some(pos) = result.find("*/") {
431                    result = result[pos + 2..].trim();
432                    continue;
433                } else {
434                    return String::new();
435                }
436            }
437            break;
438        }
439        result.to_string()
440    }
441
442    /// Check if statement is a SQLite pragma
443    fn is_sqlite_pragma(&self, stmt: &str) -> bool {
444        let upper = stmt.to_uppercase();
445        upper.contains("PRAGMA")
446    }
447
448    /// Convert identifier quoting based on dialects
449    fn convert_identifiers(&self, stmt: &str) -> String {
450        match (self.from, self.to) {
451            (SqlDialect::MySql, SqlDialect::Postgres | SqlDialect::Sqlite) => {
452                // Backticks → double quotes
453                self.backticks_to_double_quotes(stmt)
454            }
455            (SqlDialect::Postgres | SqlDialect::Sqlite, SqlDialect::MySql) => {
456                // Double quotes → backticks
457                self.double_quotes_to_backticks(stmt)
458            }
459            _ => stmt.to_string(),
460        }
461    }
462
463    /// Convert backticks to double quotes
464    pub fn backticks_to_double_quotes(&self, stmt: &str) -> String {
465        let mut result = String::with_capacity(stmt.len());
466        let mut in_string = false;
467        let mut in_backtick = false;
468
469        for c in stmt.chars() {
470            if c == '\'' && !in_backtick {
471                in_string = !in_string;
472                result.push(c);
473            } else if c == '`' && !in_string {
474                in_backtick = !in_backtick;
475                result.push('"');
476            } else {
477                result.push(c);
478            }
479        }
480        result
481    }
482
483    /// Convert double quotes to backticks
484    pub fn double_quotes_to_backticks(&self, stmt: &str) -> String {
485        let mut result = String::with_capacity(stmt.len());
486        let mut in_string = false;
487        let mut in_dquote = false;
488        let chars = stmt.chars();
489
490        for c in chars {
491            if c == '\'' && !in_dquote {
492                in_string = !in_string;
493                result.push(c);
494            } else if c == '"' && !in_string {
495                in_dquote = !in_dquote;
496                result.push('`');
497            } else {
498                result.push(c);
499            }
500        }
501        result
502    }
503
504    /// Convert data types between dialects
505    fn convert_data_types(&self, stmt: &str) -> String {
506        TypeMapper::convert(stmt, self.from, self.to)
507    }
508
509    /// Convert AUTO_INCREMENT/SERIAL syntax
510    fn convert_auto_increment(&self, stmt: &str, _table_name: Option<&str>) -> String {
511        match (self.from, self.to) {
512            (SqlDialect::MySql, SqlDialect::Postgres) => {
513                // INT AUTO_INCREMENT → SERIAL
514                // BIGINT AUTO_INCREMENT → BIGSERIAL
515                let result = stmt.replace("BIGINT AUTO_INCREMENT", "BIGSERIAL");
516                let result = result.replace("bigint AUTO_INCREMENT", "BIGSERIAL");
517                let result = result.replace("INT AUTO_INCREMENT", "SERIAL");
518                let result = result.replace("int AUTO_INCREMENT", "SERIAL");
519                result.replace("AUTO_INCREMENT", "") // Clean up any remaining
520            }
521            (SqlDialect::MySql, SqlDialect::Sqlite) => {
522                // INT AUTO_INCREMENT PRIMARY KEY → INTEGER PRIMARY KEY
523                // The AUTOINCREMENT keyword is optional in SQLite
524                let result = stmt.replace("INT AUTO_INCREMENT", "INTEGER");
525                let result = result.replace("int AUTO_INCREMENT", "INTEGER");
526                result.replace("AUTO_INCREMENT", "")
527            }
528            (SqlDialect::Postgres, SqlDialect::MySql) => {
529                // SERIAL → INT AUTO_INCREMENT
530                // BIGSERIAL → BIGINT AUTO_INCREMENT
531                let result = stmt.replace("BIGSERIAL", "BIGINT AUTO_INCREMENT");
532                let result = result.replace("bigserial", "BIGINT AUTO_INCREMENT");
533                let result = result.replace("SMALLSERIAL", "SMALLINT AUTO_INCREMENT");
534                let result = result.replace("smallserial", "SMALLINT AUTO_INCREMENT");
535                let result = result.replace("SERIAL", "INT AUTO_INCREMENT");
536                result.replace("serial", "INT AUTO_INCREMENT")
537            }
538            (SqlDialect::Postgres, SqlDialect::Sqlite) => {
539                // SERIAL → INTEGER (SQLite auto-increments INTEGER PRIMARY KEY)
540                let result = stmt.replace("BIGSERIAL", "INTEGER");
541                let result = result.replace("bigserial", "INTEGER");
542                let result = result.replace("SMALLSERIAL", "INTEGER");
543                let result = result.replace("smallserial", "INTEGER");
544                let result = result.replace("SERIAL", "INTEGER");
545                result.replace("serial", "INTEGER")
546            }
547            (SqlDialect::Sqlite, SqlDialect::MySql) => {
548                // SQLite uses INTEGER PRIMARY KEY for auto-increment
549                // We can't easily detect this pattern, so just pass through
550                stmt.to_string()
551            }
552            (SqlDialect::Sqlite, SqlDialect::Postgres) => {
553                // SQLite uses INTEGER PRIMARY KEY for auto-increment
554                // We can't easily detect this pattern, so just pass through
555                stmt.to_string()
556            }
557            _ => stmt.to_string(),
558        }
559    }
560
561    /// Convert string escape sequences
562    fn convert_string_escapes(&self, stmt: &str) -> String {
563        match (self.from, self.to) {
564            (SqlDialect::MySql, SqlDialect::Postgres | SqlDialect::Sqlite) => {
565                // MySQL uses \' for escaping, PostgreSQL/SQLite use ''
566                self.mysql_escapes_to_standard(stmt)
567            }
568            _ => stmt.to_string(),
569        }
570    }
571
572    /// Convert MySQL backslash escapes to standard SQL double-quote escapes
573    fn mysql_escapes_to_standard(&self, stmt: &str) -> String {
574        let mut result = String::with_capacity(stmt.len());
575        let mut chars = stmt.chars().peekable();
576        let mut in_string = false;
577
578        while let Some(c) = chars.next() {
579            if c == '\'' {
580                in_string = !in_string;
581                result.push(c);
582            } else if c == '\\' && in_string {
583                // Check next character
584                if let Some(&next) = chars.peek() {
585                    match next {
586                        '\'' => {
587                            // \' → ''
588                            chars.next();
589                            result.push_str("''");
590                        }
591                        '\\' => {
592                            // \\ → keep as-is for data integrity
593                            chars.next();
594                            result.push_str("\\\\");
595                        }
596                        'n' | 'r' | 't' | '0' => {
597                            // Keep common escapes as-is
598                            result.push(c);
599                        }
600                        _ => {
601                            result.push(c);
602                        }
603                    }
604                } else {
605                    result.push(c);
606                }
607            } else {
608                result.push(c);
609            }
610        }
611        result
612    }
613
614    /// Strip MySQL conditional comments /*!40101 ... */
615    fn strip_conditional_comments(&self, stmt: &str) -> String {
616        let mut result = String::with_capacity(stmt.len());
617        let mut chars = stmt.chars().peekable();
618
619        while let Some(c) = chars.next() {
620            if c == '/' && chars.peek() == Some(&'*') {
621                chars.next(); // consume *
622                if chars.peek() == Some(&'!') {
623                    // Skip conditional comment
624                    chars.next(); // consume !
625                                  // Skip version number
626                    while chars.peek().map(|c| c.is_ascii_digit()).unwrap_or(false) {
627                        chars.next();
628                    }
629                    // Skip content until */
630                    let mut depth = 1;
631                    while depth > 0 {
632                        match chars.next() {
633                            Some('*') if chars.peek() == Some(&'/') => {
634                                chars.next();
635                                depth -= 1;
636                            }
637                            Some('/') if chars.peek() == Some(&'*') => {
638                                chars.next();
639                                depth += 1;
640                            }
641                            None => break,
642                            _ => {}
643                        }
644                    }
645                } else {
646                    // Regular comment, keep it
647                    result.push('/');
648                    result.push('*');
649                }
650            } else {
651                result.push(c);
652            }
653        }
654        result
655    }
656
657    /// Strip ENGINE clause
658    fn strip_engine_clause(&self, stmt: &str) -> String {
659        if self.to == SqlDialect::MySql {
660            return stmt.to_string();
661        }
662
663        // Remove ENGINE=InnoDB, ENGINE=MyISAM, etc.
664        let re = regex::Regex::new(r"(?i)\s*ENGINE\s*=\s*\w+").unwrap();
665        re.replace_all(stmt, "").to_string()
666    }
667
668    /// Strip CHARSET/COLLATE clauses
669    fn strip_charset_clauses(&self, stmt: &str) -> String {
670        if self.to == SqlDialect::MySql {
671            return stmt.to_string();
672        }
673
674        let result = stmt.to_string();
675        let re1 = regex::Regex::new(r"(?i)\s*(DEFAULT\s+)?CHARSET\s*=\s*\w+").unwrap();
676        let result = re1.replace_all(&result, "").to_string();
677
678        let re2 = regex::Regex::new(r"(?i)\s*COLLATE\s*=?\s*\w+").unwrap();
679        re2.replace_all(&result, "").to_string()
680    }
681
682    /// Strip PostgreSQL type casts (::type and ::regclass)
683    fn strip_postgres_casts(&self, stmt: &str) -> String {
684        use once_cell::sync::Lazy;
685        use regex::Regex;
686
687        // Match ::regclass, ::text, ::integer, etc. (including complex types like character varying)
688        static RE_CAST: Lazy<Regex> = Lazy::new(|| {
689            Regex::new(r"::[a-zA-Z_][a-zA-Z0-9_]*(?:\s+[a-zA-Z_][a-zA-Z0-9_]*)*").unwrap()
690        });
691
692        RE_CAST.replace_all(stmt, "").to_string()
693    }
694
695    /// Convert nextval('sequence') to NULL or remove (AUTO_INCREMENT handles it)
696    fn convert_nextval(&self, stmt: &str) -> String {
697        use once_cell::sync::Lazy;
698        use regex::Regex;
699
700        // Match nextval('sequence_name'::regclass) or nextval('sequence_name')
701        // Remove the DEFAULT nextval(...) entirely - AUTO_INCREMENT is already applied
702        static RE_NEXTVAL: Lazy<Regex> =
703            Lazy::new(|| Regex::new(r"(?i)\s*DEFAULT\s+nextval\s*\([^)]+\)").unwrap());
704
705        RE_NEXTVAL.replace_all(stmt, "").to_string()
706    }
707
708    /// Convert DEFAULT now() to DEFAULT CURRENT_TIMESTAMP
709    fn convert_default_now(&self, stmt: &str) -> String {
710        use once_cell::sync::Lazy;
711        use regex::Regex;
712
713        static RE_NOW: Lazy<Regex> =
714            Lazy::new(|| Regex::new(r"(?i)\bDEFAULT\s+now\s*\(\s*\)").unwrap());
715
716        RE_NOW
717            .replace_all(stmt, "DEFAULT CURRENT_TIMESTAMP")
718            .to_string()
719    }
720
721    /// Strip schema prefix from table names (e.g., public.users -> users)
722    fn strip_schema_prefix(&self, stmt: &str) -> String {
723        use once_cell::sync::Lazy;
724        use regex::Regex;
725
726        // Match schema.table patterns (with optional quotes)
727        // Handle: public.table, "public"."table", public."table"
728        static RE_SCHEMA: Lazy<Regex> =
729            Lazy::new(|| Regex::new(r#"(?i)\b(public|pg_catalog|pg_temp)\s*\.\s*"#).unwrap());
730
731        RE_SCHEMA.replace_all(stmt, "").to_string()
732    }
733
734    /// Detect unsupported features and add warnings
735    fn detect_unsupported_features(
736        &mut self,
737        stmt: &str,
738        table_name: Option<&str>,
739    ) -> Result<(), ConvertWarning> {
740        let upper = stmt.to_uppercase();
741
742        // MySQL-specific features
743        if self.from == SqlDialect::MySql {
744            // ENUM types
745            if upper.contains("ENUM(") {
746                let warning = ConvertWarning::UnsupportedFeature {
747                    feature: format!(
748                        "ENUM type{}",
749                        table_name
750                            .map(|t| format!(" in table {}", t))
751                            .unwrap_or_default()
752                    ),
753                    suggestion: Some(
754                        "Converted to VARCHAR - consider adding CHECK constraint".to_string(),
755                    ),
756                };
757                self.warnings.add(warning.clone());
758                if self.strict {
759                    return Err(warning);
760                }
761            }
762
763            // SET types (MySQL)
764            if upper.contains("SET(") {
765                let warning = ConvertWarning::UnsupportedFeature {
766                    feature: format!(
767                        "SET type{}",
768                        table_name
769                            .map(|t| format!(" in table {}", t))
770                            .unwrap_or_default()
771                    ),
772                    suggestion: Some(
773                        "Converted to VARCHAR - SET semantics not preserved".to_string(),
774                    ),
775                };
776                self.warnings.add(warning.clone());
777                if self.strict {
778                    return Err(warning);
779                }
780            }
781
782            // UNSIGNED
783            if upper.contains("UNSIGNED") {
784                self.warnings.add(ConvertWarning::UnsupportedFeature {
785                    feature: "UNSIGNED modifier".to_string(),
786                    suggestion: Some(
787                        "Removed - consider adding CHECK constraint for non-negative values"
788                            .to_string(),
789                    ),
790                });
791            }
792        }
793
794        // PostgreSQL-specific features
795        if self.from == SqlDialect::Postgres {
796            // Array types
797            if upper.contains("[]") || upper.contains("ARRAY[") {
798                let warning = ConvertWarning::UnsupportedFeature {
799                    feature: format!(
800                        "Array type{}",
801                        table_name
802                            .map(|t| format!(" in table {}", t))
803                            .unwrap_or_default()
804                    ),
805                    suggestion: Some(
806                        "Array types not supported in target dialect - consider using JSON"
807                            .to_string(),
808                    ),
809                };
810                self.warnings.add(warning.clone());
811                if self.strict {
812                    return Err(warning);
813                }
814            }
815
816            // INHERITS
817            if upper.contains("INHERITS") {
818                let warning = ConvertWarning::UnsupportedFeature {
819                    feature: "Table inheritance (INHERITS)".to_string(),
820                    suggestion: Some(
821                        "PostgreSQL table inheritance not supported in target dialect".to_string(),
822                    ),
823                };
824                self.warnings.add(warning.clone());
825                if self.strict {
826                    return Err(warning);
827                }
828            }
829
830            // PARTITION BY
831            if upper.contains("PARTITION BY") && self.to == SqlDialect::Sqlite {
832                let warning = ConvertWarning::UnsupportedFeature {
833                    feature: "Table partitioning".to_string(),
834                    suggestion: Some("Partitioning not supported in SQLite".to_string()),
835                };
836                self.warnings.add(warning.clone());
837                if self.strict {
838                    return Err(warning);
839                }
840            }
841        }
842
843        Ok(())
844    }
845
846    /// Get collected warnings
847    pub fn warnings(&self) -> &[ConvertWarning] {
848        self.warnings.warnings()
849    }
850}
851
852/// Run the convert command
853pub fn run(config: ConvertConfig) -> anyhow::Result<ConvertStats> {
854    let mut stats = ConvertStats::default();
855
856    // Detect or use specified source dialect
857    let from_dialect = if let Some(d) = config.from_dialect {
858        d
859    } else {
860        let result = crate::parser::detect_dialect_from_file(&config.input)?;
861        if config.progress {
862            eprintln!(
863                "Auto-detected source dialect: {} (confidence: {:?})",
864                result.dialect, result.confidence
865            );
866        }
867        result.dialect
868    };
869
870    // Check for same dialect
871    if from_dialect == config.to_dialect {
872        anyhow::bail!(
873            "Source and target dialects are the same ({}). No conversion needed.",
874            from_dialect
875        );
876    }
877
878    let progress_bar = if config.progress {
879        let pb = ProgressBar::new_spinner();
880        pb.set_style(
881            ProgressStyle::default_spinner()
882                .template("{spinner:.green} {msg}")
883                .unwrap(),
884        );
885        pb.set_message("Converting...");
886        Some(pb)
887    } else {
888        None
889    };
890
891    // Create converter
892    let mut converter = Converter::new(from_dialect, config.to_dialect).with_strict(config.strict);
893
894    // Open input file
895    let file = File::open(&config.input)?;
896    let compression = Compression::from_path(&config.input);
897    let reader: Box<dyn Read> = compression.wrap_reader(Box::new(file));
898    let mut parser = Parser::with_dialect(reader, 64 * 1024, from_dialect);
899
900    // Open output
901    let mut writer: Box<dyn Write> = if config.dry_run {
902        Box::new(std::io::sink())
903    } else {
904        match &config.output {
905            Some(path) => {
906                if let Some(parent) = path.parent() {
907                    std::fs::create_dir_all(parent)?;
908                }
909                Box::new(BufWriter::with_capacity(256 * 1024, File::create(path)?))
910            }
911            None => Box::new(BufWriter::new(std::io::stdout())),
912        }
913    };
914
915    // Write header
916    if !config.dry_run {
917        write_header(&mut writer, &config, from_dialect)?;
918    }
919
920    // Process statements
921    while let Some(stmt) = parser.read_statement()? {
922        stats.statements_processed += 1;
923
924        if let Some(ref pb) = progress_bar {
925            if stats.statements_processed % 1000 == 0 {
926                pb.set_message(format!(
927                    "Processed {} statements...",
928                    stats.statements_processed
929                ));
930            }
931        }
932
933        // Check if this is a COPY data block (follows a COPY header)
934        if converter.has_pending_copy() {
935            // This is a data block, convert it to INSERT statements
936            match converter.process_copy_data(&stmt) {
937                Ok(inserts) => {
938                    for insert in inserts {
939                        if !insert.is_empty() {
940                            stats.statements_converted += 1;
941                            if !config.dry_run {
942                                writer.write_all(&insert)?;
943                                writer.write_all(b"\n")?;
944                            }
945                        }
946                    }
947                }
948                Err(warning) => {
949                    stats.warnings.push(warning);
950                    stats.statements_skipped += 1;
951                }
952            }
953            continue;
954        }
955
956        match converter.convert_statement(&stmt) {
957            Ok(converted) => {
958                if converted.is_empty() {
959                    stats.statements_skipped += 1;
960                } else if converted == stmt {
961                    stats.statements_unchanged += 1;
962                    if !config.dry_run {
963                        writer.write_all(&converted)?;
964                        writer.write_all(b"\n")?;
965                    }
966                } else {
967                    stats.statements_converted += 1;
968                    if !config.dry_run {
969                        writer.write_all(&converted)?;
970                        writer.write_all(b"\n")?;
971                    }
972                }
973            }
974            Err(warning) => {
975                stats.warnings.push(warning);
976                stats.statements_skipped += 1;
977            }
978        }
979    }
980
981    // Collect warnings
982    stats.warnings.extend(converter.warnings().iter().cloned());
983
984    if let Some(pb) = progress_bar {
985        pb.finish_with_message(format!(
986            "Converted {} statements",
987            stats.statements_processed
988        ));
989    }
990
991    Ok(stats)
992}
993
994/// Write output header
995fn write_header(
996    writer: &mut dyn Write,
997    config: &ConvertConfig,
998    from: SqlDialect,
999) -> std::io::Result<()> {
1000    writeln!(writer, "-- Converted by sql-splitter")?;
1001    writeln!(writer, "-- From: {} → To: {}", from, config.to_dialect)?;
1002    writeln!(writer, "-- Source: {}", config.input.display())?;
1003    writeln!(writer)?;
1004
1005    // Write dialect-specific header
1006    match config.to_dialect {
1007        SqlDialect::Postgres => {
1008            writeln!(writer, "SET client_encoding = 'UTF8';")?;
1009            writeln!(writer, "SET standard_conforming_strings = on;")?;
1010        }
1011        SqlDialect::Sqlite => {
1012            writeln!(writer, "PRAGMA foreign_keys = OFF;")?;
1013        }
1014        SqlDialect::MySql => {
1015            writeln!(writer, "SET NAMES utf8mb4;")?;
1016            writeln!(writer, "SET FOREIGN_KEY_CHECKS = 0;")?;
1017        }
1018    }
1019    writeln!(writer)?;
1020
1021    Ok(())
1022}