sql_splitter/duckdb/
loader.rs

1//! SQL dump loader for importing dumps into DuckDB.
2
3use super::batch::{flush_batch, BatchManager, MAX_ROWS_PER_BATCH};
4use super::types::TypeConverter;
5use super::{ImportStats, QueryConfig};
6use crate::convert::copy_to_insert::{copy_to_inserts, parse_copy_header, CopyHeader};
7use crate::parser::{
8    detect_dialect_from_file, parse_insert_for_bulk, Parser, SqlDialect, StatementType,
9};
10use crate::progress::ProgressReader;
11use crate::splitter::Compression;
12use anyhow::{Context, Result};
13use duckdb::Connection;
14use indicatif::{ProgressBar, ProgressStyle};
15use once_cell::sync::Lazy;
16use regex::Regex;
17use std::fs::File;
18use std::io::{BufRead, BufReader, Read};
19use std::path::Path;
20
21/// Maximum COPY rows to accumulate per batch before converting to INSERTs.
22/// This bounds memory usage while still enabling large batches for performance.
23const MAX_COPY_ROWS_PER_BATCH: usize = 10_000;
24
25/// Loads SQL dumps into a DuckDB database
26pub struct DumpLoader<'a> {
27    conn: &'a Connection,
28    config: &'a QueryConfig,
29}
30
31impl<'a> DumpLoader<'a> {
32    /// Create a new dump loader
33    pub fn new(conn: &'a Connection, config: &'a QueryConfig) -> Self {
34        Self { conn, config }
35    }
36
37    /// Load a SQL dump file into DuckDB
38    pub fn load(&self, dump_path: &Path) -> Result<ImportStats> {
39        let start = std::time::Instant::now();
40        let mut stats = ImportStats::default();
41
42        // Detect dialect
43        let dialect = if let Some(d) = self.config.dialect {
44            d
45        } else {
46            let result = detect_dialect_from_file(dump_path)?;
47            result.dialect
48        };
49
50        // Get file size for progress
51        let file_size = std::fs::metadata(dump_path)?.len();
52
53        // Set up progress bar
54        let progress_bar = if self.config.progress {
55            let pb = ProgressBar::new(file_size);
56            pb.set_style(
57                ProgressStyle::default_bar()
58                    .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({percent}%)")
59                    .unwrap()
60                    .progress_chars("=>-"),
61            );
62            Some(pb)
63        } else {
64            None
65        };
66
67        // Open file with compression detection
68        let file = File::open(dump_path).context("Failed to open dump file")?;
69        let compression = Compression::from_path(dump_path);
70        let reader: Box<dyn Read> = match compression {
71            Compression::Gzip => Box::new(flate2::read::GzDecoder::new(file)),
72            Compression::Bzip2 => Box::new(bzip2::read::BzDecoder::new(file)),
73            Compression::Xz => Box::new(xz2::read::XzDecoder::new(file)),
74            Compression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
75            Compression::None => Box::new(file),
76        };
77
78        let reader: Box<dyn Read> = if let Some(ref pb) = progress_bar {
79            let pb_clone = pb.clone();
80            Box::new(ProgressReader::new(reader, move |bytes| {
81                pb_clone.set_position(bytes);
82            }))
83        } else {
84            reader
85        };
86
87        let buf_reader = BufReader::with_capacity(256 * 1024, reader);
88
89        // Parse and load statements
90        self.load_statements(buf_reader, dialect, &mut stats)?;
91
92        if let Some(pb) = progress_bar {
93            pb.finish_with_message("Import complete");
94        }
95
96        stats.duration_secs = start.elapsed().as_secs_f64();
97        Ok(stats)
98    }
99
100    /// Load statements from a reader
101    fn load_statements<R: Read>(
102        &self,
103        reader: BufReader<R>,
104        dialect: SqlDialect,
105        stats: &mut ImportStats,
106    ) -> Result<()> {
107        let mut parser = StatementReader::new(reader, dialect);
108        let mut pending_copy: Option<CopyHeader> = None;
109
110        // Batched COPY data accumulator
111        let mut copy_batch_data: Vec<u8> = Vec::new();
112        let mut copy_batch_rows: usize = 0;
113
114        // Track tables that failed (don't exist) to skip subsequent inserts
115        let mut failed_tables: std::collections::HashSet<String> = std::collections::HashSet::new();
116
117        // Batch manager for bulk INSERT loading via Appender API
118        let mut batch_mgr = BatchManager::new(MAX_ROWS_PER_BATCH);
119
120        while let Some(stmt_result) = parser.next_statement() {
121            let stmt = stmt_result?;
122
123            // Handle COPY data blocks with batching
124            if let Some(ref header) = pending_copy {
125                let trimmed = stmt.trim();
126
127                // End-of-COPY marker: "\."
128                if trimmed == "\\." {
129                    // Flush any remaining batched rows
130                    if !copy_batch_data.is_empty() {
131                        self.process_copy_batch(
132                            header,
133                            &copy_batch_data,
134                            stats,
135                            &mut failed_tables,
136                        );
137                        copy_batch_data.clear();
138                        copy_batch_rows = 0;
139                    }
140                    pending_copy = None;
141                    parser.set_copy_mode(false);
142                    continue;
143                }
144
145                // Empty/whitespace line inside COPY: skip
146                if trimmed.is_empty() {
147                    continue;
148                }
149
150                // Normal COPY data line - accumulate into batch
151                if Self::looks_like_copy_data(&stmt) {
152                    // Skip if we know this table doesn't exist
153                    if failed_tables.contains(&header.table) {
154                        continue;
155                    }
156
157                    copy_batch_data.extend_from_slice(stmt.as_bytes());
158                    copy_batch_data.push(b'\n');
159                    copy_batch_rows += 1;
160
161                    // Flush when batch gets large enough
162                    if copy_batch_rows >= MAX_COPY_ROWS_PER_BATCH {
163                        self.process_copy_batch(
164                            header,
165                            &copy_batch_data,
166                            stats,
167                            &mut failed_tables,
168                        );
169                        copy_batch_data.clear();
170                        copy_batch_rows = 0;
171                    }
172                    continue;
173                }
174
175                // Unexpected content inside COPY - flush and fall through to normal handling
176                if !copy_batch_data.is_empty() {
177                    self.process_copy_batch(header, &copy_batch_data, stats, &mut failed_tables);
178                    copy_batch_data.clear();
179                    copy_batch_rows = 0;
180                }
181                pending_copy = None;
182                parser.set_copy_mode(false);
183                // Fall through to normal statement handling
184            }
185
186            let (mut stmt_type, table_name) =
187                Parser::<&[u8]>::parse_statement_with_dialect(stmt.as_bytes(), dialect);
188
189            // For Postgres, check if statement contains COPY ... FROM stdin (may be after comments)
190            if dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
191                let upper = stmt.to_uppercase();
192                if let Some(copy_pos) = upper.find("COPY ") {
193                    let after_copy = &upper[copy_pos..];
194                    if after_copy.contains("FROM STDIN") {
195                        stmt_type = StatementType::Copy;
196                    }
197                }
198            }
199
200            // Filter tables if specified
201            if let Some(ref tables) = self.config.tables {
202                if !table_name.is_empty()
203                    && !tables.iter().any(|t| t.eq_ignore_ascii_case(&table_name))
204                {
205                    continue;
206                }
207            }
208
209            match stmt_type {
210                StatementType::CreateTable => {
211                    let duckdb_sql = self.convert_create_table(&stmt, dialect)?;
212                    match self.conn.execute(&duckdb_sql, []) {
213                        Ok(_) => stats.tables_created += 1,
214                        Err(e) => {
215                            stats
216                                .warnings
217                                .push(format!("Failed to create table {}: {}", table_name, e));
218                            stats.statements_skipped += 1;
219                        }
220                    }
221                }
222                StatementType::Insert => {
223                    // Try bulk loading via Appender API first
224                    if !self.try_queue_for_bulk(
225                        &stmt,
226                        dialect,
227                        &mut batch_mgr,
228                        stats,
229                        &mut failed_tables,
230                    ) {
231                        // Fallback to direct execution
232                        let duckdb_sql = self.convert_insert(&stmt, dialect)?;
233                        match self.conn.execute(&duckdb_sql, []) {
234                            Ok(_) => {
235                                stats.insert_statements += 1;
236                                stats.rows_inserted += Self::count_insert_rows(&duckdb_sql);
237                            }
238                            Err(e) => {
239                                stats
240                                    .warnings
241                                    .push(format!("Failed INSERT for {}: {}", table_name, e));
242                                stats.statements_skipped += 1;
243                            }
244                        }
245                    }
246
247                    // Flush any ready batches
248                    for mut batch in batch_mgr.get_ready_batches() {
249                        flush_batch(self.conn, &mut batch, stats, &mut failed_tables)?;
250                    }
251                }
252                StatementType::Copy => {
253                    // Parse COPY header and start buffering data
254                    if let Some(header) = parse_copy_header(&stmt) {
255                        // Check if table already known to be missing
256                        if failed_tables.contains(&header.table) {
257                            // Enter COPY mode to skip line-by-line, then exit
258                            parser.set_copy_mode(true);
259                            Self::skip_copy_block(&mut parser);
260                            parser.set_copy_mode(false);
261                            continue;
262                        }
263
264                        // Proactively check if table exists before buffering data
265                        if !self.table_exists(&header.table) {
266                            failed_tables.insert(header.table.clone());
267                            if stats.warnings.len() < 100 {
268                                stats.warnings.push(format!(
269                                    "Skipping COPY for non-existent table {}",
270                                    header.table
271                                ));
272                            }
273                            // Enter COPY mode to skip line-by-line, then exit
274                            parser.set_copy_mode(true);
275                            Self::skip_copy_block(&mut parser);
276                            parser.set_copy_mode(false);
277                            continue;
278                        }
279
280                        copy_batch_data.clear();
281                        copy_batch_rows = 0;
282                        pending_copy = Some(header);
283                        parser.set_copy_mode(true);
284                    }
285                }
286                StatementType::CreateIndex => {
287                    // Skip indexes - not needed for analytics queries
288                    stats.statements_skipped += 1;
289                }
290                _ => {
291                    // Skip other statements (ALTER, DROP, etc.)
292                    stats.statements_skipped += 1;
293                }
294            }
295        }
296
297        // Flush any remaining COPY batch (handles truncated dumps)
298        if let Some(ref header) = pending_copy {
299            if !copy_batch_data.is_empty() {
300                self.process_copy_batch(header, &copy_batch_data, stats, &mut failed_tables);
301            }
302        }
303
304        // Flush any remaining INSERT batches
305        for mut batch in batch_mgr.drain_all() {
306            flush_batch(self.conn, &mut batch, stats, &mut failed_tables)?;
307        }
308
309        Ok(())
310    }
311
312    /// Process a batch of COPY data rows, converting them to INSERTs
313    fn process_copy_batch(
314        &self,
315        header: &CopyHeader,
316        batch_data: &[u8],
317        stats: &mut ImportStats,
318        failed_tables: &mut std::collections::HashSet<String>,
319    ) {
320        if batch_data.is_empty() {
321            return;
322        }
323
324        // Skip if we already know this table doesn't exist
325        if failed_tables.contains(&header.table) {
326            return;
327        }
328
329        let inserts = copy_to_inserts(header, batch_data, SqlDialect::Postgres);
330        for insert in inserts {
331            let insert_sql = String::from_utf8_lossy(&insert);
332            match self.conn.execute(&insert_sql, []) {
333                Ok(_) => {
334                    stats.rows_inserted += Self::count_insert_rows(&insert_sql);
335                }
336                Err(e) => {
337                    let err_str = e.to_string();
338                    // If table doesn't exist, mark it and skip future inserts
339                    if err_str.contains("does not exist") {
340                        failed_tables.insert(header.table.clone());
341                        if stats.warnings.len() < 100 {
342                            stats.warnings.push(format!(
343                                "Table {} does not exist, skipping COPY data",
344                                header.table
345                            ));
346                        }
347                        return; // Skip rest of batch
348                    }
349                    // Limit warnings to avoid memory bloat on large failures
350                    if stats.warnings.len() < 100 {
351                        stats.warnings.push(format!(
352                            "Failed to insert COPY data for {}: {}",
353                            header.table, e
354                        ));
355                    }
356                    stats.statements_skipped += 1;
357                }
358            }
359        }
360    }
361
362    /// Check if a table exists in DuckDB
363    fn table_exists(&self, table: &str) -> bool {
364        let query = "SELECT 1 FROM information_schema.tables WHERE table_name = ? LIMIT 1";
365        match self.conn.prepare(query) {
366            Ok(mut stmt) => stmt.exists([table]).unwrap_or(false),
367            Err(_) => false,
368        }
369    }
370
371    /// Try to queue an INSERT statement for bulk loading via Appender API.
372    /// Returns true if successfully queued, false if fallback to direct execution is needed.
373    fn try_queue_for_bulk(
374        &self,
375        stmt: &str,
376        dialect: SqlDialect,
377        batch_mgr: &mut BatchManager,
378        stats: &mut ImportStats,
379        failed_tables: &mut std::collections::HashSet<String>,
380    ) -> bool {
381        // Quick filter: skip statements with complex clauses that Appender can't handle
382        let upper = stmt.to_uppercase();
383        if upper.contains("ON DUPLICATE KEY")
384            || upper.contains("ON CONFLICT")
385            || upper.contains("REPLACE")
386            || upper.contains("IGNORE")
387            || upper.contains("RETURNING")
388            || upper.contains("SELECT")
389        {
390            return false;
391        }
392
393        // Try to parse the INSERT statement
394        let parsed = match parse_insert_for_bulk(stmt.as_bytes()) {
395            Ok(p) => p,
396            Err(_) => return false, // Parse failed, use fallback
397        };
398
399        // Skip tables we know don't exist
400        if failed_tables.contains(&parsed.table) {
401            return true; // Pretend we handled it to skip the statement
402        }
403
404        // Skip if no rows were parsed
405        if parsed.rows.is_empty() {
406            return false;
407        }
408
409        // Convert the statement for DuckDB (for fallback)
410        let duckdb_sql = match self.convert_insert(stmt, dialect) {
411            Ok(sql) => sql,
412            Err(_) => return false,
413        };
414
415        // Queue the rows
416        if let Some(mut batch) =
417            batch_mgr.queue_insert(&parsed.table, parsed.columns, parsed.rows, duckdb_sql)
418        {
419            // Batch is ready to flush
420            if let Err(e) = flush_batch(self.conn, &mut batch, stats, failed_tables) {
421                if stats.warnings.len() < 100 {
422                    stats.warnings.push(format!("Batch flush error: {}", e));
423                }
424            }
425        }
426
427        true
428    }
429
430    /// Skip a COPY data block without parsing/processing it
431    fn skip_copy_block<R: Read>(parser: &mut StatementReader<R>) {
432        while let Some(Ok(line)) = parser.next_statement() {
433            if line.trim() == "\\." {
434                break;
435            }
436        }
437    }
438
439    /// Check if a line looks like COPY data (tab-separated values or single-column values)
440    fn looks_like_copy_data(line: &str) -> bool {
441        let trimmed = line.trim();
442
443        // Empty line is not COPY data
444        if trimmed.is_empty() {
445            return false;
446        }
447
448        // End-of-COPY marker
449        if trimmed == "\\." {
450            return false;
451        }
452
453        // Lines with tabs are definitely COPY data (multi-column)
454        // For single-column data, we need to check it doesn't look like SQL
455        let first_char = trimmed.chars().next().unwrap_or(' ');
456
457        // Quick check: if starts with common SQL keyword first char, verify it's not SQL
458        if matches!(
459            first_char,
460            'S' | 's'
461                | 'I'
462                | 'i'
463                | 'C'
464                | 'c'
465                | 'D'
466                | 'd'
467                | 'A'
468                | 'a'
469                | 'U'
470                | 'u'
471                | 'G'
472                | 'g'
473                | '-'
474                | '/'
475        ) {
476            let upper_prefix: String = trimmed.chars().take(7).collect::<String>().to_uppercase();
477            if upper_prefix.starts_with("SELECT")
478                || upper_prefix.starts_with("INSERT")
479                || upper_prefix.starts_with("CREATE")
480                || upper_prefix.starts_with("DROP")
481                || upper_prefix.starts_with("ALTER")
482                || upper_prefix.starts_with("UPDATE")
483                || upper_prefix.starts_with("GRANT")
484                || upper_prefix.starts_with("--")
485                || upper_prefix.starts_with("/*")
486            {
487                return false;
488            }
489        }
490
491        // If it contains a tab, it's multi-column COPY data
492        // If it doesn't contain a tab but passed the SQL check, it's single-column data
493        true
494    }
495
496    /// Convert a CREATE TABLE statement to DuckDB-compatible SQL
497    fn convert_create_table(&self, stmt: &str, dialect: SqlDialect) -> Result<String> {
498        let mut result = stmt.to_string();
499
500        // Convert identifier quoting
501        result = Self::convert_identifiers(&result, dialect);
502
503        // Remove MySQL-specific clauses FIRST (before type conversion)
504        // This prevents "CHARACTER SET" from being confused with "CHAR" type
505        result = Self::strip_mysql_clauses(&result);
506
507        // Convert data types in column definitions
508        result = Self::convert_types_in_statement(&result);
509
510        // Remove PostgreSQL-specific syntax
511        if dialect == SqlDialect::Postgres {
512            result = Self::strip_postgres_syntax(&result);
513        }
514
515        // Remove MSSQL-specific syntax
516        if dialect == SqlDialect::Mssql {
517            result = Self::strip_mssql_syntax(&result);
518        }
519
520        Ok(result)
521    }
522
523    /// Convert an INSERT statement to DuckDB-compatible SQL
524    fn convert_insert(&self, stmt: &str, dialect: SqlDialect) -> Result<String> {
525        let mut result = stmt.to_string();
526
527        // Convert identifier quoting
528        result = Self::convert_identifiers(&result, dialect);
529
530        // Convert MySQL backslash escapes to SQL standard
531        if dialect == SqlDialect::MySql {
532            result = Self::convert_mysql_escapes(&result);
533        }
534
535        // Remove PostgreSQL schema prefix
536        if dialect == SqlDialect::Postgres {
537            result = Self::strip_schema_prefix(&result);
538        }
539
540        // Remove MSSQL schema prefix (dbo., etc.)
541        if dialect == SqlDialect::Mssql {
542            result = Self::strip_mssql_schema_prefix(&result);
543        }
544
545        Ok(result)
546    }
547
548    /// Convert MySQL backslash escapes to SQL standard
549    fn convert_mysql_escapes(stmt: &str) -> String {
550        let mut result = String::with_capacity(stmt.len() + 100);
551        let mut chars = stmt.chars().peekable();
552        let mut in_string = false;
553
554        while let Some(c) = chars.next() {
555            if c == '\'' {
556                in_string = !in_string;
557                result.push(c);
558            } else if c == '\\' && in_string {
559                // Handle MySQL escape sequences
560                match chars.peek() {
561                    Some('\'') => {
562                        // \' -> ''
563                        chars.next();
564                        result.push_str("''");
565                    }
566                    Some('\\') => {
567                        // \\ -> \
568                        chars.next();
569                        result.push('\\');
570                    }
571                    Some('n') => {
572                        // \n -> newline
573                        chars.next();
574                        result.push('\n');
575                    }
576                    Some('r') => {
577                        // \r -> carriage return
578                        chars.next();
579                        result.push('\r');
580                    }
581                    Some('t') => {
582                        // \t -> tab
583                        chars.next();
584                        result.push('\t');
585                    }
586                    Some('0') => {
587                        // \0 -> NULL character (skip)
588                        chars.next();
589                    }
590                    Some('"') => {
591                        // \" -> "
592                        chars.next();
593                        result.push('"');
594                    }
595                    _ => {
596                        // Unknown escape, keep backslash
597                        result.push(c);
598                    }
599                }
600            } else {
601                result.push(c);
602            }
603        }
604        result
605    }
606
607    /// Convert identifier quoting (backticks/brackets to double quotes)
608    fn convert_identifiers(stmt: &str, dialect: SqlDialect) -> String {
609        match dialect {
610            SqlDialect::MySql => {
611                // Convert backticks to double quotes
612                let mut result = String::with_capacity(stmt.len());
613                let mut in_string = false;
614                let mut in_backtick = false;
615
616                for c in stmt.chars() {
617                    if c == '\'' && !in_backtick {
618                        in_string = !in_string;
619                        result.push(c);
620                    } else if c == '`' && !in_string {
621                        in_backtick = !in_backtick;
622                        result.push('"');
623                    } else {
624                        result.push(c);
625                    }
626                }
627                result
628            }
629            SqlDialect::Mssql => {
630                // Convert brackets to double quotes, strip N prefix from strings
631                let mut result = String::with_capacity(stmt.len());
632                let mut in_string = false;
633                let mut in_bracket = false;
634                let mut chars = stmt.chars().peekable();
635
636                while let Some(c) = chars.next() {
637                    if c == '\'' && !in_bracket {
638                        in_string = !in_string;
639                        result.push(c);
640                    } else if c == '[' && !in_string {
641                        in_bracket = true;
642                        result.push('"');
643                    } else if c == ']' && !in_string {
644                        // Handle ]] escape
645                        if chars.peek() == Some(&']') {
646                            chars.next();
647                            result.push(']');
648                        } else {
649                            in_bracket = false;
650                            result.push('"');
651                        }
652                    } else if c == 'N' && !in_string && !in_bracket && chars.peek() == Some(&'\'') {
653                        // Strip N prefix from N'string'
654                        // Don't push the N, the quote will be pushed in next iteration
655                    } else {
656                        result.push(c);
657                    }
658                }
659                result
660            }
661            _ => stmt.to_string(),
662        }
663    }
664
665    /// Convert SQL types in a statement
666    fn convert_types_in_statement(stmt: &str) -> String {
667        // Pattern to match column definitions with types
668        // Handles: TYPE, TYPE(size), TYPE UNSIGNED, TYPE WITH TIME ZONE
669        // IMPORTANT: Order matters - longer types first to avoid partial matches (INTEGER before INT)
670        // IMPORTANT: Types must be preceded by quote/whitespace AND followed by whitespace/paren/comma (not a closing quote)
671        // This prevents matching "date" as DATE type (column names inside quotes)
672        // Includes MySQL, PostgreSQL, SQLite, and MSSQL types
673        static RE_COLUMN_TYPE: Lazy<Regex> = Lazy::new(|| {
674            Regex::new(r#"(?i)(["'`\]\s])\s*(BIGSERIAL|SMALLSERIAL|SERIAL|BIGINT|SMALLINT|MEDIUMINT|TINYINT|INTEGER|INT|DOUBLE\s+PRECISION|DOUBLE|FLOAT|DECIMAL|NUMERIC|CHARACTER\s+VARYING|NVARCHAR|NCHAR|VARCHAR|CHAR|VARBINARY|BINARY|LONGTEXT|MEDIUMTEXT|TINYTEXT|NTEXT|TEXT|LONGBLOB|MEDIUMBLOB|TINYBLOB|IMAGE|BLOB|DATETIME2|DATETIMEOFFSET|SMALLDATETIME|DATETIME|TIMESTAMPTZ|TIMESTAMP|TIMETZ|TIME|DATE|YEAR|ENUM|SET|JSONB|JSON|UUID|UNIQUEIDENTIFIER|BYTEA|BOOLEAN|BOOL|BIT|REAL|MONEY|SMALLMONEY|INTERVAL|ROWVERSION|XML|SQL_VARIANT)\b(\s*\([^)]+\))?(\s+(?:UNSIGNED|WITH(?:OUT)?\s+TIME\s+ZONE))?"#).unwrap()
675        });
676
677        RE_COLUMN_TYPE
678            .replace_all(stmt, |caps: &regex::Captures| {
679                let full_match = caps.get(0).unwrap().as_str();
680                let leading_char = caps.get(1).unwrap().as_str();
681                let type_part = caps.get(2).unwrap().as_str();
682                let size_part = caps.get(3).map(|m| m.as_str()).unwrap_or("");
683                let suffix = caps.get(4).map(|m| m.as_str()).unwrap_or("");
684                
685                // Check if this looks like a quoted identifier (type is inside quotes)
686                // If leading char is a quote and the character before the match is also a quote, skip
687                let end_pos = caps.get(0).unwrap().end();
688                let stmt_bytes = stmt.as_bytes();
689                if end_pos < stmt_bytes.len() {
690                    let next_char = stmt_bytes[end_pos] as char;
691                    // If next character is a closing quote, this is a quoted identifier, not a type
692                    if next_char == '"' || next_char == '\'' || next_char == '`' {
693                        return full_match.to_string();
694                    }
695                }
696                
697                // Calculate the whitespace between leading char and type
698                let ws_len = full_match.len() - leading_char.len() - type_part.len() - size_part.len() - suffix.len();
699                let ws = &full_match[leading_char.len()..leading_char.len() + ws_len];
700                
701                let converted = TypeConverter::convert(&format!("{}{}{}", type_part, size_part, suffix));
702                format!("{}{}{}", leading_char, ws, converted)
703            })
704            .to_string()
705    }
706
707    /// Strip MySQL-specific clauses
708    fn strip_mysql_clauses(stmt: &str) -> String {
709        let mut result = stmt.to_string();
710
711        // Remove ENGINE clause
712        static RE_ENGINE: Lazy<Regex> =
713            Lazy::new(|| Regex::new(r"(?i)\s*ENGINE\s*=\s*\w+").unwrap());
714        result = RE_ENGINE.replace_all(&result, "").to_string();
715
716        // Remove AUTO_INCREMENT clause at table level
717        static RE_AUTO_INC: Lazy<Regex> =
718            Lazy::new(|| Regex::new(r"(?i)\s*AUTO_INCREMENT\s*=\s*\d+").unwrap());
719        result = RE_AUTO_INC.replace_all(&result, "").to_string();
720
721        // Remove column AUTO_INCREMENT
722        result = result.replace(" AUTO_INCREMENT", "");
723        result = result.replace(" auto_increment", "");
724
725        // Remove CHARACTER SET in column definitions (must come before CHARSET)
726        static RE_CHAR_SET: Lazy<Regex> =
727            Lazy::new(|| Regex::new(r"(?i)\s*CHARACTER\s+SET\s+\w+").unwrap());
728        result = RE_CHAR_SET.replace_all(&result, "").to_string();
729
730        // Remove DEFAULT CHARSET
731        static RE_CHARSET: Lazy<Regex> =
732            Lazy::new(|| Regex::new(r"(?i)\s*(DEFAULT\s+)?CHARSET\s*=\s*\w+").unwrap());
733        result = RE_CHARSET.replace_all(&result, "").to_string();
734
735        // Remove COLLATE
736        static RE_COLLATE: Lazy<Regex> =
737            Lazy::new(|| Regex::new(r"(?i)\s*COLLATE\s*=?\s*\w+").unwrap());
738        result = RE_COLLATE.replace_all(&result, "").to_string();
739
740        // Remove ROW_FORMAT
741        static RE_ROW_FORMAT: Lazy<Regex> =
742            Lazy::new(|| Regex::new(r"(?i)\s*ROW_FORMAT\s*=\s*\w+").unwrap());
743        result = RE_ROW_FORMAT.replace_all(&result, "").to_string();
744
745        // Remove KEY_BLOCK_SIZE
746        static RE_KEY_BLOCK: Lazy<Regex> =
747            Lazy::new(|| Regex::new(r"(?i)\s*KEY_BLOCK_SIZE\s*=\s*\d+").unwrap());
748        result = RE_KEY_BLOCK.replace_all(&result, "").to_string();
749
750        // Remove COMMENT
751        static RE_COMMENT: Lazy<Regex> =
752            Lazy::new(|| Regex::new(r"(?i)\s*COMMENT\s*=?\s*'[^']*'").unwrap());
753        result = RE_COMMENT.replace_all(&result, "").to_string();
754
755        // Remove MySQL conditional comments
756        static RE_COND_COMMENT: Lazy<Regex> = Lazy::new(|| Regex::new(r"/\*!\d+\s*|\*/").unwrap());
757        result = RE_COND_COMMENT.replace_all(&result, "").to_string();
758
759        // Remove ON UPDATE CURRENT_TIMESTAMP
760        static RE_ON_UPDATE: Lazy<Regex> =
761            Lazy::new(|| Regex::new(r"(?i)\s*ON\s+UPDATE\s+CURRENT_TIMESTAMP").unwrap());
762        result = RE_ON_UPDATE.replace_all(&result, "").to_string();
763
764        // Remove UNIQUE KEY constraint lines: UNIQUE KEY `name` (`col1`, `col2`)
765        // Must handle both: ,UNIQUE KEY... at end of column list and UNIQUE KEY... on its own line
766        static RE_UNIQUE_KEY: Lazy<Regex> = Lazy::new(|| {
767            Regex::new(r#"(?i),?\s*UNIQUE\s+KEY\s+[`"']?\w+[`"']?\s*\([^)]+\)"#).unwrap()
768        });
769        result = RE_UNIQUE_KEY.replace_all(&result, "").to_string();
770
771        // Remove KEY (index) constraint lines: KEY `name` (`col1`, `col2`)
772        // This handles regular indexes and FULLTEXT indexes, but NOT PRIMARY KEY or FOREIGN KEY
773        // We use a negative lookbehind pattern by only matching KEY that is preceded by comma or newline (not FOREIGN/PRIMARY)
774        static RE_KEY_INDEX: Lazy<Regex> = Lazy::new(|| {
775            Regex::new(r#"(?i)(?:,\s*|\n\s*)(?:FULLTEXT\s+|SPATIAL\s+)?KEY\s+[`"']?\w+[`"']?\s*\([^)]+\)"#).unwrap()
776        });
777        result = RE_KEY_INDEX.replace_all(&result, "").to_string();
778
779        // Remove GENERATED ALWAYS AS columns entirely
780        // Match: `col` TYPE GENERATED ALWAYS AS (expr) STORED/VIRTUAL
781        // The expression can contain nested parentheses so we match one level deep
782        static RE_GENERATED_COL: Lazy<Regex> = Lazy::new(|| {
783            Regex::new(r#"(?i),?\s*[`"']?\w+[`"']?\s+\w+\s+GENERATED\s+ALWAYS\s+AS\s*\((?:[^()]+|\([^()]*\))+\)\s*(?:STORED|VIRTUAL)?"#).unwrap()
784        });
785        result = RE_GENERATED_COL.replace_all(&result, "").to_string();
786
787        // Remove entire FOREIGN KEY constraints (DuckDB enforces them which causes issues with batch loading)
788        // Match: CONSTRAINT `name` FOREIGN KEY (...) REFERENCES ... [ON DELETE/UPDATE ...]
789        // or just: FOREIGN KEY (...) REFERENCES ... [ON DELETE/UPDATE ...]
790        static RE_FK_CONSTRAINT: Lazy<Regex> = Lazy::new(|| {
791            Regex::new(r#"(?i),?\s*(?:CONSTRAINT\s+[`"']?\w+[`"']?\s+)?FOREIGN\s+KEY\s*\([^)]+\)\s*REFERENCES\s+[`"']?\w+[`"']?\s*\([^)]+\)(?:\s+ON\s+(?:DELETE|UPDATE)\s+(?:CASCADE|SET\s+NULL|SET\s+DEFAULT|NO\s+ACTION|RESTRICT))*"#).unwrap()
792        });
793        result = RE_FK_CONSTRAINT.replace_all(&result, "").to_string();
794
795        result
796    }
797
798    /// Strip PostgreSQL-specific syntax
799    fn strip_postgres_syntax(stmt: &str) -> String {
800        let mut result = stmt.to_string();
801
802        // Remove schema prefix
803        result = Self::strip_schema_prefix(&result);
804
805        // Remove type casts
806        static RE_CAST: Lazy<Regex> = Lazy::new(|| {
807            Regex::new(r"::[a-zA-Z_][a-zA-Z0-9_]*(?:\s+[a-zA-Z_][a-zA-Z0-9_]*)*").unwrap()
808        });
809        result = RE_CAST.replace_all(&result, "").to_string();
810
811        // Remove nextval() - DuckDB handles sequences differently
812        static RE_NEXTVAL: Lazy<Regex> =
813            Lazy::new(|| Regex::new(r"(?i)\s*DEFAULT\s+nextval\s*\([^)]+\)").unwrap());
814        result = RE_NEXTVAL.replace_all(&result, "").to_string();
815
816        // Convert now() to CURRENT_TIMESTAMP
817        static RE_NOW: Lazy<Regex> =
818            Lazy::new(|| Regex::new(r"(?i)\bDEFAULT\s+now\s*\(\s*\)").unwrap());
819        result = RE_NOW
820            .replace_all(&result, "DEFAULT CURRENT_TIMESTAMP")
821            .to_string();
822
823        // Remove INHERITS clause
824        static RE_INHERITS: Lazy<Regex> =
825            Lazy::new(|| Regex::new(r"(?i)\s*INHERITS\s*\([^)]+\)").unwrap());
826        result = RE_INHERITS.replace_all(&result, "").to_string();
827
828        // Remove WITH clause (storage parameters)
829        static RE_WITH: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\s*WITH\s*\([^)]+\)").unwrap());
830        result = RE_WITH.replace_all(&result, "").to_string();
831
832        result
833    }
834
835    /// Strip schema prefix (e.g., public.users -> users)
836    fn strip_schema_prefix(stmt: &str) -> String {
837        static RE_SCHEMA: Lazy<Regex> =
838            Lazy::new(|| Regex::new(r#"(?i)\b(public|pg_catalog|pg_temp)\s*\.\s*"#).unwrap());
839        RE_SCHEMA.replace_all(stmt, "").to_string()
840    }
841
842    /// Strip MSSQL schema prefix (dbo., etc.) for both CREATE TABLE and INSERT
843    fn strip_mssql_schema_prefix(stmt: &str) -> String {
844        // Remove schema prefix (dbo., schema.) - quoted or unquoted
845        static RE_SCHEMA: Lazy<Regex> =
846            Lazy::new(|| Regex::new(r#"(?i)"?(dbo|master|tempdb|model|msdb)"?\s*\.\s*"#).unwrap());
847        RE_SCHEMA.replace_all(stmt, "").to_string()
848    }
849
850    /// Strip MSSQL-specific syntax
851    fn strip_mssql_syntax(stmt: &str) -> String {
852        let mut result = Self::strip_mssql_schema_prefix(stmt);
853
854        // Remove IDENTITY clause and make the column nullable (so INSERTs without id work)
855        // Pattern matches: INT IDENTITY(1,1) NOT NULL -> INT
856        static RE_IDENTITY_NOT_NULL: Lazy<Regex> = Lazy::new(|| {
857            Regex::new(r"(?i)\s*IDENTITY\s*\(\s*\d+\s*,\s*\d+\s*\)\s*NOT\s+NULL").unwrap()
858        });
859        result = RE_IDENTITY_NOT_NULL.replace_all(&result, "").to_string();
860
861        // Also handle IDENTITY without NOT NULL
862        static RE_IDENTITY: Lazy<Regex> =
863            Lazy::new(|| Regex::new(r"(?i)\s*IDENTITY\s*\(\s*\d+\s*,\s*\d+\s*\)").unwrap());
864        result = RE_IDENTITY.replace_all(&result, "").to_string();
865
866        // Remove CLUSTERED/NONCLUSTERED
867        static RE_CLUSTERED: Lazy<Regex> =
868            Lazy::new(|| Regex::new(r"(?i)\s*(?:NON)?CLUSTERED\s*").unwrap());
869        result = RE_CLUSTERED.replace_all(&result, " ").to_string();
870
871        // Remove ON [PRIMARY] (filegroup)
872        static RE_FILEGROUP: Lazy<Regex> =
873            Lazy::new(|| Regex::new(r#"(?i)\s*ON\s*"?PRIMARY"?"#).unwrap());
874        result = RE_FILEGROUP.replace_all(&result, "").to_string();
875
876        // Remove PRIMARY KEY constraints (they make columns NOT NULL which breaks IDENTITY column INSERTs)
877        static RE_PK_CONSTRAINT: Lazy<Regex> = Lazy::new(|| {
878            Regex::new(r#"(?i),?\s*CONSTRAINT\s+"?\w+"?\s+PRIMARY\s+KEY\s+\([^)]+\)"#).unwrap()
879        });
880        result = RE_PK_CONSTRAINT.replace_all(&result, "").to_string();
881
882        // Remove FOREIGN KEY constraints (analytics queries don't need FK enforcement)
883        static RE_FK_CONSTRAINT: Lazy<Regex> = Lazy::new(|| {
884            Regex::new(r#"(?i),?\s*CONSTRAINT\s+"?\w+"?\s+FOREIGN\s+KEY\s*\([^)]+\)\s*REFERENCES\s+[^\s(]+\s*\([^)]+\)"#).unwrap()
885        });
886        result = RE_FK_CONSTRAINT.replace_all(&result, "").to_string();
887
888        // Remove WITH clause for indexes
889        static RE_WITH: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\s*WITH\s*\([^)]+\)").unwrap());
890        result = RE_WITH.replace_all(&result, "").to_string();
891
892        // Remove TEXTIMAGE_ON
893        static RE_TEXTIMAGE: Lazy<Regex> =
894            Lazy::new(|| Regex::new(r#"(?i)\s*TEXTIMAGE_ON\s*"?\w+"?"#).unwrap());
895        result = RE_TEXTIMAGE.replace_all(&result, "").to_string();
896
897        // Convert GETDATE() to CURRENT_TIMESTAMP
898        static RE_GETDATE: Lazy<Regex> =
899            Lazy::new(|| Regex::new(r"(?i)\bGETDATE\s*\(\s*\)").unwrap());
900        result = RE_GETDATE
901            .replace_all(&result, "CURRENT_TIMESTAMP")
902            .to_string();
903
904        // Convert NEWID() to gen_random_uuid()
905        static RE_NEWID: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\bNEWID\s*\(\s*\)").unwrap());
906        result = RE_NEWID
907            .replace_all(&result, "gen_random_uuid()")
908            .to_string();
909
910        result
911    }
912
913    /// Count rows in an INSERT statement
914    fn count_insert_rows(sql: &str) -> u64 {
915        // Count VALUES clauses by counting opening parentheses after VALUES
916        if let Some(values_pos) = sql.to_uppercase().find("VALUES") {
917            let after_values = &sql[values_pos + 6..];
918            // Count top-level opening parens (simple heuristic)
919            let mut count = 0u64;
920            let mut depth = 0;
921            let mut in_string = false;
922            let mut prev_char = ' ';
923
924            for c in after_values.chars() {
925                if c == '\'' && prev_char != '\\' {
926                    in_string = !in_string;
927                }
928                if !in_string {
929                    if c == '(' {
930                        if depth == 0 {
931                            count += 1;
932                        }
933                        depth += 1;
934                    } else if c == ')' {
935                        depth -= 1;
936                    }
937                }
938                prev_char = c;
939            }
940            count
941        } else {
942            1
943        }
944    }
945}
946
947/// Statement reader that handles streaming SQL parsing
948struct StatementReader<R> {
949    reader: BufReader<R>,
950    dialect: SqlDialect,
951    buffer: String,
952    /// Position in buffer where unprocessed data starts (avoids O(n) shifts)
953    buffer_pos: usize,
954    eof: bool,
955    /// Track if we're inside a PostgreSQL COPY data block
956    in_copy_mode: bool,
957}
958
959impl<R: Read> StatementReader<R> {
960    fn new(reader: BufReader<R>, dialect: SqlDialect) -> Self {
961        Self {
962            reader,
963            dialect,
964            buffer: String::new(),
965            buffer_pos: 0,
966            eof: false,
967            in_copy_mode: false,
968        }
969    }
970
971    /// Compact the buffer by removing already-processed data
972    /// Only called periodically to avoid O(n²) behavior
973    fn compact_buffer(&mut self) {
974        if self.buffer_pos > 0 {
975            self.buffer.drain(..self.buffer_pos);
976            self.buffer_pos = 0;
977        }
978    }
979
980    /// Get the unprocessed portion of the buffer
981    fn remaining_buffer(&self) -> &str {
982        &self.buffer[self.buffer_pos..]
983    }
984
985    /// Set COPY mode explicitly (called by DumpLoader when entering/exiting COPY blocks)
986    fn set_copy_mode(&mut self, enabled: bool) {
987        self.in_copy_mode = enabled;
988    }
989
990    /// Strip leading SQL comments (-- and /* */) from a string
991    fn strip_leading_sql_comments(s: &str) -> &str {
992        let mut result = s.trim();
993        loop {
994            // Skip -- line comments
995            if result.starts_with("--") {
996                if let Some(pos) = result.find('\n') {
997                    result = result[pos + 1..].trim();
998                    continue;
999                } else {
1000                    return ""; // Only comment, no newline
1001                }
1002            }
1003            // Skip /* */ block comments
1004            if result.starts_with("/*") {
1005                if let Some(pos) = result.find("*/") {
1006                    result = result[pos + 2..].trim();
1007                    continue;
1008                } else {
1009                    return ""; // Unclosed block comment
1010                }
1011            }
1012            break;
1013        }
1014        result
1015    }
1016
1017    fn next_statement(&mut self) -> Option<Result<String>> {
1018        if self.eof && self.remaining_buffer().is_empty() {
1019            return None;
1020        }
1021
1022        loop {
1023            // In COPY mode, return each line individually until we see \.
1024            if self.in_copy_mode {
1025                if let Some(line) = self.extract_copy_line() {
1026                    return Some(Ok(line));
1027                }
1028            } else {
1029                // Try to find a complete statement in the buffer
1030                if let Some(stmt) = self.extract_statement() {
1031                    // COPY mode is now managed explicitly by DumpLoader via set_copy_mode()
1032                    return Some(Ok(stmt));
1033                }
1034            }
1035
1036            // Compact buffer periodically to prevent unbounded growth
1037            // Only compact when processed portion is significant
1038            if self.buffer_pos > 64 * 1024 {
1039                self.compact_buffer();
1040            }
1041
1042            // Read more data
1043            let mut line = String::new();
1044            match self.reader.read_line(&mut line) {
1045                Ok(0) => {
1046                    self.eof = true;
1047                    self.in_copy_mode = false; // Reset on EOF
1048                    let remaining = self.remaining_buffer().trim();
1049                    if !remaining.is_empty() {
1050                        let stmt = remaining.to_string();
1051                        self.buffer.clear();
1052                        self.buffer_pos = 0;
1053                        return Some(Ok(stmt));
1054                    }
1055                    return None;
1056                }
1057                Ok(_) => {
1058                    self.buffer.push_str(&line);
1059                }
1060                Err(e) => return Some(Err(e.into())),
1061            }
1062        }
1063    }
1064
1065    /// Extract a single line from the buffer for COPY data mode
1066    fn extract_copy_line(&mut self) -> Option<String> {
1067        let remaining = self.remaining_buffer();
1068        if let Some(newline_pos) = remaining.find('\n') {
1069            let line = remaining[..newline_pos].to_string();
1070            self.buffer_pos += newline_pos + 1;
1071            // COPY mode is managed by DumpLoader via set_copy_mode(), not here
1072            Some(line)
1073        } else {
1074            None
1075        }
1076    }
1077
1078    fn extract_statement(&mut self) -> Option<String> {
1079        let remaining = self.remaining_buffer();
1080        let mut in_string = false;
1081        let mut in_dollar_quote = false;
1082        let mut in_bracket = false;
1083        let mut in_line_comment = false;
1084        let mut in_block_comment = false;
1085        let mut escape_next = false;
1086        let mut chars = remaining.char_indices().peekable();
1087        let mut end_pos = None;
1088
1089        // For MSSQL, check for GO at start of line
1090        if self.dialect == SqlDialect::Mssql {
1091            if let Some(go_pos) = self.find_go_separator() {
1092                let stmt = remaining[..go_pos].to_string();
1093                // Skip past GO and any whitespace
1094                let after_go = &remaining[go_pos..];
1095                if let Some(line_end) = after_go.find('\n') {
1096                    self.buffer_pos += go_pos + line_end + 1;
1097                } else {
1098                    self.buffer_pos = self.buffer.len();
1099                }
1100
1101                let trimmed = stmt.trim();
1102                if trimmed.is_empty()
1103                    || trimmed.starts_with("--")
1104                    || (trimmed.starts_with("/*") && !trimmed.contains("/*!"))
1105                {
1106                    return self.extract_statement();
1107                }
1108                return Some(stmt);
1109            }
1110        }
1111
1112        while let Some((i, c)) = chars.next() {
1113            if escape_next {
1114                escape_next = false;
1115                continue;
1116            }
1117
1118            // Handle line comments (-- to end of line)
1119            if in_line_comment {
1120                if c == '\n' {
1121                    in_line_comment = false;
1122                }
1123                continue;
1124            }
1125
1126            // Handle block comments (/* to */)
1127            if in_block_comment {
1128                if c == '*' && chars.peek().map(|(_, c)| *c == '/').unwrap_or(false) {
1129                    chars.next();
1130                    in_block_comment = false;
1131                }
1132                continue;
1133            }
1134
1135            match c {
1136                '\\' if self.dialect == SqlDialect::MySql && in_string => {
1137                    escape_next = true;
1138                }
1139                '\'' if !in_dollar_quote && !in_bracket => {
1140                    in_string = !in_string;
1141                }
1142                '[' if self.dialect == SqlDialect::Mssql && !in_string => {
1143                    in_bracket = true;
1144                }
1145                ']' if self.dialect == SqlDialect::Mssql && !in_string => {
1146                    // Handle ]] escape
1147                    if chars.peek().map(|(_, c)| *c == ']').unwrap_or(false) {
1148                        chars.next();
1149                    } else {
1150                        in_bracket = false;
1151                    }
1152                }
1153                '$' if self.dialect == SqlDialect::Postgres && !in_string => {
1154                    // Check for dollar quote
1155                    if chars.peek().map(|(_, c)| *c == '$').unwrap_or(false) {
1156                        in_dollar_quote = !in_dollar_quote;
1157                        chars.next();
1158                    }
1159                }
1160                '-' if !in_string && !in_dollar_quote && !in_bracket => {
1161                    // Check for -- line comment
1162                    if chars.peek().map(|(_, c)| *c == '-').unwrap_or(false) {
1163                        chars.next();
1164                        in_line_comment = true;
1165                    }
1166                }
1167                '/' if !in_string && !in_dollar_quote && !in_bracket => {
1168                    // Check for /* block comment
1169                    if chars.peek().map(|(_, c)| *c == '*').unwrap_or(false) {
1170                        chars.next();
1171                        in_block_comment = true;
1172                    }
1173                }
1174                ';' if !in_string && !in_dollar_quote && !in_bracket => {
1175                    end_pos = Some(i + 1);
1176                    break;
1177                }
1178                _ => {}
1179            }
1180        }
1181
1182        if let Some(pos) = end_pos {
1183            let stmt = remaining[..pos].to_string();
1184            // Skip past the statement and any leading whitespace
1185            let after_stmt = &remaining[pos..];
1186            let trimmed_len = after_stmt.len() - after_stmt.trim_start().len();
1187            self.buffer_pos += pos + trimmed_len;
1188
1189            // Skip empty statements and comments
1190            let trimmed = stmt.trim();
1191
1192            // Strip leading comments from the statement before checking if it's just a comment
1193            let stripped = Self::strip_leading_sql_comments(trimmed);
1194            if stripped.is_empty() {
1195                return self.extract_statement();
1196            }
1197
1198            // Use the stripped version for further processing
1199            let trimmed = stripped;
1200
1201            // For Postgres COPY statements, auto-enter copy mode
1202            // This prevents accumulating COPY data while looking for the next semicolon
1203            if self.dialect == SqlDialect::Postgres {
1204                let upper = trimmed.to_uppercase();
1205                if upper.ends_with("FROM STDIN;") && upper.contains("COPY ") {
1206                    self.in_copy_mode = true;
1207                }
1208            }
1209
1210            Some(stmt)
1211        } else {
1212            None
1213        }
1214    }
1215
1216    /// Find GO batch separator at start of line (MSSQL)
1217    fn find_go_separator(&self) -> Option<usize> {
1218        let remaining = self.remaining_buffer();
1219        let mut in_string = false;
1220        let mut in_bracket = false;
1221        let mut line_start = 0;
1222
1223        for (i, c) in remaining.char_indices() {
1224            if c == '\'' && !in_bracket {
1225                in_string = !in_string;
1226            } else if c == '[' && !in_string {
1227                in_bracket = true;
1228            } else if c == ']' && !in_string {
1229                in_bracket = false;
1230            } else if c == '\n' {
1231                line_start = i + 1;
1232            } else if !in_string && !in_bracket && i == line_start {
1233                // Check for GO at start of line
1234                let rest = &remaining[i..];
1235                if rest.len() >= 2 {
1236                    let word = &rest[..2.min(rest.len())];
1237                    if word.eq_ignore_ascii_case("GO") {
1238                        // Make sure it's just GO (not GO_SOMETHING)
1239                        let after_go = if rest.len() > 2 {
1240                            rest.chars().nth(2)
1241                        } else {
1242                            None
1243                        };
1244                        if after_go.is_none()
1245                            || after_go == Some('\n')
1246                            || after_go == Some('\r')
1247                            || after_go == Some(' ')
1248                            || after_go.unwrap().is_ascii_digit()
1249                        {
1250                            return Some(i);
1251                        }
1252                    }
1253                }
1254            }
1255        }
1256        None
1257    }
1258}
1259
1260#[cfg(test)]
1261mod tests {
1262    use super::*;
1263
1264    #[test]
1265    fn test_count_insert_rows() {
1266        assert_eq!(
1267            DumpLoader::count_insert_rows("INSERT INTO t VALUES (1, 'a')"),
1268            1
1269        );
1270        assert_eq!(
1271            DumpLoader::count_insert_rows("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')"),
1272            3
1273        );
1274        assert_eq!(
1275            DumpLoader::count_insert_rows("INSERT INTO t VALUES (1, '(test)')"),
1276            1
1277        );
1278    }
1279
1280    #[test]
1281    fn test_strip_mysql_clauses() {
1282        let sql = "CREATE TABLE t (id INT) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
1283        let result = DumpLoader::strip_mysql_clauses(sql);
1284        assert!(!result.contains("ENGINE"));
1285        assert!(!result.contains("CHARSET"));
1286    }
1287
1288    #[test]
1289    fn test_convert_identifiers() {
1290        let sql = "INSERT INTO `users` (`id`, `name`) VALUES (1, 'test')";
1291        let result = DumpLoader::convert_identifiers(sql, SqlDialect::MySql);
1292        assert_eq!(
1293            result,
1294            "INSERT INTO \"users\" (\"id\", \"name\") VALUES (1, 'test')"
1295        );
1296    }
1297
1298    #[test]
1299    fn test_looks_like_copy_data() {
1300        assert!(DumpLoader::looks_like_copy_data("1\tAlice\t2024-01-01"));
1301        assert!(!DumpLoader::looks_like_copy_data("SELECT * FROM users"));
1302        assert!(!DumpLoader::looks_like_copy_data("INSERT INTO t VALUES"));
1303    }
1304
1305    #[test]
1306    fn test_strip_mssql_syntax() {
1307        let sql = r#"CREATE TABLE "users" (
1308    "id" INTEGER NOT NULL,
1309    "email" VARCHAR(255) NOT NULL
1310)"#;
1311        // The IDENTITY should have been stripped by strip_mssql_syntax
1312        let result = DumpLoader::strip_mssql_syntax(sql);
1313        assert!(!result.contains("IDENTITY"), "IDENTITY should be stripped");
1314
1315        // Test with IDENTITY(1,1) NOT NULL
1316        let sql_with_identity = r#"CREATE TABLE "users" (
1317    "id" INTEGER IDENTITY(1,1) NOT NULL,
1318    "email" VARCHAR(255) NOT NULL
1319)"#;
1320        let result2 = DumpLoader::strip_mssql_syntax(sql_with_identity);
1321        assert!(
1322            !result2.contains("IDENTITY"),
1323            "IDENTITY should be stripped: {}",
1324            result2
1325        );
1326        // Since we strip IDENTITY(1,1) NOT NULL, the column should become nullable
1327        assert!(
1328            !result2.contains("IDENTITY(1,1) NOT NULL"),
1329            "Should strip full IDENTITY NOT NULL"
1330        );
1331    }
1332
1333    #[test]
1334    fn test_convert_mssql_identifiers() {
1335        let sql = "INSERT INTO [dbo].[users] ([id], [name]) VALUES (1, N'test')";
1336        let result = DumpLoader::convert_identifiers(sql, SqlDialect::Mssql);
1337        assert_eq!(
1338            result,
1339            "INSERT INTO \"dbo\".\"users\" (\"id\", \"name\") VALUES (1, 'test')"
1340        );
1341    }
1342}