Skip to main content

drizzle_cli/db/
mod.rs

1//! Database connection and migration execution for CLI commands
2//!
3//! This module provides database connectivity for running migrations and other
4//! database operations from the CLI.
5
6use std::path::Path;
7
8#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
9use crate::config::PostgresCreds;
10use crate::config::{Credentials, Dialect};
11use crate::error::CliError;
12use crate::output;
13use drizzle_migrations::MigrationSet;
14use drizzle_migrations::schema::Snapshot;
15
/// Summary returned after a migration run has finished.
#[derive(Debug)]
pub struct MigrationResult {
    /// How many migrations were applied during this run.
    pub applied_count: usize,
    /// One identifier per applied migration, in the order they were applied.
    ///
    /// The driver-specific runners in this module push each migration's hash here.
    pub applied_migrations: Vec<String>,
}
24
/// What a `drizzle push` would execute, as computed by [`plan_push`].
#[derive(Debug, Clone)]
pub struct PushPlan {
    /// SQL statements that [`apply_push`] hands to the database driver.
    pub sql_statements: Vec<String>,
    /// Warnings produced while generating the SQL.
    pub warnings: Vec<String>,
    /// `true` when at least one statement was classified as destructive;
    /// [`apply_push`] asks for confirmation in that case unless forced.
    pub destructive: bool,
}
32
33/// Plan a push by introspecting the live database and diffing against the desired snapshot.
34pub fn plan_push(
35    credentials: &Credentials,
36    dialect: Dialect,
37    desired: &Snapshot,
38    breakpoints: bool,
39) -> Result<PushPlan, CliError> {
40    let current = introspect_database(credentials, dialect)?.snapshot;
41    let (sql_statements, warnings) = generate_push_sql(&current, desired, breakpoints)?;
42    let destructive = sql_statements.iter().any(|s| is_destructive_statement(s));
43
44    Ok(PushPlan {
45        sql_statements,
46        warnings,
47        destructive,
48    })
49}
50
51/// Apply a previously planned push.
52pub fn apply_push(
53    credentials: &Credentials,
54    dialect: Dialect,
55    plan: &PushPlan,
56    force: bool,
57) -> Result<(), CliError> {
58    if plan.sql_statements.is_empty() {
59        return Ok(());
60    }
61
62    if plan.destructive && !force {
63        let confirmed = confirm_destructive()?;
64        if !confirmed {
65            return Ok(());
66        }
67    }
68
69    execute_statements(credentials, dialect, &plan.sql_statements)
70}
71
72/// Execute migrations against the database
73///
74/// This is the main entry point that dispatches to the appropriate driver
75/// based on the credentials type.
76pub fn run_migrations(
77    credentials: &Credentials,
78    dialect: Dialect,
79    migrations_dir: &Path,
80    migrations_table: &str,
81    migrations_schema: &str,
82) -> Result<MigrationResult, CliError> {
83    // Load migrations from filesystem
84    let mut set = MigrationSet::from_dir(migrations_dir, dialect.to_base())
85        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;
86
87    // Apply overrides from config
88    if !migrations_table.trim().is_empty() {
89        set = set.with_table(migrations_table.to_string());
90    }
91    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
92        set = set.with_schema(migrations_schema.to_string());
93    }
94
95    if set.all().is_empty() {
96        return Ok(MigrationResult {
97            applied_count: 0,
98            applied_migrations: vec![],
99        });
100    }
101
102    match credentials {
103        #[cfg(feature = "rusqlite")]
104        Credentials::Sqlite { path } => run_sqlite_migrations(&set, path),
105
106        #[cfg(not(feature = "rusqlite"))]
107        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
108            dialect: "SQLite",
109            feature: "rusqlite",
110        }),
111
112        #[cfg(any(feature = "libsql", feature = "turso"))]
113        Credentials::Turso { url, auth_token } => {
114            let _auth_token = auth_token.as_deref();
115            if is_local_libsql(url) {
116                #[cfg(feature = "libsql")]
117                {
118                    run_libsql_local_migrations(&set, url)
119                }
120                #[cfg(not(feature = "libsql"))]
121                {
122                    Err(CliError::MissingDriver {
123                        dialect: "LibSQL (local)",
124                        feature: "libsql",
125                    })
126                }
127            } else {
128                #[cfg(feature = "turso")]
129                {
130                    run_turso_migrations(&set, url, _auth_token)
131                }
132                #[cfg(not(feature = "turso"))]
133                {
134                    Err(CliError::MissingDriver {
135                        dialect: "Turso (remote)",
136                        feature: "turso",
137                    })
138                }
139            }
140        }
141
142        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
143        Credentials::Turso { .. } => Err(CliError::MissingDriver {
144            dialect: "Turso",
145            feature: "turso or libsql",
146        }),
147
148        // PostgreSQL - prefer sync driver if available, fall back to async
149        #[cfg(feature = "postgres-sync")]
150        Credentials::Postgres(creds) => run_postgres_sync_migrations(&set, creds),
151
152        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
153        Credentials::Postgres(creds) => run_postgres_async_migrations(&set, creds),
154
155        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
156        Credentials::Postgres(_) => Err(CliError::MissingDriver {
157            dialect: "PostgreSQL",
158            feature: "postgres-sync or tokio-postgres",
159        }),
160    }
161}
162
/// Heuristically decide whether a SQL statement may destroy data or schema objects.
///
/// The statement is upper-cased and every run of whitespace (spaces, tabs,
/// newlines) is collapsed to a single space before matching, so multi-line
/// statements such as `ALTER TABLE t\n  DROP CONSTRAINT c` are recognised.
///
/// A statement is reported as destructive when it contains `DROP TABLE`,
/// `DROP COLUMN`, `DROP INDEX`, `DROP SCHEMA` or `TRUNCATE`, or when it is an
/// `ALTER TABLE` that also contains a standalone ` DROP ` keyword.
///
/// This is plain substring matching, not SQL parsing: keywords inside string
/// literals or identifiers also match, which errs on the side of asking for
/// confirmation.
fn is_destructive_statement(sql: &str) -> bool {
    const MARKERS: [&str; 5] = [
        "DROP TABLE",
        "DROP COLUMN",
        "DROP INDEX",
        "DROP SCHEMA",
        "TRUNCATE",
    ];

    // Collapse whitespace so keyword pairs split across lines or tabs still match.
    let normalized = sql
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .to_uppercase();

    MARKERS.iter().any(|marker| normalized.contains(*marker))
        || (normalized.contains("ALTER TABLE") && normalized.contains(" DROP "))
}
171
172fn confirm_destructive() -> Result<bool, CliError> {
173    use std::io::{self, IsTerminal, Write};
174
175    if !io::stdin().is_terminal() {
176        return Err(CliError::Other(
177            "Refusing to apply potentially destructive changes in non-interactive mode. Use --explain or --force."
178                .into(),
179        ));
180    }
181
182    println!(
183        "{}",
184        output::warning("Potentially destructive changes detected (DROP/TRUNCATE/etc).")
185    );
186    print!("Apply anyway? [y/N]: ");
187    io::stdout()
188        .flush()
189        .map_err(|e| CliError::IoError(e.to_string()))?;
190
191    let mut line = String::new();
192    io::stdin()
193        .read_line(&mut line)
194        .map_err(|e| CliError::IoError(e.to_string()))?;
195    let ans = line.trim().to_ascii_lowercase();
196    Ok(ans == "y" || ans == "yes")
197}
198
199fn generate_push_sql(
200    current: &Snapshot,
201    desired: &Snapshot,
202    breakpoints: bool,
203) -> Result<(Vec<String>, Vec<String>), CliError> {
204    match (current, desired) {
205        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
206            use drizzle_migrations::sqlite::collection::SQLiteDDL;
207            use drizzle_migrations::sqlite::diff::compute_migration;
208
209            let prev_ddl = SQLiteDDL::from_entities(prev_snap.ddl.clone());
210            let cur_ddl = SQLiteDDL::from_entities(curr_snap.ddl.clone());
211
212            let diff = compute_migration(&prev_ddl, &cur_ddl);
213            Ok((diff.sql_statements, diff.warnings))
214        }
215        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
216            use drizzle_migrations::postgres::diff_full_snapshots;
217            use drizzle_migrations::postgres::statements::PostgresGenerator;
218
219            let diff = diff_full_snapshots(prev_snap, curr_snap);
220            let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
221            Ok((generator.generate(&diff.diffs), Vec::new()))
222        }
223        _ => Err(CliError::DialectMismatch),
224    }
225}
226
227fn execute_statements(
228    credentials: &Credentials,
229    _dialect: Dialect,
230    statements: &[String],
231) -> Result<(), CliError> {
232    // In some feature combinations (no drivers), the match arms that would use `statements`
233    // are compiled out. Touch it to avoid unused-parameter warnings.
234    let _ = statements;
235
236    match credentials {
237        #[cfg(feature = "rusqlite")]
238        Credentials::Sqlite { path } => execute_sqlite_statements(path, statements),
239
240        #[cfg(not(feature = "rusqlite"))]
241        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
242            dialect: "SQLite",
243            feature: "rusqlite",
244        }),
245
246        #[cfg(any(feature = "libsql", feature = "turso"))]
247        Credentials::Turso { url, auth_token } => {
248            let _auth_token = auth_token.as_deref();
249            if is_local_libsql(url) {
250                #[cfg(feature = "libsql")]
251                {
252                    execute_libsql_local_statements(url, statements)
253                }
254                #[cfg(not(feature = "libsql"))]
255                {
256                    Err(CliError::MissingDriver {
257                        dialect: "LibSQL (local)",
258                        feature: "libsql",
259                    })
260                }
261            } else {
262                #[cfg(feature = "turso")]
263                {
264                    execute_turso_statements(url, _auth_token, statements)
265                }
266                #[cfg(not(feature = "turso"))]
267                {
268                    Err(CliError::MissingDriver {
269                        dialect: "Turso (remote)",
270                        feature: "turso",
271                    })
272                }
273            }
274        }
275
276        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
277        Credentials::Turso { .. } => Err(CliError::MissingDriver {
278            dialect: "Turso",
279            feature: "turso or libsql",
280        }),
281
282        #[cfg(feature = "postgres-sync")]
283        Credentials::Postgres(creds) => execute_postgres_sync_statements(creds, statements),
284
285        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
286        Credentials::Postgres(creds) => execute_postgres_async_statements(creds, statements),
287
288        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
289        Credentials::Postgres(_) => Err(CliError::MissingDriver {
290            dialect: "PostgreSQL",
291            feature: "postgres-sync or tokio-postgres",
292        }),
293    }
294}
295
/// Decide whether a Turso/LibSQL URL refers to a local database file.
///
/// A URL is local when it has no `://` scheme separator at all, or when it
/// starts with `file:`, `./` or `/`.
#[allow(dead_code)] // callers in this file sit behind the `libsql`/`turso` feature gates
fn is_local_libsql(url: &str) -> bool {
    const LOCAL_PREFIXES: [&str; 3] = ["file:", "./", "/"];

    let has_scheme = url.contains("://");
    !has_scheme
        || LOCAL_PREFIXES
            .iter()
            .any(|prefix| url.starts_with(*prefix))
}
304
/// Build unique-constraint descriptions from raw SQLite index metadata.
///
/// Only indexes whose `origin` is `"u"` are considered. For each of them the
/// key columns that have a name are collected and ordered by `seqno`; indexes
/// left without any column are skipped. Indexes whose name starts with
/// `sqlite_autoindex_` get a name derived via `name_for_unique` and are marked
/// as not explicitly named; all others keep their own name.
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
fn process_sqlite_uniques_from_indexes(
    raw_indexes: &[drizzle_migrations::sqlite::introspect::RawIndexInfo],
    index_columns: &[drizzle_migrations::sqlite::introspect::RawIndexColumn],
) -> Vec<drizzle_migrations::sqlite::UniqueConstraint> {
    use drizzle_migrations::sqlite::UniqueConstraint;

    raw_indexes
        .iter()
        .filter(|index| index.origin == "u")
        .filter_map(|index| {
            // Gather (position, column name) pairs belonging to this index.
            let mut keyed: Vec<(i32, String)> = Vec::new();
            for column in index_columns {
                if column.index_name != index.name || !column.key {
                    continue;
                }
                if let Some(column_name) = column.name.clone() {
                    keyed.push((column.seqno, column_name));
                }
            }
            if keyed.is_empty() {
                return None;
            }
            keyed.sort_by_key(|entry| entry.0);
            let columns: Vec<String> = keyed.into_iter().map(|entry| entry.1).collect();

            let auto_named = index.name.starts_with("sqlite_autoindex_");
            let constraint_name = if auto_named {
                let borrowed: Vec<&str> = columns.iter().map(|c| c.as_str()).collect();
                drizzle_migrations::sqlite::ddl::name_for_unique(&index.table, &borrowed)
            } else {
                index.name.clone()
            };

            let mut constraint =
                UniqueConstraint::from_strings(index.table.clone(), constraint_name, columns);
            constraint.name_explicit = !auto_named;
            Some(constraint)
        })
        .collect()
}
342
343// ============================================================================
344// SQLite (rusqlite)
345// ============================================================================
346
/// Execute `statements` against the SQLite database at `path` in one transaction.
///
/// Blank statements are skipped. On the first failing statement a `ROLLBACK`
/// is attempted (its own result is ignored) and the failure is returned.
#[cfg(feature = "rusqlite")]
fn execute_sqlite_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{}': {}",
                path, e
            )));
        }
    };

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Run every non-blank statement, stopping at the first failure.
    let outcome = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty())
        .try_for_each(|sql| match conn.execute(sql, []) {
            Ok(_) => Ok(()),
            Err(e) => Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            ))),
        });

    if let Err(failure) = outcome {
        // Best-effort rollback; the original failure is what gets reported.
        let _ = conn.execute("ROLLBACK", []);
        return Err(failure);
    }

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
375
/// Apply the pending migrations of `set` to the SQLite database at `path`.
///
/// The migrations table statement is executed first, the recorded hashes are
/// read, and every pending migration (its statements plus its bookkeeping
/// row) is run between a single `BEGIN`/`COMMIT`. Any failure triggers a
/// best-effort `ROLLBACK` before the error is returned.
#[cfg(feature = "rusqlite")]
fn run_sqlite_migrations(set: &MigrationSet, path: &str) -> Result<MigrationResult, CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{}': {}",
                path, e
            )));
        }
    };

    // Bookkeeping table.
    if let Err(e) = conn.execute(&set.create_table_sql(), []) {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    // Work out which migrations still need to run.
    let already_applied = query_applied_hashes_sqlite(&conn, set)?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Everything that must succeed inside the transaction lives in this
    // closure so that there is exactly one rollback site below.
    let apply_pending = || -> Result<Vec<String>, CliError> {
        let mut done = Vec::new();
        for migration in &pending {
            for sql in migration.statements() {
                if sql.trim().is_empty() {
                    continue;
                }
                conn.execute(sql, []).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            conn.execute(
                &set.record_migration_sql(migration.hash(), migration.created_at()),
                [],
            )
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
            done.push(migration.hash().to_string());
        }
        Ok(done)
    };

    let applied = match apply_pending() {
        Ok(done) => done,
        Err(failure) => {
            let _ = conn.execute("ROLLBACK", []);
            return Err(failure);
        }
    };

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
435
/// Read the migration hashes recorded in the SQLite migrations table.
///
/// If the query cannot be prepared (for instance because the table does not
/// exist yet) an empty list is returned instead of an error. Rows that fail
/// to decode are skipped.
#[cfg(feature = "rusqlite")]
fn query_applied_hashes_sqlite(
    conn: &rusqlite::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    // A prepare failure is treated as "nothing applied yet".
    let Ok(mut stmt) = conn.prepare(&set.query_all_hashes_sql()) else {
        return Ok(Vec::new());
    };

    let rows = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut hashes = Vec::new();
    for row in rows {
        if let Ok(hash) = row {
            hashes.push(hash);
        }
    }

    Ok(hashes)
}
454
455// ============================================================================
456// PostgreSQL (postgres - sync)
457// ============================================================================
458
/// Execute `statements` on PostgreSQL through the synchronous `postgres` driver.
///
/// The connection is opened without TLS. All non-blank statements run inside
/// one transaction that is committed at the end; the first failing statement
/// aborts the function with an error that includes the statement text.
#[cfg(feature = "postgres-sync")]
fn execute_postgres_sync_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = match postgres::Client::connect(&url, postgres::NoTls) {
        Ok(client) => client,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        if let Err(e) = tx.execute(sql, &[]) {
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            )));
        }
    }

    match tx.commit() {
        Ok(()) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
487
/// Apply the pending migrations of `set` using the synchronous `postgres` driver.
///
/// Steps: connect (without TLS), run the schema-creation SQL when the set
/// provides one, run the migrations-table SQL, read the recorded hashes, then
/// execute every pending migration plus its bookkeeping row inside a single
/// transaction that is committed at the end.
///
/// # Errors
/// * [`CliError::ConnectionError`] when the connection cannot be established.
/// * [`CliError::MigrationError`] when any SQL fails, including the query
///   that reads the already-applied hashes and the decoding of its rows.
#[cfg(feature = "postgres-sync")]
fn run_postgres_sync_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    // Create schema if needed
    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    // Create migrations table
    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Query applied hashes. The table statement above succeeded, so a failure
    // here is a genuine error: treating it as "nothing applied" would make
    // every migration look pending and run it again.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports an unexpected NULL or column type as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    // Get pending migrations
    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    // Execute in transaction. Returning early with `?` drops `tx` without a
    // commit, which the driver treats as a rollback.
    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::with_capacity(pending.len());
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        // Record the migration in the same transaction as its statements.
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
559
560// ============================================================================
561// PostgreSQL (tokio-postgres - async)
562// ============================================================================
563
/// Blocking wrapper around [`execute_postgres_async_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async implementation.
#[cfg(feature = "tokio-postgres")]
fn execute_postgres_async_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = match builder.enable_all().build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(execute_postgres_async_inner(creds, statements))
}
576
/// Execute `statements` on PostgreSQL through `tokio-postgres`.
///
/// Connects without TLS, spawns a task that drives the connection (logging a
/// connection error to stderr), then runs all non-blank statements inside a
/// single transaction that is committed at the end.
#[cfg(feature = "tokio-postgres")]
async fn execute_postgres_async_inner(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let connected = tokio_postgres::connect(&url, tokio_postgres::NoTls).await;
    let (mut client, connection) = match connected {
        Ok(pair) => pair,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    // The connection object performs the actual I/O and must be polled
    // alongside the client.
    let driver = async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    };
    tokio::spawn(driver);

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        if let Err(e) = tx.execute(sql, &[]).await {
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            )));
        }
    }

    match tx.commit().await {
        Ok(()) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
619
/// Blocking wrapper around [`run_postgres_async_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async migration runner.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)] // Used when postgres-sync is not enabled
fn run_postgres_async_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = match builder.enable_all().build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(run_postgres_async_inner(set, creds))
}
633
/// Apply the pending migrations of `set` using `tokio-postgres`.
///
/// Steps: connect (without TLS) and spawn the connection driver task, run the
/// schema-creation SQL when the set provides one, run the migrations-table
/// SQL, read the recorded hashes, then execute every pending migration plus
/// its bookkeeping row inside a single transaction that is committed at the end.
///
/// # Errors
/// * [`CliError::ConnectionError`] when the connection cannot be established.
/// * [`CliError::MigrationError`] when any SQL fails, including the query
///   that reads the already-applied hashes and the decoding of its rows.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)] // Only reached through `run_postgres_async_migrations`
async fn run_postgres_async_inner(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // Spawn connection handler
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    // Create schema if needed
    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    // Create migrations table
    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Query applied hashes. The table statement above succeeded, so a failure
    // here is a genuine error: treating it as "nothing applied" would make
    // every migration look pending and run it again.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports an unexpected NULL or column type as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    // Get pending migrations
    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    // Execute in transaction. Returning early with `?` drops `tx` without a
    // commit, which the driver treats as a rollback.
    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::with_capacity(pending.len());
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).await.map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        // Record the migration in the same transaction as its statements.
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
724
725// ============================================================================
726// LibSQL (local)
727// ============================================================================
728
/// Blocking wrapper around [`execute_libsql_local_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async implementation.
#[cfg(feature = "libsql")]
fn execute_libsql_local_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = match builder.enable_all().build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(execute_libsql_local_inner(path, statements))
}
738
/// Execute `statements` against the local LibSQL database at `path`.
///
/// All non-blank statements run inside one transaction. The first failure
/// triggers a rollback (whose own result is ignored) and is returned with the
/// offending statement text; otherwise the transaction is committed.
#[cfg(feature = "libsql")]
async fn execute_libsql_local_inner(path: &str, statements: &[String]) -> Result<(), CliError> {
    let opened = libsql::Builder::new_local(path).build().await;
    let db = match opened {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        match tx.execute(sql, ()).await {
            Ok(_) => {}
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Statement failed: {}\n{}",
                    e, sql
                )));
            }
        }
    }

    match tx.commit().await {
        Ok(()) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
777
/// Blocking wrapper around [`run_libsql_local_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async migration runner.
#[cfg(feature = "libsql")]
fn run_libsql_local_migrations(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = match builder.enable_all().build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(run_libsql_local_inner(set, path))
}
790
/// Apply the pending migrations of `set` to the local LibSQL database at `path`.
///
/// The migrations-table SQL is run first, the recorded hashes are read, and
/// each pending migration (its statements plus its bookkeeping row) is
/// executed inside a single transaction. Any failure rolls the transaction
/// back (ignoring the rollback's own result) before returning the error.
#[cfg(feature = "libsql")]
async fn run_libsql_local_inner(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let opened = libsql::Builder::new_local(path).build().await;
    let db = match opened {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Bookkeeping table.
    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    // Work out which migrations still need to run.
    let already_applied = query_applied_hashes_libsql(&conn, set).await?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for sql in migration.statements() {
            if sql.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(sql, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration in the same transaction as its statements.
        let record_sql = set.record_migration_sql(migration.hash(), migration.created_at());
        match tx.execute(&record_sql, ()).await {
            Ok(_) => applied.push(migration.hash().to_string()),
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(e.to_string()));
            }
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let applied_count = applied.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied,
    })
}
868
/// Read the migration hashes recorded in the migrations table via LibSQL.
///
/// If the query itself fails (for instance because the table does not exist
/// yet) an empty list is returned instead of an error. Reading stops at the
/// first row-fetch error or at the end of the result set; rows whose first
/// column cannot be read as a string are skipped.
#[cfg(feature = "libsql")]
async fn query_applied_hashes_libsql(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    // A query failure is treated as "nothing applied yet".
    let Ok(mut rows) = conn.query(&set.query_all_hashes_sql(), ()).await else {
        return Ok(Vec::new());
    };

    let mut hashes = Vec::new();
    loop {
        let Ok(Some(row)) = rows.next().await else {
            break;
        };
        // `extend` with an `Option` pushes the value only when it is `Some`.
        hashes.extend(row.get::<String>(0).ok());
    }

    Ok(hashes)
}
888
889// ============================================================================
890// Turso (remote)
891// ============================================================================
892
/// Blocking wrapper around [`execute_turso_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async implementation.
#[cfg(feature = "turso")]
fn execute_turso_statements(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    let runtime = match builder.enable_all().build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(execute_turso_inner(url, auth_token, statements))
}
906
/// Execute `statements` against the remote Turso database at `url`.
///
/// A missing `auth_token` is sent as an empty string. All non-blank
/// statements run inside one transaction. The first failure triggers a
/// rollback (whose own result is ignored) and is returned with the offending
/// statement text; otherwise the transaction is committed.
#[cfg(feature = "turso")]
async fn execute_turso_inner(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let token = auth_token.map(str::to_owned).unwrap_or_default();
    let remote = libsql::Builder::new_remote(url.to_owned(), token);

    let db = match remote.build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{}': {}",
                url, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        match tx.execute(sql, ()).await {
            Ok(_) => {}
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Statement failed: {}\n{}",
                    e, sql
                )));
            }
        }
    }

    match tx.commit().await {
        Ok(()) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
949
/// Blocking wrapper that applies pending migrations to a remote Turso database.
///
/// Builds a current-thread Tokio runtime and blocks on `run_turso_inner`.
#[cfg(feature = "turso")]
fn run_turso_migrations(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?
        .block_on(run_turso_inner(set, url, auth_token))
}
963
/// Apply every pending migration in `set` to a remote Turso database.
///
/// Creates the migrations table, reads the hashes already recorded, and runs the
/// remaining migrations inside one transaction, recording each migration after
/// its statements succeed. Any failure triggers a rollback attempt and is
/// returned as a `CliError::MigrationError`.
///
/// Returns the hashes of the migrations that were applied (empty when nothing
/// was pending).
#[cfg(feature = "turso")]
async fn run_turso_inner(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    // A missing token is passed to the builder as an empty string.
    let token = auth_token.unwrap_or("").to_string();
    let db = libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
        })?;

    let conn = match db.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    // The bookkeeping table must exist before it can be queried.
    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_turso(&conn, set).await?;

    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    // All pending migrations share one transaction.
    let tx = match conn.transaction().await {
        Ok(tx) => tx,
        Err(e) => return Err(CliError::MigrationError(e.to_string())),
    };

    let mut applied_hashes = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                // Rollback is best-effort; the statement failure is what gets reported.
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration only after all of its statements succeeded.
        let record_sql = set.record_migration_sql(migration.hash(), migration.created_at());
        if let Err(e) = tx.execute(&record_sql, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(e.to_string()));
        }
        applied_hashes.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied_hashes.len(),
        applied_migrations: applied_hashes,
    })
}
1042
/// Read the hashes already recorded in the migrations table (remote Turso).
///
/// If the lookup query itself cannot be run, an empty list is returned, since the
/// migrations table might not exist yet. Once rows are being read, any fetch or
/// decode failure is reported instead of being skipped: a silently truncated
/// list would make already-applied migrations look pending.
///
/// # Errors
/// Returns `CliError::MigrationError` when a row cannot be fetched or when its
/// first column cannot be read as a string.
#[cfg(feature = "turso")]
async fn query_applied_hashes_turso(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut rows = match conn.query(&set.query_all_hashes_sql(), ()).await {
        Ok(r) => r,
        Err(_) => return Ok(vec![]), // Table might not exist yet
    };

    let mut hashes = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to read applied migrations: {}", e))
    })? {
        let hash = row.get::<String>(0).map_err(|e| {
            CliError::MigrationError(format!("Failed to read migration hash: {}", e))
        })?;
        hashes.push(hash);
    }

    Ok(hashes)
}
1062
1063// ============================================================================
1064// Database Introspection
1065// ============================================================================
1066
/// Result of database introspection
///
/// Produced by `introspect_database` and returned to callers by
/// `run_introspection`, which also fills in `snapshot_path` after the snapshot
/// has been written to disk.
#[derive(Debug)]
pub struct IntrospectResult {
    /// Generated Rust schema code (written to `schema.rs` by `run_introspection`)
    pub schema_code: String,
    /// Number of tables found
    pub table_count: usize,
    /// Number of indexes found
    pub index_count: usize,
    /// Number of views found
    pub view_count: usize,
    /// Any warnings during introspection
    pub warnings: Vec<String>,
    /// The schema snapshot for migration tracking
    pub snapshot: Snapshot,
    /// Path to the generated snapshot file (`{out}/{tag}/snapshot.json` once
    /// `run_introspection` has saved it)
    pub snapshot_path: std::path::PathBuf,
}
1085
1086/// Introspect a database and write schema/snapshot files
1087///
1088/// This is the main entry point for CLI introspection.
1089pub fn run_introspection(
1090    credentials: &Credentials,
1091    dialect: Dialect,
1092    out_dir: &Path,
1093    init_metadata: bool,
1094    migrations_table: &str,
1095    migrations_schema: &str,
1096) -> Result<IntrospectResult, CliError> {
1097    use drizzle_migrations::journal::Journal;
1098    use drizzle_migrations::words::generate_migration_tag;
1099
1100    // Perform introspection
1101    let mut result = introspect_database(credentials, dialect)?;
1102
1103    // Write schema file
1104    let schema_path = out_dir.join("schema.rs");
1105    if let Some(parent) = schema_path.parent() {
1106        std::fs::create_dir_all(parent).map_err(|e| {
1107            CliError::Other(format!(
1108                "Failed to create output directory '{}': {}",
1109                parent.display(),
1110                e
1111            ))
1112        })?;
1113    }
1114    std::fs::write(&schema_path, &result.schema_code).map_err(|e| {
1115        CliError::Other(format!(
1116            "Failed to write schema file '{}': {}",
1117            schema_path.display(),
1118            e
1119        ))
1120    })?;
1121
1122    // Create meta directory for journal
1123    let meta_dir = out_dir.join("meta");
1124    std::fs::create_dir_all(&meta_dir).map_err(|e| {
1125        CliError::Other(format!(
1126            "Failed to create meta directory '{}': {}",
1127            meta_dir.display(),
1128            e
1129        ))
1130    })?;
1131
1132    // Load or create journal
1133    let journal_path = meta_dir.join("_journal.json");
1134    let mut journal = Journal::load_or_create(&journal_path, dialect.to_base())
1135        .map_err(|e| CliError::Other(format!("Failed to load journal: {}", e)))?;
1136
1137    // Generate migration tag (V3 format: timestamp-based)
1138    let tag = generate_migration_tag(None);
1139
1140    // Create migration directory: {out}/{tag}/
1141    let migration_dir = out_dir.join(&tag);
1142    std::fs::create_dir_all(&migration_dir).map_err(|e| {
1143        CliError::Other(format!(
1144            "Failed to create migration directory '{}': {}",
1145            migration_dir.display(),
1146            e
1147        ))
1148    })?;
1149
1150    // Save snapshot JSON: {out}/{tag}/snapshot.json
1151    let snapshot_path = migration_dir.join("snapshot.json");
1152    result.snapshot.save(&snapshot_path).map_err(|e| {
1153        CliError::Other(format!(
1154            "Failed to write snapshot file '{}': {}",
1155            snapshot_path.display(),
1156            e
1157        ))
1158    })?;
1159
1160    // Generate initial migration SQL by diffing against empty snapshot
1161    let base_dialect = dialect.to_base();
1162    let empty_snapshot = Snapshot::empty(base_dialect);
1163    let sql_statements = generate_introspect_migration(&empty_snapshot, &result.snapshot, true)?;
1164
1165    // Write migration.sql: {out}/{tag}/migration.sql
1166    let migration_sql_path = migration_dir.join("migration.sql");
1167    let sql_content = if sql_statements.is_empty() {
1168        "-- No tables to create (empty database)\n".to_string()
1169    } else {
1170        sql_statements.join("\n--> statement-breakpoint\n")
1171    };
1172    std::fs::write(&migration_sql_path, &sql_content).map_err(|e| {
1173        CliError::Other(format!(
1174            "Failed to write migration file '{}': {}",
1175            migration_sql_path.display(),
1176            e
1177        ))
1178    })?;
1179
1180    // Update result with path
1181    result.snapshot_path = snapshot_path;
1182
1183    // Update journal
1184    journal.add_entry(tag.clone(), true); // Default to breakpoints=true for now
1185    journal
1186        .save(&journal_path)
1187        .map_err(|e| CliError::Other(format!("Failed to save journal: {}", e)))?;
1188
1189    if init_metadata {
1190        apply_init_metadata(
1191            credentials,
1192            dialect,
1193            out_dir,
1194            migrations_table,
1195            migrations_schema,
1196        )?;
1197    }
1198
1199    Ok(result)
1200}
1201
1202// =============================================================================
1203// Init metadata handling
1204// =============================================================================
1205
1206fn apply_init_metadata(
1207    credentials: &Credentials,
1208    dialect: Dialect,
1209    out_dir: &Path,
1210    migrations_table: &str,
1211    migrations_schema: &str,
1212) -> Result<(), CliError> {
1213    use drizzle_migrations::MigrationSet;
1214
1215    let mut set = MigrationSet::from_dir(out_dir, dialect.to_base())
1216        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;
1217
1218    if !migrations_table.trim().is_empty() {
1219        set = set.with_table(migrations_table.to_string());
1220    }
1221    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
1222        set = set.with_schema(migrations_schema.to_string());
1223    }
1224
1225    if set.all().is_empty() {
1226        return Err(CliError::Other(
1227            "--init can't be used with empty migrations".into(),
1228        ));
1229    }
1230
1231    match credentials {
1232        #[cfg(feature = "rusqlite")]
1233        Credentials::Sqlite { path } => init_sqlite_metadata(path, &set),
1234
1235        #[cfg(not(feature = "rusqlite"))]
1236        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
1237            dialect: "SQLite",
1238            feature: "rusqlite",
1239        }),
1240
1241        #[cfg(any(feature = "libsql", feature = "turso"))]
1242        Credentials::Turso { url, auth_token } => {
1243            let _auth_token = auth_token.as_deref();
1244            if is_local_libsql(url) {
1245                #[cfg(feature = "libsql")]
1246                {
1247                    init_libsql_local_metadata(url, &set)
1248                }
1249                #[cfg(not(feature = "libsql"))]
1250                {
1251                    Err(CliError::MissingDriver {
1252                        dialect: "LibSQL (local)",
1253                        feature: "libsql",
1254                    })
1255                }
1256            } else {
1257                #[cfg(feature = "turso")]
1258                {
1259                    init_turso_metadata(url, _auth_token, &set)
1260                }
1261                #[cfg(not(feature = "turso"))]
1262                {
1263                    Err(CliError::MissingDriver {
1264                        dialect: "Turso (remote)",
1265                        feature: "turso",
1266                    })
1267                }
1268            }
1269        }
1270
1271        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
1272        Credentials::Turso { .. } => Err(CliError::MissingDriver {
1273            dialect: "Turso",
1274            feature: "turso or libsql",
1275        }),
1276
1277        #[cfg(feature = "postgres-sync")]
1278        Credentials::Postgres(creds) => init_postgres_sync_metadata(creds, &set),
1279
1280        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
1281        Credentials::Postgres(creds) => init_postgres_async_metadata(creds, &set),
1282
1283        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
1284        Credentials::Postgres(_) => Err(CliError::MissingDriver {
1285            dialect: "PostgreSQL",
1286            feature: "postgres-sync or tokio-postgres",
1287        }),
1288    }
1289}
1290
/// Check the preconditions for `--init`.
///
/// `--init` is rejected when:
/// 1. the database already has recorded migration hashes,
/// 2. the local migration set is empty, or
/// 3. the local migrations do not all share the first migration's `created_at`.
///
/// The checks run in that order and the first failing one determines the error.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres"
))]
fn validate_init_metadata(applied_hashes: &[String], set: &MigrationSet) -> Result<(), CliError> {
    if !applied_hashes.is_empty() {
        return Err(CliError::Other(
            "--init can't be used when database already has migrations set".into(),
        ));
    }

    let migrations = set.all();
    let Some(first) = migrations.first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let baseline = first.created_at();
    let all_share_baseline = migrations.iter().all(|m| m.created_at() == baseline);
    if all_share_baseline {
        Ok(())
    } else {
        Err(CliError::Other(
            "--init can't be used with existing migrations".into(),
        ))
    }
}
1319
1320// =============================================================================
1321// Init metadata implementations
1322// =============================================================================
1323
/// Record the first local migration as applied in a SQLite database (rusqlite).
///
/// Opens the database at `path`, creates the migrations table, checks the
/// `--init` preconditions via `validate_init_metadata`, and inserts the record
/// for the first migration in `set`.
#[cfg(feature = "rusqlite")]
fn init_sqlite_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{}': {}",
                path, e
            )));
        }
    };

    if let Err(e) = conn.execute(&set.create_table_sql(), []) {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let recorded = query_applied_hashes_sqlite(&conn, set)?;
    validate_init_metadata(&recorded, set)?;

    let Some(baseline) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let insert_sql = set.record_migration_sql(baseline.hash(), baseline.created_at());
    conn.execute(&insert_sql, [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1350
/// Blocking wrapper around `init_libsql_local_metadata_inner`.
///
/// Builds a current-thread Tokio runtime and blocks on the async implementation.
#[cfg(feature = "libsql")]
fn init_libsql_local_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(init_libsql_local_metadata_inner(path, set))
}
1360
/// Record the first local migration as applied in a local LibSQL database.
///
/// Opens the database at `path`, creates the migrations table, checks the
/// `--init` preconditions via `validate_init_metadata`, and inserts the record
/// for the first migration in `set`.
#[cfg(feature = "libsql")]
async fn init_libsql_local_metadata_inner(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let db = match libsql::Builder::new_local(path).build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = match db.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let recorded = query_applied_hashes_libsql(&conn, set).await?;
    validate_init_metadata(&recorded, set)?;

    let Some(baseline) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let insert_sql = set.record_migration_sql(baseline.hash(), baseline.created_at());
    conn.execute(&insert_sql, ())
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1397
/// Blocking wrapper around `init_turso_metadata_inner`.
///
/// Builds a current-thread Tokio runtime and blocks on the async implementation.
#[cfg(feature = "turso")]
fn init_turso_metadata(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?
        .block_on(init_turso_metadata_inner(url, auth_token, set))
}
1411
/// Record the first local migration as applied in a remote Turso database.
///
/// Connects to `url`, creates the migrations table, checks the `--init`
/// preconditions via `validate_init_metadata`, and inserts the record for the
/// first migration in `set`.
#[cfg(feature = "turso")]
async fn init_turso_metadata_inner(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    // A missing token is passed to the builder as an empty string.
    let token = auth_token.unwrap_or("").to_string();
    let db = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{}': {}",
                url, e
            )));
        }
    };

    let conn = match db.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let recorded = query_applied_hashes_turso(&conn, set).await?;
    validate_init_metadata(&recorded, set)?;

    let Some(baseline) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let insert_sql = set.record_migration_sql(baseline.hash(), baseline.created_at());
    conn.execute(&insert_sql, ())
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1452
/// Record the first local migration as applied in a PostgreSQL database
/// (blocking `postgres` driver).
///
/// Connects without TLS, creates the migrations schema (when the set defines
/// one) and table, checks the `--init` preconditions via
/// `validate_init_metadata`, and inserts the record for the first migration.
///
/// # Errors
/// Returns `CliError::ConnectionError` when the connection fails and
/// `CliError::MigrationError` when any SQL step fails. A failure to read the
/// recorded hashes is an error rather than "no migrations": the table was
/// created just above, and treating the failure as an empty list would bypass
/// the "database already has migrations" guard.
#[cfg(feature = "postgres-sync")]
fn init_postgres_sync_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports a column/type mismatch as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|r| r.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| CliError::MigrationError(format!("Failed to read migration hash: {}", e)))?;

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1491
/// Blocking wrapper around `init_postgres_async_inner`.
///
/// Builds a current-thread Tokio runtime and blocks on the async implementation.
#[cfg(feature = "tokio-postgres")]
fn init_postgres_async_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(init_postgres_async_inner(creds, set))
}
1501
/// Record the first local migration as applied in a PostgreSQL database
/// (`tokio-postgres` driver).
///
/// Connects without TLS, spawns the connection driver task, creates the
/// migrations schema (when the set defines one) and table, checks the `--init`
/// preconditions via `validate_init_metadata`, and inserts the record for the
/// first migration.
///
/// # Errors
/// Returns `CliError::ConnectionError` when the connection fails and
/// `CliError::MigrationError` when any SQL step fails. A failure to read the
/// recorded hashes is an error rather than "no migrations": the table was
/// created just above, and treating the failure as an empty list would bypass
/// the "database already has migrations" guard.
#[cfg(feature = "tokio-postgres")]
async fn init_postgres_async_inner(
    creds: &PostgresCreds,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // The connection future is awaited on its own task; errors it reports are
    // printed to stderr.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports a column/type mismatch as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|r| r.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| CliError::MigrationError(format!("Failed to read migration hash: {}", e)))?;

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1558
1559/// Generate migration SQL from snapshot diff (for introspection)
1560fn generate_introspect_migration(
1561    prev: &Snapshot,
1562    current: &Snapshot,
1563    _breakpoints: bool,
1564) -> Result<Vec<String>, CliError> {
1565    match (prev, current) {
1566        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
1567            use drizzle_migrations::sqlite::diff_snapshots;
1568            use drizzle_migrations::sqlite::statements::SqliteGenerator;
1569
1570            let diff = diff_snapshots(prev_snap, curr_snap);
1571            let generator = SqliteGenerator::new().with_breakpoints(true);
1572            Ok(generator.generate_migration(&diff))
1573        }
1574        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
1575            use drizzle_migrations::postgres::diff_full_snapshots;
1576            use drizzle_migrations::postgres::statements::PostgresGenerator;
1577
1578            let diff = diff_full_snapshots(prev_snap, curr_snap);
1579            let generator = PostgresGenerator::new().with_breakpoints(true);
1580            Ok(generator.generate(&diff.diffs))
1581        }
1582        _ => Err(CliError::DialectMismatch),
1583    }
1584}
1585
1586/// Introspect a database and generate schema code
1587fn introspect_database(
1588    credentials: &Credentials,
1589    dialect: Dialect,
1590) -> Result<IntrospectResult, CliError> {
1591    match dialect {
1592        Dialect::Sqlite | Dialect::Turso => introspect_sqlite_dialect(credentials),
1593        Dialect::Postgresql => introspect_postgres_dialect(credentials),
1594    }
1595}
1596
/// Introspect SQLite-family databases
///
/// Picks the backend from the credential variant and the driver features
/// compiled into the binary:
/// - `Credentials::Sqlite` uses `introspect_rusqlite` (feature `rusqlite`).
/// - `Credentials::Turso` uses `introspect_libsql_local` (feature `libsql`) when
///   `is_local_libsql(url)` is true, otherwise `introspect_turso` (feature
///   `turso`).
///
/// # Errors
/// Returns `CliError::MissingDriver` when the feature needed for the given
/// credentials is not enabled, and `CliError::Other` for any other credential
/// variant.
fn introspect_sqlite_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => introspect_rusqlite(path),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Underscore-prefixed because only the remote (turso) branch reads it.
            let _auth_token = auth_token.as_deref();
            // NOTE(review): `is_local_libsql` is defined elsewhere; presumably it
            // tells local database files apart from remote URLs — confirm.
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    introspect_libsql_local(url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    introspect_turso(url, _auth_token)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        // Neither LibSQL-based driver is compiled in.
        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // Any remaining credential variant is not usable for SQLite introspection.
        _ => Err(CliError::Other(
            "SQLite introspection requires sqlite path or turso credentials".into(),
        )),
    }
}
1650
/// Introspect PostgreSQL databases
///
/// Uses `introspect_postgres_sync` when the `postgres-sync` feature is enabled;
/// otherwise `introspect_postgres_async` when only `tokio-postgres` is enabled.
///
/// # Errors
/// Returns `CliError::MissingDriver` when neither PostgreSQL feature is enabled,
/// and `CliError::Other` when `credentials` is not the `Postgres` variant.
fn introspect_postgres_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        // The blocking driver takes precedence when both features are enabled.
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => introspect_postgres_sync(creds),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => introspect_postgres_async(creds),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),

        // Any non-Postgres credential variant ends up here.
        _ => Err(CliError::Other(
            "PostgreSQL introspection requires postgres credentials".into(),
        )),
    }
}
1671
1672// ============================================================================
1673// SQLite Introspection (rusqlite)
1674// ============================================================================
1675
1676#[cfg(feature = "rusqlite")]
1677fn introspect_rusqlite(path: &str) -> Result<IntrospectResult, CliError> {
1678    use drizzle_migrations::sqlite::{
1679        SQLiteDDL, Table, View,
1680        codegen::{CodegenOptions, generate_rust_schema},
1681        introspect::{
1682            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
1683            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
1684            process_foreign_keys, process_indexes, queries,
1685        },
1686    };
1687    use std::collections::{HashMap, HashSet};
1688
1689    let conn = rusqlite::Connection::open(path).map_err(|e| {
1690        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
1691    })?;
1692
1693    // Query tables
1694    let mut tables_stmt = conn
1695        .prepare(queries::TABLES_QUERY)
1696        .map_err(|e| CliError::Other(format!("Failed to prepare tables query: {}", e)))?;
1697
1698    let tables: Vec<(String, Option<String>)> = tables_stmt
1699        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
1700        .map_err(|e| CliError::Other(e.to_string()))?
1701        .filter_map(Result::ok)
1702        .collect();
1703
1704    let table_sql_map: HashMap<String, String> = tables
1705        .iter()
1706        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
1707        .collect();
1708
1709    // Query columns
1710    let mut columns_stmt = conn
1711        .prepare(queries::COLUMNS_QUERY)
1712        .map_err(|e| CliError::Other(format!("Failed to prepare columns query: {}", e)))?;
1713
1714    let mut raw_columns: Vec<RawColumnInfo> = columns_stmt
1715        .query_map([], |row| {
1716            Ok(RawColumnInfo {
1717                table: row.get(0)?,
1718                cid: row.get(1)?,
1719                name: row.get(2)?,
1720                column_type: row.get(3)?,
1721                not_null: row.get(4)?,
1722                default_value: row.get(5)?,
1723                pk: row.get(6)?,
1724                hidden: row.get(7)?,
1725                sql: row.get(8)?,
1726            })
1727        })
1728        .map_err(|e| CliError::Other(e.to_string()))?
1729        .filter_map(Result::ok)
1730        .collect();
1731
1732    // Query indexes and foreign keys for each table
1733    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
1734    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
1735    let mut all_fks: Vec<RawForeignKey> = Vec::new();
1736    let mut all_views: Vec<RawViewInfo> = Vec::new();
1737
1738    for (table_name, _) in &tables {
1739        // Indexes
1740        if let Ok(mut idx_stmt) = conn.prepare(&queries::indexes_query(table_name)) {
1741            let indexes: Vec<RawIndexInfo> = idx_stmt
1742                .query_map([], |row| {
1743                    Ok(RawIndexInfo {
1744                        table: table_name.clone(),
1745                        name: row.get(1)?,
1746                        unique: row.get::<_, i32>(2)? != 0,
1747                        origin: row.get(3)?,
1748                        partial: row.get::<_, i32>(4)? != 0,
1749                    })
1750                })
1751                .map_err(|e| CliError::Other(e.to_string()))?
1752                .filter_map(Result::ok)
1753                .collect();
1754
1755            // Index columns
1756            for idx in &indexes {
1757                if let Ok(mut col_stmt) = conn.prepare(&queries::index_info_query(&idx.name))
1758                    && let Ok(col_iter) = col_stmt.query_map([], |row| {
1759                        Ok(RawIndexColumn {
1760                            index_name: idx.name.clone(),
1761                            seqno: row.get(0)?,
1762                            cid: row.get(1)?,
1763                            name: row.get(2)?,
1764                            desc: row.get::<_, i32>(3)? != 0,
1765                            coll: row.get(4)?,
1766                            key: row.get::<_, i32>(5)? != 0,
1767                        })
1768                    })
1769                {
1770                    all_index_columns.extend(col_iter.filter_map(Result::ok));
1771                }
1772            }
1773            all_indexes.extend(indexes);
1774        }
1775
1776        // Foreign keys
1777        if let Ok(mut fk_stmt) = conn.prepare(&queries::foreign_keys_query(table_name))
1778            && let Ok(fk_iter) = fk_stmt.query_map([], |row| {
1779                Ok(RawForeignKey {
1780                    table: table_name.clone(),
1781                    id: row.get(0)?,
1782                    seq: row.get(1)?,
1783                    to_table: row.get(2)?,
1784                    from_column: row.get(3)?,
1785                    to_column: row.get(4)?,
1786                    on_update: row.get(5)?,
1787                    on_delete: row.get(6)?,
1788                    r#match: row.get(7)?,
1789                })
1790            })
1791        {
1792            all_fks.extend(fk_iter.filter_map(Result::ok));
1793        }
1794    }
1795
1796    // Views
1797    if let Ok(mut views_stmt) = conn.prepare(queries::VIEWS_QUERY)
1798        && let Ok(view_iter) = views_stmt.query_map([], |row| {
1799            Ok(RawViewInfo {
1800                name: row.get(0)?,
1801                sql: row.get(1)?,
1802            })
1803        })
1804    {
1805        all_views.extend(view_iter.filter_map(Result::ok));
1806    }
1807
1808    // View columns (for codegen with column fields)
1809    if let Ok(mut view_cols_stmt) = conn.prepare(queries::VIEW_COLUMNS_QUERY)
1810        && let Ok(col_iter) = view_cols_stmt.query_map([], |row| {
1811            Ok(RawColumnInfo {
1812                table: row.get(0)?,
1813                cid: row.get(1)?,
1814                name: row.get(2)?,
1815                column_type: row.get(3)?,
1816                not_null: row.get::<_, i32>(4)? != 0,
1817                default_value: row.get(5)?,
1818                pk: row.get(6)?,
1819                hidden: row.get(7)?,
1820                sql: row.get(8)?,
1821            })
1822        })
1823    {
1824        raw_columns.extend(col_iter.filter_map(Result::ok));
1825    }
1826
1827    // Process raw data into DDL entities
1828    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
1829        HashMap::new();
1830    for (table, sql) in &table_sql_map {
1831        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
1832    }
1833    let pk_columns: HashSet<(String, String)> = raw_columns
1834        .iter()
1835        .filter(|c| c.pk > 0)
1836        .map(|c| (c.table.clone(), c.name.clone()))
1837        .collect();
1838
1839    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
1840    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
1841    let foreign_keys = process_foreign_keys(&all_fks);
1842
1843    // Unique constraints (origin == 'u' indexes)
1844    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);
1845
1846    // Build DDL collection
1847    let mut ddl = SQLiteDDL::new();
1848
1849    for (table_name, table_sql) in &tables {
1850        let mut table = Table::new(table_name.clone());
1851        // Parse table options from SQL if available
1852        if let Some(sql) = table_sql {
1853            let sql_upper = sql.to_uppercase();
1854            table.strict = sql_upper.contains(" STRICT");
1855            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
1856        }
1857        ddl.tables.push(table);
1858    }
1859
1860    for col in columns {
1861        ddl.columns.push(col);
1862    }
1863
1864    for idx in indexes {
1865        ddl.indexes.push(idx);
1866    }
1867
1868    for fk in foreign_keys {
1869        ddl.fks.push(fk);
1870    }
1871
1872    for pk in primary_keys {
1873        ddl.pks.push(pk);
1874    }
1875
1876    for u in uniques {
1877        ddl.uniques.push(u);
1878    }
1879
1880    // Views
1881    for v in all_views {
1882        let mut view = View::new(v.name);
1883        if let Some(def) = parse_view_sql(&v.sql) {
1884            view.definition = Some(def.into());
1885        } else {
1886            view.error = Some("Failed to parse view SQL".into());
1887        }
1888        ddl.views.push(view);
1889    }
1890
1891    // Generate Rust code
1892    let options = CodegenOptions {
1893        module_doc: Some(format!("Schema introspected from {}", path)),
1894        include_schema: true,
1895        schema_name: "Schema".to_string(),
1896        use_pub: true,
1897    };
1898
1899    let generated = generate_rust_schema(&ddl, &options);
1900
1901    // Create snapshot from DDL
1902    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
1903    for entity in ddl.to_entities() {
1904        sqlite_snapshot.add_entity(entity);
1905    }
1906    let snapshot = Snapshot::Sqlite(sqlite_snapshot);
1907
1908    Ok(IntrospectResult {
1909        schema_code: generated.code,
1910        table_count: generated.tables.len(),
1911        index_count: generated.indexes.len(),
1912        view_count: ddl.views.len(),
1913        warnings: generated.warnings,
1914        snapshot,
1915        snapshot_path: std::path::PathBuf::new(),
1916    })
1917}
1918
1919// ============================================================================
1920// LibSQL Introspection (local)
1921// ============================================================================
1922
/// Introspects a local LibSQL database file from synchronous code.
///
/// Creates a current-thread Tokio runtime with all drivers enabled and blocks
/// on `introspect_libsql_inner`, passing no auth token.
///
/// # Errors
///
/// Returns `CliError::Other` when the runtime cannot be built; otherwise
/// returns whatever `introspect_libsql_inner` returns.
#[cfg(feature = "libsql")]
fn introspect_libsql_local(path: &str) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    let introspection = introspect_libsql_inner(path, None);
    runtime.block_on(introspection)
}
1932
/// Introspects a LibSQL database opened from a local file, producing generated
/// Rust schema code and a SQLite snapshot.
///
/// The tables and columns queries must succeed. Index, foreign-key, view and
/// view-column queries are best-effort: when one of them fails it simply
/// contributes no rows. A row-level read error ends the affected result set
/// early, and individual values that cannot be read fall back to a default
/// (or `None` for optional fields).
///
/// `_auth_token` is accepted but not used by this function.
///
/// # Errors
///
/// * `CliError::ConnectionError` when the database cannot be opened or a
///   connection cannot be obtained.
/// * `CliError::Other` when the tables or columns query fails.
#[cfg(feature = "libsql")]
async fn introspect_libsql_inner(
    path: &str,
    _auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    /// Returns `(strict, without_rowid)` for a `CREATE TABLE` statement.
    ///
    /// Only the text after the final `)` is inspected, because that is where
    /// SQLite's table options live. Searching the whole statement would
    /// misreport tables that merely have a column called `strict` or a string
    /// literal containing `without rowid`. Keywords are matched as whole,
    /// case-insensitive words so spacing and separators do not matter.
    fn table_options(create_sql: &str) -> (bool, bool) {
        let tail = create_sql
            .rfind(')')
            .map_or("", |close| &create_sql[close + 1..]);
        let upper = tail.to_ascii_uppercase();
        let words: Vec<&str> = upper
            .split(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
            .filter(|word| !word.is_empty())
            .collect();

        let strict = words.contains(&"STRICT");
        let without_rowid = words.windows(2).any(|pair| pair == ["WITHOUT", "ROWID"]);
        (strict, without_rowid)
    }

    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Tables: (name, CREATE statement if readable).
    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Ok(Some(row)) = tables_rows.next().await {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    // Lookup of table name -> CREATE statement, for tables that have one.
    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    // Table columns.
    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Ok(Some(row)) = columns_rows.next().await {
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    // Per-table metadata (best-effort): indexes, their columns, foreign keys.
    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        // Indexes
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                // Index columns
                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        // Foreign keys
        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    // Views
    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    // View columns (for codegen with column fields); appended to the same
    // list as table columns so they are processed together below.
    if let Ok(mut view_cols_rows) = conn.query(queries::VIEW_COLUMNS_QUERY, ()).await {
        while let Ok(Some(row)) = view_cols_rows.next().await {
            raw_columns.push(RawColumnInfo {
                table: row.get(0).unwrap_or_default(),
                cid: row.get(1).unwrap_or(0),
                name: row.get(2).unwrap_or_default(),
                column_type: row.get(3).unwrap_or_default(),
                not_null: row.get::<i32>(4).unwrap_or(0) != 0,
                default_value: row.get(5).ok(),
                pk: row.get(6).unwrap_or(0),
                hidden: row.get(7).unwrap_or(0),
                sql: row.get(8).ok(),
            });
        }
    }

    // Process raw rows into DDL entities.
    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs that take part in a primary key.
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        // Table options are only known when the CREATE statement was readable.
        if let Some(sql) = table_sql {
            let (strict, without_rowid) = table_options(sql);
            table.strict = strict;
            table.without_rowid = without_rowid;
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    // A view whose SQL cannot be parsed is still recorded, with an error note.
    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", path)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    // Create snapshot from DDL
    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}
2167
2168// ============================================================================
2169// Turso Introspection (remote)
2170// ============================================================================
2171
/// Introspects a remote Turso database from synchronous code.
///
/// Creates a current-thread Tokio runtime with all drivers enabled and blocks
/// on `introspect_turso_inner`, forwarding `url` and `auth_token` unchanged.
///
/// # Errors
///
/// Returns `CliError::Other` when the runtime cannot be built; otherwise
/// returns whatever `introspect_turso_inner` returns.
#[cfg(feature = "turso")]
fn introspect_turso(url: &str, auth_token: Option<&str>) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    let introspection = introspect_turso_inner(url, auth_token);
    runtime.block_on(introspection)
}
2181
/// Introspects a remote Turso database, producing generated Rust schema code
/// and a SQLite snapshot.
///
/// A missing `auth_token` is passed to the remote builder as an empty string.
///
/// The tables and columns queries must succeed. Index, foreign-key, view and
/// view-column queries are best-effort: when one of them fails it simply
/// contributes no rows. A row-level read error ends the affected result set
/// early, and individual values that cannot be read fall back to a default
/// (or `None` for optional fields).
///
/// # Errors
///
/// * `CliError::ConnectionError` when the remote database cannot be built or
///   a connection cannot be obtained.
/// * `CliError::Other` when the tables or columns query fails.
#[cfg(feature = "turso")]
async fn introspect_turso_inner(
    url: &str,
    auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    /// Returns `(strict, without_rowid)` for a `CREATE TABLE` statement.
    ///
    /// Only the text after the final `)` is inspected, because that is where
    /// SQLite's table options live. Searching the whole statement would
    /// misreport tables that merely have a column called `strict` or a string
    /// literal containing `without rowid`. Keywords are matched as whole,
    /// case-insensitive words so spacing and separators do not matter.
    fn table_options(create_sql: &str) -> (bool, bool) {
        let tail = create_sql
            .rfind(')')
            .map_or("", |close| &create_sql[close + 1..]);
        let upper = tail.to_ascii_uppercase();
        let words: Vec<&str> = upper
            .split(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
            .filter(|word| !word.is_empty())
            .collect();

        let strict = words.contains(&"STRICT");
        let without_rowid = words.windows(2).any(|pair| pair == ["WITHOUT", "ROWID"]);
        (strict, without_rowid)
    }

    let builder =
        libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());

    let db = builder.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
    })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Tables: (name, CREATE statement if readable).
    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Ok(Some(row)) = tables_rows.next().await {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    // Lookup of table name -> CREATE statement, for tables that have one.
    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    // Table columns.
    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Ok(Some(row)) = columns_rows.next().await {
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    // Per-table metadata (best-effort): indexes, their columns, foreign keys.
    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        // Indexes
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                // Index columns
                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        // Foreign keys
        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    // Views
    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    // View columns (for codegen with column fields); appended to the same
    // list as table columns so they are processed together below.
    if let Ok(mut view_cols_rows) = conn.query(queries::VIEW_COLUMNS_QUERY, ()).await {
        while let Ok(Some(row)) = view_cols_rows.next().await {
            raw_columns.push(RawColumnInfo {
                table: row.get(0).unwrap_or_default(),
                cid: row.get(1).unwrap_or(0),
                name: row.get(2).unwrap_or_default(),
                column_type: row.get(3).unwrap_or_default(),
                not_null: row.get::<i32>(4).unwrap_or(0) != 0,
                default_value: row.get(5).ok(),
                pk: row.get(6).unwrap_or(0),
                hidden: row.get(7).unwrap_or(0),
                sql: row.get(8).ok(),
            });
        }
    }

    // Process raw rows into DDL entities.
    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs that take part in a primary key.
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        // Table options are only known when the CREATE statement was readable.
        if let Some(sql) = table_sql {
            let (strict, without_rowid) = table_options(sql);
            table.strict = strict;
            table.without_rowid = without_rowid;
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    // A view whose SQL cannot be parsed is still recorded, with an error note.
    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from Turso: {}", url)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    // Create snapshot from DDL
    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}
2416
2417// ============================================================================
2418// PostgreSQL Introspection
2419// ============================================================================
2420
2421#[cfg(feature = "postgres-sync")]
2422fn introspect_postgres_sync(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
2423    use drizzle_migrations::postgres::{
2424        PostgresDDL,
2425        codegen::{CodegenOptions, generate_rust_schema},
2426        ddl::Schema,
2427        introspect::{
2428            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
2429            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
2430            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
2431            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
2432            process_roles, process_sequences, process_tables, process_unique_constraints,
2433            process_views,
2434        },
2435    };
2436
2437    let url = creds.connection_url();
2438    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
2439        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
2440    })?;
2441
2442    // Schemas
2443    let raw_schemas: Vec<RawSchemaInfo> = client
2444        .query(
2445            drizzle_migrations::postgres::introspect::queries::SCHEMAS_QUERY,
2446            &[],
2447        )
2448        .map_err(|e| CliError::Other(format!("Failed to query schemas: {}", e)))?
2449        .into_iter()
2450        .map(|row| RawSchemaInfo {
2451            name: row.get::<_, String>(0),
2452        })
2453        .collect();
2454
2455    // Tables
2456    let raw_tables: Vec<RawTableInfo> = client
2457        .query(
2458            drizzle_migrations::postgres::introspect::queries::TABLES_QUERY,
2459            &[],
2460        )
2461        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?
2462        .into_iter()
2463        .map(|row| RawTableInfo {
2464            schema: row.get::<_, String>(0),
2465            name: row.get::<_, String>(1),
2466            is_rls_enabled: row.get::<_, bool>(2),
2467        })
2468        .collect();
2469
2470    // Columns
2471    let raw_columns: Vec<RawColumnInfo> = client
2472        .query(
2473            drizzle_migrations::postgres::introspect::queries::COLUMNS_QUERY,
2474            &[],
2475        )
2476        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?
2477        .into_iter()
2478        .map(|row| RawColumnInfo {
2479            schema: row.get::<_, String>(0),
2480            table: row.get::<_, String>(1),
2481            name: row.get::<_, String>(2),
2482            column_type: row.get::<_, String>(3),
2483            type_schema: row.get::<_, Option<String>>(4),
2484            not_null: row.get::<_, bool>(5),
2485            default_value: row.get::<_, Option<String>>(6),
2486            is_identity: row.get::<_, bool>(7),
2487            identity_type: row.get::<_, Option<String>>(8),
2488            is_generated: row.get::<_, bool>(9),
2489            generated_expression: row.get::<_, Option<String>>(10),
2490            ordinal_position: row.get::<_, i32>(11),
2491        })
2492        .collect();
2493
2494    // Enums
2495    let raw_enums: Vec<RawEnumInfo> = client
2496        .query(
2497            drizzle_migrations::postgres::introspect::queries::ENUMS_QUERY,
2498            &[],
2499        )
2500        .map_err(|e| CliError::Other(format!("Failed to query enums: {}", e)))?
2501        .into_iter()
2502        .map(|row| RawEnumInfo {
2503            schema: row.get::<_, String>(0),
2504            name: row.get::<_, String>(1),
2505            values: row.get::<_, Vec<String>>(2),
2506        })
2507        .collect();
2508
2509    // Sequences
2510    let raw_sequences: Vec<RawSequenceInfo> = client
2511        .query(
2512            drizzle_migrations::postgres::introspect::queries::SEQUENCES_QUERY,
2513            &[],
2514        )
2515        .map_err(|e| CliError::Other(format!("Failed to query sequences: {}", e)))?
2516        .into_iter()
2517        .map(|row| RawSequenceInfo {
2518            schema: row.get::<_, String>(0),
2519            name: row.get::<_, String>(1),
2520            data_type: row.get::<_, String>(2),
2521            start_value: row.get::<_, String>(3),
2522            min_value: row.get::<_, String>(4),
2523            max_value: row.get::<_, String>(5),
2524            increment: row.get::<_, String>(6),
2525            cycle: row.get::<_, bool>(7),
2526            cache_value: row.get::<_, String>(8),
2527        })
2528        .collect();
2529
2530    // Views
2531    let raw_views: Vec<RawViewInfo> = client
2532        .query(
2533            drizzle_migrations::postgres::introspect::queries::VIEWS_QUERY,
2534            &[],
2535        )
2536        .map_err(|e| CliError::Other(format!("Failed to query views: {}", e)))?
2537        .into_iter()
2538        .map(|row| RawViewInfo {
2539            schema: row.get::<_, String>(0),
2540            name: row.get::<_, String>(1),
2541            definition: row.get::<_, String>(2),
2542            is_materialized: row.get::<_, bool>(3),
2543        })
2544        .collect();
2545
2546    // Indexes (custom query; drizzle_migrations provides processing types but not SQL)
2547    let raw_indexes: Vec<RawIndexInfo> = client
2548        .query(POSTGRES_INDEXES_QUERY, &[])
2549        .map_err(|e| CliError::Other(format!("Failed to query indexes: {}", e)))?
2550        .into_iter()
2551        .map(|row| {
2552            let cols: Vec<String> = row.get(6);
2553            RawIndexInfo {
2554                schema: row.get::<_, String>(0),
2555                table: row.get::<_, String>(1),
2556                name: row.get::<_, String>(2),
2557                is_unique: row.get::<_, bool>(3),
2558                is_primary: row.get::<_, bool>(4),
2559                method: row.get::<_, String>(5),
2560                columns: parse_postgres_index_columns(cols),
2561                where_clause: row.get::<_, Option<String>>(7),
2562                concurrent: false,
2563            }
2564        })
2565        .collect();
2566
2567    // Foreign keys
2568    let raw_fks: Vec<RawForeignKeyInfo> = client
2569        .query(POSTGRES_FOREIGN_KEYS_QUERY, &[])
2570        .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {}", e)))?
2571        .into_iter()
2572        .map(|row| RawForeignKeyInfo {
2573            schema: row.get::<_, String>(0),
2574            table: row.get::<_, String>(1),
2575            name: row.get::<_, String>(2),
2576            columns: row.get::<_, Vec<String>>(3),
2577            schema_to: row.get::<_, String>(4),
2578            table_to: row.get::<_, String>(5),
2579            columns_to: row.get::<_, Vec<String>>(6),
2580            on_update: pg_action_code_to_string(row.get::<_, String>(7)),
2581            on_delete: pg_action_code_to_string(row.get::<_, String>(8)),
2582        })
2583        .collect();
2584
2585    // Primary keys
2586    let raw_pks: Vec<RawPrimaryKeyInfo> = client
2587        .query(POSTGRES_PRIMARY_KEYS_QUERY, &[])
2588        .map_err(|e| CliError::Other(format!("Failed to query primary keys: {}", e)))?
2589        .into_iter()
2590        .map(|row| RawPrimaryKeyInfo {
2591            schema: row.get::<_, String>(0),
2592            table: row.get::<_, String>(1),
2593            name: row.get::<_, String>(2),
2594            columns: row.get::<_, Vec<String>>(3),
2595        })
2596        .collect();
2597
2598    // Unique constraints
2599    let raw_uniques: Vec<RawUniqueInfo> = client
2600        .query(POSTGRES_UNIQUES_QUERY, &[])
2601        .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {}", e)))?
2602        .into_iter()
2603        .map(|row| RawUniqueInfo {
2604            schema: row.get::<_, String>(0),
2605            table: row.get::<_, String>(1),
2606            name: row.get::<_, String>(2),
2607            columns: row.get::<_, Vec<String>>(3),
2608            nulls_not_distinct: row.get::<_, bool>(4),
2609        })
2610        .collect();
2611
2612    // Check constraints
2613    let raw_checks: Vec<RawCheckInfo> = client
2614        .query(POSTGRES_CHECKS_QUERY, &[])
2615        .map_err(|e| CliError::Other(format!("Failed to query check constraints: {}", e)))?
2616        .into_iter()
2617        .map(|row| RawCheckInfo {
2618            schema: row.get::<_, String>(0),
2619            table: row.get::<_, String>(1),
2620            name: row.get::<_, String>(2),
2621            expression: row.get::<_, String>(3),
2622        })
2623        .collect();
2624
2625    // Roles (optional but useful for snapshot parity)
2626    let raw_roles: Vec<RawRoleInfo> = client
2627        .query(POSTGRES_ROLES_QUERY, &[])
2628        .map_err(|e| CliError::Other(format!("Failed to query roles: {}", e)))?
2629        .into_iter()
2630        .map(|row| RawRoleInfo {
2631            name: row.get::<_, String>(0),
2632            create_db: row.get::<_, bool>(1),
2633            create_role: row.get::<_, bool>(2),
2634            inherit: row.get::<_, bool>(3),
2635        })
2636        .collect();
2637
2638    // Policies
2639    let raw_policies: Vec<RawPolicyInfo> = client
2640        .query(POSTGRES_POLICIES_QUERY, &[])
2641        .map_err(|e| CliError::Other(format!("Failed to query policies: {}", e)))?
2642        .into_iter()
2643        .map(|row| RawPolicyInfo {
2644            schema: row.get::<_, String>(0),
2645            table: row.get::<_, String>(1),
2646            name: row.get::<_, String>(2),
2647            as_clause: row.get::<_, String>(3),
2648            for_clause: row.get::<_, String>(4),
2649            to: row.get::<_, Vec<String>>(5),
2650            using: row.get::<_, Option<String>>(6),
2651            with_check: row.get::<_, Option<String>>(7),
2652        })
2653        .collect();
2654
2655    // Process raw -> DDL entities
2656    let mut ddl = PostgresDDL::new();
2657
2658    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
2659        ddl.schemas.push(s);
2660    }
2661    for e in process_enums(&raw_enums) {
2662        ddl.enums.push(e);
2663    }
2664    for s in process_sequences(&raw_sequences) {
2665        ddl.sequences.push(s);
2666    }
2667    for r in process_roles(&raw_roles) {
2668        ddl.roles.push(r);
2669    }
2670    for p in process_policies(&raw_policies) {
2671        ddl.policies.push(p);
2672    }
2673    for t in process_tables(&raw_tables) {
2674        ddl.tables.push(t);
2675    }
2676    for c in process_columns(&raw_columns) {
2677        ddl.columns.push(c);
2678    }
2679    for i in process_indexes(&raw_indexes) {
2680        ddl.indexes.push(i);
2681    }
2682    for fk in process_foreign_keys(&raw_fks) {
2683        ddl.fks.push(fk);
2684    }
2685    for pk in process_primary_keys(&raw_pks) {
2686        ddl.pks.push(pk);
2687    }
2688    for u in process_unique_constraints(&raw_uniques) {
2689        ddl.uniques.push(u);
2690    }
2691    for c in process_check_constraints(&raw_checks) {
2692        ddl.checks.push(c);
2693    }
2694    for v in process_views(&raw_views) {
2695        ddl.views.push(v);
2696    }
2697
2698    // Generate Rust schema code
2699    let options = CodegenOptions {
2700        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
2701        include_schema: true,
2702        schema_name: "Schema".to_string(),
2703        use_pub: true,
2704    };
2705    let generated = generate_rust_schema(&ddl, &options);
2706
2707    // Build snapshot
2708    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
2709    for entity in ddl.to_entities() {
2710        snap.add_entity(entity);
2711    }
2712
2713    Ok(IntrospectResult {
2714        schema_code: generated.code,
2715        table_count: ddl.tables.list().len(),
2716        index_count: ddl.indexes.list().len(),
2717        view_count: ddl.views.list().len(),
2718        warnings: generated.warnings,
2719        snapshot: Snapshot::Postgres(snap),
2720        snapshot_path: std::path::PathBuf::new(),
2721    })
2722}
2723
/// Blocking wrapper around [`introspect_postgres_async_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks on the
/// async introspection until it finishes.
///
/// # Errors
///
/// Returns [`CliError::Other`] when the runtime cannot be created; otherwise forwards
/// the result of the async introspection unchanged.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
fn introspect_postgres_async(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();

    match builder.build() {
        Ok(runtime) => runtime.block_on(introspect_postgres_async_inner(creds)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
2734
/// Introspects a PostgreSQL database through `tokio-postgres`.
///
/// Connects with the URL derived from `creds` (without TLS), runs the catalog queries
/// one after another, converts the raw rows into DDL entities, and then produces the
/// generated Rust schema code together with a snapshot of the database.
///
/// The returned `snapshot_path` is left empty.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] when the connection cannot be established and
/// [`CliError::Other`] when any of the catalog queries fails.
#[cfg(feature = "tokio-postgres")]
async fn introspect_postgres_async_inner(
    creds: &PostgresCreds,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::postgres::{
        PostgresDDL, PostgresSnapshot,
        codegen::{CodegenOptions, generate_rust_schema},
        ddl::Schema,
        introspect::{
            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
            process_roles, process_sequences, process_tables, process_unique_constraints,
            process_views, queries,
        },
    };

    // --- Connect -----------------------------------------------------------------
    let url = creds.connection_url();
    let connect_result = tokio_postgres::connect(&url, tokio_postgres::NoTls).await;
    let (client, connection) = match connect_result {
        Ok(pair) => pair,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    // Drive the connection on its own task; a failure there is only reported on stderr.
    let driver = async move {
        match connection.await {
            Ok(_) => {}
            Err(e) => eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            ),
        }
    };
    tokio::spawn(driver);

    // --- Fetch raw catalog rows --------------------------------------------------
    // Column types are inferred from the target struct fields.

    let schema_rows = client
        .query(queries::SCHEMAS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query schemas: {}", err)))?;
    let raw_schemas: Vec<RawSchemaInfo> = schema_rows
        .into_iter()
        .map(|row| RawSchemaInfo { name: row.get(0) })
        .collect();

    let table_rows = client
        .query(queries::TABLES_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query tables: {}", err)))?;
    let raw_tables: Vec<RawTableInfo> = table_rows
        .into_iter()
        .map(|row| RawTableInfo {
            schema: row.get(0),
            name: row.get(1),
            is_rls_enabled: row.get(2),
        })
        .collect();

    let column_rows = client
        .query(queries::COLUMNS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query columns: {}", err)))?;
    let raw_columns: Vec<RawColumnInfo> = column_rows
        .into_iter()
        .map(|row| RawColumnInfo {
            schema: row.get(0),
            table: row.get(1),
            name: row.get(2),
            column_type: row.get(3),
            type_schema: row.get(4),
            not_null: row.get(5),
            default_value: row.get(6),
            is_identity: row.get(7),
            identity_type: row.get(8),
            is_generated: row.get(9),
            generated_expression: row.get(10),
            ordinal_position: row.get(11),
        })
        .collect();

    let enum_rows = client
        .query(queries::ENUMS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query enums: {}", err)))?;
    let raw_enums: Vec<RawEnumInfo> = enum_rows
        .into_iter()
        .map(|row| RawEnumInfo {
            schema: row.get(0),
            name: row.get(1),
            values: row.get(2),
        })
        .collect();

    let sequence_rows = client
        .query(queries::SEQUENCES_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query sequences: {}", err)))?;
    let raw_sequences: Vec<RawSequenceInfo> = sequence_rows
        .into_iter()
        .map(|row| RawSequenceInfo {
            schema: row.get(0),
            name: row.get(1),
            data_type: row.get(2),
            start_value: row.get(3),
            min_value: row.get(4),
            max_value: row.get(5),
            increment: row.get(6),
            cycle: row.get(7),
            cache_value: row.get(8),
        })
        .collect();

    let view_rows = client
        .query(queries::VIEWS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query views: {}", err)))?;
    let raw_views: Vec<RawViewInfo> = view_rows
        .into_iter()
        .map(|row| RawViewInfo {
            schema: row.get(0),
            name: row.get(1),
            definition: row.get(2),
            is_materialized: row.get(3),
        })
        .collect();

    let index_rows = client
        .query(POSTGRES_INDEXES_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query indexes: {}", err)))?;
    let raw_indexes: Vec<RawIndexInfo> = index_rows
        .into_iter()
        .map(|row| {
            let column_defs: Vec<String> = row.get(6);
            RawIndexInfo {
                schema: row.get(0),
                table: row.get(1),
                name: row.get(2),
                is_unique: row.get(3),
                is_primary: row.get(4),
                method: row.get(5),
                columns: parse_postgres_index_columns(column_defs),
                where_clause: row.get(7),
                concurrent: false,
            }
        })
        .collect();

    let fk_rows = client
        .query(POSTGRES_FOREIGN_KEYS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query foreign keys: {}", err)))?;
    let raw_fks: Vec<RawForeignKeyInfo> = fk_rows
        .into_iter()
        .map(|row| RawForeignKeyInfo {
            schema: row.get(0),
            table: row.get(1),
            name: row.get(2),
            columns: row.get(3),
            schema_to: row.get(4),
            table_to: row.get(5),
            columns_to: row.get(6),
            on_update: pg_action_code_to_string(row.get(7)),
            on_delete: pg_action_code_to_string(row.get(8)),
        })
        .collect();

    let pk_rows = client
        .query(POSTGRES_PRIMARY_KEYS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query primary keys: {}", err)))?;
    let raw_pks: Vec<RawPrimaryKeyInfo> = pk_rows
        .into_iter()
        .map(|row| RawPrimaryKeyInfo {
            schema: row.get(0),
            table: row.get(1),
            name: row.get(2),
            columns: row.get(3),
        })
        .collect();

    let unique_rows = client
        .query(POSTGRES_UNIQUES_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query unique constraints: {}", err)))?;
    let raw_uniques: Vec<RawUniqueInfo> = unique_rows
        .into_iter()
        .map(|row| RawUniqueInfo {
            schema: row.get(0),
            table: row.get(1),
            name: row.get(2),
            columns: row.get(3),
            nulls_not_distinct: row.get(4),
        })
        .collect();

    let check_rows = client
        .query(POSTGRES_CHECKS_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query check constraints: {}", err)))?;
    let raw_checks: Vec<RawCheckInfo> = check_rows
        .into_iter()
        .map(|row| RawCheckInfo {
            schema: row.get(0),
            table: row.get(1),
            name: row.get(2),
            expression: row.get(3),
        })
        .collect();

    let role_rows = client
        .query(POSTGRES_ROLES_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query roles: {}", err)))?;
    let raw_roles: Vec<RawRoleInfo> = role_rows
        .into_iter()
        .map(|row| RawRoleInfo {
            name: row.get(0),
            create_db: row.get(1),
            create_role: row.get(2),
            inherit: row.get(3),
        })
        .collect();

    let policy_rows = client
        .query(POSTGRES_POLICIES_QUERY, &[])
        .await
        .map_err(|err| CliError::Other(format!("Failed to query policies: {}", err)))?;
    let raw_policies: Vec<RawPolicyInfo> = policy_rows
        .into_iter()
        .map(|row| RawPolicyInfo {
            schema: row.get(0),
            table: row.get(1),
            name: row.get(2),
            as_clause: row.get(3),
            for_clause: row.get(4),
            to: row.get(5),
            using: row.get(6),
            with_check: row.get(7),
        })
        .collect();

    // --- Raw rows -> DDL entities ------------------------------------------------
    let mut ddl = PostgresDDL::new();
    raw_schemas
        .into_iter()
        .map(|info| Schema::new(info.name))
        .for_each(|schema| {
            ddl.schemas.push(schema);
        });
    process_enums(&raw_enums).into_iter().for_each(|item| {
        ddl.enums.push(item);
    });
    process_sequences(&raw_sequences)
        .into_iter()
        .for_each(|item| {
            ddl.sequences.push(item);
        });
    process_roles(&raw_roles).into_iter().for_each(|item| {
        ddl.roles.push(item);
    });
    process_policies(&raw_policies)
        .into_iter()
        .for_each(|item| {
            ddl.policies.push(item);
        });
    process_tables(&raw_tables).into_iter().for_each(|item| {
        ddl.tables.push(item);
    });
    process_columns(&raw_columns).into_iter().for_each(|item| {
        ddl.columns.push(item);
    });
    process_indexes(&raw_indexes).into_iter().for_each(|item| {
        ddl.indexes.push(item);
    });
    process_foreign_keys(&raw_fks)
        .into_iter()
        .for_each(|item| {
            ddl.fks.push(item);
        });
    process_primary_keys(&raw_pks)
        .into_iter()
        .for_each(|item| {
            ddl.pks.push(item);
        });
    process_unique_constraints(&raw_uniques)
        .into_iter()
        .for_each(|item| {
            ddl.uniques.push(item);
        });
    process_check_constraints(&raw_checks)
        .into_iter()
        .for_each(|item| {
            ddl.checks.push(item);
        });
    process_views(&raw_views).into_iter().for_each(|item| {
        ddl.views.push(item);
    });

    // --- Generated Rust schema code ----------------------------------------------
    // The URL goes through `mask_url` before it is embedded in the module docs.
    let module_doc = format!("Schema introspected from {}", mask_url(&url));
    let options = CodegenOptions {
        module_doc: Some(module_doc),
        include_schema: true,
        schema_name: String::from("Schema"),
        use_pub: true,
    };
    let generated = generate_rust_schema(&ddl, &options);

    // --- Snapshot ----------------------------------------------------------------
    let mut snapshot = PostgresSnapshot::new();
    ddl.to_entities().into_iter().for_each(|entity| {
        snapshot.add_entity(entity);
    });

    let table_count = ddl.tables.list().len();
    let index_count = ddl.indexes.list().len();
    let view_count = ddl.views.list().len();

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count,
        index_count,
        view_count,
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(snapshot),
        snapshot_path: std::path::PathBuf::new(),
    })
}
3046
3047// =============================================================================
3048// PostgreSQL Introspection Queries (CLI-side)
3049// =============================================================================
3050
/// Raw row of the schema-list query used while building a PostgreSQL snapshot.
///
/// Only the schema name is kept; introspection turns each value into a DDL `Schema`.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
#[derive(Debug, Clone)]
struct RawSchemaInfo {
    /// Schema name, read from the first column of the query result.
    name: String,
}
3057
/// Catalog query listing the indexes of every schema that is neither `pg_*` nor
/// `information_schema`, one row per index.
///
/// Result columns, in order: `schema`, `table`, `name`, `is_unique`, `is_primary`,
/// `method` (access method name), `columns` (one `pg_get_indexdef` string per key
/// column, ordered by key position) and `where_clause` (the `indpred` predicate
/// rendered by `pg_get_expr`; read as an optional value by the callers).
///
/// Only key columns are expanded: the series runs from 1 to `indnkeyatts`.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_INDEXES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    idx.relname AS name,
    ix.indisunique AS is_unique,
    ix.indisprimary AS is_primary,
    am.amname AS method,
    array_agg(pg_get_indexdef(ix.indexrelid, s.n, true) ORDER BY s.n) AS columns,
    pg_get_expr(ix.indpred, ix.indrelid) AS where_clause
FROM pg_index ix
JOIN pg_class idx ON idx.oid = ix.indexrelid
JOIN pg_class tbl ON tbl.oid = ix.indrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_am am ON am.oid = idx.relam
JOIN generate_series(1, ix.indnkeyatts) AS s(n) ON TRUE
WHERE ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, idx.relname, ix.indisunique, ix.indisprimary, am.amname, ix.indpred, ix.indrelid
ORDER BY ns.nspname, tbl.relname, idx.relname
"#;
3080
/// Catalog query listing foreign-key constraints (`contype = 'f'`) of every schema
/// that is neither `pg_*` nor `information_schema`, one row per constraint.
///
/// Result columns, in order: `schema`, `table`, `name`, `columns` (referencing column
/// names), `schema_to`, `table_to`, `columns_to` (referenced column names),
/// `on_update` and `on_delete`.
///
/// Both column arrays are ordered by the position inside `conkey`, and `confkey` is
/// matched to `conkey` by ordinality, so entry *i* of `columns` pairs with entry *i*
/// of `columns_to`. The two actions are the single-character catalog codes cast to
/// text; callers translate them with `pg_action_code_to_string`.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_FOREIGN_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(src.attname ORDER BY s.ord) AS columns,
    ns_to.nspname AS schema_to,
    tbl_to.relname AS table_to,
    array_agg(dst.attname ORDER BY s.ord) AS columns_to,
    con.confupdtype::text AS on_update,
    con.confdeltype::text AS on_delete
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_class tbl_to ON tbl_to.oid = con.confrelid
JOIN pg_namespace ns_to ON ns_to.oid = tbl_to.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute src ON src.attrelid = tbl.oid AND src.attnum = s.attnum
JOIN unnest(con.confkey) WITH ORDINALITY AS r(attnum, ord) ON r.ord = s.ord
JOIN pg_attribute dst ON dst.attrelid = tbl_to.oid AND dst.attnum = r.attnum
WHERE con.contype = 'f'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname, ns_to.nspname, tbl_to.relname, con.confupdtype, con.confdeltype
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3108
/// Catalog query listing primary-key constraints (`contype = 'p'`) of every schema
/// that is neither `pg_*` nor `information_schema`, one row per constraint.
///
/// Result columns, in order: `schema`, `table`, `name` and `columns` (column names
/// ordered by their position inside `conkey`).
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_PRIMARY_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
WHERE con.contype = 'p'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3127
3128#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query listing unique constraints (`contype = 'u'`) of every schema that is
/// neither `pg_*` nor `information_schema`, one row per constraint.
///
/// Result columns, in order: `schema`, `table`, `name`, `columns` (column names
/// ordered by their position inside `conkey`) and `nulls_not_distinct`.
///
/// The NULLS NOT DISTINCT flag is a property of the index backing the constraint
/// (`pg_index.indnullsnotdistinct`), so the query joins `pg_index` through
/// `conindid`. The flag is read through `to_jsonb` so that servers whose `pg_index`
/// lacks the column still run the query; there the lookup yields NULL and the
/// result falls back to `FALSE`.
const POSTGRES_UNIQUES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns,
    COALESCE(bool_or((to_jsonb(ix) ->> 'indnullsnotdistinct')::boolean), FALSE) AS nulls_not_distinct
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_index ix ON ix.indexrelid = con.conindid
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
WHERE con.contype = 'u'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3147
/// Catalog query listing check constraints (`contype = 'c'`) attached to tables of
/// every schema that is neither `pg_*` nor `information_schema`.
///
/// Result columns, in order: `schema`, `table`, `name` and `expression` (the
/// constraint expression rendered by `pg_get_expr`).
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_CHECKS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    pg_get_expr(con.conbin, con.conrelid) AS expression
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
WHERE con.contype = 'c'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3163
/// Catalog query listing every row of `pg_roles`, ordered by role name.
///
/// Result columns, in order: `name`, `create_db`, `create_role` and `inherit`.
/// The query itself applies no filter, so all roles visible in `pg_roles` are
/// returned.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_ROLES_QUERY: &str = r#"
SELECT
    rolname AS name,
    rolcreatedb AS create_db,
    rolcreaterole AS create_role,
    rolinherit AS inherit
FROM pg_roles
ORDER BY rolname
"#;
3174
/// Query over the `pg_policies` view listing the row-level-security policies of every
/// schema that is neither `pg_*` nor `information_schema`.
///
/// Result columns, in order: `schema`, `table`, `name`, `as_clause` (`'PERMISSIVE'`
/// or `'RESTRICTIVE'`, derived from the `permissive` column), `for_clause` (the
/// command, upper-cased), `to` (the roles array), `using` (the `qual` expression) and
/// `with_check`. Callers read the last two as optional values.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_POLICIES_QUERY: &str = r#"
SELECT
    schemaname AS schema,
    tablename AS table,
    policyname AS name,
    CASE WHEN permissive THEN 'PERMISSIVE' ELSE 'RESTRICTIVE' END AS as_clause,
    upper(cmd) AS for_clause,
    roles AS to,
    qual AS using,
    with_check AS with_check
FROM pg_policies
WHERE schemaname NOT LIKE 'pg_%'
  AND schemaname <> 'information_schema'
ORDER BY schemaname, tablename, policyname
"#;
3191
3192#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Translates a single-letter foreign-key action code into its SQL keyword form.
///
/// Recognised codes are `"r"`, `"c"`, `"n"` and `"d"`; every other input, including
/// `"a"`, maps to `"NO ACTION"`.
fn pg_action_code_to_string(code: String) -> String {
    // Codes with a dedicated keyword; anything else falls through to the default.
    const KNOWN_ACTIONS: [(&str, &str); 4] = [
        ("r", "RESTRICT"),
        ("c", "CASCADE"),
        ("n", "SET NULL"),
        ("d", "SET DEFAULT"),
    ];

    let keyword = KNOWN_ACTIONS
        .iter()
        .find(|(letter, _)| code.as_str() == *letter)
        .map_or("NO ACTION", |(_, action)| *action);

    String::from(keyword)
}
3204
/// Parses index column definitions of the form
/// `<column or expression> [opclass] [ASC | DESC] [NULLS FIRST | NULLS LAST]`
/// into [`RawIndexColumnInfo`] values, one per input string.
///
/// The parsing is heuristic:
///
/// * Sort directives are only recognised as trailing keywords (ASCII
///   case-insensitive), so text such as `' description'` inside an expression is
///   never mistaken for `DESC`.
/// * `is_expression` is set when the remaining text contains parentheses, a space
///   or a `::` cast.
/// * The text is split on whitespace that sits outside parentheses and quotes. If
///   that yields at least two parts, the first is the name and the second the
///   opclass; otherwise the whole text is the name. Expressions with internal
///   spaces, e.g. `COALESCE(a, b)`, therefore stay intact.
///
/// `asc` defaults to `true` and `nulls_first` to `false` when no directive is present.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
fn parse_postgres_index_columns(
    cols: Vec<String>,
) -> Vec<drizzle_migrations::postgres::introspect::RawIndexColumnInfo> {
    use drizzle_migrations::postgres::introspect::RawIndexColumnInfo;

    /// Removes `suffix` from the end of `s`, comparing ASCII case-insensitively.
    ///
    /// Works on the original string's own byte offsets, so it can neither cut at a
    /// wrong position nor split a multi-byte character.
    fn strip_suffix_ignore_ascii_case<'a>(s: &'a str, suffix: &str) -> Option<&'a str> {
        let split = s.len().checked_sub(suffix.len())?;
        (s.is_char_boundary(split) && s[split..].eq_ignore_ascii_case(suffix))
            .then(|| &s[..split])
    }

    /// Splits `s` on whitespace that is outside parentheses and outside single- or
    /// double-quoted sections.
    fn split_top_level(s: &str) -> Vec<&str> {
        let mut tokens = Vec::new();
        let mut depth = 0usize;
        let mut quote: Option<char> = None;
        let mut start: Option<usize> = None;

        for (idx, ch) in s.char_indices() {
            if quote.is_none() && depth == 0 && ch.is_whitespace() {
                if let Some(begin) = start.take() {
                    tokens.push(&s[begin..idx]);
                }
                continue;
            }
            if start.is_none() {
                start = Some(idx);
            }
            match quote {
                Some(open) if ch == open => quote = None,
                Some(_) => {}
                None => match ch {
                    '\'' | '"' => quote = Some(ch),
                    '(' => depth += 1,
                    ')' => depth = depth.saturating_sub(1),
                    _ => {}
                },
            }
        }
        if let Some(begin) = start {
            tokens.push(&s[begin..]);
        }
        tokens
    }

    cols.into_iter()
        .map(|c| {
            let mut core = c.trim();

            // The NULLS directive, when present, is the last element.
            let mut nulls_first = false;
            if let Some(rest) = strip_suffix_ignore_ascii_case(core, " NULLS FIRST") {
                nulls_first = true;
                core = rest.trim_end();
            } else if let Some(rest) = strip_suffix_ignore_ascii_case(core, " NULLS LAST") {
                core = rest.trim_end();
            }

            // The sort direction precedes the NULLS directive.
            let mut asc = true;
            if let Some(rest) = strip_suffix_ignore_ascii_case(core, " DESC") {
                asc = false;
                core = rest.trim_end();
            } else if let Some(rest) = strip_suffix_ignore_ascii_case(core, " ASC") {
                core = rest.trim_end();
            }

            // Heuristic: treat as expression if it contains parentheses, spaces or a cast.
            let is_expression = core.contains('(')
                || core.contains(')')
                || core.contains(' ')
                || core.contains("::");

            // Heuristic opclass parsing: the second top-level token, unless it is a
            // stray sort keyword.
            let parts = split_top_level(core);
            let (name, opclass) = match parts.as_slice() {
                [first, second, ..]
                    if !["ASC", "DESC", "NULLS"]
                        .iter()
                        .any(|kw| second.eq_ignore_ascii_case(kw)) =>
                {
                    ((*first).to_string(), Some((*second).to_string()))
                }
                _ => (core.to_string(), None),
            };

            RawIndexColumnInfo {
                name,
                is_expression,
                asc,
                nulls_first,
                opclass,
            }
        })
        .collect()
}
3256
3257#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Returns `url` with the password of its `user:password@` part replaced by `****`.
///
/// The credentials are taken to be the text between the scheme separator (`://`, or
/// the start of the string when there is none) and the first `@`. Inside them the
/// password is everything after the first `:`, so a password that itself contains
/// `:` is hidden completely, and a password with an empty user name
/// (`scheme://:secret@host`) is masked as well.
///
/// The input is returned unchanged when it has no `@`, or no `:` between the scheme
/// separator and the `@`.
fn mask_url(url: &str) -> String {
    // Credentials can only start after the scheme separator, if there is one.
    let userinfo_start = url.find("://").map_or(0, |sep| sep + 3);

    url.find('@')
        .and_then(|at| {
            // `get` yields `None` when the '@' sits before the scheme separator.
            let userinfo = url.get(userinfo_start..at)?;
            let colon = userinfo_start + userinfo.find(':')?;
            Some(format!("{}****{}", &url[..=colon], &url[at..]))
        })
        .unwrap_or_else(|| url.to_string())
}