1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11#[cfg(test)]
12use analysis::{
13 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
14 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
15 recommend_parallelism, recommend_profile,
16};
17#[allow(unused_imports)]
18pub use doctor::doctor;
19pub(crate) use doctor::{categorize_source_error, source_error_hint};
22#[cfg(test)]
23use postgres::{extract_scan_type, parse_pg_row_estimate};
24
25use crate::config::{Config, ExportConfig, SourceType};
26use crate::error::Result;
27use crate::types::policy::TypePolicy;
28use crate::types::target::{ExportTarget, TargetStatus};
29
/// Overall health grade for an export's read path, ordered best to worst:
/// `EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE`.
///
/// The grade is advisory only: `check` reports it but never blocks a run on it.
#[derive(Debug)]
pub enum HealthVerdict {
    /// Best grade.
    Efficient,
    /// Second-best grade.
    Acceptable,
    /// Second-worst grade.
    Degraded,
    /// Worst grade.
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Render the verdict as its upper-case label (e.g. `EFFICIENT`).
    ///
    /// The label is written verbatim; width/alignment flags are not applied.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            HealthVerdict::Efficient => "EFFICIENT",
            HealthVerdict::Acceptable => "ACCEPTABLE",
            HealthVerdict::Degraded => "DEGRADED",
            HealthVerdict::Unsafe => "UNSAFE",
        };
        f.write_str(label)
    }
}
48
/// Health diagnostic for a single export, as returned by [`get_export_diagnostic`]
/// and rendered for humans by `print_diagnostic`.
pub(crate) struct ExportDiagnostic {
    /// Name of the export this diagnostic describes (printed as the header).
    pub export_name: String,
    /// Human-readable read strategy, printed as-is. Presumably built by the
    /// analysis helpers (e.g. `full-scan`, `chunked(id, size=50000)`) — confirm
    /// at the construction site.
    pub strategy: String,
    /// Export mode, printed as-is.
    pub mode: String,
    /// Cursor column, when the export has one.
    pub cursor_column: Option<String>,
    /// Estimated row count, if one could be obtained; printed abbreviated (K/M).
    pub row_estimate: Option<i64>,
    /// Lower bound of the cursor range; only rendered when `cursor_max` is also set.
    pub cursor_min: Option<String>,
    /// Upper bound of the cursor range; only rendered when `cursor_min` is also set.
    pub cursor_max: Option<String>,
    /// Scan type reported for the read path, if any. `print_diagnostic` uses only
    /// its presence (to decide whether to print the access line), not its text.
    pub scan_type: Option<String>,
    /// Whether the read path uses an index; selects the "index scan" vs
    /// "full table scan" wording.
    pub uses_index: bool,
    /// Overall health grade.
    pub verdict: HealthVerdict,
    /// Recommended `tuning.profile` value.
    pub recommended_profile: &'static str,
    /// Recommended parallelism level and the reason for it; levels above 1 are
    /// printed as a recommendation.
    pub recommended_parallel: (u32, &'static str),
    /// Advisory warnings, printed one per line.
    pub warnings: Vec<String>,
    /// Optional remediation suggestion.
    pub suggestion: Option<String>,
}
65
66pub(crate) fn get_export_diagnostic(
70 config: &Config,
71 export: &ExportConfig,
72) -> Result<ExportDiagnostic> {
73 let url = config.source.resolve_url()?;
74 let tls = config.source.tls.as_ref();
75 crate::source::warn_if_tls_disabled(&config.source);
76 match config.source.source_type {
77 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
78 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
79 SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
80 }
81}
82
83fn destination_identity(d: &crate::config::DestinationConfig) -> String {
90 format!(
91 "{:?}:{}:{}:{}",
92 d.destination_type,
93 d.bucket.as_deref().unwrap_or("-"),
94 d.endpoint.as_deref().unwrap_or("-"),
95 d.path.as_deref().unwrap_or("-"),
96 )
97}
98
/// Build the advisory note printed after the type report when columns fail
/// target compatibility in non-strict mode.
///
/// `n` is the number of failing columns (the noun is singular only for exactly
/// one column). `target_label` names the export target.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let noun = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {} {} FAIL {} compatibility; exit code is gated only with --strict (currently exit 0)",
        n, noun, target_label
    )
}
110
111pub fn check(
112 config_path: &str,
113 export_name: Option<&str>,
114 params: Option<&std::collections::HashMap<String, String>>,
115 show_type_report: bool,
116 strict: bool,
117 json_output: bool,
118 target: Option<ExportTarget>,
119) -> Result<()> {
120 let config = Config::load_with_params(config_path, params)?;
121
122 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
123 let e = config
124 .exports
125 .iter()
126 .find(|e| e.name == name)
127 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
128 vec![e]
129 } else {
130 config.exports.iter().collect()
131 };
132
133 let url = config.source.resolve_url()?;
134 let tls = config.source.tls.as_ref();
135 crate::source::warn_if_tls_disabled(&config.source);
141 match config.source.source_type {
142 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
143 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
144 SourceType::Mssql => mssql::check_mssql(&url, tls, &exports, json_output)?,
145 }
146
147 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
156 for export in &exports {
157 let dest_key = destination_identity(&export.destination);
158 if !seen_destinations.insert(dest_key) {
159 continue;
160 }
161 let expanded = crate::plan::build::expand_destination_templates(
162 export.destination.clone(),
163 &export.name,
164 );
165 crate::destination::create_destination(&expanded).map_err(|e| {
166 anyhow::anyhow!(
167 "export '{}': destination preflight failed: {:#}",
168 export.name,
169 e
170 )
171 })?;
172 }
173
174 let mut clean = true;
178
179 if show_type_report {
180 let policy = if strict {
181 TypePolicy::strict()
182 } else {
183 TypePolicy::warn_only()
184 };
185
186 let mut any_fatal = false;
187 let mut target_fail_cols = 0usize;
193 let mut target_fail_label: Option<&'static str> = None;
194 for export in &exports {
195 let column_overrides =
196 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
197 if let Some(t) = export.target.as_deref()
201 && crate::types::target::ExportTarget::parse(t).is_none()
202 {
203 anyhow::bail!(
204 "export '{}': unknown target '{t}' (expected: {})",
205 export.name,
206 crate::types::target::ExportTarget::valid_target_names()
207 );
208 }
209 let eff_target = target.or_else(|| {
210 export
211 .target
212 .as_deref()
213 .and_then(crate::types::target::ExportTarget::parse)
214 });
215 let config_dir = std::path::Path::new(config_path)
216 .parent()
217 .unwrap_or_else(|| std::path::Path::new("."));
218 match type_report::collect_report(
219 &config,
220 export,
221 &column_overrides,
222 &policy,
223 eff_target,
224 config_dir,
225 params,
226 ) {
227 Ok(report) => {
228 if report.has_fatal() {
229 any_fatal = true;
230 }
231 if let Some(t) = eff_target
232 && report.has_target_fail()
233 {
234 any_fatal = true;
235 target_fail_cols += report
236 .columns
237 .iter()
238 .filter(|c| c.target_status == Some(TargetStatus::Fail))
239 .count();
240 target_fail_label.get_or_insert(t.label());
241 }
242 if json_output {
243 type_report::print_json(&report)?;
244 } else {
245 type_report::print_table(&report, eff_target);
246 }
247 }
248 Err(e) => {
249 log::warn!("type report for '{}' failed: {:#}", export.name, e);
250 }
251 }
252 }
253
254 if strict && any_fatal {
255 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
256 } else if !strict && target_fail_cols > 0 && !json_output {
257 clean = false;
260 println!();
261 println!(
262 "{}",
263 target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
264 );
265 }
266 }
267
268 if !json_output {
269 println!();
272 println!(
273 "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
274 );
275 if clean {
276 println!(
278 "Looks good. Next: rivet run -c {config_path} --validate # export, then verify row counts"
279 );
280 }
281 }
282
283 Ok(())
284}
285
286fn print_diagnostic(diag: &ExportDiagnostic) {
287 println!();
288 println!("Export: {}", diag.export_name);
289 println!(" Strategy: {}", diag.strategy);
290 println!(" Mode: {}", diag.mode);
291 if let Some(est) = diag.row_estimate {
292 if est >= 1_000_000 {
293 println!(" Row estimate: ~{}M", est / 1_000_000);
294 } else if est >= 1_000 {
295 println!(" Row estimate: ~{}K", est / 1_000);
296 } else {
297 println!(" Row estimate: ~{}", est);
298 }
299 }
300 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
301 println!(" Cursor range: {} .. {}", min_v, max_v);
302 }
303 if let Some(col) = &diag.cursor_column {
304 println!(" Cursor col: {}", col);
305 }
306 if diag.scan_type.is_some() {
311 let access = if diag.uses_index {
312 "index scan (the cursor/chunk column is indexed)"
313 } else {
314 "full table scan (no index on the read path)"
315 };
316 println!(" Access: {access}");
317 }
318 println!(" Verdict: {}", diag.verdict);
319 println!(
320 " Recommended: tuning.profile: {}",
321 diag.recommended_profile
322 );
323 let (par_level, par_reason) = diag.recommended_parallel;
324 if par_level > 1 {
325 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
326 } else {
327 println!(" Parallelism: {} ({})", par_level, par_reason);
328 }
329 for w in &diag.warnings {
330 println!(" Warning: {}", w);
331 }
332 if let Some(suggestion) = &diag.suggestion {
333 println!(" Suggestion: {}", suggestion);
334 }
335}
336
// Unit tests for the `check` helpers, the analysis heuristics (verdict,
// profile, parallelism, warnings, suggestions) and the doctor error
// categorisation / hint tables.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    /// Minimal export fixture: CSV to a local `./out` destination.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            query: Some("SELECT * FROM t".to_string()),
            format: FormatType::Csv,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            ..crate::config::sample_export(name)
        }
    }

    #[test]
    fn target_fail_note_names_count_target_and_strict_gate() {
        let note = target_fail_note(2, "bigquery");
        assert!(note.contains("2 columns FAIL"), "got: {note}");
        assert!(note.contains("bigquery"), "got: {note}");
        assert!(note.contains("--strict"), "got: {note}");
        assert!(note.contains("exit 0"), "got: {note}");
    }

    #[test]
    fn target_fail_note_singular_for_one_column() {
        let note = target_fail_note(1, "duckdb");
        assert!(note.contains("1 column FAIL"), "got: {note}");
        assert!(!note.contains("1 columns"), "should be singular: {note}");
    }

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    /// Categorise a source error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    /// Destination fixture of the given type with bucket `b`.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    /// Categorise a destination error built from `msg` against `dest_config(dtype)`.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    /// Local destination fixture writing to `path`.
    fn local_dest(path: &str) -> DestinationConfig {
        DestinationConfig {
            destination_type: DestinationType::Local,
            path: Some(path.to_string()),
            ..Default::default()
        }
    }

    #[test]
    fn destination_identity_distinguishes_local_paths() {
        assert_ne!(
            destination_identity(&local_dest("/tmp/a")),
            destination_identity(&local_dest("/tmp/b")),
        );
    }

    #[test]
    fn destination_identity_collapses_identical_local_destinations() {
        assert_eq!(
            destination_identity(&local_dest("/tmp/a")),
            destination_identity(&local_dest("/tmp/a")),
        );
    }

    #[test]
    fn destination_identity_distinguishes_buckets() {
        let a = DestinationConfig {
            bucket: Some("bucket-a".to_string()),
            ..dest_config(DestinationType::S3)
        };
        let b = DestinationConfig {
            bucket: Some("bucket-b".to_string()),
            ..dest_config(DestinationType::S3)
        };
        assert_ne!(destination_identity(&a), destination_identity(&b));
    }

    #[test]
    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
        let aws = dest_config(DestinationType::S3);
        let minio = DestinationConfig {
            endpoint: Some("http://localhost:9000".to_string()),
            ..dest_config(DestinationType::S3)
        };
        assert_ne!(destination_identity(&aws), destination_identity(&minio));
    }

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        let msg = w.expect("should warn about sparse range");
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        let msg = w.expect("should warn about built-in ROW_NUMBER cost");
        assert!(msg.contains("global sort"), "got: {msg}");
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        let msg = w.expect("should warn about ROW_NUMBER cost");
        assert!(msg.contains("global sort"), "got: {msg}");
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(10_000_000));
        let msg = w.expect("should warn about memory risk");
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let w = check_connection_limit(20, Some(20));
        let msg = w.expect("should warn when parallel == max_connections");
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let w = check_connection_limit(100, Some(20));
        let msg = w.expect("should warn when parallel > max_connections");
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let w = check_connection_limit(100, None);
        let msg = w.expect("should note that check was skipped");
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20))
            .expect("parallel above max_connections should warn");
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    /// Categorise a source error built from `msg` and return its hint for `st`.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    /// Categorise a destination error built from `msg` and return its hint,
    /// using the same fixture as `dest_err`.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = dest_config(dt);
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}