Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13    recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Health classification that preflight analysis attaches to an export.
///
/// Variants are declared from the healthiest outcome (`Efficient`) down to
/// the riskiest (`Unsafe`). The `Display` form of each variant is its
/// upper-case name, which is what the human-readable report prints.
///
/// The type is a field-less enum, so it is cheap to copy and compare.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Displayed as `EFFICIENT`.
    Efficient,
    /// Displayed as `ACCEPTABLE`.
    Acceptable,
    /// Displayed as `DEGRADED`.
    Degraded,
    /// Displayed as `UNSAFE`.
    Unsafe,
}
32
33impl std::fmt::Display for HealthVerdict {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Efficient => write!(f, "EFFICIENT"),
37            Self::Acceptable => write!(f, "ACCEPTABLE"),
38            Self::Degraded => write!(f, "DEGRADED"),
39            Self::Unsafe => write!(f, "UNSAFE"),
40        }
41    }
42}
43
44pub(crate) struct ExportDiagnostic {
45    pub export_name: String,
46    pub strategy: String,
47    pub mode: String,
48    pub cursor_column: Option<String>,
49    pub row_estimate: Option<i64>,
50    pub cursor_min: Option<String>,
51    pub cursor_max: Option<String>,
52    pub scan_type: Option<String>,
53    pub uses_index: bool,
54    pub verdict: HealthVerdict,
55    pub recommended_profile: &'static str,
56    pub recommended_parallel: (u32, &'static str),
57    pub warnings: Vec<String>,
58    pub suggestion: Option<String>,
59}
60
61/// Return the diagnostic for a single export without printing anything.
62///
63/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
64pub(crate) fn get_export_diagnostic(
65    config: &Config,
66    export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68    let url = config.source.resolve_url()?;
69    let tls = config.source.tls.as_ref();
70    crate::source::warn_if_tls_disabled(&config.source);
71    match config.source.source_type {
72        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74    }
75}
76
77pub fn check(
78    config_path: &str,
79    export_name: Option<&str>,
80    params: Option<&std::collections::HashMap<String, String>>,
81    show_type_report: bool,
82    strict: bool,
83    json_output: bool,
84    target: Option<ExportTarget>,
85) -> Result<()> {
86    let config = Config::load_with_params(config_path, params)?;
87
88    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
89        let e = config
90            .exports
91            .iter()
92            .find(|e| e.name == name)
93            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
94        vec![e]
95    } else {
96        config.exports.iter().collect()
97    };
98
99    let url = config.source.resolve_url()?;
100    let tls = config.source.tls.as_ref();
101    // Surface the plaintext-transport warning at preflight time too —
102    // operators should hear it from `rivet check` before they wait
103    // through a full `rivet run` to learn the same thing. `Once` inside
104    // the helper keeps emission to one line per process even when both
105    // `check` and `run` flow through it.
106    crate::source::warn_if_tls_disabled(&config.source);
107    match config.source.source_type {
108        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
109        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
110    }
111
112    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
113    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
114    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
115    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
116    // and has side effects) — but we *do* call `create_destination`, which
117    // resolves env vars / credentials_file existence at construction time.
118    // Each unique destination is probed once per `check` to keep multi-export
119    // configs cheap.
120    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
121    for export in &exports {
122        let dest_key = format!(
123            "{:?}:{}:{}:{}",
124            export.destination.destination_type,
125            export.destination.bucket.as_deref().unwrap_or("-"),
126            export.destination.endpoint.as_deref().unwrap_or("-"),
127            export.destination.path.as_deref().unwrap_or("-"),
128        );
129        if !seen_destinations.insert(dest_key) {
130            continue;
131        }
132        let expanded = crate::plan::build::expand_destination_templates(
133            export.destination.clone(),
134            &export.name,
135        );
136        crate::destination::create_destination(&expanded).map_err(|e| {
137            anyhow::anyhow!(
138                "export '{}': destination preflight failed: {:#}",
139                export.name,
140                e
141            )
142        })?;
143    }
144
145    if show_type_report {
146        let policy = if strict {
147            TypePolicy::strict()
148        } else {
149            TypePolicy::warn_only()
150        };
151
152        let mut any_fatal = false;
153        for export in &exports {
154            let column_overrides =
155                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
156            match type_report::collect_report(&config, export, &column_overrides, &policy, target) {
157                Ok(report) => {
158                    if report.has_fatal() {
159                        any_fatal = true;
160                    }
161                    if target.is_some() && report.has_target_fail() {
162                        any_fatal = true;
163                    }
164                    if json_output {
165                        type_report::print_json(&report)?;
166                    } else {
167                        type_report::print_table(&report, target);
168                    }
169                }
170                Err(e) => {
171                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
172                }
173            }
174        }
175
176        if strict && any_fatal {
177            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
178        }
179    }
180
181    Ok(())
182}
183
184fn print_diagnostic(diag: &ExportDiagnostic) {
185    println!();
186    println!("Export: {}", diag.export_name);
187    println!("  Strategy:     {}", diag.strategy);
188    println!("  Mode:         {}", diag.mode);
189    if let Some(est) = diag.row_estimate {
190        if est >= 1_000_000 {
191            println!("  Row estimate: ~{}M", est / 1_000_000);
192        } else if est >= 1_000 {
193            println!("  Row estimate: ~{}K", est / 1_000);
194        } else {
195            println!("  Row estimate: ~{}", est);
196        }
197    }
198    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
199        println!("  Cursor range: {} .. {}", min_v, max_v);
200    }
201    if let Some(col) = &diag.cursor_column {
202        println!("  Cursor col:   {}", col);
203    }
204    if let Some(scan) = &diag.scan_type {
205        let idx_label = if diag.uses_index { " (indexed)" } else { "" };
206        println!("  Scan type:    {}{}", scan, idx_label);
207    }
208    println!("  Verdict:      {}", diag.verdict);
209    println!(
210        "  Recommended:  tuning.profile: {}",
211        diag.recommended_profile
212    );
213    let (par_level, par_reason) = diag.recommended_parallel;
214    if par_level > 1 {
215        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
216    } else {
217        println!("  Parallelism:  {} ({})", par_level, par_reason);
218    }
219    for w in &diag.warnings {
220        println!("  Warning:      {}", w);
221    }
222    if let Some(suggestion) = &diag.suggestion {
223        println!("  Suggestion:   {}", suggestion);
224    }
225}
226
227#[cfg(test)]
228mod tests {
229    use super::*;
230    use crate::config::{
231        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
232        IncrementalCursorMode, MetaColumns, TimeColumnType,
233    };
234    use doctor::{
235        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
236    };
237
238    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
239        ExportConfig {
240            name: name.to_string(),
241            query: Some("SELECT * FROM t".to_string()),
242            query_file: None,
243            table: None,
244            mode,
245            cursor_column: cursor.map(|s| s.to_string()),
246            cursor_fallback_column: None,
247            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
248            chunk_column: None,
249            chunk_size: 100_000,
250            chunk_size_memory_mb: None,
251            chunk_count: None,
252            parallel: 1,
253            time_column: None,
254            time_column_type: TimeColumnType::Timestamp,
255            days_window: None,
256            format: FormatType::Csv,
257            compression: CompressionType::default(),
258            compression_level: None,
259            compression_profile: None,
260            skip_empty: false,
261            destination: DestinationConfig {
262                destination_type: DestinationType::Local,
263                path: Some("./out".to_string()),
264                ..Default::default()
265            },
266            meta_columns: MetaColumns::default(),
267            quality: None,
268            max_file_size: None,
269            chunk_checkpoint: false,
270            chunk_max_attempts: None,
271            tuning: None,
272            chunk_dense: false,
273            chunk_by_days: None,
274            source_group: None,
275            reconcile_required: false,
276            columns: Default::default(),
277            on_schema_drift: Default::default(),
278            shape_drift_warn_factor: None,
279            parquet: None,
280        }
281    }
282
/// 500k rows with both flags set must be rated `Efficient`.
#[test]
fn verdict_small_indexed_with_cursor_is_efficient() {
    let rows = Some(500_000);
    let v = compute_verdict(rows, true, true);
    assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
}
288
/// 20M rows with both flags set must be rated `Acceptable`.
#[test]
fn verdict_large_indexed_with_cursor_is_acceptable() {
    let rows = Some(20_000_000);
    let v = compute_verdict(rows, true, true);
    assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
}
294
/// 500k rows with both flags cleared must be rated `Degraded`.
#[test]
fn verdict_no_index_no_cursor_is_degraded() {
    let rows = Some(500_000);
    let v = compute_verdict(rows, false, false);
    assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
}
300
/// 100M rows with both flags cleared must be rated `Unsafe`.
#[test]
fn verdict_huge_no_index_is_unsafe() {
    let rows = Some(100_000_000);
    let v = compute_verdict(rows, false, false);
    assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
}
306
/// The row estimate is read from a two-line Sort / Seq Scan plan.
#[test]
fn parse_pg_row_estimate_from_sort_plan() {
    const PLAN: &str = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
    let estimate = parse_pg_row_estimate(PLAN);
    assert_eq!(estimate, Some(1_000_455));
}
312
/// The row estimate is read from a single-line Index Scan plan.
#[test]
fn parse_pg_row_estimate_from_index_scan() {
    const PLAN: &str =
        "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
    let estimate = parse_pg_row_estimate(PLAN);
    assert_eq!(estimate, Some(500_000));
}
319
/// A plan containing a Seq Scan node yields a scan type mentioning it.
#[test]
fn extract_scan_type_detects_seq_scan() {
    const PLAN: &str = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
    let st = extract_scan_type(PLAN);
    assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
}
326
/// A plan containing an Index Scan node yields a scan type mentioning it.
#[test]
fn extract_scan_type_detects_index_scan() {
    const PLAN: &str =
        "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
    let st = extract_scan_type(PLAN);
    assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
}
333
/// An `Efficient` verdict must not come with a suggestion.
#[test]
fn suggestion_for_efficient_verdict_is_none() {
    let export = make_export("t", ExportMode::Full, None);
    let suggestion = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &export);
    assert!(
        suggestion.is_none(),
        "efficient verdict should produce no suggestion"
    );
}
343
/// A `Degraded` verdict produces a suggestion that mentions "safe".
#[test]
fn suggestion_for_degraded_verdict_recommends_safe_profile() {
    let export = make_export("t", ExportMode::Full, None);
    let msg = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export)
        .expect("degraded verdict should produce a suggestion");
    assert!(
        msg.contains("safe"),
        "suggestion should recommend safe profile, got: {msg}"
    );
}
354
355    fn src_err(msg: &str) -> &'static str {
356        categorize_source_error(&anyhow::anyhow!("{}", msg))
357    }
358
/// A rejected password is categorised as an auth error.
#[test]
fn source_password_rejected_is_auth_error() {
    let category = src_err("password authentication failed for user \"rivet\"");
    assert_eq!(category, "auth error");
}
366
/// "authentication failed" is categorised as an auth error.
#[test]
fn source_authentication_failed_is_auth_error() {
    let category = src_err("FATAL: authentication failed");
    assert_eq!(category, "auth error");
}
371
/// "Access denied for user …" is categorised as an auth error.
#[test]
fn source_access_denied_is_auth_error() {
    let category = src_err("Access denied for user 'rivet'@'localhost'");
    assert_eq!(category, "auth error");
}
379
/// A refused connection is categorised as a connectivity error.
#[test]
fn source_connection_refused_is_connectivity() {
    let category = src_err("connection refused (os error 61)");
    assert_eq!(category, "connectivity error");
}
387
/// A connection timeout is categorised as a connectivity error.
#[test]
fn source_timed_out_is_connectivity() {
    let category = src_err("connection timed out");
    assert_eq!(category, "connectivity error");
}
392
/// A host-name translation failure is categorised as a connectivity error.
#[test]
fn source_dns_translate_host_is_connectivity() {
    let category = src_err("could not translate host name \"db.bad\" to address");
    assert_eq!(category, "connectivity error");
}
400
/// "Name or service not known" is categorised as a connectivity error.
#[test]
fn source_name_not_known_is_connectivity() {
    let category = src_err("Name or service not known");
    assert_eq!(category, "connectivity error");
}
405
/// An unrecognised message falls back to the generic "error" category.
#[test]
fn source_unknown_error_is_generic() {
    let category = src_err("something totally unexpected");
    assert_eq!(category, "error");
}
410
411    fn dest_config(dtype: DestinationType) -> DestinationConfig {
412        DestinationConfig {
413            destination_type: dtype,
414            bucket: Some("b".to_string()),
415            ..Default::default()
416        }
417    }
418
419    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
420        let cfg = dest_config(dtype);
421        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
422    }
423
/// A credential-loading failure on GCS is categorised as an auth error.
#[test]
fn dest_credential_loading_is_auth_error() {
    let category = dest_err(
        "loading credential to sign http request",
        DestinationType::Gcs,
    );
    assert_eq!(category, "auth error");
}
434
/// "permission denied" on S3 is categorised as an auth error.
#[test]
fn dest_permission_denied_is_auth_error() {
    let category = dest_err("permission denied on resource bucket", DestinationType::S3);
    assert_eq!(category, "auth error");
}
442
/// "403 Forbidden" on GCS is categorised as an auth error.
#[test]
fn dest_forbidden_is_auth_error() {
    let category = dest_err("403 Forbidden", DestinationType::Gcs);
    assert_eq!(category, "auth error");
}
450
/// "401 Unauthorized" on S3 is categorised as an auth error.
#[test]
fn dest_unauthorized_is_auth_error() {
    let category = dest_err("401 Unauthorized", DestinationType::S3);
    assert_eq!(category, "auth error");
}
458
/// An `invalid_grant` response on GCS is categorised as an auth error.
#[test]
fn dest_invalid_grant_is_auth_error() {
    let category = dest_err(
        "invalid_grant: token has been revoked",
        DestinationType::Gcs,
    );
    assert_eq!(category, "auth error");
}
469
/// `NoSuchBucket` on S3 is categorised as "bucket not found".
#[test]
fn dest_nosuchbucket_s3_is_bucket_not_found() {
    let category = dest_err(
        "NoSuchBucket: the specified bucket does not exist",
        DestinationType::S3,
    );
    assert_eq!(category, "bucket not found");
}
480
/// A "not found" message on GCS is categorised as "bucket not found".
#[test]
fn dest_not_found_gcs_is_bucket_not_found() {
    let category = dest_err("bucket not found (404)", DestinationType::Gcs);
    assert_eq!(category, "bucket not found");
}
488
/// A "not found" message on a local destination is "path not found".
#[test]
fn dest_not_found_local_is_path_not_found() {
    let category = dest_err("path not found: /tmp/missing", DestinationType::Local);
    assert_eq!(category, "path not found");
}
496
/// A refused connection on S3 is categorised as a connectivity error.
#[test]
fn dest_connection_refused_is_connectivity() {
    let category = dest_err("connection refused to endpoint", DestinationType::S3);
    assert_eq!(category, "connectivity error");
}
504
/// A DNS lookup failure on S3 is categorised as a connectivity error.
#[test]
fn dest_dns_error_is_connectivity() {
    let category = dest_err("dns error: failed to lookup address", DestinationType::S3);
    assert_eq!(category, "connectivity error");
}
512
/// A request timeout on GCS is categorised as a connectivity error.
#[test]
fn dest_timed_out_is_connectivity() {
    let category = dest_err("request timed out after 30s", DestinationType::Gcs);
    assert_eq!(category, "connectivity error");
}
520
/// An unrecognised destination message falls back to the generic "error".
#[test]
fn dest_unknown_error_is_generic() {
    let category = dest_err("something else entirely", DestinationType::S3);
    assert_eq!(category, "error");
}
528
/// The default full-mode fixture is described as "full-scan".
#[test]
fn strategy_full_scan() {
    let export = make_export("t", ExportMode::Full, None);
    let strategy = derive_strategy(&export);
    assert_eq!(strategy, "full-scan");
}
534
/// Full mode with `parallel = 4` is described as "full-parallel(4)".
#[test]
fn strategy_full_parallel() {
    let export = ExportConfig {
        parallel: 4,
        ..make_export("t", ExportMode::Full, None)
    };
    assert_eq!(derive_strategy(&export), "full-parallel(4)");
}
541
/// Incremental mode names its cursor column in the strategy label.
#[test]
fn strategy_incremental() {
    let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
    let strategy = derive_strategy(&export);
    assert_eq!(strategy, "incremental(updated_at)");
}
547
/// Chunked mode reports the chunk column and chunk size.
#[test]
fn strategy_chunked() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        chunk_size: 50_000,
        ..make_export("t", ExportMode::Chunked, None)
    };
    assert_eq!(derive_strategy(&export), "chunked(id, size=50000)");
}
555
/// Chunked mode with `parallel = 3` also reports the worker count.
#[test]
fn strategy_chunked_parallel() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        chunk_size: 50_000,
        parallel: 3,
        ..make_export("t", ExportMode::Chunked, None)
    };
    assert_eq!(
        derive_strategy(&export),
        "chunked-parallel(id, size=50000, p=3)"
    );
}
564
/// Time-window mode reports the time column and the window in days.
#[test]
fn strategy_time_window() {
    let export = ExportConfig {
        time_column: Some("created_at".to_string()),
        days_window: Some(7),
        ..make_export("t", ExportMode::TimeWindow, None)
    };
    assert_eq!(derive_strategy(&export), "time-window(created_at, 7d)");
}
572
/// 500k rows with the index flag set recommends the "fast" profile.
#[test]
fn profile_small_indexed_is_fast() {
    let export = make_export("t", ExportMode::Full, None);
    let profile = recommend_profile(Some(500_000), true, &export);
    assert_eq!(profile, "fast");
}
578
/// 5M rows with the index flag set recommends the "balanced" profile.
#[test]
fn profile_medium_indexed_is_balanced() {
    let export = make_export("t", ExportMode::Full, None);
    let profile = recommend_profile(Some(5_000_000), true, &export);
    assert_eq!(profile, "balanced");
}
584
/// 50M rows with the index flag set recommends the "safe" profile.
#[test]
fn profile_large_indexed_is_safe() {
    let export = make_export("t", ExportMode::Full, None);
    let profile = recommend_profile(Some(50_000_000), true, &export);
    assert_eq!(profile, "safe");
}
590
/// 50k rows with the index flag cleared recommends "balanced".
#[test]
fn profile_small_no_index_is_balanced() {
    let export = make_export("t", ExportMode::Full, None);
    let profile = recommend_profile(Some(50_000), false, &export);
    assert_eq!(profile, "balanced");
}
596
/// 50k rows, index flag cleared and `parallel = 4` recommends "safe".
#[test]
fn profile_small_no_index_parallel_is_safe() {
    let export = ExportConfig {
        parallel: 4,
        ..make_export("t", ExportMode::Full, None)
    };
    let profile = recommend_profile(Some(50_000), false, &export);
    assert_eq!(profile, "safe");
}
603
/// 500k rows with the index flag cleared recommends "balanced".
#[test]
fn profile_medium_no_index_is_balanced() {
    let export = make_export("t", ExportMode::Full, None);
    let profile = recommend_profile(Some(500_000), false, &export);
    assert_eq!(profile, "balanced");
}
609
/// 5M rows with the index flag cleared recommends "safe".
#[test]
fn profile_large_no_index_is_safe() {
    let export = make_export("t", ExportMode::Full, None);
    let profile = recommend_profile(Some(5_000_000), false, &export);
    assert_eq!(profile, "safe");
}
615
/// 100k rows spread over ids 1..10 000 000 must trigger the sparse-range
/// warning.
#[test]
fn sparse_range_warning_when_very_sparse() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        chunk_size: 100_000,
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
    assert!(warning.is_some(), "should warn about sparse range");
    let msg = warning.unwrap();
    assert!(msg.contains("Sparse key range"), "got: {msg}");
    assert!(msg.contains("empty"), "got: {msg}");
}
627
/// 100k rows over ids 1..100 000 is dense and must not warn.
#[test]
fn sparse_range_no_warning_when_dense() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        chunk_size: 100_000,
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("100000"));
    assert!(warning.is_none(), "should not warn for dense range");
}
636
/// With `chunk_dense` set, the sparse-range check stays silent even for a
/// wide id span.
#[test]
fn sparse_range_skipped_when_chunk_dense() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        chunk_dense: true,
        chunk_size: 100_000,
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_sparse_range(&export, Some(100_000), Some("1"), Some("10000000"));
    assert!(
        warning.is_none(),
        "chunk_dense uses ordinals, not physical id span"
    );
}
649
/// `chunk_dense` on a plain query must warn about the global sort cost.
#[test]
fn dense_surrogate_warning_when_chunk_dense_builtin() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        chunk_dense: true,
        query: Some("SELECT id FROM orders".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_dense_surrogate_cost(&export);
    assert!(warning.is_some(), "should warn about built-in ROW_NUMBER cost");
    assert!(warning.unwrap().contains("global sort"));
}
660
/// The sparse-range check does not apply to a full-mode export.
#[test]
fn sparse_range_not_triggered_for_non_chunked() {
    let export = make_export("t", ExportMode::Full, None);
    let warning = check_sparse_range(&export, Some(100), Some("1"), Some("1000000"));
    assert!(warning.is_none(), "should not warn for non-chunked mode");
}
667
/// A chunked query that computes `ROW_NUMBER()` must warn about the global
/// sort cost.
#[test]
fn dense_surrogate_warning_with_row_number() {
    let export = ExportConfig {
        chunk_column: Some("rn".to_string()),
        query: Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_dense_surrogate_cost(&export);
    assert!(warning.is_some(), "should warn about ROW_NUMBER cost");
    assert!(warning.unwrap().contains("global sort"));
}
677
/// A chunked query without `ROW_NUMBER()` produces no surrogate warning.
#[test]
fn no_dense_surrogate_warning_without_row_number() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        query: Some("SELECT * FROM orders".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_dense_surrogate_cost(&export);
    assert!(warning.is_none());
}
686
/// `ROW_NUMBER()` in a full-mode query produces no surrogate warning.
#[test]
fn no_dense_surrogate_warning_for_non_chunked() {
    let export = ExportConfig {
        query: Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string()),
        ..make_export("t", ExportMode::Full, None)
    };
    let warning = check_dense_surrogate_cost(&export);
    assert!(warning.is_none(), "should not warn for non-chunked mode");
}
694
/// Four workers over 10M rows must raise the parallel-memory warning.
#[test]
fn parallel_memory_warning_large_dataset() {
    let export = ExportConfig {
        parallel: 4,
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_parallel_memory_risk(&export, Some(10_000_000));
    assert!(warning.is_some(), "should warn about memory risk");
    let msg = warning.unwrap();
    assert!(msg.contains("Parallel=4"), "got: {msg}");
    assert!(msg.contains("memory"), "got: {msg}");
}
705
/// Four workers over 1k rows must not raise the parallel-memory warning.
#[test]
fn no_parallel_memory_warning_small_dataset() {
    let export = ExportConfig {
        parallel: 4,
        ..make_export("t", ExportMode::Chunked, None)
    };
    let warning = check_parallel_memory_risk(&export, Some(1_000));
    assert!(warning.is_none(), "should not warn for small dataset");
}
713
/// A single worker never raises the parallel-memory warning, even at 100M rows.
#[test]
fn no_parallel_memory_warning_single_worker() {
    let export = make_export("t", ExportMode::Full, None);
    let warning = check_parallel_memory_risk(&export, Some(100_000_000));
    assert!(warning.is_none(), "should not warn when parallel=1");
}
720
/// `Degraded` on a full-mode export suggests going incremental.
#[test]
fn suggestion_degraded_full_recommends_incremental() {
    let export = make_export("t", ExportMode::Full, None);
    let suggestion = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("incremental"), "got: {s}");
}
727
/// `Degraded` on a chunked export suggests an index on the chunk column.
#[test]
fn suggestion_degraded_chunked_recommends_index() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let suggestion = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("index on 'id'"), "got: {s}");
}
735
/// `Degraded` on a time-window export suggests an index on the time column.
#[test]
fn suggestion_degraded_time_window_recommends_index() {
    let export = ExportConfig {
        time_column: Some("created_at".to_string()),
        days_window: Some(7),
        ..make_export("t", ExportMode::TimeWindow, None)
    };
    let suggestion = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("index on 'created_at'"), "got: {s}");
}
744
/// `Unsafe` on a full-mode export suggests going incremental.
#[test]
fn suggestion_unsafe_full_recommends_incremental() {
    let export = make_export("t", ExportMode::Full, None);
    let suggestion = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("incremental"), "got: {s}");
}
751
/// `Unsafe` on a chunked export suggests both an index on the chunk column
/// and parallelism.
#[test]
fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let suggestion = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("index on 'id'"), "got: {s}");
    assert!(s.contains("parallel"), "got: {s}");
}
760
/// `Unsafe` on an incremental export suggests an index on the cursor column.
#[test]
fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
    let export = make_export("t", ExportMode::Incremental, Some("updated_at"));
    let suggestion = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("index on 'updated_at'"), "got: {s}");
}
767
/// `Acceptable` on a 20M-row full-mode export still suggests going incremental.
#[test]
fn suggestion_acceptable_large_full_recommends_incremental() {
    let export = make_export("t", ExportMode::Full, None);
    let suggestion = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("incremental"), "got: {s}");
}
774
/// A full-mode export is always recommended a parallelism level of 1.
#[test]
fn parallel_only_for_chunked_mode() {
    let export = make_export("t", ExportMode::Full, None);
    let level = recommend_parallelism(&export, Some(1_000_000), true).0;
    assert_eq!(level, 1, "non-chunked mode should recommend 1");
}
781
/// A chunked export of 10k rows is recommended a parallelism level of 1.
#[test]
fn parallel_small_dataset_is_one() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let level = recommend_parallelism(&export, Some(10_000), true).0;
    assert_eq!(level, 1, "small dataset should recommend 1");
}
789
/// A chunked, indexed export of 200k rows is recommended a level of 2.
#[test]
fn parallel_moderate_indexed_is_two() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let level = recommend_parallelism(&export, Some(200_000), true).0;
    assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
}
797
/// A chunked, indexed export of 2M rows is recommended a level of 4.
#[test]
fn parallel_large_indexed_is_four() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let level = recommend_parallelism(&export, Some(2_000_000), true).0;
    assert_eq!(level, 4, "large indexed dataset should recommend 4");
}
805
/// A chunked export of 10M rows without an index is held at level 1, and
/// the reason mentions the missing index.
#[test]
fn parallel_no_index_large_is_one() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let (level, reason) = recommend_parallelism(&export, Some(10_000_000), false);
    assert_eq!(level, 1, "no index + large should recommend 1");
    assert!(reason.contains("no index"), "got: {reason}");
}
814
/// A chunked export of 200k rows without an index is recommended level 2.
#[test]
fn parallel_no_index_moderate_is_conservative() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let level = recommend_parallelism(&export, Some(200_000), false).0;
    assert_eq!(
        level, 2,
        "no index + moderate should recommend 2 (conservative)"
    );
}
825
/// `Acceptable` on a 20M-row chunked export suggests parallelism.
#[test]
fn suggestion_acceptable_large_chunked_recommends_parallel() {
    let export = ExportConfig {
        chunk_column: Some("id".to_string()),
        ..make_export("t", ExportMode::Chunked, None)
    };
    let suggestion = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &export);
    let s = suggestion.unwrap();
    assert!(s.contains("parallel"), "got: {s}");
}
833
/// `parallel` equal to `max_connections` must warn and quote both values.
#[test]
fn connection_limit_warn_when_parallel_meets_max() {
    let warning = check_connection_limit(20, Some(20));
    assert!(
        warning.is_some(),
        "should warn when parallel == max_connections"
    );
    let msg = warning.unwrap();
    assert!(msg.contains("max_connections=20"), "got: {msg}");
    assert!(msg.contains("parallel=20"), "got: {msg}");
}
842
/// `parallel` above `max_connections` must warn and quote the limit.
#[test]
fn connection_limit_warn_when_parallel_exceeds_max() {
    let warning = check_connection_limit(100, Some(20));
    assert!(
        warning.is_some(),
        "should warn when parallel > max_connections"
    );
    let msg = warning.unwrap();
    assert!(msg.contains("max_connections=20"), "got: {msg}");
}
850
/// `parallel = 4` against a limit of 100 must not warn.
#[test]
fn connection_limit_no_warn_when_parallel_below_max() {
    let warning = check_connection_limit(4, Some(100));
    assert!(
        warning.is_none(),
        "should not warn when parallel << max_connections"
    );
}
859
860    #[test]
861    fn connection_limit_no_warn_when_parallel_is_one() {
862        let w = check_connection_limit(1, Some(5));
863        assert!(
864            w.is_none(),
865            "single worker never triggers connection warning"
866        );
867    }
868
869    #[test]
870    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
871        let w = check_connection_limit(100, None);
872        assert!(w.is_some(), "should note that check was skipped");
873        let msg = w.unwrap();
874        assert!(msg.contains("skipped"), "got: {msg}");
875    }
876
877    #[test]
878    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
879        let w = check_connection_limit(1, None);
880        assert!(
881            w.is_none(),
882            "single worker never triggers connection warning"
883        );
884    }
885
886    #[test]
887    fn connection_limit_suggests_headroom() {
888        let w = check_connection_limit(25, Some(20)).unwrap();
889        // Suggested safe max should be max_connections - 3 = 17
890        assert!(
891            w.contains("17"),
892            "should suggest leaving headroom, got: {w}"
893        );
894    }
895
896    // ── v0.7.4: actionable hints next to categorised errors ───────────
897
898    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
899        let err = anyhow::anyhow!("{}", msg);
900        let cat = categorize_source_error(&err);
901        source_error_hint(cat, &err, &st)
902    }
903
904    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
905        let err = anyhow::anyhow!("{}", msg);
906        let dest = DestinationConfig {
907            destination_type: dt,
908            bucket: Some("b".into()),
909            ..Default::default()
910        };
911        let cat = categorize_dest_error(&err, &dest);
912        destination_error_hint(cat, &dest)
913    }
914
915    #[test]
916    fn source_tls_handshake_returns_pg_specific_tls_hint() {
917        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
918        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
919    }
920
921    #[test]
922    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
923        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
924        assert!(h.contains("tls.mode"), "got: {h}");
925    }
926
927    #[test]
928    fn source_auth_error_postgres_mentions_pg_hba() {
929        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
930        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
931    }
932
933    #[test]
934    fn source_auth_error_mysql_mentions_grant() {
935        let h = src_hint(
936            "Access denied for user 'rivet'@'localhost'",
937            SourceType::Mysql,
938        )
939        .expect("hint");
940        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
941    }
942
943    #[test]
944    fn source_connectivity_error_mentions_bastion_and_network() {
945        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
946        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
947    }
948
949    #[test]
950    fn source_unknown_error_returns_no_hint() {
951        // Generic "error" category should yield no hint — better to
952        // print the raw driver message than to mislead.
953        let h = src_hint("totally unexpected", SourceType::Postgres);
954        assert!(h.is_none(), "unknown errors should not produce a hint");
955    }
956
957    #[test]
958    fn dest_s3_auth_error_names_concrete_actions() {
959        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
960        assert!(
961            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
962            "got: {h}"
963        );
964    }
965
966    #[test]
967    fn dest_gcs_auth_error_names_concrete_actions() {
968        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
969        assert!(
970            h.contains("storage.objects") && h.contains("cloud-permissions"),
971            "got: {h}"
972        );
973    }
974
975    #[test]
976    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
977        // Guard the load-bearing ordering in categorize_dest_error: the
978        // "sas expired" early-return must fire before the generic "token"
979        // branch, or destination_error_hint produces the wrong hint.
980        // This test pins the *category string*, not just the final hint text.
981        let err = anyhow::anyhow!(
982            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
983        );
984        let dest = DestinationConfig {
985            destination_type: DestinationType::Azure,
986            bucket: Some("c".into()),
987            ..Default::default()
988        };
989        let cat = categorize_dest_error(&err, &dest);
990        assert_eq!(
991            cat, "sas expired",
992            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
993        );
994    }
995
996    #[test]
997    fn dest_azure_sas_expired_returns_regenerate_hint() {
998        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
999        // the hint must steer the operator to `az storage container
1000        // generate-sas` not "your IAM role is broken".
1001        let h = dest_hint(
1002            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1003            DestinationType::Azure,
1004        )
1005        .expect("hint");
1006        assert!(
1007            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1008            "got: {h}"
1009        );
1010    }
1011
1012    #[test]
1013    fn dest_s3_bucket_not_found_says_no_auto_create() {
1014        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1015        assert!(
1016            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1017            "got: {h}"
1018        );
1019    }
1020
1021    #[test]
1022    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1023        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1024        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1025    }
1026}