1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7pub mod type_report;
8
9pub(crate) use analysis::chunk_sparsity_from_counts;
10#[cfg(test)]
11use analysis::{
12 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
13 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
14 recommend_parallelism, recommend_profile,
15};
16#[allow(unused_imports)]
17pub use doctor::doctor;
18#[cfg(test)]
19use postgres::{extract_scan_type, parse_pg_row_estimate};
20
21use crate::config::{Config, ExportConfig, SourceType};
22use crate::error::Result;
23use crate::types::policy::TypePolicy;
24use crate::types::target::{ExportTarget, TargetStatus};
25
/// Overall health rating attached to an export diagnostic.
///
/// The `Display` impl renders each variant as an upper-case label
/// (`EFFICIENT`, `ACCEPTABLE`, `DEGRADED`, `UNSAFE`).
///
/// The type is a plain fieldless enum, so it is `Copy` and comparable; callers
/// can use `==` / `assert_eq!` instead of `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Displayed as `EFFICIENT`.
    Efficient,
    /// Displayed as `ACCEPTABLE`.
    Acceptable,
    /// Displayed as `DEGRADED`.
    Degraded,
    /// Displayed as `UNSAFE`.
    Unsafe,
}
33
34impl std::fmt::Display for HealthVerdict {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 Self::Efficient => write!(f, "EFFICIENT"),
38 Self::Acceptable => write!(f, "ACCEPTABLE"),
39 Self::Degraded => write!(f, "DEGRADED"),
40 Self::Unsafe => write!(f, "UNSAFE"),
41 }
42 }
43}
44
45pub(crate) struct ExportDiagnostic {
46 pub export_name: String,
47 pub strategy: String,
48 pub mode: String,
49 pub cursor_column: Option<String>,
50 pub row_estimate: Option<i64>,
51 pub cursor_min: Option<String>,
52 pub cursor_max: Option<String>,
53 pub scan_type: Option<String>,
54 pub uses_index: bool,
55 pub verdict: HealthVerdict,
56 pub recommended_profile: &'static str,
57 pub recommended_parallel: (u32, &'static str),
58 pub warnings: Vec<String>,
59 pub suggestion: Option<String>,
60}
61
62pub(crate) fn get_export_diagnostic(
66 config: &Config,
67 export: &ExportConfig,
68) -> Result<ExportDiagnostic> {
69 let url = config.source.resolve_url()?;
70 let tls = config.source.tls.as_ref();
71 crate::source::warn_if_tls_disabled(&config.source);
72 match config.source.source_type {
73 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
74 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
75 SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
76 }
77}
78
79fn destination_identity(d: &crate::config::DestinationConfig) -> String {
86 format!(
87 "{:?}:{}:{}:{}",
88 d.destination_type,
89 d.bucket.as_deref().unwrap_or("-"),
90 d.endpoint.as_deref().unwrap_or("-"),
91 d.path.as_deref().unwrap_or("-"),
92 )
93}
94
/// Formats the non-strict summary note shown when `n` columns fail
/// compatibility with the `target_label` export target.
///
/// The noun is singular only for exactly one column. The note reminds the user
/// that the exit code is gated only under `--strict`.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let col = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {n} {col} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
    )
}
106
107pub fn check(
108 config_path: &str,
109 export_name: Option<&str>,
110 params: Option<&std::collections::HashMap<String, String>>,
111 show_type_report: bool,
112 strict: bool,
113 json_output: bool,
114 target: Option<ExportTarget>,
115) -> Result<()> {
116 let config = Config::load_with_params(config_path, params)?;
117
118 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
119 let e = config
120 .exports
121 .iter()
122 .find(|e| e.name == name)
123 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
124 vec![e]
125 } else {
126 config.exports.iter().collect()
127 };
128
129 let url = config.source.resolve_url()?;
130 let tls = config.source.tls.as_ref();
131 crate::source::warn_if_tls_disabled(&config.source);
137 match config.source.source_type {
138 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
139 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
140 SourceType::Mssql => mssql::check_mssql(&url, tls, &exports, json_output)?,
141 }
142
143 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
152 for export in &exports {
153 let dest_key = destination_identity(&export.destination);
154 if !seen_destinations.insert(dest_key) {
155 continue;
156 }
157 let expanded = crate::plan::build::expand_destination_templates(
158 export.destination.clone(),
159 &export.name,
160 );
161 crate::destination::create_destination(&expanded).map_err(|e| {
162 anyhow::anyhow!(
163 "export '{}': destination preflight failed: {:#}",
164 export.name,
165 e
166 )
167 })?;
168 }
169
170 if show_type_report {
171 let policy = if strict {
172 TypePolicy::strict()
173 } else {
174 TypePolicy::warn_only()
175 };
176
177 let mut any_fatal = false;
178 let mut target_fail_cols = 0usize;
184 let mut target_fail_label: Option<&'static str> = None;
185 for export in &exports {
186 let column_overrides =
187 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
188 if let Some(t) = export.target.as_deref()
192 && crate::types::target::ExportTarget::parse(t).is_none()
193 {
194 anyhow::bail!(
195 "export '{}': unknown target '{t}' (expected: {})",
196 export.name,
197 crate::types::target::ExportTarget::valid_target_names()
198 );
199 }
200 let eff_target = target.or_else(|| {
201 export
202 .target
203 .as_deref()
204 .and_then(crate::types::target::ExportTarget::parse)
205 });
206 let config_dir = std::path::Path::new(config_path)
207 .parent()
208 .unwrap_or_else(|| std::path::Path::new("."));
209 match type_report::collect_report(
210 &config,
211 export,
212 &column_overrides,
213 &policy,
214 eff_target,
215 config_dir,
216 params,
217 ) {
218 Ok(report) => {
219 if report.has_fatal() {
220 any_fatal = true;
221 }
222 if let Some(t) = eff_target
223 && report.has_target_fail()
224 {
225 any_fatal = true;
226 target_fail_cols += report
227 .columns
228 .iter()
229 .filter(|c| c.target_status == Some(TargetStatus::Fail))
230 .count();
231 target_fail_label.get_or_insert(t.label());
232 }
233 if json_output {
234 type_report::print_json(&report)?;
235 } else {
236 type_report::print_table(&report, eff_target);
237 }
238 }
239 Err(e) => {
240 log::warn!("type report for '{}' failed: {:#}", export.name, e);
241 }
242 }
243 }
244
245 if strict && any_fatal {
246 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
247 } else if !strict && target_fail_cols > 0 && !json_output {
248 println!();
251 println!(
252 "{}",
253 target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
254 );
255 }
256 }
257
258 Ok(())
259}
260
261fn print_diagnostic(diag: &ExportDiagnostic) {
262 println!();
263 println!("Export: {}", diag.export_name);
264 println!(" Strategy: {}", diag.strategy);
265 println!(" Mode: {}", diag.mode);
266 if let Some(est) = diag.row_estimate {
267 if est >= 1_000_000 {
268 println!(" Row estimate: ~{}M", est / 1_000_000);
269 } else if est >= 1_000 {
270 println!(" Row estimate: ~{}K", est / 1_000);
271 } else {
272 println!(" Row estimate: ~{}", est);
273 }
274 }
275 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
276 println!(" Cursor range: {} .. {}", min_v, max_v);
277 }
278 if let Some(col) = &diag.cursor_column {
279 println!(" Cursor col: {}", col);
280 }
281 if let Some(scan) = &diag.scan_type {
282 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
283 println!(" Scan type: {}{}", scan, idx_label);
284 }
285 println!(" Verdict: {}", diag.verdict);
286 println!(
287 " Recommended: tuning.profile: {}",
288 diag.recommended_profile
289 );
290 let (par_level, par_reason) = diag.recommended_parallel;
291 if par_level > 1 {
292 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
293 } else {
294 println!(" Parallelism: {} ({})", par_level, par_reason);
295 }
296 for w in &diag.warnings {
297 println!(" Warning: {}", w);
298 }
299 if let Some(suggestion) = &diag.suggestion {
300 println!(" Suggestion: {}", suggestion);
301 }
302}
303
#[cfg(test)]
mod tests {
    // Unit tests for this module: target-fail note, verdicts, plan parsing,
    // strategy/profile/parallelism recommendations, warning checks, error
    // categorisation and hints, and destination identity keys.
    use super::*;
    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    // Builds an export based on `sample_export(name)`, overriding the mode,
    // the cursor column, the query, CSV format and a local `./out` destination.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            query: Some("SELECT * FROM t".to_string()),
            format: FormatType::Csv,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            ..crate::config::sample_export(name)
        }
    }

    // --- target_fail_note ---

    #[test]
    fn target_fail_note_names_count_target_and_strict_gate() {
        let note = target_fail_note(2, "bigquery");
        assert!(note.contains("2 columns FAIL"), "got: {note}");
        assert!(note.contains("bigquery"), "got: {note}");
        assert!(note.contains("--strict"), "got: {note}");
        assert!(note.contains("exit 0"), "got: {note}");
    }

    #[test]
    fn target_fail_note_singular_for_one_column() {
        let note = target_fail_note(1, "duckdb");
        assert!(note.contains("1 column FAIL"), "got: {note}");
        assert!(!note.contains("1 columns"), "should be singular: {note}");
    }

    // --- compute_verdict ---

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    // --- Postgres EXPLAIN parsing (row estimate and scan type) ---

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // --- build_suggestion (basic) ---

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // --- categorize_source_error ---

    // Categorises an ad-hoc error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    // --- Destination helpers ---

    // Destination of the given type with bucket "b" and other fields defaulted.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    // Categorises an ad-hoc error built from `msg` for a `dest_config(dtype)` destination.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    // Local destination writing to `path`.
    fn local_dest(path: &str) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(path.to_string()),
            ..Default::default()
        }
    }

    // --- destination_identity ---

    #[test]
    fn destination_identity_distinguishes_local_paths() {
        assert_ne!(
            destination_identity(&local_dest("/tmp/a")),
            destination_identity(&local_dest("/tmp/b")),
        );
    }

    #[test]
    fn destination_identity_collapses_identical_local_destinations() {
        assert_eq!(
            destination_identity(&local_dest("/tmp/a")),
            destination_identity(&local_dest("/tmp/a")),
        );
    }

    #[test]
    fn destination_identity_distinguishes_buckets() {
        let a = DestinationConfig {
            bucket: Some("bucket-a".to_string()),
            ..dest_config(DestinationType::S3)
        };
        let b = DestinationConfig {
            bucket: Some("bucket-b".to_string()),
            ..dest_config(DestinationType::S3)
        };
        assert_ne!(destination_identity(&a), destination_identity(&b));
    }

    #[test]
    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
        let aws = dest_config(DestinationType::S3);
        let minio = DestinationConfig {
            endpoint: Some("http://localhost:9000".to_string()),
            ..dest_config(DestinationType::S3)
        };
        assert_ne!(destination_identity(&aws), destination_identity(&minio));
    }

    // --- categorize_dest_error ---

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    // --- derive_strategy ---

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    // --- recommend_profile ---

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    // --- check_sparse_range / check_dense_surrogate_cost ---

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(w.is_some(), "should warn about sparse range");
        let msg = w.unwrap();
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    // --- check_parallel_memory_risk ---

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(10_000_000));
        assert!(w.is_some(), "should warn about memory risk");
        let msg = w.unwrap();
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    // --- build_suggestion (per mode and verdict) ---

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    // --- recommend_parallelism ---

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    // --- check_connection_limit ---

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let w = check_connection_limit(20, Some(20));
        assert!(w.is_some(), "should warn when parallel == max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let w = check_connection_limit(100, Some(20));
        assert!(w.is_some(), "should warn when parallel > max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let w = check_connection_limit(100, None);
        assert!(w.is_some(), "should note that check was skipped");
        let msg = w.unwrap();
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    // --- Error hints (source_error_hint / destination_error_hint) ---

    // Categorises `msg` as a source error, then asks for the hint for source type `st`.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    // Categorises `msg` for a destination of type `dt` (bucket "b"), then asks for its hint.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = DestinationConfig {
            destination_type: dt,
            bucket: Some("b".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    // The expired-SAS message also reads like other error categories; this
    // pins that it is categorised as "sas expired".
    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}