database_replicator/
interactive.rs

1// ABOUTME: Interactive terminal UI for database and table selection
2// ABOUTME: Provides multi-select interface for selective replication and table rules
3
4use crate::{
5    filters::ReplicationFilter,
6    migration, postgres,
7    table_rules::{QualifiedTable, TableRules},
8};
9use anyhow::{Context, Result};
10use dialoguer::{theme::ColorfulTheme, Confirm, Input, MultiSelect, Select};
11
12/// Interactive database and table selection with advanced filtering
13///
14/// Presents a terminal UI for selecting:
15/// 1. Which databases to replicate (multi-select)
16/// 2. For each selected database:
17///    - Which tables to exclude entirely
18///    - Which tables to replicate schema-only (no data)
19///    - Which tables to apply time-based filters
20/// 3. Summary and confirmation
21///
22/// Returns a tuple of `(ReplicationFilter, TableRules)` representing the user's selections.
23///
24/// # Arguments
25///
26/// * `source_url` - PostgreSQL connection string for source database
27///
28/// # Returns
29///
30/// Returns `Ok((ReplicationFilter, TableRules))` with the user's selections or an error if:
31/// - Cannot connect to source database
32/// - Cannot discover databases or tables
33/// - User cancels the operation
34///
35/// # Examples
36///
37/// ```no_run
38/// # use anyhow::Result;
39/// # use database_replicator::interactive::select_databases_and_tables;
40/// # async fn example() -> Result<()> {
41/// let (filter, rules) = select_databases_and_tables(
42///     "postgresql://user:pass@source.example.com/postgres"
43/// ).await?;
44/// # Ok(())
45/// # }
46/// ```
47pub async fn select_databases_and_tables(
48    source_url: &str,
49) -> Result<(ReplicationFilter, TableRules)> {
50    tracing::info!("Starting interactive database and table selection...");
51    tracing::info!("");
52
53    // Connect to source database
54    tracing::info!("Connecting to source database...");
55    let source_client = postgres::connect_with_retry(source_url)
56        .await
57        .context("Failed to connect to source database")?;
58    tracing::info!("✓ Connected to source");
59    tracing::info!("");
60
61    // Discover databases
62    tracing::info!("Discovering databases on source...");
63    let all_databases = migration::list_databases(&source_client)
64        .await
65        .context("Failed to list databases on source")?;
66
67    if all_databases.is_empty() {
68        tracing::warn!("⚠ No user databases found on source");
69        tracing::warn!("  Source appears to contain only template databases");
70        return Ok((ReplicationFilter::empty(), TableRules::default()));
71    }
72
73    tracing::info!("✓ Found {} database(s)", all_databases.len());
74    tracing::info!("");
75
76    // Step 1: Select databases to replicate
77    println!("Select databases to replicate:");
78    println!("(Use arrow keys to navigate, Space to select, Enter to confirm)");
79    println!();
80
81    let db_names: Vec<String> = all_databases.iter().map(|db| db.name.clone()).collect();
82
83    let db_selections = MultiSelect::with_theme(&ColorfulTheme::default())
84        .items(&db_names)
85        .interact()
86        .context("Failed to get database selection")?;
87
88    if db_selections.is_empty() {
89        tracing::warn!("⚠ No databases selected");
90        tracing::info!("  Cancelling interactive selection");
91        return Ok((ReplicationFilter::empty(), TableRules::default()));
92    }
93
94    let selected_databases: Vec<String> = db_selections
95        .iter()
96        .map(|&idx| db_names[idx].clone())
97        .collect();
98
99    tracing::info!("");
100    tracing::info!("✓ Selected {} database(s):", selected_databases.len());
101    for db in &selected_databases {
102        tracing::info!("  - {}", db);
103    }
104    tracing::info!("");
105
106    // Step 2: For each selected database, configure table-level rules
107    let mut excluded_tables: Vec<String> = Vec::new();
108    let mut table_rules = TableRules::default();
109
110    for db_name in &selected_databases {
111        // Build database-specific connection URL
112        let db_url = replace_database_in_url(source_url, db_name)
113            .context(format!("Failed to build URL for database '{}'", db_name))?;
114
115        // Connect to the specific database
116        tracing::info!("Discovering tables in database '{}'...", db_name);
117        let db_client = postgres::connect_with_retry(&db_url)
118            .await
119            .context(format!("Failed to connect to database '{}'", db_name))?;
120
121        let all_tables = migration::list_tables(&db_client)
122            .await
123            .context(format!("Failed to list tables from database '{}'", db_name))?;
124
125        if all_tables.is_empty() {
126            tracing::info!("  No tables found in database '{}'", db_name);
127            tracing::info!("");
128            continue;
129        }
130
131        tracing::info!("✓ Found {} table(s) in '{}'", all_tables.len(), db_name);
132        tracing::info!("");
133
134        // Format table names for display
135        let table_display_names: Vec<String> = all_tables
136            .iter()
137            .map(|t| {
138                if t.schema == "public" {
139                    t.name.clone()
140                } else {
141                    format!("{}.{}", t.schema, t.name)
142                }
143            })
144            .collect();
145
146        println!(
147            "Select tables to EXCLUDE from '{}' (or press Enter to include all):",
148            db_name
149        );
150        println!("(Use arrow keys to navigate, Space to select, Enter to confirm)");
151        println!();
152
153        let table_exclusions = MultiSelect::with_theme(&ColorfulTheme::default())
154            .items(&table_display_names)
155            .interact()
156            .context(format!(
157                "Failed to get table exclusion selection for database '{}'",
158                db_name
159            ))?;
160
161        // Track which tables are excluded
162        let excluded_indices: std::collections::HashSet<usize> =
163            table_exclusions.iter().copied().collect();
164
165        if !table_exclusions.is_empty() {
166            let excluded_in_db: Vec<String> = table_exclusions
167                .iter()
168                .map(|&idx| {
169                    // Build full table name in "database.table" format
170                    format!("{}.{}", db_name, table_display_names[idx])
171                })
172                .collect();
173
174            tracing::info!("");
175            tracing::info!(
176                "✓ Excluding {} table(s) from '{}':",
177                excluded_in_db.len(),
178                db_name
179            );
180            for table in &excluded_in_db {
181                tracing::info!("  - {}", table);
182            }
183
184            excluded_tables.extend(excluded_in_db);
185        } else {
186            tracing::info!("");
187            tracing::info!("✓ Including all tables from '{}'", db_name);
188        }
189
190        tracing::info!("");
191
192        // Step 2a: Select tables for schema-only replication (from non-excluded tables)
193        let remaining_tables: Vec<(usize, String)> = table_display_names
194            .iter()
195            .enumerate()
196            .filter(|(idx, _)| !excluded_indices.contains(idx))
197            .map(|(idx, name)| (idx, name.clone()))
198            .collect();
199
200        if !remaining_tables.is_empty() {
201            let remaining_names: Vec<String> = remaining_tables
202                .iter()
203                .map(|(_, name)| name.clone())
204                .collect();
205
206            println!(
207                "Select tables to replicate SCHEMA-ONLY (no data) from '{}' (or press Enter to skip):",
208                db_name
209            );
210            println!("(Use arrow keys to navigate, Space to select, Enter to confirm)");
211            println!();
212
213            let schema_only_selections = MultiSelect::with_theme(&ColorfulTheme::default())
214                .items(&remaining_names)
215                .interact()
216                .context(format!(
217                    "Failed to get schema-only selection for database '{}'",
218                    db_name
219                ))?;
220
221            if !schema_only_selections.is_empty() {
222                tracing::info!("");
223                tracing::info!(
224                    "✓ Schema-only replication for {} table(s) from '{}':",
225                    schema_only_selections.len(),
226                    db_name
227                );
228
229                for &selection_idx in &schema_only_selections {
230                    let (original_idx, display_name) = &remaining_tables[selection_idx];
231                    let table_info = &all_tables[*original_idx];
232
233                    tracing::info!("  - {}", display_name);
234
235                    // Add to table rules
236                    let qualified = QualifiedTable::new(
237                        Some(db_name.clone()),
238                        table_info.schema.clone(),
239                        table_info.name.clone(),
240                    );
241                    table_rules.add_schema_only_table(qualified)?;
242                }
243            }
244
245            tracing::info!("");
246
247            // Step 2b: Configure time filters for remaining tables (not excluded, not schema-only)
248            let schema_only_indices: std::collections::HashSet<usize> = schema_only_selections
249                .iter()
250                .map(|&sel_idx| remaining_tables[sel_idx].0)
251                .collect();
252
253            let tables_for_time_filter: Vec<(usize, String)> = remaining_tables
254                .iter()
255                .filter(|(idx, _)| !schema_only_indices.contains(idx))
256                .cloned()
257                .collect();
258
259            if !tables_for_time_filter.is_empty() {
260                let confirm_time_filters = Confirm::with_theme(&ColorfulTheme::default())
261                    .with_prompt(format!(
262                        "Configure time-based filters for tables in '{}'?",
263                        db_name
264                    ))
265                    .default(false)
266                    .interact()
267                    .context("Failed to get time filter confirmation")?;
268
269                if confirm_time_filters {
270                    tracing::info!("");
271                    tracing::info!("Configuring time filters for '{}'...", db_name);
272
273                    for (original_idx, display_name) in &tables_for_time_filter {
274                        let table_info = &all_tables[*original_idx];
275
276                        let apply_filter = Confirm::with_theme(&ColorfulTheme::default())
277                            .with_prompt(format!("Apply time filter to '{}'?", display_name))
278                            .default(false)
279                            .interact()
280                            .context("Failed to get time filter confirmation")?;
281
282                        if apply_filter {
283                            // Query table columns to show actual timestamp columns
284                            let db_client = postgres::connect_with_retry(&db_url)
285                                .await
286                                .with_context(|| {
287                                    format!(
288                                        "Failed to connect to database '{}' for column query",
289                                        db_name
290                                    )
291                                })?;
292
293                            let columns = migration::get_table_columns(
294                                &db_client,
295                                &table_info.schema,
296                                &table_info.name,
297                            )
298                            .await?;
299
300                            // Filter for timestamp columns and build display strings
301                            let timestamp_columns: Vec<(String, String)> = columns
302                                .iter()
303                                .filter(|col| col.is_timestamp)
304                                .map(|col| {
305                                    let display = format!("{} ({})", col.name, col.data_type);
306                                    (col.name.clone(), display)
307                                })
308                                .collect();
309
310                            // Prompt for column selection
311                            let column = if timestamp_columns.is_empty() {
312                                // No timestamp columns found - fall back to manual entry
313                                tracing::warn!(
314                                    "  ⚠ No timestamp columns found in table '{}'. Please enter column name manually.",
315                                    display_name
316                                );
317                                Input::with_theme(&ColorfulTheme::default())
318                                    .with_prompt("  Column name")
319                                    .default("created_at".to_string())
320                                    .interact_text()
321                                    .context("Failed to get column name")?
322                            } else {
323                                // Show timestamp columns in a selection list
324                                let display_options: Vec<String> = timestamp_columns
325                                    .iter()
326                                    .map(|(_, display)| display.clone())
327                                    .chain(std::iter::once(
328                                        "[Enter custom column name]".to_string(),
329                                    ))
330                                    .collect();
331
332                                let selection = Select::with_theme(&ColorfulTheme::default())
333                                    .with_prompt("  Select timestamp column for filtering")
334                                    .items(&display_options)
335                                    .default(0)
336                                    .interact()
337                                    .context("Failed to select timestamp column")?;
338
339                                if selection < timestamp_columns.len() {
340                                    // User selected a timestamp column
341                                    timestamp_columns[selection].0.clone()
342                                } else {
343                                    // User chose "custom" option
344                                    Input::with_theme(&ColorfulTheme::default())
345                                        .with_prompt("  Column name")
346                                        .interact_text()
347                                        .context("Failed to get custom column name")?
348                                }
349                            };
350
351                            // Prompt for time window
352                            let window: String = Input::with_theme(&ColorfulTheme::default())
353                                .with_prompt(
354                                    "  How far back to replicate data (e.g., '2 months', '90 days', '1 year')",
355                                )
356                                .default("2 months".to_string())
357                                .interact_text()
358                                .context("Failed to get time window")?;
359
360                            tracing::info!(
361                                "  ✓ Time filter for '{}': {} >= NOW() - INTERVAL '{}'",
362                                display_name,
363                                column,
364                                window
365                            );
366
367                            // Add to table rules
368                            let qualified = QualifiedTable::new(
369                                Some(db_name.clone()),
370                                table_info.schema.clone(),
371                                table_info.name.clone(),
372                            );
373                            table_rules.add_time_filter(qualified, column, window)?;
374                        }
375                    }
376                }
377            }
378        }
379
380        tracing::info!("");
381    }
382
383    // Step 3: Show summary and confirm
384    println!();
385    println!("========================================");
386    println!("Replication Configuration Summary");
387    println!("========================================");
388    println!();
389    println!("Databases to replicate: {}", selected_databases.len());
390    for db in &selected_databases {
391        println!("  ✓ {}", db);
392    }
393    println!();
394
395    if !excluded_tables.is_empty() {
396        println!("Tables to exclude: {}", excluded_tables.len());
397        for table in &excluded_tables {
398            println!("  ✗ {}", table);
399        }
400        println!();
401    }
402
403    // Show schema-only tables
404    let mut schema_only_count = 0;
405    for db in &selected_databases {
406        schema_only_count += table_rules.schema_only_tables(db).len();
407    }
408    if schema_only_count > 0 {
409        println!(
410            "Schema-only tables (DDL only, no data): {}",
411            schema_only_count
412        );
413        for db in &selected_databases {
414            let schema_only = table_rules.schema_only_tables(db);
415            if !schema_only.is_empty() {
416                for table in schema_only {
417                    println!("  📋 {}.{}", db, table);
418                }
419            }
420        }
421        println!();
422    }
423
424    // Show time filters
425    let mut time_filter_count = 0;
426    for db in &selected_databases {
427        time_filter_count += table_rules.predicate_tables(db).len();
428    }
429    if time_filter_count > 0 {
430        println!("Tables with time-based filters: {}", time_filter_count);
431        for db in &selected_databases {
432            let predicate_tables = table_rules.predicate_tables(db);
433            if !predicate_tables.is_empty() {
434                for (table, predicate) in predicate_tables {
435                    println!("  🕒 {}.{} [{}]", db, table, predicate);
436                }
437            }
438        }
439        println!();
440    }
441
442    println!("========================================");
443    println!();
444
445    let confirmed = Confirm::with_theme(&ColorfulTheme::default())
446        .with_prompt("Proceed with this configuration?")
447        .default(true)
448        .interact()
449        .context("Failed to get confirmation")?;
450
451    if !confirmed {
452        tracing::warn!("⚠ User cancelled operation");
453        anyhow::bail!("Interactive selection cancelled by user");
454    }
455
456    tracing::info!("");
457    tracing::info!("✓ Configuration confirmed");
458    tracing::info!("");
459
460    // Step 4: Convert selections to ReplicationFilter
461    let filter = if excluded_tables.is_empty() {
462        // No table exclusions - just filter by databases
463        ReplicationFilter::new(Some(selected_databases), None, None, None)?
464    } else {
465        // Include selected databases and exclude specific tables
466        ReplicationFilter::new(Some(selected_databases), None, None, Some(excluded_tables))?
467    };
468
469    Ok((filter, table_rules))
470}
471
472/// Replace the database name in a PostgreSQL connection URL
473///
474/// # Arguments
475///
476/// * `url` - PostgreSQL connection URL
477/// * `new_db_name` - New database name to use
478///
479/// # Returns
480///
481/// URL with the database name replaced
482fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
483    // Split into base URL and query parameters
484    let parts: Vec<&str> = url.splitn(2, '?').collect();
485    let base_url = parts[0];
486    let query_params = parts.get(1);
487
488    // Split base URL by '/' to replace the database name
489    let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
490
491    if url_parts.len() != 2 {
492        anyhow::bail!("Invalid connection URL format: cannot replace database name");
493    }
494
495    // Rebuild URL with new database name
496    let new_url = if let Some(params) = query_params {
497        format!("{}/{}?{}", url_parts[1], new_db_name, params)
498    } else {
499        format!("{}/{}", url_parts[1], new_db_name)
500    };
501
502    Ok(new_url)
503}
504
#[cfg(test)]
mod tests {
    use super::*;

    /// Covers the URL shapes the interactive flow is expected to receive.
    #[test]
    fn test_replace_database_in_url() {
        // Basic URL
        let url = "postgresql://user:pass@localhost:5432/olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");

        // URL with query parameters
        let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            new_url,
            "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
        );

        // URL without port
        let url = "postgresql://user:pass@localhost/olddb";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");

        // A '/' inside the query string must not be mistaken for the path separator
        let url = "postgresql://user:pass@localhost/olddb?sslrootcert=/etc/ssl/root.crt";
        let new_url = replace_database_in_url(url, "newdb").unwrap();
        assert_eq!(
            new_url,
            "postgresql://user:pass@localhost/newdb?sslrootcert=/etc/ssl/root.crt"
        );
    }

    /// A string with no path separator at all cannot be rewritten and must be rejected.
    #[test]
    fn test_replace_database_in_url_rejects_invalid_input() {
        assert!(replace_database_in_url("not-a-connection-url", "newdb").is_err());
    }

    /// Manual smoke test: needs a reachable source database and a person at the terminal,
    /// so it is ignored by default.
    #[tokio::test]
    #[ignore]
    async fn test_interactive_selection() {
        // Fail with an actionable message instead of a bare unwrap panic
        let source_url = std::env::var("TEST_SOURCE_URL")
            .expect("TEST_SOURCE_URL must be set to run the interactive selection test");

        let result = select_databases_and_tables(&source_url).await;

        // This will only work with manual interaction
        match &result {
            Ok((filter, rules)) => {
                println!("✓ Interactive selection completed");
                println!("Filter: {:?}", filter);
                println!("Rules: {:?}", rules);
            }
            Err(e) => {
                println!("Interactive selection error: {:?}", e);
            }
        }
    }
}