1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13 recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health rating attached to an export diagnostic and printed on its
/// `Verdict:` line.
///
/// `Copy`/`Eq` are derived so verdicts can be compared directly (`==`) instead of
/// only through `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Displayed as `EFFICIENT`.
    Efficient,
    /// Displayed as `ACCEPTABLE`.
    Acceptable,
    /// Displayed as `DEGRADED`.
    Degraded,
    /// Displayed as `UNSAFE`.
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the upper-case label for the verdict.
    ///
    /// Goes through `Formatter::pad` so width/alignment flags (e.g. `{:<10}`) are
    /// honoured; with plain `{}` the output is just the label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.pad(label)
    }
}
43
44pub(crate) struct ExportDiagnostic {
45 pub export_name: String,
46 pub strategy: String,
47 pub mode: String,
48 pub cursor_column: Option<String>,
49 pub row_estimate: Option<i64>,
50 pub cursor_min: Option<String>,
51 pub cursor_max: Option<String>,
52 pub scan_type: Option<String>,
53 pub uses_index: bool,
54 pub verdict: HealthVerdict,
55 pub recommended_profile: &'static str,
56 pub recommended_parallel: (u32, &'static str),
57 pub warnings: Vec<String>,
58 pub suggestion: Option<String>,
59}
60
61pub(crate) fn get_export_diagnostic(
65 config: &Config,
66 export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68 let url = config.source.resolve_url()?;
69 let tls = config.source.tls.as_ref();
70 crate::source::warn_if_tls_disabled(&config.source);
71 match config.source.source_type {
72 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74 SourceType::Mssql => {
75 anyhow::bail!("mssql preflight diagnostics not yet implemented (scaffold)")
76 }
77 }
78}
79
80pub fn check(
81 config_path: &str,
82 export_name: Option<&str>,
83 params: Option<&std::collections::HashMap<String, String>>,
84 show_type_report: bool,
85 strict: bool,
86 json_output: bool,
87 target: Option<ExportTarget>,
88) -> Result<()> {
89 let config = Config::load_with_params(config_path, params)?;
90
91 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
92 let e = config
93 .exports
94 .iter()
95 .find(|e| e.name == name)
96 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
97 vec![e]
98 } else {
99 config.exports.iter().collect()
100 };
101
102 let url = config.source.resolve_url()?;
103 let tls = config.source.tls.as_ref();
104 crate::source::warn_if_tls_disabled(&config.source);
110 match config.source.source_type {
111 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
112 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
113 SourceType::Mssql => anyhow::bail!("mssql check not yet implemented (scaffold)"),
114 }
115
116 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
125 for export in &exports {
126 let dest_key = format!(
127 "{:?}:{}:{}:{}",
128 export.destination.destination_type,
129 export.destination.bucket.as_deref().unwrap_or("-"),
130 export.destination.endpoint.as_deref().unwrap_or("-"),
131 export.destination.path.as_deref().unwrap_or("-"),
132 );
133 if !seen_destinations.insert(dest_key) {
134 continue;
135 }
136 let expanded = crate::plan::build::expand_destination_templates(
137 export.destination.clone(),
138 &export.name,
139 );
140 crate::destination::create_destination(&expanded).map_err(|e| {
141 anyhow::anyhow!(
142 "export '{}': destination preflight failed: {:#}",
143 export.name,
144 e
145 )
146 })?;
147 }
148
149 if show_type_report {
150 let policy = if strict {
151 TypePolicy::strict()
152 } else {
153 TypePolicy::warn_only()
154 };
155
156 let mut any_fatal = false;
157 for export in &exports {
158 let column_overrides =
159 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
160 if let Some(t) = export.target.as_deref()
164 && crate::types::target::ExportTarget::parse(t).is_none()
165 {
166 anyhow::bail!(
167 "export '{}': unknown target '{t}' (expected: bigquery, duckdb)",
168 export.name
169 );
170 }
171 let eff_target = target.or_else(|| {
172 export
173 .target
174 .as_deref()
175 .and_then(crate::types::target::ExportTarget::parse)
176 });
177 let config_dir = std::path::Path::new(config_path)
178 .parent()
179 .unwrap_or_else(|| std::path::Path::new("."));
180 match type_report::collect_report(
181 &config,
182 export,
183 &column_overrides,
184 &policy,
185 eff_target,
186 config_dir,
187 params,
188 ) {
189 Ok(report) => {
190 if report.has_fatal() {
191 any_fatal = true;
192 }
193 if eff_target.is_some() && report.has_target_fail() {
194 any_fatal = true;
195 }
196 if json_output {
197 type_report::print_json(&report)?;
198 } else {
199 type_report::print_table(&report, eff_target);
200 }
201 }
202 Err(e) => {
203 log::warn!("type report for '{}' failed: {:#}", export.name, e);
204 }
205 }
206 }
207
208 if strict && any_fatal {
209 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
210 }
211 }
212
213 Ok(())
214}
215
216fn print_diagnostic(diag: &ExportDiagnostic) {
217 println!();
218 println!("Export: {}", diag.export_name);
219 println!(" Strategy: {}", diag.strategy);
220 println!(" Mode: {}", diag.mode);
221 if let Some(est) = diag.row_estimate {
222 if est >= 1_000_000 {
223 println!(" Row estimate: ~{}M", est / 1_000_000);
224 } else if est >= 1_000 {
225 println!(" Row estimate: ~{}K", est / 1_000);
226 } else {
227 println!(" Row estimate: ~{}", est);
228 }
229 }
230 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
231 println!(" Cursor range: {} .. {}", min_v, max_v);
232 }
233 if let Some(col) = &diag.cursor_column {
234 println!(" Cursor col: {}", col);
235 }
236 if let Some(scan) = &diag.scan_type {
237 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
238 println!(" Scan type: {}{}", scan, idx_label);
239 }
240 println!(" Verdict: {}", diag.verdict);
241 println!(
242 " Recommended: tuning.profile: {}",
243 diag.recommended_profile
244 );
245 let (par_level, par_reason) = diag.recommended_parallel;
246 if par_level > 1 {
247 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
248 } else {
249 println!(" Parallelism: {} ({})", par_level, par_reason);
250 }
251 for w in &diag.warnings {
252 println!(" Warning: {}", w);
253 }
254 if let Some(suggestion) = &diag.suggestion {
255 println!(" Suggestion: {}", suggestion);
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    /// Builds a CSV-to-local export with the given mode and optional cursor column;
    /// every other field comes from `crate::config::sample_export`.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            query: Some("SELECT * FROM t".to_string()),
            format: FormatType::Csv,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            ..crate::config::sample_export(name)
        }
    }

    #[test]
    fn verdict_display_uses_upper_case_labels() {
        assert_eq!(HealthVerdict::Efficient.to_string(), "EFFICIENT");
        assert_eq!(HealthVerdict::Acceptable.to_string(), "ACCEPTABLE");
        assert_eq!(HealthVerdict::Degraded.to_string(), "DEGRADED");
        assert_eq!(HealthVerdict::Unsafe.to_string(), "UNSAFE");
    }

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    /// Categorises a synthetic source error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    /// Destination config of the given type with a placeholder bucket `"b"`.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    /// Categorises a synthetic destination error built from `msg`.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        let msg = w.expect("should warn about sparse range");
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let msg = check_dense_surrogate_cost(&e)
            .expect("should warn about built-in ROW_NUMBER cost");
        assert!(msg.contains("global sort"), "got: {msg}");
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let msg = check_dense_surrogate_cost(&e).expect("should warn about ROW_NUMBER cost");
        assert!(msg.contains("global sort"), "got: {msg}");
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let msg = check_parallel_memory_risk(&e, Some(10_000_000))
            .expect("should warn about memory risk");
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let msg = check_connection_limit(20, Some(20))
            .expect("should warn when parallel == max_connections");
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let msg = check_connection_limit(100, Some(20))
            .expect("should warn when parallel > max_connections");
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let msg = check_connection_limit(100, None).expect("should note that check was skipped");
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    /// Categorises `msg` as a source error and returns the hint for `st`, if any.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    /// Categorises `msg` as a destination error for a `dest_config(dt)` destination
    /// and returns the matching hint, if any.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = dest_config(dt);
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}