1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11#[cfg(test)]
12use analysis::{
13 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
14 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
15 recommend_parallelism, recommend_profile,
16};
17#[allow(unused_imports)]
18pub use doctor::doctor;
19pub(crate) use doctor::{categorize_source_error, source_error_hint};
22#[cfg(test)]
23use postgres::{extract_scan_type, parse_pg_row_estimate};
24
25use crate::config::{Config, ExportConfig, SourceType};
26use crate::error::Result;
27use crate::types::policy::TypePolicy;
28use crate::types::target::{ExportTarget, TargetStatus};
29
/// Coarse health rating of an export's read path, ordered best to worst:
/// `Efficient` > `Acceptable` > `Degraded` > `Unsafe`.
///
/// Rendered in upper case by `Display` (e.g. `EFFICIENT`). The `check` command
/// presents verdicts as advisory only.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthVerdict {
    Efficient,
    Acceptable,
    Degraded,
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        // `pad` (unlike `write!`) honours width/alignment flags such as `{:<10}`.
        f.pad(label)
    }
}
48
49pub(crate) struct ExportDiagnostic {
50 pub export_name: String,
51 pub strategy: String,
52 pub mode: String,
53 pub cursor_column: Option<String>,
54 pub row_estimate: Option<i64>,
55 pub avg_row_bytes: Option<i64>,
60 pub cursor_min: Option<String>,
61 pub cursor_max: Option<String>,
62 pub scan_type: Option<String>,
63 pub uses_index: bool,
64 pub verdict: HealthVerdict,
65 pub recommended_profile: &'static str,
66 pub recommended_parallel: (u32, &'static str),
67 pub warnings: Vec<String>,
68 pub suggestion: Option<String>,
69}
70
71pub(crate) fn get_export_diagnostic(
75 config: &Config,
76 export: &ExportConfig,
77) -> Result<ExportDiagnostic> {
78 let url = config.source.resolve_url()?;
79 let tls = config.source.tls.as_ref();
80 crate::source::warn_if_tls_disabled(&config.source);
81 match config.source.source_type {
82 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
83 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
84 SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
85 }
86}
87
88fn destination_identity(d: &crate::config::DestinationConfig) -> String {
95 format!(
96 "{:?}:{}:{}:{}",
97 d.destination_type,
98 d.bucket.as_deref().unwrap_or("-"),
99 d.endpoint.as_deref().unwrap_or("-"),
100 d.path.as_deref().unwrap_or("-"),
101 )
102}
103
/// Build the advisory note shown when columns FAIL compatibility with the export
/// target while `--strict` is off, so the command still exits 0.
///
/// `n` is the number of failing columns; the noun is singular only when `n == 1`.
/// `target_label` names the target in the message.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let noun = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {n} {noun} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
    )
}
115
116pub fn check(
117 config_path: &str,
118 export_name: Option<&str>,
119 params: Option<&std::collections::HashMap<String, String>>,
120 show_type_report: bool,
121 strict: bool,
122 json_output: bool,
123 target: Option<ExportTarget>,
124) -> Result<()> {
125 let config = Config::load_with_params(config_path, params)?;
126
127 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
128 let e = config
129 .exports
130 .iter()
131 .find(|e| e.name == name)
132 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
133 vec![e]
134 } else {
135 config.exports.iter().collect()
136 };
137
138 let url = config.source.resolve_url()?;
139 let tls = config.source.tls.as_ref();
140 crate::source::warn_if_tls_disabled(&config.source);
146 match config.source.source_type {
147 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
148 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
149 SourceType::Mssql => mssql::check_mssql(&url, tls, &exports, json_output)?,
150 }
151
152 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
161 for export in &exports {
162 let dest_key = destination_identity(&export.destination);
163 if !seen_destinations.insert(dest_key) {
164 continue;
165 }
166 let expanded = crate::plan::build::expand_destination_templates(
167 export.destination.clone(),
168 &export.name,
169 );
170 crate::destination::create_destination(&expanded).map_err(|e| {
171 anyhow::anyhow!(
172 "export '{}': destination preflight failed: {:#}",
173 export.name,
174 e
175 )
176 })?;
177 }
178
179 let mut clean = true;
183
184 if show_type_report {
185 let policy = if strict {
186 TypePolicy::strict()
187 } else {
188 TypePolicy::warn_only()
189 };
190
191 let mut any_fatal = false;
192 let mut target_fail_cols = 0usize;
198 let mut target_fail_label: Option<&'static str> = None;
199 for export in &exports {
200 let column_overrides =
201 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
202 if let Some(t) = export.target.as_deref()
206 && crate::types::target::ExportTarget::parse(t).is_none()
207 {
208 anyhow::bail!(
209 "export '{}': unknown target '{t}' (expected: {})",
210 export.name,
211 crate::types::target::ExportTarget::valid_target_names()
212 );
213 }
214 let eff_target = target.or_else(|| {
215 export
216 .target
217 .as_deref()
218 .and_then(crate::types::target::ExportTarget::parse)
219 });
220 let config_dir = std::path::Path::new(config_path)
221 .parent()
222 .unwrap_or_else(|| std::path::Path::new("."));
223 match type_report::collect_report(
224 &config,
225 export,
226 &column_overrides,
227 &policy,
228 eff_target,
229 config_dir,
230 params,
231 ) {
232 Ok(report) => {
233 if report.has_fatal() {
234 any_fatal = true;
235 }
236 if let Some(t) = eff_target
237 && report.has_target_fail()
238 {
239 any_fatal = true;
240 target_fail_cols += report
241 .columns
242 .iter()
243 .filter(|c| c.target_status == Some(TargetStatus::Fail))
244 .count();
245 target_fail_label.get_or_insert(t.label());
246 }
247 if json_output {
248 type_report::print_json(&report)?;
249 } else {
250 type_report::print_table(&report, eff_target);
251 }
252 }
253 Err(e) => {
254 log::warn!("type report for '{}' failed: {:#}", export.name, e);
255 }
256 }
257 }
258
259 if strict && any_fatal {
260 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
261 } else if !strict && target_fail_cols > 0 && !json_output {
262 clean = false;
265 println!();
266 println!(
267 "{}",
268 target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
269 );
270 }
271 }
272
273 if !json_output {
274 println!();
277 println!(
278 "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
279 );
280 if clean {
281 println!(
283 "Looks good. Next: rivet run -c {config_path} --validate # export, then verify row counts"
284 );
285 }
286 }
287
288 Ok(())
289}
290
291fn print_diagnostic(diag: &ExportDiagnostic) {
292 println!();
293 println!("Export: {}", diag.export_name);
294 println!(" Strategy: {}", diag.strategy);
295 println!(" Mode: {}", diag.mode);
296 if let Some(est) = diag.row_estimate {
297 if est >= 1_000_000 {
298 println!(" Row estimate: ~{}M", est / 1_000_000);
299 } else if est >= 1_000 {
300 println!(" Row estimate: ~{}K", est / 1_000);
301 } else {
302 println!(" Row estimate: ~{}", est);
303 }
304 }
305 if let Some(w) = diag.avg_row_bytes {
306 println!(" Row width: ~{} bytes", w);
307 }
308 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
309 println!(" Cursor range: {} .. {}", min_v, max_v);
310 }
311 if let Some(col) = &diag.cursor_column {
312 println!(" Cursor col: {}", col);
313 }
314 if diag.scan_type.is_some() {
319 let access = if diag.uses_index {
320 "index scan (the cursor/chunk column is indexed)"
321 } else {
322 "full table scan (no index on the read path)"
323 };
324 println!(" Access: {access}");
325 }
326 println!(" Verdict: {}", diag.verdict);
327 println!(
328 " Recommended: tuning.profile: {}",
329 diag.recommended_profile
330 );
331 let (par_level, par_reason) = diag.recommended_parallel;
332 if par_level > 1 {
333 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
334 } else {
335 println!(" Parallelism: {} ({})", par_level, par_reason);
336 }
337 for w in &diag.warnings {
338 println!(" Warning: {}", w);
339 }
340 if let Some(suggestion) = &diag.suggestion {
341 println!(" Suggestion: {}", suggestion);
342 }
343}
344
// Unit tests for this module's helpers and the re-exported analysis/doctor helpers:
// health verdicts, strategy/profile/parallelism recommendations, sparse-range,
// dense-surrogate, memory and connection-limit warnings, suggestions, Postgres plan
// parsing, source/destination error categorisation and hints, destination identity,
// and the target-fail note.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    /// Build a test export: the given name, mode and cursor column, a fixed
    /// `SELECT * FROM t` query, CSV format and a local `./out` destination. All other
    /// fields come from `crate::config::sample_export(name)`.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            query: Some("SELECT * FROM t".to_string()),
            format: FormatType::Csv,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            ..crate::config::sample_export(name)
        }
    }

    // ---- target_fail_note ----

    #[test]
    fn target_fail_note_names_count_target_and_strict_gate() {
        let note = target_fail_note(2, "bigquery");
        assert!(note.contains("2 columns FAIL"), "got: {note}");
        assert!(note.contains("bigquery"), "got: {note}");
        assert!(note.contains("--strict"), "got: {note}");
        assert!(note.contains("exit 0"), "got: {note}");
    }

    #[test]
    fn target_fail_note_singular_for_one_column() {
        let note = target_fail_note(1, "duckdb");
        assert!(note.contains("1 column FAIL"), "got: {note}");
        assert!(!note.contains("1 columns"), "should be singular: {note}");
    }

    // ---- compute_verdict ----
    // Per the test names, the arguments appear to be (row estimate, indexed, has cursor);
    // confirm against `analysis::compute_verdict`.

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    // ---- Postgres EXPLAIN parsing ----

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // ---- build_suggestion (basic) ----

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // ---- categorize_source_error ----

    /// Categorise a source error built from the literal message `msg`.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    // ---- destination helpers ----

    /// Destination of type `dtype` with bucket `"b"` and all other fields defaulted.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    /// Categorise a destination error built from `msg` for a `dest_config(dtype)` destination.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    /// Local destination writing to `path`.
    fn local_dest(path: &str) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(path.to_string()),
            ..Default::default()
        }
    }

    // ---- destination_identity (used to dedupe destination preflights in `check`) ----

    #[test]
    fn destination_identity_distinguishes_local_paths() {
        assert_ne!(
            destination_identity(&local_dest("/tmp/a")),
            destination_identity(&local_dest("/tmp/b")),
        );
    }

    #[test]
    fn destination_identity_collapses_identical_local_destinations() {
        assert_eq!(
            destination_identity(&local_dest("/tmp/a")),
            destination_identity(&local_dest("/tmp/a")),
        );
    }

    #[test]
    fn destination_identity_distinguishes_buckets() {
        let a = DestinationConfig {
            bucket: Some("bucket-a".to_string()),
            ..dest_config(DestinationType::S3)
        };
        let b = DestinationConfig {
            bucket: Some("bucket-b".to_string()),
            ..dest_config(DestinationType::S3)
        };
        assert_ne!(destination_identity(&a), destination_identity(&b));
    }

    #[test]
    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
        let aws = dest_config(DestinationType::S3);
        let minio = DestinationConfig {
            endpoint: Some("http://localhost:9000".to_string()),
            ..dest_config(DestinationType::S3)
        };
        assert_ne!(destination_identity(&aws), destination_identity(&minio));
    }

    // ---- categorize_dest_error ----

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    // The same "not found" text maps to a path-oriented category for local destinations.
    #[test]
    fn dest_not_found_local_is_path_not_found() {
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    // ---- derive_strategy ----

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    // ---- recommend_profile (row estimate, indexed, export) ----

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    // ---- check_sparse_range / check_dense_surrogate_cost ----

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(w.is_some(), "should warn about sparse range");
        let msg = w.unwrap();
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    // ---- check_parallel_memory_risk ----

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(10_000_000));
        assert!(w.is_some(), "should warn about memory risk");
        let msg = w.unwrap();
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    // ---- build_suggestion by verdict and mode ----

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    // ---- recommend_parallelism: returns (level, reason) ----

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    // ---- check_connection_limit (parallel, max_connections) ----

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let w = check_connection_limit(20, Some(20));
        assert!(w.is_some(), "should warn when parallel == max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let w = check_connection_limit(100, Some(20));
        assert!(w.is_some(), "should warn when parallel > max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let w = check_connection_limit(100, None);
        assert!(w.is_some(), "should note that check was skipped");
        let msg = w.unwrap();
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    // With max_connections=20 the message is expected to mention 17, i.e. leave headroom.
    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    // ---- error hints ----

    /// Categorise `msg` as a source error, then look up the hint for that category and
    /// source type `st`.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    /// Categorise `msg` as a destination error for a `dt` destination with bucket `"b"`,
    /// then look up the hint for that category.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = DestinationConfig {
            destination_type: dt,
            bucket: Some("b".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    // The expired-SAS message also contains generic auth-like wording, so this pins that
    // the more specific "sas expired" category wins.
    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}