1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13 recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health classification of an export, as reported by `check`.
///
/// Variants are listed from best to worst. The derived `PartialEq`/`Eq` let
/// callers compare verdicts directly instead of going through `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    Efficient,
    Acceptable,
    Degraded,
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the upper-case label (`EFFICIENT`, `ACCEPTABLE`, `DEGRADED`, `UNSAFE`).
    ///
    /// Uses `Formatter::pad` so width/alignment flags such as `{:<10}` are
    /// honoured; plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.pad(label)
    }
}
43
44pub(crate) struct ExportDiagnostic {
45 pub export_name: String,
46 pub strategy: String,
47 pub mode: String,
48 pub cursor_column: Option<String>,
49 pub row_estimate: Option<i64>,
50 pub cursor_min: Option<String>,
51 pub cursor_max: Option<String>,
52 pub scan_type: Option<String>,
53 pub uses_index: bool,
54 pub verdict: HealthVerdict,
55 pub recommended_profile: &'static str,
56 pub recommended_parallel: (u32, &'static str),
57 pub warnings: Vec<String>,
58 pub suggestion: Option<String>,
59}
60
61pub(crate) fn get_export_diagnostic(
65 config: &Config,
66 export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68 let url = config.source.resolve_url()?;
69 let tls = config.source.tls.as_ref();
70 crate::source::warn_if_tls_disabled(&config.source);
71 match config.source.source_type {
72 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74 }
75}
76
77pub fn check(
78 config_path: &str,
79 export_name: Option<&str>,
80 params: Option<&std::collections::HashMap<String, String>>,
81 show_type_report: bool,
82 strict: bool,
83 json_output: bool,
84 target: Option<ExportTarget>,
85) -> Result<()> {
86 let config = Config::load_with_params(config_path, params)?;
87
88 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
89 let e = config
90 .exports
91 .iter()
92 .find(|e| e.name == name)
93 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
94 vec![e]
95 } else {
96 config.exports.iter().collect()
97 };
98
99 let url = config.source.resolve_url()?;
100 let tls = config.source.tls.as_ref();
101 crate::source::warn_if_tls_disabled(&config.source);
107 match config.source.source_type {
108 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
109 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
110 }
111
112 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
121 for export in &exports {
122 let dest_key = format!(
123 "{:?}:{}:{}:{}",
124 export.destination.destination_type,
125 export.destination.bucket.as_deref().unwrap_or("-"),
126 export.destination.endpoint.as_deref().unwrap_or("-"),
127 export.destination.path.as_deref().unwrap_or("-"),
128 );
129 if !seen_destinations.insert(dest_key) {
130 continue;
131 }
132 let expanded = crate::plan::build::expand_destination_templates(
133 export.destination.clone(),
134 &export.name,
135 );
136 crate::destination::create_destination(&expanded).map_err(|e| {
137 anyhow::anyhow!(
138 "export '{}': destination preflight failed: {:#}",
139 export.name,
140 e
141 )
142 })?;
143 }
144
145 if show_type_report {
146 let policy = if strict {
147 TypePolicy::strict()
148 } else {
149 TypePolicy::warn_only()
150 };
151
152 let mut any_fatal = false;
153 for export in &exports {
154 let column_overrides =
155 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
156 match type_report::collect_report(&config, export, &column_overrides, &policy, target) {
157 Ok(report) => {
158 if report.has_fatal() {
159 any_fatal = true;
160 }
161 if target.is_some() && report.has_target_fail() {
162 any_fatal = true;
163 }
164 if json_output {
165 type_report::print_json(&report)?;
166 } else {
167 type_report::print_table(&report, target);
168 }
169 }
170 Err(e) => {
171 log::warn!("type report for '{}' failed: {:#}", export.name, e);
172 }
173 }
174 }
175
176 if strict && any_fatal {
177 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
178 }
179 }
180
181 Ok(())
182}
183
184fn print_diagnostic(diag: &ExportDiagnostic) {
185 println!();
186 println!("Export: {}", diag.export_name);
187 println!(" Strategy: {}", diag.strategy);
188 println!(" Mode: {}", diag.mode);
189 if let Some(est) = diag.row_estimate {
190 if est >= 1_000_000 {
191 println!(" Row estimate: ~{}M", est / 1_000_000);
192 } else if est >= 1_000 {
193 println!(" Row estimate: ~{}K", est / 1_000);
194 } else {
195 println!(" Row estimate: ~{}", est);
196 }
197 }
198 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
199 println!(" Cursor range: {} .. {}", min_v, max_v);
200 }
201 if let Some(col) = &diag.cursor_column {
202 println!(" Cursor col: {}", col);
203 }
204 if let Some(scan) = &diag.scan_type {
205 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
206 println!(" Scan type: {}{}", scan, idx_label);
207 }
208 println!(" Verdict: {}", diag.verdict);
209 println!(
210 " Recommended: tuning.profile: {}",
211 diag.recommended_profile
212 );
213 let (par_level, par_reason) = diag.recommended_parallel;
214 if par_level > 1 {
215 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
216 } else {
217 println!(" Parallelism: {} ({})", par_level, par_reason);
218 }
219 for w in &diag.warnings {
220 println!(" Warning: {}", w);
221 }
222 if let Some(suggestion) = &diag.suggestion {
223 println!(" Suggestion: {}", suggestion);
224 }
225}
226
// Unit tests for the check/diagnostic helpers re-exported into this module.
// They cover:
// - verdicts and EXPLAIN-plan parsing;
// - suggestions;
// - source/destination error categorisation and hints;
// - strategy, profile and parallelism recommendations;
// - the individual warning checks.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
        IncrementalCursorMode, MetaColumns, TimeColumnType,
    };
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    /// Builds a minimal `ExportConfig` fixture.
    ///
    /// It has query `SELECT * FROM t`, the given `mode` and optional cursor column,
    /// `chunk_size` 100_000, `parallel` 1, CSV output and a local `./out`
    /// destination. Every other option is off or default; tests mutate the
    /// fields they care about.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: name.to_string(),
            query: Some("SELECT * FROM t".to_string()),
            query_file: None,
            table: None,
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            cursor_fallback_column: None,
            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
            chunk_column: None,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Csv,
            compression: CompressionType::default(),
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
            chunk_by_days: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    // --- compute_verdict(row_estimate, uses_index, has_cursor) ----------------

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    // --- Postgres EXPLAIN text parsing -----------------------------------------

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // --- build_suggestion: basic cases -------------------------------------------

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // --- categorize_source_error -------------------------------------------------

    /// Categorises `msg` as if it were a source (database) connection error.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    // --- categorize_dest_error ---------------------------------------------------

    /// Destination config of type `dtype` with bucket "b" and defaults elsewhere.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    /// Categorises `msg` as a destination error for a destination of type `dtype`.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    // The same "not found" wording maps to a path category for local destinations.
    #[test]
    fn dest_not_found_local_is_path_not_found() {
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    // --- derive_strategy labels ----------------------------------------------------

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    // --- recommend_profile(row_estimate, uses_index, export) ----------------------

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    // Same size as above, but parallel > 1 without an index tips it to "safe".
    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    // --- check_sparse_range(export, rows, min, max) -------------------------------

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(w.is_some(), "should warn about sparse range");
        let msg = w.unwrap();
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    // --- check_dense_surrogate_cost (ROW_NUMBER / chunk_dense) --------------------

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    // --- check_parallel_memory_risk(export, rows) ---------------------------------

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(10_000_000));
        assert!(w.is_some(), "should warn about memory risk");
        let msg = w.unwrap();
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    // --- build_suggestion: mode-specific advice ------------------------------------

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    // --- recommend_parallelism(export, rows, uses_index) -> (level, reason) -------

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    // --- check_connection_limit(parallel, max_connections) ------------------------

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let w = check_connection_limit(20, Some(20));
        assert!(w.is_some(), "should warn when parallel == max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let w = check_connection_limit(100, Some(20));
        assert!(w.is_some(), "should warn when parallel > max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    // An unknown max_connections still yields a note when parallel > 1.
    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let w = check_connection_limit(100, None);
        assert!(w.is_some(), "should note that check was skipped");
        let msg = w.unwrap();
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    // NOTE(review): 17 is the headroom value expected for max_connections=20;
    // the exact formula lives in analysis::check_connection_limit — confirm there.
    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    // --- error hints ----------------------------------------------------------------

    /// Categorises `msg` as a source error, then returns the hint for source type `st`.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    /// Categorises `msg` as a destination error for type `dt`, then returns its hint.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = DestinationConfig {
            destination_type: dt,
            bucket: Some("b".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    // The expired-SAS message also contains generic wording. This pins that the
    // SAS-specific category wins.
    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}