//! Database operations for the CLI: planning and applying `push` changes,
//! running migrations, and introspecting live databases into schema code.

use std::path::Path;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
use crate::config::PostgresCreds;
use crate::config::{Credentials, Dialect};
use crate::error::CliError;
use crate::output;
use drizzle_migrations::MigrationSet;
use drizzle_migrations::schema::Snapshot;

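/// Result of applying pending migrations to a database.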
#[derive(Debug)]
pub struct MigrationResult {
    /// Number of migrations applied in this run.
    pub applied_count: usize,
    /// Hashes of the applied migrations, in application order.
    pub applied_migrations: Vec<String>,
}

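/// A planned `push` operation: the SQL to run plus any warnings from diffing.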
#[derive(Debug, Clone)]
pub struct PushPlan {
    pub sql_statements: Vec<String>,
    pub warnings: Vec<String>,
    pub destructive: bool,
}

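/// Introspects the target database and diffs it against `desired`, producing
/// the SQL statements a `push` would execute.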
pub fn plan_push(
    credentials: &Credentials,
    dialect: Dialect,
    desired: &Snapshot,
    breakpoints: bool,
) -> Result<PushPlan, CliError> {
    let current = introspect_database(credentials, dialect)?.snapshot;
    let (sql_statements, warnings) = generate_push_sql(&current, desired, breakpoints)?;
    let destructive = sql_statements.iter().any(|s| is_destructive_statement(s));

    Ok(PushPlan {
        sql_statements,
        warnings,
        destructive,
    })
}

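/// Executes a previously computed [`PushPlan`], prompting for confirmation
/// before destructive changes unless `force` is set.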
pub fn apply_push(
    credentials: &Credentials,
    dialect: Dialect,
    plan: &PushPlan,
    force: bool,
) -> Result<(), CliError> {
    if plan.sql_statements.is_empty() {
        return Ok(());
    }

    if plan.destructive && !force {
        let confirmed = confirm_destructive()?;
        if !confirmed {
            return Ok(());
        }
    }

    execute_statements(credentials, dialect, &plan.sql_statements)
}

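/// Loads migrations from `migrations_dir` and applies the pending ones to the
/// database identified by `credentials`, recording each applied migration in
/// the migrations table.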
pub fn run_migrations(
    credentials: &Credentials,
    dialect: Dialect,
    migrations_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<MigrationResult, CliError> {
    let mut set = MigrationSet::from_dir(migrations_dir, dialect.to_base())
        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;

    if !migrations_table.trim().is_empty() {
        set = set.with_table(migrations_table.to_string());
    }
    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
        set = set.with_schema(migrations_schema.to_string());
    }

    if set.all().is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => run_sqlite_migrations(&set, path),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    run_libsql_local_migrations(&set, url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    run_turso_migrations(&set, url, _auth_token)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => run_postgres_sync_migrations(&set, creds),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => run_postgres_async_migrations(&set, creds),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}

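/// Conservative textual check for statements that can destroy data.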
fn is_destructive_statement(sql: &str) -> bool {
    let s = sql.trim().to_uppercase();
    s.contains("DROP TABLE")
        || s.contains("DROP COLUMN")
        || s.contains("DROP INDEX")
        || s.contains("TRUNCATE")
        || (s.contains("ALTER TABLE") && s.contains(" DROP "))
}

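/// Asks the user to confirm destructive changes; errors out when stdin is not
/// a terminal so scripted runs fail loudly instead of hanging.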
fn confirm_destructive() -> Result<bool, CliError> {
    use std::io::{self, IsTerminal, Write};

    if !io::stdin().is_terminal() {
        return Err(CliError::Other(
            "Refusing to apply potentially destructive changes in non-interactive mode. Use --explain or --force."
                .into(),
        ));
    }

    println!(
        "{}",
        output::warning("Potentially destructive changes detected (DROP/TRUNCATE/etc).")
    );
    print!("Apply anyway? [y/N]: ");
    io::stdout()
        .flush()
        .map_err(|e| CliError::IoError(e.to_string()))?;

    let mut line = String::new();
    io::stdin()
        .read_line(&mut line)
        .map_err(|e| CliError::IoError(e.to_string()))?;
    let ans = line.trim().to_ascii_lowercase();
    Ok(ans == "y" || ans == "yes")
}

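/// Diffs two snapshots of the same dialect into SQL statements plus warnings.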
fn generate_push_sql(
    current: &Snapshot,
    desired: &Snapshot,
    breakpoints: bool,
) -> Result<(Vec<String>, Vec<String>), CliError> {
    match (current, desired) {
        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
            use drizzle_migrations::sqlite::collection::SQLiteDDL;
            use drizzle_migrations::sqlite::diff::compute_migration;

            let prev_ddl = SQLiteDDL::from_entities(prev_snap.ddl.clone());
            let cur_ddl = SQLiteDDL::from_entities(curr_snap.ddl.clone());

            let diff = compute_migration(&prev_ddl, &cur_ddl);
            Ok((diff.sql_statements, diff.warnings))
        }
        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
            use drizzle_migrations::postgres::diff_full_snapshots;
            use drizzle_migrations::postgres::statements::PostgresGenerator;

            let diff = diff_full_snapshots(prev_snap, curr_snap);
            let generator = PostgresGenerator::new().with_breakpoints(breakpoints);
            Ok((generator.generate(&diff.diffs), Vec::new()))
        }
        _ => Err(CliError::DialectMismatch),
    }
}

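/// Runs the plan's statements against the target database inside a single
/// transaction, dispatching on the configured driver.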
fn execute_statements(
    credentials: &Credentials,
    _dialect: Dialect,
    statements: &[String],
) -> Result<(), CliError> {
    // Keep `statements` "used" even when no driver features are compiled in.
    let _ = statements;

    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => execute_sqlite_statements(path, statements),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    execute_libsql_local_statements(url, statements)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    execute_turso_statements(url, _auth_token, statements)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => execute_postgres_sync_statements(creds, statements),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => execute_postgres_async_statements(creds, statements),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}

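/// Treats `file:` URLs, relative/absolute paths, and anything without a URL
/// scheme as a local LibSQL database.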
#[allow(dead_code)]
fn is_local_libsql(url: &str) -> bool {
    url.starts_with("file:")
        || url.starts_with("./")
        || url.starts_with("/")
        || !url.contains("://")
}

#[cfg(any(feature = "rusqlite", feature = "libsql", feature = "turso"))]
fn process_sqlite_uniques_from_indexes(
    raw_indexes: &[drizzle_migrations::sqlite::introspect::RawIndexInfo],
    index_columns: &[drizzle_migrations::sqlite::introspect::RawIndexColumn],
) -> Vec<drizzle_migrations::sqlite::UniqueConstraint> {
    use drizzle_migrations::sqlite::UniqueConstraint;

    let mut uniques = Vec::new();

    for idx in raw_indexes.iter().filter(|i| i.origin == "u") {
        let mut cols: Vec<(i32, String)> = index_columns
            .iter()
            .filter(|c| c.index_name == idx.name && c.key)
            .filter_map(|c| c.name.clone().map(|n| (c.seqno, n)))
            .collect();
        cols.sort_by_key(|(seq, _)| *seq);
        let col_names: Vec<String> = cols.into_iter().map(|(_, n)| n).collect();
        if col_names.is_empty() {
            continue;
        }

        let name_explicit = !idx.name.starts_with("sqlite_autoindex_");
        let constraint_name = if name_explicit {
            idx.name.clone()
        } else {
            let refs: Vec<&str> = col_names.iter().map(String::as_str).collect();
            drizzle_migrations::sqlite::ddl::name_for_unique(&idx.table, &refs)
        };

        let mut uniq =
            UniqueConstraint::from_strings(idx.table.clone(), constraint_name, col_names);
        uniq.name_explicit = name_explicit;
        uniques.push(uniq);
    }

    uniques
}

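// ---------------------------------------------------------------------------
// SQLite (rusqlite)
// ---------------------------------------------------------------------------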
#[cfg(feature = "rusqlite")]
fn execute_sqlite_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for stmt in statements {
        let s = stmt.trim();
        if s.is_empty() {
            continue;
        }
        if let Err(e) = conn.execute(s, []) {
            let _ = conn.execute("ROLLBACK", []);
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, s
            )));
        }
    }

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "rusqlite")]
fn run_sqlite_migrations(set: &MigrationSet, path: &str) -> Result<MigrationResult, CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    conn.execute(&set.create_table_sql(), []).map_err(|e| {
        CliError::MigrationError(format!("Failed to create migrations table: {}", e))
    })?;

    let applied_hashes = query_applied_hashes_sqlite(&conn, set)?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    conn.execute("BEGIN", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty()
                && let Err(e) = conn.execute(stmt, [])
            {
                let _ = conn.execute("ROLLBACK", []);
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }
        if let Err(e) = conn.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            [],
        ) {
            let _ = conn.execute("ROLLBACK", []);
            return Err(CliError::MigrationError(e.to_string()));
        }
        applied.push(migration.hash().to_string());
    }

    conn.execute("COMMIT", [])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}

#[cfg(feature = "rusqlite")]
fn query_applied_hashes_sqlite(
    conn: &rusqlite::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut stmt = match conn.prepare(&set.query_all_hashes_sql()) {
        Ok(s) => s,
        // If the migrations table doesn't exist yet, treat it as "nothing applied".
        Err(_) => return Ok(vec![]),
    };

    let hashes = stmt
        .query_map([], |row| row.get(0))
        .map_err(|e| CliError::MigrationError(e.to_string()))?
        .filter_map(Result::ok)
        .collect();

    Ok(hashes)
}

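// ---------------------------------------------------------------------------
// PostgreSQL (synchronous `postgres` client)
// ---------------------------------------------------------------------------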
#[cfg(feature = "postgres-sync")]
fn execute_postgres_sync_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for stmt in statements {
        let s = stmt.trim();
        if s.is_empty() {
            continue;
        }
        tx.execute(s, &[])
            .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, s)))?;
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "postgres-sync")]
fn run_postgres_sync_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .unwrap_or_default();
    let applied_hashes: Vec<String> = rows.iter().map(|r| r.get(0)).collect();

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let mut tx = client
        .transaction()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}

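// ---------------------------------------------------------------------------
// PostgreSQL (asynchronous `tokio-postgres` client)
// ---------------------------------------------------------------------------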
#[cfg(feature = "tokio-postgres")]
fn execute_postgres_async_statements(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(execute_postgres_async_inner(creds, statements))
}

#[cfg(feature = "tokio-postgres")]
async fn execute_postgres_async_inner(
    creds: &PostgresCreds,
    statements: &[String],
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for stmt in statements {
        let s = stmt.trim();
        if s.is_empty() {
            continue;
        }
        tx.execute(s, &[])
            .await
            .map_err(|e| CliError::MigrationError(format!("Statement failed: {}\n{}", e, s)))?;
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
fn run_postgres_async_migrations(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(run_postgres_async_inner(set, creds))
}

#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
async fn run_postgres_async_inner(
    set: &MigrationSet,
    creds: &PostgresCreds,
) -> Result<MigrationResult, CliError> {
    let url = creds.connection_url();
    let (mut client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .unwrap_or_default();
    let applied_hashes: Vec<String> = rows.iter().map(|r| r.get(0)).collect();

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = client
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty() {
                tx.execute(stmt, &[]).await.map_err(|e| {
                    CliError::MigrationError(format!(
                        "Migration '{}' failed: {}",
                        migration.hash(),
                        e
                    ))
                })?;
            }
        }
        tx.execute(
            &set.record_migration_sql(migration.hash(), migration.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}

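// ---------------------------------------------------------------------------
// LibSQL (local files)
// ---------------------------------------------------------------------------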
#[cfg(feature = "libsql")]
fn execute_libsql_local_statements(path: &str, statements: &[String]) -> Result<(), CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(execute_libsql_local_inner(path, statements))
}

#[cfg(feature = "libsql")]
async fn execute_libsql_local_inner(path: &str, statements: &[String]) -> Result<(), CliError> {
    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for stmt in statements {
        let s = stmt.trim();
        if s.is_empty() {
            continue;
        }
        if let Err(e) = tx.execute(s, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, s
            )));
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "libsql")]
fn run_libsql_local_migrations(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(run_libsql_local_inner(set, path))
}

#[cfg(feature = "libsql")]
async fn run_libsql_local_inner(
    set: &MigrationSet,
    path: &str,
) -> Result<MigrationResult, CliError> {
    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    conn.execute(&set.create_table_sql(), ())
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to create migrations table: {}", e))
        })?;

    let applied_hashes = query_applied_hashes_libsql(&conn, set).await?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty()
                && let Err(e) = tx.execute(stmt, ()).await
            {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }
        if let Err(e) = tx
            .execute(
                &set.record_migration_sql(migration.hash(), migration.created_at()),
                (),
            )
            .await
        {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(e.to_string()));
        }
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}

#[cfg(feature = "libsql")]
async fn query_applied_hashes_libsql(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut rows = match conn.query(&set.query_all_hashes_sql(), ()).await {
        Ok(r) => r,
        // If the migrations table doesn't exist yet, treat it as "nothing applied".
        Err(_) => return Ok(vec![]),
    };

    let mut hashes = Vec::new();
    while let Ok(Some(row)) = rows.next().await {
        if let Ok(hash) = row.get::<String>(0) {
            hashes.push(hash);
        }
    }

    Ok(hashes)
}

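// ---------------------------------------------------------------------------
// Turso (remote LibSQL)
// ---------------------------------------------------------------------------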
#[cfg(feature = "turso")]
fn execute_turso_statements(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(execute_turso_inner(url, auth_token, statements))
}

#[cfg(feature = "turso")]
async fn execute_turso_inner(
    url: &str,
    auth_token: Option<&str>,
    statements: &[String],
) -> Result<(), CliError> {
    let builder =
        libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());

    let db = builder.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
    })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    for stmt in statements {
        let s = stmt.trim();
        if s.is_empty() {
            continue;
        }
        if let Err(e) = tx.execute(s, ()).await {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(format!(
                "Statement failed: {}\n{}",
                e, s
            )));
        }
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "turso")]
fn run_turso_migrations(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(run_turso_inner(set, url, auth_token))
}

#[cfg(feature = "turso")]
async fn run_turso_inner(
    set: &MigrationSet,
    url: &str,
    auth_token: Option<&str>,
) -> Result<MigrationResult, CliError> {
    let builder =
        libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());

    let db = builder.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
    })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    conn.execute(&set.create_table_sql(), ())
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to create migrations table: {}", e))
        })?;

    let applied_hashes = query_applied_hashes_turso(&conn, set).await?;

    let pending: Vec<_> = set.pending(&applied_hashes).collect();
    if pending.is_empty() {
        return Ok(MigrationResult {
            applied_count: 0,
            applied_migrations: vec![],
        });
    }

    let tx = conn
        .transaction()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let mut applied = Vec::new();
    for migration in &pending {
        for stmt in migration.statements() {
            if !stmt.trim().is_empty()
                && let Err(e) = tx.execute(stmt, ()).await
            {
                tx.rollback().await.ok();
                return Err(CliError::MigrationError(format!(
                    "Migration '{}' failed: {}",
                    migration.hash(),
                    e
                )));
            }
        }
        if let Err(e) = tx
            .execute(
                &set.record_migration_sql(migration.hash(), migration.created_at()),
                (),
            )
            .await
        {
            tx.rollback().await.ok();
            return Err(CliError::MigrationError(e.to_string()));
        }
        applied.push(migration.hash().to_string());
    }

    tx.commit()
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(MigrationResult {
        applied_count: applied.len(),
        applied_migrations: applied,
    })
}

#[cfg(feature = "turso")]
async fn query_applied_hashes_turso(
    conn: &libsql::Connection,
    set: &MigrationSet,
) -> Result<Vec<String>, CliError> {
    let mut rows = match conn.query(&set.query_all_hashes_sql(), ()).await {
        Ok(r) => r,
        // If the migrations table doesn't exist yet, treat it as "nothing applied".
        Err(_) => return Ok(vec![]),
    };

    let mut hashes = Vec::new();
    while let Ok(Some(row)) = rows.next().await {
        if let Ok(hash) = row.get::<String>(0) {
            hashes.push(hash);
        }
    }

    Ok(hashes)
}

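/// Everything produced by introspecting a live database: generated schema
/// code, entity counts, warnings, and the schema snapshot.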
#[derive(Debug)]
pub struct IntrospectResult {
    /// Generated Rust schema code.
    pub schema_code: String,
    /// Number of tables discovered.
    pub table_count: usize,
    /// Number of indexes discovered.
    pub index_count: usize,
    /// Number of views discovered.
    pub view_count: usize,
    /// Warnings produced during code generation.
    pub warnings: Vec<String>,
    /// Snapshot of the introspected schema.
    pub snapshot: Snapshot,
    /// Path where the snapshot was written (empty until saved).
    pub snapshot_path: std::path::PathBuf,
}

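/// Introspects the database and writes the generated schema, an initial
/// migration, a snapshot, and a journal entry into `out_dir`. With
/// `init_metadata`, also records the initial migration as applied.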
pub fn run_introspection(
    credentials: &Credentials,
    dialect: Dialect,
    out_dir: &Path,
    init_metadata: bool,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::journal::Journal;
    use drizzle_migrations::words::generate_migration_tag;

    let mut result = introspect_database(credentials, dialect)?;

    // Write the generated schema code.
    let schema_path = out_dir.join("schema.rs");
    if let Some(parent) = schema_path.parent() {
        std::fs::create_dir_all(parent).map_err(|e| {
            CliError::Other(format!(
                "Failed to create output directory '{}': {}",
                parent.display(),
                e
            ))
        })?;
    }
    std::fs::write(&schema_path, &result.schema_code).map_err(|e| {
        CliError::Other(format!(
            "Failed to write schema file '{}': {}",
            schema_path.display(),
            e
        ))
    })?;

    // Ensure the meta directory and journal exist.
    let meta_dir = out_dir.join("meta");
    std::fs::create_dir_all(&meta_dir).map_err(|e| {
        CliError::Other(format!(
            "Failed to create meta directory '{}': {}",
            meta_dir.display(),
            e
        ))
    })?;

    let journal_path = meta_dir.join("_journal.json");
    let mut journal = Journal::load_or_create(&journal_path, dialect.to_base())
        .map_err(|e| CliError::Other(format!("Failed to load journal: {}", e)))?;

    let tag = generate_migration_tag(None);

    let migration_dir = out_dir.join(&tag);
    std::fs::create_dir_all(&migration_dir).map_err(|e| {
        CliError::Other(format!(
            "Failed to create migration directory '{}': {}",
            migration_dir.display(),
            e
        ))
    })?;

    // Save the snapshot for future diffs.
    let snapshot_path = migration_dir.join("snapshot.json");
    result.snapshot.save(&snapshot_path).map_err(|e| {
        CliError::Other(format!(
            "Failed to write snapshot file '{}': {}",
            snapshot_path.display(),
            e
        ))
    })?;

    // Generate the initial migration SQL by diffing against an empty baseline.
    let base_dialect = dialect.to_base();
    let empty_snapshot = Snapshot::empty(base_dialect);
    let sql_statements = generate_introspect_migration(&empty_snapshot, &result.snapshot, true)?;

    let migration_sql_path = migration_dir.join("migration.sql");
    let sql_content = if sql_statements.is_empty() {
        "-- No tables to create (empty database)\n".to_string()
    } else {
        sql_statements.join("\n--> statement-breakpoint\n")
    };
    std::fs::write(&migration_sql_path, &sql_content).map_err(|e| {
        CliError::Other(format!(
            "Failed to write migration file '{}': {}",
            migration_sql_path.display(),
            e
        ))
    })?;

    result.snapshot_path = snapshot_path;

    // Record the new migration in the journal.
    journal.add_entry(tag.clone(), true);
    journal
        .save(&journal_path)
        .map_err(|e| CliError::Other(format!("Failed to save journal: {}", e)))?;

    if init_metadata {
        apply_init_metadata(
            credentials,
            dialect,
            out_dir,
            migrations_table,
            migrations_schema,
        )?;
    }

    Ok(result)
}

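/// Marks the freshly generated initial migration as already applied, so a
/// subsequent `migrate` doesn't re-run DDL the database already has.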
fn apply_init_metadata(
    credentials: &Credentials,
    dialect: Dialect,
    out_dir: &Path,
    migrations_table: &str,
    migrations_schema: &str,
) -> Result<(), CliError> {
    use drizzle_migrations::MigrationSet;

    let mut set = MigrationSet::from_dir(out_dir, dialect.to_base())
        .map_err(|e| CliError::Other(format!("Failed to load migrations: {}", e)))?;

    if !migrations_table.trim().is_empty() {
        set = set.with_table(migrations_table.to_string());
    }
    if dialect == Dialect::Postgresql && !migrations_schema.trim().is_empty() {
        set = set.with_schema(migrations_schema.to_string());
    }

    if set.all().is_empty() {
        return Err(CliError::Other(
            "--init can't be used with empty migrations".into(),
        ));
    }

    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => init_sqlite_metadata(path, &set),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    init_libsql_local_metadata(url, &set)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    init_turso_metadata(url, _auth_token, &set)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => init_postgres_sync_metadata(creds, &set),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => init_postgres_async_metadata(creds, &set),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),
    }
}

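/// `--init` is only valid against a database with no recorded migrations and
/// a migration set in which every migration shares the same `created_at`
/// (i.e. only the freshly generated one).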
#[cfg(any(
    feature = "rusqlite",
    feature = "libsql",
    feature = "turso",
    feature = "postgres-sync",
    feature = "tokio-postgres"
))]
fn validate_init_metadata(applied_hashes: &[String], set: &MigrationSet) -> Result<(), CliError> {
    if !applied_hashes.is_empty() {
        return Err(CliError::Other(
            "--init can't be used when database already has migrations set".into(),
        ));
    }

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    let created_at = first.created_at();
    if set.all().iter().any(|m| m.created_at() != created_at) {
        return Err(CliError::Other(
            "--init can't be used with existing migrations".into(),
        ));
    }

    Ok(())
}

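// ---------------------------------------------------------------------------
// Recording the initial migration (`--init`), per driver
// ---------------------------------------------------------------------------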
#[cfg(feature = "rusqlite")]
fn init_sqlite_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    conn.execute(&set.create_table_sql(), []).map_err(|e| {
        CliError::MigrationError(format!("Failed to create migrations table: {}", e))
    })?;

    let applied_hashes = query_applied_hashes_sqlite(&conn, set)?;
    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    conn.execute(
        &set.record_migration_sql(first.hash(), first.created_at()),
        [],
    )
    .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "libsql")]
fn init_libsql_local_metadata(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(init_libsql_local_metadata_inner(path, set))
}

#[cfg(feature = "libsql")]
async fn init_libsql_local_metadata_inner(path: &str, set: &MigrationSet) -> Result<(), CliError> {
    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    conn.execute(&set.create_table_sql(), ())
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to create migrations table: {}", e))
        })?;

    let applied_hashes = query_applied_hashes_libsql(&conn, set).await?;
    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    conn.execute(
        &set.record_migration_sql(first.hash(), first.created_at()),
        (),
    )
    .await
    .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "turso")]
fn init_turso_metadata(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(init_turso_metadata_inner(url, auth_token, set))
}

#[cfg(feature = "turso")]
async fn init_turso_metadata_inner(
    url: &str,
    auth_token: Option<&str>,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let builder =
        libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());

    let db = builder.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
    })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    conn.execute(&set.create_table_sql(), ())
        .await
        .map_err(|e| {
            CliError::MigrationError(format!("Failed to create migrations table: {}", e))
        })?;

    let applied_hashes = query_applied_hashes_turso(&conn, set).await?;
    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    conn.execute(
        &set.record_migration_sql(first.hash(), first.created_at()),
        (),
    )
    .await
    .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "postgres-sync")]
fn init_postgres_sync_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .unwrap_or_default();
    let applied_hashes: Vec<String> = rows.iter().map(|r| r.get(0)).collect();

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

#[cfg(feature = "tokio-postgres")]
fn init_postgres_async_metadata(creds: &PostgresCreds, set: &MigrationSet) -> Result<(), CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(init_postgres_async_inner(creds, set))
}

#[cfg(feature = "tokio-postgres")]
async fn init_postgres_async_inner(
    creds: &PostgresCreds,
    set: &MigrationSet,
) -> Result<(), CliError> {
    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    if let Some(schema_sql) = set.create_schema_sql() {
        client
            .execute(&schema_sql, &[])
            .await
            .map_err(|e| CliError::MigrationError(e.to_string()))?;
    }

    client
        .execute(&set.create_table_sql(), &[])
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    let rows = client
        .query(&set.query_all_hashes_sql(), &[])
        .await
        .unwrap_or_default();
    let applied_hashes: Vec<String> = rows.iter().map(|r| r.get(0)).collect();

    validate_init_metadata(&applied_hashes, set)?;

    let first = set
        .all()
        .first()
        .ok_or_else(|| CliError::Other("--init can't be used with empty migrations".into()))?;

    client
        .execute(
            &set.record_migration_sql(first.hash(), first.created_at()),
            &[],
        )
        .await
        .map_err(|e| CliError::MigrationError(e.to_string()))?;

    Ok(())
}

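/// Generates the SQL for the initial migration by diffing an empty snapshot
/// against the introspected one.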
fn generate_introspect_migration(
    prev: &Snapshot,
    current: &Snapshot,
    _breakpoints: bool,
) -> Result<Vec<String>, CliError> {
    match (prev, current) {
        (Snapshot::Sqlite(prev_snap), Snapshot::Sqlite(curr_snap)) => {
            use drizzle_migrations::sqlite::diff_snapshots;
            use drizzle_migrations::sqlite::statements::SqliteGenerator;

            let diff = diff_snapshots(prev_snap, curr_snap);
            let generator = SqliteGenerator::new().with_breakpoints(true);
            Ok(generator.generate_migration(&diff))
        }
        (Snapshot::Postgres(prev_snap), Snapshot::Postgres(curr_snap)) => {
            use drizzle_migrations::postgres::diff_full_snapshots;
            use drizzle_migrations::postgres::statements::PostgresGenerator;

            let diff = diff_full_snapshots(prev_snap, curr_snap);
            let generator = PostgresGenerator::new().with_breakpoints(true);
            Ok(generator.generate(&diff.diffs))
        }
        _ => Err(CliError::DialectMismatch),
    }
}

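/// Dispatches introspection to the dialect-specific implementation.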
fn introspect_database(
    credentials: &Credentials,
    dialect: Dialect,
) -> Result<IntrospectResult, CliError> {
    match dialect {
        Dialect::Sqlite | Dialect::Turso => introspect_sqlite_dialect(credentials),
        Dialect::Postgresql => introspect_postgres_dialect(credentials),
    }
}

fn introspect_sqlite_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        #[cfg(feature = "rusqlite")]
        Credentials::Sqlite { path } => introspect_rusqlite(path),

        #[cfg(not(feature = "rusqlite"))]
        Credentials::Sqlite { .. } => Err(CliError::MissingDriver {
            dialect: "SQLite",
            feature: "rusqlite",
        }),

        #[cfg(any(feature = "libsql", feature = "turso"))]
        Credentials::Turso { url, auth_token } => {
            let _auth_token = auth_token.as_deref();
            if is_local_libsql(url) {
                #[cfg(feature = "libsql")]
                {
                    introspect_libsql_local(url)
                }
                #[cfg(not(feature = "libsql"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "LibSQL (local)",
                        feature: "libsql",
                    })
                }
            } else {
                #[cfg(feature = "turso")]
                {
                    introspect_turso(url, _auth_token)
                }
                #[cfg(not(feature = "turso"))]
                {
                    Err(CliError::MissingDriver {
                        dialect: "Turso (remote)",
                        feature: "turso",
                    })
                }
            }
        }

        #[cfg(all(not(feature = "turso"), not(feature = "libsql")))]
        Credentials::Turso { .. } => Err(CliError::MissingDriver {
            dialect: "Turso",
            feature: "turso or libsql",
        }),

        _ => Err(CliError::Other(
            "SQLite introspection requires sqlite path or turso credentials".into(),
        )),
    }
}

fn introspect_postgres_dialect(credentials: &Credentials) -> Result<IntrospectResult, CliError> {
    match credentials {
        #[cfg(feature = "postgres-sync")]
        Credentials::Postgres(creds) => introspect_postgres_sync(creds),

        #[cfg(all(not(feature = "postgres-sync"), feature = "tokio-postgres"))]
        Credentials::Postgres(creds) => introspect_postgres_async(creds),

        #[cfg(all(not(feature = "postgres-sync"), not(feature = "tokio-postgres")))]
        Credentials::Postgres(_) => Err(CliError::MissingDriver {
            dialect: "PostgreSQL",
            feature: "postgres-sync or tokio-postgres",
        }),

        _ => Err(CliError::Other(
            "PostgreSQL introspection requires postgres credentials".into(),
        )),
    }
}

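// ---------------------------------------------------------------------------
// Introspection: SQLite via rusqlite
// ---------------------------------------------------------------------------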
#[cfg(feature = "rusqlite")]
fn introspect_rusqlite(path: &str) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    let conn = rusqlite::Connection::open(path).map_err(|e| {
        CliError::ConnectionError(format!("Failed to open SQLite database '{}': {}", path, e))
    })?;

    let mut tables_stmt = conn
        .prepare(queries::TABLES_QUERY)
        .map_err(|e| CliError::Other(format!("Failed to prepare tables query: {}", e)))?;

    let tables: Vec<(String, Option<String>)> = tables_stmt
        .query_map([], |row| Ok((row.get(0)?, row.get(1)?)))
        .map_err(|e| CliError::Other(e.to_string()))?
        .filter_map(Result::ok)
        .collect();

    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    let mut columns_stmt = conn
        .prepare(queries::COLUMNS_QUERY)
        .map_err(|e| CliError::Other(format!("Failed to prepare columns query: {}", e)))?;

    let raw_columns: Vec<RawColumnInfo> = columns_stmt
        .query_map([], |row| {
            Ok(RawColumnInfo {
                table: row.get(0)?,
                cid: row.get(1)?,
                name: row.get(2)?,
                column_type: row.get(3)?,
                not_null: row.get(4)?,
                default_value: row.get(5)?,
                pk: row.get(6)?,
                hidden: row.get(7)?,
                sql: row.get(8)?,
            })
        })
        .map_err(|e| CliError::Other(e.to_string()))?
        .filter_map(Result::ok)
        .collect();

    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        if let Ok(mut idx_stmt) = conn.prepare(&queries::indexes_query(table_name)) {
            let indexes: Vec<RawIndexInfo> = idx_stmt
                .query_map([], |row| {
                    Ok(RawIndexInfo {
                        table: table_name.clone(),
                        name: row.get(1)?,
                        unique: row.get::<_, i32>(2)? != 0,
                        origin: row.get(3)?,
                        partial: row.get::<_, i32>(4)? != 0,
                    })
                })
                .map_err(|e| CliError::Other(e.to_string()))?
                .filter_map(Result::ok)
                .collect();

            for idx in &indexes {
                if let Ok(mut col_stmt) = conn.prepare(&queries::index_info_query(&idx.name))
                    && let Ok(col_iter) = col_stmt.query_map([], |row| {
                        Ok(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: row.get(0)?,
                            cid: row.get(1)?,
                            name: row.get(2)?,
                            desc: row.get::<_, i32>(3)? != 0,
                            coll: row.get(4)?,
                            key: row.get::<_, i32>(5)? != 0,
                        })
                    })
                {
                    all_index_columns.extend(col_iter.filter_map(Result::ok));
                }
            }
            all_indexes.extend(indexes);
        }

        if let Ok(mut fk_stmt) = conn.prepare(&queries::foreign_keys_query(table_name))
            && let Ok(fk_iter) = fk_stmt.query_map([], |row| {
                Ok(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0)?,
                    seq: row.get(1)?,
                    to_table: row.get(2)?,
                    from_column: row.get(3)?,
                    to_column: row.get(4)?,
                    on_update: row.get(5)?,
                    on_delete: row.get(6)?,
                    r#match: row.get(7)?,
                })
            })
        {
            all_fks.extend(fk_iter.filter_map(Result::ok));
        }
    }

    if let Ok(mut views_stmt) = conn.prepare(queries::VIEWS_QUERY)
        && let Ok(view_iter) = views_stmt.query_map([], |row| {
            Ok(RawViewInfo {
                name: row.get(0)?,
                sql: row.get(1)?,
            })
        })
    {
        all_views.extend(view_iter.filter_map(Result::ok));
    }

    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);

    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        if let Some(sql) = table_sql {
            let sql_upper = sql.to_uppercase();
            table.strict = sql_upper.contains(" STRICT");
            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }

    for idx in indexes {
        ddl.indexes.push(idx);
    }

    for fk in foreign_keys {
        ddl.fks.push(fk);
    }

    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", path)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}

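// ---------------------------------------------------------------------------
// Introspection: local LibSQL
// ---------------------------------------------------------------------------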
#[cfg(feature = "libsql")]
fn introspect_libsql_local(path: &str) -> Result<IntrospectResult, CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(introspect_libsql_inner(path, None))
}

#[cfg(feature = "libsql")]
async fn introspect_libsql_inner(
    path: &str,
    _auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    let db = libsql::Builder::new_local(path)
        .build()
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to open LibSQL database '{}': {}", path, e))
        })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Ok(Some(row)) = tables_rows.next().await {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Ok(Some(row)) = columns_rows.next().await {
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        if let Some(sql) = table_sql {
            let sql_upper = sql.to_uppercase();
            table.strict = sql_upper.contains(" STRICT");
            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", path)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}

2132#[cfg(feature = "turso")]
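/// Introspects a remote Turso database.
///
/// Synchronous entry point: builds a current-thread tokio runtime and
/// blocks on [`introspect_turso_inner`], which does the actual async work.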
fn introspect_turso(url: &str, auth_token: Option<&str>) -> Result<IntrospectResult, CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(introspect_turso_inner(url, auth_token))
}

#[cfg(feature = "turso")]
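/// Async implementation of Turso introspection: connects via
/// `libsql::Builder::new_remote`, collects tables, columns, indexes,
/// foreign keys, and views with the shared SQLite introspection queries,
/// and produces generated schema code plus a snapshot.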
async fn introspect_turso_inner(
    url: &str,
    auth_token: Option<&str>,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::sqlite::{
        SQLiteDDL, Table, View,
        codegen::{CodegenOptions, generate_rust_schema},
        introspect::{
            RawColumnInfo, RawForeignKey, RawIndexColumn, RawIndexInfo, RawViewInfo,
            parse_generated_columns_from_table_sql, parse_view_sql, process_columns,
            process_foreign_keys, process_indexes, queries,
        },
    };
    use std::collections::{HashMap, HashSet};

    let builder =
        libsql::Builder::new_remote(url.to_string(), auth_token.unwrap_or("").to_string());

    let db = builder.build().await.map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to Turso '{}': {}", url, e))
    })?;

    let conn = db
        .connect()
        .map_err(|e| CliError::ConnectionError(e.to_string()))?;

    let mut tables_rows = conn
        .query(queries::TABLES_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?;

    let mut tables: Vec<(String, Option<String>)> = Vec::new();
    while let Ok(Some(row)) = tables_rows.next().await {
        let name: String = row.get(0).unwrap_or_default();
        let sql: Option<String> = row.get(1).ok();
        tables.push((name, sql));
    }

    let table_sql_map: HashMap<String, String> = tables
        .iter()
        .filter_map(|(name, sql)| sql.as_ref().map(|s| (name.clone(), s.clone())))
        .collect();

    let mut columns_rows = conn
        .query(queries::COLUMNS_QUERY, ())
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?;

    let mut raw_columns: Vec<RawColumnInfo> = Vec::new();
    while let Ok(Some(row)) = columns_rows.next().await {
        raw_columns.push(RawColumnInfo {
            table: row.get(0).unwrap_or_default(),
            cid: row.get(1).unwrap_or(0),
            name: row.get(2).unwrap_or_default(),
            column_type: row.get(3).unwrap_or_default(),
            not_null: row.get::<i32>(4).unwrap_or(0) != 0,
            default_value: row.get(5).ok(),
            pk: row.get(6).unwrap_or(0),
            hidden: row.get(7).unwrap_or(0),
            sql: row.get(8).ok(),
        });
    }

    let mut all_indexes: Vec<RawIndexInfo> = Vec::new();
    let mut all_index_columns: Vec<RawIndexColumn> = Vec::new();
    let mut all_fks: Vec<RawForeignKey> = Vec::new();
    let mut all_views: Vec<RawViewInfo> = Vec::new();

    for (table_name, _) in &tables {
        if let Ok(mut idx_rows) = conn.query(&queries::indexes_query(table_name), ()).await {
            while let Ok(Some(row)) = idx_rows.next().await {
                let idx = RawIndexInfo {
                    table: table_name.clone(),
                    name: row.get(1).unwrap_or_default(),
                    unique: row.get::<i32>(2).unwrap_or(0) != 0,
                    origin: row.get(3).unwrap_or_default(),
                    partial: row.get::<i32>(4).unwrap_or(0) != 0,
                };

                if let Ok(mut col_rows) =
                    conn.query(&queries::index_info_query(&idx.name), ()).await
                {
                    while let Ok(Some(col_row)) = col_rows.next().await {
                        all_index_columns.push(RawIndexColumn {
                            index_name: idx.name.clone(),
                            seqno: col_row.get(0).unwrap_or(0),
                            cid: col_row.get(1).unwrap_or(0),
                            name: col_row.get(2).ok(),
                            desc: col_row.get::<i32>(3).unwrap_or(0) != 0,
                            coll: col_row.get(4).unwrap_or_default(),
                            key: col_row.get::<i32>(5).unwrap_or(0) != 0,
                        });
                    }
                }

                all_indexes.push(idx);
            }
        }

        if let Ok(mut fk_rows) = conn
            .query(&queries::foreign_keys_query(table_name), ())
            .await
        {
            while let Ok(Some(row)) = fk_rows.next().await {
                all_fks.push(RawForeignKey {
                    table: table_name.clone(),
                    id: row.get(0).unwrap_or(0),
                    seq: row.get(1).unwrap_or(0),
                    to_table: row.get(2).unwrap_or_default(),
                    from_column: row.get(3).unwrap_or_default(),
                    to_column: row.get(4).unwrap_or_default(),
                    on_update: row.get(5).unwrap_or_default(),
                    on_delete: row.get(6).unwrap_or_default(),
                    r#match: row.get(7).unwrap_or_default(),
                });
            }
        }
    }

    if let Ok(mut views_rows) = conn.query(queries::VIEWS_QUERY, ()).await {
        while let Ok(Some(row)) = views_rows.next().await {
            let name: String = row.get(0).unwrap_or_default();
            let sql: String = row.get(1).unwrap_or_default();
            all_views.push(RawViewInfo { name, sql });
        }
    }

    let mut generated_columns: HashMap<String, drizzle_migrations::sqlite::ddl::ParsedGenerated> =
        HashMap::new();
    for (table, sql) in &table_sql_map {
        generated_columns.extend(parse_generated_columns_from_table_sql(table, sql));
    }
    let pk_columns: HashSet<(String, String)> = raw_columns
        .iter()
        .filter(|c| c.pk > 0)
        .map(|c| (c.table.clone(), c.name.clone()))
        .collect();

    let (columns, primary_keys) = process_columns(&raw_columns, &generated_columns, &pk_columns);
    let indexes = process_indexes(&all_indexes, &all_index_columns, &table_sql_map);
    let foreign_keys = process_foreign_keys(&all_fks);
    let uniques = process_sqlite_uniques_from_indexes(&all_indexes, &all_index_columns);

    let mut ddl = SQLiteDDL::new();

    for (table_name, table_sql) in &tables {
        let mut table = Table::new(table_name.clone());
        if let Some(sql) = table_sql {
            let sql_upper = sql.to_uppercase();
            table.strict = sql_upper.contains(" STRICT");
            table.without_rowid = sql_upper.contains("WITHOUT ROWID");
        }
        ddl.tables.push(table);
    }

    for col in columns {
        ddl.columns.push(col);
    }
    for idx in indexes {
        ddl.indexes.push(idx);
    }
    for fk in foreign_keys {
        ddl.fks.push(fk);
    }
    for pk in primary_keys {
        ddl.pks.push(pk);
    }

    for u in uniques {
        ddl.uniques.push(u);
    }

    for v in all_views {
        let mut view = View::new(v.name);
        if let Some(def) = parse_view_sql(&v.sql) {
            view.definition = Some(def.into());
        } else {
            view.error = Some("Failed to parse view SQL".into());
        }
        ddl.views.push(view);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from Turso: {}", url)),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };

    let generated = generate_rust_schema(&ddl, &options);

    let mut sqlite_snapshot = drizzle_migrations::sqlite::SQLiteSnapshot::new();
    for entity in ddl.to_entities() {
        sqlite_snapshot.add_entity(entity);
    }
    let snapshot = Snapshot::Sqlite(sqlite_snapshot);

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: generated.tables.len(),
        index_count: generated.indexes.len(),
        view_count: ddl.views.len(),
        warnings: generated.warnings,
        snapshot,
        snapshot_path: std::path::PathBuf::new(),
    })
}

#[cfg(feature = "postgres-sync")]
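/// Introspects a PostgreSQL database using the blocking `postgres` client.
///
/// Queries schemas, tables, columns, enums, sequences, views, indexes,
/// foreign keys, primary keys, unique and check constraints, roles, and
/// policies, then assembles a `PostgresDDL` and generates schema code and
/// a snapshot from it.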
fn introspect_postgres_sync(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::postgres::{
        PostgresDDL,
        codegen::{CodegenOptions, generate_rust_schema},
        ddl::Schema,
        introspect::{
            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
            process_roles, process_sequences, process_tables, process_unique_constraints,
            process_views,
        },
    };

    let url = creds.connection_url();
    let mut client = postgres::Client::connect(&url, postgres::NoTls).map_err(|e| {
        CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
    })?;

    let raw_schemas: Vec<RawSchemaInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::SCHEMAS_QUERY,
            &[],
        )
        .map_err(|e| CliError::Other(format!("Failed to query schemas: {}", e)))?
        .into_iter()
        .map(|row| RawSchemaInfo {
            name: row.get::<_, String>(0),
        })
        .collect();

    let raw_tables: Vec<RawTableInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::TABLES_QUERY,
            &[],
        )
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?
        .into_iter()
        .map(|row| RawTableInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            is_rls_enabled: row.get::<_, bool>(2),
        })
        .collect();

    let raw_columns: Vec<RawColumnInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::COLUMNS_QUERY,
            &[],
        )
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?
        .into_iter()
        .map(|row| RawColumnInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            column_type: row.get::<_, String>(3),
            type_schema: row.get::<_, Option<String>>(4),
            not_null: row.get::<_, bool>(5),
            default_value: row.get::<_, Option<String>>(6),
            is_identity: row.get::<_, bool>(7),
            identity_type: row.get::<_, Option<String>>(8),
            is_generated: row.get::<_, bool>(9),
            generated_expression: row.get::<_, Option<String>>(10),
            ordinal_position: row.get::<_, i32>(11),
        })
        .collect();

    let raw_enums: Vec<RawEnumInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::ENUMS_QUERY,
            &[],
        )
        .map_err(|e| CliError::Other(format!("Failed to query enums: {}", e)))?
        .into_iter()
        .map(|row| RawEnumInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            values: row.get::<_, Vec<String>>(2),
        })
        .collect();

    let raw_sequences: Vec<RawSequenceInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::SEQUENCES_QUERY,
            &[],
        )
        .map_err(|e| CliError::Other(format!("Failed to query sequences: {}", e)))?
        .into_iter()
        .map(|row| RawSequenceInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            data_type: row.get::<_, String>(2),
            start_value: row.get::<_, String>(3),
            min_value: row.get::<_, String>(4),
            max_value: row.get::<_, String>(5),
            increment: row.get::<_, String>(6),
            cycle: row.get::<_, bool>(7),
            cache_value: row.get::<_, String>(8),
        })
        .collect();

    let raw_views: Vec<RawViewInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::VIEWS_QUERY,
            &[],
        )
        .map_err(|e| CliError::Other(format!("Failed to query views: {}", e)))?
        .into_iter()
        .map(|row| RawViewInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            definition: row.get::<_, String>(2),
            is_materialized: row.get::<_, bool>(3),
        })
        .collect();

    let raw_indexes: Vec<RawIndexInfo> = client
        .query(POSTGRES_INDEXES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query indexes: {}", e)))?
        .into_iter()
        .map(|row| {
            let cols: Vec<String> = row.get(6);
            RawIndexInfo {
                schema: row.get::<_, String>(0),
                table: row.get::<_, String>(1),
                name: row.get::<_, String>(2),
                is_unique: row.get::<_, bool>(3),
                is_primary: row.get::<_, bool>(4),
                method: row.get::<_, String>(5),
                columns: parse_postgres_index_columns(cols),
                where_clause: row.get::<_, Option<String>>(7),
                concurrent: false,
            }
        })
        .collect();

    let raw_fks: Vec<RawForeignKeyInfo> = client
        .query(POSTGRES_FOREIGN_KEYS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {}", e)))?
        .into_iter()
        .map(|row| RawForeignKeyInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            columns: row.get::<_, Vec<String>>(3),
            schema_to: row.get::<_, String>(4),
            table_to: row.get::<_, String>(5),
            columns_to: row.get::<_, Vec<String>>(6),
            on_update: pg_action_code_to_string(row.get::<_, String>(7)),
            on_delete: pg_action_code_to_string(row.get::<_, String>(8)),
        })
        .collect();

    let raw_pks: Vec<RawPrimaryKeyInfo> = client
        .query(POSTGRES_PRIMARY_KEYS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query primary keys: {}", e)))?
        .into_iter()
        .map(|row| RawPrimaryKeyInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            columns: row.get::<_, Vec<String>>(3),
        })
        .collect();

    let raw_uniques: Vec<RawUniqueInfo> = client
        .query(POSTGRES_UNIQUES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {}", e)))?
        .into_iter()
        .map(|row| RawUniqueInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            columns: row.get::<_, Vec<String>>(3),
            nulls_not_distinct: row.get::<_, bool>(4),
        })
        .collect();

    let raw_checks: Vec<RawCheckInfo> = client
        .query(POSTGRES_CHECKS_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query check constraints: {}", e)))?
        .into_iter()
        .map(|row| RawCheckInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            expression: row.get::<_, String>(3),
        })
        .collect();

    let raw_roles: Vec<RawRoleInfo> = client
        .query(POSTGRES_ROLES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query roles: {}", e)))?
        .into_iter()
        .map(|row| RawRoleInfo {
            name: row.get::<_, String>(0),
            create_db: row.get::<_, bool>(1),
            create_role: row.get::<_, bool>(2),
            inherit: row.get::<_, bool>(3),
        })
        .collect();

    let raw_policies: Vec<RawPolicyInfo> = client
        .query(POSTGRES_POLICIES_QUERY, &[])
        .map_err(|e| CliError::Other(format!("Failed to query policies: {}", e)))?
        .into_iter()
        .map(|row| RawPolicyInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            as_clause: row.get::<_, String>(3),
            for_clause: row.get::<_, String>(4),
            to: row.get::<_, Vec<String>>(5),
            using: row.get::<_, Option<String>>(6),
            with_check: row.get::<_, Option<String>>(7),
        })
        .collect();

    let mut ddl = PostgresDDL::new();

    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
        ddl.schemas.push(s);
    }
    for e in process_enums(&raw_enums) {
        ddl.enums.push(e);
    }
    for s in process_sequences(&raw_sequences) {
        ddl.sequences.push(s);
    }
    for r in process_roles(&raw_roles) {
        ddl.roles.push(r);
    }
    for p in process_policies(&raw_policies) {
        ddl.policies.push(p);
    }
    for t in process_tables(&raw_tables) {
        ddl.tables.push(t);
    }
    for c in process_columns(&raw_columns) {
        ddl.columns.push(c);
    }
    for i in process_indexes(&raw_indexes) {
        ddl.indexes.push(i);
    }
    for fk in process_foreign_keys(&raw_fks) {
        ddl.fks.push(fk);
    }
    for pk in process_primary_keys(&raw_pks) {
        ddl.pks.push(pk);
    }
    for u in process_unique_constraints(&raw_uniques) {
        ddl.uniques.push(u);
    }
    for c in process_check_constraints(&raw_checks) {
        ddl.checks.push(c);
    }
    for v in process_views(&raw_views) {
        ddl.views.push(v);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };
    let generated = generate_rust_schema(&ddl, &options);

    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
    for entity in ddl.to_entities() {
        snap.add_entity(entity);
    }

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: ddl.tables.list().len(),
        index_count: ddl.indexes.list().len(),
        view_count: ddl.views.list().len(),
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(snap),
        snapshot_path: std::path::PathBuf::new(),
    })
}

#[cfg(feature = "tokio-postgres")]
#[allow(dead_code)]
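/// Async counterpart of [`introspect_postgres_sync`] built on
/// `tokio-postgres`, for builds where only the `tokio-postgres` feature is
/// enabled. Creates a current-thread runtime and blocks on the async
/// implementation.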
fn introspect_postgres_async(creds: &PostgresCreds) -> Result<IntrospectResult, CliError> {
    let rt = tokio::runtime::Builder::new_current_thread()
        .enable_all()
        .build()
        .map_err(|e| CliError::Other(format!("Failed to create async runtime: {}", e)))?;

    rt.block_on(introspect_postgres_async_inner(creds))
}

#[cfg(feature = "tokio-postgres")]
async fn introspect_postgres_async_inner(
    creds: &PostgresCreds,
) -> Result<IntrospectResult, CliError> {
    use drizzle_migrations::postgres::{
        PostgresDDL,
        codegen::{CodegenOptions, generate_rust_schema},
        ddl::Schema,
        introspect::{
            RawCheckInfo, RawColumnInfo, RawEnumInfo, RawForeignKeyInfo, RawIndexInfo,
            RawPolicyInfo, RawPrimaryKeyInfo, RawRoleInfo, RawSequenceInfo, RawTableInfo,
            RawUniqueInfo, RawViewInfo, process_check_constraints, process_columns, process_enums,
            process_foreign_keys, process_indexes, process_policies, process_primary_keys,
            process_roles, process_sequences, process_tables, process_unique_constraints,
            process_views,
        },
    };

    let url = creds.connection_url();
    let (client, connection) = tokio_postgres::connect(&url, tokio_postgres::NoTls)
        .await
        .map_err(|e| {
            CliError::ConnectionError(format!("Failed to connect to PostgreSQL: {}", e))
        })?;

    tokio::spawn(async move {
        if let Err(e) = connection.await {
            eprintln!(
                "{}",
                output::err_line(&format!("PostgreSQL connection error: {e}"))
            );
        }
    });

    let raw_schemas: Vec<RawSchemaInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::SCHEMAS_QUERY,
            &[],
        )
        .await
        .map_err(|e| CliError::Other(format!("Failed to query schemas: {}", e)))?
        .into_iter()
        .map(|row| RawSchemaInfo {
            name: row.get::<_, String>(0),
        })
        .collect();

    let raw_tables: Vec<RawTableInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::TABLES_QUERY,
            &[],
        )
        .await
        .map_err(|e| CliError::Other(format!("Failed to query tables: {}", e)))?
        .into_iter()
        .map(|row| RawTableInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            is_rls_enabled: row.get::<_, bool>(2),
        })
        .collect();

    let raw_columns: Vec<RawColumnInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::COLUMNS_QUERY,
            &[],
        )
        .await
        .map_err(|e| CliError::Other(format!("Failed to query columns: {}", e)))?
        .into_iter()
        .map(|row| RawColumnInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            column_type: row.get::<_, String>(3),
            type_schema: row.get::<_, Option<String>>(4),
            not_null: row.get::<_, bool>(5),
            default_value: row.get::<_, Option<String>>(6),
            is_identity: row.get::<_, bool>(7),
            identity_type: row.get::<_, Option<String>>(8),
            is_generated: row.get::<_, bool>(9),
            generated_expression: row.get::<_, Option<String>>(10),
            ordinal_position: row.get::<_, i32>(11),
        })
        .collect();

    let raw_enums: Vec<RawEnumInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::ENUMS_QUERY,
            &[],
        )
        .await
        .map_err(|e| CliError::Other(format!("Failed to query enums: {}", e)))?
        .into_iter()
        .map(|row| RawEnumInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            values: row.get::<_, Vec<String>>(2),
        })
        .collect();

    let raw_sequences: Vec<RawSequenceInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::SEQUENCES_QUERY,
            &[],
        )
        .await
        .map_err(|e| CliError::Other(format!("Failed to query sequences: {}", e)))?
        .into_iter()
        .map(|row| RawSequenceInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            data_type: row.get::<_, String>(2),
            start_value: row.get::<_, String>(3),
            min_value: row.get::<_, String>(4),
            max_value: row.get::<_, String>(5),
            increment: row.get::<_, String>(6),
            cycle: row.get::<_, bool>(7),
            cache_value: row.get::<_, String>(8),
        })
        .collect();

    let raw_views: Vec<RawViewInfo> = client
        .query(
            drizzle_migrations::postgres::introspect::queries::VIEWS_QUERY,
            &[],
        )
        .await
        .map_err(|e| CliError::Other(format!("Failed to query views: {}", e)))?
        .into_iter()
        .map(|row| RawViewInfo {
            schema: row.get::<_, String>(0),
            name: row.get::<_, String>(1),
            definition: row.get::<_, String>(2),
            is_materialized: row.get::<_, bool>(3),
        })
        .collect();

    let raw_indexes: Vec<RawIndexInfo> = client
        .query(POSTGRES_INDEXES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query indexes: {}", e)))?
        .into_iter()
        .map(|row| {
            let cols: Vec<String> = row.get(6);
            RawIndexInfo {
                schema: row.get::<_, String>(0),
                table: row.get::<_, String>(1),
                name: row.get::<_, String>(2),
                is_unique: row.get::<_, bool>(3),
                is_primary: row.get::<_, bool>(4),
                method: row.get::<_, String>(5),
                columns: parse_postgres_index_columns(cols),
                where_clause: row.get::<_, Option<String>>(7),
                concurrent: false,
            }
        })
        .collect();

    let raw_fks: Vec<RawForeignKeyInfo> = client
        .query(POSTGRES_FOREIGN_KEYS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query foreign keys: {}", e)))?
        .into_iter()
        .map(|row| RawForeignKeyInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            columns: row.get::<_, Vec<String>>(3),
            schema_to: row.get::<_, String>(4),
            table_to: row.get::<_, String>(5),
            columns_to: row.get::<_, Vec<String>>(6),
            on_update: pg_action_code_to_string(row.get::<_, String>(7)),
            on_delete: pg_action_code_to_string(row.get::<_, String>(8)),
        })
        .collect();

    let raw_pks: Vec<RawPrimaryKeyInfo> = client
        .query(POSTGRES_PRIMARY_KEYS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query primary keys: {}", e)))?
        .into_iter()
        .map(|row| RawPrimaryKeyInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            columns: row.get::<_, Vec<String>>(3),
        })
        .collect();

    let raw_uniques: Vec<RawUniqueInfo> = client
        .query(POSTGRES_UNIQUES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query unique constraints: {}", e)))?
        .into_iter()
        .map(|row| RawUniqueInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            columns: row.get::<_, Vec<String>>(3),
            nulls_not_distinct: row.get::<_, bool>(4),
        })
        .collect();

    let raw_checks: Vec<RawCheckInfo> = client
        .query(POSTGRES_CHECKS_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query check constraints: {}", e)))?
        .into_iter()
        .map(|row| RawCheckInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            expression: row.get::<_, String>(3),
        })
        .collect();

    let raw_roles: Vec<RawRoleInfo> = client
        .query(POSTGRES_ROLES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query roles: {}", e)))?
        .into_iter()
        .map(|row| RawRoleInfo {
            name: row.get::<_, String>(0),
            create_db: row.get::<_, bool>(1),
            create_role: row.get::<_, bool>(2),
            inherit: row.get::<_, bool>(3),
        })
        .collect();

    let raw_policies: Vec<RawPolicyInfo> = client
        .query(POSTGRES_POLICIES_QUERY, &[])
        .await
        .map_err(|e| CliError::Other(format!("Failed to query policies: {}", e)))?
        .into_iter()
        .map(|row| RawPolicyInfo {
            schema: row.get::<_, String>(0),
            table: row.get::<_, String>(1),
            name: row.get::<_, String>(2),
            as_clause: row.get::<_, String>(3),
            for_clause: row.get::<_, String>(4),
            to: row.get::<_, Vec<String>>(5),
            using: row.get::<_, Option<String>>(6),
            with_check: row.get::<_, Option<String>>(7),
        })
        .collect();

    let mut ddl = PostgresDDL::new();
    for s in raw_schemas.into_iter().map(|s| Schema::new(s.name)) {
        ddl.schemas.push(s);
    }
    for e in process_enums(&raw_enums) {
        ddl.enums.push(e);
    }
    for s in process_sequences(&raw_sequences) {
        ddl.sequences.push(s);
    }
    for r in process_roles(&raw_roles) {
        ddl.roles.push(r);
    }
    for p in process_policies(&raw_policies) {
        ddl.policies.push(p);
    }
    for t in process_tables(&raw_tables) {
        ddl.tables.push(t);
    }
    for c in process_columns(&raw_columns) {
        ddl.columns.push(c);
    }
    for i in process_indexes(&raw_indexes) {
        ddl.indexes.push(i);
    }
    for fk in process_foreign_keys(&raw_fks) {
        ddl.fks.push(fk);
    }
    for pk in process_primary_keys(&raw_pks) {
        ddl.pks.push(pk);
    }
    for u in process_unique_constraints(&raw_uniques) {
        ddl.uniques.push(u);
    }
    for c in process_check_constraints(&raw_checks) {
        ddl.checks.push(c);
    }
    for v in process_views(&raw_views) {
        ddl.views.push(v);
    }

    let options = CodegenOptions {
        module_doc: Some(format!("Schema introspected from {}", mask_url(&url))),
        include_schema: true,
        schema_name: "Schema".to_string(),
        use_pub: true,
    };
    let generated = generate_rust_schema(&ddl, &options);

    let mut snap = drizzle_migrations::postgres::PostgresSnapshot::new();
    for entity in ddl.to_entities() {
        snap.add_entity(entity);
    }

    Ok(IntrospectResult {
        schema_code: generated.code,
        table_count: ddl.tables.list().len(),
        index_count: ddl.indexes.list().len(),
        view_count: ddl.views.list().len(),
        warnings: generated.warnings,
        snapshot: Snapshot::Postgres(snap),
        snapshot_path: std::path::PathBuf::new(),
    })
}

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
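/// Row shape returned by the schema-listing query: just the schema name.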
#[derive(Debug, Clone)]
struct RawSchemaInfo {
    name: String,
}

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
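/// Lists every index in user schemas together with its per-column
/// definitions. `pg_get_indexdef(indexrelid, n, true)` is invoked once per
/// key column via `generate_series`, so expression columns arrive as
/// rendered SQL text; the strings are decoded later by
/// `parse_postgres_index_columns`.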
const POSTGRES_INDEXES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    idx.relname AS name,
    ix.indisunique AS is_unique,
    ix.indisprimary AS is_primary,
    am.amname AS method,
    array_agg(pg_get_indexdef(ix.indexrelid, s.n, true) ORDER BY s.n) AS columns,
    pg_get_expr(ix.indpred, ix.indrelid) AS where_clause
FROM pg_index ix
JOIN pg_class idx ON idx.oid = ix.indexrelid
JOIN pg_class tbl ON tbl.oid = ix.indrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_am am ON am.oid = idx.relam
JOIN generate_series(1, ix.indnkeyatts) AS s(n) ON TRUE
WHERE ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, idx.relname, ix.indisunique, ix.indisprimary, am.amname, ix.indpred, ix.indrelid
ORDER BY ns.nspname, tbl.relname, idx.relname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
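/// Lists foreign-key constraints with their local and referenced columns
/// paired positionally: `conkey` and `confkey` are unnested WITH
/// ORDINALITY and joined on the ordinal so the two column arrays stay in
/// matching order.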
const POSTGRES_FOREIGN_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(src.attname ORDER BY s.ord) AS columns,
    ns_to.nspname AS schema_to,
    tbl_to.relname AS table_to,
    array_agg(dst.attname ORDER BY s.ord) AS columns_to,
    con.confupdtype::text AS on_update,
    con.confdeltype::text AS on_delete
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN pg_class tbl_to ON tbl_to.oid = con.confrelid
JOIN pg_namespace ns_to ON ns_to.oid = tbl_to.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute src ON src.attrelid = tbl.oid AND src.attnum = s.attnum
JOIN unnest(con.confkey) WITH ORDINALITY AS r(attnum, ord) ON r.ord = s.ord
JOIN pg_attribute dst ON dst.attrelid = tbl_to.oid AND dst.attnum = r.attnum
WHERE con.contype = 'f'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname, ns_to.nspname, tbl_to.relname, con.confupdtype, con.confdeltype
ORDER BY ns.nspname, tbl.relname, con.conname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_PRIMARY_KEYS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
WHERE con.contype = 'p'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname
ORDER BY ns.nspname, tbl.relname, con.conname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
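/// Lists UNIQUE constraints and their columns. Note that
/// `pg_constraint.connullsnotdistinct` was added in PostgreSQL 15, so as
/// written this query requires a 15+ server; the COALESCE only normalizes
/// NULL values, not a missing column.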
const POSTGRES_UNIQUES_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    array_agg(att.attname ORDER BY s.ord) AS columns,
    COALESCE(con.connullsnotdistinct, FALSE) AS nulls_not_distinct
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
JOIN unnest(con.conkey) WITH ORDINALITY AS s(attnum, ord) ON TRUE
JOIN pg_attribute att ON att.attrelid = tbl.oid AND att.attnum = s.attnum
WHERE con.contype = 'u'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
GROUP BY ns.nspname, tbl.relname, con.conname, con.connullsnotdistinct
ORDER BY ns.nspname, tbl.relname, con.conname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_CHECKS_QUERY: &str = r#"
SELECT
    ns.nspname AS schema,
    tbl.relname AS table,
    con.conname AS name,
    pg_get_expr(con.conbin, con.conrelid) AS expression
FROM pg_constraint con
JOIN pg_class tbl ON tbl.oid = con.conrelid
JOIN pg_namespace ns ON ns.oid = tbl.relnamespace
WHERE con.contype = 'c'
  AND ns.nspname NOT LIKE 'pg_%'
  AND ns.nspname <> 'information_schema'
ORDER BY ns.nspname, tbl.relname, con.conname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_ROLES_QUERY: &str = r#"
SELECT
    rolname AS name,
    rolcreatedb AS create_db,
    rolcreaterole AS create_role,
    rolinherit AS inherit
FROM pg_roles
ORDER BY rolname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
const POSTGRES_POLICIES_QUERY: &str = r#"
SELECT
    schemaname AS schema,
    tablename AS table,
    policyname AS name,
    CASE WHEN permissive THEN 'PERMISSIVE' ELSE 'RESTRICTIVE' END AS as_clause,
    upper(cmd) AS for_clause,
    roles AS to,
    qual AS using,
    with_check AS with_check
FROM pg_policies
WHERE schemaname NOT LIKE 'pg_%'
  AND schemaname <> 'information_schema'
ORDER BY schemaname, tablename, policyname
"#;

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
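/// Maps the single-character action codes stored in
/// `pg_constraint.confupdtype` / `confdeltype` to their SQL keywords
/// (`a` = NO ACTION, `r` = RESTRICT, `c` = CASCADE, `n` = SET NULL,
/// `d` = SET DEFAULT). Unknown codes fall back to NO ACTION.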
fn pg_action_code_to_string(code: String) -> String {
    match code.as_str() {
        "a" => "NO ACTION",
        "r" => "RESTRICT",
        "c" => "CASCADE",
        "n" => "SET NULL",
        "d" => "SET DEFAULT",
        _ => "NO ACTION",
    }
    .to_string()
}

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
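/// Decodes the per-column strings produced by `pg_get_indexdef` into
/// structured column info: ordering keywords (`ASC`/`DESC`,
/// `NULLS FIRST`/`NULLS LAST`) are stripped off, anything containing
/// parentheses, casts, or spaces is flagged as an expression, and a
/// trailing bare token after the column name is treated as an operator
/// class.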
fn parse_postgres_index_columns(
    cols: Vec<String>,
) -> Vec<drizzle_migrations::postgres::introspect::RawIndexColumnInfo> {
    use drizzle_migrations::postgres::introspect::RawIndexColumnInfo;
    cols.into_iter()
        .map(|c| {
            let trimmed = c.trim().to_string();
            let upper = trimmed.to_uppercase();

            let asc = !upper.contains(" DESC");
            let nulls_first = upper.contains(" NULLS FIRST");

            let mut core = trimmed.clone();
            for token in [" ASC", " DESC", " NULLS FIRST", " NULLS LAST"] {
                if let Some(pos) = core.to_uppercase().find(token) {
                    core.truncate(pos);
                    break;
                }
            }
            let core = core.trim().to_string();

            let is_expression = core.contains('(')
                || core.contains(')')
                || core.contains(' ')
                || core.contains("::");

            let mut opclass: Option<String> = None;
            let mut name = core.clone();
            let parts: Vec<&str> = core.split_whitespace().collect();
            if parts.len() >= 2 {
                let second = parts[1];
                if !matches!(second.to_uppercase().as_str(), "ASC" | "DESC" | "NULLS") {
                    opclass = Some(second.to_string());
                    name = parts[0].to_string();
                }
            }

            RawIndexColumnInfo {
                name,
                is_expression,
                asc,
                nulls_first,
                opclass,
            }
        })
        .collect()
}

#[cfg(any(feature = "postgres-sync", feature = "tokio-postgres"))]
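/// Masks the password portion of a connection URL before it is embedded in
/// generated output, e.g. `postgres://user:secret@host/db` becomes
/// `postgres://user:****@host/db`. URLs without credentials are returned
/// unchanged.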
fn mask_url(url: &str) -> String {
    if let Some(at) = url.find('@')
        && let Some(colon) = url[..at].rfind(':')
    {
        let scheme_end = url.find("://").map(|p| p + 3).unwrap_or(0);
        if colon > scheme_end {
            return format!("{}****{}", &url[..colon + 1], &url[at..]);
        }
    }
    url.to_string()
}
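
// A minimal test sketch for the URL-masking helper; illustrative rather
// than part of the original source. The expected strings follow directly
// from mask_url's logic above.
#[cfg(all(test, any(feature = "postgres-sync", feature = "tokio-postgres")))]
mod mask_url_tests {
    use super::mask_url;

    #[test]
    fn masks_password_but_keeps_user_and_host() {
        assert_eq!(
            mask_url("postgres://user:secret@localhost:5432/app"),
            "postgres://user:****@localhost:5432/app"
        );
    }

    #[test]
    fn leaves_urls_without_credentials_untouched() {
        assert_eq!(
            mask_url("postgres://localhost:5432/app"),
            "postgres://localhost:5432/app"
        );
    }
}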