Skip to main content

drizzle_cli/db/
mod.rs

1//! Database connection and migration execution for CLI commands
2//!
3//! This module provides database connectivity for running migrations and other
4//! database operations from the CLI.
5
6use std::path::Path;
7
8#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
9use crate::config::PostgresCreds;
10use crate::config::{Credentials, Dialect, Extension, IntrospectCasing};
11use crate::error::CliError;
12use crate::output;
13#[cfg(any(
14    test,
15    feature = "rusqlite",
16    feature = "libsql",
17    feature = "turso",
18    feature = "postgres-sync",
19    feature = "tokio-postgres",
20    feature = "d1-http",
21))]
22use drizzle_migrations::Migrations;
23use drizzle_migrations::schema::Snapshot;
24
25#[cfg(feature = "d1-http")]
26mod d1_http;
27
/// Outcome reported after running migrations.
#[derive(Debug)]
pub struct MigrationResult {
    /// How many migrations were applied by this run.
    pub applied_count: usize,
    /// Tag of each migration that was applied.
    pub applied_migrations: Vec<String>,
}
36
/// Summary of what a migration run would do, computed before anything is applied.
#[derive(Debug, Clone)]
pub struct MigrationPlan {
    /// Count of migrations already recorded in the database metadata table.
    pub applied_count: usize,
    /// Count of local migrations that are still pending.
    pub pending_count: usize,
    /// Tags of the pending migrations, listed in execution order.
    pub pending_migrations: Vec<String>,
    /// Total count of non-empty SQL statements across all pending migrations.
    pub pending_statements: usize,
}
49
/// One row of applied-migration metadata, reduced to the fields used when
/// comparing the database against the local migration set.
#[cfg(any(
    test,
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
    feature = "d1-http",
))]
#[derive(Debug, Clone)]
pub(crate) struct AppliedMigrationRecord {
    /// Hash stored for the applied migration; compared against the local hash.
    pub(crate) hash: String,
    /// Name of the applied migration; used to look up the local counterpart.
    pub(crate) name: String,
}
64
/// The SQL changes `drizzle push` intends to make, plus safety metadata.
#[derive(Debug, Clone)]
pub struct PushPlan {
    /// SQL statements generated by diffing the live database against the
    /// desired snapshot.
    pub sql_statements: Vec<String>,
    /// Warnings reported alongside the generated SQL.
    pub warnings: Vec<String>,
    /// `true` when at least one statement was classified as destructive
    /// (for example a `DROP` or `TRUNCATE`).
    pub destructive: bool,
}
72
73/// Optional filters for introspection and push planning.
74#[derive(Debug, Clone, Default)]
75pub struct SnapshotFilters {
76    pub tables: Option<Vec<String>>,
77    pub schemas: Option<Vec<String>>,
78    pub extensions: Option<Vec<Extension>>,
79}
80
81impl SnapshotFilters {
82    const fn is_empty(&self) -> bool {
83        self.tables.is_none() && self.schemas.is_none() && self.extensions.is_none()
84    }
85}
86
87/// Plan a push by introspecting the live database and diffing against the desired snapshot.
88///
89/// # Errors
90///
91/// Returns [`CliError`] if introspecting the live database fails, if applying
92/// the given snapshot filters fails, or if generating the diff SQL fails.
93pub fn plan_push(
94    credentials: &Credentials,
95    dialect: Dialect,
96    desired: &Snapshot,
97    breakpoints: bool,
98    filters: &SnapshotFilters,
99) -> Result<PushPlan, CliError> {
100    let mut current = introspect_database(credentials, dialect)?.snapshot;
101    apply_snapshot_filters(&mut current, dialect, filters)?;
102    let (sql_statements, warnings) = generate_push_sql(&current, desired, breakpoints)?;
103    let destructive = sql_statements.iter().any(|s| is_destructive_statement(s));
104
105    Ok(PushPlan {
106        sql_statements,
107        warnings,
108        destructive,
109    })
110}
111
112/// Apply a previously planned push.
113///
114/// # Errors
115///
116/// Returns [`CliError`] if the confirmation prompt for a destructive plan
117/// fails, or if executing the planned SQL statements against the database
118/// fails.
119pub fn apply_push(
120    credentials: &Credentials,
121    dialect: Dialect,
122    plan: &PushPlan,
123    force: bool,
124) -> Result<(), CliError> {
125    if plan.sql_statements.is_empty() {
126        return Ok(());
127    }
128
129    if plan.destructive && !force {
130        let confirmed = confirm_destructive()?;
131        if !confirmed {
132            return Ok(());
133        }
134    }
135
136    execute_statements(credentials, dialect, &plan.sql_statements)
137}
138
139/// Execute migrations against the database
140///
141/// This is the main entry point that dispatches to the appropriate driver
142/// based on the credentials type.
143///
144/// # Errors
145///
146/// Returns [`CliError`] if no compiled driver matches the credentials, if
147/// connecting to the database fails, or if reading the migration tracking
148/// table or on-disk migration files fails.
149#[allow(unused_variables)] // params consumed inside feature-gated block
150pub fn plan_migrations(
151    credentials: &Credentials,
152    dialect: Dialect,
153    migrations_dir: &Path,
154    migrations_table: &str,
155    migrations_schema: &str,
156) -> Result<MigrationPlan, CliError> {
157    #[cfg(any(
158        feature = "rusqlite",
159        feature = "libsql",
160        feature = "turso",
161        feature = "postgres-sync",
162        feature = "tokio-postgres",
163        feature = "d1-http",
164    ))]
165    let set = load_migration_set(dialect, migrations_dir, migrations_table, migrations_schema)?;
166
167    match credentials {
168        #[cfg(feature = "rusqlite")]
169        Credentials::Sqlite { path } => inspect_sqlite_migrations(&set, path),
170
171        #[cfg(not(feature = "rusqlite"))]
172        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
173            dialect: "SQLite",
174            feature: "rusqlite",
175        }),
176
177        #[cfg(any(feature = "libsql", feature = "turso"))]
178        Credentials::Turso { url, auth_token } => {
179            if is_local_libsql(url) {
180                #[cfg(feature = "libsql")]
181                {
182                    inspect_libsql_local_migrations(&set, url)
183                }
184                #[cfg(not(feature = "libsql"))]
185                {
186                    Err(CliError::MissingDriver {
187                        dialect: "LibSQL (local)",
188                        feature: "libsql",
189                    })
190                }
191            } else {
192                #[cfg(feature = "turso")]
193                {
194                    inspect_turso_migrations(&set, url, auth_token.as_deref())
195                }
196                #[cfg(not(feature = "turso"))]
197                {
198                    let _ = auth_token;
199                    Err(CliError::MissingDriver {
200                        dialect: "Turso (remote)",
201                        feature: "turso",
202                    })
203                }
204            }
205        }
206
207        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
208        Credentials::Turso { .. } => Err(CliError::MissingDriver {
209            dialect: "Turso",
210            feature: "turso or libsql",
211        }),
212
213        Credentials::Postgres(creds) => {
214            let _ = creds;
215            core::cfg_select! {
216                feature = "postgres-sync" => inspect_postgres_sync_migrations(&set, creds),
217                feature = "tokio-postgres" => inspect_postgres_async_migrations(&set, creds),
218                _ => Err(CliError::MissingDriver {
219                    dialect: "PostgreSQL",
220                    feature: "postgres-sync or tokio-postgres",
221                }),
222            }
223        }
224
225        #[cfg(feature = "d1-http")]
226        Credentials::D1 {
227            account_id,
228            database_id,
229            token,
230        } => d1_http::inspect_migrations(&set, account_id, database_id, token),
231
232        #[cfg(not(feature = "d1-http"))]
233        Credentials::D1 { .. } => Err(CliError::MissingDriver {
234            dialect: "Cloudflare D1 (HTTP)",
235            feature: "d1-http",
236        }),
237
238        Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
239            operation: "Migration planning against AWS Data API",
240            driver: "aws-data-api",
241            hint: "AWS RDS Data API schema ops are not yet wired into this CLI. For now, \
242                   run the generated SQL with `aws rds-data execute-statement --sql=...` \
243                   (or use tokio-postgres with a temporary direct connection).",
244        }),
245    }
246}
247
248/// Verify migrations by re-running the planning logic without applying
249/// anything, surfacing any inconsistencies between the on-disk migration
250/// files and the tracking table.
251///
252/// # Errors
253///
254/// Returns the same errors as [`plan_migrations`].
255pub fn verify_migrations(
256    credentials: &Credentials,
257    dialect: Dialect,
258    migrations_dir: &Path,
259    migrations_table: &str,
260    migrations_schema: &str,
261) -> Result<MigrationPlan, CliError> {
262    plan_migrations(
263        credentials,
264        dialect,
265        migrations_dir,
266        migrations_table,
267        migrations_schema,
268    )
269}
270
271/// Apply any pending migrations against the database referenced by
272/// `credentials`.
273///
274/// # Errors
275///
276/// Returns [`CliError`] if no compiled driver matches, if connecting or
277/// starting a transaction fails, if executing a migration's SQL fails, or if
278/// writing to the tracking table fails.
279#[allow(unused_variables)] // params consumed inside feature-gated block
280pub fn run_migrations(
281    credentials: &Credentials,
282    dialect: Dialect,
283    migrations_dir: &Path,
284    migrations_table: &str,
285    migrations_schema: &str,
286) -> Result<MigrationResult, CliError> {
287    #[cfg(any(
288        feature = "rusqlite",
289        feature = "libsql",
290        feature = "turso",
291        feature = "postgres-sync",
292        feature = "tokio-postgres",
293        feature = "d1-http",
294    ))]
295    let set = load_migration_set(dialect, migrations_dir, migrations_table, migrations_schema)?;
296
297    match credentials {
298        #[cfg(feature = "rusqlite")]
299        Credentials::Sqlite { path } => run_sqlite_migrations(&set, path),
300
301        #[cfg(not(feature = "rusqlite"))]
302        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
303            dialect: "SQLite",
304            feature: "rusqlite",
305        }),
306
307        #[cfg(any(feature = "libsql", feature = "turso"))]
308        Credentials::Turso { url, auth_token } => {
309            if is_local_libsql(url) {
310                #[cfg(feature = "libsql")]
311                {
312                    run_libsql_local_migrations(&set, url)
313                }
314                #[cfg(not(feature = "libsql"))]
315                {
316                    Err(CliError::MissingDriver {
317                        dialect: "LibSQL (local)",
318                        feature: "libsql",
319                    })
320                }
321            } else {
322                #[cfg(feature = "turso")]
323                {
324                    run_turso_migrations(&set, url, auth_token.as_deref())
325                }
326                #[cfg(not(feature = "turso"))]
327                {
328                    let _ = auth_token;
329                    Err(CliError::MissingDriver {
330                        dialect: "Turso (remote)",
331                        feature: "turso",
332                    })
333                }
334            }
335        }
336
337        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
338        Credentials::Turso { .. } => Err(CliError::MissingDriver {
339            dialect: "Turso",
340            feature: "turso or libsql",
341        }),
342
343        // PostgreSQL - prefer sync driver if available, fall back to async
344        Credentials::Postgres(creds) => {
345            let _ = creds;
346            core::cfg_select! {
347                feature = "postgres-sync" => run_postgres_sync_migrations(&set, creds),
348                feature = "tokio-postgres" => run_postgres_async_migrations(&set, creds),
349                _ => Err(CliError::MissingDriver {
350                    dialect: "PostgreSQL",
351                    feature: "postgres-sync or tokio-postgres",
352                }),
353            }
354        }
355
356        #[cfg(feature = "d1-http")]
357        Credentials::D1 {
358            account_id,
359            database_id,
360            token,
361        } => d1_http::run_migrations(&set, account_id, database_id, token),
362
363        #[cfg(not(feature = "d1-http"))]
364        Credentials::D1 { .. } => Err(CliError::MissingDriver {
365            dialect: "Cloudflare D1 (HTTP)",
366            feature: "d1-http",
367        }),
368
369        Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
370            operation: "Running migrations against AWS Data API",
371            driver: "aws-data-api",
372            hint: "AWS RDS Data API migrations are not yet wired into this CLI. Run the \
373                   generated SQL via `aws rds-data execute-statement` or connect directly \
374                   with tokio-postgres.",
375        }),
376    }
377}
378
/// Discover the migrations under `migrations_dir` and pair them with the
/// tracking-table configuration for `dialect`.
///
/// # Errors
///
/// Returns [`CliError::Other`] when migration discovery on the filesystem fails.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
    feature = "d1-http",
))]
fn load_migration_set(
    dialect: Dialect,
    migrations_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<Migrations, CliError> {
    let tracking = migration_tracking(dialect, migrations_table, migrations_schema);

    // Read the migration files from disk.
    let discovered = match drizzle_migrations::MigrationDir::new(migrations_dir).discover() {
        Ok(found) => found,
        Err(e) => return Err(CliError::Other(format!("Failed to load migrations: {e}"))),
    };

    let set = Migrations::with_tracking(discovered, dialect.to_base(), tracking);
    Ok(set)
}
405
/// Build the tracking-table configuration for `dialect`.
///
/// Starts from the dialect's default (`POSTGRES` for PostgreSQL, `SQLITE` for
/// everything else), then overrides the table name when `migrations_table` is
/// non-blank and, for PostgreSQL only, the schema when `migrations_schema` is
/// non-blank.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
    feature = "d1-http",
))]
fn migration_tracking(
    dialect: Dialect,
    migrations_table: &str,
    migrations_schema: &str,
) -> drizzle_types::MigrationTracking {
    let is_postgres = matches!(dialect, Dialect::Postgresql);

    let defaults = if is_postgres {
        drizzle_types::MigrationTracking::POSTGRES
    } else {
        drizzle_types::MigrationTracking::SQLITE
    };

    // A blank (empty or whitespace-only) override keeps the default table name.
    let with_table = if migrations_table.trim().is_empty() {
        defaults
    } else {
        defaults.table(migrations_table.to_owned())
    };

    // The schema override is only applied for PostgreSQL.
    if is_postgres && !migrations_schema.trim().is_empty() {
        with_table.schema(migrations_schema.to_owned())
    } else {
        with_table
    }
}
434
/// Turn the local migration set plus the rows recorded in the database into a
/// [`MigrationPlan`].
///
/// The applied rows are first checked for consistency with the local set;
/// pending migrations are then those the set reports as not yet applied, and
/// only non-blank statements are counted.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] when the applied rows are inconsistent
/// with the local migrations (duplicate names, unknown names, hash mismatch).
#[cfg(any(
    test,
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
    feature = "d1-http",
))]
pub(crate) fn build_migration_plan(
    set: &Migrations,
    applied: &[AppliedMigrationRecord],
) -> Result<MigrationPlan, CliError> {
    verify_applied_migrations_consistency(set, applied)?;

    let applied_names: Vec<String> = applied.iter().map(|record| record.name.clone()).collect();
    let pending: Vec<_> = set.pending(&applied_names).collect();

    // Walk the pending migrations once, gathering tags and statement counts.
    let mut pending_migrations = Vec::with_capacity(pending.len());
    let mut pending_statements = 0usize;
    for migration in &pending {
        pending_migrations.push(migration.tag().to_string());
        pending_statements += migration
            .statements()
            .iter()
            .filter(|stmt| !stmt.trim().is_empty())
            .count();
    }

    Ok(MigrationPlan {
        applied_count: applied.len(),
        pending_count: pending.len(),
        pending_migrations,
        pending_statements,
    })
}
470
/// Check that the migrations recorded in the database agree with the local set.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] for the first problem found:
/// a name used twice among local migrations, a name recorded twice in the
/// database, a recorded name with no local migration, or a recorded hash that
/// differs from the local hash of the same name.
#[cfg(any(
    test,
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
    feature = "d1-http",
))]
fn verify_applied_migrations_consistency(
    set: &Migrations,
    applied: &[AppliedMigrationRecord],
) -> Result<(), CliError> {
    use std::collections::{HashMap, HashSet};

    // Local name -> local hash; a repeated local name is rejected immediately.
    let mut local_hashes: HashMap<&str, &str> = HashMap::new();
    for local in set.all() {
        let previous = local_hashes.insert(local.name(), local.hash());
        if previous.is_some() {
            return Err(CliError::MigrationError(format!(
                "Local migrations contain duplicate name: {}",
                local.name()
            )));
        }
    }

    let mut db_names: HashSet<&str> = HashSet::new();
    for record in applied {
        let name = record.name.as_str();

        // `insert` returns false when the name was already present.
        let first_occurrence = db_names.insert(name);
        if !first_occurrence {
            return Err(CliError::MigrationError(format!(
                "Database migration metadata contains duplicate name: {}",
                record.name
            )));
        }

        match local_hashes.get(name) {
            None => {
                return Err(CliError::MigrationError(format!(
                    "Database contains applied migration not found locally (name: {})",
                    record.name
                )));
            }
            Some(local_hash) if *local_hash != record.hash => {
                return Err(CliError::MigrationError(format!(
                    "Migration hash mismatch for {}: database={}, local={}",
                    record.name, record.hash, local_hash
                )));
            }
            Some(_) => {}
        }
    }

    Ok(())
}
525
/// Escape `value` for embedding inside a single-quoted SQL string literal by
/// doubling every single quote. The surrounding quotes are not added.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
))]
fn escape_sql_literal(value: &str) -> String {
    // Splitting on `'` and re-joining with `''` doubles each quote.
    let pieces: Vec<&str> = value.split('\'').collect();
    pieces.join("''")
}
536
/// Make sure the SQLite migration tracking table exists and has a `name` column.
///
/// Runs the set's create-table SQL, then inspects the table's columns. When a
/// `name` column is already present nothing else happens. Otherwise the
/// existing rows (`id`, `hash`, `created_at`) are matched against the local
/// migrations, the `name` and `applied_at` columns are added, and each matched
/// row gets its name backfilled (with `applied_at` set to `NULL`). The column
/// additions and backfill run between an explicit `BEGIN` and `COMMIT`, with a
/// best-effort `ROLLBACK` on failure.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if any SQL statement fails or if the
/// existing rows cannot be matched to local migrations.
#[cfg(feature = "rusqlite")]
fn ensure_sqlite_tracking_table(
    conn: &rusqlite::Connection,
    set: &Migrations,
) -> Result<(), CliError> {
    // Uniform conversion for rusqlite errors that need no extra context.
    let to_cli = |e: rusqlite::Error| CliError::MigrationError(e.to_string());

    if let Err(e) = conn.execute(&set.create_table_sql(), []) {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {e}"
        )));
    }

    // List the table's columns to find out whether it is already upgraded.
    let pragma_sql = format!(
        "SELECT name FROM pragma_table_info('{}')",
        escape_sql_literal(set.table_name())
    );
    let mut pragma_stmt = conn.prepare(&pragma_sql).map_err(to_cli)?;
    let column_names = pragma_stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(to_cli)?
        .collect::<Result<Vec<_>, _>>()
        .map_err(to_cli)?;
    let already_upgraded = column_names.iter().any(|column| column == "name");
    if already_upgraded {
        return Ok(());
    }

    let table = set.table_ident_sql();

    // Read the legacy rows so they can be matched to local migrations.
    let select_legacy = format!("SELECT id, hash, created_at FROM {table} ORDER BY id ASC");
    let mut legacy_stmt = conn.prepare(&select_legacy).map_err(to_cli)?;
    let applied = legacy_stmt
        .query_map([], |row| {
            let id = row.get::<_, Option<i64>>(0)?;
            let hash = row.get::<_, String>(1)?;
            let created_at = row.get::<_, i64>(2)?;
            Ok(drizzle_migrations::AppliedMigrationMetadata {
                id,
                hash,
                created_at,
            })
        })
        .map_err(to_cli)?
        .collect::<Result<Vec<_>, _>>()
        .map_err(to_cli)?;

    let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    conn.execute("BEGIN", []).map_err(to_cli)?;

    // Everything that must be undone together on failure.
    let backfill = || -> Result<(), CliError> {
        let add_columns = [
            format!("ALTER TABLE {table} ADD COLUMN \"name\" text"),
            format!("ALTER TABLE {table} ADD COLUMN \"applied_at\" TEXT"),
        ];
        for ddl in &add_columns {
            conn.execute(ddl, []).map_err(to_cli)?;
        }

        for row in matched {
            // Prefer the row id; fall back to (created_at, hash) when it is absent.
            let where_clause = match row.id {
                Some(id) => format!("\"id\" = {id}"),
                None => format!(
                    "\"created_at\" = {} AND \"hash\" = '{}'",
                    row.created_at,
                    escape_sql_literal(&row.hash)
                ),
            };
            let update_sql = format!(
                "UPDATE {table} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {where_clause}",
                escape_sql_literal(&row.name)
            );
            conn.execute(&update_sql, []).map_err(to_cli)?;
        }

        Ok(())
    };

    if let Err(err) = backfill() {
        // Best effort: the original error is what the caller needs to see.
        let _ = conn.execute("ROLLBACK", []);
        return Err(err);
    }

    conn.execute("COMMIT", []).map_err(to_cli)?;
    Ok(())
}
639
640#[cfg(feature = "libsql")]
641async fn ensure_sqlite_tracking_table_libsql(
642    conn: &libsql::Connection,
643    set: &Migrations,
644) -> Result<(), CliError> {
645    conn.execute(&set.create_table_sql(), ())
646        .await
647        .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;
648
649    let pragma_sql = format!(
650        "SELECT name FROM pragma_table_info('{}')",
651        escape_sql_literal(set.table_name())
652    );
653    let mut rows = conn
654        .query(&pragma_sql, ())
655        .await
656        .map_err(|e| CliError::MigrationError(e.to_string()))?;
657    let mut has_name = false;
658    while let Ok(Some(row)) = rows.next().await {
659        if let Ok(name) = row.get::<String>(0)
660            && name == "name"
661        {
662            has_name = true;
663            break;
664        }
665    }
666    if has_name {
667        return Ok(());
668    }
669
670    let mut rows = conn
671        .query(
672            &format!(
673                "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
674                set.table_ident_sql()
675            ),
676            (),
677        )
678        .await
679        .map_err(|e| CliError::MigrationError(e.to_string()))?;
680    let mut applied = Vec::new();
681    while let Ok(Some(row)) = rows.next().await {
682        let hash = row
683            .get::<String>(1)
684            .map_err(|e| CliError::MigrationError(e.to_string()))?;
685        let created_at = row
686            .get::<i64>(2)
687            .map_err(|e| CliError::MigrationError(e.to_string()))?;
688        applied.push(drizzle_migrations::AppliedMigrationMetadata {
689            id: row.get::<Option<i64>>(0).ok().flatten(),
690            hash,
691            created_at,
692        });
693    }
694
695    let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
696        .map_err(|e| CliError::MigrationError(e.to_string()))?;
697
698    let tx = conn
699        .transaction()
700        .await
701        .map_err(|e| CliError::MigrationError(e.to_string()))?;
702    tx.execute(
703        &format!(
704            "ALTER TABLE {} ADD COLUMN \"name\" text",
705            set.table_ident_sql()
706        ),
707        (),
708    )
709    .await
710    .map_err(|e| CliError::MigrationError(e.to_string()))?;
711    tx.execute(
712        &format!(
713            "ALTER TABLE {} ADD COLUMN \"applied_at\" TEXT",
714            set.table_ident_sql()
715        ),
716        (),
717    )
718    .await
719    .map_err(|e| CliError::MigrationError(e.to_string()))?;
720
721    for row in matched {
722        let where_clause = if let Some(id) = row.id {
723            format!("\"id\" = {id}")
724        } else {
725            format!(
726                "\"created_at\" = {} AND \"hash\" = '{}'",
727                row.created_at,
728                escape_sql_literal(&row.hash)
729            )
730        };
731        tx.execute(
732            &format!(
733                "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
734                set.table_ident_sql(),
735                escape_sql_literal(&row.name),
736                where_clause
737            ),
738            (),
739        )
740        .await
741        .map_err(|e| CliError::MigrationError(e.to_string()))?;
742    }
743
744    tx.commit()
745        .await
746        .map_err(|e| CliError::MigrationError(e.to_string()))?;
747    Ok(())
748}
749
/// Make sure the PostgreSQL migration tracking table exists and has a `name`
/// column (sync driver).
///
/// Runs the set's create-table SQL, then looks up the table's columns in
/// `information_schema.columns` (schema defaults to `public`). When a `name`
/// column is already present nothing else happens. Otherwise the existing rows
/// (`id`, `hash`, `created_at`) are matched against the local migrations, the
/// `name` and `applied_at` columns are added, and each matched row gets its
/// name backfilled (with `applied_at` set to `NULL`).
///
/// The column additions and the backfill run inside one transaction so a
/// failure cannot leave the table with a `name` column but unpopulated rows,
/// a state the column check above would then treat as fully upgraded.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if any SQL statement fails or if the
/// existing rows cannot be matched to local migrations.
#[cfg(feature = "postgres-sync")]
fn ensure_postgres_tracking_table_sync(
    client: &mut postgres::Client,
    set: &Migrations,
) -> Result<(), CliError> {
    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;

    let schema = set.schema_name().unwrap_or("public");
    let rows = client
        .query(
            "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2",
            &[&schema, &set.table_name()],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    if rows
        .iter()
        .filter_map(|row| row.try_get::<_, String>(0).ok())
        .any(|column| column == "name")
    {
        return Ok(());
    }

    let rows = client
        .query(
            &format!(
                "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
                set.table_ident_sql()
            ),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    let applied = rows
        .iter()
        .map(|row| {
            Ok(drizzle_migrations::AppliedMigrationMetadata {
                // An id that cannot be read as i64 is treated as absent; the
                // backfill then keys on (created_at, hash) instead.
                id: row.try_get::<_, Option<i64>>(0).ok().flatten(),
                hash: row.try_get::<_, String>(1)?,
                created_at: row.try_get::<_, i64>(2)?,
            })
        })
        .collect::<Result<Vec<_>, postgres::Error>>()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Dropping `tx` without `commit` (any early `?` return below) rolls back.
    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    tx.execute(
        &format!(
            "ALTER TABLE {} ADD COLUMN \"name\" TEXT",
            set.table_ident_sql()
        ),
        &[],
    )
    .map_err(|e| CliError::MigrationError(e.to_string()))?;
    tx.execute(
        &format!(
            "ALTER TABLE {} ADD COLUMN \"applied_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
            set.table_ident_sql()
        ),
        &[],
    )
    .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for row in matched {
        let where_clause = if let Some(id) = row.id {
            format!("\"id\" = {id}")
        } else {
            format!(
                "\"created_at\" = {} AND \"hash\" = '{}'",
                row.created_at,
                escape_sql_literal(&row.hash)
            )
        };
        tx.execute(
            &format!(
                "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
                set.table_ident_sql(),
                escape_sql_literal(&row.name),
                where_clause
            ),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    Ok(())
}
842
/// Make sure the PostgreSQL migration tracking table exists and has a `name`
/// column (async driver).
///
/// Runs the set's create-table SQL, then looks up the table's columns in
/// `information_schema.columns` (schema defaults to `public`). When a `name`
/// column is already present nothing else happens. Otherwise the existing rows
/// (`id`, `hash`, `created_at`) are matched against the local migrations, the
/// `name` and `applied_at` columns are added, and each matched row gets its
/// name backfilled (with `applied_at` set to `NULL`).
///
/// The column additions and the backfill run between an explicit `BEGIN` and
/// `COMMIT` (with a best-effort `ROLLBACK` on failure) so a failure cannot
/// leave the table with a `name` column but unpopulated rows, a state the
/// column check above would then treat as fully upgraded. Explicit statements
/// are used because only a shared `&Client` is available here.
/// NOTE(review): assumes the caller has not already opened a transaction on
/// this client; confirm at the call sites.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if any SQL statement fails or if the
/// existing rows cannot be matched to local migrations.
#[cfg(feature = "tokio-postgres")]
async fn ensure_postgres_tracking_table_async(
    client: &tokio_postgres::Client,
    set: &Migrations,
) -> Result<(), CliError> {
    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;

    let schema = set.schema_name().unwrap_or("public");
    let rows = client
        .query(
            "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2",
            &[&schema, &set.table_name()],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    if rows
        .iter()
        .filter_map(|row| row.try_get::<_, String>(0).ok())
        .any(|column| column == "name")
    {
        return Ok(());
    }

    let rows = client
        .query(
            &format!(
                "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
                set.table_ident_sql()
            ),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    let applied = rows
        .iter()
        .map(|row| {
            Ok(drizzle_migrations::AppliedMigrationMetadata {
                // An id that cannot be read as i64 is treated as absent; the
                // backfill then keys on (created_at, hash) instead.
                id: row.try_get::<_, Option<i64>>(0).ok().flatten(),
                hash: row.try_get::<_, String>(1)?,
                created_at: row.try_get::<_, i64>(2)?,
            })
        })
        .collect::<Result<Vec<_>, tokio_postgres::Error>>()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    client
        .batch_execute("BEGIN")
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Everything that must be undone together on failure.
    let upgrade = async move {
        client
            .execute(
                &format!(
                    "ALTER TABLE {} ADD COLUMN \"name\" TEXT",
                    set.table_ident_sql()
                ),
                &[],
            )
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
        client
            .execute(
                &format!(
                    "ALTER TABLE {} ADD COLUMN \"applied_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
                    set.table_ident_sql()
                ),
                &[],
            )
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;

        for row in matched {
            let where_clause = if let Some(id) = row.id {
                format!("\"id\" = {id}")
            } else {
                format!(
                    "\"created_at\" = {} AND \"hash\" = '{}'",
                    row.created_at,
                    escape_sql_literal(&row.hash)
                )
            };
            client
                .execute(
                    &format!(
                        "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
                        set.table_ident_sql(),
                        escape_sql_literal(&row.name),
                        where_clause
                    ),
                    &[],
                )
                .await
                .map_err(|e| CliError::MigrationError(e.to_string()))?;
        }

        Ok::<(), CliError>(())
    }
    .await;

    match upgrade {
        Ok(()) => client
            .batch_execute("COMMIT")
            .await
            .map_err(|e| CliError::MigrationError(e.to_string())),
        Err(err) => {
            // Best effort: the original error is what the caller needs to see.
            let _ = client.batch_execute("ROLLBACK").await;
            Err(err)
        }
    }
}
941
/// Heuristically decides whether a SQL statement may destroy data or schema objects.
///
/// The statement is upper-cased and every run of whitespace (spaces, tabs, newlines)
/// is collapsed to a single space before matching, so keywords that are split across
/// lines or separated by several blanks (`ALTER TABLE t\n\tDROP CONSTRAINT c`) are
/// still recognised.
///
/// Returns `true` when the text contains one of the `DROP ...` forms listed in
/// `DESTRUCTIVE_MARKERS`, the word `TRUNCATE`, or an `ALTER TABLE` together with a
/// standalone `DROP` keyword. This is a substring check, not a SQL parser, so it can
/// report false positives (for example when a keyword appears inside a string literal).
fn is_destructive_statement(sql: &str) -> bool {
    const DESTRUCTIVE_MARKERS: [&str; 11] = [
        "DROP TABLE",
        "DROP COLUMN",
        "DROP INDEX",
        "DROP VIEW",
        "DROP MATERIALIZED VIEW",
        "DROP TYPE",
        "DROP SCHEMA",
        "DROP SEQUENCE",
        "DROP ROLE",
        "DROP POLICY",
        "TRUNCATE",
    ];

    // Normalising whitespace makes the single-space markers match regardless of how
    // the statement was formatted; it also trims leading/trailing whitespace.
    let normalized = sql
        .to_uppercase()
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ");

    DESTRUCTIVE_MARKERS
        .iter()
        .any(|marker| normalized.contains(marker))
        // Catches `ALTER TABLE ... DROP CONSTRAINT/DEFAULT/...` forms not listed above.
        || (normalized.contains("ALTER TABLE") && normalized.contains(" DROP "))
}
957
958#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Reports whether `sql` is a `CREATE ... INDEX CONCURRENTLY` or
/// `DROP INDEX CONCURRENTLY` statement.
///
/// Whitespace runs are collapsed to a single space and the text is ASCII-upper-cased
/// before matching, so a statement formatted across several lines
/// (`CREATE INDEX\n  CONCURRENTLY ...`) is recognised as well. The check is a
/// prefix/substring heuristic, not a SQL parser.
fn is_postgres_concurrent_index_statement(sql: &str) -> bool {
    let normalized = sql
        .split_whitespace()
        .collect::<Vec<_>>()
        .join(" ")
        .to_ascii_uppercase();

    let is_create_or_drop = normalized.starts_with("CREATE") || normalized.starts_with("DROP");
    is_create_or_drop && normalized.contains("INDEX CONCURRENTLY")
}
963
/// Returns `true` when at least one entry of `statements` is recognised by
/// `is_postgres_concurrent_index_statement`.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
fn has_postgres_concurrent_index(statements: &[String]) -> bool {
    statements
        .iter()
        .map(String::as_str)
        .any(is_postgres_concurrent_index_statement)
}
970
971fn confirm_destructive() -> Result<bool, CliError> {
972    use std::io::{self, IsTerminal, Write};
973
974    if !io::stdin().is_terminal() {
975        return Err(CliError::Other(
976            "Refusing to apply potentially destructive changes in non-interactive mode. Use --explain or --force."
977                .into(),
978        ));
979    }
980
981    println!(
982        "{}",
983        output::warning("Potentially destructive changes detected (DROP/TRUNCATE/etc).")
984    );
985    print!("Apply anyway? [y/N]: ");
986    io::stdout()
987        .flush()
988        .map_err(|e| CliError::IoError(e.to_string()))?;
989
990    let mut line = String::new();
991    io::stdin()
992        .read_line(&mut line)
993        .map_err(|e| CliError::IoError(e.to_string()))?;
994    let ans = line.trim().to_ascii_lowercase();
995    Ok(ans == "y" || ans == "yes")
996}
997
998fn generate_push_sql(
999    current: &Snapshot,
1000    desired: &Snapshot,
1001    breakpoints: bool,
1002) -> Result<(Vec<String>, Vec<String>), CliError> {
1003    match (current, desired) {
1004        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
1005            use drizzle_migrations::sqlite::collection::SQLiteDDL;
1006            use drizzle_migrations::sqlite::diff::compute_migration;
1007
1008            let prev_ddl = SQLiteDDL::from_entities(prev_snap.ddl.clone());
1009            let cur_ddl = SQLiteDDL::from_entities(curr_snap.ddl.clone());
1010
1011            let diff = compute_migration(&prev_ddl, &cur_ddl);
1012            Ok((diff.sql_statements, diff.warnings))
1013        }
1014        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
1015            use drizzle_migrations::postgres::diff_full_snapshots;
1016            use drizzle_migrations::postgres::statements::PostgresGenerator;
1017
1018            let diff = diff_full_snapshots(prev_snap, curr_snap);
1019            let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
1020            Ok((generator.generate(&diff.diffs), Vec::new()))
1021        }
1022        _ => Err(CliError::DialectMismatch),
1023    }
1024}
1025
/// Executes `statements` against the database described by `credentials`.
///
/// The work is dispatched to a driver-specific helper chosen by the credential
/// variant and by the Cargo features compiled in. `_dialect` is not used by this
/// function.
///
/// # Errors
///
/// * `CliError::MissingDriver` when the feature needed for the credential variant
///   is not enabled.
/// * `CliError::UnsupportedForDriver` for `Credentials::AwsDataApi`, which this
///   function never executes against.
/// * Whatever error the selected driver helper returns.
fn execute_statements(
    credentials: &Credentials,
    _dialect: Dialect,
    statements: &[String],
) -> Result<(), CliError> {
    // In some feature combinations (no drivers), the match arms that would use `statements`
    // are compiled out. Touch it to avoid unused-parameter warnings.
    let _ = statements;

    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => execute_sqlite_statements(path, statements),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        // Turso credentials cover two drivers: local URLs (see `is_local_libsql`) go to
        // libsql, everything else to the remote turso driver. Each half reports a
        // missing driver on its own when only the other feature is enabled.
        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    execute_libsql_local_statements(url, statements)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    execute_turso_statements(url, auth_token.as_deref(), statements)
                }
                #[cfg(not(feature = "turso"))]
                {
                    // `auth_token` is otherwise unused in this configuration.
                    let _ = auth_token;
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        Credentials::Postgres(creds) => {
            // Keeps both bindings "used" when neither PostgreSQL feature is enabled.
            let _ = (creds, &statements);
            // The first arm whose predicate holds is selected, so `postgres-sync`
            // takes precedence when both PostgreSQL features are enabled.
            core::cfg_select! {
                feature = "postgres-sync" => execute_postgres_sync_statements(creds, statements),
                feature = "tokio-postgres" => execute_postgres_async_statements(creds, statements),
                _ => Err(CliError::MissingDriver {
                    dialect: "PostgreSQL",
                    feature: "postgres-sync or tokio-postgres",
                }),
            }
        }

        #[cfg(feature = "d1-http")]
        Credentials::D1 {
            account_id,
            database_id,
            token,
        } => d1_http::execute_statements(account_id, database_id, token, statements),

        #[cfg(not(feature = "d1-http"))]
        Credentials::D1 { .. } => Err(CliError::MissingDriver {
            dialect: "Cloudflare D1 (HTTP)",
            feature: "d1-http",
        }),

        // No feature gate here: this variant is always rejected with a hint.
        Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
            operation: "Direct SQL execution against AWS Data API",
            driver: "aws-data-api",
            hint: "Use `aws rds-data execute-statement --sql=\"...\"` with the matching \
                   --resource-arn / --secret-arn / --database, or connect directly via \
                   tokio-postgres.",
        }),
    }
}
1115
/// Decides whether a Turso URL refers to a local libsql database.
///
/// A URL counts as local when it has no `://` scheme separator at all, or when it
/// starts with `file:`, `./` or `/`.
#[allow(dead_code)]
fn is_local_libsql(url: &str) -> bool {
    const LOCAL_PREFIXES: [&str; 3] = ["file:", "./", "/"];

    let has_scheme_separator = url.contains("://");
    !has_scheme_separator || LOCAL_PREFIXES.iter().any(|prefix| url.starts_with(prefix))
}
1124
/// Derives unique constraints from introspected SQLite indexes.
///
/// Only indexes whose `origin` is `"u"` are considered. For each of them the named
/// key columns are gathered from `index_columns` and ordered by `seqno`; an index
/// without any such column is skipped. Index names starting with
/// `sqlite_autoindex_` are replaced by a generated constraint name and the
/// constraint is marked as not explicitly named.
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
fn process_sqlite_uniques_from_indexes(
    raw_indexes: &[drizzle_migrations::sqlite::introspect::RawIndexInfo],
    index_columns: &[drizzle_migrations::sqlite::introspect::RawIndexColumn],
) -> Vec<drizzle_migrations::sqlite::UniqueConstraint> {
    use drizzle_migrations::sqlite::UniqueConstraint;

    raw_indexes
        .iter()
        .filter(|index| index.origin == "u")
        .filter_map(|index| {
            let mut keyed: Vec<(i32, String)> = index_columns
                .iter()
                .filter(|col| col.index_name == index.name && col.key)
                .filter_map(|col| col.name.clone().map(|name| (col.seqno, name)))
                .collect();
            if keyed.is_empty() {
                return None;
            }
            keyed.sort_by_key(|(seqno, _)| *seqno);
            let columns: Vec<String> = keyed.into_iter().map(|(_, name)| name).collect();

            let name_explicit = !index.name.starts_with("sqlite_autoindex_");
            let constraint_name = if name_explicit {
                index.name.clone()
            } else {
                let column_refs: Vec<&str> = columns.iter().map(String::as_str).collect();
                drizzle_migrations::sqlite::ddl::name_for_unique(&index.table, &column_refs)
            };

            let mut unique =
                UniqueConstraint::from_strings(index.table.clone(), constraint_name, columns);
            unique.name_explicit = name_explicit;
            Some(unique)
        })
        .collect()
}
1162
1163// ============================================================================
1164// SQLite (rusqlite)
1165// ============================================================================
1166
/// Runs `statements` against the SQLite database at `path` between an explicit
/// `BEGIN` and `COMMIT`.
///
/// Statements that are empty after trimming are skipped. When a statement fails a
/// `ROLLBACK` is attempted (its outcome is ignored) and the failure is returned.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "rusqlite")]
fn execute_sqlite_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{path}': {e}"
            )));
        }
    };

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|stmt| stmt.trim())
        .filter(|s| !s.is_empty());
    for s in runnable {
        if let Err(e) = conn.execute(s, []) {
            // Best effort: the original failure is what gets reported.
            let _ = conn.execute("ROLLBACK", []);
            return Err(CliError::MigrationError(format!(
                "Statement failed: {e}\n{s}"
            )));
        }
    }

    conn.execute("COMMIT", [])
        .map(|_| ())
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
1194
/// Applies every pending migration of `set` to the SQLite database at `path`.
///
/// After the tracking table has been ensured, the applied names are read and the
/// pending migrations are executed between an explicit `BEGIN` and `COMMIT`; each
/// migration is recorded with `set.record_migration_sql`. On any failure a
/// `ROLLBACK` is attempted (its outcome is ignored) before the error is returned.
/// When nothing is pending no transaction is opened.
///
/// The returned `applied_migrations` holds the `hash()` of each applied migration.
#[cfg(feature = "rusqlite")]
fn run_sqlite_migrations(set: &Migrations, path: &str) -> Result<MigrationResult, CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{path}': {e}"))
    })?;
    ensure_sqlite_tracking_table(&conn, set)?;

    let applied_names = query_applied_names_sqlite(&conn, set)?;
    let pending: Vec<_> = set.pending(&applied_names).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = conn.execute(stmt, []) {
                let _ = conn.execute("ROLLBACK", []);
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration inside the same transaction as its statements.
        match conn.execute(&set.record_migration_sql(migration), []) {
            Ok(_) => applied.push(migration.hash().to_string()),
            Err(e) => {
                let _ = conn.execute("ROLLBACK", []);
                return Err(CliError::MigrationError(e.to_string()));
            }
        }
    }

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let applied_count = applied.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied,
    })
}
1248
/// Builds a migration plan for the SQLite database at `path` without applying any
/// migration.
///
/// Opens the database, ensures the tracking table, reads the applied records and
/// hands them to `build_migration_plan`.
#[cfg(feature = "rusqlite")]
fn inspect_sqlite_migrations(set: &Migrations, path: &str) -> Result<MigrationPlan, CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{path}': {e}"
            )));
        }
    };

    ensure_sqlite_tracking_table(&conn, set)?;
    query_applied_records_sqlite(&conn, set)
        .and_then(|applied| build_migration_plan(set, &applied))
}
1259
/// Reads the `(hash, name)` pairs of applied migrations from the tracking table,
/// ordered by `id` and restricted to rows whose `name` is not NULL.
///
/// Returns an empty list when the query cannot be prepared.
///
/// # Errors
///
/// `CliError::MigrationError` when running the query or decoding a row fails.
#[cfg(feature = "rusqlite")]
fn query_applied_records_sqlite(
    conn: &rusqlite::Connection,
    set: &Migrations,
) -> Result<Vec<AppliedMigrationRecord>, CliError> {
    let sql = format!(
        r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
        set.table_ident_sql()
    );

    // Table might not exist yet
    let mut stmt = match conn.prepare(&sql) {
        Ok(stmt) => stmt,
        Err(_) => return Ok(Vec::new()),
    };

    let records = stmt
        .query_map([], |row| {
            let hash = row.get::<_, String>(0)?;
            let name = row.get::<_, String>(1)?;
            Ok(AppliedMigrationRecord { hash, name })
        })
        .map_err(|e| CliError::MigrationError(e.to_string()))?
        .map(|row| row.map_err(|e| CliError::MigrationError(e.to_string())))
        .collect::<Result<Vec<_>, CliError>>()?;

    Ok(records)
}
1287
/// Reads the names of applied migrations using `set.applied_names_sql()`.
///
/// Returns an empty list when the query cannot be prepared. Rows whose first column
/// cannot be read as a `String` are skipped rather than reported.
///
/// # Errors
///
/// `CliError::MigrationError` when the prepared query fails to start.
#[cfg(feature = "rusqlite")]
fn query_applied_names_sqlite(
    conn: &rusqlite::Connection,
    set: &Migrations,
) -> Result<Vec<String>, CliError> {
    // Table might not exist yet
    let mut stmt = match conn.prepare(&set.applied_names_sql()) {
        Ok(stmt) => stmt,
        Err(_) => return Ok(Vec::new()),
    };

    let rows = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Flattening an iterator of `Result`s keeps the `Ok` values and drops the errors.
    let names: Vec<String> = rows.flatten().collect();
    Ok(names)
}
1305
1306// ============================================================================
1307// PostgreSQL (postgres - sync)
1308// ============================================================================
1309
/// Runs `statements` against PostgreSQL through the blocking `postgres` client.
///
/// Statements that are empty after trimming are skipped. When any statement is a
/// concurrent index operation the statements are executed one by one directly on
/// the client; otherwise they all run inside a single transaction that is committed
/// at the end.
///
/// # Errors
///
/// `CliError::ConnectionError` when connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "postgres-sync")]
fn execute_postgres_sync_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let conn_url = creds.connection_url();
    let mut client = match postgres::Client::connect(&conn_url, postgres::NoTls) {
        Ok(client) => client,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {e}"
            )));
        }
    };

    let runnable: Vec<&str> = statements
        .iter()
        .map(|stmt| stmt.trim())
        .filter(|s| !s.is_empty())
        .collect();

    if has_postgres_concurrent_index(statements) {
        // CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction block.
        for s in runnable.iter().copied() {
            client
                .execute(s, &[])
                .map_err(|e| CliError::MigrationError(format!("Statement failed: {e}\n{s}")))?;
        }
        return Ok(());
    }

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    for s in runnable.iter().copied() {
        tx.execute(s, &[])
            .map_err(|e| CliError::MigrationError(format!("Statement failed: {e}\n{s}")))?;
    }
    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
1350
/// Applies every pending migration of `set` through the blocking `postgres` client.
///
/// Steps: connect, run the optional schema-creation SQL, ensure the tracking table,
/// read the applied names, then execute the pending migrations and record each one
/// with `set.record_migration_sql`. If any pending migration contains a concurrent
/// index statement, everything is executed directly on the client; otherwise all
/// pending migrations run in one transaction.
///
/// A failure of the applied-names query is treated as "no names", and rows whose
/// first column cannot be read are skipped.
///
/// The returned `applied_migrations` holds the `hash()` of each applied migration.
#[cfg(feature = "postgres-sync")]
fn run_postgres_sync_migrations(
    set: &Migrations,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let conn_url = creds.connection_url();
    let mut client = postgres::Client::connect(&conn_url, postgres::NoTls)
        .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }
    ensure_postgres_tracking_table_sync(&mut client, set)?;

    let name_rows = client
        .query(&set.applied_names_sql(), &[])
        .unwrap_or_default();
    let applied_names: Vec<String> = name_rows
        .iter()
        .filter_map(|row| row.try_get(0).ok())
        .collect();

    let pending: Vec<_> = set.pending(&applied_names).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    let needs_autocommit = pending
        .iter()
        .any(|m| has_postgres_concurrent_index(m.statements()));

    let mut applied = Vec::new();
    if needs_autocommit {
        // CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction block.
        for migration in &pending {
            for stmt in migration.statements() {
                if stmt.trim().is_empty() {
                    continue;
                }
                client.execute(stmt, &[]).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            client
                .execute(&set.record_migration_sql(migration), &[])
                .map_err(|e| CliError::MigrationError(e.to_string()))?;
            applied.push(migration.hash().to_string());
        }
    } else {
        let mut tx = client
            .transaction()
            .map_err(|e| CliError::MigrationError(e.to_string()))?;

        for migration in &pending {
            for stmt in migration.statements() {
                if stmt.trim().is_empty() {
                    continue;
                }
                tx.execute(stmt, &[]).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            tx.execute(&set.record_migration_sql(migration), &[])
                .map_err(|e| CliError::MigrationError(e.to_string()))?;
            applied.push(migration.hash().to_string());
        }

        tx.commit()
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    let applied_count = applied.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied,
    })
}
1446
/// Builds a migration plan for a PostgreSQL database through the blocking `postgres`
/// client, without applying any migration.
///
/// Note that the optional schema-creation SQL is executed and the tracking table is
/// ensured before the applied records are read.
#[cfg(feature = "postgres-sync")]
fn inspect_postgres_sync_migrations(
    set: &Migrations,
    creds: &PostgresCreds,
) -> Result<MigrationPlan, CliError> {
    let conn_url = creds.connection_url();
    let mut client = match postgres::Client::connect(&conn_url, postgres::NoTls) {
        Ok(client) => client,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {e}"
            )));
        }
    };

    set.create_schema_sql()
        .map(|schema_sql| client.execute(&schema_sql, &[]))
        .transpose()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    ensure_postgres_tracking_table_sync(&mut client, set)?;

    let applied = query_applied_records_postgres_sync(&mut client, set);
    build_migration_plan(set, &applied)
}
1466
/// Reads the `(hash, name)` pairs of applied migrations from the tracking table,
/// ordered by `id` and restricted to rows whose `name` is not NULL.
///
/// This helper never fails: a query error yields an empty list, and rows where
/// either column cannot be read (or is NULL) are skipped.
#[cfg(feature = "postgres-sync")]
fn query_applied_records_postgres_sync(
    client: &mut postgres::Client,
    set: &Migrations,
) -> Vec<AppliedMigrationRecord> {
    let sql = format!(
        r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
        set.table_ident_sql()
    );

    client
        .query(&sql, &[])
        .unwrap_or_default()
        .into_iter()
        .filter_map(|row| {
            let hash = row.try_get::<_, Option<String>>(0).ok().flatten()?;
            let name = row.try_get::<_, Option<String>>(1).ok().flatten()?;
            Some(AppliedMigrationRecord { hash, name })
        })
        .collect()
}
1489
1490// ============================================================================
1491// PostgreSQL (tokio-postgres - async)
1492// ============================================================================
1493
/// Blocking entry point that drives `execute_postgres_async_inner` to completion on
/// a newly built current-thread Tokio runtime.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built, otherwise whatever the inner
/// future returns.
#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
fn execute_postgres_async_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!("Failed to create async runtime: {}", e)));
        }
    };

    runtime.block_on(execute_postgres_async_inner(creds, statements))
}
1506
/// Runs `statements` against PostgreSQL through `tokio-postgres`.
///
/// The connection object is spawned onto the runtime; an error it ends with is
/// printed to stderr. Statements that are empty after trimming are skipped. When
/// any statement is a concurrent index operation the statements are executed one by
/// one directly on the client; otherwise they run inside a single transaction that
/// is committed at the end.
///
/// # Errors
///
/// `CliError::ConnectionError` when connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
async fn execute_postgres_async_inner(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let conn_url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&conn_url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    let connection_driver = async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    };
    tokio::spawn(connection_driver);

    let runnable: Vec<&str> = statements
        .iter()
        .map(|stmt| stmt.trim())
        .filter(|s| !s.is_empty())
        .collect();

    if has_postgres_concurrent_index(statements) {
        // CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction block.
        for s in runnable.iter().copied() {
            client
                .execute(s, &[])
                .await
                .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, s)))?;
        }
        return Ok(());
    }

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
    for s in runnable.iter().copied() {
        tx.execute(s, &[])
            .await
            .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, s)))?;
    }
    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
1563
/// Blocking entry point that drives `run_postgres_async_inner` to completion on a
/// newly built current-thread Tokio runtime.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built, otherwise whatever the inner
/// future returns.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)] // Used when postgres-sync is not enabled
fn run_postgres_async_migrations(
    set: &Migrations,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!("Failed to create async runtime: {e}")));
        }
    };

    runtime.block_on(run_postgres_async_inner(set, creds))
}
1577
/// Blocking entry point that drives `inspect_postgres_async_inner` to completion on
/// a newly built current-thread Tokio runtime.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built, otherwise whatever the inner
/// future returns.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)] // Used when postgres-sync is not enabled
fn inspect_postgres_async_migrations(
    set: &Migrations,
    creds: &PostgresCreds,
) -> Result<MigrationPlan, CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!("Failed to create async runtime: {e}")));
        }
    };

    runtime.block_on(inspect_postgres_async_inner(set, creds))
}
1591
/// Builds a migration plan for a PostgreSQL database through `tokio-postgres`,
/// without applying any migration.
///
/// The connection object is spawned onto the runtime; an error it ends with is
/// printed to stderr. The optional schema-creation SQL is executed and the tracking
/// table is ensured before the applied records are read.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
async fn inspect_postgres_async_inner(
    set: &Migrations,
    creds: &PostgresCreds,
) -> Result<MigrationPlan, CliError> {
    let conn_url = creds.connection_url();
    let connected = tokio_postgres::connect(&conn_url, tokio_postgres::NoTls).await;
    let (client, connection) = connected
        .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;

    let connection_driver = async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    };
    tokio::spawn(connection_driver);

    if let Some(schema_sql) = set.create_schema_sql() {
        let created = client.execute(&schema_sql, &[]).await;
        created.map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    ensure_postgres_tracking_table_async(&client, set).await?;

    let applied = query_applied_records_postgres_async(&client, set).await;
    build_migration_plan(set, &applied)
}
1623
/// Reads the `(hash, name)` pairs of applied migrations from the tracking table,
/// ordered by `id` and restricted to rows whose `name` is not NULL.
///
/// This helper never fails: a query error yields an empty list, and rows where
/// either column cannot be read (or is NULL) are skipped.
#[cfg(feature = "tokio-postgres")]
async fn query_applied_records_postgres_async(
    client: &tokio_postgres::Client,
    set: &Migrations,
) -> Vec<AppliedMigrationRecord> {
    let sql = format!(
        r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
        set.table_ident_sql()
    );

    client
        .query(&sql, &[])
        .await
        .unwrap_or_default()
        .into_iter()
        .filter_map(|row| {
            let hash = row.try_get::<_, Option<String>>(0).ok().flatten()?;
            let name = row.try_get::<_, Option<String>>(1).ok().flatten()?;
            Some(AppliedMigrationRecord { hash, name })
        })
        .collect()
}
1646
/// Applies every pending migration of `set` through `tokio-postgres`.
///
/// Steps: connect and spawn the connection object (an error it ends with is printed
/// to stderr), run the optional schema-creation SQL, ensure the tracking table, read
/// the applied names, then execute the pending migrations and record each one with
/// `set.record_migration_sql`. If any pending migration contains a concurrent index
/// statement, everything is executed directly on the client; otherwise all pending
/// migrations run in one transaction.
///
/// A failure of the applied-names query is treated as "no names", and rows whose
/// first column cannot be read are skipped.
///
/// The returned `applied_migrations` holds the `hash()` of each applied migration.
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
async fn run_postgres_async_inner(
    set: &Migrations,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let conn_url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&conn_url, tokio_postgres::NoTls)
        .await
        .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;

    // The connection future must be polled for the client to make progress.
    let connection_driver = async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    };
    tokio::spawn(connection_driver);

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }
    ensure_postgres_tracking_table_async(&client, set).await?;

    let name_rows = client
        .query(&set.applied_names_sql(), &[])
        .await
        .unwrap_or_default();
    let applied_names: Vec<String> = name_rows
        .iter()
        .filter_map(|row| row.try_get(0).ok())
        .collect();

    let pending: Vec<_> = set.pending(&applied_names).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    let needs_autocommit = pending
        .iter()
        .any(|m| has_postgres_concurrent_index(m.statements()));

    let mut applied = Vec::new();
    if needs_autocommit {
        // CREATE/DROP INDEX CONCURRENTLY cannot run inside a transaction block.
        for migration in &pending {
            for stmt in migration.statements() {
                if stmt.trim().is_empty() {
                    continue;
                }
                client.execute(stmt, &[]).await.map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            client
                .execute(&set.record_migration_sql(migration), &[])
                .await
                .map_err(|e| CliError::MigrationError(e.to_string()))?;
            applied.push(migration.hash().to_string());
        }
    } else {
        let tx = client
            .transaction()
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;

        for migration in &pending {
            for stmt in migration.statements() {
                if stmt.trim().is_empty() {
                    continue;
                }
                tx.execute(stmt, &[]).await.map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            tx.execute(&set.record_migration_sql(migration), &[])
                .await
                .map_err(|e| CliError::MigrationError(e.to_string()))?;
            applied.push(migration.hash().to_string());
        }

        tx.commit()
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    let applied_count = applied.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied,
    })
}
1760
1761// ============================================================================
1762// LibSQL (local)
1763// ============================================================================
1764
/// Blocking entry point that drives `execute_libsql_local_inner` to completion on a
/// newly built current-thread Tokio runtime.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be built, otherwise whatever the inner
/// future returns.
#[cfg(feature = "libsql")]
fn execute_libsql_local_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!("Failed to create async runtime: {e}")));
        }
    };

    runtime.block_on(execute_libsql_local_inner(path, statements))
}
1774
/// Runs `statements` against the local LibSQL database at `path` inside a single
/// transaction.
///
/// Statements that are empty after trimming are skipped. When a statement fails the
/// transaction is rolled back (the rollback outcome is ignored) and the failure is
/// returned; otherwise the transaction is committed.
///
/// # Errors
///
/// `CliError::ConnectionError` when opening or connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "libsql")]
async fn execute_libsql_local_inner(path: &str, statements: &[String]) -> Result<(), CliError> {
    let db = match libsql::Builder::new_local(path).build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{path}': {e}"
            )));
        }
    };
    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;
    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|stmt| stmt.trim())
        .filter(|s| !s.is_empty());
    for s in runnable {
        if let Err(e) = tx.execute(s, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(format!(
                "Statement failed: {e}\n{s}"
            )));
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
1812
#[cfg(feature = "libsql")]
/// Blocking wrapper that applies the pending migrations of `set` to the local
/// LibSQL database at `path`.
///
/// Creates a current-thread Tokio runtime and blocks on
/// [`run_libsql_local_inner`].
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn run_libsql_local_migrations(set: &Migrations, path: &str) -> Result<MigrationResult, CliError> {
    let runtime_failure =
        |e: std::io::Error| CliError::Other(format!("Failed to create async runtime: {e}"));

    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder.build().map_err(runtime_failure)?;

    runtime.block_on(run_libsql_local_inner(set, path))
}
1822
#[cfg(feature = "libsql")]
/// Blocking wrapper that builds a [`MigrationPlan`] for the local LibSQL
/// database at `path`.
///
/// Creates a current-thread Tokio runtime and blocks on
/// [`inspect_libsql_local_inner`].
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn inspect_libsql_local_migrations(
    set: &Migrations,
    path: &str,
) -> Result<MigrationPlan, CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {e}"
            )));
        }
    };

    let inspection = inspect_libsql_local_inner(set, path);
    runtime.block_on(inspection)
}
1835
#[cfg(feature = "libsql")]
/// Connects to the local LibSQL database at `path` and computes the migration
/// plan for `set`.
///
/// The tracking table is ensured first, then the applied records are read and
/// handed to `build_migration_plan`.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if the database cannot be opened or
/// connected to, or any error from the tracking-table, query, or planning
/// helpers.
async fn inspect_libsql_local_inner(
    set: &Migrations,
    path: &str,
) -> Result<MigrationPlan, CliError> {
    let database = match libsql::Builder::new_local(path).build().await {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{path}': {e}"
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    ensure_sqlite_tracking_table_libsql(&connection, set).await?;

    let records = query_applied_records_libsql(&connection, set).await?;
    build_migration_plan(set, &records)
}
1856
#[cfg(feature = "libsql")]
/// Applies every pending migration of `set` to the local LibSQL database at
/// `path` inside a single transaction.
///
/// After ensuring the tracking table exists, the applied names are read and
/// used to select the pending migrations. If none are pending, an empty
/// result is returned without opening a transaction. Otherwise each pending
/// migration's non-blank statements are executed, followed by the SQL that
/// records the migration. Any failure rolls the transaction back (the
/// rollback's own result is ignored).
///
/// The returned `applied_migrations` holds the value of `hash()` for each
/// applied migration.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] for open/connect failures and
/// [`CliError::MigrationError`] for transaction, statement, recording, or
/// commit failures.
async fn run_libsql_local_inner(set: &Migrations, path: &str) -> Result<MigrationResult, CliError> {
    let database = match libsql::Builder::new_local(path).build().await {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{path}': {e}"
            )));
        }
    };

    let conn = match database.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    ensure_sqlite_tracking_table_libsql(&conn, set).await?;

    // Work out which migrations still need to run.
    let already_applied = query_applied_names_libsql(&conn, set).await?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    // All pending migrations share one transaction.
    let tx = match conn.transaction().await {
        Ok(tx) => tx,
        Err(e) => return Err(CliError::MigrationError(e.to_string())),
    };

    let mut applied_hashes = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        let record_sql = set.record_migration_sql(migration);
        if let Err(e) = tx.execute(&record_sql, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(e.to_string()));
        }
        applied_hashes.push(migration.hash().to_string());
    }

    if let Err(e) = tx.commit().await {
        return Err(CliError::MigrationError(e.to_string()));
    }

    let applied_count = applied_hashes.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied_hashes,
    })
}
1920
#[cfg(feature = "libsql")]
/// Reads the names of already-applied migrations from the tracking table over
/// a LibSQL connection.
///
/// If the query itself cannot be issued the function returns an empty list,
/// on the assumption that the tracking table does not exist yet. Rows whose
/// first column cannot be read as a `String` are skipped.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if fetching a row fails part-way
/// through the result set. Stopping silently at that point would return a
/// truncated list, and callers would then treat already-applied migrations as
/// pending.
async fn query_applied_names_libsql(
    conn: &libsql::Connection,
    set: &Migrations,
) -> Result<Vec<String>, CliError> {
    let Ok(mut rows) = conn.query(&set.applied_names_sql(), ()).await else {
        return Ok(vec![]); // Table might not exist yet
    };

    let mut names = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to read applied migrations: {e}"))
    })? {
        if let Ok(name) = row.get::<String>(0) {
            names.push(name);
        }
    }

    Ok(names)
}
1939
#[cfg(feature = "libsql")]
/// Reads `(hash, name)` pairs of already-applied migrations from the tracking
/// table over a LibSQL connection, ordered by `id`.
///
/// Only rows with a non-NULL `name` are selected. If the query itself cannot
/// be issued the function returns an empty list, on the assumption that the
/// tracking table does not exist yet. Rows whose columns cannot be read as
/// `String` are skipped.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if fetching a row fails part-way
/// through the result set. Stopping silently at that point would return a
/// truncated list that the migration plan would treat as complete.
async fn query_applied_records_libsql(
    conn: &libsql::Connection,
    set: &Migrations,
) -> Result<Vec<AppliedMigrationRecord>, CliError> {
    let sql = format!(
        r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
        set.table_ident_sql()
    );
    let Ok(mut rows) = conn.query(&sql, ()).await else {
        return Ok(vec![]); // Table might not exist yet
    };

    let mut applied = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to read applied migrations: {e}"))
    })? {
        if let (Ok(hash), Ok(name)) = (row.get::<String>(0), row.get::<String>(1)) {
            applied.push(AppliedMigrationRecord { hash, name });
        }
    }

    Ok(applied)
}
1962
1963// ============================================================================
1964// Turso (remote)
1965// ============================================================================
1966
#[cfg(feature = "turso")]
/// Blocking wrapper that executes `statements` against the remote Turso
/// database at `url`.
///
/// Creates a current-thread Tokio runtime and blocks on
/// [`execute_turso_inner`].
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn execute_turso_statements(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let runtime_failure =
        |e: std::io::Error| CliError::Other(format!("Failed to create async runtime: {e}"));

    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder.build().map_err(runtime_failure)?;

    runtime.block_on(execute_turso_inner(url, auth_token, statements))
}
1980
#[cfg(feature = "turso")]
/// Connects to the remote database at `url` and runs `statements` inside one
/// transaction.
///
/// A missing `auth_token` is passed to the client as an empty string.
/// Statements are trimmed and blank ones skipped. The first failing statement
/// triggers a rollback (its own result is ignored) and the error is returned
/// together with the offending SQL text.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if the remote database cannot be
/// built or connected to, and [`CliError::MigrationError`] if the transaction
/// cannot be started, a statement fails, or the commit fails.
async fn execute_turso_inner(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let database = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{url}': {e}"
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    let transaction = match connection.transaction().await {
        Ok(transaction) => transaction,
        Err(e) => return Err(CliError::MigrationError(e.to_string())),
    };

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|trimmed| !trimmed.is_empty());

    for s in runnable {
        if let Err(e) = transaction.execute(s, ()).await {
            // Best-effort rollback; the statement error is what gets reported.
            transaction.rollback().await.ok();
            return Err(CliError::MigrationError(format!(
                "Statement failed: {e}\n{s}"
            )));
        }
    }

    match transaction.commit().await {
        Ok(_) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
2022
#[cfg(feature = "turso")]
/// Blocking wrapper that applies the pending migrations of `set` to the
/// remote Turso database at `url`.
///
/// Creates a current-thread Tokio runtime and blocks on [`run_turso_inner`].
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn run_turso_migrations(
    set: &Migrations,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {e}"
            )));
        }
    };

    let migration_run = run_turso_inner(set, url, auth_token);
    runtime.block_on(migration_run)
}
2036
#[cfg(feature = "turso")]
/// Blocking wrapper that builds a [`MigrationPlan`] for the remote Turso
/// database at `url`.
///
/// Creates a current-thread Tokio runtime and blocks on
/// [`inspect_turso_inner`].
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn inspect_turso_migrations(
    set: &Migrations,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationPlan, CliError> {
    let runtime_failure =
        |e: std::io::Error| CliError::Other(format!("Failed to create async runtime: {e}"));

    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder.build().map_err(runtime_failure)?;

    runtime.block_on(inspect_turso_inner(set, url, auth_token))
}
2050
#[cfg(feature = "turso")]
/// Connects to the remote database at `url` and computes the migration plan
/// for `set`.
///
/// A missing `auth_token` is passed to the client as an empty string. The
/// tracking table is ensured first, then the applied records are read and
/// handed to `build_migration_plan`.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if the remote database cannot be
/// built or connected to, or any error from the tracking-table, query, or
/// planning helpers.
async fn inspect_turso_inner(
    set: &Migrations,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationPlan, CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let database = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{url}': {e}"
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    ensure_sqlite_tracking_table_libsql(&connection, set).await?;

    let records = query_applied_records_turso(&connection, set).await?;
    build_migration_plan(set, &records)
}
2072
#[cfg(feature = "turso")]
/// Applies every pending migration of `set` to the remote database at `url`
/// inside a single transaction.
///
/// A missing `auth_token` is passed to the client as an empty string. After
/// ensuring the tracking table exists, the applied names are read and used to
/// select the pending migrations. If none are pending, an empty result is
/// returned without opening a transaction. Otherwise each pending migration's
/// non-blank statements are executed, followed by the SQL that records the
/// migration. Any failure rolls the transaction back (the rollback's own
/// result is ignored).
///
/// The returned `applied_migrations` holds the value of `hash()` for each
/// applied migration.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] for build/connect failures and
/// [`CliError::MigrationError`] for transaction, statement, recording, or
/// commit failures.
async fn run_turso_inner(
    set: &Migrations,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let database = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{url}': {e}"
            )));
        }
    };

    let conn = match database.connect() {
        Ok(conn) => conn,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    ensure_sqlite_tracking_table_libsql(&conn, set).await?;

    // Work out which migrations still need to run.
    let already_applied = query_applied_names_turso(&conn, set).await?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: Vec::new(),
        });
    }

    // All pending migrations share one transaction.
    let tx = match conn.transaction().await {
        Ok(tx) => tx,
        Err(e) => return Err(CliError::MigrationError(e.to_string())),
    };

    let mut applied_hashes = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        let record_sql = set.record_migration_sql(migration);
        if let Err(e) = tx.execute(&record_sql, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(e.to_string()));
        }
        applied_hashes.push(migration.hash().to_string());
    }

    if let Err(e) = tx.commit().await {
        return Err(CliError::MigrationError(e.to_string()));
    }

    let applied_count = applied_hashes.len();
    Ok(MigrationResult {
        applied_count,
        applied_migrations: applied_hashes,
    })
}
2140
#[cfg(feature = "turso")]
/// Reads the names of already-applied migrations from the tracking table over
/// a remote (Turso) connection.
///
/// If the query itself cannot be issued the function returns an empty list,
/// on the assumption that the tracking table does not exist yet. Rows whose
/// first column cannot be read as a `String` are skipped.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if fetching a row fails part-way
/// through the result set. Stopping silently at that point would return a
/// truncated list, and callers would then treat already-applied migrations as
/// pending.
async fn query_applied_names_turso(
    conn: &libsql::Connection,
    set: &Migrations,
) -> Result<Vec<String>, CliError> {
    let Ok(mut rows) = conn.query(&set.applied_names_sql(), ()).await else {
        return Ok(vec![]); // Table might not exist yet
    };

    let mut names = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to read applied migrations: {e}"))
    })? {
        if let Ok(name) = row.get::<String>(0) {
            names.push(name);
        }
    }

    Ok(names)
}
2159
#[cfg(feature = "turso")]
/// Reads `(hash, name)` pairs of already-applied migrations from the tracking
/// table over a remote (Turso) connection, ordered by `id`.
///
/// Only rows with a non-NULL `name` are selected. If the query itself cannot
/// be issued the function returns an empty list, on the assumption that the
/// tracking table does not exist yet. Rows whose columns cannot be read as
/// `String` are skipped.
///
/// # Errors
///
/// Returns [`CliError::MigrationError`] if fetching a row fails part-way
/// through the result set. Stopping silently at that point would return a
/// truncated list that the migration plan would treat as complete.
async fn query_applied_records_turso(
    conn: &libsql::Connection,
    set: &Migrations,
) -> Result<Vec<AppliedMigrationRecord>, CliError> {
    let sql = format!(
        r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
        set.table_ident_sql()
    );
    let Ok(mut rows) = conn.query(&sql, ()).await else {
        return Ok(vec![]); // Table might not exist yet
    };

    let mut applied = Vec::new();
    while let Some(row) = rows.next().await.map_err(|e| {
        CliError::MigrationError(format!("Failed to read applied migrations: {e}"))
    })? {
        if let (Ok(hash), Ok(name)) = (row.get::<String>(0), row.get::<String>(1)) {
            applied.push(AppliedMigrationRecord { hash, name });
        }
    }

    Ok(applied)
}
2182
2183// ============================================================================
2184// Database Introspection
2185// ============================================================================
2186
/// Result of database introspection.
///
/// Returned by [`run_introspection`], which writes `schema_code` to
/// `schema.rs`, saves `snapshot` to disk, and then fills in `snapshot_path`.
#[derive(Debug)]
pub struct IntrospectResult {
    /// Generated Rust schema code (the text written to `schema.rs`)
    pub schema_code: String,
    /// Number of tables found
    pub table_count: usize,
    /// Number of indexes found
    pub index_count: usize,
    /// Number of views found
    pub view_count: usize,
    /// Any warnings during introspection
    pub warnings: Vec<String>,
    /// The schema snapshot for migration tracking
    pub snapshot: Snapshot,
    /// Path to the generated snapshot file; `run_introspection` overwrites
    /// this with the location of the `snapshot.json` it saved
    pub snapshot_path: std::path::PathBuf,
}
2205
/// Introspect a database and write schema/snapshot files.
///
/// This is the main entry point for CLI introspection.
///
/// Steps, in order:
///
/// 1. Introspect the database and apply `filters` to the snapshot; the schema
///    code is regenerated from the snapshot when filters are non-empty or a
///    casing is configured.
/// 2. Write `{out_dir}/schema.rs`.
/// 3. Refuse to continue if `{out_dir}/meta/_journal.json` exists.
/// 4. Create `{out_dir}/{tag}/` and write `snapshot.json` and `migration.sql`
///    into it; the SQL comes from diffing an empty snapshot against the
///    introspected one.
/// 5. If `init_metadata` is set, call `apply_init_metadata` with the
///    credentials, dialect, `out_dir`, and migrations table/schema names.
///
/// # Errors
///
/// Returns [`CliError`] if connecting to the database fails, if querying the
/// catalogs fails, if applying the configured snapshot filters fails, or if
/// writing the generated schema and snapshot files to disk fails. Also
/// returns an error when an old `meta/_journal.json` is present in `out_dir`,
/// when migration SQL generation fails, or when `init_metadata` is set and
/// the metadata step fails.
#[allow(clippy::too_many_arguments)]
pub fn run_introspection(
    credentials: &Credentials,
    dialect: Dialect,
    out_dir: &Path,
    init_metadata: bool,
    breakpoints: bool,
    introspect_casing: Option<IntrospectCasing>,
    filters: &SnapshotFilters,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::words::generate_migration_tag;

    // Perform introspection
    let mut result = introspect_database(credentials, dialect)?;
    apply_snapshot_filters(&mut result.snapshot, dialect, filters)?;
    // The schema code produced by introspection is only regenerated when the
    // snapshot may have been filtered or a casing was requested.
    if !filters.is_empty() || introspect_casing.is_some() {
        regenerate_schema_from_snapshot(&mut result, dialect, introspect_casing);
    }

    // Write schema file
    let schema_path = out_dir.join("schema.rs");
    if let Some(parent) = schema_path.parent() {
        std::fs::create_dir_all(parent).map_err(|e| {
            CliError::Other(format!(
                "Failed to create output directory '{}': {}",
                parent.display(),
                e
            ))
        })?;
    }
    std::fs::write(&schema_path, &result.schema_code).map_err(|e| {
        CliError::Other(format!(
            "Failed to write schema file '{}': {}",
            schema_path.display(),
            e
        ))
    })?;

    // NOTE(review): this check runs after schema.rs has already been written,
    // so the schema file is left on disk when this error is returned; confirm
    // that ordering is intended.
    let journal_path = out_dir.join("meta").join("_journal.json");
    if journal_path.exists() {
        return Err(CliError::Other(
            "Detected old drizzle-kit migration folders. Upgrade them before writing new migrations."
                .to_string(),
        ));
    }

    // Generate migration tag (V3 format: timestamp-based)
    let tag = generate_migration_tag(None);

    // Create migration directory: {out}/{tag}/
    let migration_dir = out_dir.join(&tag);
    std::fs::create_dir_all(&migration_dir).map_err(|e| {
        CliError::Other(format!(
            "Failed to create migration directory '{}': {}",
            migration_dir.display(),
            e
        ))
    })?;

    // Save snapshot JSON: {out}/{tag}/snapshot.json
    let snapshot_path = migration_dir.join("snapshot.json");
    result.snapshot.save(&snapshot_path).map_err(|e| {
        CliError::Other(format!(
            "Failed to write snapshot file '{}': {}",
            snapshot_path.display(),
            e
        ))
    })?;

    // Generate initial migration SQL by diffing against empty snapshot
    let base_dialect = dialect.to_base();
    let empty_snapshot = Snapshot::empty(base_dialect);
    let sql_statements =
        generate_introspect_migration(&empty_snapshot, &result.snapshot, breakpoints)?;

    // Write migration.sql: {out}/{tag}/migration.sql
    let migration_sql_path = migration_dir.join("migration.sql");
    let sql_content = format_migration_sql(&sql_statements, breakpoints);
    std::fs::write(&migration_sql_path, &sql_content).map_err(|e| {
        CliError::Other(format!(
            "Failed to write migration file '{}': {}",
            migration_sql_path.display(),
            e
        ))
    })?;

    // Update result with path
    result.snapshot_path = snapshot_path;

    // Runs last, after the migration files above have been written to disk.
    if init_metadata {
        apply_init_metadata(
            credentials,
            dialect,
            out_dir,
            migrations_table,
            migrations_schema,
        )?;
    }

    Ok(result)
}
2318
2319// =============================================================================
2320// Init metadata handling
2321// =============================================================================
2322
/// Dispatch the "init metadata" step to the driver matching `credentials`.
///
/// When at least one database driver feature is compiled in, the migration set
/// is loaded from `out_dir` (using `migrations_table` / `migrations_schema`)
/// and passed to the driver-specific init routine. The init routines visible
/// in this file ensure the tracking table exists, validate the current state,
/// and record the first migration of the set.
///
/// # Errors
///
/// Returns [`CliError::MissingDriver`] when the driver feature required for
/// the given credentials is not compiled in,
/// [`CliError::UnsupportedForDriver`] for AWS Data API credentials, and
/// otherwise any error from loading the migration set or from the selected
/// init routine.
#[allow(unused_variables)] // params consumed inside feature-gated block
fn apply_init_metadata(
    credentials: &Credentials,
    dialect: Dialect,
    out_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<(), CliError> {
    // `set` only exists when some driver feature is enabled; every arm that
    // uses it below is gated on one of these same features.
    #[cfg(any(
        feature = "rusqlite",
        feature = "libsql",
        feature = "turso",
        feature = "postgres-sync",
        feature = "tokio-postgres",
        feature = "d1-http",
    ))]
    let set = load_migration_set(dialect, out_dir, migrations_table, migrations_schema)?;

    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => init_sqlite_metadata(path, &set),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        // Turso credentials cover both local LibSQL files and remote
        // databases; `is_local_libsql(url)` selects between the two drivers.
        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    init_libsql_local_metadata(url, &set)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    init_turso_metadata(url, auth_token.as_deref(), &set)
                }
                #[cfg(not(feature = "turso"))]
                {
                    // `auth_token` is otherwise unused in this configuration.
                    let _ = auth_token;
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        Credentials::Postgres(creds) => {
            // Presumably keeps `creds` "used" when neither Postgres driver
            // feature is enabled.
            let _ = creds;
            // The sync driver arm is listed first, so it is the one chosen
            // when both Postgres features are enabled.
            core::cfg_select! {
                feature = "postgres-sync" => init_postgres_sync_metadata(creds, &set),
                feature = "tokio-postgres" => init_postgres_async_metadata(creds, &set),
                _ => Err(CliError::MissingDriver {
                    dialect: "PostgreSQL",
                    feature: "postgres-sync or tokio-postgres",
                }),
            }
        }

        #[cfg(feature = "d1-http")]
        Credentials::D1 {
            account_id,
            database_id,
            token,
        } => d1_http::init_metadata(&set, account_id, database_id, token),

        #[cfg(not(feature = "d1-http"))]
        Credentials::D1 { .. } => Err(CliError::MissingDriver {
            dialect: "Cloudflare D1 (HTTP)",
            feature: "d1-http",
        }),

        Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
            operation: "Migration metadata init against AWS Data API",
            driver: "aws-data-api",
            hint: "AWS RDS Data API schema ops are not yet wired into this CLI. Seed the \
                   migrations table manually via `aws rds-data execute-statement` or \
                   tokio-postgres.",
        }),
    }
}
2421
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres",
    feature = "d1-http",
))]
/// Checks the preconditions for `--init`.
///
/// `--init` is rejected when the database already lists applied migrations
/// (`applied_names` is non-empty) or when the local set contains more than one
/// migration. The database check is evaluated first.
///
/// # Errors
///
/// Returns [`CliError::Other`] describing which precondition failed.
pub(crate) fn validate_init_metadata(
    applied_names: &[String],
    set: &Migrations,
) -> Result<(), CliError> {
    let rejection = if !applied_names.is_empty() {
        Some("--init can't be used when database already has migrations set")
    } else if set.all().len() > 1 {
        Some("--init can't be used with existing migrations")
    } else {
        None
    };

    match rejection {
        Some(reason) => Err(CliError::Other(reason.into())),
        None => Ok(()),
    }
}
2448
2449// =============================================================================
2450// Init metadata implementations
2451// =============================================================================
2452
#[cfg(feature = "rusqlite")]
/// Marks the first migration of `set` as applied in the SQLite database at
/// `path`, without executing the migration's own statements.
///
/// The tracking table is ensured, the applied names are read and validated
/// with [`validate_init_metadata`], and then the record-migration SQL for the
/// first migration is executed. An empty set is a no-op.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if the database cannot be opened,
/// [`CliError::MigrationError`] if recording fails, or any error from the
/// tracking-table, query, or validation helpers.
fn init_sqlite_metadata(path: &str, set: &Migrations) -> Result<(), CliError> {
    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{path}': {e}"
            )));
        }
    };

    ensure_sqlite_tracking_table(&conn, set)?;

    let already_applied = query_applied_names_sqlite(&conn, set)?;
    validate_init_metadata(&already_applied, set)?;

    match set.all().first() {
        None => Ok(()),
        Some(first) => conn
            .execute(&set.record_migration_sql(first), [])
            .map(|_| ())
            .map_err(|e| CliError::MigrationError(e.to_string())),
    }
}
2473
#[cfg(feature = "libsql")]
/// Blocking wrapper around [`init_libsql_local_metadata_inner`] for the local
/// LibSQL database at `path`.
///
/// Creates a current-thread Tokio runtime and blocks on the inner routine.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn init_libsql_local_metadata(path: &str, set: &Migrations) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {e}"
            )));
        }
    };

    let init = init_libsql_local_metadata_inner(path, set);
    runtime.block_on(init)
}
2483
#[cfg(feature = "libsql")]
/// Marks the first migration of `set` as applied in the local LibSQL database
/// at `path`, without executing the migration's own statements.
///
/// The tracking table is ensured, the applied names are read and validated
/// with [`validate_init_metadata`], and then the record-migration SQL for the
/// first migration is executed. An empty set is a no-op.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] for open/connect failures,
/// [`CliError::MigrationError`] if recording fails, or any error from the
/// tracking-table, query, or validation helpers.
async fn init_libsql_local_metadata_inner(path: &str, set: &Migrations) -> Result<(), CliError> {
    let database = match libsql::Builder::new_local(path).build().await {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{path}': {e}"
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    ensure_sqlite_tracking_table_libsql(&connection, set).await?;

    let already_applied = query_applied_names_libsql(&connection, set).await?;
    validate_init_metadata(&already_applied, set)?;

    match set.all().first() {
        None => Ok(()),
        Some(first) => connection
            .execute(&set.record_migration_sql(first), ())
            .await
            .map(|_| ())
            .map_err(|e| CliError::MigrationError(e.to_string())),
    }
}
2512
#[cfg(feature = "turso")]
/// Blocking wrapper around [`init_turso_metadata_inner`] for the remote
/// database at `url`.
///
/// Creates a current-thread Tokio runtime and blocks on the inner routine.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn init_turso_metadata(
    url: &str,
    auth_token: Option<&str>,
    set: &Migrations,
) -> Result<(), CliError> {
    let runtime_failure =
        |e: std::io::Error| CliError::Other(format!("Failed to create async runtime: {e}"));

    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder.build().map_err(runtime_failure)?;

    runtime.block_on(init_turso_metadata_inner(url, auth_token, set))
}
2526
#[cfg(feature = "turso")]
/// Marks the first migration of `set` as applied in the remote database at
/// `url`, without executing the migration's own statements.
///
/// A missing `auth_token` is passed to the client as an empty string. The
/// tracking table is ensured, the applied names are read and validated with
/// [`validate_init_metadata`], and then the record-migration SQL for the first
/// migration is executed. An empty set is a no-op.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] for build/connect failures,
/// [`CliError::MigrationError`] if recording fails, or any error from the
/// tracking-table, query, or validation helpers.
async fn init_turso_metadata_inner(
    url: &str,
    auth_token: Option<&str>,
    set: &Migrations,
) -> Result<(), CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let database = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{url}': {e}"
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    ensure_sqlite_tracking_table_libsql(&connection, set).await?;

    let already_applied = query_applied_names_turso(&connection, set).await?;
    validate_init_metadata(&already_applied, set)?;

    match set.all().first() {
        None => Ok(()),
        Some(first) => connection
            .execute(&set.record_migration_sql(first), ())
            .await
            .map(|_| ())
            .map_err(|e| CliError::MigrationError(e.to_string())),
    }
}
2559
#[cfg(feature = "postgres-sync")]
/// Marks the first migration of `set` as applied in a PostgreSQL database
/// using the synchronous driver, without executing the migration's own
/// statements.
///
/// The connection is made without TLS. If the set provides schema-creation
/// SQL it is executed first, then the tracking table is ensured. A failure of
/// the applied-names query is treated as "no rows", and rows whose first
/// column cannot be read are skipped. After validation with
/// [`validate_init_metadata`], the record-migration SQL for the first
/// migration is executed. An empty set is a no-op.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if connecting fails,
/// [`CliError::MigrationError`] if schema creation or recording fails, or any
/// error from the tracking-table or validation helpers.
fn init_postgres_sync_metadata(creds: &PostgresCreds, set: &Migrations) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = match postgres::Client::connect(&url, postgres::NoTls) {
        Ok(client) => client,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {e}"
            )));
        }
    };

    if let Some(schema_sql) = set.create_schema_sql() {
        if let Err(e) = client.execute(&schema_sql, &[]) {
            return Err(CliError::MigrationError(e.to_string()));
        }
    }

    ensure_postgres_tracking_table_sync(&mut client, set)?;

    let already_applied: Vec<String> = client
        .query(&set.applied_names_sql(), &[])
        .unwrap_or_default()
        .iter()
        .filter_map(|row| row.try_get(0).ok())
        .collect();

    validate_init_metadata(&already_applied, set)?;

    match set.all().first() {
        None => Ok(()),
        Some(first) => client
            .execute(&set.record_migration_sql(first), &[])
            .map(|_| ())
            .map_err(|e| CliError::MigrationError(e.to_string())),
    }
}
2591
#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
/// Blocking wrapper around [`init_postgres_async_inner`].
///
/// Only compiled when the async Postgres driver is enabled and the sync one
/// is not. Creates a current-thread Tokio runtime and blocks on the inner
/// routine.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, otherwise
/// whatever the inner async routine returns.
fn init_postgres_async_metadata(creds: &PostgresCreds, set: &Migrations) -> Result<(), CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    let init = init_postgres_async_inner(creds, set);
    runtime.block_on(init)
}
2601
#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
/// Marks the first migration of `set` as applied in a PostgreSQL database
/// using the async driver, without executing the migration's own statements.
///
/// The connection is made without TLS and its driver future is spawned as a
/// background task that prints any connection error to stderr. If the set
/// provides schema-creation SQL it is executed first, then the tracking table
/// is ensured. A failure of the applied-names query is treated as "no rows",
/// and rows whose first column cannot be read are skipped. After validation
/// with [`validate_init_metadata`], the record-migration SQL for the first
/// migration is executed. An empty set is a no-op.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if connecting fails,
/// [`CliError::MigrationError`] if schema creation or recording fails, or any
/// error from the tracking-table or validation helpers.
async fn init_postgres_async_inner(
    creds: &PostgresCreds,
    set: &Migrations,
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (client, connection) = match tokio_postgres::connect(&url, tokio_postgres::NoTls).await {
        Ok(pair) => pair,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    // The connection future must be polled for the client to make progress.
    tokio::spawn(async move {
        let Err(e) = connection.await else {
            return;
        };
        let message = format!("PostgreSQL connection error: {e}");
        eprintln!("{}", output::err_line(&message));
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        if let Err(e) = client.execute(&schema_sql, &[]).await {
            return Err(CliError::MigrationError(e.to_string()));
        }
    }

    ensure_postgres_tracking_table_async(&client, set).await?;

    let already_applied: Vec<String> = client
        .query(&set.applied_names_sql(), &[])
        .await
        .unwrap_or_default()
        .iter()
        .filter_map(|row| row.try_get(0).ok())
        .collect();

    validate_init_metadata(&already_applied, set)?;

    match set.all().first() {
        None => Ok(()),
        Some(first) => client
            .execute(&set.record_migration_sql(first), &[])
            .await
            .map(|_| ())
            .map_err(|e| CliError::MigrationError(e.to_string())),
    }
}
2651
2652/// Generate migration SQL from snapshot diff (for introspection)
2653fn generate_introspect_migration(
2654    prev: &Snapshot,
2655    current: &Snapshot,
2656    breakpoints: bool,
2657) -> Result<Vec<String>, CliError> {
2658    match (prev, current) {
2659        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
2660            use drizzle_migrations::sqlite::diff_snapshots;
2661            use drizzle_migrations::sqlite::statements::SqliteGenerator;
2662
2663            let diff = diff_snapshots(prev_snap, curr_snap);
2664            let generator = SqliteGenerator::new().with_breakpoints(breakpoints);
2665            Ok(generator.generate_migration(&diff))
2666        }
2667        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
2668            use drizzle_migrations::postgres::diff_full_snapshots;
2669            use drizzle_migrations::postgres::statements::PostgresGenerator;
2670
2671            let diff = diff_full_snapshots(prev_snap, curr_snap);
2672            let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
2673            Ok(generator.generate(&diff.diffs))
2674        }
2675        _ => Err(CliError::DialectMismatch),
2676    }
2677}
2678
/// Joins generated SQL statements into the text written to `migration.sql`.
///
/// An empty statement list yields a single explanatory SQL comment line.
/// Otherwise statements are separated by a `--> statement-breakpoint` marker
/// line when `breakpoints` is set, or by a blank line when it is not. No
/// trailing separator is added.
fn format_migration_sql(sql_statements: &[String], breakpoints: bool) -> String {
    if sql_statements.is_empty() {
        return String::from("-- No tables to create (empty database)\n");
    }

    let separator = if breakpoints {
        "\n--> statement-breakpoint\n"
    } else {
        "\n\n"
    };
    sql_statements.join(separator)
}
2688
2689/// Apply the configured filters (tables, schemas, extensions) in-place on a
2690/// snapshot, removing entities that do not match.
2691///
2692/// # Errors
2693///
2694/// Returns [`CliError::DialectMismatch`] if `dialect` does not agree with the
2695/// variant of `snapshot`, or [`CliError`] if compiling a filter pattern fails.
2696pub fn apply_snapshot_filters(
2697    snapshot: &mut Snapshot,
2698    dialect: Dialect,
2699    filters: &SnapshotFilters,
2700) -> Result<(), CliError> {
2701    if filters.is_empty() {
2702        return Ok(());
2703    }
2704
2705    match (dialect, snapshot) {
2706        (Dialect::Sqlite | Dialect::Turso, Snapshot::Sqlite(sqlite)) => {
2707            apply_sqlite_snapshot_filters(sqlite, filters)
2708        }
2709        (Dialect::Postgresql, Snapshot::Postgres(postgres)) => {
2710            apply_postgres_snapshot_filters(postgres, filters)
2711        }
2712        _ => Err(CliError::DialectMismatch),
2713    }
2714}
2715
2716fn apply_sqlite_snapshot_filters(
2717    snapshot: &mut drizzle_migrations::sqlite::SQLiteSnapshot,
2718    filters: &SnapshotFilters,
2719) -> Result<(), CliError> {
2720    use drizzle_types::sqlite::ddl::SqliteEntity;
2721    use std::collections::HashSet;
2722
2723    let table_patterns = compile_patterns(filters.tables.as_deref())?;
2724    if table_patterns.is_none() {
2725        return Ok(());
2726    }
2727
2728    let mut keep_tables: HashSet<String> = HashSet::new();
2729    for entity in &snapshot.ddl {
2730        if let SqliteEntity::Table(table) = entity
2731            && matches_patterns(table.name.as_ref(), table_patterns.as_deref())
2732        {
2733            keep_tables.insert(table.name.to_string());
2734        }
2735    }
2736
2737    snapshot.ddl.retain(|entity| match entity {
2738        SqliteEntity::Table(t) => keep_tables.contains(t.name.as_ref()),
2739        SqliteEntity::Column(c) => keep_tables.contains(c.table.as_ref()),
2740        SqliteEntity::Index(i) => keep_tables.contains(i.table.as_ref()),
2741        SqliteEntity::ForeignKey(fk) => {
2742            keep_tables.contains(fk.table.as_ref()) && keep_tables.contains(fk.table_to.as_ref())
2743        }
2744        SqliteEntity::PrimaryKey(pk) => keep_tables.contains(pk.table.as_ref()),
2745        SqliteEntity::UniqueConstraint(u) => keep_tables.contains(u.table.as_ref()),
2746        SqliteEntity::CheckConstraint(c) => keep_tables.contains(c.table.as_ref()),
2747        SqliteEntity::View(v) => matches_patterns(v.name.as_ref(), table_patterns.as_deref()),
2748    });
2749
2750    Ok(())
2751}
2752
2753fn apply_postgres_snapshot_filters(
2754    snapshot: &mut drizzle_migrations::postgres::PostgresSnapshot,
2755    filters: &SnapshotFilters,
2756) -> Result<(), CliError> {
2757    use drizzle_types::postgres::ddl::PostgresEntity;
2758    use std::collections::HashSet;
2759
2760    let schema_patterns = compile_patterns(filters.schemas.as_deref())?;
2761    let table_patterns = compile_patterns(filters.tables.as_deref())?;
2762    let exclude_postgis = filters
2763        .extensions
2764        .as_ref()
2765        .is_some_and(|v| v.contains(&Extension::Postgis));
2766
2767    let is_schema_allowed = |schema: &str| -> bool {
2768        if exclude_postgis && matches!(schema, "topology" | "tiger" | "tiger_data") {
2769            return false;
2770        }
2771        matches_patterns(schema, schema_patterns.as_deref())
2772    };
2773
2774    let mut keep_tables: HashSet<(String, String)> = HashSet::new();
2775    for entity in &snapshot.ddl {
2776        if let PostgresEntity::Table(table) = entity {
2777            let schema = table.schema.as_ref();
2778            let name = table.name.as_ref();
2779            if !is_schema_allowed(schema) {
2780                continue;
2781            }
2782            if exclude_postgis
2783                && matches!(
2784                    name,
2785                    "spatial_ref_sys"
2786                        | "geometry_columns"
2787                        | "geography_columns"
2788                        | "raster_columns"
2789                        | "raster_overviews"
2790                )
2791            {
2792                continue;
2793            }
2794
2795            if matches_patterns(name, table_patterns.as_deref()) {
2796                keep_tables.insert((schema.to_string(), name.to_string()));
2797            }
2798        }
2799    }
2800
2801    let mut keep_schemas: HashSet<String> = keep_tables.iter().map(|(s, _)| s.clone()).collect();
2802    if table_patterns.is_none() {
2803        for entity in &snapshot.ddl {
2804            if let PostgresEntity::Schema(s) = entity
2805                && is_schema_allowed(s.name.as_ref())
2806            {
2807                keep_schemas.insert(s.name.to_string());
2808            }
2809        }
2810    }
2811
2812    snapshot.ddl.retain(|entity| match entity {
2813        PostgresEntity::Schema(s) => keep_schemas.contains(s.name.as_ref()),
2814        PostgresEntity::Enum(e) => keep_schemas.contains(e.schema.as_ref()),
2815        PostgresEntity::Sequence(s) => keep_schemas.contains(s.schema.as_ref()),
2816        PostgresEntity::Role(_) | PostgresEntity::Privilege(_) => true,
2817        PostgresEntity::Policy(p) => {
2818            keep_tables.contains(&(p.schema.to_string(), p.table.to_string()))
2819        }
2820        PostgresEntity::Table(t) => {
2821            keep_tables.contains(&(t.schema.to_string(), t.name.to_string()))
2822        }
2823        PostgresEntity::Column(c) => {
2824            keep_tables.contains(&(c.schema.to_string(), c.table.to_string()))
2825        }
2826        PostgresEntity::Index(i) => {
2827            keep_tables.contains(&(i.schema.to_string(), i.table.to_string()))
2828        }
2829        PostgresEntity::ForeignKey(fk) => {
2830            keep_tables.contains(&(fk.schema.to_string(), fk.table.to_string()))
2831                && keep_tables.contains(&(fk.schema_to.to_string(), fk.table_to.to_string()))
2832        }
2833        PostgresEntity::PrimaryKey(pk) => {
2834            keep_tables.contains(&(pk.schema.to_string(), pk.table.to_string()))
2835        }
2836        PostgresEntity::UniqueConstraint(u) => {
2837            keep_tables.contains(&(u.schema.to_string(), u.table.to_string()))
2838        }
2839        PostgresEntity::CheckConstraint(c) => {
2840            keep_tables.contains(&(c.schema.to_string(), c.table.to_string()))
2841        }
2842        PostgresEntity::View(v) => {
2843            if !keep_schemas.contains(v.schema.as_ref()) {
2844                return false;
2845            }
2846            matches_patterns(v.name.as_ref(), table_patterns.as_deref())
2847        }
2848    });
2849
2850    Ok(())
2851}
2852
/// A single compiled filter entry: a glob pattern plus a flag recording
/// whether the user wrote it with a leading `!`.
#[derive(Debug, Clone)]
struct FilterPattern {
    /// Compiled glob, built from the pattern text with any leading `!` removed.
    pattern: glob::Pattern,
    /// `true` when the original pattern started with `!`; a value matching a
    /// negated pattern is rejected by `matches_patterns`.
    negated: bool,
}
2858
2859fn compile_patterns(patterns: Option<&[String]>) -> Result<Option<Vec<FilterPattern>>, CliError> {
2860    let Some(patterns) = patterns else {
2861        return Ok(None);
2862    };
2863    if patterns.is_empty() {
2864        return Ok(None);
2865    }
2866
2867    let mut compiled = Vec::with_capacity(patterns.len());
2868    for p in patterns {
2869        let raw = p.trim();
2870        let (negated, source) = raw
2871            .strip_prefix('!')
2872            .map_or((false, raw), |stripped| (true, stripped));
2873        if source.is_empty() {
2874            return Err(CliError::Other(format!(
2875                "invalid filter pattern '{p}': empty pattern"
2876            )));
2877        }
2878
2879        compiled.push(FilterPattern {
2880            pattern: glob::Pattern::new(source)
2881                .map_err(|e| CliError::Other(format!("invalid filter pattern '{p}': {e}")))?,
2882            negated,
2883        });
2884    }
2885    Ok(Some(compiled))
2886}
2887
2888fn matches_patterns(value: &str, patterns: Option<&[FilterPattern]>) -> bool {
2889    match patterns {
2890        None => true,
2891        Some(v) => {
2892            let has_positive = v.iter().any(|m| !m.negated);
2893            let mut matched_positive = false;
2894
2895            for matcher in v {
2896                if matcher.negated {
2897                    if matcher.pattern.matches(value) {
2898                        return false;
2899                    }
2900                } else if matcher.pattern.matches(value) {
2901                    matched_positive = true;
2902                }
2903            }
2904
2905            if has_positive { matched_positive } else { true }
2906        }
2907    }
2908}
2909
2910fn regenerate_schema_from_snapshot(
2911    result: &mut IntrospectResult,
2912    dialect: Dialect,
2913    introspect_casing: Option<IntrospectCasing>,
2914) {
2915    match (&result.snapshot, dialect) {
2916        (Snapshot::Sqlite(snap), Dialect::Sqlite | Dialect::Turso) => {
2917            use drizzle_migrations::sqlite::SQLiteDDL;
2918            use drizzle_migrations::sqlite::codegen::{
2919                CodegenOptions, FieldCasing, generate_rust_schema,
2920            };
2921
2922            let field_casing = match introspect_casing {
2923                Some(IntrospectCasing::Camel) => FieldCasing::Camel,
2924                Some(IntrospectCasing::Preserve) => FieldCasing::Preserve,
2925                None => FieldCasing::Snake,
2926            };
2927
2928            let ddl = SQLiteDDL::from_entities(snap.ddl.clone());
2929            let generated = generate_rust_schema(
2930                &ddl,
2931                &CodegenOptions {
2932                    module_doc: Some("Schema introspected from filtered database objects".into()),
2933                    include_schema: true,
2934                    schema_name: "Schema".into(),
2935                    use_pub: true,
2936                    field_casing,
2937                },
2938            );
2939
2940            result.schema_code = generated.code;
2941            result.table_count = generated.tables.len();
2942            result.index_count = generated.indexes.len();
2943            result.view_count = ddl.views.list().len();
2944            result.warnings = generated.warnings;
2945        }
2946        (Snapshot::Postgres(snap), Dialect::Postgresql) => {
2947            use drizzle_migrations::postgres::PostgresDDL;
2948            use drizzle_migrations::postgres::codegen::{
2949                CodegenOptions, FieldCasing, generate_rust_schema,
2950            };
2951
2952            let field_casing = match introspect_casing {
2953                Some(IntrospectCasing::Camel) => FieldCasing::Camel,
2954                Some(IntrospectCasing::Preserve) => FieldCasing::Preserve,
2955                None => FieldCasing::Snake,
2956            };
2957
2958            let ddl = PostgresDDL::from_entities(snap.ddl.clone());
2959            let generated = generate_rust_schema(
2960                &ddl,
2961                &CodegenOptions {
2962                    module_doc: Some("Schema introspected from filtered database objects".into()),
2963                    include_schema: true,
2964                    schema_name: "Schema".into(),
2965                    use_pub: true,
2966                    field_casing,
2967                },
2968            );
2969
2970            result.schema_code = generated.code;
2971            result.table_count = generated.tables.len();
2972            result.index_count = generated.indexes.len();
2973            result.view_count = generated.views.len();
2974            result.warnings = generated.warnings;
2975        }
2976        _ => {}
2977    }
2978}
2979
2980/// Introspect a database and generate schema code
2981fn introspect_database(
2982    credentials: &Credentials,
2983    dialect: Dialect,
2984) -> Result<IntrospectResult, CliError> {
2985    match dialect {
2986        Dialect::Sqlite | Dialect::Turso => introspect_sqlite_dialect(credentials),
2987        Dialect::Postgresql => introspect_postgres_dialect(credentials),
2988    }
2989}
2990
/// Introspect SQLite-family databases
///
/// Dispatches on the credential variant and on the drivers compiled into this
/// binary:
///
/// * `Credentials::Sqlite` uses the `rusqlite` driver.
/// * `Credentials::Turso` uses the `libsql` driver when `is_local_libsql(url)`
///   is true and the `turso` driver otherwise.
///
/// # Errors
///
/// Returns [`CliError::MissingDriver`] when the driver needed for the given
/// credentials was not compiled in, [`CliError::Other`] when the credentials
/// are neither SQLite nor Turso credentials, and otherwise whatever the
/// selected introspection routine returns.
fn introspect_sqlite_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => introspect_rusqlite(path),

        // Without the rusqlite driver a SQLite file cannot be opened at all.
        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    introspect_libsql_local(url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    introspect_turso(url, auth_token.as_deref())
                }
                #[cfg(not(feature = "turso"))]
                {
                    // Mark the binding as used when the turso driver is compiled out.
                    let _ = auth_token;
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        // Neither libsql nor turso is compiled in: Turso credentials are unusable.
        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // Any other credential kind cannot be used for a SQLite-family dialect.
        _ => Err(CliError::Other(
            "SQLite introspection requires sqlite path or turso credentials".into(),
        )),
    }
}
3044
3045/// Introspect `PostgreSQL` databases
3046fn introspect_postgres_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
3047    match credentials {
3048        Credentials::Postgres(creds) => {
3049            let _ = creds;
3050            core::cfg_select! {
3051                feature = "postgres-sync" => introspect_postgres_sync(creds),
3052                feature = "tokio-postgres" => introspect_postgres_async(creds),
3053                _ => Err(CliError::MissingDriver {
3054                    dialect: "PostgreSQL",
3055                    feature: "postgres-sync or tokio-postgres",
3056                }),
3057            }
3058        }
3059
3060        _ => Err(CliError::Other(
3061            "PostgreSQL introspection requires postgres credentials".into(),
3062        )),
3063    }
3064}
3065
3066// ============================================================================
3067// SQLite Introspection (rusqlite)
3068// ============================================================================
3069
/// Raw introspection rows gathered from a SQLite-family database, before they
/// are turned into DDL entities by `build_sqlite_ddl`.
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
struct SqliteRawData {
    /// `(table name, CREATE statement text if available)` for each table.
    tables: Vec<(String, Option<String>)>,
    /// Column rows for tables; the view-column queries append their rows here too.
    raw_columns: Vec<drizzle_migrations::sqlite::introspect::RawColumnInfo>,
    /// Index rows collected table by table.
    all_indexes: Vec<drizzle_migrations::sqlite::introspect::RawIndexInfo>,
    /// Column rows of every index in `all_indexes`.
    all_index_columns: Vec<drizzle_migrations::sqlite::introspect::RawIndexColumn>,
    /// Foreign-key rows collected table by table.
    all_fks: Vec<drizzle_migrations::sqlite::introspect::RawForeignKey>,
    /// Name and SQL text of each view.
    all_views: Vec<drizzle_migrations::sqlite::introspect::RawViewInfo>,
}
3079
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
impl SqliteRawData {
    /// Creates a container in which every collection is empty, ready to be
    /// filled by the driver-specific query helpers.
    const fn empty() -> Self {
        Self {
            all_views: Vec::new(),
            all_fks: Vec::new(),
            all_index_columns: Vec::new(),
            all_indexes: Vec::new(),
            raw_columns: Vec::new(),
            tables: Vec::new(),
        }
    }
}
3093
/// Loads the table list and the table columns through rusqlite, replacing
/// `raw.tables` and `raw.raw_columns`.
///
/// Rows that fail to decode are skipped rather than reported.
///
/// # Errors
///
/// Returns [`CliError::Other`] if either query cannot be prepared or started.
#[cfg(feature = "rusqlite")]
fn query_rusqlite_tables_and_columns(
    conn: &rusqlite::Connection,
    raw: &mut SqliteRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::sqlite::introspect::{RawColumnInfo, queries};

    let mut table_stmt = conn
        .prepare(queries::TABLES_QUERY)
        .map_err(|e| CliError::Other(format!("Failed to prepare tables query: {e}")))?;
    let table_rows = table_stmt
        .query_map([], |row| {
            let name: String = row.get(0)?;
            let sql: Option<String> = row.get(1)?;
            Ok((name, sql))
        })
        .map_err(|e| CliError::Other(e.to_string()))?;
    raw.tables = table_rows.filter_map(Result::ok).collect();

    let mut column_stmt = conn
        .prepare(queries::COLUMNS_QUERY)
        .map_err(|e| CliError::Other(format!("Failed to prepare columns query: {e}")))?;
    let column_rows = column_stmt
        .query_map([], |row| {
            Ok(RawColumnInfo {
                table: row.get(0)?,
                cid: row.get(1)?,
                name: row.get(2)?,
                column_type: row.get(3)?,
                not_null: row.get(4)?,
                default_value: row.get(5)?,
                pk: row.get(6)?,
                hidden: row.get(7)?,
                sql: row.get(8)?,
            })
        })
        .map_err(|e| CliError::Other(e.to_string()))?;
    raw.raw_columns = column_rows.filter_map(Result::ok).collect();

    Ok(())
}
3135
/// Collects indexes, index columns and foreign keys for every table already
/// listed in `raw.tables`, appending them to `raw`.
///
/// Queries that cannot be prepared and rows that fail to decode are skipped.
///
/// # Errors
///
/// Returns [`CliError::Other`] if an index-list statement was prepared but
/// its query could not be started.
#[cfg(feature = "rusqlite")]
fn query_rusqlite_per_table(
    conn: &rusqlite::Connection,
    raw: &mut SqliteRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::sqlite::introspect::{
        RawForeignKey, RawIndexColumn, RawIndexInfo, queries,
    };

    // Work from an owned copy of the table names while the other fields of
    // `raw` are being extended.
    let table_names: Vec<String> = raw.tables.iter().map(|(name, _)| name.clone()).collect();

    for table_name in &table_names {
        if let Ok(mut index_stmt) = conn.prepare(&queries::indexes_query(table_name)) {
            let index_rows = index_stmt
                .query_map([], |row| {
                    Ok(RawIndexInfo {
                        table: table_name.clone(),
                        name: row.get(1)?,
                        unique: row.get::<_, i32>(2)? != 0,
                        origin: row.get(3)?,
                        partial: row.get::<_, i32>(4)? != 0,
                    })
                })
                .map_err(|e| CliError::Other(e.to_string()))?;
            let table_indexes: Vec<RawIndexInfo> = index_rows.filter_map(Result::ok).collect();

            for index in &table_indexes {
                if let Ok(mut column_stmt) = conn.prepare(&queries::index_info_query(&index.name))
                {
                    let column_rows = column_stmt.query_map([], |row| {
                        Ok(RawIndexColumn {
                            index_name: index.name.clone(),
                            seqno: row.get(0)?,
                            cid: row.get(1)?,
                            name: row.get(2)?,
                            desc: row.get::<_, i32>(3)? != 0,
                            coll: row.get(4)?,
                            key: row.get::<_, i32>(5)? != 0,
                        })
                    });
                    if let Ok(column_rows) = column_rows {
                        raw.all_index_columns
                            .extend(column_rows.filter_map(Result::ok));
                    }
                }
            }
            raw.all_indexes.extend(table_indexes);
        }

        if let Ok(mut fk_stmt) = conn.prepare(&queries::foreign_keys_query(table_name)) {
            let fk_rows = fk_stmt.query_map([], |row| {
                Ok(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0)?,
                    seq: row.get(1)?,
                    to_table: row.get(2)?,
                    from_column: row.get(3)?,
                    to_column: row.get(4)?,
                    on_update: row.get(5)?,
                    on_delete: row.get(6)?,
                    r#match: row.get(7)?,
                })
            });
            if let Ok(fk_rows) = fk_rows {
                raw.all_fks.extend(fk_rows.filter_map(Result::ok));
            }
        }
    }

    Ok(())
}
3206
/// Appends the views to `raw.all_views` and the view columns to
/// `raw.raw_columns`.
///
/// This step never fails: a query that cannot be prepared or started, and any
/// row that fails to decode, is skipped.
#[cfg(feature = "rusqlite")]
fn query_rusqlite_views_and_view_columns(conn: &rusqlite::Connection, raw: &mut SqliteRawData) {
    use drizzle_migrations::sqlite::introspect::{RawColumnInfo, RawViewInfo, queries};

    if let Ok(mut view_stmt) = conn.prepare(queries::VIEWS_QUERY) {
        let view_rows = view_stmt.query_map([], |row| {
            Ok(RawViewInfo {
                name: row.get(0)?,
                sql: row.get(1)?,
            })
        });
        if let Ok(view_rows) = view_rows {
            raw.all_views.extend(view_rows.filter_map(Result::ok));
        }
    }

    if let Ok(mut column_stmt) = conn.prepare(queries::VIEW_COLUMNS_QUERY) {
        let column_rows = column_stmt.query_map([], |row| {
            Ok(RawColumnInfo {
                table: row.get(0)?,
                cid: row.get(1)?,
                name: row.get(2)?,
                column_type: row.get(3)?,
                not_null: row.get::<_, i32>(4)? != 0,
                default_value: row.get(5)?,
                pk: row.get(6)?,
                hidden: row.get(7)?,
                sql: row.get(8)?,
            })
        });
        if let Ok(column_rows) = column_rows {
            raw.raw_columns.extend(column_rows.filter_map(Result::ok));
        }
    }
}
3240
/// Runs every rusqlite introspection query against `conn` and returns the
/// combined raw rows.
///
/// Tables and columns are loaded first because the per-table queries iterate
/// over the table list; views and view columns are loaded last.
///
/// # Errors
///
/// Propagates the error of the table/column step or of the per-table step.
#[cfg(feature = "rusqlite")]
fn query_rusqlite_raw(conn: &rusqlite::Connection) -> Result<SqliteRawData, CliError> {
    let mut collected = SqliteRawData::empty();

    query_rusqlite_tables_and_columns(conn, &mut collected)?;
    query_rusqlite_per_table(conn, &mut collected)?;
    query_rusqlite_views_and_view_columns(conn, &mut collected);

    Ok(collected)
}
3249
/// Converts raw SQLite introspection rows into a [`drizzle_migrations::sqlite::SQLiteDDL`].
///
/// Columns, primary keys, indexes, foreign keys and unique constraints are
/// derived from the raw rows by the `process_*` helpers. Each table's
/// `STRICT` / `WITHOUT ROWID` flags are read from the table options of its
/// `CREATE TABLE` statement. A view whose SQL cannot be parsed is still added,
/// with its `error` field set instead of a definition.
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
fn build_sqlite_ddl(raw: SqliteRawData) -> drizzle_migrations::sqlite::SQLiteDDL {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        introspect::{
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes,
        },
    };
    use std::collections::{HashMap, HashSet};

    /// Returns `(strict, without_rowid)` for a `CREATE TABLE` statement.
    ///
    /// Only the text after the final `)` is inspected, because that is where
    /// SQLite table options live. Searching the whole statement would flag a
    /// table as STRICT merely because a column is called `strict` or
    /// `strict_mode`. Options are matched as whole words, case-insensitively,
    /// and `WITHOUT ROWID` may be separated by any amount of whitespace.
    ///
    /// Known limit: a trailing SQL comment containing `)` after the options
    /// hides them from this check.
    fn table_options(sql: &str) -> (bool, bool) {
        let tail = sql.rsplit_once(')').map_or("", |(_, tail)| tail);
        let words: Vec<&str> = tail
            .split(|c: char| c.is_whitespace() || c == ',' || c == ';')
            .filter(|word| !word.is_empty())
            .collect();

        let strict = words.iter().any(|word| word.eq_ignore_ascii_case("STRICT"));
        let without_rowid = words.windows(2).any(|pair| {
            pair[0].eq_ignore_ascii_case("WITHOUT") && pair[1].eq_ignore_ascii_case("ROWID")
        });
        (strict, without_rowid)
    }

    // Table name -> CREATE statement, for the tables that have one.
    let table_sql_map: HashMap<String, String> = raw
        .tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs that take part in a primary key.
    let pk_columns: HashSet<(String, String)> = raw
        .raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) =
        process_columns(&raw.raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&raw.all_indexes, &raw.all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&raw.all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&raw.all_indexes, &raw.all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &raw.tables {
        let mut table = Table::new(table_name.clone());
        if let Some(sql) = table_sql {
            let (strict, without_rowid) = table_options(sql);
            table.strict = strict;
            table.without_rowid = without_rowid;
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }
    for u in uniques {
        ddl.uniques.push(u);
    }

    for v in raw.all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    ddl
}
3325
/// Introspects the SQLite database file at `path` through rusqlite.
///
/// Produces generated schema code, entity counts, codegen warnings and a
/// snapshot built from the introspected entities. `snapshot_path` is left
/// empty.
///
/// # Errors
///
/// Returns [`CliError::ConnectionError`] if the database cannot be opened, or
/// the error of the raw introspection queries.
#[cfg(feature = "rusqlite")]
fn introspect_rusqlite(path: &str) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::codegen::{CodegenOptions, FieldCasing, generate_rust_schema};

    let conn = match rusqlite::Connection::open(path) {
        Ok(conn) => conn,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{path}': {e}"
            )));
        }
    };

    let ddl = build_sqlite_ddl(query_rusqlite_raw(&conn)?);

    let generated = generate_rust_schema(
        &ddl,
        &CodegenOptions {
            module_doc: Some(format!("Schema introspected from {path}")),
            include_schema: true,
            schema_name: "Schema".to_string(),
            use_pub: true,
            field_casing: FieldCasing::default(),
        },
    );

    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot: Snapshot::Sqlite(sqlite_snapshot),
        snapshot_path: std::path::PathBuf::new(),
    })
}
3363
3364// ============================================================================
3365// LibSQL Introspection (local)
3366// ============================================================================
3367
/// Introspects a local LibSQL database at `path`.
///
/// Builds a current-thread tokio runtime and blocks on the async
/// introspection routine, passing no auth token.
///
/// # Errors
///
/// Returns [`CliError::Other`] if the runtime cannot be created, or the error
/// of the async introspection.
#[cfg(feature = "libsql")]
fn introspect_libsql_local(path: &str) -> Result<IntrospectResult, CliError> {
    let runtime = match tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
    {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {e}"
            )));
        }
    };

    runtime.block_on(introspect_libsql_inner(path, None))
}
3377
/// Loads the table list and the table columns over a libsql connection.
///
/// Returns `(tables, raw_columns)`, where each table is a `(name, sql)` pair.
/// Decoding is lenient: a value that cannot be read falls back to its default
/// (or `None`), and reading stops at the first row that cannot be fetched.
///
/// # Errors
///
/// Returns [`CliError::Other`] if either query cannot be started.
#[cfg(any(feature = "libsql", feature = "turso"))]
async fn query_libsql_tables_and_columns(
    conn: &libsql::Connection,
) -> Result<
    (
        Vec<(String, Option<String>)>,
        Vec<drizzle_migrations::sqlite::introspect::RawColumnInfo>,
    ),
    CliError,
> {
    use drizzle_migrations::sqlite::introspect::{RawColumnInfo, queries};

    // Lenient decoder for one row of the columns query.
    let decode_column = |row: &libsql::Row| RawColumnInfo {
        table: row.get(0).unwrap_or_default(),
        cid: row.get(1).unwrap_or(0),
        name: row.get(2).unwrap_or_default(),
        column_type: row.get(3).unwrap_or_default(),
        not_null: row.get::<i32>(4).unwrap_or(0) != 0,
        default_value: row.get(5).ok(),
        pk: row.get(6).unwrap_or(0),
        hidden: row.get(7).unwrap_or(0),
        sql: row.get(8).ok(),
    };

    let mut table_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {e}")))?;

    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Ok(Some(row)) = table_rows.next().await {
        let table_name: String = row.get(0).unwrap_or_default();
        let create_sql: Option<String> = row.get(1).ok();
        tables.push((table_name, create_sql));
    }

    let mut column_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {e}")))?;

    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Ok(Some(row)) = column_rows.next().await {
        raw_columns.push(decode_column(&row));
    }

    Ok((tables, raw_columns))
}
3424
/// Collects indexes, index columns and foreign keys for each table in
/// `tables` over a libsql connection.
///
/// Returns `(indexes, index_columns, foreign_keys)`. The step is best-effort:
/// a query that fails is skipped, a value that cannot be read falls back to
/// its default (or `None`), and reading a result set stops at the first row
/// that cannot be fetched.
#[cfg(any(feature = "libsql", feature = "turso"))]
async fn query_libsql_per_table(
    conn: &libsql::Connection,
    tables: &[(String, Option<String>)],
) -> (
    Vec<drizzle_migrations::sqlite::introspect::RawIndexInfo>,
    Vec<drizzle_migrations::sqlite::introspect::RawIndexColumn>,
    Vec<drizzle_migrations::sqlite::introspect::RawForeignKey>,
) {
    use drizzle_migrations::sqlite::introspect::{
        RawForeignKey, RawIndexColumn, RawIndexInfo, queries,
    };

    // Lenient decoders for the rows of the three per-table queries.
    let decode_index = |table: &String, row: &libsql::Row| RawIndexInfo {
        table: table.clone(),
        name: row.get(1).unwrap_or_default(),
        unique: row.get::<i32>(2).unwrap_or(0) != 0,
        origin: row.get(3).unwrap_or_default(),
        partial: row.get::<i32>(4).unwrap_or(0) != 0,
    };
    let decode_index_column = |index_name: &String, row: &libsql::Row| RawIndexColumn {
        index_name: index_name.clone(),
        seqno: row.get(0).unwrap_or(0),
        cid: row.get(1).unwrap_or(0),
        name: row.get(2).ok(),
        desc: row.get::<i32>(3).unwrap_or(0) != 0,
        coll: row.get(4).unwrap_or_default(),
        key: row.get::<i32>(5).unwrap_or(0) != 0,
    };
    let decode_foreign_key = |table: &String, row: &libsql::Row| RawForeignKey {
        table: table.clone(),
        id: row.get(0).unwrap_or(0),
        seq: row.get(1).unwrap_or(0),
        to_table: row.get(2).unwrap_or_default(),
        from_column: row.get(3).unwrap_or_default(),
        to_column: row.get(4).unwrap_or_default(),
        on_update: row.get(5).unwrap_or_default(),
        on_delete: row.get(6).unwrap_or_default(),
        r#match: row.get(7).unwrap_or_default(),
    };

    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();

    for (table_name, _) in tables {
        if let Ok(mut index_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(index_row)) = index_rows.next().await {
                let index = decode_index(table_name, &index_row);

                // The columns of an index are fetched before the index itself
                // is recorded.
                if let Ok(mut column_rows) = conn
                    .query(&queries::index_info_query(&index.name), ())
                    .await
                {
                    while let Ok(Some(column_row)) = column_rows.next().await {
                        all_index_columns.push(decode_index_column(&index.name, &column_row));
                    }
                }

                all_indexes.push(index);
            }
        }

        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(fk_row)) = fk_rows.next().await {
                all_fks.push(decode_foreign_key(table_name, &fk_row));
            }
        }
    }

    (all_indexes, all_index_columns, all_fks)
}
3495
/// Loads the views over a libsql connection and appends the view columns to
/// `raw_columns`.
///
/// Returns the views found. The step is best-effort: a query that fails is
/// skipped, a value that cannot be read falls back to its default (or
/// `None`), and reading stops at the first row that cannot be fetched.
#[cfg(any(feature = "libsql", feature = "turso"))]
async fn query_libsql_views_and_view_columns(
    conn: &libsql::Connection,
    raw_columns: &mut Vec<drizzle_migrations::sqlite::introspect::RawColumnInfo>,
) -> Vec<drizzle_migrations::sqlite::introspect::RawViewInfo> {
    use drizzle_migrations::sqlite::introspect::{RawColumnInfo, RawViewInfo, queries};

    // Lenient decoder for one row of the view-columns query.
    let decode_column = |row: &libsql::Row| RawColumnInfo {
        table: row.get(0).unwrap_or_default(),
        cid: row.get(1).unwrap_or(0),
        name: row.get(2).unwrap_or_default(),
        column_type: row.get(3).unwrap_or_default(),
        not_null: row.get::<i32>(4).unwrap_or(0) != 0,
        default_value: row.get(5).ok(),
        pk: row.get(6).unwrap_or(0),
        hidden: row.get(7).unwrap_or(0),
        sql: row.get(8).ok(),
    };

    let mut all_views: Vec<RawViewInfo> = Vec::new();
    if let Ok(mut view_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = view_rows.next().await {
            let view_name: String = row.get(0).unwrap_or_default();
            let view_sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo {
                name: view_name,
                sql: view_sql,
            });
        }
    }

    if let Ok(mut column_rows) = conn.query(queries::VIEW_COLUMNS_QUERY, ()).await {
        while let Ok(Some(row)) = column_rows.next().await {
            raw_columns.push(decode_column(&row));
        }
    }

    all_views
}
3530
/// Runs the full libsql introspection query sequence and bundles the results.
///
/// Tables and their columns are read first; this is the only step whose error
/// is propagated. Per-table indexes and foreign keys follow, and finally the
/// views, whose columns are appended to the column list from the first step.
#[cfg(any(feature = "libsql", feature = "turso"))]
async fn query_libsql_raw(conn: &libsql::Connection) -> Result<SqliteRawData, CliError> {
    let (tables, mut raw_columns) = query_libsql_tables_and_columns(conn).await?;
    let per_table = query_libsql_per_table(conn, &tables).await;
    let all_views = query_libsql_views_and_view_columns(conn, &mut raw_columns).await;

    let (all_indexes, all_index_columns, all_fks) = per_table;
    let collected = SqliteRawData {
        tables,
        raw_columns,
        all_indexes,
        all_index_columns,
        all_fks,
        all_views,
    };
    Ok(collected)
}
3546
/// Introspects a local libsql database file and generates Rust schema code for it.
///
/// Opens the database at `path`, collects the raw schema data, builds the
/// SQLite DDL, runs code generation and records every DDL entity in a fresh
/// snapshot. `_auth_token` is accepted but not used here.
///
/// # Errors
///
/// Returns `CliError::ConnectionError` when the database cannot be opened or
/// connected to, and forwards any error from the raw schema queries.
#[cfg(feature = "libsql")]
async fn introspect_libsql_inner(
    path: &str,
    _auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::codegen::{CodegenOptions, FieldCasing, generate_rust_schema};

    // The database handle stays bound for the whole function, as before.
    let database = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{path}': {e}"))
        })?;
    let conn = database
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let ddl = build_sqlite_ddl(query_libsql_raw(&conn).await?);

    let generated = generate_rust_schema(
        &ddl,
        &CodegenOptions {
            module_doc: Some(format!("Schema introspected from {path}")),
            include_schema: true,
            schema_name: String::from("Schema"),
            use_pub: true,
            field_casing: FieldCasing::default(),
        },
    );

    let mut entities = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    ddl.to_entities().into_iter().for_each(|entity| {
        entities.add_entity(entity);
    });

    let table_count = generated.tables.len();
    let index_count = generated.indexes.len();
    let view_count = ddl.views.len();

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count,
        index_count,
        view_count,
        warnings: generated.warnings,
        snapshot: Snapshot::Sqlite(entities),
        // Left empty here; nothing in this function assigns a real path.
        snapshot_path: std::path::PathBuf::new(),
    })
}
3594
3595// ============================================================================
3596// Turso Introspection (remote)
3597// ============================================================================
3598
/// Blocking wrapper around [`introspect_turso_inner`].
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async introspection.
///
/// # Errors
///
/// Returns `CliError::Other` if the runtime cannot be created, otherwise
/// whatever the async introspection returns.
#[cfg(feature = "turso")]
fn introspect_turso(url: &str, auth_token: Option<&str>) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = runtime_builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;

    let introspection = introspect_turso_inner(url, auth_token);
    runtime.block_on(introspection)
}
3608
/// Introspects a remote Turso database and generates Rust schema code for it.
///
/// Connects to `url` with `auth_token` (an empty string when `None`), collects
/// the raw schema data, builds the SQLite DDL, runs code generation and
/// records every DDL entity in a fresh snapshot.
///
/// # Errors
///
/// Returns `CliError::ConnectionError` when the remote database cannot be
/// built or connected to, and forwards any error from the raw schema queries.
#[cfg(feature = "turso")]
async fn introspect_turso_inner(
    url: &str,
    auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::codegen::{CodegenOptions, FieldCasing, generate_rust_schema};

    let token = auth_token.unwrap_or_default().to_owned();
    let remote = libsql::Builder::new_remote(url.to_owned(), token);

    // The database handle stays bound for the whole function, as before.
    let database = remote.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{url}': {e}"))
    })?;
    let conn = database
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let ddl = build_sqlite_ddl(query_libsql_raw(&conn).await?);

    let generated = generate_rust_schema(
        &ddl,
        &CodegenOptions {
            module_doc: Some(format!("Schema introspected from Turso: {url}")),
            include_schema: true,
            schema_name: String::from("Schema"),
            use_pub: true,
            field_casing: FieldCasing::default(),
        },
    );

    let mut entities = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    ddl.to_entities().into_iter().for_each(|entity| {
        entities.add_entity(entity);
    });

    let table_count = generated.tables.len();
    let index_count = generated.indexes.len();
    let view_count = ddl.views.len();

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count,
        index_count,
        view_count,
        warnings: generated.warnings,
        snapshot: Snapshot::Sqlite(entities),
        // Left empty here; nothing in this function assigns a real path.
        snapshot_path: std::path::PathBuf::new(),
    })
}
3656
3657// ============================================================================
3658// PostgreSQL Introspection
3659// ============================================================================
3660
/// Introspects a PostgreSQL database through the blocking `postgres` client.
///
/// Connects without TLS using the URL derived from `creds`, collects the raw
/// catalog data, converts it to a DDL collection and packages the result.
///
/// # Errors
///
/// Returns `CliError::ConnectionError` when the connection fails and forwards
/// any error raised while querying the catalog.
#[cfg(feature = "postgres-sync")]
fn introspect_postgres_sync(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    let connection_url = creds.connection_url();

    let mut pg = postgres::Client::connect(&connection_url, postgres::NoTls)
        .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;

    let ddl = query_postgres_sync_raw(&mut pg).map(build_postgres_ddl)?;
    let result = finalize_postgres_introspection(&ddl, &connection_url);
    Ok(result)
}
3672
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
impl PostgresRawData {
    /// Creates a value in which every collection is an empty `Vec`.
    ///
    /// The query helpers start from this and overwrite each field with the
    /// rows they read.
    const fn empty() -> Self {
        PostgresRawData {
            // Filled by the "core" queries.
            schemas: Vec::new(),
            tables: Vec::new(),
            columns: Vec::new(),
            // Filled by the "codegen meta" queries.
            enums: Vec::new(),
            sequences: Vec::new(),
            views: Vec::new(),
            // Filled by the "constraints" queries.
            indexes: Vec::new(),
            foreign_keys: Vec::new(),
            primary_keys: Vec::new(),
            uniques: Vec::new(),
            checks: Vec::new(),
            // Filled by the "security" queries.
            roles: Vec::new(),
            policies: Vec::new(),
        }
    }
}
3693
/// Collects all raw PostgreSQL introspection data through the blocking client.
///
/// Starts from an empty [`PostgresRawData`] and runs the four query groups in
/// order: core, codegen meta, constraints, security.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by any query group.
#[cfg(feature = "postgres-sync")]
fn query_postgres_sync_raw(client: &mut postgres::Client) -> Result<PostgresRawData, CliError> {
    let mut collected = PostgresRawData::empty();

    query_pg_sync_core(client, &mut collected)?;
    query_pg_sync_codegen_meta(client, &mut collected)?;
    query_pg_sync_constraints(client, &mut collected)?;
    query_pg_sync_security(client, &mut collected)?;

    Ok(collected)
}
3703
/// Loads schemas, tables and columns through the blocking client into `raw`.
///
/// Each query result replaces the matching field of `raw` as soon as it has
/// been read, so fields filled before a failing query keep their new contents.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
///
/// # Panics
///
/// NOTE(review): cells are read with `Row::get`, which the driver documents as
/// panicking on a type mismatch or unexpected NULL — confirm against the queries.
#[cfg(feature = "postgres-sync")]
fn query_pg_sync_core(
    client: &mut postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{RawColumnInfo, RawTableInfo, queries};

    let schema_rows = client
        .query(queries::SCHEMAS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query schemas: {e}")))?;
    raw.schemas = schema_rows
        .iter()
        .map(|r| {
            let name: String = r.get(0);
            RawSchemaInfo { name }
        })
        .collect();

    let table_rows = client
        .query(queries::TABLES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query tables: {e}")))?;
    raw.tables = table_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let is_rls_enabled: bool = r.get(2);
            RawTableInfo {
                schema,
                name,
                is_rls_enabled,
            }
        })
        .collect();

    let column_rows = client
        .query(queries::COLUMNS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query columns: {e}")))?;
    raw.columns = column_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let column_type: String = r.get(3);
            let type_schema: Option<String> = r.get(4);
            let not_null: bool = r.get(5);
            let default_value: Option<String> = r.get(6);
            let is_identity: bool = r.get(7);
            let identity_type: Option<String> = r.get(8);
            let is_generated: bool = r.get(9);
            let generated_expression: Option<String> = r.get(10);
            let ordinal_position: i32 = r.get(11);
            RawColumnInfo {
                schema,
                table,
                name,
                column_type,
                type_schema,
                not_null,
                default_value,
                is_identity,
                identity_type,
                is_generated,
                generated_expression,
                ordinal_position,
            }
        })
        .collect();

    Ok(())
}
3753
/// Loads enums, sequences and views through the blocking client into `raw`.
///
/// The views query receives a single parameter, an optional list of schema
/// names, which is always passed as `None` here.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
#[cfg(feature = "postgres-sync")]
fn query_pg_sync_codegen_meta(
    client: &mut postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{
        RawEnumInfo, RawSequenceInfo, RawViewInfo, queries,
    };

    let enum_rows = client
        .query(queries::ENUMS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query enums: {e}")))?;
    raw.enums = enum_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let values: Vec<String> = r.get(2);
            RawEnumInfo {
                schema,
                name,
                values,
            }
        })
        .collect();

    let sequence_rows = client
        .query(queries::SEQUENCES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query sequences: {e}")))?;
    raw.sequences = sequence_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let data_type: Option<String> = r.get(2);
            let start_value: Option<String> = r.get(3);
            let min_value: Option<String> = r.get(4);
            let max_value: Option<String> = r.get(5);
            let increment: Option<String> = r.get(6);
            let cycle: Option<bool> = r.get(7);
            let cache_value: Option<String> = r.get(8);
            RawSequenceInfo {
                schema,
                name,
                data_type,
                start_value,
                min_value,
                max_value,
                increment,
                cycle,
                cache_value,
            }
        })
        .collect();

    // No schema filter is applied: the parameter is always NULL.
    let no_schema_filter: Option<Vec<String>> = None;
    let view_rows = client
        .query(queries::VIEWS_QUERY, &[&no_schema_filter])
        .map_err(|e| CliError::Other(format!("Failed to query views: {e}")))?;
    raw.views = view_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let definition: String = r.get(2);
            let is_materialized: bool = r.get(3);
            RawViewInfo {
                schema,
                name,
                definition,
                is_materialized,
            }
        })
        .collect();

    Ok(())
}
3806
/// Loads indexes, foreign keys, primary keys, unique constraints and check
/// constraints through the blocking client into `raw`.
///
/// Index column descriptors go through `parse_index_columns`, and the foreign
/// key action codes go through `pg_action_code_to_string`. Every index is
/// recorded with `concurrent: false`.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
#[cfg(feature = "postgres-sync")]
fn query_pg_sync_constraints(
    client: &mut postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{
        RawCheckInfo, RawForeignKeyInfo, RawIndexInfo, RawPrimaryKeyInfo, RawUniqueInfo,
        parse_index_columns, pg_action_code_to_string, queries,
    };

    let index_rows = client
        .query(queries::INDEXES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query indexes: {e}")))?;
    raw.indexes = index_rows
        .iter()
        .map(|r| {
            // Column 6 is read first, matching the established read order.
            let column_specs: Vec<String> = r.get(6);
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let is_unique: bool = r.get(3);
            let is_primary: bool = r.get(4);
            let method: String = r.get(5);
            let columns = parse_index_columns(column_specs);
            let where_clause: Option<String> = r.get(7);
            RawIndexInfo {
                schema,
                table,
                name,
                is_unique,
                is_primary,
                method,
                columns,
                where_clause,
                concurrent: false,
            }
        })
        .collect();

    let fk_rows = client
        .query(queries::FOREIGN_KEYS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {e}")))?;
    raw.foreign_keys = fk_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let columns: Vec<String> = r.get(3);
            let schema_to: String = r.get(4);
            let table_to: String = r.get(5);
            let columns_to: Vec<String> = r.get(6);
            let update_code: String = r.get(7);
            let on_update = pg_action_code_to_string(&update_code);
            let delete_code: String = r.get(8);
            let on_delete = pg_action_code_to_string(&delete_code);
            RawForeignKeyInfo {
                schema,
                table,
                name,
                columns,
                schema_to,
                table_to,
                columns_to,
                on_update,
                on_delete,
            }
        })
        .collect();

    let pk_rows = client
        .query(queries::PRIMARY_KEYS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query primary keys: {e}")))?;
    raw.primary_keys = pk_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let columns: Vec<String> = r.get(3);
            RawPrimaryKeyInfo {
                schema,
                table,
                name,
                columns,
            }
        })
        .collect();

    let unique_rows = client
        .query(queries::UNIQUES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {e}")))?;
    raw.uniques = unique_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let columns: Vec<String> = r.get(3);
            let nulls_not_distinct: bool = r.get(4);
            RawUniqueInfo {
                schema,
                table,
                name,
                columns,
                nulls_not_distinct,
            }
        })
        .collect();

    let check_rows = client
        .query(queries::CHECKS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query check constraints: {e}")))?;
    raw.checks = check_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let expression: String = r.get(3);
            RawCheckInfo {
                schema,
                table,
                name,
                expression,
            }
        })
        .collect();

    Ok(())
}
3893
/// Loads roles and row-level-security policies through the blocking client
/// into `raw`.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
#[cfg(feature = "postgres-sync")]
fn query_pg_sync_security(
    client: &mut postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{RawPolicyInfo, RawRoleInfo, queries};

    let role_rows = client
        .query(queries::ROLES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query roles: {e}")))?;
    raw.roles = role_rows
        .iter()
        .map(|r| {
            let name: String = r.get(0);
            let create_db: bool = r.get(1);
            let create_role: bool = r.get(2);
            let inherit: bool = r.get(3);
            RawRoleInfo {
                name,
                create_db,
                create_role,
                inherit,
            }
        })
        .collect();

    let policy_rows = client
        .query(queries::POLICIES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query policies: {e}")))?;
    raw.policies = policy_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let as_clause: String = r.get(3);
            let for_clause: String = r.get(4);
            let to: Vec<String> = r.get(5);
            let using: Option<String> = r.get(6);
            let with_check: Option<String> = r.get(7);
            RawPolicyInfo {
                schema,
                table,
                name,
                as_clause,
                for_clause,
                to,
                using,
                with_check,
            }
        })
        .collect();

    Ok(())
}
3931
/// Blocking wrapper around [`introspect_postgres_async_inner`].
///
/// Compiled only when `tokio-postgres` is enabled and `postgres-sync` is not.
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on the async introspection.
///
/// # Errors
///
/// Returns `CliError::Other` if the runtime cannot be created, otherwise
/// whatever the async introspection returns.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
fn introspect_postgres_async(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = runtime_builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;

    let introspection = introspect_postgres_async_inner(creds);
    runtime.block_on(introspection)
}
3941
/// Introspects a PostgreSQL database through `tokio-postgres`.
///
/// Connects without TLS using the URL derived from `creds`, spawns the
/// connection driver as a background task, collects the raw catalog data,
/// converts it to a DDL collection and packages the result.
///
/// # Errors
///
/// Returns `CliError::ConnectionError` when the connection fails and forwards
/// any error raised while querying the catalog. A failure of the background
/// connection task is only printed to stderr.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
async fn introspect_postgres_async_inner(
    creds: &PostgresCreds,
) -> Result<IntrospectResult, CliError> {
    let connection_url = creds.connection_url();

    let (client, connection) = tokio_postgres::connect(&connection_url, tokio_postgres::NoTls)
        .await
        .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;

    // The connection object must be polled for the client to make progress;
    // it runs as a detached task and only reports its own failure.
    let driver = async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    };
    tokio::spawn(driver);

    let ddl = build_postgres_ddl(query_postgres_async_raw(&client).await?);
    let result = finalize_postgres_introspection(&ddl, &connection_url);
    Ok(result)
}
3965
/// Collects all raw PostgreSQL introspection data through the async client.
///
/// Starts from an empty [`PostgresRawData`] and awaits the four query groups
/// one after another: core, codegen meta, constraints, security.
///
/// # Errors
///
/// Stops at, and returns, the first error produced by any query group.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
async fn query_postgres_async_raw(
    client: &tokio_postgres::Client,
) -> Result<PostgresRawData, CliError> {
    let mut collected = PostgresRawData::empty();

    query_pg_async_core(client, &mut collected).await?;
    query_pg_async_codegen_meta(client, &mut collected).await?;
    query_pg_async_constraints(client, &mut collected).await?;
    query_pg_async_security(client, &mut collected).await?;

    Ok(collected)
}
3977
/// Loads schemas, tables and columns through the async client into `raw`.
///
/// Each query result replaces the matching field of `raw` as soon as it has
/// been read, so fields filled before a failing query keep their new contents.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
///
/// # Panics
///
/// NOTE(review): cells are read with `Row::get`, which the driver documents as
/// panicking on a type mismatch or unexpected NULL — confirm against the queries.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
async fn query_pg_async_core(
    client: &tokio_postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{RawColumnInfo, RawTableInfo, queries};

    let schema_rows = client
        .query(queries::SCHEMAS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query schemas: {e}")))?;
    raw.schemas = schema_rows
        .iter()
        .map(|r| {
            let name: String = r.get(0);
            RawSchemaInfo { name }
        })
        .collect();

    let table_rows = client
        .query(queries::TABLES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {e}")))?;
    raw.tables = table_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let is_rls_enabled: bool = r.get(2);
            RawTableInfo {
                schema,
                name,
                is_rls_enabled,
            }
        })
        .collect();

    let column_rows = client
        .query(queries::COLUMNS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {e}")))?;
    raw.columns = column_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let column_type: String = r.get(3);
            let type_schema: Option<String> = r.get(4);
            let not_null: bool = r.get(5);
            let default_value: Option<String> = r.get(6);
            let is_identity: bool = r.get(7);
            let identity_type: Option<String> = r.get(8);
            let is_generated: bool = r.get(9);
            let generated_expression: Option<String> = r.get(10);
            let ordinal_position: i32 = r.get(11);
            RawColumnInfo {
                schema,
                table,
                name,
                column_type,
                type_schema,
                not_null,
                default_value,
                is_identity,
                identity_type,
                is_generated,
                generated_expression,
                ordinal_position,
            }
        })
        .collect();

    Ok(())
}
4030
/// Loads enums, sequences and views through the async client into `raw`.
///
/// The views query receives a single parameter, an optional list of schema
/// names, which is always passed as `None` here.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
async fn query_pg_async_codegen_meta(
    client: &tokio_postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{
        RawEnumInfo, RawSequenceInfo, RawViewInfo, queries,
    };

    let enum_rows = client
        .query(queries::ENUMS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query enums: {e}")))?;
    raw.enums = enum_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let values: Vec<String> = r.get(2);
            RawEnumInfo {
                schema,
                name,
                values,
            }
        })
        .collect();

    let sequence_rows = client
        .query(queries::SEQUENCES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query sequences: {e}")))?;
    raw.sequences = sequence_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let data_type: Option<String> = r.get(2);
            let start_value: Option<String> = r.get(3);
            let min_value: Option<String> = r.get(4);
            let max_value: Option<String> = r.get(5);
            let increment: Option<String> = r.get(6);
            let cycle: Option<bool> = r.get(7);
            let cache_value: Option<String> = r.get(8);
            RawSequenceInfo {
                schema,
                name,
                data_type,
                start_value,
                min_value,
                max_value,
                increment,
                cycle,
                cache_value,
            }
        })
        .collect();

    // No schema filter is applied: the parameter is always NULL.
    let no_schema_filter: Option<Vec<String>> = None;
    let view_rows = client
        .query(queries::VIEWS_QUERY, &[&no_schema_filter])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query views: {e}")))?;
    raw.views = view_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let name: String = r.get(1);
            let definition: String = r.get(2);
            let is_materialized: bool = r.get(3);
            RawViewInfo {
                schema,
                name,
                definition,
                is_materialized,
            }
        })
        .collect();

    Ok(())
}
4086
/// Loads indexes, foreign keys, primary keys, unique constraints and check
/// constraints through the async client into `raw`.
///
/// Index column descriptors go through `parse_index_columns`, and the foreign
/// key action codes go through `pg_action_code_to_string`. Every index is
/// recorded with `concurrent: false`.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
async fn query_pg_async_constraints(
    client: &tokio_postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{
        RawCheckInfo, RawForeignKeyInfo, RawIndexInfo, RawPrimaryKeyInfo, RawUniqueInfo,
        parse_index_columns, pg_action_code_to_string, queries,
    };

    let index_rows = client
        .query(queries::INDEXES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query indexes: {e}")))?;
    raw.indexes = index_rows
        .iter()
        .map(|r| {
            // Column 6 is read first, matching the established read order.
            let column_specs: Vec<String> = r.get(6);
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let is_unique: bool = r.get(3);
            let is_primary: bool = r.get(4);
            let method: String = r.get(5);
            let columns = parse_index_columns(column_specs);
            let where_clause: Option<String> = r.get(7);
            RawIndexInfo {
                schema,
                table,
                name,
                is_unique,
                is_primary,
                method,
                columns,
                where_clause,
                concurrent: false,
            }
        })
        .collect();

    let fk_rows = client
        .query(queries::FOREIGN_KEYS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {e}")))?;
    raw.foreign_keys = fk_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let columns: Vec<String> = r.get(3);
            let schema_to: String = r.get(4);
            let table_to: String = r.get(5);
            let columns_to: Vec<String> = r.get(6);
            let update_code: String = r.get(7);
            let on_update = pg_action_code_to_string(&update_code);
            let delete_code: String = r.get(8);
            let on_delete = pg_action_code_to_string(&delete_code);
            RawForeignKeyInfo {
                schema,
                table,
                name,
                columns,
                schema_to,
                table_to,
                columns_to,
                on_update,
                on_delete,
            }
        })
        .collect();

    let pk_rows = client
        .query(queries::PRIMARY_KEYS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query primary keys: {e}")))?;
    raw.primary_keys = pk_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let columns: Vec<String> = r.get(3);
            RawPrimaryKeyInfo {
                schema,
                table,
                name,
                columns,
            }
        })
        .collect();

    let unique_rows = client
        .query(queries::UNIQUES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {e}")))?;
    raw.uniques = unique_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let columns: Vec<String> = r.get(3);
            let nulls_not_distinct: bool = r.get(4);
            RawUniqueInfo {
                schema,
                table,
                name,
                columns,
                nulls_not_distinct,
            }
        })
        .collect();

    let check_rows = client
        .query(queries::CHECKS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query check constraints: {e}")))?;
    raw.checks = check_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let expression: String = r.get(3);
            RawCheckInfo {
                schema,
                table,
                name,
                expression,
            }
        })
        .collect();

    Ok(())
}
4178
/// Loads roles and row-level-security policies through the async client into
/// `raw`.
///
/// # Errors
///
/// Returns `CliError::Other` naming the query that failed.
#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
async fn query_pg_async_security(
    client: &tokio_postgres::Client,
    raw: &mut PostgresRawData,
) -> Result<(), CliError> {
    use drizzle_migrations::postgres::introspect::{RawPolicyInfo, RawRoleInfo, queries};

    let role_rows = client
        .query(queries::ROLES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query roles: {e}")))?;
    raw.roles = role_rows
        .iter()
        .map(|r| {
            let name: String = r.get(0);
            let create_db: bool = r.get(1);
            let create_role: bool = r.get(2);
            let inherit: bool = r.get(3);
            RawRoleInfo {
                name,
                create_db,
                create_role,
                inherit,
            }
        })
        .collect();

    let policy_rows = client
        .query(queries::POLICIES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query policies: {e}")))?;
    raw.policies = policy_rows
        .iter()
        .map(|r| {
            let schema: String = r.get(0);
            let table: String = r.get(1);
            let name: String = r.get(2);
            let as_clause: String = r.get(3);
            let for_clause: String = r.get(4);
            let to: Vec<String> = r.get(5);
            let using: Option<String> = r.get(6);
            let with_check: Option<String> = r.get(7);
            RawPolicyInfo {
                schema,
                table,
                name,
                as_clause,
                for_clause,
                to,
                using,
                with_check,
            }
        })
        .collect();

    Ok(())
}
4218
/// One row of the schema listing query: just the schema's name.
///
/// Kept local to this module; the names are later turned into DDL `Schema`
/// entries when the PostgreSQL DDL is built.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
#[derive(Debug, Clone)]
struct RawSchemaInfo {
    /// Schema name, read from column 0 of the schemas query.
    name: String,
}
4225
/// Everything read from a `PostgreSQL` catalog before any processing happens.
///
/// Both the `postgres-sync` and the `tokio-postgres` code paths fill this
/// structure (one blocking, one async); what happens to it afterwards is the
/// same for both.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
struct PostgresRawData {
    /// Schema names.
    schemas: Vec<RawSchemaInfo>,
    /// Tables, including their row-level-security flag.
    tables: Vec<drizzle_migrations::postgres::introspect::RawTableInfo>,
    /// Table columns.
    columns: Vec<drizzle_migrations::postgres::introspect::RawColumnInfo>,
    /// Enum types and their values.
    enums: Vec<drizzle_migrations::postgres::introspect::RawEnumInfo>,
    /// Sequences.
    sequences: Vec<drizzle_migrations::postgres::introspect::RawSequenceInfo>,
    /// Views, regular and materialized.
    views: Vec<drizzle_migrations::postgres::introspect::RawViewInfo>,
    /// Indexes.
    indexes: Vec<drizzle_migrations::postgres::introspect::RawIndexInfo>,
    /// Foreign key constraints.
    foreign_keys: Vec<drizzle_migrations::postgres::introspect::RawForeignKeyInfo>,
    /// Primary key constraints.
    primary_keys: Vec<drizzle_migrations::postgres::introspect::RawPrimaryKeyInfo>,
    /// Unique constraints.
    uniques: Vec<drizzle_migrations::postgres::introspect::RawUniqueInfo>,
    /// Check constraints.
    checks: Vec<drizzle_migrations::postgres::introspect::RawCheckInfo>,
    /// Roles.
    roles: Vec<drizzle_migrations::postgres::introspect::RawRoleInfo>,
    /// Row-level-security policies.
    policies: Vec<drizzle_migrations::postgres::introspect::RawPolicyInfo>,
}
4247
/// Turns raw introspection rows into a [`PostgresDDL`] collection.
///
/// Every raw list is run through its `process_*` function and the resulting
/// entities are pushed into the matching DDL collection; schema names are
/// wrapped with `Schema::new`. The sync and async paths both end up here.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
fn build_postgres_ddl(
    raw: PostgresRawData,
) -> drizzle_migrations::postgres::collection::PostgresDDL {
    use drizzle_migrations::postgres::{
        PostgresDDL,
        ddl::Schema,
        introspect::{
            process_check_constraints, process_columns, process_enums, process_foreign_keys,
            process_indexes, process_policies, process_primary_keys, process_roles,
            process_sequences, process_tables, process_unique_constraints, process_views,
        },
    };

    let PostgresRawData {
        schemas,
        tables,
        columns,
        enums,
        sequences,
        views,
        indexes,
        foreign_keys,
        primary_keys,
        uniques,
        checks,
        roles,
        policies,
    } = raw;

    let mut ddl = PostgresDDL::new();

    for schema in schemas {
        ddl.schemas.push(Schema::new(schema.name));
    }
    for enum_def in process_enums(&enums) {
        ddl.enums.push(enum_def);
    }
    for sequence in process_sequences(&sequences) {
        ddl.sequences.push(sequence);
    }
    for role in process_roles(&roles) {
        ddl.roles.push(role);
    }
    for policy in process_policies(&policies) {
        ddl.policies.push(policy);
    }
    for table in process_tables(&tables) {
        ddl.tables.push(table);
    }
    for column in process_columns(&columns) {
        ddl.columns.push(column);
    }
    for index in process_indexes(&indexes) {
        ddl.indexes.push(index);
    }
    for foreign_key in process_foreign_keys(&foreign_keys) {
        ddl.fks.push(foreign_key);
    }
    for primary_key in process_primary_keys(&primary_keys) {
        ddl.pks.push(primary_key);
    }
    for unique in process_unique_constraints(&uniques) {
        ddl.uniques.push(unique);
    }
    for check in process_check_constraints(&checks) {
        ddl.checks.push(check);
    }
    for view in process_views(&views) {
        ddl.views.push(view);
    }

    ddl
}
4307
/// Generates Rust schema code for `ddl` and wraps it, together with a
/// snapshot of every DDL entity, in an [`IntrospectResult`].
///
/// `url` is only used for the generated module doc line and is passed through
/// `mask_url` first. Table, index and view counts are taken from the DDL
/// collections. `snapshot_path` is left empty.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
fn finalize_postgres_introspection(
    ddl: &drizzle_migrations::postgres::collection::PostgresDDL,
    url: &str,
) -> IntrospectResult {
    use drizzle_migrations::postgres::codegen::{
        CodegenOptions, FieldCasing, generate_rust_schema,
    };

    let masked_url = mask_url(url);
    let generated = generate_rust_schema(
        ddl,
        &CodegenOptions {
            module_doc: Some(format!("Schema introspected from {}", masked_url)),
            include_schema: true,
            schema_name: String::from("Schema"),
            use_pub: true,
            field_casing: FieldCasing::default(),
        },
    );

    let mut pg_snapshot = drizzle_migrations::postgres::PostgresSnapshot::new();
    ddl.to_entities().into_iter().for_each(|entity| {
        pg_snapshot.add_entity(entity);
    });

    let table_count = ddl.tables.list().len();
    let index_count = ddl.indexes.list().len();
    let view_count = ddl.views.list().len();

    IntrospectResult {
        schema_code: generated.code,
        table_count,
        index_count,
        view_count,
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(pg_snapshot),
        snapshot_path: std::path::PathBuf::new(),
    }
}
4342
4343#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Replace the password in a `user:password@host` style connection URL with `****`.
///
/// The credentials section is taken to be everything between the optional `scheme://` prefix
/// and the first `@`; the password is everything after the *first* `:` in that section. This
/// means an empty user name (`scheme://:secret@host`) and passwords that themselves contain
/// `:` are masked in full.
///
/// URLs without an `@`, or without a `:` in the credentials section, are returned unchanged.
fn mask_url(url: &str) -> String {
    let Some(at) = url.find('@') else {
        return url.to_string();
    };

    // Only look for the scheme separator ahead of the `@`, so a `://` that appears later
    // (for example inside a query parameter) cannot push the start past the credentials.
    let before_at = &url[..at];
    let credentials_start = before_at.find("://").map_or(0, |pos| pos + 3);

    match before_at[credentials_start..].find(':') {
        // Keep everything up to and including the separator, then resume at the `@`.
        Some(offset) => format!(
            "{}****{}",
            &url[..=credentials_start + offset],
            &url[at..]
        ),
        None => url.to_string(),
    }
}
4355
4356#[cfg(test)]
4357mod tests {
4358    use super::*;
4359
4360    #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Postgres connection URL for integration tests: the `DATABASE_URL` environment variable
/// when it can be read, otherwise a fixed localhost default.
fn test_postgres_url() -> String {
    match std::env::var("DATABASE_URL") {
        Ok(url) => url,
        Err(_) => String::from("postgres://postgres:postgres@localhost:5432/drizzle_test"),
    }
}
4365
4366    #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4367    fn test_postgres_creds() -> crate::config::PostgresCreds {
4368        crate::config::PostgresCreds::Url(test_postgres_url().into_boxed_str())
4369    }
4370
4371    #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Build a name of the form `{prefix}_{pid}_{nanos}` from the current process id and the
/// nanoseconds elapsed since the Unix epoch.
///
/// # Panics
///
/// Panics if the system clock reports a time before the Unix epoch.
fn unique_pg_name(prefix: &str) -> String {
    let pid = std::process::id();
    let since_epoch = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("time went backwards");
    format!("{}_{}_{}", prefix, pid, since_epoch.as_nanos())
}
4381
/// `is_destructive_statement` must flag the DROP / TRUNCATE / `DROP CONSTRAINT` statements
/// listed below and must not flag the additive ones.
#[test]
fn destructive_statement_detection_covers_drop_variants() {
    let destructive = [
        "DROP TABLE users;",
        "DROP VIEW active_users;",
        "DROP TYPE status;",
        "DROP SCHEMA auth;",
        "DROP ROLE app_user;",
        "DROP POLICY users_rls_policy ON users;",
        "TRUNCATE users;",
        "ALTER TABLE users DROP CONSTRAINT users_email_key;",
    ];
    for sql in destructive {
        assert!(
            is_destructive_statement(sql),
            "should be destructive: {sql}"
        );
    }

    let additive = [
        "CREATE TABLE users(id INTEGER);",
        "ALTER TABLE users ADD COLUMN email text;",
    ];
    for sql in additive {
        assert!(
            !is_destructive_statement(sql),
            "should not be destructive: {sql}"
        );
    }
}
4402
/// Regression for drizzle-orm beta.19: migration identity is the folder name, not the
/// `created_at` timestamp, so two migrations sharing a wall-second must both apply.
#[cfg(feature = "rusqlite")]
#[test]
fn sqlite_migrations_run_both_when_created_at_collides() {
    use drizzle_migrations::{Migration, Migrations};

    // Both migrations carry the same `created_at`; only the tag, hash and SQL differ.
    let first_migration = || {
        Migration::with_hash(
            "20230331141203_first",
            "hash_one",
            1_680_271_923_000,
            vec!["CREATE TABLE created_at_dedupe_a (id INTEGER PRIMARY KEY)".to_string()],
        )
    };
    let second_migration = || {
        Migration::with_hash(
            "20230331141203_second",
            "hash_two",
            1_680_271_923_000,
            vec!["CREATE TABLE created_at_dedupe_b (id INTEGER PRIMARY KEY)".to_string()],
        )
    };

    let workdir = tempfile::tempdir().expect("tempdir");
    let sqlite_file = workdir.path().join("migrate.sqlite");
    let sqlite_path = sqlite_file.to_string_lossy().to_string();

    // First run: only the first migration exists locally.
    let initial = Migrations::new(vec![first_migration()], drizzle_types::Dialect::SQLite);
    let first_run =
        run_sqlite_migrations(&initial, &sqlite_path).expect("first migrate succeeds");
    assert_eq!(first_run.applied_count, 1);

    // Second run: the same migration plus a new one with a colliding timestamp.
    let extended = Migrations::new(
        vec![first_migration(), second_migration()],
        drizzle_types::Dialect::SQLite,
    );
    let second_run =
        run_sqlite_migrations(&extended, &sqlite_path).expect("second migrate succeeds");
    assert_eq!(
        second_run.applied_count, 1,
        "only the second (newly introduced by name) migration should apply"
    );

    let conn = rusqlite::Connection::open(&sqlite_file).expect("open sqlite");
    let count = |sql: &str| conn.query_row(sql, [], |row| row.get::<_, i64>(0));

    let tracked_rows =
        count("SELECT COUNT(*) FROM __drizzle_migrations").expect("count migrations rows");
    assert_eq!(tracked_rows, 2, "both migration records should be stored");

    let table_b_exists = count(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='created_at_dedupe_b'",
    )
    .expect("query sqlite_master");
    assert_eq!(
        table_b_exists, 1,
        "the new-name migration must execute even though created_at collides"
    );
}
4474
/// Running migrations against an empty migrations directory applies nothing but still
/// leaves a `__drizzle_migrations` table in the SQLite file.
#[cfg(feature = "rusqlite")]
#[test]
fn run_migrations_creates_metadata_table_with_no_local_migrations() {
    let workdir = tempfile::tempdir().expect("tempdir");
    let sqlite_file = workdir.path().join("empty.sqlite");
    let migrations_dir = workdir.path().join("migrations");
    std::fs::create_dir_all(&migrations_dir).expect("create migrations dir");

    let sqlite_path = sqlite_file.to_string_lossy().into_owned();
    let creds = crate::config::Credentials::Sqlite {
        path: sqlite_path.into_boxed_str(),
    };

    let outcome = run_migrations(
        &creds,
        crate::config::Dialect::Sqlite,
        &migrations_dir,
        "__drizzle_migrations",
        "drizzle",
    )
    .expect("run migrations");
    assert_eq!(outcome.applied_count, 0);

    let conn = rusqlite::Connection::open(&sqlite_file).expect("open sqlite");
    let lookup = conn.query_row(
        "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='__drizzle_migrations'",
        [],
        |row| row.get::<_, i64>(0),
    );
    let exists = lookup.expect("check metadata table");
    assert_eq!(exists, 1, "migrations metadata table should be created");
}
4507
/// `validate_init_metadata` accepts zero or one local migration against an empty database,
/// and rejects multiple local migrations or a database that already records a migration.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres"
))]
#[test]
fn validate_init_metadata_matches_drizzle_orm_semantics() {
    use drizzle_migrations::{Migration, Migrations};
    use drizzle_types::Dialect as BaseDialect;

    // Fixtures: no local migrations, exactly one, and two.
    let none_local = Migrations::empty(BaseDialect::SQLite);
    let one_local = Migrations::new(
        vec![Migration::with_hash(
            "20230331141203_init",
            "hash_single",
            1_680_271_923_000,
            vec!["CREATE TABLE t(id INTEGER PRIMARY KEY)".to_string()],
        )],
        BaseDialect::SQLite,
    );
    let two_local = Migrations::new(
        vec![
            Migration::with_hash(
                "20230331141203_first",
                "hash_a",
                1_680_271_923_000,
                vec!["CREATE TABLE a(id INTEGER PRIMARY KEY)".to_string()],
            ),
            Migration::with_hash(
                "20230331150000_second",
                "hash_b",
                1_680_275_400_000,
                vec!["CREATE TABLE b(id INTEGER PRIMARY KEY)".to_string()],
            ),
        ],
        BaseDialect::SQLite,
    );

    // Accepted combinations.
    validate_init_metadata(&[], &none_local).expect("empty local migrations should be allowed");
    validate_init_metadata(&[], &one_local).expect("single local migration should be allowed");

    // Rejected: more than one local migration.
    let too_many_local = validate_init_metadata(&[], &two_local)
        .expect_err("multiple local migrations rejected");
    assert_eq!(
        too_many_local.to_string(),
        "--init can't be used with existing migrations"
    );

    // Rejected: the database already lists an applied migration.
    let already_applied = ["20230331141203_init".to_string()];
    let db_not_empty = validate_init_metadata(&already_applied, &one_local)
        .expect_err("existing db metadata should be rejected");
    assert_eq!(
        db_not_empty.to_string(),
        "--init can't be used when database already has migrations set"
    );
}
4565
/// A database record whose hash differs from the local migration of the same name must
/// fail `verify_applied_migrations_consistency` with the exact mismatch message.
#[test]
fn verify_applied_migrations_detects_hash_mismatch() {
    use drizzle_migrations::{Migration, Migrations};

    let local = Migrations::new(
        vec![Migration::with_hash(
            "20230331141203_verify",
            "local_hash",
            1_680_271_923_000,
            vec!["CREATE TABLE t(id INTEGER PRIMARY KEY)".to_string()],
        )],
        drizzle_types::Dialect::SQLite,
    );

    // Same name as the local migration, different hash.
    let recorded = vec![AppliedMigrationRecord {
        name: "20230331141203_verify".to_string(),
        hash: "db_hash".to_string(),
    }];

    let mismatch = verify_applied_migrations_consistency(&local, &recorded)
        .expect_err("hash mismatch should fail verification");
    let message = mismatch.to_string();
    assert_eq!(
        message,
        "Migration failed: Migration hash mismatch for 20230331141203_verify: database=db_hash, local=local_hash"
    );
}
4592
/// With the first of two local migrations already applied, the plan reports one applied
/// migration, one pending migration and that pending migration's single statement.
#[test]
fn build_migration_plan_counts_pending_statements() {
    use drizzle_migrations::{Migration, Migrations};

    let already_applied = Migration::with_hash(
        "20230331141203_first",
        "hash_a",
        1_680_271_923_000,
        vec![
            "CREATE TABLE a(id INTEGER PRIMARY KEY)".to_string(),
            "CREATE INDEX a_id_idx ON a(id)".to_string(),
        ],
    );
    let still_pending = Migration::with_hash(
        "20230331150000_second",
        "hash_b",
        1_680_275_400_000,
        vec!["CREATE TABLE b(id INTEGER PRIMARY KEY)".to_string()],
    );
    let local = Migrations::new(
        vec![already_applied, still_pending],
        drizzle_types::Dialect::SQLite,
    );

    let recorded = vec![AppliedMigrationRecord {
        name: "20230331141203_first".to_string(),
        hash: "hash_a".to_string(),
    }];

    let plan = build_migration_plan(&local, &recorded).expect("build migration plan");
    assert_eq!(plan.applied_count, 1);
    assert_eq!(plan.pending_count, 1);
    assert_eq!(plan.pending_statements, 1);
    assert_eq!(plan.pending_migrations, vec!["20230331150000_second"]);
}
4629
/// `CONCURRENTLY` index statements are recognised for CREATE, CREATE UNIQUE and lower-case
/// DROP; a plain CREATE INDEX is not.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
#[test]
fn postgres_concurrent_index_detection_is_case_insensitive() {
    let concurrent = [
        "CREATE INDEX CONCURRENTLY users_email_idx ON users (email);",
        "CREATE UNIQUE INDEX CONCURRENTLY users_email_idx ON users (email);",
        "drop index concurrently users_email_idx;",
    ];
    for sql in concurrent {
        assert!(
            is_postgres_concurrent_index_statement(sql),
            "should be detected as concurrent: {sql}"
        );
    }

    let plain = "CREATE INDEX users_email_idx ON users (email);";
    assert!(!is_postgres_concurrent_index_statement(plain));
}
4646
/// `has_postgres_concurrent_index` is true only for the statement list that contains a
/// `CONCURRENTLY` index statement.
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
#[test]
fn postgres_concurrent_index_detection_over_statement_list() {
    // Both lists start with the same CREATE TABLE and differ only in the index statement.
    let statements_with = |index_sql: &str| -> Vec<String> {
        vec![
            "CREATE TABLE users(id integer);".to_string(),
            index_sql.to_string(),
        ]
    };

    let concurrent =
        statements_with("CREATE INDEX CONCURRENTLY users_email_idx ON users (id);");
    let plain = statements_with("CREATE INDEX users_email_idx ON users (id);");

    assert!(has_postgres_concurrent_index(&concurrent));
    assert!(!has_postgres_concurrent_index(&plain));
}
4662
/// Adding a `#[PostgresIndex(concurrent)]` to an otherwise unchanged schema produces exactly
/// one `CREATE INDEX CONCURRENTLY` statement and no warnings.
#[test]
fn generate_push_sql_includes_concurrent_postgres_index() {
    use crate::snapshot::parse_result_to_snapshot;
    use drizzle_migrations::parser::SchemaParser;
    use drizzle_types::Dialect;

    let previous = r#"
#[PostgresTable]
pub struct Users {
    #[column(primary)]
    pub id: i32,
    pub email: String,
}
"#;

    let current = r#"
#[PostgresTable]
pub struct Users {
    #[column(primary)]
    pub id: i32,
    pub email: String,
}

#[PostgresIndex(concurrent)]
pub struct UsersEmailIdx(Users::email);
"#;

    let snapshot_of = |source: &str| {
        parse_result_to_snapshot(&SchemaParser::parse(source), Dialect::PostgreSQL, None)
    };
    let before = snapshot_of(previous);
    let after = snapshot_of(current);

    let (statements, warnings) =
        generate_push_sql(&before, &after, false).expect("push sql generation");

    assert!(warnings.is_empty());
    assert_eq!(statements.len(), 1);
    assert_eq!(
        statements[0],
        "CREATE INDEX CONCURRENTLY \"users_email_idx\" ON \"users\" USING btree (\"email\" NULLS LAST);"
    );
}
4703
/// Compiled filter patterns combine a positive glob with `!`-prefixed negations.
#[test]
fn filter_patterns_support_negation_globs() {
    let raw: Vec<String> = ["users_*", "!users_4", "!ad*"]
        .iter()
        .map(|pattern| pattern.to_string())
        .collect();
    let patterns = compile_patterns(Some(&raw)).expect("compile patterns");

    // (candidate name, whether it is expected to pass the filter)
    let expectations = [
        ("users_1", true),
        ("users_4", false),
        ("admin", false),
        ("audit", false),
    ];
    for (name, expected) in expectations {
        assert_eq!(
            matches_patterns(name, patterns.as_deref()),
            expected,
            "unexpected filter result for {name}"
        );
    }
}
4718
/// With a table filter of `admin.*` and no schema filter, no table survives
/// `apply_snapshot_filters` — not even the one declared in schema `admin`.
#[test]
fn postgres_table_filter_matches_table_name_only() {
    use crate::snapshot::parse_result_to_snapshot;
    use drizzle_migrations::parser::SchemaParser;
    use drizzle_migrations::schema::Snapshot;
    use drizzle_types::Dialect as BaseDialect;
    use drizzle_types::postgres::ddl::PostgresEntity;

    let code = r#"
#[PostgresTable(schema = "admin")]
pub struct AuditLog {
    #[column(primary)]
    pub id: i32,
}

#[PostgresTable(schema = "public")]
pub struct Users {
    #[column(primary)]
    pub id: i32,
}
"#;

    let filters = SnapshotFilters {
        tables: Some(vec!["admin.*".to_string()]),
        schemas: None,
        extensions: None,
    };
    let mut snapshot =
        parse_result_to_snapshot(&SchemaParser::parse(code), BaseDialect::PostgreSQL, None);

    apply_snapshot_filters(&mut snapshot, crate::config::Dialect::Postgresql, &filters)
        .expect("apply filters");

    let Snapshot::Postgres(pg) = snapshot else {
        panic!("expected postgres snapshot");
    };
    // Collect (schema, name) for every table entity left in the snapshot.
    let remaining_tables: Vec<(String, String)> = pg
        .ddl
        .iter()
        .filter_map(|entity| match entity {
            PostgresEntity::Table(table) => {
                Some((table.schema.to_string(), table.name.to_string()))
            }
            _ => None,
        })
        .collect();

    assert!(remaining_tables.is_empty());
}
4768
/// A table glob (`users_*`) combined with a negated schema filter (`!dev`) keeps only the
/// table that satisfies both: `public.users_public`.
#[test]
fn postgres_schema_and_table_filters_intersect() {
    use crate::snapshot::parse_result_to_snapshot;
    use drizzle_migrations::parser::SchemaParser;
    use drizzle_migrations::schema::Snapshot;
    use drizzle_types::Dialect as BaseDialect;
    use drizzle_types::postgres::ddl::PostgresEntity;

    let code = r#"
#[PostgresTable(schema = "dev")]
pub struct UsersDev {
    #[column(primary)]
    pub id: i32,
}

#[PostgresTable(schema = "public")]
pub struct UsersPublic {
    #[column(primary)]
    pub id: i32,
}
"#;

    let filters = SnapshotFilters {
        tables: Some(vec!["users_*".to_string()]),
        schemas: Some(vec!["!dev".to_string()]),
        extensions: None,
    };
    let mut snapshot =
        parse_result_to_snapshot(&SchemaParser::parse(code), BaseDialect::PostgreSQL, None);

    apply_snapshot_filters(&mut snapshot, crate::config::Dialect::Postgresql, &filters)
        .expect("apply filters");

    let Snapshot::Postgres(pg) = snapshot else {
        panic!("expected postgres snapshot");
    };
    // Collect (schema, name) for every table entity left in the snapshot.
    let remaining_tables: Vec<(String, String)> = pg
        .ddl
        .iter()
        .filter_map(|entity| match entity {
            PostgresEntity::Table(table) => {
                Some((table.schema.to_string(), table.name.to_string()))
            }
            _ => None,
        })
        .collect();

    let expected = vec![("public".to_string(), "users_public".to_string())];
    assert_eq!(remaining_tables, expected);
}
4821
/// With the PostGIS extension filter enabled, the `topology` schema table and
/// `spatial_ref_sys` are dropped and only `public.users` remains.
#[test]
fn postgres_extensions_filter_excludes_postgis_internal_objects() {
    use crate::snapshot::parse_result_to_snapshot;
    use drizzle_migrations::parser::SchemaParser;
    use drizzle_migrations::schema::Snapshot;
    use drizzle_types::Dialect as BaseDialect;
    use drizzle_types::postgres::ddl::PostgresEntity;

    let code = r#"
#[PostgresTable(schema = "topology")]
pub struct TopologyLayer {
    #[column(primary)]
    pub id: i32,
}

#[PostgresTable]
pub struct SpatialRefSys {
    #[column(primary)]
    pub id: i32,
}

#[PostgresTable]
pub struct Users {
    #[column(primary)]
    pub id: i32,
}
"#;

    let filters = SnapshotFilters {
        tables: None,
        schemas: None,
        extensions: Some(vec![Extension::Postgis]),
    };
    let mut snapshot =
        parse_result_to_snapshot(&SchemaParser::parse(code), BaseDialect::PostgreSQL, None);

    apply_snapshot_filters(&mut snapshot, crate::config::Dialect::Postgresql, &filters)
        .expect("apply filters");

    let Snapshot::Postgres(pg) = snapshot else {
        panic!("expected postgres snapshot");
    };
    // Collect (schema, name) for every table entity left in the snapshot.
    let remaining_tables: Vec<(String, String)> = pg
        .ddl
        .iter()
        .filter_map(|entity| match entity {
            PostgresEntity::Table(table) => {
                Some((table.schema.to_string(), table.name.to_string()))
            }
            _ => None,
        })
        .collect();

    let expected = vec![("public".to_string(), "users".to_string())];
    assert_eq!(remaining_tables, expected);
}
4880
/// `format_migration_sql` joins statements with a `--> statement-breakpoint` line when the
/// flag is set, with a blank line otherwise, and emits a placeholder comment for no input.
#[test]
fn format_migration_sql_respects_breakpoints_flag() {
    let statements = vec![
        String::from("CREATE TABLE users(id integer);"),
        String::from("CREATE INDEX users_id_idx ON users(id);"),
    ];

    assert_eq!(
        format_migration_sql(&statements, true),
        "CREATE TABLE users(id integer);\n--> statement-breakpoint\nCREATE INDEX users_id_idx ON users(id);"
    );
    assert_eq!(
        format_migration_sql(&statements, false),
        "CREATE TABLE users(id integer);\n\nCREATE INDEX users_id_idx ON users(id);"
    );
    assert_eq!(
        format_migration_sql(&[], false),
        "-- No tables to create (empty database)\n"
    );
}
4903
/// Regenerating SQLite schema code from a snapshot honours the introspect casing: `Camel`
/// yields `userName` / `auditLogs`, `Preserve` keeps `user_name` / `audit_logs`.
#[test]
fn regenerate_sqlite_schema_applies_introspect_casing() {
    use crate::config::{Casing, IntrospectCasing};
    use crate::snapshot::parse_result_to_snapshot;
    use drizzle_migrations::parser::SchemaParser;
    use drizzle_types::Dialect as BaseDialect;

    let code = r#"
#[SQLiteTable]
pub struct AuditLogs {
    #[column(primary)]
    pub id: i64,
    pub user_name: String,
}
"#;

    let snapshot = parse_result_to_snapshot(
        &SchemaParser::parse(code),
        BaseDialect::SQLite,
        Some(Casing::SnakeCase),
    );

    // Start from a blank result wrapping a copy of the snapshot and regenerate its code.
    let regenerate = |casing: IntrospectCasing| {
        let mut result = IntrospectResult {
            snapshot: snapshot.clone(),
            snapshot_path: std::path::PathBuf::new(),
            schema_code: String::new(),
            warnings: Vec::new(),
            table_count: 0,
            index_count: 0,
            view_count: 0,
        };
        regenerate_schema_from_snapshot(
            &mut result,
            crate::config::Dialect::Sqlite,
            Some(casing),
        );
        result
    };

    assert_eq!(
        regenerate(IntrospectCasing::Camel).schema_code,
        "\
//! Auto-generated SQLite schema from introspection
//!
//! Schema introspected from filtered database objects

use drizzle::sqlite::prelude::*;

#[SQLiteTable(name = \"audit_logs\")]
pub struct AuditLogs {
    #[column(primary)]
    pub id: i64,
    pub userName: String,
}

#[derive(SQLiteSchema)]
pub struct Schema {
    pub auditLogs: AuditLogs,
}
"
    );

    assert_eq!(
        regenerate(IntrospectCasing::Preserve).schema_code,
        "\
//! Auto-generated SQLite schema from introspection
//!
//! Schema introspected from filtered database objects

use drizzle::sqlite::prelude::*;

#[SQLiteTable(name = \"audit_logs\")]
pub struct AuditLogs {
    #[column(primary)]
    pub id: i64,
    pub user_name: String,
}

#[derive(SQLiteSchema)]
pub struct Schema {
    pub audit_logs: AuditLogs,
}
"
    );
}
5000
/// Regenerating PostgreSQL schema code from a snapshot honours the introspect casing:
/// `Camel` yields `userName` / `auditLogs`, `Preserve` keeps `user_name` / `audit_logs`.
#[test]
fn regenerate_postgres_schema_applies_introspect_casing() {
    use crate::config::{Casing, IntrospectCasing};
    use crate::snapshot::parse_result_to_snapshot;
    use drizzle_migrations::parser::SchemaParser;
    use drizzle_types::Dialect as BaseDialect;

    let code = r#"
#[PostgresTable]
pub struct AuditLogs {
    #[column(primary)]
    pub id: i32,
    pub user_name: String,
}
"#;

    let snapshot = parse_result_to_snapshot(
        &SchemaParser::parse(code),
        BaseDialect::PostgreSQL,
        Some(Casing::SnakeCase),
    );

    // Start from a blank result wrapping a copy of the snapshot and regenerate its code.
    let regenerate = |casing: IntrospectCasing| {
        let mut result = IntrospectResult {
            snapshot: snapshot.clone(),
            snapshot_path: std::path::PathBuf::new(),
            schema_code: String::new(),
            warnings: Vec::new(),
            table_count: 0,
            index_count: 0,
            view_count: 0,
        };
        regenerate_schema_from_snapshot(
            &mut result,
            crate::config::Dialect::Postgresql,
            Some(casing),
        );
        result
    };

    assert_eq!(
        regenerate(IntrospectCasing::Camel).schema_code,
        "\
//! Auto-generated PostgreSQL schema from introspection
//!
//! Schema introspected from filtered database objects

use drizzle::postgres::prelude::*;

#[PostgresTable]
pub struct AuditLogs {
    #[column(primary)]
    pub id: i32,
    pub userName: String,
}

#[derive(PostgresSchema)]
pub struct Schema {
    pub auditLogs: AuditLogs,
}
"
    );

    assert_eq!(
        regenerate(IntrospectCasing::Preserve).schema_code,
        "\
//! Auto-generated PostgreSQL schema from introspection
//!
//! Schema introspected from filtered database objects

use drizzle::postgres::prelude::*;

#[PostgresTable]
pub struct AuditLogs {
    #[column(primary)]
    pub id: i32,
    pub user_name: String,
}

#[derive(PostgresSchema)]
pub struct Schema {
    pub audit_logs: AuditLogs,
}
"
    );
}
5097
/// Integration test: a migration consisting of `CREATE INDEX CONCURRENTLY` must apply through
/// `run_postgres_sync_migrations` and leave the index visible in `pg_indexes`.
///
/// The test prints a message and returns early when the initial database connection fails.
#[cfg(feature = "postgres-sync")]
#[test]
fn postgres_sync_migrate_applies_concurrent_index_without_transaction() {
    use drizzle_migrations::{Migration, Migrations};
    use drizzle_types::Dialect;

    let creds = test_postgres_creds();
    let url = creds.connection_url();

    // No reachable database: report why and skip rather than fail.
    let Ok(mut setup) = postgres::Client::connect(&url, postgres::NoTls).inspect_err(|e| {
        eprintln!(
            "Skipping postgres_sync_migrate_applies_concurrent_index_without_transaction: {}",
            e
        );
    }) else {
        return;
    };

    let table = unique_pg_name("cli_sync_users");
    let index = format!("{}_email_idx", table);
    let migration_schema = unique_pg_name("cli_sync_mig");

    // Seed a populated table for the index to be built on, then release the connection
    // before the migration runner opens its own.
    let seed_sql = format!(
        "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
         CREATE TABLE \"{table}\" (id integer, email text NOT NULL); \
         INSERT INTO \"{table}\" (id, email) VALUES (1, 'a@example.com');"
    );
    setup
        .batch_execute(&seed_sql)
        .expect("setup table for concurrent index test");
    drop(setup);

    let migration_tag = format!("20260212000000_{table}");
    let migration_sql =
        format!("CREATE INDEX CONCURRENTLY \"{index}\" ON \"{table}\" (\"email\");");
    let tracking = drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone());
    let set = Migrations::with_tracking(
        vec![Migration::new(&migration_tag, &migration_sql)],
        Dialect::PostgreSQL,
        tracking,
    );

    let outcome = run_postgres_sync_migrations(&set, &creds)
        .expect("sync migration with concurrent index should succeed");
    assert_eq!(outcome.applied_count, 1);

    let mut verify =
        postgres::Client::connect(&url, postgres::NoTls).expect("reconnect for verification");
    let row = verify
        .query_one(
            "SELECT COUNT(*)::bigint FROM pg_indexes \
             WHERE schemaname = 'public' AND tablename = $1 AND indexname = $2",
            &[&table, &index],
        )
        .expect("query pg_indexes");
    let exists: i64 = row.get(0);
    assert_eq!(exists, 1, "concurrent index was not created");

    // Best-effort cleanup: the result is deliberately ignored.
    let cleanup_sql = format!(
        "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
         DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
    );
    let _ = verify.batch_execute(&cleanup_sql);
}
5161
5162    #[cfg(feature = "postgres-sync")]
5163    #[test]
5164    fn postgres_sync_migrate_upgrades_legacy_tracking_table_and_applies_pending() {
5165        use drizzle_migrations::{Migration, Migrations};
5166        use drizzle_types::Dialect;
5167
5168        let creds = test_postgres_creds();
5169        let url = creds.connection_url();
5170        let mut client = postgres::Client::connect(&url, postgres::NoTls)
5171            .expect("connect postgres for legacy tracking upgrade test");
5172
5173        let applied_table = unique_pg_name("cli_sync_applied");
5174        let pending_table = unique_pg_name("cli_sync_pending");
5175        let migration_schema = unique_pg_name("cli_sync_tracking");
5176
5177        client
5178            .batch_execute(&format!(
5179                "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5180                 DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5181                 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE; \
5182                 CREATE SCHEMA \"{migration_schema}\"; \
5183                 CREATE TABLE \"{migration_schema}\".\"__drizzle_migrations\" (id SERIAL PRIMARY KEY, hash TEXT NOT NULL, created_at BIGINT); \
5184                 CREATE TABLE \"{applied_table}\" (id integer primary key);"
5185            ))
5186            .expect("setup legacy postgres tracking metadata");
5187
5188        let first = Migration::new(
5189            &format!("20230331141203_{applied_table}"),
5190            &format!("CREATE TABLE \"{applied_table}\" (id integer primary key);"),
5191        );
5192        let second = Migration::new(
5193            &format!("20230331141204_{pending_table}"),
5194            &format!("CREATE TABLE \"{pending_table}\" (id integer primary key);"),
5195        );
5196        let set = Migrations::with_tracking(
5197            vec![first.clone(), second.clone()],
5198            Dialect::PostgreSQL,
5199            drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5200        );
5201
5202        client
5203            .execute(
5204                &format!(
5205                    "INSERT INTO \"{migration_schema}\".\"__drizzle_migrations\" (hash, created_at) VALUES ($1, $2)"
5206                ),
5207                &[&first.hash(), &first.created_at()],
5208            )
5209            .expect("insert legacy applied migration row");
5210        drop(client);
5211
5212        let result = run_postgres_sync_migrations(&set, &creds)
5213            .expect("sync migration upgrade should succeed");
5214        assert_eq!(result.applied_count, 1);
5215        assert_eq!(result.applied_migrations, vec![second.hash().to_string()]);
5216
5217        let mut verify_client =
5218            postgres::Client::connect(&url, postgres::NoTls).expect("reconnect for verification");
5219        let columns: Vec<String> = verify_client
5220            .query(
5221                "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = '__drizzle_migrations' ORDER BY ordinal_position",
5222                &[&migration_schema],
5223            )
5224            .expect("query upgraded tracking columns")
5225            .into_iter()
5226            .map(|row| row.get(0))
5227            .collect();
5228        assert_eq!(
5229            columns,
5230            vec!["id", "hash", "created_at", "name", "applied_at"]
5231        );
5232
5233        let rows: Vec<(String, i64, String, Option<String>)> = verify_client
5234            .query(
5235                &format!(
5236                    "SELECT hash, created_at, name, applied_at::text FROM \"{migration_schema}\".\"__drizzle_migrations\" ORDER BY id ASC"
5237                ),
5238                &[],
5239            )
5240            .expect("query upgraded metadata rows")
5241            .into_iter()
5242            .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3)))
5243            .collect();
5244        assert_eq!(rows.len(), 2);
5245        assert_eq!(rows[0].0, first.hash());
5246        assert_eq!(rows[0].1, first.created_at());
5247        assert_eq!(rows[0].2, first.name());
5248        assert_eq!(rows[0].3, None);
5249        assert_eq!(rows[1].0, second.hash());
5250        assert_eq!(rows[1].1, second.created_at());
5251        assert_eq!(rows[1].2, second.name());
5252        assert!(rows[1].3.is_some());
5253
5254        let pending_exists: i64 = verify_client
5255            .query_one(
5256                "SELECT COUNT(*)::bigint FROM information_schema.tables WHERE table_schema = 'public' AND table_name = $1",
5257                &[&pending_table],
5258            )
5259            .expect("query pending table")
5260            .get(0);
5261        assert_eq!(pending_exists, 1);
5262
5263        let _ = verify_client.batch_execute(&format!(
5264            "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5265             DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5266             DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5267        ));
5268    }
5269
5270    #[cfg(feature = "postgres-sync")]
5271    #[test]
5272    fn postgres_sync_migrate_upgrade_rejects_unmatched_legacy_rows() {
5273        use drizzle_migrations::{Migration, Migrations};
5274        use drizzle_types::Dialect;
5275
5276        let creds = test_postgres_creds();
5277        let url = creds.connection_url();
5278        let mut client = postgres::Client::connect(&url, postgres::NoTls)
5279            .expect("connect postgres for unmatched legacy row test");
5280
5281        let migration_schema = unique_pg_name("cli_sync_tracking_unmatched");
5282        client
5283            .batch_execute(&format!(
5284                "DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE; \
5285                 CREATE SCHEMA \"{migration_schema}\"; \
5286                 CREATE TABLE \"{migration_schema}\".\"__drizzle_migrations\" (id SERIAL PRIMARY KEY, hash TEXT NOT NULL, created_at BIGINT);"
5287            ))
5288            .expect("setup unmatched legacy metadata");
5289
5290        let migration = Migration::new(
5291            "20230331141203_cli_sync_first",
5292            "CREATE TABLE \"cli_sync_unmatched_target\" (id integer primary key);",
5293        );
5294        let set = Migrations::with_tracking(
5295            vec![migration],
5296            Dialect::PostgreSQL,
5297            drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5298        );
5299
5300        client
5301            .execute(
5302                &format!(
5303                    "INSERT INTO \"{migration_schema}\".\"__drizzle_migrations\" (hash, created_at) VALUES ($1, $2)"
5304                ),
5305                &[&"unknown_hash", &1_680_271_924_000_i64],
5306            )
5307            .expect("insert unmatched legacy row");
5308        drop(client);
5309
5310        let err = run_postgres_sync_migrations(&set, &creds)
5311            .expect_err("unmatched legacy metadata should fail");
5312        assert!(err.to_string().contains("do not match local migrations"));
5313
5314        let mut verify_client =
5315            postgres::Client::connect(&url, postgres::NoTls).expect("reconnect for verification");
5316        let columns: Vec<String> = verify_client
5317            .query(
5318                "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = '__drizzle_migrations' ORDER BY ordinal_position",
5319                &[&migration_schema],
5320            )
5321            .expect("query legacy tracking columns")
5322            .into_iter()
5323            .map(|row| row.get(0))
5324            .collect();
5325        assert_eq!(columns, vec!["id", "hash", "created_at"]);
5326
5327        let _ = verify_client.batch_execute(&format!(
5328            "DROP TABLE IF EXISTS \"cli_sync_unmatched_target\" CASCADE; \
5329             DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5330        ));
5331    }
5332
5333    #[cfg(feature = "tokio-postgres")]
5334    #[test]
5335    fn tokio_postgres_migrate_applies_concurrent_index_without_transaction() {
5336        use drizzle_migrations::{Migration, Migrations};
5337        use drizzle_types::Dialect;
5338
5339        let creds = test_postgres_creds();
5340        let url = creds.connection_url();
5341        let rt = tokio::runtime::Builder::new_current_thread()
5342            .enable_all()
5343            .build()
5344            .expect("create tokio runtime");
5345
5346        rt.block_on(async {
5347            let (client, connection) = match tokio_postgres::connect(&url, tokio_postgres::NoTls).await
5348            {
5349                Ok(v) => v,
5350                Err(e) => {
5351                    eprintln!(
5352                        "Skipping tokio_postgres_migrate_applies_concurrent_index_without_transaction: {}",
5353                        e
5354                    );
5355                    return;
5356                }
5357            };
5358
5359            tokio::spawn(async move {
5360                let _ = connection.await;
5361            });
5362
5363            let table = unique_pg_name("cli_async_users");
5364            let index = format!("{}_email_idx", table);
5365            let migration_schema = unique_pg_name("cli_async_mig");
5366
5367            client
5368                .batch_execute(&format!(
5369                    "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
5370                     CREATE TABLE \"{table}\" (id integer, email text NOT NULL); \
5371                     INSERT INTO \"{table}\" (id, email) VALUES (1, 'a@example.com');"
5372                ))
5373                .await
5374                .expect("setup table for async concurrent index test");
5375
5376            let migration_tag = format!("20260212000000_{table}");
5377            let migration_sql = format!(
5378                "CREATE INDEX CONCURRENTLY \"{index}\" ON \"{table}\" (\"email\");"
5379            );
5380            let set = Migrations::with_tracking(
5381                vec![Migration::new(&migration_tag, &migration_sql)],
5382                Dialect::PostgreSQL,
5383                drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5384            );
5385
5386            let result = run_postgres_async_inner(&set, &creds)
5387                .await
5388                .expect("async migration with concurrent index should succeed");
5389            assert_eq!(result.applied_count, 1);
5390
5391            let exists: i64 = client
5392                .query_one(
5393                    "SELECT COUNT(*)::bigint FROM pg_indexes \
5394                     WHERE schemaname = 'public' AND tablename = $1 AND indexname = $2",
5395                    &[&table, &index],
5396                )
5397                .await
5398                .expect("query pg_indexes")
5399                .get(0);
5400            assert_eq!(exists, 1, "async concurrent index was not created");
5401
5402            let _ = client
5403                .batch_execute(&format!(
5404                    "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
5405                     DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5406                ))
5407                .await;
5408        });
5409    }
5410
5411    #[cfg(feature = "tokio-postgres")]
5412    #[test]
5413    fn tokio_postgres_migrate_upgrades_legacy_tracking_table_and_applies_pending() {
5414        use drizzle_migrations::{Migration, Migrations};
5415        use drizzle_types::Dialect;
5416
5417        let creds = test_postgres_creds();
5418        let url = creds.connection_url();
5419        let rt = tokio::runtime::Builder::new_current_thread()
5420            .enable_all()
5421            .build()
5422            .expect("create tokio runtime");
5423
5424        rt.block_on(async {
5425            let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
5426                .await
5427                .expect("connect tokio-postgres for legacy tracking upgrade test");
5428
5429            tokio::spawn(async move {
5430                let _ = connection.await;
5431            });
5432
5433            let applied_table = unique_pg_name("cli_async_applied");
5434            let pending_table = unique_pg_name("cli_async_pending");
5435            let migration_schema = unique_pg_name("cli_async_tracking");
5436
5437            client
5438                .batch_execute(&format!(
5439                    "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5440                     DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5441                     DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE; \
5442                     CREATE SCHEMA \"{migration_schema}\"; \
5443                     CREATE TABLE \"{migration_schema}\".\"__drizzle_migrations\" (id SERIAL PRIMARY KEY, hash TEXT NOT NULL, created_at BIGINT); \
5444                     CREATE TABLE \"{applied_table}\" (id integer primary key);"
5445                ))
5446                .await
5447                .expect("setup legacy postgres tracking metadata");
5448
5449            let first = Migration::new(
5450                &format!("20230331141203_{applied_table}"),
5451                &format!("CREATE TABLE \"{applied_table}\" (id integer primary key);"),
5452            );
5453            let second = Migration::new(
5454                &format!("20230331141204_{pending_table}"),
5455                &format!("CREATE TABLE \"{pending_table}\" (id integer primary key);"),
5456            );
5457            let set = Migrations::with_tracking(
5458                vec![first.clone(), second.clone()],
5459                Dialect::PostgreSQL,
5460                drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5461            );
5462
5463            client
5464                .execute(
5465                    &format!(
5466                        "INSERT INTO \"{migration_schema}\".\"__drizzle_migrations\" (hash, created_at) VALUES ($1, $2)"
5467                    ),
5468                    &[&first.hash(), &first.created_at()],
5469                )
5470                .await
5471                .expect("insert legacy applied migration row");
5472
5473            let result = run_postgres_async_inner(&set, &creds)
5474                .await
5475                .expect("async migration upgrade should succeed");
5476            assert_eq!(result.applied_count, 1);
5477            assert_eq!(result.applied_migrations, vec![second.hash().to_string()]);
5478
5479            let columns: Vec<String> = client
5480                .query(
5481                    "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = '__drizzle_migrations' ORDER BY ordinal_position",
5482                    &[&migration_schema],
5483                )
5484                .await
5485                .expect("query upgraded tracking columns")
5486                .into_iter()
5487                .map(|row| row.get(0))
5488                .collect();
5489            assert_eq!(
5490                columns,
5491                vec!["id", "hash", "created_at", "name", "applied_at"]
5492            );
5493
5494            let rows: Vec<(String, i64, String, Option<String>)> = client
5495                .query(
5496                    &format!(
5497                        "SELECT hash, created_at, name, applied_at::text FROM \"{migration_schema}\".\"__drizzle_migrations\" ORDER BY id ASC"
5498                    ),
5499                    &[],
5500                )
5501                .await
5502                .expect("query upgraded metadata rows")
5503                .into_iter()
5504                .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3)))
5505                .collect();
5506            assert_eq!(rows.len(), 2);
5507            assert_eq!(rows[0].0, first.hash());
5508            assert_eq!(rows[0].1, first.created_at());
5509            assert_eq!(rows[0].2, first.name());
5510            assert_eq!(rows[0].3, None);
5511            assert_eq!(rows[1].0, second.hash());
5512            assert_eq!(rows[1].1, second.created_at());
5513            assert_eq!(rows[1].2, second.name());
5514            assert!(rows[1].3.is_some());
5515
5516            let pending_exists: i64 = client
5517                .query_one(
5518                    "SELECT COUNT(*)::bigint FROM information_schema.tables WHERE table_schema = 'public' AND table_name = $1",
5519                    &[&pending_table],
5520                )
5521                .await
5522                .expect("query pending table")
5523                .get(0);
5524            assert_eq!(pending_exists, 1);
5525
5526            let _ = client
5527                .batch_execute(&format!(
5528                    "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5529                     DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5530                     DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5531                ))
5532                .await;
5533        });
5534    }
5535}