database_replicator/
interactive.rs

1// ABOUTME: Interactive terminal UI for database and table selection
2// ABOUTME: Provides multi-step wizard with back navigation using inquire crate
3
4use crate::{
5    filters::ReplicationFilter,
6    migration, postgres,
7    serendb::ConsoleClient,
8    table_rules::{QualifiedTable, TableRules},
9};
10use anyhow::{Context, Result};
11use inquire::{Confirm, MultiSelect, Select, Text};
12
13/// Prompts the user to select a SerenDB project and database interactively.
14///
15/// This function will:
16/// 1. Get the SerenDB API key (from environment or prompt).
17/// 2. Fetch and display a list of projects for the user to select.
18/// 3. Fetch the default branch for the selected project.
19/// 4. Fetch and display a list of databases for the user to select.
20/// 5. Return the connection string for the selected database.
21///
22/// # Returns
23///
24/// A `Result` containing the connection string of the selected database.
25pub async fn select_seren_database() -> Result<String> {
26    print_header("Select SerenDB Target");
27
28    let api_key = get_api_key()?;
29    let client = ConsoleClient::new(None, api_key);
30
31    // 1. Select a project
32    let projects = client.list_projects().await?;
33    if projects.is_empty() {
34        anyhow::bail!("No projects found for your account.");
35    }
36    let project_names: Vec<String> = projects.iter().map(|p| p.name.clone()).collect();
37    let selected_project_name = Select::new("Select a project:", project_names).prompt()?;
38    let selected_project = projects
39        .into_iter()
40        .find(|p| p.name == selected_project_name)
41        .unwrap();
42
43    // 2. Select a database
44    let branch = client.get_default_branch(&selected_project.id).await?;
45    let databases = client
46        .list_databases(&selected_project.id, &branch.id)
47        .await?;
48    if databases.is_empty() {
49        anyhow::bail!(
50            "Project '{}' has no databases in its default branch.",
51            selected_project.name
52        );
53    }
54    let database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
55    let selected_database_name = Select::new("Select a database:", database_names).prompt()?;
56    let selected_database = databases
57        .into_iter()
58        .find(|db| db.name == selected_database_name)
59        .unwrap();
60
61    // 3. Get connection string
62    let conn_str = client
63        .get_connection_string(
64            &selected_project.id,
65            &branch.id,
66            &selected_database.name,
67            false,
68        )
69        .await?;
70    Ok(conn_str)
71}
72
73/// Wizard step state machine
74enum WizardStep {
75    SelectDatabases,
76    SelectTablesForDb(usize), // index of current database in selected_dbs
77    SelectSchemaOnlyForDb(usize), // schema-only tables selection
78    ConfigureTimeFiltersForDb(usize), // time filter configuration
79    Review,
80}
81
82/// Cached table info for a database (to avoid repeated queries)
83struct CachedDbTables {
84    all_tables: Vec<migration::TableInfo>,
85    table_display_names: Vec<String>,
86}
87
88/// Interactive database and table selection with back navigation
89///
90/// Presents a terminal UI for selecting:
91/// 1. Which databases to replicate (multi-select)
92/// 2. For each selected database: tables to include (Enter = include all)
93/// 3. For each selected database: tables to replicate schema-only (no data)
94/// 4. For each selected database: time-based filters
95/// 5. Summary and confirmation
96///
97/// Supports back navigation:
98/// - Cancel/Esc from any step → go back to previous step
99///
100/// Returns a tuple of `(ReplicationFilter, TableRules)` representing the user's selections.
101///
102/// # Arguments
103///
104/// * `source_url` - PostgreSQL connection string for source database
105///
106/// # Returns
107///
108/// Returns `Ok((ReplicationFilter, TableRules))` with the user's selections or an error if:
109/// - Cannot connect to source database
110/// - Cannot discover databases or tables
111/// - User cancels the operation
112///
113/// # Examples
114///
115/// ```no_run
116/// # use anyhow::Result;
117/// # use database_replicator::interactive::select_databases_and_tables;
118/// # async fn example() -> Result<()> {
119/// let (filter, rules) = select_databases_and_tables(
120///     "postgresql://user:pass@source.example.com/postgres"
121/// ).await?;
122/// # Ok(())
123/// # }
124/// ```
125pub async fn select_databases_and_tables(
126    source_url: &str,
127) -> Result<(ReplicationFilter, TableRules)> {
128    tracing::info!("Starting interactive database and table selection...");
129    println!();
130
131    // Connect to source database
132    tracing::info!("Connecting to source database...");
133    let source_client = postgres::connect_with_retry(source_url)
134        .await
135        .context("Failed to connect to source database")?;
136    tracing::info!("✓ Connected to source");
137    println!();
138
139    // Discover databases
140    tracing::info!("Discovering databases on source...");
141    let all_databases = migration::list_databases(&source_client)
142        .await
143        .context("Failed to list databases on source")?;
144
145    if all_databases.is_empty() {
146        tracing::warn!("⚠ No user databases found on source");
147        tracing::warn!("  Source appears to contain only template databases");
148        return Ok((ReplicationFilter::empty(), TableRules::default()));
149    }
150
151    tracing::info!("✓ Found {} database(s)", all_databases.len());
152    println!();
153
154    let db_names: Vec<String> = all_databases.iter().map(|db| db.name.clone()).collect();
155
156    // State for wizard
157    let mut selected_db_indices: Vec<usize> = Vec::new();
158    let mut current_step = WizardStep::SelectDatabases;
159
160    // Track selections per database for back navigation
161    let mut included_tables_by_db: std::collections::HashMap<String, Vec<String>> =
162        std::collections::HashMap::new();
163    let mut schema_only_by_db: std::collections::HashMap<String, Vec<(String, String)>> =
164        std::collections::HashMap::new(); // (schema, table)
165    let mut time_filters_by_db: std::collections::HashMap<
166        String,
167        Vec<(String, String, String, String)>,
168    > = std::collections::HashMap::new(); // (schema, table, column, window)
169
170    // Cache table info per database to avoid repeated queries
171    let mut table_cache: std::collections::HashMap<String, CachedDbTables> =
172        std::collections::HashMap::new();
173
174    loop {
175        match current_step {
176            WizardStep::SelectDatabases => {
177                print_header("Step 1 of 5: Select Databases");
178                println!("Navigation: Space to toggle, Enter to confirm, Esc to cancel");
179                println!();
180
181                let defaults: Vec<usize> = selected_db_indices.clone();
182
183                let selections =
184                    MultiSelect::new("Select databases to replicate:", db_names.clone())
185                        .with_default(&defaults)
186                        .with_help_message("↑↓ navigate, Space toggle, Enter confirm")
187                        .prompt();
188
189                match selections {
190                    Ok(selected) => {
191                        // Convert selected names back to indices
192                        selected_db_indices = selected
193                            .iter()
194                            .filter_map(|name| db_names.iter().position(|n| n == name))
195                            .collect();
196
197                        if selected_db_indices.is_empty() {
198                            println!();
199                            println!("⚠ Please select at least one database");
200                            continue;
201                        }
202
203                        // Clear previous selections when re-selecting databases
204                        included_tables_by_db.clear();
205                        schema_only_by_db.clear();
206                        time_filters_by_db.clear();
207                        table_cache.clear();
208
209                        current_step = WizardStep::SelectTablesForDb(0);
210                    }
211                    Err(inquire::InquireError::OperationCanceled) => {
212                        anyhow::bail!("Operation cancelled by user");
213                    }
214                    Err(inquire::InquireError::OperationInterrupted) => {
215                        anyhow::bail!("Operation interrupted");
216                    }
217                    Err(e) => return Err(e.into()),
218                }
219            }
220
221            WizardStep::SelectTablesForDb(db_idx) => {
222                let db_name = &db_names[selected_db_indices[db_idx]].clone();
223                print_header(&format!(
224                    "Step 2 of 5: Select Tables to Include ({}/{})",
225                    db_idx + 1,
226                    selected_db_indices.len()
227                ));
228                println!("Database: {}", db_name);
229                println!("Press Enter without selecting to include ALL tables.");
230                println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
231                println!();
232
233                // Get or cache tables for this database
234                let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
235
236                if cached.all_tables.is_empty() {
237                    println!("  No tables found in database '{}'", db_name);
238                    // Skip to next database or next step
239                    if db_idx + 1 < selected_db_indices.len() {
240                        current_step = WizardStep::SelectTablesForDb(db_idx + 1);
241                    } else {
242                        current_step = WizardStep::SelectSchemaOnlyForDb(0);
243                    }
244                    continue;
245                }
246
247                // Get previously included tables for this database (for back navigation)
248                let previous_inclusions: Vec<usize> = included_tables_by_db
249                    .get(db_name)
250                    .map(|included| {
251                        included
252                            .iter()
253                            .filter_map(|t| {
254                                // Strip db name prefix to match display names
255                                let stripped =
256                                    t.strip_prefix(&format!("{}.", db_name)).unwrap_or(t);
257                                cached
258                                    .table_display_names
259                                    .iter()
260                                    .position(|n| n == stripped)
261                            })
262                            .collect()
263                    })
264                    .unwrap_or_default();
265
266                let selections = MultiSelect::new(
267                    "Select tables to INCLUDE (Enter = include all):",
268                    cached.table_display_names.clone(),
269                )
270                .with_default(&previous_inclusions)
271                .with_help_message("Space toggle, Enter confirm, Esc go back")
272                .prompt();
273
274                match selections {
275                    Ok(selected_inclusions) => {
276                        // If nothing selected, include all tables
277                        let db_inclusions: Vec<String> = if selected_inclusions.is_empty() {
278                            cached
279                                .table_display_names
280                                .iter()
281                                .map(|table_name| format!("{}.{}", db_name, table_name))
282                                .collect()
283                        } else {
284                            selected_inclusions
285                                .iter()
286                                .map(|table_name| format!("{}.{}", db_name, table_name))
287                                .collect()
288                        };
289
290                        // Store for back navigation
291                        included_tables_by_db.insert(db_name.clone(), db_inclusions);
292
293                        // Move to next database or schema-only step
294                        if db_idx + 1 < selected_db_indices.len() {
295                            current_step = WizardStep::SelectTablesForDb(db_idx + 1);
296                        } else {
297                            current_step = WizardStep::SelectSchemaOnlyForDb(0);
298                        }
299                    }
300                    Err(inquire::InquireError::OperationCanceled) => {
301                        // Go back to previous step
302                        if db_idx > 0 {
303                            current_step = WizardStep::SelectTablesForDb(db_idx - 1);
304                        } else {
305                            current_step = WizardStep::SelectDatabases;
306                        }
307                    }
308                    Err(inquire::InquireError::OperationInterrupted) => {
309                        anyhow::bail!("Operation interrupted");
310                    }
311                    Err(e) => return Err(e.into()),
312                }
313            }
314
315            WizardStep::SelectSchemaOnlyForDb(db_idx) => {
316                let db_name = &db_names[selected_db_indices[db_idx]].clone();
317                print_header(&format!(
318                    "Step 3 of 5: Schema-Only Tables ({}/{})",
319                    db_idx + 1,
320                    selected_db_indices.len()
321                ));
322                println!("Database: {}", db_name);
323                println!("Schema-only tables replicate structure but NO data.");
324                println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
325                println!();
326
327                let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
328
329                if cached.all_tables.is_empty() {
330                    // Skip to next database or time filters
331                    if db_idx + 1 < selected_db_indices.len() {
332                        current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
333                    } else {
334                        current_step = WizardStep::ConfigureTimeFiltersForDb(0);
335                    }
336                    continue;
337                }
338
339                // Filter to only included tables
340                let included = included_tables_by_db.get(db_name);
341                let available_tables: Vec<(usize, String)> = cached
342                    .table_display_names
343                    .iter()
344                    .enumerate()
345                    .filter(|(_, name)| {
346                        let full_name = format!("{}.{}", db_name, name);
347                        included.is_some_and(|inc| inc.contains(&full_name))
348                    })
349                    .map(|(idx, name)| (idx, name.clone()))
350                    .collect();
351
352                if available_tables.is_empty() {
353                    println!("  No tables included from '{}'", db_name);
354                    if db_idx + 1 < selected_db_indices.len() {
355                        current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
356                    } else {
357                        current_step = WizardStep::ConfigureTimeFiltersForDb(0);
358                    }
359                    continue;
360                }
361
362                let available_names: Vec<String> =
363                    available_tables.iter().map(|(_, n)| n.clone()).collect();
364
365                // Get previous schema-only selections
366                let previous_schema_only: Vec<usize> = schema_only_by_db
367                    .get(db_name)
368                    .map(|selected| {
369                        selected
370                            .iter()
371                            .filter_map(|(schema, table)| {
372                                let display = if schema == "public" {
373                                    table.clone()
374                                } else {
375                                    format!("{}.{}", schema, table)
376                                };
377                                available_names.iter().position(|n| n == &display)
378                            })
379                            .collect()
380                    })
381                    .unwrap_or_default();
382
383                let selections = MultiSelect::new(
384                    "Select tables to replicate SCHEMA-ONLY (no data):",
385                    available_names.clone(),
386                )
387                .with_default(&previous_schema_only)
388                .with_help_message("Space toggle, Enter confirm, Esc go back")
389                .prompt();
390
391                match selections {
392                    Ok(selected_schema_only) => {
393                        // Convert to (schema, table) pairs
394                        let schema_only_tables: Vec<(String, String)> = selected_schema_only
395                            .iter()
396                            .filter_map(|display_name| {
397                                available_tables
398                                    .iter()
399                                    .find(|(_, n)| n == display_name)
400                                    .map(|(idx, _)| {
401                                        let t = &cached.all_tables[*idx];
402                                        (t.schema.clone(), t.name.clone())
403                                    })
404                            })
405                            .collect();
406
407                        schema_only_by_db.insert(db_name.clone(), schema_only_tables);
408
409                        if db_idx + 1 < selected_db_indices.len() {
410                            current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
411                        } else {
412                            current_step = WizardStep::ConfigureTimeFiltersForDb(0);
413                        }
414                    }
415                    Err(inquire::InquireError::OperationCanceled) => {
416                        // Go back
417                        if db_idx > 0 {
418                            current_step = WizardStep::SelectSchemaOnlyForDb(db_idx - 1);
419                        } else {
420                            let last_db = selected_db_indices.len().saturating_sub(1);
421                            current_step = WizardStep::SelectTablesForDb(last_db);
422                        }
423                    }
424                    Err(inquire::InquireError::OperationInterrupted) => {
425                        anyhow::bail!("Operation interrupted");
426                    }
427                    Err(e) => return Err(e.into()),
428                }
429            }
430
431            WizardStep::ConfigureTimeFiltersForDb(db_idx) => {
432                let db_name = &db_names[selected_db_indices[db_idx]].clone();
433                print_header(&format!(
434                    "Step 4 of 5: Time Filters ({}/{})",
435                    db_idx + 1,
436                    selected_db_indices.len()
437                ));
438                println!("Database: {}", db_name);
439                println!("Time filters limit data to recent records (e.g., last 90 days).");
440                println!();
441
442                let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
443
444                if cached.all_tables.is_empty() {
445                    if db_idx + 1 < selected_db_indices.len() {
446                        current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
447                    } else {
448                        current_step = WizardStep::Review;
449                    }
450                    continue;
451                }
452
453                // Filter to included tables, excluding schema-only ones
454                let included = included_tables_by_db.get(db_name);
455                let schema_only = schema_only_by_db.get(db_name);
456                let available_tables: Vec<(usize, String)> = cached
457                    .table_display_names
458                    .iter()
459                    .enumerate()
460                    .filter(|(idx, name)| {
461                        let full_name = format!("{}.{}", db_name, name);
462                        let is_included = included.is_some_and(|inc| inc.contains(&full_name));
463                        let t = &cached.all_tables[*idx];
464                        let is_schema_only = schema_only.is_some_and(|so| {
465                            so.iter().any(|(s, n)| s == &t.schema && n == &t.name)
466                        });
467                        is_included && !is_schema_only
468                    })
469                    .map(|(idx, name)| (idx, name.clone()))
470                    .collect();
471
472                if available_tables.is_empty() {
473                    println!("  No tables available for time filtering in '{}'", db_name);
474                    if db_idx + 1 < selected_db_indices.len() {
475                        current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
476                    } else {
477                        current_step = WizardStep::Review;
478                    }
479                    continue;
480                }
481
482                // Ask if user wants to configure time filters
483                let configure = Confirm::new("Configure time-based filters for this database?")
484                    .with_default(false)
485                    .with_help_message("Enter to confirm, Esc to go back")
486                    .prompt();
487
488                match configure {
489                    Ok(true) => {
490                        // Let user select tables to filter
491                        let available_names: Vec<String> =
492                            available_tables.iter().map(|(_, n)| n.clone()).collect();
493
494                        let table_selections = MultiSelect::new(
495                            "Select tables to apply time filter:",
496                            available_names.clone(),
497                        )
498                        .with_help_message("Space toggle, Enter confirm")
499                        .prompt();
500
501                        match table_selections {
502                            Ok(selected_tables) => {
503                                let mut time_filters: Vec<(String, String, String, String)> =
504                                    Vec::new();
505
506                                for display_name in &selected_tables {
507                                    if let Some((idx, _)) =
508                                        available_tables.iter().find(|(_, n)| n == display_name)
509                                    {
510                                        let t = &cached.all_tables[*idx];
511                                        let db_url = replace_database_in_url(source_url, db_name)?;
512                                        let db_client = postgres::connect_with_retry(&db_url)
513                                            .await
514                                            .context("Failed to connect for column query")?;
515
516                                        // Get timestamp columns
517                                        let columns = migration::get_table_columns(
518                                            &db_client, &t.schema, &t.name,
519                                        )
520                                        .await?;
521
522                                        let timestamp_columns: Vec<String> = columns
523                                            .iter()
524                                            .filter(|c| c.is_timestamp)
525                                            .map(|c| format!("{} ({})", c.name, c.data_type))
526                                            .collect();
527
528                                        println!();
529                                        println!("Configure time filter for '{}':", display_name);
530
531                                        let column = if timestamp_columns.is_empty() {
532                                            println!(
533                                                "  ⚠ No timestamp columns found. Enter column name manually."
534                                            );
535                                            Text::new("  Column name:")
536                                                .with_default("created_at")
537                                                .prompt()
538                                                .context("Failed to get column name")?
539                                        } else {
540                                            let mut options = timestamp_columns.clone();
541                                            options.push("[Enter custom column name]".to_string());
542
543                                            let selection =
544                                                Select::new("  Select timestamp column:", options)
545                                                    .prompt()
546                                                    .context("Failed to select column")?;
547
548                                            if selection == "[Enter custom column name]" {
549                                                Text::new("  Column name:")
550                                                    .prompt()
551                                                    .context("Failed to get column name")?
552                                            } else {
553                                                // Extract column name from "name (type)" format
554                                                selection
555                                                    .split(" (")
556                                                    .next()
557                                                    .unwrap_or(&selection)
558                                                    .to_string()
559                                            }
560                                        };
561
562                                        let window = Text::new(
563                                            "  Time window (e.g., '90 days', '6 months', '1 year'):",
564                                        )
565                                        .with_default("90 days")
566                                        .prompt()
567                                        .context("Failed to get time window")?;
568
569                                        time_filters.push((
570                                            t.schema.clone(),
571                                            t.name.clone(),
572                                            column,
573                                            window,
574                                        ));
575                                    }
576                                }
577
578                                time_filters_by_db.insert(db_name.clone(), time_filters);
579                            }
580                            Err(inquire::InquireError::OperationCanceled) => {
581                                // Stay on this step
582                                continue;
583                            }
584                            Err(inquire::InquireError::OperationInterrupted) => {
585                                anyhow::bail!("Operation interrupted");
586                            }
587                            Err(e) => return Err(e.into()),
588                        }
589
590                        if db_idx + 1 < selected_db_indices.len() {
591                            current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
592                        } else {
593                            current_step = WizardStep::Review;
594                        }
595                    }
596                    Ok(false) => {
597                        // Skip time filters for this database
598                        if db_idx + 1 < selected_db_indices.len() {
599                            current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
600                        } else {
601                            current_step = WizardStep::Review;
602                        }
603                    }
604                    Err(inquire::InquireError::OperationCanceled) => {
605                        // Go back
606                        if db_idx > 0 {
607                            current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx - 1);
608                        } else {
609                            let last_db = selected_db_indices.len().saturating_sub(1);
610                            current_step = WizardStep::SelectSchemaOnlyForDb(last_db);
611                        }
612                    }
613                    Err(inquire::InquireError::OperationInterrupted) => {
614                        anyhow::bail!("Operation interrupted");
615                    }
616                    Err(e) => return Err(e.into()),
617                }
618            }
619
620            WizardStep::Review => {
621                print_header("Step 5 of 5: Review Configuration");
622
623                // Collect all inclusions
624                let included_tables: Vec<String> =
625                    included_tables_by_db.values().flatten().cloned().collect();
626
627                let selected_databases: Vec<String> = selected_db_indices
628                    .iter()
629                    .map(|&i| db_names[i].clone())
630                    .collect();
631
632                println!();
633                println!("Databases to replicate: {}", selected_databases.len());
634                for db in &selected_databases {
635                    println!("  ✓ {}", db);
636                }
637                println!();
638
639                println!("Tables to replicate: {}", included_tables.len());
640                if included_tables.len() <= 20 {
641                    for table in &included_tables {
642                        println!("  ✓ {}", table);
643                    }
644                } else {
645                    // Show first 10 and last 5 with ellipsis
646                    for table in included_tables.iter().take(10) {
647                        println!("  ✓ {}", table);
648                    }
649                    println!("  ... ({} more tables)", included_tables.len() - 15);
650                    for table in included_tables.iter().skip(included_tables.len() - 5) {
651                        println!("  ✓ {}", table);
652                    }
653                }
654                println!();
655
656                // Show schema-only tables
657                let schema_only_count: usize = schema_only_by_db.values().map(|v| v.len()).sum();
658                if schema_only_count > 0 {
659                    println!("Schema-only tables (no data): {}", schema_only_count);
660                    for (db, tables) in &schema_only_by_db {
661                        for (schema, table) in tables {
662                            let display = if schema == "public" {
663                                format!("{}.{}", db, table)
664                            } else {
665                                format!("{}.{}.{}", db, schema, table)
666                            };
667                            println!("  ◇ {}", display);
668                        }
669                    }
670                    println!();
671                } else {
672                    println!("Schema-only tables: none");
673                    println!();
674                }
675
676                // Show time filters
677                let time_filter_count: usize = time_filters_by_db.values().map(|v| v.len()).sum();
678                if time_filter_count > 0 {
679                    println!("Time-filtered tables: {}", time_filter_count);
680                    for (db, filters) in &time_filters_by_db {
681                        for (schema, table, column, window) in filters {
682                            let display = if schema == "public" {
683                                format!("{}.{}", db, table)
684                            } else {
685                                format!("{}.{}.{}", db, schema, table)
686                            };
687                            println!("  ⏱ {} ({} >= last {})", display, column, window);
688                        }
689                    }
690                    println!();
691                } else {
692                    println!("Time filters: none");
693                    println!();
694                }
695
696                println!("───────────────────────────────────────────────────────────────");
697                println!();
698
699                let confirmed = Confirm::new("Proceed with this configuration?")
700                    .with_default(true)
701                    .with_help_message("Enter confirm, Esc go back")
702                    .prompt();
703
704                match confirmed {
705                    Ok(true) => break, // Exit loop, proceed with replication
706                    Ok(false) | Err(inquire::InquireError::OperationCanceled) => {
707                        // Go back to time filters
708                        let last_db = selected_db_indices.len().saturating_sub(1);
709                        current_step = WizardStep::ConfigureTimeFiltersForDb(last_db);
710                    }
711                    Err(inquire::InquireError::OperationInterrupted) => {
712                        anyhow::bail!("Operation interrupted");
713                    }
714                    Err(e) => return Err(e.into()),
715                }
716            }
717        }
718    }
719
720    // Build final filter from selections
721    let selected_databases: Vec<String> = selected_db_indices
722        .iter()
723        .map(|&i| db_names[i].clone())
724        .collect();
725
726    let included_tables: Vec<String> = included_tables_by_db.values().flatten().cloned().collect();
727
728    tracing::info!("");
729    tracing::info!("✓ Configuration confirmed");
730    tracing::info!("");
731
732    // Use include_tables filter (3rd parameter)
733    let filter = if included_tables.is_empty() {
734        ReplicationFilter::new(Some(selected_databases), None, None, None)?
735    } else {
736        ReplicationFilter::new(Some(selected_databases), None, Some(included_tables), None)?
737    };
738
739    // Build TableRules from selections
740    let mut table_rules = TableRules::default();
741
742    // Add schema-only tables
743    for (db, tables) in &schema_only_by_db {
744        for (schema, table) in tables {
745            let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
746            table_rules.add_schema_only_table(qualified)?;
747        }
748    }
749
750    // Add time filters
751    for (db, filters) in &time_filters_by_db {
752        for (schema, table, column, window) in filters {
753            let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
754            table_rules.add_time_filter(qualified, column.clone(), window.clone())?;
755        }
756    }
757
758    Ok((filter, table_rules))
759}
760
761/// Get or cache table info for a database
762async fn get_or_cache_tables<'a>(
763    cache: &'a mut std::collections::HashMap<String, CachedDbTables>,
764    source_url: &str,
765    db_name: &str,
766) -> Result<&'a CachedDbTables> {
767    if !cache.contains_key(db_name) {
768        let db_url = replace_database_in_url(source_url, db_name)?;
769        let db_client = postgres::connect_with_retry(&db_url)
770            .await
771            .context(format!("Failed to connect to database '{}'", db_name))?;
772
773        let all_tables = migration::list_tables(&db_client)
774            .await
775            .context(format!("Failed to list tables from database '{}'", db_name))?;
776
777        let table_display_names: Vec<String> = all_tables
778            .iter()
779            .map(|t| {
780                if t.schema == "public" {
781                    t.name.clone()
782                } else {
783                    format!("{}.{}", t.schema, t.name)
784                }
785            })
786            .collect();
787
788        cache.insert(
789            db_name.to_string(),
790            CachedDbTables {
791                all_tables,
792                table_display_names,
793            },
794        );
795    }
796
797    Ok(cache.get(db_name).unwrap())
798}
799
800/// Print a formatted header for wizard steps
801fn print_header(title: &str) {
802    println!();
803    println!("╔{}╗", "═".repeat(62));
804    println!("║  {:<60}║", title);
805    println!("╚{}╝", "═".repeat(62));
806    println!();
807}
808
809/// Replace the database name in a PostgreSQL connection URL
810///
811/// # Arguments
812///
813/// * `url` - PostgreSQL connection URL
814/// * `new_db_name` - New database name to use
815///
816/// # Returns
817///
818/// URL with the database name replaced
819fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
820    // Split into base URL and query parameters
821    let parts: Vec<&str> = url.splitn(2, '?').collect();
822    let base_url = parts[0];
823    let query_params = parts.get(1);
824
825    // Split base URL by '/' to replace the database name
826    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
827
828    if url_parts.len() != 2 {
829        anyhow::bail!("Invalid connection URL format: cannot replace database name");
830    }
831
832    // Rebuild URL with new database name
833    let new_url = if let Some(params) = query_params {
834        format!("{}/{}?{}", url_parts[1], new_db_name, params)
835    } else {
836        format!("{}/{}", url_parts[1], new_db_name)
837    };
838
839    Ok(new_url)
840}
841
842pub fn get_api_key() -> anyhow::Result<String> {
843    use dialoguer::{theme::ColorfulTheme, Input};
844
845    // Try environment variable first
846    if let Ok(key) = std::env::var("SEREN_API_KEY") {
847        if !key.trim().is_empty() {
848            return Ok(key.trim().to_string());
849        }
850    }
851
852    // Prompt user interactively
853    println!("\nRemote execution requires a SerenDB API key for authentication.");
854    println!("\nYou can generate an API key at:");
855    println!("  https://console.serendb.com/api-keys\n");
856
857    let key: String = Input::with_theme(&ColorfulTheme::default())
858        .with_prompt("Enter your SerenDB API key")
859        .allow_empty(false)
860        .interact_text()?;
861
862    if key.trim().is_empty() {
863        anyhow::bail!(
864            "API key is required for remote execution.\n\
865            Set the SEREN_API_KEY environment variable or run interactively.\n\
866            Get your API key at: https://console.serendb.com/api-keys\n\
867            Or use --local to run replication on your machine instead"
868        );
869    }
870
871    Ok(key.trim().to_string())
872}
873
874#[cfg(test)]
875mod tests {
876    use super::*;
877
878    #[test]
879    fn test_replace_database_in_url() {
880        // Basic URL
881        let url = "postgresql://user:pass@localhost:5432/olddb";
882        let new_url = replace_database_in_url(url, "newdb").unwrap();
883        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");
884
885        // URL with query parameters
886        let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
887        let new_url = replace_database_in_url(url, "newdb").unwrap();
888        assert_eq!(
889            new_url,
890            "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
891        );
892
893        // URL without port
894        let url = "postgresql://user:pass@localhost/olddb";
895        let new_url = replace_database_in_url(url, "newdb").unwrap();
896        assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");
897    }
898
899    #[tokio::test]
900    #[ignore]
901    async fn test_interactive_selection() {
902        // This test requires a real source database and manual interaction
903        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
904
905        let result = select_databases_and_tables(&source_url).await;
906
907        // This will only work with manual interaction
908        match &result {
909            Ok((filter, rules)) => {
910                println!("✓ Interactive selection completed");
911                println!("Filter: {:?}", filter);
912                println!("Rules: {:?}", rules);
913            }
914            Err(e) => {
915                println!("Interactive selection error: {:?}", e);
916            }
917        }
918    }
919}