Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mysql;
5mod postgres;
6pub mod type_report;
7
8pub(crate) use analysis::chunk_sparsity_from_counts;
9#[cfg(test)]
10use analysis::{
11    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
12    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
13    recommend_parallelism, recommend_profile,
14};
15#[allow(unused_imports)]
16pub use doctor::doctor;
17#[cfg(test)]
18use postgres::{extract_scan_type, parse_pg_row_estimate};
19
20use crate::config::{Config, ExportConfig, SourceType};
21use crate::error::Result;
22use crate::types::policy::TypePolicy;
23use crate::types::target::ExportTarget;
24
/// Overall health classification attached to an export's preflight diagnostic.
///
/// Variants are declared from most to least favourable. The `Display`
/// form is the upper-case label printed in the "Verdict:" line.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Rendered as `EFFICIENT`.
    Efficient,
    /// Rendered as `ACCEPTABLE`.
    Acceptable,
    /// Rendered as `DEGRADED`.
    Degraded,
    /// Rendered as `UNSAFE`.
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the upper-case label for the verdict.
    ///
    /// Uses [`std::fmt::Formatter::pad`] so that width / alignment flags
    /// (e.g. `{:<12}`) are honoured; with no flags the output is exactly
    /// the bare label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.pad(label)
    }
}
43
/// Preflight findings for a single export, as rendered by `print_diagnostic`.
pub(crate) struct ExportDiagnostic {
    /// Name of the export; printed in the `Export:` header line.
    pub export_name: String,
    /// Strategy label; printed on the `Strategy:` line.
    pub strategy: String,
    /// Mode label; printed on the `Mode:` line.
    pub mode: String,
    /// Cursor column, when there is one; printed on the `Cursor col:` line.
    pub cursor_column: Option<String>,
    /// Estimated row count, when available; printed abbreviated (`~NM` / `~NK`).
    pub row_estimate: Option<i64>,
    /// Lower bound of the cursor range; shown only when `cursor_max` is also set.
    pub cursor_min: Option<String>,
    /// Upper bound of the cursor range; shown only when `cursor_min` is also set.
    pub cursor_max: Option<String>,
    /// Scan type label, when known; printed on the `Scan type:` line.
    pub scan_type: Option<String>,
    /// When true, ` (indexed)` is appended to the printed scan type.
    pub uses_index: bool,
    /// Overall health verdict for the export.
    pub verdict: HealthVerdict,
    /// Recommended value for `tuning.profile`.
    pub recommended_profile: &'static str,
    /// Recommended parallelism as `(level, reason)`.
    pub recommended_parallel: (u32, &'static str),
    /// Warnings, printed one per line.
    pub warnings: Vec<String>,
    /// Optional suggestion; printed last when present.
    pub suggestion: Option<String>,
}
60
61/// Return the diagnostic for a single export without printing anything.
62///
63/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
64pub(crate) fn get_export_diagnostic(
65    config: &Config,
66    export: &ExportConfig,
67) -> Result<ExportDiagnostic> {
68    let url = config.source.resolve_url()?;
69    let tls = config.source.tls.as_ref();
70    crate::source::warn_if_tls_disabled(&config.source);
71    match config.source.source_type {
72        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
73        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
74        SourceType::Mssql => {
75            anyhow::bail!("mssql preflight diagnostics not yet implemented (scaffold)")
76        }
77    }
78}
79
80pub fn check(
81    config_path: &str,
82    export_name: Option<&str>,
83    params: Option<&std::collections::HashMap<String, String>>,
84    show_type_report: bool,
85    strict: bool,
86    json_output: bool,
87    target: Option<ExportTarget>,
88) -> Result<()> {
89    let config = Config::load_with_params(config_path, params)?;
90
91    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
92        let e = config
93            .exports
94            .iter()
95            .find(|e| e.name == name)
96            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
97        vec![e]
98    } else {
99        config.exports.iter().collect()
100    };
101
102    let url = config.source.resolve_url()?;
103    let tls = config.source.tls.as_ref();
104    // Surface the plaintext-transport warning at preflight time too —
105    // operators should hear it from `rivet check` before they wait
106    // through a full `rivet run` to learn the same thing. `Once` inside
107    // the helper keeps emission to one line per process even when both
108    // `check` and `run` flow through it.
109    crate::source::warn_if_tls_disabled(&config.source);
110    match config.source.source_type {
111        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
112        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
113        SourceType::Mssql => anyhow::bail!("mssql check not yet implemented (scaffold)"),
114    }
115
116    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
117    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
118    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
119    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
120    // and has side effects) — but we *do* call `create_destination`, which
121    // resolves env vars / credentials_file existence at construction time.
122    // Each unique destination is probed once per `check` to keep multi-export
123    // configs cheap.
124    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
125    for export in &exports {
126        let dest_key = format!(
127            "{:?}:{}:{}:{}",
128            export.destination.destination_type,
129            export.destination.bucket.as_deref().unwrap_or("-"),
130            export.destination.endpoint.as_deref().unwrap_or("-"),
131            export.destination.path.as_deref().unwrap_or("-"),
132        );
133        if !seen_destinations.insert(dest_key) {
134            continue;
135        }
136        let expanded = crate::plan::build::expand_destination_templates(
137            export.destination.clone(),
138            &export.name,
139        );
140        crate::destination::create_destination(&expanded).map_err(|e| {
141            anyhow::anyhow!(
142                "export '{}': destination preflight failed: {:#}",
143                export.name,
144                e
145            )
146        })?;
147    }
148
149    if show_type_report {
150        let policy = if strict {
151            TypePolicy::strict()
152        } else {
153            TypePolicy::warn_only()
154        };
155
156        let mut any_fatal = false;
157        for export in &exports {
158            let column_overrides =
159                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
160            // CLI `--target` wins; otherwise fall back to the per-export
161            // `target:` from the config (slice #2a). A declared-but-unknown
162            // target is a loud error — never silently ignored.
163            if let Some(t) = export.target.as_deref()
164                && crate::types::target::ExportTarget::parse(t).is_none()
165            {
166                anyhow::bail!(
167                    "export '{}': unknown target '{t}' (expected: bigquery, duckdb)",
168                    export.name
169                );
170            }
171            let eff_target = target.or_else(|| {
172                export
173                    .target
174                    .as_deref()
175                    .and_then(crate::types::target::ExportTarget::parse)
176            });
177            let config_dir = std::path::Path::new(config_path)
178                .parent()
179                .unwrap_or_else(|| std::path::Path::new("."));
180            match type_report::collect_report(
181                &config,
182                export,
183                &column_overrides,
184                &policy,
185                eff_target,
186                config_dir,
187                params,
188            ) {
189                Ok(report) => {
190                    if report.has_fatal() {
191                        any_fatal = true;
192                    }
193                    if eff_target.is_some() && report.has_target_fail() {
194                        any_fatal = true;
195                    }
196                    if json_output {
197                        type_report::print_json(&report)?;
198                    } else {
199                        type_report::print_table(&report, eff_target);
200                    }
201                }
202                Err(e) => {
203                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
204                }
205            }
206        }
207
208        if strict && any_fatal {
209            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
210        }
211    }
212
213    Ok(())
214}
215
216fn print_diagnostic(diag: &ExportDiagnostic) {
217    println!();
218    println!("Export: {}", diag.export_name);
219    println!("  Strategy:     {}", diag.strategy);
220    println!("  Mode:         {}", diag.mode);
221    if let Some(est) = diag.row_estimate {
222        if est >= 1_000_000 {
223            println!("  Row estimate: ~{}M", est / 1_000_000);
224        } else if est >= 1_000 {
225            println!("  Row estimate: ~{}K", est / 1_000);
226        } else {
227            println!("  Row estimate: ~{}", est);
228        }
229    }
230    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
231        println!("  Cursor range: {} .. {}", min_v, max_v);
232    }
233    if let Some(col) = &diag.cursor_column {
234        println!("  Cursor col:   {}", col);
235    }
236    if let Some(scan) = &diag.scan_type {
237        let idx_label = if diag.uses_index { " (indexed)" } else { "" };
238        println!("  Scan type:    {}{}", scan, idx_label);
239    }
240    println!("  Verdict:      {}", diag.verdict);
241    println!(
242        "  Recommended:  tuning.profile: {}",
243        diag.recommended_profile
244    );
245    let (par_level, par_reason) = diag.recommended_parallel;
246    if par_level > 1 {
247        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
248    } else {
249        println!("  Parallelism:  {} ({})", par_level, par_reason);
250    }
251    for w in &diag.warnings {
252        println!("  Warning:      {}", w);
253    }
254    if let Some(suggestion) = &diag.suggestion {
255        println!("  Suggestion:   {}", suggestion);
256    }
257}
258
259#[cfg(test)]
260mod tests {
261    use super::*;
262    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
263    use doctor::{
264        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
265    };
266
267    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
268        // Baseline from the canonical test fixture; override only the fields
269        // these preflight tests vary (mode, cursor, CSV format, query, dest).
270        ExportConfig {
271            mode,
272            cursor_column: cursor.map(|s| s.to_string()),
273            query: Some("SELECT * FROM t".to_string()),
274            format: FormatType::Csv,
275            destination: DestinationConfig {
276                destination_type: DestinationType::Local,
277                path: Some("./out".to_string()),
278                ..Default::default()
279            },
280            ..crate::config::sample_export(name)
281        }
282    }
283
284    #[test]
285    fn verdict_small_indexed_with_cursor_is_efficient() {
286        let v = compute_verdict(Some(500_000), true, true);
287        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
288    }
289
290    #[test]
291    fn verdict_large_indexed_with_cursor_is_acceptable() {
292        let v = compute_verdict(Some(20_000_000), true, true);
293        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
294    }
295
296    #[test]
297    fn verdict_no_index_no_cursor_is_degraded() {
298        let v = compute_verdict(Some(500_000), false, false);
299        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
300    }
301
302    #[test]
303    fn verdict_huge_no_index_is_unsafe() {
304        let v = compute_verdict(Some(100_000_000), false, false);
305        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
306    }
307
308    #[test]
309    fn parse_pg_row_estimate_from_sort_plan() {
310        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
311        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
312    }
313
314    #[test]
315    fn parse_pg_row_estimate_from_index_scan() {
316        let plan =
317            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
318        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
319    }
320
321    #[test]
322    fn extract_scan_type_detects_seq_scan() {
323        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
324        let st = extract_scan_type(plan);
325        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
326    }
327
328    #[test]
329    fn extract_scan_type_detects_index_scan() {
330        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
331        let st = extract_scan_type(plan);
332        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
333    }
334
335    #[test]
336    fn suggestion_for_efficient_verdict_is_none() {
337        let e = make_export("t", ExportMode::Full, None);
338        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
339        assert!(
340            s.is_none(),
341            "efficient verdict should produce no suggestion"
342        );
343    }
344
345    #[test]
346    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
347        let e = make_export("t", ExportMode::Full, None);
348        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
349        let msg = s.expect("degraded verdict should produce a suggestion");
350        assert!(
351            msg.contains("safe"),
352            "suggestion should recommend safe profile, got: {msg}"
353        );
354    }
355
356    fn src_err(msg: &str) -> &'static str {
357        categorize_source_error(&anyhow::anyhow!("{}", msg))
358    }
359
360    #[test]
361    fn source_password_rejected_is_auth_error() {
362        assert_eq!(
363            src_err("password authentication failed for user \"rivet\""),
364            "auth error"
365        );
366    }
367
368    #[test]
369    fn source_authentication_failed_is_auth_error() {
370        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
371    }
372
373    #[test]
374    fn source_access_denied_is_auth_error() {
375        assert_eq!(
376            src_err("Access denied for user 'rivet'@'localhost'"),
377            "auth error"
378        );
379    }
380
381    #[test]
382    fn source_connection_refused_is_connectivity() {
383        assert_eq!(
384            src_err("connection refused (os error 61)"),
385            "connectivity error"
386        );
387    }
388
389    #[test]
390    fn source_timed_out_is_connectivity() {
391        assert_eq!(src_err("connection timed out"), "connectivity error");
392    }
393
394    #[test]
395    fn source_dns_translate_host_is_connectivity() {
396        assert_eq!(
397            src_err("could not translate host name \"db.bad\" to address"),
398            "connectivity error"
399        );
400    }
401
402    #[test]
403    fn source_name_not_known_is_connectivity() {
404        assert_eq!(src_err("Name or service not known"), "connectivity error");
405    }
406
407    #[test]
408    fn source_unknown_error_is_generic() {
409        assert_eq!(src_err("something totally unexpected"), "error");
410    }
411
412    fn dest_config(dtype: DestinationType) -> DestinationConfig {
413        DestinationConfig {
414            destination_type: dtype,
415            bucket: Some("b".to_string()),
416            ..Default::default()
417        }
418    }
419
420    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
421        let cfg = dest_config(dtype);
422        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
423    }
424
425    #[test]
426    fn dest_credential_loading_is_auth_error() {
427        assert_eq!(
428            dest_err(
429                "loading credential to sign http request",
430                DestinationType::Gcs
431            ),
432            "auth error"
433        );
434    }
435
436    #[test]
437    fn dest_permission_denied_is_auth_error() {
438        assert_eq!(
439            dest_err("permission denied on resource bucket", DestinationType::S3),
440            "auth error"
441        );
442    }
443
444    #[test]
445    fn dest_forbidden_is_auth_error() {
446        assert_eq!(
447            dest_err("403 Forbidden", DestinationType::Gcs),
448            "auth error"
449        );
450    }
451
452    #[test]
453    fn dest_unauthorized_is_auth_error() {
454        assert_eq!(
455            dest_err("401 Unauthorized", DestinationType::S3),
456            "auth error"
457        );
458    }
459
460    #[test]
461    fn dest_invalid_grant_is_auth_error() {
462        assert_eq!(
463            dest_err(
464                "invalid_grant: token has been revoked",
465                DestinationType::Gcs
466            ),
467            "auth error"
468        );
469    }
470
471    #[test]
472    fn dest_nosuchbucket_s3_is_bucket_not_found() {
473        assert_eq!(
474            dest_err(
475                "NoSuchBucket: the specified bucket does not exist",
476                DestinationType::S3
477            ),
478            "bucket not found"
479        );
480    }
481
482    #[test]
483    fn dest_not_found_gcs_is_bucket_not_found() {
484        assert_eq!(
485            dest_err("bucket not found (404)", DestinationType::Gcs),
486            "bucket not found"
487        );
488    }
489
490    #[test]
491    fn dest_not_found_local_is_path_not_found() {
492        assert_eq!(
493            dest_err("path not found: /tmp/missing", DestinationType::Local),
494            "path not found"
495        );
496    }
497
498    #[test]
499    fn dest_connection_refused_is_connectivity() {
500        assert_eq!(
501            dest_err("connection refused to endpoint", DestinationType::S3),
502            "connectivity error"
503        );
504    }
505
506    #[test]
507    fn dest_dns_error_is_connectivity() {
508        assert_eq!(
509            dest_err("dns error: failed to lookup address", DestinationType::S3),
510            "connectivity error"
511        );
512    }
513
514    #[test]
515    fn dest_timed_out_is_connectivity() {
516        assert_eq!(
517            dest_err("request timed out after 30s", DestinationType::Gcs),
518            "connectivity error"
519        );
520    }
521
522    #[test]
523    fn dest_unknown_error_is_generic() {
524        assert_eq!(
525            dest_err("something else entirely", DestinationType::S3),
526            "error"
527        );
528    }
529
530    #[test]
531    fn strategy_full_scan() {
532        let e = make_export("t", ExportMode::Full, None);
533        assert_eq!(derive_strategy(&e), "full-scan");
534    }
535
536    #[test]
537    fn strategy_full_parallel() {
538        let mut e = make_export("t", ExportMode::Full, None);
539        e.parallel = 4;
540        assert_eq!(derive_strategy(&e), "full-parallel(4)");
541    }
542
543    #[test]
544    fn strategy_incremental() {
545        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
546        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
547    }
548
549    #[test]
550    fn strategy_chunked() {
551        let mut e = make_export("t", ExportMode::Chunked, None);
552        e.chunk_column = Some("id".to_string());
553        e.chunk_size = 50_000;
554        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
555    }
556
557    #[test]
558    fn strategy_chunked_parallel() {
559        let mut e = make_export("t", ExportMode::Chunked, None);
560        e.chunk_column = Some("id".to_string());
561        e.chunk_size = 50_000;
562        e.parallel = 3;
563        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
564    }
565
566    #[test]
567    fn strategy_time_window() {
568        let mut e = make_export("t", ExportMode::TimeWindow, None);
569        e.time_column = Some("created_at".to_string());
570        e.days_window = Some(7);
571        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
572    }
573
574    #[test]
575    fn profile_small_indexed_is_fast() {
576        let e = make_export("t", ExportMode::Full, None);
577        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
578    }
579
580    #[test]
581    fn profile_medium_indexed_is_balanced() {
582        let e = make_export("t", ExportMode::Full, None);
583        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
584    }
585
586    #[test]
587    fn profile_large_indexed_is_safe() {
588        let e = make_export("t", ExportMode::Full, None);
589        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
590    }
591
592    #[test]
593    fn profile_small_no_index_is_balanced() {
594        let e = make_export("t", ExportMode::Full, None);
595        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
596    }
597
598    #[test]
599    fn profile_small_no_index_parallel_is_safe() {
600        let mut e = make_export("t", ExportMode::Full, None);
601        e.parallel = 4;
602        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
603    }
604
605    #[test]
606    fn profile_medium_no_index_is_balanced() {
607        let e = make_export("t", ExportMode::Full, None);
608        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
609    }
610
611    #[test]
612    fn profile_large_no_index_is_safe() {
613        let e = make_export("t", ExportMode::Full, None);
614        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
615    }
616
617    #[test]
618    fn sparse_range_warning_when_very_sparse() {
619        let mut e = make_export("t", ExportMode::Chunked, None);
620        e.chunk_column = Some("id".to_string());
621        e.chunk_size = 100_000;
622        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
623        assert!(w.is_some(), "should warn about sparse range");
624        let msg = w.unwrap();
625        assert!(msg.contains("Sparse key range"), "got: {msg}");
626        assert!(msg.contains("empty"), "got: {msg}");
627    }
628
629    #[test]
630    fn sparse_range_no_warning_when_dense() {
631        let mut e = make_export("t", ExportMode::Chunked, None);
632        e.chunk_column = Some("id".to_string());
633        e.chunk_size = 100_000;
634        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
635        assert!(w.is_none(), "should not warn for dense range");
636    }
637
638    #[test]
639    fn sparse_range_skipped_when_chunk_dense() {
640        let mut e = make_export("t", ExportMode::Chunked, None);
641        e.chunk_column = Some("id".to_string());
642        e.chunk_dense = true;
643        e.chunk_size = 100_000;
644        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
645        assert!(
646            w.is_none(),
647            "chunk_dense uses ordinals, not physical id span"
648        );
649    }
650
651    #[test]
652    fn dense_surrogate_warning_when_chunk_dense_builtin() {
653        let mut e = make_export("t", ExportMode::Chunked, None);
654        e.chunk_column = Some("id".to_string());
655        e.chunk_dense = true;
656        e.query = Some("SELECT id FROM orders".to_string());
657        let w = check_dense_surrogate_cost(&e);
658        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
659        assert!(w.unwrap().contains("global sort"));
660    }
661
662    #[test]
663    fn sparse_range_not_triggered_for_non_chunked() {
664        let e = make_export("t", ExportMode::Full, None);
665        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
666        assert!(w.is_none(), "should not warn for non-chunked mode");
667    }
668
669    #[test]
670    fn dense_surrogate_warning_with_row_number() {
671        let mut e = make_export("t", ExportMode::Chunked, None);
672        e.chunk_column = Some("rn".to_string());
673        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
674        let w = check_dense_surrogate_cost(&e);
675        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
676        assert!(w.unwrap().contains("global sort"));
677    }
678
679    #[test]
680    fn no_dense_surrogate_warning_without_row_number() {
681        let mut e = make_export("t", ExportMode::Chunked, None);
682        e.chunk_column = Some("id".to_string());
683        e.query = Some("SELECT * FROM orders".to_string());
684        let w = check_dense_surrogate_cost(&e);
685        assert!(w.is_none());
686    }
687
688    #[test]
689    fn no_dense_surrogate_warning_for_non_chunked() {
690        let mut e = make_export("t", ExportMode::Full, None);
691        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
692        let w = check_dense_surrogate_cost(&e);
693        assert!(w.is_none(), "should not warn for non-chunked mode");
694    }
695
696    #[test]
697    fn parallel_memory_warning_large_dataset() {
698        let mut e = make_export("t", ExportMode::Chunked, None);
699        e.parallel = 4;
700        let w = check_parallel_memory_risk(&e, Some(10_000_000));
701        assert!(w.is_some(), "should warn about memory risk");
702        let msg = w.unwrap();
703        assert!(msg.contains("Parallel=4"), "got: {msg}");
704        assert!(msg.contains("memory"), "got: {msg}");
705    }
706
707    #[test]
708    fn no_parallel_memory_warning_small_dataset() {
709        let mut e = make_export("t", ExportMode::Chunked, None);
710        e.parallel = 4;
711        let w = check_parallel_memory_risk(&e, Some(1_000));
712        assert!(w.is_none(), "should not warn for small dataset");
713    }
714
715    #[test]
716    fn no_parallel_memory_warning_single_worker() {
717        let e = make_export("t", ExportMode::Full, None);
718        let w = check_parallel_memory_risk(&e, Some(100_000_000));
719        assert!(w.is_none(), "should not warn when parallel=1");
720    }
721
722    #[test]
723    fn suggestion_degraded_full_recommends_incremental() {
724        let e = make_export("t", ExportMode::Full, None);
725        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
726        assert!(s.contains("incremental"), "got: {s}");
727    }
728
729    #[test]
730    fn suggestion_degraded_chunked_recommends_index() {
731        let mut e = make_export("t", ExportMode::Chunked, None);
732        e.chunk_column = Some("id".to_string());
733        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
734        assert!(s.contains("index on 'id'"), "got: {s}");
735    }
736
737    #[test]
738    fn suggestion_degraded_time_window_recommends_index() {
739        let mut e = make_export("t", ExportMode::TimeWindow, None);
740        e.time_column = Some("created_at".to_string());
741        e.days_window = Some(7);
742        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
743        assert!(s.contains("index on 'created_at'"), "got: {s}");
744    }
745
746    #[test]
747    fn suggestion_unsafe_full_recommends_incremental() {
748        let e = make_export("t", ExportMode::Full, None);
749        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
750        assert!(s.contains("incremental"), "got: {s}");
751    }
752
753    #[test]
754    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
755        let mut e = make_export("t", ExportMode::Chunked, None);
756        e.chunk_column = Some("id".to_string());
757        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
758        assert!(s.contains("index on 'id'"), "got: {s}");
759        assert!(s.contains("parallel"), "got: {s}");
760    }
761
762    #[test]
763    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
764        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
765        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
766        assert!(s.contains("index on 'updated_at'"), "got: {s}");
767    }
768
769    #[test]
770    fn suggestion_acceptable_large_full_recommends_incremental() {
771        let e = make_export("t", ExportMode::Full, None);
772        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
773        assert!(s.contains("incremental"), "got: {s}");
774    }
775
776    #[test]
777    fn parallel_only_for_chunked_mode() {
778        let e = make_export("t", ExportMode::Full, None);
779        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
780        assert_eq!(level, 1, "non-chunked mode should recommend 1");
781    }
782
783    #[test]
784    fn parallel_small_dataset_is_one() {
785        let mut e = make_export("t", ExportMode::Chunked, None);
786        e.chunk_column = Some("id".to_string());
787        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
788        assert_eq!(level, 1, "small dataset should recommend 1");
789    }
790
791    #[test]
792    fn parallel_moderate_indexed_is_two() {
793        let mut e = make_export("t", ExportMode::Chunked, None);
794        e.chunk_column = Some("id".to_string());
795        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
796        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
797    }
798
799    #[test]
800    fn parallel_large_indexed_is_four() {
801        let mut e = make_export("t", ExportMode::Chunked, None);
802        e.chunk_column = Some("id".to_string());
803        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
804        assert_eq!(level, 4, "large indexed dataset should recommend 4");
805    }
806
807    #[test]
808    fn parallel_no_index_large_is_one() {
809        let mut e = make_export("t", ExportMode::Chunked, None);
810        e.chunk_column = Some("id".to_string());
811        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
812        assert_eq!(level, 1, "no index + large should recommend 1");
813        assert!(reason.contains("no index"), "got: {reason}");
814    }
815
816    #[test]
817    fn parallel_no_index_moderate_is_conservative() {
818        let mut e = make_export("t", ExportMode::Chunked, None);
819        e.chunk_column = Some("id".to_string());
820        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
821        assert_eq!(
822            level, 2,
823            "no index + moderate should recommend 2 (conservative)"
824        );
825    }
826
827    #[test]
828    fn suggestion_acceptable_large_chunked_recommends_parallel() {
829        let mut e = make_export("t", ExportMode::Chunked, None);
830        e.chunk_column = Some("id".to_string());
831        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
832        assert!(s.contains("parallel"), "got: {s}");
833    }
834
835    #[test]
836    fn connection_limit_warn_when_parallel_meets_max() {
837        let w = check_connection_limit(20, Some(20));
838        assert!(w.is_some(), "should warn when parallel == max_connections");
839        let msg = w.unwrap();
840        assert!(msg.contains("max_connections=20"), "got: {msg}");
841        assert!(msg.contains("parallel=20"), "got: {msg}");
842    }
843
844    #[test]
845    fn connection_limit_warn_when_parallel_exceeds_max() {
846        let w = check_connection_limit(100, Some(20));
847        assert!(w.is_some(), "should warn when parallel > max_connections");
848        let msg = w.unwrap();
849        assert!(msg.contains("max_connections=20"), "got: {msg}");
850    }
851
852    #[test]
853    fn connection_limit_no_warn_when_parallel_below_max() {
854        let w = check_connection_limit(4, Some(100));
855        assert!(
856            w.is_none(),
857            "should not warn when parallel << max_connections"
858        );
859    }
860
861    #[test]
862    fn connection_limit_no_warn_when_parallel_is_one() {
863        let w = check_connection_limit(1, Some(5));
864        assert!(
865            w.is_none(),
866            "single worker never triggers connection warning"
867        );
868    }
869
870    #[test]
871    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
872        let w = check_connection_limit(100, None);
873        assert!(w.is_some(), "should note that check was skipped");
874        let msg = w.unwrap();
875        assert!(msg.contains("skipped"), "got: {msg}");
876    }
877
878    #[test]
879    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
880        let w = check_connection_limit(1, None);
881        assert!(
882            w.is_none(),
883            "single worker never triggers connection warning"
884        );
885    }
886
887    #[test]
888    fn connection_limit_suggests_headroom() {
889        let w = check_connection_limit(25, Some(20)).unwrap();
890        // Suggested safe max should be max_connections - 3 = 17
891        assert!(
892            w.contains("17"),
893            "should suggest leaving headroom, got: {w}"
894        );
895    }
896
897    // ── v0.7.4: actionable hints next to categorised errors ───────────
898
899    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
900        let err = anyhow::anyhow!("{}", msg);
901        let cat = categorize_source_error(&err);
902        source_error_hint(cat, &err, &st)
903    }
904
905    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
906        let err = anyhow::anyhow!("{}", msg);
907        let dest = DestinationConfig {
908            destination_type: dt,
909            bucket: Some("b".into()),
910            ..Default::default()
911        };
912        let cat = categorize_dest_error(&err, &dest);
913        destination_error_hint(cat, &dest)
914    }
915
916    #[test]
917    fn source_tls_handshake_returns_pg_specific_tls_hint() {
918        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
919        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
920    }
921
922    #[test]
923    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
924        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
925        assert!(h.contains("tls.mode"), "got: {h}");
926    }
927
928    #[test]
929    fn source_auth_error_postgres_mentions_pg_hba() {
930        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
931        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
932    }
933
934    #[test]
935    fn source_auth_error_mysql_mentions_grant() {
936        let h = src_hint(
937            "Access denied for user 'rivet'@'localhost'",
938            SourceType::Mysql,
939        )
940        .expect("hint");
941        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
942    }
943
944    #[test]
945    fn source_connectivity_error_mentions_bastion_and_network() {
946        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
947        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
948    }
949
950    #[test]
951    fn source_unknown_error_returns_no_hint() {
952        // Generic "error" category should yield no hint — better to
953        // print the raw driver message than to mislead.
954        let h = src_hint("totally unexpected", SourceType::Postgres);
955        assert!(h.is_none(), "unknown errors should not produce a hint");
956    }
957
958    #[test]
959    fn dest_s3_auth_error_names_concrete_actions() {
960        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
961        assert!(
962            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
963            "got: {h}"
964        );
965    }
966
967    #[test]
968    fn dest_gcs_auth_error_names_concrete_actions() {
969        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
970        assert!(
971            h.contains("storage.objects") && h.contains("cloud-permissions"),
972            "got: {h}"
973        );
974    }
975
976    #[test]
977    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
978        // Guard the load-bearing ordering in categorize_dest_error: the
979        // "sas expired" early-return must fire before the generic "token"
980        // branch, or destination_error_hint produces the wrong hint.
981        // This test pins the *category string*, not just the final hint text.
982        let err = anyhow::anyhow!(
983            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
984        );
985        let dest = DestinationConfig {
986            destination_type: DestinationType::Azure,
987            bucket: Some("c".into()),
988            ..Default::default()
989        };
990        let cat = categorize_dest_error(&err, &dest);
991        assert_eq!(
992            cat, "sas expired",
993            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
994        );
995    }
996
997    #[test]
998    fn dest_azure_sas_expired_returns_regenerate_hint() {
999        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1000        // the hint must steer the operator to `az storage container
1001        // generate-sas` not "your IAM role is broken".
1002        let h = dest_hint(
1003            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1004            DestinationType::Azure,
1005        )
1006        .expect("hint");
1007        assert!(
1008            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1009            "got: {h}"
1010        );
1011    }
1012
1013    #[test]
1014    fn dest_s3_bucket_not_found_says_no_auto_create() {
1015        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1016        assert!(
1017            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1018            "got: {h}"
1019        );
1020    }
1021
1022    #[test]
1023    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1024        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1025        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1026    }
1027}