1mod analysis;
2mod doctor;
3mod mysql;
4mod postgres;
5
6pub(crate) use analysis::chunk_sparsity_from_counts;
7#[cfg(test)]
8use analysis::{
9 build_suggestion, check_dense_surrogate_cost, check_parallel_memory_risk, check_sparse_range,
10 compute_verdict, derive_strategy, recommend_parallelism, recommend_profile,
11};
12pub use doctor::doctor;
13#[cfg(test)]
14use postgres::{extract_scan_type, parse_pg_row_estimate};
15
16use crate::config::{Config, ExportConfig, SourceType};
17use crate::error::Result;
18
/// Overall health classification attached to an export diagnostic.
///
/// The value is rendered in upper case by its `Display` implementation
/// (for example `EFFICIENT`). The type is a plain field-less enum, so it is
/// cheap to copy and can be compared directly with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Displayed as `EFFICIENT`.
    Efficient,
    /// Displayed as `ACCEPTABLE`.
    Acceptable,
    /// Displayed as `DEGRADED`.
    Degraded,
    /// Displayed as `UNSAFE`.
    Unsafe,
}
26
27impl std::fmt::Display for HealthVerdict {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match self {
30 Self::Efficient => write!(f, "EFFICIENT"),
31 Self::Acceptable => write!(f, "ACCEPTABLE"),
32 Self::Degraded => write!(f, "DEGRADED"),
33 Self::Unsafe => write!(f, "UNSAFE"),
34 }
35 }
36}
37
38pub(crate) struct ExportDiagnostic {
39 pub export_name: String,
40 pub strategy: String,
41 pub mode: String,
42 pub cursor_column: Option<String>,
43 pub row_estimate: Option<i64>,
44 pub cursor_min: Option<String>,
45 pub cursor_max: Option<String>,
46 pub scan_type: Option<String>,
47 pub uses_index: bool,
48 pub verdict: HealthVerdict,
49 pub recommended_profile: &'static str,
50 pub recommended_parallel: (u32, &'static str),
51 pub warnings: Vec<String>,
52 pub suggestion: Option<String>,
53}
54
55pub fn check(
56 config_path: &str,
57 export_name: Option<&str>,
58 params: Option<&std::collections::HashMap<String, String>>,
59) -> Result<()> {
60 let config = Config::load_with_params(config_path, params)?;
61
62 let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
63 let e = config
64 .exports
65 .iter()
66 .find(|e| e.name == name)
67 .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
68 vec![e]
69 } else {
70 config.exports.iter().collect()
71 };
72
73 let url = config.source.resolve_url()?;
74 match config.source.source_type {
75 SourceType::Postgres => postgres::check_postgres(&url, &exports)?,
76 SourceType::Mysql => mysql::check_mysql(&url, &exports)?,
77 }
78
79 Ok(())
80}
81
82fn print_diagnostic(diag: &ExportDiagnostic) {
83 println!();
84 println!("Export: {}", diag.export_name);
85 println!(" Strategy: {}", diag.strategy);
86 println!(" Mode: {}", diag.mode);
87 if let Some(est) = diag.row_estimate {
88 if est >= 1_000_000 {
89 println!(" Row estimate: ~{}M", est / 1_000_000);
90 } else if est >= 1_000 {
91 println!(" Row estimate: ~{}K", est / 1_000);
92 } else {
93 println!(" Row estimate: ~{}", est);
94 }
95 }
96 if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
97 println!(" Cursor range: {} .. {}", min_v, max_v);
98 }
99 if let Some(col) = &diag.cursor_column {
100 println!(" Cursor col: {}", col);
101 }
102 if let Some(scan) = &diag.scan_type {
103 let idx_label = if diag.uses_index { " (indexed)" } else { "" };
104 println!(" Scan type: {}{}", scan, idx_label);
105 }
106 println!(" Verdict: {}", diag.verdict);
107 println!(
108 " Recommended: tuning.profile: {}",
109 diag.recommended_profile
110 );
111 let (par_level, par_reason) = diag.recommended_parallel;
112 if par_level > 1 {
113 println!(" Recommended: parallel: {} ({})", par_level, par_reason);
114 } else {
115 println!(" Parallelism: {} ({})", par_level, par_reason);
116 }
117 for w in &diag.warnings {
118 println!(" Warning: {}", w);
119 }
120 if let Some(suggestion) = &diag.suggestion {
121 println!(" Suggestion: {}", suggestion);
122 }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
        MetaColumns, TimeColumnType,
    };
    use doctor::{categorize_dest_error, categorize_source_error};

    // ------------------------------------------------------------------
    // Fixtures
    // ------------------------------------------------------------------

    /// Builds a minimal CSV export that writes to the local `./out` path.
    /// Individual tests override the fields they care about.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        let destination = DestinationConfig {
            destination_type: DestinationType::Local,
            bucket: None,
            prefix: None,
            path: Some(String::from("./out")),
            region: None,
            endpoint: None,
            credentials_file: None,
            access_key_env: None,
            secret_key_env: None,
            aws_profile: None,
            allow_anonymous: false,
        };

        ExportConfig {
            name: name.to_owned(),
            query: Some(String::from("SELECT * FROM t")),
            query_file: None,
            mode,
            cursor_column: cursor.map(String::from),
            chunk_column: None,
            chunk_size: 100_000,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Csv,
            compression: CompressionType::default(),
            compression_level: None,
            skip_empty: false,
            destination,
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
        }
    }

    /// Full-mode export named `t` with no cursor column.
    fn full_export() -> ExportConfig {
        make_export("t", ExportMode::Full, None)
    }

    /// Chunked-mode export named `t` that chunks on `column`.
    fn chunked_export(column: &str) -> ExportConfig {
        ExportConfig {
            chunk_column: Some(column.to_owned()),
            ..make_export("t", ExportMode::Chunked, None)
        }
    }

    /// Categorizes a source error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        let err = anyhow::anyhow!("{}", msg);
        categorize_source_error(&err)
    }

    /// Destination config of the given type with only a bucket set.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some(String::from("b")),
            prefix: None,
            path: None,
            region: None,
            endpoint: None,
            credentials_file: None,
            access_key_env: None,
            secret_key_env: None,
            aws_profile: None,
            allow_anonymous: false,
        }
    }

    /// Categorizes a destination error built from `msg` for `dtype`.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let err = anyhow::anyhow!("{}", msg);
        categorize_dest_error(&err, &dest_config(dtype))
    }

    // ------------------------------------------------------------------
    // compute_verdict
    // ------------------------------------------------------------------

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let verdict = compute_verdict(Some(500_000), true, true);
        assert!(
            matches!(verdict, HealthVerdict::Efficient),
            "got: {verdict}"
        );
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let verdict = compute_verdict(Some(20_000_000), true, true);
        assert!(
            matches!(verdict, HealthVerdict::Acceptable),
            "got: {verdict}"
        );
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let verdict = compute_verdict(Some(500_000), false, false);
        assert!(
            matches!(verdict, HealthVerdict::Degraded),
            "got: {verdict}"
        );
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let verdict = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(verdict, HealthVerdict::Unsafe), "got: {verdict}");
    }

    // ------------------------------------------------------------------
    // Postgres plan parsing
    // ------------------------------------------------------------------

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort (cost=12345.67..12456.78 rows=1000455 width=50)\n -> Seq Scan on orders (cost=0.00..8765.43 rows=1000455 width=50)";
        let estimate = parse_pg_row_estimate(plan);
        assert_eq!(estimate, Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders (cost=0.42..81676.36 rows=500000 width=50)";
        let estimate = parse_pg_row_estimate(plan);
        assert_eq!(estimate, Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let st = extract_scan_type("Sort (cost=...)\n -> Seq Scan on users (cost=...)");
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let st = extract_scan_type(
            "Index Scan using users_pkey on users (cost=0.42..123.45 rows=100 width=50)",
        );
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // ------------------------------------------------------------------
    // build_suggestion (basic)
    // ------------------------------------------------------------------

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let export = full_export();
        let suggestion = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &export);
        assert!(
            suggestion.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let export = full_export();
        let msg = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export)
            .expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // ------------------------------------------------------------------
    // categorize_source_error
    // ------------------------------------------------------------------

    #[test]
    fn source_password_rejected_is_auth_error() {
        let category = src_err("password authentication failed for user \"rivet\"");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        let category = src_err("FATAL: authentication failed");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        let category = src_err("Access denied for user 'rivet'@'localhost'");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        let category = src_err("connection refused (os error 61)");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        let category = src_err("connection timed out");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        let category = src_err("could not translate host name \"db.bad\" to address");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        let category = src_err("Name or service not known");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        let category = src_err("something totally unexpected");
        assert_eq!(category, "error");
    }

    // ------------------------------------------------------------------
    // categorize_dest_error
    // ------------------------------------------------------------------

    #[test]
    fn dest_credential_loading_is_auth_error() {
        let category = dest_err(
            "loading credential to sign http request",
            DestinationType::Gcs,
        );
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        let category = dest_err("permission denied on resource bucket", DestinationType::S3);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        let category = dest_err("403 Forbidden", DestinationType::Gcs);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        let category = dest_err("401 Unauthorized", DestinationType::S3);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        let category = dest_err(
            "invalid_grant: token has been revoked",
            DestinationType::Gcs,
        );
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        let category = dest_err(
            "NoSuchBucket: the specified bucket does not exist",
            DestinationType::S3,
        );
        assert_eq!(category, "bucket not found");
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        let category = dest_err("bucket not found (404)", DestinationType::Gcs);
        assert_eq!(category, "bucket not found");
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        let category = dest_err("path not found: /tmp/missing", DestinationType::Local);
        assert_eq!(category, "path not found");
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        let category = dest_err("connection refused to endpoint", DestinationType::S3);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        let category = dest_err("dns error: failed to lookup address", DestinationType::S3);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        let category = dest_err("request timed out after 30s", DestinationType::Gcs);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        let category = dest_err("something else entirely", DestinationType::S3);
        assert_eq!(category, "error");
    }

    // ------------------------------------------------------------------
    // derive_strategy
    // ------------------------------------------------------------------

    #[test]
    fn strategy_full_scan() {
        let export = full_export();
        assert_eq!(derive_strategy(&export), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let export = ExportConfig {
            parallel: 4,
            ..full_export()
        };
        assert_eq!(derive_strategy(&export), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&export), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let export = ExportConfig {
            chunk_size: 50_000,
            ..chunked_export("id")
        };
        assert_eq!(derive_strategy(&export), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let export = ExportConfig {
            chunk_size: 50_000,
            parallel: 3,
            ..chunked_export("id")
        };
        assert_eq!(
            derive_strategy(&export),
            "chunked-parallel(id, size=50000, p=3)"
        );
    }

    #[test]
    fn strategy_time_window() {
        let export = ExportConfig {
            time_column: Some(String::from("created_at")),
            days_window: Some(7),
            ..make_export("t", ExportMode::TimeWindow, None)
        };
        assert_eq!(derive_strategy(&export), "time-window(created_at, 7d)");
    }

    // ------------------------------------------------------------------
    // recommend_profile
    // ------------------------------------------------------------------

    #[test]
    fn profile_small_indexed_is_fast() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(500_000), true, &export), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let export = full_export();
        assert_eq!(
            recommend_profile(Some(5_000_000), true, &export),
            "balanced"
        );
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(50_000_000), true, &export), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(50_000), false, &export), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let export = ExportConfig {
            parallel: 4,
            ..full_export()
        };
        assert_eq!(recommend_profile(Some(50_000), false, &export), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let export = full_export();
        assert_eq!(
            recommend_profile(Some(500_000), false, &export),
            "balanced"
        );
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(5_000_000), false, &export), "safe");
    }

    // ------------------------------------------------------------------
    // check_sparse_range / check_dense_surrogate_cost
    // ------------------------------------------------------------------

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let export = ExportConfig {
            chunk_size: 100_000,
            ..chunked_export("id")
        };
        let msg = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"))
            .expect("should warn about sparse range");
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let export = ExportConfig {
            chunk_size: 100_000,
            ..chunked_export("id")
        };
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("100000"));
        assert!(warning.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let export = ExportConfig {
            chunk_dense: true,
            chunk_size: 100_000,
            ..chunked_export("id")
        };
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            warning.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let export = ExportConfig {
            chunk_dense: true,
            query: Some(String::from("SELECT id FROM orders")),
            ..chunked_export("id")
        };
        let msg = check_dense_surrogate_cost(&export)
            .expect("should warn about built-in ROW_NUMBER cost");
        assert!(msg.contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let export = full_export();
        let warning = check_sparse_range(&export, Some(100), Some("1"), Some("1000000"));
        assert!(warning.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let export = ExportConfig {
            query: Some(String::from(
                "SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders",
            )),
            ..chunked_export("rn")
        };
        let msg = check_dense_surrogate_cost(&export).expect("should warn about ROW_NUMBER cost");
        assert!(msg.contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let export = ExportConfig {
            query: Some(String::from("SELECT * FROM orders")),
            ..chunked_export("id")
        };
        assert!(check_dense_surrogate_cost(&export).is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let export = ExportConfig {
            query: Some(String::from("SELECT ROW_NUMBER() OVER () AS rn FROM t")),
            ..full_export()
        };
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_none(), "should not warn for non-chunked mode");
    }

    // ------------------------------------------------------------------
    // check_parallel_memory_risk
    // ------------------------------------------------------------------

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let export = ExportConfig {
            parallel: 4,
            ..make_export("t", ExportMode::Chunked, None)
        };
        let msg = check_parallel_memory_risk(&export, Some(10_000_000))
            .expect("should warn about memory risk");
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let export = ExportConfig {
            parallel: 4,
            ..make_export("t", ExportMode::Chunked, None)
        };
        let warning = check_parallel_memory_risk(&export, Some(1_000));
        assert!(warning.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let export = full_export();
        let warning = check_parallel_memory_risk(&export, Some(100_000_000));
        assert!(warning.is_none(), "should not warn when parallel=1");
    }

    // ------------------------------------------------------------------
    // build_suggestion (per mode / verdict)
    // ------------------------------------------------------------------

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let export = full_export();
        let text =
            build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(text.contains("incremental"), "got: {text}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let export = chunked_export("id");
        let text =
            build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(text.contains("index on 'id'"), "got: {text}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let export = ExportConfig {
            time_column: Some(String::from("created_at")),
            days_window: Some(7),
            ..make_export("t", ExportMode::TimeWindow, None)
        };
        let text =
            build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(text.contains("index on 'created_at'"), "got: {text}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let export = full_export();
        let text =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(text.contains("incremental"), "got: {text}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let export = chunked_export("id");
        let text =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(text.contains("index on 'id'"), "got: {text}");
        assert!(text.contains("parallel"), "got: {text}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let text =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(text.contains("index on 'updated_at'"), "got: {text}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let export = full_export();
        let text =
            build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export).unwrap();
        assert!(text.contains("incremental"), "got: {text}");
    }

    // ------------------------------------------------------------------
    // recommend_parallelism
    // ------------------------------------------------------------------

    #[test]
    fn parallel_only_for_chunked_mode() {
        let export = full_export();
        let (level, _) = recommend_parallelism(&export, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let export = chunked_export("id");
        let (level, reason) = recommend_parallelism(&export, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let export = chunked_export("id");
        let text =
            build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export).unwrap();
        assert!(text.contains("parallel"), "got: {text}");
    }
}