drizzle_cli/db/
mod.rs

1//! Database connection and migration execution for CLI commands
2//!
3//! This module provides database connectivity for running migrations and other
4//! database operations from the CLI.
5
6use std::path::Path;
7
8#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
9use crate::config::PostgresCreds;
10use crate::config::{Credentials, Dialect};
11use crate::error::CliError;
12use crate::output;
13use drizzle_migrations::MigrationSet;
14use drizzle_migrations::schema::Snapshot;
15
/// Result of a migration run.
///
/// `MigrationResult::default()` is the "nothing was applied" result
/// (`applied_count == 0`, empty `applied_migrations`).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationResult {
    /// Number of migrations applied
    pub applied_count: usize,
    /// Identifier of each applied migration, in the order applied.
    ///
    /// The drivers in this module record the value of `migration.hash()` here.
    pub applied_migrations: Vec<String>,
}
24
/// Planned SQL changes for `drizzle push`
///
/// `PushPlan::default()` is an empty, non-destructive plan.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PushPlan {
    /// SQL statements to execute, in order.
    pub sql_statements: Vec<String>,
    /// Warnings reported while generating the statements.
    pub warnings: Vec<String>,
    /// `true` when at least one statement was classified as destructive.
    pub destructive: bool,
}
32
33/// Plan a push by introspecting the live database and diffing against the desired snapshot.
34pub fn plan_push(
35    credentials: &Credentials,
36    dialect: Dialect,
37    desired: &Snapshot,
38    breakpoints: bool,
39) -> Result<PushPlan, CliError> {
40    let current = introspect_database(credentials, dialect)?.snapshot;
41    let (sql_statements, warnings) = generate_push_sql(&current, desired, breakpoints)?;
42    let destructive = sql_statements.iter().any(|s| is_destructive_statement(s));
43
44    Ok(PushPlan {
45        sql_statements,
46        warnings,
47        destructive,
48    })
49}
50
51/// Apply a previously planned push.
52pub fn apply_push(
53    credentials: &Credentials,
54    dialect: Dialect,
55    plan: &PushPlan,
56    force: bool,
57) -> Result<(), CliError> {
58    if plan.sql_statements.is_empty() {
59        return Ok(());
60    }
61
62    if plan.destructive && !force {
63        let confirmed = confirm_destructive()?;
64        if !confirmed {
65            return Ok(());
66        }
67    }
68
69    execute_statements(credentials, dialect, &plan.sql_statements)
70}
71
/// Execute migrations against the database
///
/// This is the main entry point that dispatches to the appropriate driver
/// based on the credentials type.
///
/// Migrations are loaded from `migrations_dir`. A non-blank `migrations_table`
/// overrides the bookkeeping table name; a non-blank `migrations_schema` overrides
/// the schema, but only when `dialect` is `Dialect::Postgresql`.
///
/// # Errors
///
/// Returns `CliError::Other` when the migrations cannot be loaded,
/// `CliError::MissingDriver` when the crate was built without a driver feature for
/// the given credentials, and otherwise whatever the selected driver returns.
pub fn run_migrations(
    credentials: &Credentials,
    dialect: Dialect,
    migrations_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<MigrationResult, CliError> {
    // Load migrations from filesystem
    let mut set = MigrationSet::from_dir(migrations_dir, dialect.to_base())
        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;

    // Apply overrides from config
    // NOTE(review): blank-ness is tested on the trimmed value, but the untrimmed
    // string is what gets passed on — confirm whether whitespace should be stripped.
    if !migrations_table.trim().is_empty() {
        set = set.with_table(migrations_table.to_string());
    }
    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
        set = set.with_schema(migrations_schema.to_string());
    }

    // Nothing on disk: report an empty result before any driver is invoked.
    if set.all().is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    // Exactly one arm per `Credentials` variant survives cfg-stripping for any
    // feature combination, so the match stays exhaustive.
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => run_sqlite_migrations(&set, path),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Underscore-prefixed because it is unused when only `libsql` is enabled.
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    run_libsql_local_migrations(&set, url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    run_turso_migrations(&set, url, _auth_token)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // PostgreSQL - prefer sync driver if available, fall back to async
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => run_postgres_sync_migrations(&set, creds),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => run_postgres_async_migrations(&set, creds),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}
162
/// Heuristically decide whether `sql` may destroy data or schema objects.
///
/// The check is a case-insensitive keyword scan, not a SQL parser, so it errs on
/// the side of flagging. A statement is considered destructive when, after
/// whitespace normalisation, it:
///
/// * starts with `DROP ` (any object kind: table, schema, view, type, ...),
/// * contains `DROP TABLE`, `DROP COLUMN`, `DROP INDEX` or `TRUNCATE` anywhere, or
/// * contains `ALTER TABLE` together with a standalone `DROP` token.
fn is_destructive_statement(sql: &str) -> bool {
    // Collapse every run of whitespace (newlines, tabs, repeated spaces) into one
    // space so keyword pairs such as "DROP\n  TABLE" or "\tDROP CONSTRAINT" match.
    let s = sql
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .to_uppercase();

    s.starts_with("DROP ")
        || s.contains("DROP TABLE")
        || s.contains("DROP COLUMN")
        || s.contains("DROP INDEX")
        || s.contains("TRUNCATE")
        || (s.contains("ALTER TABLE") && s.contains(" DROP "))
}
171
172fn confirm_destructive() -> Result<bool, CliError> {
173    use std::io::{self, IsTerminal, Write};
174
175    if !io::stdin().is_terminal() {
176        return Err(CliError::Other(
177            "Refusing to apply potentially destructive changes in non-interactive mode. Use --explain or --force."
178                .into(),
179        ));
180    }
181
182    println!(
183        "{}",
184        output::warning("Potentially destructive changes detected (DROP/TRUNCATE/etc).")
185    );
186    print!("Apply anyway? [y/N]: ");
187    io::stdout()
188        .flush()
189        .map_err(|e| CliError::IoError(e.to_string()))?;
190
191    let mut line = String::new();
192    io::stdin()
193        .read_line(&mut line)
194        .map_err(|e| CliError::IoError(e.to_string()))?;
195    let ans = line.trim().to_ascii_lowercase();
196    Ok(ans == "y" || ans == "yes")
197}
198
199fn generate_push_sql(
200    current: &Snapshot,
201    desired: &Snapshot,
202    breakpoints: bool,
203) -> Result<(Vec<String>, Vec<String>), CliError> {
204    match (current, desired) {
205        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
206            use drizzle_migrations::sqlite::collection::SQLiteDDL;
207            use drizzle_migrations::sqlite::diff::compute_migration;
208
209            let prev_ddl = SQLiteDDL::from_entities(prev_snap.ddl.clone());
210            let cur_ddl = SQLiteDDL::from_entities(curr_snap.ddl.clone());
211
212            let diff = compute_migration(&prev_ddl, &cur_ddl);
213            Ok((diff.sql_statements, diff.warnings))
214        }
215        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
216            use drizzle_migrations::postgres::diff_full_snapshots;
217            use drizzle_migrations::postgres::statements::PostgresGenerator;
218
219            let diff = diff_full_snapshots(prev_snap, curr_snap);
220            let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
221            Ok((generator.generate(&diff.diffs), Vec::new()))
222        }
223        _ => Err(CliError::DialectMismatch),
224    }
225}
226
/// Execute raw SQL `statements` with the driver selected by `credentials`.
///
/// The dispatch follows the same cfg-gated pattern as `run_migrations`: SQLite via
/// `rusqlite`, Turso credentials via local LibSQL or remote Turso depending on
/// `is_local_libsql`, and PostgreSQL via the sync driver when available, otherwise
/// the async one. `_dialect` is not used.
///
/// # Errors
///
/// Returns `CliError::MissingDriver` when the crate was built without a driver
/// feature for the given credentials, and otherwise whatever the selected driver
/// returns.
fn execute_statements(
    credentials: &Credentials,
    _dialect: Dialect,
    statements: &[String],
) -> Result<(), CliError> {
    // In some feature combinations (no drivers), the match arms that would use `statements`
    // are compiled out. Touch it to avoid unused-parameter warnings.
    let _ = statements;

    // Exactly one arm per `Credentials` variant survives cfg-stripping for any
    // feature combination, so the match stays exhaustive.
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => execute_sqlite_statements(path, statements),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Underscore-prefixed because it is unused when only `libsql` is enabled.
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    execute_libsql_local_statements(url, statements)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    execute_turso_statements(url, _auth_token, statements)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // PostgreSQL: sync driver when enabled, otherwise the async one.
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => execute_postgres_sync_statements(creds, statements),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => execute_postgres_async_statements(creds, statements),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}
295
/// Decide whether a Turso `url` refers to a local LibSQL database file.
///
/// A URL is local when it starts with `file:`, `./` or `/`, or when it contains no
/// `://` scheme separator at all (which includes the empty string).
// Only called from arms gated on the `libsql`/`turso` features, so it is unused
// when neither is enabled.
#[allow(dead_code)]
fn is_local_libsql(url: &str) -> bool {
    const LOCAL_PREFIXES: [&str; 3] = ["file:", "./", "/"];

    if LOCAL_PREFIXES.iter().any(|prefix| url.starts_with(*prefix)) {
        return true;
    }

    // Anything without a scheme separator is treated as a plain path.
    !url.contains("://")
}
304
/// Derive unique constraints from introspected SQLite index metadata.
///
/// Only indexes whose `origin` is `"u"` are considered. For each one, the key
/// columns that have a name are gathered and ordered by `seqno`; an index with no
/// such columns is skipped. Index names starting with `sqlite_autoindex_` are
/// replaced by a name generated from the table and column names, and
/// `name_explicit` is set accordingly.
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
fn process_sqlite_uniques_from_indexes(
    raw_indexes: &[drizzle_migrations::sqlite::introspect::RawIndexInfo],
    index_columns: &[drizzle_migrations::sqlite::introspect::RawIndexColumn],
) -> Vec<drizzle_migrations::sqlite::UniqueConstraint> {
    use drizzle_migrations::sqlite::UniqueConstraint;

    raw_indexes
        .iter()
        .filter(|index| index.origin == "u")
        .filter_map(|index| {
            let mut keyed: Vec<(i32, String)> = index_columns
                .iter()
                .filter(|col| col.index_name == index.name && col.key)
                .filter_map(|col| col.name.clone().map(|name| (col.seqno, name)))
                .collect();
            if keyed.is_empty() {
                return None;
            }
            keyed.sort_by_key(|entry| entry.0);
            let columns: Vec<String> = keyed.into_iter().map(|(_, name)| name).collect();

            let explicit = !index.name.starts_with("sqlite_autoindex_");
            let constraint_name = if explicit {
                index.name.clone()
            } else {
                let refs: Vec<&str> = columns.iter().map(String::as_str).collect();
                drizzle_migrations::sqlite::ddl::name_for_unique(&index.table, &refs)
            };

            let mut constraint =
                UniqueConstraint::from_strings(index.table.clone(), constraint_name, columns);
            constraint.name_explicit = explicit;
            Some(constraint)
        })
        .collect()
}
342
343// ============================================================================
344// SQLite (rusqlite)
345// ============================================================================
346
/// Run `statements` against the SQLite database at `path` between an explicit
/// `BEGIN` and `COMMIT`.
///
/// Statements are trimmed and blank ones are skipped. On the first failing
/// statement a best-effort `ROLLBACK` is issued and the error is returned.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened,
/// `CliError::MigrationError` for `BEGIN`/`COMMIT` or statement failures.
#[cfg(feature = "rusqlite")]
fn execute_sqlite_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{}': {}",
                path, e
            )));
        }
    };

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let outcome = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty())
        .try_for_each(|sql| match conn.execute(sql, []) {
            Ok(_) => Ok(()),
            Err(e) => Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            ))),
        });

    if let Err(failure) = outcome {
        // Best effort: the statement error is what gets reported.
        let _ = conn.execute("ROLLBACK", []);
        return Err(failure);
    }

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
375
/// Apply every pending migration in `set` to the SQLite database at `path`.
///
/// Creates the bookkeeping table, reads the already-applied hashes, then runs all
/// pending migrations between an explicit `BEGIN` and `COMMIT`, recording each one
/// after its statements succeed. Any failure triggers a best-effort `ROLLBACK`.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened,
/// `CliError::MigrationError` for any SQL failure.
#[cfg(feature = "rusqlite")]
fn run_sqlite_migrations(set: &MigrationSet, path: &str) -> Result<MigrationResult, CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    // Make sure the bookkeeping table exists before reading from it.
    if let Err(e) = conn.execute(&set.create_table_sql(), []) {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let applied_hashes = query_applied_hashes_sqlite(&conn, set)?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Runs every pending migration; the caller decides between COMMIT and ROLLBACK.
    let apply_all = || -> Result<Vec<String>, CliError> {
        let mut applied = Vec::new();
        for migration in &pending {
            for stmt in migration.statements() {
                if stmt.trim().is_empty() {
                    continue;
                }
                conn.execute(stmt, []).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            conn.execute(
                &set.record_migration_sql(migration.hash(), migration.created_at()),
                [],
            )
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
            applied.push(migration.hash().to_string());
        }
        Ok(applied)
    };

    match apply_all() {
        Ok(applied) => {
            conn.execute("COMMIT", [])
                .map_err(|e| CliError::MigrationError(e.to_string()))?;
            Ok(MigrationResult {
                applied_count: applied.len(),
                applied_migrations: applied,
            })
        }
        Err(failure) => {
            // Best effort: the migration error is what gets reported.
            let _ = conn.execute("ROLLBACK", []);
            Err(failure)
        }
    }
}
435
/// Read the hashes of already-applied migrations from the bookkeeping table.
///
/// If the query cannot be prepared, an empty list is returned: that is the
/// deliberate "table might not exist yet" fallback. Once the query runs, every row
/// must decode as a `String`. A row that fails is reported as an error rather than
/// skipped, because silently dropping a hash would make an applied migration look
/// pending and cause it to be executed again.
///
/// # Errors
///
/// `CliError::MigrationError` when the query fails to execute or a row cannot be
/// read.
#[cfg(feature = "rusqlite")]
fn query_applied_hashes_sqlite(
    conn: &rusqlite::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut stmt = match conn.prepare(&set.query_all_hashes_sql()) {
        Ok(s) => s,
        Err(_) => return Ok(vec![]), // Table might not exist yet
    };

    let hashes = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(|e| CliError::MigrationError(e.to_string()))?
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    Ok(hashes)
}
454
455// ============================================================================
456// PostgreSQL (postgres - sync)
457// ============================================================================
458
/// Run `statements` inside one transaction using the synchronous `postgres` driver.
///
/// Connects without TLS. Statements are trimmed and blank ones are skipped. The
/// first failure returns early, leaving the transaction uncommitted.
///
/// # Errors
///
/// `CliError::ConnectionError` when the connection fails,
/// `CliError::MigrationError` for transaction or statement failures.
#[cfg(feature = "postgres-sync")]
fn execute_postgres_sync_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = match postgres::Client::connect(&url, postgres::NoTls) {
        Ok(client) => client,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty())
        .try_for_each(|sql| {
            tx.execute(sql, &[])
                .map(|_| ())
                .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, sql)))
        })?;

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
487
/// Apply every pending migration in `set` using the synchronous `postgres` driver.
///
/// Connects without TLS, creates the migrations schema (when the set defines one)
/// and the bookkeeping table, reads the already-applied hashes, then runs all
/// pending migrations in one transaction, recording each one after its statements
/// succeed.
///
/// A failure to read the applied hashes is an error. The bookkeeping table has just
/// been created on this connection, so a failed query is a real problem; treating
/// it as "nothing applied" would re-run every migration.
///
/// # Errors
///
/// `CliError::ConnectionError` when the connection fails,
/// `CliError::MigrationError` for any SQL failure, including reading the applied
/// hashes.
#[cfg(feature = "postgres-sync")]
fn run_postgres_sync_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    // Create schema if needed
    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    // Create migrations table
    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Query applied hashes; errors are propagated instead of being read as "none".
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports a decode problem as an error where `get` would panic.
    let applied_hashes: Vec<String> = rows
        .iter()
        .map(|r| r.try_get(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    // Get pending migrations
    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    // Execute in transaction; an early return leaves it uncommitted.
    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
559
560// ============================================================================
561// PostgreSQL (tokio-postgres - async)
562// ============================================================================
563
/// Blocking wrapper around `execute_postgres_async_inner`.
///
/// Builds a current-thread Tokio runtime and drives the async implementation to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built; otherwise the inner result.
#[cfg(feature = "tokio-postgres")]
fn execute_postgres_async_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(execute_postgres_async_inner(creds, statements))
}
576
/// Run `statements` inside one transaction using `tokio-postgres`.
///
/// Connects without TLS and spawns the connection future on the current runtime;
/// connection errors from that task are printed to stderr. Statements are trimmed
/// and blank ones are skipped. The first failure returns early, leaving the
/// transaction uncommitted.
///
/// # Errors
///
/// `CliError::ConnectionError` when the connection fails,
/// `CliError::MigrationError` for transaction or statement failures.
#[cfg(feature = "tokio-postgres")]
async fn execute_postgres_async_inner(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let connected = tokio_postgres::connect(&url, tokio_postgres::NoTls).await;
    let (mut client, connection) = match connected {
        Ok(pair) => pair,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    // The connection object performs the actual I/O and must be polled in the background.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        if let Err(e) = tx.execute(sql, &[]).await {
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            )));
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
619
/// Blocking wrapper around `run_postgres_async_inner`.
///
/// Builds a current-thread Tokio runtime and drives the async implementation to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built; otherwise the inner result.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)] // Used when postgres-sync is not enabled
fn run_postgres_async_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(run_postgres_async_inner(set, creds))
}
633
/// Apply every pending migration in `set` using `tokio-postgres`.
///
/// Connects without TLS, spawns the connection future on the current runtime,
/// creates the migrations schema (when the set defines one) and the bookkeeping
/// table, reads the already-applied hashes, then runs all pending migrations in one
/// transaction, recording each one after its statements succeed.
///
/// A failure to read the applied hashes is an error. The bookkeeping table has just
/// been created on this connection, so a failed query is a real problem; treating
/// it as "nothing applied" would re-run every migration.
///
/// # Errors
///
/// `CliError::ConnectionError` when the connection fails,
/// `CliError::MigrationError` for any SQL failure, including reading the applied
/// hashes.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)] // Only reached through `run_postgres_async_migrations`
async fn run_postgres_async_inner(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // Spawn connection handler
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    // Create schema if needed
    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    // Create migrations table
    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Query applied hashes; errors are propagated instead of being read as "none".
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports a decode problem as an error where `get` would panic.
    let applied_hashes: Vec<String> = rows
        .iter()
        .map(|r| r.try_get(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    // Get pending migrations
    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    // Execute in transaction; an early return leaves it uncommitted.
    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).await.map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
724
725// ============================================================================
726// LibSQL (local)
727// ============================================================================
728
/// Blocking wrapper around `execute_libsql_local_inner`.
///
/// Builds a current-thread Tokio runtime and drives the async implementation to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built; otherwise the inner result.
#[cfg(feature = "libsql")]
fn execute_libsql_local_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    let work = execute_libsql_local_inner(path, statements);
    runtime.block_on(work)
}
738
/// Run `statements` inside one transaction on the local LibSQL database at `path`.
///
/// Statements are trimmed and blank ones are skipped. On the first failing
/// statement a best-effort rollback is issued and the error is returned.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened or connected to,
/// `CliError::MigrationError` for transaction or statement failures.
#[cfg(feature = "libsql")]
async fn execute_libsql_local_inner(path: &str, statements: &[String]) -> Result<(), CliError> {
    let db = match libsql::Builder::new_local(path).build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for raw in statements {
        let sql = raw.trim();
        if sql.is_empty() {
            continue;
        }
        let Err(e) = tx.execute(sql, ()).await else {
            continue;
        };
        // Best effort: the statement error is what gets reported.
        tx.rollback().await.ok();
        return Err(CliError::MigrationError(format!(
            "Statement failed: {}\n{}",
            e, sql
        )));
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
777
/// Blocking wrapper around `run_libsql_local_inner`.
///
/// Builds a current-thread Tokio runtime and drives the async implementation to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built; otherwise the inner result.
#[cfg(feature = "libsql")]
fn run_libsql_local_migrations(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    let work = run_libsql_local_inner(set, path);
    runtime.block_on(work)
}
790
/// Apply every pending migration in `set` to the local LibSQL database at `path`.
///
/// Creates the bookkeeping table, reads the already-applied hashes, then runs all
/// pending migrations in one transaction, recording each one after its statements
/// succeed. Any failure inside the transaction triggers a best-effort rollback.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened or connected to,
/// `CliError::MigrationError` for any SQL failure.
#[cfg(feature = "libsql")]
async fn run_libsql_local_inner(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let db = match libsql::Builder::new_local(path).build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Make sure the bookkeeping table exists before reading from it.
    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let applied_hashes = query_applied_hashes_libsql(&conn, set).await?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration only after all of its statements succeeded.
        let recorded = tx
            .execute(
                &set.record_migration_sql(migration.hash(), migration.created_at()),
                (),
            )
            .await;
        match recorded {
            Ok(_) => applied.push(migration.hash().to_string()),
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(e.to_string()));
            }
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
868
/// Read the hashes of already-applied migrations from the bookkeeping table.
///
/// If the query itself fails, an empty list is returned: that is the deliberate
/// "table might not exist yet" fallback. Once the query has succeeded, every row
/// must be fetched and decoded as a `String`. A cursor or decode failure is reported
/// as an error rather than ignored, because silently dropping a hash would make an
/// applied migration look pending and cause it to be executed again.
///
/// # Errors
///
/// `CliError::MigrationError` when a row cannot be fetched or read.
#[cfg(feature = "libsql")]
async fn query_applied_hashes_libsql(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut rows = match conn.query(&set.query_all_hashes_sql(), ()).await {
        Ok(r) => r,
        Err(_) => return Ok(vec![]), // Table might not exist yet
    };

    let mut hashes = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to fetch applied migrations: {}", e))
    })? {
        let hash = row.get::<String>(0).map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;
        hashes.push(hash);
    }

    Ok(hashes)
}
888
889// ============================================================================
890// Turso (remote)
891// ============================================================================
892
/// Blocking wrapper around `execute_turso_inner`.
///
/// Builds a current-thread Tokio runtime and drives the async implementation to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built; otherwise the inner result.
#[cfg(feature = "turso")]
fn execute_turso_statements(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();
    let runtime = match built {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(execute_turso_inner(url, auth_token, statements))
}
906
/// Run `statements` inside one transaction on the remote Turso database at `url`.
///
/// A missing `auth_token` is sent as the empty string. Statements are trimmed and
/// blank ones are skipped. On the first failing statement a best-effort rollback is
/// issued and the error is returned.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be built or connected to,
/// `CliError::MigrationError` for transaction or statement failures.
#[cfg(feature = "turso")]
async fn execute_turso_inner(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let db = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{}': {}",
                url, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for raw in statements {
        let sql = raw.trim();
        if sql.is_empty() {
            continue;
        }
        let Err(e) = tx.execute(sql, ()).await else {
            continue;
        };
        // Best effort: the statement error is what gets reported.
        tx.rollback().await.ok();
        return Err(CliError::MigrationError(format!(
            "Statement failed: {}\n{}",
            e, sql
        )));
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
949
/// Apply pending migrations to a remote Turso database, blocking until done.
///
/// Creates a current-thread Tokio runtime and runs `run_turso_inner` on it.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created; otherwise the
/// result of `run_turso_inner` is returned unchanged.
#[cfg(feature = "turso")]
fn run_turso_migrations(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();

    let runtime = builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    runtime.block_on(run_turso_inner(set, url, auth_token))
}
963
/// Apply every pending migration in `set` to a remote Turso database.
///
/// Steps:
/// 1. connect (an absent `auth_token` becomes an empty token string),
/// 2. run the set's create-table SQL for the migrations table,
/// 3. read the already-applied hashes and compute the pending migrations,
/// 4. execute all pending migrations plus their bookkeeping rows inside a
///    single transaction, rolling back on the first failure.
///
/// Returns the hashes of the migrations that were applied; the list is empty
/// when nothing was pending (no transaction is opened in that case).
///
/// # Errors
///
/// * [`CliError::ConnectionError`] if the database cannot be built or connected to.
/// * [`CliError::MigrationError`] if creating the migrations table, starting
///   the transaction, running a statement, recording a migration, or
///   committing fails.
#[cfg(feature = "turso")]
async fn run_turso_inner(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let db = libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
        })?;

    let conn = match db.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    // Make sure the bookkeeping table is there before reading from it.
    let create_sql = set.create_table_sql();
    if let Err(e) = conn.execute(&create_sql, ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let applied_hashes = query_applied_hashes_turso(&conn, set).await?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    // Everything below happens in one transaction.
    let tx = match conn.transaction().await {
        Ok(tx) => tx,
        Err(e) => return Err(CliError::MigrationError(e.to_string())),
    };

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration as applied within the same transaction.
        let record_sql = set.record_migration_sql(migration.hash(), migration.created_at());
        if let Err(e) = tx.execute(&record_sql, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(e.to_string()));
        }

        applied.push(migration.hash().to_string());
    }

    if let Err(e) = tx.commit().await {
        return Err(CliError::MigrationError(e.to_string()));
    }

    let applied_count = applied.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied,
    })
}
1042
/// Fetch the hashes of migrations already recorded in a remote Turso database.
///
/// Runs the SQL produced by `set.query_all_hashes_sql()` and reads column 0 of
/// every returned row as a `String`.
///
/// A failure to *start* the query yields an empty list, because the migrations
/// table might not exist yet. A failure while *reading* rows is reported as an
/// error instead: a silently truncated list would make already-applied
/// migrations look pending.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if fetching the next row or decoding
/// its hash column fails.
#[cfg(feature = "turso")]
async fn query_applied_hashes_turso(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut rows = match conn.query(&set.query_all_hashes_sql(), ()).await {
        Ok(r) => r,
        Err(_) => return Ok(vec![]), // Table might not exist yet
    };

    let mut hashes = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to read applied migrations: {}", e))
    })? {
        let hash = row.get::<String>(0).map_err(|e| {
            CliError::MigrationError(format!("Failed to read migration hash: {}", e))
        })?;
        hashes.push(hash);
    }

    Ok(hashes)
}
1062
1063// ============================================================================
1064// Database Introspection
1065// ============================================================================
1066
/// Result of database introspection
///
/// Produced by the dialect-specific introspection functions and returned to
/// the caller by `run_introspection`.
#[derive(Debug)]
pub struct IntrospectResult {
    /// Generated Rust schema code (`run_introspection` writes it to `schema.rs`)
    pub schema_code: String,
    /// Number of tables found
    pub table_count: usize,
    /// Number of indexes found
    pub index_count: usize,
    /// Number of views found
    pub view_count: usize,
    /// Any warnings during introspection
    pub warnings: Vec<String>,
    /// The schema snapshot for migration tracking
    pub snapshot: Snapshot,
    /// Path to the generated snapshot file
    ///
    /// `run_introspection` overwrites this with the location of the
    /// `snapshot.json` it saved.
    pub snapshot_path: std::path::PathBuf,
}
1085
1086/// Introspect a database and write schema/snapshot files
1087///
1088/// This is the main entry point for CLI introspection.
1089pub fn run_introspection(
1090    credentials: &Credentials,
1091    dialect: Dialect,
1092    out_dir: &Path,
1093    init_metadata: bool,
1094    migrations_table: &str,
1095    migrations_schema: &str,
1096) -> Result<IntrospectResult, CliError> {
1097    use drizzle_migrations::journal::Journal;
1098    use drizzle_migrations::words::generate_migration_tag;
1099
1100    // Perform introspection
1101    let mut result = introspect_database(credentials, dialect)?;
1102
1103    // Write schema file
1104    let schema_path = out_dir.join("schema.rs");
1105    if let Some(parent) = schema_path.parent() {
1106        std::fs::create_dir_all(parent).map_err(|e| {
1107            CliError::Other(format!(
1108                "Failed to create output directory '{}': {}",
1109                parent.display(),
1110                e
1111            ))
1112        })?;
1113    }
1114    std::fs::write(&schema_path, &result.schema_code).map_err(|e| {
1115        CliError::Other(format!(
1116            "Failed to write schema file '{}': {}",
1117            schema_path.display(),
1118            e
1119        ))
1120    })?;
1121
1122    // Create meta directory for journal
1123    let meta_dir = out_dir.join("meta");
1124    std::fs::create_dir_all(&meta_dir).map_err(|e| {
1125        CliError::Other(format!(
1126            "Failed to create meta directory '{}': {}",
1127            meta_dir.display(),
1128            e
1129        ))
1130    })?;
1131
1132    // Load or create journal
1133    let journal_path = meta_dir.join("_journal.json");
1134    let mut journal = Journal::load_or_create(&journal_path, dialect.to_base())
1135        .map_err(|e| CliError::Other(format!("Failed to load journal: {}", e)))?;
1136
1137    // Generate migration tag (V3 format: timestamp-based)
1138    let tag = generate_migration_tag(None);
1139
1140    // Create migration directory: {out}/{tag}/
1141    let migration_dir = out_dir.join(&tag);
1142    std::fs::create_dir_all(&migration_dir).map_err(|e| {
1143        CliError::Other(format!(
1144            "Failed to create migration directory '{}': {}",
1145            migration_dir.display(),
1146            e
1147        ))
1148    })?;
1149
1150    // Save snapshot JSON: {out}/{tag}/snapshot.json
1151    let snapshot_path = migration_dir.join("snapshot.json");
1152    result.snapshot.save(&snapshot_path).map_err(|e| {
1153        CliError::Other(format!(
1154            "Failed to write snapshot file '{}': {}",
1155            snapshot_path.display(),
1156            e
1157        ))
1158    })?;
1159
1160    // Generate initial migration SQL by diffing against empty snapshot
1161    let base_dialect = dialect.to_base();
1162    let empty_snapshot = Snapshot::empty(base_dialect);
1163    let sql_statements = generate_introspect_migration(&empty_snapshot, &result.snapshot, true)?;
1164
1165    // Write migration.sql: {out}/{tag}/migration.sql
1166    let migration_sql_path = migration_dir.join("migration.sql");
1167    let sql_content = if sql_statements.is_empty() {
1168        "-- No tables to create (empty database)\n".to_string()
1169    } else {
1170        sql_statements.join("\n--> statement-breakpoint\n")
1171    };
1172    std::fs::write(&migration_sql_path, &sql_content).map_err(|e| {
1173        CliError::Other(format!(
1174            "Failed to write migration file '{}': {}",
1175            migration_sql_path.display(),
1176            e
1177        ))
1178    })?;
1179
1180    // Update result with path
1181    result.snapshot_path = snapshot_path;
1182
1183    // Update journal
1184    journal.add_entry(tag.clone(), true); // Default to breakpoints=true for now
1185    journal
1186        .save(&journal_path)
1187        .map_err(|e| CliError::Other(format!("Failed to save journal: {}", e)))?;
1188
1189    if init_metadata {
1190        apply_init_metadata(
1191            credentials,
1192            dialect,
1193            out_dir,
1194            migrations_table,
1195            migrations_schema,
1196        )?;
1197    }
1198
1199    Ok(result)
1200}
1201
1202// =============================================================================
1203// Init metadata handling
1204// =============================================================================
1205
1206fn apply_init_metadata(
1207    credentials: &Credentials,
1208    dialect: Dialect,
1209    out_dir: &Path,
1210    migrations_table: &str,
1211    migrations_schema: &str,
1212) -> Result<(), CliError> {
1213    use drizzle_migrations::MigrationSet;
1214
1215    let mut set = MigrationSet::from_dir(out_dir, dialect.to_base())
1216        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;
1217
1218    if !migrations_table.trim().is_empty() {
1219        set = set.with_table(migrations_table.to_string());
1220    }
1221    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
1222        set = set.with_schema(migrations_schema.to_string());
1223    }
1224
1225    if set.all().is_empty() {
1226        return Err(CliError::Other(
1227            "--init can't be used with empty migrations".into(),
1228        ));
1229    }
1230
1231    match credentials {
1232        #[cfg(feature = "rusqlite")]
1233        Credentials::Sqlite { path } => init_sqlite_metadata(path, &set),
1234
1235        #[cfg(not(feature = "rusqlite"))]
1236        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
1237            dialect: "SQLite",
1238            feature: "rusqlite",
1239        }),
1240
1241        #[cfg(any(feature = "libsql", feature = "turso"))]
1242        Credentials::Turso { url, auth_token } => {
1243            let _auth_token = auth_token.as_deref();
1244            if is_local_libsql(url) {
1245                #[cfg(feature = "libsql")]
1246                {
1247                    init_libsql_local_metadata(url, &set)
1248                }
1249                #[cfg(not(feature = "libsql"))]
1250                {
1251                    Err(CliError::MissingDriver {
1252                        dialect: "LibSQL (local)",
1253                        feature: "libsql",
1254                    })
1255                }
1256            } else {
1257                #[cfg(feature = "turso")]
1258                {
1259                    init_turso_metadata(url, _auth_token, &set)
1260                }
1261                #[cfg(not(feature = "turso"))]
1262                {
1263                    Err(CliError::MissingDriver {
1264                        dialect: "Turso (remote)",
1265                        feature: "turso",
1266                    })
1267                }
1268            }
1269        }
1270
1271        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
1272        Credentials::Turso { .. } => Err(CliError::MissingDriver {
1273            dialect: "Turso",
1274            feature: "turso or libsql",
1275        }),
1276
1277        #[cfg(feature = "postgres-sync")]
1278        Credentials::Postgres(creds) => init_postgres_sync_metadata(creds, &set),
1279
1280        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
1281        Credentials::Postgres(creds) => init_postgres_async_metadata(creds, &set),
1282
1283        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
1284        Credentials::Postgres(_) => Err(CliError::MissingDriver {
1285            dialect: "PostgreSQL",
1286            feature: "postgres-sync or tokio-postgres",
1287        }),
1288    }
1289}
1290
/// Check the preconditions for `--init`.
///
/// `--init` is accepted only when all of the following hold:
/// * `applied_hashes` is empty (the database has no recorded migrations),
/// * `set` contains at least one migration,
/// * every migration in `set` has the same `created_at()` value as the first one.
///
/// # Errors
///
/// Returns [`CliError::Other`] with a message naming the violated precondition.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres"
))]
fn validate_init_metadata(applied_hashes: &[String], set: &MigrationSet) -> Result<(), CliError> {
    if !applied_hashes.is_empty() {
        return Err(CliError::Other(
            "--init can't be used when database already has migrations set".into(),
        ));
    }

    let migrations = set.all();
    let Some(first) = migrations.first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let baseline = first.created_at();
    let all_match_baseline = migrations.iter().all(|m| m.created_at() == baseline);
    if all_match_baseline {
        Ok(())
    } else {
        Err(CliError::Other(
            "--init can't be used with existing migrations".into(),
        ))
    }
}
1319
1320// =============================================================================
1321// Init metadata implementations
1322// =============================================================================
1323
/// Record the first migration of `set` as applied in the SQLite file at `path`.
///
/// Opens the database with rusqlite, runs the set's create-table SQL, checks
/// the `--init` preconditions via `validate_init_metadata`, then inserts the
/// bookkeeping row for the first migration.
///
/// # Errors
///
/// * [`CliError::ConnectionError`] if the database cannot be opened.
/// * [`CliError::MigrationError`] if creating the table or inserting the row fails.
/// * Errors from `query_applied_hashes_sqlite` and `validate_init_metadata`.
#[cfg(feature = "rusqlite")]
fn init_sqlite_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{}': {}",
                path, e
            )));
        }
    };

    let create_sql = set.create_table_sql();
    if let Err(e) = conn.execute(&create_sql, []) {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_sqlite(&conn, set)?;
    validate_init_metadata(&already_applied, set)?;

    let Some(first) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let record_sql = set.record_migration_sql(first.hash(), first.created_at());
    if let Err(e) = conn.execute(&record_sql, []) {
        return Err(CliError::MigrationError(e.to_string()));
    }

    Ok(())
}
1350
/// Blocking wrapper around `init_libsql_local_metadata_inner`.
///
/// Creates a current-thread Tokio runtime and runs the async implementation
/// on it.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created; otherwise the
/// result of the async implementation is returned unchanged.
#[cfg(feature = "libsql")]
fn init_libsql_local_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match built {
        Ok(runtime) => runtime.block_on(init_libsql_local_metadata_inner(path, set)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
1360
/// Record the first migration of `set` as applied in a local LibSQL database.
///
/// Opens the database at `path`, runs the set's create-table SQL, checks the
/// `--init` preconditions via `validate_init_metadata`, then inserts the
/// bookkeeping row for the first migration.
///
/// # Errors
///
/// * [`CliError::ConnectionError`] if the database cannot be opened or connected to.
/// * [`CliError::MigrationError`] if creating the table or inserting the row fails.
/// * Errors from `query_applied_hashes_libsql` and `validate_init_metadata`.
#[cfg(feature = "libsql")]
async fn init_libsql_local_metadata_inner(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let database = match libsql::Builder::new_local(path).build().await {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = match database.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    let create_sql = set.create_table_sql();
    if let Err(e) = conn.execute(&create_sql, ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_libsql(&conn, set).await?;
    validate_init_metadata(&already_applied, set)?;

    let Some(first) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let record_sql = set.record_migration_sql(first.hash(), first.created_at());
    if let Err(e) = conn.execute(&record_sql, ()).await {
        return Err(CliError::MigrationError(e.to_string()));
    }

    Ok(())
}
1397
/// Blocking wrapper around `init_turso_metadata_inner`.
///
/// Creates a current-thread Tokio runtime and runs the async implementation
/// on it.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created; otherwise the
/// result of the async implementation is returned unchanged.
#[cfg(feature = "turso")]
fn init_turso_metadata(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();

    let runtime = match builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(init_turso_metadata_inner(url, auth_token, set))
}
1411
/// Record the first migration of `set` as applied in a remote Turso database.
///
/// Connects to `url` (an absent `auth_token` becomes an empty token string),
/// runs the set's create-table SQL, checks the `--init` preconditions via
/// `validate_init_metadata`, then inserts the bookkeeping row for the first
/// migration.
///
/// # Errors
///
/// * [`CliError::ConnectionError`] if the database cannot be built or connected to.
/// * [`CliError::MigrationError`] if creating the table or inserting the row fails.
/// * Errors from `query_applied_hashes_turso` and `validate_init_metadata`.
#[cfg(feature = "turso")]
async fn init_turso_metadata_inner(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let database = libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
        })?;

    let conn = match database.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    let create_sql = set.create_table_sql();
    if let Err(e) = conn.execute(&create_sql, ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_turso(&conn, set).await?;
    validate_init_metadata(&already_applied, set)?;

    let Some(first) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let record_sql = set.record_migration_sql(first.hash(), first.created_at());
    if let Err(e) = conn.execute(&record_sql, ()).await {
        return Err(CliError::MigrationError(e.to_string()));
    }

    Ok(())
}
1452
/// Record the first migration of `set` as applied in a PostgreSQL database
/// (synchronous `postgres` driver).
///
/// Connects without TLS, runs the optional create-schema SQL and the
/// create-table SQL, reads the already-applied hashes, checks the `--init`
/// preconditions via `validate_init_metadata`, then inserts the bookkeeping
/// row for the first migration.
///
/// # Errors
///
/// * [`CliError::ConnectionError`] if the connection cannot be established.
/// * [`CliError::MigrationError`] if creating the schema/table, querying or
///   decoding the applied hashes, or inserting the row fails.
/// * Errors from `validate_init_metadata`.
#[cfg(feature = "postgres-sync")]
fn init_postgres_sync_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // The create-table statement above succeeded, so a failing query here is a
    // real error. Treating it as "no migrations applied" would let `--init`
    // record a baseline on a database whose state could not be verified.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;

    // `try_get` reports a decode failure as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| CliError::MigrationError(format!("Failed to read migration hash: {}", e)))?;

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1491
/// Blocking wrapper around `init_postgres_async_inner`.
///
/// Creates a current-thread Tokio runtime and runs the async implementation
/// on it.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created; otherwise the
/// result of the async implementation is returned unchanged.
#[cfg(feature = "tokio-postgres")]
fn init_postgres_async_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let built = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    let runtime = match built {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(init_postgres_async_inner(creds, set))
}
1501
/// Record the first migration of `set` as applied in a PostgreSQL database
/// (`tokio-postgres` driver).
///
/// Connects without TLS and spawns the connection future on the current
/// runtime; a connection error is printed to stderr from that task. Then runs
/// the optional create-schema SQL and the create-table SQL, reads the
/// already-applied hashes, checks the `--init` preconditions via
/// `validate_init_metadata`, and inserts the bookkeeping row for the first
/// migration.
///
/// # Errors
///
/// * [`CliError::ConnectionError`] if the connection cannot be established.
/// * [`CliError::MigrationError`] if creating the schema/table, querying or
///   decoding the applied hashes, or inserting the row fails.
/// * Errors from `validate_init_metadata`.
#[cfg(feature = "tokio-postgres")]
async fn init_postgres_async_inner(
    creds: &PostgresCreds,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // The connection future has to be polled for the client to make progress.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // The create-table statement above succeeded, so a failing query here is a
    // real error. Treating it as "no migrations applied" would let `--init`
    // record a baseline on a database whose state could not be verified.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;

    // `try_get` reports a decode failure as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| CliError::MigrationError(format!("Failed to read migration hash: {}", e)))?;

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1558
1559/// Generate migration SQL from snapshot diff (for introspection)
1560fn generate_introspect_migration(
1561    prev: &Snapshot,
1562    current: &Snapshot,
1563    _breakpoints: bool,
1564) -> Result<Vec<String>, CliError> {
1565    match (prev, current) {
1566        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
1567            use drizzle_migrations::sqlite::diff_snapshots;
1568            use drizzle_migrations::sqlite::statements::SqliteGenerator;
1569
1570            let diff = diff_snapshots(prev_snap, curr_snap);
1571            let generator = SqliteGenerator::new().with_breakpoints(true);
1572            Ok(generator.generate_migration(&diff))
1573        }
1574        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
1575            use drizzle_migrations::postgres::diff_full_snapshots;
1576            use drizzle_migrations::postgres::statements::PostgresGenerator;
1577
1578            let diff = diff_full_snapshots(prev_snap, curr_snap);
1579            let generator = PostgresGenerator::new().with_breakpoints(true);
1580            Ok(generator.generate(&diff.diffs))
1581        }
1582        _ => Err(CliError::DialectMismatch),
1583    }
1584}
1585
1586/// Introspect a database and generate schema code
1587fn introspect_database(
1588    credentials: &Credentials,
1589    dialect: Dialect,
1590) -> Result<IntrospectResult, CliError> {
1591    match dialect {
1592        Dialect::Sqlite | Dialect::Turso => introspect_sqlite_dialect(credentials),
1593        Dialect::Postgresql => introspect_postgres_dialect(credentials),
1594    }
1595}
1596
/// Introspect SQLite-family databases
///
/// Selects the driver from `credentials` and the enabled cargo features:
/// * `Credentials::Sqlite` uses rusqlite,
/// * `Credentials::Turso` uses local LibSQL when `is_local_libsql(url)` is
///   true, and remote Turso otherwise.
///
/// # Errors
///
/// * [`CliError::MissingDriver`] if the feature for the required driver is not
///   compiled in.
/// * [`CliError::Other`] if the credentials are of any other kind.
/// * Whatever the selected introspection function returns.
fn introspect_sqlite_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => introspect_rusqlite(path),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Only read when the `turso` feature is enabled, hence the underscore.
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    introspect_libsql_local(url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    introspect_turso(url, _auth_token)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // Any remaining credential kind cannot be used with a SQLite-family dialect.
        _ => Err(CliError::Other(
            "SQLite introspection requires sqlite path or turso credentials".into(),
        )),
    }
}
1650
/// Introspect PostgreSQL databases
///
/// Uses the synchronous driver when the `postgres-sync` feature is enabled,
/// and falls back to the `tokio-postgres` driver when only that feature is
/// enabled.
///
/// # Errors
///
/// * [`CliError::MissingDriver`] if neither PostgreSQL driver feature is
///   compiled in.
/// * [`CliError::Other`] if `credentials` is not `Credentials::Postgres`.
/// * Whatever the selected introspection function returns.
fn introspect_postgres_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => introspect_postgres_sync(creds),

        // The async driver is used only when the sync one is unavailable.
        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => introspect_postgres_async(creds),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),

        // Any non-PostgreSQL credential kind ends up here.
        _ => Err(CliError::Other(
            "PostgreSQL introspection requires postgres credentials".into(),
        )),
    }
}
1671
1672// ============================================================================
1673// SQLite Introspection (rusqlite)
1674// ============================================================================
1675
1676#[cfg(feature = "rusqlite")]
1677fn introspect_rusqlite(path: &str) -> Result<IntrospectResult, CliError> {
1678    use drizzle_migrations::sqlite::{
1679        SQLiteDDL, Table, View,
1680        codegen::{CodegenOptions, generate_rust_schema},
1681        introspect::{
1682            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
1683            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
1684            process_foreign_keys, process_indexes, queries,
1685        },
1686    };
1687    use std::collections::{HashMap, HashSet};
1688
1689    let conn = rusqlite::Connection::open(path).map_err(|e| {
1690        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
1691    })?;
1692
1693    // Query tables
1694    let mut tables_stmt = conn
1695        .prepare(queries::TABLES_QUERY)
1696        .map_err(|e| CliError::Other(format!("Failed to prepare tables query: {}", e)))?;
1697
1698    let tables: Vec<(String, Option<String>)> = tables_stmt
1699        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
1700        .map_err(|e| CliError::Other(e.to_string()))?
1701        .filter_map(Result::ok)
1702        .collect();
1703
1704    let table_sql_map: HashMap<String, String> = tables
1705        .iter()
1706        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
1707        .collect();
1708
1709    // Query columns
1710    let mut columns_stmt = conn
1711        .prepare(queries::COLUMNS_QUERY)
1712        .map_err(|e| CliError::Other(format!("Failed to prepare columns query: {}", e)))?;
1713
1714    let raw_columns: Vec<RawColumnInfo> = columns_stmt
1715        .query_map([], |row| {
1716            Ok(RawColumnInfo {
1717                table: row.get(0)?,
1718                cid: row.get(1)?,
1719                name: row.get(2)?,
1720                column_type: row.get(3)?,
1721                not_null: row.get(4)?,
1722                default_value: row.get(5)?,
1723                pk: row.get(6)?,
1724                hidden: row.get(7)?,
1725                sql: row.get(8)?,
1726            })
1727        })
1728        .map_err(|e| CliError::Other(e.to_string()))?
1729        .filter_map(Result::ok)
1730        .collect();
1731
1732    // Query indexes and foreign keys for each table
1733    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
1734    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
1735    let mut all_fks: Vec<RawForeignKey> = Vec::new();
1736    let mut all_views: Vec<RawViewInfo> = Vec::new();
1737
1738    for (table_name, _) in &tables {
1739        // Indexes
1740        if let Ok(mut idx_stmt) = conn.prepare(&queries::indexes_query(table_name)) {
1741            let indexes: Vec<RawIndexInfo> = idx_stmt
1742                .query_map([], |row| {
1743                    Ok(RawIndexInfo {
1744                        table: table_name.clone(),
1745                        name: row.get(1)?,
1746                        unique: row.get::<_, i32>(2)? != 0,
1747                        origin: row.get(3)?,
1748                        partial: row.get::<_, i32>(4)? != 0,
1749                    })
1750                })
1751                .map_err(|e| CliError::Other(e.to_string()))?
1752                .filter_map(Result::ok)
1753                .collect();
1754
1755            // Index columns
1756            for idx in &indexes {
1757                if let Ok(mut col_stmt) = conn.prepare(&queries::index_info_query(&idx.name))
1758                    && let Ok(col_iter) = col_stmt.query_map([], |row| {
1759                        Ok(RawIndexColumn {
1760                            index_name: idx.name.clone(),
1761                            seqno: row.get(0)?,
1762                            cid: row.get(1)?,
1763                            name: row.get(2)?,
1764                            desc: row.get::<_, i32>(3)? != 0,
1765                            coll: row.get(4)?,
1766                            key: row.get::<_, i32>(5)? != 0,
1767                        })
1768                    })
1769                {
1770                    all_index_columns.extend(col_iter.filter_map(Result::ok));
1771                }
1772            }
1773            all_indexes.extend(indexes);
1774        }
1775
1776        // Foreign keys
1777        if let Ok(mut fk_stmt) = conn.prepare(&queries::foreign_keys_query(table_name))
1778            && let Ok(fk_iter) = fk_stmt.query_map([], |row| {
1779                Ok(RawForeignKey {
1780                    table: table_name.clone(),
1781                    id: row.get(0)?,
1782                    seq: row.get(1)?,
1783                    to_table: row.get(2)?,
1784                    from_column: row.get(3)?,
1785                    to_column: row.get(4)?,
1786                    on_update: row.get(5)?,
1787                    on_delete: row.get(6)?,
1788                    r#match: row.get(7)?,
1789                })
1790            })
1791        {
1792            all_fks.extend(fk_iter.filter_map(Result::ok));
1793        }
1794    }
1795
1796    // Views
1797    if let Ok(mut views_stmt) = conn.prepare(queries::VIEWS_QUERY)
1798        && let Ok(view_iter) = views_stmt.query_map([], |row| {
1799            Ok(RawViewInfo {
1800                name: row.get(0)?,
1801                sql: row.get(1)?,
1802            })
1803        })
1804    {
1805        all_views.extend(view_iter.filter_map(Result::ok));
1806    }
1807
1808    // Process raw data into DDL entities
1809    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
1810        HashMap::new();
1811    for (table, sql) in &table_sql_map {
1812        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
1813    }
1814    let pk_columns: HashSet<(String, String)> = raw_columns
1815        .iter()
1816        .filter(|c| c.pk > 0)
1817        .map(|c| (c.table.clone(), c.name.clone()))
1818        .collect();
1819
1820    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
1821    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
1822    let foreign_keys = process_foreign_keys(&all_fks);
1823
1824    // Unique constraints (origin == 'u' indexes)
1825    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);
1826
1827    // Build DDL collection
1828    let mut ddl = SQLiteDDL::new();
1829
1830    for (table_name, table_sql) in &tables {
1831        let mut table = Table::new(table_name.clone());
1832        // Parse table options from SQL if available
1833        if let Some(sql) = table_sql {
1834            let sql_upper = sql.to_uppercase();
1835            table.strict = sql_upper.contains(" STRICT");
1836            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
1837        }
1838        ddl.tables.push(table);
1839    }
1840
1841    for col in columns {
1842        ddl.columns.push(col);
1843    }
1844
1845    for idx in indexes {
1846        ddl.indexes.push(idx);
1847    }
1848
1849    for fk in foreign_keys {
1850        ddl.fks.push(fk);
1851    }
1852
1853    for pk in primary_keys {
1854        ddl.pks.push(pk);
1855    }
1856
1857    for u in uniques {
1858        ddl.uniques.push(u);
1859    }
1860
1861    // Views
1862    for v in all_views {
1863        let mut view = View::new(v.name);
1864        if let Some(def) = parse_view_sql(&v.sql) {
1865            view.definition = Some(def.into());
1866        } else {
1867            view.error = Some("Failed to parse view SQL".into());
1868        }
1869        ddl.views.push(view);
1870    }
1871
1872    // Generate Rust code
1873    let options = CodegenOptions {
1874        module_doc: Some(format!("Schema introspected from {}", path)),
1875        include_schema: true,
1876        schema_name: "Schema".to_string(),
1877        use_pub: true,
1878    };
1879
1880    let generated = generate_rust_schema(&ddl, &options);
1881
1882    // Create snapshot from DDL
1883    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
1884    for entity in ddl.to_entities() {
1885        sqlite_snapshot.add_entity(entity);
1886    }
1887    let snapshot = Snapshot::Sqlite(sqlite_snapshot);
1888
1889    Ok(IntrospectResult {
1890        schema_code: generated.code,
1891        table_count: generated.tables.len(),
1892        index_count: generated.indexes.len(),
1893        view_count: ddl.views.len(),
1894        warnings: generated.warnings,
1895        snapshot,
1896        snapshot_path: std::path::PathBuf::new(),
1897    })
1898}
1899
1900// ============================================================================
1901// LibSQL Introspection (local)
1902// ============================================================================
1903
/// Synchronous entry point for introspecting a local LibSQL database file.
///
/// Builds a single-threaded Tokio runtime and drives
/// `introspect_libsql_inner(path, None)` to completion on it.
///
/// # Errors
/// Returns `CliError::Other` if the runtime cannot be constructed; otherwise
/// forwards whatever the inner introspection returns.
#[cfg(feature = "libsql")]
fn introspect_libsql_local(path: &str) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    match runtime_builder.build() {
        Ok(runtime) => runtime.block_on(introspect_libsql_inner(path, None)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
1913
/// Introspects a local LibSQL database and produces generated Rust schema code
/// together with a SQLite snapshot.
///
/// The database at `path` is opened with `libsql::Builder::new_local`. Tables
/// and columns are mandatory inputs: a failure while running those queries or
/// while reading their rows aborts the introspection. Indexes, index columns,
/// foreign keys and views are collected on a best-effort basis; failures there
/// are ignored and simply yield fewer entities.
///
/// # Arguments
/// * `path` - location of the local database file.
/// * `_auth_token` - unused for local databases.
///
/// # Errors
/// * `CliError::ConnectionError` if the database cannot be opened or connected to.
/// * `CliError::Other` if the table or column queries fail, or if reading a row
///   from either of those result sets fails.
#[cfg(feature = "libsql")]
async fn introspect_libsql_inner(
    path: &str,
    _auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    /// Returns `(strict, without_rowid)` for a `CREATE TABLE` statement.
    ///
    /// Only the text after the final `)` is inspected, because that is where
    /// SQLite's table options live. Scanning the whole statement would flag a
    /// table as STRICT merely because a column is called e.g. `strict_mode`.
    /// Keywords are matched case-insensitively and independent of the
    /// whitespace between `WITHOUT` and `ROWID`.
    fn table_options(create_sql: &str) -> (bool, bool) {
        let tail = create_sql.rsplit_once(')').map_or("", |(_, rest)| rest);
        let words: Vec<&str> = tail
            .split(|c: char| !(c.is_alphanumeric() || c == '_'))
            .filter(|w| !w.is_empty())
            .collect();
        let strict = words.iter().any(|w| w.eq_ignore_ascii_case("STRICT"));
        let without_rowid = words.windows(2).any(|pair| {
            pair[0].eq_ignore_ascii_case("WITHOUT") && pair[1].eq_ignore_ascii_case("ROWID")
        });
        (strict, without_rowid)
    }

    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Query tables
    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    // A read error must not silently truncate the table list: everything
    // downstream (including the snapshot) would then describe a partial schema.
    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Some(row) = tables_rows
        .next()
        .await
        .map_err(|e| CliError::Other(format!("Failed to read table rows: {}", e)))?
    {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    // Table name -> CREATE TABLE text, for the tables that have one.
    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    // Query columns
    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    // Same reasoning as for tables: propagate row-read failures instead of
    // stopping early. Individual unreadable cells still fall back to defaults.
    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Some(row) = columns_rows
        .next()
        .await
        .map_err(|e| CliError::Other(format!("Failed to read column rows: {}", e)))?
    {
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    // Query indexes and foreign keys (best effort: failures are ignored)
    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        // Indexes
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                // Index columns
                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        // Foreign keys
        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    // Views (best effort)
    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    // Process into DDL
    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs that take part in a primary key.
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        // Table options are only known when the CREATE TABLE text is available.
        if let Some(sql) = table_sql {
            let (strict, without_rowid) = table_options(sql);
            table.strict = strict;
            table.without_rowid = without_rowid;
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    // A view whose SQL cannot be parsed is kept, carrying an error marker
    // instead of a definition.
    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", path)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    // Create snapshot from DDL
    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        // Left empty here; this function does not choose a snapshot location.
        snapshot_path: std::path::PathBuf::new(),
    })
}
2131
2132// ============================================================================
2133// Turso Introspection (remote)
2134// ============================================================================
2135
/// Synchronous entry point for introspecting a remote Turso database.
///
/// Builds a single-threaded Tokio runtime and drives
/// `introspect_turso_inner(url, auth_token)` to completion on it.
///
/// # Errors
/// Returns `CliError::Other` if the runtime cannot be constructed; otherwise
/// forwards whatever the inner introspection returns.
#[cfg(feature = "turso")]
fn introspect_turso(url: &str, auth_token: Option<&str>) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    match runtime_builder.build() {
        Ok(runtime) => runtime.block_on(introspect_turso_inner(url, auth_token)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
2145
/// Introspects a remote Turso database and produces generated Rust schema code
/// together with a SQLite snapshot.
///
/// The connection is created with `libsql::Builder::new_remote`. Tables and
/// columns are mandatory inputs: a failure while running those queries or while
/// reading their rows aborts the introspection. Indexes, index columns, foreign
/// keys and views are collected on a best-effort basis; failures there are
/// ignored and simply yield fewer entities.
///
/// # Arguments
/// * `url` - remote database URL.
/// * `auth_token` - authentication token; `None` is sent as an empty string.
///
/// # Errors
/// * `CliError::ConnectionError` if the remote database cannot be built or
///   connected to.
/// * `CliError::Other` if the table or column queries fail, or if reading a row
///   from either of those result sets fails.
#[cfg(feature = "turso")]
async fn introspect_turso_inner(
    url: &str,
    auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    /// Returns `(strict, without_rowid)` for a `CREATE TABLE` statement.
    ///
    /// Only the text after the final `)` is inspected, because that is where
    /// SQLite's table options live. Scanning the whole statement would flag a
    /// table as STRICT merely because a column is called e.g. `strict_mode`.
    /// Keywords are matched case-insensitively and independent of the
    /// whitespace between `WITHOUT` and `ROWID`.
    fn table_options(create_sql: &str) -> (bool, bool) {
        let tail = create_sql.rsplit_once(')').map_or("", |(_, rest)| rest);
        let words: Vec<&str> = tail
            .split(|c: char| !(c.is_alphanumeric() || c == '_'))
            .filter(|w| !w.is_empty())
            .collect();
        let strict = words.iter().any(|w| w.eq_ignore_ascii_case("STRICT"));
        let without_rowid = words.windows(2).any(|pair| {
            pair[0].eq_ignore_ascii_case("WITHOUT") && pair[1].eq_ignore_ascii_case("ROWID")
        });
        (strict, without_rowid)
    }

    let builder =
        libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());

    let db = builder.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
    })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Query tables
    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    // A read error must not silently truncate the table list: everything
    // downstream (including the snapshot) would then describe a partial schema.
    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Some(row) = tables_rows
        .next()
        .await
        .map_err(|e| CliError::Other(format!("Failed to read table rows: {}", e)))?
    {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    // Table name -> CREATE TABLE text, for the tables that have one.
    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    // Query columns
    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    // Same reasoning as for tables: propagate row-read failures instead of
    // stopping early. Individual unreadable cells still fall back to defaults.
    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Some(row) = columns_rows
        .next()
        .await
        .map_err(|e| CliError::Other(format!("Failed to read column rows: {}", e)))?
    {
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    // Query indexes and foreign keys (best effort: failures are ignored)
    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        // Indexes
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                // Index columns
                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        // Foreign keys
        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    // Views (best effort)
    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    // Process into DDL
    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs that take part in a primary key.
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        // Table options are only known when the CREATE TABLE text is available.
        if let Some(sql) = table_sql {
            let (strict, without_rowid) = table_options(sql);
            table.strict = strict;
            table.without_rowid = without_rowid;
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    // A view whose SQL cannot be parsed is kept, carrying an error marker
    // instead of a definition.
    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from Turso: {}", url)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    // Create snapshot from DDL
    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        // Left empty here; this function does not choose a snapshot location.
        snapshot_path: std::path::PathBuf::new(),
    })
}
2363
2364// ============================================================================
2365// PostgreSQL Introspection
2366// ============================================================================
2367
2368#[cfg(feature = "postgres-sync")]
2369fn introspect_postgres_sync(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
2370    use drizzle_migrations::postgres::{
2371        PostgresDDL,
2372        codegen::{CodegenOptions, generate_rust_schema},
2373        ddl::Schema,
2374        introspect::{
2375            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
2376            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
2377            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
2378            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
2379            process_roles, process_sequences, process_tables, process_unique_constraints,
2380            process_views,
2381        },
2382    };
2383
2384    let url = creds.connection_url();
2385    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
2386        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
2387    })?;
2388
2389    // Schemas
2390    let raw_schemas: Vec<RawSchemaInfo> = client
2391        .query(
2392            drizzle_migrations::postgres::introspect::queries::SCHEMAS_QUERY,
2393            &[],
2394        )
2395        .map_err(|e| CliError::Other(format!("Failed to query schemas: {}", e)))?
2396        .into_iter()
2397        .map(|row| RawSchemaInfo {
2398            name: row.get::<_, String>(0),
2399        })
2400        .collect();
2401
2402    // Tables
2403    let raw_tables: Vec<RawTableInfo> = client
2404        .query(
2405            drizzle_migrations::postgres::introspect::queries::TABLES_QUERY,
2406            &[],
2407        )
2408        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?
2409        .into_iter()
2410        .map(|row| RawTableInfo {
2411            schema: row.get::<_, String>(0),
2412            name: row.get::<_, String>(1),
2413            is_rls_enabled: row.get::<_, bool>(2),
2414        })
2415        .collect();
2416
2417    // Columns
2418    let raw_columns: Vec<RawColumnInfo> = client
2419        .query(
2420            drizzle_migrations::postgres::introspect::queries::COLUMNS_QUERY,
2421            &[],
2422        )
2423        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?
2424        .into_iter()
2425        .map(|row| RawColumnInfo {
2426            schema: row.get::<_, String>(0),
2427            table: row.get::<_, String>(1),
2428            name: row.get::<_, String>(2),
2429            column_type: row.get::<_, String>(3),
2430            type_schema: row.get::<_, Option<String>>(4),
2431            not_null: row.get::<_, bool>(5),
2432            default_value: row.get::<_, Option<String>>(6),
2433            is_identity: row.get::<_, bool>(7),
2434            identity_type: row.get::<_, Option<String>>(8),
2435            is_generated: row.get::<_, bool>(9),
2436            generated_expression: row.get::<_, Option<String>>(10),
2437            ordinal_position: row.get::<_, i32>(11),
2438        })
2439        .collect();
2440
2441    // Enums
2442    let raw_enums: Vec<RawEnumInfo> = client
2443        .query(
2444            drizzle_migrations::postgres::introspect::queries::ENUMS_QUERY,
2445            &[],
2446        )
2447        .map_err(|e| CliError::Other(format!("Failed to query enums: {}", e)))?
2448        .into_iter()
2449        .map(|row| RawEnumInfo {
2450            schema: row.get::<_, String>(0),
2451            name: row.get::<_, String>(1),
2452            values: row.get::<_, Vec<String>>(2),
2453        })
2454        .collect();
2455
2456    // Sequences
2457    let raw_sequences: Vec<RawSequenceInfo> = client
2458        .query(
2459            drizzle_migrations::postgres::introspect::queries::SEQUENCES_QUERY,
2460            &[],
2461        )
2462        .map_err(|e| CliError::Other(format!("Failed to query sequences: {}", e)))?
2463        .into_iter()
2464        .map(|row| RawSequenceInfo {
2465            schema: row.get::<_, String>(0),
2466            name: row.get::<_, String>(1),
2467            data_type: row.get::<_, String>(2),
2468            start_value: row.get::<_, String>(3),
2469            min_value: row.get::<_, String>(4),
2470            max_value: row.get::<_, String>(5),
2471            increment: row.get::<_, String>(6),
2472            cycle: row.get::<_, bool>(7),
2473            cache_value: row.get::<_, String>(8),
2474        })
2475        .collect();
2476
2477    // Views
2478    let raw_views: Vec<RawViewInfo> = client
2479        .query(
2480            drizzle_migrations::postgres::introspect::queries::VIEWS_QUERY,
2481            &[],
2482        )
2483        .map_err(|e| CliError::Other(format!("Failed to query views: {}", e)))?
2484        .into_iter()
2485        .map(|row| RawViewInfo {
2486            schema: row.get::<_, String>(0),
2487            name: row.get::<_, String>(1),
2488            definition: row.get::<_, String>(2),
2489            is_materialized: row.get::<_, bool>(3),
2490        })
2491        .collect();
2492
2493    // Indexes (custom query; drizzle_migrations provides processing types but not SQL)
2494    let raw_indexes: Vec<RawIndexInfo> = client
2495        .query(POSTGRES_INDEXES_QUERY, &[])
2496        .map_err(|e| CliError::Other(format!("Failed to query indexes: {}", e)))?
2497        .into_iter()
2498        .map(|row| {
2499            let cols: Vec<String> = row.get(6);
2500            RawIndexInfo {
2501                schema: row.get::<_, String>(0),
2502                table: row.get::<_, String>(1),
2503                name: row.get::<_, String>(2),
2504                is_unique: row.get::<_, bool>(3),
2505                is_primary: row.get::<_, bool>(4),
2506                method: row.get::<_, String>(5),
2507                columns: parse_postgres_index_columns(cols),
2508                where_clause: row.get::<_, Option<String>>(7),
2509                concurrent: false,
2510            }
2511        })
2512        .collect();
2513
2514    // Foreign keys
2515    let raw_fks: Vec<RawForeignKeyInfo> = client
2516        .query(POSTGRES_FOREIGN_KEYS_QUERY, &[])
2517        .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {}", e)))?
2518        .into_iter()
2519        .map(|row| RawForeignKeyInfo {
2520            schema: row.get::<_, String>(0),
2521            table: row.get::<_, String>(1),
2522            name: row.get::<_, String>(2),
2523            columns: row.get::<_, Vec<String>>(3),
2524            schema_to: row.get::<_, String>(4),
2525            table_to: row.get::<_, String>(5),
2526            columns_to: row.get::<_, Vec<String>>(6),
2527            on_update: pg_action_code_to_string(row.get::<_, String>(7)),
2528            on_delete: pg_action_code_to_string(row.get::<_, String>(8)),
2529        })
2530        .collect();
2531
2532    // Primary keys
2533    let raw_pks: Vec<RawPrimaryKeyInfo> = client
2534        .query(POSTGRES_PRIMARY_KEYS_QUERY, &[])
2535        .map_err(|e| CliError::Other(format!("Failed to query primary keys: {}", e)))?
2536        .into_iter()
2537        .map(|row| RawPrimaryKeyInfo {
2538            schema: row.get::<_, String>(0),
2539            table: row.get::<_, String>(1),
2540            name: row.get::<_, String>(2),
2541            columns: row.get::<_, Vec<String>>(3),
2542        })
2543        .collect();
2544
2545    // Unique constraints
2546    let raw_uniques: Vec<RawUniqueInfo> = client
2547        .query(POSTGRES_UNIQUES_QUERY, &[])
2548        .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {}", e)))?
2549        .into_iter()
2550        .map(|row| RawUniqueInfo {
2551            schema: row.get::<_, String>(0),
2552            table: row.get::<_, String>(1),
2553            name: row.get::<_, String>(2),
2554            columns: row.get::<_, Vec<String>>(3),
2555            nulls_not_distinct: row.get::<_, bool>(4),
2556        })
2557        .collect();
2558
2559    // Check constraints
2560    let raw_checks: Vec<RawCheckInfo> = client
2561        .query(POSTGRES_CHECKS_QUERY, &[])
2562        .map_err(|e| CliError::Other(format!("Failed to query check constraints: {}", e)))?
2563        .into_iter()
2564        .map(|row| RawCheckInfo {
2565            schema: row.get::<_, String>(0),
2566            table: row.get::<_, String>(1),
2567            name: row.get::<_, String>(2),
2568            expression: row.get::<_, String>(3),
2569        })
2570        .collect();
2571
2572    // Roles (optional but useful for snapshot parity)
2573    let raw_roles: Vec<RawRoleInfo> = client
2574        .query(POSTGRES_ROLES_QUERY, &[])
2575        .map_err(|e| CliError::Other(format!("Failed to query roles: {}", e)))?
2576        .into_iter()
2577        .map(|row| RawRoleInfo {
2578            name: row.get::<_, String>(0),
2579            create_db: row.get::<_, bool>(1),
2580            create_role: row.get::<_, bool>(2),
2581            inherit: row.get::<_, bool>(3),
2582        })
2583        .collect();
2584
2585    // Policies
2586    let raw_policies: Vec<RawPolicyInfo> = client
2587        .query(POSTGRES_POLICIES_QUERY, &[])
2588        .map_err(|e| CliError::Other(format!("Failed to query policies: {}", e)))?
2589        .into_iter()
2590        .map(|row| RawPolicyInfo {
2591            schema: row.get::<_, String>(0),
2592            table: row.get::<_, String>(1),
2593            name: row.get::<_, String>(2),
2594            as_clause: row.get::<_, String>(3),
2595            for_clause: row.get::<_, String>(4),
2596            to: row.get::<_, Vec<String>>(5),
2597            using: row.get::<_, Option<String>>(6),
2598            with_check: row.get::<_, Option<String>>(7),
2599        })
2600        .collect();
2601
2602    // Process raw -> DDL entities
2603    let mut ddl = PostgresDDL::new();
2604
2605    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
2606        ddl.schemas.push(s);
2607    }
2608    for e in process_enums(&raw_enums) {
2609        ddl.enums.push(e);
2610    }
2611    for s in process_sequences(&raw_sequences) {
2612        ddl.sequences.push(s);
2613    }
2614    for r in process_roles(&raw_roles) {
2615        ddl.roles.push(r);
2616    }
2617    for p in process_policies(&raw_policies) {
2618        ddl.policies.push(p);
2619    }
2620    for t in process_tables(&raw_tables) {
2621        ddl.tables.push(t);
2622    }
2623    for c in process_columns(&raw_columns) {
2624        ddl.columns.push(c);
2625    }
2626    for i in process_indexes(&raw_indexes) {
2627        ddl.indexes.push(i);
2628    }
2629    for fk in process_foreign_keys(&raw_fks) {
2630        ddl.fks.push(fk);
2631    }
2632    for pk in process_primary_keys(&raw_pks) {
2633        ddl.pks.push(pk);
2634    }
2635    for u in process_unique_constraints(&raw_uniques) {
2636        ddl.uniques.push(u);
2637    }
2638    for c in process_check_constraints(&raw_checks) {
2639        ddl.checks.push(c);
2640    }
2641    for v in process_views(&raw_views) {
2642        ddl.views.push(v);
2643    }
2644
2645    // Generate Rust schema code
2646    let options = CodegenOptions {
2647        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
2648        include_schema: true,
2649        schema_name: "Schema".to_string(),
2650        use_pub: true,
2651    };
2652    let generated = generate_rust_schema(&ddl, &options);
2653
2654    // Build snapshot
2655    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
2656    for entity in ddl.to_entities() {
2657        snap.add_entity(entity);
2658    }
2659
2660    Ok(IntrospectResult {
2661        schema_code: generated.code,
2662        table_count: ddl.tables.list().len(),
2663        index_count: ddl.indexes.list().len(),
2664        view_count: ddl.views.list().len(),
2665        warnings: generated.warnings,
2666        snapshot: Snapshot::Postgres(snap),
2667        snapshot_path: std::path::PathBuf::new(),
2668    })
2669}
2670
/// Blocking entry point for the `tokio-postgres` introspection path.
///
/// Builds a single-threaded Tokio runtime (all drivers enabled) and blocks on
/// [`introspect_postgres_async_inner`] until it finishes.
///
/// # Errors
///
/// Returns `CliError::Other` when the runtime cannot be built; otherwise
/// forwards whatever the inner introspection returns.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
fn introspect_postgres_async(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();

    match builder.build() {
        Ok(runtime) => runtime.block_on(introspect_postgres_async_inner(creds)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
2681
/// Introspects a live PostgreSQL database through `tokio-postgres`.
///
/// Connects with the URL produced by `creds.connection_url()` (using
/// `tokio_postgres::NoTls`), runs every catalog query, converts the raw rows
/// into DDL entities, and returns both the generated Rust schema code and a
/// `Snapshot::Postgres` built from the same entities.
///
/// Result columns are read positionally, so the `SELECT` order of each query
/// must match the field order used in the decoders below.
///
/// # Errors
///
/// * `CliError::ConnectionError` if the connection cannot be established.
/// * `CliError::Other` if a catalog query fails, or if a row cannot be decoded
///   (unexpected `NULL` or column type). Decoding failures are reported as
///   errors rather than panicking.
#[cfg(feature = "tokio-postgres")]
async fn introspect_postgres_async_inner(
    creds: &PostgresCreds,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::postgres::{
        PostgresDDL,
        codegen::{CodegenOptions, generate_rust_schema},
        ddl::Schema,
        introspect::{
            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
            process_roles, process_sequences, process_tables, process_unique_constraints,
            process_views, queries,
        },
    };

    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // The connection object performs the actual socket I/O for `client`, so it
    // has to be polled concurrently with the queries below.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    // Schemas
    let raw_schemas: Vec<RawSchemaInfo> =
        fetch_decoded_rows_async(&client, queries::SCHEMAS_QUERY, "schemas", |row| {
            Ok(RawSchemaInfo {
                name: row.try_get::<_, String>(0)?,
            })
        })
        .await?;

    // Tables
    let raw_tables: Vec<RawTableInfo> =
        fetch_decoded_rows_async(&client, queries::TABLES_QUERY, "tables", |row| {
            Ok(RawTableInfo {
                schema: row.try_get::<_, String>(0)?,
                name: row.try_get::<_, String>(1)?,
                is_rls_enabled: row.try_get::<_, bool>(2)?,
            })
        })
        .await?;

    // Columns
    let raw_columns: Vec<RawColumnInfo> =
        fetch_decoded_rows_async(&client, queries::COLUMNS_QUERY, "columns", |row| {
            Ok(RawColumnInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                column_type: row.try_get::<_, String>(3)?,
                type_schema: row.try_get::<_, Option<String>>(4)?,
                not_null: row.try_get::<_, bool>(5)?,
                default_value: row.try_get::<_, Option<String>>(6)?,
                is_identity: row.try_get::<_, bool>(7)?,
                identity_type: row.try_get::<_, Option<String>>(8)?,
                is_generated: row.try_get::<_, bool>(9)?,
                generated_expression: row.try_get::<_, Option<String>>(10)?,
                ordinal_position: row.try_get::<_, i32>(11)?,
            })
        })
        .await?;

    // Enums
    let raw_enums: Vec<RawEnumInfo> =
        fetch_decoded_rows_async(&client, queries::ENUMS_QUERY, "enums", |row| {
            Ok(RawEnumInfo {
                schema: row.try_get::<_, String>(0)?,
                name: row.try_get::<_, String>(1)?,
                values: row.try_get::<_, Vec<String>>(2)?,
            })
        })
        .await?;

    // Sequences
    let raw_sequences: Vec<RawSequenceInfo> =
        fetch_decoded_rows_async(&client, queries::SEQUENCES_QUERY, "sequences", |row| {
            Ok(RawSequenceInfo {
                schema: row.try_get::<_, String>(0)?,
                name: row.try_get::<_, String>(1)?,
                data_type: row.try_get::<_, String>(2)?,
                start_value: row.try_get::<_, String>(3)?,
                min_value: row.try_get::<_, String>(4)?,
                max_value: row.try_get::<_, String>(5)?,
                increment: row.try_get::<_, String>(6)?,
                cycle: row.try_get::<_, bool>(7)?,
                cache_value: row.try_get::<_, String>(8)?,
            })
        })
        .await?;

    // Views
    let raw_views: Vec<RawViewInfo> =
        fetch_decoded_rows_async(&client, queries::VIEWS_QUERY, "views", |row| {
            Ok(RawViewInfo {
                schema: row.try_get::<_, String>(0)?,
                name: row.try_get::<_, String>(1)?,
                definition: row.try_get::<_, String>(2)?,
                is_materialized: row.try_get::<_, bool>(3)?,
            })
        })
        .await?;

    // Indexes
    let raw_indexes: Vec<RawIndexInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_INDEXES_QUERY, "indexes", |row| {
            Ok(RawIndexInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                is_unique: row.try_get::<_, bool>(3)?,
                is_primary: row.try_get::<_, bool>(4)?,
                method: row.try_get::<_, String>(5)?,
                columns: parse_postgres_index_columns(row.try_get::<_, Vec<String>>(6)?),
                where_clause: row.try_get::<_, Option<String>>(7)?,
                // The catalog does not record whether an index was built concurrently.
                concurrent: false,
            })
        })
        .await?;

    // Foreign keys
    let raw_fks: Vec<RawForeignKeyInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_FOREIGN_KEYS_QUERY, "foreign keys", |row| {
            Ok(RawForeignKeyInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                columns: row.try_get::<_, Vec<String>>(3)?,
                schema_to: row.try_get::<_, String>(4)?,
                table_to: row.try_get::<_, String>(5)?,
                columns_to: row.try_get::<_, Vec<String>>(6)?,
                on_update: pg_action_code_to_string(row.try_get::<_, String>(7)?),
                on_delete: pg_action_code_to_string(row.try_get::<_, String>(8)?),
            })
        })
        .await?;

    // Primary keys
    let raw_pks: Vec<RawPrimaryKeyInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_PRIMARY_KEYS_QUERY, "primary keys", |row| {
            Ok(RawPrimaryKeyInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                columns: row.try_get::<_, Vec<String>>(3)?,
            })
        })
        .await?;

    // Unique constraints
    let raw_uniques: Vec<RawUniqueInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_UNIQUES_QUERY, "unique constraints", |row| {
            Ok(RawUniqueInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                columns: row.try_get::<_, Vec<String>>(3)?,
                nulls_not_distinct: row.try_get::<_, bool>(4)?,
            })
        })
        .await?;

    // Check constraints
    let raw_checks: Vec<RawCheckInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_CHECKS_QUERY, "check constraints", |row| {
            Ok(RawCheckInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                expression: row.try_get::<_, String>(3)?,
            })
        })
        .await?;

    // Roles
    let raw_roles: Vec<RawRoleInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_ROLES_QUERY, "roles", |row| {
            Ok(RawRoleInfo {
                name: row.try_get::<_, String>(0)?,
                create_db: row.try_get::<_, bool>(1)?,
                create_role: row.try_get::<_, bool>(2)?,
                inherit: row.try_get::<_, bool>(3)?,
            })
        })
        .await?;

    // Policies
    let raw_policies: Vec<RawPolicyInfo> =
        fetch_decoded_rows_async(&client, POSTGRES_POLICIES_QUERY, "policies", |row| {
            Ok(RawPolicyInfo {
                schema: row.try_get::<_, String>(0)?,
                table: row.try_get::<_, String>(1)?,
                name: row.try_get::<_, String>(2)?,
                as_clause: row.try_get::<_, String>(3)?,
                for_clause: row.try_get::<_, String>(4)?,
                to: row.try_get::<_, Vec<String>>(5)?,
                using: row.try_get::<_, Option<String>>(6)?,
                with_check: row.try_get::<_, Option<String>>(7)?,
            })
        })
        .await?;

    // Process raw -> DDL entities
    let mut ddl = PostgresDDL::new();
    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
        ddl.schemas.push(s);
    }
    for e in process_enums(&raw_enums) {
        ddl.enums.push(e);
    }
    for s in process_sequences(&raw_sequences) {
        ddl.sequences.push(s);
    }
    for r in process_roles(&raw_roles) {
        ddl.roles.push(r);
    }
    for p in process_policies(&raw_policies) {
        ddl.policies.push(p);
    }
    for t in process_tables(&raw_tables) {
        ddl.tables.push(t);
    }
    for c in process_columns(&raw_columns) {
        ddl.columns.push(c);
    }
    for i in process_indexes(&raw_indexes) {
        ddl.indexes.push(i);
    }
    for fk in process_foreign_keys(&raw_fks) {
        ddl.fks.push(fk);
    }
    for pk in process_primary_keys(&raw_pks) {
        ddl.pks.push(pk);
    }
    for u in process_unique_constraints(&raw_uniques) {
        ddl.uniques.push(u);
    }
    for c in process_check_constraints(&raw_checks) {
        ddl.checks.push(c);
    }
    for v in process_views(&raw_views) {
        ddl.views.push(v);
    }

    // Generate Rust schema code; the URL is masked so the password does not
    // end up in the generated module documentation.
    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };
    let generated = generate_rust_schema(&ddl, &options);

    // Build snapshot
    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
    for entity in ddl.to_entities() {
        snap.add_entity(entity);
    }

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: ddl.tables.list().len(),
        index_count: ddl.indexes.list().len(),
        view_count: ddl.views.list().len(),
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(snap),
        // Left empty here; no snapshot file is written by this function.
        snapshot_path: std::path::PathBuf::new(),
    })
}

/// Runs the parameterless query `sql` and decodes every returned row.
///
/// `what` names the entity kind and is interpolated into the error messages.
/// `decode` turns one row into a `T`; any decoding error aborts the whole
/// fetch.
///
/// # Errors
///
/// Returns `CliError::Other` if the query itself fails
/// (`"Failed to query {what}: ..."`) or if any row fails to decode
/// (`"Failed to decode {what} row: ..."`).
#[cfg(feature = "tokio-postgres")]
async fn fetch_decoded_rows_async<T>(
    client: &tokio_postgres::Client,
    sql: &str,
    what: &str,
    decode: impl Fn(&tokio_postgres::Row) -> Result<T, tokio_postgres::Error>,
) -> Result<Vec<T>, CliError> {
    let rows = client
        .query(sql, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query {}: {}", what, e)))?;

    rows.iter()
        .map(|row| {
            decode(row)
                .map_err(|e| CliError::Other(format!("Failed to decode {} row: {}", what, e)))
        })
        .collect()
}
2993
2994// =============================================================================
2995// PostgreSQL Introspection Queries (CLI-side)
2996// =============================================================================
2997
/// Minimal schema list for snapshot
///
/// One value is built per row of the schemas introspection query; only the
/// schema name is kept. Defined locally in the CLI, unlike the other `Raw*`
/// row types, which are imported from `drizzle_migrations`.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
#[derive(Debug, Clone)]
struct RawSchemaInfo {
    // Schema name, read from the first column of the schemas query.
    name: String,
}
3004
3005#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Lists every index in the non-system schemas, one row per index.
///
/// Result columns are read positionally by the introspection code:
/// `schema`, `table`, `name`, `is_unique`, `is_primary`, `method`,
/// `columns` (one `pg_get_indexdef` text per key column, in key order) and
/// `where_clause` (partial-index predicate, `NULL` when absent).
///
/// The grouping uses the rendered predicate text rather than the raw
/// `ix.indpred` column: `indpred` is a `pg_node_tree`, a type without an
/// equality operator, so it cannot appear in `GROUP BY`.
///
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so
/// `NOT LIKE 'pg_%'` also hides user schemas such as `pgboss`. Left as is to
/// stay in step with the other introspection queries; confirm whether that
/// is intended.
const POSTGRES_INDEXES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    idx.relname AS name,
    ix.indisunique AS is_unique,
    ix.indisprimary AS is_primary,
    am.amname AS method,
    array_agg(pg_get_indexdef(ix.indexrelid, s.n, true) ORDER BY s.n) AS columns,
    pg_get_expr(ix.indpred, ix.indrelid) AS where_clause
FROM pg_index ix
JOIN pg_class idx ON idx.oid = ix.indexrelid
JOIN pg_class tbl ON tbl.oid = ix.indrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_am am ON am.oid = idx.relam
JOIN generate_series(1, ix.indnkeyatts) AS s(n) ON TRUE
WHERE ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, idx.relname, ix.indisunique, ix.indisprimary, am.amname, pg_get_expr(ix.indpred, ix.indrelid)
ORDER BY ns.nspname, tbl.relname, idx.relname
"#;
3027
/// Lists every foreign-key constraint (`contype = 'f'`) in the non-system
/// schemas, one row per constraint.
///
/// Result columns are read positionally by the introspection code:
/// `schema`, `table`, `name`, `columns`, `schema_to`, `table_to`,
/// `columns_to`, `on_update`, `on_delete`.
///
/// `conkey` and `confkey` are unnested with ordinality and joined on the
/// ordinal so that `columns[i]` references `columns_to[i]`. The two action
/// columns are the single-character catalog codes cast to text; they are
/// translated by `pg_action_code_to_string`.
///
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so
/// `NOT LIKE 'pg_%'` also hides user schemas such as `pgboss` — confirm
/// whether that is intended.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_FOREIGN_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(src.attname ORDER BY s.ord) AS columns,
    ns_to.nspname AS schema_to,
    tbl_to.relname AS table_to,
    array_agg(dst.attname ORDER BY s.ord) AS columns_to,
    con.confupdtype::text AS on_update,
    con.confdeltype::text AS on_delete
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_class tbl_to ON tbl_to.oid = con.confrelid
JOIN pg_namespace ns_to ON ns_to.oid = tbl_to.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute src ON src.attrelid = tbl.oid AND src.attnum = s.attnum
JOIN unnest(con.confkey) WITH ORDINALITY AS r(attnum, ord) ON r.ord = s.ord
JOIN pg_attribute dst ON dst.attrelid = tbl_to.oid AND dst.attnum = r.attnum
WHERE con.contype = 'f'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname, ns_to.nspname, tbl_to.relname, con.confupdtype, con.confdeltype
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3055
/// Lists every primary-key constraint (`contype = 'p'`) in the non-system
/// schemas, one row per constraint.
///
/// Result columns are read positionally by the introspection code:
/// `schema`, `table`, `name`, `columns`. `columns` follows the order of
/// `conkey`, i.e. the declared key order.
///
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so
/// `NOT LIKE 'pg_%'` also hides user schemas such as `pgboss` — confirm
/// whether that is intended.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_PRIMARY_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
WHERE con.contype = 'p'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3074
3075#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Lists every unique constraint (`contype = 'u'`) in the non-system
/// schemas, one row per constraint.
///
/// Result columns are read positionally by the introspection code:
/// `schema`, `table`, `name`, `columns`, `nulls_not_distinct`.
///
/// `NULLS NOT DISTINCT` is a property of the index backing the constraint
/// (`pg_index.indnullsnotdistinct`, PostgreSQL 15+), not of `pg_constraint`,
/// so the backing index is joined through `conindid`. The flag is looked up
/// by key in `to_jsonb(ix)` instead of being referenced as a column: on
/// servers older than 15 the key is simply missing and the value falls back
/// to `FALSE`, where a direct column reference would make the whole query
/// fail. `bool_or` only folds the per-column duplicates produced by the
/// `unnest` join back into one value per constraint.
///
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so
/// `NOT LIKE 'pg_%'` also hides user schemas such as `pgboss`. Left as is to
/// stay in step with the other introspection queries; confirm whether that
/// is intended.
const POSTGRES_UNIQUES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns,
    bool_or(COALESCE((to_jsonb(ix) ->> 'indnullsnotdistinct')::boolean, FALSE)) AS nulls_not_distinct
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
LEFT JOIN pg_index ix ON ix.indexrelid = con.conindid
WHERE con.contype = 'u'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3094
/// Lists every table-level check constraint (`contype = 'c'`) in the
/// non-system schemas, one row per constraint.
///
/// Result columns are read positionally by the introspection code:
/// `schema`, `table`, `name`, `expression`. The expression is the constraint
/// body rendered by `pg_get_expr`.
///
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so
/// `NOT LIKE 'pg_%'` also hides user schemas such as `pgboss` — confirm
/// whether that is intended.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_CHECKS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    pg_get_expr(con.conbin, con.conrelid) AS expression
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
WHERE con.contype = 'c'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3110
/// Lists every role visible in `pg_roles`, ordered by name.
///
/// Result columns are read positionally by the introspection code:
/// `name`, `create_db`, `create_role`, `inherit`.
///
/// The query has no `WHERE` clause, so all roles are returned.
/// NOTE(review): that presumably includes built-in roles as well — confirm
/// whether they should be filtered before reaching the snapshot.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_ROLES_QUERY: &str = r#"
SELECT
    rolname AS name,
    rolcreatedb AS create_db,
    rolcreaterole AS create_role,
    rolinherit AS inherit
FROM pg_roles
ORDER BY rolname
"#;
3121
/// Lists every row-level-security policy from `pg_policies` in the
/// non-system schemas, one row per policy.
///
/// Result columns are read positionally by the introspection code:
/// `schema`, `table`, `name`, `as_clause` (`PERMISSIVE` / `RESTRICTIVE`),
/// `for_clause` (upper-cased command), `to` (role names), `using` and
/// `with_check` (both nullable expressions).
///
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so
/// `NOT LIKE 'pg_%'` also hides user schemas such as `pgboss` — confirm
/// whether that is intended.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_POLICIES_QUERY: &str = r#"
SELECT
    schemaname AS schema,
    tablename AS table,
    policyname AS name,
    CASE WHEN permissive THEN 'PERMISSIVE' ELSE 'RESTRICTIVE' END AS as_clause,
    upper(cmd) AS for_clause,
    roles AS to,
    qual AS using,
    with_check AS with_check
FROM pg_policies
WHERE schemaname NOT LIKE 'pg_%'
  AND schemaname <> 'information_schema'
ORDER BY schemaname, tablename, policyname
"#;
3138
3139#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Translates a single-character referential-action code into its SQL
/// keyword form.
///
/// The codes are the values of the `on_update` / `on_delete` columns of the
/// foreign-key query: `a`, `r`, `c`, `n`, `d`. Matching is case-sensitive,
/// and any other input (including the empty string) maps to `"NO ACTION"`.
fn pg_action_code_to_string(code: String) -> String {
    const FALLBACK: &str = "NO ACTION";
    const ACTIONS: [(&str, &str); 5] = [
        ("a", "NO ACTION"),
        ("r", "RESTRICT"),
        ("c", "CASCADE"),
        ("n", "SET NULL"),
        ("d", "SET DEFAULT"),
    ];

    let keyword = ACTIONS
        .iter()
        .find(|(key, _)| *key == code)
        .map_or(FALLBACK, |&(_, label)| label);

    String::from(keyword)
}
3151
/// Converts per-column index definition strings into `RawIndexColumnInfo`
/// values.
///
/// Each input string is one element of the `columns` array produced by the
/// indexes query. Parsing is heuristic:
///
/// * `asc` is `false` only when the text contains ` DESC`; `nulls_first` is
///   `true` only when it contains ` NULLS FIRST` (both case-insensitive for
///   ASCII letters).
/// * Everything from the first sort/nulls directive onwards is dropped.
/// * If what remains is exactly two identifier-like tokens, they are taken as
///   `column opclass`. Anything else is kept whole as `name`, so expressions
///   that contain whitespace (for example `COALESCE(a, b)`) are never split.
/// * `is_expression` is set when `name` contains parentheses, a space or a
///   `::` cast.
///
/// NOTE(review): `pg_get_indexdef` called with a non-zero column number
/// appears to return only the bare column or expression, without opclass or
/// ordering — in which case the directive/opclass branches are defensive.
/// Confirm against the PostgreSQL documentation.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
fn parse_postgres_index_columns(
    cols: Vec<String>,
) -> Vec<drizzle_migrations::postgres::introspect::RawIndexColumnInfo> {
    use drizzle_migrations::postgres::introspect::RawIndexColumnInfo;

    // Tried in this order; the text is cut at the first directive that occurs.
    const DIRECTIVES: [&str; 4] = [" ASC", " DESC", " NULLS FIRST", " NULLS LAST"];

    // True when `token` is made only of characters that can appear in a bare,
    // quoted or schema-qualified identifier (no operators, parentheses, casts).
    fn is_identifier_like(token: &str) -> bool {
        !token.is_empty()
            && token
                .chars()
                .all(|ch| ch.is_alphanumeric() || matches!(ch, '_' | '$' | '.' | '"'))
    }

    cols.into_iter()
        .map(|c| {
            let trimmed = c.trim();
            // ASCII-only uppercasing never changes byte lengths, so an offset
            // found in `upper` is a valid char boundary in `trimmed`. Unicode
            // uppercasing can change lengths and would misplace the cut.
            let upper = trimmed.to_ascii_uppercase();

            let asc = !upper.contains(" DESC");
            let nulls_first = upper.contains(" NULLS FIRST");

            // Strip sort/nulls directives before looking at the column itself.
            let cut = DIRECTIVES
                .iter()
                .find_map(|token| upper.find(*token))
                .unwrap_or(trimmed.len());
            let core = trimmed[..cut].trim();

            // Only a clean `column opclass` pair is split; anything else is an
            // expression (or a single column) and must stay intact.
            let tokens: Vec<&str> = core.split_whitespace().collect();
            let (name, opclass) = match tokens.as_slice() {
                [column, class]
                    if is_identifier_like(column)
                        && is_identifier_like(class)
                        && !matches!(
                            class.to_ascii_uppercase().as_str(),
                            "ASC" | "DESC" | "NULLS"
                        ) =>
                {
                    (column.to_string(), Some(class.to_string()))
                }
                _ => (core.to_string(), None),
            };

            // Heuristic: parentheses, embedded spaces or a cast mark an expression.
            let is_expression = name.contains('(')
                || name.contains(')')
                || name.contains(' ')
                || name.contains("::");

            RawIndexColumnInfo {
                name,
                is_expression,
                asc,
                nulls_first,
                opclass,
            }
        })
        .collect()
}
3203
3204#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Returns `url` with the password portion replaced by `****`.
///
/// The credentials are taken to be everything between the end of the scheme
/// (`://`, or the start of the string when there is none) and the first `@`.
/// Within them, the password is everything after the *first* `:`, so a
/// password that itself contains `:` is hidden completely, and a URL with an
/// empty user name (`scheme://:secret@host`) is masked too.
///
/// URLs without an `@`, or without a `:` in the credentials, are returned
/// unchanged.
fn mask_url(url: &str) -> String {
    let scheme_end = url.find("://").map_or(0, |p| p + 3);
    let rest = &url[scheme_end..];

    let masked = rest.find('@').and_then(|at| {
        rest[..at].find(':').map(|colon| {
            // Keep everything up to and including the separating colon, then
            // resume at the `@` that ends the credentials.
            format!("{}****{}", &url[..scheme_end + colon + 1], &rest[at..])
        })
    });

    masked.unwrap_or_else(|| url.to_string())
}