1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13 recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health grade the `check` diagnostics assign to one export.
///
/// Variants are listed from most to least favourable: the unit tests expect a
/// small, indexed export with a cursor to grade `Efficient` and a huge,
/// unindexed one to grade `Unsafe`.
///
/// `Clone`/`Copy`/`PartialEq`/`Eq` are derived so verdicts can be passed by
/// value and compared with `==` / `assert_eq!` rather than only `matches!`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Best grade.
    Efficient,
    /// Workable, but a suggestion may still apply.
    Acceptable,
    /// Noticeably suboptimal plan or strategy.
    Degraded,
    /// Worst grade.
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the verdict as its upper-case label (`EFFICIENT`, `ACCEPTABLE`,
    /// `DEGRADED`, `UNSAFE`). Width/fill flags are ignored, as with `write!`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.write_str(label)
    }
}
43
/// Per-export result of the source-side health check.
///
/// Returned by `get_export_diagnostic` (which delegates to the Postgres or
/// MySQL diagnoser) and rendered for humans by `print_diagnostic`.
pub(crate) struct ExportDiagnostic {
    /// Name of the export this diagnostic describes.
    pub export_name: String,
    /// Human-readable extraction strategy, printed on the `Strategy:` line.
    // NOTE(review): presumably produced by `derive_strategy` (e.g. "full-scan",
    // "chunked(id, size=50000)") — confirm in the pg/mysql diagnosers.
    pub strategy: String,
    /// Export mode label, printed on the `Mode:` line.
    pub mode: String,
    /// Cursor column shown on the `Cursor col:` line; omitted when `None`.
    pub cursor_column: Option<String>,
    /// Estimated row count; printed in abbreviated `~N` / `~NK` / `~NM` form.
    pub row_estimate: Option<i64>,
    /// Lower bound of the cursor range; printed only when `cursor_max` is also set.
    pub cursor_min: Option<String>,
    /// Upper bound of the cursor range; printed only when `cursor_min` is also set.
    pub cursor_max: Option<String>,
    /// Scan type taken from the query plan (e.g. "Seq Scan", "Index Scan").
    // NOTE(review): presumably filled via `extract_scan_type` — verify.
    pub scan_type: Option<String>,
    /// Whether the plan uses an index; adds "(indexed)" after the scan type.
    pub uses_index: bool,
    /// Overall health grade for the export.
    pub verdict: HealthVerdict,
    /// Recommended `tuning.profile` value.
    // NOTE(review): presumably one of "fast" / "balanced" / "safe" as returned
    // by `recommend_profile` — confirm.
    pub recommended_profile: &'static str,
    /// Recommended parallelism as `(level, reason)`; a level above 1 is
    /// printed as a recommendation, otherwise as plain information.
    pub recommended_parallel: (u32, &'static str),
    /// Free-form warnings, each printed on its own `Warning:` line.
    pub warnings: Vec<String>,
    /// Optional actionable advice, printed on the `Suggestion:` line.
    pub suggestion: Option<String>,
}
60
61pub(crate) fn get_export_diagnostic(
65 config: &Config,
66 export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68 let url = config.source.resolve_url()?;
69 let tls = config.source.tls.as_ref();
70 crate::source::warn_if_tls_disabled(&config.source);
71 match config.source.source_type {
72 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74 }
75}
76
77pub fn check(
78 config_path: &str,
79 export_name: Option<&str>,
80 params: Option<&std::collections::HashMap<String, String>>,
81 show_type_report: bool,
82 strict: bool,
83 json_output: bool,
84 target: Option<ExportTarget>,
85) -> Result<()> {
86 let config = Config::load_with_params(config_path, params)?;
87
88 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
89 let e = config
90 .exports
91 .iter()
92 .find(|e| e.name == name)
93 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
94 vec![e]
95 } else {
96 config.exports.iter().collect()
97 };
98
99 let url = config.source.resolve_url()?;
100 let tls = config.source.tls.as_ref();
101 crate::source::warn_if_tls_disabled(&config.source);
107 match config.source.source_type {
108 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
109 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
110 }
111
112 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
121 for export in &exports {
122 let dest_key = format!(
123 "{:?}:{}:{}:{}",
124 export.destination.destination_type,
125 export.destination.bucket.as_deref().unwrap_or("-"),
126 export.destination.endpoint.as_deref().unwrap_or("-"),
127 export.destination.path.as_deref().unwrap_or("-"),
128 );
129 if !seen_destinations.insert(dest_key) {
130 continue;
131 }
132 let expanded = crate::plan::build::expand_destination_templates(
133 export.destination.clone(),
134 &export.name,
135 );
136 crate::destination::create_destination(&expanded).map_err(|e| {
137 anyhow::anyhow!(
138 "export '{}': destination preflight failed: {:#}",
139 export.name,
140 e
141 )
142 })?;
143 }
144
145 if show_type_report {
146 let policy = if strict {
147 TypePolicy::strict()
148 } else {
149 TypePolicy::warn_only()
150 };
151
152 let mut any_fatal = false;
153 for export in &exports {
154 let column_overrides =
155 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
156 if let Some(t) = export.target.as_deref()
160 && crate::types::target::ExportTarget::parse(t).is_none()
161 {
162 anyhow::bail!(
163 "export '{}': unknown target '{t}' (expected: bigquery, duckdb)",
164 export.name
165 );
166 }
167 let eff_target = target.or_else(|| {
168 export
169 .target
170 .as_deref()
171 .and_then(crate::types::target::ExportTarget::parse)
172 });
173 let config_dir = std::path::Path::new(config_path)
174 .parent()
175 .unwrap_or_else(|| std::path::Path::new("."));
176 match type_report::collect_report(
177 &config,
178 export,
179 &column_overrides,
180 &policy,
181 eff_target,
182 config_dir,
183 params,
184 ) {
185 Ok(report) => {
186 if report.has_fatal() {
187 any_fatal = true;
188 }
189 if eff_target.is_some() && report.has_target_fail() {
190 any_fatal = true;
191 }
192 if json_output {
193 type_report::print_json(&report)?;
194 } else {
195 type_report::print_table(&report, eff_target);
196 }
197 }
198 Err(e) => {
199 log::warn!("type report for '{}' failed: {:#}", export.name, e);
200 }
201 }
202 }
203
204 if strict && any_fatal {
205 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
206 }
207 }
208
209 Ok(())
210}
211
212fn print_diagnostic(diag: &ExportDiagnostic) {
213 println!();
214 println!("Export: {}", diag.export_name);
215 println!(" Strategy: {}", diag.strategy);
216 println!(" Mode: {}", diag.mode);
217 if let Some(est) = diag.row_estimate {
218 if est >= 1_000_000 {
219 println!(" Row estimate: ~{}M", est / 1_000_000);
220 } else if est >= 1_000 {
221 println!(" Row estimate: ~{}K", est / 1_000);
222 } else {
223 println!(" Row estimate: ~{}", est);
224 }
225 }
226 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
227 println!(" Cursor range: {} .. {}", min_v, max_v);
228 }
229 if let Some(col) = &diag.cursor_column {
230 println!(" Cursor col: {}", col);
231 }
232 if let Some(scan) = &diag.scan_type {
233 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
234 println!(" Scan type: {}{}", scan, idx_label);
235 }
236 println!(" Verdict: {}", diag.verdict);
237 println!(
238 " Recommended: tuning.profile: {}",
239 diag.recommended_profile
240 );
241 let (par_level, par_reason) = diag.recommended_parallel;
242 if par_level > 1 {
243 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
244 } else {
245 println!(" Parallelism: {} ({})", par_level, par_reason);
246 }
247 for w in &diag.warnings {
248 println!(" Warning: {}", w);
249 }
250 if let Some(suggestion) = &diag.suggestion {
251 println!(" Suggestion: {}", suggestion);
252 }
253}
254
// Unit tests for the `check` diagnostics: verdict/profile/parallelism
// heuristics, plan parsing, warnings, suggestions, and the doctor error
// categorisation and hint helpers.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
        IncrementalCursorMode, MetaColumns, TimeColumnType,
    };
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    /// Builds a minimal `ExportConfig` (query-based, CSV, local `./out`
    /// destination, `parallel = 1`, `chunk_size = 100_000`) with the given
    /// mode and optional cursor column. Tests mutate fields as needed.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: name.to_string(),
            target: None,
            verify: crate::config::VerifyMode::Size,
            query: Some("SELECT * FROM t".to_string()),
            query_file: None,
            table: None,
            mode,
            cursor_column: cursor.map(|s| s.to_string()),
            cursor_fallback_column: None,
            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
            chunk_column: None,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Csv,
            compression: CompressionType::default(),
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some("./out".to_string()),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
            chunk_by_days: None,
            chunk_by_key: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    // --- compute_verdict(row_estimate, uses_index, has_cursor) ---

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    // --- Postgres EXPLAIN parsing ---

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        // The top plan node's `rows=` value is the estimate.
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        // The scan node may be nested below a non-scan node such as Sort.
        let plan = "Sort (cost=...)\n -> Seq Scan on users (cost=...)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let plan = "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)";
        let st = extract_scan_type(plan);
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // --- build_suggestion: basic cases ---

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
        let msg = s.expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // --- categorize_source_error ---

    /// Categorises a plain-text source error message.
    fn src_err(msg: &str) -> &'static str {
        categorize_source_error(&anyhow::anyhow!("{}", msg))
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        assert_eq!(
            src_err("password authentication failed for user \"rivet\""),
            "auth error"
        );
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        assert_eq!(
            src_err("Access denied for user 'rivet'@'localhost'"),
            "auth error"
        );
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        assert_eq!(
            src_err("connection refused (os error 61)"),
            "connectivity error"
        );
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        assert_eq!(src_err("connection timed out"), "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        assert_eq!(
            src_err("could not translate host name \"db.bad\" to address"),
            "connectivity error"
        );
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        assert_eq!(src_err("Name or service not known"), "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        assert_eq!(src_err("something totally unexpected"), "error");
    }

    // --- categorize_dest_error ---

    /// Destination config of the given type with a placeholder bucket "b".
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some("b".to_string()),
            ..Default::default()
        }
    }

    /// Categorises a plain-text destination error for the given destination type.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let cfg = dest_config(dtype);
        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
    }

    #[test]
    fn dest_credential_loading_is_auth_error() {
        assert_eq!(
            dest_err(
                "loading credential to sign http request",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        assert_eq!(
            dest_err("permission denied on resource bucket", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        assert_eq!(
            dest_err("403 Forbidden", DestinationType::Gcs),
            "auth error"
        );
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        assert_eq!(
            dest_err("401 Unauthorized", DestinationType::S3),
            "auth error"
        );
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        assert_eq!(
            dest_err(
                "invalid_grant: token has been revoked",
                DestinationType::Gcs
            ),
            "auth error"
        );
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        assert_eq!(
            dest_err(
                "NoSuchBucket: the specified bucket does not exist",
                DestinationType::S3
            ),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        assert_eq!(
            dest_err("bucket not found (404)", DestinationType::Gcs),
            "bucket not found"
        );
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        // Same "not found" wording maps to a path error for local destinations.
        assert_eq!(
            dest_err("path not found: /tmp/missing", DestinationType::Local),
            "path not found"
        );
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        assert_eq!(
            dest_err("connection refused to endpoint", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        assert_eq!(
            dest_err("dns error: failed to lookup address", DestinationType::S3),
            "connectivity error"
        );
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        assert_eq!(
            dest_err("request timed out after 30s", DestinationType::Gcs),
            "connectivity error"
        );
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        assert_eq!(
            dest_err("something else entirely", DestinationType::S3),
            "error"
        );
    }

    // --- derive_strategy labels ---

    #[test]
    fn strategy_full_scan() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(derive_strategy(&e), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(derive_strategy(&e), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 50_000;
        e.parallel = 3;
        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
    }

    #[test]
    fn strategy_time_window() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
    }

    // --- recommend_profile(row_estimate, uses_index, export) ---

    #[test]
    fn profile_small_indexed_is_fast() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        // Parallel workers without an index push the recommendation to "safe".
        let mut e = make_export("t", ExportMode::Full, None);
        e.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let e = make_export("t", ExportMode::Full, None);
        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
    }

    // --- check_sparse_range / check_dense_surrogate_cost ---

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        // 100k rows spread over a 10M-wide id range: most chunks would be empty.
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(w.is_some(), "should warn about sparse range");
        let msg = w.unwrap();
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
        assert!(w.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.chunk_size = 100_000;
        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            w.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.chunk_dense = true;
        e.query = Some("SELECT id FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("rn".to_string());
        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
        assert!(w.unwrap().contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        e.query = Some("SELECT * FROM orders".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut e = make_export("t", ExportMode::Full, None);
        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let w = check_dense_surrogate_cost(&e);
        assert!(w.is_none(), "should not warn for non-chunked mode");
    }

    // --- check_parallel_memory_risk ---

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(10_000_000));
        assert!(w.is_some(), "should warn about memory risk");
        let msg = w.unwrap();
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.parallel = 4;
        let w = check_parallel_memory_risk(&e, Some(1_000));
        assert!(w.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let e = make_export("t", ExportMode::Full, None);
        let w = check_parallel_memory_risk(&e, Some(100_000_000));
        assert!(w.is_none(), "should not warn when parallel=1");
    }

    // --- build_suggestion: mode-specific advice ---

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut e = make_export("t", ExportMode::TimeWindow, None);
        e.time_column = Some("created_at".to_string());
        e.days_window = Some(7);
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let e = make_export("t", ExportMode::Full, None);
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    // --- recommend_parallelism(export, row_estimate, uses_index) ---

    #[test]
    fn parallel_only_for_chunked_mode() {
        let e = make_export("t", ExportMode::Full, None);
        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let mut e = make_export("t", ExportMode::Chunked, None);
        e.chunk_column = Some("id".to_string());
        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    // --- check_connection_limit(parallel, max_connections) ---

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let w = check_connection_limit(20, Some(20));
        assert!(w.is_some(), "should warn when parallel == max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let w = check_connection_limit(100, Some(20));
        assert!(w.is_some(), "should warn when parallel > max_connections");
        let msg = w.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let w = check_connection_limit(4, Some(100));
        assert!(
            w.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let w = check_connection_limit(1, Some(5));
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let w = check_connection_limit(100, None);
        assert!(w.is_some(), "should note that check was skipped");
        let msg = w.unwrap();
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let w = check_connection_limit(1, None);
        assert!(
            w.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        // NOTE(review): 17 appears to be 85% of max_connections=20 — confirm
        // the headroom factor in check_connection_limit.
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    // --- doctor hints ---

    /// Categorises `msg` as a source error and returns the hint for `st`.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let cat = categorize_source_error(&err);
        source_error_hint(cat, &err, &st)
    }

    /// Categorises `msg` as a destination error and returns the hint for `dt`.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = DestinationConfig {
            destination_type: dt,
            bucket: Some("b".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        destination_error_hint(cat, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let h = src_hint("totally unexpected", SourceType::Postgres);
        // Generic errors get no canned hint rather than a misleading one.
        assert!(h.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        // The message also contains generic wording; the SAS-expired check
        // must win, so branch order in categorize_dest_error matters.
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some("c".into()),
            ..Default::default()
        };
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}