1use crate::{
5 filters::ReplicationFilter,
6 migration, postgres,
7 serendb::{ConsoleClient, TargetState},
8 table_rules::{QualifiedTable, TableRules},
9};
10use anyhow::{Context, Result};
11use inquire::{Confirm, MultiSelect, Select, Text};
12
13pub async fn select_seren_database() -> Result<(String, TargetState)> {
27 print_header("Select SerenDB Target");
28
29 let api_key = get_api_key()?;
30 let client = ConsoleClient::new(None, api_key);
31
32 let projects = client.list_projects().await?;
34 if projects.is_empty() {
35 anyhow::bail!("No projects found for your account.");
36 }
37 let project_names: Vec<String> = projects.iter().map(|p| p.name.clone()).collect();
38 let selected_project_name = Select::new("Select a project:", project_names).prompt()?;
39 let selected_project = projects
40 .into_iter()
41 .find(|p| p.name == selected_project_name)
42 .unwrap();
43
44 let branches = client.list_branches(&selected_project.id).await?;
46 if branches.is_empty() {
47 anyhow::bail!(
48 "Project '{}' has no branches. Create a branch first at console.serendb.com.",
49 selected_project.name
50 );
51 }
52
53 let branch = if branches.len() == 1 {
54 branches.into_iter().next().unwrap()
55 } else {
56 let branch_names: Vec<String> = branches.iter().map(|b| b.name.clone()).collect();
57 let selected_branch_name = Select::new("Select a branch:", branch_names).prompt()?;
58 branches
59 .into_iter()
60 .find(|b| b.name == selected_branch_name)
61 .unwrap()
62 };
63
64 let databases = client
66 .list_databases(&selected_project.id, &branch.id)
67 .await?;
68
69 let selected_database_name = if databases.is_empty() {
70 println!(
72 "\n Project '{}' has no databases yet.",
73 selected_project.name
74 );
75 println!(" The database will be created during replication.\n");
76 Text::new("Enter database name to create:")
77 .with_default("serendb")
78 .prompt()?
79 } else {
80 let mut database_names: Vec<String> = databases.iter().map(|db| db.name.clone()).collect();
82 database_names.push("[ Create new database ]".to_string());
83
84 let selection = Select::new("Select a database:", database_names).prompt()?;
85
86 if selection == "[ Create new database ]" {
87 Text::new("Enter database name to create:")
88 .with_default("serendb")
89 .prompt()?
90 } else {
91 selection
92 }
93 };
94
95 let conn_str = client
97 .get_connection_string(
98 &selected_project.id,
99 &branch.id,
100 &selected_database_name,
101 false,
102 )
103 .await?;
104
105 let target_state = TargetState::new(
107 selected_project.id.clone(),
108 selected_project.name.clone(),
109 branch.id.clone(),
110 branch.name.clone(),
111 vec![selected_database_name],
112 "", );
114
115 Ok((conn_str, target_state))
116}
117
/// Current position in the interactive selection wizard.
///
/// The `usize` carried by the per-database variants is a position in the list of
/// selected databases (the wizard indexes `selected_db_indices` with it), not an index
/// into the full list of source databases.
enum WizardStep {
    /// Step 1: choose which databases to replicate.
    SelectDatabases,
    /// Step 2: choose the tables to include for the n-th selected database.
    SelectTablesForDb(usize),
    /// Step 3: choose which included tables are schema-only for the n-th selected database.
    SelectSchemaOnlyForDb(usize),
    /// Step 4: optionally configure time filters for the n-th selected database.
    ConfigureTimeFiltersForDb(usize),
    /// Step 5: show the summary and ask for confirmation.
    Review,
}
126
/// Table listing of one source database, cached so the wizard queries it only once.
///
/// The two vectors are index-aligned: `table_display_names[i]` is the label shown for
/// `all_tables[i]`.
struct CachedDbTables {
    /// Tables as returned by `migration::list_tables`.
    all_tables: Vec<migration::TableInfo>,
    /// Prompt labels: the bare table name for the `public` schema, `schema.table` otherwise.
    table_display_names: Vec<String>,
}
132
133pub async fn select_databases_and_tables(
171 source_url: &str,
172) -> Result<(ReplicationFilter, TableRules)> {
173 tracing::info!("Starting interactive database and table selection...");
174 println!();
175
176 tracing::info!("Connecting to source database...");
178 let source_client = postgres::connect_with_retry(source_url)
179 .await
180 .context("Failed to connect to source database")?;
181 tracing::info!("✓ Connected to source");
182 println!();
183
184 tracing::info!("Discovering databases on source...");
186 let all_databases = migration::list_databases(&source_client)
187 .await
188 .context("Failed to list databases on source")?;
189
190 if all_databases.is_empty() {
191 tracing::warn!("⚠ No user databases found on source");
192 tracing::warn!(" Source appears to contain only template databases");
193 return Ok((ReplicationFilter::empty(), TableRules::default()));
194 }
195
196 tracing::info!("✓ Found {} database(s)", all_databases.len());
197 println!();
198
199 let db_names: Vec<String> = all_databases.iter().map(|db| db.name.clone()).collect();
200
201 let mut selected_db_indices: Vec<usize> = Vec::new();
203 let mut current_step = WizardStep::SelectDatabases;
204
205 let mut included_tables_by_db: std::collections::HashMap<String, Vec<String>> =
207 std::collections::HashMap::new();
208 let mut schema_only_by_db: std::collections::HashMap<String, Vec<(String, String)>> =
209 std::collections::HashMap::new(); let mut time_filters_by_db: std::collections::HashMap<
211 String,
212 Vec<(String, String, String, String)>,
213 > = std::collections::HashMap::new(); let mut table_cache: std::collections::HashMap<String, CachedDbTables> =
217 std::collections::HashMap::new();
218
219 loop {
220 match current_step {
221 WizardStep::SelectDatabases => {
222 print_header("Step 1 of 5: Select Databases");
223 println!("Navigation: Space to toggle, Enter to confirm, Esc to cancel");
224 println!();
225
226 let defaults: Vec<usize> = selected_db_indices.clone();
227
228 let selections =
229 MultiSelect::new("Select databases to replicate:", db_names.clone())
230 .with_default(&defaults)
231 .with_help_message("↑↓ navigate, Space toggle, Enter confirm")
232 .prompt();
233
234 match selections {
235 Ok(selected) => {
236 selected_db_indices = selected
238 .iter()
239 .filter_map(|name| db_names.iter().position(|n| n == name))
240 .collect();
241
242 if selected_db_indices.is_empty() {
243 println!();
244 println!("⚠ Please select at least one database");
245 continue;
246 }
247
248 included_tables_by_db.clear();
250 schema_only_by_db.clear();
251 time_filters_by_db.clear();
252 table_cache.clear();
253
254 current_step = WizardStep::SelectTablesForDb(0);
255 }
256 Err(inquire::InquireError::OperationCanceled) => {
257 anyhow::bail!("Operation cancelled by user");
258 }
259 Err(inquire::InquireError::OperationInterrupted) => {
260 anyhow::bail!("Operation interrupted");
261 }
262 Err(e) => return Err(e.into()),
263 }
264 }
265
266 WizardStep::SelectTablesForDb(db_idx) => {
267 let db_name = &db_names[selected_db_indices[db_idx]].clone();
268 print_header(&format!(
269 "Step 2 of 5: Select Tables to Include ({}/{})",
270 db_idx + 1,
271 selected_db_indices.len()
272 ));
273 println!("Database: {}", db_name);
274 println!("Press Enter without selecting to include ALL tables.");
275 println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
276 println!();
277
278 let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
280
281 if cached.all_tables.is_empty() {
282 println!(" No tables found in database '{}'", db_name);
283 if db_idx + 1 < selected_db_indices.len() {
285 current_step = WizardStep::SelectTablesForDb(db_idx + 1);
286 } else {
287 current_step = WizardStep::SelectSchemaOnlyForDb(0);
288 }
289 continue;
290 }
291
292 let previous_inclusions: Vec<usize> = included_tables_by_db
294 .get(db_name)
295 .map(|included| {
296 included
297 .iter()
298 .filter_map(|t| {
299 let stripped =
301 t.strip_prefix(&format!("{}.", db_name)).unwrap_or(t);
302 cached
303 .table_display_names
304 .iter()
305 .position(|n| n == stripped)
306 })
307 .collect()
308 })
309 .unwrap_or_default();
310
311 let selections = MultiSelect::new(
312 "Select tables to INCLUDE (Enter = include all):",
313 cached.table_display_names.clone(),
314 )
315 .with_default(&previous_inclusions)
316 .with_help_message("Space toggle, Enter confirm, Esc go back")
317 .prompt();
318
319 match selections {
320 Ok(selected_inclusions) => {
321 let db_inclusions: Vec<String> = if selected_inclusions.is_empty() {
323 cached
324 .table_display_names
325 .iter()
326 .map(|table_name| format!("{}.{}", db_name, table_name))
327 .collect()
328 } else {
329 selected_inclusions
330 .iter()
331 .map(|table_name| format!("{}.{}", db_name, table_name))
332 .collect()
333 };
334
335 included_tables_by_db.insert(db_name.clone(), db_inclusions);
337
338 if db_idx + 1 < selected_db_indices.len() {
340 current_step = WizardStep::SelectTablesForDb(db_idx + 1);
341 } else {
342 current_step = WizardStep::SelectSchemaOnlyForDb(0);
343 }
344 }
345 Err(inquire::InquireError::OperationCanceled) => {
346 if db_idx > 0 {
348 current_step = WizardStep::SelectTablesForDb(db_idx - 1);
349 } else {
350 current_step = WizardStep::SelectDatabases;
351 }
352 }
353 Err(inquire::InquireError::OperationInterrupted) => {
354 anyhow::bail!("Operation interrupted");
355 }
356 Err(e) => return Err(e.into()),
357 }
358 }
359
360 WizardStep::SelectSchemaOnlyForDb(db_idx) => {
361 let db_name = &db_names[selected_db_indices[db_idx]].clone();
362 print_header(&format!(
363 "Step 3 of 5: Schema-Only Tables ({}/{})",
364 db_idx + 1,
365 selected_db_indices.len()
366 ));
367 println!("Database: {}", db_name);
368 println!("Schema-only tables replicate structure but NO data.");
369 println!("Navigation: Space to toggle, Enter to continue, Esc to go back");
370 println!();
371
372 let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
373
374 if cached.all_tables.is_empty() {
375 if db_idx + 1 < selected_db_indices.len() {
377 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
378 } else {
379 current_step = WizardStep::ConfigureTimeFiltersForDb(0);
380 }
381 continue;
382 }
383
384 let included = included_tables_by_db.get(db_name);
386 let available_tables: Vec<(usize, String)> = cached
387 .table_display_names
388 .iter()
389 .enumerate()
390 .filter(|(_, name)| {
391 let full_name = format!("{}.{}", db_name, name);
392 included.is_some_and(|inc| inc.contains(&full_name))
393 })
394 .map(|(idx, name)| (idx, name.clone()))
395 .collect();
396
397 if available_tables.is_empty() {
398 println!(" No tables included from '{}'", db_name);
399 if db_idx + 1 < selected_db_indices.len() {
400 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
401 } else {
402 current_step = WizardStep::ConfigureTimeFiltersForDb(0);
403 }
404 continue;
405 }
406
407 let available_names: Vec<String> =
408 available_tables.iter().map(|(_, n)| n.clone()).collect();
409
410 let previous_schema_only: Vec<usize> = schema_only_by_db
412 .get(db_name)
413 .map(|selected| {
414 selected
415 .iter()
416 .filter_map(|(schema, table)| {
417 let display = if schema == "public" {
418 table.clone()
419 } else {
420 format!("{}.{}", schema, table)
421 };
422 available_names.iter().position(|n| n == &display)
423 })
424 .collect()
425 })
426 .unwrap_or_default();
427
428 let selections = MultiSelect::new(
429 "Select tables to replicate SCHEMA-ONLY (no data):",
430 available_names.clone(),
431 )
432 .with_default(&previous_schema_only)
433 .with_help_message("Space toggle, Enter confirm, Esc go back")
434 .prompt();
435
436 match selections {
437 Ok(selected_schema_only) => {
438 let schema_only_tables: Vec<(String, String)> = selected_schema_only
440 .iter()
441 .filter_map(|display_name| {
442 available_tables
443 .iter()
444 .find(|(_, n)| n == display_name)
445 .map(|(idx, _)| {
446 let t = &cached.all_tables[*idx];
447 (t.schema.clone(), t.name.clone())
448 })
449 })
450 .collect();
451
452 schema_only_by_db.insert(db_name.clone(), schema_only_tables);
453
454 if db_idx + 1 < selected_db_indices.len() {
455 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx + 1);
456 } else {
457 current_step = WizardStep::ConfigureTimeFiltersForDb(0);
458 }
459 }
460 Err(inquire::InquireError::OperationCanceled) => {
461 if db_idx > 0 {
463 current_step = WizardStep::SelectSchemaOnlyForDb(db_idx - 1);
464 } else {
465 let last_db = selected_db_indices.len().saturating_sub(1);
466 current_step = WizardStep::SelectTablesForDb(last_db);
467 }
468 }
469 Err(inquire::InquireError::OperationInterrupted) => {
470 anyhow::bail!("Operation interrupted");
471 }
472 Err(e) => return Err(e.into()),
473 }
474 }
475
476 WizardStep::ConfigureTimeFiltersForDb(db_idx) => {
477 let db_name = &db_names[selected_db_indices[db_idx]].clone();
478 print_header(&format!(
479 "Step 4 of 5: Time Filters ({}/{})",
480 db_idx + 1,
481 selected_db_indices.len()
482 ));
483 println!("Database: {}", db_name);
484 println!("Time filters limit data to recent records (e.g., last 90 days).");
485 println!();
486
487 let cached = get_or_cache_tables(&mut table_cache, source_url, db_name).await?;
488
489 if cached.all_tables.is_empty() {
490 if db_idx + 1 < selected_db_indices.len() {
491 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
492 } else {
493 current_step = WizardStep::Review;
494 }
495 continue;
496 }
497
498 let included = included_tables_by_db.get(db_name);
500 let schema_only = schema_only_by_db.get(db_name);
501 let available_tables: Vec<(usize, String)> = cached
502 .table_display_names
503 .iter()
504 .enumerate()
505 .filter(|(idx, name)| {
506 let full_name = format!("{}.{}", db_name, name);
507 let is_included = included.is_some_and(|inc| inc.contains(&full_name));
508 let t = &cached.all_tables[*idx];
509 let is_schema_only = schema_only.is_some_and(|so| {
510 so.iter().any(|(s, n)| s == &t.schema && n == &t.name)
511 });
512 is_included && !is_schema_only
513 })
514 .map(|(idx, name)| (idx, name.clone()))
515 .collect();
516
517 if available_tables.is_empty() {
518 println!(" No tables available for time filtering in '{}'", db_name);
519 if db_idx + 1 < selected_db_indices.len() {
520 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
521 } else {
522 current_step = WizardStep::Review;
523 }
524 continue;
525 }
526
527 let configure = Confirm::new("Configure time-based filters for this database?")
529 .with_default(false)
530 .with_help_message("Enter to confirm, Esc to go back")
531 .prompt();
532
533 match configure {
534 Ok(true) => {
535 let available_names: Vec<String> =
537 available_tables.iter().map(|(_, n)| n.clone()).collect();
538
539 let table_selections = MultiSelect::new(
540 "Select tables to apply time filter:",
541 available_names.clone(),
542 )
543 .with_help_message("Space toggle, Enter confirm")
544 .prompt();
545
546 match table_selections {
547 Ok(selected_tables) => {
548 let mut time_filters: Vec<(String, String, String, String)> =
549 Vec::new();
550
551 for display_name in &selected_tables {
552 if let Some((idx, _)) =
553 available_tables.iter().find(|(_, n)| n == display_name)
554 {
555 let t = &cached.all_tables[*idx];
556 let db_url = replace_database_in_url(source_url, db_name)?;
557 let db_client = postgres::connect_with_retry(&db_url)
558 .await
559 .context("Failed to connect for column query")?;
560
561 let columns = migration::get_table_columns(
563 &db_client, &t.schema, &t.name,
564 )
565 .await?;
566
567 let timestamp_columns: Vec<String> = columns
568 .iter()
569 .filter(|c| c.is_timestamp)
570 .map(|c| format!("{} ({})", c.name, c.data_type))
571 .collect();
572
573 println!();
574 println!("Configure time filter for '{}':", display_name);
575
576 let column = if timestamp_columns.is_empty() {
577 println!(
578 " ⚠ No timestamp columns found. Enter column name manually."
579 );
580 Text::new(" Column name:")
581 .with_default("created_at")
582 .prompt()
583 .context("Failed to get column name")?
584 } else {
585 let mut options = timestamp_columns.clone();
586 options.push("[Enter custom column name]".to_string());
587
588 let selection =
589 Select::new(" Select timestamp column:", options)
590 .prompt()
591 .context("Failed to select column")?;
592
593 if selection == "[Enter custom column name]" {
594 Text::new(" Column name:")
595 .prompt()
596 .context("Failed to get column name")?
597 } else {
598 selection
600 .split(" (")
601 .next()
602 .unwrap_or(&selection)
603 .to_string()
604 }
605 };
606
607 let window = Text::new(
608 " Time window (e.g., '90 days', '6 months', '1 year'):",
609 )
610 .with_default("90 days")
611 .prompt()
612 .context("Failed to get time window")?;
613
614 time_filters.push((
615 t.schema.clone(),
616 t.name.clone(),
617 column,
618 window,
619 ));
620 }
621 }
622
623 time_filters_by_db.insert(db_name.clone(), time_filters);
624 }
625 Err(inquire::InquireError::OperationCanceled) => {
626 continue;
628 }
629 Err(inquire::InquireError::OperationInterrupted) => {
630 anyhow::bail!("Operation interrupted");
631 }
632 Err(e) => return Err(e.into()),
633 }
634
635 if db_idx + 1 < selected_db_indices.len() {
636 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
637 } else {
638 current_step = WizardStep::Review;
639 }
640 }
641 Ok(false) => {
642 if db_idx + 1 < selected_db_indices.len() {
644 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx + 1);
645 } else {
646 current_step = WizardStep::Review;
647 }
648 }
649 Err(inquire::InquireError::OperationCanceled) => {
650 if db_idx > 0 {
652 current_step = WizardStep::ConfigureTimeFiltersForDb(db_idx - 1);
653 } else {
654 let last_db = selected_db_indices.len().saturating_sub(1);
655 current_step = WizardStep::SelectSchemaOnlyForDb(last_db);
656 }
657 }
658 Err(inquire::InquireError::OperationInterrupted) => {
659 anyhow::bail!("Operation interrupted");
660 }
661 Err(e) => return Err(e.into()),
662 }
663 }
664
665 WizardStep::Review => {
666 print_header("Step 5 of 5: Review Configuration");
667
668 let included_tables: Vec<String> =
670 included_tables_by_db.values().flatten().cloned().collect();
671
672 let selected_databases: Vec<String> = selected_db_indices
673 .iter()
674 .map(|&i| db_names[i].clone())
675 .collect();
676
677 println!();
678 println!("Databases to replicate: {}", selected_databases.len());
679 for db in &selected_databases {
680 println!(" ✓ {}", db);
681 }
682 println!();
683
684 println!("Tables to replicate: {}", included_tables.len());
685 if included_tables.len() <= 20 {
686 for table in &included_tables {
687 println!(" ✓ {}", table);
688 }
689 } else {
690 for table in included_tables.iter().take(10) {
692 println!(" ✓ {}", table);
693 }
694 println!(" ... ({} more tables)", included_tables.len() - 15);
695 for table in included_tables.iter().skip(included_tables.len() - 5) {
696 println!(" ✓ {}", table);
697 }
698 }
699 println!();
700
701 let schema_only_count: usize = schema_only_by_db.values().map(|v| v.len()).sum();
703 if schema_only_count > 0 {
704 println!("Schema-only tables (no data): {}", schema_only_count);
705 for (db, tables) in &schema_only_by_db {
706 for (schema, table) in tables {
707 let display = if schema == "public" {
708 format!("{}.{}", db, table)
709 } else {
710 format!("{}.{}.{}", db, schema, table)
711 };
712 println!(" ◇ {}", display);
713 }
714 }
715 println!();
716 } else {
717 println!("Schema-only tables: none");
718 println!();
719 }
720
721 let time_filter_count: usize = time_filters_by_db.values().map(|v| v.len()).sum();
723 if time_filter_count > 0 {
724 println!("Time-filtered tables: {}", time_filter_count);
725 for (db, filters) in &time_filters_by_db {
726 for (schema, table, column, window) in filters {
727 let display = if schema == "public" {
728 format!("{}.{}", db, table)
729 } else {
730 format!("{}.{}.{}", db, schema, table)
731 };
732 println!(" ⏱ {} ({} >= last {})", display, column, window);
733 }
734 }
735 println!();
736 } else {
737 println!("Time filters: none");
738 println!();
739 }
740
741 println!("───────────────────────────────────────────────────────────────");
742 println!();
743
744 let confirmed = Confirm::new("Proceed with this configuration?")
745 .with_default(true)
746 .with_help_message("Enter confirm, Esc go back")
747 .prompt();
748
749 match confirmed {
750 Ok(true) => break, Ok(false) | Err(inquire::InquireError::OperationCanceled) => {
752 let last_db = selected_db_indices.len().saturating_sub(1);
754 current_step = WizardStep::ConfigureTimeFiltersForDb(last_db);
755 }
756 Err(inquire::InquireError::OperationInterrupted) => {
757 anyhow::bail!("Operation interrupted");
758 }
759 Err(e) => return Err(e.into()),
760 }
761 }
762 }
763 }
764
765 let selected_databases: Vec<String> = selected_db_indices
767 .iter()
768 .map(|&i| db_names[i].clone())
769 .collect();
770
771 let included_tables: Vec<String> = included_tables_by_db.values().flatten().cloned().collect();
772
773 tracing::info!("");
774 tracing::info!("✓ Configuration confirmed");
775 tracing::info!("");
776
777 let filter = if included_tables.is_empty() {
779 ReplicationFilter::new(Some(selected_databases), None, None, None)?
780 } else {
781 ReplicationFilter::new(Some(selected_databases), None, Some(included_tables), None)?
782 };
783
784 let mut table_rules = TableRules::default();
786
787 for (db, tables) in &schema_only_by_db {
789 for (schema, table) in tables {
790 let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
791 table_rules.add_schema_only_table(qualified)?;
792 }
793 }
794
795 for (db, filters) in &time_filters_by_db {
797 for (schema, table, column, window) in filters {
798 let qualified = QualifiedTable::new(Some(db.clone()), schema.clone(), table.clone());
799 table_rules.add_time_filter(qualified, column.clone(), window.clone())?;
800 }
801 }
802
803 Ok((filter, table_rules))
804}
805
806async fn get_or_cache_tables<'a>(
808 cache: &'a mut std::collections::HashMap<String, CachedDbTables>,
809 source_url: &str,
810 db_name: &str,
811) -> Result<&'a CachedDbTables> {
812 if !cache.contains_key(db_name) {
813 let db_url = replace_database_in_url(source_url, db_name)?;
814 let db_client = postgres::connect_with_retry(&db_url)
815 .await
816 .context(format!("Failed to connect to database '{}'", db_name))?;
817
818 let all_tables = migration::list_tables(&db_client)
819 .await
820 .context(format!("Failed to list tables from database '{}'", db_name))?;
821
822 let table_display_names: Vec<String> = all_tables
823 .iter()
824 .map(|t| {
825 if t.schema == "public" {
826 t.name.clone()
827 } else {
828 format!("{}.{}", t.schema, t.name)
829 }
830 })
831 .collect();
832
833 cache.insert(
834 db_name.to_string(),
835 CachedDbTables {
836 all_tables,
837 table_display_names,
838 },
839 );
840 }
841
842 Ok(cache.get(db_name).unwrap())
843}
844
/// Prints `title` inside a box drawn with double-line characters, with a blank line
/// before and after.
///
/// The title row is padded to the same inner width as the horizontal borders so the
/// right-hand border lines up. Titles longer than the padding width are printed in full
/// and push the right border outwards.
fn print_header(title: &str) {
    // Number of columns between the two vertical borders.
    const INNER_WIDTH: usize = 62;

    let rule = "═".repeat(INNER_WIDTH);

    println!();
    println!("╔{}╗", rule);
    // One space of padding on each side of the title, hence INNER_WIDTH - 2.
    println!("║ {:<width$} ║", title, width = INNER_WIDTH - 2);
    println!("╚{}╝", rule);
    println!();
}
853
854fn replace_database_in_url(url: &str, new_db_name: &str) -> Result<String> {
865 let parts: Vec<&str> = url.splitn(2, '?').collect();
867 let base_url = parts[0];
868 let query_params = parts.get(1);
869
870 let url_parts: Vec<&str> = base_url.rsplitn(2, '/').collect();
872
873 if url_parts.len() != 2 {
874 anyhow::bail!("Invalid connection URL format: cannot replace database name");
875 }
876
877 let new_url = if let Some(params) = query_params {
879 format!("{}/{}?{}", url_parts[1], new_db_name, params)
880 } else {
881 format!("{}/{}", url_parts[1], new_db_name)
882 };
883
884 Ok(new_url)
885}
886
887pub fn get_api_key() -> anyhow::Result<String> {
888 use dialoguer::{theme::ColorfulTheme, Input};
889
890 if let Ok(key) = std::env::var("SEREN_API_KEY") {
892 if !key.trim().is_empty() {
893 return Ok(key.trim().to_string());
894 }
895 }
896
897 println!("\nRemote execution requires a SerenDB API key for authentication.");
899 println!("\nYou can generate an API key at:");
900 println!(" https://console.serendb.com/api-keys\n");
901
902 let key: String = Input::with_theme(&ColorfulTheme::default())
903 .with_prompt("Enter your SerenDB API key")
904 .allow_empty(false)
905 .interact_text()?;
906
907 if key.trim().is_empty() {
908 anyhow::bail!(
909 "API key is required for remote execution.\n\
910 Set the SEREN_API_KEY environment variable or run interactively.\n\
911 Get your API key at: https://console.serendb.com/api-keys\n\
912 Or use --local to run replication on your machine instead"
913 );
914 }
915
916 let trimmed_key = key.trim().to_string();
917
918 std::env::set_var("SEREN_API_KEY", &trimmed_key);
920
921 Ok(trimmed_key)
922}
923
#[cfg(test)]
mod tests {
    use super::*;

    /// The database segment is swapped while credentials, host, optional port and query
    /// parameters are left untouched.
    #[test]
    fn test_replace_database_in_url() {
        // (input URL, expected URL after switching to "newdb")
        let cases = [
            (
                "postgresql://user:pass@localhost:5432/olddb",
                "postgresql://user:pass@localhost:5432/newdb",
            ),
            (
                "postgresql://user:pass@localhost:5432/olddb?sslmode=require",
                "postgresql://user:pass@localhost:5432/newdb?sslmode=require",
            ),
            (
                "postgresql://user:pass@localhost/olddb",
                "postgresql://user:pass@localhost/newdb",
            ),
        ];

        for &(input, expected) in &cases {
            let rewritten = replace_database_in_url(input, "newdb").unwrap();
            assert_eq!(rewritten, expected);
        }
    }

    /// Manual smoke test: ignored by default, and reads `TEST_SOURCE_URL` from the
    /// environment to know which source to connect to.
    #[tokio::test]
    #[ignore]
    async fn test_interactive_selection() {
        let source_url = std::env::var("TEST_SOURCE_URL").unwrap();

        // Either outcome is only printed, so the test passes regardless of the result.
        match select_databases_and_tables(&source_url).await {
            Ok((filter, rules)) => {
                println!("✓ Interactive selection completed");
                println!("Filter: {:?}", filter);
                println!("Rules: {:?}", rules);
            }
            Err(e) => println!("Interactive selection error: {:?}", e),
        }
    }
}