Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13    recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health classification that preflight assigns to an export.
///
/// Variants are ordered roughly from best to worst. The exact thresholds live
/// in the analysis module's `compute_verdict`; the examples below illustrate
/// typical inputs rather than defining the boundaries.
///
/// Derives `Copy`/`PartialEq`/`Eq`/`Hash` so verdicts can be compared with
/// `==`, used as map keys, and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HealthVerdict {
    /// No action needed, e.g. a ~500K-row export that uses an index and has a cursor.
    Efficient,
    /// Workable but worth tuning, e.g. a ~20M-row indexed export with a cursor.
    Acceptable,
    /// Likely slow, e.g. an export with neither an index nor a cursor.
    Degraded,
    /// Risky to run as configured, e.g. a ~100M-row export without an index.
    Unsafe,
}
32
33impl std::fmt::Display for HealthVerdict {
34    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
35        match self {
36            Self::Efficient => write!(f, "EFFICIENT"),
37            Self::Acceptable => write!(f, "ACCEPTABLE"),
38            Self::Degraded => write!(f, "DEGRADED"),
39            Self::Unsafe => write!(f, "UNSAFE"),
40        }
41    }
42}
43
44pub(crate) struct ExportDiagnostic {
45    pub export_name: String,
46    pub strategy: String,
47    pub mode: String,
48    pub cursor_column: Option<String>,
49    pub row_estimate: Option<i64>,
50    pub cursor_min: Option<String>,
51    pub cursor_max: Option<String>,
52    pub scan_type: Option<String>,
53    pub uses_index: bool,
54    pub verdict: HealthVerdict,
55    pub recommended_profile: &'static str,
56    pub recommended_parallel: (u32, &'static str),
57    pub warnings: Vec<String>,
58    pub suggestion: Option<String>,
59}
60
61/// Return the diagnostic for a single export without printing anything.
62///
63/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
64pub(crate) fn get_export_diagnostic(
65    config: &Config,
66    export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68    let url = config.source.resolve_url()?;
69    let tls = config.source.tls.as_ref();
70    match config.source.source_type {
71        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
72        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
73    }
74}
75
76pub fn check(
77    config_path: &str,
78    export_name: Option<&str>,
79    params: Option<&std::collections::HashMap<String, String>>,
80    show_type_report: bool,
81    strict: bool,
82    json_output: bool,
83    target: Option<ExportTarget>,
84) -> Result<()> {
85    let config = Config::load_with_params(config_path, params)?;
86
87    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
88        let e = config
89            .exports
90            .iter()
91            .find(|e| e.name == name)
92            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
93        vec![e]
94    } else {
95        config.exports.iter().collect()
96    };
97
98    let url = config.source.resolve_url()?;
99    let tls = config.source.tls.as_ref();
100    match config.source.source_type {
101        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
102        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
103    }
104
105    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
106    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
107    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
108    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
109    // and has side effects) — but we *do* call `create_destination`, which
110    // resolves env vars / credentials_file existence at construction time.
111    // Each unique destination is probed once per `check` to keep multi-export
112    // configs cheap.
113    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
114    for export in &exports {
115        let dest_key = format!(
116            "{:?}:{}:{}:{}",
117            export.destination.destination_type,
118            export.destination.bucket.as_deref().unwrap_or("-"),
119            export.destination.endpoint.as_deref().unwrap_or("-"),
120            export.destination.path.as_deref().unwrap_or("-"),
121        );
122        if !seen_destinations.insert(dest_key) {
123            continue;
124        }
125        let expanded = crate::plan::build::expand_destination_templates(
126            export.destination.clone(),
127            &export.name,
128        );
129        crate::destination::create_destination(&expanded).map_err(|e| {
130            anyhow::anyhow!(
131                "export '{}': destination preflight failed: {:#}",
132                export.name,
133                e
134            )
135        })?;
136    }
137
138    if show_type_report {
139        let policy = if strict {
140            TypePolicy::strict()
141        } else {
142            TypePolicy::warn_only()
143        };
144
145        let mut any_fatal = false;
146        for export in &exports {
147            let column_overrides =
148                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
149            match type_report::collect_report(&config, export, &column_overrides, &policy, target) {
150                Ok(report) => {
151                    if report.has_fatal() {
152                        any_fatal = true;
153                    }
154                    if target.is_some() && report.has_target_fail() {
155                        any_fatal = true;
156                    }
157                    if json_output {
158                        type_report::print_json(&report)?;
159                    } else {
160                        type_report::print_table(&report, target);
161                    }
162                }
163                Err(e) => {
164                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
165                }
166            }
167        }
168
169        if strict && any_fatal {
170            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
171        }
172    }
173
174    Ok(())
175}
176
177fn print_diagnostic(diag: &ExportDiagnostic) {
178    println!();
179    println!("Export: {}", diag.export_name);
180    println!("  Strategy:     {}", diag.strategy);
181    println!("  Mode:         {}", diag.mode);
182    if let Some(est) = diag.row_estimate {
183        if est >= 1_000_000 {
184            println!("  Row estimate: ~{}M", est / 1_000_000);
185        } else if est >= 1_000 {
186            println!("  Row estimate: ~{}K", est / 1_000);
187        } else {
188            println!("  Row estimate: ~{}", est);
189        }
190    }
191    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
192        println!("  Cursor range: {} .. {}", min_v, max_v);
193    }
194    if let Some(col) = &diag.cursor_column {
195        println!("  Cursor col:   {}", col);
196    }
197    if let Some(scan) = &diag.scan_type {
198        let idx_label = if diag.uses_index { " (indexed)" } else { "" };
199        println!("  Scan type:    {}{}", scan, idx_label);
200    }
201    println!("  Verdict:      {}", diag.verdict);
202    println!(
203        "  Recommended:  tuning.profile: {}",
204        diag.recommended_profile
205    );
206    let (par_level, par_reason) = diag.recommended_parallel;
207    if par_level > 1 {
208        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
209    } else {
210        println!("  Parallelism:  {} ({})", par_level, par_reason);
211    }
212    for w in &diag.warnings {
213        println!("  Warning:      {}", w);
214    }
215    if let Some(suggestion) = &diag.suggestion {
216        println!("  Suggestion:   {}", suggestion);
217    }
218}
219
220#[cfg(test)]
221mod tests {
222    use super::*;
223    use crate::config::{
224        CompressionType, DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType,
225        IncrementalCursorMode, MetaColumns, TimeColumnType,
226    };
227    use doctor::{
228        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
229    };
230
231    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
232        ExportConfig {
233            name: name.to_string(),
234            query: Some("SELECT * FROM t".to_string()),
235            query_file: None,
236            table: None,
237            mode,
238            cursor_column: cursor.map(|s| s.to_string()),
239            cursor_fallback_column: None,
240            incremental_cursor_mode: IncrementalCursorMode::SingleColumn,
241            chunk_column: None,
242            chunk_size: 100_000,
243            chunk_size_memory_mb: None,
244            chunk_count: None,
245            parallel: 1,
246            time_column: None,
247            time_column_type: TimeColumnType::Timestamp,
248            days_window: None,
249            format: FormatType::Csv,
250            compression: CompressionType::default(),
251            compression_level: None,
252            compression_profile: None,
253            skip_empty: false,
254            destination: DestinationConfig {
255                destination_type: DestinationType::Local,
256                path: Some("./out".to_string()),
257                ..Default::default()
258            },
259            meta_columns: MetaColumns::default(),
260            quality: None,
261            max_file_size: None,
262            chunk_checkpoint: false,
263            chunk_max_attempts: None,
264            tuning: None,
265            chunk_dense: false,
266            chunk_by_days: None,
267            source_group: None,
268            reconcile_required: false,
269            columns: Default::default(),
270            on_schema_drift: Default::default(),
271            shape_drift_warn_factor: None,
272            parquet: None,
273        }
274    }
275
276    #[test]
277    fn verdict_small_indexed_with_cursor_is_efficient() {
278        let v = compute_verdict(Some(500_000), true, true);
279        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
280    }
281
282    #[test]
283    fn verdict_large_indexed_with_cursor_is_acceptable() {
284        let v = compute_verdict(Some(20_000_000), true, true);
285        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
286    }
287
288    #[test]
289    fn verdict_no_index_no_cursor_is_degraded() {
290        let v = compute_verdict(Some(500_000), false, false);
291        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
292    }
293
294    #[test]
295    fn verdict_huge_no_index_is_unsafe() {
296        let v = compute_verdict(Some(100_000_000), false, false);
297        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
298    }
299
300    #[test]
301    fn parse_pg_row_estimate_from_sort_plan() {
302        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
303        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
304    }
305
306    #[test]
307    fn parse_pg_row_estimate_from_index_scan() {
308        let plan =
309            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
310        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
311    }
312
313    #[test]
314    fn extract_scan_type_detects_seq_scan() {
315        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
316        let st = extract_scan_type(plan);
317        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
318    }
319
320    #[test]
321    fn extract_scan_type_detects_index_scan() {
322        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
323        let st = extract_scan_type(plan);
324        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
325    }
326
327    #[test]
328    fn suggestion_for_efficient_verdict_is_none() {
329        let e = make_export("t", ExportMode::Full, None);
330        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
331        assert!(
332            s.is_none(),
333            "efficient verdict should produce no suggestion"
334        );
335    }
336
337    #[test]
338    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
339        let e = make_export("t", ExportMode::Full, None);
340        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
341        let msg = s.expect("degraded verdict should produce a suggestion");
342        assert!(
343            msg.contains("safe"),
344            "suggestion should recommend safe profile, got: {msg}"
345        );
346    }
347
348    fn src_err(msg: &str) -> &'static str {
349        categorize_source_error(&anyhow::anyhow!("{}", msg))
350    }
351
352    #[test]
353    fn source_password_rejected_is_auth_error() {
354        assert_eq!(
355            src_err("password authentication failed for user \"rivet\""),
356            "auth error"
357        );
358    }
359
360    #[test]
361    fn source_authentication_failed_is_auth_error() {
362        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
363    }
364
365    #[test]
366    fn source_access_denied_is_auth_error() {
367        assert_eq!(
368            src_err("Access denied for user 'rivet'@'localhost'"),
369            "auth error"
370        );
371    }
372
373    #[test]
374    fn source_connection_refused_is_connectivity() {
375        assert_eq!(
376            src_err("connection refused (os error 61)"),
377            "connectivity error"
378        );
379    }
380
381    #[test]
382    fn source_timed_out_is_connectivity() {
383        assert_eq!(src_err("connection timed out"), "connectivity error");
384    }
385
386    #[test]
387    fn source_dns_translate_host_is_connectivity() {
388        assert_eq!(
389            src_err("could not translate host name \"db.bad\" to address"),
390            "connectivity error"
391        );
392    }
393
394    #[test]
395    fn source_name_not_known_is_connectivity() {
396        assert_eq!(src_err("Name or service not known"), "connectivity error");
397    }
398
399    #[test]
400    fn source_unknown_error_is_generic() {
401        assert_eq!(src_err("something totally unexpected"), "error");
402    }
403
404    fn dest_config(dtype: DestinationType) -> DestinationConfig {
405        DestinationConfig {
406            destination_type: dtype,
407            bucket: Some("b".to_string()),
408            ..Default::default()
409        }
410    }
411
412    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
413        let cfg = dest_config(dtype);
414        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
415    }
416
417    #[test]
418    fn dest_credential_loading_is_auth_error() {
419        assert_eq!(
420            dest_err(
421                "loading credential to sign http request",
422                DestinationType::Gcs
423            ),
424            "auth error"
425        );
426    }
427
428    #[test]
429    fn dest_permission_denied_is_auth_error() {
430        assert_eq!(
431            dest_err("permission denied on resource bucket", DestinationType::S3),
432            "auth error"
433        );
434    }
435
436    #[test]
437    fn dest_forbidden_is_auth_error() {
438        assert_eq!(
439            dest_err("403 Forbidden", DestinationType::Gcs),
440            "auth error"
441        );
442    }
443
444    #[test]
445    fn dest_unauthorized_is_auth_error() {
446        assert_eq!(
447            dest_err("401 Unauthorized", DestinationType::S3),
448            "auth error"
449        );
450    }
451
452    #[test]
453    fn dest_invalid_grant_is_auth_error() {
454        assert_eq!(
455            dest_err(
456                "invalid_grant: token has been revoked",
457                DestinationType::Gcs
458            ),
459            "auth error"
460        );
461    }
462
463    #[test]
464    fn dest_nosuchbucket_s3_is_bucket_not_found() {
465        assert_eq!(
466            dest_err(
467                "NoSuchBucket: the specified bucket does not exist",
468                DestinationType::S3
469            ),
470            "bucket not found"
471        );
472    }
473
474    #[test]
475    fn dest_not_found_gcs_is_bucket_not_found() {
476        assert_eq!(
477            dest_err("bucket not found (404)", DestinationType::Gcs),
478            "bucket not found"
479        );
480    }
481
482    #[test]
483    fn dest_not_found_local_is_path_not_found() {
484        assert_eq!(
485            dest_err("path not found: /tmp/missing", DestinationType::Local),
486            "path not found"
487        );
488    }
489
490    #[test]
491    fn dest_connection_refused_is_connectivity() {
492        assert_eq!(
493            dest_err("connection refused to endpoint", DestinationType::S3),
494            "connectivity error"
495        );
496    }
497
498    #[test]
499    fn dest_dns_error_is_connectivity() {
500        assert_eq!(
501            dest_err("dns error: failed to lookup address", DestinationType::S3),
502            "connectivity error"
503        );
504    }
505
506    #[test]
507    fn dest_timed_out_is_connectivity() {
508        assert_eq!(
509            dest_err("request timed out after 30s", DestinationType::Gcs),
510            "connectivity error"
511        );
512    }
513
514    #[test]
515    fn dest_unknown_error_is_generic() {
516        assert_eq!(
517            dest_err("something else entirely", DestinationType::S3),
518            "error"
519        );
520    }
521
522    #[test]
523    fn strategy_full_scan() {
524        let e = make_export("t", ExportMode::Full, None);
525        assert_eq!(derive_strategy(&e), "full-scan");
526    }
527
528    #[test]
529    fn strategy_full_parallel() {
530        let mut e = make_export("t", ExportMode::Full, None);
531        e.parallel = 4;
532        assert_eq!(derive_strategy(&e), "full-parallel(4)");
533    }
534
535    #[test]
536    fn strategy_incremental() {
537        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
538        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
539    }
540
541    #[test]
542    fn strategy_chunked() {
543        let mut e = make_export("t", ExportMode::Chunked, None);
544        e.chunk_column = Some("id".to_string());
545        e.chunk_size = 50_000;
546        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
547    }
548
549    #[test]
550    fn strategy_chunked_parallel() {
551        let mut e = make_export("t", ExportMode::Chunked, None);
552        e.chunk_column = Some("id".to_string());
553        e.chunk_size = 50_000;
554        e.parallel = 3;
555        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
556    }
557
558    #[test]
559    fn strategy_time_window() {
560        let mut e = make_export("t", ExportMode::TimeWindow, None);
561        e.time_column = Some("created_at".to_string());
562        e.days_window = Some(7);
563        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
564    }
565
566    #[test]
567    fn profile_small_indexed_is_fast() {
568        let e = make_export("t", ExportMode::Full, None);
569        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
570    }
571
572    #[test]
573    fn profile_medium_indexed_is_balanced() {
574        let e = make_export("t", ExportMode::Full, None);
575        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
576    }
577
578    #[test]
579    fn profile_large_indexed_is_safe() {
580        let e = make_export("t", ExportMode::Full, None);
581        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
582    }
583
584    #[test]
585    fn profile_small_no_index_is_balanced() {
586        let e = make_export("t", ExportMode::Full, None);
587        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
588    }
589
590    #[test]
591    fn profile_small_no_index_parallel_is_safe() {
592        let mut e = make_export("t", ExportMode::Full, None);
593        e.parallel = 4;
594        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
595    }
596
597    #[test]
598    fn profile_medium_no_index_is_balanced() {
599        let e = make_export("t", ExportMode::Full, None);
600        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
601    }
602
603    #[test]
604    fn profile_large_no_index_is_safe() {
605        let e = make_export("t", ExportMode::Full, None);
606        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
607    }
608
609    #[test]
610    fn sparse_range_warning_when_very_sparse() {
611        let mut e = make_export("t", ExportMode::Chunked, None);
612        e.chunk_column = Some("id".to_string());
613        e.chunk_size = 100_000;
614        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
615        assert!(w.is_some(), "should warn about sparse range");
616        let msg = w.unwrap();
617        assert!(msg.contains("Sparse key range"), "got: {msg}");
618        assert!(msg.contains("empty"), "got: {msg}");
619    }
620
621    #[test]
622    fn sparse_range_no_warning_when_dense() {
623        let mut e = make_export("t", ExportMode::Chunked, None);
624        e.chunk_column = Some("id".to_string());
625        e.chunk_size = 100_000;
626        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
627        assert!(w.is_none(), "should not warn for dense range");
628    }
629
630    #[test]
631    fn sparse_range_skipped_when_chunk_dense() {
632        let mut e = make_export("t", ExportMode::Chunked, None);
633        e.chunk_column = Some("id".to_string());
634        e.chunk_dense = true;
635        e.chunk_size = 100_000;
636        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
637        assert!(
638            w.is_none(),
639            "chunk_dense uses ordinals, not physical id span"
640        );
641    }
642
643    #[test]
644    fn dense_surrogate_warning_when_chunk_dense_builtin() {
645        let mut e = make_export("t", ExportMode::Chunked, None);
646        e.chunk_column = Some("id".to_string());
647        e.chunk_dense = true;
648        e.query = Some("SELECT id FROM orders".to_string());
649        let w = check_dense_surrogate_cost(&e);
650        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
651        assert!(w.unwrap().contains("global sort"));
652    }
653
654    #[test]
655    fn sparse_range_not_triggered_for_non_chunked() {
656        let e = make_export("t", ExportMode::Full, None);
657        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
658        assert!(w.is_none(), "should not warn for non-chunked mode");
659    }
660
661    #[test]
662    fn dense_surrogate_warning_with_row_number() {
663        let mut e = make_export("t", ExportMode::Chunked, None);
664        e.chunk_column = Some("rn".to_string());
665        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
666        let w = check_dense_surrogate_cost(&e);
667        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
668        assert!(w.unwrap().contains("global sort"));
669    }
670
671    #[test]
672    fn no_dense_surrogate_warning_without_row_number() {
673        let mut e = make_export("t", ExportMode::Chunked, None);
674        e.chunk_column = Some("id".to_string());
675        e.query = Some("SELECT * FROM orders".to_string());
676        let w = check_dense_surrogate_cost(&e);
677        assert!(w.is_none());
678    }
679
680    #[test]
681    fn no_dense_surrogate_warning_for_non_chunked() {
682        let mut e = make_export("t", ExportMode::Full, None);
683        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
684        let w = check_dense_surrogate_cost(&e);
685        assert!(w.is_none(), "should not warn for non-chunked mode");
686    }
687
688    #[test]
689    fn parallel_memory_warning_large_dataset() {
690        let mut e = make_export("t", ExportMode::Chunked, None);
691        e.parallel = 4;
692        let w = check_parallel_memory_risk(&e, Some(10_000_000));
693        assert!(w.is_some(), "should warn about memory risk");
694        let msg = w.unwrap();
695        assert!(msg.contains("Parallel=4"), "got: {msg}");
696        assert!(msg.contains("memory"), "got: {msg}");
697    }
698
699    #[test]
700    fn no_parallel_memory_warning_small_dataset() {
701        let mut e = make_export("t", ExportMode::Chunked, None);
702        e.parallel = 4;
703        let w = check_parallel_memory_risk(&e, Some(1_000));
704        assert!(w.is_none(), "should not warn for small dataset");
705    }
706
707    #[test]
708    fn no_parallel_memory_warning_single_worker() {
709        let e = make_export("t", ExportMode::Full, None);
710        let w = check_parallel_memory_risk(&e, Some(100_000_000));
711        assert!(w.is_none(), "should not warn when parallel=1");
712    }
713
714    #[test]
715    fn suggestion_degraded_full_recommends_incremental() {
716        let e = make_export("t", ExportMode::Full, None);
717        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
718        assert!(s.contains("incremental"), "got: {s}");
719    }
720
721    #[test]
722    fn suggestion_degraded_chunked_recommends_index() {
723        let mut e = make_export("t", ExportMode::Chunked, None);
724        e.chunk_column = Some("id".to_string());
725        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
726        assert!(s.contains("index on 'id'"), "got: {s}");
727    }
728
729    #[test]
730    fn suggestion_degraded_time_window_recommends_index() {
731        let mut e = make_export("t", ExportMode::TimeWindow, None);
732        e.time_column = Some("created_at".to_string());
733        e.days_window = Some(7);
734        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
735        assert!(s.contains("index on 'created_at'"), "got: {s}");
736    }
737
738    #[test]
739    fn suggestion_unsafe_full_recommends_incremental() {
740        let e = make_export("t", ExportMode::Full, None);
741        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
742        assert!(s.contains("incremental"), "got: {s}");
743    }
744
745    #[test]
746    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
747        let mut e = make_export("t", ExportMode::Chunked, None);
748        e.chunk_column = Some("id".to_string());
749        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
750        assert!(s.contains("index on 'id'"), "got: {s}");
751        assert!(s.contains("parallel"), "got: {s}");
752    }
753
754    #[test]
755    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
756        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
757        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
758        assert!(s.contains("index on 'updated_at'"), "got: {s}");
759    }
760
761    #[test]
762    fn suggestion_acceptable_large_full_recommends_incremental() {
763        let e = make_export("t", ExportMode::Full, None);
764        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
765        assert!(s.contains("incremental"), "got: {s}");
766    }
767
768    #[test]
769    fn parallel_only_for_chunked_mode() {
770        let e = make_export("t", ExportMode::Full, None);
771        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
772        assert_eq!(level, 1, "non-chunked mode should recommend 1");
773    }
774
775    #[test]
776    fn parallel_small_dataset_is_one() {
777        let mut e = make_export("t", ExportMode::Chunked, None);
778        e.chunk_column = Some("id".to_string());
779        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
780        assert_eq!(level, 1, "small dataset should recommend 1");
781    }
782
783    #[test]
784    fn parallel_moderate_indexed_is_two() {
785        let mut e = make_export("t", ExportMode::Chunked, None);
786        e.chunk_column = Some("id".to_string());
787        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
788        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
789    }
790
791    #[test]
792    fn parallel_large_indexed_is_four() {
793        let mut e = make_export("t", ExportMode::Chunked, None);
794        e.chunk_column = Some("id".to_string());
795        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
796        assert_eq!(level, 4, "large indexed dataset should recommend 4");
797    }
798
799    #[test]
800    fn parallel_no_index_large_is_one() {
801        let mut e = make_export("t", ExportMode::Chunked, None);
802        e.chunk_column = Some("id".to_string());
803        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
804        assert_eq!(level, 1, "no index + large should recommend 1");
805        assert!(reason.contains("no index"), "got: {reason}");
806    }
807
808    #[test]
809    fn parallel_no_index_moderate_is_conservative() {
810        let mut e = make_export("t", ExportMode::Chunked, None);
811        e.chunk_column = Some("id".to_string());
812        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
813        assert_eq!(
814            level, 2,
815            "no index + moderate should recommend 2 (conservative)"
816        );
817    }
818
819    #[test]
820    fn suggestion_acceptable_large_chunked_recommends_parallel() {
821        let mut e = make_export("t", ExportMode::Chunked, None);
822        e.chunk_column = Some("id".to_string());
823        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
824        assert!(s.contains("parallel"), "got: {s}");
825    }
826
827    #[test]
828    fn connection_limit_warn_when_parallel_meets_max() {
829        let w = check_connection_limit(20, Some(20));
830        assert!(w.is_some(), "should warn when parallel == max_connections");
831        let msg = w.unwrap();
832        assert!(msg.contains("max_connections=20"), "got: {msg}");
833        assert!(msg.contains("parallel=20"), "got: {msg}");
834    }
835
836    #[test]
837    fn connection_limit_warn_when_parallel_exceeds_max() {
838        let w = check_connection_limit(100, Some(20));
839        assert!(w.is_some(), "should warn when parallel > max_connections");
840        let msg = w.unwrap();
841        assert!(msg.contains("max_connections=20"), "got: {msg}");
842    }
843
844    #[test]
845    fn connection_limit_no_warn_when_parallel_below_max() {
846        let w = check_connection_limit(4, Some(100));
847        assert!(
848            w.is_none(),
849            "should not warn when parallel << max_connections"
850        );
851    }
852
853    #[test]
854    fn connection_limit_no_warn_when_parallel_is_one() {
855        let w = check_connection_limit(1, Some(5));
856        assert!(
857            w.is_none(),
858            "single worker never triggers connection warning"
859        );
860    }
861
862    #[test]
863    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
864        let w = check_connection_limit(100, None);
865        assert!(w.is_some(), "should note that check was skipped");
866        let msg = w.unwrap();
867        assert!(msg.contains("skipped"), "got: {msg}");
868    }
869
870    #[test]
871    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
872        let w = check_connection_limit(1, None);
873        assert!(
874            w.is_none(),
875            "single worker never triggers connection warning"
876        );
877    }
878
879    #[test]
880    fn connection_limit_suggests_headroom() {
881        let w = check_connection_limit(25, Some(20)).unwrap();
882        // Suggested safe max should be max_connections - 3 = 17
883        assert!(
884            w.contains("17"),
885            "should suggest leaving headroom, got: {w}"
886        );
887    }
888
889    // ── v0.7.4: actionable hints next to categorised errors ───────────
890
891    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
892        let err = anyhow::anyhow!("{}", msg);
893        let cat = categorize_source_error(&err);
894        source_error_hint(cat, &err, &st)
895    }
896
897    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
898        let err = anyhow::anyhow!("{}", msg);
899        let dest = DestinationConfig {
900            destination_type: dt,
901            bucket: Some("b".into()),
902            ..Default::default()
903        };
904        let cat = categorize_dest_error(&err, &dest);
905        destination_error_hint(cat, &dest)
906    }
907
908    #[test]
909    fn source_tls_handshake_returns_pg_specific_tls_hint() {
910        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
911        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
912    }
913
914    #[test]
915    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
916        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
917        assert!(h.contains("tls.mode"), "got: {h}");
918    }
919
920    #[test]
921    fn source_auth_error_postgres_mentions_pg_hba() {
922        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
923        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
924    }
925
926    #[test]
927    fn source_auth_error_mysql_mentions_grant() {
928        let h = src_hint(
929            "Access denied for user 'rivet'@'localhost'",
930            SourceType::Mysql,
931        )
932        .expect("hint");
933        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
934    }
935
936    #[test]
937    fn source_connectivity_error_mentions_bastion_and_network() {
938        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
939        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
940    }
941
942    #[test]
943    fn source_unknown_error_returns_no_hint() {
944        // Generic "error" category should yield no hint — better to
945        // print the raw driver message than to mislead.
946        let h = src_hint("totally unexpected", SourceType::Postgres);
947        assert!(h.is_none(), "unknown errors should not produce a hint");
948    }
949
950    #[test]
951    fn dest_s3_auth_error_names_concrete_actions() {
952        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
953        assert!(
954            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
955            "got: {h}"
956        );
957    }
958
959    #[test]
960    fn dest_gcs_auth_error_names_concrete_actions() {
961        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
962        assert!(
963            h.contains("storage.objects") && h.contains("cloud-permissions"),
964            "got: {h}"
965        );
966    }
967
968    #[test]
969    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
970        // Guard the load-bearing ordering in categorize_dest_error: the
971        // "sas expired" early-return must fire before the generic "token"
972        // branch, or destination_error_hint produces the wrong hint.
973        // This test pins the *category string*, not just the final hint text.
974        let err = anyhow::anyhow!(
975            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
976        );
977        let dest = DestinationConfig {
978            destination_type: DestinationType::Azure,
979            bucket: Some("c".into()),
980            ..Default::default()
981        };
982        let cat = categorize_dest_error(&err, &dest);
983        assert_eq!(
984            cat, "sas expired",
985            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
986        );
987    }
988
989    #[test]
990    fn dest_azure_sas_expired_returns_regenerate_hint() {
991        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
992        // the hint must steer the operator to `az storage container
993        // generate-sas` not "your IAM role is broken".
994        let h = dest_hint(
995            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
996            DestinationType::Azure,
997        )
998        .expect("hint");
999        assert!(
1000            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1001            "got: {h}"
1002        );
1003    }
1004
1005    #[test]
1006    fn dest_s3_bucket_not_found_says_no_auto_create() {
1007        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1008        assert!(
1009            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1010            "got: {h}"
1011        );
1012    }
1013
1014    #[test]
1015    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1016        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1017        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1018    }
1019}