sql_splitter/duckdb/loader.rs

//! SQL dump loader for importing dumps into DuckDB.

use super::batch::{flush_batch, BatchManager, MAX_ROWS_PER_BATCH};
use super::types::TypeConverter;
use super::{ImportStats, QueryConfig};
use crate::convert::copy_to_insert::{copy_to_inserts, parse_copy_header, CopyHeader};
use crate::parser::{
    detect_dialect_from_file, parse_insert_for_bulk, Parser, SqlDialect, StatementType,
};
use crate::progress::ProgressReader;
use crate::splitter::Compression;
use anyhow::{Context, Result};
use duckdb::Connection;
use indicatif::{ProgressBar, ProgressStyle};
use once_cell::sync::Lazy;
use regex::Regex;
use std::fs::File;
use std::io::{BufRead, BufReader, Read};
use std::path::Path;

/// Maximum COPY rows to accumulate per batch before converting to INSERTs.
/// This bounds memory usage while still enabling large batches for performance.
const MAX_COPY_ROWS_PER_BATCH: usize = 10_000;

/// Loads SQL dumps into a DuckDB database
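///
/// # Example
///
/// A minimal usage sketch (marked `ignore`, so it is not compiled as a doctest);
/// it assumes `QueryConfig` can be built via `Default`, which may not match the
/// real constructor in this crate.
///
/// ```ignore
/// use duckdb::Connection;
///
/// let conn = Connection::open_in_memory()?;
/// let config = QueryConfig::default(); // assumption: a Default impl exists
/// let loader = DumpLoader::new(&conn, &config);
/// let stats = loader.load(std::path::Path::new("dump.sql.gz"))?;
/// println!("{} tables created, {} rows inserted", stats.tables_created, stats.rows_inserted);
/// ```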
pub struct DumpLoader<'a> {
    conn: &'a Connection,
    config: &'a QueryConfig,
}

impl<'a> DumpLoader<'a> {
    /// Create a new dump loader
    pub fn new(conn: &'a Connection, config: &'a QueryConfig) -> Self {
        Self { conn, config }
    }

    /// Load a SQL dump file into DuckDB
    pub fn load(&self, dump_path: &Path) -> Result<ImportStats> {
        let start = std::time::Instant::now();
        let mut stats = ImportStats::default();

        // Detect dialect
        let dialect = if let Some(d) = self.config.dialect {
            d
        } else {
            let result = detect_dialect_from_file(dump_path)?;
            result.dialect
        };

        // Get file size for progress
        let file_size = std::fs::metadata(dump_path)?.len();

        // Set up progress bar
        let progress_bar = if self.config.progress {
            let pb = ProgressBar::new(file_size);
            pb.set_style(
                ProgressStyle::default_bar()
                    .template("{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {bytes}/{total_bytes} ({percent}%)")
                    .unwrap()
                    .progress_chars("=>-"),
            );
            Some(pb)
        } else {
            None
        };

        // Open file with compression detection
        let file = File::open(dump_path).context("Failed to open dump file")?;
        let compression = Compression::from_path(dump_path);
        let reader: Box<dyn Read> = match compression {
            Compression::Gzip => Box::new(flate2::read::GzDecoder::new(file)),
            Compression::Bzip2 => Box::new(bzip2::read::BzDecoder::new(file)),
            Compression::Xz => Box::new(xz2::read::XzDecoder::new(file)),
            Compression::Zstd => Box::new(zstd::stream::Decoder::new(file)?),
            Compression::None => Box::new(file),
        };

        let reader: Box<dyn Read> = if let Some(ref pb) = progress_bar {
            let pb_clone = pb.clone();
            Box::new(ProgressReader::new(reader, move |bytes| {
                pb_clone.set_position(bytes);
            }))
        } else {
            reader
        };

        let buf_reader = BufReader::with_capacity(256 * 1024, reader);

        // Parse and load statements
        self.load_statements(buf_reader, dialect, &mut stats)?;

        if let Some(pb) = progress_bar {
            pb.finish_with_message("Import complete");
        }

        stats.duration_secs = start.elapsed().as_secs_f64();
        Ok(stats)
    }

    /// Load statements from a reader
    fn load_statements<R: Read>(
        &self,
        reader: BufReader<R>,
        dialect: SqlDialect,
        stats: &mut ImportStats,
    ) -> Result<()> {
        let mut parser = StatementReader::new(reader, dialect);
        let mut pending_copy: Option<CopyHeader> = None;

        // Batched COPY data accumulator
        let mut copy_batch_data: Vec<u8> = Vec::new();
        let mut copy_batch_rows: usize = 0;

        // Track tables that failed (don't exist) to skip subsequent inserts
        let mut failed_tables: std::collections::HashSet<String> = std::collections::HashSet::new();

        // Batch manager for bulk INSERT loading via Appender API
        let mut batch_mgr = BatchManager::new(MAX_ROWS_PER_BATCH);

        while let Some(stmt_result) = parser.next_statement() {
            let stmt = stmt_result?;

            // Handle COPY data blocks with batching
            if let Some(ref header) = pending_copy {
                let trimmed = stmt.trim();

                // End-of-COPY marker: "\."
                if trimmed == "\\." {
                    // Flush any remaining batched rows
                    if !copy_batch_data.is_empty() {
                        self.process_copy_batch(
                            header,
                            &copy_batch_data,
                            stats,
                            &mut failed_tables,
                        );
                        copy_batch_data.clear();
                        copy_batch_rows = 0;
                    }
                    pending_copy = None;
                    parser.set_copy_mode(false);
                    continue;
                }

                // Empty/whitespace line inside COPY: skip
                if trimmed.is_empty() {
                    continue;
                }

                // Normal COPY data line - accumulate into batch
                if Self::looks_like_copy_data(&stmt) {
                    // Skip if we know this table doesn't exist
                    if failed_tables.contains(&header.table) {
                        continue;
                    }

                    copy_batch_data.extend_from_slice(stmt.as_bytes());
                    copy_batch_data.push(b'\n');
                    copy_batch_rows += 1;

                    // Flush when batch gets large enough
                    if copy_batch_rows >= MAX_COPY_ROWS_PER_BATCH {
                        self.process_copy_batch(
                            header,
                            &copy_batch_data,
                            stats,
                            &mut failed_tables,
                        );
                        copy_batch_data.clear();
                        copy_batch_rows = 0;
                    }
                    continue;
                }

                // Unexpected content inside COPY - flush and fall through to normal handling
                if !copy_batch_data.is_empty() {
                    self.process_copy_batch(header, &copy_batch_data, stats, &mut failed_tables);
                    copy_batch_data.clear();
                    copy_batch_rows = 0;
                }
                pending_copy = None;
                parser.set_copy_mode(false);
                // Fall through to normal statement handling
            }

            let (mut stmt_type, table_name) =
                Parser::<&[u8]>::parse_statement_with_dialect(stmt.as_bytes(), dialect);

            // For Postgres, check if statement contains COPY ... FROM stdin (may be after comments)
            if dialect == SqlDialect::Postgres && stmt_type == StatementType::Unknown {
                let upper = stmt.to_uppercase();
                if let Some(copy_pos) = upper.find("COPY ") {
                    let after_copy = &upper[copy_pos..];
                    if after_copy.contains("FROM STDIN") {
                        stmt_type = StatementType::Copy;
                    }
                }
            }

            // Filter tables if specified
            if let Some(ref tables) = self.config.tables {
                if !table_name.is_empty()
                    && !tables.iter().any(|t| t.eq_ignore_ascii_case(&table_name))
                {
                    continue;
                }
            }

            match stmt_type {
                StatementType::CreateTable => {
                    let duckdb_sql = self.convert_create_table(&stmt, dialect)?;
                    match self.conn.execute(&duckdb_sql, []) {
                        Ok(_) => stats.tables_created += 1,
                        Err(e) => {
                            stats
                                .warnings
                                .push(format!("Failed to create table {}: {}", table_name, e));
                            stats.statements_skipped += 1;
                        }
                    }
                }
                StatementType::Insert => {
                    // Try bulk loading via Appender API first
                    if !self.try_queue_for_bulk(
                        &stmt,
                        dialect,
                        &mut batch_mgr,
                        stats,
                        &mut failed_tables,
                    ) {
                        // Fallback to direct execution
                        let duckdb_sql = self.convert_insert(&stmt, dialect)?;
                        match self.conn.execute(&duckdb_sql, []) {
                            Ok(_) => {
                                stats.insert_statements += 1;
                                stats.rows_inserted += Self::count_insert_rows(&duckdb_sql);
                            }
                            Err(e) => {
                                stats
                                    .warnings
                                    .push(format!("Failed INSERT for {}: {}", table_name, e));
                                stats.statements_skipped += 1;
                            }
                        }
                    }

                    // Flush any ready batches
                    for mut batch in batch_mgr.get_ready_batches() {
                        flush_batch(self.conn, &mut batch, stats, &mut failed_tables)?;
                    }
                }
                StatementType::Copy => {
                    // Parse COPY header and start buffering data
                    if let Some(header) = parse_copy_header(&stmt) {
                        // Check if table already known to be missing
                        if failed_tables.contains(&header.table) {
                            // Enter COPY mode to skip line-by-line, then exit
                            parser.set_copy_mode(true);
                            Self::skip_copy_block(&mut parser);
                            parser.set_copy_mode(false);
                            continue;
                        }

                        // Proactively check if table exists before buffering data
                        if !self.table_exists(&header.table) {
                            failed_tables.insert(header.table.clone());
                            if stats.warnings.len() < 100 {
                                stats.warnings.push(format!(
                                    "Skipping COPY for non-existent table {}",
                                    header.table
                                ));
                            }
                            // Enter COPY mode to skip line-by-line, then exit
                            parser.set_copy_mode(true);
                            Self::skip_copy_block(&mut parser);
                            parser.set_copy_mode(false);
                            continue;
                        }

                        copy_batch_data.clear();
                        copy_batch_rows = 0;
                        pending_copy = Some(header);
                        parser.set_copy_mode(true);
                    }
                }
                StatementType::CreateIndex => {
                    // Skip indexes - not needed for analytics queries
                    stats.statements_skipped += 1;
                }
                _ => {
                    // Skip other statements (ALTER, DROP, etc.)
                    stats.statements_skipped += 1;
                }
            }
        }

        // Flush any remaining COPY batch (handles truncated dumps)
        if let Some(ref header) = pending_copy {
            if !copy_batch_data.is_empty() {
                self.process_copy_batch(header, &copy_batch_data, stats, &mut failed_tables);
            }
        }

        // Flush any remaining INSERT batches
        for mut batch in batch_mgr.drain_all() {
            flush_batch(self.conn, &mut batch, stats, &mut failed_tables)?;
        }

        Ok(())
    }

    /// Process a batch of COPY data rows, converting them to INSERTs
    fn process_copy_batch(
        &self,
        header: &CopyHeader,
        batch_data: &[u8],
        stats: &mut ImportStats,
        failed_tables: &mut std::collections::HashSet<String>,
    ) {
        if batch_data.is_empty() {
            return;
        }

        // Skip if we already know this table doesn't exist
        if failed_tables.contains(&header.table) {
            return;
        }

        let inserts = copy_to_inserts(header, batch_data, SqlDialect::Postgres);
        for insert in inserts {
            let insert_sql = String::from_utf8_lossy(&insert);
            match self.conn.execute(&insert_sql, []) {
                Ok(_) => {
                    stats.rows_inserted += Self::count_insert_rows(&insert_sql);
                }
                Err(e) => {
                    let err_str = e.to_string();
                    // If table doesn't exist, mark it and skip future inserts
                    if err_str.contains("does not exist") {
                        failed_tables.insert(header.table.clone());
                        if stats.warnings.len() < 100 {
                            stats.warnings.push(format!(
                                "Table {} does not exist, skipping COPY data",
                                header.table
                            ));
                        }
                        return; // Skip rest of batch
                    }
                    // Limit warnings to avoid memory bloat on large failures
                    if stats.warnings.len() < 100 {
                        stats.warnings.push(format!(
                            "Failed to insert COPY data for {}: {}",
                            header.table, e
                        ));
                    }
                    stats.statements_skipped += 1;
                }
            }
        }
    }

    /// Check if a table exists in DuckDB
    fn table_exists(&self, table: &str) -> bool {
        let query = "SELECT 1 FROM information_schema.tables WHERE table_name = ? LIMIT 1";
        match self.conn.prepare(query) {
            Ok(mut stmt) => stmt.exists([table]).unwrap_or(false),
            Err(_) => false,
        }
    }

    /// Try to queue an INSERT statement for bulk loading via Appender API.
    /// Returns true if successfully queued, false if fallback to direct execution is needed.
    fn try_queue_for_bulk(
        &self,
        stmt: &str,
        dialect: SqlDialect,
        batch_mgr: &mut BatchManager,
        stats: &mut ImportStats,
        failed_tables: &mut std::collections::HashSet<String>,
    ) -> bool {
        // Quick filter: skip statements with complex clauses that Appender can't handle
        let upper = stmt.to_uppercase();
        if upper.contains("ON DUPLICATE KEY")
            || upper.contains("ON CONFLICT")
            || upper.contains("REPLACE")
            || upper.contains("IGNORE")
            || upper.contains("RETURNING")
            || upper.contains("SELECT")
        {
            return false;
        }

        // Try to parse the INSERT statement
        let parsed = match parse_insert_for_bulk(stmt.as_bytes()) {
            Ok(p) => p,
            Err(_) => return false, // Parse failed, use fallback
        };

        // Skip tables we know don't exist
        if failed_tables.contains(&parsed.table) {
            return true; // Pretend we handled it to skip the statement
        }

        // Skip if no rows were parsed
        if parsed.rows.is_empty() {
            return false;
        }

        // Convert the statement for DuckDB (for fallback)
        let duckdb_sql = match self.convert_insert(stmt, dialect) {
            Ok(sql) => sql,
            Err(_) => return false,
        };

        // Queue the rows
        if let Some(mut batch) =
            batch_mgr.queue_insert(&parsed.table, parsed.columns, parsed.rows, duckdb_sql)
        {
            // Batch is ready to flush
            if let Err(e) = flush_batch(self.conn, &mut batch, stats, failed_tables) {
                if stats.warnings.len() < 100 {
                    stats.warnings.push(format!("Batch flush error: {}", e));
                }
            }
        }

        true
    }

    /// Skip a COPY data block without parsing/processing it
    fn skip_copy_block<R: Read>(parser: &mut StatementReader<R>) {
        while let Some(Ok(line)) = parser.next_statement() {
            if line.trim() == "\\." {
                break;
            }
        }
    }

    /// Check if a line looks like COPY data (tab-separated values or single-column values)
    fn looks_like_copy_data(line: &str) -> bool {
        let trimmed = line.trim();

        // Empty line is not COPY data
        if trimmed.is_empty() {
            return false;
        }

        // End-of-COPY marker
        if trimmed == "\\." {
            return false;
        }

        // Heuristic: reject anything that reads like a SQL statement or comment;
        // everything else (tab-separated or single-column values) is treated as data.
        let first_char = trimmed.chars().next().unwrap_or(' ');

        // Quick check: if starts with common SQL keyword first char, verify it's not SQL
        if matches!(
            first_char,
            'S' | 's'
                | 'I'
                | 'i'
                | 'C'
                | 'c'
                | 'D'
                | 'd'
                | 'A'
                | 'a'
                | 'U'
                | 'u'
                | 'G'
                | 'g'
                | '-'
                | '/'
        ) {
            let upper_prefix: String = trimmed.chars().take(7).collect::<String>().to_uppercase();
            if upper_prefix.starts_with("SELECT")
                || upper_prefix.starts_with("INSERT")
                || upper_prefix.starts_with("CREATE")
                || upper_prefix.starts_with("DROP")
                || upper_prefix.starts_with("ALTER")
                || upper_prefix.starts_with("UPDATE")
                || upper_prefix.starts_with("GRANT")
                || upper_prefix.starts_with("--")
                || upper_prefix.starts_with("/*")
            {
                return false;
            }
        }

        // If it contains a tab, it's multi-column COPY data
        // If it doesn't contain a tab but passed the SQL check, it's single-column data
        true
    }

    /// Convert a CREATE TABLE statement to DuckDB-compatible SQL
    fn convert_create_table(&self, stmt: &str, dialect: SqlDialect) -> Result<String> {
        let mut result = stmt.to_string();

        // Convert identifier quoting
        result = Self::convert_identifiers(&result, dialect);

        // Remove MySQL-specific clauses FIRST (before type conversion)
        // This prevents "CHARACTER SET" from being confused with "CHAR" type
        result = Self::strip_mysql_clauses(&result);

        // Convert data types in column definitions
        result = Self::convert_types_in_statement(&result);

        // Remove PostgreSQL-specific syntax
        if dialect == SqlDialect::Postgres {
            result = Self::strip_postgres_syntax(&result);
        }

        // Remove MSSQL-specific syntax
        if dialect == SqlDialect::Mssql {
            result = Self::strip_mssql_syntax(&result);
        }

        // Remove SQLite-specific syntax
        if dialect == SqlDialect::Sqlite {
            result = Self::strip_sqlite_syntax(&result);
        }

        Ok(result)
    }

    /// Convert an INSERT statement to DuckDB-compatible SQL
    fn convert_insert(&self, stmt: &str, dialect: SqlDialect) -> Result<String> {
        let mut result = stmt.to_string();

        // Convert identifier quoting
        result = Self::convert_identifiers(&result, dialect);

        // Convert MySQL backslash escapes to SQL standard
        if dialect == SqlDialect::MySql {
            result = Self::convert_mysql_escapes(&result);
        }

        // Remove PostgreSQL schema prefix
        if dialect == SqlDialect::Postgres {
            result = Self::strip_schema_prefix(&result);
        }

        // Remove MSSQL schema prefix (dbo., etc.)
        if dialect == SqlDialect::Mssql {
            result = Self::strip_mssql_schema_prefix(&result);
        }

        Ok(result)
    }

    /// Convert MySQL backslash escapes to SQL standard
    fn convert_mysql_escapes(stmt: &str) -> String {
        let mut result = String::with_capacity(stmt.len() + 100);
        let mut chars = stmt.chars().peekable();
        let mut in_string = false;

        while let Some(c) = chars.next() {
            if c == '\'' {
                in_string = !in_string;
                result.push(c);
            } else if c == '\\' && in_string {
                // Handle MySQL escape sequences
                match chars.peek() {
                    Some('\'') => {
                        // \' -> ''
                        chars.next();
                        result.push_str("''");
                    }
                    Some('\\') => {
                        // \\ -> \
                        chars.next();
                        result.push('\\');
                    }
                    Some('n') => {
                        // \n -> newline
                        chars.next();
                        result.push('\n');
                    }
                    Some('r') => {
                        // \r -> carriage return
                        chars.next();
                        result.push('\r');
                    }
                    Some('t') => {
                        // \t -> tab
                        chars.next();
                        result.push('\t');
                    }
                    Some('0') => {
                        // \0 -> NULL character (skip)
                        chars.next();
                    }
                    Some('"') => {
                        // \" -> "
                        chars.next();
                        result.push('"');
                    }
                    _ => {
                        // Unknown escape, keep backslash
                        result.push(c);
                    }
                }
            } else {
                result.push(c);
            }
        }
        result
    }

    /// Convert identifier quoting (backticks/brackets to double quotes)
    fn convert_identifiers(stmt: &str, dialect: SqlDialect) -> String {
        match dialect {
            SqlDialect::MySql => {
                // Convert backticks to double quotes
                let mut result = String::with_capacity(stmt.len());
                let mut in_string = false;
                let mut in_backtick = false;

                for c in stmt.chars() {
                    if c == '\'' && !in_backtick {
                        in_string = !in_string;
                        result.push(c);
                    } else if c == '`' && !in_string {
                        in_backtick = !in_backtick;
                        result.push('"');
                    } else {
                        result.push(c);
                    }
                }
                result
            }
            SqlDialect::Mssql => {
                // Convert brackets to double quotes, strip N prefix from strings
                let mut result = String::with_capacity(stmt.len());
                let mut in_string = false;
                let mut in_bracket = false;
                let mut chars = stmt.chars().peekable();

                while let Some(c) = chars.next() {
                    if c == '\'' && !in_bracket {
                        in_string = !in_string;
                        result.push(c);
                    } else if c == '[' && !in_string {
                        in_bracket = true;
                        result.push('"');
                    } else if c == ']' && !in_string {
                        // Handle ]] escape
                        if chars.peek() == Some(&']') {
                            chars.next();
                            result.push(']');
                        } else {
                            in_bracket = false;
                            result.push('"');
                        }
                    } else if c == 'N' && !in_string && !in_bracket && chars.peek() == Some(&'\'') {
                        // Strip N prefix from N'string'
                        // Don't push the N, the quote will be pushed in next iteration
                    } else {
                        result.push(c);
                    }
                }
                result
            }
            _ => stmt.to_string(),
        }
    }

    /// Convert SQL types in a statement
    fn convert_types_in_statement(stmt: &str) -> String {
        // Pattern to match column definitions with types
        // Handles: TYPE, TYPE(size), TYPE UNSIGNED, TYPE WITH TIME ZONE
        // IMPORTANT: Order matters - longer types first to avoid partial matches (INTEGER before INT)
        // IMPORTANT: Types must be preceded by quote/whitespace AND followed by whitespace/paren/comma (not a closing quote)
        // This prevents matching "date" as DATE type (column names inside quotes)
        // Includes MySQL, PostgreSQL, SQLite, and MSSQL types
        static RE_COLUMN_TYPE: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i)(["'`\]\s])\s*(BIGSERIAL|SMALLSERIAL|SERIAL|BIGINT|SMALLINT|MEDIUMINT|TINYINT|INTEGER|INT|DOUBLE\s+PRECISION|DOUBLE|FLOAT|DECIMAL|NUMERIC|CHARACTER\s+VARYING|NVARCHAR|NCHAR|VARCHAR|CHAR|VARBINARY|BINARY|LONGTEXT|MEDIUMTEXT|TINYTEXT|NTEXT|TEXT|LONGBLOB|MEDIUMBLOB|TINYBLOB|IMAGE|BLOB|DATETIME2|DATETIMEOFFSET|SMALLDATETIME|DATETIME|TIMESTAMPTZ|TIMESTAMP|TIMETZ|TIME|DATE|YEAR|ENUM|SET|JSONB|JSON|UUID|UNIQUEIDENTIFIER|BYTEA|BOOLEAN|BOOL|BIT|REAL|MONEY|SMALLMONEY|INTERVAL|ROWVERSION|XML|SQL_VARIANT)\b(\s*\([^)]+\))?(\s+(?:UNSIGNED|WITH(?:OUT)?\s+TIME\s+ZONE))?"#).unwrap()
        });

        RE_COLUMN_TYPE
            .replace_all(stmt, |caps: &regex::Captures| {
                let full_match = caps.get(0).unwrap().as_str();
                let leading_char = caps.get(1).unwrap().as_str();
                let type_part = caps.get(2).unwrap().as_str();
                let size_part = caps.get(3).map(|m| m.as_str()).unwrap_or("");
                let suffix = caps.get(4).map(|m| m.as_str()).unwrap_or("");

                // Check if this looks like a quoted identifier (type is inside quotes)
                // If leading char is a quote and the character before the match is also a quote, skip
                let end_pos = caps.get(0).unwrap().end();
                let stmt_bytes = stmt.as_bytes();
                if end_pos < stmt_bytes.len() {
                    let next_char = stmt_bytes[end_pos] as char;
                    // If next character is a closing quote, this is a quoted identifier, not a type
                    if next_char == '"' || next_char == '\'' || next_char == '`' {
                        return full_match.to_string();
                    }
                }

                // Calculate the whitespace between leading char and type
                let ws_len = full_match.len() - leading_char.len() - type_part.len() - size_part.len() - suffix.len();
                let ws = &full_match[leading_char.len()..leading_char.len() + ws_len];

                let converted = TypeConverter::convert(&format!("{}{}{}", type_part, size_part, suffix));
                format!("{}{}{}", leading_char, ws, converted)
            })
            .to_string()
    }

    /// Strip MySQL-specific clauses
    fn strip_mysql_clauses(stmt: &str) -> String {
        let mut result = stmt.to_string();

        // Remove ENGINE clause
        static RE_ENGINE: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*ENGINE\s*=\s*\w+").unwrap());
        result = RE_ENGINE.replace_all(&result, "").to_string();

        // Remove AUTO_INCREMENT clause at table level
        static RE_AUTO_INC: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*AUTO_INCREMENT\s*=\s*\d+").unwrap());
        result = RE_AUTO_INC.replace_all(&result, "").to_string();

        // Remove column AUTO_INCREMENT
        result = result.replace(" AUTO_INCREMENT", "");
        result = result.replace(" auto_increment", "");

        // Remove CHARACTER SET in column definitions (must come before CHARSET)
        static RE_CHAR_SET: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*CHARACTER\s+SET\s+\w+").unwrap());
        result = RE_CHAR_SET.replace_all(&result, "").to_string();

        // Remove DEFAULT CHARSET
        static RE_CHARSET: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*(DEFAULT\s+)?CHARSET\s*=\s*\w+").unwrap());
        result = RE_CHARSET.replace_all(&result, "").to_string();

        // Remove COLLATE
        static RE_COLLATE: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*COLLATE\s*=?\s*\w+").unwrap());
        result = RE_COLLATE.replace_all(&result, "").to_string();

        // Remove ROW_FORMAT
        static RE_ROW_FORMAT: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*ROW_FORMAT\s*=\s*\w+").unwrap());
        result = RE_ROW_FORMAT.replace_all(&result, "").to_string();

        // Remove KEY_BLOCK_SIZE
        static RE_KEY_BLOCK: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*KEY_BLOCK_SIZE\s*=\s*\d+").unwrap());
        result = RE_KEY_BLOCK.replace_all(&result, "").to_string();

        // Remove COMMENT
        static RE_COMMENT: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*COMMENT\s*=?\s*'[^']*'").unwrap());
        result = RE_COMMENT.replace_all(&result, "").to_string();

        // Remove MySQL conditional comments
        static RE_COND_COMMENT: Lazy<Regex> = Lazy::new(|| Regex::new(r"/\*!\d+\s*|\*/").unwrap());
        result = RE_COND_COMMENT.replace_all(&result, "").to_string();

        // Remove ON UPDATE CURRENT_TIMESTAMP
        static RE_ON_UPDATE: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*ON\s+UPDATE\s+CURRENT_TIMESTAMP").unwrap());
        result = RE_ON_UPDATE.replace_all(&result, "").to_string();

        // Remove UNIQUE KEY constraint lines: UNIQUE KEY `name` (`col1`, `col2`)
        // Must handle both: ,UNIQUE KEY... at end of column list and UNIQUE KEY... on its own line
        static RE_UNIQUE_KEY: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i),?\s*UNIQUE\s+KEY\s+[`"']?\w+[`"']?\s*\([^)]+\)"#).unwrap()
        });
        result = RE_UNIQUE_KEY.replace_all(&result, "").to_string();

        // Remove KEY (index) constraint lines: KEY `name` (`col1`, `col2`)
        // This handles regular indexes and FULLTEXT indexes, but NOT PRIMARY KEY or FOREIGN KEY
        // We use a negative lookbehind pattern by only matching KEY that is preceded by comma or newline (not FOREIGN/PRIMARY)
        static RE_KEY_INDEX: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i)(?:,\s*|\n\s*)(?:FULLTEXT\s+|SPATIAL\s+)?KEY\s+[`"']?\w+[`"']?\s*\([^)]+\)"#).unwrap()
        });
        result = RE_KEY_INDEX.replace_all(&result, "").to_string();

        // Remove GENERATED ALWAYS AS columns entirely
        // Match: `col` TYPE GENERATED ALWAYS AS (expr) STORED/VIRTUAL
        // The expression can contain nested parentheses so we match one level deep
        static RE_GENERATED_COL: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i),?\s*[`"']?\w+[`"']?\s+\w+\s+GENERATED\s+ALWAYS\s+AS\s*\((?:[^()]+|\([^()]*\))+\)\s*(?:STORED|VIRTUAL)?"#).unwrap()
        });
        result = RE_GENERATED_COL.replace_all(&result, "").to_string();

        // Remove entire FOREIGN KEY constraints (DuckDB enforces them which causes issues with batch loading)
        // Match: CONSTRAINT `name` FOREIGN KEY (...) REFERENCES ... [ON DELETE/UPDATE ...]
        // or just: FOREIGN KEY (...) REFERENCES ... [ON DELETE/UPDATE ...]
        static RE_FK_CONSTRAINT: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i),?\s*(?:CONSTRAINT\s+[`"']?\w+[`"']?\s+)?FOREIGN\s+KEY\s*\([^)]+\)\s*REFERENCES\s+[`"']?\w+[`"']?\s*\([^)]+\)(?:\s+ON\s+(?:DELETE|UPDATE)\s+(?:CASCADE|SET\s+NULL|SET\s+DEFAULT|NO\s+ACTION|RESTRICT))*"#).unwrap()
        });
        result = RE_FK_CONSTRAINT.replace_all(&result, "").to_string();

        result
    }

    /// Strip PostgreSQL-specific syntax
    fn strip_postgres_syntax(stmt: &str) -> String {
        let mut result = stmt.to_string();

        // Remove schema prefix
        result = Self::strip_schema_prefix(&result);

        // Remove type casts
        static RE_CAST: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r"::[a-zA-Z_][a-zA-Z0-9_]*(?:\s+[a-zA-Z_][a-zA-Z0-9_]*)*").unwrap()
        });
        result = RE_CAST.replace_all(&result, "").to_string();

        // Remove nextval() - DuckDB handles sequences differently
        static RE_NEXTVAL: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*DEFAULT\s+nextval\s*\([^)]+\)").unwrap());
        result = RE_NEXTVAL.replace_all(&result, "").to_string();

        // Convert now() to CURRENT_TIMESTAMP
        static RE_NOW: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\bDEFAULT\s+now\s*\(\s*\)").unwrap());
        result = RE_NOW
            .replace_all(&result, "DEFAULT CURRENT_TIMESTAMP")
            .to_string();

        // Remove INHERITS clause
        static RE_INHERITS: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*INHERITS\s*\([^)]+\)").unwrap());
        result = RE_INHERITS.replace_all(&result, "").to_string();

        // Remove WITH clause (storage parameters)
        static RE_WITH: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\s*WITH\s*\([^)]+\)").unwrap());
        result = RE_WITH.replace_all(&result, "").to_string();

        result
    }

    /// Strip schema prefix (e.g., public.users -> users)
    fn strip_schema_prefix(stmt: &str) -> String {
        static RE_SCHEMA: Lazy<Regex> =
            Lazy::new(|| Regex::new(r#"(?i)\b(public|pg_catalog|pg_temp)\s*\.\s*"#).unwrap());
        RE_SCHEMA.replace_all(stmt, "").to_string()
    }

    /// Strip MSSQL schema prefix (dbo., etc.) for both CREATE TABLE and INSERT
    fn strip_mssql_schema_prefix(stmt: &str) -> String {
        // Remove schema prefix (dbo., schema.) - quoted or unquoted
        static RE_SCHEMA: Lazy<Regex> =
            Lazy::new(|| Regex::new(r#"(?i)"?(dbo|master|tempdb|model|msdb)"?\s*\.\s*"#).unwrap());
        RE_SCHEMA.replace_all(stmt, "").to_string()
    }

    /// Strip MSSQL-specific syntax
    fn strip_mssql_syntax(stmt: &str) -> String {
        let mut result = Self::strip_mssql_schema_prefix(stmt);

        // Remove IDENTITY clause and make the column nullable (so INSERTs without id work)
        // Pattern matches: INT IDENTITY(1,1) NOT NULL -> INT
        static RE_IDENTITY_NOT_NULL: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r"(?i)\s*IDENTITY\s*\(\s*\d+\s*,\s*\d+\s*\)\s*NOT\s+NULL").unwrap()
        });
        result = RE_IDENTITY_NOT_NULL.replace_all(&result, "").to_string();

        // Also handle IDENTITY without NOT NULL
        static RE_IDENTITY: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*IDENTITY\s*\(\s*\d+\s*,\s*\d+\s*\)").unwrap());
        result = RE_IDENTITY.replace_all(&result, "").to_string();

        // Remove CLUSTERED/NONCLUSTERED
        static RE_CLUSTERED: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\s*(?:NON)?CLUSTERED\s*").unwrap());
        result = RE_CLUSTERED.replace_all(&result, " ").to_string();

        // Remove ON [PRIMARY] (filegroup)
        static RE_FILEGROUP: Lazy<Regex> =
            Lazy::new(|| Regex::new(r#"(?i)\s*ON\s*"?PRIMARY"?"#).unwrap());
        result = RE_FILEGROUP.replace_all(&result, "").to_string();

        // Remove PRIMARY KEY constraints (they make columns NOT NULL which breaks IDENTITY column INSERTs)
        static RE_PK_CONSTRAINT: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i),?\s*CONSTRAINT\s+"?\w+"?\s+PRIMARY\s+KEY\s+\([^)]+\)"#).unwrap()
        });
        result = RE_PK_CONSTRAINT.replace_all(&result, "").to_string();

        // Remove FOREIGN KEY constraints (analytics queries don't need FK enforcement)
        static RE_FK_CONSTRAINT: Lazy<Regex> = Lazy::new(|| {
            Regex::new(r#"(?i),?\s*CONSTRAINT\s+"?\w+"?\s+FOREIGN\s+KEY\s*\([^)]+\)\s*REFERENCES\s+[^\s(]+\s*\([^)]+\)"#).unwrap()
        });
        result = RE_FK_CONSTRAINT.replace_all(&result, "").to_string();

        // Remove WITH clause for indexes
        static RE_WITH: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\s*WITH\s*\([^)]+\)").unwrap());
        result = RE_WITH.replace_all(&result, "").to_string();

        // Remove TEXTIMAGE_ON
        static RE_TEXTIMAGE: Lazy<Regex> =
            Lazy::new(|| Regex::new(r#"(?i)\s*TEXTIMAGE_ON\s*"?\w+"?"#).unwrap());
        result = RE_TEXTIMAGE.replace_all(&result, "").to_string();

        // Convert GETDATE() to CURRENT_TIMESTAMP
        static RE_GETDATE: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\bGETDATE\s*\(\s*\)").unwrap());
        result = RE_GETDATE
            .replace_all(&result, "CURRENT_TIMESTAMP")
            .to_string();

        // Convert NEWID() to gen_random_uuid()
        static RE_NEWID: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\bNEWID\s*\(\s*\)").unwrap());
        result = RE_NEWID
            .replace_all(&result, "gen_random_uuid()")
            .to_string();

        result
    }

    /// Strip SQLite-specific syntax not supported by DuckDB
    fn strip_sqlite_syntax(stmt: &str) -> String {
        let mut result = stmt.to_string();

        // Remove AUTOINCREMENT (DuckDB handles auto-increment via sequences)
        // SQLite uses "INTEGER PRIMARY KEY AUTOINCREMENT"
        result = result.replace(" AUTOINCREMENT", "");
        result = result.replace(" autoincrement", "");

        // DuckDB supports CREATE TABLE IF NOT EXISTS, so that clause is left as-is

        // Remove STRICT table modifier (SQLite 3.37+)
        static RE_STRICT: Lazy<Regex> = Lazy::new(|| Regex::new(r"(?i)\)\s*STRICT\s*;").unwrap());
        result = RE_STRICT.replace_all(&result, ");").to_string();

        // Remove WITHOUT ROWID (SQLite optimization not needed for analytics)
        static RE_WITHOUT_ROWID: Lazy<Regex> =
            Lazy::new(|| Regex::new(r"(?i)\)\s*WITHOUT\s+ROWID\s*;").unwrap());
        result = RE_WITHOUT_ROWID.replace_all(&result, ");").to_string();

        result
    }

    /// Count rows in an INSERT statement
    fn count_insert_rows(sql: &str) -> u64 {
        // Count VALUES clauses by counting opening parentheses after VALUES
        if let Some(values_pos) = sql.to_uppercase().find("VALUES") {
            let after_values = &sql[values_pos + 6..];
            // Count top-level opening parens (simple heuristic)
            let mut count = 0u64;
            let mut depth = 0;
            let mut in_string = false;
            let mut prev_char = ' ';

            for c in after_values.chars() {
                if c == '\'' && prev_char != '\\' {
                    in_string = !in_string;
                }
                if !in_string {
                    if c == '(' {
                        if depth == 0 {
                            count += 1;
                        }
                        depth += 1;
                    } else if c == ')' {
                        depth -= 1;
                    }
                }
                prev_char = c;
            }
            count
        } else {
            1
        }
    }
}

/// Statement reader that handles streaming SQL parsing
struct StatementReader<R> {
    reader: BufReader<R>,
    dialect: SqlDialect,
    buffer: String,
    /// Position in buffer where unprocessed data starts (avoids O(n) shifts)
    buffer_pos: usize,
    eof: bool,
    /// Track if we're inside a PostgreSQL COPY data block
    in_copy_mode: bool,
}

impl<R: Read> StatementReader<R> {
    fn new(reader: BufReader<R>, dialect: SqlDialect) -> Self {
        Self {
            reader,
            dialect,
            buffer: String::new(),
            buffer_pos: 0,
            eof: false,
            in_copy_mode: false,
        }
    }

    /// Compact the buffer by removing already-processed data
    /// Only called periodically to avoid O(n²) behavior
    fn compact_buffer(&mut self) {
        if self.buffer_pos > 0 {
            self.buffer.drain(..self.buffer_pos);
            self.buffer_pos = 0;
        }
    }

    /// Get the unprocessed portion of the buffer
    fn remaining_buffer(&self) -> &str {
        &self.buffer[self.buffer_pos..]
    }

    /// Set COPY mode explicitly (called by DumpLoader when entering/exiting COPY blocks)
    fn set_copy_mode(&mut self, enabled: bool) {
        self.in_copy_mode = enabled;
    }

    /// Strip leading SQL comments (-- and /* */) from a string
    fn strip_leading_sql_comments(s: &str) -> &str {
        let mut result = s.trim();
        loop {
            // Skip -- line comments
            if result.starts_with("--") {
                if let Some(pos) = result.find('\n') {
                    result = result[pos + 1..].trim();
                    continue;
                } else {
                    return ""; // Only comment, no newline
                }
            }
            // Skip /* */ block comments
            if result.starts_with("/*") {
                if let Some(pos) = result.find("*/") {
                    result = result[pos + 2..].trim();
                    continue;
                } else {
                    return ""; // Unclosed block comment
                }
            }
            break;
        }
        result
    }

    fn next_statement(&mut self) -> Option<Result<String>> {
        if self.eof && self.remaining_buffer().is_empty() {
            return None;
        }

        loop {
            // In COPY mode, return each line individually until we see \.
            if self.in_copy_mode {
                if let Some(line) = self.extract_copy_line() {
                    return Some(Ok(line));
                }
            } else {
                // Try to find a complete statement in the buffer
                if let Some(stmt) = self.extract_statement() {
                    // COPY mode is now managed explicitly by DumpLoader via set_copy_mode()
                    return Some(Ok(stmt));
                }
            }

            // Compact buffer periodically to prevent unbounded growth
            // Only compact when processed portion is significant
            if self.buffer_pos > 64 * 1024 {
                self.compact_buffer();
            }

            // Read more data
            let mut line = String::new();
            match self.reader.read_line(&mut line) {
                Ok(0) => {
                    self.eof = true;
                    self.in_copy_mode = false; // Reset on EOF
                    let remaining = self.remaining_buffer().trim();
                    if !remaining.is_empty() {
                        let stmt = remaining.to_string();
                        self.buffer.clear();
                        self.buffer_pos = 0;
                        return Some(Ok(stmt));
                    }
                    return None;
                }
                Ok(_) => {
                    self.buffer.push_str(&line);
                }
                Err(e) => return Some(Err(e.into())),
            }
        }
    }

    /// Extract a single line from the buffer for COPY data mode
    fn extract_copy_line(&mut self) -> Option<String> {
        let remaining = self.remaining_buffer();
        if let Some(newline_pos) = remaining.find('\n') {
            let line = remaining[..newline_pos].to_string();
            self.buffer_pos += newline_pos + 1;
            // COPY mode is managed by DumpLoader via set_copy_mode(), not here
            Some(line)
        } else {
            None
        }
    }

    fn extract_statement(&mut self) -> Option<String> {
        let remaining = self.remaining_buffer();
        let mut in_string = false;
        let mut in_dollar_quote = false;
        let mut in_bracket = false;
        let mut in_line_comment = false;
        let mut in_block_comment = false;
        let mut escape_next = false;
        let mut chars = remaining.char_indices().peekable();
        let mut end_pos = None;

        // For MSSQL, check for GO at start of line
        if self.dialect == SqlDialect::Mssql {
            if let Some(go_pos) = self.find_go_separator() {
                let stmt = remaining[..go_pos].to_string();
                // Skip past GO and any whitespace
                let after_go = &remaining[go_pos..];
                if let Some(line_end) = after_go.find('\n') {
                    self.buffer_pos += go_pos + line_end + 1;
                } else {
                    self.buffer_pos = self.buffer.len();
                }

                let trimmed = stmt.trim();
                if trimmed.is_empty()
                    || trimmed.starts_with("--")
                    || (trimmed.starts_with("/*") && !trimmed.contains("/*!"))
                {
                    return self.extract_statement();
                }
                return Some(stmt);
            }
        }

        while let Some((i, c)) = chars.next() {
            if escape_next {
                escape_next = false;
                continue;
            }

            // Handle line comments (-- to end of line)
            if in_line_comment {
                if c == '\n' {
                    in_line_comment = false;
                }
                continue;
            }

            // Handle block comments (/* to */)
            if in_block_comment {
                if c == '*' && chars.peek().map(|(_, c)| *c == '/').unwrap_or(false) {
                    chars.next();
                    in_block_comment = false;
                }
                continue;
            }

            match c {
                '\\' if self.dialect == SqlDialect::MySql && in_string => {
                    escape_next = true;
                }
                '\'' if !in_dollar_quote && !in_bracket => {
                    in_string = !in_string;
                }
                '[' if self.dialect == SqlDialect::Mssql && !in_string => {
                    in_bracket = true;
                }
                ']' if self.dialect == SqlDialect::Mssql && !in_string => {
                    // Handle ]] escape
                    if chars.peek().map(|(_, c)| *c == ']').unwrap_or(false) {
                        chars.next();
                    } else {
                        in_bracket = false;
                    }
                }
                '$' if self.dialect == SqlDialect::Postgres && !in_string => {
                    // Check for dollar quote
                    if chars.peek().map(|(_, c)| *c == '$').unwrap_or(false) {
                        in_dollar_quote = !in_dollar_quote;
                        chars.next();
                    }
                }
                '-' if !in_string && !in_dollar_quote && !in_bracket => {
                    // Check for -- line comment
                    if chars.peek().map(|(_, c)| *c == '-').unwrap_or(false) {
                        chars.next();
                        in_line_comment = true;
                    }
                }
                '/' if !in_string && !in_dollar_quote && !in_bracket => {
                    // Check for /* block comment
                    if chars.peek().map(|(_, c)| *c == '*').unwrap_or(false) {
                        chars.next();
                        in_block_comment = true;
                    }
                }
                ';' if !in_string && !in_dollar_quote && !in_bracket => {
                    end_pos = Some(i + 1);
                    break;
                }
                _ => {}
            }
        }

        if let Some(pos) = end_pos {
            let stmt = remaining[..pos].to_string();
            // Skip past the statement and any leading whitespace
            let after_stmt = &remaining[pos..];
            let trimmed_len = after_stmt.len() - after_stmt.trim_start().len();
            self.buffer_pos += pos + trimmed_len;

            // Skip empty statements and comments
            let trimmed = stmt.trim();

            // Strip leading comments from the statement before checking if it's just a comment
            let stripped = Self::strip_leading_sql_comments(trimmed);
            if stripped.is_empty() {
                return self.extract_statement();
            }

            // Use the stripped version for further processing
            let trimmed = stripped;

            // For Postgres COPY statements, auto-enter copy mode
            // This prevents accumulating COPY data while looking for the next semicolon
            if self.dialect == SqlDialect::Postgres {
                let upper = trimmed.to_uppercase();
                if upper.ends_with("FROM STDIN;") && upper.contains("COPY ") {
                    self.in_copy_mode = true;
                }
            }

            Some(stmt)
        } else {
            None
        }
    }

    /// Find GO batch separator at start of line (MSSQL)
    fn find_go_separator(&self) -> Option<usize> {
        let remaining = self.remaining_buffer();
        let mut in_string = false;
        let mut in_bracket = false;
        let mut line_start = 0;

        for (i, c) in remaining.char_indices() {
            if c == '\'' && !in_bracket {
                in_string = !in_string;
            } else if c == '[' && !in_string {
                in_bracket = true;
            } else if c == ']' && !in_string {
                in_bracket = false;
            } else if c == '\n' {
                line_start = i + 1;
            } else if !in_string && !in_bracket && i == line_start {
                // Check for GO at start of line
                let rest = &remaining[i..];
                if rest.len() >= 2 {
                    let word = &rest[..2.min(rest.len())];
                    if word.eq_ignore_ascii_case("GO") {
                        // Make sure it's just GO (not GO_SOMETHING)
                        let after_go = if rest.len() > 2 {
                            rest.chars().nth(2)
                        } else {
                            None
                        };
                        if after_go.is_none()
                            || after_go == Some('\n')
                            || after_go == Some('\r')
                            || after_go == Some(' ')
                            || after_go.unwrap().is_ascii_digit()
                        {
                            return Some(i);
                        }
                    }
                }
            }
        }
        None
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_count_insert_rows() {
        assert_eq!(
            DumpLoader::count_insert_rows("INSERT INTO t VALUES (1, 'a')"),
            1
        );
        assert_eq!(
            DumpLoader::count_insert_rows("INSERT INTO t VALUES (1, 'a'), (2, 'b'), (3, 'c')"),
            3
        );
        assert_eq!(
            DumpLoader::count_insert_rows("INSERT INTO t VALUES (1, '(test)')"),
            1
        );
    }
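
    // Illustrative check of MySQL backslash-escape conversion: \' becomes the
    // SQL-standard doubled quote and \t becomes a literal tab character.
    #[test]
    fn test_convert_mysql_escapes() {
        assert_eq!(
            DumpLoader::convert_mysql_escapes(r"INSERT INTO t VALUES ('it\'s')"),
            "INSERT INTO t VALUES ('it''s')"
        );
        assert_eq!(
            DumpLoader::convert_mysql_escapes(r"INSERT INTO t VALUES ('a\tb')"),
            "INSERT INTO t VALUES ('a\tb')"
        );
    }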

    #[test]
    fn test_strip_mysql_clauses() {
        let sql = "CREATE TABLE t (id INT) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4";
        let result = DumpLoader::strip_mysql_clauses(sql);
        assert!(!result.contains("ENGINE"));
        assert!(!result.contains("CHARSET"));
    }
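
    // Illustrative check that index-style KEY clauses are removed while the
    // column definitions themselves are preserved.
    #[test]
    fn test_strip_mysql_key_clauses() {
        let sql =
            "CREATE TABLE t (id INT, name VARCHAR(50), UNIQUE KEY `uq_name` (`name`), KEY `idx_id` (`id`)) ENGINE=InnoDB";
        let result = DumpLoader::strip_mysql_clauses(sql);
        assert!(!result.contains("UNIQUE KEY"));
        assert!(!result.contains("idx_id"));
        assert!(result.contains("name VARCHAR(50)"));
    }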

    #[test]
    fn test_convert_identifiers() {
        let sql = "INSERT INTO `users` (`id`, `name`) VALUES (1, 'test')";
        let result = DumpLoader::convert_identifiers(sql, SqlDialect::MySql);
        assert_eq!(
            result,
            "INSERT INTO \"users\" (\"id\", \"name\") VALUES (1, 'test')"
        );
    }
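
    // Illustrative check that MSSQL schema prefixes are dropped, leaving a bare
    // table name for DuckDB.
    #[test]
    fn test_strip_mssql_schema_prefix() {
        let sql = "INSERT INTO \"dbo\".\"users\" (\"id\") VALUES (1)";
        let result = DumpLoader::strip_mssql_schema_prefix(sql);
        assert_eq!(result, "INSERT INTO \"users\" (\"id\") VALUES (1)");
    }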

    #[test]
    fn test_looks_like_copy_data() {
        assert!(DumpLoader::looks_like_copy_data("1\tAlice\t2024-01-01"));
        assert!(!DumpLoader::looks_like_copy_data("SELECT * FROM users"));
        assert!(!DumpLoader::looks_like_copy_data("INSERT INTO t VALUES"));
    }
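
    // Edge cases for COPY-data detection: the \. terminator and blank lines are
    // never data, while a bare single-column value is.
    #[test]
    fn test_looks_like_copy_data_edges() {
        assert!(!DumpLoader::looks_like_copy_data("\\."));
        assert!(!DumpLoader::looks_like_copy_data("   "));
        assert!(DumpLoader::looks_like_copy_data("42"));
    }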

    #[test]
    fn test_strip_mssql_syntax() {
        let sql = r#"CREATE TABLE "users" (
    "id" INTEGER NOT NULL,
    "email" VARCHAR(255) NOT NULL
)"#;
        // This input has no IDENTITY clause; stripping should be a no-op for it
        let result = DumpLoader::strip_mssql_syntax(sql);
        assert!(!result.contains("IDENTITY"), "no IDENTITY expected in output");

        // Test with IDENTITY(1,1) NOT NULL
        let sql_with_identity = r#"CREATE TABLE "users" (
    "id" INTEGER IDENTITY(1,1) NOT NULL,
    "email" VARCHAR(255) NOT NULL
)"#;
        let result2 = DumpLoader::strip_mssql_syntax(sql_with_identity);
        assert!(
            !result2.contains("IDENTITY"),
            "IDENTITY should be stripped: {}",
            result2
        );
        // Since we strip IDENTITY(1,1) NOT NULL, the column should become nullable
        assert!(
            !result2.contains("IDENTITY(1,1) NOT NULL"),
            "Should strip full IDENTITY NOT NULL"
        );
    }
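
    // Illustrative check that PostgreSQL-specific pieces (schema prefix, ::casts,
    // nextval() defaults) are removed from CREATE TABLE statements.
    #[test]
    fn test_strip_postgres_syntax() {
        let sql = "CREATE TABLE public.users (id integer DEFAULT nextval('users_id_seq'::regclass) NOT NULL, name text)";
        let result = DumpLoader::strip_postgres_syntax(sql);
        assert!(!result.contains("public."));
        assert!(!result.contains("nextval"));
        assert!(!result.contains("::regclass"));
        assert!(result.contains("name text"));
    }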

    #[test]
    fn test_convert_mssql_identifiers() {
        let sql = "INSERT INTO [dbo].[users] ([id], [name]) VALUES (1, N'test')";
        let result = DumpLoader::convert_identifiers(sql, SqlDialect::Mssql);
        assert_eq!(
            result,
            "INSERT INTO \"dbo\".\"users\" (\"id\", \"name\") VALUES (1, 'test')"
        );
    }
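
    // Illustrative check that SQLite-only modifiers are removed while the rest of
    // the statement is untouched.
    #[test]
    fn test_strip_sqlite_syntax() {
        let sql = "CREATE TABLE t (id INTEGER PRIMARY KEY AUTOINCREMENT, body TEXT) WITHOUT ROWID;";
        let result = DumpLoader::strip_sqlite_syntax(sql);
        assert!(!result.contains("AUTOINCREMENT"));
        assert!(!result.contains("WITHOUT ROWID"));
        assert!(result.contains("PRIMARY KEY"));
        assert!(result.ends_with(");"));
    }

    // Minimal sketch of the streaming statement reader over an in-memory buffer:
    // statements are yielded one at a time at each terminating semicolon.
    #[test]
    fn test_statement_reader_basic() {
        let input = b"CREATE TABLE t (id INT);\nINSERT INTO t VALUES (1);\n";
        let mut reader = StatementReader::new(BufReader::new(&input[..]), SqlDialect::MySql);
        let first = reader.next_statement().unwrap().unwrap();
        assert!(first.starts_with("CREATE TABLE"));
        let second = reader.next_statement().unwrap().unwrap();
        assert!(second.starts_with("INSERT INTO"));
        assert!(reader.next_statement().is_none());
    }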
}