1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13 recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health rating assigned to an export by the diagnostic checks.
///
/// Rendered in upper case by its `Display` impl (e.g. `EFFICIENT`). The type
/// is a plain fieldless enum, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    Efficient,
    Acceptable,
    Degraded,
    Unsafe,
}
32
33impl std::fmt::Display for HealthVerdict {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::Efficient => write!(f, "EFFICIENT"),
37 Self::Acceptable => write!(f, "ACCEPTABLE"),
38 Self::Degraded => write!(f, "DEGRADED"),
39 Self::Unsafe => write!(f, "UNSAFE"),
40 }
41 }
42}
43
44pub(crate) struct ExportDiagnostic {
45 pub export_name: String,
46 pub strategy: String,
47 pub mode: String,
48 pub cursor_column: Option<String>,
49 pub row_estimate: Option<i64>,
50 pub cursor_min: Option<String>,
51 pub cursor_max: Option<String>,
52 pub scan_type: Option<String>,
53 pub uses_index: bool,
54 pub verdict: HealthVerdict,
55 pub recommended_profile: &'static str,
56 pub recommended_parallel: (u32, &'static str),
57 pub warnings: Vec<String>,
58 pub suggestion: Option<String>,
59}
60
61pub(crate) fn get_export_diagnostic(
65 config: &Config,
66 export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68 let url = config.source.resolve_url()?;
69 let tls = config.source.tls.as_ref();
70 crate::source::warn_if_tls_disabled(&config.source);
71 match config.source.source_type {
72 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74 }
75}
76
77pub fn check(
78 config_path: &str,
79 export_name: Option<&str>,
80 params: Option<&std::collections::HashMap<String, String>>,
81 show_type_report: bool,
82 strict: bool,
83 json_output: bool,
84 target: Option<ExportTarget>,
85) -> Result<()> {
86 let config = Config::load_with_params(config_path, params)?;
87
88 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
89 let e = config
90 .exports
91 .iter()
92 .find(|e| e.name == name)
93 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
94 vec![e]
95 } else {
96 config.exports.iter().collect()
97 };
98
99 let url = config.source.resolve_url()?;
100 let tls = config.source.tls.as_ref();
101 crate::source::warn_if_tls_disabled(&config.source);
107 match config.source.source_type {
108 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
109 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
110 }
111
112 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
121 for export in &exports {
122 let dest_key = format!(
123 "{:?}:{}:{}:{}",
124 export.destination.destination_type,
125 export.destination.bucket.as_deref().unwrap_or("-"),
126 export.destination.endpoint.as_deref().unwrap_or("-"),
127 export.destination.path.as_deref().unwrap_or("-"),
128 );
129 if !seen_destinations.insert(dest_key) {
130 continue;
131 }
132 let expanded = crate::plan::build::expand_destination_templates(
133 export.destination.clone(),
134 &export.name,
135 );
136 crate::destination::create_destination(&expanded).map_err(|e| {
137 anyhow::anyhow!(
138 "export '{}': destination preflight failed: {:#}",
139 export.name,
140 e
141 )
142 })?;
143 }
144
145 if show_type_report {
146 let policy = if strict {
147 TypePolicy::strict()
148 } else {
149 TypePolicy::warn_only()
150 };
151
152 let mut any_fatal = false;
153 for export in &exports {
154 let column_overrides =
155 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
156 if let Some(t) = export.target.as_deref()
160 && crate::types::target::ExportTarget::parse(t).is_none()
161 {
162 anyhow::bail!(
163 "export '{}': unknown target '{t}' (expected: bigquery, duckdb)",
164 export.name
165 );
166 }
167 let eff_target = target.or_else(|| {
168 export
169 .target
170 .as_deref()
171 .and_then(crate::types::target::ExportTarget::parse)
172 });
173 let config_dir = std::path::Path::new(config_path)
174 .parent()
175 .unwrap_or_else(|| std::path::Path::new("."));
176 match type_report::collect_report(
177 &config,
178 export,
179 &column_overrides,
180 &policy,
181 eff_target,
182 config_dir,
183 params,
184 ) {
185 Ok(report) => {
186 if report.has_fatal() {
187 any_fatal = true;
188 }
189 if eff_target.is_some() && report.has_target_fail() {
190 any_fatal = true;
191 }
192 if json_output {
193 type_report::print_json(&report)?;
194 } else {
195 type_report::print_table(&report, eff_target);
196 }
197 }
198 Err(e) => {
199 log::warn!("type report for '{}' failed: {:#}", export.name, e);
200 }
201 }
202 }
203
204 if strict && any_fatal {
205 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
206 }
207 }
208
209 Ok(())
210}
211
212fn print_diagnostic(diag: &ExportDiagnostic) {
213 println!();
214 println!("Export: {}", diag.export_name);
215 println!(" Strategy: {}", diag.strategy);
216 println!(" Mode: {}", diag.mode);
217 if let Some(est) = diag.row_estimate {
218 if est >= 1_000_000 {
219 println!(" Row estimate: ~{}M", est / 1_000_000);
220 } else if est >= 1_000 {
221 println!(" Row estimate: ~{}K", est / 1_000);
222 } else {
223 println!(" Row estimate: ~{}", est);
224 }
225 }
226 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
227 println!(" Cursor range: {} .. {}", min_v, max_v);
228 }
229 if let Some(col) = &diag.cursor_column {
230 println!(" Cursor col: {}", col);
231 }
232 if let Some(scan) = &diag.scan_type {
233 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
234 println!(" Scan type: {}{}", scan, idx_label);
235 }
236 println!(" Verdict: {}", diag.verdict);
237 println!(
238 " Recommended: tuning.profile: {}",
239 diag.recommended_profile
240 );
241 let (par_level, par_reason) = diag.recommended_parallel;
242 if par_level > 1 {
243 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
244 } else {
245 println!(" Parallelism: {} ({})", par_level, par_reason);
246 }
247 for w in &diag.warnings {
248 println!(" Warning: {}", w);
249 }
250 if let Some(suggestion) = &diag.suggestion {
251 println!(" Suggestion: {}", suggestion);
252 }
253}
254
/// Unit tests for verdicts, plan parsing, strategy/profile/parallelism
/// recommendations, warnings, and source/destination error categorisation.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    /// Minimal CSV export writing to a local `./out` path.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            query: Some("SELECT * FROM t".to_string()),
            format: FormatType::Csv,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            ..crate::config::sample_export(name)
        }
    }

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    /// Categorise a source error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    /// Destination config of the given type with bucket "b".
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    /// Categorise a destination error built from `msg` for a `dtype` destination.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let msg = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"))
            .expect("should warn about sparse range");
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let msg = check_dense_surrogate_cost(&e)
            .expect("should warn about built-in ROW_NUMBER cost");
        assert!(msg.contains("global sort"), "got: {msg}");
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let msg = check_dense_surrogate_cost(&e).expect("should warn about ROW_NUMBER cost");
        assert!(msg.contains("global sort"), "got: {msg}");
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let msg = check_parallel_memory_risk(&e, Some(10_000_000))
            .expect("should warn about memory risk");
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let msg = check_connection_limit(20, Some(20))
            .expect("should warn when parallel == max_connections");
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let msg = check_connection_limit(100, Some(20))
            .expect("should warn when parallel > max_connections");
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let msg = check_connection_limit(100, None).expect("should note that check was skipped");
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    /// Hint for a source error built from `msg` on a source of type `st`.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    /// Hint for a destination error built from `msg`, using the same
    /// destination config as `dest_err`.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = dest_config(dt);
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}