Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7pub mod type_report;
8
9pub(crate) use analysis::chunk_sparsity_from_counts;
10#[cfg(test)]
11use analysis::{
12    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
13    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
14    recommend_parallelism, recommend_profile,
15};
16#[allow(unused_imports)]
17pub use doctor::doctor;
18#[cfg(test)]
19use postgres::{extract_scan_type, parse_pg_row_estimate};
20
21use crate::config::{Config, ExportConfig, SourceType};
22use crate::error::Result;
23use crate::types::policy::TypePolicy;
24use crate::types::target::{ExportTarget, TargetStatus};
25
/// Health classification attached to an export's preflight diagnostic.
///
/// The [`Display`](std::fmt::Display) form is the upper-case label printed on
/// the `Verdict:` line of the diagnostic output.
#[derive(Debug)]
pub enum HealthVerdict {
    Efficient,
    Acceptable,
    Degraded,
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the upper-case label for this verdict.
    ///
    /// Goes through [`Formatter::pad`](std::fmt::Formatter::pad) so that
    /// width / alignment / precision flags (e.g. `{:<12}`) are honoured, which
    /// lets callers column-align verdicts. Plain `{}` output is the bare label.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.pad(label)
    }
}
44
45pub(crate) struct ExportDiagnostic {
46    pub export_name: String,
47    pub strategy: String,
48    pub mode: String,
49    pub cursor_column: Option<String>,
50    pub row_estimate: Option<i64>,
51    pub cursor_min: Option<String>,
52    pub cursor_max: Option<String>,
53    pub scan_type: Option<String>,
54    pub uses_index: bool,
55    pub verdict: HealthVerdict,
56    pub recommended_profile: &'static str,
57    pub recommended_parallel: (u32, &'static str),
58    pub warnings: Vec<String>,
59    pub suggestion: Option<String>,
60}
61
62/// Return the diagnostic for a single export without printing anything.
63///
64/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
65pub(crate) fn get_export_diagnostic(
66    config: &Config,
67    export: &ExportConfig,
68) -> Result<ExportDiagnostic> {
69    let url = config.source.resolve_url()?;
70    let tls = config.source.tls.as_ref();
71    crate::source::warn_if_tls_disabled(&config.source);
72    match config.source.source_type {
73        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
74        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
75        SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
76    }
77}
78
79/// Dedup identity for a destination, shared by `check`'s credential probe
80/// and `doctor`'s write probe. Must include every field that changes where
81/// a probe lands — notably `path`, so two local destinations with different
82/// paths are probed separately. Keeping one helper prevents the two call
83/// sites from drifting apart (doctor's inline copy once omitted `path` and
84/// silently skipped the second local destination).
85fn destination_identity(d: &crate::config::DestinationConfig) -> String {
86    format!(
87        "{:?}:{}:{}:{}",
88        d.destination_type,
89        d.bucket.as_deref().unwrap_or("-"),
90        d.endpoint.as_deref().unwrap_or("-"),
91        d.path.as_deref().unwrap_or("-"),
92    )
93}
94
/// Builds the one-line notice for the "fail ✗ but rc 0" situation.
///
/// A column shown as `fail ✗` for `--target` only affects the exit code when
/// `--strict` is also given (that is the gate by design). This text spells
/// that out so an operator or CI job reading the glyph alone does not assume
/// a non-zero exit. Kept pure so the exact wording can be unit-tested.
///
/// `n` is the number of failing columns (it selects singular vs. plural) and
/// `target_label` names the target the columns were checked against.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let col = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {n} {col} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
    )
}
106
107pub fn check(
108    config_path: &str,
109    export_name: Option<&str>,
110    params: Option<&std::collections::HashMap<String, String>>,
111    show_type_report: bool,
112    strict: bool,
113    json_output: bool,
114    target: Option<ExportTarget>,
115) -> Result<()> {
116    let config = Config::load_with_params(config_path, params)?;
117
118    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
119        let e = config
120            .exports
121            .iter()
122            .find(|e| e.name == name)
123            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
124        vec![e]
125    } else {
126        config.exports.iter().collect()
127    };
128
129    let url = config.source.resolve_url()?;
130    let tls = config.source.tls.as_ref();
131    // Surface the plaintext-transport warning at preflight time too —
132    // operators should hear it from `rivet check` before they wait
133    // through a full `rivet run` to learn the same thing. `Once` inside
134    // the helper keeps emission to one line per process even when both
135    // `check` and `run` flow through it.
136    crate::source::warn_if_tls_disabled(&config.source);
137    match config.source.source_type {
138        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
139        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
140        SourceType::Mssql => mssql::check_mssql(&url, tls, &exports, json_output)?,
141    }
142
143    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
144    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
145    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
146    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
147    // and has side effects) — but we *do* call `create_destination`, which
148    // resolves env vars / credentials_file existence at construction time.
149    // Each unique destination is probed once per `check` to keep multi-export
150    // configs cheap.
151    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
152    for export in &exports {
153        let dest_key = destination_identity(&export.destination);
154        if !seen_destinations.insert(dest_key) {
155            continue;
156        }
157        let expanded = crate::plan::build::expand_destination_templates(
158            export.destination.clone(),
159            &export.name,
160        );
161        crate::destination::create_destination(&expanded).map_err(|e| {
162            anyhow::anyhow!(
163                "export '{}': destination preflight failed: {:#}",
164                export.name,
165                e
166            )
167        })?;
168    }
169
170    if show_type_report {
171        let policy = if strict {
172            TypePolicy::strict()
173        } else {
174            TypePolicy::warn_only()
175        };
176
177        let mut any_fatal = false;
178        // Count hard target-FAIL columns (and remember which target) so that —
179        // when --strict was NOT passed and the exit code is therefore 0 — we can
180        // print a note. The "fail ✗" glyph in the table implies a hard failure,
181        // but exit is gated only by --strict; without this note an operator or CI
182        // reading the glyph alone would be misled into thinking rc != 0.
183        let mut target_fail_cols = 0usize;
184        let mut target_fail_label: Option<&'static str> = None;
185        for export in &exports {
186            let column_overrides =
187                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
188            // CLI `--target` wins; otherwise fall back to the per-export
189            // `target:` from the config (slice #2a). A declared-but-unknown
190            // target is a loud error — never silently ignored.
191            if let Some(t) = export.target.as_deref()
192                && crate::types::target::ExportTarget::parse(t).is_none()
193            {
194                anyhow::bail!(
195                    "export '{}': unknown target '{t}' (expected: {})",
196                    export.name,
197                    crate::types::target::ExportTarget::valid_target_names()
198                );
199            }
200            let eff_target = target.or_else(|| {
201                export
202                    .target
203                    .as_deref()
204                    .and_then(crate::types::target::ExportTarget::parse)
205            });
206            let config_dir = std::path::Path::new(config_path)
207                .parent()
208                .unwrap_or_else(|| std::path::Path::new("."));
209            match type_report::collect_report(
210                &config,
211                export,
212                &column_overrides,
213                &policy,
214                eff_target,
215                config_dir,
216                params,
217            ) {
218                Ok(report) => {
219                    if report.has_fatal() {
220                        any_fatal = true;
221                    }
222                    if let Some(t) = eff_target
223                        && report.has_target_fail()
224                    {
225                        any_fatal = true;
226                        target_fail_cols += report
227                            .columns
228                            .iter()
229                            .filter(|c| c.target_status == Some(TargetStatus::Fail))
230                            .count();
231                        target_fail_label.get_or_insert(t.label());
232                    }
233                    if json_output {
234                        type_report::print_json(&report)?;
235                    } else {
236                        type_report::print_table(&report, eff_target);
237                    }
238                }
239                Err(e) => {
240                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
241                }
242            }
243        }
244
245        if strict && any_fatal {
246            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
247        } else if !strict && target_fail_cols > 0 && !json_output {
248            // The table showed "fail ✗" but rc is 0 — say so explicitly. Skipped
249            // under --json so NDJSON output stays one object per line.
250            println!();
251            println!(
252                "{}",
253                target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
254            );
255        }
256    }
257
258    Ok(())
259}
260
261fn print_diagnostic(diag: &ExportDiagnostic) {
262    println!();
263    println!("Export: {}", diag.export_name);
264    println!("  Strategy:     {}", diag.strategy);
265    println!("  Mode:         {}", diag.mode);
266    if let Some(est) = diag.row_estimate {
267        if est >= 1_000_000 {
268            println!("  Row estimate: ~{}M", est / 1_000_000);
269        } else if est >= 1_000 {
270            println!("  Row estimate: ~{}K", est / 1_000);
271        } else {
272            println!("  Row estimate: ~{}", est);
273        }
274    }
275    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
276        println!("  Cursor range: {} .. {}", min_v, max_v);
277    }
278    if let Some(col) = &diag.cursor_column {
279        println!("  Cursor col:   {}", col);
280    }
281    if let Some(scan) = &diag.scan_type {
282        let idx_label = if diag.uses_index { " (indexed)" } else { "" };
283        println!("  Scan type:    {}{}", scan, idx_label);
284    }
285    println!("  Verdict:      {}", diag.verdict);
286    println!(
287        "  Recommended:  tuning.profile: {}",
288        diag.recommended_profile
289    );
290    let (par_level, par_reason) = diag.recommended_parallel;
291    if par_level > 1 {
292        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
293    } else {
294        println!("  Parallelism:  {} ({})", par_level, par_reason);
295    }
296    for w in &diag.warnings {
297        println!("  Warning:      {}", w);
298    }
299    if let Some(suggestion) = &diag.suggestion {
300        println!("  Suggestion:   {}", suggestion);
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
308    use doctor::{
309        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
310    };
311
312    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
313        // Baseline from the canonical test fixture; override only the fields
314        // these preflight tests vary (mode, cursor, CSV format, query, dest).
315        ExportConfig {
316            mode,
317            cursor_column: cursor.map(|s| s.to_string()),
318            query: Some("SELECT * FROM t".to_string()),
319            format: FormatType::Csv,
320            destination: DestinationConfig {
321                destination_type: DestinationType::Local,
322                path: Some("./out".to_string()),
323                ..Default::default()
324            },
325            ..crate::config::sample_export(name)
326        }
327    }
328
329    // ── L8: 'fail ✗' note when --target FAILs but --strict was not passed ─────
330    // The glyph implies a hard failure; exit is gated only by --strict. The note
331    // tells an operator/CI the exit is 0 so the glyph doesn't mislead.
332    #[test]
333    fn target_fail_note_names_count_target_and_strict_gate() {
334        let note = target_fail_note(2, "bigquery");
335        assert!(note.contains("2 columns FAIL"), "got: {note}");
336        assert!(note.contains("bigquery"), "got: {note}");
337        assert!(note.contains("--strict"), "got: {note}");
338        assert!(note.contains("exit 0"), "got: {note}");
339    }
340
341    #[test]
342    fn target_fail_note_singular_for_one_column() {
343        let note = target_fail_note(1, "duckdb");
344        assert!(note.contains("1 column FAIL"), "got: {note}");
345        assert!(!note.contains("1 columns"), "should be singular: {note}");
346    }
347
348    #[test]
349    fn verdict_small_indexed_with_cursor_is_efficient() {
350        let v = compute_verdict(Some(500_000), true, true);
351        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
352    }
353
354    #[test]
355    fn verdict_large_indexed_with_cursor_is_acceptable() {
356        let v = compute_verdict(Some(20_000_000), true, true);
357        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
358    }
359
360    #[test]
361    fn verdict_no_index_no_cursor_is_degraded() {
362        let v = compute_verdict(Some(500_000), false, false);
363        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
364    }
365
366    #[test]
367    fn verdict_huge_no_index_is_unsafe() {
368        let v = compute_verdict(Some(100_000_000), false, false);
369        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
370    }
371
372    #[test]
373    fn parse_pg_row_estimate_from_sort_plan() {
374        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
375        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
376    }
377
378    #[test]
379    fn parse_pg_row_estimate_from_index_scan() {
380        let plan =
381            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
382        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
383    }
384
385    #[test]
386    fn extract_scan_type_detects_seq_scan() {
387        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
388        let st = extract_scan_type(plan);
389        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
390    }
391
392    #[test]
393    fn extract_scan_type_detects_index_scan() {
394        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
395        let st = extract_scan_type(plan);
396        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
397    }
398
399    #[test]
400    fn suggestion_for_efficient_verdict_is_none() {
401        let e = make_export("t", ExportMode::Full, None);
402        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
403        assert!(
404            s.is_none(),
405            "efficient verdict should produce no suggestion"
406        );
407    }
408
409    #[test]
410    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
411        let e = make_export("t", ExportMode::Full, None);
412        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
413        let msg = s.expect("degraded verdict should produce a suggestion");
414        assert!(
415            msg.contains("safe"),
416            "suggestion should recommend safe profile, got: {msg}"
417        );
418    }
419
420    fn src_err(msg: &str) -> &'static str {
421        categorize_source_error(&anyhow::anyhow!("{}", msg))
422    }
423
424    #[test]
425    fn source_password_rejected_is_auth_error() {
426        assert_eq!(
427            src_err("password authentication failed for user \"rivet\""),
428            "auth error"
429        );
430    }
431
432    #[test]
433    fn source_authentication_failed_is_auth_error() {
434        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
435    }
436
437    #[test]
438    fn source_access_denied_is_auth_error() {
439        assert_eq!(
440            src_err("Access denied for user 'rivet'@'localhost'"),
441            "auth error"
442        );
443    }
444
445    #[test]
446    fn source_connection_refused_is_connectivity() {
447        assert_eq!(
448            src_err("connection refused (os error 61)"),
449            "connectivity error"
450        );
451    }
452
453    #[test]
454    fn source_timed_out_is_connectivity() {
455        assert_eq!(src_err("connection timed out"), "connectivity error");
456    }
457
458    #[test]
459    fn source_dns_translate_host_is_connectivity() {
460        assert_eq!(
461            src_err("could not translate host name \"db.bad\" to address"),
462            "connectivity error"
463        );
464    }
465
466    #[test]
467    fn source_name_not_known_is_connectivity() {
468        assert_eq!(src_err("Name or service not known"), "connectivity error");
469    }
470
471    #[test]
472    fn source_unknown_error_is_generic() {
473        assert_eq!(src_err("something totally unexpected"), "error");
474    }
475
476    fn dest_config(dtype: DestinationType) -> DestinationConfig {
477        DestinationConfig {
478            destination_type: dtype,
479            bucket: Some("b".to_string()),
480            ..Default::default()
481        }
482    }
483
484    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
485        let cfg = dest_config(dtype);
486        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
487    }
488
489    fn local_dest(path: &str) -> DestinationConfig {
490        DestinationConfig {
491            destination_type: DestinationType::Local,
492            path: Some(path.to_string()),
493            ..Default::default()
494        }
495    }
496
497    // Regression (doctor-dedup): doctor's inline dedup key omitted `path`,
498    // so two local destinations with different paths collapsed to one entry
499    // and the second was never write-probed. The shared identity must keep
500    // them distinct.
501    #[test]
502    fn destination_identity_distinguishes_local_paths() {
503        assert_ne!(
504            destination_identity(&local_dest("/tmp/a")),
505            destination_identity(&local_dest("/tmp/b")),
506        );
507    }
508
509    #[test]
510    fn destination_identity_collapses_identical_local_destinations() {
511        assert_eq!(
512            destination_identity(&local_dest("/tmp/a")),
513            destination_identity(&local_dest("/tmp/a")),
514        );
515    }
516
517    #[test]
518    fn destination_identity_distinguishes_buckets() {
519        let a = DestinationConfig {
520            bucket: Some("bucket-a".to_string()),
521            ..dest_config(DestinationType::S3)
522        };
523        let b = DestinationConfig {
524            bucket: Some("bucket-b".to_string()),
525            ..dest_config(DestinationType::S3)
526        };
527        assert_ne!(destination_identity(&a), destination_identity(&b));
528    }
529
530    // Same bucket name on different endpoints (e.g. AWS vs MinIO) is two
531    // distinct destinations and must be probed separately.
532    #[test]
533    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
534        let aws = dest_config(DestinationType::S3);
535        let minio = DestinationConfig {
536            endpoint: Some("http://localhost:9000".to_string()),
537            ..dest_config(DestinationType::S3)
538        };
539        assert_ne!(destination_identity(&aws), destination_identity(&minio));
540    }
541
542    #[test]
543    fn dest_credential_loading_is_auth_error() {
544        assert_eq!(
545            dest_err(
546                "loading credential to sign http request",
547                DestinationType::Gcs
548            ),
549            "auth error"
550        );
551    }
552
553    #[test]
554    fn dest_permission_denied_is_auth_error() {
555        assert_eq!(
556            dest_err("permission denied on resource bucket", DestinationType::S3),
557            "auth error"
558        );
559    }
560
561    #[test]
562    fn dest_forbidden_is_auth_error() {
563        assert_eq!(
564            dest_err("403 Forbidden", DestinationType::Gcs),
565            "auth error"
566        );
567    }
568
569    #[test]
570    fn dest_unauthorized_is_auth_error() {
571        assert_eq!(
572            dest_err("401 Unauthorized", DestinationType::S3),
573            "auth error"
574        );
575    }
576
577    #[test]
578    fn dest_invalid_grant_is_auth_error() {
579        assert_eq!(
580            dest_err(
581                "invalid_grant: token has been revoked",
582                DestinationType::Gcs
583            ),
584            "auth error"
585        );
586    }
587
588    #[test]
589    fn dest_nosuchbucket_s3_is_bucket_not_found() {
590        assert_eq!(
591            dest_err(
592                "NoSuchBucket: the specified bucket does not exist",
593                DestinationType::S3
594            ),
595            "bucket not found"
596        );
597    }
598
599    #[test]
600    fn dest_not_found_gcs_is_bucket_not_found() {
601        assert_eq!(
602            dest_err("bucket not found (404)", DestinationType::Gcs),
603            "bucket not found"
604        );
605    }
606
607    #[test]
608    fn dest_not_found_local_is_path_not_found() {
609        assert_eq!(
610            dest_err("path not found: /tmp/missing", DestinationType::Local),
611            "path not found"
612        );
613    }
614
615    #[test]
616    fn dest_connection_refused_is_connectivity() {
617        assert_eq!(
618            dest_err("connection refused to endpoint", DestinationType::S3),
619            "connectivity error"
620        );
621    }
622
623    #[test]
624    fn dest_dns_error_is_connectivity() {
625        assert_eq!(
626            dest_err("dns error: failed to lookup address", DestinationType::S3),
627            "connectivity error"
628        );
629    }
630
631    #[test]
632    fn dest_timed_out_is_connectivity() {
633        assert_eq!(
634            dest_err("request timed out after 30s", DestinationType::Gcs),
635            "connectivity error"
636        );
637    }
638
639    #[test]
640    fn dest_unknown_error_is_generic() {
641        assert_eq!(
642            dest_err("something else entirely", DestinationType::S3),
643            "error"
644        );
645    }
646
647    #[test]
648    fn strategy_full_scan() {
649        let e = make_export("t", ExportMode::Full, None);
650        assert_eq!(derive_strategy(&e), "full-scan");
651    }
652
653    #[test]
654    fn strategy_full_parallel() {
655        let mut e = make_export("t", ExportMode::Full, None);
656        e.parallel = 4;
657        assert_eq!(derive_strategy(&e), "full-parallel(4)");
658    }
659
660    #[test]
661    fn strategy_incremental() {
662        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
663        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
664    }
665
666    #[test]
667    fn strategy_chunked() {
668        let mut e = make_export("t", ExportMode::Chunked, None);
669        e.chunk_column = Some("id".to_string());
670        e.chunk_size = 50_000;
671        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
672    }
673
674    #[test]
675    fn strategy_chunked_parallel() {
676        let mut e = make_export("t", ExportMode::Chunked, None);
677        e.chunk_column = Some("id".to_string());
678        e.chunk_size = 50_000;
679        e.parallel = 3;
680        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
681    }
682
683    #[test]
684    fn strategy_time_window() {
685        let mut e = make_export("t", ExportMode::TimeWindow, None);
686        e.time_column = Some("created_at".to_string());
687        e.days_window = Some(7);
688        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
689    }
690
691    #[test]
692    fn profile_small_indexed_is_fast() {
693        let e = make_export("t", ExportMode::Full, None);
694        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
695    }
696
697    #[test]
698    fn profile_medium_indexed_is_balanced() {
699        let e = make_export("t", ExportMode::Full, None);
700        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
701    }
702
703    #[test]
704    fn profile_large_indexed_is_safe() {
705        let e = make_export("t", ExportMode::Full, None);
706        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
707    }
708
709    #[test]
710    fn profile_small_no_index_is_balanced() {
711        let e = make_export("t", ExportMode::Full, None);
712        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
713    }
714
715    #[test]
716    fn profile_small_no_index_parallel_is_safe() {
717        let mut e = make_export("t", ExportMode::Full, None);
718        e.parallel = 4;
719        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
720    }
721
722    #[test]
723    fn profile_medium_no_index_is_balanced() {
724        let e = make_export("t", ExportMode::Full, None);
725        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
726    }
727
728    #[test]
729    fn profile_large_no_index_is_safe() {
730        let e = make_export("t", ExportMode::Full, None);
731        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
732    }
733
734    #[test]
735    fn sparse_range_warning_when_very_sparse() {
736        let mut e = make_export("t", ExportMode::Chunked, None);
737        e.chunk_column = Some("id".to_string());
738        e.chunk_size = 100_000;
739        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
740        assert!(w.is_some(), "should warn about sparse range");
741        let msg = w.unwrap();
742        assert!(msg.contains("Sparse key range"), "got: {msg}");
743        assert!(msg.contains("empty"), "got: {msg}");
744    }
745
746    #[test]
747    fn sparse_range_no_warning_when_dense() {
748        let mut e = make_export("t", ExportMode::Chunked, None);
749        e.chunk_column = Some("id".to_string());
750        e.chunk_size = 100_000;
751        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
752        assert!(w.is_none(), "should not warn for dense range");
753    }
754
755    #[test]
756    fn sparse_range_skipped_when_chunk_dense() {
757        let mut e = make_export("t", ExportMode::Chunked, None);
758        e.chunk_column = Some("id".to_string());
759        e.chunk_dense = true;
760        e.chunk_size = 100_000;
761        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
762        assert!(
763            w.is_none(),
764            "chunk_dense uses ordinals, not physical id span"
765        );
766    }
767
768    #[test]
769    fn dense_surrogate_warning_when_chunk_dense_builtin() {
770        let mut e = make_export("t", ExportMode::Chunked, None);
771        e.chunk_column = Some("id".to_string());
772        e.chunk_dense = true;
773        e.query = Some("SELECT id FROM orders".to_string());
774        let w = check_dense_surrogate_cost(&e);
775        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
776        assert!(w.unwrap().contains("global sort"));
777    }
778
779    #[test]
780    fn sparse_range_not_triggered_for_non_chunked() {
781        let e = make_export("t", ExportMode::Full, None);
782        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
783        assert!(w.is_none(), "should not warn for non-chunked mode");
784    }
785
786    #[test]
787    fn dense_surrogate_warning_with_row_number() {
788        let mut e = make_export("t", ExportMode::Chunked, None);
789        e.chunk_column = Some("rn".to_string());
790        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
791        let w = check_dense_surrogate_cost(&e);
792        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
793        assert!(w.unwrap().contains("global sort"));
794    }
795
796    #[test]
797    fn no_dense_surrogate_warning_without_row_number() {
798        let mut e = make_export("t", ExportMode::Chunked, None);
799        e.chunk_column = Some("id".to_string());
800        e.query = Some("SELECT * FROM orders".to_string());
801        let w = check_dense_surrogate_cost(&e);
802        assert!(w.is_none());
803    }
804
805    #[test]
806    fn no_dense_surrogate_warning_for_non_chunked() {
807        let mut e = make_export("t", ExportMode::Full, None);
808        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
809        let w = check_dense_surrogate_cost(&e);
810        assert!(w.is_none(), "should not warn for non-chunked mode");
811    }
812
813    #[test]
814    fn parallel_memory_warning_large_dataset() {
815        let mut e = make_export("t", ExportMode::Chunked, None);
816        e.parallel = 4;
817        let w = check_parallel_memory_risk(&e, Some(10_000_000));
818        assert!(w.is_some(), "should warn about memory risk");
819        let msg = w.unwrap();
820        assert!(msg.contains("Parallel=4"), "got: {msg}");
821        assert!(msg.contains("memory"), "got: {msg}");
822    }
823
824    #[test]
825    fn no_parallel_memory_warning_small_dataset() {
826        let mut e = make_export("t", ExportMode::Chunked, None);
827        e.parallel = 4;
828        let w = check_parallel_memory_risk(&e, Some(1_000));
829        assert!(w.is_none(), "should not warn for small dataset");
830    }
831
832    #[test]
833    fn no_parallel_memory_warning_single_worker() {
834        let e = make_export("t", ExportMode::Full, None);
835        let w = check_parallel_memory_risk(&e, Some(100_000_000));
836        assert!(w.is_none(), "should not warn when parallel=1");
837    }
838
839    #[test]
840    fn suggestion_degraded_full_recommends_incremental() {
841        let e = make_export("t", ExportMode::Full, None);
842        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
843        assert!(s.contains("incremental"), "got: {s}");
844    }
845
846    #[test]
847    fn suggestion_degraded_chunked_recommends_index() {
848        let mut e = make_export("t", ExportMode::Chunked, None);
849        e.chunk_column = Some("id".to_string());
850        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
851        assert!(s.contains("index on 'id'"), "got: {s}");
852    }
853
854    #[test]
855    fn suggestion_degraded_time_window_recommends_index() {
856        let mut e = make_export("t", ExportMode::TimeWindow, None);
857        e.time_column = Some("created_at".to_string());
858        e.days_window = Some(7);
859        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
860        assert!(s.contains("index on 'created_at'"), "got: {s}");
861    }
862
863    #[test]
864    fn suggestion_unsafe_full_recommends_incremental() {
865        let e = make_export("t", ExportMode::Full, None);
866        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
867        assert!(s.contains("incremental"), "got: {s}");
868    }
869
870    #[test]
871    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
872        let mut e = make_export("t", ExportMode::Chunked, None);
873        e.chunk_column = Some("id".to_string());
874        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
875        assert!(s.contains("index on 'id'"), "got: {s}");
876        assert!(s.contains("parallel"), "got: {s}");
877    }
878
879    #[test]
880    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
881        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
882        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
883        assert!(s.contains("index on 'updated_at'"), "got: {s}");
884    }
885
886    #[test]
887    fn suggestion_acceptable_large_full_recommends_incremental() {
888        let e = make_export("t", ExportMode::Full, None);
889        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
890        assert!(s.contains("incremental"), "got: {s}");
891    }
892
893    #[test]
894    fn parallel_only_for_chunked_mode() {
895        let e = make_export("t", ExportMode::Full, None);
896        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
897        assert_eq!(level, 1, "non-chunked mode should recommend 1");
898    }
899
900    #[test]
901    fn parallel_small_dataset_is_one() {
902        let mut e = make_export("t", ExportMode::Chunked, None);
903        e.chunk_column = Some("id".to_string());
904        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
905        assert_eq!(level, 1, "small dataset should recommend 1");
906    }
907
908    #[test]
909    fn parallel_moderate_indexed_is_two() {
910        let mut e = make_export("t", ExportMode::Chunked, None);
911        e.chunk_column = Some("id".to_string());
912        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
913        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
914    }
915
916    #[test]
917    fn parallel_large_indexed_is_four() {
918        let mut e = make_export("t", ExportMode::Chunked, None);
919        e.chunk_column = Some("id".to_string());
920        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
921        assert_eq!(level, 4, "large indexed dataset should recommend 4");
922    }
923
924    #[test]
925    fn parallel_no_index_large_is_one() {
926        let mut e = make_export("t", ExportMode::Chunked, None);
927        e.chunk_column = Some("id".to_string());
928        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
929        assert_eq!(level, 1, "no index + large should recommend 1");
930        assert!(reason.contains("no index"), "got: {reason}");
931    }
932
933    #[test]
934    fn parallel_no_index_moderate_is_conservative() {
935        let mut e = make_export("t", ExportMode::Chunked, None);
936        e.chunk_column = Some("id".to_string());
937        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
938        assert_eq!(
939            level, 2,
940            "no index + moderate should recommend 2 (conservative)"
941        );
942    }
943
944    #[test]
945    fn suggestion_acceptable_large_chunked_recommends_parallel() {
946        let mut e = make_export("t", ExportMode::Chunked, None);
947        e.chunk_column = Some("id".to_string());
948        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
949        assert!(s.contains("parallel"), "got: {s}");
950    }
951
952    #[test]
953    fn connection_limit_warn_when_parallel_meets_max() {
954        let w = check_connection_limit(20, Some(20));
955        assert!(w.is_some(), "should warn when parallel == max_connections");
956        let msg = w.unwrap();
957        assert!(msg.contains("max_connections=20"), "got: {msg}");
958        assert!(msg.contains("parallel=20"), "got: {msg}");
959    }
960
961    #[test]
962    fn connection_limit_warn_when_parallel_exceeds_max() {
963        let w = check_connection_limit(100, Some(20));
964        assert!(w.is_some(), "should warn when parallel > max_connections");
965        let msg = w.unwrap();
966        assert!(msg.contains("max_connections=20"), "got: {msg}");
967    }
968
969    #[test]
970    fn connection_limit_no_warn_when_parallel_below_max() {
971        let w = check_connection_limit(4, Some(100));
972        assert!(
973            w.is_none(),
974            "should not warn when parallel << max_connections"
975        );
976    }
977
978    #[test]
979    fn connection_limit_no_warn_when_parallel_is_one() {
980        let w = check_connection_limit(1, Some(5));
981        assert!(
982            w.is_none(),
983            "single worker never triggers connection warning"
984        );
985    }
986
987    #[test]
988    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
989        let w = check_connection_limit(100, None);
990        assert!(w.is_some(), "should note that check was skipped");
991        let msg = w.unwrap();
992        assert!(msg.contains("skipped"), "got: {msg}");
993    }
994
995    #[test]
996    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
997        let w = check_connection_limit(1, None);
998        assert!(
999            w.is_none(),
1000            "single worker never triggers connection warning"
1001        );
1002    }
1003
1004    #[test]
1005    fn connection_limit_suggests_headroom() {
1006        let w = check_connection_limit(25, Some(20)).unwrap();
1007        // Suggested safe max should be max_connections - 3 = 17
1008        assert!(
1009            w.contains("17"),
1010            "should suggest leaving headroom, got: {w}"
1011        );
1012    }
1013
1014    // ── v0.7.4: actionable hints next to categorised errors ───────────
1015
1016    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1017        let err = anyhow::anyhow!("{}", msg);
1018        let cat = categorize_source_error(&err);
1019        source_error_hint(cat, &err, &st)
1020    }
1021
1022    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1023        let err = anyhow::anyhow!("{}", msg);
1024        let dest = DestinationConfig {
1025            destination_type: dt,
1026            bucket: Some("b".into()),
1027            ..Default::default()
1028        };
1029        let cat = categorize_dest_error(&err, &dest);
1030        destination_error_hint(cat, &dest)
1031    }
1032
1033    #[test]
1034    fn source_tls_handshake_returns_pg_specific_tls_hint() {
1035        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1036        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1037    }
1038
1039    #[test]
1040    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1041        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1042        assert!(h.contains("tls.mode"), "got: {h}");
1043    }
1044
1045    #[test]
1046    fn source_auth_error_postgres_mentions_pg_hba() {
1047        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1048        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1049    }
1050
1051    #[test]
1052    fn source_auth_error_mysql_mentions_grant() {
1053        let h = src_hint(
1054            "Access denied for user 'rivet'@'localhost'",
1055            SourceType::Mysql,
1056        )
1057        .expect("hint");
1058        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1059    }
1060
1061    #[test]
1062    fn source_connectivity_error_mentions_bastion_and_network() {
1063        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1064        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1065    }
1066
1067    #[test]
1068    fn source_unknown_error_returns_no_hint() {
1069        // Generic "error" category should yield no hint — better to
1070        // print the raw driver message than to mislead.
1071        let h = src_hint("totally unexpected", SourceType::Postgres);
1072        assert!(h.is_none(), "unknown errors should not produce a hint");
1073    }
1074
1075    #[test]
1076    fn dest_s3_auth_error_names_concrete_actions() {
1077        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1078        assert!(
1079            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1080            "got: {h}"
1081        );
1082    }
1083
1084    #[test]
1085    fn dest_gcs_auth_error_names_concrete_actions() {
1086        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1087        assert!(
1088            h.contains("storage.objects") && h.contains("cloud-permissions"),
1089            "got: {h}"
1090        );
1091    }
1092
1093    #[test]
1094    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1095        // Guard the load-bearing ordering in categorize_dest_error: the
1096        // "sas expired" early-return must fire before the generic "token"
1097        // branch, or destination_error_hint produces the wrong hint.
1098        // This test pins the *category string*, not just the final hint text.
1099        let err = anyhow::anyhow!(
1100            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1101        );
1102        let dest = DestinationConfig {
1103            destination_type: DestinationType::Azure,
1104            bucket: Some("c".into()),
1105            ..Default::default()
1106        };
1107        let cat = categorize_dest_error(&err, &dest);
1108        assert_eq!(
1109            cat, "sas expired",
1110            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1111        );
1112    }
1113
1114    #[test]
1115    fn dest_azure_sas_expired_returns_regenerate_hint() {
1116        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1117        // the hint must steer the operator to `az storage container
1118        // generate-sas` not "your IAM role is broken".
1119        let h = dest_hint(
1120            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1121            DestinationType::Azure,
1122        )
1123        .expect("hint");
1124        assert!(
1125            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1126            "got: {h}"
1127        );
1128    }
1129
1130    #[test]
1131    fn dest_s3_bucket_not_found_says_no_auto_create() {
1132        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1133        assert!(
1134            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1135            "got: {h}"
1136        );
1137    }
1138
1139    #[test]
1140    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1141        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1142        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1143    }
1144}