1use std::path::Path;
7
8#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
9use crate::config::PostgresCreds;
10use crate::config::{Credentials, Dialect};
11use crate::error::CliError;
12use crate::output;
13use drizzle_migrations::MigrationSet;
14use drizzle_migrations::schema::Snapshot;
15
/// Outcome of a migration run.
///
/// `Default` yields the "nothing applied" result (zero count, empty list).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct MigrationResult {
    /// Number of migrations applied during this run.
    pub applied_count: usize,
    /// Hashes of the migrations applied during this run, in application order.
    pub applied_migrations: Vec<String>,
}
24
/// The set of changes a push would perform against the live database.
///
/// `Default` yields an empty, non-destructive plan.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct PushPlan {
    /// SQL statements to execute, in order.
    pub sql_statements: Vec<String>,
    /// Warnings produced while computing the diff.
    pub warnings: Vec<String>,
    /// `true` when at least one statement was classified as destructive.
    pub destructive: bool,
}
32
33pub fn plan_push(
35 credentials: &Credentials,
36 dialect: Dialect,
37 desired: &Snapshot,
38 breakpoints: bool,
39) -> Result<PushPlan, CliError> {
40 let current = introspect_database(credentials, dialect)?.snapshot;
41 let (sql_statements, warnings) = generate_push_sql(¤t, desired, breakpoints)?;
42 let destructive = sql_statements.iter().any(|s| is_destructive_statement(s));
43
44 Ok(PushPlan {
45 sql_statements,
46 warnings,
47 destructive,
48 })
49}
50
51pub fn apply_push(
53 credentials: &Credentials,
54 dialect: Dialect,
55 plan: &PushPlan,
56 force: bool,
57) -> Result<(), CliError> {
58 if plan.sql_statements.is_empty() {
59 return Ok(());
60 }
61
62 if plan.destructive && !force {
63 let confirmed = confirm_destructive()?;
64 if !confirmed {
65 return Ok(());
66 }
67 }
68
69 execute_statements(credentials, dialect, &plan.sql_statements)
70}
71
/// Applies every pending migration found in `migrations_dir` to the database.
///
/// The migration set is loaded from disk, optionally pointed at a custom
/// tracking table (and, for PostgreSQL only, a custom schema), and then handed
/// to the driver-specific runner compiled in for the given `credentials`.
/// Blank (empty or whitespace-only) `migrations_table` / `migrations_schema`
/// values leave the set's own settings untouched.
///
/// # Errors
///
/// Returns `CliError::Other` when the migration directory cannot be loaded,
/// `CliError::MissingDriver` when no driver feature matching `credentials` is
/// enabled, and otherwise whatever the selected runner reports.
pub fn run_migrations(
    credentials: &Credentials,
    dialect: Dialect,
    migrations_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<MigrationResult, CliError> {
    let mut set = MigrationSet::from_dir(migrations_dir, dialect.to_base())
        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;

    // Optional overrides for where applied migrations are tracked.
    if !migrations_table.trim().is_empty() {
        set = set.with_table(migrations_table.to_string());
    }
    // A schema override is only applied for PostgreSQL.
    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
        set = set.with_schema(migrations_schema.to_string());
    }

    // No migrations on disk: report an empty result without connecting.
    if set.all().is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    // For each credential variant the cfg attributes below are mutually
    // exclusive, so exactly one arm per variant is compiled.
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => run_sqlite_migrations(&set, path),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Underscore-prefixed because it is only read when `turso` is enabled.
            let _auth_token = auth_token.as_deref();
            // Local URLs go to the `libsql` runner, everything else to `turso`.
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    run_libsql_local_migrations(&set, url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    run_turso_migrations(&set, url, _auth_token)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // The synchronous driver wins when both PostgreSQL features are enabled.
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => run_postgres_sync_migrations(&set, creds),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => run_postgres_async_migrations(&set, creds),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}
162
/// Heuristically decides whether a SQL statement may destroy data.
///
/// The check is a case-insensitive substring match: any `DROP TABLE`,
/// `DROP COLUMN`, `DROP INDEX` or `TRUNCATE`, or an `ALTER TABLE` that also
/// contains a space-delimited `DROP`, counts as destructive.
fn is_destructive_statement(sql: &str) -> bool {
    const ALWAYS_DESTRUCTIVE: [&str; 4] = ["DROP TABLE", "DROP COLUMN", "DROP INDEX", "TRUNCATE"];

    let normalized = sql.trim().to_uppercase();
    if ALWAYS_DESTRUCTIVE
        .iter()
        .any(|marker| normalized.contains(marker))
    {
        return true;
    }

    // Covers the remaining `ALTER TABLE ... DROP <something>` forms.
    normalized.contains("ALTER TABLE") && normalized.contains(" DROP ")
}
171
172fn confirm_destructive() -> Result<bool, CliError> {
173 use std::io::{self, IsTerminal, Write};
174
175 if !io::stdin().is_terminal() {
176 return Err(CliError::Other(
177 "Refusing to apply potentially destructive changes in non-interactive mode. Use --explain or --force."
178 .into(),
179 ));
180 }
181
182 println!(
183 "{}",
184 output::warning("Potentially destructive changes detected (DROP/TRUNCATE/etc).")
185 );
186 print!("Apply anyway? [y/N]: ");
187 io::stdout()
188 .flush()
189 .map_err(|e| CliError::IoError(e.to_string()))?;
190
191 let mut line = String::new();
192 io::stdin()
193 .read_line(&mut line)
194 .map_err(|e| CliError::IoError(e.to_string()))?;
195 let ans = line.trim().to_ascii_lowercase();
196 Ok(ans == "y" || ans == "yes")
197}
198
199fn generate_push_sql(
200 current: &Snapshot,
201 desired: &Snapshot,
202 breakpoints: bool,
203) -> Result<(Vec<String>, Vec<String>), CliError> {
204 match (current, desired) {
205 (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
206 use drizzle_migrations::sqlite::collection::SQLiteDDL;
207 use drizzle_migrations::sqlite::diff::compute_migration;
208
209 let prev_ddl = SQLiteDDL::from_entities(prev_snap.ddl.clone());
210 let cur_ddl = SQLiteDDL::from_entities(curr_snap.ddl.clone());
211
212 let diff = compute_migration(&prev_ddl, &cur_ddl);
213 Ok((diff.sql_statements, diff.warnings))
214 }
215 (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
216 use drizzle_migrations::postgres::diff_full_snapshots;
217 use drizzle_migrations::postgres::statements::PostgresGenerator;
218
219 let diff = diff_full_snapshots(prev_snap, curr_snap);
220 let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
221 Ok((generator.generate(&diff.diffs), Vec::new()))
222 }
223 _ => Err(CliError::DialectMismatch),
224 }
225}
226
/// Runs `statements` against the database identified by `credentials`.
///
/// Dispatches to the driver-specific executor that was compiled in; when no
/// matching driver feature is enabled, returns `CliError::MissingDriver`.
/// `_dialect` is accepted but not read by this function.
fn execute_statements(
    credentials: &Credentials,
    _dialect: Dialect,
    statements: &[String],
) -> Result<(), CliError> {
    // Touch the parameter so it counts as used even in feature combinations
    // where no arm below consumes it (presumably to avoid an unused warning).
    let _ = statements;

    // For each credential variant the cfg attributes below are mutually
    // exclusive, so exactly one arm per variant is compiled.
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => execute_sqlite_statements(path, statements),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Underscore-prefixed because it is only read when `turso` is enabled.
            let _auth_token = auth_token.as_deref();
            // Local URLs go to the `libsql` executor, everything else to `turso`.
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    execute_libsql_local_statements(url, statements)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    execute_turso_statements(url, _auth_token, statements)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // The synchronous driver wins when both PostgreSQL features are enabled.
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => execute_postgres_sync_statements(creds, statements),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => execute_postgres_async_statements(creds, statements),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}
295
/// Returns `true` when `url` refers to a local database file rather than a
/// remote endpoint.
///
/// A URL is local when it starts with `file:`, `./` or `/`, or when it has no
/// `://` scheme separator at all.
// Only called from arms gated on the `libsql` / `turso` features, hence the allow.
#[allow(dead_code)]
fn is_local_libsql(url: &str) -> bool {
    const LOCAL_PREFIXES: [&str; 3] = ["file:", "./", "/"];

    let has_local_prefix = LOCAL_PREFIXES.iter().any(|prefix| url.starts_with(prefix));
    has_local_prefix || !url.contains("://")
}
304
/// Builds unique constraints from the indexes whose `origin` is `"u"`.
///
/// For each such index the key columns that have a name are collected and
/// ordered by `seqno`; indexes without any such column are skipped. Indexes
/// named `sqlite_autoindex_*` receive a generated constraint name and are
/// marked as not explicitly named.
#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
fn process_sqlite_uniques_from_indexes(
    raw_indexes: &[drizzle_migrations::sqlite::introspect::RawIndexInfo],
    index_columns: &[drizzle_migrations::sqlite::introspect::RawIndexColumn],
) -> Vec<drizzle_migrations::sqlite::UniqueConstraint> {
    use drizzle_migrations::sqlite::UniqueConstraint;

    raw_indexes
        .iter()
        .filter(|index| index.origin == "u")
        .filter_map(|index| {
            let mut ordered: Vec<(i32, String)> = index_columns
                .iter()
                .filter(|col| col.index_name == index.name && col.key)
                .filter_map(|col| col.name.clone().map(|name| (col.seqno, name)))
                .collect();
            if ordered.is_empty() {
                return None;
            }
            ordered.sort_by_key(|(seqno, _)| *seqno);
            let columns: Vec<String> = ordered.into_iter().map(|(_, name)| name).collect();

            let name_explicit = !index.name.starts_with("sqlite_autoindex_");
            let constraint_name = if name_explicit {
                index.name.clone()
            } else {
                // Auto-generated index: derive the name from table and columns.
                let column_refs: Vec<&str> = columns.iter().map(String::as_str).collect();
                drizzle_migrations::sqlite::ddl::name_for_unique(&index.table, &column_refs)
            };

            let mut unique =
                UniqueConstraint::from_strings(index.table.clone(), constraint_name, columns);
            unique.name_explicit = name_explicit;
            Some(unique)
        })
        .collect()
}
342
/// Executes `statements` against the SQLite database at `path` between an
/// explicit `BEGIN` and `COMMIT`.
///
/// Blank statements are skipped. On the first failure a `ROLLBACK` is issued
/// (its own result is ignored) and the failing statement is reported.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened,
/// `CliError::MigrationError` for any failing statement, `BEGIN` or `COMMIT`.
#[cfg(feature = "rusqlite")]
fn execute_sqlite_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let outcome = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty())
        .try_for_each(|sql| {
            conn.execute(sql, []).map(|_| ()).map_err(|e| {
                CliError::MigrationError(format!("Statement failed: {}\n{}", e, sql))
            })
        });

    if let Err(failure) = outcome {
        // Best-effort rollback; the original failure is what gets reported.
        let _ = conn.execute("ROLLBACK", []);
        return Err(failure);
    }

    conn.execute("COMMIT", [])
        .map(|_| ())
        .map_err(|e| CliError::MigrationError(e.to_string()))
}
375
/// Applies all pending migrations in `set` to the SQLite database at `path`.
///
/// The tracking table is created first, then the already-applied hashes are
/// read to work out what is pending. All pending migrations run between one
/// explicit `BEGIN` / `COMMIT`; any failure triggers a `ROLLBACK`.
///
/// # Errors
///
/// `CliError::ConnectionError` when the database cannot be opened, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "rusqlite")]
fn run_sqlite_migrations(set: &MigrationSet, path: &str) -> Result<MigrationResult, CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    conn.execute(&set.create_table_sql(), []).map_err(|e| {
        CliError::MigrationError(format!("Failed to create migrations table: {}", e))
    })?;

    let already_applied = query_applied_hashes_sqlite(&conn, set)?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Runs every pending migration and records it; stops at the first error.
    let apply_pending = || -> Result<Vec<String>, CliError> {
        let mut applied = Vec::new();
        for migration in &pending {
            for stmt in migration.statements() {
                if stmt.trim().is_empty() {
                    continue;
                }
                conn.execute(stmt, []).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
            conn.execute(
                &set.record_migration_sql(migration.hash(), migration.created_at()),
                [],
            )
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
            applied.push(migration.hash().to_string());
        }
        Ok(applied)
    };

    let applied = match apply_pending() {
        Ok(hashes) => hashes,
        Err(failure) => {
            // Best-effort rollback; the original failure is what gets reported.
            let _ = conn.execute("ROLLBACK", []);
            return Err(failure);
        }
    };

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
435
/// Reads the hashes of already-applied migrations from the tracking table.
///
/// This is best-effort: if the query cannot be prepared an empty list is
/// returned, and rows that fail to decode are skipped.
///
/// # Errors
///
/// `CliError::MigrationError` when the prepared query fails to start.
#[cfg(feature = "rusqlite")]
fn query_applied_hashes_sqlite(
    conn: &rusqlite::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let sql = set.query_all_hashes_sql();
    let Ok(mut stmt) = conn.prepare(&sql) else {
        return Ok(Vec::new());
    };

    // Collected into a local before returning so the row iterator (which
    // borrows `stmt`) is gone before `stmt` itself is dropped.
    let hashes: Vec<String> = stmt
        .query_map([], |row| row.get::<_, String>(0))
        .map_err(|e| CliError::MigrationError(e.to_string()))?
        .flatten()
        .collect();

    Ok(hashes)
}
454
/// Executes `statements` inside a single PostgreSQL transaction using the
/// blocking `postgres` driver (no TLS).
///
/// Blank statements are skipped. On the first failure the function returns
/// early, leaving the transaction uncommitted.
///
/// # Errors
///
/// `CliError::ConnectionError` when connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "postgres-sync")]
fn execute_postgres_sync_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = match postgres::Client::connect(&url, postgres::NoTls) {
        Ok(client) => client,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        if let Err(e) = tx.execute(sql, &[]) {
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            )));
        }
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
487
/// Applies all pending migrations in `set` using the blocking `postgres`
/// driver (no TLS).
///
/// Creates the tracking schema (when the set defines one) and the tracking
/// table, reads the already-applied hashes, then runs every pending migration
/// and its bookkeeping insert inside one transaction.
///
/// # Errors
///
/// `CliError::ConnectionError` when connecting fails; `CliError::MigrationError`
/// for any failing statement, including a failure to read the applied hashes.
#[cfg(feature = "postgres-sync")]
fn run_postgres_sync_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // The tracking table was created just above, so a failure here is a real
    // error. Treating it as "nothing applied" would mark every migration as
    // pending and re-run migrations that are already in the database.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports an unexpected column type/NULL as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Any early return below leaves `tx` uncommitted.
    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
559
/// Blocking entry point for the `tokio-postgres` statement executor.
///
/// Builds a current-thread Tokio runtime and drives
/// `execute_postgres_async_inner` to completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be created, otherwise the inner
/// executor's error.
#[cfg(feature = "tokio-postgres")]
fn execute_postgres_async_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match runtime {
        Ok(rt) => rt.block_on(execute_postgres_async_inner(creds, statements)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
576
/// Executes `statements` inside a single PostgreSQL transaction using
/// `tokio-postgres` (no TLS).
///
/// The connection future is spawned onto the runtime; a connection error is
/// printed to stderr. Blank statements are skipped, and the first failing
/// statement aborts the run with the transaction left uncommitted.
///
/// # Errors
///
/// `CliError::ConnectionError` when connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "tokio-postgres")]
async fn execute_postgres_async_inner(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (mut client, connection) = match tokio_postgres::connect(&url, tokio_postgres::NoTls).await
    {
        Ok(pair) => pair,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to PostgreSQL: {}",
                e
            )));
        }
    };

    // The connection object performs the actual socket I/O and must be polled.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        if let Err(e) = tx.execute(sql, &[]).await {
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, sql
            )));
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
619
/// Blocking entry point for the `tokio-postgres` migration runner.
///
/// Builds a current-thread Tokio runtime and drives `run_postgres_async_inner`
/// to completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be created, otherwise the inner
/// runner's error.
// Not called when `postgres-sync` is also enabled (the sync runner is preferred).
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
fn run_postgres_async_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match runtime {
        Ok(rt) => rt.block_on(run_postgres_async_inner(set, creds)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
633
/// Applies all pending migrations in `set` using `tokio-postgres` (no TLS).
///
/// Creates the tracking schema (when the set defines one) and the tracking
/// table, reads the already-applied hashes, then runs every pending migration
/// and its bookkeeping insert inside one transaction.
///
/// # Errors
///
/// `CliError::ConnectionError` when connecting fails; `CliError::MigrationError`
/// for any failing statement, including a failure to read the applied hashes.
// Not called when `postgres-sync` is also enabled (the sync runner is preferred).
#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
async fn run_postgres_async_inner(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // The connection object performs the actual socket I/O and must be polled.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // The tracking table was created just above, so a failure here is a real
    // error. Treating it as "nothing applied" would mark every migration as
    // pending and re-run migrations that are already in the database.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;
    // `try_get` reports an unexpected column type/NULL as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // Any early return below leaves `tx` uncommitted.
    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).await.map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
724
/// Blocking entry point for the local LibSQL statement executor.
///
/// Builds a current-thread Tokio runtime and drives
/// `execute_libsql_local_inner` to completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be created, otherwise the inner
/// executor's error.
#[cfg(feature = "libsql")]
fn execute_libsql_local_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match runtime {
        Ok(rt) => rt.block_on(execute_libsql_local_inner(path, statements)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
738
/// Executes `statements` inside one transaction on the local LibSQL database
/// at `path`.
///
/// Blank statements are skipped. The first failing statement triggers a
/// rollback (whose own result is ignored) and is reported to the caller.
///
/// # Errors
///
/// `CliError::ConnectionError` when opening or connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "libsql")]
async fn execute_libsql_local_inner(path: &str, statements: &[String]) -> Result<(), CliError> {
    let db = match libsql::Builder::new_local(path).build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        match tx.execute(sql, ()).await {
            Ok(_) => {}
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Statement failed: {}\n{}",
                    e, sql
                )));
            }
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
777
/// Blocking entry point for the local LibSQL migration runner.
///
/// Builds a current-thread Tokio runtime and drives `run_libsql_local_inner`
/// to completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be created, otherwise the inner
/// runner's error.
#[cfg(feature = "libsql")]
fn run_libsql_local_migrations(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match runtime {
        Ok(rt) => rt.block_on(run_libsql_local_inner(set, path)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
790
/// Applies all pending migrations in `set` to the local LibSQL database at `path`.
///
/// The tracking table is created first, then the already-applied hashes are
/// read to work out what is pending. All pending migrations and their
/// bookkeeping inserts run in one transaction; any failure rolls it back.
///
/// # Errors
///
/// `CliError::ConnectionError` when opening or connecting fails, otherwise
/// `CliError::MigrationError`.
#[cfg(feature = "libsql")]
async fn run_libsql_local_inner(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let db = match libsql::Builder::new_local(path).build().await {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_libsql(&conn, set).await?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration in the same transaction as its statements.
        let record_sql = set.record_migration_sql(migration.hash(), migration.created_at());
        match tx.execute(&record_sql, ()).await {
            Ok(_) => applied.push(migration.hash().to_string()),
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(e.to_string()));
            }
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
868
/// Reads the hashes of already-applied migrations over a LibSQL connection.
///
/// This is best-effort and never returns an error: a failing query yields an
/// empty list, iteration stops at the first row-fetch error, and rows whose
/// first column cannot be read as a string are skipped.
#[cfg(feature = "libsql")]
async fn query_applied_hashes_libsql(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let sql = set.query_all_hashes_sql();
    let Ok(mut rows) = conn.query(&sql, ()).await else {
        return Ok(Vec::new());
    };

    let mut hashes = Vec::new();
    loop {
        match rows.next().await {
            // `extend` with an `Option` pushes the hash only when decoding succeeded.
            Ok(Some(row)) => hashes.extend(row.get::<String>(0).ok()),
            _ => break,
        }
    }

    Ok(hashes)
}
888
/// Blocking entry point for the remote Turso statement executor.
///
/// Builds a current-thread Tokio runtime and drives `execute_turso_inner` to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be created, otherwise the inner
/// executor's error.
#[cfg(feature = "turso")]
fn execute_turso_statements(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match runtime {
        Ok(rt) => rt.block_on(execute_turso_inner(url, auth_token, statements)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
906
/// Executes `statements` inside one transaction on the remote database at `url`.
///
/// A missing `auth_token` is sent as an empty string. Blank statements are
/// skipped. The first failing statement triggers a rollback (whose own result
/// is ignored) and is reported to the caller.
///
/// # Errors
///
/// `CliError::ConnectionError` when building the client or connecting fails,
/// otherwise `CliError::MigrationError`.
#[cfg(feature = "turso")]
async fn execute_turso_inner(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let db = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{}': {}",
                url, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let runnable = statements
        .iter()
        .map(|raw| raw.trim())
        .filter(|sql| !sql.is_empty());
    for sql in runnable {
        match tx.execute(sql, ()).await {
            Ok(_) => {}
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Statement failed: {}\n{}",
                    e, sql
                )));
            }
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
949
/// Blocking entry point for the remote Turso migration runner.
///
/// Builds a current-thread Tokio runtime and drives `run_turso_inner` to
/// completion on it.
///
/// # Errors
///
/// `CliError::Other` when the runtime cannot be created, otherwise the inner
/// runner's error.
#[cfg(feature = "turso")]
fn run_turso_migrations(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let runtime = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build();

    match runtime {
        Ok(rt) => rt.block_on(run_turso_inner(set, url, auth_token)),
        Err(e) => Err(CliError::Other(format!(
            "Failed to create async runtime: {}",
            e
        ))),
    }
}
963
/// Applies all pending migrations in `set` to the remote database at `url`.
///
/// A missing `auth_token` is sent as an empty string. The tracking table is
/// created first, then the already-applied hashes are read to work out what is
/// pending. All pending migrations and their bookkeeping inserts run in one
/// transaction; any failure rolls it back.
///
/// # Errors
///
/// `CliError::ConnectionError` when building the client or connecting fails,
/// otherwise `CliError::MigrationError`.
#[cfg(feature = "turso")]
async fn run_turso_inner(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let db = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{}': {}",
                url, e
            )));
        }
    };

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    if let Err(e) = conn.execute(&set.create_table_sql(), ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_turso(&conn, set).await?;
    let pending: Vec<_> = set.pending(&already_applied).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if stmt.trim().is_empty() {
                continue;
            }
            if let Err(e) = tx.execute(stmt, ()).await {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }

        // Record the migration in the same transaction as its statements.
        let record_sql = set.record_migration_sql(migration.hash(), migration.created_at());
        match tx.execute(&record_sql, ()).await {
            Ok(_) => applied.push(migration.hash().to_string()),
            Err(e) => {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(e.to_string()));
            }
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}
1042
/// Reads the hashes of already-applied migrations over a Turso connection.
///
/// This is best-effort and never returns an error: a failing query yields an
/// empty list, iteration stops at the first row-fetch error, and rows whose
/// first column cannot be read as a string are skipped.
#[cfg(feature = "turso")]
async fn query_applied_hashes_turso(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let sql = set.query_all_hashes_sql();
    let Ok(mut rows) = conn.query(&sql, ()).await else {
        return Ok(Vec::new());
    };

    let mut hashes = Vec::new();
    loop {
        match rows.next().await {
            // `extend` with an `Option` pushes the hash only when decoding succeeded.
            Ok(Some(row)) => hashes.extend(row.get::<String>(0).ok()),
            _ => break,
        }
    }

    Ok(hashes)
}
1062
/// Result of introspecting a live database.
#[derive(Debug)]
pub struct IntrospectResult {
    /// Generated schema source; `run_introspection` writes it to `schema.rs`.
    pub schema_code: String,
    /// Presumably the number of tables found (populated outside this view — confirm).
    pub table_count: usize,
    /// Presumably the number of indexes found (populated outside this view — confirm).
    pub index_count: usize,
    /// Presumably the number of views found (populated outside this view — confirm).
    pub view_count: usize,
    /// Warnings collected during introspection.
    pub warnings: Vec<String>,
    /// Snapshot of the introspected database.
    pub snapshot: Snapshot,
    /// Path of the snapshot file; `run_introspection` sets it after saving the snapshot.
    pub snapshot_path: std::path::PathBuf,
}
1085
1086pub fn run_introspection(
1090 credentials: &Credentials,
1091 dialect: Dialect,
1092 out_dir: &Path,
1093 init_metadata: bool,
1094 migrations_table: &str,
1095 migrations_schema: &str,
1096) -> Result<IntrospectResult, CliError> {
1097 use drizzle_migrations::journal::Journal;
1098 use drizzle_migrations::words::generate_migration_tag;
1099
1100 let mut result = introspect_database(credentials, dialect)?;
1102
1103 let schema_path = out_dir.join("schema.rs");
1105 if let Some(parent) = schema_path.parent() {
1106 std::fs::create_dir_all(parent).map_err(|e| {
1107 CliError::Other(format!(
1108 "Failed to create output directory '{}': {}",
1109 parent.display(),
1110 e
1111 ))
1112 })?;
1113 }
1114 std::fs::write(&schema_path, &result.schema_code).map_err(|e| {
1115 CliError::Other(format!(
1116 "Failed to write schema file '{}': {}",
1117 schema_path.display(),
1118 e
1119 ))
1120 })?;
1121
1122 let meta_dir = out_dir.join("meta");
1124 std::fs::create_dir_all(&meta_dir).map_err(|e| {
1125 CliError::Other(format!(
1126 "Failed to create meta directory '{}': {}",
1127 meta_dir.display(),
1128 e
1129 ))
1130 })?;
1131
1132 let journal_path = meta_dir.join("_journal.json");
1134 let mut journal = Journal::load_or_create(&journal_path, dialect.to_base())
1135 .map_err(|e| CliError::Other(format!("Failed to load journal: {}", e)))?;
1136
1137 let tag = generate_migration_tag(None);
1139
1140 let migration_dir = out_dir.join(&tag);
1142 std::fs::create_dir_all(&migration_dir).map_err(|e| {
1143 CliError::Other(format!(
1144 "Failed to create migration directory '{}': {}",
1145 migration_dir.display(),
1146 e
1147 ))
1148 })?;
1149
1150 let snapshot_path = migration_dir.join("snapshot.json");
1152 result.snapshot.save(&snapshot_path).map_err(|e| {
1153 CliError::Other(format!(
1154 "Failed to write snapshot file '{}': {}",
1155 snapshot_path.display(),
1156 e
1157 ))
1158 })?;
1159
1160 let base_dialect = dialect.to_base();
1162 let empty_snapshot = Snapshot::empty(base_dialect);
1163 let sql_statements = generate_introspect_migration(&empty_snapshot, &result.snapshot, true)?;
1164
1165 let migration_sql_path = migration_dir.join("migration.sql");
1167 let sql_content = if sql_statements.is_empty() {
1168 "-- No tables to create (empty database)\n".to_string()
1169 } else {
1170 sql_statements.join("\n--> statement-breakpoint\n")
1171 };
1172 std::fs::write(&migration_sql_path, &sql_content).map_err(|e| {
1173 CliError::Other(format!(
1174 "Failed to write migration file '{}': {}",
1175 migration_sql_path.display(),
1176 e
1177 ))
1178 })?;
1179
1180 result.snapshot_path = snapshot_path;
1182
1183 journal.add_entry(tag.clone(), true); journal
1186 .save(&journal_path)
1187 .map_err(|e| CliError::Other(format!("Failed to save journal: {}", e)))?;
1188
1189 if init_metadata {
1190 apply_init_metadata(
1191 credentials,
1192 dialect,
1193 out_dir,
1194 migrations_table,
1195 migrations_schema,
1196 )?;
1197 }
1198
1199 Ok(result)
1200}
1201
/// Initialises migration metadata in the database for the migrations found
/// in `out_dir`.
///
/// The migration set is loaded from `out_dir`, optionally pointed at a custom
/// tracking table (and, for PostgreSQL only, a custom schema), and handed to
/// the driver-specific `init_*_metadata` helper compiled in for `credentials`.
///
/// # Errors
///
/// Returns `CliError::Other` when the migrations cannot be loaded or the set
/// is empty, `CliError::MissingDriver` when no driver feature matching
/// `credentials` is enabled, and otherwise whatever the selected helper reports.
fn apply_init_metadata(
    credentials: &Credentials,
    dialect: Dialect,
    out_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<(), CliError> {
    use drizzle_migrations::MigrationSet;

    let mut set = MigrationSet::from_dir(out_dir, dialect.to_base())
        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;

    // Optional overrides for where applied migrations are tracked.
    if !migrations_table.trim().is_empty() {
        set = set.with_table(migrations_table.to_string());
    }
    // A schema override is only applied for PostgreSQL.
    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
        set = set.with_schema(migrations_schema.to_string());
    }

    // Unlike `run_migrations`, an empty set is an error here.
    if set.all().is_empty() {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    }

    // For each credential variant the cfg attributes below are mutually
    // exclusive, so exactly one arm per variant is compiled.
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => init_sqlite_metadata(path, &set),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            // Underscore-prefixed because it is only read when `turso` is enabled.
            let _auth_token = auth_token.as_deref();
            // Local URLs go to the `libsql` helper, everything else to `turso`.
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    init_libsql_local_metadata(url, &set)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    init_turso_metadata(url, _auth_token, &set)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        // The synchronous driver wins when both PostgreSQL features are enabled.
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => init_postgres_sync_metadata(creds, &set),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => init_postgres_async_metadata(creds, &set),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}
1290
/// Checks the preconditions for recording `--init` metadata.
///
/// The checks run in this order:
/// 1. the database must not report any applied migration hashes;
/// 2. the migration set must contain at least one migration;
/// 3. every migration in the set must share the first migration's
///    `created_at` value.
///
/// # Errors
///
/// Returns `CliError::Other` with a message describing the first check that
/// failed.
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres"
))]
fn validate_init_metadata(applied_hashes: &[String], set: &MigrationSet) -> Result<(), CliError> {
    let database_is_fresh = applied_hashes.is_empty();
    if !database_is_fresh {
        return Err(CliError::Other(
            "--init can't be used when database already has migrations set".into(),
        ));
    }

    let migrations = set.all();
    let Some(baseline) = migrations.first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let baseline_created_at = baseline.created_at();
    let all_share_timestamp = migrations
        .iter()
        .all(|migration| migration.created_at() == baseline_created_at);

    if all_share_timestamp {
        Ok(())
    } else {
        Err(CliError::Other(
            "--init can't be used with existing migrations".into(),
        ))
    }
}
1319
/// Initializes migration metadata in a SQLite database file via `rusqlite`.
///
/// Opens the database at `path`, runs the set's create-table statement,
/// validates that `--init` is permissible, and records the first migration of
/// `set` as applied.
///
/// # Errors
///
/// `CliError::ConnectionError` if the database cannot be opened,
/// `CliError::MigrationError` if a statement fails, plus any error from the
/// applied-hash lookup or `validate_init_metadata`.
#[cfg(feature = "rusqlite")]
fn init_sqlite_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let db = match rusqlite::Connection::open(path) {
        Ok(db) => db,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open SQLite database '{}': {}",
                path, e
            )));
        }
    };

    let create_sql = set.create_table_sql();
    if let Err(e) = db.execute(&create_sql, []) {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_sqlite(&db, set)?;
    validate_init_metadata(&already_applied, set)?;

    let Some(baseline) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let record_sql = set.record_migration_sql(baseline.hash(), baseline.created_at());
    match db.execute(&record_sql, []) {
        Ok(_) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
1350
/// Blocking entry point for initializing migration metadata in a local LibSQL
/// database.
///
/// Builds a current-thread Tokio runtime and drives
/// `init_libsql_local_metadata_inner` to completion on it.
///
/// # Errors
///
/// `CliError::Other` if the runtime cannot be built, otherwise the result of
/// the async implementation.
#[cfg(feature = "libsql")]
fn init_libsql_local_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    let task = init_libsql_local_metadata_inner(path, set);
    runtime.block_on(task)
}
1360
/// Async implementation of `--init` metadata setup for a local LibSQL database.
///
/// Opens the local database at `path`, runs the set's create-table statement,
/// validates that `--init` is permissible, and records the first migration of
/// `set` as applied.
///
/// # Errors
///
/// `CliError::ConnectionError` if the database cannot be opened or connected
/// to, `CliError::MigrationError` if a statement fails, plus any error from
/// the applied-hash lookup or `validate_init_metadata`.
#[cfg(feature = "libsql")]
async fn init_libsql_local_metadata_inner(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let database = match libsql::Builder::new_local(path).build().await {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to open LibSQL database '{}': {}",
                path, e
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    let create_sql = set.create_table_sql();
    if let Err(e) = connection.execute(&create_sql, ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_libsql(&connection, set).await?;
    validate_init_metadata(&already_applied, set)?;

    let Some(baseline) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let record_sql = set.record_migration_sql(baseline.hash(), baseline.created_at());
    match connection.execute(&record_sql, ()).await {
        Ok(_) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
1397
/// Blocking entry point for initializing migration metadata on a remote Turso
/// database.
///
/// Builds a current-thread Tokio runtime and drives
/// `init_turso_metadata_inner` to completion on it.
///
/// # Errors
///
/// `CliError::Other` if the runtime cannot be built, otherwise the result of
/// the async implementation.
#[cfg(feature = "turso")]
fn init_turso_metadata(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = runtime_builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    let task = init_turso_metadata_inner(url, auth_token, set);
    runtime.block_on(task)
}
1411
/// Async implementation of `--init` metadata setup for a remote Turso database.
///
/// Connects to `url` (an absent `auth_token` is sent as an empty string), runs
/// the set's create-table statement, validates that `--init` is permissible,
/// and records the first migration of `set` as applied.
///
/// # Errors
///
/// `CliError::ConnectionError` if the remote database cannot be built or
/// connected to, `CliError::MigrationError` if a statement fails, plus any
/// error from the applied-hash lookup or `validate_init_metadata`.
#[cfg(feature = "turso")]
async fn init_turso_metadata_inner(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let token = auth_token.unwrap_or("").to_string();
    let database = match libsql::Builder::new_remote(url.to_string(), token)
        .build()
        .await
    {
        Ok(database) => database,
        Err(e) => {
            return Err(CliError::ConnectionError(format!(
                "Failed to connect to Turso '{}': {}",
                url, e
            )));
        }
    };

    let connection = match database.connect() {
        Ok(connection) => connection,
        Err(e) => return Err(CliError::ConnectionError(e.to_string())),
    };

    let create_sql = set.create_table_sql();
    if let Err(e) = connection.execute(&create_sql, ()).await {
        return Err(CliError::MigrationError(format!(
            "Failed to create migrations table: {}",
            e
        )));
    }

    let already_applied = query_applied_hashes_turso(&connection, set).await?;
    validate_init_metadata(&already_applied, set)?;

    let Some(baseline) = set.all().first() else {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    };

    let record_sql = set.record_migration_sql(baseline.hash(), baseline.created_at());
    match connection.execute(&record_sql, ()).await {
        Ok(_) => Ok(()),
        Err(e) => Err(CliError::MigrationError(e.to_string())),
    }
}
1452
/// Initializes migration metadata on PostgreSQL using the blocking `postgres`
/// client.
///
/// Connects without TLS, runs the set's create-schema statement (when the set
/// provides one) and create-table statement, validates via
/// `validate_init_metadata` that `--init` is permissible, and records the
/// first migration of `set` as applied.
///
/// # Errors
///
/// `CliError::ConnectionError` if the connection fails,
/// `CliError::MigrationError` if any statement fails — including the lookup
/// and decoding of already-applied hashes — plus any error from
/// `validate_init_metadata`.
#[cfg(feature = "postgres-sync")]
fn init_postgres_sync_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // The tracking table was created just above, so a failing lookup is a
    // genuine error. Treating it as "no rows" would skip the
    // already-initialized guard in `validate_init_metadata` and record a
    // baseline row in a database whose state is unknown.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;

    // `try_get` reports a decode failure as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1491
/// Blocking entry point for initializing migration metadata on PostgreSQL via
/// `tokio-postgres`.
///
/// Builds a current-thread Tokio runtime and drives
/// `init_postgres_async_inner` to completion on it.
///
/// # Errors
///
/// `CliError::Other` if the runtime cannot be built, otherwise the result of
/// the async implementation.
#[cfg(feature = "tokio-postgres")]
fn init_postgres_async_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    let task = init_postgres_async_inner(creds, set);
    runtime.block_on(task)
}
1501
/// Async implementation of `--init` metadata setup for PostgreSQL via
/// `tokio-postgres`.
///
/// Connects without TLS, spawns the connection driver task, runs the set's
/// create-schema statement (when the set provides one) and create-table
/// statement, validates via `validate_init_metadata` that `--init` is
/// permissible, and records the first migration of `set` as applied.
///
/// # Errors
///
/// `CliError::ConnectionError` if the connection fails,
/// `CliError::MigrationError` if any statement fails — including the lookup
/// and decoding of already-applied hashes — plus any error from
/// `validate_init_metadata`.
#[cfg(feature = "tokio-postgres")]
async fn init_postgres_async_inner(
    creds: &PostgresCreds,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // The connection future must be polled for the client to make progress;
    // errors from it are reported on stderr rather than returned.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    // The tracking table was created just above, so a failing lookup is a
    // genuine error. Treating it as "no rows" would skip the
    // already-initialized guard in `validate_init_metadata` and record a
    // baseline row in a database whose state is unknown.
    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to query applied migrations: {}", e))
        })?;

    // `try_get` reports a decode failure as an error instead of panicking.
    let applied_hashes = rows
        .iter()
        .map(|row| row.try_get::<_, String>(0))
        .collect::<Result<Vec<String>, _>>()
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to read applied migration hash: {}", e))
        })?;

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}
1558
1559fn generate_introspect_migration(
1561 prev: &Snapshot,
1562 current: &Snapshot,
1563 _breakpoints: bool,
1564) -> Result<Vec<String>, CliError> {
1565 match (prev, current) {
1566 (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
1567 use drizzle_migrations::sqlite::diff_snapshots;
1568 use drizzle_migrations::sqlite::statements::SqliteGenerator;
1569
1570 let diff = diff_snapshots(prev_snap, curr_snap);
1571 let generator = SqliteGenerator::new().with_breakpoints(true);
1572 Ok(generator.generate_migration(&diff))
1573 }
1574 (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
1575 use drizzle_migrations::postgres::diff_full_snapshots;
1576 use drizzle_migrations::postgres::statements::PostgresGenerator;
1577
1578 let diff = diff_full_snapshots(prev_snap, curr_snap);
1579 let generator = PostgresGenerator::new().with_breakpoints(true);
1580 Ok(generator.generate(&diff.diffs))
1581 }
1582 _ => Err(CliError::DialectMismatch),
1583 }
1584}
1585
1586fn introspect_database(
1588 credentials: &Credentials,
1589 dialect: Dialect,
1590) -> Result<IntrospectResult, CliError> {
1591 match dialect {
1592 Dialect::Sqlite | Dialect::Turso => introspect_sqlite_dialect(credentials),
1593 Dialect::Postgresql => introspect_postgres_dialect(credentials),
1594 }
1595}
1596
1597fn introspect_sqlite_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
1599 match credentials {
1600 #[cfg(feature = "rusqlite")]
1601 Credentials::Sqlite { path } => introspect_rusqlite(path),
1602
1603 #[cfg(not(feature = "rusqlite"))]
1604 Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
1605 dialect: "SQLite",
1606 feature: "rusqlite",
1607 }),
1608
1609 #[cfg(any(feature = "libsql", feature = "turso"))]
1610 Credentials::Turso { url, auth_token } => {
1611 let _auth_token = auth_token.as_deref();
1612 if is_local_libsql(url) {
1613 #[cfg(feature = "libsql")]
1614 {
1615 introspect_libsql_local(url)
1616 }
1617 #[cfg(not(feature = "libsql"))]
1618 {
1619 Err(CliError::MissingDriver {
1620 dialect: "LibSQL (local)",
1621 feature: "libsql",
1622 })
1623 }
1624 } else {
1625 #[cfg(feature = "turso")]
1626 {
1627 introspect_turso(url, _auth_token)
1628 }
1629 #[cfg(not(feature = "turso"))]
1630 {
1631 Err(CliError::MissingDriver {
1632 dialect: "Turso (remote)",
1633 feature: "turso",
1634 })
1635 }
1636 }
1637 }
1638
1639 #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
1640 Credentials::Turso { .. } => Err(CliError::MissingDriver {
1641 dialect: "Turso",
1642 feature: "turso or libsql",
1643 }),
1644
1645 _ => Err(CliError::Other(
1646 "SQLite introspection requires sqlite path or turso credentials".into(),
1647 )),
1648 }
1649}
1650
1651fn introspect_postgres_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
1653 match credentials {
1654 #[cfg(feature = "postgres-sync")]
1655 Credentials::Postgres(creds) => introspect_postgres_sync(creds),
1656
1657 #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
1658 Credentials::Postgres(creds) => introspect_postgres_async(creds),
1659
1660 #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
1661 Credentials::Postgres(_) => Err(CliError::MissingDriver {
1662 dialect: "PostgreSQL",
1663 feature: "postgres-sync or tokio-postgres",
1664 }),
1665
1666 _ => Err(CliError::Other(
1667 "PostgreSQL introspection requires postgres credentials".into(),
1668 )),
1669 }
1670}
1671
/// Introspects a SQLite database file through `rusqlite`.
///
/// Opens the database at `path`, reads tables, columns, indexes, foreign keys
/// and views using the queries from `drizzle_migrations::sqlite::introspect`,
/// assembles them into a `SQLiteDDL`, and returns the generated Rust schema
/// code together with a `Snapshot::Sqlite` built from the same DDL.
///
/// The returned `snapshot_path` is left empty by this function.
///
/// # Errors
///
/// Returns `CliError::ConnectionError` if the database cannot be opened, and
/// `CliError::Other` if the table or column queries cannot be prepared or
/// executed, or if an index listing prepares but fails to execute.
///
/// Everything else is best-effort: index, index-column, foreign-key and view
/// statements that fail to prepare (or, for the latter three, to execute) are
/// skipped, and individual rows that fail to decode are dropped silently.
#[cfg(feature = "rusqlite")]
fn introspect_rusqlite(path: &str) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    // Tables: (name, optional CREATE statement text).
    let mut tables_stmt = conn
        .prepare(queries::TABLES_QUERY)
        .map_err(|e| CliError::Other(format!("Failed to prepare tables query: {}", e)))?;

    let tables: Vec<(String, Option<String>)> = tables_stmt
        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
        .map_err(|e| CliError::Other(e.to_string()))?
        // Rows that fail to decode are dropped rather than reported.
        .filter_map(Result::ok)
        .collect();

    // Table name -> SQL text, for tables that have SQL text available.
    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    // Columns of all tables.
    let mut columns_stmt = conn
        .prepare(queries::COLUMNS_QUERY)
        .map_err(|e| CliError::Other(format!("Failed to prepare columns query: {}", e)))?;

    let mut raw_columns: Vec<RawColumnInfo> = columns_stmt
        .query_map([], |row| {
            Ok(RawColumnInfo {
                table: row.get(0)?,
                cid: row.get(1)?,
                name: row.get(2)?,
                column_type: row.get(3)?,
                not_null: row.get(4)?,
                default_value: row.get(5)?,
                pk: row.get(6)?,
                hidden: row.get(7)?,
                sql: row.get(8)?,
            })
        })
        .map_err(|e| CliError::Other(e.to_string()))?
        .filter_map(Result::ok)
        .collect();

    // Accumulators filled by the per-table loop and the view queries below.
    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        // Indexes of this table; a statement that fails to prepare is skipped.
        if let Ok(mut idx_stmt) = conn.prepare(&queries::indexes_query(table_name)) {
            let indexes: Vec<RawIndexInfo> = idx_stmt
                .query_map([], |row| {
                    Ok(RawIndexInfo {
                        table: table_name.clone(),
                        name: row.get(1)?,
                        unique: row.get::<_, i32>(2)? != 0,
                        origin: row.get(3)?,
                        partial: row.get::<_, i32>(4)? != 0,
                    })
                })
                .map_err(|e| CliError::Other(e.to_string()))?
                .filter_map(Result::ok)
                .collect();

            for idx in &indexes {
                // Columns of each index; failures here are skipped silently.
                if let Ok(mut col_stmt) = conn.prepare(&queries::index_info_query(&idx.name))
                    && let Ok(col_iter) = col_stmt.query_map([], |row| {
                        Ok(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: row.get(0)?,
                            cid: row.get(1)?,
                            name: row.get(2)?,
                            desc: row.get::<_, i32>(3)? != 0,
                            coll: row.get(4)?,
                            key: row.get::<_, i32>(5)? != 0,
                        })
                    })
                {
                    all_index_columns.extend(col_iter.filter_map(Result::ok));
                }
            }
            all_indexes.extend(indexes);
        }

        // Foreign keys of this table; failures here are skipped silently.
        if let Ok(mut fk_stmt) = conn.prepare(&queries::foreign_keys_query(table_name))
            && let Ok(fk_iter) = fk_stmt.query_map([], |row| {
                Ok(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0)?,
                    seq: row.get(1)?,
                    to_table: row.get(2)?,
                    from_column: row.get(3)?,
                    to_column: row.get(4)?,
                    on_update: row.get(5)?,
                    on_delete: row.get(6)?,
                    r#match: row.get(7)?,
                })
            })
        {
            all_fks.extend(fk_iter.filter_map(Result::ok));
        }
    }

    // Views: (name, SQL text); failures here are skipped silently.
    if let Ok(mut views_stmt) = conn.prepare(queries::VIEWS_QUERY)
        && let Ok(view_iter) = views_stmt.query_map([], |row| {
            Ok(RawViewInfo {
                name: row.get(0)?,
                sql: row.get(1)?,
            })
        })
    {
        all_views.extend(view_iter.filter_map(Result::ok));
    }

    // View columns are appended to the same list as table columns.
    if let Ok(mut view_cols_stmt) = conn.prepare(queries::VIEW_COLUMNS_QUERY)
        && let Ok(col_iter) = view_cols_stmt.query_map([], |row| {
            Ok(RawColumnInfo {
                table: row.get(0)?,
                cid: row.get(1)?,
                name: row.get(2)?,
                column_type: row.get(3)?,
                not_null: row.get::<_, i32>(4)? != 0,
                default_value: row.get(5)?,
                pk: row.get(6)?,
                hidden: row.get(7)?,
                sql: row.get(8)?,
            })
        })
    {
        raw_columns.extend(col_iter.filter_map(Result::ok));
    }

    // Generated-column details are parsed out of each table's SQL text.
    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs whose `pk` value is positive.
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);

    // NOTE(review): helper defined elsewhere in this file; presumably derives
    // unique constraints from the index metadata — confirm against its definition.
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    // Assemble the DDL container from the processed pieces.
    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        if let Some(sql) = table_sql {
            // Table options are detected by substring match on the uppercased
            // SQL text, not by parsing it.
            let sql_upper = sql.to_uppercase();
            table.strict = sql_upper.contains(" STRICT");
            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }

    for idx in indexes {
        ddl.indexes.push(idx);
    }

    for fk in foreign_keys {
        ddl.fks.push(fk);
    }

    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    for v in all_views {
        // A view whose SQL cannot be parsed is kept, with an error recorded
        // in place of a definition.
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    // Generate the Rust schema source from the DDL.
    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", path)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    // Build the snapshot from the same DDL entities.
    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}
1918
/// Blocking entry point for introspecting a local LibSQL database.
///
/// Builds a current-thread Tokio runtime and drives `introspect_libsql_inner`
/// (with no auth token) to completion on it.
///
/// # Errors
///
/// `CliError::Other` if the runtime cannot be built, otherwise the result of
/// the async implementation.
#[cfg(feature = "libsql")]
fn introspect_libsql_local(path: &str) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = match runtime_builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    let task = introspect_libsql_inner(path, None);
    runtime.block_on(task)
}
1932
/// Async introspection of a local LibSQL database.
///
/// Opens the database at `path`, reads tables, columns, indexes, foreign keys
/// and views using the queries from `drizzle_migrations::sqlite::introspect`,
/// assembles them into a `SQLiteDDL`, and returns the generated Rust schema
/// code together with a `Snapshot::Sqlite` built from the same DDL.
///
/// `_auth_token` is accepted but not used by this function. The returned
/// `snapshot_path` is left empty.
///
/// # Errors
///
/// Returns `CliError::ConnectionError` if the database cannot be opened or
/// connected to, and `CliError::Other` if the table or column queries fail.
///
/// Everything else is best-effort: index, index-column, foreign-key and view
/// queries that fail are skipped, row iteration stops silently at the first
/// row error, and column values that fail to decode fall back to a default
/// (or `None` for optional fields).
#[cfg(feature = "libsql")]
async fn introspect_libsql_inner(
    path: &str,
    _auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    // Tables: (name, optional CREATE statement text).
    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    // Iteration ends at the last row or at the first row error, whichever
    // comes first; the error itself is not reported.
    while let Ok(Some(row)) = tables_rows.next().await {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    // Table name -> SQL text, for tables that have SQL text available.
    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    // Columns of all tables.
    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Ok(Some(row)) = columns_rows.next().await {
        // Values that fail to decode fall back to defaults / `None`.
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    // Accumulators filled by the per-table loop and the view queries below.
    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        // Indexes of this table; a failing query is skipped.
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                // Columns of this index; a failing query is skipped.
                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        // Foreign keys of this table; a failing query is skipped.
        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    // Views: (name, SQL text); a failing query is skipped.
    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    // View columns are appended to the same list as table columns.
    if let Ok(mut view_cols_rows) = conn.query(queries::VIEW_COLUMNS_QUERY, ()).await {
        while let Ok(Some(row)) = view_cols_rows.next().await {
            raw_columns.push(RawColumnInfo {
                table: row.get(0).unwrap_or_default(),
                cid: row.get(1).unwrap_or(0),
                name: row.get(2).unwrap_or_default(),
                column_type: row.get(3).unwrap_or_default(),
                not_null: row.get::<i32>(4).unwrap_or(0) != 0,
                default_value: row.get(5).ok(),
                pk: row.get(6).unwrap_or(0),
                hidden: row.get(7).unwrap_or(0),
                sql: row.get(8).ok(),
            });
        }
    }

    // Generated-column details are parsed out of each table's SQL text.
    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    // (table, column) pairs whose `pk` value is positive.
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    // NOTE(review): helper defined elsewhere in this file; presumably derives
    // unique constraints from the index metadata — confirm against its definition.
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    // Assemble the DDL container from the processed pieces.
    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        if let Some(sql) = table_sql {
            // Table options are detected by substring match on the uppercased
            // SQL text, not by parsing it.
            let sql_upper = sql.to_uppercase();
            table.strict = sql_upper.contains(" STRICT");
            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    for v in all_views {
        // A view whose SQL cannot be parsed is kept, with an error recorded
        // in place of a definition.
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    // Generate the Rust schema source from the DDL.
    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", path)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    // Build the snapshot from the same DDL entities.
    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}
2167
/// Blocking entry point for introspecting a remote Turso database.
///
/// Builds a current-thread Tokio runtime and drives `introspect_turso_inner`
/// to completion on it.
///
/// # Errors
///
/// `CliError::Other` if the runtime cannot be built, otherwise the result of
/// the async implementation.
#[cfg(feature = "turso")]
fn introspect_turso(url: &str, auth_token: Option<&str>) -> Result<IntrospectResult, CliError> {
    let mut runtime_builder = tokio::runtime::Builder::new_current_thread();
    runtime_builder.enable_all();

    let runtime = runtime_builder
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    let task = introspect_turso_inner(url, auth_token);
    runtime.block_on(task)
}
2181
2182#[cfg(feature = "turso")]
2183async fn introspect_turso_inner(
2184 url: &str,
2185 auth_token: Option<&str>,
2186) -> Result<IntrospectResult, CliError> {
2187 use drizzle_migrations::sqlite::{
2188 SQLiteDDL, Table, View,
2189 codegen::{CodegenOptions, generate_rust_schema},
2190 introspect::{
2191 RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
2192 parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
2193 process_foreign_keys, process_indexes, queries,
2194 },
2195 };
2196 use std::collections::{HashMap, HashSet};
2197
2198 let builder =
2199 libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());
2200
2201 let db = builder.build().await.map_err(|e| {
2202 CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
2203 })?;
2204
2205 let conn = db
2206 .connect()
2207 .map_err(|e| CliError::ConnectionError(e.to_string()))?;
2208
2209 let mut tables_rows = conn
2211 .query(queries::TABLES_QUERY, ())
2212 .await
2213 .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;
2214
2215 let mut tables: Vec<(String, Option<String>)> = Vec::new();
2216 while let Ok(Some(row)) = tables_rows.next().await {
2217 let name: String = row.get(0).unwrap_or_default();
2218 let sql: Option<String> = row.get(1).ok();
2219 tables.push((name, sql));
2220 }
2221
2222 let table_sql_map: HashMap<String, String> = tables
2223 .iter()
2224 .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
2225 .collect();
2226
2227 let mut columns_rows = conn
2229 .query(queries::COLUMNS_QUERY, ())
2230 .await
2231 .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;
2232
2233 let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
2234 while let Ok(Some(row)) = columns_rows.next().await {
2235 raw_columns.push(RawColumnInfo {
2236 table: row.get(0).unwrap_or_default(),
2237 cid: row.get(1).unwrap_or(0),
2238 name: row.get(2).unwrap_or_default(),
2239 column_type: row.get(3).unwrap_or_default(),
2240 not_null: row.get::<i32>(4).unwrap_or(0) != 0,
2241 default_value: row.get(5).ok(),
2242 pk: row.get(6).unwrap_or(0),
2243 hidden: row.get(7).unwrap_or(0),
2244 sql: row.get(8).ok(),
2245 });
2246 }
2247
2248 let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
2250 let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
2251 let mut all_fks: Vec<RawForeignKey> = Vec::new();
2252 let mut all_views: Vec<RawViewInfo> = Vec::new();
2253
2254 for (table_name, _) in &tables {
2255 if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
2257 while let Ok(Some(row)) = idx_rows.next().await {
2258 let idx = RawIndexInfo {
2259 table: table_name.clone(),
2260 name: row.get(1).unwrap_or_default(),
2261 unique: row.get::<i32>(2).unwrap_or(0) != 0,
2262 origin: row.get(3).unwrap_or_default(),
2263 partial: row.get::<i32>(4).unwrap_or(0) != 0,
2264 };
2265
2266 if let Ok(mut col_rows) =
2268 conn.query(&queries::index_info_query(&idx.name), ()).await
2269 {
2270 while let Ok(Some(col_row)) = col_rows.next().await {
2271 all_index_columns.push(RawIndexColumn {
2272 index_name: idx.name.clone(),
2273 seqno: col_row.get(0).unwrap_or(0),
2274 cid: col_row.get(1).unwrap_or(0),
2275 name: col_row.get(2).ok(),
2276 desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
2277 coll: col_row.get(4).unwrap_or_default(),
2278 key: col_row.get::<i32>(5).unwrap_or(0) != 0,
2279 });
2280 }
2281 }
2282
2283 all_indexes.push(idx);
2284 }
2285 }
2286
2287 if let Ok(mut fk_rows) = conn
2289 .query(&queries::foreign_keys_query(table_name), ())
2290 .await
2291 {
2292 while let Ok(Some(row)) = fk_rows.next().await {
2293 all_fks.push(RawForeignKey {
2294 table: table_name.clone(),
2295 id: row.get(0).unwrap_or(0),
2296 seq: row.get(1).unwrap_or(0),
2297 to_table: row.get(2).unwrap_or_default(),
2298 from_column: row.get(3).unwrap_or_default(),
2299 to_column: row.get(4).unwrap_or_default(),
2300 on_update: row.get(5).unwrap_or_default(),
2301 on_delete: row.get(6).unwrap_or_default(),
2302 r#match: row.get(7).unwrap_or_default(),
2303 });
2304 }
2305 }
2306 }
2307
2308 if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
2310 while let Ok(Some(row)) = views_rows.next().await {
2311 let name: String = row.get(0).unwrap_or_default();
2312 let sql: String = row.get(1).unwrap_or_default();
2313 all_views.push(RawViewInfo { name, sql });
2314 }
2315 }
2316
2317 if let Ok(mut view_cols_rows) = conn.query(queries::VIEW_COLUMNS_QUERY, ()).await {
2319 while let Ok(Some(row)) = view_cols_rows.next().await {
2320 raw_columns.push(RawColumnInfo {
2321 table: row.get(0).unwrap_or_default(),
2322 cid: row.get(1).unwrap_or(0),
2323 name: row.get(2).unwrap_or_default(),
2324 column_type: row.get(3).unwrap_or_default(),
2325 not_null: row.get::<i32>(4).unwrap_or(0) != 0,
2326 default_value: row.get(5).ok(),
2327 pk: row.get(6).unwrap_or(0),
2328 hidden: row.get(7).unwrap_or(0),
2329 sql: row.get(8).ok(),
2330 });
2331 }
2332 }
2333
2334 let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
2336 HashMap::new();
2337 for (table, sql) in &table_sql_map {
2338 generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
2339 }
2340 let pk_columns: HashSet<(String, String)> = raw_columns
2341 .iter()
2342 .filter(|c| c.pk > 0)
2343 .map(|c| (c.table.clone(), c.name.clone()))
2344 .collect();
2345
2346 let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
2347 let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
2348 let foreign_keys = process_foreign_keys(&all_fks);
2349 let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);
2350
2351 let mut ddl = SQLiteDDL::new();
2352
2353 for (table_name, table_sql) in &tables {
2354 let mut table = Table::new(table_name.clone());
2355 if let Some(sql) = table_sql {
2356 let sql_upper = sql.to_uppercase();
2357 table.strict = sql_upper.contains(" STRICT");
2358 table.without_rowid = sql_upper.contains("WITHOUT ROWID");
2359 }
2360 ddl.tables.push(table);
2361 }
2362
2363 for col in columns {
2364 ddl.columns.push(col);
2365 }
2366 for idx in indexes {
2367 ddl.indexes.push(idx);
2368 }
2369 for fk in foreign_keys {
2370 ddl.fks.push(fk);
2371 }
2372 for pk in primary_keys {
2373 ddl.pks.push(pk);
2374 }
2375
2376 for u in uniques {
2377 ddl.uniques.push(u);
2378 }
2379
2380 for v in all_views {
2381 let mut view = View::new(v.name);
2382 if let Some(def) = parse_view_sql(&v.sql) {
2383 view.definition = Some(def.into());
2384 } else {
2385 view.error = Some("Failed to parse view SQL".into());
2386 }
2387 ddl.views.push(view);
2388 }
2389
2390 let options = CodegenOptions {
2391 module_doc: Some(format!("Schema introspected from Turso: {}", url)),
2392 include_schema: true,
2393 schema_name: "Schema".to_string(),
2394 use_pub: true,
2395 };
2396
2397 let generated = generate_rust_schema(&ddl, &options);
2398
2399 let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
2401 for entity in ddl.to_entities() {
2402 sqlite_snapshot.add_entity(entity);
2403 }
2404 let snapshot = Snapshot::Sqlite(sqlite_snapshot);
2405
2406 Ok(IntrospectResult {
2407 schema_code: generated.code,
2408 table_count: generated.tables.len(),
2409 index_count: generated.indexes.len(),
2410 view_count: ddl.views.len(),
2411 warnings: generated.warnings,
2412 snapshot,
2413 snapshot_path: std::path::PathBuf::new(),
2414 })
2415}
2416
#[cfg(feature = "postgres-sync")]
/// Introspects a PostgreSQL database through the blocking `postgres` client.
///
/// Connects without TLS using the URL derived from `creds`, runs each catalog
/// query in turn, passes the decoded rows through the `drizzle_migrations`
/// `process_*` helpers to fill a `PostgresDDL`, and finally generates the Rust
/// schema source and a snapshot from that DDL.
///
/// The returned `snapshot_path` is left empty.
///
/// # Errors
///
/// * `CliError::ConnectionError` when the connection cannot be established.
/// * `CliError::Other` when a catalog query fails, or when a row cannot be
///   decoded (unexpected NULL or column type). Rows are decoded with
///   `Row::try_get`, so a surprising catalog value is reported as an error
///   instead of aborting the process with a panic.
fn introspect_postgres_sync(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::postgres::{
        PostgresDDL,
        codegen::{CodegenOptions, generate_rust_schema},
        ddl::Schema,
        introspect::{
            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
            process_roles, process_sequences, process_tables, process_unique_constraints,
            process_views, queries,
        },
    };

    // Runs one parameterless catalog query and decodes every row with `decode`.
    // `what` names the entity in both the query and the decode error message.
    fn fetch<T>(
        client: &mut postgres::Client,
        sql: &str,
        what: &str,
        decode: impl Fn(&postgres::Row) -> Result<T, postgres::Error>,
    ) -> Result<Vec<T>, CliError> {
        let rows = client
            .query(sql, &[])
            .map_err(|e| CliError::Other(format!("Failed to query {}: {}", what, e)))?;
        rows.iter()
            .map(decode)
            .collect::<Result<Vec<T>, postgres::Error>>()
            .map_err(|e| CliError::Other(format!("Failed to decode {} row: {}", what, e)))
    }

    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    let raw_schemas = fetch(&mut client, queries::SCHEMAS_QUERY, "schemas", |row| {
        Ok(RawSchemaInfo {
            name: row.try_get(0)?,
        })
    })?;

    let raw_tables = fetch(&mut client, queries::TABLES_QUERY, "tables", |row| {
        Ok(RawTableInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            is_rls_enabled: row.try_get(2)?,
        })
    })?;

    let raw_columns = fetch(&mut client, queries::COLUMNS_QUERY, "columns", |row| {
        Ok(RawColumnInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            column_type: row.try_get(3)?,
            type_schema: row.try_get(4)?,
            not_null: row.try_get(5)?,
            default_value: row.try_get(6)?,
            is_identity: row.try_get(7)?,
            identity_type: row.try_get(8)?,
            is_generated: row.try_get(9)?,
            generated_expression: row.try_get(10)?,
            ordinal_position: row.try_get(11)?,
        })
    })?;

    let raw_enums = fetch(&mut client, queries::ENUMS_QUERY, "enums", |row| {
        Ok(RawEnumInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            values: row.try_get(2)?,
        })
    })?;

    let raw_sequences = fetch(&mut client, queries::SEQUENCES_QUERY, "sequences", |row| {
        Ok(RawSequenceInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            data_type: row.try_get(2)?,
            start_value: row.try_get(3)?,
            min_value: row.try_get(4)?,
            max_value: row.try_get(5)?,
            increment: row.try_get(6)?,
            cycle: row.try_get(7)?,
            cache_value: row.try_get(8)?,
        })
    })?;

    let raw_views = fetch(&mut client, queries::VIEWS_QUERY, "views", |row| {
        Ok(RawViewInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            definition: row.try_get(2)?,
            is_materialized: row.try_get(3)?,
        })
    })?;

    let raw_indexes = fetch(&mut client, POSTGRES_INDEXES_QUERY, "indexes", |row| {
        // Column 6 is an array of per-column index definition fragments.
        let cols: Vec<String> = row.try_get(6)?;
        Ok(RawIndexInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            is_unique: row.try_get(3)?,
            is_primary: row.try_get(4)?,
            method: row.try_get(5)?,
            columns: parse_postgres_index_columns(cols),
            where_clause: row.try_get(7)?,
            concurrent: false,
        })
    })?;

    let raw_fks = fetch(
        &mut client,
        POSTGRES_FOREIGN_KEYS_QUERY,
        "foreign keys",
        |row| {
            Ok(RawForeignKeyInfo {
                schema: row.try_get(0)?,
                table: row.try_get(1)?,
                name: row.try_get(2)?,
                columns: row.try_get(3)?,
                schema_to: row.try_get(4)?,
                table_to: row.try_get(5)?,
                columns_to: row.try_get(6)?,
                // The catalog stores one-letter action codes; expand them.
                on_update: pg_action_code_to_string(row.try_get(7)?),
                on_delete: pg_action_code_to_string(row.try_get(8)?),
            })
        },
    )?;

    let raw_pks = fetch(
        &mut client,
        POSTGRES_PRIMARY_KEYS_QUERY,
        "primary keys",
        |row| {
            Ok(RawPrimaryKeyInfo {
                schema: row.try_get(0)?,
                table: row.try_get(1)?,
                name: row.try_get(2)?,
                columns: row.try_get(3)?,
            })
        },
    )?;

    let raw_uniques = fetch(
        &mut client,
        POSTGRES_UNIQUES_QUERY,
        "unique constraints",
        |row| {
            Ok(RawUniqueInfo {
                schema: row.try_get(0)?,
                table: row.try_get(1)?,
                name: row.try_get(2)?,
                columns: row.try_get(3)?,
                nulls_not_distinct: row.try_get(4)?,
            })
        },
    )?;

    let raw_checks = fetch(
        &mut client,
        POSTGRES_CHECKS_QUERY,
        "check constraints",
        |row| {
            Ok(RawCheckInfo {
                schema: row.try_get(0)?,
                table: row.try_get(1)?,
                name: row.try_get(2)?,
                expression: row.try_get(3)?,
            })
        },
    )?;

    let raw_roles = fetch(&mut client, POSTGRES_ROLES_QUERY, "roles", |row| {
        Ok(RawRoleInfo {
            name: row.try_get(0)?,
            create_db: row.try_get(1)?,
            create_role: row.try_get(2)?,
            inherit: row.try_get(3)?,
        })
    })?;

    let raw_policies = fetch(&mut client, POSTGRES_POLICIES_QUERY, "policies", |row| {
        Ok(RawPolicyInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            as_clause: row.try_get(3)?,
            for_clause: row.try_get(4)?,
            to: row.try_get(5)?,
            using: row.try_get(6)?,
            with_check: row.try_get(7)?,
        })
    })?;

    // Assemble the DDL collection from the processed catalog rows.
    let mut ddl = PostgresDDL::new();

    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
        ddl.schemas.push(s);
    }
    for e in process_enums(&raw_enums) {
        ddl.enums.push(e);
    }
    for s in process_sequences(&raw_sequences) {
        ddl.sequences.push(s);
    }
    for r in process_roles(&raw_roles) {
        ddl.roles.push(r);
    }
    for p in process_policies(&raw_policies) {
        ddl.policies.push(p);
    }
    for t in process_tables(&raw_tables) {
        ddl.tables.push(t);
    }
    for c in process_columns(&raw_columns) {
        ddl.columns.push(c);
    }
    for i in process_indexes(&raw_indexes) {
        ddl.indexes.push(i);
    }
    for fk in process_foreign_keys(&raw_fks) {
        ddl.fks.push(fk);
    }
    for pk in process_primary_keys(&raw_pks) {
        ddl.pks.push(pk);
    }
    for u in process_unique_constraints(&raw_uniques) {
        ddl.uniques.push(u);
    }
    for c in process_check_constraints(&raw_checks) {
        ddl.checks.push(c);
    }
    for v in process_views(&raw_views) {
        ddl.views.push(v);
    }

    // The URL ends up in the generated file's module doc, so hide the password.
    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };
    let generated = generate_rust_schema(&ddl, &options);

    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
    for entity in ddl.to_entities() {
        snap.add_entity(entity);
    }

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: ddl.tables.list().len(),
        index_count: ddl.indexes.list().len(),
        view_count: ddl.views.list().len(),
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(snap),
        snapshot_path: std::path::PathBuf::new(),
    })
}
2723
#[cfg(feature = "tokio-postgres")]
// NOTE(review): `dead_code` is presumably allowed because this entry point is
// not called under every feature combination — confirm against the dispatcher.
#[allow(dead_code)]
/// Blocking front end for the `tokio-postgres` introspection path.
///
/// Builds a current-thread Tokio runtime with all drivers enabled and blocks
/// on [`introspect_postgres_async_inner`] until it completes.
///
/// # Errors
///
/// Returns `CliError::Other` when the runtime cannot be created; otherwise
/// forwards whatever the inner introspection returns.
fn introspect_postgres_async(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();

    let runtime = match builder.build() {
        Ok(runtime) => runtime,
        Err(e) => {
            return Err(CliError::Other(format!(
                "Failed to create async runtime: {}",
                e
            )));
        }
    };

    runtime.block_on(introspect_postgres_async_inner(creds))
}
2734
#[cfg(feature = "tokio-postgres")]
/// Introspects a PostgreSQL database through the async `tokio-postgres` client.
///
/// Connects without TLS using the URL derived from `creds`, spawns the
/// connection driver task, runs each catalog query in turn, passes the decoded
/// rows through the `drizzle_migrations` `process_*` helpers to fill a
/// `PostgresDDL`, and finally generates the Rust schema source and a snapshot.
///
/// The returned `snapshot_path` is left empty.
///
/// # Errors
///
/// * `CliError::ConnectionError` when the connection cannot be established.
/// * `CliError::Other` when a catalog query fails, or when a row cannot be
///   decoded (unexpected NULL or column type). Rows are decoded with
///   `Row::try_get`, so a surprising catalog value is reported as an error
///   instead of aborting the process with a panic.
async fn introspect_postgres_async_inner(
    creds: &PostgresCreds,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::postgres::{
        PostgresDDL,
        codegen::{CodegenOptions, generate_rust_schema},
        ddl::Schema,
        introspect::{
            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
            process_roles, process_sequences, process_tables, process_unique_constraints,
            process_views, queries,
        },
    };

    // Runs one parameterless catalog query and decodes every row with `decode`.
    // `what` names the entity in both the query and the decode error message.
    async fn fetch<T>(
        client: &tokio_postgres::Client,
        sql: &str,
        what: &str,
        decode: impl Fn(&tokio_postgres::Row) -> Result<T, tokio_postgres::Error>,
    ) -> Result<Vec<T>, CliError> {
        let rows = client
            .query(sql, &[])
            .await
            .map_err(|e| CliError::Other(format!("Failed to query {}: {}", what, e)))?;
        rows.iter()
            .map(decode)
            .collect::<Result<Vec<T>, tokio_postgres::Error>>()
            .map_err(|e| CliError::Other(format!("Failed to decode {} row: {}", what, e)))
    }

    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    // The connection object drives the socket I/O; it has to be polled on its
    // own task for the client's queries to make progress.
    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    let raw_schemas = fetch(&client, queries::SCHEMAS_QUERY, "schemas", |row| {
        Ok(RawSchemaInfo {
            name: row.try_get(0)?,
        })
    })
    .await?;

    let raw_tables = fetch(&client, queries::TABLES_QUERY, "tables", |row| {
        Ok(RawTableInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            is_rls_enabled: row.try_get(2)?,
        })
    })
    .await?;

    let raw_columns = fetch(&client, queries::COLUMNS_QUERY, "columns", |row| {
        Ok(RawColumnInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            column_type: row.try_get(3)?,
            type_schema: row.try_get(4)?,
            not_null: row.try_get(5)?,
            default_value: row.try_get(6)?,
            is_identity: row.try_get(7)?,
            identity_type: row.try_get(8)?,
            is_generated: row.try_get(9)?,
            generated_expression: row.try_get(10)?,
            ordinal_position: row.try_get(11)?,
        })
    })
    .await?;

    let raw_enums = fetch(&client, queries::ENUMS_QUERY, "enums", |row| {
        Ok(RawEnumInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            values: row.try_get(2)?,
        })
    })
    .await?;

    let raw_sequences = fetch(&client, queries::SEQUENCES_QUERY, "sequences", |row| {
        Ok(RawSequenceInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            data_type: row.try_get(2)?,
            start_value: row.try_get(3)?,
            min_value: row.try_get(4)?,
            max_value: row.try_get(5)?,
            increment: row.try_get(6)?,
            cycle: row.try_get(7)?,
            cache_value: row.try_get(8)?,
        })
    })
    .await?;

    let raw_views = fetch(&client, queries::VIEWS_QUERY, "views", |row| {
        Ok(RawViewInfo {
            schema: row.try_get(0)?,
            name: row.try_get(1)?,
            definition: row.try_get(2)?,
            is_materialized: row.try_get(3)?,
        })
    })
    .await?;

    let raw_indexes = fetch(&client, POSTGRES_INDEXES_QUERY, "indexes", |row| {
        // Column 6 is an array of per-column index definition fragments.
        let cols: Vec<String> = row.try_get(6)?;
        Ok(RawIndexInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            is_unique: row.try_get(3)?,
            is_primary: row.try_get(4)?,
            method: row.try_get(5)?,
            columns: parse_postgres_index_columns(cols),
            where_clause: row.try_get(7)?,
            concurrent: false,
        })
    })
    .await?;

    let raw_fks = fetch(&client, POSTGRES_FOREIGN_KEYS_QUERY, "foreign keys", |row| {
        Ok(RawForeignKeyInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            columns: row.try_get(3)?,
            schema_to: row.try_get(4)?,
            table_to: row.try_get(5)?,
            columns_to: row.try_get(6)?,
            // The catalog stores one-letter action codes; expand them.
            on_update: pg_action_code_to_string(row.try_get(7)?),
            on_delete: pg_action_code_to_string(row.try_get(8)?),
        })
    })
    .await?;

    let raw_pks = fetch(&client, POSTGRES_PRIMARY_KEYS_QUERY, "primary keys", |row| {
        Ok(RawPrimaryKeyInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            columns: row.try_get(3)?,
        })
    })
    .await?;

    let raw_uniques = fetch(
        &client,
        POSTGRES_UNIQUES_QUERY,
        "unique constraints",
        |row| {
            Ok(RawUniqueInfo {
                schema: row.try_get(0)?,
                table: row.try_get(1)?,
                name: row.try_get(2)?,
                columns: row.try_get(3)?,
                nulls_not_distinct: row.try_get(4)?,
            })
        },
    )
    .await?;

    let raw_checks = fetch(&client, POSTGRES_CHECKS_QUERY, "check constraints", |row| {
        Ok(RawCheckInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            expression: row.try_get(3)?,
        })
    })
    .await?;

    let raw_roles = fetch(&client, POSTGRES_ROLES_QUERY, "roles", |row| {
        Ok(RawRoleInfo {
            name: row.try_get(0)?,
            create_db: row.try_get(1)?,
            create_role: row.try_get(2)?,
            inherit: row.try_get(3)?,
        })
    })
    .await?;

    let raw_policies = fetch(&client, POSTGRES_POLICIES_QUERY, "policies", |row| {
        Ok(RawPolicyInfo {
            schema: row.try_get(0)?,
            table: row.try_get(1)?,
            name: row.try_get(2)?,
            as_clause: row.try_get(3)?,
            for_clause: row.try_get(4)?,
            to: row.try_get(5)?,
            using: row.try_get(6)?,
            with_check: row.try_get(7)?,
        })
    })
    .await?;

    // Assemble the DDL collection from the processed catalog rows.
    let mut ddl = PostgresDDL::new();
    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
        ddl.schemas.push(s);
    }
    for e in process_enums(&raw_enums) {
        ddl.enums.push(e);
    }
    for s in process_sequences(&raw_sequences) {
        ddl.sequences.push(s);
    }
    for r in process_roles(&raw_roles) {
        ddl.roles.push(r);
    }
    for p in process_policies(&raw_policies) {
        ddl.policies.push(p);
    }
    for t in process_tables(&raw_tables) {
        ddl.tables.push(t);
    }
    for c in process_columns(&raw_columns) {
        ddl.columns.push(c);
    }
    for i in process_indexes(&raw_indexes) {
        ddl.indexes.push(i);
    }
    for fk in process_foreign_keys(&raw_fks) {
        ddl.fks.push(fk);
    }
    for pk in process_primary_keys(&raw_pks) {
        ddl.pks.push(pk);
    }
    for u in process_unique_constraints(&raw_uniques) {
        ddl.uniques.push(u);
    }
    for c in process_check_constraints(&raw_checks) {
        ddl.checks.push(c);
    }
    for v in process_views(&raw_views) {
        ddl.views.push(v);
    }

    // The URL ends up in the generated file's module doc, so hide the password.
    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };
    let generated = generate_rust_schema(&ddl, &options);

    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
    for entity in ddl.to_entities() {
        snap.add_entity(entity);
    }

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: ddl.tables.list().len(),
        index_count: ddl.indexes.list().len(),
        view_count: ddl.views.list().len(),
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(snap),
        snapshot_path: std::path::PathBuf::new(),
    })
}
3046
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// One decoded row of the schema-listing catalog query.
///
/// Both PostgreSQL introspection paths read `SCHEMAS_QUERY` rows into this
/// struct and then turn each `name` into a DDL `Schema` entry.
#[derive(Debug, Clone)]
struct RawSchemaInfo {
    /// Schema name, taken from column 0 of the query result.
    name: String,
}
3057
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query returning one row per index.
///
/// Result columns, in the positional order the introspection code reads them:
/// `schema`, `table`, `name`, `is_unique`, `is_primary`, `method` (access
/// method name), `columns` (one `pg_get_indexdef` fragment per key column,
/// ordered by key position) and `where_clause` (the index predicate rendered by
/// `pg_get_expr`; callers decode it as `Option<String>`).
///
/// Only attributes `1..=indnkeyatts` are expanded, i.e. the key columns.
///
/// Schemas matching `pg_%` and `information_schema` are filtered out.
/// NOTE(review): `_` is a single-character wildcard in `LIKE`, so `pg_%` also
/// excludes user schemas such as `pgboss` — confirm this is intended.
const POSTGRES_INDEXES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    idx.relname AS name,
    ix.indisunique AS is_unique,
    ix.indisprimary AS is_primary,
    am.amname AS method,
    array_agg(pg_get_indexdef(ix.indexrelid, s.n, true) ORDER BY s.n) AS columns,
    pg_get_expr(ix.indpred, ix.indrelid) AS where_clause
FROM pg_index ix
JOIN pg_class idx ON idx.oid = ix.indexrelid
JOIN pg_class tbl ON tbl.oid = ix.indrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_am am ON am.oid = idx.relam
JOIN generate_series(1, ix.indnkeyatts) AS s(n) ON TRUE
WHERE ns.nspname NOT LIKE 'pg_%'
    AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, idx.relname, ix.indisunique, ix.indisprimary, am.amname, ix.indpred, ix.indrelid
ORDER BY ns.nspname, tbl.relname, idx.relname
"#;
3080
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query returning one row per foreign-key constraint (`contype = 'f'`).
///
/// Result columns, in the positional order the introspection code reads them:
/// `schema`, `table`, `name`, `columns` (referencing column names), `schema_to`,
/// `table_to`, `columns_to` (referenced column names), `on_update` and
/// `on_delete`.
///
/// `conkey` and `confkey` are unnested with ordinality and joined on the
/// ordinal, so `columns[i]` pairs with `columns_to[i]`. The two action columns
/// are the raw one-character catalog codes cast to text; callers expand them
/// with `pg_action_code_to_string`.
///
/// Schemas matching `pg_%` and `information_schema` are filtered out.
const POSTGRES_FOREIGN_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(src.attname ORDER BY s.ord) AS columns,
    ns_to.nspname AS schema_to,
    tbl_to.relname AS table_to,
    array_agg(dst.attname ORDER BY s.ord) AS columns_to,
    con.confupdtype::text AS on_update,
    con.confdeltype::text AS on_delete
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_class tbl_to ON tbl_to.oid = con.confrelid
JOIN pg_namespace ns_to ON ns_to.oid = tbl_to.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute src ON src.attrelid = tbl.oid AND src.attnum = s.attnum
JOIN unnest(con.confkey) WITH ORDINALITY AS r(attnum, ord) ON r.ord = s.ord
JOIN pg_attribute dst ON dst.attrelid = tbl_to.oid AND dst.attnum = r.attnum
WHERE con.contype = 'f'
    AND ns.nspname NOT LIKE 'pg_%'
    AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname, ns_to.nspname, tbl_to.relname, con.confupdtype, con.confdeltype
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3108
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query returning one row per primary-key constraint (`contype = 'p'`).
///
/// Result columns, in the positional order the introspection code reads them:
/// `schema`, `table`, `name` and `columns` (column names in constraint-key
/// order, via `unnest(conkey) WITH ORDINALITY`).
///
/// Schemas matching `pg_%` and `information_schema` are filtered out.
const POSTGRES_PRIMARY_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
WHERE con.contype = 'p'
    AND ns.nspname NOT LIKE 'pg_%'
    AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3127
3128#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query returning one row per unique constraint (`contype = 'u'`).
///
/// Result columns, in the positional order the introspection code reads them:
/// `schema`, `table`, `name`, `columns` (column names in constraint-key order)
/// and `nulls_not_distinct` (always a non-NULL boolean).
///
/// `NULLS NOT DISTINCT` is a property of the constraint's backing index
/// (`pg_index.indnullsnotdistinct`, PostgreSQL 15+); `pg_constraint` has no such
/// column. The flag is read through `to_jsonb(ix) ->> '…'` so the statement
/// still parses on servers older than 15, where the key is simply absent and
/// the value falls back to `FALSE`.
///
/// Schemas matching `pg_%` and `information_schema` are filtered out.
const POSTGRES_UNIQUES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns,
    COALESCE(bool_or((to_jsonb(ix) ->> 'indnullsnotdistinct')::boolean), FALSE) AS nulls_not_distinct
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
LEFT JOIN pg_index ix ON ix.indexrelid = con.conindid
WHERE con.contype = 'u'
    AND ns.nspname NOT LIKE 'pg_%'
    AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3147
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query returning one row per check constraint (`contype = 'c'`).
///
/// Result columns, in the positional order the introspection code reads them:
/// `schema`, `table`, `name` and `expression` (the constraint expression
/// rendered by `pg_get_expr(conbin, conrelid)`).
///
/// Schemas matching `pg_%` and `information_schema` are filtered out.
const POSTGRES_CHECKS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    pg_get_expr(con.conbin, con.conrelid) AS expression
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
WHERE con.contype = 'c'
    AND ns.nspname NOT LIKE 'pg_%'
    AND ns.nspname <> 'information_schema'
ORDER BY ns.nspname, tbl.relname, con.conname
"#;
3163
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Catalog query returning one row per entry in `pg_roles`, ordered by name.
///
/// Result columns, in the positional order the introspection code reads them:
/// `name`, `create_db`, `create_role` and `inherit`. No roles are filtered out.
const POSTGRES_ROLES_QUERY: &str = r#"
SELECT
    rolname AS name,
    rolcreatedb AS create_db,
    rolcreaterole AS create_role,
    rolinherit AS inherit
FROM pg_roles
ORDER BY rolname
"#;
3174
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Query over the `pg_policies` view returning one row per row-level-security
/// policy.
///
/// Result columns, in the positional order the introspection code reads them:
/// `schema`, `table`, `name`, `as_clause` (`'PERMISSIVE'` or `'RESTRICTIVE'`,
/// derived from the `permissive` column), `for_clause` (the command,
/// upper-cased), `to` (role names), `using` and `with_check` (both decoded as
/// `Option<String>` by the callers).
///
/// Schemas matching `pg_%` and `information_schema` are filtered out.
const POSTGRES_POLICIES_QUERY: &str = r#"
SELECT
    schemaname AS schema,
    tablename AS table,
    policyname AS name,
    CASE WHEN permissive THEN 'PERMISSIVE' ELSE 'RESTRICTIVE' END AS as_clause,
    upper(cmd) AS for_clause,
    roles AS to,
    qual AS using,
    with_check AS with_check
FROM pg_policies
WHERE schemaname NOT LIKE 'pg_%'
    AND schemaname <> 'information_schema'
ORDER BY schemaname, tablename, policyname
"#;
3191
3192#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Expands a one-letter foreign-key action code into its SQL keyword form.
///
/// The introspection code feeds this the `on_update` / `on_delete` columns of
/// the foreign-key query. Recognised codes are `r`, `c`, `n` and `d`; `a` and
/// every other input (including the empty string) yield `"NO ACTION"`.
/// Matching is case-sensitive.
fn pg_action_code_to_string(code: String) -> String {
    let action = match code.as_str() {
        "r" => "RESTRICT",
        "c" => "CASCADE",
        "n" => "SET NULL",
        "d" => "SET DEFAULT",
        // "a" and anything unrecognised share the default action.
        _ => "NO ACTION",
    };
    String::from(action)
}
3204
#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Turns per-column index definition fragments into `RawIndexColumnInfo`s.
///
/// Each element of `cols` is one fragment from the `columns` array of
/// `POSTGRES_INDEXES_QUERY`. For every fragment this function:
///
/// 1. derives `asc` (`false` when the text contains ` DESC`) and `nulls_first`
///    (`true` when it contains ` NULLS FIRST`), case-insensitively;
/// 2. cuts the text at the first ordering keyword found, trying ` ASC`,
///    ` DESC`, ` NULLS FIRST`, ` NULLS LAST` in that order;
/// 3. splits a trailing operator class off the remainder, but only when the
///    last whitespace-separated token is a bare identifier and what precedes
///    it is syntactically complete (balanced parentheses and quotes, not
///    ending in `COLLATE`);
/// 4. flags the resulting name as an expression when it contains `(`, `)`, a
///    space or `::`.
///
/// Step 3 keeps fragments such as `COALESCE(a, b)`, `(a + b)` or `"my col"`
/// intact instead of chopping them at the first space. All keyword matching
/// uses ASCII case folding so that byte offsets found in the upper-cased copy
/// stay valid for the original text, whatever non-ASCII characters an
/// identifier contains.
fn parse_postgres_index_columns(
    cols: Vec<String>,
) -> Vec<drizzle_migrations::postgres::introspect::RawIndexColumnInfo> {
    use drizzle_migrations::postgres::introspect::RawIndexColumnInfo;

    // Checked in this order; the first one present decides where to cut.
    const ORDERING_TOKENS: [&str; 4] = [" ASC", " DESC", " NULLS FIRST", " NULLS LAST"];

    // True when every parenthesis and quote opened in `s` is also closed.
    let is_balanced = |s: &str| {
        s.matches('(').count() == s.matches(')').count()
            && s.matches('"').count() % 2 == 0
            && s.matches('\'').count() % 2 == 0
    };

    // True when `tok` looks like a (possibly schema-qualified) opclass name.
    let is_opclass_token = |tok: &str| {
        !tok.is_empty()
            && tok
                .chars()
                .all(|ch| ch.is_alphanumeric() || matches!(ch, '_' | '.' | '$'))
            && !matches!(
                tok.to_ascii_uppercase().as_str(),
                "ASC" | "DESC" | "NULLS"
            )
    };

    cols.into_iter()
        .map(|c| {
            let trimmed = c.trim();
            // ASCII-only folding: same byte length as `trimmed`, so offsets
            // located in `upper` can be used to slice `trimmed`.
            let upper = trimmed.to_ascii_uppercase();

            let asc = !upper.contains(" DESC");
            let nulls_first = upper.contains(" NULLS FIRST");

            let core_end = ORDERING_TOKENS
                .iter()
                .find_map(|token| upper.find(*token))
                .unwrap_or(trimmed.len());
            let core = trimmed[..core_end].trim();

            let (name, opclass) = match core.rsplit_once(char::is_whitespace) {
                Some((head, tail)) => {
                    let head = head.trim_end();
                    let head_ends_with_collate = head
                        .split_whitespace()
                        .last()
                        .map_or(false, |last| last.eq_ignore_ascii_case("COLLATE"));

                    if !head.is_empty()
                        && is_opclass_token(tail)
                        && is_balanced(head)
                        && !head_ends_with_collate
                    {
                        (head.to_string(), Some(tail.to_string()))
                    } else {
                        (core.to_string(), None)
                    }
                }
                None => (core.to_string(), None),
            };

            // Judge the column itself, not a trailing opclass.
            let is_expression = name.contains('(')
                || name.contains(')')
                || name.contains(' ')
                || name.contains("::");

            RawIndexColumnInfo {
                name,
                is_expression,
                asc,
                nulls_first,
                opclass,
            }
        })
        .collect()
}
3256
3257#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
/// Returns `url` with the password of its credentials replaced by `****`.
///
/// The credentials are taken to be the text between the end of the `://`
/// scheme separator (or the start of the string when there is none) and the
/// first `@`. Inside them, everything after the first `:` is the password.
/// Splitting on the first colon matters because a password may itself contain
/// `:`; splitting on the last one would leave the leading part of such a
/// password visible.
///
/// The input is returned unchanged when it contains no `@`, or when no `:`
/// appears between the scheme and the `@` (no password present).
fn mask_url(url: &str) -> String {
    let scheme_end = url.find("://").map_or(0, |p| p + 3);

    let masked = url.find('@').and_then(|at| {
        // `get` yields `None` when the '@' sits before the scheme separator.
        let userinfo = url.get(scheme_end..at)?;
        let colon = scheme_end + userinfo.find(':')?;
        // ':' and '@' are single-byte, so both slice bounds are char boundaries.
        Some(format!("{}****{}", &url[..=colon], &url[at..]))
    });

    masked.unwrap_or_else(|| url.to_string())
}