Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11#[cfg(test)]
12use analysis::{
13    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
14    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
15    recommend_parallelism, recommend_profile,
16};
17#[allow(unused_imports)]
18pub use doctor::doctor;
19// Reused at the run-time connect seam (src/pipeline/single.rs) so a failed
20// `rivet run` carries the same category + remediation hint `rivet doctor` gives.
21pub(crate) use doctor::{categorize_source_error, source_error_hint};
22#[cfg(test)]
23use postgres::{extract_scan_type, parse_pg_row_estimate};
24
25use crate::config::{Config, ExportConfig, SourceType};
26use crate::error::Result;
27use crate::types::policy::TypePolicy;
28use crate::types::target::{ExportTarget, TargetStatus};
29
/// Health rating attached to an export's preflight diagnostic.
///
/// Variants are declared from best to worst, in the same order as the legend
/// `check` prints: `EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE`.
///
/// The type is a plain field-less enum, so it is `Copy` and comparable with
/// `==`; callers do not need `matches!` just to test for a variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    /// Top rung of the ladder.
    Efficient,
    /// Second rung.
    Acceptable,
    /// Third rung.
    Degraded,
    /// Bottom rung.
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the verdict as its upper-case label (`EFFICIENT`, `ACCEPTABLE`,
    /// `DEGRADED`, `UNSAFE`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Exhaustive on purpose: adding a variant must force a label choice.
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.write_str(label)
    }
}
48
49pub(crate) struct ExportDiagnostic {
50    pub export_name: String,
51    pub strategy: String,
52    pub mode: String,
53    pub cursor_column: Option<String>,
54    pub row_estimate: Option<i64>,
55    /// Average bytes per row from catalog/plan stats (PG EXPLAIN `width`,
56    /// MSSQL `dm_db_partition_stats` pages/row). `None` when unavailable
57    /// (e.g. MySQL, with no trustworthy scan-free estimate). Feeds the
58    /// oversized-chunk warning and is shown as the `Row width` line.
59    pub avg_row_bytes: Option<i64>,
60    pub cursor_min: Option<String>,
61    pub cursor_max: Option<String>,
62    pub scan_type: Option<String>,
63    pub uses_index: bool,
64    pub verdict: HealthVerdict,
65    pub recommended_profile: &'static str,
66    pub recommended_parallel: (u32, &'static str),
67    pub warnings: Vec<String>,
68    pub suggestion: Option<String>,
69}
70
71/// Return the diagnostic for a single export without printing anything.
72///
73/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
74pub(crate) fn get_export_diagnostic(
75    config: &Config,
76    export: &ExportConfig,
77) -> Result<ExportDiagnostic> {
78    let url = config.source.resolve_url()?;
79    let tls = config.source.tls.as_ref();
80    crate::source::warn_if_tls_disabled(&config.source);
81    match config.source.source_type {
82        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
83        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
84        SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
85    }
86}
87
88/// Dedup identity for a destination, shared by `check`'s credential probe
89/// and `doctor`'s write probe. Must include every field that changes where
90/// a probe lands — notably `path`, so two local destinations with different
91/// paths are probed separately. Keeping one helper prevents the two call
92/// sites from drifting apart (doctor's inline copy once omitted `path` and
93/// silently skipped the second local destination).
94fn destination_identity(d: &crate::config::DestinationConfig) -> String {
95    format!(
96        "{:?}:{}:{}:{}",
97        d.destination_type,
98        d.bucket.as_deref().unwrap_or("-"),
99        d.endpoint.as_deref().unwrap_or("-"),
100        d.path.as_deref().unwrap_or("-"),
101    )
102}
103
/// Build the one-line note printed when the table shows `fail ✗` for
/// `--target` but the process still exits 0.
///
/// A target-FAIL column only gates the exit code when `--strict` is also
/// passed (by design). The note spells that out so an operator or CI job
/// reading the glyph alone does not assume a non-zero exit.
///
/// `n` is the number of failing columns (it selects singular/plural wording)
/// and `target_label` names the target. Kept pure so the exact text can be
/// unit-tested.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let noun = match n {
        1 => "column",
        _ => "columns",
    };
    let mut note = format!("Note: {n} {noun} FAIL {target_label} compatibility; ");
    note.push_str("exit code is gated only with --strict (currently exit 0)");
    note
}
115
116pub fn check(
117    config_path: &str,
118    export_name: Option<&str>,
119    params: Option<&std::collections::HashMap<String, String>>,
120    show_type_report: bool,
121    strict: bool,
122    json_output: bool,
123    target: Option<ExportTarget>,
124) -> Result<()> {
125    let config = Config::load_with_params(config_path, params)?;
126
127    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
128        let e = config
129            .exports
130            .iter()
131            .find(|e| e.name == name)
132            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
133        vec![e]
134    } else {
135        config.exports.iter().collect()
136    };
137
138    let url = config.source.resolve_url()?;
139    let tls = config.source.tls.as_ref();
140    // Surface the plaintext-transport warning at preflight time too —
141    // operators should hear it from `rivet check` before they wait
142    // through a full `rivet run` to learn the same thing. `Once` inside
143    // the helper keeps emission to one line per process even when both
144    // `check` and `run` flow through it.
145    crate::source::warn_if_tls_disabled(&config.source);
146    match config.source.source_type {
147        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
148        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
149        SourceType::Mssql => mssql::check_mssql(&url, tls, &exports, json_output)?,
150    }
151
152    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
153    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
154    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
155    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
156    // and has side effects) — but we *do* call `create_destination`, which
157    // resolves env vars / credentials_file existence at construction time.
158    // Each unique destination is probed once per `check` to keep multi-export
159    // configs cheap.
160    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
161    for export in &exports {
162        let dest_key = destination_identity(&export.destination);
163        if !seen_destinations.insert(dest_key) {
164            continue;
165        }
166        let expanded = crate::plan::build::expand_destination_templates(
167            export.destination.clone(),
168            &export.name,
169        );
170        crate::destination::create_destination(&expanded).map_err(|e| {
171            anyhow::anyhow!(
172                "export '{}': destination preflight failed: {:#}",
173                export.name,
174                e
175            )
176        })?;
177    }
178
179    // Whether the check ends clean (no strict failure, no target-fail column).
180    // Stays true for the default `rivet check` (no type report), so the "Next:
181    // rivet run" pointer still fires.
182    let mut clean = true;
183
184    if show_type_report {
185        let policy = if strict {
186            TypePolicy::strict()
187        } else {
188            TypePolicy::warn_only()
189        };
190
191        let mut any_fatal = false;
192        // Count hard target-FAIL columns (and remember which target) so that —
193        // when --strict was NOT passed and the exit code is therefore 0 — we can
194        // print a note. The "fail ✗" glyph in the table implies a hard failure,
195        // but exit is gated only by --strict; without this note an operator or CI
196        // reading the glyph alone would be misled into thinking rc != 0.
197        let mut target_fail_cols = 0usize;
198        let mut target_fail_label: Option<&'static str> = None;
199        for export in &exports {
200            let column_overrides =
201                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
202            // CLI `--target` wins; otherwise fall back to the per-export
203            // `target:` from the config (slice #2a). A declared-but-unknown
204            // target is a loud error — never silently ignored.
205            if let Some(t) = export.target.as_deref()
206                && crate::types::target::ExportTarget::parse(t).is_none()
207            {
208                anyhow::bail!(
209                    "export '{}': unknown target '{t}' (expected: {})",
210                    export.name,
211                    crate::types::target::ExportTarget::valid_target_names()
212                );
213            }
214            let eff_target = target.or_else(|| {
215                export
216                    .target
217                    .as_deref()
218                    .and_then(crate::types::target::ExportTarget::parse)
219            });
220            let config_dir = std::path::Path::new(config_path)
221                .parent()
222                .unwrap_or_else(|| std::path::Path::new("."));
223            match type_report::collect_report(
224                &config,
225                export,
226                &column_overrides,
227                &policy,
228                eff_target,
229                config_dir,
230                params,
231            ) {
232                Ok(report) => {
233                    if report.has_fatal() {
234                        any_fatal = true;
235                    }
236                    if let Some(t) = eff_target
237                        && report.has_target_fail()
238                    {
239                        any_fatal = true;
240                        target_fail_cols += report
241                            .columns
242                            .iter()
243                            .filter(|c| c.target_status == Some(TargetStatus::Fail))
244                            .count();
245                        target_fail_label.get_or_insert(t.label());
246                    }
247                    if json_output {
248                        type_report::print_json(&report)?;
249                    } else {
250                        type_report::print_table(&report, eff_target);
251                    }
252                }
253                Err(e) => {
254                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
255                }
256            }
257        }
258
259        if strict && any_fatal {
260            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
261        } else if !strict && target_fail_cols > 0 && !json_output {
262            // The table showed "fail ✗" but rc is 0 — say so explicitly. Skipped
263            // under --json so NDJSON output stays one object per line.
264            clean = false;
265            println!();
266            println!(
267                "{}",
268                target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
269            );
270        }
271    }
272
273    if !json_output {
274        // Verdict legend — decode the EFFICIENT/ACCEPTABLE/DEGRADED/UNSAFE words
275        // printed above and reassure that `check` is advisory: never blocks a run.
276        println!();
277        println!(
278            "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
279        );
280        if clean {
281            // Keep the ladder going to the final rung instead of ending cold.
282            println!(
283                "Looks good. Next: rivet run -c {config_path} --validate   # export, then verify row counts"
284            );
285        }
286    }
287
288    Ok(())
289}
290
291fn print_diagnostic(diag: &ExportDiagnostic) {
292    println!();
293    println!("Export: {}", diag.export_name);
294    println!("  Strategy:     {}", diag.strategy);
295    println!("  Mode:         {}", diag.mode);
296    if let Some(est) = diag.row_estimate {
297        if est >= 1_000_000 {
298            println!("  Row estimate: ~{}M", est / 1_000_000);
299        } else if est >= 1_000 {
300            println!("  Row estimate: ~{}K", est / 1_000);
301        } else {
302            println!("  Row estimate: ~{}", est);
303        }
304    }
305    if let Some(w) = diag.avg_row_bytes {
306        println!("  Row width:    ~{} bytes", w);
307    }
308    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
309        println!("  Cursor range: {} .. {}", min_v, max_v);
310    }
311    if let Some(col) = &diag.cursor_column {
312        println!("  Cursor col:   {}", col);
313    }
314    // Plain-language access path instead of a raw EXPLAIN node dump
315    // (`Result (cost=0.00..0.01 rows=1 width=36)`). Keyed off the authoritative
316    // `uses_index` bool, gated on `scan_type.is_some()` so engines without an
317    // EXPLAIN probe (MSSQL) stay silent.
318    if diag.scan_type.is_some() {
319        let access = if diag.uses_index {
320            "index scan (the cursor/chunk column is indexed)"
321        } else {
322            "full table scan (no index on the read path)"
323        };
324        println!("  Access:       {access}");
325    }
326    println!("  Verdict:      {}", diag.verdict);
327    println!(
328        "  Recommended:  tuning.profile: {}",
329        diag.recommended_profile
330    );
331    let (par_level, par_reason) = diag.recommended_parallel;
332    if par_level > 1 {
333        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
334    } else {
335        println!("  Parallelism:  {} ({})", par_level, par_reason);
336    }
337    for w in &diag.warnings {
338        println!("  Warning:      {}", w);
339    }
340    if let Some(suggestion) = &diag.suggestion {
341        println!("  Suggestion:   {}", suggestion);
342    }
343}
344
345#[cfg(test)]
346mod tests {
347    use super::*;
348    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
349    use doctor::{
350        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
351    };
352
353    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
354        // Baseline from the canonical test fixture; override only the fields
355        // these preflight tests vary (mode, cursor, CSV format, query, dest).
356        ExportConfig {
357            mode,
358            cursor_column: cursor.map(|s| s.to_string()),
359            query: Some("SELECT * FROM t".to_string()),
360            format: FormatType::Csv,
361            destination: DestinationConfig {
362                destination_type: DestinationType::Local,
363                path: Some("./out".to_string()),
364                ..Default::default()
365            },
366            ..crate::config::sample_export(name)
367        }
368    }
369
370    // ── L8: 'fail ✗' note when --target FAILs but --strict was not passed ─────
371    // The glyph implies a hard failure; exit is gated only by --strict. The note
372    // tells an operator/CI the exit is 0 so the glyph doesn't mislead.
373    #[test]
374    fn target_fail_note_names_count_target_and_strict_gate() {
375        let note = target_fail_note(2, "bigquery");
376        assert!(note.contains("2 columns FAIL"), "got: {note}");
377        assert!(note.contains("bigquery"), "got: {note}");
378        assert!(note.contains("--strict"), "got: {note}");
379        assert!(note.contains("exit 0"), "got: {note}");
380    }
381
382    #[test]
383    fn target_fail_note_singular_for_one_column() {
384        let note = target_fail_note(1, "duckdb");
385        assert!(note.contains("1 column FAIL"), "got: {note}");
386        assert!(!note.contains("1 columns"), "should be singular: {note}");
387    }
388
389    #[test]
390    fn verdict_small_indexed_with_cursor_is_efficient() {
391        let v = compute_verdict(Some(500_000), true, true);
392        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
393    }
394
395    #[test]
396    fn verdict_large_indexed_with_cursor_is_acceptable() {
397        let v = compute_verdict(Some(20_000_000), true, true);
398        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
399    }
400
401    #[test]
402    fn verdict_no_index_no_cursor_is_degraded() {
403        let v = compute_verdict(Some(500_000), false, false);
404        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
405    }
406
407    #[test]
408    fn verdict_huge_no_index_is_unsafe() {
409        let v = compute_verdict(Some(100_000_000), false, false);
410        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
411    }
412
413    #[test]
414    fn parse_pg_row_estimate_from_sort_plan() {
415        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
416        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
417    }
418
419    #[test]
420    fn parse_pg_row_estimate_from_index_scan() {
421        let plan =
422            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
423        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
424    }
425
426    #[test]
427    fn extract_scan_type_detects_seq_scan() {
428        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
429        let st = extract_scan_type(plan);
430        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
431    }
432
433    #[test]
434    fn extract_scan_type_detects_index_scan() {
435        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
436        let st = extract_scan_type(plan);
437        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
438    }
439
440    #[test]
441    fn suggestion_for_efficient_verdict_is_none() {
442        let e = make_export("t", ExportMode::Full, None);
443        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
444        assert!(
445            s.is_none(),
446            "efficient verdict should produce no suggestion"
447        );
448    }
449
450    #[test]
451    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
452        let e = make_export("t", ExportMode::Full, None);
453        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
454        let msg = s.expect("degraded verdict should produce a suggestion");
455        assert!(
456            msg.contains("safe"),
457            "suggestion should recommend safe profile, got: {msg}"
458        );
459    }
460
461    fn src_err(msg: &str) -> &'static str {
462        categorize_source_error(&anyhow::anyhow!("{}", msg))
463    }
464
465    #[test]
466    fn source_password_rejected_is_auth_error() {
467        assert_eq!(
468            src_err("password authentication failed for user \"rivet\""),
469            "auth error"
470        );
471    }
472
473    #[test]
474    fn source_authentication_failed_is_auth_error() {
475        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
476    }
477
478    #[test]
479    fn source_access_denied_is_auth_error() {
480        assert_eq!(
481            src_err("Access denied for user 'rivet'@'localhost'"),
482            "auth error"
483        );
484    }
485
486    #[test]
487    fn source_connection_refused_is_connectivity() {
488        assert_eq!(
489            src_err("connection refused (os error 61)"),
490            "connectivity error"
491        );
492    }
493
494    #[test]
495    fn source_timed_out_is_connectivity() {
496        assert_eq!(src_err("connection timed out"), "connectivity error");
497    }
498
499    #[test]
500    fn source_dns_translate_host_is_connectivity() {
501        assert_eq!(
502            src_err("could not translate host name \"db.bad\" to address"),
503            "connectivity error"
504        );
505    }
506
507    #[test]
508    fn source_name_not_known_is_connectivity() {
509        assert_eq!(src_err("Name or service not known"), "connectivity error");
510    }
511
512    #[test]
513    fn source_unknown_error_is_generic() {
514        assert_eq!(src_err("something totally unexpected"), "error");
515    }
516
517    fn dest_config(dtype: DestinationType) -> DestinationConfig {
518        DestinationConfig {
519            destination_type: dtype,
520            bucket: Some("b".to_string()),
521            ..Default::default()
522        }
523    }
524
525    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
526        let cfg = dest_config(dtype);
527        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
528    }
529
530    fn local_dest(path: &str) -> DestinationConfig {
531        DestinationConfig {
532            destination_type: DestinationType::Local,
533            path: Some(path.to_string()),
534            ..Default::default()
535        }
536    }
537
538    // Regression (doctor-dedup): doctor's inline dedup key omitted `path`,
539    // so two local destinations with different paths collapsed to one entry
540    // and the second was never write-probed. The shared identity must keep
541    // them distinct.
542    #[test]
543    fn destination_identity_distinguishes_local_paths() {
544        assert_ne!(
545            destination_identity(&local_dest("/tmp/a")),
546            destination_identity(&local_dest("/tmp/b")),
547        );
548    }
549
550    #[test]
551    fn destination_identity_collapses_identical_local_destinations() {
552        assert_eq!(
553            destination_identity(&local_dest("/tmp/a")),
554            destination_identity(&local_dest("/tmp/a")),
555        );
556    }
557
558    #[test]
559    fn destination_identity_distinguishes_buckets() {
560        let a = DestinationConfig {
561            bucket: Some("bucket-a".to_string()),
562            ..dest_config(DestinationType::S3)
563        };
564        let b = DestinationConfig {
565            bucket: Some("bucket-b".to_string()),
566            ..dest_config(DestinationType::S3)
567        };
568        assert_ne!(destination_identity(&a), destination_identity(&b));
569    }
570
571    // Same bucket name on different endpoints (e.g. AWS vs MinIO) is two
572    // distinct destinations and must be probed separately.
573    #[test]
574    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
575        let aws = dest_config(DestinationType::S3);
576        let minio = DestinationConfig {
577            endpoint: Some("http://localhost:9000".to_string()),
578            ..dest_config(DestinationType::S3)
579        };
580        assert_ne!(destination_identity(&aws), destination_identity(&minio));
581    }
582
583    #[test]
584    fn dest_credential_loading_is_auth_error() {
585        assert_eq!(
586            dest_err(
587                "loading credential to sign http request",
588                DestinationType::Gcs
589            ),
590            "auth error"
591        );
592    }
593
594    #[test]
595    fn dest_permission_denied_is_auth_error() {
596        assert_eq!(
597            dest_err("permission denied on resource bucket", DestinationType::S3),
598            "auth error"
599        );
600    }
601
602    #[test]
603    fn dest_forbidden_is_auth_error() {
604        assert_eq!(
605            dest_err("403 Forbidden", DestinationType::Gcs),
606            "auth error"
607        );
608    }
609
610    #[test]
611    fn dest_unauthorized_is_auth_error() {
612        assert_eq!(
613            dest_err("401 Unauthorized", DestinationType::S3),
614            "auth error"
615        );
616    }
617
618    #[test]
619    fn dest_invalid_grant_is_auth_error() {
620        assert_eq!(
621            dest_err(
622                "invalid_grant: token has been revoked",
623                DestinationType::Gcs
624            ),
625            "auth error"
626        );
627    }
628
629    #[test]
630    fn dest_nosuchbucket_s3_is_bucket_not_found() {
631        assert_eq!(
632            dest_err(
633                "NoSuchBucket: the specified bucket does not exist",
634                DestinationType::S3
635            ),
636            "bucket not found"
637        );
638    }
639
640    #[test]
641    fn dest_not_found_gcs_is_bucket_not_found() {
642        assert_eq!(
643            dest_err("bucket not found (404)", DestinationType::Gcs),
644            "bucket not found"
645        );
646    }
647
648    #[test]
649    fn dest_not_found_local_is_path_not_found() {
650        assert_eq!(
651            dest_err("path not found: /tmp/missing", DestinationType::Local),
652            "path not found"
653        );
654    }
655
656    #[test]
657    fn dest_connection_refused_is_connectivity() {
658        assert_eq!(
659            dest_err("connection refused to endpoint", DestinationType::S3),
660            "connectivity error"
661        );
662    }
663
664    #[test]
665    fn dest_dns_error_is_connectivity() {
666        assert_eq!(
667            dest_err("dns error: failed to lookup address", DestinationType::S3),
668            "connectivity error"
669        );
670    }
671
672    #[test]
673    fn dest_timed_out_is_connectivity() {
674        assert_eq!(
675            dest_err("request timed out after 30s", DestinationType::Gcs),
676            "connectivity error"
677        );
678    }
679
680    #[test]
681    fn dest_unknown_error_is_generic() {
682        assert_eq!(
683            dest_err("something else entirely", DestinationType::S3),
684            "error"
685        );
686    }
687
688    #[test]
689    fn strategy_full_scan() {
690        let e = make_export("t", ExportMode::Full, None);
691        assert_eq!(derive_strategy(&e), "full-scan");
692    }
693
694    #[test]
695    fn strategy_full_parallel() {
696        let mut e = make_export("t", ExportMode::Full, None);
697        e.parallel = 4;
698        assert_eq!(derive_strategy(&e), "full-parallel(4)");
699    }
700
701    #[test]
702    fn strategy_incremental() {
703        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
704        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
705    }
706
707    #[test]
708    fn strategy_chunked() {
709        let mut e = make_export("t", ExportMode::Chunked, None);
710        e.chunk_column = Some("id".to_string());
711        e.chunk_size = 50_000;
712        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
713    }
714
715    #[test]
716    fn strategy_chunked_parallel() {
717        let mut e = make_export("t", ExportMode::Chunked, None);
718        e.chunk_column = Some("id".to_string());
719        e.chunk_size = 50_000;
720        e.parallel = 3;
721        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
722    }
723
724    #[test]
725    fn strategy_time_window() {
726        let mut e = make_export("t", ExportMode::TimeWindow, None);
727        e.time_column = Some("created_at".to_string());
728        e.days_window = Some(7);
729        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
730    }
731
732    #[test]
733    fn profile_small_indexed_is_fast() {
734        let e = make_export("t", ExportMode::Full, None);
735        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
736    }
737
738    #[test]
739    fn profile_medium_indexed_is_balanced() {
740        let e = make_export("t", ExportMode::Full, None);
741        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
742    }
743
744    #[test]
745    fn profile_large_indexed_is_safe() {
746        let e = make_export("t", ExportMode::Full, None);
747        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
748    }
749
750    #[test]
751    fn profile_small_no_index_is_balanced() {
752        let e = make_export("t", ExportMode::Full, None);
753        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
754    }
755
756    #[test]
757    fn profile_small_no_index_parallel_is_safe() {
758        let mut e = make_export("t", ExportMode::Full, None);
759        e.parallel = 4;
760        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
761    }
762
763    #[test]
764    fn profile_medium_no_index_is_balanced() {
765        let e = make_export("t", ExportMode::Full, None);
766        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
767    }
768
769    #[test]
770    fn profile_large_no_index_is_safe() {
771        let e = make_export("t", ExportMode::Full, None);
772        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
773    }
774
775    #[test]
776    fn sparse_range_warning_when_very_sparse() {
777        let mut e = make_export("t", ExportMode::Chunked, None);
778        e.chunk_column = Some("id".to_string());
779        e.chunk_size = 100_000;
780        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
781        assert!(w.is_some(), "should warn about sparse range");
782        let msg = w.unwrap();
783        assert!(msg.contains("Sparse key range"), "got: {msg}");
784        assert!(msg.contains("empty"), "got: {msg}");
785    }
786
787    #[test]
788    fn sparse_range_no_warning_when_dense() {
789        let mut e = make_export("t", ExportMode::Chunked, None);
790        e.chunk_column = Some("id".to_string());
791        e.chunk_size = 100_000;
792        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
793        assert!(w.is_none(), "should not warn for dense range");
794    }
795
796    #[test]
797    fn sparse_range_skipped_when_chunk_dense() {
798        let mut e = make_export("t", ExportMode::Chunked, None);
799        e.chunk_column = Some("id".to_string());
800        e.chunk_dense = true;
801        e.chunk_size = 100_000;
802        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
803        assert!(
804            w.is_none(),
805            "chunk_dense uses ordinals, not physical id span"
806        );
807    }
808
809    #[test]
810    fn dense_surrogate_warning_when_chunk_dense_builtin() {
811        let mut e = make_export("t", ExportMode::Chunked, None);
812        e.chunk_column = Some("id".to_string());
813        e.chunk_dense = true;
814        e.query = Some("SELECT id FROM orders".to_string());
815        let w = check_dense_surrogate_cost(&e);
816        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
817        assert!(w.unwrap().contains("global sort"));
818    }
819
820    #[test]
821    fn sparse_range_not_triggered_for_non_chunked() {
822        let e = make_export("t", ExportMode::Full, None);
823        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
824        assert!(w.is_none(), "should not warn for non-chunked mode");
825    }
826
827    #[test]
828    fn dense_surrogate_warning_with_row_number() {
829        let mut e = make_export("t", ExportMode::Chunked, None);
830        e.chunk_column = Some("rn".to_string());
831        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
832        let w = check_dense_surrogate_cost(&e);
833        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
834        assert!(w.unwrap().contains("global sort"));
835    }
836
837    #[test]
838    fn no_dense_surrogate_warning_without_row_number() {
839        let mut e = make_export("t", ExportMode::Chunked, None);
840        e.chunk_column = Some("id".to_string());
841        e.query = Some("SELECT * FROM orders".to_string());
842        let w = check_dense_surrogate_cost(&e);
843        assert!(w.is_none());
844    }
845
846    #[test]
847    fn no_dense_surrogate_warning_for_non_chunked() {
848        let mut e = make_export("t", ExportMode::Full, None);
849        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
850        let w = check_dense_surrogate_cost(&e);
851        assert!(w.is_none(), "should not warn for non-chunked mode");
852    }
853
854    #[test]
855    fn parallel_memory_warning_large_dataset() {
856        let mut e = make_export("t", ExportMode::Chunked, None);
857        e.parallel = 4;
858        let w = check_parallel_memory_risk(&e, Some(10_000_000));
859        assert!(w.is_some(), "should warn about memory risk");
860        let msg = w.unwrap();
861        assert!(msg.contains("Parallel=4"), "got: {msg}");
862        assert!(msg.contains("memory"), "got: {msg}");
863    }
864
865    #[test]
866    fn no_parallel_memory_warning_small_dataset() {
867        let mut e = make_export("t", ExportMode::Chunked, None);
868        e.parallel = 4;
869        let w = check_parallel_memory_risk(&e, Some(1_000));
870        assert!(w.is_none(), "should not warn for small dataset");
871    }
872
873    #[test]
874    fn no_parallel_memory_warning_single_worker() {
875        let e = make_export("t", ExportMode::Full, None);
876        let w = check_parallel_memory_risk(&e, Some(100_000_000));
877        assert!(w.is_none(), "should not warn when parallel=1");
878    }
879
880    #[test]
881    fn suggestion_degraded_full_recommends_incremental() {
882        let e = make_export("t", ExportMode::Full, None);
883        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
884        assert!(s.contains("incremental"), "got: {s}");
885    }
886
887    #[test]
888    fn suggestion_degraded_chunked_recommends_index() {
889        let mut e = make_export("t", ExportMode::Chunked, None);
890        e.chunk_column = Some("id".to_string());
891        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
892        assert!(s.contains("index on 'id'"), "got: {s}");
893    }
894
895    #[test]
896    fn suggestion_degraded_time_window_recommends_index() {
897        let mut e = make_export("t", ExportMode::TimeWindow, None);
898        e.time_column = Some("created_at".to_string());
899        e.days_window = Some(7);
900        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
901        assert!(s.contains("index on 'created_at'"), "got: {s}");
902    }
903
904    #[test]
905    fn suggestion_unsafe_full_recommends_incremental() {
906        let e = make_export("t", ExportMode::Full, None);
907        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
908        assert!(s.contains("incremental"), "got: {s}");
909    }
910
911    #[test]
912    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
913        let mut e = make_export("t", ExportMode::Chunked, None);
914        e.chunk_column = Some("id".to_string());
915        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
916        assert!(s.contains("index on 'id'"), "got: {s}");
917        assert!(s.contains("parallel"), "got: {s}");
918    }
919
920    #[test]
921    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
922        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
923        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
924        assert!(s.contains("index on 'updated_at'"), "got: {s}");
925    }
926
927    #[test]
928    fn suggestion_acceptable_large_full_recommends_incremental() {
929        let e = make_export("t", ExportMode::Full, None);
930        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
931        assert!(s.contains("incremental"), "got: {s}");
932    }
933
934    #[test]
935    fn parallel_only_for_chunked_mode() {
936        let e = make_export("t", ExportMode::Full, None);
937        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
938        assert_eq!(level, 1, "non-chunked mode should recommend 1");
939    }
940
941    #[test]
942    fn parallel_small_dataset_is_one() {
943        let mut e = make_export("t", ExportMode::Chunked, None);
944        e.chunk_column = Some("id".to_string());
945        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
946        assert_eq!(level, 1, "small dataset should recommend 1");
947    }
948
949    #[test]
950    fn parallel_moderate_indexed_is_two() {
951        let mut e = make_export("t", ExportMode::Chunked, None);
952        e.chunk_column = Some("id".to_string());
953        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
954        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
955    }
956
957    #[test]
958    fn parallel_large_indexed_is_four() {
959        let mut e = make_export("t", ExportMode::Chunked, None);
960        e.chunk_column = Some("id".to_string());
961        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
962        assert_eq!(level, 4, "large indexed dataset should recommend 4");
963    }
964
965    #[test]
966    fn parallel_no_index_large_is_one() {
967        let mut e = make_export("t", ExportMode::Chunked, None);
968        e.chunk_column = Some("id".to_string());
969        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
970        assert_eq!(level, 1, "no index + large should recommend 1");
971        assert!(reason.contains("no index"), "got: {reason}");
972    }
973
974    #[test]
975    fn parallel_no_index_moderate_is_conservative() {
976        let mut e = make_export("t", ExportMode::Chunked, None);
977        e.chunk_column = Some("id".to_string());
978        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
979        assert_eq!(
980            level, 2,
981            "no index + moderate should recommend 2 (conservative)"
982        );
983    }
984
985    #[test]
986    fn suggestion_acceptable_large_chunked_recommends_parallel() {
987        let mut e = make_export("t", ExportMode::Chunked, None);
988        e.chunk_column = Some("id".to_string());
989        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
990        assert!(s.contains("parallel"), "got: {s}");
991    }
992
993    #[test]
994    fn connection_limit_warn_when_parallel_meets_max() {
995        let w = check_connection_limit(20, Some(20));
996        assert!(w.is_some(), "should warn when parallel == max_connections");
997        let msg = w.unwrap();
998        assert!(msg.contains("max_connections=20"), "got: {msg}");
999        assert!(msg.contains("parallel=20"), "got: {msg}");
1000    }
1001
1002    #[test]
1003    fn connection_limit_warn_when_parallel_exceeds_max() {
1004        let w = check_connection_limit(100, Some(20));
1005        assert!(w.is_some(), "should warn when parallel > max_connections");
1006        let msg = w.unwrap();
1007        assert!(msg.contains("max_connections=20"), "got: {msg}");
1008    }
1009
1010    #[test]
1011    fn connection_limit_no_warn_when_parallel_below_max() {
1012        let w = check_connection_limit(4, Some(100));
1013        assert!(
1014            w.is_none(),
1015            "should not warn when parallel << max_connections"
1016        );
1017    }
1018
1019    #[test]
1020    fn connection_limit_no_warn_when_parallel_is_one() {
1021        let w = check_connection_limit(1, Some(5));
1022        assert!(
1023            w.is_none(),
1024            "single worker never triggers connection warning"
1025        );
1026    }
1027
1028    #[test]
1029    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
1030        let w = check_connection_limit(100, None);
1031        assert!(w.is_some(), "should note that check was skipped");
1032        let msg = w.unwrap();
1033        assert!(msg.contains("skipped"), "got: {msg}");
1034    }
1035
1036    #[test]
1037    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
1038        let w = check_connection_limit(1, None);
1039        assert!(
1040            w.is_none(),
1041            "single worker never triggers connection warning"
1042        );
1043    }
1044
1045    #[test]
1046    fn connection_limit_suggests_headroom() {
1047        let w = check_connection_limit(25, Some(20)).unwrap();
1048        // Suggested safe max should be max_connections - 3 = 17
1049        assert!(
1050            w.contains("17"),
1051            "should suggest leaving headroom, got: {w}"
1052        );
1053    }
1054
1055    // ── v0.7.4: actionable hints next to categorised errors ───────────
1056
1057    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1058        let err = anyhow::anyhow!("{}", msg);
1059        let cat = categorize_source_error(&err);
1060        source_error_hint(cat, &err, &st)
1061    }
1062
1063    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1064        let err = anyhow::anyhow!("{}", msg);
1065        let dest = DestinationConfig {
1066            destination_type: dt,
1067            bucket: Some("b".into()),
1068            ..Default::default()
1069        };
1070        let cat = categorize_dest_error(&err, &dest);
1071        destination_error_hint(cat, &dest)
1072    }
1073
1074    #[test]
1075    fn source_tls_handshake_returns_pg_specific_tls_hint() {
1076        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1077        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1078    }
1079
1080    #[test]
1081    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1082        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1083        assert!(h.contains("tls.mode"), "got: {h}");
1084    }
1085
1086    #[test]
1087    fn source_auth_error_postgres_mentions_pg_hba() {
1088        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1089        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1090    }
1091
1092    #[test]
1093    fn source_auth_error_mysql_mentions_grant() {
1094        let h = src_hint(
1095            "Access denied for user 'rivet'@'localhost'",
1096            SourceType::Mysql,
1097        )
1098        .expect("hint");
1099        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1100    }
1101
1102    #[test]
1103    fn source_connectivity_error_mentions_bastion_and_network() {
1104        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1105        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1106    }
1107
1108    #[test]
1109    fn source_unknown_error_returns_no_hint() {
1110        // Generic "error" category should yield no hint — better to
1111        // print the raw driver message than to mislead.
1112        let h = src_hint("totally unexpected", SourceType::Postgres);
1113        assert!(h.is_none(), "unknown errors should not produce a hint");
1114    }
1115
1116    #[test]
1117    fn dest_s3_auth_error_names_concrete_actions() {
1118        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1119        assert!(
1120            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1121            "got: {h}"
1122        );
1123    }
1124
1125    #[test]
1126    fn dest_gcs_auth_error_names_concrete_actions() {
1127        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1128        assert!(
1129            h.contains("storage.objects") && h.contains("cloud-permissions"),
1130            "got: {h}"
1131        );
1132    }
1133
1134    #[test]
1135    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1136        // Guard the load-bearing ordering in categorize_dest_error: the
1137        // "sas expired" early-return must fire before the generic "token"
1138        // branch, or destination_error_hint produces the wrong hint.
1139        // This test pins the *category string*, not just the final hint text.
1140        let err = anyhow::anyhow!(
1141            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1142        );
1143        let dest = DestinationConfig {
1144            destination_type: DestinationType::Azure,
1145            bucket: Some("c".into()),
1146            ..Default::default()
1147        };
1148        let cat = categorize_dest_error(&err, &dest);
1149        assert_eq!(
1150            cat, "sas expired",
1151            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1152        );
1153    }
1154
1155    #[test]
1156    fn dest_azure_sas_expired_returns_regenerate_hint() {
1157        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1158        // the hint must steer the operator to `az storage container
1159        // generate-sas` not "your IAM role is broken".
1160        let h = dest_hint(
1161            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1162            DestinationType::Azure,
1163        )
1164        .expect("hint");
1165        assert!(
1166            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1167            "got: {h}"
1168        );
1169    }
1170
1171    #[test]
1172    fn dest_s3_bucket_not_found_says_no_auto_create() {
1173        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1174        assert!(
1175            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1176            "got: {h}"
1177        );
1178    }
1179
1180    #[test]
1181    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1182        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1183        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1184    }
1185}