database_replicator/
interactive.rs

1// ABOUTME: Interactive terminal UI for database and table selection
2// ABOUTME: Provides multi-step wizard with back navigation using inquire crate
3
4use crate::{
5    filters::ReplicationFilter,
6    migration, postgres,
7    serendb::{ConsoleClient, TargetState},
8    table_rules::{QualifiedTable, TableRules},
9};
10use anyhow::{Context, Result};
11use inquire::{Confirm, MultiSelect, Select, Text};
12
13/// Prompts the user to select a SerenDB project and database interactively.
14///
15/// This function will:
16/// 1. Get the SerenDB API key (from environment or prompt).
17/// 2. Fetch and display a list of projects for the user to select.
18/// 3. Fetch the default branch for the selected project.
19/// 4. Fetch and display a list of databases for the user to select.
20/// 5. Return the connection string and target state for the selected database.
21///
22/// # Returns
23///
24/// A `Result` containing a tuple of (connection_string, TargetState) for the selected database.
25/// The TargetState contains the project_id, branch_id, and database name needed for remote execution.
26pub async fn select_seren_database() -> Result<(String, TargetState)> {
27    print_header("Select SerenDB Target");
28
29    let api_key = get_api_key()?;
30    let client = ConsoleClient::new(None, api_key);
31
32    // 1. Select a project
33    let projects = client.list_projects().await?;
34    if projects.is_empty() {
35        anyhow::bail!("No projects found for your account.");
36    }
37    let project_names: Vec<String> = projects.iter().map(|p| p.name.clone()).collect();
38    let selected_project_name = Select::new("Select a project:", project_names).prompt()?;
39    let selected_project = projects
40        .into_iter()
41        .find(|p| p.name == selected_project_name)
42        .unwrap();
43
44    // 2. Select a branch
45    let branches = client.list_branches(&selected_project.id).await?;
46    if branches.is_empty() {
47        anyhow::bail!(
48            "Project '{}' has no branches. Create a branch first at console.serendb.com.",
49            selected_project.name
50        );
51    }
52
53    let branch = if branches.len() == 1 {
54        branches.into_iter().next().unwrap()
55    } else {
56        let branch_names: Vec<String> = branches.iter().map(|b| b.name.clone()).collect();
57        let selected_branch_name = Select::new("Select a branch:", branch_names).prompt()?;
58        branches
59            .into_iter()
60            .find(|b| b.name == selected_branch_name)
61            .unwrap()
62    };
63
64    // 3. Select or create a database
65    let databases = client
66        .list_databases(&selected_project.id, &branch.id)
67        .await?;
68
69    let selected_database_name = if databases.is_empty() {
70        // No databases exist - prompt for new database name
71        println!(
72            "\n  Project '{}' has no databases yet.",
73            selected_project.name
74        );
75        println!("  The database will be created during replication.\n");
76        Text::new("Enter database name to create:")
77            .with_default("serendb")
78            .prompt()?
79    } else {
80        // Let user select existing database or create new one
81        let mut database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
82        database_names.push("[ Create new database ]".to_string());
83
84        let selection = Select::new("Select a database:", database_names).prompt()?;
85
86        if selection == "[ Create new database ]" {
87            Text::new("Enter database name to create:")
88                .with_default("serendb")
89                .prompt()?
90        } else {
91            selection
92        }
93    };
94
95    // 4. Get connection string
96    let conn_str = client
97        .get_connection_string(
98            &selected_project.id,
99            &branch.id,
100            &selected_database_name,
101            false,
102        )
103        .await?;
104
105    // 5. Build target state with placeholder source_url (will be updated later)
106    let target_state = TargetState::new(
107        selected_project.id.clone(),
108        selected_project.name.clone(),
109        branch.id.clone(),
110        branch.name.clone(),
111        vec![selected_database_name],
112        "", // Source URL not known yet, hash will be empty
113    );
114
115    Ok((conn_str, target_state))
116}
117
118/// Wizard step state machine
119enum WizardStep {
120    SelectDatabases,
121    SelectTablesForDb(usize), // index of current database in selected_dbs
122    SelectSchemaOnlyForDb(usize), // schema-only tables selection
123    ConfigureTimeFiltersForDb(usize), // time filter configuration
124    Review,
125}
126
127/// Cached table info for a database (to avoid repeated queries)
128struct CachedDbTables {
129    all_tables: Vec<migration::TableInfo>,
130    table_display_names: Vec<String>,
131}
132
133/// Interactive database and table selection with back navigation
134///
135/// Presents a terminal UI for selecting:
136/// 1. Which databases to replicate (multi-select)
137/// 2. For each selected database: tables to include (Enter = include all)
138/// 3. For each selected database: tables to replicate schema-only (no data)
139/// 4. For each selected database: time-based filters
140/// 5. Summary and confirmation
141///
142/// Supports back navigation:
143/// - Cancel/Esc from any step → go back to previous step
144///
145/// Returns a tuple of `(ReplicationFilter, TableRules)` representing the user's selections.
146///
147/// # Arguments
148///
149/// * `source_url` - PostgreSQL connection string for source database
150///
151/// # Returns
152///
153/// Returns `Ok((ReplicationFilter, TableRules))` with the user's selections or an error if:
154/// - Cannot connect to source database
155/// - Cannot discover databases or tables
156/// - User cancels the operation
157///
158/// # Examples
159///
160/// ```no_run
161/// # use anyhow::Result;
162/// # use database_replicator::interactive::select_databases_and_tables;
163/// # async fn example() -> Result<()> {
164/// let (filter, rules) = select_databases_and_tables(
165///     "postgresql://user:pass@source.example.com/postgres"
166/// ).await?;
167/// # Ok(())
168/// # }
169/// ```
170pub async fn select_databases_and_tables(
171    source_url: &str,
172) -> Result<(ReplicationFilter, TableRules)> {
173    tracing::info!("Starting interactive database and table selection...");
174    println!();
175
176    // Connect to source database
177    tracing::info!("Connecting to source database...");
178    let source_client = postgres::connect_with_retry(source_url)
179        .await
180        .context("Failed to connect to source database")?;
181    tracing::info!("✓ Connected to source");
182    println!();
183
184    // Discover databases
185    tracing::info!("Discovering databases on source...");
186    let all_databases = migration::list_databases(&source_client)
187        .await
188        .context("Failed to list databases on source")?;
189
190    if all_databases.is_empty() {
191        tracing::warn!("⚠ No user databases found on source");
192        tracing::warn!("  Source appears to contain only template databases");
193        return Ok((ReplicationFilter::empty(), TableRules::default()));
194    }
195
196    tracing::info!("✓ Found {} database(s)", all_databases.len());
197    println!();
198
199    let db_names: Vec<String> = all_databases.iter().map(|db| db.name.clone()).collect();
200
201    // State for wizard
202    let mut selected_db_indices: Vec<usize> = Vec::new();
203    let mut current_step = WizardStep::SelectDatabases;
204
205    // Track selections per database for back navigation
206    let mut included_tables_by_db: std::collections::HashMap<String, Vec<String>> =
207        std::collections::HashMap::new();
208    let mut schema_only_by_db: std::collections::HashMap<String, Vec<(String, String)>> =
209        std::collections::HashMap::new(); // (schema, table)
210    let mut time_filters_by_db: std::collections::HashMap<
211        String,
212        Vec<(String, String, String, String)>,
213    > = std::collections::HashMap::new(); // (schema, table, column, window)
214
215    // Cache table info per database to avoid repeated queries
216    let mut table_cache: std::collections::HashMap<String, CachedDbTables> =
217        std::collections::HashMap::new();
218
219    loop {
220        match current_step {
221            WizardStep::SelectDatabases => {
222                print_header("Step 1 of 5: Select Databases");
223                println!("Navigation: Space to toggle, Enter to confirm, Esc to cancel");
224                println!();
225
226                let defaults: Vec<usize> = selected_db_indices.clone();
227
228                let selections =
229                    MultiSelect::new("Select databases to replicate:", db_names.clone())
230                        .with_default(&defaults)
231                        .with_help_message("↑↓ navigate, Space toggle, Enter confirm")
232                        .prompt();
233
234                match selections {
235                    Ok(selected) => {
236                        // Convert selected names back to indices
237                        selected_db_indices = selected
238                            .iter()
239                            .filter_map(|name| db_names.iter().position(|n| n == name))
240                            .collect();
241
242                        if selected_db_indices.is_empty() {
243                            println!();
244                            println!("⚠ Please select at least one database");
245                            continue;
246                        }
247
248                        // Clear previous selections when re-selecting databases
249                        included_tables_by_db.clear();
250                        schema_only_by_db.clear();
251                        time_filters_by_db.clear();
252                        table_cache.clear();
253
254                        current_step = WizardStep::SelectTablesForDb(0);
255                    }
256                    Err(inquire::InquireError::OperationCanceled) => {
257                        anyhow::bail!("Operation cancelled by user");
258                    }
259                    Err(inquire::InquireError::OperationInterrupted) => {
260                        anyhow::bail!("Operation interrupted");
261                    }
262                    Err(e) => return Err(e.into()),
263                }
264            }
265
266            WizardStep::SelectTablesForDb(db_idx) => {
267                let db_name = &db_names[selected_db_indices[db_idx]].clone();
268                print_header(&format!(
269                    "Step 2 of 5: Select Tables to Include ({}/{})",
270                    db_idx + 1,
271                    selected_db_indices.len()
272                ));
273                println!("Database: {}", db_name);
274                println!("Press Enter without selecting to include ALL tables.");
275                println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
276                println!();
277
278                // Get or cache tables for this database
279                let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
280
281                if cached.all_tables.is_empty() {
282                    println!("  No tables found in database '{}'", db_name);
283                    // Skip to next database or next step
284                    if db_idx + 1 < selected_db_indices.len() {
285                        current_step = WizardStep::SelectTablesForDb(db_idx + 1);
286                    } else {
287                        current_step = WizardStep::SelectSchemaOnlyForDb(0);
288                    }
289                    continue;
290                }
291
292                // Get previously included tables for this database (for back navigation)
293                let previous_inclusions: Vec<usize> = included_tables_by_db
294                    .get(db_name)
295                    .map(|included| {
296                        included
297                            .iter()
298                            .filter_map(|t| {
299                                // Strip db name prefix to match display names
300                                let stripped =
301                                    t.strip_prefix(&format!("{}.", db_name)).unwrap_or(t);
302                                cached
303                                    .table_display_names
304                                    .iter()
305                                    .position(|n| n == stripped)
306                            })
307                            .collect()
308                    })
309                    .unwrap_or_default();
310
311                let selections = MultiSelect::new(
312                    "Select tables to INCLUDE (Enter = include all):",
313                    cached.table_display_names.clone(),
314                )
315                .with_default(&previous_inclusions)
316                .with_help_message("Space toggle, Enter confirm, Esc go back")
317                .prompt();
318
319                match selections {
320                    Ok(selected_inclusions) => {
321                        // If nothing selected, include all tables
322                        let db_inclusions: Vec<String> = if selected_inclusions.is_empty() {
323                            cached
324                                .table_display_names
325                                .iter()
326                                .map(|table_name| format!("{}.{}", db_name, table_name))
327                                .collect()
328                        } else {
329                            selected_inclusions
330                                .iter()
331                                .map(|table_name| format!("{}.{}", db_name, table_name))
332                                .collect()
333                        };
334
335                        // Store for back navigation
336                        included_tables_by_db.insert(db_name.clone(), db_inclusions);
337
338                        // Move to next database or schema-only step
339                        if db_idx + 1 < selected_db_indices.len() {
340                            current_step = WizardStep::SelectTablesForDb(db_idx + 1);
341                        } else {
342                            current_step = WizardStep::SelectSchemaOnlyForDb(0);
343                        }
344                    }
345                    Err(inquire::InquireError::OperationCanceled) => {
346                        // Go back to previous step
347                        if db_idx > 0 {
348                            current_step = WizardStep::SelectTablesForDb(db_idx - 1);
349                        } else {
350                            current_step = WizardStep::SelectDatabases;
351                        }
352                    }
353                    Err(inquire::InquireError::OperationInterrupted) => {
354                        anyhow::bail!("Operation interrupted");
355                    }
356                    Err(e) => return Err(e.into()),
357                }
358            }
359
360            WizardStep::SelectSchemaOnlyForDb(db_idx) => {
361                let db_name = &db_names[selected_db_indices[db_idx]].clone();
362                print_header(&format!(
363                    "Step 3 of 5: Schema-Only Tables ({}/{})",
364                    db_idx + 1,
365                    selected_db_indices.len()
366                ));
367                println!("Database: {}", db_name);
368                println!("Schema-only tables replicate structure but NO data.");
369                println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
370                println!();
371
372                let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
373
374                if cached.all_tables.is_empty() {
375                    // Skip to next database or time filters
376                    if db_idx + 1 < selected_db_indices.len() {
377                        current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
378                    } else {
379                        current_step = WizardStep::ConfigureTimeFiltersForDb(0);
380                    }
381                    continue;
382                }
383
384                // Filter to only included tables
385                let included = included_tables_by_db.get(db_name);
386                let available_tables: Vec<(usize, String)> = cached
387                    .table_display_names
388                    .iter()
389                    .enumerate()
390                    .filter(|(_, name)| {
391                        let full_name = format!("{}.{}", db_name, name);
392                        included.is_some_and(|inc| inc.contains(&full_name))
393                    })
394                    .map(|(idx, name)| (idx, name.clone()))
395                    .collect();
396
397                if available_tables.is_empty() {
398                    println!("  No tables included from '{}'", db_name);
399                    if db_idx + 1 < selected_db_indices.len() {
400                        current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
401                    } else {
402                        current_step = WizardStep::ConfigureTimeFiltersForDb(0);
403                    }
404                    continue;
405                }
406
407                let available_names: Vec<String> =
408                    available_tables.iter().map(|(_, n)| n.clone()).collect();
409
410                // Get previous schema-only selections
411                let previous_schema_only: Vec<usize> = schema_only_by_db
412                    .get(db_name)
413                    .map(|selected| {
414                        selected
415                            .iter()
416                            .filter_map(|(schema, table)| {
417                                let display = if schema == "public" {
418                                    table.clone()
419                                } else {
420                                    format!("{}.{}", schema, table)
421                                };
422                                available_names.iter().position(|n| n == &display)
423                            })
424                            .collect()
425                    })
426                    .unwrap_or_default();
427
428                let selections = MultiSelect::new(
429                    "Select tables to replicate SCHEMA-ONLY (no data):",
430                    available_names.clone(),
431                )
432                .with_default(&previous_schema_only)
433                .with_help_message("Space toggle, Enter confirm, Esc go back")
434                .prompt();
435
436                match selections {
437                    Ok(selected_schema_only) => {
438                        // Convert to (schema, table) pairs
439                        let schema_only_tables: Vec<(String, String)> = selected_schema_only
440                            .iter()
441                            .filter_map(|display_name| {
442                                available_tables
443                                    .iter()
444                                    .find(|(_, n)| n == display_name)
445                                    .map(|(idx, _)| {
446                                        let t = &cached.all_tables[*idx];
447                                        (t.schema.clone(), t.name.clone())
448                                    })
449                            })
450                            .collect();
451
452                        schema_only_by_db.insert(db_name.clone(), schema_only_tables);
453
454                        if db_idx + 1 < selected_db_indices.len() {
455                            current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
456                        } else {
457                            current_step = WizardStep::ConfigureTimeFiltersForDb(0);
458                        }
459                    }
460                    Err(inquire::InquireError::OperationCanceled) => {
461                        // Go back
462                        if db_idx > 0 {
463                            current_step = WizardStep::SelectSchemaOnlyForDb(db_idx - 1);
464                        } else {
465                            let last_db = selected_db_indices.len().saturating_sub(1);
466                            current_step = WizardStep::SelectTablesForDb(last_db);
467                        }
468                    }
469                    Err(inquire::InquireError::OperationInterrupted) => {
470                        anyhow::bail!("Operation interrupted");
471                    }
472                    Err(e) => return Err(e.into()),
473                }
474            }
475
476            WizardStep::ConfigureTimeFiltersForDb(db_idx) => {
477                let db_name = &db_names[selected_db_indices[db_idx]].clone();
478                print_header(&format!(
479                    "Step 4 of 5: Time Filters ({}/{})",
480                    db_idx + 1,
481                    selected_db_indices.len()
482                ));
483                println!("Database: {}", db_name);
484                println!("Time filters limit data to recent records (e.g., last 90 days).");
485                println!();
486
487                let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
488
489                if cached.all_tables.is_empty() {
490                    if db_idx + 1 < selected_db_indices.len() {
491                        current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
492                    } else {
493                        current_step = WizardStep::Review;
494                    }
495                    continue;
496                }
497
498                // Filter to included tables, excluding schema-only ones
499                let included = included_tables_by_db.get(db_name);
500                let schema_only = schema_only_by_db.get(db_name);
501                let available_tables: Vec<(usize, String)> = cached
502                    .table_display_names
503                    .iter()
504                    .enumerate()
505                    .filter(|(idx, name)| {
506                        let full_name = format!("{}.{}", db_name, name);
507                        let is_included = included.is_some_and(|inc| inc.contains(&full_name));
508                        let t = &cached.all_tables[*idx];
509                        let is_schema_only = schema_only.is_some_and(|so| {
510                            so.iter().any(|(s, n)| s == &t.schema && n == &t.name)
511                        });
512                        is_included && !is_schema_only
513                    })
514                    .map(|(idx, name)| (idx, name.clone()))
515                    .collect();
516
517                if available_tables.is_empty() {
518                    println!("  No tables available for time filtering in '{}'", db_name);
519                    if db_idx + 1 < selected_db_indices.len() {
520                        current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
521                    } else {
522                        current_step = WizardStep::Review;
523                    }
524                    continue;
525                }
526
527                // Ask if user wants to configure time filters
528                let configure = Confirm::new("Configure time-based filters for this database?")
529                    .with_default(false)
530                    .with_help_message("Enter to confirm, Esc to go back")
531                    .prompt();
532
533                match configure {
534                    Ok(true) => {
535                        // Let user select tables to filter
536                        let available_names: Vec<String> =
537                            available_tables.iter().map(|(_, n)| n.clone()).collect();
538
539                        let table_selections = MultiSelect::new(
540                            "Select tables to apply time filter:",
541                            available_names.clone(),
542                        )
543                        .with_help_message("Space toggle, Enter confirm")
544                        .prompt();
545
546                        match table_selections {
547                            Ok(selected_tables) => {
548                                let mut time_filters: Vec<(String, String, String, String)> =
549                                    Vec::new();
550
551                                for display_name in &selected_tables {
552                                    if let Some((idx, _)) =
553                                        available_tables.iter().find(|(_, n)| n == display_name)
554                                    {
555                                        let t = &cached.all_tables[*idx];
556                                        let db_url = replace_database_in_url(source_url, db_name)?;
557                                        let db_client = postgres::connect_with_retry(&db_url)
558                                            .await
559                                            .context("Failed to connect for column query")?;
560
561                                        // Get timestamp columns
562                                        let columns = migration::get_table_columns(
563                                            &db_client, &t.schema, &t.name,
564                                        )
565                                        .await?;
566
567                                        let timestamp_columns: Vec<String> = columns
568                                            .iter()
569                                            .filter(|c| c.is_timestamp)
570                                            .map(|c| format!("{} ({})", c.name, c.data_type))
571                                            .collect();
572
573                                        println!();
574                                        println!("Configure time filter for '{}':", display_name);
575
576                                        let column = if timestamp_columns.is_empty() {
577                                            println!(
578                                                "  ⚠ No timestamp columns found. Enter column name manually."
579                                            );
580                                            Text::new("  Column name:")
581                                                .with_default("created_at")
582                                                .prompt()
583                                                .context("Failed to get column name")?
584                                        } else {
585                                            let mut options = timestamp_columns.clone();
586                                            options.push("[Enter custom column name]".to_string());
587
588                                            let selection =
589                                                Select::new("  Select timestamp column:", options)
590                                                    .prompt()
591                                                    .context("Failed to select column")?;
592
593                                            if selection == "[Enter custom column name]" {
594                                                Text::new("  Column name:")
595                                                    .prompt()
596                                                    .context("Failed to get column name")?
597                                            } else {
598                                                // Extract column name from "name (type)" format
599                                                selection
600                                                    .split(" (")
601                                                    .next()
602                                                    .unwrap_or(&selection)
603                                                    .to_string()
604                                            }
605                                        };
606
607                                        let window = Text::new(
608                                            "  Time window (e.g., '90 days', '6 months', '1 year'):",
609                                        )
610                                        .with_default("90 days")
611                                        .prompt()
612                                        .context("Failed to get time window")?;
613
614                                        time_filters.push((
615                                            t.schema.clone(),
616                                            t.name.clone(),
617                                            column,
618                                            window,
619                                        ));
620                                    }
621                                }
622
623                                time_filters_by_db.insert(db_name.clone(), time_filters);
624                            }
625                            Err(inquire::InquireError::OperationCanceled) => {
626                                // Stay on this step
627                                continue;
628                            }
629                            Err(inquire::InquireError::OperationInterrupted) => {
630                                anyhow::bail!("Operation interrupted");
631                            }
632                            Err(e) => return Err(e.into()),
633                        }
634
635                        if db_idx + 1 < selected_db_indices.len() {
636                            current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
637                        } else {
638                            current_step = WizardStep::Review;
639                        }
640                    }
641                    Ok(false) => {
642                        // Skip time filters for this database
643                        if db_idx + 1 < selected_db_indices.len() {
644                            current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
645                        } else {
646                            current_step = WizardStep::Review;
647                        }
648                    }
649                    Err(inquire::InquireError::OperationCanceled) => {
650                        // Go back
651                        if db_idx > 0 {
652                            current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx - 1);
653                        } else {
654                            let last_db = selected_db_indices.len().saturating_sub(1);
655                            current_step = WizardStep::SelectSchemaOnlyForDb(last_db);
656                        }
657                    }
658                    Err(inquire::InquireError::OperationInterrupted) => {
659                        anyhow::bail!("Operation interrupted");
660                    }
661                    Err(e) => return Err(e.into()),
662                }
663            }
664
665            WizardStep::Review => {
666                print_header("Step 5 of 5: Review Configuration");
667
668                // Collect all inclusions
669                let included_tables: Vec<String> =
670                    included_tables_by_db.values().flatten().cloned().collect();
671
672                let selected_databases: Vec<String> = selected_db_indices
673                    .iter()
674                    .map(|&i| db_names[i].clone())
675                    .collect();
676
677                println!();
678                println!("Databases to replicate: {}", selected_databases.len());
679                for db in &selected_databases {
680                    println!("  ✓ {}", db);
681                }
682                println!();
683
684                println!("Tables to replicate: {}", included_tables.len());
685                if included_tables.len() <= 20 {
686                    for table in &included_tables {
687                        println!("  ✓ {}", table);
688                    }
689                } else {
690                    // Show first 10 and last 5 with ellipsis
691                    for table in included_tables.iter().take(10) {
692                        println!("  ✓ {}", table);
693                    }
694                    println!("  ... ({} more tables)", included_tables.len() - 15);
695                    for table in included_tables.iter().skip(included_tables.len() - 5) {
696                        println!("  ✓ {}", table);
697                    }
698                }
699                println!();
700
701                // Show schema-only tables
702                let schema_only_count: usize = schema_only_by_db.values().map(|v| v.len()).sum();
703                if schema_only_count > 0 {
704                    println!("Schema-only tables (no data): {}", schema_only_count);
705                    for (db, tables) in &schema_only_by_db {
706                        for (schema, table) in tables {
707                            let display = if schema == "public" {
708                                format!("{}.{}", db, table)
709                            } else {
710                                format!("{}.{}.{}", db, schema, table)
711                            };
712                            println!("  ◇ {}", display);
713                        }
714                    }
715                    println!();
716                } else {
717                    println!("Schema-only tables: none");
718                    println!();
719                }
720
721                // Show time filters
722                let time_filter_count: usize = time_filters_by_db.values().map(|v| v.len()).sum();
723                if time_filter_count > 0 {
724                    println!("Time-filtered tables: {}", time_filter_count);
725                    for (db, filters) in &time_filters_by_db {
726                        for (schema, table, column, window) in filters {
727                            let display = if schema == "public" {
728                                format!("{}.{}", db, table)
729                            } else {
730                                format!("{}.{}.{}", db, schema, table)
731                            };
732                            println!("  ⏱ {} ({} >= last {})", display, column, window);
733                        }
734                    }
735                    println!();
736                } else {
737                    println!("Time filters: none");
738                    println!();
739                }
740
741                println!("───────────────────────────────────────────────────────────────");
742                println!();
743
744                let confirmed = Confirm::new("Proceed with this configuration?")
745                    .with_default(true)
746                    .with_help_message("Enter confirm, Esc go back")
747                    .prompt();
748
749                match confirmed {
750                    Ok(true) => break, // Exit loop, proceed with replication
751                    Ok(false) | Err(inquire::InquireError::OperationCanceled) => {
752                        // Go back to time filters
753                        let last_db = selected_db_indices.len().saturating_sub(1);
754                        current_step = WizardStep::ConfigureTimeFiltersForDb(last_db);
755                    }
756                    Err(inquire::InquireError::OperationInterrupted) => {
757                        anyhow::bail!("Operation interrupted");
758                    }
759                    Err(e) => return Err(e.into()),
760                }
761            }
762        }
763    }
764
765    // Build final filter from selections
766    let selected_databases: Vec<String> = selected_db_indices
767        .iter()
768        .map(|&i| db_names[i].clone())
769        .collect();
770
771    let included_tables: Vec<String> = included_tables_by_db.values().flatten().cloned().collect();
772
773    tracing::info!("");
774    tracing::info!("✓ Configuration confirmed");
775    tracing::info!("");
776
777    // Use include_tables filter (3rd parameter)
778    let filter = if included_tables.is_empty() {
779        ReplicationFilter::new(Some(selected_databases), None, None, None)?
780    } else {
781        ReplicationFilter::new(Some(selected_databases), None, Some(included_tables), None)?
782    };
783
784    // Build TableRules from selections
785    let mut table_rules = TableRules::default();
786
787    // Add schema-only tables
788    for (db, tables) in &schema_only_by_db {
789        for (schema, table) in tables {
790            let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
791            table_rules.add_schema_only_table(qualified)?;
792        }
793    }
794
795    // Add time filters
796    for (db, filters) in &time_filters_by_db {
797        for (schema, table, column, window) in filters {
798            let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
799            table_rules.add_time_filter(qualified, column.clone(), window.clone())?;
800        }
801    }
802
803    Ok((filter, table_rules))
804}
805
806/// Get or cache table info for a database
807async fn get_or_cache_tables<'a>(
808    cache: &'a mut std::collections::HashMap<String, CachedDbTables>,
809    source_url: &str,
810    db_name: &str,
811) -> Result<&'a CachedDbTables> {
812    if !cache.contains_key(db_name) {
813        let db_url = replace_database_in_url(source_url, db_name)?;
814        let db_client = postgres::connect_with_retry(&db_url)
815            .await
816            .context(format!("Failed to connect to database '{}'", db_name))?;
817
818        let all_tables = migration::list_tables(&db_client)
819            .await
820            .context(format!("Failed to list tables from database '{}'", db_name))?;
821
822        let table_display_names: Vec<String> = all_tables
823            .iter()
824            .map(|t| {
825                if t.schema == "public" {
826                    t.name.clone()
827                } else {
828                    format!("{}.{}", t.schema, t.name)
829                }
830            })
831            .collect();
832
833        cache.insert(
834            db_name.to_string(),
835            CachedDbTables {
836                all_tables,
837                table_display_names,
838            },
839        );
840    }
841
842    Ok(cache.get(db_name).unwrap())
843}
844
845/// Print a formatted header for wizard steps
846fn print_header(title: &str) {
847    println!();
848    println!("╔{}╗", "═".repeat(62));
849    println!("║  {:<60}║", title);
850    println!("╚{}╝", "═".repeat(62));
851    println!();
852}
853
854/// Replace the database name in a PostgreSQL connection URL
855///
856/// # Arguments
857///
858/// * `url` - PostgreSQL connection URL
859/// * `new_db_name` - New database name to use
860///
861/// # Returns
862///
863/// URL with the database name replaced
864fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
865    // Split into base URL and query parameters
866    let parts: Vec<&str> = url.splitn(2, '?').collect();
867    let base_url = parts[0];
868    let query_params = parts.get(1);
869
870    // Split base URL by '/' to replace the database name
871    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
872
873    if url_parts.len() != 2 {
874        anyhow::bail!("Invalid connection URL format: cannot replace database name");
875    }
876
877    // Rebuild URL with new database name
878    let new_url = if let Some(params) = query_params {
879        format!("{}/{}?{}", url_parts[1], new_db_name, params)
880    } else {
881        format!("{}/{}", url_parts[1], new_db_name)
882    };
883
884    Ok(new_url)
885}
886
887pub fn get_api_key() -> anyhow::Result<String> {
888    use dialoguer::{theme::ColorfulTheme, Input};
889
890    // Try environment variable first
891    if let Ok(key) = std::env::var("SEREN_API_KEY") {
892        if !key.trim().is_empty() {
893            return Ok(key.trim().to_string());
894        }
895    }
896
897    // Prompt user interactively
898    println!("\nRemote execution requires a SerenDB API key for authentication.");
899    println!("\nYou can generate an API key at:");
900    println!("  https://console.serendb.com/api-keys\n");
901
902    let key: String = Input::with_theme(&ColorfulTheme::default())
903        .with_prompt("Enter your SerenDB API key")
904        .allow_empty(false)
905        .interact_text()?;
906
907    if key.trim().is_empty() {
908        anyhow::bail!(
909            "API key is required for remote execution.\n\
910            Set the SEREN_API_KEY environment variable or run interactively.\n\
911            Get your API key at: https://console.serendb.com/api-keys\n\
912            Or use --local to run replication on your machine instead"
913        );
914    }
915
916    let trimmed_key = key.trim().to_string();
917
918    // Cache the key in env var so subsequent calls don't prompt again
919    std::env::set_var("SEREN_API_KEY", &trimmed_key);
920
921    Ok(trimmed_key)
922}
923
924#[cfg(test)]
925mod tests {
926    use super::*;
927
928    #[test]
929    fn test_replace_database_in_url() {
930        // Basic URL
931        let url = "postgresql://user:pass@localhost:5432/olddb";
932        let new_url = replace_database_in_url(url, "newdb").unwrap();
933        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");
934
935        // URL with query parameters
936        let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
937        let new_url = replace_database_in_url(url, "newdb").unwrap();
938        assert_eq!(
939            new_url,
940            "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
941        );
942
943        // URL without port
944        let url = "postgresql://user:pass@localhost/olddb";
945        let new_url = replace_database_in_url(url, "newdb").unwrap();
946        assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");
947    }
948
949    #[tokio::test]
950    #[ignore]
951    async fn test_interactive_selection() {
952        // This test requires a real source database and manual interaction
953        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
954
955        let result = select_databases_and_tables(&source_url).await;
956
957        // This will only work with manual interaction
958        match &result {
959            Ok((filter, rules)) => {
960                println!("✓ Interactive selection completed");
961                println!("Filter: {:?}", filter);
962                println!("Rules: {:?}", rules);
963            }
964            Err(e) => {
965                println!("Interactive selection error: {:?}", e);
966            }
967        }
968    }
969}