Skip to main content

rivet_cli/preflight/
mod.rs

1mod analysis;
2mod doctor;
3mod mysql;
4mod postgres;
5
6pub(crate) use analysis::chunk_sparsity_from_counts;
7#[cfg(test)]
8use analysis::{
9    build_suggestion, check_dense_surrogate_cost, check_parallel_memory_risk, check_sparse_range,
10    compute_verdict, derive_strategy, recommend_parallelism, recommend_profile,
11};
12pub use doctor::doctor;
13#[cfg(test)]
14use postgres::{extract_scan_type, parse_pg_row_estimate};
15
16use crate::config::{Config, ExportConfig, SourceType};
17use crate::error::Result;
18
/// Health classification assigned to an export by the preflight analysis.
///
/// The verdict is rendered in upper case through its [`std::fmt::Display`] implementation.
/// It is a plain fieldless enum, so it is cheap to copy and can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Rendered as `EFFICIENT`.
    Efficient,
    /// Rendered as `ACCEPTABLE`.
    Acceptable,
    /// Rendered as `DEGRADED`.
    Degraded,
    /// Rendered as `UNSAFE`.
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the upper-case label for the verdict.
    ///
    /// Uses [`std::fmt::Formatter::pad`] so width, alignment and precision flags
    /// (for example `{:<12}`) are honoured; plain `{}` output is unchanged.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.pad(label)
    }
}
37
38pub(crate) struct ExportDiagnostic {
39    pub export_name: String,
40    pub strategy: String,
41    pub mode: String,
42    pub cursor_column: Option<String>,
43    pub row_estimate: Option<i64>,
44    pub cursor_min: Option<String>,
45    pub cursor_max: Option<String>,
46    pub scan_type: Option<String>,
47    pub uses_index: bool,
48    pub verdict: HealthVerdict,
49    pub recommended_profile: &'static str,
50    pub recommended_parallel: (u32, &'static str),
51    pub warnings: Vec<String>,
52    pub suggestion: Option<String>,
53}
54
55pub fn check(
56    config_path: &str,
57    export_name: Option<&str>,
58    params: Option<&std::collections::HashMap<String, String>>,
59) -> Result<()> {
60    let config = Config::load_with_params(config_path, params)?;
61
62    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
63        let e = config
64            .exports
65            .iter()
66            .find(|e| e.name == name)
67            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
68        vec![e]
69    } else {
70        config.exports.iter().collect()
71    };
72
73    let url = config.source.resolve_url()?;
74    match config.source.source_type {
75        SourceType::Postgres => postgres::check_postgres(&url, &exports)?,
76        SourceType::Mysql => mysql::check_mysql(&url, &exports)?,
77    }
78
79    Ok(())
80}
81
82fn print_diagnostic(diag: &ExportDiagnostic) {
83    println!();
84    println!("Export: {}", diag.export_name);
85    println!("  Strategy:     {}", diag.strategy);
86    println!("  Mode:         {}", diag.mode);
87    if let Some(est) = diag.row_estimate {
88        if est >= 1_000_000 {
89            println!("  Row estimate: ~{}M", est / 1_000_000);
90        } else if est >= 1_000 {
91            println!("  Row estimate: ~{}K", est / 1_000);
92        } else {
93            println!("  Row estimate: ~{}", est);
94        }
95    }
96    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
97        println!("  Cursor range: {} .. {}", min_v, max_v);
98    }
99    if let Some(col) = &diag.cursor_column {
100        println!("  Cursor col:   {}", col);
101    }
102    if let Some(scan) = &diag.scan_type {
103        let idx_label = if diag.uses_index { " (indexed)" } else { "" };
104        println!("  Scan type:    {}{}", scan, idx_label);
105    }
106    println!("  Verdict:      {}", diag.verdict);
107    println!(
108        "  Recommended:  tuning.profile: {}",
109        diag.recommended_profile
110    );
111    let (par_level, par_reason) = diag.recommended_parallel;
112    if par_level > 1 {
113        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
114    } else {
115        println!("  Parallelism:  {} ({})", par_level, par_reason);
116    }
117    for w in &diag.warnings {
118        println!("  Warning:      {}", w);
119    }
120    if let Some(suggestion) = &diag.suggestion {
121        println!("  Suggestion:   {}", suggestion);
122    }
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{
        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
        MetaColumns, TimeColumnType,
    };
    use doctor::{categorize_dest_error, categorize_source_error};

    // ---------------------------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------------------------

    /// Destination of the given type with bucket `b` and every other option unset.
    fn dest_config(dtype: DestinationType) -> DestinationConfig {
        DestinationConfig {
            destination_type: dtype,
            bucket: Some(String::from("b")),
            prefix: None,
            path: None,
            region: None,
            endpoint: None,
            credentials_file: None,
            access_key_env: None,
            secret_key_env: None,
            aws_profile: None,
            allow_anonymous: false,
        }
    }

    /// Baseline export: CSV to the local `./out` directory, `SELECT * FROM t`, one worker,
    /// chunk size 100 000, and every optional setting left unset.
    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
        let mut destination = dest_config(DestinationType::Local);
        destination.bucket = None;
        destination.path = Some(String::from("./out"));

        ExportConfig {
            name: name.to_owned(),
            query: Some(String::from("SELECT * FROM t")),
            query_file: None,
            mode,
            cursor_column: cursor.map(str::to_owned),
            chunk_column: None,
            chunk_size: 100_000,
            parallel: 1,
            time_column: None,
            time_column_type: TimeColumnType::Timestamp,
            days_window: None,
            format: FormatType::Csv,
            compression: CompressionType::default(),
            compression_level: None,
            skip_empty: false,
            destination,
            meta_columns: MetaColumns::default(),
            quality: None,
            max_file_size: None,
            chunk_checkpoint: false,
            chunk_max_attempts: None,
            tuning: None,
            chunk_dense: false,
        }
    }

    /// Full-mode export named `t` with no cursor.
    fn full_export() -> ExportConfig {
        make_export("t", ExportMode::Full, None)
    }

    /// Chunked-mode export named `t`, chunked on `column`.
    fn chunked_export(column: &str) -> ExportConfig {
        let mut export = make_export("t", ExportMode::Chunked, None);
        export.chunk_column = Some(column.to_string());
        export
    }

    /// Time-window export named `t` over `created_at` with a 7-day window.
    fn time_window_export() -> ExportConfig {
        let mut export = make_export("t", ExportMode::TimeWindow, None);
        export.time_column = Some("created_at".to_string());
        export.days_window = Some(7);
        export
    }

    /// Chunked-mode export named `t` with four parallel workers and no chunk column.
    fn parallel_chunked_export() -> ExportConfig {
        let mut export = make_export("t", ExportMode::Chunked, None);
        export.parallel = 4;
        export
    }

    /// Categorizes a source error built from `msg`.
    fn src_err(msg: &str) -> &'static str {
        let err = anyhow::anyhow!("{}", msg);
        categorize_source_error(&err)
    }

    /// Categorizes a destination error built from `msg` against a destination of type `dtype`.
    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
        let err = anyhow::anyhow!("{}", msg);
        categorize_dest_error(&err, &dest_config(dtype))
    }

    // ---------------------------------------------------------------------------------------
    // compute_verdict
    // ---------------------------------------------------------------------------------------

    #[test]
    fn verdict_small_indexed_with_cursor_is_efficient() {
        let v = compute_verdict(Some(500_000), true, true);
        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
    }

    #[test]
    fn verdict_large_indexed_with_cursor_is_acceptable() {
        let v = compute_verdict(Some(20_000_000), true, true);
        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
    }

    #[test]
    fn verdict_no_index_no_cursor_is_degraded() {
        let v = compute_verdict(Some(500_000), false, false);
        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
    }

    #[test]
    fn verdict_huge_no_index_is_unsafe() {
        let v = compute_verdict(Some(100_000_000), false, false);
        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
    }

    // ---------------------------------------------------------------------------------------
    // Postgres plan parsing
    // ---------------------------------------------------------------------------------------

    #[test]
    fn parse_pg_row_estimate_from_sort_plan() {
        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
        let estimate = parse_pg_row_estimate(plan);
        assert_eq!(estimate, Some(1_000_455));
    }

    #[test]
    fn parse_pg_row_estimate_from_index_scan() {
        let plan =
            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
        let estimate = parse_pg_row_estimate(plan);
        assert_eq!(estimate, Some(500_000));
    }

    #[test]
    fn extract_scan_type_detects_seq_scan() {
        let st = extract_scan_type("Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)");
        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
    }

    #[test]
    fn extract_scan_type_detects_index_scan() {
        let st = extract_scan_type(
            "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)",
        );
        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
    }

    // ---------------------------------------------------------------------------------------
    // build_suggestion (basic cases)
    // ---------------------------------------------------------------------------------------

    #[test]
    fn suggestion_for_efficient_verdict_is_none() {
        let export = full_export();
        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &export);
        assert!(
            s.is_none(),
            "efficient verdict should produce no suggestion"
        );
    }

    #[test]
    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
        let export = full_export();
        let msg = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export)
            .expect("degraded verdict should produce a suggestion");
        assert!(
            msg.contains("safe"),
            "suggestion should recommend safe profile, got: {msg}"
        );
    }

    // ---------------------------------------------------------------------------------------
    // categorize_source_error
    // ---------------------------------------------------------------------------------------

    #[test]
    fn source_password_rejected_is_auth_error() {
        let category = src_err("password authentication failed for user \"rivet\"");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_authentication_failed_is_auth_error() {
        let category = src_err("FATAL: authentication failed");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_access_denied_is_auth_error() {
        let category = src_err("Access denied for user 'rivet'@'localhost'");
        assert_eq!(category, "auth error");
    }

    #[test]
    fn source_connection_refused_is_connectivity() {
        let category = src_err("connection refused (os error 61)");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_timed_out_is_connectivity() {
        let category = src_err("connection timed out");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_dns_translate_host_is_connectivity() {
        let category = src_err("could not translate host name \"db.bad\" to address");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_name_not_known_is_connectivity() {
        let category = src_err("Name or service not known");
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn source_unknown_error_is_generic() {
        let category = src_err("something totally unexpected");
        assert_eq!(category, "error");
    }

    // ---------------------------------------------------------------------------------------
    // categorize_dest_error
    // ---------------------------------------------------------------------------------------

    #[test]
    fn dest_credential_loading_is_auth_error() {
        let category = dest_err(
            "loading credential to sign http request",
            DestinationType::Gcs,
        );
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_permission_denied_is_auth_error() {
        let category = dest_err("permission denied on resource bucket", DestinationType::S3);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_forbidden_is_auth_error() {
        let category = dest_err("403 Forbidden", DestinationType::Gcs);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_unauthorized_is_auth_error() {
        let category = dest_err("401 Unauthorized", DestinationType::S3);
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_invalid_grant_is_auth_error() {
        let category = dest_err(
            "invalid_grant: token has been revoked",
            DestinationType::Gcs,
        );
        assert_eq!(category, "auth error");
    }

    #[test]
    fn dest_nosuchbucket_s3_is_bucket_not_found() {
        let category = dest_err(
            "NoSuchBucket: the specified bucket does not exist",
            DestinationType::S3,
        );
        assert_eq!(category, "bucket not found");
    }

    #[test]
    fn dest_not_found_gcs_is_bucket_not_found() {
        let category = dest_err("bucket not found (404)", DestinationType::Gcs);
        assert_eq!(category, "bucket not found");
    }

    #[test]
    fn dest_not_found_local_is_path_not_found() {
        let category = dest_err("path not found: /tmp/missing", DestinationType::Local);
        assert_eq!(category, "path not found");
    }

    #[test]
    fn dest_connection_refused_is_connectivity() {
        let category = dest_err("connection refused to endpoint", DestinationType::S3);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_dns_error_is_connectivity() {
        let category = dest_err("dns error: failed to lookup address", DestinationType::S3);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_timed_out_is_connectivity() {
        let category = dest_err("request timed out after 30s", DestinationType::Gcs);
        assert_eq!(category, "connectivity error");
    }

    #[test]
    fn dest_unknown_error_is_generic() {
        let category = dest_err("something else entirely", DestinationType::S3);
        assert_eq!(category, "error");
    }

    // ---------------------------------------------------------------------------------------
    // derive_strategy
    // ---------------------------------------------------------------------------------------

    #[test]
    fn strategy_full_scan() {
        let export = full_export();
        assert_eq!(derive_strategy(&export), "full-scan");
    }

    #[test]
    fn strategy_full_parallel() {
        let mut export = full_export();
        export.parallel = 4;
        assert_eq!(derive_strategy(&export), "full-parallel(4)");
    }

    #[test]
    fn strategy_incremental() {
        let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
        assert_eq!(derive_strategy(&export), "incremental(updated_at)");
    }

    #[test]
    fn strategy_chunked() {
        let mut export = chunked_export("id");
        export.chunk_size = 50_000;
        assert_eq!(derive_strategy(&export), "chunked(id, size=50000)");
    }

    #[test]
    fn strategy_chunked_parallel() {
        let mut export = chunked_export("id");
        export.chunk_size = 50_000;
        export.parallel = 3;
        assert_eq!(
            derive_strategy(&export),
            "chunked-parallel(id, size=50000, p=3)"
        );
    }

    #[test]
    fn strategy_time_window() {
        let export = time_window_export();
        assert_eq!(derive_strategy(&export), "time-window(created_at, 7d)");
    }

    // ---------------------------------------------------------------------------------------
    // recommend_profile
    // ---------------------------------------------------------------------------------------

    #[test]
    fn profile_small_indexed_is_fast() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(500_000), true, &export), "fast");
    }

    #[test]
    fn profile_medium_indexed_is_balanced() {
        let export = full_export();
        assert_eq!(
            recommend_profile(Some(5_000_000), true, &export),
            "balanced"
        );
    }

    #[test]
    fn profile_large_indexed_is_safe() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(50_000_000), true, &export), "safe");
    }

    #[test]
    fn profile_small_no_index_is_balanced() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(50_000), false, &export), "balanced");
    }

    #[test]
    fn profile_small_no_index_parallel_is_safe() {
        let mut export = full_export();
        export.parallel = 4;
        assert_eq!(recommend_profile(Some(50_000), false, &export), "safe");
    }

    #[test]
    fn profile_medium_no_index_is_balanced() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(500_000), false, &export), "balanced");
    }

    #[test]
    fn profile_large_no_index_is_safe() {
        let export = full_export();
        assert_eq!(recommend_profile(Some(5_000_000), false, &export), "safe");
    }

    // ---------------------------------------------------------------------------------------
    // check_sparse_range / check_dense_surrogate_cost
    // ---------------------------------------------------------------------------------------

    #[test]
    fn sparse_range_warning_when_very_sparse() {
        let mut export = chunked_export("id");
        export.chunk_size = 100_000;
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
        assert!(warning.is_some(), "should warn about sparse range");
        let msg = warning.unwrap();
        assert!(msg.contains("Sparse key range"), "got: {msg}");
        assert!(msg.contains("empty"), "got: {msg}");
    }

    #[test]
    fn sparse_range_no_warning_when_dense() {
        let mut export = chunked_export("id");
        export.chunk_size = 100_000;
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("100000"));
        assert!(warning.is_none(), "should not warn for dense range");
    }

    #[test]
    fn sparse_range_skipped_when_chunk_dense() {
        let mut export = chunked_export("id");
        export.chunk_dense = true;
        export.chunk_size = 100_000;
        let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
        assert!(
            warning.is_none(),
            "chunk_dense uses ordinals, not physical id span"
        );
    }

    #[test]
    fn dense_surrogate_warning_when_chunk_dense_builtin() {
        let mut export = chunked_export("id");
        export.chunk_dense = true;
        export.query = Some("SELECT id FROM orders".to_string());
        let warning = check_dense_surrogate_cost(&export);
        assert!(
            warning.is_some(),
            "should warn about built-in ROW_NUMBER cost"
        );
        assert!(warning.unwrap().contains("global sort"));
    }

    #[test]
    fn sparse_range_not_triggered_for_non_chunked() {
        let export = full_export();
        let warning = check_sparse_range(&export, Some(100), Some("1"), Some("1000000"));
        assert!(warning.is_none(), "should not warn for non-chunked mode");
    }

    #[test]
    fn dense_surrogate_warning_with_row_number() {
        let mut export = chunked_export("rn");
        export.query =
            Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_some(), "should warn about ROW_NUMBER cost");
        assert!(warning.unwrap().contains("global sort"));
    }

    #[test]
    fn no_dense_surrogate_warning_without_row_number() {
        let mut export = chunked_export("id");
        export.query = Some("SELECT * FROM orders".to_string());
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_none());
    }

    #[test]
    fn no_dense_surrogate_warning_for_non_chunked() {
        let mut export = full_export();
        export.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
        let warning = check_dense_surrogate_cost(&export);
        assert!(warning.is_none(), "should not warn for non-chunked mode");
    }

    // ---------------------------------------------------------------------------------------
    // check_parallel_memory_risk
    // ---------------------------------------------------------------------------------------

    #[test]
    fn parallel_memory_warning_large_dataset() {
        let export = parallel_chunked_export();
        let warning = check_parallel_memory_risk(&export, Some(10_000_000));
        assert!(warning.is_some(), "should warn about memory risk");
        let msg = warning.unwrap();
        assert!(msg.contains("Parallel=4"), "got: {msg}");
        assert!(msg.contains("memory"), "got: {msg}");
    }

    #[test]
    fn no_parallel_memory_warning_small_dataset() {
        let export = parallel_chunked_export();
        let warning = check_parallel_memory_risk(&export, Some(1_000));
        assert!(warning.is_none(), "should not warn for small dataset");
    }

    #[test]
    fn no_parallel_memory_warning_single_worker() {
        let export = full_export();
        let warning = check_parallel_memory_risk(&export, Some(100_000_000));
        assert!(warning.is_none(), "should not warn when parallel=1");
    }

    // ---------------------------------------------------------------------------------------
    // build_suggestion (per verdict and mode)
    // ---------------------------------------------------------------------------------------

    #[test]
    fn suggestion_degraded_full_recommends_incremental() {
        let export = full_export();
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_chunked_recommends_index() {
        let export = chunked_export("id");
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
    }

    #[test]
    fn suggestion_degraded_time_window_recommends_index() {
        let export = time_window_export();
        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export).unwrap();
        assert!(s.contains("index on 'created_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_full_recommends_incremental() {
        let export = full_export();
        let s =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
        let export = chunked_export("id");
        let s =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(s.contains("index on 'id'"), "got: {s}");
        assert!(s.contains("parallel"), "got: {s}");
    }

    #[test]
    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
        let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
        let s =
            build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export).unwrap();
        assert!(s.contains("index on 'updated_at'"), "got: {s}");
    }

    #[test]
    fn suggestion_acceptable_large_full_recommends_incremental() {
        let export = full_export();
        let s =
            build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export).unwrap();
        assert!(s.contains("incremental"), "got: {s}");
    }

    // ---------------------------------------------------------------------------------------
    // recommend_parallelism
    // ---------------------------------------------------------------------------------------

    #[test]
    fn parallel_only_for_chunked_mode() {
        let export = full_export();
        let (level, _) = recommend_parallelism(&export, Some(1_000_000), true);
        assert_eq!(level, 1, "non-chunked mode should recommend 1");
    }

    #[test]
    fn parallel_small_dataset_is_one() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(10_000), true);
        assert_eq!(level, 1, "small dataset should recommend 1");
    }

    #[test]
    fn parallel_moderate_indexed_is_two() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(200_000), true);
        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
    }

    #[test]
    fn parallel_large_indexed_is_four() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(2_000_000), true);
        assert_eq!(level, 4, "large indexed dataset should recommend 4");
    }

    #[test]
    fn parallel_no_index_large_is_one() {
        let export = chunked_export("id");
        let (level, reason) = recommend_parallelism(&export, Some(10_000_000), false);
        assert_eq!(level, 1, "no index + large should recommend 1");
        assert!(reason.contains("no index"), "got: {reason}");
    }

    #[test]
    fn parallel_no_index_moderate_is_conservative() {
        let export = chunked_export("id");
        let (level, _) = recommend_parallelism(&export, Some(200_000), false);
        assert_eq!(
            level, 2,
            "no index + moderate should recommend 2 (conservative)"
        );
    }

    #[test]
    fn suggestion_acceptable_large_chunked_recommends_parallel() {
        let export = chunked_export("id");
        let s =
            build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export).unwrap();
        assert!(s.contains("parallel"), "got: {s}");
    }
}