1use crate::{
5 filters::ReplicationFilter,
6 migration, postgres,
7 serendb::ConsoleClient,
8 table_rules::{QualifiedTable, TableRules},
9};
10use anyhow::{Context, Result};
11use inquire::{Confirm, MultiSelect, Select, Text};
12
13pub async fn select_seren_database() -> Result<String> {
26 print_header("Select SerenDB Target");
27
28 let api_key = get_api_key()?;
29 let client = ConsoleClient::new(None, api_key);
30
31 let projects = client.list_projects().await?;
33 if projects.is_empty() {
34 anyhow::bail!("No projects found for your account.");
35 }
36 let project_names: Vec<String> = projects.iter().map(|p| p.name.clone()).collect();
37 let selected_project_name = Select::new("Select a project:", project_names).prompt()?;
38 let selected_project = projects
39 .into_iter()
40 .find(|p| p.name == selected_project_name)
41 .unwrap();
42
43 let branch = client.get_default_branch(&selected_project.id).await?;
45 let databases = client
46 .list_databases(&selected_project.id, &branch.id)
47 .await?;
48 if databases.is_empty() {
49 anyhow::bail!(
50 "Project '{}' has no databases in its default branch.",
51 selected_project.name
52 );
53 }
54 let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
55 let selected_database_name = Select::new("Select a database:", database_names).prompt()?;
56 let selected_database = databases
57 .into_iter()
58 .find(|db| db.name == selected_database_name)
59 .unwrap();
60
61 let conn_str = client
63 .get_connection_string(
64 &selected_project.id,
65 &branch.id,
66 &selected_database.name,
67 false,
68 )
69 .await?;
70 Ok(conn_str)
71}
72
/// Position of the interactive selection wizard.
///
/// The `usize` payload of the per-database variants is an index into the list of
/// databases the user selected in the first step (not into the full database list).
enum WizardStep {
    /// Choosing which databases to replicate.
    SelectDatabases,
    /// Choosing the tables to include for the selected database at this index.
    SelectTablesForDb(usize),
    /// Choosing the schema-only tables for the selected database at this index.
    SelectSchemaOnlyForDb(usize),
    /// Configuring time filters for the selected database at this index.
    ConfigureTimeFiltersForDb(usize),
    /// Reviewing and confirming the whole configuration.
    Review,
}
81
82struct CachedDbTables {
84 all_tables: Vec<migration::TableInfo>,
85 table_display_names: Vec<String>,
86}
87
88pub async fn select_databases_and_tables(
126 source_url: &str,
127) -> Result<(ReplicationFilter, TableRules)> {
128 tracing::info!("Starting interactive database and table selection...");
129 println!();
130
131 tracing::info!("Connecting to source database...");
133 let source_client = postgres::connect_with_retry(source_url)
134 .await
135 .context("Failed to connect to source database")?;
136 tracing::info!("✓ Connected to source");
137 println!();
138
139 tracing::info!("Discovering databases on source...");
141 let all_databases = migration::list_databases(&source_client)
142 .await
143 .context("Failed to list databases on source")?;
144
145 if all_databases.is_empty() {
146 tracing::warn!("⚠ No user databases found on source");
147 tracing::warn!(" Source appears to contain only template databases");
148 return Ok((ReplicationFilter::empty(), TableRules::default()));
149 }
150
151 tracing::info!("✓ Found {} database(s)", all_databases.len());
152 println!();
153
154 let db_names: Vec<String> = all_databases.iter().map(|db| db.name.clone()).collect();
155
156 let mut selected_db_indices: Vec<usize> = Vec::new();
158 let mut current_step = WizardStep::SelectDatabases;
159
160 let mut included_tables_by_db: std::collections::HashMap<String, Vec<String>> =
162 std::collections::HashMap::new();
163 let mut schema_only_by_db: std::collections::HashMap<String, Vec<(String, String)>> =
164 std::collections::HashMap::new(); let mut time_filters_by_db: std::collections::HashMap<
166 String,
167 Vec<(String, String, String, String)>,
168 > = std::collections::HashMap::new(); let mut table_cache: std::collections::HashMap<String, CachedDbTables> =
172 std::collections::HashMap::new();
173
174 loop {
175 match current_step {
176 WizardStep::SelectDatabases => {
177 print_header("Step 1 of 5: Select Databases");
178 println!("Navigation: Space to toggle, Enter to confirm, Esc to cancel");
179 println!();
180
181 let defaults: Vec<usize> = selected_db_indices.clone();
182
183 let selections =
184 MultiSelect::new("Select databases to replicate:", db_names.clone())
185 .with_default(&defaults)
186 .with_help_message("↑↓ navigate, Space toggle, Enter confirm")
187 .prompt();
188
189 match selections {
190 Ok(selected) => {
191 selected_db_indices = selected
193 .iter()
194 .filter_map(|name| db_names.iter().position(|n| n == name))
195 .collect();
196
197 if selected_db_indices.is_empty() {
198 println!();
199 println!("⚠ Please select at least one database");
200 continue;
201 }
202
203 included_tables_by_db.clear();
205 schema_only_by_db.clear();
206 time_filters_by_db.clear();
207 table_cache.clear();
208
209 current_step = WizardStep::SelectTablesForDb(0);
210 }
211 Err(inquire::InquireError::OperationCanceled) => {
212 anyhow::bail!("Operation cancelled by user");
213 }
214 Err(inquire::InquireError::OperationInterrupted) => {
215 anyhow::bail!("Operation interrupted");
216 }
217 Err(e) => return Err(e.into()),
218 }
219 }
220
221 WizardStep::SelectTablesForDb(db_idx) => {
222 let db_name = &db_names[selected_db_indices[db_idx]].clone();
223 print_header(&format!(
224 "Step 2 of 5: Select Tables to Include ({}/{})",
225 db_idx + 1,
226 selected_db_indices.len()
227 ));
228 println!("Database: {}", db_name);
229 println!("Press Enter without selecting to include ALL tables.");
230 println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
231 println!();
232
233 let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
235
236 if cached.all_tables.is_empty() {
237 println!(" No tables found in database '{}'", db_name);
238 if db_idx + 1 < selected_db_indices.len() {
240 current_step = WizardStep::SelectTablesForDb(db_idx + 1);
241 } else {
242 current_step = WizardStep::SelectSchemaOnlyForDb(0);
243 }
244 continue;
245 }
246
247 let previous_inclusions: Vec<usize> = included_tables_by_db
249 .get(db_name)
250 .map(|included| {
251 included
252 .iter()
253 .filter_map(|t| {
254 let stripped =
256 t.strip_prefix(&format!("{}.", db_name)).unwrap_or(t);
257 cached
258 .table_display_names
259 .iter()
260 .position(|n| n == stripped)
261 })
262 .collect()
263 })
264 .unwrap_or_default();
265
266 let selections = MultiSelect::new(
267 "Select tables to INCLUDE (Enter = include all):",
268 cached.table_display_names.clone(),
269 )
270 .with_default(&previous_inclusions)
271 .with_help_message("Space toggle, Enter confirm, Esc go back")
272 .prompt();
273
274 match selections {
275 Ok(selected_inclusions) => {
276 let db_inclusions: Vec<String> = if selected_inclusions.is_empty() {
278 cached
279 .table_display_names
280 .iter()
281 .map(|table_name| format!("{}.{}", db_name, table_name))
282 .collect()
283 } else {
284 selected_inclusions
285 .iter()
286 .map(|table_name| format!("{}.{}", db_name, table_name))
287 .collect()
288 };
289
290 included_tables_by_db.insert(db_name.clone(), db_inclusions);
292
293 if db_idx + 1 < selected_db_indices.len() {
295 current_step = WizardStep::SelectTablesForDb(db_idx + 1);
296 } else {
297 current_step = WizardStep::SelectSchemaOnlyForDb(0);
298 }
299 }
300 Err(inquire::InquireError::OperationCanceled) => {
301 if db_idx > 0 {
303 current_step = WizardStep::SelectTablesForDb(db_idx - 1);
304 } else {
305 current_step = WizardStep::SelectDatabases;
306 }
307 }
308 Err(inquire::InquireError::OperationInterrupted) => {
309 anyhow::bail!("Operation interrupted");
310 }
311 Err(e) => return Err(e.into()),
312 }
313 }
314
315 WizardStep::SelectSchemaOnlyForDb(db_idx) => {
316 let db_name = &db_names[selected_db_indices[db_idx]].clone();
317 print_header(&format!(
318 "Step 3 of 5: Schema-Only Tables ({}/{})",
319 db_idx + 1,
320 selected_db_indices.len()
321 ));
322 println!("Database: {}", db_name);
323 println!("Schema-only tables replicate structure but NO data.");
324 println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
325 println!();
326
327 let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
328
329 if cached.all_tables.is_empty() {
330 if db_idx + 1 < selected_db_indices.len() {
332 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
333 } else {
334 current_step = WizardStep::ConfigureTimeFiltersForDb(0);
335 }
336 continue;
337 }
338
339 let included = included_tables_by_db.get(db_name);
341 let available_tables: Vec<(usize, String)> = cached
342 .table_display_names
343 .iter()
344 .enumerate()
345 .filter(|(_, name)| {
346 let full_name = format!("{}.{}", db_name, name);
347 included.is_some_and(|inc| inc.contains(&full_name))
348 })
349 .map(|(idx, name)| (idx, name.clone()))
350 .collect();
351
352 if available_tables.is_empty() {
353 println!(" No tables included from '{}'", db_name);
354 if db_idx + 1 < selected_db_indices.len() {
355 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
356 } else {
357 current_step = WizardStep::ConfigureTimeFiltersForDb(0);
358 }
359 continue;
360 }
361
362 let available_names: Vec<String> =
363 available_tables.iter().map(|(_, n)| n.clone()).collect();
364
365 let previous_schema_only: Vec<usize> = schema_only_by_db
367 .get(db_name)
368 .map(|selected| {
369 selected
370 .iter()
371 .filter_map(|(schema, table)| {
372 let display = if schema == "public" {
373 table.clone()
374 } else {
375 format!("{}.{}", schema, table)
376 };
377 available_names.iter().position(|n| n == &display)
378 })
379 .collect()
380 })
381 .unwrap_or_default();
382
383 let selections = MultiSelect::new(
384 "Select tables to replicate SCHEMA-ONLY (no data):",
385 available_names.clone(),
386 )
387 .with_default(&previous_schema_only)
388 .with_help_message("Space toggle, Enter confirm, Esc go back")
389 .prompt();
390
391 match selections {
392 Ok(selected_schema_only) => {
393 let schema_only_tables: Vec<(String, String)> = selected_schema_only
395 .iter()
396 .filter_map(|display_name| {
397 available_tables
398 .iter()
399 .find(|(_, n)| n == display_name)
400 .map(|(idx, _)| {
401 let t = &cached.all_tables[*idx];
402 (t.schema.clone(), t.name.clone())
403 })
404 })
405 .collect();
406
407 schema_only_by_db.insert(db_name.clone(), schema_only_tables);
408
409 if db_idx + 1 < selected_db_indices.len() {
410 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
411 } else {
412 current_step = WizardStep::ConfigureTimeFiltersForDb(0);
413 }
414 }
415 Err(inquire::InquireError::OperationCanceled) => {
416 if db_idx > 0 {
418 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx - 1);
419 } else {
420 let last_db = selected_db_indices.len().saturating_sub(1);
421 current_step = WizardStep::SelectTablesForDb(last_db);
422 }
423 }
424 Err(inquire::InquireError::OperationInterrupted) => {
425 anyhow::bail!("Operation interrupted");
426 }
427 Err(e) => return Err(e.into()),
428 }
429 }
430
431 WizardStep::ConfigureTimeFiltersForDb(db_idx) => {
432 let db_name = &db_names[selected_db_indices[db_idx]].clone();
433 print_header(&format!(
434 "Step 4 of 5: Time Filters ({}/{})",
435 db_idx + 1,
436 selected_db_indices.len()
437 ));
438 println!("Database: {}", db_name);
439 println!("Time filters limit data to recent records (e.g., last 90 days).");
440 println!();
441
442 let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
443
444 if cached.all_tables.is_empty() {
445 if db_idx + 1 < selected_db_indices.len() {
446 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
447 } else {
448 current_step = WizardStep::Review;
449 }
450 continue;
451 }
452
453 let included = included_tables_by_db.get(db_name);
455 let schema_only = schema_only_by_db.get(db_name);
456 let available_tables: Vec<(usize, String)> = cached
457 .table_display_names
458 .iter()
459 .enumerate()
460 .filter(|(idx, name)| {
461 let full_name = format!("{}.{}", db_name, name);
462 let is_included = included.is_some_and(|inc| inc.contains(&full_name));
463 let t = &cached.all_tables[*idx];
464 let is_schema_only = schema_only.is_some_and(|so| {
465 so.iter().any(|(s, n)| s == &t.schema && n == &t.name)
466 });
467 is_included && !is_schema_only
468 })
469 .map(|(idx, name)| (idx, name.clone()))
470 .collect();
471
472 if available_tables.is_empty() {
473 println!(" No tables available for time filtering in '{}'", db_name);
474 if db_idx + 1 < selected_db_indices.len() {
475 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
476 } else {
477 current_step = WizardStep::Review;
478 }
479 continue;
480 }
481
482 let configure = Confirm::new("Configure time-based filters for this database?")
484 .with_default(false)
485 .with_help_message("Enter to confirm, Esc to go back")
486 .prompt();
487
488 match configure {
489 Ok(true) => {
490 let available_names: Vec<String> =
492 available_tables.iter().map(|(_, n)| n.clone()).collect();
493
494 let table_selections = MultiSelect::new(
495 "Select tables to apply time filter:",
496 available_names.clone(),
497 )
498 .with_help_message("Space toggle, Enter confirm")
499 .prompt();
500
501 match table_selections {
502 Ok(selected_tables) => {
503 let mut time_filters: Vec<(String, String, String, String)> =
504 Vec::new();
505
506 for display_name in &selected_tables {
507 if let Some((idx, _)) =
508 available_tables.iter().find(|(_, n)| n == display_name)
509 {
510 let t = &cached.all_tables[*idx];
511 let db_url = replace_database_in_url(source_url, db_name)?;
512 let db_client = postgres::connect_with_retry(&db_url)
513 .await
514 .context("Failed to connect for column query")?;
515
516 let columns = migration::get_table_columns(
518 &db_client, &t.schema, &t.name,
519 )
520 .await?;
521
522 let timestamp_columns: Vec<String> = columns
523 .iter()
524 .filter(|c| c.is_timestamp)
525 .map(|c| format!("{} ({})", c.name, c.data_type))
526 .collect();
527
528 println!();
529 println!("Configure time filter for '{}':", display_name);
530
531 let column = if timestamp_columns.is_empty() {
532 println!(
533 " ⚠ No timestamp columns found. Enter column name manually."
534 );
535 Text::new(" Column name:")
536 .with_default("created_at")
537 .prompt()
538 .context("Failed to get column name")?
539 } else {
540 let mut options = timestamp_columns.clone();
541 options.push("[Enter custom column name]".to_string());
542
543 let selection =
544 Select::new(" Select timestamp column:", options)
545 .prompt()
546 .context("Failed to select column")?;
547
548 if selection == "[Enter custom column name]" {
549 Text::new(" Column name:")
550 .prompt()
551 .context("Failed to get column name")?
552 } else {
553 selection
555 .split(" (")
556 .next()
557 .unwrap_or(&selection)
558 .to_string()
559 }
560 };
561
562 let window = Text::new(
563 " Time window (e.g., '90 days', '6 months', '1 year'):",
564 )
565 .with_default("90 days")
566 .prompt()
567 .context("Failed to get time window")?;
568
569 time_filters.push((
570 t.schema.clone(),
571 t.name.clone(),
572 column,
573 window,
574 ));
575 }
576 }
577
578 time_filters_by_db.insert(db_name.clone(), time_filters);
579 }
580 Err(inquire::InquireError::OperationCanceled) => {
581 continue;
583 }
584 Err(inquire::InquireError::OperationInterrupted) => {
585 anyhow::bail!("Operation interrupted");
586 }
587 Err(e) => return Err(e.into()),
588 }
589
590 if db_idx + 1 < selected_db_indices.len() {
591 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
592 } else {
593 current_step = WizardStep::Review;
594 }
595 }
596 Ok(false) => {
597 if db_idx + 1 < selected_db_indices.len() {
599 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
600 } else {
601 current_step = WizardStep::Review;
602 }
603 }
604 Err(inquire::InquireError::OperationCanceled) => {
605 if db_idx > 0 {
607 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx - 1);
608 } else {
609 let last_db = selected_db_indices.len().saturating_sub(1);
610 current_step = WizardStep::SelectSchemaOnlyForDb(last_db);
611 }
612 }
613 Err(inquire::InquireError::OperationInterrupted) => {
614 anyhow::bail!("Operation interrupted");
615 }
616 Err(e) => return Err(e.into()),
617 }
618 }
619
620 WizardStep::Review => {
621 print_header("Step 5 of 5: Review Configuration");
622
623 let included_tables: Vec<String> =
625 included_tables_by_db.values().flatten().cloned().collect();
626
627 let selected_databases: Vec<String> = selected_db_indices
628 .iter()
629 .map(|&i| db_names[i].clone())
630 .collect();
631
632 println!();
633 println!("Databases to replicate: {}", selected_databases.len());
634 for db in &selected_databases {
635 println!(" ✓ {}", db);
636 }
637 println!();
638
639 println!("Tables to replicate: {}", included_tables.len());
640 if included_tables.len() <= 20 {
641 for table in &included_tables {
642 println!(" ✓ {}", table);
643 }
644 } else {
645 for table in included_tables.iter().take(10) {
647 println!(" ✓ {}", table);
648 }
649 println!(" ... ({} more tables)", included_tables.len() - 15);
650 for table in included_tables.iter().skip(included_tables.len() - 5) {
651 println!(" ✓ {}", table);
652 }
653 }
654 println!();
655
656 let schema_only_count: usize = schema_only_by_db.values().map(|v| v.len()).sum();
658 if schema_only_count > 0 {
659 println!("Schema-only tables (no data): {}", schema_only_count);
660 for (db, tables) in &schema_only_by_db {
661 for (schema, table) in tables {
662 let display = if schema == "public" {
663 format!("{}.{}", db, table)
664 } else {
665 format!("{}.{}.{}", db, schema, table)
666 };
667 println!(" ◇ {}", display);
668 }
669 }
670 println!();
671 } else {
672 println!("Schema-only tables: none");
673 println!();
674 }
675
676 let time_filter_count: usize = time_filters_by_db.values().map(|v| v.len()).sum();
678 if time_filter_count > 0 {
679 println!("Time-filtered tables: {}", time_filter_count);
680 for (db, filters) in &time_filters_by_db {
681 for (schema, table, column, window) in filters {
682 let display = if schema == "public" {
683 format!("{}.{}", db, table)
684 } else {
685 format!("{}.{}.{}", db, schema, table)
686 };
687 println!(" ⏱ {} ({} >= last {})", display, column, window);
688 }
689 }
690 println!();
691 } else {
692 println!("Time filters: none");
693 println!();
694 }
695
696 println!("───────────────────────────────────────────────────────────────");
697 println!();
698
699 let confirmed = Confirm::new("Proceed with this configuration?")
700 .with_default(true)
701 .with_help_message("Enter confirm, Esc go back")
702 .prompt();
703
704 match confirmed {
705 Ok(true) => break, Ok(false) | Err(inquire::InquireError::OperationCanceled) => {
707 let last_db = selected_db_indices.len().saturating_sub(1);
709 current_step = WizardStep::ConfigureTimeFiltersForDb(last_db);
710 }
711 Err(inquire::InquireError::OperationInterrupted) => {
712 anyhow::bail!("Operation interrupted");
713 }
714 Err(e) => return Err(e.into()),
715 }
716 }
717 }
718 }
719
720 let selected_databases: Vec<String> = selected_db_indices
722 .iter()
723 .map(|&i| db_names[i].clone())
724 .collect();
725
726 let included_tables: Vec<String> = included_tables_by_db.values().flatten().cloned().collect();
727
728 tracing::info!("");
729 tracing::info!("✓ Configuration confirmed");
730 tracing::info!("");
731
732 let filter = if included_tables.is_empty() {
734 ReplicationFilter::new(Some(selected_databases), None, None, None)?
735 } else {
736 ReplicationFilter::new(Some(selected_databases), None, Some(included_tables), None)?
737 };
738
739 let mut table_rules = TableRules::default();
741
742 for (db, tables) in &schema_only_by_db {
744 for (schema, table) in tables {
745 let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
746 table_rules.add_schema_only_table(qualified)?;
747 }
748 }
749
750 for (db, filters) in &time_filters_by_db {
752 for (schema, table, column, window) in filters {
753 let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
754 table_rules.add_time_filter(qualified, column.clone(), window.clone())?;
755 }
756 }
757
758 Ok((filter, table_rules))
759}
760
761async fn get_or_cache_tables<'a>(
763 cache: &'a mut std::collections::HashMap<String, CachedDbTables>,
764 source_url: &str,
765 db_name: &str,
766) -> Result<&'a CachedDbTables> {
767 if !cache.contains_key(db_name) {
768 let db_url = replace_database_in_url(source_url, db_name)?;
769 let db_client = postgres::connect_with_retry(&db_url)
770 .await
771 .context(format!("Failed to connect to database '{}'", db_name))?;
772
773 let all_tables = migration::list_tables(&db_client)
774 .await
775 .context(format!("Failed to list tables from database '{}'", db_name))?;
776
777 let table_display_names: Vec<String> = all_tables
778 .iter()
779 .map(|t| {
780 if t.schema == "public" {
781 t.name.clone()
782 } else {
783 format!("{}.{}", t.schema, t.name)
784 }
785 })
786 .collect();
787
788 cache.insert(
789 db_name.to_string(),
790 CachedDbTables {
791 all_tables,
792 table_display_names,
793 },
794 );
795 }
796
797 Ok(cache.get(db_name).unwrap())
798}
799
/// Prints `title` inside a box drawn with double-line characters, surrounded by
/// blank lines.
///
/// The box is sized for titles of up to 61 characters; longer titles are printed in
/// full and push the right border outwards.
fn print_header(title: &str) {
    /// Number of columns between the left and right border characters.
    const INNER_WIDTH: usize = 62;

    let border = "═".repeat(INNER_WIDTH);

    println!();
    println!("╔{}╗", border);
    // One leading space plus the padded title must fill exactly INNER_WIDTH columns
    // so the right border lines up with the corners.
    println!("║ {:<width$}║", title, width = INNER_WIDTH - 1);
    println!("╚{}╝", border);
    println!();
}
808
809fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
820 let parts: Vec<&str> = url.splitn(2, '?').collect();
822 let base_url = parts[0];
823 let query_params = parts.get(1);
824
825 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
827
828 if url_parts.len() != 2 {
829 anyhow::bail!("Invalid connection URL format: cannot replace database name");
830 }
831
832 let new_url = if let Some(params) = query_params {
834 format!("{}/{}?{}", url_parts[1], new_db_name, params)
835 } else {
836 format!("{}/{}", url_parts[1], new_db_name)
837 };
838
839 Ok(new_url)
840}
841
842pub fn get_api_key() -> anyhow::Result<String> {
843 use dialoguer::{theme::ColorfulTheme, Input};
844
845 if let Ok(key) = std::env::var("SEREN_API_KEY") {
847 if !key.trim().is_empty() {
848 return Ok(key.trim().to_string());
849 }
850 }
851
852 println!("\nRemote execution requires a SerenDB API key for authentication.");
854 println!("\nYou can generate an API key at:");
855 println!(" https://console.serendb.com/api-keys\n");
856
857 let key: String = Input::with_theme(&ColorfulTheme::default())
858 .with_prompt("Enter your SerenDB API key")
859 .allow_empty(false)
860 .interact_text()?;
861
862 if key.trim().is_empty() {
863 anyhow::bail!(
864 "API key is required for remote execution.\n\
865 Set the SEREN_API_KEY environment variable or run interactively.\n\
866 Get your API key at: https://console.serendb.com/api-keys\n\
867 Or use --local to run replication on your machine instead"
868 );
869 }
870
871 Ok(key.trim().to_string())
872}
873
#[cfg(test)]
mod tests {
    use super::*;

    /// Replacing the database keeps credentials, host, optional port and query string.
    #[test]
    fn test_replace_database_in_url() {
        let cases = [
            // Explicit port, no query string.
            (
                "postgresql://user:pass@localhost:5432/olddb",
                "postgresql://user:pass@localhost:5432/newdb",
            ),
            // Query parameters are carried over.
            (
                "postgresql://user:pass@localhost:5432/olddb?sslmode=require",
                "postgresql://user:pass@localhost:5432/newdb?sslmode=require",
            ),
            // No port.
            (
                "postgresql://user:pass@localhost/olddb",
                "postgresql://user:pass@localhost/newdb",
            ),
        ];

        for (input, expected) in cases {
            let rewritten = replace_database_in_url(input, "newdb").unwrap();
            assert_eq!(rewritten, expected);
        }
    }

    /// Manual smoke test: needs a terminal and `TEST_SOURCE_URL`, hence ignored.
    #[tokio::test]
    #[ignore]
    async fn test_interactive_selection() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();

        match select_databases_and_tables(&source_url).await {
            Ok((filter, rules)) => {
                println!("✓ Interactive selection completed");
                println!("Filter: {:?}", filter);
                println!("Rules: {:?}", rules);
            }
            Err(e) => {
                println!("Interactive selection error: {:?}", e);
            }
        }
    }
}