1use std::path::Path;
7
8#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
9use crate::config::PostgresCreds;
10use crate::config::{Credentials, Dialect, Extension, IntrospectCasing};
11use crate::error::CliError;
12use crate::output;
13#[cfg(any(
14 test,
15 feature = "rusqlite",
16 feature = "libsql",
17 feature = "turso",
18 feature = "postgres-sync",
19 feature = "tokio-postgres",
20 feature = "d1-http",
21))]
22use drizzle_migrations::Migrations;
23use drizzle_migrations::schema::Snapshot;
24
25#[cfg(feature = "d1-http")]
26mod d1_http;
27
28#[derive(Debug)]
30pub struct MigrationResult {
31 pub applied_count: usize,
33 pub applied_migrations: Vec<String>,
35}
36
37#[derive(Debug, Clone)]
39pub struct MigrationPlan {
40 pub applied_count: usize,
42 pub pending_count: usize,
44 pub pending_migrations: Vec<String>,
46 pub pending_statements: usize,
48}
49
50#[cfg(any(
51 test,
52 feature = "rusqlite",
53 feature = "libsql",
54 feature = "turso",
55 feature = "postgres-sync",
56 feature = "tokio-postgres",
57 feature = "d1-http",
58))]
59#[derive(Debug, Clone)]
60pub(crate) struct AppliedMigrationRecord {
61 pub(crate) hash: String,
62 pub(crate) name: String,
63}
64
65#[derive(Debug, Clone)]
67pub struct PushPlan {
68 pub sql_statements: Vec<String>,
69 pub warnings: Vec<String>,
70 pub destructive: bool,
71}
72
73#[derive(Debug, Clone, Default)]
75pub struct SnapshotFilters {
76 pub tables: Option<Vec<String>>,
77 pub schemas: Option<Vec<String>>,
78 pub extensions: Option<Vec<Extension>>,
79}
80
81impl SnapshotFilters {
82 const fn is_empty(&self) -> bool {
83 self.tables.is_none() && self.schemas.is_none() && self.extensions.is_none()
84 }
85}
86
87pub fn plan_push(
94 credentials: &Credentials,
95 dialect: Dialect,
96 desired: &Snapshot,
97 breakpoints: bool,
98 filters: &SnapshotFilters,
99) -> Result<PushPlan, CliError> {
100 let mut current = introspect_database(credentials, dialect)?.snapshot;
101 apply_snapshot_filters(&mut current, dialect, filters)?;
102 let (sql_statements, warnings) = generate_push_sql(¤t, desired, breakpoints)?;
103 let destructive = sql_statements.iter().any(|s| is_destructive_statement(s));
104
105 Ok(PushPlan {
106 sql_statements,
107 warnings,
108 destructive,
109 })
110}
111
112pub fn apply_push(
120 credentials: &Credentials,
121 dialect: Dialect,
122 plan: &PushPlan,
123 force: bool,
124) -> Result<(), CliError> {
125 if plan.sql_statements.is_empty() {
126 return Ok(());
127 }
128
129 if plan.destructive && !force {
130 let confirmed = confirm_destructive()?;
131 if !confirmed {
132 return Ok(());
133 }
134 }
135
136 execute_statements(credentials, dialect, &plan.sql_statements)
137}
138
139#[allow(unused_variables)] pub fn plan_migrations(
151 credentials: &Credentials,
152 dialect: Dialect,
153 migrations_dir: &Path,
154 migrations_table: &str,
155 migrations_schema: &str,
156) -> Result<MigrationPlan, CliError> {
157 #[cfg(any(
158 feature = "rusqlite",
159 feature = "libsql",
160 feature = "turso",
161 feature = "postgres-sync",
162 feature = "tokio-postgres",
163 feature = "d1-http",
164 ))]
165 let set = load_migration_set(dialect, migrations_dir, migrations_table, migrations_schema)?;
166
167 match credentials {
168 #[cfg(feature = "rusqlite")]
169 Credentials::Sqlite { path } => inspect_sqlite_migrations(&set, path),
170
171 #[cfg(not(feature = "rusqlite"))]
172 Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
173 dialect: "SQLite",
174 feature: "rusqlite",
175 }),
176
177 #[cfg(any(feature = "libsql", feature = "turso"))]
178 Credentials::Turso { url, auth_token } => {
179 if is_local_libsql(url) {
180 #[cfg(feature = "libsql")]
181 {
182 inspect_libsql_local_migrations(&set, url)
183 }
184 #[cfg(not(feature = "libsql"))]
185 {
186 Err(CliError::MissingDriver {
187 dialect: "LibSQL (local)",
188 feature: "libsql",
189 })
190 }
191 } else {
192 #[cfg(feature = "turso")]
193 {
194 inspect_turso_migrations(&set, url, auth_token.as_deref())
195 }
196 #[cfg(not(feature = "turso"))]
197 {
198 let _ = auth_token;
199 Err(CliError::MissingDriver {
200 dialect: "Turso (remote)",
201 feature: "turso",
202 })
203 }
204 }
205 }
206
207 #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
208 Credentials::Turso { .. } => Err(CliError::MissingDriver {
209 dialect: "Turso",
210 feature: "turso or libsql",
211 }),
212
213 Credentials::Postgres(creds) => {
214 let _ = creds;
215 core::cfg_select! {
216 feature = "postgres-sync" => inspect_postgres_sync_migrations(&set, creds),
217 feature = "tokio-postgres" => inspect_postgres_async_migrations(&set, creds),
218 _ => Err(CliError::MissingDriver {
219 dialect: "PostgreSQL",
220 feature: "postgres-sync or tokio-postgres",
221 }),
222 }
223 }
224
225 #[cfg(feature = "d1-http")]
226 Credentials::D1 {
227 account_id,
228 database_id,
229 token,
230 } => d1_http::inspect_migrations(&set, account_id, database_id, token),
231
232 #[cfg(not(feature = "d1-http"))]
233 Credentials::D1 { .. } => Err(CliError::MissingDriver {
234 dialect: "Cloudflare D1 (HTTP)",
235 feature: "d1-http",
236 }),
237
238 Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
239 operation: "Migration planning against AWS Data API",
240 driver: "aws-data-api",
241 hint: "AWS RDS Data API schema ops are not yet wired into this CLI. For now, \
242 run the generated SQL with `aws rds-data execute-statement --sql=...` \
243 (or use tokio-postgres with a temporary direct connection).",
244 }),
245 }
246}
247
248pub fn verify_migrations(
256 credentials: &Credentials,
257 dialect: Dialect,
258 migrations_dir: &Path,
259 migrations_table: &str,
260 migrations_schema: &str,
261) -> Result<MigrationPlan, CliError> {
262 plan_migrations(
263 credentials,
264 dialect,
265 migrations_dir,
266 migrations_table,
267 migrations_schema,
268 )
269}
270
271#[allow(unused_variables)] pub fn run_migrations(
281 credentials: &Credentials,
282 dialect: Dialect,
283 migrations_dir: &Path,
284 migrations_table: &str,
285 migrations_schema: &str,
286) -> Result<MigrationResult, CliError> {
287 #[cfg(any(
288 feature = "rusqlite",
289 feature = "libsql",
290 feature = "turso",
291 feature = "postgres-sync",
292 feature = "tokio-postgres",
293 feature = "d1-http",
294 ))]
295 let set = load_migration_set(dialect, migrations_dir, migrations_table, migrations_schema)?;
296
297 match credentials {
298 #[cfg(feature = "rusqlite")]
299 Credentials::Sqlite { path } => run_sqlite_migrations(&set, path),
300
301 #[cfg(not(feature = "rusqlite"))]
302 Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
303 dialect: "SQLite",
304 feature: "rusqlite",
305 }),
306
307 #[cfg(any(feature = "libsql", feature = "turso"))]
308 Credentials::Turso { url, auth_token } => {
309 if is_local_libsql(url) {
310 #[cfg(feature = "libsql")]
311 {
312 run_libsql_local_migrations(&set, url)
313 }
314 #[cfg(not(feature = "libsql"))]
315 {
316 Err(CliError::MissingDriver {
317 dialect: "LibSQL (local)",
318 feature: "libsql",
319 })
320 }
321 } else {
322 #[cfg(feature = "turso")]
323 {
324 run_turso_migrations(&set, url, auth_token.as_deref())
325 }
326 #[cfg(not(feature = "turso"))]
327 {
328 let _ = auth_token;
329 Err(CliError::MissingDriver {
330 dialect: "Turso (remote)",
331 feature: "turso",
332 })
333 }
334 }
335 }
336
337 #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
338 Credentials::Turso { .. } => Err(CliError::MissingDriver {
339 dialect: "Turso",
340 feature: "turso or libsql",
341 }),
342
343 Credentials::Postgres(creds) => {
345 let _ = creds;
346 core::cfg_select! {
347 feature = "postgres-sync" => run_postgres_sync_migrations(&set, creds),
348 feature = "tokio-postgres" => run_postgres_async_migrations(&set, creds),
349 _ => Err(CliError::MissingDriver {
350 dialect: "PostgreSQL",
351 feature: "postgres-sync or tokio-postgres",
352 }),
353 }
354 }
355
356 #[cfg(feature = "d1-http")]
357 Credentials::D1 {
358 account_id,
359 database_id,
360 token,
361 } => d1_http::run_migrations(&set, account_id, database_id, token),
362
363 #[cfg(not(feature = "d1-http"))]
364 Credentials::D1 { .. } => Err(CliError::MissingDriver {
365 dialect: "Cloudflare D1 (HTTP)",
366 feature: "d1-http",
367 }),
368
369 Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
370 operation: "Running migrations against AWS Data API",
371 driver: "aws-data-api",
372 hint: "AWS RDS Data API migrations are not yet wired into this CLI. Run the \
373 generated SQL via `aws rds-data execute-statement` or connect directly \
374 with tokio-postgres.",
375 }),
376 }
377}
378
379#[cfg(any(
380 feature = "rusqlite",
381 feature = "libsql",
382 feature = "turso",
383 feature = "postgres-sync",
384 feature = "tokio-postgres",
385 feature = "d1-http",
386))]
387fn load_migration_set(
388 dialect: Dialect,
389 migrations_dir: &Path,
390 migrations_table: &str,
391 migrations_schema: &str,
392) -> Result<Migrations, CliError> {
393 let tracking = migration_tracking(dialect, migrations_table, migrations_schema);
394
395 let migrations = drizzle_migrations::MigrationDir::new(migrations_dir)
397 .discover()
398 .map_err(|e| CliError::Other(format!("Failed to load migrations: {e}")))?;
399 Ok(Migrations::with_tracking(
400 migrations,
401 dialect.to_base(),
402 tracking,
403 ))
404}
405
406#[cfg(any(
407 feature = "rusqlite",
408 feature = "libsql",
409 feature = "turso",
410 feature = "postgres-sync",
411 feature = "tokio-postgres",
412 feature = "d1-http",
413))]
414fn migration_tracking(
415 dialect: Dialect,
416 migrations_table: &str,
417 migrations_schema: &str,
418) -> drizzle_types::MigrationTracking {
419 let mut tracking = match dialect {
420 Dialect::Postgresql => drizzle_types::MigrationTracking::POSTGRES,
421 _ => drizzle_types::MigrationTracking::SQLITE,
422 };
423
424 if !migrations_table.trim().is_empty() {
425 tracking = tracking.table(migrations_table.to_owned());
426 }
427
428 if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
429 tracking = tracking.schema(migrations_schema.to_owned());
430 }
431
432 tracking
433}
434
435#[cfg(any(
436 test,
437 feature = "rusqlite",
438 feature = "libsql",
439 feature = "turso",
440 feature = "postgres-sync",
441 feature = "tokio-postgres",
442 feature = "d1-http",
443))]
444pub(crate) fn build_migration_plan(
445 set: &Migrations,
446 applied: &[AppliedMigrationRecord],
447) -> Result<MigrationPlan, CliError> {
448 verify_applied_migrations_consistency(set, applied)?;
449
450 let applied_names = applied.iter().map(|m| m.name.clone()).collect::<Vec<_>>();
451 let pending = set.pending(&applied_names).collect::<Vec<_>>();
452
453 let pending_statements = pending
454 .iter()
455 .map(|m| {
456 m.statements()
457 .iter()
458 .filter(|stmt| !stmt.trim().is_empty())
459 .count()
460 })
461 .sum();
462
463 Ok(MigrationPlan {
464 applied_count: applied.len(),
465 pending_count: pending.len(),
466 pending_migrations: pending.iter().map(|m| m.tag().to_string()).collect(),
467 pending_statements,
468 })
469}
470
471#[cfg(any(
472 test,
473 feature = "rusqlite",
474 feature = "libsql",
475 feature = "turso",
476 feature = "postgres-sync",
477 feature = "tokio-postgres",
478 feature = "d1-http",
479))]
480fn verify_applied_migrations_consistency(
481 set: &Migrations,
482 applied: &[AppliedMigrationRecord],
483) -> Result<(), CliError> {
484 use std::collections::{HashMap, HashSet};
485
486 let mut local_by_name = HashMap::<&str, &str>::new();
487 for migration in set.all() {
488 if local_by_name
489 .insert(migration.name(), migration.hash())
490 .is_some()
491 {
492 return Err(CliError::MigrationError(format!(
493 "Local migrations contain duplicate name: {}",
494 migration.name()
495 )));
496 }
497 }
498
499 let mut seen_db_names = HashSet::<&str>::new();
500 for applied_row in applied {
501 if !seen_db_names.insert(applied_row.name.as_str()) {
502 return Err(CliError::MigrationError(format!(
503 "Database migration metadata contains duplicate name: {}",
504 applied_row.name
505 )));
506 }
507
508 let Some(local_hash) = local_by_name.get(applied_row.name.as_str()) else {
509 return Err(CliError::MigrationError(format!(
510 "Database contains applied migration not found locally (name: {})",
511 applied_row.name
512 )));
513 };
514
515 if *local_hash != applied_row.hash {
516 return Err(CliError::MigrationError(format!(
517 "Migration hash mismatch for {}: database={}, local={}",
518 applied_row.name, applied_row.hash, local_hash
519 )));
520 }
521 }
522
523 Ok(())
524}
525
526#[cfg(any(
527 feature = "rusqlite",
528 feature = "libsql",
529 feature = "turso",
530 feature = "postgres-sync",
531 feature = "tokio-postgres",
532))]
533fn escape_sql_literal(value: &str) -> String {
534 value.replace('\'', "''")
535}
536
537#[cfg(feature = "rusqlite")]
538fn ensure_sqlite_tracking_table(
539 conn: &rusqlite::Connection,
540 set: &Migrations,
541) -> Result<(), CliError> {
542 conn.execute(&set.create_table_sql(), [])
543 .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;
544
545 let pragma_sql = format!(
546 "SELECT name FROM pragma_table_info('{}')",
547 escape_sql_literal(set.table_name())
548 );
549 let mut stmt = conn
550 .prepare(&pragma_sql)
551 .map_err(|e| CliError::MigrationError(e.to_string()))?;
552 let columns = stmt
553 .query_map([], |row| row.get::<_, String>(0))
554 .map_err(|e| CliError::MigrationError(e.to_string()))?
555 .collect::<Result<Vec<_>, _>>()
556 .map_err(|e| CliError::MigrationError(e.to_string()))?;
557 if columns.iter().any(|column| column == "name") {
558 return Ok(());
559 }
560
561 let mut stmt = conn
562 .prepare(&format!(
563 "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
564 set.table_ident_sql()
565 ))
566 .map_err(|e| CliError::MigrationError(e.to_string()))?;
567 let applied = stmt
568 .query_map([], |row| {
569 Ok(drizzle_migrations::AppliedMigrationMetadata {
570 id: row.get::<_, Option<i64>>(0)?,
571 hash: row.get::<_, String>(1)?,
572 created_at: row.get::<_, i64>(2)?,
573 })
574 })
575 .map_err(|e| CliError::MigrationError(e.to_string()))?
576 .collect::<Result<Vec<_>, _>>()
577 .map_err(|e| CliError::MigrationError(e.to_string()))?;
578
579 let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
580 .map_err(|e| CliError::MigrationError(e.to_string()))?;
581
582 conn.execute("BEGIN", [])
583 .map_err(|e| CliError::MigrationError(e.to_string()))?;
584 let result = (|| -> Result<(), CliError> {
585 conn.execute(
586 &format!(
587 "ALTER TABLE {} ADD COLUMN \"name\" text",
588 set.table_ident_sql()
589 ),
590 [],
591 )
592 .map_err(|e| CliError::MigrationError(e.to_string()))?;
593 conn.execute(
594 &format!(
595 "ALTER TABLE {} ADD COLUMN \"applied_at\" TEXT",
596 set.table_ident_sql()
597 ),
598 [],
599 )
600 .map_err(|e| CliError::MigrationError(e.to_string()))?;
601
602 for row in matched {
603 let where_clause = if let Some(id) = row.id {
604 format!("\"id\" = {id}")
605 } else {
606 format!(
607 "\"created_at\" = {} AND \"hash\" = '{}'",
608 row.created_at,
609 escape_sql_literal(&row.hash)
610 )
611 };
612 conn.execute(
613 &format!(
614 "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
615 set.table_ident_sql(),
616 escape_sql_literal(&row.name),
617 where_clause
618 ),
619 [],
620 )
621 .map_err(|e| CliError::MigrationError(e.to_string()))?;
622 }
623
624 Ok(())
625 })();
626
627 match result {
628 Ok(()) => {
629 conn.execute("COMMIT", [])
630 .map_err(|e| CliError::MigrationError(e.to_string()))?;
631 Ok(())
632 }
633 Err(err) => {
634 let _ = conn.execute("ROLLBACK", []);
635 Err(err)
636 }
637 }
638}
639
640#[cfg(feature = "libsql")]
641async fn ensure_sqlite_tracking_table_libsql(
642 conn: &libsql::Connection,
643 set: &Migrations,
644) -> Result<(), CliError> {
645 conn.execute(&set.create_table_sql(), ())
646 .await
647 .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;
648
649 let pragma_sql = format!(
650 "SELECT name FROM pragma_table_info('{}')",
651 escape_sql_literal(set.table_name())
652 );
653 let mut rows = conn
654 .query(&pragma_sql, ())
655 .await
656 .map_err(|e| CliError::MigrationError(e.to_string()))?;
657 let mut has_name = false;
658 while let Ok(Some(row)) = rows.next().await {
659 if let Ok(name) = row.get::<String>(0)
660 && name == "name"
661 {
662 has_name = true;
663 break;
664 }
665 }
666 if has_name {
667 return Ok(());
668 }
669
670 let mut rows = conn
671 .query(
672 &format!(
673 "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
674 set.table_ident_sql()
675 ),
676 (),
677 )
678 .await
679 .map_err(|e| CliError::MigrationError(e.to_string()))?;
680 let mut applied = Vec::new();
681 while let Ok(Some(row)) = rows.next().await {
682 let hash = row
683 .get::<String>(1)
684 .map_err(|e| CliError::MigrationError(e.to_string()))?;
685 let created_at = row
686 .get::<i64>(2)
687 .map_err(|e| CliError::MigrationError(e.to_string()))?;
688 applied.push(drizzle_migrations::AppliedMigrationMetadata {
689 id: row.get::<Option<i64>>(0).ok().flatten(),
690 hash,
691 created_at,
692 });
693 }
694
695 let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
696 .map_err(|e| CliError::MigrationError(e.to_string()))?;
697
698 let tx = conn
699 .transaction()
700 .await
701 .map_err(|e| CliError::MigrationError(e.to_string()))?;
702 tx.execute(
703 &format!(
704 "ALTER TABLE {} ADD COLUMN \"name\" text",
705 set.table_ident_sql()
706 ),
707 (),
708 )
709 .await
710 .map_err(|e| CliError::MigrationError(e.to_string()))?;
711 tx.execute(
712 &format!(
713 "ALTER TABLE {} ADD COLUMN \"applied_at\" TEXT",
714 set.table_ident_sql()
715 ),
716 (),
717 )
718 .await
719 .map_err(|e| CliError::MigrationError(e.to_string()))?;
720
721 for row in matched {
722 let where_clause = if let Some(id) = row.id {
723 format!("\"id\" = {id}")
724 } else {
725 format!(
726 "\"created_at\" = {} AND \"hash\" = '{}'",
727 row.created_at,
728 escape_sql_literal(&row.hash)
729 )
730 };
731 tx.execute(
732 &format!(
733 "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
734 set.table_ident_sql(),
735 escape_sql_literal(&row.name),
736 where_clause
737 ),
738 (),
739 )
740 .await
741 .map_err(|e| CliError::MigrationError(e.to_string()))?;
742 }
743
744 tx.commit()
745 .await
746 .map_err(|e| CliError::MigrationError(e.to_string()))?;
747 Ok(())
748}
749
750#[cfg(feature = "postgres-sync")]
751fn ensure_postgres_tracking_table_sync(
752 client: &mut postgres::Client,
753 set: &Migrations,
754) -> Result<(), CliError> {
755 client
756 .execute(&set.create_table_sql(), &[])
757 .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;
758
759 let schema = set.schema_name().unwrap_or("public");
760 let rows = client
761 .query(
762 "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2",
763 &[&schema, &set.table_name()],
764 )
765 .map_err(|e| CliError::MigrationError(e.to_string()))?;
766 if rows
767 .iter()
768 .filter_map(|row| row.try_get::<_, String>(0).ok())
769 .any(|column| column == "name")
770 {
771 return Ok(());
772 }
773
774 let rows = client
775 .query(
776 &format!(
777 "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
778 set.table_ident_sql()
779 ),
780 &[],
781 )
782 .map_err(|e| CliError::MigrationError(e.to_string()))?;
783 let applied = rows
784 .iter()
785 .map(|row| {
786 Ok(drizzle_migrations::AppliedMigrationMetadata {
787 id: row.try_get::<_, Option<i64>>(0).ok().flatten(),
788 hash: row.try_get::<_, String>(1)?,
789 created_at: row.try_get::<_, i64>(2)?,
790 })
791 })
792 .collect::<Result<Vec<_>, postgres::Error>>()
793 .map_err(|e| CliError::MigrationError(e.to_string()))?;
794
795 let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
796 .map_err(|e| CliError::MigrationError(e.to_string()))?;
797
798 client
799 .execute(
800 &format!(
801 "ALTER TABLE {} ADD COLUMN \"name\" TEXT",
802 set.table_ident_sql()
803 ),
804 &[],
805 )
806 .map_err(|e| CliError::MigrationError(e.to_string()))?;
807 client
808 .execute(
809 &format!(
810 "ALTER TABLE {} ADD COLUMN \"applied_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
811 set.table_ident_sql()
812 ),
813 &[],
814 )
815 .map_err(|e| CliError::MigrationError(e.to_string()))?;
816
817 for row in matched {
818 let where_clause = if let Some(id) = row.id {
819 format!("\"id\" = {id}")
820 } else {
821 format!(
822 "\"created_at\" = {} AND \"hash\" = '{}'",
823 row.created_at,
824 escape_sql_literal(&row.hash)
825 )
826 };
827 client
828 .execute(
829 &format!(
830 "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
831 set.table_ident_sql(),
832 escape_sql_literal(&row.name),
833 where_clause
834 ),
835 &[],
836 )
837 .map_err(|e| CliError::MigrationError(e.to_string()))?;
838 }
839
840 Ok(())
841}
842
843#[cfg(feature = "tokio-postgres")]
844async fn ensure_postgres_tracking_table_async(
845 client: &tokio_postgres::Client,
846 set: &Migrations,
847) -> Result<(), CliError> {
848 client
849 .execute(&set.create_table_sql(), &[])
850 .await
851 .map_err(|e| CliError::MigrationError(format!("Failed to create migrations table: {e}")))?;
852
853 let schema = set.schema_name().unwrap_or("public");
854 let rows = client
855 .query(
856 "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = $2",
857 &[&schema, &set.table_name()],
858 )
859 .await
860 .map_err(|e| CliError::MigrationError(e.to_string()))?;
861 if rows
862 .iter()
863 .filter_map(|row| row.try_get::<_, String>(0).ok())
864 .any(|column| column == "name")
865 {
866 return Ok(());
867 }
868
869 let rows = client
870 .query(
871 &format!(
872 "SELECT id, hash, created_at FROM {} ORDER BY id ASC",
873 set.table_ident_sql()
874 ),
875 &[],
876 )
877 .await
878 .map_err(|e| CliError::MigrationError(e.to_string()))?;
879 let applied = rows
880 .iter()
881 .map(|row| {
882 Ok(drizzle_migrations::AppliedMigrationMetadata {
883 id: row.try_get::<_, Option<i64>>(0).ok().flatten(),
884 hash: row.try_get::<_, String>(1)?,
885 created_at: row.try_get::<_, i64>(2)?,
886 })
887 })
888 .collect::<Result<Vec<_>, tokio_postgres::Error>>()
889 .map_err(|e| CliError::MigrationError(e.to_string()))?;
890
891 let matched = drizzle_migrations::match_applied_migration_metadata(set.all(), &applied)
892 .map_err(|e| CliError::MigrationError(e.to_string()))?;
893
894 client
895 .execute(
896 &format!(
897 "ALTER TABLE {} ADD COLUMN \"name\" TEXT",
898 set.table_ident_sql()
899 ),
900 &[],
901 )
902 .await
903 .map_err(|e| CliError::MigrationError(e.to_string()))?;
904 client
905 .execute(
906 &format!(
907 "ALTER TABLE {} ADD COLUMN \"applied_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP",
908 set.table_ident_sql()
909 ),
910 &[],
911 )
912 .await
913 .map_err(|e| CliError::MigrationError(e.to_string()))?;
914
915 for row in matched {
916 let where_clause = if let Some(id) = row.id {
917 format!("\"id\" = {id}")
918 } else {
919 format!(
920 "\"created_at\" = {} AND \"hash\" = '{}'",
921 row.created_at,
922 escape_sql_literal(&row.hash)
923 )
924 };
925 client
926 .execute(
927 &format!(
928 "UPDATE {} SET \"name\" = '{}', \"applied_at\" = NULL WHERE {}",
929 set.table_ident_sql(),
930 escape_sql_literal(&row.name),
931 where_clause
932 ),
933 &[],
934 )
935 .await
936 .map_err(|e| CliError::MigrationError(e.to_string()))?;
937 }
938
939 Ok(())
940}
941
942fn is_destructive_statement(sql: &str) -> bool {
943 let s = sql.trim().to_uppercase();
944 s.contains("DROP TABLE")
945 || s.contains("DROP COLUMN")
946 || s.contains("DROP INDEX")
947 || s.contains("DROP VIEW")
948 || s.contains("DROP MATERIALIZED VIEW")
949 || s.contains("DROP TYPE")
950 || s.contains("DROP SCHEMA")
951 || s.contains("DROP SEQUENCE")
952 || s.contains("DROP ROLE")
953 || s.contains("DROP POLICY")
954 || s.contains("TRUNCATE")
955 || (s.contains("ALTER TABLE") && s.contains(" DROP "))
956}
957
958#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
959fn is_postgres_concurrent_index_statement(sql: &str) -> bool {
960 let s = sql.trim().to_ascii_uppercase();
961 (s.starts_with("CREATE") || s.starts_with("DROP")) && s.contains("INDEX CONCURRENTLY")
962}
963
964#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
965fn has_postgres_concurrent_index(statements: &[String]) -> bool {
966 statements
967 .iter()
968 .any(|stmt| is_postgres_concurrent_index_statement(stmt))
969}
970
971fn confirm_destructive() -> Result<bool, CliError> {
972 use std::io::{self, IsTerminal, Write};
973
974 if !io::stdin().is_terminal() {
975 return Err(CliError::Other(
976 "Refusing to apply potentially destructive changes in non-interactive mode. Use --explain or --force."
977 .into(),
978 ));
979 }
980
981 println!(
982 "{}",
983 output::warning("Potentially destructive changes detected (DROP/TRUNCATE/etc).")
984 );
985 print!("Apply anyway? [y/N]: ");
986 io::stdout()
987 .flush()
988 .map_err(|e| CliError::IoError(e.to_string()))?;
989
990 let mut line = String::new();
991 io::stdin()
992 .read_line(&mut line)
993 .map_err(|e| CliError::IoError(e.to_string()))?;
994 let ans = line.trim().to_ascii_lowercase();
995 Ok(ans == "y" || ans == "yes")
996}
997
998fn generate_push_sql(
999 current: &Snapshot,
1000 desired: &Snapshot,
1001 breakpoints: bool,
1002) -> Result<(Vec<String>, Vec<String>), CliError> {
1003 match (current, desired) {
1004 (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
1005 use drizzle_migrations::sqlite::collection::SQLiteDDL;
1006 use drizzle_migrations::sqlite::diff::compute_migration;
1007
1008 let prev_ddl = SQLiteDDL::from_entities(prev_snap.ddl.clone());
1009 let cur_ddl = SQLiteDDL::from_entities(curr_snap.ddl.clone());
1010
1011 let diff = compute_migration(&prev_ddl, &cur_ddl);
1012 Ok((diff.sql_statements, diff.warnings))
1013 }
1014 (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
1015 use drizzle_migrations::postgres::diff_full_snapshots;
1016 use drizzle_migrations::postgres::statements::PostgresGenerator;
1017
1018 let diff = diff_full_snapshots(prev_snap, curr_snap);
1019 let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
1020 Ok((generator.generate(&diff.diffs), Vec::new()))
1021 }
1022 _ => Err(CliError::DialectMismatch),
1023 }
1024}
1025
1026fn execute_statements(
1027 credentials: &Credentials,
1028 _dialect: Dialect,
1029 statements: &[String],
1030) -> Result<(), CliError> {
1031 let _ = statements;
1034
1035 match credentials {
1036 #[cfg(feature = "rusqlite")]
1037 Credentials::Sqlite { path } => execute_sqlite_statements(path, statements),
1038
1039 #[cfg(not(feature = "rusqlite"))]
1040 Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
1041 dialect: "SQLite",
1042 feature: "rusqlite",
1043 }),
1044
1045 #[cfg(any(feature = "libsql", feature = "turso"))]
1046 Credentials::Turso { url, auth_token } => {
1047 if is_local_libsql(url) {
1048 #[cfg(feature = "libsql")]
1049 {
1050 execute_libsql_local_statements(url, statements)
1051 }
1052 #[cfg(not(feature = "libsql"))]
1053 {
1054 Err(CliError::MissingDriver {
1055 dialect: "LibSQL (local)",
1056 feature: "libsql",
1057 })
1058 }
1059 } else {
1060 #[cfg(feature = "turso")]
1061 {
1062 execute_turso_statements(url, auth_token.as_deref(), statements)
1063 }
1064 #[cfg(not(feature = "turso"))]
1065 {
1066 let _ = auth_token;
1067 Err(CliError::MissingDriver {
1068 dialect: "Turso (remote)",
1069 feature: "turso",
1070 })
1071 }
1072 }
1073 }
1074
1075 #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
1076 Credentials::Turso { .. } => Err(CliError::MissingDriver {
1077 dialect: "Turso",
1078 feature: "turso or libsql",
1079 }),
1080
1081 Credentials::Postgres(creds) => {
1082 let _ = (creds, &statements);
1083 core::cfg_select! {
1084 feature = "postgres-sync" => execute_postgres_sync_statements(creds, statements),
1085 feature = "tokio-postgres" => execute_postgres_async_statements(creds, statements),
1086 _ => Err(CliError::MissingDriver {
1087 dialect: "PostgreSQL",
1088 feature: "postgres-sync or tokio-postgres",
1089 }),
1090 }
1091 }
1092
1093 #[cfg(feature = "d1-http")]
1094 Credentials::D1 {
1095 account_id,
1096 database_id,
1097 token,
1098 } => d1_http::execute_statements(account_id, database_id, token, statements),
1099
1100 #[cfg(not(feature = "d1-http"))]
1101 Credentials::D1 { .. } => Err(CliError::MissingDriver {
1102 dialect: "Cloudflare D1 (HTTP)",
1103 feature: "d1-http",
1104 }),
1105
1106 Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
1107 operation: "Direct SQL execution against AWS Data API",
1108 driver: "aws-data-api",
1109 hint: "Use `aws rds-data execute-statement --sql=\"...\"` with the matching \
1110 --resource-arn / --secret-arn / --database, or connect directly via \
1111 tokio-postgres.",
1112 }),
1113 }
1114}
1115
1116#[allow(dead_code)]
1118fn is_local_libsql(url: &str) -> bool {
1119 url.starts_with("file:")
1120 || url.starts_with("./")
1121 || url.starts_with('/')
1122 || !url.contains("://")
1123}
1124
1125#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
1126fn process_sqlite_uniques_from_indexes(
1127 raw_indexes: &[drizzle_migrations::sqlite::introspect::RawIndexInfo],
1128 index_columns: &[drizzle_migrations::sqlite::introspect::RawIndexColumn],
1129) -> Vec<drizzle_migrations::sqlite::UniqueConstraint> {
1130 use drizzle_migrations::sqlite::UniqueConstraint;
1131
1132 let mut uniques = Vec::new();
1133
1134 for idx in raw_indexes.iter().filter(|i| i.origin == "u") {
1135 let mut cols: Vec<(i32, String)> = index_columns
1136 .iter()
1137 .filter(|c| c.index_name == idx.name && c.key)
1138 .filter_map(|c| c.name.clone().map(|n| (c.seqno, n)))
1139 .collect();
1140 cols.sort_by_key(|(seq, _)| *seq);
1141 let col_names: Vec<String> = cols.into_iter().map(|(_, n)| n).collect();
1142 if col_names.is_empty() {
1143 continue;
1144 }
1145
1146 let name_explicit = !idx.name.starts_with("sqlite_autoindex_");
1147 let constraint_name = if name_explicit {
1148 idx.name.clone()
1149 } else {
1150 let refs: Vec<&str> = col_names.iter().map(String::as_str).collect();
1151 drizzle_migrations::sqlite::ddl::name_for_unique(&idx.table, &refs)
1152 };
1153
1154 let mut uniq =
1155 UniqueConstraint::from_strings(idx.table.clone(), constraint_name, col_names);
1156 uniq.name_explicit = name_explicit;
1157 uniques.push(uniq);
1158 }
1159
1160 uniques
1161}
1162
1163#[cfg(feature = "rusqlite")]
1168fn execute_sqlite_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
1169 let conn = rusqlite::Connection::open(path).map_err(|e| {
1170 CliError::ConnectionError(format!("Failed to open SQLite database '{path}': {e}"))
1171 })?;
1172
1173 conn.execute("BEGIN", [])
1174 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1175
1176 for stmt in statements {
1177 let s = stmt.trim();
1178 if s.is_empty() {
1179 continue;
1180 }
1181 if let Err(e) = conn.execute(s, []) {
1182 let _ = conn.execute("ROLLBACK", []);
1183 return Err(CliError::MigrationError(format!(
1184 "Statement failed: {e}\n{s}"
1185 )));
1186 }
1187 }
1188
1189 conn.execute("COMMIT", [])
1190 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1191
1192 Ok(())
1193}
1194
1195#[cfg(feature = "rusqlite")]
1196fn run_sqlite_migrations(set: &Migrations, path: &str) -> Result<MigrationResult, CliError> {
1197 let conn = rusqlite::Connection::open(path).map_err(|e| {
1198 CliError::ConnectionError(format!("Failed to open SQLite database '{path}': {e}"))
1199 })?;
1200
1201 ensure_sqlite_tracking_table(&conn, set)?;
1202
1203 let applied_names = query_applied_names_sqlite(&conn, set)?;
1205
1206 let pending: Vec<_> = set.pending(&applied_names).collect();
1208 if pending.is_empty() {
1209 return Ok(MigrationResult {
1210 applied_count: 0,
1211 applied_migrations: vec![],
1212 });
1213 }
1214
1215 conn.execute("BEGIN", [])
1217 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1218
1219 let mut applied = Vec::new();
1220 for migration in &pending {
1221 for stmt in migration.statements() {
1222 if !stmt.trim().is_empty()
1223 && let Err(e) = conn.execute(stmt, [])
1224 {
1225 let _ = conn.execute("ROLLBACK", []);
1226 return Err(CliError::MigrationError(format!(
1227 "Migration '{}' failed: {}",
1228 migration.hash(),
1229 e
1230 )));
1231 }
1232 }
1233 if let Err(e) = conn.execute(&set.record_migration_sql(migration), []) {
1234 let _ = conn.execute("ROLLBACK", []);
1235 return Err(CliError::MigrationError(e.to_string()));
1236 }
1237 applied.push(migration.hash().to_string());
1238 }
1239
1240 conn.execute("COMMIT", [])
1241 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1242
1243 Ok(MigrationResult {
1244 applied_count: applied.len(),
1245 applied_migrations: applied,
1246 })
1247}
1248
1249#[cfg(feature = "rusqlite")]
1250fn inspect_sqlite_migrations(set: &Migrations, path: &str) -> Result<MigrationPlan, CliError> {
1251 let conn = rusqlite::Connection::open(path).map_err(|e| {
1252 CliError::ConnectionError(format!("Failed to open SQLite database '{path}': {e}"))
1253 })?;
1254
1255 ensure_sqlite_tracking_table(&conn, set)?;
1256 let applied = query_applied_records_sqlite(&conn, set)?;
1257 build_migration_plan(set, &applied)
1258}
1259
1260#[cfg(feature = "rusqlite")]
1261fn query_applied_records_sqlite(
1262 conn: &rusqlite::Connection,
1263 set: &Migrations,
1264) -> Result<Vec<AppliedMigrationRecord>, CliError> {
1265 let sql = format!(
1266 r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
1267 set.table_ident_sql()
1268 );
1269 let Ok(mut stmt) = conn.prepare(&sql) else {
1270 return Ok(vec![]); };
1272
1273 let mut applied = Vec::new();
1274 let rows = stmt
1275 .query_map([], |row| {
1276 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1277 })
1278 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1279
1280 for row in rows {
1281 let (hash, name) = row.map_err(|e| CliError::MigrationError(e.to_string()))?;
1282 applied.push(AppliedMigrationRecord { hash, name });
1283 }
1284
1285 Ok(applied)
1286}
1287
1288#[cfg(feature = "rusqlite")]
1289fn query_applied_names_sqlite(
1290 conn: &rusqlite::Connection,
1291 set: &Migrations,
1292) -> Result<Vec<String>, CliError> {
1293 let Ok(mut stmt) = conn.prepare(&set.applied_names_sql()) else {
1294 return Ok(vec![]); };
1296
1297 let names = stmt
1298 .query_map([], |row| row.get::<_, String>(0))
1299 .map_err(|e| CliError::MigrationError(e.to_string()))?
1300 .filter_map(Result::ok)
1301 .collect();
1302
1303 Ok(names)
1304}
1305
1306#[cfg(feature = "postgres-sync")]
1311fn execute_postgres_sync_statements(
1312 creds: &PostgresCreds,
1313 statements: &[String],
1314) -> Result<(), CliError> {
1315 let url = creds.connection_url();
1316 let mut client = postgres::Client::connect(&url, postgres::NoTls)
1317 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
1318
1319 if has_postgres_concurrent_index(statements) {
1320 for stmt in statements {
1322 let s = stmt.trim();
1323 if s.is_empty() {
1324 continue;
1325 }
1326 client
1327 .execute(s, &[])
1328 .map_err(|e| CliError::MigrationError(format!("Statement failed: {e}\n{s}")))?;
1329 }
1330 } else {
1331 let mut tx = client
1332 .transaction()
1333 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1334
1335 for stmt in statements {
1336 let s = stmt.trim();
1337 if s.is_empty() {
1338 continue;
1339 }
1340 tx.execute(s, &[])
1341 .map_err(|e| CliError::MigrationError(format!("Statement failed: {e}\n{s}")))?;
1342 }
1343
1344 tx.commit()
1345 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1346 }
1347
1348 Ok(())
1349}
1350
1351#[cfg(feature = "postgres-sync")]
1352fn run_postgres_sync_migrations(
1353 set: &Migrations,
1354 creds: &PostgresCreds,
1355) -> Result<MigrationResult, CliError> {
1356 let url = creds.connection_url();
1357 let mut client = postgres::Client::connect(&url, postgres::NoTls)
1358 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
1359
1360 if let Some(schema_sql) = set.create_schema_sql() {
1362 client
1363 .execute(&schema_sql, &[])
1364 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1365 }
1366
1367 ensure_postgres_tracking_table_sync(&mut client, set)?;
1368
1369 let rows = client
1371 .query(&set.applied_names_sql(), &[])
1372 .unwrap_or_default();
1373 let applied_names: Vec<String> = rows.iter().filter_map(|r| r.try_get(0).ok()).collect();
1374
1375 let pending: Vec<_> = set.pending(&applied_names).collect();
1377 if pending.is_empty() {
1378 return Ok(MigrationResult {
1379 applied_count: 0,
1380 applied_migrations: vec![],
1381 });
1382 }
1383
1384 let has_concurrent = pending
1385 .iter()
1386 .any(|m| has_postgres_concurrent_index(m.statements()));
1387
1388 if has_concurrent {
1389 let mut applied = Vec::new();
1391 for migration in &pending {
1392 for stmt in migration.statements() {
1393 if !stmt.trim().is_empty() {
1394 client.execute(stmt, &[]).map_err(|e| {
1395 CliError::MigrationError(format!(
1396 "Migration '{}' failed: {}",
1397 migration.hash(),
1398 e
1399 ))
1400 })?;
1401 }
1402 }
1403 client
1404 .execute(&set.record_migration_sql(migration), &[])
1405 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1406 applied.push(migration.hash().to_string());
1407 }
1408
1409 return Ok(MigrationResult {
1410 applied_count: applied.len(),
1411 applied_migrations: applied,
1412 });
1413 }
1414
1415 let mut tx = client
1417 .transaction()
1418 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1419
1420 let mut applied = Vec::new();
1421 for migration in &pending {
1422 for stmt in migration.statements() {
1423 if !stmt.trim().is_empty() {
1424 tx.execute(stmt, &[]).map_err(|e| {
1425 CliError::MigrationError(format!(
1426 "Migration '{}' failed: {}",
1427 migration.hash(),
1428 e
1429 ))
1430 })?;
1431 }
1432 }
1433 tx.execute(&set.record_migration_sql(migration), &[])
1434 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1435 applied.push(migration.hash().to_string());
1436 }
1437
1438 tx.commit()
1439 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1440
1441 Ok(MigrationResult {
1442 applied_count: applied.len(),
1443 applied_migrations: applied,
1444 })
1445}
1446
1447#[cfg(feature = "postgres-sync")]
1448fn inspect_postgres_sync_migrations(
1449 set: &Migrations,
1450 creds: &PostgresCreds,
1451) -> Result<MigrationPlan, CliError> {
1452 let url = creds.connection_url();
1453 let mut client = postgres::Client::connect(&url, postgres::NoTls)
1454 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
1455
1456 if let Some(schema_sql) = set.create_schema_sql() {
1457 client
1458 .execute(&schema_sql, &[])
1459 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1460 }
1461
1462 ensure_postgres_tracking_table_sync(&mut client, set)?;
1463 let applied = query_applied_records_postgres_sync(&mut client, set);
1464 build_migration_plan(set, &applied)
1465}
1466
1467#[cfg(feature = "postgres-sync")]
1468fn query_applied_records_postgres_sync(
1469 client: &mut postgres::Client,
1470 set: &Migrations,
1471) -> Vec<AppliedMigrationRecord> {
1472 let sql = format!(
1473 r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
1474 set.table_ident_sql()
1475 );
1476 let rows = client.query(&sql, &[]).unwrap_or_default();
1477
1478 let mut applied = Vec::new();
1479 for row in rows {
1480 let hash = row.try_get::<_, Option<String>>(0).ok().flatten();
1481 let name = row.try_get::<_, Option<String>>(1).ok().flatten();
1482 if let (Some(hash), Some(name)) = (hash, name) {
1483 applied.push(AppliedMigrationRecord { hash, name });
1484 }
1485 }
1486
1487 applied
1488}
1489
1490#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
1495fn execute_postgres_async_statements(
1496 creds: &PostgresCreds,
1497 statements: &[String],
1498) -> Result<(), CliError> {
1499 let rt = tokio::runtime::Builder::new_current_thread()
1500 .enable_all()
1501 .build()
1502 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;
1503
1504 rt.block_on(execute_postgres_async_inner(creds, statements))
1505}
1506
1507#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
1508async fn execute_postgres_async_inner(
1509 creds: &PostgresCreds,
1510 statements: &[String],
1511) -> Result<(), CliError> {
1512 let url = creds.connection_url();
1513 let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
1514 .await
1515 .map_err(|e| {
1516 CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
1517 })?;
1518
1519 tokio::spawn(async move {
1520 if let Err(e) = connection.await {
1521 eprintln!(
1522 "{}",
1523 output::err_line(&format!("PostgreSQL connection error: {e}"))
1524 );
1525 }
1526 });
1527
1528 if has_postgres_concurrent_index(statements) {
1529 for stmt in statements {
1531 let s = stmt.trim();
1532 if s.is_empty() {
1533 continue;
1534 }
1535 client
1536 .execute(s, &[])
1537 .await
1538 .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, s)))?;
1539 }
1540 } else {
1541 let tx = client
1542 .transaction()
1543 .await
1544 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1545
1546 for stmt in statements {
1547 let s = stmt.trim();
1548 if s.is_empty() {
1549 continue;
1550 }
1551 tx.execute(s, &[])
1552 .await
1553 .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, s)))?;
1554 }
1555
1556 tx.commit()
1557 .await
1558 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1559 }
1560
1561 Ok(())
1562}
1563
1564#[cfg(feature = "tokio-postgres")]
1565#[allow(dead_code)] fn run_postgres_async_migrations(
1567 set: &Migrations,
1568 creds: &PostgresCreds,
1569) -> Result<MigrationResult, CliError> {
1570 let rt = tokio::runtime::Builder::new_current_thread()
1571 .enable_all()
1572 .build()
1573 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
1574
1575 rt.block_on(run_postgres_async_inner(set, creds))
1576}
1577
1578#[cfg(feature = "tokio-postgres")]
1579#[allow(dead_code)] fn inspect_postgres_async_migrations(
1581 set: &Migrations,
1582 creds: &PostgresCreds,
1583) -> Result<MigrationPlan, CliError> {
1584 let rt = tokio::runtime::Builder::new_current_thread()
1585 .enable_all()
1586 .build()
1587 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
1588
1589 rt.block_on(inspect_postgres_async_inner(set, creds))
1590}
1591
1592#[cfg(feature = "tokio-postgres")]
1593#[allow(dead_code)]
1594async fn inspect_postgres_async_inner(
1595 set: &Migrations,
1596 creds: &PostgresCreds,
1597) -> Result<MigrationPlan, CliError> {
1598 let url = creds.connection_url();
1599 let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
1600 .await
1601 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
1602
1603 tokio::spawn(async move {
1604 if let Err(e) = connection.await {
1605 eprintln!(
1606 "{}",
1607 output::err_line(&format!("PostgreSQL connection error: {e}"))
1608 );
1609 }
1610 });
1611
1612 if let Some(schema_sql) = set.create_schema_sql() {
1613 client
1614 .execute(&schema_sql, &[])
1615 .await
1616 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1617 }
1618
1619 ensure_postgres_tracking_table_async(&client, set).await?;
1620 let applied = query_applied_records_postgres_async(&client, set).await;
1621 build_migration_plan(set, &applied)
1622}
1623
1624#[cfg(feature = "tokio-postgres")]
1625async fn query_applied_records_postgres_async(
1626 client: &tokio_postgres::Client,
1627 set: &Migrations,
1628) -> Vec<AppliedMigrationRecord> {
1629 let sql = format!(
1630 r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
1631 set.table_ident_sql()
1632 );
1633 let rows = client.query(&sql, &[]).await.unwrap_or_default();
1634
1635 let mut applied = Vec::new();
1636 for row in rows {
1637 let hash = row.try_get::<_, Option<String>>(0).ok().flatten();
1638 let name = row.try_get::<_, Option<String>>(1).ok().flatten();
1639 if let (Some(hash), Some(name)) = (hash, name) {
1640 applied.push(AppliedMigrationRecord { hash, name });
1641 }
1642 }
1643
1644 applied
1645}
1646
1647#[cfg(feature = "tokio-postgres")]
1648#[allow(dead_code)]
1649async fn run_postgres_async_inner(
1650 set: &Migrations,
1651 creds: &PostgresCreds,
1652) -> Result<MigrationResult, CliError> {
1653 let url = creds.connection_url();
1654 let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
1655 .await
1656 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
1657
1658 tokio::spawn(async move {
1660 if let Err(e) = connection.await {
1661 eprintln!(
1662 "{}",
1663 output::err_line(&format!("PostgreSQL connection error: {e}"))
1664 );
1665 }
1666 });
1667
1668 if let Some(schema_sql) = set.create_schema_sql() {
1670 client
1671 .execute(&schema_sql, &[])
1672 .await
1673 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1674 }
1675
1676 ensure_postgres_tracking_table_async(&client, set).await?;
1677
1678 let rows = client
1680 .query(&set.applied_names_sql(), &[])
1681 .await
1682 .unwrap_or_default();
1683 let applied_names: Vec<String> = rows.iter().filter_map(|r| r.try_get(0).ok()).collect();
1684
1685 let pending: Vec<_> = set.pending(&applied_names).collect();
1687 if pending.is_empty() {
1688 return Ok(MigrationResult {
1689 applied_count: 0,
1690 applied_migrations: vec![],
1691 });
1692 }
1693
1694 let has_concurrent = pending
1695 .iter()
1696 .any(|m| has_postgres_concurrent_index(m.statements()));
1697
1698 if has_concurrent {
1699 let mut applied = Vec::new();
1701 for migration in &pending {
1702 for stmt in migration.statements() {
1703 if !stmt.trim().is_empty() {
1704 client.execute(stmt, &[]).await.map_err(|e| {
1705 CliError::MigrationError(format!(
1706 "Migration '{}' failed: {}",
1707 migration.hash(),
1708 e
1709 ))
1710 })?;
1711 }
1712 }
1713 client
1714 .execute(&set.record_migration_sql(migration), &[])
1715 .await
1716 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1717 applied.push(migration.hash().to_string());
1718 }
1719
1720 return Ok(MigrationResult {
1721 applied_count: applied.len(),
1722 applied_migrations: applied,
1723 });
1724 }
1725
1726 let tx = client
1728 .transaction()
1729 .await
1730 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1731
1732 let mut applied = Vec::new();
1733 for migration in &pending {
1734 for stmt in migration.statements() {
1735 if !stmt.trim().is_empty() {
1736 tx.execute(stmt, &[]).await.map_err(|e| {
1737 CliError::MigrationError(format!(
1738 "Migration '{}' failed: {}",
1739 migration.hash(),
1740 e
1741 ))
1742 })?;
1743 }
1744 }
1745 tx.execute(&set.record_migration_sql(migration), &[])
1746 .await
1747 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1748 applied.push(migration.hash().to_string());
1749 }
1750
1751 tx.commit()
1752 .await
1753 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1754
1755 Ok(MigrationResult {
1756 applied_count: applied.len(),
1757 applied_migrations: applied,
1758 })
1759}
1760
1761#[cfg(feature = "libsql")]
1766fn execute_libsql_local_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
1767 let rt = tokio::runtime::Builder::new_current_thread()
1768 .enable_all()
1769 .build()
1770 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
1771
1772 rt.block_on(execute_libsql_local_inner(path, statements))
1773}
1774
1775#[cfg(feature = "libsql")]
1776async fn execute_libsql_local_inner(path: &str, statements: &[String]) -> Result<(), CliError> {
1777 let db = libsql::Builder::new_local(path)
1778 .build()
1779 .await
1780 .map_err(|e| {
1781 CliError::ConnectionError(format!("Failed to open LibSQL database '{path}': {e}"))
1782 })?;
1783
1784 let conn = db
1785 .connect()
1786 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
1787
1788 let tx = conn
1789 .transaction()
1790 .await
1791 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1792
1793 for stmt in statements {
1794 let s = stmt.trim();
1795 if s.is_empty() {
1796 continue;
1797 }
1798 if let Err(e) = tx.execute(s, ()).await {
1799 tx.rollback().await.ok();
1800 return Err(CliError::MigrationError(format!(
1801 "Statement failed: {e}\n{s}"
1802 )));
1803 }
1804 }
1805
1806 tx.commit()
1807 .await
1808 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1809
1810 Ok(())
1811}
1812
1813#[cfg(feature = "libsql")]
1814fn run_libsql_local_migrations(set: &Migrations, path: &str) -> Result<MigrationResult, CliError> {
1815 let rt = tokio::runtime::Builder::new_current_thread()
1816 .enable_all()
1817 .build()
1818 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
1819
1820 rt.block_on(run_libsql_local_inner(set, path))
1821}
1822
1823#[cfg(feature = "libsql")]
1824fn inspect_libsql_local_migrations(
1825 set: &Migrations,
1826 path: &str,
1827) -> Result<MigrationPlan, CliError> {
1828 let rt = tokio::runtime::Builder::new_current_thread()
1829 .enable_all()
1830 .build()
1831 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
1832
1833 rt.block_on(inspect_libsql_local_inner(set, path))
1834}
1835
1836#[cfg(feature = "libsql")]
1837async fn inspect_libsql_local_inner(
1838 set: &Migrations,
1839 path: &str,
1840) -> Result<MigrationPlan, CliError> {
1841 let db = libsql::Builder::new_local(path)
1842 .build()
1843 .await
1844 .map_err(|e| {
1845 CliError::ConnectionError(format!("Failed to open LibSQL database '{path}': {e}"))
1846 })?;
1847
1848 let conn = db
1849 .connect()
1850 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
1851
1852 ensure_sqlite_tracking_table_libsql(&conn, set).await?;
1853 let applied = query_applied_records_libsql(&conn, set).await?;
1854 build_migration_plan(set, &applied)
1855}
1856
1857#[cfg(feature = "libsql")]
1858async fn run_libsql_local_inner(set: &Migrations, path: &str) -> Result<MigrationResult, CliError> {
1859 let db = libsql::Builder::new_local(path)
1860 .build()
1861 .await
1862 .map_err(|e| {
1863 CliError::ConnectionError(format!("Failed to open LibSQL database '{path}': {e}"))
1864 })?;
1865
1866 let conn = db
1867 .connect()
1868 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
1869
1870 ensure_sqlite_tracking_table_libsql(&conn, set).await?;
1871
1872 let applied_names = query_applied_names_libsql(&conn, set).await?;
1874
1875 let pending: Vec<_> = set.pending(&applied_names).collect();
1877 if pending.is_empty() {
1878 return Ok(MigrationResult {
1879 applied_count: 0,
1880 applied_migrations: vec![],
1881 });
1882 }
1883
1884 let tx = conn
1886 .transaction()
1887 .await
1888 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1889
1890 let mut applied = Vec::new();
1891 for migration in &pending {
1892 for stmt in migration.statements() {
1893 if !stmt.trim().is_empty()
1894 && let Err(e) = tx.execute(stmt, ()).await
1895 {
1896 tx.rollback().await.ok();
1897 return Err(CliError::MigrationError(format!(
1898 "Migration '{}' failed: {}",
1899 migration.hash(),
1900 e
1901 )));
1902 }
1903 }
1904 if let Err(e) = tx.execute(&set.record_migration_sql(migration), ()).await {
1905 tx.rollback().await.ok();
1906 return Err(CliError::MigrationError(e.to_string()));
1907 }
1908 applied.push(migration.hash().to_string());
1909 }
1910
1911 tx.commit()
1912 .await
1913 .map_err(|e| CliError::MigrationError(e.to_string()))?;
1914
1915 Ok(MigrationResult {
1916 applied_count: applied.len(),
1917 applied_migrations: applied,
1918 })
1919}
1920
1921#[cfg(feature = "libsql")]
1922async fn query_applied_names_libsql(
1923 conn: &libsql::Connection,
1924 set: &Migrations,
1925) -> Result<Vec<String>, CliError> {
1926 let Ok(mut rows) = conn.query(&set.applied_names_sql(), ()).await else {
1927 return Ok(vec![]); };
1929
1930 let mut names = Vec::new();
1931 while let Ok(Some(row)) = rows.next().await {
1932 if let Ok(name) = row.get::<String>(0) {
1933 names.push(name);
1934 }
1935 }
1936
1937 Ok(names)
1938}
1939
1940#[cfg(feature = "libsql")]
1941async fn query_applied_records_libsql(
1942 conn: &libsql::Connection,
1943 set: &Migrations,
1944) -> Result<Vec<AppliedMigrationRecord>, CliError> {
1945 let sql = format!(
1946 r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
1947 set.table_ident_sql()
1948 );
1949 let Ok(mut rows) = conn.query(&sql, ()).await else {
1950 return Ok(vec![]); };
1952
1953 let mut applied = Vec::new();
1954 while let Ok(Some(row)) = rows.next().await {
1955 if let (Ok(hash), Ok(name)) = (row.get::<String>(0), row.get::<String>(1)) {
1956 applied.push(AppliedMigrationRecord { hash, name });
1957 }
1958 }
1959
1960 Ok(applied)
1961}
1962
1963#[cfg(feature = "turso")]
1968fn execute_turso_statements(
1969 url: &str,
1970 auth_token: Option<&str>,
1971 statements: &[String],
1972) -> Result<(), CliError> {
1973 let rt = tokio::runtime::Builder::new_current_thread()
1974 .enable_all()
1975 .build()
1976 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
1977
1978 rt.block_on(execute_turso_inner(url, auth_token, statements))
1979}
1980
1981#[cfg(feature = "turso")]
1982async fn execute_turso_inner(
1983 url: &str,
1984 auth_token: Option<&str>,
1985 statements: &[String],
1986) -> Result<(), CliError> {
1987 let builder =
1988 libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());
1989
1990 let db = builder.build().await.map_err(|e| {
1991 CliError::ConnectionError(format!("Failed to connect to Turso '{url}': {e}"))
1992 })?;
1993
1994 let conn = db
1995 .connect()
1996 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
1997
1998 let tx = conn
1999 .transaction()
2000 .await
2001 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2002
2003 for stmt in statements {
2004 let s = stmt.trim();
2005 if s.is_empty() {
2006 continue;
2007 }
2008 if let Err(e) = tx.execute(s, ()).await {
2009 tx.rollback().await.ok();
2010 return Err(CliError::MigrationError(format!(
2011 "Statement failed: {e}\n{s}"
2012 )));
2013 }
2014 }
2015
2016 tx.commit()
2017 .await
2018 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2019
2020 Ok(())
2021}
2022
2023#[cfg(feature = "turso")]
2024fn run_turso_migrations(
2025 set: &Migrations,
2026 url: &str,
2027 auth_token: Option<&str>,
2028) -> Result<MigrationResult, CliError> {
2029 let rt = tokio::runtime::Builder::new_current_thread()
2030 .enable_all()
2031 .build()
2032 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
2033
2034 rt.block_on(run_turso_inner(set, url, auth_token))
2035}
2036
2037#[cfg(feature = "turso")]
2038fn inspect_turso_migrations(
2039 set: &Migrations,
2040 url: &str,
2041 auth_token: Option<&str>,
2042) -> Result<MigrationPlan, CliError> {
2043 let rt = tokio::runtime::Builder::new_current_thread()
2044 .enable_all()
2045 .build()
2046 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
2047
2048 rt.block_on(inspect_turso_inner(set, url, auth_token))
2049}
2050
2051#[cfg(feature = "turso")]
2052async fn inspect_turso_inner(
2053 set: &Migrations,
2054 url: &str,
2055 auth_token: Option<&str>,
2056) -> Result<MigrationPlan, CliError> {
2057 let builder =
2058 libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());
2059
2060 let db = builder.build().await.map_err(|e| {
2061 CliError::ConnectionError(format!("Failed to connect to Turso '{url}': {e}"))
2062 })?;
2063
2064 let conn = db
2065 .connect()
2066 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
2067
2068 ensure_sqlite_tracking_table_libsql(&conn, set).await?;
2069 let applied = query_applied_records_turso(&conn, set).await?;
2070 build_migration_plan(set, &applied)
2071}
2072
2073#[cfg(feature = "turso")]
2074async fn run_turso_inner(
2075 set: &Migrations,
2076 url: &str,
2077 auth_token: Option<&str>,
2078) -> Result<MigrationResult, CliError> {
2079 let builder =
2080 libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());
2081
2082 let db = builder.build().await.map_err(|e| {
2083 CliError::ConnectionError(format!("Failed to connect to Turso '{url}': {e}"))
2084 })?;
2085
2086 let conn = db
2087 .connect()
2088 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
2089
2090 ensure_sqlite_tracking_table_libsql(&conn, set).await?;
2091
2092 let applied_names = query_applied_names_turso(&conn, set).await?;
2094
2095 let pending: Vec<_> = set.pending(&applied_names).collect();
2097 if pending.is_empty() {
2098 return Ok(MigrationResult {
2099 applied_count: 0,
2100 applied_migrations: vec![],
2101 });
2102 }
2103
2104 let tx = conn
2106 .transaction()
2107 .await
2108 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2109
2110 let mut applied = Vec::new();
2111 for migration in &pending {
2112 for stmt in migration.statements() {
2113 if !stmt.trim().is_empty()
2114 && let Err(e) = tx.execute(stmt, ()).await
2115 {
2116 tx.rollback().await.ok();
2117 return Err(CliError::MigrationError(format!(
2118 "Migration '{}' failed: {}",
2119 migration.hash(),
2120 e
2121 )));
2122 }
2123 }
2124 if let Err(e) = tx.execute(&set.record_migration_sql(migration), ()).await {
2125 tx.rollback().await.ok();
2126 return Err(CliError::MigrationError(e.to_string()));
2127 }
2128 applied.push(migration.hash().to_string());
2129 }
2130
2131 tx.commit()
2132 .await
2133 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2134
2135 Ok(MigrationResult {
2136 applied_count: applied.len(),
2137 applied_migrations: applied,
2138 })
2139}
2140
2141#[cfg(feature = "turso")]
2142async fn query_applied_names_turso(
2143 conn: &libsql::Connection,
2144 set: &Migrations,
2145) -> Result<Vec<String>, CliError> {
2146 let Ok(mut rows) = conn.query(&set.applied_names_sql(), ()).await else {
2147 return Ok(vec![]); };
2149
2150 let mut names = Vec::new();
2151 while let Ok(Some(row)) = rows.next().await {
2152 if let Ok(name) = row.get::<String>(0) {
2153 names.push(name);
2154 }
2155 }
2156
2157 Ok(names)
2158}
2159
2160#[cfg(feature = "turso")]
2161async fn query_applied_records_turso(
2162 conn: &libsql::Connection,
2163 set: &Migrations,
2164) -> Result<Vec<AppliedMigrationRecord>, CliError> {
2165 let sql = format!(
2166 r#"SELECT hash, "name" FROM {} WHERE "name" IS NOT NULL ORDER BY id;"#,
2167 set.table_ident_sql()
2168 );
2169 let Ok(mut rows) = conn.query(&sql, ()).await else {
2170 return Ok(vec![]); };
2172
2173 let mut applied = Vec::new();
2174 while let Ok(Some(row)) = rows.next().await {
2175 if let (Ok(hash), Ok(name)) = (row.get::<String>(0), row.get::<String>(1)) {
2176 applied.push(AppliedMigrationRecord { hash, name });
2177 }
2178 }
2179
2180 Ok(applied)
2181}
2182
2183#[derive(Debug)]
2189pub struct IntrospectResult {
2190 pub schema_code: String,
2192 pub table_count: usize,
2194 pub index_count: usize,
2196 pub view_count: usize,
2198 pub warnings: Vec<String>,
2200 pub snapshot: Snapshot,
2202 pub snapshot_path: std::path::PathBuf,
2204}
2205
2206#[allow(clippy::too_many_arguments)]
2216pub fn run_introspection(
2217 credentials: &Credentials,
2218 dialect: Dialect,
2219 out_dir: &Path,
2220 init_metadata: bool,
2221 breakpoints: bool,
2222 introspect_casing: Option<IntrospectCasing>,
2223 filters: &SnapshotFilters,
2224 migrations_table: &str,
2225 migrations_schema: &str,
2226) -> Result<IntrospectResult, CliError> {
2227 use drizzle_migrations::words::generate_migration_tag;
2228
2229 let mut result = introspect_database(credentials, dialect)?;
2231 apply_snapshot_filters(&mut result.snapshot, dialect, filters)?;
2232 if !filters.is_empty() || introspect_casing.is_some() {
2233 regenerate_schema_from_snapshot(&mut result, dialect, introspect_casing);
2234 }
2235
2236 let schema_path = out_dir.join("schema.rs");
2238 if let Some(parent) = schema_path.parent() {
2239 std::fs::create_dir_all(parent).map_err(|e| {
2240 CliError::Other(format!(
2241 "Failed to create output directory '{}': {}",
2242 parent.display(),
2243 e
2244 ))
2245 })?;
2246 }
2247 std::fs::write(&schema_path, &result.schema_code).map_err(|e| {
2248 CliError::Other(format!(
2249 "Failed to write schema file '{}': {}",
2250 schema_path.display(),
2251 e
2252 ))
2253 })?;
2254
2255 let journal_path = out_dir.join("meta").join("_journal.json");
2256 if journal_path.exists() {
2257 return Err(CliError::Other(
2258 "Detected old drizzle-kit migration folders. Upgrade them before writing new migrations."
2259 .to_string(),
2260 ));
2261 }
2262
2263 let tag = generate_migration_tag(None);
2265
2266 let migration_dir = out_dir.join(&tag);
2268 std::fs::create_dir_all(&migration_dir).map_err(|e| {
2269 CliError::Other(format!(
2270 "Failed to create migration directory '{}': {}",
2271 migration_dir.display(),
2272 e
2273 ))
2274 })?;
2275
2276 let snapshot_path = migration_dir.join("snapshot.json");
2278 result.snapshot.save(&snapshot_path).map_err(|e| {
2279 CliError::Other(format!(
2280 "Failed to write snapshot file '{}': {}",
2281 snapshot_path.display(),
2282 e
2283 ))
2284 })?;
2285
2286 let base_dialect = dialect.to_base();
2288 let empty_snapshot = Snapshot::empty(base_dialect);
2289 let sql_statements =
2290 generate_introspect_migration(&empty_snapshot, &result.snapshot, breakpoints)?;
2291
2292 let migration_sql_path = migration_dir.join("migration.sql");
2294 let sql_content = format_migration_sql(&sql_statements, breakpoints);
2295 std::fs::write(&migration_sql_path, &sql_content).map_err(|e| {
2296 CliError::Other(format!(
2297 "Failed to write migration file '{}': {}",
2298 migration_sql_path.display(),
2299 e
2300 ))
2301 })?;
2302
2303 result.snapshot_path = snapshot_path;
2305
2306 if init_metadata {
2307 apply_init_metadata(
2308 credentials,
2309 dialect,
2310 out_dir,
2311 migrations_table,
2312 migrations_schema,
2313 )?;
2314 }
2315
2316 Ok(result)
2317}
2318
2319#[allow(unused_variables)] fn apply_init_metadata(
2325 credentials: &Credentials,
2326 dialect: Dialect,
2327 out_dir: &Path,
2328 migrations_table: &str,
2329 migrations_schema: &str,
2330) -> Result<(), CliError> {
2331 #[cfg(any(
2332 feature = "rusqlite",
2333 feature = "libsql",
2334 feature = "turso",
2335 feature = "postgres-sync",
2336 feature = "tokio-postgres",
2337 feature = "d1-http",
2338 ))]
2339 let set = load_migration_set(dialect, out_dir, migrations_table, migrations_schema)?;
2340
2341 match credentials {
2342 #[cfg(feature = "rusqlite")]
2343 Credentials::Sqlite { path } => init_sqlite_metadata(path, &set),
2344
2345 #[cfg(not(feature = "rusqlite"))]
2346 Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
2347 dialect: "SQLite",
2348 feature: "rusqlite",
2349 }),
2350
2351 #[cfg(any(feature = "libsql", feature = "turso"))]
2352 Credentials::Turso { url, auth_token } => {
2353 if is_local_libsql(url) {
2354 #[cfg(feature = "libsql")]
2355 {
2356 init_libsql_local_metadata(url, &set)
2357 }
2358 #[cfg(not(feature = "libsql"))]
2359 {
2360 Err(CliError::MissingDriver {
2361 dialect: "LibSQL (local)",
2362 feature: "libsql",
2363 })
2364 }
2365 } else {
2366 #[cfg(feature = "turso")]
2367 {
2368 init_turso_metadata(url, auth_token.as_deref(), &set)
2369 }
2370 #[cfg(not(feature = "turso"))]
2371 {
2372 let _ = auth_token;
2373 Err(CliError::MissingDriver {
2374 dialect: "Turso (remote)",
2375 feature: "turso",
2376 })
2377 }
2378 }
2379 }
2380
2381 #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
2382 Credentials::Turso { .. } => Err(CliError::MissingDriver {
2383 dialect: "Turso",
2384 feature: "turso or libsql",
2385 }),
2386
2387 Credentials::Postgres(creds) => {
2388 let _ = creds;
2389 core::cfg_select! {
2390 feature = "postgres-sync" => init_postgres_sync_metadata(creds, &set),
2391 feature = "tokio-postgres" => init_postgres_async_metadata(creds, &set),
2392 _ => Err(CliError::MissingDriver {
2393 dialect: "PostgreSQL",
2394 feature: "postgres-sync or tokio-postgres",
2395 }),
2396 }
2397 }
2398
2399 #[cfg(feature = "d1-http")]
2400 Credentials::D1 {
2401 account_id,
2402 database_id,
2403 token,
2404 } => d1_http::init_metadata(&set, account_id, database_id, token),
2405
2406 #[cfg(not(feature = "d1-http"))]
2407 Credentials::D1 { .. } => Err(CliError::MissingDriver {
2408 dialect: "Cloudflare D1 (HTTP)",
2409 feature: "d1-http",
2410 }),
2411
2412 Credentials::AwsDataApi { .. } => Err(CliError::UnsupportedForDriver {
2413 operation: "Migration metadata init against AWS Data API",
2414 driver: "aws-data-api",
2415 hint: "AWS RDS Data API schema ops are not yet wired into this CLI. Seed the \
2416 migrations table manually via `aws rds-data execute-statement` or \
2417 tokio-postgres.",
2418 }),
2419 }
2420}
2421
2422#[cfg(any(
2423 feature = "rusqlite",
2424 feature = "libsql",
2425 feature = "turso",
2426 feature = "postgres-sync",
2427 feature = "tokio-postgres",
2428 feature = "d1-http",
2429))]
2430pub(crate) fn validate_init_metadata(
2431 applied_names: &[String],
2432 set: &Migrations,
2433) -> Result<(), CliError> {
2434 if !applied_names.is_empty() {
2435 return Err(CliError::Other(
2436 "--init can't be used when database already has migrations set".into(),
2437 ));
2438 }
2439
2440 if set.all().len() > 1 {
2441 return Err(CliError::Other(
2442 "--init can't be used with existing migrations".into(),
2443 ));
2444 }
2445
2446 Ok(())
2447}
2448
2449#[cfg(feature = "rusqlite")]
2454fn init_sqlite_metadata(path: &str, set: &Migrations) -> Result<(), CliError> {
2455 let conn = rusqlite::Connection::open(path).map_err(|e| {
2456 CliError::ConnectionError(format!("Failed to open SQLite database '{path}': {e}"))
2457 })?;
2458
2459 ensure_sqlite_tracking_table(&conn, set)?;
2460
2461 let applied_names = query_applied_names_sqlite(&conn, set)?;
2462 validate_init_metadata(&applied_names, set)?;
2463
2464 let Some(first) = set.all().first() else {
2465 return Ok(());
2466 };
2467
2468 conn.execute(&set.record_migration_sql(first), [])
2469 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2470
2471 Ok(())
2472}
2473
2474#[cfg(feature = "libsql")]
2475fn init_libsql_local_metadata(path: &str, set: &Migrations) -> Result<(), CliError> {
2476 let rt = tokio::runtime::Builder::new_current_thread()
2477 .enable_all()
2478 .build()
2479 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
2480
2481 rt.block_on(init_libsql_local_metadata_inner(path, set))
2482}
2483
2484#[cfg(feature = "libsql")]
2485async fn init_libsql_local_metadata_inner(path: &str, set: &Migrations) -> Result<(), CliError> {
2486 let db = libsql::Builder::new_local(path)
2487 .build()
2488 .await
2489 .map_err(|e| {
2490 CliError::ConnectionError(format!("Failed to open LibSQL database '{path}': {e}"))
2491 })?;
2492
2493 let conn = db
2494 .connect()
2495 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
2496
2497 ensure_sqlite_tracking_table_libsql(&conn, set).await?;
2498
2499 let applied_names = query_applied_names_libsql(&conn, set).await?;
2500 validate_init_metadata(&applied_names, set)?;
2501
2502 let Some(first) = set.all().first() else {
2503 return Ok(());
2504 };
2505
2506 conn.execute(&set.record_migration_sql(first), ())
2507 .await
2508 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2509
2510 Ok(())
2511}
2512
2513#[cfg(feature = "turso")]
2514fn init_turso_metadata(
2515 url: &str,
2516 auth_token: Option<&str>,
2517 set: &Migrations,
2518) -> Result<(), CliError> {
2519 let rt = tokio::runtime::Builder::new_current_thread()
2520 .enable_all()
2521 .build()
2522 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
2523
2524 rt.block_on(init_turso_metadata_inner(url, auth_token, set))
2525}
2526
2527#[cfg(feature = "turso")]
2528async fn init_turso_metadata_inner(
2529 url: &str,
2530 auth_token: Option<&str>,
2531 set: &Migrations,
2532) -> Result<(), CliError> {
2533 let builder =
2534 libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());
2535
2536 let db = builder.build().await.map_err(|e| {
2537 CliError::ConnectionError(format!("Failed to connect to Turso '{url}': {e}"))
2538 })?;
2539
2540 let conn = db
2541 .connect()
2542 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
2543
2544 ensure_sqlite_tracking_table_libsql(&conn, set).await?;
2545
2546 let applied_names = query_applied_names_turso(&conn, set).await?;
2547 validate_init_metadata(&applied_names, set)?;
2548
2549 let Some(first) = set.all().first() else {
2550 return Ok(());
2551 };
2552
2553 conn.execute(&set.record_migration_sql(first), ())
2554 .await
2555 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2556
2557 Ok(())
2558}
2559
2560#[cfg(feature = "postgres-sync")]
2561fn init_postgres_sync_metadata(creds: &PostgresCreds, set: &Migrations) -> Result<(), CliError> {
2562 let url = creds.connection_url();
2563 let mut client = postgres::Client::connect(&url, postgres::NoTls)
2564 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
2565
2566 if let Some(schema_sql) = set.create_schema_sql() {
2567 client
2568 .execute(&schema_sql, &[])
2569 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2570 }
2571
2572 ensure_postgres_tracking_table_sync(&mut client, set)?;
2573
2574 let rows = client
2575 .query(&set.applied_names_sql(), &[])
2576 .unwrap_or_default();
2577 let applied_names: Vec<String> = rows.iter().filter_map(|r| r.try_get(0).ok()).collect();
2578
2579 validate_init_metadata(&applied_names, set)?;
2580
2581 let Some(first) = set.all().first() else {
2582 return Ok(());
2583 };
2584
2585 client
2586 .execute(&set.record_migration_sql(first), &[])
2587 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2588
2589 Ok(())
2590}
2591
2592#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
2593fn init_postgres_async_metadata(creds: &PostgresCreds, set: &Migrations) -> Result<(), CliError> {
2594 let rt = tokio::runtime::Builder::new_current_thread()
2595 .enable_all()
2596 .build()
2597 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;
2598
2599 rt.block_on(init_postgres_async_inner(creds, set))
2600}
2601
2602#[cfg(all(feature = "tokio-postgres", not(feature = "postgres-sync")))]
2603async fn init_postgres_async_inner(
2604 creds: &PostgresCreds,
2605 set: &Migrations,
2606) -> Result<(), CliError> {
2607 let url = creds.connection_url();
2608 let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
2609 .await
2610 .map_err(|e| {
2611 CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
2612 })?;
2613
2614 tokio::spawn(async move {
2615 if let Err(e) = connection.await {
2616 eprintln!(
2617 "{}",
2618 output::err_line(&format!("PostgreSQL connection error: {e}"))
2619 );
2620 }
2621 });
2622
2623 if let Some(schema_sql) = set.create_schema_sql() {
2624 client
2625 .execute(&schema_sql, &[])
2626 .await
2627 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2628 }
2629
2630 ensure_postgres_tracking_table_async(&client, set).await?;
2631
2632 let rows = client
2633 .query(&set.applied_names_sql(), &[])
2634 .await
2635 .unwrap_or_default();
2636 let applied_names: Vec<String> = rows.iter().filter_map(|r| r.try_get(0).ok()).collect();
2637
2638 validate_init_metadata(&applied_names, set)?;
2639
2640 let Some(first) = set.all().first() else {
2641 return Ok(());
2642 };
2643
2644 client
2645 .execute(&set.record_migration_sql(first), &[])
2646 .await
2647 .map_err(|e| CliError::MigrationError(e.to_string()))?;
2648
2649 Ok(())
2650}
2651
2652fn generate_introspect_migration(
2654 prev: &Snapshot,
2655 current: &Snapshot,
2656 breakpoints: bool,
2657) -> Result<Vec<String>, CliError> {
2658 match (prev, current) {
2659 (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
2660 use drizzle_migrations::sqlite::diff_snapshots;
2661 use drizzle_migrations::sqlite::statements::SqliteGenerator;
2662
2663 let diff = diff_snapshots(prev_snap, curr_snap);
2664 let generator = SqliteGenerator::new().with_breakpoints(breakpoints);
2665 Ok(generator.generate_migration(&diff))
2666 }
2667 (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
2668 use drizzle_migrations::postgres::diff_full_snapshots;
2669 use drizzle_migrations::postgres::statements::PostgresGenerator;
2670
2671 let diff = diff_full_snapshots(prev_snap, curr_snap);
2672 let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
2673 Ok(generator.generate(&diff.diffs))
2674 }
2675 _ => Err(CliError::DialectMismatch),
2676 }
2677}
2678
2679fn format_migration_sql(sql_statements: &[String], breakpoints: bool) -> String {
2680 if sql_statements.is_empty() {
2681 "-- No tables to create (empty database)\n".to_string()
2682 } else if breakpoints {
2683 sql_statements.join("\n--> statement-breakpoint\n")
2684 } else {
2685 sql_statements.join("\n\n")
2686 }
2687}
2688
2689pub fn apply_snapshot_filters(
2697 snapshot: &mut Snapshot,
2698 dialect: Dialect,
2699 filters: &SnapshotFilters,
2700) -> Result<(), CliError> {
2701 if filters.is_empty() {
2702 return Ok(());
2703 }
2704
2705 match (dialect, snapshot) {
2706 (Dialect::Sqlite | Dialect::Turso, Snapshot::Sqlite(sqlite)) => {
2707 apply_sqlite_snapshot_filters(sqlite, filters)
2708 }
2709 (Dialect::Postgresql, Snapshot::Postgres(postgres)) => {
2710 apply_postgres_snapshot_filters(postgres, filters)
2711 }
2712 _ => Err(CliError::DialectMismatch),
2713 }
2714}
2715
2716fn apply_sqlite_snapshot_filters(
2717 snapshot: &mut drizzle_migrations::sqlite::SQLiteSnapshot,
2718 filters: &SnapshotFilters,
2719) -> Result<(), CliError> {
2720 use drizzle_types::sqlite::ddl::SqliteEntity;
2721 use std::collections::HashSet;
2722
2723 let table_patterns = compile_patterns(filters.tables.as_deref())?;
2724 if table_patterns.is_none() {
2725 return Ok(());
2726 }
2727
2728 let mut keep_tables: HashSet<String> = HashSet::new();
2729 for entity in &snapshot.ddl {
2730 if let SqliteEntity::Table(table) = entity
2731 && matches_patterns(table.name.as_ref(), table_patterns.as_deref())
2732 {
2733 keep_tables.insert(table.name.to_string());
2734 }
2735 }
2736
2737 snapshot.ddl.retain(|entity| match entity {
2738 SqliteEntity::Table(t) => keep_tables.contains(t.name.as_ref()),
2739 SqliteEntity::Column(c) => keep_tables.contains(c.table.as_ref()),
2740 SqliteEntity::Index(i) => keep_tables.contains(i.table.as_ref()),
2741 SqliteEntity::ForeignKey(fk) => {
2742 keep_tables.contains(fk.table.as_ref()) && keep_tables.contains(fk.table_to.as_ref())
2743 }
2744 SqliteEntity::PrimaryKey(pk) => keep_tables.contains(pk.table.as_ref()),
2745 SqliteEntity::UniqueConstraint(u) => keep_tables.contains(u.table.as_ref()),
2746 SqliteEntity::CheckConstraint(c) => keep_tables.contains(c.table.as_ref()),
2747 SqliteEntity::View(v) => matches_patterns(v.name.as_ref(), table_patterns.as_deref()),
2748 });
2749
2750 Ok(())
2751}
2752
2753fn apply_postgres_snapshot_filters(
2754 snapshot: &mut drizzle_migrations::postgres::PostgresSnapshot,
2755 filters: &SnapshotFilters,
2756) -> Result<(), CliError> {
2757 use drizzle_types::postgres::ddl::PostgresEntity;
2758 use std::collections::HashSet;
2759
2760 let schema_patterns = compile_patterns(filters.schemas.as_deref())?;
2761 let table_patterns = compile_patterns(filters.tables.as_deref())?;
2762 let exclude_postgis = filters
2763 .extensions
2764 .as_ref()
2765 .is_some_and(|v| v.contains(&Extension::Postgis));
2766
2767 let is_schema_allowed = |schema: &str| -> bool {
2768 if exclude_postgis && matches!(schema, "topology" | "tiger" | "tiger_data") {
2769 return false;
2770 }
2771 matches_patterns(schema, schema_patterns.as_deref())
2772 };
2773
2774 let mut keep_tables: HashSet<(String, String)> = HashSet::new();
2775 for entity in &snapshot.ddl {
2776 if let PostgresEntity::Table(table) = entity {
2777 let schema = table.schema.as_ref();
2778 let name = table.name.as_ref();
2779 if !is_schema_allowed(schema) {
2780 continue;
2781 }
2782 if exclude_postgis
2783 && matches!(
2784 name,
2785 "spatial_ref_sys"
2786 | "geometry_columns"
2787 | "geography_columns"
2788 | "raster_columns"
2789 | "raster_overviews"
2790 )
2791 {
2792 continue;
2793 }
2794
2795 if matches_patterns(name, table_patterns.as_deref()) {
2796 keep_tables.insert((schema.to_string(), name.to_string()));
2797 }
2798 }
2799 }
2800
2801 let mut keep_schemas: HashSet<String> = keep_tables.iter().map(|(s, _)| s.clone()).collect();
2802 if table_patterns.is_none() {
2803 for entity in &snapshot.ddl {
2804 if let PostgresEntity::Schema(s) = entity
2805 && is_schema_allowed(s.name.as_ref())
2806 {
2807 keep_schemas.insert(s.name.to_string());
2808 }
2809 }
2810 }
2811
2812 snapshot.ddl.retain(|entity| match entity {
2813 PostgresEntity::Schema(s) => keep_schemas.contains(s.name.as_ref()),
2814 PostgresEntity::Enum(e) => keep_schemas.contains(e.schema.as_ref()),
2815 PostgresEntity::Sequence(s) => keep_schemas.contains(s.schema.as_ref()),
2816 PostgresEntity::Role(_) | PostgresEntity::Privilege(_) => true,
2817 PostgresEntity::Policy(p) => {
2818 keep_tables.contains(&(p.schema.to_string(), p.table.to_string()))
2819 }
2820 PostgresEntity::Table(t) => {
2821 keep_tables.contains(&(t.schema.to_string(), t.name.to_string()))
2822 }
2823 PostgresEntity::Column(c) => {
2824 keep_tables.contains(&(c.schema.to_string(), c.table.to_string()))
2825 }
2826 PostgresEntity::Index(i) => {
2827 keep_tables.contains(&(i.schema.to_string(), i.table.to_string()))
2828 }
2829 PostgresEntity::ForeignKey(fk) => {
2830 keep_tables.contains(&(fk.schema.to_string(), fk.table.to_string()))
2831 && keep_tables.contains(&(fk.schema_to.to_string(), fk.table_to.to_string()))
2832 }
2833 PostgresEntity::PrimaryKey(pk) => {
2834 keep_tables.contains(&(pk.schema.to_string(), pk.table.to_string()))
2835 }
2836 PostgresEntity::UniqueConstraint(u) => {
2837 keep_tables.contains(&(u.schema.to_string(), u.table.to_string()))
2838 }
2839 PostgresEntity::CheckConstraint(c) => {
2840 keep_tables.contains(&(c.schema.to_string(), c.table.to_string()))
2841 }
2842 PostgresEntity::View(v) => {
2843 if !keep_schemas.contains(v.schema.as_ref()) {
2844 return false;
2845 }
2846 matches_patterns(v.name.as_ref(), table_patterns.as_deref())
2847 }
2848 });
2849
2850 Ok(())
2851}
2852
2853#[derive(Debug, Clone)]
2854struct FilterPattern {
2855 pattern: glob::Pattern,
2856 negated: bool,
2857}
2858
2859fn compile_patterns(patterns: Option<&[String]>) -> Result<Option<Vec<FilterPattern>>, CliError> {
2860 let Some(patterns) = patterns else {
2861 return Ok(None);
2862 };
2863 if patterns.is_empty() {
2864 return Ok(None);
2865 }
2866
2867 let mut compiled = Vec::with_capacity(patterns.len());
2868 for p in patterns {
2869 let raw = p.trim();
2870 let (negated, source) = raw
2871 .strip_prefix('!')
2872 .map_or((false, raw), |stripped| (true, stripped));
2873 if source.is_empty() {
2874 return Err(CliError::Other(format!(
2875 "invalid filter pattern '{p}': empty pattern"
2876 )));
2877 }
2878
2879 compiled.push(FilterPattern {
2880 pattern: glob::Pattern::new(source)
2881 .map_err(|e| CliError::Other(format!("invalid filter pattern '{p}': {e}")))?,
2882 negated,
2883 });
2884 }
2885 Ok(Some(compiled))
2886}
2887
2888fn matches_patterns(value: &str, patterns: Option<&[FilterPattern]>) -> bool {
2889 match patterns {
2890 None => true,
2891 Some(v) => {
2892 let has_positive = v.iter().any(|m| !m.negated);
2893 let mut matched_positive = false;
2894
2895 for matcher in v {
2896 if matcher.negated {
2897 if matcher.pattern.matches(value) {
2898 return false;
2899 }
2900 } else if matcher.pattern.matches(value) {
2901 matched_positive = true;
2902 }
2903 }
2904
2905 if has_positive { matched_positive } else { true }
2906 }
2907 }
2908}
2909
2910fn regenerate_schema_from_snapshot(
2911 result: &mut IntrospectResult,
2912 dialect: Dialect,
2913 introspect_casing: Option<IntrospectCasing>,
2914) {
2915 match (&result.snapshot, dialect) {
2916 (Snapshot::Sqlite(snap), Dialect::Sqlite | Dialect::Turso) => {
2917 use drizzle_migrations::sqlite::SQLiteDDL;
2918 use drizzle_migrations::sqlite::codegen::{
2919 CodegenOptions, FieldCasing, generate_rust_schema,
2920 };
2921
2922 let field_casing = match introspect_casing {
2923 Some(IntrospectCasing::Camel) => FieldCasing::Camel,
2924 Some(IntrospectCasing::Preserve) => FieldCasing::Preserve,
2925 None => FieldCasing::Snake,
2926 };
2927
2928 let ddl = SQLiteDDL::from_entities(snap.ddl.clone());
2929 let generated = generate_rust_schema(
2930 &ddl,
2931 &CodegenOptions {
2932 module_doc: Some("Schema introspected from filtered database objects".into()),
2933 include_schema: true,
2934 schema_name: "Schema".into(),
2935 use_pub: true,
2936 field_casing,
2937 },
2938 );
2939
2940 result.schema_code = generated.code;
2941 result.table_count = generated.tables.len();
2942 result.index_count = generated.indexes.len();
2943 result.view_count = ddl.views.list().len();
2944 result.warnings = generated.warnings;
2945 }
2946 (Snapshot::Postgres(snap), Dialect::Postgresql) => {
2947 use drizzle_migrations::postgres::PostgresDDL;
2948 use drizzle_migrations::postgres::codegen::{
2949 CodegenOptions, FieldCasing, generate_rust_schema,
2950 };
2951
2952 let field_casing = match introspect_casing {
2953 Some(IntrospectCasing::Camel) => FieldCasing::Camel,
2954 Some(IntrospectCasing::Preserve) => FieldCasing::Preserve,
2955 None => FieldCasing::Snake,
2956 };
2957
2958 let ddl = PostgresDDL::from_entities(snap.ddl.clone());
2959 let generated = generate_rust_schema(
2960 &ddl,
2961 &CodegenOptions {
2962 module_doc: Some("Schema introspected from filtered database objects".into()),
2963 include_schema: true,
2964 schema_name: "Schema".into(),
2965 use_pub: true,
2966 field_casing,
2967 },
2968 );
2969
2970 result.schema_code = generated.code;
2971 result.table_count = generated.tables.len();
2972 result.index_count = generated.indexes.len();
2973 result.view_count = generated.views.len();
2974 result.warnings = generated.warnings;
2975 }
2976 _ => {}
2977 }
2978}
2979
2980fn introspect_database(
2982 credentials: &Credentials,
2983 dialect: Dialect,
2984) -> Result<IntrospectResult, CliError> {
2985 match dialect {
2986 Dialect::Sqlite | Dialect::Turso => introspect_sqlite_dialect(credentials),
2987 Dialect::Postgresql => introspect_postgres_dialect(credentials),
2988 }
2989}
2990
2991fn introspect_sqlite_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
2993 match credentials {
2994 #[cfg(feature = "rusqlite")]
2995 Credentials::Sqlite { path } => introspect_rusqlite(path),
2996
2997 #[cfg(not(feature = "rusqlite"))]
2998 Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
2999 dialect: "SQLite",
3000 feature: "rusqlite",
3001 }),
3002
3003 #[cfg(any(feature = "libsql", feature = "turso"))]
3004 Credentials::Turso { url, auth_token } => {
3005 if is_local_libsql(url) {
3006 #[cfg(feature = "libsql")]
3007 {
3008 introspect_libsql_local(url)
3009 }
3010 #[cfg(not(feature = "libsql"))]
3011 {
3012 Err(CliError::MissingDriver {
3013 dialect: "LibSQL (local)",
3014 feature: "libsql",
3015 })
3016 }
3017 } else {
3018 #[cfg(feature = "turso")]
3019 {
3020 introspect_turso(url, auth_token.as_deref())
3021 }
3022 #[cfg(not(feature = "turso"))]
3023 {
3024 let _ = auth_token;
3025 Err(CliError::MissingDriver {
3026 dialect: "Turso (remote)",
3027 feature: "turso",
3028 })
3029 }
3030 }
3031 }
3032
3033 #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
3034 Credentials::Turso { .. } => Err(CliError::MissingDriver {
3035 dialect: "Turso",
3036 feature: "turso or libsql",
3037 }),
3038
3039 _ => Err(CliError::Other(
3040 "SQLite introspection requires sqlite path or turso credentials".into(),
3041 )),
3042 }
3043}
3044
3045fn introspect_postgres_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
3047 match credentials {
3048 Credentials::Postgres(creds) => {
3049 let _ = creds;
3050 core::cfg_select! {
3051 feature = "postgres-sync" => introspect_postgres_sync(creds),
3052 feature = "tokio-postgres" => introspect_postgres_async(creds),
3053 _ => Err(CliError::MissingDriver {
3054 dialect: "PostgreSQL",
3055 feature: "postgres-sync or tokio-postgres",
3056 }),
3057 }
3058 }
3059
3060 _ => Err(CliError::Other(
3061 "PostgreSQL introspection requires postgres credentials".into(),
3062 )),
3063 }
3064}
3065
3066#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
3071struct SqliteRawData {
3072 tables: Vec<(String, Option<String>)>,
3073 raw_columns: Vec<drizzle_migrations::sqlite::introspect::RawColumnInfo>,
3074 all_indexes: Vec<drizzle_migrations::sqlite::introspect::RawIndexInfo>,
3075 all_index_columns: Vec<drizzle_migrations::sqlite::introspect::RawIndexColumn>,
3076 all_fks: Vec<drizzle_migrations::sqlite::introspect::RawForeignKey>,
3077 all_views: Vec<drizzle_migrations::sqlite::introspect::RawViewInfo>,
3078}
3079
3080#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
3081impl SqliteRawData {
3082 const fn empty() -> Self {
3083 Self {
3084 tables: Vec::new(),
3085 raw_columns: Vec::new(),
3086 all_indexes: Vec::new(),
3087 all_index_columns: Vec::new(),
3088 all_fks: Vec::new(),
3089 all_views: Vec::new(),
3090 }
3091 }
3092}
3093
3094#[cfg(feature = "rusqlite")]
3095fn query_rusqlite_tables_and_columns(
3096 conn: &rusqlite::Connection,
3097 raw: &mut SqliteRawData,
3098) -> Result<(), CliError> {
3099 use drizzle_migrations::sqlite::introspect::{RawColumnInfo, queries};
3100
3101 let mut tables_stmt = conn
3102 .prepare(queries::TABLES_QUERY)
3103 .map_err(|e| CliError::Other(format!("Failed to prepare tables query: {e}")))?;
3104
3105 raw.tables = tables_stmt
3106 .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
3107 .map_err(|e| CliError::Other(e.to_string()))?
3108 .filter_map(Result::ok)
3109 .collect();
3110
3111 let mut columns_stmt = conn
3112 .prepare(queries::COLUMNS_QUERY)
3113 .map_err(|e| CliError::Other(format!("Failed to prepare columns query: {e}")))?;
3114
3115 raw.raw_columns = columns_stmt
3116 .query_map([], |row| {
3117 Ok(RawColumnInfo {
3118 table: row.get(0)?,
3119 cid: row.get(1)?,
3120 name: row.get(2)?,
3121 column_type: row.get(3)?,
3122 not_null: row.get(4)?,
3123 default_value: row.get(5)?,
3124 pk: row.get(6)?,
3125 hidden: row.get(7)?,
3126 sql: row.get(8)?,
3127 })
3128 })
3129 .map_err(|e| CliError::Other(e.to_string()))?
3130 .filter_map(Result::ok)
3131 .collect();
3132
3133 Ok(())
3134}
3135
3136#[cfg(feature = "rusqlite")]
3137fn query_rusqlite_per_table(
3138 conn: &rusqlite::Connection,
3139 raw: &mut SqliteRawData,
3140) -> Result<(), CliError> {
3141 use drizzle_migrations::sqlite::introspect::{
3142 RawForeignKey, RawIndexColumn, RawIndexInfo, queries,
3143 };
3144
3145 let table_names: Vec<String> = raw.tables.iter().map(|(n, _)| n.clone()).collect();
3147
3148 for table_name in &table_names {
3149 if let Ok(mut idx_stmt) = conn.prepare(&queries::indexes_query(table_name)) {
3150 let indexes: Vec<RawIndexInfo> = idx_stmt
3151 .query_map([], |row| {
3152 Ok(RawIndexInfo {
3153 table: table_name.clone(),
3154 name: row.get(1)?,
3155 unique: row.get::<_, i32>(2)? != 0,
3156 origin: row.get(3)?,
3157 partial: row.get::<_, i32>(4)? != 0,
3158 })
3159 })
3160 .map_err(|e| CliError::Other(e.to_string()))?
3161 .filter_map(Result::ok)
3162 .collect();
3163
3164 for idx in &indexes {
3165 if let Ok(mut col_stmt) = conn.prepare(&queries::index_info_query(&idx.name))
3166 && let Ok(col_iter) = col_stmt.query_map([], |row| {
3167 Ok(RawIndexColumn {
3168 index_name: idx.name.clone(),
3169 seqno: row.get(0)?,
3170 cid: row.get(1)?,
3171 name: row.get(2)?,
3172 desc: row.get::<_, i32>(3)? != 0,
3173 coll: row.get(4)?,
3174 key: row.get::<_, i32>(5)? != 0,
3175 })
3176 })
3177 {
3178 raw.all_index_columns
3179 .extend(col_iter.filter_map(Result::ok));
3180 }
3181 }
3182 raw.all_indexes.extend(indexes);
3183 }
3184
3185 if let Ok(mut fk_stmt) = conn.prepare(&queries::foreign_keys_query(table_name))
3186 && let Ok(fk_iter) = fk_stmt.query_map([], |row| {
3187 Ok(RawForeignKey {
3188 table: table_name.clone(),
3189 id: row.get(0)?,
3190 seq: row.get(1)?,
3191 to_table: row.get(2)?,
3192 from_column: row.get(3)?,
3193 to_column: row.get(4)?,
3194 on_update: row.get(5)?,
3195 on_delete: row.get(6)?,
3196 r#match: row.get(7)?,
3197 })
3198 })
3199 {
3200 raw.all_fks.extend(fk_iter.filter_map(Result::ok));
3201 }
3202 }
3203
3204 Ok(())
3205}
3206
3207#[cfg(feature = "rusqlite")]
3208fn query_rusqlite_views_and_view_columns(conn: &rusqlite::Connection, raw: &mut SqliteRawData) {
3209 use drizzle_migrations::sqlite::introspect::{RawColumnInfo, RawViewInfo, queries};
3210
3211 if let Ok(mut views_stmt) = conn.prepare(queries::VIEWS_QUERY)
3212 && let Ok(view_iter) = views_stmt.query_map([], |row| {
3213 Ok(RawViewInfo {
3214 name: row.get(0)?,
3215 sql: row.get(1)?,
3216 })
3217 })
3218 {
3219 raw.all_views.extend(view_iter.filter_map(Result::ok));
3220 }
3221
3222 if let Ok(mut view_cols_stmt) = conn.prepare(queries::VIEW_COLUMNS_QUERY)
3223 && let Ok(col_iter) = view_cols_stmt.query_map([], |row| {
3224 Ok(RawColumnInfo {
3225 table: row.get(0)?,
3226 cid: row.get(1)?,
3227 name: row.get(2)?,
3228 column_type: row.get(3)?,
3229 not_null: row.get::<_, i32>(4)? != 0,
3230 default_value: row.get(5)?,
3231 pk: row.get(6)?,
3232 hidden: row.get(7)?,
3233 sql: row.get(8)?,
3234 })
3235 })
3236 {
3237 raw.raw_columns.extend(col_iter.filter_map(Result::ok));
3238 }
3239}
3240
3241#[cfg(feature = "rusqlite")]
3242fn query_rusqlite_raw(conn: &rusqlite::Connection) -> Result<SqliteRawData, CliError> {
3243 let mut raw = SqliteRawData::empty();
3244 query_rusqlite_tables_and_columns(conn, &mut raw)?;
3245 query_rusqlite_per_table(conn, &mut raw)?;
3246 query_rusqlite_views_and_view_columns(conn, &mut raw);
3247 Ok(raw)
3248}
3249
3250#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
3251fn build_sqlite_ddl(raw: SqliteRawData) -> drizzle_migrations::sqlite::SQLiteDDL {
3252 use drizzle_migrations::sqlite::{
3253 SQLiteDDL, Table, View,
3254 introspect::{
3255 parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
3256 process_foreign_keys, process_indexes,
3257 },
3258 };
3259 use std::collections::{HashMap, HashSet};
3260
3261 let table_sql_map: HashMap<String, String> = raw
3262 .tables
3263 .iter()
3264 .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
3265 .collect();
3266
3267 let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
3268 HashMap::new();
3269 for (table, sql) in &table_sql_map {
3270 generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
3271 }
3272 let pk_columns: HashSet<(String, String)> = raw
3273 .raw_columns
3274 .iter()
3275 .filter(|c| c.pk > 0)
3276 .map(|c| (c.table.clone(), c.name.clone()))
3277 .collect();
3278
3279 let (columns, primary_keys) =
3280 process_columns(&raw.raw_columns, &generated_columns, &pk_columns);
3281 let indexes = process_indexes(&raw.all_indexes, &raw.all_index_columns, &table_sql_map);
3282 let foreign_keys = process_foreign_keys(&raw.all_fks);
3283 let uniques = process_sqlite_uniques_from_indexes(&raw.all_indexes, &raw.all_index_columns);
3284
3285 let mut ddl = SQLiteDDL::new();
3286
3287 for (table_name, table_sql) in &raw.tables {
3288 let mut table = Table::new(table_name.clone());
3289 if let Some(sql) = table_sql {
3290 let sql_upper = sql.to_uppercase();
3291 table.strict = sql_upper.contains(" STRICT");
3292 table.without_rowid = sql_upper.contains("WITHOUT ROWID");
3293 }
3294 ddl.tables.push(table);
3295 }
3296
3297 for col in columns {
3298 ddl.columns.push(col);
3299 }
3300 for idx in indexes {
3301 ddl.indexes.push(idx);
3302 }
3303 for fk in foreign_keys {
3304 ddl.fks.push(fk);
3305 }
3306 for pk in primary_keys {
3307 ddl.pks.push(pk);
3308 }
3309 for u in uniques {
3310 ddl.uniques.push(u);
3311 }
3312
3313 for v in raw.all_views {
3314 let mut view = View::new(v.name);
3315 if let Some(def) = parse_view_sql(&v.sql) {
3316 view.definition = Some(def.into());
3317 } else {
3318 view.error = Some("Failed to parse view SQL".into());
3319 }
3320 ddl.views.push(view);
3321 }
3322
3323 ddl
3324}
3325
3326#[cfg(feature = "rusqlite")]
3327fn introspect_rusqlite(path: &str) -> Result<IntrospectResult, CliError> {
3328 use drizzle_migrations::sqlite::codegen::{CodegenOptions, FieldCasing, generate_rust_schema};
3329
3330 let conn = rusqlite::Connection::open(path).map_err(|e| {
3331 CliError::ConnectionError(format!("Failed to open SQLite database '{path}': {e}"))
3332 })?;
3333
3334 let raw = query_rusqlite_raw(&conn)?;
3335 let ddl = build_sqlite_ddl(raw);
3336
3337 let options = CodegenOptions {
3338 module_doc: Some(format!("Schema introspected from {path}")),
3339 include_schema: true,
3340 schema_name: "Schema".to_string(),
3341 use_pub: true,
3342 field_casing: FieldCasing::default(),
3343 };
3344
3345 let generated = generate_rust_schema(&ddl, &options);
3346
3347 let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
3348 for entity in ddl.to_entities() {
3349 sqlite_snapshot.add_entity(entity);
3350 }
3351 let snapshot = Snapshot::Sqlite(sqlite_snapshot);
3352
3353 Ok(IntrospectResult {
3354 schema_code: generated.code,
3355 table_count: generated.tables.len(),
3356 index_count: generated.indexes.len(),
3357 view_count: ddl.views.len(),
3358 warnings: generated.warnings,
3359 snapshot,
3360 snapshot_path: std::path::PathBuf::new(),
3361 })
3362}
3363
3364#[cfg(feature = "libsql")]
3369fn introspect_libsql_local(path: &str) -> Result<IntrospectResult, CliError> {
3370 let rt = tokio::runtime::Builder::new_current_thread()
3371 .enable_all()
3372 .build()
3373 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
3374
3375 rt.block_on(introspect_libsql_inner(path, None))
3376}
3377
3378#[cfg(any(feature = "libsql", feature = "turso"))]
3379async fn query_libsql_tables_and_columns(
3380 conn: &libsql::Connection,
3381) -> Result<
3382 (
3383 Vec<(String, Option<String>)>,
3384 Vec<drizzle_migrations::sqlite::introspect::RawColumnInfo>,
3385 ),
3386 CliError,
3387> {
3388 use drizzle_migrations::sqlite::introspect::{RawColumnInfo, queries};
3389
3390 let mut tables_rows = conn
3391 .query(queries::TABLES_QUERY, ())
3392 .await
3393 .map_err(|e| CliError::Other(format!("Failed to query tables: {e}")))?;
3394
3395 let mut tables: Vec<(String, Option<String>)> = Vec::new();
3396 while let Ok(Some(row)) = tables_rows.next().await {
3397 let name: String = row.get(0).unwrap_or_default();
3398 let sql: Option<String> = row.get(1).ok();
3399 tables.push((name, sql));
3400 }
3401
3402 let mut columns_rows = conn
3403 .query(queries::COLUMNS_QUERY, ())
3404 .await
3405 .map_err(|e| CliError::Other(format!("Failed to query columns: {e}")))?;
3406
3407 let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
3408 while let Ok(Some(row)) = columns_rows.next().await {
3409 raw_columns.push(RawColumnInfo {
3410 table: row.get(0).unwrap_or_default(),
3411 cid: row.get(1).unwrap_or(0),
3412 name: row.get(2).unwrap_or_default(),
3413 column_type: row.get(3).unwrap_or_default(),
3414 not_null: row.get::<i32>(4).unwrap_or(0) != 0,
3415 default_value: row.get(5).ok(),
3416 pk: row.get(6).unwrap_or(0),
3417 hidden: row.get(7).unwrap_or(0),
3418 sql: row.get(8).ok(),
3419 });
3420 }
3421
3422 Ok((tables, raw_columns))
3423}
3424
3425#[cfg(any(feature = "libsql", feature = "turso"))]
3426async fn query_libsql_per_table(
3427 conn: &libsql::Connection,
3428 tables: &[(String, Option<String>)],
3429) -> (
3430 Vec<drizzle_migrations::sqlite::introspect::RawIndexInfo>,
3431 Vec<drizzle_migrations::sqlite::introspect::RawIndexColumn>,
3432 Vec<drizzle_migrations::sqlite::introspect::RawForeignKey>,
3433) {
3434 use drizzle_migrations::sqlite::introspect::{
3435 RawForeignKey, RawIndexColumn, RawIndexInfo, queries,
3436 };
3437
3438 let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
3439 let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
3440 let mut all_fks: Vec<RawForeignKey> = Vec::new();
3441
3442 for (table_name, _) in tables {
3443 if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
3444 while let Ok(Some(row)) = idx_rows.next().await {
3445 let idx = RawIndexInfo {
3446 table: table_name.clone(),
3447 name: row.get(1).unwrap_or_default(),
3448 unique: row.get::<i32>(2).unwrap_or(0) != 0,
3449 origin: row.get(3).unwrap_or_default(),
3450 partial: row.get::<i32>(4).unwrap_or(0) != 0,
3451 };
3452
3453 if let Ok(mut col_rows) =
3454 conn.query(&queries::index_info_query(&idx.name), ()).await
3455 {
3456 while let Ok(Some(col_row)) = col_rows.next().await {
3457 all_index_columns.push(RawIndexColumn {
3458 index_name: idx.name.clone(),
3459 seqno: col_row.get(0).unwrap_or(0),
3460 cid: col_row.get(1).unwrap_or(0),
3461 name: col_row.get(2).ok(),
3462 desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
3463 coll: col_row.get(4).unwrap_or_default(),
3464 key: col_row.get::<i32>(5).unwrap_or(0) != 0,
3465 });
3466 }
3467 }
3468
3469 all_indexes.push(idx);
3470 }
3471 }
3472
3473 if let Ok(mut fk_rows) = conn
3474 .query(&queries::foreign_keys_query(table_name), ())
3475 .await
3476 {
3477 while let Ok(Some(row)) = fk_rows.next().await {
3478 all_fks.push(RawForeignKey {
3479 table: table_name.clone(),
3480 id: row.get(0).unwrap_or(0),
3481 seq: row.get(1).unwrap_or(0),
3482 to_table: row.get(2).unwrap_or_default(),
3483 from_column: row.get(3).unwrap_or_default(),
3484 to_column: row.get(4).unwrap_or_default(),
3485 on_update: row.get(5).unwrap_or_default(),
3486 on_delete: row.get(6).unwrap_or_default(),
3487 r#match: row.get(7).unwrap_or_default(),
3488 });
3489 }
3490 }
3491 }
3492
3493 (all_indexes, all_index_columns, all_fks)
3494}
3495
3496#[cfg(any(feature = "libsql", feature = "turso"))]
3497async fn query_libsql_views_and_view_columns(
3498 conn: &libsql::Connection,
3499 raw_columns: &mut Vec<drizzle_migrations::sqlite::introspect::RawColumnInfo>,
3500) -> Vec<drizzle_migrations::sqlite::introspect::RawViewInfo> {
3501 use drizzle_migrations::sqlite::introspect::{RawColumnInfo, RawViewInfo, queries};
3502
3503 let mut all_views: Vec<RawViewInfo> = Vec::new();
3504 if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
3505 while let Ok(Some(row)) = views_rows.next().await {
3506 let name: String = row.get(0).unwrap_or_default();
3507 let sql: String = row.get(1).unwrap_or_default();
3508 all_views.push(RawViewInfo { name, sql });
3509 }
3510 }
3511
3512 if let Ok(mut view_cols_rows) = conn.query(queries::VIEW_COLUMNS_QUERY, ()).await {
3513 while let Ok(Some(row)) = view_cols_rows.next().await {
3514 raw_columns.push(RawColumnInfo {
3515 table: row.get(0).unwrap_or_default(),
3516 cid: row.get(1).unwrap_or(0),
3517 name: row.get(2).unwrap_or_default(),
3518 column_type: row.get(3).unwrap_or_default(),
3519 not_null: row.get::<i32>(4).unwrap_or(0) != 0,
3520 default_value: row.get(5).ok(),
3521 pk: row.get(6).unwrap_or(0),
3522 hidden: row.get(7).unwrap_or(0),
3523 sql: row.get(8).ok(),
3524 });
3525 }
3526 }
3527
3528 all_views
3529}
3530
3531#[cfg(any(feature = "libsql", feature = "turso"))]
3532async fn query_libsql_raw(conn: &libsql::Connection) -> Result<SqliteRawData, CliError> {
3533 let (tables, mut raw_columns) = query_libsql_tables_and_columns(conn).await?;
3534 let (all_indexes, all_index_columns, all_fks) = query_libsql_per_table(conn, &tables).await;
3535 let all_views = query_libsql_views_and_view_columns(conn, &mut raw_columns).await;
3536
3537 Ok(SqliteRawData {
3538 tables,
3539 raw_columns,
3540 all_indexes,
3541 all_index_columns,
3542 all_fks,
3543 all_views,
3544 })
3545}
3546
3547#[cfg(feature = "libsql")]
3548async fn introspect_libsql_inner(
3549 path: &str,
3550 _auth_token: Option<&str>,
3551) -> Result<IntrospectResult, CliError> {
3552 use drizzle_migrations::sqlite::codegen::{CodegenOptions, FieldCasing, generate_rust_schema};
3553
3554 let db = libsql::Builder::new_local(path)
3555 .build()
3556 .await
3557 .map_err(|e| {
3558 CliError::ConnectionError(format!("Failed to open LibSQL database '{path}': {e}"))
3559 })?;
3560
3561 let conn = db
3562 .connect()
3563 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
3564
3565 let raw = query_libsql_raw(&conn).await?;
3566 let ddl = build_sqlite_ddl(raw);
3567
3568 let options = CodegenOptions {
3569 module_doc: Some(format!("Schema introspected from {path}")),
3570 include_schema: true,
3571 schema_name: "Schema".to_string(),
3572 use_pub: true,
3573 field_casing: FieldCasing::default(),
3574 };
3575
3576 let generated = generate_rust_schema(&ddl, &options);
3577
3578 let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
3579 for entity in ddl.to_entities() {
3580 sqlite_snapshot.add_entity(entity);
3581 }
3582 let snapshot = Snapshot::Sqlite(sqlite_snapshot);
3583
3584 Ok(IntrospectResult {
3585 schema_code: generated.code,
3586 table_count: generated.tables.len(),
3587 index_count: generated.indexes.len(),
3588 view_count: ddl.views.len(),
3589 warnings: generated.warnings,
3590 snapshot,
3591 snapshot_path: std::path::PathBuf::new(),
3592 })
3593}
3594
3595#[cfg(feature = "turso")]
3600fn introspect_turso(url: &str, auth_token: Option<&str>) -> Result<IntrospectResult, CliError> {
3601 let rt = tokio::runtime::Builder::new_current_thread()
3602 .enable_all()
3603 .build()
3604 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
3605
3606 rt.block_on(introspect_turso_inner(url, auth_token))
3607}
3608
3609#[cfg(feature = "turso")]
3610async fn introspect_turso_inner(
3611 url: &str,
3612 auth_token: Option<&str>,
3613) -> Result<IntrospectResult, CliError> {
3614 use drizzle_migrations::sqlite::codegen::{CodegenOptions, FieldCasing, generate_rust_schema};
3615
3616 let builder =
3617 libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());
3618
3619 let db = builder.build().await.map_err(|e| {
3620 CliError::ConnectionError(format!("Failed to connect to Turso '{url}': {e}"))
3621 })?;
3622
3623 let conn = db
3624 .connect()
3625 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
3626
3627 let raw = query_libsql_raw(&conn).await?;
3628 let ddl = build_sqlite_ddl(raw);
3629
3630 let options = CodegenOptions {
3631 module_doc: Some(format!("Schema introspected from Turso: {url}")),
3632 include_schema: true,
3633 schema_name: "Schema".to_string(),
3634 use_pub: true,
3635 field_casing: FieldCasing::default(),
3636 };
3637
3638 let generated = generate_rust_schema(&ddl, &options);
3639
3640 let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
3641 for entity in ddl.to_entities() {
3642 sqlite_snapshot.add_entity(entity);
3643 }
3644 let snapshot = Snapshot::Sqlite(sqlite_snapshot);
3645
3646 Ok(IntrospectResult {
3647 schema_code: generated.code,
3648 table_count: generated.tables.len(),
3649 index_count: generated.indexes.len(),
3650 view_count: ddl.views.len(),
3651 warnings: generated.warnings,
3652 snapshot,
3653 snapshot_path: std::path::PathBuf::new(),
3654 })
3655}
3656
3657#[cfg(feature = "postgres-sync")]
3662fn introspect_postgres_sync(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
3663 let url = creds.connection_url();
3664 let mut client = postgres::Client::connect(&url, postgres::NoTls)
3665 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
3666
3667 let raw = query_postgres_sync_raw(&mut client)?;
3668 let ddl = build_postgres_ddl(raw);
3669
3670 Ok(finalize_postgres_introspection(&ddl, &url))
3671}
3672
3673#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
3674impl PostgresRawData {
3675 const fn empty() -> Self {
3676 Self {
3677 schemas: Vec::new(),
3678 tables: Vec::new(),
3679 columns: Vec::new(),
3680 enums: Vec::new(),
3681 sequences: Vec::new(),
3682 views: Vec::new(),
3683 indexes: Vec::new(),
3684 foreign_keys: Vec::new(),
3685 primary_keys: Vec::new(),
3686 uniques: Vec::new(),
3687 checks: Vec::new(),
3688 roles: Vec::new(),
3689 policies: Vec::new(),
3690 }
3691 }
3692}
3693
3694#[cfg(feature = "postgres-sync")]
3695fn query_postgres_sync_raw(client: &mut postgres::Client) -> Result<PostgresRawData, CliError> {
3696 let mut raw = PostgresRawData::empty();
3697 query_pg_sync_core(client, &mut raw)?;
3698 query_pg_sync_codegen_meta(client, &mut raw)?;
3699 query_pg_sync_constraints(client, &mut raw)?;
3700 query_pg_sync_security(client, &mut raw)?;
3701 Ok(raw)
3702}
3703
3704#[cfg(feature = "postgres-sync")]
3705fn query_pg_sync_core(
3706 client: &mut postgres::Client,
3707 raw: &mut PostgresRawData,
3708) -> Result<(), CliError> {
3709 use drizzle_migrations::postgres::introspect::{RawColumnInfo, RawTableInfo, queries};
3710
3711 raw.schemas = client
3712 .query(queries::SCHEMAS_QUERY, &[])
3713 .map_err(|e| CliError::Other(format!("Failed to query schemas: {e}")))?
3714 .into_iter()
3715 .map(|row| RawSchemaInfo {
3716 name: row.get::<_, String>(0),
3717 })
3718 .collect();
3719
3720 raw.tables = client
3721 .query(queries::TABLES_QUERY, &[])
3722 .map_err(|e| CliError::Other(format!("Failed to query tables: {e}")))?
3723 .into_iter()
3724 .map(|row| RawTableInfo {
3725 schema: row.get::<_, String>(0),
3726 name: row.get::<_, String>(1),
3727 is_rls_enabled: row.get::<_, bool>(2),
3728 })
3729 .collect();
3730
3731 raw.columns = client
3732 .query(queries::COLUMNS_QUERY, &[])
3733 .map_err(|e| CliError::Other(format!("Failed to query columns: {e}")))?
3734 .into_iter()
3735 .map(|row| RawColumnInfo {
3736 schema: row.get::<_, String>(0),
3737 table: row.get::<_, String>(1),
3738 name: row.get::<_, String>(2),
3739 column_type: row.get::<_, String>(3),
3740 type_schema: row.get::<_, Option<String>>(4),
3741 not_null: row.get::<_, bool>(5),
3742 default_value: row.get::<_, Option<String>>(6),
3743 is_identity: row.get::<_, bool>(7),
3744 identity_type: row.get::<_, Option<String>>(8),
3745 is_generated: row.get::<_, bool>(9),
3746 generated_expression: row.get::<_, Option<String>>(10),
3747 ordinal_position: row.get::<_, i32>(11),
3748 })
3749 .collect();
3750
3751 Ok(())
3752}
3753
3754#[cfg(feature = "postgres-sync")]
3755fn query_pg_sync_codegen_meta(
3756 client: &mut postgres::Client,
3757 raw: &mut PostgresRawData,
3758) -> Result<(), CliError> {
3759 use drizzle_migrations::postgres::introspect::{
3760 RawEnumInfo, RawSequenceInfo, RawViewInfo, queries,
3761 };
3762
3763 raw.enums = client
3764 .query(queries::ENUMS_QUERY, &[])
3765 .map_err(|e| CliError::Other(format!("Failed to query enums: {e}")))?
3766 .into_iter()
3767 .map(|row| RawEnumInfo {
3768 schema: row.get::<_, String>(0),
3769 name: row.get::<_, String>(1),
3770 values: row.get::<_, Vec<String>>(2),
3771 })
3772 .collect();
3773
3774 raw.sequences = client
3775 .query(queries::SEQUENCES_QUERY, &[])
3776 .map_err(|e| CliError::Other(format!("Failed to query sequences: {e}")))?
3777 .into_iter()
3778 .map(|row| RawSequenceInfo {
3779 schema: row.get::<_, String>(0),
3780 name: row.get::<_, String>(1),
3781 data_type: row.get::<_, Option<String>>(2),
3782 start_value: row.get::<_, Option<String>>(3),
3783 min_value: row.get::<_, Option<String>>(4),
3784 max_value: row.get::<_, Option<String>>(5),
3785 increment: row.get::<_, Option<String>>(6),
3786 cycle: row.get::<_, Option<bool>>(7),
3787 cache_value: row.get::<_, Option<String>>(8),
3788 })
3789 .collect();
3790
3791 let view_schema_filters: Option<Vec<String>> = None;
3792 raw.views = client
3793 .query(queries::VIEWS_QUERY, &[&view_schema_filters])
3794 .map_err(|e| CliError::Other(format!("Failed to query views: {e}")))?
3795 .into_iter()
3796 .map(|row| RawViewInfo {
3797 schema: row.get::<_, String>(0),
3798 name: row.get::<_, String>(1),
3799 definition: row.get::<_, String>(2),
3800 is_materialized: row.get::<_, bool>(3),
3801 })
3802 .collect();
3803
3804 Ok(())
3805}
3806
3807#[cfg(feature = "postgres-sync")]
3808fn query_pg_sync_constraints(
3809 client: &mut postgres::Client,
3810 raw: &mut PostgresRawData,
3811) -> Result<(), CliError> {
3812 use drizzle_migrations::postgres::introspect::{
3813 RawCheckInfo, RawForeignKeyInfo, RawIndexInfo, RawPrimaryKeyInfo, RawUniqueInfo,
3814 parse_index_columns, pg_action_code_to_string, queries,
3815 };
3816
3817 raw.indexes = client
3818 .query(queries::INDEXES_QUERY, &[])
3819 .map_err(|e| CliError::Other(format!("Failed to query indexes: {e}")))?
3820 .into_iter()
3821 .map(|row| {
3822 let cols: Vec<String> = row.get(6);
3823 RawIndexInfo {
3824 schema: row.get::<_, String>(0),
3825 table: row.get::<_, String>(1),
3826 name: row.get::<_, String>(2),
3827 is_unique: row.get::<_, bool>(3),
3828 is_primary: row.get::<_, bool>(4),
3829 method: row.get::<_, String>(5),
3830 columns: parse_index_columns(cols),
3831 where_clause: row.get::<_, Option<String>>(7),
3832 concurrent: false,
3833 }
3834 })
3835 .collect();
3836
3837 raw.foreign_keys = client
3838 .query(queries::FOREIGN_KEYS_QUERY, &[])
3839 .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {e}")))?
3840 .into_iter()
3841 .map(|row| RawForeignKeyInfo {
3842 schema: row.get::<_, String>(0),
3843 table: row.get::<_, String>(1),
3844 name: row.get::<_, String>(2),
3845 columns: row.get::<_, Vec<String>>(3),
3846 schema_to: row.get::<_, String>(4),
3847 table_to: row.get::<_, String>(5),
3848 columns_to: row.get::<_, Vec<String>>(6),
3849 on_update: pg_action_code_to_string(&row.get::<_, String>(7)),
3850 on_delete: pg_action_code_to_string(&row.get::<_, String>(8)),
3851 })
3852 .collect();
3853
3854 raw.primary_keys = client
3855 .query(queries::PRIMARY_KEYS_QUERY, &[])
3856 .map_err(|e| CliError::Other(format!("Failed to query primary keys: {e}")))?
3857 .into_iter()
3858 .map(|row| RawPrimaryKeyInfo {
3859 schema: row.get::<_, String>(0),
3860 table: row.get::<_, String>(1),
3861 name: row.get::<_, String>(2),
3862 columns: row.get::<_, Vec<String>>(3),
3863 })
3864 .collect();
3865
3866 raw.uniques = client
3867 .query(queries::UNIQUES_QUERY, &[])
3868 .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {e}")))?
3869 .into_iter()
3870 .map(|row| RawUniqueInfo {
3871 schema: row.get::<_, String>(0),
3872 table: row.get::<_, String>(1),
3873 name: row.get::<_, String>(2),
3874 columns: row.get::<_, Vec<String>>(3),
3875 nulls_not_distinct: row.get::<_, bool>(4),
3876 })
3877 .collect();
3878
3879 raw.checks = client
3880 .query(queries::CHECKS_QUERY, &[])
3881 .map_err(|e| CliError::Other(format!("Failed to query check constraints: {e}")))?
3882 .into_iter()
3883 .map(|row| RawCheckInfo {
3884 schema: row.get::<_, String>(0),
3885 table: row.get::<_, String>(1),
3886 name: row.get::<_, String>(2),
3887 expression: row.get::<_, String>(3),
3888 })
3889 .collect();
3890
3891 Ok(())
3892}
3893
3894#[cfg(feature = "postgres-sync")]
3895fn query_pg_sync_security(
3896 client: &mut postgres::Client,
3897 raw: &mut PostgresRawData,
3898) -> Result<(), CliError> {
3899 use drizzle_migrations::postgres::introspect::{RawPolicyInfo, RawRoleInfo, queries};
3900
3901 raw.roles = client
3902 .query(queries::ROLES_QUERY, &[])
3903 .map_err(|e| CliError::Other(format!("Failed to query roles: {e}")))?
3904 .into_iter()
3905 .map(|row| RawRoleInfo {
3906 name: row.get::<_, String>(0),
3907 create_db: row.get::<_, bool>(1),
3908 create_role: row.get::<_, bool>(2),
3909 inherit: row.get::<_, bool>(3),
3910 })
3911 .collect();
3912
3913 raw.policies = client
3914 .query(queries::POLICIES_QUERY, &[])
3915 .map_err(|e| CliError::Other(format!("Failed to query policies: {e}")))?
3916 .into_iter()
3917 .map(|row| RawPolicyInfo {
3918 schema: row.get::<_, String>(0),
3919 table: row.get::<_, String>(1),
3920 name: row.get::<_, String>(2),
3921 as_clause: row.get::<_, String>(3),
3922 for_clause: row.get::<_, String>(4),
3923 to: row.get::<_, Vec<String>>(5),
3924 using: row.get::<_, Option<String>>(6),
3925 with_check: row.get::<_, Option<String>>(7),
3926 })
3927 .collect();
3928
3929 Ok(())
3930}
3931
3932#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
3933fn introspect_postgres_async(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
3934 let rt = tokio::runtime::Builder::new_current_thread()
3935 .enable_all()
3936 .build()
3937 .map_err(|e| CliError::Other(format!("Failed to create async runtime: {e}")))?;
3938
3939 rt.block_on(introspect_postgres_async_inner(creds))
3940}
3941
3942#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
3943async fn introspect_postgres_async_inner(
3944 creds: &PostgresCreds,
3945) -> Result<IntrospectResult, CliError> {
3946 let url = creds.connection_url();
3947 let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
3948 .await
3949 .map_err(|e| CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {e}")))?;
3950
3951 tokio::spawn(async move {
3952 if let Err(e) = connection.await {
3953 eprintln!(
3954 "{}",
3955 output::err_line(&format!("PostgreSQL connection error: {e}"))
3956 );
3957 }
3958 });
3959
3960 let raw = query_postgres_async_raw(&client).await?;
3961 let ddl = build_postgres_ddl(raw);
3962
3963 Ok(finalize_postgres_introspection(&ddl, &url))
3964}
3965
3966#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
3967async fn query_postgres_async_raw(
3968 client: &tokio_postgres::Client,
3969) -> Result<PostgresRawData, CliError> {
3970 let mut raw = PostgresRawData::empty();
3971 query_pg_async_core(client, &mut raw).await?;
3972 query_pg_async_codegen_meta(client, &mut raw).await?;
3973 query_pg_async_constraints(client, &mut raw).await?;
3974 query_pg_async_security(client, &mut raw).await?;
3975 Ok(raw)
3976}
3977
3978#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
3979async fn query_pg_async_core(
3980 client: &tokio_postgres::Client,
3981 raw: &mut PostgresRawData,
3982) -> Result<(), CliError> {
3983 use drizzle_migrations::postgres::introspect::{RawColumnInfo, RawTableInfo, queries};
3984
3985 raw.schemas = client
3986 .query(queries::SCHEMAS_QUERY, &[])
3987 .await
3988 .map_err(|e| CliError::Other(format!("Failed to query schemas: {e}")))?
3989 .into_iter()
3990 .map(|row| RawSchemaInfo {
3991 name: row.get::<_, String>(0),
3992 })
3993 .collect();
3994
3995 raw.tables = client
3996 .query(queries::TABLES_QUERY, &[])
3997 .await
3998 .map_err(|e| CliError::Other(format!("Failed to query tables: {e}")))?
3999 .into_iter()
4000 .map(|row| RawTableInfo {
4001 schema: row.get::<_, String>(0),
4002 name: row.get::<_, String>(1),
4003 is_rls_enabled: row.get::<_, bool>(2),
4004 })
4005 .collect();
4006
4007 raw.columns = client
4008 .query(queries::COLUMNS_QUERY, &[])
4009 .await
4010 .map_err(|e| CliError::Other(format!("Failed to query columns: {e}")))?
4011 .into_iter()
4012 .map(|row| RawColumnInfo {
4013 schema: row.get::<_, String>(0),
4014 table: row.get::<_, String>(1),
4015 name: row.get::<_, String>(2),
4016 column_type: row.get::<_, String>(3),
4017 type_schema: row.get::<_, Option<String>>(4),
4018 not_null: row.get::<_, bool>(5),
4019 default_value: row.get::<_, Option<String>>(6),
4020 is_identity: row.get::<_, bool>(7),
4021 identity_type: row.get::<_, Option<String>>(8),
4022 is_generated: row.get::<_, bool>(9),
4023 generated_expression: row.get::<_, Option<String>>(10),
4024 ordinal_position: row.get::<_, i32>(11),
4025 })
4026 .collect();
4027
4028 Ok(())
4029}
4030
4031#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
4032async fn query_pg_async_codegen_meta(
4033 client: &tokio_postgres::Client,
4034 raw: &mut PostgresRawData,
4035) -> Result<(), CliError> {
4036 use drizzle_migrations::postgres::introspect::{
4037 RawEnumInfo, RawSequenceInfo, RawViewInfo, queries,
4038 };
4039
4040 raw.enums = client
4041 .query(queries::ENUMS_QUERY, &[])
4042 .await
4043 .map_err(|e| CliError::Other(format!("Failed to query enums: {e}")))?
4044 .into_iter()
4045 .map(|row| RawEnumInfo {
4046 schema: row.get::<_, String>(0),
4047 name: row.get::<_, String>(1),
4048 values: row.get::<_, Vec<String>>(2),
4049 })
4050 .collect();
4051
4052 raw.sequences = client
4053 .query(queries::SEQUENCES_QUERY, &[])
4054 .await
4055 .map_err(|e| CliError::Other(format!("Failed to query sequences: {e}")))?
4056 .into_iter()
4057 .map(|row| RawSequenceInfo {
4058 schema: row.get::<_, String>(0),
4059 name: row.get::<_, String>(1),
4060 data_type: row.get::<_, Option<String>>(2),
4061 start_value: row.get::<_, Option<String>>(3),
4062 min_value: row.get::<_, Option<String>>(4),
4063 max_value: row.get::<_, Option<String>>(5),
4064 increment: row.get::<_, Option<String>>(6),
4065 cycle: row.get::<_, Option<bool>>(7),
4066 cache_value: row.get::<_, Option<String>>(8),
4067 })
4068 .collect();
4069
4070 let view_schema_filters: Option<Vec<String>> = None;
4071 raw.views = client
4072 .query(queries::VIEWS_QUERY, &[&view_schema_filters])
4073 .await
4074 .map_err(|e| CliError::Other(format!("Failed to query views: {e}")))?
4075 .into_iter()
4076 .map(|row| RawViewInfo {
4077 schema: row.get::<_, String>(0),
4078 name: row.get::<_, String>(1),
4079 definition: row.get::<_, String>(2),
4080 is_materialized: row.get::<_, bool>(3),
4081 })
4082 .collect();
4083
4084 Ok(())
4085}
4086
4087#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
4088async fn query_pg_async_constraints(
4089 client: &tokio_postgres::Client,
4090 raw: &mut PostgresRawData,
4091) -> Result<(), CliError> {
4092 use drizzle_migrations::postgres::introspect::{
4093 RawCheckInfo, RawForeignKeyInfo, RawIndexInfo, RawPrimaryKeyInfo, RawUniqueInfo,
4094 parse_index_columns, pg_action_code_to_string, queries,
4095 };
4096
4097 raw.indexes = client
4098 .query(queries::INDEXES_QUERY, &[])
4099 .await
4100 .map_err(|e| CliError::Other(format!("Failed to query indexes: {e}")))?
4101 .into_iter()
4102 .map(|row| {
4103 let cols: Vec<String> = row.get(6);
4104 RawIndexInfo {
4105 schema: row.get::<_, String>(0),
4106 table: row.get::<_, String>(1),
4107 name: row.get::<_, String>(2),
4108 is_unique: row.get::<_, bool>(3),
4109 is_primary: row.get::<_, bool>(4),
4110 method: row.get::<_, String>(5),
4111 columns: parse_index_columns(cols),
4112 where_clause: row.get::<_, Option<String>>(7),
4113 concurrent: false,
4114 }
4115 })
4116 .collect();
4117
4118 raw.foreign_keys = client
4119 .query(queries::FOREIGN_KEYS_QUERY, &[])
4120 .await
4121 .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {e}")))?
4122 .into_iter()
4123 .map(|row| RawForeignKeyInfo {
4124 schema: row.get::<_, String>(0),
4125 table: row.get::<_, String>(1),
4126 name: row.get::<_, String>(2),
4127 columns: row.get::<_, Vec<String>>(3),
4128 schema_to: row.get::<_, String>(4),
4129 table_to: row.get::<_, String>(5),
4130 columns_to: row.get::<_, Vec<String>>(6),
4131 on_update: pg_action_code_to_string(&row.get::<_, String>(7)),
4132 on_delete: pg_action_code_to_string(&row.get::<_, String>(8)),
4133 })
4134 .collect();
4135
4136 raw.primary_keys = client
4137 .query(queries::PRIMARY_KEYS_QUERY, &[])
4138 .await
4139 .map_err(|e| CliError::Other(format!("Failed to query primary keys: {e}")))?
4140 .into_iter()
4141 .map(|row| RawPrimaryKeyInfo {
4142 schema: row.get::<_, String>(0),
4143 table: row.get::<_, String>(1),
4144 name: row.get::<_, String>(2),
4145 columns: row.get::<_, Vec<String>>(3),
4146 })
4147 .collect();
4148
4149 raw.uniques = client
4150 .query(queries::UNIQUES_QUERY, &[])
4151 .await
4152 .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {e}")))?
4153 .into_iter()
4154 .map(|row| RawUniqueInfo {
4155 schema: row.get::<_, String>(0),
4156 table: row.get::<_, String>(1),
4157 name: row.get::<_, String>(2),
4158 columns: row.get::<_, Vec<String>>(3),
4159 nulls_not_distinct: row.get::<_, bool>(4),
4160 })
4161 .collect();
4162
4163 raw.checks = client
4164 .query(queries::CHECKS_QUERY, &[])
4165 .await
4166 .map_err(|e| CliError::Other(format!("Failed to query check constraints: {e}")))?
4167 .into_iter()
4168 .map(|row| RawCheckInfo {
4169 schema: row.get::<_, String>(0),
4170 table: row.get::<_, String>(1),
4171 name: row.get::<_, String>(2),
4172 expression: row.get::<_, String>(3),
4173 })
4174 .collect();
4175
4176 Ok(())
4177}
4178
4179#[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
4180async fn query_pg_async_security(
4181 client: &tokio_postgres::Client,
4182 raw: &mut PostgresRawData,
4183) -> Result<(), CliError> {
4184 use drizzle_migrations::postgres::introspect::{RawPolicyInfo, RawRoleInfo, queries};
4185
4186 raw.roles = client
4187 .query(queries::ROLES_QUERY, &[])
4188 .await
4189 .map_err(|e| CliError::Other(format!("Failed to query roles: {e}")))?
4190 .into_iter()
4191 .map(|row| RawRoleInfo {
4192 name: row.get::<_, String>(0),
4193 create_db: row.get::<_, bool>(1),
4194 create_role: row.get::<_, bool>(2),
4195 inherit: row.get::<_, bool>(3),
4196 })
4197 .collect();
4198
4199 raw.policies = client
4200 .query(queries::POLICIES_QUERY, &[])
4201 .await
4202 .map_err(|e| CliError::Other(format!("Failed to query policies: {e}")))?
4203 .into_iter()
4204 .map(|row| RawPolicyInfo {
4205 schema: row.get::<_, String>(0),
4206 table: row.get::<_, String>(1),
4207 name: row.get::<_, String>(2),
4208 as_clause: row.get::<_, String>(3),
4209 for_clause: row.get::<_, String>(4),
4210 to: row.get::<_, Vec<String>>(5),
4211 using: row.get::<_, Option<String>>(6),
4212 with_check: row.get::<_, Option<String>>(7),
4213 })
4214 .collect();
4215
4216 Ok(())
4217}
4218
4219#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4221#[derive(Debug, Clone)]
4222struct RawSchemaInfo {
4223 name: String,
4224}
4225
4226#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4232struct PostgresRawData {
4233 schemas: Vec<RawSchemaInfo>,
4234 tables: Vec<drizzle_migrations::postgres::introspect::RawTableInfo>,
4235 columns: Vec<drizzle_migrations::postgres::introspect::RawColumnInfo>,
4236 enums: Vec<drizzle_migrations::postgres::introspect::RawEnumInfo>,
4237 sequences: Vec<drizzle_migrations::postgres::introspect::RawSequenceInfo>,
4238 views: Vec<drizzle_migrations::postgres::introspect::RawViewInfo>,
4239 indexes: Vec<drizzle_migrations::postgres::introspect::RawIndexInfo>,
4240 foreign_keys: Vec<drizzle_migrations::postgres::introspect::RawForeignKeyInfo>,
4241 primary_keys: Vec<drizzle_migrations::postgres::introspect::RawPrimaryKeyInfo>,
4242 uniques: Vec<drizzle_migrations::postgres::introspect::RawUniqueInfo>,
4243 checks: Vec<drizzle_migrations::postgres::introspect::RawCheckInfo>,
4244 roles: Vec<drizzle_migrations::postgres::introspect::RawRoleInfo>,
4245 policies: Vec<drizzle_migrations::postgres::introspect::RawPolicyInfo>,
4246}
4247
4248#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4252fn build_postgres_ddl(
4253 raw: PostgresRawData,
4254) -> drizzle_migrations::postgres::collection::PostgresDDL {
4255 use drizzle_migrations::postgres::{
4256 PostgresDDL,
4257 ddl::Schema,
4258 introspect::{
4259 process_check_constraints, process_columns, process_enums, process_foreign_keys,
4260 process_indexes, process_policies, process_primary_keys, process_roles,
4261 process_sequences, process_tables, process_unique_constraints, process_views,
4262 },
4263 };
4264
4265 let mut ddl = PostgresDDL::new();
4266 for s in raw.schemas.into_iter().map(|s| Schema::new(s.name)) {
4267 ddl.schemas.push(s);
4268 }
4269 for e in process_enums(&raw.enums) {
4270 ddl.enums.push(e);
4271 }
4272 for s in process_sequences(&raw.sequences) {
4273 ddl.sequences.push(s);
4274 }
4275 for r in process_roles(&raw.roles) {
4276 ddl.roles.push(r);
4277 }
4278 for p in process_policies(&raw.policies) {
4279 ddl.policies.push(p);
4280 }
4281 for t in process_tables(&raw.tables) {
4282 ddl.tables.push(t);
4283 }
4284 for c in process_columns(&raw.columns) {
4285 ddl.columns.push(c);
4286 }
4287 for i in process_indexes(&raw.indexes) {
4288 ddl.indexes.push(i);
4289 }
4290 for fk in process_foreign_keys(&raw.foreign_keys) {
4291 ddl.fks.push(fk);
4292 }
4293 for pk in process_primary_keys(&raw.primary_keys) {
4294 ddl.pks.push(pk);
4295 }
4296 for u in process_unique_constraints(&raw.uniques) {
4297 ddl.uniques.push(u);
4298 }
4299 for c in process_check_constraints(&raw.checks) {
4300 ddl.checks.push(c);
4301 }
4302 for v in process_views(&raw.views) {
4303 ddl.views.push(v);
4304 }
4305 ddl
4306}
4307
4308#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4310fn finalize_postgres_introspection(
4311 ddl: &drizzle_migrations::postgres::collection::PostgresDDL,
4312 url: &str,
4313) -> IntrospectResult {
4314 use drizzle_migrations::postgres::codegen::{
4315 CodegenOptions, FieldCasing, generate_rust_schema,
4316 };
4317
4318 let options = CodegenOptions {
4319 module_doc: Some(format!("Schema introspected from {}", mask_url(url))),
4320 include_schema: true,
4321 schema_name: "Schema".to_string(),
4322 use_pub: true,
4323 field_casing: FieldCasing::default(),
4324 };
4325 let generated = generate_rust_schema(ddl, &options);
4326
4327 let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
4328 for entity in ddl.to_entities() {
4329 snap.add_entity(entity);
4330 }
4331
4332 IntrospectResult {
4333 schema_code: generated.code,
4334 table_count: ddl.tables.list().len(),
4335 index_count: ddl.indexes.list().len(),
4336 view_count: ddl.views.list().len(),
4337 warnings: generated.warnings,
4338 snapshot: Snapshot::Postgres(snap),
4339 snapshot_path: std::path::PathBuf::new(),
4340 }
4341}
4342
4343#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4344fn mask_url(url: &str) -> String {
4345 if let Some(at) = url.find('@')
4346 && let Some(colon) = url[..at].rfind(':')
4347 {
4348 let scheme_end = url.find("://").map_or(0, |p| p + 3);
4349 if colon > scheme_end {
4350 return format!("{}****{}", &url[..=colon], &url[at..]);
4351 }
4352 }
4353 url.to_string()
4354}
4355
4356#[cfg(test)]
4357mod tests {
4358 use super::*;
4359
4360 #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4361 fn test_postgres_url() -> String {
4362 std::env::var("DATABASE_URL")
4363 .unwrap_or_else(|_| "postgres://postgres:postgres@localhost:5432/drizzle_test".into())
4364 }
4365
4366 #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4367 fn test_postgres_creds() -> crate::config::PostgresCreds {
4368 crate::config::PostgresCreds::Url(test_postgres_url().into_boxed_str())
4369 }
4370
4371 #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4372 fn unique_pg_name(prefix: &str) -> String {
4373 use std::time::{SystemTime, UNIX_EPOCH};
4374
4375 let nanos = SystemTime::now()
4376 .duration_since(UNIX_EPOCH)
4377 .expect("time went backwards")
4378 .as_nanos();
4379 format!("{}_{}_{}", prefix, std::process::id(), nanos)
4380 }
4381
4382 #[test]
4383 fn destructive_statement_detection_covers_drop_variants() {
4384 assert!(is_destructive_statement("DROP TABLE users;"));
4385 assert!(is_destructive_statement("DROP VIEW active_users;"));
4386 assert!(is_destructive_statement("DROP TYPE status;"));
4387 assert!(is_destructive_statement("DROP SCHEMA auth;"));
4388 assert!(is_destructive_statement("DROP ROLE app_user;"));
4389 assert!(is_destructive_statement(
4390 "DROP POLICY users_rls_policy ON users;"
4391 ));
4392 assert!(is_destructive_statement("TRUNCATE users;"));
4393 assert!(is_destructive_statement(
4394 "ALTER TABLE users DROP CONSTRAINT users_email_key;"
4395 ));
4396
4397 assert!(!is_destructive_statement("CREATE TABLE users(id INTEGER);"));
4398 assert!(!is_destructive_statement(
4399 "ALTER TABLE users ADD COLUMN email text;"
4400 ));
4401 }
4402
4403 #[cfg(feature = "rusqlite")]
4404 #[test]
4405 fn sqlite_migrations_run_both_when_created_at_collides() {
4406 use drizzle_migrations::{Migration, Migrations};
4410
4411 let dir = tempfile::tempdir().expect("tempdir");
4412 let db_path = dir.path().join("migrate.sqlite");
4413 let db_path_str = db_path.to_string_lossy().to_string();
4414
4415 let first_set = Migrations::new(
4416 vec![Migration::with_hash(
4417 "20230331141203_first",
4418 "hash_one",
4419 1_680_271_923_000,
4420 vec!["CREATE TABLE created_at_dedupe_a (id INTEGER PRIMARY KEY)".to_string()],
4421 )],
4422 drizzle_types::Dialect::SQLite,
4423 );
4424
4425 let first =
4426 run_sqlite_migrations(&first_set, &db_path_str).expect("first migrate succeeds");
4427 assert_eq!(first.applied_count, 1);
4428
4429 let second_set = Migrations::new(
4430 vec![
4431 Migration::with_hash(
4432 "20230331141203_first",
4433 "hash_one",
4434 1_680_271_923_000,
4435 vec!["CREATE TABLE created_at_dedupe_a (id INTEGER PRIMARY KEY)".to_string()],
4436 ),
4437 Migration::with_hash(
4438 "20230331141203_second",
4439 "hash_two",
4440 1_680_271_923_000,
4441 vec!["CREATE TABLE created_at_dedupe_b (id INTEGER PRIMARY KEY)".to_string()],
4442 ),
4443 ],
4444 drizzle_types::Dialect::SQLite,
4445 );
4446
4447 let second =
4448 run_sqlite_migrations(&second_set, &db_path_str).expect("second migrate succeeds");
4449 assert_eq!(
4450 second.applied_count, 1,
4451 "only the second (newly introduced by name) migration should apply"
4452 );
4453
4454 let conn = rusqlite::Connection::open(&db_path).expect("open sqlite");
4455 let rows: i64 = conn
4456 .query_row("SELECT COUNT(*) FROM __drizzle_migrations", [], |row| {
4457 row.get(0)
4458 })
4459 .expect("count migrations rows");
4460 assert_eq!(rows, 2, "both migration records should be stored");
4461
4462 let table_b_exists: i64 = conn
4463 .query_row(
4464 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='created_at_dedupe_b'",
4465 [],
4466 |row| row.get(0),
4467 )
4468 .expect("query sqlite_master");
4469 assert_eq!(
4470 table_b_exists, 1,
4471 "the new-name migration must execute even though created_at collides"
4472 );
4473 }
4474
4475 #[cfg(feature = "rusqlite")]
4476 #[test]
4477 fn run_migrations_creates_metadata_table_with_no_local_migrations() {
4478 let dir = tempfile::tempdir().expect("tempdir");
4479 let db_path = dir.path().join("empty.sqlite");
4480 let migrations_dir = dir.path().join("migrations");
4481 std::fs::create_dir_all(&migrations_dir).expect("create migrations dir");
4482
4483 let creds = crate::config::Credentials::Sqlite {
4484 path: db_path.to_string_lossy().to_string().into_boxed_str(),
4485 };
4486
4487 let result = run_migrations(
4488 &creds,
4489 crate::config::Dialect::Sqlite,
4490 &migrations_dir,
4491 "__drizzle_migrations",
4492 "drizzle",
4493 )
4494 .expect("run migrations");
4495 assert_eq!(result.applied_count, 0);
4496
4497 let conn = rusqlite::Connection::open(&db_path).expect("open sqlite");
4498 let exists: i64 = conn
4499 .query_row(
4500 "SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='__drizzle_migrations'",
4501 [],
4502 |row| row.get(0),
4503 )
4504 .expect("check metadata table");
4505 assert_eq!(exists, 1, "migrations metadata table should be created");
4506 }
4507
4508 #[cfg(any(
4509 feature = "rusqlite",
4510 feature = "libsql",
4511 feature = "turso",
4512 feature = "postgres-sync",
4513 feature = "tokio-postgres"
4514 ))]
4515 #[test]
4516 fn validate_init_metadata_matches_drizzle_orm_semantics() {
4517 use drizzle_migrations::{Migration, Migrations};
4518
4519 let empty_set = Migrations::empty(drizzle_types::Dialect::SQLite);
4520 validate_init_metadata(&[], &empty_set).expect("empty local migrations should be allowed");
4521
4522 let single = Migrations::new(
4523 vec![Migration::with_hash(
4524 "20230331141203_init",
4525 "hash_single",
4526 1_680_271_923_000,
4527 vec!["CREATE TABLE t(id INTEGER PRIMARY KEY)".to_string()],
4528 )],
4529 drizzle_types::Dialect::SQLite,
4530 );
4531 validate_init_metadata(&[], &single).expect("single local migration should be allowed");
4532
4533 let multiple = Migrations::new(
4534 vec![
4535 Migration::with_hash(
4536 "20230331141203_first",
4537 "hash_a",
4538 1_680_271_923_000,
4539 vec!["CREATE TABLE a(id INTEGER PRIMARY KEY)".to_string()],
4540 ),
4541 Migration::with_hash(
4542 "20230331150000_second",
4543 "hash_b",
4544 1_680_275_400_000,
4545 vec!["CREATE TABLE b(id INTEGER PRIMARY KEY)".to_string()],
4546 ),
4547 ],
4548 drizzle_types::Dialect::SQLite,
4549 );
4550
4551 let err =
4552 validate_init_metadata(&[], &multiple).expect_err("multiple local migrations rejected");
4553 assert_eq!(
4554 err.to_string(),
4555 "--init can't be used with existing migrations"
4556 );
4557
4558 let err = validate_init_metadata(&["20230331141203_init".to_string()], &single)
4559 .expect_err("existing db metadata should be rejected");
4560 assert_eq!(
4561 err.to_string(),
4562 "--init can't be used when database already has migrations set"
4563 );
4564 }
4565
4566 #[test]
4567 fn verify_applied_migrations_detects_hash_mismatch() {
4568 use drizzle_migrations::{Migration, Migrations};
4569
4570 let set = Migrations::new(
4571 vec![Migration::with_hash(
4572 "20230331141203_verify",
4573 "local_hash",
4574 1_680_271_923_000,
4575 vec!["CREATE TABLE t(id INTEGER PRIMARY KEY)".to_string()],
4576 )],
4577 drizzle_types::Dialect::SQLite,
4578 );
4579
4580 let applied = vec![AppliedMigrationRecord {
4581 hash: "db_hash".to_string(),
4582 name: "20230331141203_verify".to_string(),
4583 }];
4584
4585 let err = verify_applied_migrations_consistency(&set, &applied)
4586 .expect_err("hash mismatch should fail verification");
4587 assert_eq!(
4588 err.to_string(),
4589 "Migration failed: Migration hash mismatch for 20230331141203_verify: database=db_hash, local=local_hash"
4590 );
4591 }
4592
4593 #[test]
4594 fn build_migration_plan_counts_pending_statements() {
4595 use drizzle_migrations::{Migration, Migrations};
4596
4597 let set = Migrations::new(
4598 vec![
4599 Migration::with_hash(
4600 "20230331141203_first",
4601 "hash_a",
4602 1_680_271_923_000,
4603 vec![
4604 "CREATE TABLE a(id INTEGER PRIMARY KEY)".to_string(),
4605 "CREATE INDEX a_id_idx ON a(id)".to_string(),
4606 ],
4607 ),
4608 Migration::with_hash(
4609 "20230331150000_second",
4610 "hash_b",
4611 1_680_275_400_000,
4612 vec!["CREATE TABLE b(id INTEGER PRIMARY KEY)".to_string()],
4613 ),
4614 ],
4615 drizzle_types::Dialect::SQLite,
4616 );
4617
4618 let applied = vec![AppliedMigrationRecord {
4619 hash: "hash_a".to_string(),
4620 name: "20230331141203_first".to_string(),
4621 }];
4622
4623 let plan = build_migration_plan(&set, &applied).expect("build migration plan");
4624 assert_eq!(plan.applied_count, 1);
4625 assert_eq!(plan.pending_count, 1);
4626 assert_eq!(plan.pending_statements, 1);
4627 assert_eq!(plan.pending_migrations, vec!["20230331150000_second"]);
4628 }
4629
4630 #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4631 #[test]
4632 fn postgres_concurrent_index_detection_is_case_insensitive() {
4633 assert!(is_postgres_concurrent_index_statement(
4634 "CREATE INDEX CONCURRENTLY users_email_idx ON users (email);"
4635 ));
4636 assert!(is_postgres_concurrent_index_statement(
4637 "CREATE UNIQUE INDEX CONCURRENTLY users_email_idx ON users (email);"
4638 ));
4639 assert!(is_postgres_concurrent_index_statement(
4640 "drop index concurrently users_email_idx;"
4641 ));
4642 assert!(!is_postgres_concurrent_index_statement(
4643 "CREATE INDEX users_email_idx ON users (email);"
4644 ));
4645 }
4646
4647 #[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
4648 #[test]
4649 fn postgres_concurrent_index_detection_over_statement_list() {
4650 let with_concurrent = vec![
4651 "CREATE TABLE users(id integer);".to_string(),
4652 "CREATE INDEX CONCURRENTLY users_email_idx ON users (id);".to_string(),
4653 ];
4654 let without_concurrent = vec![
4655 "CREATE TABLE users(id integer);".to_string(),
4656 "CREATE INDEX users_email_idx ON users (id);".to_string(),
4657 ];
4658
4659 assert!(has_postgres_concurrent_index(&with_concurrent));
4660 assert!(!has_postgres_concurrent_index(&without_concurrent));
4661 }
4662
4663 #[test]
4664 fn generate_push_sql_includes_concurrent_postgres_index() {
4665 use crate::snapshot::parse_result_to_snapshot;
4666 use drizzle_migrations::parser::SchemaParser;
4667 use drizzle_types::Dialect;
4668
4669 let previous = r#"
4670#[PostgresTable]
4671pub struct Users {
4672 #[column(primary)]
4673 pub id: i32,
4674 pub email: String,
4675}
4676"#;
4677
4678 let current = r#"
4679#[PostgresTable]
4680pub struct Users {
4681 #[column(primary)]
4682 pub id: i32,
4683 pub email: String,
4684}
4685
4686#[PostgresIndex(concurrent)]
4687pub struct UsersEmailIdx(Users::email);
4688"#;
4689
4690 let prev =
4691 parse_result_to_snapshot(&SchemaParser::parse(previous), Dialect::PostgreSQL, None);
4692 let curr =
4693 parse_result_to_snapshot(&SchemaParser::parse(current), Dialect::PostgreSQL, None);
4694
4695 let (sql, warnings) = generate_push_sql(&prev, &curr, false).expect("push sql generation");
4696 assert!(warnings.is_empty());
4697 assert_eq!(sql.len(), 1);
4698 assert_eq!(
4699 sql[0],
4700 "CREATE INDEX CONCURRENTLY \"users_email_idx\" ON \"users\" USING btree (\"email\" NULLS LAST);"
4701 );
4702 }
4703
4704 #[test]
4705 fn filter_patterns_support_negation_globs() {
4706 let raw = vec![
4707 "users_*".to_string(),
4708 "!users_4".to_string(),
4709 "!ad*".to_string(),
4710 ];
4711 let patterns = compile_patterns(Some(&raw)).expect("compile patterns");
4712
4713 assert!(matches_patterns("users_1", patterns.as_deref()));
4714 assert!(!matches_patterns("users_4", patterns.as_deref()));
4715 assert!(!matches_patterns("admin", patterns.as_deref()));
4716 assert!(!matches_patterns("audit", patterns.as_deref()));
4717 }
4718
4719 #[test]
4720 fn postgres_table_filter_matches_table_name_only() {
4721 use crate::snapshot::parse_result_to_snapshot;
4722 use drizzle_migrations::parser::SchemaParser;
4723 use drizzle_migrations::schema::Snapshot;
4724 use drizzle_types::Dialect as BaseDialect;
4725
4726 let code = r#"
4727#[PostgresTable(schema = "admin")]
4728pub struct AuditLog {
4729 #[column(primary)]
4730 pub id: i32,
4731}
4732
4733#[PostgresTable(schema = "public")]
4734pub struct Users {
4735 #[column(primary)]
4736 pub id: i32,
4737}
4738"#;
4739
4740 let parsed = SchemaParser::parse(code);
4741 let mut snapshot = parse_result_to_snapshot(&parsed, BaseDialect::PostgreSQL, None);
4742 let filters = SnapshotFilters {
4743 tables: Some(vec!["admin.*".to_string()]),
4744 schemas: None,
4745 extensions: None,
4746 };
4747
4748 apply_snapshot_filters(&mut snapshot, crate::config::Dialect::Postgresql, &filters)
4749 .expect("apply filters");
4750
4751 let remaining_tables = match snapshot {
4752 Snapshot::Postgres(s) => s
4753 .ddl
4754 .iter()
4755 .filter_map(|e| {
4756 if let drizzle_types::postgres::ddl::PostgresEntity::Table(t) = e {
4757 Some((t.schema.to_string(), t.name.to_string()))
4758 } else {
4759 None
4760 }
4761 })
4762 .collect::<Vec<_>>(),
4763 _ => panic!("expected postgres snapshot"),
4764 };
4765
4766 assert!(remaining_tables.is_empty());
4767 }
4768
4769 #[test]
4770 fn postgres_schema_and_table_filters_intersect() {
4771 use crate::snapshot::parse_result_to_snapshot;
4772 use drizzle_migrations::parser::SchemaParser;
4773 use drizzle_migrations::schema::Snapshot;
4774 use drizzle_types::Dialect as BaseDialect;
4775
4776 let code = r#"
4777#[PostgresTable(schema = "dev")]
4778pub struct UsersDev {
4779 #[column(primary)]
4780 pub id: i32,
4781}
4782
4783#[PostgresTable(schema = "public")]
4784pub struct UsersPublic {
4785 #[column(primary)]
4786 pub id: i32,
4787}
4788"#;
4789
4790 let parsed = SchemaParser::parse(code);
4791 let mut snapshot = parse_result_to_snapshot(&parsed, BaseDialect::PostgreSQL, None);
4792 let filters = SnapshotFilters {
4793 tables: Some(vec!["users_*".to_string()]),
4794 schemas: Some(vec!["!dev".to_string()]),
4795 extensions: None,
4796 };
4797
4798 apply_snapshot_filters(&mut snapshot, crate::config::Dialect::Postgresql, &filters)
4799 .expect("apply filters");
4800
4801 let remaining_tables = match snapshot {
4802 Snapshot::Postgres(s) => s
4803 .ddl
4804 .iter()
4805 .filter_map(|e| {
4806 if let drizzle_types::postgres::ddl::PostgresEntity::Table(t) = e {
4807 Some((t.schema.to_string(), t.name.to_string()))
4808 } else {
4809 None
4810 }
4811 })
4812 .collect::<Vec<_>>(),
4813 _ => panic!("expected postgres snapshot"),
4814 };
4815
4816 assert_eq!(
4817 remaining_tables,
4818 vec![("public".to_string(), "users_public".to_string())]
4819 );
4820 }
4821
4822 #[test]
4823 fn postgres_extensions_filter_excludes_postgis_internal_objects() {
4824 use crate::snapshot::parse_result_to_snapshot;
4825 use drizzle_migrations::parser::SchemaParser;
4826 use drizzle_migrations::schema::Snapshot;
4827 use drizzle_types::Dialect as BaseDialect;
4828
4829 let code = r#"
4830#[PostgresTable(schema = "topology")]
4831pub struct TopologyLayer {
4832 #[column(primary)]
4833 pub id: i32,
4834}
4835
4836#[PostgresTable]
4837pub struct SpatialRefSys {
4838 #[column(primary)]
4839 pub id: i32,
4840}
4841
4842#[PostgresTable]
4843pub struct Users {
4844 #[column(primary)]
4845 pub id: i32,
4846}
4847"#;
4848
4849 let parsed = SchemaParser::parse(code);
4850 let mut snapshot = parse_result_to_snapshot(&parsed, BaseDialect::PostgreSQL, None);
4851 let filters = SnapshotFilters {
4852 tables: None,
4853 schemas: None,
4854 extensions: Some(vec![Extension::Postgis]),
4855 };
4856
4857 apply_snapshot_filters(&mut snapshot, crate::config::Dialect::Postgresql, &filters)
4858 .expect("apply filters");
4859
4860 let remaining_tables = match snapshot {
4861 Snapshot::Postgres(s) => s
4862 .ddl
4863 .iter()
4864 .filter_map(|e| {
4865 if let drizzle_types::postgres::ddl::PostgresEntity::Table(t) = e {
4866 Some((t.schema.to_string(), t.name.to_string()))
4867 } else {
4868 None
4869 }
4870 })
4871 .collect::<Vec<_>>(),
4872 _ => panic!("expected postgres snapshot"),
4873 };
4874
4875 assert_eq!(
4876 remaining_tables,
4877 vec![("public".to_string(), "users".to_string())]
4878 );
4879 }
4880
4881 #[test]
4882 fn format_migration_sql_respects_breakpoints_flag() {
4883 let sql = vec![
4884 "CREATE TABLE users(id integer);".to_string(),
4885 "CREATE INDEX users_id_idx ON users(id);".to_string(),
4886 ];
4887
4888 let with_breakpoints = format_migration_sql(&sql, true);
4889 assert_eq!(
4890 with_breakpoints,
4891 "CREATE TABLE users(id integer);\n--> statement-breakpoint\nCREATE INDEX users_id_idx ON users(id);"
4892 );
4893
4894 let without_breakpoints = format_migration_sql(&sql, false);
4895 assert_eq!(
4896 without_breakpoints,
4897 "CREATE TABLE users(id integer);\n\nCREATE INDEX users_id_idx ON users(id);"
4898 );
4899
4900 let empty = format_migration_sql(&[], false);
4901 assert_eq!(empty, "-- No tables to create (empty database)\n");
4902 }
4903
4904 #[test]
4905 fn regenerate_sqlite_schema_applies_introspect_casing() {
4906 use crate::config::{Casing, IntrospectCasing};
4907 use crate::snapshot::parse_result_to_snapshot;
4908 use drizzle_migrations::parser::SchemaParser;
4909 use drizzle_types::Dialect as BaseDialect;
4910
4911 let code = r#"
4912#[SQLiteTable]
4913pub struct AuditLogs {
4914 #[column(primary)]
4915 pub id: i64,
4916 pub user_name: String,
4917}
4918"#;
4919
4920 let parsed = SchemaParser::parse(code);
4921 let snapshot =
4922 parse_result_to_snapshot(&parsed, BaseDialect::SQLite, Some(Casing::SnakeCase));
4923
4924 let mut camel = IntrospectResult {
4925 schema_code: String::new(),
4926 table_count: 0,
4927 index_count: 0,
4928 view_count: 0,
4929 warnings: Vec::new(),
4930 snapshot: snapshot.clone(),
4931 snapshot_path: std::path::PathBuf::new(),
4932 };
4933 regenerate_schema_from_snapshot(
4934 &mut camel,
4935 crate::config::Dialect::Sqlite,
4936 Some(IntrospectCasing::Camel),
4937 );
4938
4939 assert_eq!(
4940 camel.schema_code,
4941 "\
4942//! Auto-generated SQLite schema from introspection
4943//!
4944//! Schema introspected from filtered database objects
4945
4946use drizzle::sqlite::prelude::*;
4947
4948#[SQLiteTable(name = \"audit_logs\")]
4949pub struct AuditLogs {
4950 #[column(primary)]
4951 pub id: i64,
4952 pub userName: String,
4953}
4954
4955#[derive(SQLiteSchema)]
4956pub struct Schema {
4957 pub auditLogs: AuditLogs,
4958}
4959"
4960 );
4961
4962 let mut preserve = IntrospectResult {
4963 schema_code: String::new(),
4964 table_count: 0,
4965 index_count: 0,
4966 view_count: 0,
4967 warnings: Vec::new(),
4968 snapshot,
4969 snapshot_path: std::path::PathBuf::new(),
4970 };
4971 regenerate_schema_from_snapshot(
4972 &mut preserve,
4973 crate::config::Dialect::Sqlite,
4974 Some(IntrospectCasing::Preserve),
4975 );
4976
4977 assert_eq!(
4978 preserve.schema_code,
4979 "\
4980//! Auto-generated SQLite schema from introspection
4981//!
4982//! Schema introspected from filtered database objects
4983
4984use drizzle::sqlite::prelude::*;
4985
4986#[SQLiteTable(name = \"audit_logs\")]
4987pub struct AuditLogs {
4988 #[column(primary)]
4989 pub id: i64,
4990 pub user_name: String,
4991}
4992
4993#[derive(SQLiteSchema)]
4994pub struct Schema {
4995 pub audit_logs: AuditLogs,
4996}
4997"
4998 );
4999 }
5000
5001 #[test]
5002 fn regenerate_postgres_schema_applies_introspect_casing() {
5003 use crate::config::{Casing, IntrospectCasing};
5004 use crate::snapshot::parse_result_to_snapshot;
5005 use drizzle_migrations::parser::SchemaParser;
5006 use drizzle_types::Dialect as BaseDialect;
5007
5008 let code = r#"
5009#[PostgresTable]
5010pub struct AuditLogs {
5011 #[column(primary)]
5012 pub id: i32,
5013 pub user_name: String,
5014}
5015"#;
5016
5017 let parsed = SchemaParser::parse(code);
5018 let snapshot =
5019 parse_result_to_snapshot(&parsed, BaseDialect::PostgreSQL, Some(Casing::SnakeCase));
5020
5021 let mut camel = IntrospectResult {
5022 schema_code: String::new(),
5023 table_count: 0,
5024 index_count: 0,
5025 view_count: 0,
5026 warnings: Vec::new(),
5027 snapshot: snapshot.clone(),
5028 snapshot_path: std::path::PathBuf::new(),
5029 };
5030 regenerate_schema_from_snapshot(
5031 &mut camel,
5032 crate::config::Dialect::Postgresql,
5033 Some(IntrospectCasing::Camel),
5034 );
5035
5036 assert_eq!(
5037 camel.schema_code,
5038 "\
5039//! Auto-generated PostgreSQL schema from introspection
5040//!
5041//! Schema introspected from filtered database objects
5042
5043use drizzle::postgres::prelude::*;
5044
5045#[PostgresTable]
5046pub struct AuditLogs {
5047 #[column(primary)]
5048 pub id: i32,
5049 pub userName: String,
5050}
5051
5052#[derive(PostgresSchema)]
5053pub struct Schema {
5054 pub auditLogs: AuditLogs,
5055}
5056"
5057 );
5058
5059 let mut preserve = IntrospectResult {
5060 schema_code: String::new(),
5061 table_count: 0,
5062 index_count: 0,
5063 view_count: 0,
5064 warnings: Vec::new(),
5065 snapshot,
5066 snapshot_path: std::path::PathBuf::new(),
5067 };
5068 regenerate_schema_from_snapshot(
5069 &mut preserve,
5070 crate::config::Dialect::Postgresql,
5071 Some(IntrospectCasing::Preserve),
5072 );
5073
5074 assert_eq!(
5075 preserve.schema_code,
5076 "\
5077//! Auto-generated PostgreSQL schema from introspection
5078//!
5079//! Schema introspected from filtered database objects
5080
5081use drizzle::postgres::prelude::*;
5082
5083#[PostgresTable]
5084pub struct AuditLogs {
5085 #[column(primary)]
5086 pub id: i32,
5087 pub user_name: String,
5088}
5089
5090#[derive(PostgresSchema)]
5091pub struct Schema {
5092 pub audit_logs: AuditLogs,
5093}
5094"
5095 );
5096 }
5097
5098 #[cfg(feature = "postgres-sync")]
5099 #[test]
5100 fn postgres_sync_migrate_applies_concurrent_index_without_transaction() {
5101 use drizzle_migrations::{Migration, Migrations};
5102 use drizzle_types::Dialect;
5103
5104 let creds = test_postgres_creds();
5105 let url = creds.connection_url();
5106
5107 let mut setup_client = match postgres::Client::connect(&url, postgres::NoTls) {
5108 Ok(c) => c,
5109 Err(e) => {
5110 eprintln!(
5111 "Skipping postgres_sync_migrate_applies_concurrent_index_without_transaction: {}",
5112 e
5113 );
5114 return;
5115 }
5116 };
5117
5118 let table = unique_pg_name("cli_sync_users");
5119 let index = format!("{}_email_idx", table);
5120 let migration_schema = unique_pg_name("cli_sync_mig");
5121
5122 setup_client
5123 .batch_execute(&format!(
5124 "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
5125 CREATE TABLE \"{table}\" (id integer, email text NOT NULL); \
5126 INSERT INTO \"{table}\" (id, email) VALUES (1, 'a@example.com');"
5127 ))
5128 .expect("setup table for concurrent index test");
5129 drop(setup_client);
5130
5131 let migration_tag = format!("20260212000000_{table}");
5132 let migration_sql =
5133 format!("CREATE INDEX CONCURRENTLY \"{index}\" ON \"{table}\" (\"email\");");
5134 let set = Migrations::with_tracking(
5135 vec![Migration::new(&migration_tag, &migration_sql)],
5136 Dialect::PostgreSQL,
5137 drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5138 );
5139
5140 let result = run_postgres_sync_migrations(&set, &creds)
5141 .expect("sync migration with concurrent index should succeed");
5142 assert_eq!(result.applied_count, 1);
5143
5144 let mut verify_client =
5145 postgres::Client::connect(&url, postgres::NoTls).expect("reconnect for verification");
5146 let exists: i64 = verify_client
5147 .query_one(
5148 "SELECT COUNT(*)::bigint FROM pg_indexes \
5149 WHERE schemaname = 'public' AND tablename = $1 AND indexname = $2",
5150 &[&table, &index],
5151 )
5152 .expect("query pg_indexes")
5153 .get(0);
5154 assert_eq!(exists, 1, "concurrent index was not created");
5155
5156 let _ = verify_client.batch_execute(&format!(
5157 "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
5158 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5159 ));
5160 }
5161
5162 #[cfg(feature = "postgres-sync")]
5163 #[test]
5164 fn postgres_sync_migrate_upgrades_legacy_tracking_table_and_applies_pending() {
5165 use drizzle_migrations::{Migration, Migrations};
5166 use drizzle_types::Dialect;
5167
5168 let creds = test_postgres_creds();
5169 let url = creds.connection_url();
5170 let mut client = postgres::Client::connect(&url, postgres::NoTls)
5171 .expect("connect postgres for legacy tracking upgrade test");
5172
5173 let applied_table = unique_pg_name("cli_sync_applied");
5174 let pending_table = unique_pg_name("cli_sync_pending");
5175 let migration_schema = unique_pg_name("cli_sync_tracking");
5176
5177 client
5178 .batch_execute(&format!(
5179 "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5180 DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5181 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE; \
5182 CREATE SCHEMA \"{migration_schema}\"; \
5183 CREATE TABLE \"{migration_schema}\".\"__drizzle_migrations\" (id SERIAL PRIMARY KEY, hash TEXT NOT NULL, created_at BIGINT); \
5184 CREATE TABLE \"{applied_table}\" (id integer primary key);"
5185 ))
5186 .expect("setup legacy postgres tracking metadata");
5187
5188 let first = Migration::new(
5189 &format!("20230331141203_{applied_table}"),
5190 &format!("CREATE TABLE \"{applied_table}\" (id integer primary key);"),
5191 );
5192 let second = Migration::new(
5193 &format!("20230331141204_{pending_table}"),
5194 &format!("CREATE TABLE \"{pending_table}\" (id integer primary key);"),
5195 );
5196 let set = Migrations::with_tracking(
5197 vec![first.clone(), second.clone()],
5198 Dialect::PostgreSQL,
5199 drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5200 );
5201
5202 client
5203 .execute(
5204 &format!(
5205 "INSERT INTO \"{migration_schema}\".\"__drizzle_migrations\" (hash, created_at) VALUES ($1, $2)"
5206 ),
5207 &[&first.hash(), &first.created_at()],
5208 )
5209 .expect("insert legacy applied migration row");
5210 drop(client);
5211
5212 let result = run_postgres_sync_migrations(&set, &creds)
5213 .expect("sync migration upgrade should succeed");
5214 assert_eq!(result.applied_count, 1);
5215 assert_eq!(result.applied_migrations, vec![second.hash().to_string()]);
5216
5217 let mut verify_client =
5218 postgres::Client::connect(&url, postgres::NoTls).expect("reconnect for verification");
5219 let columns: Vec<String> = verify_client
5220 .query(
5221 "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = '__drizzle_migrations' ORDER BY ordinal_position",
5222 &[&migration_schema],
5223 )
5224 .expect("query upgraded tracking columns")
5225 .into_iter()
5226 .map(|row| row.get(0))
5227 .collect();
5228 assert_eq!(
5229 columns,
5230 vec!["id", "hash", "created_at", "name", "applied_at"]
5231 );
5232
5233 let rows: Vec<(String, i64, String, Option<String>)> = verify_client
5234 .query(
5235 &format!(
5236 "SELECT hash, created_at, name, applied_at::text FROM \"{migration_schema}\".\"__drizzle_migrations\" ORDER BY id ASC"
5237 ),
5238 &[],
5239 )
5240 .expect("query upgraded metadata rows")
5241 .into_iter()
5242 .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3)))
5243 .collect();
5244 assert_eq!(rows.len(), 2);
5245 assert_eq!(rows[0].0, first.hash());
5246 assert_eq!(rows[0].1, first.created_at());
5247 assert_eq!(rows[0].2, first.name());
5248 assert_eq!(rows[0].3, None);
5249 assert_eq!(rows[1].0, second.hash());
5250 assert_eq!(rows[1].1, second.created_at());
5251 assert_eq!(rows[1].2, second.name());
5252 assert!(rows[1].3.is_some());
5253
5254 let pending_exists: i64 = verify_client
5255 .query_one(
5256 "SELECT COUNT(*)::bigint FROM information_schema.tables WHERE table_schema = 'public' AND table_name = $1",
5257 &[&pending_table],
5258 )
5259 .expect("query pending table")
5260 .get(0);
5261 assert_eq!(pending_exists, 1);
5262
5263 let _ = verify_client.batch_execute(&format!(
5264 "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5265 DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5266 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5267 ));
5268 }
5269
5270 #[cfg(feature = "postgres-sync")]
5271 #[test]
5272 fn postgres_sync_migrate_upgrade_rejects_unmatched_legacy_rows() {
5273 use drizzle_migrations::{Migration, Migrations};
5274 use drizzle_types::Dialect;
5275
5276 let creds = test_postgres_creds();
5277 let url = creds.connection_url();
5278 let mut client = postgres::Client::connect(&url, postgres::NoTls)
5279 .expect("connect postgres for unmatched legacy row test");
5280
5281 let migration_schema = unique_pg_name("cli_sync_tracking_unmatched");
5282 client
5283 .batch_execute(&format!(
5284 "DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE; \
5285 CREATE SCHEMA \"{migration_schema}\"; \
5286 CREATE TABLE \"{migration_schema}\".\"__drizzle_migrations\" (id SERIAL PRIMARY KEY, hash TEXT NOT NULL, created_at BIGINT);"
5287 ))
5288 .expect("setup unmatched legacy metadata");
5289
5290 let migration = Migration::new(
5291 "20230331141203_cli_sync_first",
5292 "CREATE TABLE \"cli_sync_unmatched_target\" (id integer primary key);",
5293 );
5294 let set = Migrations::with_tracking(
5295 vec![migration],
5296 Dialect::PostgreSQL,
5297 drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5298 );
5299
5300 client
5301 .execute(
5302 &format!(
5303 "INSERT INTO \"{migration_schema}\".\"__drizzle_migrations\" (hash, created_at) VALUES ($1, $2)"
5304 ),
5305 &[&"unknown_hash", &1_680_271_924_000_i64],
5306 )
5307 .expect("insert unmatched legacy row");
5308 drop(client);
5309
5310 let err = run_postgres_sync_migrations(&set, &creds)
5311 .expect_err("unmatched legacy metadata should fail");
5312 assert!(err.to_string().contains("do not match local migrations"));
5313
5314 let mut verify_client =
5315 postgres::Client::connect(&url, postgres::NoTls).expect("reconnect for verification");
5316 let columns: Vec<String> = verify_client
5317 .query(
5318 "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = '__drizzle_migrations' ORDER BY ordinal_position",
5319 &[&migration_schema],
5320 )
5321 .expect("query legacy tracking columns")
5322 .into_iter()
5323 .map(|row| row.get(0))
5324 .collect();
5325 assert_eq!(columns, vec!["id", "hash", "created_at"]);
5326
5327 let _ = verify_client.batch_execute(&format!(
5328 "DROP TABLE IF EXISTS \"cli_sync_unmatched_target\" CASCADE; \
5329 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5330 ));
5331 }
5332
5333 #[cfg(feature = "tokio-postgres")]
5334 #[test]
5335 fn tokio_postgres_migrate_applies_concurrent_index_without_transaction() {
5336 use drizzle_migrations::{Migration, Migrations};
5337 use drizzle_types::Dialect;
5338
5339 let creds = test_postgres_creds();
5340 let url = creds.connection_url();
5341 let rt = tokio::runtime::Builder::new_current_thread()
5342 .enable_all()
5343 .build()
5344 .expect("create tokio runtime");
5345
5346 rt.block_on(async {
5347 let (client, connection) = match tokio_postgres::connect(&url, tokio_postgres::NoTls).await
5348 {
5349 Ok(v) => v,
5350 Err(e) => {
5351 eprintln!(
5352 "Skipping tokio_postgres_migrate_applies_concurrent_index_without_transaction: {}",
5353 e
5354 );
5355 return;
5356 }
5357 };
5358
5359 tokio::spawn(async move {
5360 let _ = connection.await;
5361 });
5362
5363 let table = unique_pg_name("cli_async_users");
5364 let index = format!("{}_email_idx", table);
5365 let migration_schema = unique_pg_name("cli_async_mig");
5366
5367 client
5368 .batch_execute(&format!(
5369 "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
5370 CREATE TABLE \"{table}\" (id integer, email text NOT NULL); \
5371 INSERT INTO \"{table}\" (id, email) VALUES (1, 'a@example.com');"
5372 ))
5373 .await
5374 .expect("setup table for async concurrent index test");
5375
5376 let migration_tag = format!("20260212000000_{table}");
5377 let migration_sql = format!(
5378 "CREATE INDEX CONCURRENTLY \"{index}\" ON \"{table}\" (\"email\");"
5379 );
5380 let set = Migrations::with_tracking(
5381 vec![Migration::new(&migration_tag, &migration_sql)],
5382 Dialect::PostgreSQL,
5383 drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5384 );
5385
5386 let result = run_postgres_async_inner(&set, &creds)
5387 .await
5388 .expect("async migration with concurrent index should succeed");
5389 assert_eq!(result.applied_count, 1);
5390
5391 let exists: i64 = client
5392 .query_one(
5393 "SELECT COUNT(*)::bigint FROM pg_indexes \
5394 WHERE schemaname = 'public' AND tablename = $1 AND indexname = $2",
5395 &[&table, &index],
5396 )
5397 .await
5398 .expect("query pg_indexes")
5399 .get(0);
5400 assert_eq!(exists, 1, "async concurrent index was not created");
5401
5402 let _ = client
5403 .batch_execute(&format!(
5404 "DROP TABLE IF EXISTS \"{table}\" CASCADE; \
5405 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5406 ))
5407 .await;
5408 });
5409 }
5410
5411 #[cfg(feature = "tokio-postgres")]
5412 #[test]
5413 fn tokio_postgres_migrate_upgrades_legacy_tracking_table_and_applies_pending() {
5414 use drizzle_migrations::{Migration, Migrations};
5415 use drizzle_types::Dialect;
5416
5417 let creds = test_postgres_creds();
5418 let url = creds.connection_url();
5419 let rt = tokio::runtime::Builder::new_current_thread()
5420 .enable_all()
5421 .build()
5422 .expect("create tokio runtime");
5423
5424 rt.block_on(async {
5425 let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
5426 .await
5427 .expect("connect tokio-postgres for legacy tracking upgrade test");
5428
5429 tokio::spawn(async move {
5430 let _ = connection.await;
5431 });
5432
5433 let applied_table = unique_pg_name("cli_async_applied");
5434 let pending_table = unique_pg_name("cli_async_pending");
5435 let migration_schema = unique_pg_name("cli_async_tracking");
5436
5437 client
5438 .batch_execute(&format!(
5439 "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5440 DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5441 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE; \
5442 CREATE SCHEMA \"{migration_schema}\"; \
5443 CREATE TABLE \"{migration_schema}\".\"__drizzle_migrations\" (id SERIAL PRIMARY KEY, hash TEXT NOT NULL, created_at BIGINT); \
5444 CREATE TABLE \"{applied_table}\" (id integer primary key);"
5445 ))
5446 .await
5447 .expect("setup legacy postgres tracking metadata");
5448
5449 let first = Migration::new(
5450 &format!("20230331141203_{applied_table}"),
5451 &format!("CREATE TABLE \"{applied_table}\" (id integer primary key);"),
5452 );
5453 let second = Migration::new(
5454 &format!("20230331141204_{pending_table}"),
5455 &format!("CREATE TABLE \"{pending_table}\" (id integer primary key);"),
5456 );
5457 let set = Migrations::with_tracking(
5458 vec![first.clone(), second.clone()],
5459 Dialect::PostgreSQL,
5460 drizzle_migrations::Tracking::POSTGRES.schema(migration_schema.clone()),
5461 );
5462
5463 client
5464 .execute(
5465 &format!(
5466 "INSERT INTO \"{migration_schema}\".\"__drizzle_migrations\" (hash, created_at) VALUES ($1, $2)"
5467 ),
5468 &[&first.hash(), &first.created_at()],
5469 )
5470 .await
5471 .expect("insert legacy applied migration row");
5472
5473 let result = run_postgres_async_inner(&set, &creds)
5474 .await
5475 .expect("async migration upgrade should succeed");
5476 assert_eq!(result.applied_count, 1);
5477 assert_eq!(result.applied_migrations, vec![second.hash().to_string()]);
5478
5479 let columns: Vec<String> = client
5480 .query(
5481 "SELECT column_name FROM information_schema.columns WHERE table_schema = $1 AND table_name = '__drizzle_migrations' ORDER BY ordinal_position",
5482 &[&migration_schema],
5483 )
5484 .await
5485 .expect("query upgraded tracking columns")
5486 .into_iter()
5487 .map(|row| row.get(0))
5488 .collect();
5489 assert_eq!(
5490 columns,
5491 vec!["id", "hash", "created_at", "name", "applied_at"]
5492 );
5493
5494 let rows: Vec<(String, i64, String, Option<String>)> = client
5495 .query(
5496 &format!(
5497 "SELECT hash, created_at, name, applied_at::text FROM \"{migration_schema}\".\"__drizzle_migrations\" ORDER BY id ASC"
5498 ),
5499 &[],
5500 )
5501 .await
5502 .expect("query upgraded metadata rows")
5503 .into_iter()
5504 .map(|row| (row.get(0), row.get(1), row.get(2), row.get(3)))
5505 .collect();
5506 assert_eq!(rows.len(), 2);
5507 assert_eq!(rows[0].0, first.hash());
5508 assert_eq!(rows[0].1, first.created_at());
5509 assert_eq!(rows[0].2, first.name());
5510 assert_eq!(rows[0].3, None);
5511 assert_eq!(rows[1].0, second.hash());
5512 assert_eq!(rows[1].1, second.created_at());
5513 assert_eq!(rows[1].2, second.name());
5514 assert!(rows[1].3.is_some());
5515
5516 let pending_exists: i64 = client
5517 .query_one(
5518 "SELECT COUNT(*)::bigint FROM information_schema.tables WHERE table_schema = 'public' AND table_name = $1",
5519 &[&pending_table],
5520 )
5521 .await
5522 .expect("query pending table")
5523 .get(0);
5524 assert_eq!(pending_exists, 1);
5525
5526 let _ = client
5527 .batch_execute(&format!(
5528 "DROP TABLE IF EXISTS \"{applied_table}\" CASCADE; \
5529 DROP TABLE IF EXISTS \"{pending_table}\" CASCADE; \
5530 DROP SCHEMA IF EXISTS \"{migration_schema}\" CASCADE;"
5531 ))
5532 .await;
5533 });
5534 }
5535}