1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11 build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12 check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13 recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health rating attached to an export diagnostic.
///
/// The verdict is rendered in upper case by its `Display` implementation
/// (for example `EFFICIENT`). It is a plain fieldless enum, so it is cheap to
/// copy and can be compared directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Displayed as `EFFICIENT`.
    Efficient,
    /// Displayed as `ACCEPTABLE`.
    Acceptable,
    /// Displayed as `DEGRADED`.
    Degraded,
    /// Displayed as `UNSAFE`.
    Unsafe,
}
32
33impl std::fmt::Display for HealthVerdict {
34 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35 match self {
36 Self::Efficient => write!(f, "EFFICIENT"),
37 Self::Acceptable => write!(f, "ACCEPTABLE"),
38 Self::Degraded => write!(f, "DEGRADED"),
39 Self::Unsafe => write!(f, "UNSAFE"),
40 }
41 }
42}
43
44pub(crate) struct ExportDiagnostic {
45 pub export_name: String,
46 pub strategy: String,
47 pub mode: String,
48 pub cursor_column: Option<String>,
49 pub row_estimate: Option<i64>,
50 pub cursor_min: Option<String>,
51 pub cursor_max: Option<String>,
52 pub scan_type: Option<String>,
53 pub uses_index: bool,
54 pub verdict: HealthVerdict,
55 pub recommended_profile: &'static str,
56 pub recommended_parallel: (u32, &'static str),
57 pub warnings: Vec<String>,
58 pub suggestion: Option<String>,
59}
60
61pub(crate) fn get_export_diagnostic(
65 config: &Config,
66 export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68 let url = config.source.resolve_url()?;
69 let tls = config.source.tls.as_ref();
70 match config.source.source_type {
71 SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
72 SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
73 }
74}
75
76pub fn check(
77 config_path: &str,
78 export_name: Option<&str>,
79 params: Option<&std::collections::HashMap<String, String>>,
80 show_type_report: bool,
81 strict: bool,
82 json_output: bool,
83 target: Option<ExportTarget>,
84) -> Result<()> {
85 let config = Config::load_with_params(config_path, params)?;
86
87 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
88 let e = config
89 .exports
90 .iter()
91 .find(|e| e.name == name)
92 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
93 vec![e]
94 } else {
95 config.exports.iter().collect()
96 };
97
98 let url = config.source.resolve_url()?;
99 let tls = config.source.tls.as_ref();
100 match config.source.source_type {
101 SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
102 SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
103 }
104
105 let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
114 for export in &exports {
115 let dest_key = format!(
116 "{:?}:{}:{}:{}",
117 export.destination.destination_type,
118 export.destination.bucket.as_deref().unwrap_or("-"),
119 export.destination.endpoint.as_deref().unwrap_or("-"),
120 export.destination.path.as_deref().unwrap_or("-"),
121 );
122 if !seen_destinations.insert(dest_key) {
123 continue;
124 }
125 let expanded = crate::plan::build::expand_destination_templates(
126 export.destination.clone(),
127 &export.name,
128 );
129 crate::destination::create_destination(&expanded).map_err(|e| {
130 anyhow::anyhow!(
131 "export '{}': destination preflight failed: {:#}",
132 export.name,
133 e
134 )
135 })?;
136 }
137
138 if show_type_report {
139 let policy = if strict {
140 TypePolicy::strict()
141 } else {
142 TypePolicy::warn_only()
143 };
144
145 let mut any_fatal = false;
146 for export in &exports {
147 let column_overrides =
148 crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
149 match type_report::collect_report(&config, export, &column_overrides, &policy, target) {
150 Ok(report) => {
151 if report.has_fatal() {
152 any_fatal = true;
153 }
154 if target.is_some() && report.has_target_fail() {
155 any_fatal = true;
156 }
157 if json_output {
158 type_report::print_json(&report)?;
159 } else {
160 type_report::print_table(&report, target);
161 }
162 }
163 Err(e) => {
164 log::warn!("type report for '{}' failed: {:#}", export.name, e);
165 }
166 }
167 }
168
169 if strict && any_fatal {
170 anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
171 }
172 }
173
174 Ok(())
175}
176
177fn print_diagnostic(diag: &ExportDiagnostic) {
178 println!();
179 println!("Export: {}", diag.export_name);
180 println!(" Strategy: {}", diag.strategy);
181 println!(" Mode: {}", diag.mode);
182 if let Some(est) = diag.row_estimate {
183 if est >= 1_000_000 {
184 println!(" Row estimate: ~{}M", est / 1_000_000);
185 } else if est >= 1_000 {
186 println!(" Row estimate: ~{}K", est / 1_000);
187 } else {
188 println!(" Row estimate: ~{}", est);
189 }
190 }
191 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
192 println!(" Cursor range: {} .. {}", min_v, max_v);
193 }
194 if let Some(col) = &diag.cursor_column {
195 println!(" Cursor col: {}", col);
196 }
197 if let Some(scan) = &diag.scan_type {
198 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
199 println!(" Scan type: {}{}", scan, idx_label);
200 }
201 println!(" Verdict: {}", diag.verdict);
202 println!(
203 " Recommended: tuning.profile: {}",
204 diag.recommended_profile
205 );
206 let (par_level, par_reason) = diag.recommended_parallel;
207 if par_level > 1 {
208 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
209 } else {
210 println!(" Parallelism: {} ({})", par_level, par_reason);
211 }
212 for w in &diag.warnings {
213 println!(" Warning: {}", w);
214 }
215 if let Some(suggestion) = &diag.suggestion {
216 println!(" Suggestion: {}", suggestion);
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
        IncrementalCursorMode, MetaColumns, TimeColumnType,
    };
    use doctor::{
        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
    };

    // ----- fixtures -----

    /// Builds a baseline export config with a local destination; only the
    /// name, mode and cursor column vary between callers.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        ExportConfig {
            name: String::from(name),
            query: Some(String::from("SELECT * FROM t")),
            query_file: None,
            table: None,
            mode,
            cursor_column: cursor.map(str::to_owned),
            cursor_fallback_column: None,
            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
            chunk_column: None,
            chunk_size: 100_000,
            chunk_size_memory_mb: None,
            chunk_count: None,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Csv,
            compression: CompressionType::default(),
            compression_level: None,
            compression_profile: None,
            skip_empty: false,
            destination: DestinationConfig {
                destination_type: DestinationType::Local,
                path: Some(String::from("./out")),
                ..Default::default()
            },
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
            chunk_by_days: None,
            source_group: None,
            reconcile_required: false,
            columns: Default::default(),
            on_schema_drift: Default::default(),
            shape_drift_warn_factor: None,
            parquet: None,
        }
    }

    /// Full-mode export named `t` with no cursor column.
    fn full_export() -> ExportConfig {
        make_export("t", ExportMode::Full, None)
    }

    /// Chunked-mode export named `t` that chunks on `column`.
    fn chunked_on(column: &str) -> ExportConfig {
        let mut export = make_export("t", ExportMode::Chunked, None);
        export.chunk_column = Some(String::from(column));
        export
    }

    // ----- verdict -----

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    // ----- Postgres plan parsing -----

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        let estimate = parse_pg_row_estimate(plan);
        assert_eq!(estimate, Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        let estimate = parse_pg_row_estimate(plan);
        assert_eq!(estimate, Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let st = extract_scan_type("Sort (cost=...)\n -> Seq Scan on users (cost=...)");
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let st = extract_scan_type(
            "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)",
        );
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // ----- suggestions (basic) -----

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let export = full_export();
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &export);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let export = full_export();
        let msg = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export)
            .expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // ----- source error categorisation -----

    /// Categorises a source error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        let err = anyhow::anyhow!("{}", msg);
        categorize_source_error(&err)
    }

    #[test]
    fn source_password_rejected_is_auth_error() {
        let category = src_err("password authentication failed for user \"rivet\"");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        let category = src_err("FATAL: authentication failed");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        let category = src_err("Access denied for user 'rivet'@'localhost'");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        let category = src_err("connection refused (os error 61)");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        let category = src_err("connection timed out");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        let category = src_err("could not translate host name \"db.bad\" to address");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        let category = src_err("Name or service not known");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        let category = src_err("something totally unexpected");
        assert_eq!(category, "error");
    }

    // ----- destination error categorisation -----

    /// Destination config of the given type with bucket `b`.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some(String::from("b")),
            ..Default::default()
        }
    }

    /// Categorises a destination error built from `msg` for a `dtype` destination.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let err = anyhow::anyhow!("{}", msg);
        categorize_dest_error(&err, &dest_config(dtype))
    }

    #[test]
    fn dest_credential_loading_is_auth_error() {
        let category = dest_err(
            "loading credential to sign http request",
            DestinationType::Gcs,
        );
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        let category = dest_err("permission denied on resource bucket", DestinationType::S3);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        let category = dest_err("403 Forbidden", DestinationType::Gcs);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        let category = dest_err("401 Unauthorized", DestinationType::S3);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        let category = dest_err(
            "invalid_grant: token has been revoked",
            DestinationType::Gcs,
        );
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        let category = dest_err(
            "NoSuchBucket: the specified bucket does not exist",
            DestinationType::S3,
        );
        assert_eq!(category, "bucket not found");
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        let category = dest_err("bucket not found (404)", DestinationType::Gcs);
        assert_eq!(category, "bucket not found");
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        let category = dest_err("path not found: /tmp/missing", DestinationType::Local);
        assert_eq!(category, "path not found");
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        let category = dest_err("connection refused to endpoint", DestinationType::S3);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        let category = dest_err("dns error: failed to lookup address", DestinationType::S3);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        let category = dest_err("request timed out after 30s", DestinationType::Gcs);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        let category = dest_err("something else entirely", DestinationType::S3);
        assert_eq!(category, "error");
    }

    // ----- strategy labels -----

    #[test]
    fn strategy_full_scan() {
        let export = full_export();
        assert_eq!(derive_strategy(&export), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut export = full_export();
        export.parallel = 4;
        assert_eq!(derive_strategy(&export), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&export), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut export = chunked_on("id");
        export.chunk_size = 50_000;
        assert_eq!(derive_strategy(&export), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut export = chunked_on("id");
        export.chunk_size = 50_000;
        export.parallel = 3;
        assert_eq!(
            derive_strategy(&export),
            "chunked-parallel(id, size=50000, p=3)"
        );
    }

    #[test]
    fn strategy_time_window() {
        let mut export = make_export("t", ExportMode::TimeWindow, None);
        export.time_column = Some(String::from("created_at"));
        export.days_window = Some(7);
        assert_eq!(derive_strategy(&export), "time-window(created_at, 7d)");
    }

    // ----- profile recommendation -----

    #[test]
    fn profile_small_indexed_is_fast() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(500_000), true, &export), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let export = full_export();
        assert_eq!(
            recommend_profile(Some(5_000_000), true, &export),
            "balanced"
        );
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(50_000_000), true, &export), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(50_000), false, &export), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut export = full_export();
        export.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &export), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let export = full_export();
        assert_eq!(
            recommend_profile(Some(500_000), false, &export),
            "balanced"
        );
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(5_000_000), false, &export), "safe");
    }

    // ----- sparse range / dense surrogate warnings -----

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut export = chunked_on("id");
        export.chunk_size = 100_000;
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
        assert!(warning.is_some(), "should warn about sparse range");
        let msg = warning.unwrap();
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut export = chunked_on("id");
        export.chunk_size = 100_000;
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("100000"));
        assert!(warning.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut export = chunked_on("id");
        export.chunk_dense = true;
        export.chunk_size = 100_000;
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            warning.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut export = chunked_on("id");
        export.chunk_dense = true;
        export.query = Some(String::from("SELECT id FROM orders"));
        let warning = check_dense_surrogate_cost(&export);
        assert!(
            warning.is_some(),
            "should warn about built-in ROW_NUMBER cost"
        );
        assert!(warning.unwrap().contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let export = full_export();
        let warning = check_sparse_range(&export, Some(100), Some("1"), Some("1000000"));
        assert!(warning.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut export = chunked_on("rn");
        export.query = Some(String::from(
            "SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders",
        ));
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_some(), "should warn about ROW_NUMBER cost");
        assert!(warning.unwrap().contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut export = chunked_on("id");
        export.query = Some(String::from("SELECT * FROM orders"));
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut export = full_export();
        export.query = Some(String::from("SELECT ROW_NUMBER() OVER () AS rn FROM t"));
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_none(), "should not warn for non-chunked mode");
    }

    // ----- parallel memory risk -----

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let mut export = make_export("t", ExportMode::Chunked, None);
        export.parallel = 4;
        let warning = check_parallel_memory_risk(&export, Some(10_000_000));
        assert!(warning.is_some(), "should warn about memory risk");
        let msg = warning.unwrap();
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let mut export = make_export("t", ExportMode::Chunked, None);
        export.parallel = 4;
        let warning = check_parallel_memory_risk(&export, Some(1_000));
        assert!(warning.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let export = full_export();
        let warning = check_parallel_memory_risk(&export, Some(100_000_000));
        assert!(warning.is_none(), "should not warn when parallel=1");
    }

    // ----- suggestions (per mode) -----

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let export = full_export();
        let s =
            build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let export = chunked_on("id");
        let s =
            build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let mut export = make_export("t", ExportMode::TimeWindow, None);
        export.time_column = Some(String::from("created_at"));
        export.days_window = Some(7);
        let s =
            build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let export = full_export();
        let s =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let export = chunked_on("id");
        let s =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let export = full_export();
        let s =
            build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    // ----- parallelism recommendation -----

    #[test]
    fn parallel_only_for_chunked_mode() {
        let export = full_export();
        let (level, _) = recommend_parallelism(&export, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let export = chunked_on("id");
        let (level, _) = recommend_parallelism(&export, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let export = chunked_on("id");
        let (level, _) = recommend_parallelism(&export, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let export = chunked_on("id");
        let (level, _) = recommend_parallelism(&export, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let export = chunked_on("id");
        let (level, reason) = recommend_parallelism(&export, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let export = chunked_on("id");
        let (level, _) = recommend_parallelism(&export, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let export = chunked_on("id");
        let s =
            build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }

    // ----- connection limit -----

    #[test]
    fn connection_limit_warn_when_parallel_meets_max() {
        let warning = check_connection_limit(20, Some(20));
        assert!(
            warning.is_some(),
            "should warn when parallel == max_connections"
        );
        let msg = warning.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
        assert!(msg.contains("parallel=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_warn_when_parallel_exceeds_max() {
        let warning = check_connection_limit(100, Some(20));
        assert!(
            warning.is_some(),
            "should warn when parallel > max_connections"
        );
        let msg = warning.unwrap();
        assert!(msg.contains("max_connections=20"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_below_max() {
        let warning = check_connection_limit(4, Some(100));
        assert!(
            warning.is_none(),
            "should not warn when parallel << max_connections"
        );
    }

    #[test]
    fn connection_limit_no_warn_when_parallel_is_one() {
        let warning = check_connection_limit(1, Some(5));
        assert!(
            warning.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
        let warning = check_connection_limit(100, None);
        assert!(warning.is_some(), "should note that check was skipped");
        let msg = warning.unwrap();
        assert!(msg.contains("skipped"), "got: {msg}");
    }

    #[test]
    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
        let warning = check_connection_limit(1, None);
        assert!(
            warning.is_none(),
            "single worker never triggers connection warning"
        );
    }

    #[test]
    fn connection_limit_suggests_headroom() {
        let w = check_connection_limit(25, Some(20)).unwrap();
        assert!(
            w.contains("17"),
            "should suggest leaving headroom, got: {w}"
        );
    }

    // ----- hints -----

    /// Categorises a source error built from `msg`, then looks up its hint.
    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let category = categorize_source_error(&err);
        source_error_hint(category, &err, &st)
    }

    /// Categorises a destination error built from `msg`, then looks up its hint.
    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
        let err = anyhow::anyhow!("{}", msg);
        let dest = dest_config(dt);
        let category = categorize_dest_error(&err, &dest);
        destination_error_hint(category, &dest)
    }

    #[test]
    fn source_tls_handshake_returns_pg_specific_tls_hint() {
        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
    }

    #[test]
    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
        assert!(h.contains("tls.mode"), "got: {h}");
    }

    #[test]
    fn source_auth_error_postgres_mentions_pg_hba() {
        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
    }

    #[test]
    fn source_auth_error_mysql_mentions_grant() {
        let h = src_hint(
            "Access denied for user 'rivet'@'localhost'",
            SourceType::Mysql,
        )
        .expect("hint");
        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
    }

    #[test]
    fn source_connectivity_error_mentions_bastion_and_network() {
        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
    }

    #[test]
    fn source_unknown_error_returns_no_hint() {
        let hint = src_hint("totally unexpected", SourceType::Postgres);
        assert!(hint.is_none(), "unknown errors should not produce a hint");
    }

    #[test]
    fn dest_s3_auth_error_names_concrete_actions() {
        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
        assert!(
            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_gcs_auth_error_names_concrete_actions() {
        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
        assert!(
            h.contains("storage.objects") && h.contains("cloud-permissions"),
            "got: {h}"
        );
    }

    #[test]
    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
        let dest = DestinationConfig {
            destination_type: DestinationType::Azure,
            bucket: Some(String::from("c")),
            ..Default::default()
        };
        let err = anyhow::anyhow!(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
        );
        let cat = categorize_dest_error(&err, &dest);
        assert_eq!(
            cat, "sas expired",
            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
        );
    }

    #[test]
    fn dest_azure_sas_expired_returns_regenerate_hint() {
        let h = dest_hint(
            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
            DestinationType::Azure,
        )
        .expect("hint");
        assert!(
            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_bucket_not_found_says_no_auto_create() {
        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
        assert!(
            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
            "got: {h}"
        );
    }

    #[test]
    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
    }
}