1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11pub(crate) use analysis::SMALL_TABLE_ROW_THRESHOLD;
14#[cfg(test)]
15use analysis::{
16 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
17 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
18 recommend_parallelism, recommend_profile,
19};
20#[allow(unused_imports)]
21pub use doctor::doctor;
22pub(crate) use doctor::{categorize_source_error, source_error_hint};
25#[cfg(test)]
26use postgres::{extract_scan_type, parse_pg_row_estimate};
27
28use serde::Serialize;
29
30use crate::config::{Config, ExportConfig, SourceType};
31use crate::error::Result;
32use crate::types::policy::TypePolicy;
33use crate::types::target::{ExportTarget, TargetStatus};
34
35#[derive(Debug, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum HealthVerdict {
42 Efficient,
43 Acceptable,
44 Degraded,
45 Unsafe,
46}
47
48impl std::fmt::Display for HealthVerdict {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::Efficient => write!(f, "EFFICIENT"),
52 Self::Acceptable => write!(f, "ACCEPTABLE"),
53 Self::Degraded => write!(f, "DEGRADED"),
54 Self::Unsafe => write!(f, "UNSAFE"),
55 }
56 }
57}
58
59pub(crate) struct ExportDiagnostic {
60 pub export_name: String,
61 pub strategy: String,
62 pub mode: String,
63 pub cursor_column: Option<String>,
64 pub row_estimate: Option<i64>,
65 pub avg_row_bytes: Option<i64>,
70 pub cursor_min: Option<String>,
71 pub cursor_max: Option<String>,
72 pub scan_type: Option<String>,
73 pub uses_index: bool,
74 pub verdict: HealthVerdict,
75 pub recommended_profile: &'static str,
76 pub recommended_parallel: (u32, &'static str),
77 pub warnings: Vec<String>,
78 pub suggestion: Option<String>,
79}
80
81impl Serialize for ExportDiagnostic {
91 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
92 where
93 S: serde::Serializer,
94 {
95 use serde::ser::SerializeMap;
96
97 #[derive(Serialize)]
98 struct RecommendedParallel {
99 level: u32,
100 reason: &'static str,
101 }
102 #[derive(Serialize)]
103 struct Capabilities {
104 uses_index: bool,
105 has_cursor: bool,
106 can_parallel: bool,
107 }
108
109 let mut map = serializer.serialize_map(None)?;
110 map.serialize_entry("export_name", &self.export_name)?;
111 map.serialize_entry("strategy", &self.strategy)?;
112 map.serialize_entry("mode", &self.mode)?;
113 if let Some(v) = &self.cursor_column {
114 map.serialize_entry("cursor_column", v)?;
115 }
116 if let Some(v) = &self.row_estimate {
117 map.serialize_entry("row_estimate", v)?;
118 }
119 if let Some(v) = &self.avg_row_bytes {
120 map.serialize_entry("avg_row_bytes", v)?;
121 }
122 if let Some(v) = &self.cursor_min {
123 map.serialize_entry("cursor_min", v)?;
124 }
125 if let Some(v) = &self.cursor_max {
126 map.serialize_entry("cursor_max", v)?;
127 }
128 if let Some(v) = &self.scan_type {
129 map.serialize_entry("scan_type", v)?;
130 }
131 map.serialize_entry("uses_index", &self.uses_index)?;
132 map.serialize_entry("verdict", &self.verdict)?;
133 map.serialize_entry("recommended_profile", &self.recommended_profile)?;
134 map.serialize_entry(
135 "recommended_parallel",
136 &RecommendedParallel {
137 level: self.recommended_parallel.0,
138 reason: self.recommended_parallel.1,
139 },
140 )?;
141 map.serialize_entry("warnings", &self.warnings)?;
142 if let Some(v) = &self.suggestion {
143 map.serialize_entry("suggestion", v)?;
144 }
145 map.serialize_entry(
146 "capabilities",
147 &Capabilities {
148 uses_index: self.uses_index,
149 has_cursor: self.cursor_column.is_some(),
150 can_parallel: self.recommended_parallel.0 > 1,
151 },
152 )?;
153 map.end()
154 }
155}
156
157pub(crate) fn get_export_diagnostic(
161 config: &Config,
162 export: &ExportConfig,
163) -> Result<ExportDiagnostic> {
164 let url = config.source.resolve_url()?;
165 let tls = config.source.tls.as_ref();
166 crate::source::warn_if_tls_disabled(&config.source);
167 match config.source.source_type {
168 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
169 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
170 SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
171 }
172}
173
174fn destination_identity(d: &crate::config::DestinationConfig) -> String {
181 format!(
182 "{:?}:{}:{}:{}",
183 d.destination_type,
184 d.bucket.as_deref().unwrap_or("-"),
185 d.endpoint.as_deref().unwrap_or("-"),
186 d.path.as_deref().unwrap_or("-"),
187 )
188}
189
190fn target_fail_note(n: usize, target_label: &str) -> String {
196 let col = if n == 1 { "column" } else { "columns" };
197 format!(
198 "Note: {n} {col} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
199 )
200}
201
202pub(super) fn collect_diagnostics<F>(
208 exports: &[&ExportConfig],
209 mut diagnose: F,
210) -> Result<Vec<ExportDiagnostic>>
211where
212 F: FnMut(&ExportConfig) -> Result<ExportDiagnostic>,
213{
214 exports.iter().map(|&e| diagnose(e)).collect()
215}
216
217pub fn check(
218 config_path: &str,
219 export_name: Option<&str>,
220 params: Option<&std::collections::HashMap<String, String>>,
221 show_type_report: bool,
222 strict: bool,
223 json_output: bool,
224 target: Option<ExportTarget>,
225) -> Result<()> {
226 let config = Config::load_with_params(config_path, params)?;
227
228 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
229 let e = config
230 .exports
231 .iter()
232 .find(|e| e.name == name)
233 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
234 vec![e]
235 } else {
236 config.exports.iter().collect()
237 };
238
239 let url = config.source.resolve_url()?;
240 let tls = config.source.tls.as_ref();
241 crate::source::warn_if_tls_disabled(&config.source);
247 let diagnostics: Vec<ExportDiagnostic> = match config.source.source_type {
254 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports)?,
255 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports)?,
256 SourceType::Mssql => mssql::check_mssql(&url, tls, &exports)?,
257 };
258 if !json_output {
259 for diag in &diagnostics {
260 print_diagnostic(diag);
261 }
262 } else if !show_type_report {
263 for diag in &diagnostics {
270 println!("{}", serde_json::to_string(diag)?);
271 }
272 }
273 let diag_by_export: std::collections::HashMap<&str, &ExportDiagnostic> = if json_output {
278 diagnostics
279 .iter()
280 .map(|d| (d.export_name.as_str(), d))
281 .collect()
282 } else {
283 std::collections::HashMap::new()
284 };
285
286 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
295 for export in &exports {
296 let dest_key = destination_identity(&export.destination);
297 if !seen_destinations.insert(dest_key) {
298 continue;
299 }
300 let expanded = crate::plan::build::expand_destination_templates(
301 export.destination.clone(),
302 &export.name,
303 );
304 crate::destination::create_destination(&expanded).map_err(|e| {
305 anyhow::anyhow!(
306 "export '{}': destination preflight failed: {:#}",
307 export.name,
308 e
309 )
310 })?;
311 }
312
313 let mut clean = true;
317
318 if show_type_report {
319 let policy = if strict {
320 TypePolicy::strict()
321 } else {
322 TypePolicy::warn_only()
323 };
324
325 let mut any_fatal = false;
326 let mut target_fail_cols = 0usize;
332 let mut target_fail_label: Option<&'static str> = None;
333 for export in &exports {
334 let column_overrides =
335 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
336 if let Some(t) = export.target.as_deref()
340 && crate::types::target::ExportTarget::parse(t).is_none()
341 {
342 anyhow::bail!(
343 "export '{}': unknown target '{t}' (expected: {})",
344 export.name,
345 crate::types::target::ExportTarget::valid_target_names()
346 );
347 }
348 let eff_target = target.or_else(|| {
349 export
350 .target
351 .as_deref()
352 .and_then(crate::types::target::ExportTarget::parse)
353 });
354 let config_dir = std::path::Path::new(config_path)
355 .parent()
356 .unwrap_or_else(|| std::path::Path::new("."));
357 match type_report::collect_report(
358 &config,
359 export,
360 &column_overrides,
361 &policy,
362 eff_target,
363 config_dir,
364 params,
365 ) {
366 Ok(report) => {
367 if report.has_fatal() {
368 any_fatal = true;
369 }
370 if let Some(t) = eff_target
371 && report.has_target_fail()
372 {
373 any_fatal = true;
374 target_fail_cols += report
375 .columns
376 .iter()
377 .filter(|c| c.target_status == Some(TargetStatus::Fail))
378 .count();
379 target_fail_label.get_or_insert(t.label());
380 }
381 if json_output {
382 print_report_json_with_diagnostic(
396 &report,
397 diag_by_export.get(export.name.as_str()).copied(),
398 )?;
399 } else {
400 type_report::print_table(&report, eff_target);
401 }
402 }
403 Err(e) => {
404 log::warn!("type report for '{}' failed: {:#}", export.name, e);
405 if json_output
410 && let Some(diag) = diag_by_export.get(export.name.as_str()).copied()
411 {
412 println!("{}", serde_json::to_string(diag)?);
413 }
414 }
415 }
416 }
417
418 if strict && any_fatal {
419 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
420 } else if !strict && target_fail_cols > 0 && !json_output {
421 clean = false;
424 println!();
425 println!(
426 "{}",
427 target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
428 );
429 }
430 }
431
432 if !json_output {
433 println!();
436 println!(
437 "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
438 );
439 if clean {
440 println!(
442 "Looks good. Next: rivet run -c {config_path} --validate # export, then verify row counts"
443 );
444 }
445 }
446
447 Ok(())
448}
449
450fn print_report_json_with_diagnostic(
460 report: &type_report::ExportTypeReport,
461 diag: Option<&ExportDiagnostic>,
462) -> Result<()> {
463 let mut value = serde_json::to_value(report)?;
464 if let (Some(obj), Some(diag)) = (value.as_object_mut(), diag) {
465 obj.insert("diagnostic".to_string(), serde_json::to_value(diag)?);
466 }
467 println!("{}", serde_json::to_string(&value)?);
468 Ok(())
469}
470
471fn print_diagnostic(diag: &ExportDiagnostic) {
472 println!();
473 println!("Export: {}", diag.export_name);
474 println!(" Strategy: {}", diag.strategy);
475 println!(" Mode: {}", diag.mode);
476 if let Some(est) = diag.row_estimate {
477 if est >= 1_000_000 {
478 println!(" Row estimate: ~{}M", est / 1_000_000);
479 } else if est >= 1_000 {
480 println!(" Row estimate: ~{}K", est / 1_000);
481 } else {
482 println!(" Row estimate: ~{}", est);
483 }
484 }
485 if let Some(w) = diag.avg_row_bytes {
486 println!(" Row width: ~{} bytes", w);
487 }
488 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
489 println!(" Cursor range: {} .. {}", min_v, max_v);
490 }
491 if let Some(col) = &diag.cursor_column {
492 println!(" Cursor col: {}", col);
493 }
494 if diag.scan_type.is_some() {
499 let access = if diag.uses_index {
500 "index scan (the cursor/chunk column is indexed)"
501 } else {
502 "full table scan (no index on the read path)"
503 };
504 println!(" Access: {access}");
505 }
506 println!(" Verdict: {}", diag.verdict);
507 println!(
508 " Recommended: tuning.profile: {}",
509 diag.recommended_profile
510 );
511 let (par_level, par_reason) = diag.recommended_parallel;
512 if par_level > 1 {
513 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
514 } else {
515 println!(" Parallelism: {} ({})", par_level, par_reason);
516 }
517 for w in &diag.warnings {
518 println!(" Warning: {}", w);
519 }
520 if let Some(suggestion) = &diag.suggestion {
521 println!(" Suggestion: {}", suggestion);
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use super::*;
528 use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
529 use doctor::{
530 categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
531 };
532 use serde_json::Value;
533
534 fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
535 ExportConfig {
538 mode,
539 cursor_column: cursor.map(|s| s.to_string()),
540 query: Some("SELECT * FROM t".to_string()),
541 format: FormatType::Csv,
542 destination: DestinationConfig {
543 destination_type: DestinationType::Local,
544 path: Some("./out".to_string()),
545 ..Default::default()
546 },
547 ..crate::config::sample_export(name)
548 }
549 }
550
551 fn sample_diagnostic(name: &str) -> ExportDiagnostic {
556 ExportDiagnostic {
557 export_name: name.to_string(),
558 strategy: "incremental(updated_at)".to_string(),
559 mode: "incremental".to_string(),
560 cursor_column: Some("updated_at".to_string()),
561 row_estimate: Some(1_234_567),
562 avg_row_bytes: Some(96),
563 cursor_min: Some("2020-01-01".to_string()),
564 cursor_max: Some("2024-01-01".to_string()),
565 scan_type: Some("Index Scan".to_string()),
566 uses_index: true,
567 verdict: HealthVerdict::Degraded,
568 recommended_profile: "safe",
569 recommended_parallel: (4, "large indexed dataset"),
570 warnings: vec!["Sparse key range".to_string(), "memory risk".to_string()],
571 suggestion: Some("create an index".to_string()),
572 }
573 }
574
575 #[test]
578 fn diagnostic_json_has_lowercase_verdict_and_core_fields() {
579 let diag = sample_diagnostic("orders");
580 let v: serde_json::Value =
581 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
582
583 assert_eq!(v["verdict"], "degraded", "got: {v}");
586 assert_eq!(v["strategy"], "incremental(updated_at)", "got: {v}");
587 assert_eq!(v["mode"], "incremental", "got: {v}");
588 assert_eq!(v["recommended_profile"], "safe", "got: {v}");
589 assert!(v["warnings"].is_array(), "warnings must be an array: {v}");
590 assert_eq!(v["warnings"].as_array().unwrap().len(), 2, "got: {v}");
591 assert_eq!(v["export_name"], "orders", "got: {v}");
592 }
593
594 #[test]
595 fn diagnostic_json_verdict_tokens_are_all_lowercase() {
596 for (verdict, token) in [
597 (HealthVerdict::Efficient, "efficient"),
598 (HealthVerdict::Acceptable, "acceptable"),
599 (HealthVerdict::Degraded, "degraded"),
600 (HealthVerdict::Unsafe, "unsafe"),
601 ] {
602 let mut diag = sample_diagnostic("t");
603 diag.verdict = verdict;
604 let v: serde_json::Value =
605 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
606 assert_eq!(v["verdict"], token, "verdict must lowercase to {token}");
607 }
608 }
609
610 #[test]
611 fn diagnostic_json_recommended_parallel_is_named_object_not_tuple() {
612 let diag = sample_diagnostic("t");
615 let v: serde_json::Value =
616 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
617 assert!(
618 v["recommended_parallel"].is_object(),
619 "recommended_parallel must be an object, got: {}",
620 v["recommended_parallel"]
621 );
622 assert_eq!(v["recommended_parallel"]["level"], 4, "got: {v}");
623 assert_eq!(
624 v["recommended_parallel"]["reason"], "large indexed dataset",
625 "got: {v}"
626 );
627 }
628
629 #[test]
630 fn diagnostic_json_capabilities_are_derived_from_fields() {
631 let diag = sample_diagnostic("t");
632 let v: serde_json::Value =
633 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
634 let caps = &v["capabilities"];
635 assert_eq!(caps["uses_index"], true, "got: {caps}");
636 assert_eq!(caps["has_cursor"], true, "got: {caps}");
637 assert_eq!(caps["can_parallel"], true, "got: {caps}");
638 }
639
640 #[test]
641 fn diagnostic_json_capabilities_flip_with_fields() {
642 let mut diag = sample_diagnostic("t");
644 diag.cursor_column = None;
645 diag.uses_index = false;
646 diag.recommended_parallel = (1, "small dataset");
647 let v: serde_json::Value =
648 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
649 let caps = &v["capabilities"];
650 assert_eq!(caps["uses_index"], false, "got: {caps}");
651 assert_eq!(caps["has_cursor"], false, "got: {caps}");
652 assert_eq!(caps["can_parallel"], false, "got: {caps}");
653 }
654
655 #[test]
656 fn diagnostic_json_skips_none_optionals() {
657 let mut diag = sample_diagnostic("t");
659 diag.suggestion = None;
660 diag.scan_type = None;
661 let v: serde_json::Value =
662 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
663 let obj = v.as_object().unwrap();
664 assert!(!obj.contains_key("suggestion"), "None must be omitted: {v}");
665 assert!(!obj.contains_key("scan_type"), "None must be omitted: {v}");
666 }
667
668 fn merged_check_json(report: &type_report::ExportTypeReport, diag: &ExportDiagnostic) -> Value {
671 let mut value = serde_json::to_value(report).unwrap();
672 value.as_object_mut().unwrap().insert(
673 "diagnostic".to_string(),
674 serde_json::to_value(diag).unwrap(),
675 );
676 value
677 }
678
679 fn empty_report(export: &str) -> type_report::ExportTypeReport {
680 type_report::ExportTypeReport {
681 export: export.to_string(),
682 columns: Vec::new(),
683 violations: Vec::new(),
684 target_failures: false,
685 recovery_sql: None,
686 }
687 }
688
689 #[test]
690 fn check_json_merges_diagnostic_into_type_report_object() {
691 let report = empty_report("orders");
696 let diag = sample_diagnostic("orders");
697 let v = merged_check_json(&report, &diag);
698
699 assert_eq!(v["export"], "orders", "got: {v}");
701 assert!(v["columns"].is_array(), "columns at root: {v}");
702 assert!(v["violations"].is_array(), "violations at root: {v}");
703
704 let d = &v["diagnostic"];
706 assert_eq!(d["verdict"], "degraded", "got: {d}");
707 assert_eq!(d["strategy"], "incremental(updated_at)", "got: {d}");
708 assert_eq!(d["mode"], "incremental", "got: {d}");
709 assert_eq!(d["recommended_profile"], "safe", "got: {d}");
710 assert!(d["warnings"].is_array(), "warnings array: {d}");
711 assert_eq!(d["capabilities"]["has_cursor"], true, "got: {d}");
712 }
713
714 #[test]
715 fn check_json_object_is_a_single_parseable_line() {
716 let report = empty_report("orders");
720 let diag = sample_diagnostic("orders");
721 let line = serde_json::to_string(&merged_check_json(&report, &diag)).unwrap();
722 assert!(!line.contains('\n'), "one object per line: {line}");
723 let parsed: Value = serde_json::from_str(line.trim()).expect("must parse whole");
724 assert_eq!(parsed["export"], "orders");
725 }
726
727 #[test]
731 fn target_fail_note_names_count_target_and_strict_gate() {
732 let note = target_fail_note(2, "bigquery");
733 assert!(note.contains("2 columns FAIL"), "got: {note}");
734 assert!(note.contains("bigquery"), "got: {note}");
735 assert!(note.contains("--strict"), "got: {note}");
736 assert!(note.contains("exit 0"), "got: {note}");
737 }
738
739 #[test]
740 fn target_fail_note_singular_for_one_column() {
741 let note = target_fail_note(1, "duckdb");
742 assert!(note.contains("1 column FAIL"), "got: {note}");
743 assert!(!note.contains("1 columns"), "should be singular: {note}");
744 }
745
746 #[test]
747 fn verdict_small_indexed_with_cursor_is_efficient() {
748 let v = compute_verdict(Some(500_000), true, true, None, 1);
749 assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
750 }
751
752 #[test]
753 fn verdict_large_indexed_with_cursor_is_acceptable() {
754 let v = compute_verdict(Some(20_000_000), true, true, None, 1);
755 assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
756 }
757
758 #[test]
759 fn verdict_no_index_no_cursor_is_degraded() {
760 let v = compute_verdict(Some(500_000), false, false, None, 1);
761 assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
762 }
763
764 #[test]
765 fn verdict_huge_no_index_is_unsafe() {
766 let v = compute_verdict(Some(100_000_000), false, false, None, 1);
767 assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
768 }
769
770 #[test]
771 fn parse_pg_row_estimate_from_sort_plan() {
772 let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
773 assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
774 }
775
776 #[test]
777 fn parse_pg_row_estimate_from_index_scan() {
778 let plan =
779 "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
780 assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
781 }
782
783 #[test]
784 fn extract_scan_type_detects_seq_scan() {
785 let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
786 let st = extract_scan_type(plan);
787 assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
788 }
789
790 #[test]
791 fn extract_scan_type_detects_index_scan() {
792 let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
793 let st = extract_scan_type(plan);
794 assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
795 }
796
797 #[test]
798 fn suggestion_for_efficient_verdict_is_none() {
799 let e = make_export("t", ExportMode::Full, None);
800 let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
801 assert!(
802 s.is_none(),
803 "efficient verdict should produce no suggestion"
804 );
805 }
806
807 #[test]
808 fn suggestion_for_degraded_verdict_recommends_safe_profile() {
809 let e = make_export("t", ExportMode::Full, None);
810 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
811 let msg = s.expect("degraded verdict should produce a suggestion");
812 assert!(
813 msg.contains("safe"),
814 "suggestion should recommend safe profile, got: {msg}"
815 );
816 }
817
818 fn src_err(msg: &str) -> &'static str {
819 categorize_source_error(&anyhow::anyhow!("{}", msg))
820 }
821
822 #[test]
823 fn source_password_rejected_is_auth_error() {
824 assert_eq!(
825 src_err("password authentication failed for user \"rivet\""),
826 "auth error"
827 );
828 }
829
830 #[test]
831 fn source_authentication_failed_is_auth_error() {
832 assert_eq!(src_err("FATAL: authentication failed"), "auth error");
833 }
834
835 #[test]
836 fn source_access_denied_is_auth_error() {
837 assert_eq!(
838 src_err("Access denied for user 'rivet'@'localhost'"),
839 "auth error"
840 );
841 }
842
843 #[test]
844 fn source_connection_refused_is_connectivity() {
845 assert_eq!(
846 src_err("connection refused (os error 61)"),
847 "connectivity error"
848 );
849 }
850
851 #[test]
852 fn source_timed_out_is_connectivity() {
853 assert_eq!(src_err("connection timed out"), "connectivity error");
854 }
855
856 #[test]
857 fn source_dns_translate_host_is_connectivity() {
858 assert_eq!(
859 src_err("could not translate host name \"db.bad\" to address"),
860 "connectivity error"
861 );
862 }
863
864 #[test]
865 fn source_name_not_known_is_connectivity() {
866 assert_eq!(src_err("Name or service not known"), "connectivity error");
867 }
868
869 #[test]
870 fn source_unknown_error_is_generic() {
871 assert_eq!(src_err("something totally unexpected"), "error");
872 }
873
874 fn dest_config(dtype: DestinationType) -> DestinationConfig {
875 DestinationConfig {
876 destination_type: dtype,
877 bucket: Some("b".to_string()),
878 ..Default::default()
879 }
880 }
881
882 fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
883 let cfg = dest_config(dtype);
884 categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
885 }
886
887 fn local_dest(path: &str) -> DestinationConfig {
888 DestinationConfig {
889 destination_type: DestinationType::Local,
890 path: Some(path.to_string()),
891 ..Default::default()
892 }
893 }
894
895 #[test]
900 fn destination_identity_distinguishes_local_paths() {
901 assert_ne!(
902 destination_identity(&local_dest("/tmp/a")),
903 destination_identity(&local_dest("/tmp/b")),
904 );
905 }
906
907 #[test]
908 fn destination_identity_collapses_identical_local_destinations() {
909 assert_eq!(
910 destination_identity(&local_dest("/tmp/a")),
911 destination_identity(&local_dest("/tmp/a")),
912 );
913 }
914
915 #[test]
916 fn destination_identity_distinguishes_buckets() {
917 let a = DestinationConfig {
918 bucket: Some("bucket-a".to_string()),
919 ..dest_config(DestinationType::S3)
920 };
921 let b = DestinationConfig {
922 bucket: Some("bucket-b".to_string()),
923 ..dest_config(DestinationType::S3)
924 };
925 assert_ne!(destination_identity(&a), destination_identity(&b));
926 }
927
928 #[test]
931 fn destination_identity_distinguishes_endpoints_for_same_bucket() {
932 let aws = dest_config(DestinationType::S3);
933 let minio = DestinationConfig {
934 endpoint: Some("http://localhost:9000".to_string()),
935 ..dest_config(DestinationType::S3)
936 };
937 assert_ne!(destination_identity(&aws), destination_identity(&minio));
938 }
939
940 #[test]
941 fn dest_credential_loading_is_auth_error() {
942 assert_eq!(
943 dest_err(
944 "loading credential to sign http request",
945 DestinationType::Gcs
946 ),
947 "auth error"
948 );
949 }
950
951 #[test]
952 fn dest_permission_denied_is_auth_error() {
953 assert_eq!(
954 dest_err("permission denied on resource bucket", DestinationType::S3),
955 "auth error"
956 );
957 }
958
959 #[test]
960 fn dest_forbidden_is_auth_error() {
961 assert_eq!(
962 dest_err("403 Forbidden", DestinationType::Gcs),
963 "auth error"
964 );
965 }
966
967 #[test]
968 fn dest_unauthorized_is_auth_error() {
969 assert_eq!(
970 dest_err("401 Unauthorized", DestinationType::S3),
971 "auth error"
972 );
973 }
974
975 #[test]
976 fn dest_invalid_grant_is_auth_error() {
977 assert_eq!(
978 dest_err(
979 "invalid_grant: token has been revoked",
980 DestinationType::Gcs
981 ),
982 "auth error"
983 );
984 }
985
986 #[test]
987 fn dest_nosuchbucket_s3_is_bucket_not_found() {
988 assert_eq!(
989 dest_err(
990 "NoSuchBucket: the specified bucket does not exist",
991 DestinationType::S3
992 ),
993 "bucket not found"
994 );
995 }
996
997 #[test]
998 fn dest_not_found_gcs_is_bucket_not_found() {
999 assert_eq!(
1000 dest_err("bucket not found (404)", DestinationType::Gcs),
1001 "bucket not found"
1002 );
1003 }
1004
1005 #[test]
1006 fn dest_not_found_local_is_path_not_found() {
1007 assert_eq!(
1008 dest_err("path not found: /tmp/missing", DestinationType::Local),
1009 "path not found"
1010 );
1011 }
1012
1013 #[test]
1014 fn dest_connection_refused_is_connectivity() {
1015 assert_eq!(
1016 dest_err("connection refused to endpoint", DestinationType::S3),
1017 "connectivity error"
1018 );
1019 }
1020
1021 #[test]
1022 fn dest_dns_error_is_connectivity() {
1023 assert_eq!(
1024 dest_err("dns error: failed to lookup address", DestinationType::S3),
1025 "connectivity error"
1026 );
1027 }
1028
1029 #[test]
1030 fn dest_timed_out_is_connectivity() {
1031 assert_eq!(
1032 dest_err("request timed out after 30s", DestinationType::Gcs),
1033 "connectivity error"
1034 );
1035 }
1036
1037 #[test]
1038 fn dest_unknown_error_is_generic() {
1039 assert_eq!(
1040 dest_err("something else entirely", DestinationType::S3),
1041 "error"
1042 );
1043 }
1044
1045 #[test]
1046 fn strategy_full_scan() {
1047 let e = make_export("t", ExportMode::Full, None);
1048 assert_eq!(derive_strategy(&e), "full-scan");
1049 }
1050
1051 #[test]
1052 fn strategy_full_parallel() {
1053 let mut e = make_export("t", ExportMode::Full, None);
1054 e.parallel = 4;
1055 assert_eq!(derive_strategy(&e), "full-parallel(4)");
1056 }
1057
1058 #[test]
1059 fn strategy_incremental() {
1060 let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1061 assert_eq!(derive_strategy(&e), "incremental(updated_at)");
1062 }
1063
1064 #[test]
1065 fn strategy_chunked() {
1066 let mut e = make_export("t", ExportMode::Chunked, None);
1067 e.chunk_column = Some("id".to_string());
1068 e.chunk_size = 50_000;
1069 assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
1070 }
1071
1072 #[test]
1073 fn strategy_chunked_parallel() {
1074 let mut e = make_export("t", ExportMode::Chunked, None);
1075 e.chunk_column = Some("id".to_string());
1076 e.chunk_size = 50_000;
1077 e.parallel = 3;
1078 assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
1079 }
1080
1081 #[test]
1082 fn strategy_time_window() {
1083 let mut e = make_export("t", ExportMode::TimeWindow, None);
1084 e.time_column = Some("created_at".to_string());
1085 e.days_window = Some(7);
1086 assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
1087 }
1088
1089 #[test]
1090 fn profile_small_indexed_is_fast() {
1091 let e = make_export("t", ExportMode::Full, None);
1092 assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
1093 }
1094
1095 #[test]
1096 fn profile_medium_indexed_is_balanced() {
1097 let e = make_export("t", ExportMode::Full, None);
1098 assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
1099 }
1100
1101 #[test]
1102 fn profile_large_indexed_is_safe() {
1103 let e = make_export("t", ExportMode::Full, None);
1104 assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
1105 }
1106
1107 #[test]
1108 fn profile_small_no_index_is_balanced() {
1109 let e = make_export("t", ExportMode::Full, None);
1110 assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
1111 }
1112
1113 #[test]
1114 fn profile_small_no_index_parallel_is_safe() {
1115 let mut e = make_export("t", ExportMode::Full, None);
1116 e.parallel = 4;
1117 assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
1118 }
1119
1120 #[test]
1121 fn profile_medium_no_index_is_balanced() {
1122 let e = make_export("t", ExportMode::Full, None);
1123 assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
1124 }
1125
1126 #[test]
1127 fn profile_large_no_index_is_safe() {
1128 let e = make_export("t", ExportMode::Full, None);
1129 assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
1130 }
1131
1132 #[test]
1133 fn sparse_range_warning_when_very_sparse() {
1134 let mut e = make_export("t", ExportMode::Chunked, None);
1135 e.chunk_column = Some("id".to_string());
1136 e.chunk_size = 100_000;
1137 let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1138 assert!(w.is_some(), "should warn about sparse range");
1139 let msg = w.unwrap();
1140 assert!(msg.contains("Sparse key range"), "got: {msg}");
1141 assert!(msg.contains("empty"), "got: {msg}");
1142 }
1143
1144 #[test]
1145 fn sparse_range_no_warning_when_dense() {
1146 let mut e = make_export("t", ExportMode::Chunked, None);
1147 e.chunk_column = Some("id".to_string());
1148 e.chunk_size = 100_000;
1149 let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
1150 assert!(w.is_none(), "should not warn for dense range");
1151 }
1152
1153 #[test]
1154 fn sparse_range_skipped_when_chunk_dense() {
1155 let mut e = make_export("t", ExportMode::Chunked, None);
1156 e.chunk_column = Some("id".to_string());
1157 e.chunk_dense = true;
1158 e.chunk_size = 100_000;
1159 let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1160 assert!(
1161 w.is_none(),
1162 "chunk_dense uses ordinals, not physical id span"
1163 );
1164 }
1165
1166 #[test]
1167 fn dense_surrogate_warning_when_chunk_dense_builtin() {
1168 let mut e = make_export("t", ExportMode::Chunked, None);
1169 e.chunk_column = Some("id".to_string());
1170 e.chunk_dense = true;
1171 e.query = Some("SELECT id FROM orders".to_string());
1172 let w = check_dense_surrogate_cost(&e);
1173 assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
1174 assert!(w.unwrap().contains("global sort"));
1175 }
1176
1177 #[test]
1178 fn sparse_range_not_triggered_for_non_chunked() {
1179 let e = make_export("t", ExportMode::Full, None);
1180 let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
1181 assert!(w.is_none(), "should not warn for non-chunked mode");
1182 }
1183
1184 #[test]
1185 fn dense_surrogate_warning_with_row_number() {
1186 let mut e = make_export("t", ExportMode::Chunked, None);
1187 e.chunk_column = Some("rn".to_string());
1188 e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
1189 let w = check_dense_surrogate_cost(&e);
1190 assert!(w.is_some(), "should warn about ROW_NUMBER cost");
1191 assert!(w.unwrap().contains("global sort"));
1192 }
1193
1194 #[test]
1195 fn no_dense_surrogate_warning_without_row_number() {
1196 let mut e = make_export("t", ExportMode::Chunked, None);
1197 e.chunk_column = Some("id".to_string());
1198 e.query = Some("SELECT * FROM orders".to_string());
1199 let w = check_dense_surrogate_cost(&e);
1200 assert!(w.is_none());
1201 }
1202
1203 #[test]
1204 fn no_dense_surrogate_warning_for_non_chunked() {
1205 let mut e = make_export("t", ExportMode::Full, None);
1206 e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
1207 let w = check_dense_surrogate_cost(&e);
1208 assert!(w.is_none(), "should not warn for non-chunked mode");
1209 }
1210
1211 #[test]
1212 fn parallel_memory_warning_large_dataset() {
1213 let mut e = make_export("t", ExportMode::Chunked, None);
1214 e.parallel = 4;
1215 let w = check_parallel_memory_risk(&e, Some(10_000_000));
1216 assert!(w.is_some(), "should warn about memory risk");
1217 let msg = w.unwrap();
1218 assert!(msg.contains("Parallel=4"), "got: {msg}");
1219 assert!(msg.contains("memory"), "got: {msg}");
1220 }
1221
1222 #[test]
1223 fn no_parallel_memory_warning_small_dataset() {
1224 let mut e = make_export("t", ExportMode::Chunked, None);
1225 e.parallel = 4;
1226 let w = check_parallel_memory_risk(&e, Some(1_000));
1227 assert!(w.is_none(), "should not warn for small dataset");
1228 }
1229
1230 #[test]
1231 fn no_parallel_memory_warning_single_worker() {
1232 let e = make_export("t", ExportMode::Full, None);
1233 let w = check_parallel_memory_risk(&e, Some(100_000_000));
1234 assert!(w.is_none(), "should not warn when parallel=1");
1235 }
1236
1237 #[test]
1238 fn suggestion_degraded_full_recommends_incremental() {
1239 let e = make_export("t", ExportMode::Full, None);
1240 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1241 assert!(s.contains("incremental"), "got: {s}");
1242 }
1243
1244 #[test]
1245 fn suggestion_degraded_chunked_recommends_index() {
1246 let mut e = make_export("t", ExportMode::Chunked, None);
1247 e.chunk_column = Some("id".to_string());
1248 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1249 assert!(s.contains("index on 'id'"), "got: {s}");
1250 }
1251
1252 #[test]
1253 fn suggestion_degraded_time_window_recommends_index() {
1254 let mut e = make_export("t", ExportMode::TimeWindow, None);
1255 e.time_column = Some("created_at".to_string());
1256 e.days_window = Some(7);
1257 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1258 assert!(s.contains("index on 'created_at'"), "got: {s}");
1259 }
1260
1261 #[test]
1262 fn suggestion_unsafe_full_recommends_incremental() {
1263 let e = make_export("t", ExportMode::Full, None);
1264 let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1265 assert!(s.contains("incremental"), "got: {s}");
1266 }
1267
1268 #[test]
1269 fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
1270 let mut e = make_export("t", ExportMode::Chunked, None);
1271 e.chunk_column = Some("id".to_string());
1272 let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1273 assert!(s.contains("index on 'id'"), "got: {s}");
1274 assert!(s.contains("parallel"), "got: {s}");
1275 }
1276
1277 #[test]
1278 fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
1279 let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1280 let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1281 assert!(s.contains("index on 'updated_at'"), "got: {s}");
1282 }
1283
1284 #[test]
1285 fn suggestion_acceptable_large_full_recommends_incremental() {
1286 let e = make_export("t", ExportMode::Full, None);
1287 let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1288 assert!(s.contains("incremental"), "got: {s}");
1289 }
1290
1291 #[test]
1292 fn parallel_only_for_chunked_mode() {
1293 let e = make_export("t", ExportMode::Full, None);
1294 let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
1295 assert_eq!(level, 1, "non-chunked mode should recommend 1");
1296 }
1297
1298 #[test]
1299 fn parallel_small_dataset_is_one() {
1300 let mut e = make_export("t", ExportMode::Chunked, None);
1301 e.chunk_column = Some("id".to_string());
1302 let (level, _) = recommend_parallelism(&e, Some(10_000), true);
1303 assert_eq!(level, 1, "small dataset should recommend 1");
1304 }
1305
1306 #[test]
1307 fn parallel_moderate_indexed_is_two() {
1308 let mut e = make_export("t", ExportMode::Chunked, None);
1309 e.chunk_column = Some("id".to_string());
1310 let (level, _) = recommend_parallelism(&e, Some(200_000), true);
1311 assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
1312 }
1313
1314 #[test]
1315 fn parallel_large_indexed_is_four() {
1316 let mut e = make_export("t", ExportMode::Chunked, None);
1317 e.chunk_column = Some("id".to_string());
1318 let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
1319 assert_eq!(level, 4, "large indexed dataset should recommend 4");
1320 }
1321
1322 #[test]
1323 fn parallel_no_index_large_is_one() {
1324 let mut e = make_export("t", ExportMode::Chunked, None);
1325 e.chunk_column = Some("id".to_string());
1326 let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
1327 assert_eq!(level, 1, "no index + large should recommend 1");
1328 assert!(reason.contains("no index"), "got: {reason}");
1329 }
1330
1331 #[test]
1332 fn parallel_no_index_moderate_is_conservative() {
1333 let mut e = make_export("t", ExportMode::Chunked, None);
1334 e.chunk_column = Some("id".to_string());
1335 let (level, _) = recommend_parallelism(&e, Some(200_000), false);
1336 assert_eq!(
1337 level, 2,
1338 "no index + moderate should recommend 2 (conservative)"
1339 );
1340 }
1341
1342 #[test]
1343 fn suggestion_acceptable_large_chunked_recommends_parallel() {
1344 let mut e = make_export("t", ExportMode::Chunked, None);
1345 e.chunk_column = Some("id".to_string());
1346 let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1347 assert!(s.contains("parallel"), "got: {s}");
1348 }
1349
1350 #[test]
1351 fn connection_limit_warn_when_parallel_meets_max() {
1352 let w = check_connection_limit(20, Some(20));
1353 assert!(w.is_some(), "should warn when parallel == max_connections");
1354 let msg = w.unwrap();
1355 assert!(msg.contains("max_connections=20"), "got: {msg}");
1356 assert!(msg.contains("parallel=20"), "got: {msg}");
1357 }
1358
1359 #[test]
1360 fn connection_limit_warn_when_parallel_exceeds_max() {
1361 let w = check_connection_limit(100, Some(20));
1362 assert!(w.is_some(), "should warn when parallel > max_connections");
1363 let msg = w.unwrap();
1364 assert!(msg.contains("max_connections=20"), "got: {msg}");
1365 }
1366
1367 #[test]
1368 fn connection_limit_no_warn_when_parallel_below_max() {
1369 let w = check_connection_limit(4, Some(100));
1370 assert!(
1371 w.is_none(),
1372 "should not warn when parallel << max_connections"
1373 );
1374 }
1375
1376 #[test]
1377 fn connection_limit_no_warn_when_parallel_is_one() {
1378 let w = check_connection_limit(1, Some(5));
1379 assert!(
1380 w.is_none(),
1381 "single worker never triggers connection warning"
1382 );
1383 }
1384
1385 #[test]
1386 fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
1387 let w = check_connection_limit(100, None);
1388 assert!(w.is_some(), "should note that check was skipped");
1389 let msg = w.unwrap();
1390 assert!(msg.contains("skipped"), "got: {msg}");
1391 }
1392
1393 #[test]
1394 fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
1395 let w = check_connection_limit(1, None);
1396 assert!(
1397 w.is_none(),
1398 "single worker never triggers connection warning"
1399 );
1400 }
1401
1402 #[test]
1403 fn connection_limit_suggests_headroom() {
1404 let w = check_connection_limit(25, Some(20)).unwrap();
1405 assert!(
1407 w.contains("17"),
1408 "should suggest leaving headroom, got: {w}"
1409 );
1410 }
1411
1412 fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1415 let err = anyhow::anyhow!("{}", msg);
1416 let cat = categorize_source_error(&err);
1417 source_error_hint(cat, &err, &st)
1418 }
1419
1420 fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1421 let err = anyhow::anyhow!("{}", msg);
1422 let dest = DestinationConfig {
1423 destination_type: dt,
1424 bucket: Some("b".into()),
1425 ..Default::default()
1426 };
1427 let cat = categorize_dest_error(&err, &dest);
1428 destination_error_hint(cat, &dest)
1429 }
1430
1431 #[test]
1432 fn source_tls_handshake_returns_pg_specific_tls_hint() {
1433 let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1434 assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1435 }
1436
1437 #[test]
1438 fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1439 let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1440 assert!(h.contains("tls.mode"), "got: {h}");
1441 }
1442
1443 #[test]
1444 fn source_auth_error_postgres_mentions_pg_hba() {
1445 let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1446 assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1447 }
1448
1449 #[test]
1450 fn source_auth_error_mysql_mentions_grant() {
1451 let h = src_hint(
1452 "Access denied for user 'rivet'@'localhost'",
1453 SourceType::Mysql,
1454 )
1455 .expect("hint");
1456 assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1457 }
1458
1459 #[test]
1460 fn source_connectivity_error_mentions_bastion_and_network() {
1461 let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1462 assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1463 }
1464
1465 #[test]
1466 fn source_unknown_error_returns_no_hint() {
1467 let h = src_hint("totally unexpected", SourceType::Postgres);
1470 assert!(h.is_none(), "unknown errors should not produce a hint");
1471 }
1472
1473 #[test]
1474 fn dest_s3_auth_error_names_concrete_actions() {
1475 let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1476 assert!(
1477 h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1478 "got: {h}"
1479 );
1480 }
1481
1482 #[test]
1483 fn dest_gcs_auth_error_names_concrete_actions() {
1484 let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1485 assert!(
1486 h.contains("storage.objects") && h.contains("cloud-permissions"),
1487 "got: {h}"
1488 );
1489 }
1490
1491 #[test]
1492 fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1493 let err = anyhow::anyhow!(
1498 "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1499 );
1500 let dest = DestinationConfig {
1501 destination_type: DestinationType::Azure,
1502 bucket: Some("c".into()),
1503 ..Default::default()
1504 };
1505 let cat = categorize_dest_error(&err, &dest);
1506 assert_eq!(
1507 cat, "sas expired",
1508 "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1509 );
1510 }
1511
1512 #[test]
1513 fn dest_azure_sas_expired_returns_regenerate_hint() {
1514 let h = dest_hint(
1518 "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1519 DestinationType::Azure,
1520 )
1521 .expect("hint");
1522 assert!(
1523 h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1524 "got: {h}"
1525 );
1526 }
1527
1528 #[test]
1529 fn dest_s3_bucket_not_found_says_no_auto_create() {
1530 let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1531 assert!(
1532 h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1533 "got: {h}"
1534 );
1535 }
1536
1537 #[test]
1538 fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1539 let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1540 assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1541 }
1542}