1use crate::{
5 filters::ReplicationFilter,
6 migration, postgres,
7 table_rules::{QualifiedTable, TableRules},
8};
9use anyhow::{Context, Result};
10use dialoguer::{theme::ColorfulTheme, Confirm, Input, MultiSelect, Select};
11
12pub async fn select_databases_and_tables(
48 source_url: &str,
49) -> Result<(ReplicationFilter, TableRules)> {
50 tracing::info!("Starting interactive database and table selection...");
51 tracing::info!("");
52
53 tracing::info!("Connecting to source database...");
55 let source_client = postgres::connect_with_retry(source_url)
56 .await
57 .context("Failed to connect to source database")?;
58 tracing::info!("✓ Connected to source");
59 tracing::info!("");
60
61 tracing::info!("Discovering databases on source...");
63 let all_databases = migration::list_databases(&source_client)
64 .await
65 .context("Failed to list databases on source")?;
66
67 if all_databases.is_empty() {
68 tracing::warn!("⚠ No user databases found on source");
69 tracing::warn!(" Source appears to contain only template databases");
70 return Ok((ReplicationFilter::empty(), TableRules::default()));
71 }
72
73 tracing::info!("✓ Found {} database(s)", all_databases.len());
74 tracing::info!("");
75
76 println!("Select databases to replicate:");
78 println!("(Use arrow keys to navigate, Space to select, Enter to confirm)");
79 println!();
80
81 let db_names: Vec<String> = all_databases.iter().map(|db| db.name.clone()).collect();
82
83 let db_selections = MultiSelect::with_theme(&ColorfulTheme::default())
84 .items(&db_names)
85 .interact()
86 .context("Failed to get database selection")?;
87
88 if db_selections.is_empty() {
89 tracing::warn!("⚠ No databases selected");
90 tracing::info!(" Cancelling interactive selection");
91 return Ok((ReplicationFilter::empty(), TableRules::default()));
92 }
93
94 let selected_databases: Vec<String> = db_selections
95 .iter()
96 .map(|&idx| db_names[idx].clone())
97 .collect();
98
99 tracing::info!("");
100 tracing::info!("✓ Selected {} database(s):", selected_databases.len());
101 for db in &selected_databases {
102 tracing::info!(" - {}", db);
103 }
104 tracing::info!("");
105
106 let mut excluded_tables: Vec<String> = Vec::new();
108 let mut table_rules = TableRules::default();
109
110 for db_name in &selected_databases {
111 let db_url = replace_database_in_url(source_url, db_name)
113 .context(format!("Failed to build URL for database '{}'", db_name))?;
114
115 tracing::info!("Discovering tables in database '{}'...", db_name);
117 let db_client = postgres::connect_with_retry(&db_url)
118 .await
119 .context(format!("Failed to connect to database '{}'", db_name))?;
120
121 let all_tables = migration::list_tables(&db_client)
122 .await
123 .context(format!("Failed to list tables from database '{}'", db_name))?;
124
125 if all_tables.is_empty() {
126 tracing::info!(" No tables found in database '{}'", db_name);
127 tracing::info!("");
128 continue;
129 }
130
131 tracing::info!("✓ Found {} table(s) in '{}'", all_tables.len(), db_name);
132 tracing::info!("");
133
134 let table_display_names: Vec<String> = all_tables
136 .iter()
137 .map(|t| {
138 if t.schema == "public" {
139 t.name.clone()
140 } else {
141 format!("{}.{}", t.schema, t.name)
142 }
143 })
144 .collect();
145
146 println!(
147 "Select tables to EXCLUDE from '{}' (or press Enter to include all):",
148 db_name
149 );
150 println!("(Use arrow keys to navigate, Space to select, Enter to confirm)");
151 println!();
152
153 let table_exclusions = MultiSelect::with_theme(&ColorfulTheme::default())
154 .items(&table_display_names)
155 .interact()
156 .context(format!(
157 "Failed to get table exclusion selection for database '{}'",
158 db_name
159 ))?;
160
161 let excluded_indices: std::collections::HashSet<usize> =
163 table_exclusions.iter().copied().collect();
164
165 if !table_exclusions.is_empty() {
166 let excluded_in_db: Vec<String> = table_exclusions
167 .iter()
168 .map(|&idx| {
169 format!("{}.{}", db_name, table_display_names[idx])
171 })
172 .collect();
173
174 tracing::info!("");
175 tracing::info!(
176 "✓ Excluding {} table(s) from '{}':",
177 excluded_in_db.len(),
178 db_name
179 );
180 for table in &excluded_in_db {
181 tracing::info!(" - {}", table);
182 }
183
184 excluded_tables.extend(excluded_in_db);
185 } else {
186 tracing::info!("");
187 tracing::info!("✓ Including all tables from '{}'", db_name);
188 }
189
190 tracing::info!("");
191
192 let remaining_tables: Vec<(usize, String)> = table_display_names
194 .iter()
195 .enumerate()
196 .filter(|(idx, _)| !excluded_indices.contains(idx))
197 .map(|(idx, name)| (idx, name.clone()))
198 .collect();
199
200 if !remaining_tables.is_empty() {
201 let remaining_names: Vec<String> = remaining_tables
202 .iter()
203 .map(|(_, name)| name.clone())
204 .collect();
205
206 println!(
207 "Select tables to replicate SCHEMA-ONLY (no data) from '{}' (or press Enter to skip):",
208 db_name
209 );
210 println!("(Use arrow keys to navigate, Space to select, Enter to confirm)");
211 println!();
212
213 let schema_only_selections = MultiSelect::with_theme(&ColorfulTheme::default())
214 .items(&remaining_names)
215 .interact()
216 .context(format!(
217 "Failed to get schema-only selection for database '{}'",
218 db_name
219 ))?;
220
221 if !schema_only_selections.is_empty() {
222 tracing::info!("");
223 tracing::info!(
224 "✓ Schema-only replication for {} table(s) from '{}':",
225 schema_only_selections.len(),
226 db_name
227 );
228
229 for &selection_idx in &schema_only_selections {
230 let (original_idx, display_name) = &remaining_tables[selection_idx];
231 let table_info = &all_tables[*original_idx];
232
233 tracing::info!(" - {}", display_name);
234
235 let qualified = QualifiedTable::new(
237 Some(db_name.clone()),
238 table_info.schema.clone(),
239 table_info.name.clone(),
240 );
241 table_rules.add_schema_only_table(qualified)?;
242 }
243 }
244
245 tracing::info!("");
246
247 let schema_only_indices: std::collections::HashSet<usize> = schema_only_selections
249 .iter()
250 .map(|&sel_idx| remaining_tables[sel_idx].0)
251 .collect();
252
253 let tables_for_time_filter: Vec<(usize, String)> = remaining_tables
254 .iter()
255 .filter(|(idx, _)| !schema_only_indices.contains(idx))
256 .cloned()
257 .collect();
258
259 if !tables_for_time_filter.is_empty() {
260 let confirm_time_filters = Confirm::with_theme(&ColorfulTheme::default())
261 .with_prompt(format!(
262 "Configure time-based filters for tables in '{}'?",
263 db_name
264 ))
265 .default(false)
266 .interact()
267 .context("Failed to get time filter confirmation")?;
268
269 if confirm_time_filters {
270 tracing::info!("");
271 tracing::info!("Configuring time filters for '{}'...", db_name);
272
273 for (original_idx, display_name) in &tables_for_time_filter {
274 let table_info = &all_tables[*original_idx];
275
276 let apply_filter = Confirm::with_theme(&ColorfulTheme::default())
277 .with_prompt(format!("Apply time filter to '{}'?", display_name))
278 .default(false)
279 .interact()
280 .context("Failed to get time filter confirmation")?;
281
282 if apply_filter {
283 let db_client = postgres::connect_with_retry(&db_url)
285 .await
286 .with_context(|| {
287 format!(
288 "Failed to connect to database '{}' for column query",
289 db_name
290 )
291 })?;
292
293 let columns = migration::get_table_columns(
294 &db_client,
295 &table_info.schema,
296 &table_info.name,
297 )
298 .await?;
299
300 let timestamp_columns: Vec<(String, String)> = columns
302 .iter()
303 .filter(|col| col.is_timestamp)
304 .map(|col| {
305 let display = format!("{} ({})", col.name, col.data_type);
306 (col.name.clone(), display)
307 })
308 .collect();
309
310 let column = if timestamp_columns.is_empty() {
312 tracing::warn!(
314 " ⚠ No timestamp columns found in table '{}'. Please enter column name manually.",
315 display_name
316 );
317 Input::with_theme(&ColorfulTheme::default())
318 .with_prompt(" Column name")
319 .default("created_at".to_string())
320 .interact_text()
321 .context("Failed to get column name")?
322 } else {
323 let display_options: Vec<String> = timestamp_columns
325 .iter()
326 .map(|(_, display)| display.clone())
327 .chain(std::iter::once(
328 "[Enter custom column name]".to_string(),
329 ))
330 .collect();
331
332 let selection = Select::with_theme(&ColorfulTheme::default())
333 .with_prompt(" Select timestamp column for filtering")
334 .items(&display_options)
335 .default(0)
336 .interact()
337 .context("Failed to select timestamp column")?;
338
339 if selection < timestamp_columns.len() {
340 timestamp_columns[selection].0.clone()
342 } else {
343 Input::with_theme(&ColorfulTheme::default())
345 .with_prompt(" Column name")
346 .interact_text()
347 .context("Failed to get custom column name")?
348 }
349 };
350
351 let window: String = Input::with_theme(&ColorfulTheme::default())
353 .with_prompt(
354 " How far back to replicate data (e.g., '2 months', '90 days', '1 year')",
355 )
356 .default("2 months".to_string())
357 .interact_text()
358 .context("Failed to get time window")?;
359
360 tracing::info!(
361 " ✓ Time filter for '{}': {} >= NOW() - INTERVAL '{}'",
362 display_name,
363 column,
364 window
365 );
366
367 let qualified = QualifiedTable::new(
369 Some(db_name.clone()),
370 table_info.schema.clone(),
371 table_info.name.clone(),
372 );
373 table_rules.add_time_filter(qualified, column, window)?;
374 }
375 }
376 }
377 }
378 }
379
380 tracing::info!("");
381 }
382
383 println!();
385 println!("========================================");
386 println!("Replication Configuration Summary");
387 println!("========================================");
388 println!();
389 println!("Databases to replicate: {}", selected_databases.len());
390 for db in &selected_databases {
391 println!(" ✓ {}", db);
392 }
393 println!();
394
395 if !excluded_tables.is_empty() {
396 println!("Tables to exclude: {}", excluded_tables.len());
397 for table in &excluded_tables {
398 println!(" ✗ {}", table);
399 }
400 println!();
401 }
402
403 let mut schema_only_count = 0;
405 for db in &selected_databases {
406 schema_only_count += table_rules.schema_only_tables(db).len();
407 }
408 if schema_only_count > 0 {
409 println!(
410 "Schema-only tables (DDL only, no data): {}",
411 schema_only_count
412 );
413 for db in &selected_databases {
414 let schema_only = table_rules.schema_only_tables(db);
415 if !schema_only.is_empty() {
416 for table in schema_only {
417 println!(" 📋 {}.{}", db, table);
418 }
419 }
420 }
421 println!();
422 }
423
424 let mut time_filter_count = 0;
426 for db in &selected_databases {
427 time_filter_count += table_rules.predicate_tables(db).len();
428 }
429 if time_filter_count > 0 {
430 println!("Tables with time-based filters: {}", time_filter_count);
431 for db in &selected_databases {
432 let predicate_tables = table_rules.predicate_tables(db);
433 if !predicate_tables.is_empty() {
434 for (table, predicate) in predicate_tables {
435 println!(" 🕒 {}.{} [{}]", db, table, predicate);
436 }
437 }
438 }
439 println!();
440 }
441
442 println!("========================================");
443 println!();
444
445 let confirmed = Confirm::with_theme(&ColorfulTheme::default())
446 .with_prompt("Proceed with this configuration?")
447 .default(true)
448 .interact()
449 .context("Failed to get confirmation")?;
450
451 if !confirmed {
452 tracing::warn!("⚠ User cancelled operation");
453 anyhow::bail!("Interactive selection cancelled by user");
454 }
455
456 tracing::info!("");
457 tracing::info!("✓ Configuration confirmed");
458 tracing::info!("");
459
460 let filter = if excluded_tables.is_empty() {
462 ReplicationFilter::new(Some(selected_databases), None, None, None)?
464 } else {
465 ReplicationFilter::new(Some(selected_databases), None, None, Some(excluded_tables))?
467 };
468
469 Ok((filter, table_rules))
470}
471
472fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
483 let parts: Vec<&str> = url.splitn(2, '?').collect();
485 let base_url = parts[0];
486 let query_params = parts.get(1);
487
488 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
490
491 if url_parts.len() != 2 {
492 anyhow::bail!("Invalid connection URL format: cannot replace database name");
493 }
494
495 let new_url = if let Some(params) = query_params {
497 format!("{}/{}?{}", url_parts[1], new_db_name, params)
498 } else {
499 format!("{}/{}", url_parts[1], new_db_name)
500 };
501
502 Ok(new_url)
503}
504
505#[cfg(test)]
506mod tests {
507 use super::*;
508
509 #[test]
510 fn test_replace_database_in_url() {
511 let url = "postgresql://user:pass@localhost:5432/olddb";
513 let new_url = replace_database_in_url(url, "newdb").unwrap();
514 assert_eq!(new_url, "postgresql://user:pass@localhost:5432/newdb");
515
516 let url = "postgresql://user:pass@localhost:5432/olddb?sslmode=require";
518 let new_url = replace_database_in_url(url, "newdb").unwrap();
519 assert_eq!(
520 new_url,
521 "postgresql://user:pass@localhost:5432/newdb?sslmode=require"
522 );
523
524 let url = "postgresql://user:pass@localhost/olddb";
526 let new_url = replace_database_in_url(url, "newdb").unwrap();
527 assert_eq!(new_url, "postgresql://user:pass@localhost/newdb");
528 }
529
530 #[tokio::test]
531 #[ignore]
532 async fn test_interactive_selection() {
533 let source_url = std::env::var("TEST_SOURCE_URL").unwrap();
535
536 let result = select_databases_and_tables(&source_url).await;
537
538 match &result {
540 Ok((filter, rules)) => {
541 println!("✓ Interactive selection completed");
542 println!("Filter: {:?}", filter);
543 println!("Rules: {:?}", rules);
544 }
545 Err(e) => {
546 println!("Interactive selection error: {:?}", e);
547 }
548 }
549 }
550}