1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11pub(crate) use analysis::SMALL_TABLE_ROW_THRESHOLD;
14#[cfg(test)]
15use analysis::{
16 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
17 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
18 recommend_parallelism, recommend_profile,
19};
20#[allow(unused_imports)]
21pub use doctor::doctor;
22pub(crate) use doctor::{categorize_source_error, source_error_hint};
25#[cfg(test)]
26use postgres::{extract_scan_type, parse_pg_row_estimate};
27
28use serde::Serialize;
29
30use crate::config::{Config, ExportConfig, SourceType};
31use crate::error::Result;
32use crate::types::policy::TypePolicy;
33use crate::types::target::{ExportTarget, TargetStatus};
34
35#[derive(Debug, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum HealthVerdict {
42 Efficient,
43 Acceptable,
44 Degraded,
45 Unsafe,
46}
47
48impl std::fmt::Display for HealthVerdict {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Self::Efficient => write!(f, "EFFICIENT"),
52 Self::Acceptable => write!(f, "ACCEPTABLE"),
53 Self::Degraded => write!(f, "DEGRADED"),
54 Self::Unsafe => write!(f, "UNSAFE"),
55 }
56 }
57}
58
59pub(crate) struct ExportDiagnostic {
60 pub export_name: String,
61 pub strategy: String,
62 pub mode: String,
63 pub cursor_column: Option<String>,
64 pub row_estimate: Option<i64>,
65 pub avg_row_bytes: Option<i64>,
70 pub cursor_min: Option<String>,
71 pub cursor_max: Option<String>,
72 pub scan_type: Option<String>,
73 pub uses_index: bool,
74 pub verdict: HealthVerdict,
75 pub recommended_profile: &'static str,
76 pub recommended_parallel: (u32, &'static str),
77 pub warnings: Vec<analysis::Warning>,
78 pub suggestion: Option<String>,
79}
80
81impl Serialize for ExportDiagnostic {
91 fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
92 where
93 S: serde::Serializer,
94 {
95 use serde::ser::SerializeMap;
96
97 #[derive(Serialize)]
98 struct RecommendedParallel {
99 level: u32,
100 reason: &'static str,
101 }
102 #[derive(Serialize)]
103 struct Capabilities {
104 uses_index: bool,
105 has_cursor: bool,
106 can_parallel: bool,
107 }
108
109 let mut map = serializer.serialize_map(None)?;
110 map.serialize_entry("export_name", &self.export_name)?;
111 map.serialize_entry("strategy", &self.strategy)?;
112 map.serialize_entry("mode", &self.mode)?;
113 if let Some(v) = &self.cursor_column {
114 map.serialize_entry("cursor_column", v)?;
115 }
116 if let Some(v) = &self.row_estimate {
117 map.serialize_entry("row_estimate", v)?;
118 }
119 if let Some(v) = &self.avg_row_bytes {
120 map.serialize_entry("avg_row_bytes", v)?;
121 }
122 if let Some(v) = &self.cursor_min {
123 map.serialize_entry("cursor_min", v)?;
124 }
125 if let Some(v) = &self.cursor_max {
126 map.serialize_entry("cursor_max", v)?;
127 }
128 if let Some(v) = &self.scan_type {
129 map.serialize_entry("scan_type", v)?;
130 }
131 map.serialize_entry("uses_index", &self.uses_index)?;
132 map.serialize_entry("verdict", &self.verdict)?;
133 map.serialize_entry("recommended_profile", &self.recommended_profile)?;
134 map.serialize_entry(
135 "recommended_parallel",
136 &RecommendedParallel {
137 level: self.recommended_parallel.0,
138 reason: self.recommended_parallel.1,
139 },
140 )?;
141 map.serialize_entry("warnings", &self.warnings)?;
142 if let Some(v) = &self.suggestion {
143 map.serialize_entry("suggestion", v)?;
144 }
145 map.serialize_entry(
146 "capabilities",
147 &Capabilities {
148 uses_index: self.uses_index,
149 has_cursor: self.cursor_column.is_some(),
150 can_parallel: self.recommended_parallel.0 > 1,
151 },
152 )?;
153 map.end()
154 }
155}
156
157pub(crate) fn get_export_diagnostic(
161 config: &Config,
162 export: &ExportConfig,
163) -> Result<ExportDiagnostic> {
164 let url = config.source.resolve_url()?;
165 let tls = config.source.tls.as_ref();
166 crate::source::warn_if_tls_disabled(&config.source);
167 match config.source.source_type {
168 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
169 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
170 SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
171 }
172}
173
174fn destination_identity(d: &crate::config::DestinationConfig) -> String {
181 format!(
182 "{:?}:{}:{}:{}",
183 d.destination_type,
184 d.bucket.as_deref().unwrap_or("-"),
185 d.endpoint.as_deref().unwrap_or("-"),
186 d.path.as_deref().unwrap_or("-"),
187 )
188}
189
190fn target_fail_note(n: usize, target_label: &str) -> String {
196 let col = if n == 1 { "column" } else { "columns" };
197 format!(
198 "Note: {n} {col} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
199 )
200}
201
202pub(super) fn collect_diagnostics<F>(
208 exports: &[&ExportConfig],
209 mut diagnose: F,
210) -> Result<Vec<ExportDiagnostic>>
211where
212 F: FnMut(&ExportConfig) -> Result<ExportDiagnostic>,
213{
214 exports.iter().map(|&e| diagnose(e)).collect()
215}
216
217pub fn check(
218 config_path: &str,
219 export_name: Option<&str>,
220 params: Option<&std::collections::HashMap<String, String>>,
221 show_type_report: bool,
222 strict: bool,
223 json_output: bool,
224 target: Option<ExportTarget>,
225) -> Result<()> {
226 let config = Config::load_with_params(config_path, params)?;
227
228 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
229 let e = config
230 .exports
231 .iter()
232 .find(|e| e.name == name)
233 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
234 vec![e]
235 } else {
236 config.exports.iter().collect()
237 };
238
239 let url = config.source.resolve_url()?;
240 let tls = config.source.tls.as_ref();
241 crate::source::warn_if_tls_disabled(&config.source);
247 let diagnostics: Vec<ExportDiagnostic> = match config.source.source_type {
254 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports)?,
255 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports)?,
256 SourceType::Mssql => mssql::check_mssql(&url, tls, &exports)?,
257 };
258 if !json_output {
259 for diag in &diagnostics {
260 print_diagnostic(diag);
261 }
262 } else if !show_type_report {
263 for diag in &diagnostics {
270 println!("{}", serde_json::to_string(diag)?);
271 }
272 }
273 let diag_by_export: std::collections::HashMap<&str, &ExportDiagnostic> = if json_output {
278 diagnostics
279 .iter()
280 .map(|d| (d.export_name.as_str(), d))
281 .collect()
282 } else {
283 std::collections::HashMap::new()
284 };
285
286 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
295 for export in &exports {
296 let dest_key = destination_identity(&export.destination);
297 if !seen_destinations.insert(dest_key) {
298 continue;
299 }
300 let expanded = crate::plan::build::expand_destination_templates(
301 export.destination.clone(),
302 &export.name,
303 );
304 crate::destination::create_destination(&expanded).map_err(|e| {
305 anyhow::anyhow!(
306 "export '{}': destination preflight failed: {:#}",
307 export.name,
308 e
309 )
310 })?;
311 }
312
313 let mut clean = true;
317
318 if show_type_report {
319 let policy = if strict {
320 TypePolicy::strict()
321 } else {
322 TypePolicy::warn_only()
323 };
324
325 let mut any_fatal = false;
326 let mut target_fail_cols = 0usize;
332 let mut target_fail_label: Option<&'static str> = None;
333 for export in &exports {
334 let column_overrides =
335 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
336 if let Some(t) = export.target.as_deref()
340 && crate::types::target::ExportTarget::parse(t).is_none()
341 {
342 anyhow::bail!(
343 "export '{}': unknown target '{t}' (expected: {})",
344 export.name,
345 crate::types::target::ExportTarget::valid_target_names()
346 );
347 }
348 let eff_target = target.or_else(|| {
349 export
350 .target
351 .as_deref()
352 .and_then(crate::types::target::ExportTarget::parse)
353 });
354 let config_dir = std::path::Path::new(config_path)
355 .parent()
356 .unwrap_or_else(|| std::path::Path::new("."));
357 match type_report::collect_report(
358 &config,
359 export,
360 &column_overrides,
361 &policy,
362 eff_target,
363 config_dir,
364 params,
365 ) {
366 Ok(report) => {
367 if report.has_fatal() {
368 any_fatal = true;
369 }
370 if let Some(t) = eff_target
371 && report.has_target_fail()
372 {
373 any_fatal = true;
374 target_fail_cols += report
375 .columns
376 .iter()
377 .filter(|c| c.target_status == Some(TargetStatus::Fail))
378 .count();
379 target_fail_label.get_or_insert(t.label());
380 }
381 if json_output {
382 print_report_json_with_diagnostic(
396 &report,
397 diag_by_export.get(export.name.as_str()).copied(),
398 )?;
399 } else {
400 type_report::print_table(&report, eff_target);
401 }
402 }
403 Err(e) => {
404 log::warn!("type report for '{}' failed: {:#}", export.name, e);
405 if json_output
410 && let Some(diag) = diag_by_export.get(export.name.as_str()).copied()
411 {
412 println!("{}", serde_json::to_string(diag)?);
413 }
414 }
415 }
416 }
417
418 if strict && any_fatal {
419 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
420 } else if !strict && target_fail_cols > 0 && !json_output {
421 clean = false;
424 println!();
425 println!(
426 "{}",
427 target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
428 );
429 }
430 }
431
432 if !json_output {
433 println!();
436 println!(
437 "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
438 );
439 if clean {
440 println!(
442 "Looks good. Next: rivet run -c {config_path} --validate # export, then verify row counts"
443 );
444 }
445 }
446
447 Ok(())
448}
449
450fn print_report_json_with_diagnostic(
460 report: &type_report::ExportTypeReport,
461 diag: Option<&ExportDiagnostic>,
462) -> Result<()> {
463 let mut value = serde_json::to_value(report)?;
464 if let (Some(obj), Some(diag)) = (value.as_object_mut(), diag) {
465 obj.insert("diagnostic".to_string(), serde_json::to_value(diag)?);
466 }
467 println!("{}", serde_json::to_string(&value)?);
468 Ok(())
469}
470
471fn print_diagnostic(diag: &ExportDiagnostic) {
472 println!();
473 println!("Export: {}", diag.export_name);
474 println!(" Strategy: {}", diag.strategy);
475 println!(" Mode: {}", diag.mode);
476 if let Some(est) = diag.row_estimate {
477 if est >= 1_000_000 {
478 println!(" Row estimate: ~{}M", est / 1_000_000);
479 } else if est >= 1_000 {
480 println!(" Row estimate: ~{}K", est / 1_000);
481 } else {
482 println!(" Row estimate: ~{}", est);
483 }
484 }
485 if let Some(w) = diag.avg_row_bytes {
486 println!(" Row width: ~{} bytes", w);
487 }
488 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
489 println!(" Cursor range: {} .. {}", min_v, max_v);
490 }
491 if let Some(col) = &diag.cursor_column {
492 println!(" Cursor col: {}", col);
493 }
494 if diag.scan_type.is_some() {
499 let access = if diag.uses_index {
500 "index scan (the cursor/chunk column is indexed)"
501 } else {
502 "full table scan (no index on the read path)"
503 };
504 println!(" Access: {access}");
505 }
506 println!(" Verdict: {}", diag.verdict);
507 println!(
508 " Recommended: tuning.profile: {}",
509 diag.recommended_profile
510 );
511 let (par_level, par_reason) = diag.recommended_parallel;
512 if par_level > 1 {
513 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
514 } else {
515 println!(" Parallelism: {} ({})", par_level, par_reason);
516 }
517 for w in &diag.warnings {
518 println!(" Warning: [{}] {}", w.severity.label(), w.message);
519 }
520 if let Some(suggestion) = &diag.suggestion {
521 println!(" Suggestion: {}", suggestion);
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use super::*;
528 use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
529 use doctor::{
530 categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
531 };
532 use serde_json::Value;
533
534 fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
535 ExportConfig {
538 mode,
539 cursor_column: cursor.map(|s| s.to_string()),
540 query: Some("SELECT * FROM t".to_string()),
541 format: FormatType::Csv,
542 destination: DestinationConfig {
543 destination_type: DestinationType::Local,
544 path: Some("./out".to_string()),
545 ..Default::default()
546 },
547 ..crate::config::sample_export(name)
548 }
549 }
550
551 fn sample_diagnostic(name: &str) -> ExportDiagnostic {
556 ExportDiagnostic {
557 export_name: name.to_string(),
558 strategy: "incremental(updated_at)".to_string(),
559 mode: "incremental".to_string(),
560 cursor_column: Some("updated_at".to_string()),
561 row_estimate: Some(1_234_567),
562 avg_row_bytes: Some(96),
563 cursor_min: Some("2020-01-01".to_string()),
564 cursor_max: Some("2024-01-01".to_string()),
565 scan_type: Some("Index Scan".to_string()),
566 uses_index: true,
567 verdict: HealthVerdict::Degraded,
568 recommended_profile: "safe",
569 recommended_parallel: (4, "large indexed dataset"),
570 warnings: vec![
571 analysis::Warning::new(analysis::Severity::Medium, "Sparse key range".to_string()),
572 analysis::Warning::new(analysis::Severity::High, "memory risk".to_string()),
573 ],
574 suggestion: Some("create an index".to_string()),
575 }
576 }
577
578 #[test]
581 fn diagnostic_json_has_lowercase_verdict_and_core_fields() {
582 let diag = sample_diagnostic("orders");
583 let v: serde_json::Value =
584 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
585
586 assert_eq!(v["verdict"], "degraded", "got: {v}");
589 assert_eq!(v["strategy"], "incremental(updated_at)", "got: {v}");
590 assert_eq!(v["mode"], "incremental", "got: {v}");
591 assert_eq!(v["recommended_profile"], "safe", "got: {v}");
592 assert!(v["warnings"].is_array(), "warnings must be an array: {v}");
593 assert_eq!(v["warnings"].as_array().unwrap().len(), 2, "got: {v}");
594 assert_eq!(v["warnings"][0]["severity"], "medium", "got: {v}");
596 assert_eq!(v["warnings"][0]["message"], "Sparse key range", "got: {v}");
597 assert_eq!(v["warnings"][1]["severity"], "high", "got: {v}");
598 assert_eq!(v["export_name"], "orders", "got: {v}");
599 }
600
601 #[test]
602 fn diagnostic_json_verdict_tokens_are_all_lowercase() {
603 for (verdict, token) in [
604 (HealthVerdict::Efficient, "efficient"),
605 (HealthVerdict::Acceptable, "acceptable"),
606 (HealthVerdict::Degraded, "degraded"),
607 (HealthVerdict::Unsafe, "unsafe"),
608 ] {
609 let mut diag = sample_diagnostic("t");
610 diag.verdict = verdict;
611 let v: serde_json::Value =
612 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
613 assert_eq!(v["verdict"], token, "verdict must lowercase to {token}");
614 }
615 }
616
617 #[test]
618 fn diagnostic_json_recommended_parallel_is_named_object_not_tuple() {
619 let diag = sample_diagnostic("t");
622 let v: serde_json::Value =
623 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
624 assert!(
625 v["recommended_parallel"].is_object(),
626 "recommended_parallel must be an object, got: {}",
627 v["recommended_parallel"]
628 );
629 assert_eq!(v["recommended_parallel"]["level"], 4, "got: {v}");
630 assert_eq!(
631 v["recommended_parallel"]["reason"], "large indexed dataset",
632 "got: {v}"
633 );
634 }
635
636 #[test]
637 fn diagnostic_json_capabilities_are_derived_from_fields() {
638 let diag = sample_diagnostic("t");
639 let v: serde_json::Value =
640 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
641 let caps = &v["capabilities"];
642 assert_eq!(caps["uses_index"], true, "got: {caps}");
643 assert_eq!(caps["has_cursor"], true, "got: {caps}");
644 assert_eq!(caps["can_parallel"], true, "got: {caps}");
645 }
646
647 #[test]
648 fn diagnostic_json_capabilities_flip_with_fields() {
649 let mut diag = sample_diagnostic("t");
651 diag.cursor_column = None;
652 diag.uses_index = false;
653 diag.recommended_parallel = (1, "small dataset");
654 let v: serde_json::Value =
655 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
656 let caps = &v["capabilities"];
657 assert_eq!(caps["uses_index"], false, "got: {caps}");
658 assert_eq!(caps["has_cursor"], false, "got: {caps}");
659 assert_eq!(caps["can_parallel"], false, "got: {caps}");
660 }
661
662 #[test]
663 fn diagnostic_json_skips_none_optionals() {
664 let mut diag = sample_diagnostic("t");
666 diag.suggestion = None;
667 diag.scan_type = None;
668 let v: serde_json::Value =
669 serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
670 let obj = v.as_object().unwrap();
671 assert!(!obj.contains_key("suggestion"), "None must be omitted: {v}");
672 assert!(!obj.contains_key("scan_type"), "None must be omitted: {v}");
673 }
674
675 fn merged_check_json(report: &type_report::ExportTypeReport, diag: &ExportDiagnostic) -> Value {
678 let mut value = serde_json::to_value(report).unwrap();
679 value.as_object_mut().unwrap().insert(
680 "diagnostic".to_string(),
681 serde_json::to_value(diag).unwrap(),
682 );
683 value
684 }
685
686 fn empty_report(export: &str) -> type_report::ExportTypeReport {
687 type_report::ExportTypeReport {
688 export: export.to_string(),
689 columns: Vec::new(),
690 violations: Vec::new(),
691 target_failures: false,
692 recovery_sql: None,
693 }
694 }
695
696 #[test]
697 fn check_json_merges_diagnostic_into_type_report_object() {
698 let report = empty_report("orders");
703 let diag = sample_diagnostic("orders");
704 let v = merged_check_json(&report, &diag);
705
706 assert_eq!(v["export"], "orders", "got: {v}");
708 assert!(v["columns"].is_array(), "columns at root: {v}");
709 assert!(v["violations"].is_array(), "violations at root: {v}");
710
711 let d = &v["diagnostic"];
713 assert_eq!(d["verdict"], "degraded", "got: {d}");
714 assert_eq!(d["strategy"], "incremental(updated_at)", "got: {d}");
715 assert_eq!(d["mode"], "incremental", "got: {d}");
716 assert_eq!(d["recommended_profile"], "safe", "got: {d}");
717 assert!(d["warnings"].is_array(), "warnings array: {d}");
718 assert_eq!(d["capabilities"]["has_cursor"], true, "got: {d}");
719 }
720
721 #[test]
722 fn check_json_object_is_a_single_parseable_line() {
723 let report = empty_report("orders");
727 let diag = sample_diagnostic("orders");
728 let line = serde_json::to_string(&merged_check_json(&report, &diag)).unwrap();
729 assert!(!line.contains('\n'), "one object per line: {line}");
730 let parsed: Value = serde_json::from_str(line.trim()).expect("must parse whole");
731 assert_eq!(parsed["export"], "orders");
732 }
733
734 #[test]
738 fn target_fail_note_names_count_target_and_strict_gate() {
739 let note = target_fail_note(2, "bigquery");
740 assert!(note.contains("2 columns FAIL"), "got: {note}");
741 assert!(note.contains("bigquery"), "got: {note}");
742 assert!(note.contains("--strict"), "got: {note}");
743 assert!(note.contains("exit 0"), "got: {note}");
744 }
745
746 #[test]
747 fn target_fail_note_singular_for_one_column() {
748 let note = target_fail_note(1, "duckdb");
749 assert!(note.contains("1 column FAIL"), "got: {note}");
750 assert!(!note.contains("1 columns"), "should be singular: {note}");
751 }
752
753 #[test]
754 fn verdict_small_indexed_with_cursor_is_efficient() {
755 let v = compute_verdict(Some(500_000), true, true, None, 1);
756 assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
757 }
758
759 #[test]
760 fn verdict_large_indexed_with_cursor_is_acceptable() {
761 let v = compute_verdict(Some(20_000_000), true, true, None, 1);
762 assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
763 }
764
765 #[test]
766 fn verdict_no_index_no_cursor_is_degraded() {
767 let v = compute_verdict(Some(500_000), false, false, None, 1);
768 assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
769 }
770
771 #[test]
772 fn verdict_huge_no_index_is_unsafe() {
773 let v = compute_verdict(Some(100_000_000), false, false, None, 1);
774 assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
775 }
776
777 #[test]
778 fn parse_pg_row_estimate_from_sort_plan() {
779 let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
780 assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
781 }
782
783 #[test]
784 fn parse_pg_row_estimate_from_index_scan() {
785 let plan =
786 "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
787 assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
788 }
789
790 #[test]
791 fn extract_scan_type_detects_seq_scan() {
792 let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
793 let st = extract_scan_type(plan);
794 assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
795 }
796
797 #[test]
798 fn extract_scan_type_detects_index_scan() {
799 let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
800 let st = extract_scan_type(plan);
801 assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
802 }
803
804 #[test]
805 fn suggestion_for_efficient_verdict_is_none() {
806 let e = make_export("t", ExportMode::Full, None);
807 let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
808 assert!(
809 s.is_none(),
810 "efficient verdict should produce no suggestion"
811 );
812 }
813
814 #[test]
815 fn suggestion_for_degraded_verdict_recommends_safe_profile() {
816 let e = make_export("t", ExportMode::Full, None);
817 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
818 let msg = s.expect("degraded verdict should produce a suggestion");
819 assert!(
820 msg.contains("safe"),
821 "suggestion should recommend safe profile, got: {msg}"
822 );
823 }
824
825 fn src_err(msg: &str) -> &'static str {
826 categorize_source_error(&anyhow::anyhow!("{}", msg))
827 }
828
829 #[test]
830 fn source_password_rejected_is_auth_error() {
831 assert_eq!(
832 src_err("password authentication failed for user \"rivet\""),
833 "auth error"
834 );
835 }
836
837 #[test]
838 fn source_authentication_failed_is_auth_error() {
839 assert_eq!(src_err("FATAL: authentication failed"), "auth error");
840 }
841
842 #[test]
843 fn source_access_denied_is_auth_error() {
844 assert_eq!(
845 src_err("Access denied for user 'rivet'@'localhost'"),
846 "auth error"
847 );
848 }
849
850 #[test]
851 fn source_connection_refused_is_connectivity() {
852 assert_eq!(
853 src_err("connection refused (os error 61)"),
854 "connectivity error"
855 );
856 }
857
858 #[test]
859 fn source_timed_out_is_connectivity() {
860 assert_eq!(src_err("connection timed out"), "connectivity error");
861 }
862
863 #[test]
864 fn source_dns_translate_host_is_connectivity() {
865 assert_eq!(
866 src_err("could not translate host name \"db.bad\" to address"),
867 "connectivity error"
868 );
869 }
870
871 #[test]
872 fn source_name_not_known_is_connectivity() {
873 assert_eq!(src_err("Name or service not known"), "connectivity error");
874 }
875
876 #[test]
877 fn source_unknown_error_is_generic() {
878 assert_eq!(src_err("something totally unexpected"), "error");
879 }
880
881 fn dest_config(dtype: DestinationType) -> DestinationConfig {
882 DestinationConfig {
883 destination_type: dtype,
884 bucket: Some("b".to_string()),
885 ..Default::default()
886 }
887 }
888
889 fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
890 let cfg = dest_config(dtype);
891 categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
892 }
893
894 fn local_dest(path: &str) -> DestinationConfig {
895 DestinationConfig {
896 destination_type: DestinationType::Local,
897 path: Some(path.to_string()),
898 ..Default::default()
899 }
900 }
901
902 #[test]
907 fn destination_identity_distinguishes_local_paths() {
908 assert_ne!(
909 destination_identity(&local_dest("/tmp/a")),
910 destination_identity(&local_dest("/tmp/b")),
911 );
912 }
913
914 #[test]
915 fn destination_identity_collapses_identical_local_destinations() {
916 assert_eq!(
917 destination_identity(&local_dest("/tmp/a")),
918 destination_identity(&local_dest("/tmp/a")),
919 );
920 }
921
922 #[test]
923 fn destination_identity_distinguishes_buckets() {
924 let a = DestinationConfig {
925 bucket: Some("bucket-a".to_string()),
926 ..dest_config(DestinationType::S3)
927 };
928 let b = DestinationConfig {
929 bucket: Some("bucket-b".to_string()),
930 ..dest_config(DestinationType::S3)
931 };
932 assert_ne!(destination_identity(&a), destination_identity(&b));
933 }
934
935 #[test]
938 fn destination_identity_distinguishes_endpoints_for_same_bucket() {
939 let aws = dest_config(DestinationType::S3);
940 let minio = DestinationConfig {
941 endpoint: Some("http://localhost:9000".to_string()),
942 ..dest_config(DestinationType::S3)
943 };
944 assert_ne!(destination_identity(&aws), destination_identity(&minio));
945 }
946
947 #[test]
948 fn dest_credential_loading_is_auth_error() {
949 assert_eq!(
950 dest_err(
951 "loading credential to sign http request",
952 DestinationType::Gcs
953 ),
954 "auth error"
955 );
956 }
957
958 #[test]
959 fn dest_permission_denied_is_auth_error() {
960 assert_eq!(
961 dest_err("permission denied on resource bucket", DestinationType::S3),
962 "auth error"
963 );
964 }
965
966 #[test]
967 fn dest_forbidden_is_auth_error() {
968 assert_eq!(
969 dest_err("403 Forbidden", DestinationType::Gcs),
970 "auth error"
971 );
972 }
973
974 #[test]
975 fn dest_unauthorized_is_auth_error() {
976 assert_eq!(
977 dest_err("401 Unauthorized", DestinationType::S3),
978 "auth error"
979 );
980 }
981
982 #[test]
983 fn dest_invalid_grant_is_auth_error() {
984 assert_eq!(
985 dest_err(
986 "invalid_grant: token has been revoked",
987 DestinationType::Gcs
988 ),
989 "auth error"
990 );
991 }
992
993 #[test]
994 fn dest_nosuchbucket_s3_is_bucket_not_found() {
995 assert_eq!(
996 dest_err(
997 "NoSuchBucket: the specified bucket does not exist",
998 DestinationType::S3
999 ),
1000 "bucket not found"
1001 );
1002 }
1003
1004 #[test]
1005 fn dest_not_found_gcs_is_bucket_not_found() {
1006 assert_eq!(
1007 dest_err("bucket not found (404)", DestinationType::Gcs),
1008 "bucket not found"
1009 );
1010 }
1011
1012 #[test]
1013 fn dest_not_found_local_is_path_not_found() {
1014 assert_eq!(
1015 dest_err("path not found: /tmp/missing", DestinationType::Local),
1016 "path not found"
1017 );
1018 }
1019
1020 #[test]
1021 fn dest_connection_refused_is_connectivity() {
1022 assert_eq!(
1023 dest_err("connection refused to endpoint", DestinationType::S3),
1024 "connectivity error"
1025 );
1026 }
1027
1028 #[test]
1029 fn dest_dns_error_is_connectivity() {
1030 assert_eq!(
1031 dest_err("dns error: failed to lookup address", DestinationType::S3),
1032 "connectivity error"
1033 );
1034 }
1035
1036 #[test]
1037 fn dest_timed_out_is_connectivity() {
1038 assert_eq!(
1039 dest_err("request timed out after 30s", DestinationType::Gcs),
1040 "connectivity error"
1041 );
1042 }
1043
1044 #[test]
1045 fn dest_unknown_error_is_generic() {
1046 assert_eq!(
1047 dest_err("something else entirely", DestinationType::S3),
1048 "error"
1049 );
1050 }
1051
1052 #[test]
1053 fn strategy_full_scan() {
1054 let e = make_export("t", ExportMode::Full, None);
1055 assert_eq!(derive_strategy(&e), "full-scan");
1056 }
1057
1058 #[test]
1059 fn strategy_full_parallel() {
1060 let mut e = make_export("t", ExportMode::Full, None);
1061 e.parallel = 4;
1062 assert_eq!(derive_strategy(&e), "full-parallel(4)");
1063 }
1064
1065 #[test]
1066 fn strategy_incremental() {
1067 let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1068 assert_eq!(derive_strategy(&e), "incremental(updated_at)");
1069 }
1070
1071 #[test]
1072 fn strategy_chunked() {
1073 let mut e = make_export("t", ExportMode::Chunked, None);
1074 e.chunk_column = Some("id".to_string());
1075 e.chunk_size = 50_000;
1076 assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
1077 }
1078
1079 #[test]
1080 fn strategy_chunked_parallel() {
1081 let mut e = make_export("t", ExportMode::Chunked, None);
1082 e.chunk_column = Some("id".to_string());
1083 e.chunk_size = 50_000;
1084 e.parallel = 3;
1085 assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
1086 }
1087
1088 #[test]
1089 fn strategy_time_window() {
1090 let mut e = make_export("t", ExportMode::TimeWindow, None);
1091 e.time_column = Some("created_at".to_string());
1092 e.days_window = Some(7);
1093 assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
1094 }
1095
1096 #[test]
1097 fn profile_small_indexed_is_fast() {
1098 let e = make_export("t", ExportMode::Full, None);
1099 assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
1100 }
1101
1102 #[test]
1103 fn profile_medium_indexed_is_balanced() {
1104 let e = make_export("t", ExportMode::Full, None);
1105 assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
1106 }
1107
1108 #[test]
1109 fn profile_large_indexed_is_safe() {
1110 let e = make_export("t", ExportMode::Full, None);
1111 assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
1112 }
1113
1114 #[test]
1115 fn profile_small_no_index_is_balanced() {
1116 let e = make_export("t", ExportMode::Full, None);
1117 assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
1118 }
1119
1120 #[test]
1121 fn profile_small_no_index_parallel_is_safe() {
1122 let mut e = make_export("t", ExportMode::Full, None);
1123 e.parallel = 4;
1124 assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
1125 }
1126
1127 #[test]
1128 fn profile_medium_no_index_is_balanced() {
1129 let e = make_export("t", ExportMode::Full, None);
1130 assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
1131 }
1132
1133 #[test]
1134 fn profile_large_no_index_is_safe() {
1135 let e = make_export("t", ExportMode::Full, None);
1136 assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
1137 }
1138
1139 #[test]
1140 fn sparse_range_warning_when_very_sparse() {
1141 let mut e = make_export("t", ExportMode::Chunked, None);
1142 e.chunk_column = Some("id".to_string());
1143 e.chunk_size = 100_000;
1144 let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1145 assert!(w.is_some(), "should warn about sparse range");
1146 let msg = w.unwrap();
1147 assert!(msg.contains("Sparse key range"), "got: {msg}");
1148 assert!(msg.contains("empty"), "got: {msg}");
1149 }
1150
1151 #[test]
1152 fn sparse_range_no_warning_when_dense() {
1153 let mut e = make_export("t", ExportMode::Chunked, None);
1154 e.chunk_column = Some("id".to_string());
1155 e.chunk_size = 100_000;
1156 let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
1157 assert!(w.is_none(), "should not warn for dense range");
1158 }
1159
1160 #[test]
1161 fn sparse_range_skipped_when_chunk_dense() {
1162 let mut e = make_export("t", ExportMode::Chunked, None);
1163 e.chunk_column = Some("id".to_string());
1164 e.chunk_dense = true;
1165 e.chunk_size = 100_000;
1166 let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1167 assert!(
1168 w.is_none(),
1169 "chunk_dense uses ordinals, not physical id span"
1170 );
1171 }
1172
1173 #[test]
1174 fn dense_surrogate_warning_when_chunk_dense_builtin() {
1175 let mut e = make_export("t", ExportMode::Chunked, None);
1176 e.chunk_column = Some("id".to_string());
1177 e.chunk_dense = true;
1178 e.query = Some("SELECT id FROM orders".to_string());
1179 let w = check_dense_surrogate_cost(&e);
1180 assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
1181 assert!(w.unwrap().contains("global sort"));
1182 }
1183
1184 #[test]
1185 fn sparse_range_not_triggered_for_non_chunked() {
1186 let e = make_export("t", ExportMode::Full, None);
1187 let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
1188 assert!(w.is_none(), "should not warn for non-chunked mode");
1189 }
1190
1191 #[test]
1192 fn dense_surrogate_warning_with_row_number() {
1193 let mut e = make_export("t", ExportMode::Chunked, None);
1194 e.chunk_column = Some("rn".to_string());
1195 e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
1196 let w = check_dense_surrogate_cost(&e);
1197 assert!(w.is_some(), "should warn about ROW_NUMBER cost");
1198 assert!(w.unwrap().contains("global sort"));
1199 }
1200
1201 #[test]
1202 fn no_dense_surrogate_warning_without_row_number() {
1203 let mut e = make_export("t", ExportMode::Chunked, None);
1204 e.chunk_column = Some("id".to_string());
1205 e.query = Some("SELECT * FROM orders".to_string());
1206 let w = check_dense_surrogate_cost(&e);
1207 assert!(w.is_none());
1208 }
1209
1210 #[test]
1211 fn no_dense_surrogate_warning_for_non_chunked() {
1212 let mut e = make_export("t", ExportMode::Full, None);
1213 e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
1214 let w = check_dense_surrogate_cost(&e);
1215 assert!(w.is_none(), "should not warn for non-chunked mode");
1216 }
1217
1218 #[test]
1219 fn parallel_memory_warning_large_dataset() {
1220 let mut e = make_export("t", ExportMode::Chunked, None);
1221 e.parallel = 4;
1222 let w = check_parallel_memory_risk(&e, Some(10_000_000));
1223 assert!(w.is_some(), "should warn about memory risk");
1224 let msg = w.unwrap();
1225 assert!(msg.contains("Parallel=4"), "got: {msg}");
1226 assert!(msg.contains("memory"), "got: {msg}");
1227 }
1228
1229 #[test]
1230 fn no_parallel_memory_warning_small_dataset() {
1231 let mut e = make_export("t", ExportMode::Chunked, None);
1232 e.parallel = 4;
1233 let w = check_parallel_memory_risk(&e, Some(1_000));
1234 assert!(w.is_none(), "should not warn for small dataset");
1235 }
1236
1237 #[test]
1238 fn no_parallel_memory_warning_single_worker() {
1239 let e = make_export("t", ExportMode::Full, None);
1240 let w = check_parallel_memory_risk(&e, Some(100_000_000));
1241 assert!(w.is_none(), "should not warn when parallel=1");
1242 }
1243
1244 #[test]
1245 fn suggestion_degraded_full_recommends_incremental() {
1246 let e = make_export("t", ExportMode::Full, None);
1247 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1248 assert!(s.contains("incremental"), "got: {s}");
1249 }
1250
1251 #[test]
1252 fn suggestion_degraded_chunked_recommends_index() {
1253 let mut e = make_export("t", ExportMode::Chunked, None);
1254 e.chunk_column = Some("id".to_string());
1255 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1256 assert!(s.contains("index on 'id'"), "got: {s}");
1257 }
1258
1259 #[test]
1260 fn suggestion_degraded_time_window_recommends_index() {
1261 let mut e = make_export("t", ExportMode::TimeWindow, None);
1262 e.time_column = Some("created_at".to_string());
1263 e.days_window = Some(7);
1264 let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1265 assert!(s.contains("index on 'created_at'"), "got: {s}");
1266 }
1267
1268 #[test]
1269 fn suggestion_unsafe_full_recommends_incremental() {
1270 let e = make_export("t", ExportMode::Full, None);
1271 let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1272 assert!(s.contains("incremental"), "got: {s}");
1273 }
1274
1275 #[test]
1276 fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
1277 let mut e = make_export("t", ExportMode::Chunked, None);
1278 e.chunk_column = Some("id".to_string());
1279 let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1280 assert!(s.contains("index on 'id'"), "got: {s}");
1281 assert!(s.contains("parallel"), "got: {s}");
1282 }
1283
1284 #[test]
1285 fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
1286 let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1287 let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1288 assert!(s.contains("index on 'updated_at'"), "got: {s}");
1289 }
1290
1291 #[test]
1292 fn suggestion_acceptable_large_full_recommends_incremental() {
1293 let e = make_export("t", ExportMode::Full, None);
1294 let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1295 assert!(s.contains("incremental"), "got: {s}");
1296 }
1297
1298 #[test]
1299 fn parallel_only_for_chunked_mode() {
1300 let e = make_export("t", ExportMode::Full, None);
1301 let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
1302 assert_eq!(level, 1, "non-chunked mode should recommend 1");
1303 }
1304
1305 #[test]
1306 fn parallel_small_dataset_is_one() {
1307 let mut e = make_export("t", ExportMode::Chunked, None);
1308 e.chunk_column = Some("id".to_string());
1309 let (level, _) = recommend_parallelism(&e, Some(10_000), true);
1310 assert_eq!(level, 1, "small dataset should recommend 1");
1311 }
1312
1313 #[test]
1314 fn parallel_moderate_indexed_is_two() {
1315 let mut e = make_export("t", ExportMode::Chunked, None);
1316 e.chunk_column = Some("id".to_string());
1317 let (level, _) = recommend_parallelism(&e, Some(200_000), true);
1318 assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
1319 }
1320
1321 #[test]
1322 fn parallel_large_indexed_is_four() {
1323 let mut e = make_export("t", ExportMode::Chunked, None);
1324 e.chunk_column = Some("id".to_string());
1325 let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
1326 assert_eq!(level, 4, "large indexed dataset should recommend 4");
1327 }
1328
1329 #[test]
1330 fn parallel_no_index_large_is_one() {
1331 let mut e = make_export("t", ExportMode::Chunked, None);
1332 e.chunk_column = Some("id".to_string());
1333 let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
1334 assert_eq!(level, 1, "no index + large should recommend 1");
1335 assert!(reason.contains("no index"), "got: {reason}");
1336 }
1337
1338 #[test]
1339 fn parallel_no_index_moderate_is_conservative() {
1340 let mut e = make_export("t", ExportMode::Chunked, None);
1341 e.chunk_column = Some("id".to_string());
1342 let (level, _) = recommend_parallelism(&e, Some(200_000), false);
1343 assert_eq!(
1344 level, 2,
1345 "no index + moderate should recommend 2 (conservative)"
1346 );
1347 }
1348
1349 #[test]
1350 fn suggestion_acceptable_large_chunked_recommends_parallel() {
1351 let mut e = make_export("t", ExportMode::Chunked, None);
1352 e.chunk_column = Some("id".to_string());
1353 let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1354 assert!(s.contains("parallel"), "got: {s}");
1355 }
1356
1357 #[test]
1358 fn connection_limit_warn_when_parallel_meets_max() {
1359 let w = check_connection_limit(20, Some(20));
1360 assert!(w.is_some(), "should warn when parallel == max_connections");
1361 let msg = w.unwrap();
1362 assert!(msg.contains("max_connections=20"), "got: {msg}");
1363 assert!(msg.contains("parallel=20"), "got: {msg}");
1364 }
1365
1366 #[test]
1367 fn connection_limit_warn_when_parallel_exceeds_max() {
1368 let w = check_connection_limit(100, Some(20));
1369 assert!(w.is_some(), "should warn when parallel > max_connections");
1370 let msg = w.unwrap();
1371 assert!(msg.contains("max_connections=20"), "got: {msg}");
1372 }
1373
1374 #[test]
1375 fn connection_limit_no_warn_when_parallel_below_max() {
1376 let w = check_connection_limit(4, Some(100));
1377 assert!(
1378 w.is_none(),
1379 "should not warn when parallel << max_connections"
1380 );
1381 }
1382
1383 #[test]
1384 fn connection_limit_no_warn_when_parallel_is_one() {
1385 let w = check_connection_limit(1, Some(5));
1386 assert!(
1387 w.is_none(),
1388 "single worker never triggers connection warning"
1389 );
1390 }
1391
1392 #[test]
1393 fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
1394 let w = check_connection_limit(100, None);
1395 assert!(w.is_some(), "should note that check was skipped");
1396 let msg = w.unwrap();
1397 assert!(msg.contains("skipped"), "got: {msg}");
1398 }
1399
1400 #[test]
1401 fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
1402 let w = check_connection_limit(1, None);
1403 assert!(
1404 w.is_none(),
1405 "single worker never triggers connection warning"
1406 );
1407 }
1408
1409 #[test]
1410 fn connection_limit_suggests_headroom() {
1411 let w = check_connection_limit(25, Some(20)).unwrap();
1412 assert!(
1414 w.contains("17"),
1415 "should suggest leaving headroom, got: {w}"
1416 );
1417 }
1418
1419 fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1422 let err = anyhow::anyhow!("{}", msg);
1423 let cat = categorize_source_error(&err);
1424 source_error_hint(cat, &err, &st)
1425 }
1426
1427 fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1428 let err = anyhow::anyhow!("{}", msg);
1429 let dest = DestinationConfig {
1430 destination_type: dt,
1431 bucket: Some("b".into()),
1432 ..Default::default()
1433 };
1434 let cat = categorize_dest_error(&err, &dest);
1435 destination_error_hint(cat, &dest)
1436 }
1437
1438 #[test]
1439 fn source_tls_handshake_returns_pg_specific_tls_hint() {
1440 let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1441 assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1442 }
1443
1444 #[test]
1445 fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1446 let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1447 assert!(h.contains("tls.mode"), "got: {h}");
1448 }
1449
1450 #[test]
1451 fn source_auth_error_postgres_mentions_pg_hba() {
1452 let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1453 assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1454 }
1455
1456 #[test]
1457 fn source_auth_error_mysql_mentions_grant() {
1458 let h = src_hint(
1459 "Access denied for user 'rivet'@'localhost'",
1460 SourceType::Mysql,
1461 )
1462 .expect("hint");
1463 assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1464 }
1465
1466 #[test]
1467 fn source_connectivity_error_mentions_bastion_and_network() {
1468 let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1469 assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1470 }
1471
1472 #[test]
1473 fn source_unknown_error_returns_no_hint() {
1474 let h = src_hint("totally unexpected", SourceType::Postgres);
1477 assert!(h.is_none(), "unknown errors should not produce a hint");
1478 }
1479
1480 #[test]
1481 fn dest_s3_auth_error_names_concrete_actions() {
1482 let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1483 assert!(
1484 h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1485 "got: {h}"
1486 );
1487 }
1488
1489 #[test]
1490 fn dest_gcs_auth_error_names_concrete_actions() {
1491 let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1492 assert!(
1493 h.contains("storage.objects") && h.contains("cloud-permissions"),
1494 "got: {h}"
1495 );
1496 }
1497
1498 #[test]
1499 fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1500 let err = anyhow::anyhow!(
1505 "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1506 );
1507 let dest = DestinationConfig {
1508 destination_type: DestinationType::Azure,
1509 bucket: Some("c".into()),
1510 ..Default::default()
1511 };
1512 let cat = categorize_dest_error(&err, &dest);
1513 assert_eq!(
1514 cat, "sas expired",
1515 "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1516 );
1517 }
1518
1519 #[test]
1520 fn dest_azure_sas_expired_returns_regenerate_hint() {
1521 let h = dest_hint(
1525 "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1526 DestinationType::Azure,
1527 )
1528 .expect("hint");
1529 assert!(
1530 h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1531 "got: {h}"
1532 );
1533 }
1534
1535 #[test]
1536 fn dest_s3_bucket_not_found_says_no_auto_create() {
1537 let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1538 assert!(
1539 h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1540 "got: {h}"
1541 );
1542 }
1543
1544 #[test]
1545 fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1546 let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1547 assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1548 }
1549}