Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11#[cfg(test)]
12use analysis::{
13    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
14    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
15    recommend_parallelism, recommend_profile,
16};
17#[allow(unused_imports)]
18pub use doctor::doctor;
19// Reused at the run-time connect seam (src/pipeline/single.rs) so a failed
20// `rivet run` carries the same category + remediation hint `rivet doctor` gives.
21pub(crate) use doctor::{categorize_source_error, source_error_hint};
22#[cfg(test)]
23use postgres::{extract_scan_type, parse_pg_row_estimate};
24
25use crate::config::{Config, ExportConfig, SourceType};
26use crate::error::Result;
27use crate::types::policy::TypePolicy;
28use crate::types::target::{ExportTarget, TargetStatus};
29
/// Overall health rating attached to an export's preflight diagnostic.
///
/// Variants are declared from best to worst, matching the legend printed by
/// `check` (`EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE`).
///
/// The type is a fieldless enum, so it is cheap to copy and compare; the
/// `Display` form is the upper-case word shown in reports.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthVerdict {
    Efficient,
    Acceptable,
    Degraded,
    Unsafe,
}

impl std::fmt::Display for HealthVerdict {
    /// Writes the upper-case verdict word.
    ///
    /// Uses `Formatter::pad` rather than `write!` so that width, fill and
    /// alignment flags (e.g. `{:<12}` in a table column) are honoured; a plain
    /// `{}` produces exactly the bare word.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Efficient => "EFFICIENT",
            Self::Acceptable => "ACCEPTABLE",
            Self::Degraded => "DEGRADED",
            Self::Unsafe => "UNSAFE",
        };
        f.pad(label)
    }
}
48
49pub(crate) struct ExportDiagnostic {
50    pub export_name: String,
51    pub strategy: String,
52    pub mode: String,
53    pub cursor_column: Option<String>,
54    pub row_estimate: Option<i64>,
55    pub cursor_min: Option<String>,
56    pub cursor_max: Option<String>,
57    pub scan_type: Option<String>,
58    pub uses_index: bool,
59    pub verdict: HealthVerdict,
60    pub recommended_profile: &'static str,
61    pub recommended_parallel: (u32, &'static str),
62    pub warnings: Vec<String>,
63    pub suggestion: Option<String>,
64}
65
66/// Return the diagnostic for a single export without printing anything.
67///
68/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
69pub(crate) fn get_export_diagnostic(
70    config: &Config,
71    export: &ExportConfig,
72) -> Result<ExportDiagnostic> {
73    let url = config.source.resolve_url()?;
74    let tls = config.source.tls.as_ref();
75    crate::source::warn_if_tls_disabled(&config.source);
76    match config.source.source_type {
77        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
78        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
79        SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
80    }
81}
82
83/// Dedup identity for a destination, shared by `check`'s credential probe
84/// and `doctor`'s write probe. Must include every field that changes where
85/// a probe lands — notably `path`, so two local destinations with different
86/// paths are probed separately. Keeping one helper prevents the two call
87/// sites from drifting apart (doctor's inline copy once omitted `path` and
88/// silently skipped the second local destination).
89fn destination_identity(d: &crate::config::DestinationConfig) -> String {
90    format!(
91        "{:?}:{}:{}:{}",
92        d.destination_type,
93        d.bucket.as_deref().unwrap_or("-"),
94        d.endpoint.as_deref().unwrap_or("-"),
95        d.path.as_deref().unwrap_or("-"),
96    )
97}
98
/// Build the one-line notice for the "fail ✗ shown, yet exit code 0" situation.
///
/// A column marked `fail ✗` for `--target` only affects the exit code when
/// `--strict` is passed as well (by design). The glyph alone suggests a
/// non-zero exit, so this sentence spells out the real outcome for operators
/// and CI. Kept free of side effects so the exact wording can be unit-tested.
///
/// * `n` — number of failing columns; selects singular vs. plural wording.
/// * `target_label` — name of the target the columns were checked against.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let noun = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {n} {noun} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
    )
}
110
111pub fn check(
112    config_path: &str,
113    export_name: Option<&str>,
114    params: Option<&std::collections::HashMap<String, String>>,
115    show_type_report: bool,
116    strict: bool,
117    json_output: bool,
118    target: Option<ExportTarget>,
119) -> Result<()> {
120    let config = Config::load_with_params(config_path, params)?;
121
122    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
123        let e = config
124            .exports
125            .iter()
126            .find(|e| e.name == name)
127            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
128        vec![e]
129    } else {
130        config.exports.iter().collect()
131    };
132
133    let url = config.source.resolve_url()?;
134    let tls = config.source.tls.as_ref();
135    // Surface the plaintext-transport warning at preflight time too —
136    // operators should hear it from `rivet check` before they wait
137    // through a full `rivet run` to learn the same thing. `Once` inside
138    // the helper keeps emission to one line per process even when both
139    // `check` and `run` flow through it.
140    crate::source::warn_if_tls_disabled(&config.source);
141    match config.source.source_type {
142        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports, json_output)?,
143        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports, json_output)?,
144        SourceType::Mssql => mssql::check_mssql(&url, tls, &exports, json_output)?,
145    }
146
147    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
148    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
149    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
150    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
151    // and has side effects) — but we *do* call `create_destination`, which
152    // resolves env vars / credentials_file existence at construction time.
153    // Each unique destination is probed once per `check` to keep multi-export
154    // configs cheap.
155    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
156    for export in &exports {
157        let dest_key = destination_identity(&export.destination);
158        if !seen_destinations.insert(dest_key) {
159            continue;
160        }
161        let expanded = crate::plan::build::expand_destination_templates(
162            export.destination.clone(),
163            &export.name,
164        );
165        crate::destination::create_destination(&expanded).map_err(|e| {
166            anyhow::anyhow!(
167                "export '{}': destination preflight failed: {:#}",
168                export.name,
169                e
170            )
171        })?;
172    }
173
174    // Whether the check ends clean (no strict failure, no target-fail column).
175    // Stays true for the default `rivet check` (no type report), so the "Next:
176    // rivet run" pointer still fires.
177    let mut clean = true;
178
179    if show_type_report {
180        let policy = if strict {
181            TypePolicy::strict()
182        } else {
183            TypePolicy::warn_only()
184        };
185
186        let mut any_fatal = false;
187        // Count hard target-FAIL columns (and remember which target) so that —
188        // when --strict was NOT passed and the exit code is therefore 0 — we can
189        // print a note. The "fail ✗" glyph in the table implies a hard failure,
190        // but exit is gated only by --strict; without this note an operator or CI
191        // reading the glyph alone would be misled into thinking rc != 0.
192        let mut target_fail_cols = 0usize;
193        let mut target_fail_label: Option<&'static str> = None;
194        for export in &exports {
195            let column_overrides =
196                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
197            // CLI `--target` wins; otherwise fall back to the per-export
198            // `target:` from the config (slice #2a). A declared-but-unknown
199            // target is a loud error — never silently ignored.
200            if let Some(t) = export.target.as_deref()
201                && crate::types::target::ExportTarget::parse(t).is_none()
202            {
203                anyhow::bail!(
204                    "export '{}': unknown target '{t}' (expected: {})",
205                    export.name,
206                    crate::types::target::ExportTarget::valid_target_names()
207                );
208            }
209            let eff_target = target.or_else(|| {
210                export
211                    .target
212                    .as_deref()
213                    .and_then(crate::types::target::ExportTarget::parse)
214            });
215            let config_dir = std::path::Path::new(config_path)
216                .parent()
217                .unwrap_or_else(|| std::path::Path::new("."));
218            match type_report::collect_report(
219                &config,
220                export,
221                &column_overrides,
222                &policy,
223                eff_target,
224                config_dir,
225                params,
226            ) {
227                Ok(report) => {
228                    if report.has_fatal() {
229                        any_fatal = true;
230                    }
231                    if let Some(t) = eff_target
232                        && report.has_target_fail()
233                    {
234                        any_fatal = true;
235                        target_fail_cols += report
236                            .columns
237                            .iter()
238                            .filter(|c| c.target_status == Some(TargetStatus::Fail))
239                            .count();
240                        target_fail_label.get_or_insert(t.label());
241                    }
242                    if json_output {
243                        type_report::print_json(&report)?;
244                    } else {
245                        type_report::print_table(&report, eff_target);
246                    }
247                }
248                Err(e) => {
249                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
250                }
251            }
252        }
253
254        if strict && any_fatal {
255            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
256        } else if !strict && target_fail_cols > 0 && !json_output {
257            // The table showed "fail ✗" but rc is 0 — say so explicitly. Skipped
258            // under --json so NDJSON output stays one object per line.
259            clean = false;
260            println!();
261            println!(
262                "{}",
263                target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
264            );
265        }
266    }
267
268    if !json_output {
269        // Verdict legend — decode the EFFICIENT/ACCEPTABLE/DEGRADED/UNSAFE words
270        // printed above and reassure that `check` is advisory: never blocks a run.
271        println!();
272        println!(
273            "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
274        );
275        if clean {
276            // Keep the ladder going to the final rung instead of ending cold.
277            println!(
278                "Looks good. Next: rivet run -c {config_path} --validate   # export, then verify row counts"
279            );
280        }
281    }
282
283    Ok(())
284}
285
286fn print_diagnostic(diag: &ExportDiagnostic) {
287    println!();
288    println!("Export: {}", diag.export_name);
289    println!("  Strategy:     {}", diag.strategy);
290    println!("  Mode:         {}", diag.mode);
291    if let Some(est) = diag.row_estimate {
292        if est >= 1_000_000 {
293            println!("  Row estimate: ~{}M", est / 1_000_000);
294        } else if est >= 1_000 {
295            println!("  Row estimate: ~{}K", est / 1_000);
296        } else {
297            println!("  Row estimate: ~{}", est);
298        }
299    }
300    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
301        println!("  Cursor range: {} .. {}", min_v, max_v);
302    }
303    if let Some(col) = &diag.cursor_column {
304        println!("  Cursor col:   {}", col);
305    }
306    // Plain-language access path instead of a raw EXPLAIN node dump
307    // (`Result (cost=0.00..0.01 rows=1 width=36)`). Keyed off the authoritative
308    // `uses_index` bool, gated on `scan_type.is_some()` so engines without an
309    // EXPLAIN probe (MSSQL) stay silent.
310    if diag.scan_type.is_some() {
311        let access = if diag.uses_index {
312            "index scan (the cursor/chunk column is indexed)"
313        } else {
314            "full table scan (no index on the read path)"
315        };
316        println!("  Access:       {access}");
317    }
318    println!("  Verdict:      {}", diag.verdict);
319    println!(
320        "  Recommended:  tuning.profile: {}",
321        diag.recommended_profile
322    );
323    let (par_level, par_reason) = diag.recommended_parallel;
324    if par_level > 1 {
325        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
326    } else {
327        println!("  Parallelism:  {} ({})", par_level, par_reason);
328    }
329    for w in &diag.warnings {
330        println!("  Warning:      {}", w);
331    }
332    if let Some(suggestion) = &diag.suggestion {
333        println!("  Suggestion:   {}", suggestion);
334    }
335}
336
337#[cfg(test)]
338mod tests {
339    use super::*;
340    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
341    use doctor::{
342        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
343    };
344
345    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
346        // Baseline from the canonical test fixture; override only the fields
347        // these preflight tests vary (mode, cursor, CSV format, query, dest).
348        ExportConfig {
349            mode,
350            cursor_column: cursor.map(|s| s.to_string()),
351            query: Some("SELECT * FROM t".to_string()),
352            format: FormatType::Csv,
353            destination: DestinationConfig {
354                destination_type: DestinationType::Local,
355                path: Some("./out".to_string()),
356                ..Default::default()
357            },
358            ..crate::config::sample_export(name)
359        }
360    }
361
362    // ── L8: 'fail ✗' note when --target FAILs but --strict was not passed ─────
363    // The glyph implies a hard failure; exit is gated only by --strict. The note
364    // tells an operator/CI the exit is 0 so the glyph doesn't mislead.
365    #[test]
366    fn target_fail_note_names_count_target_and_strict_gate() {
367        let note = target_fail_note(2, "bigquery");
368        assert!(note.contains("2 columns FAIL"), "got: {note}");
369        assert!(note.contains("bigquery"), "got: {note}");
370        assert!(note.contains("--strict"), "got: {note}");
371        assert!(note.contains("exit 0"), "got: {note}");
372    }
373
374    #[test]
375    fn target_fail_note_singular_for_one_column() {
376        let note = target_fail_note(1, "duckdb");
377        assert!(note.contains("1 column FAIL"), "got: {note}");
378        assert!(!note.contains("1 columns"), "should be singular: {note}");
379    }
380
381    #[test]
382    fn verdict_small_indexed_with_cursor_is_efficient() {
383        let v = compute_verdict(Some(500_000), true, true);
384        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
385    }
386
387    #[test]
388    fn verdict_large_indexed_with_cursor_is_acceptable() {
389        let v = compute_verdict(Some(20_000_000), true, true);
390        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
391    }
392
393    #[test]
394    fn verdict_no_index_no_cursor_is_degraded() {
395        let v = compute_verdict(Some(500_000), false, false);
396        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
397    }
398
399    #[test]
400    fn verdict_huge_no_index_is_unsafe() {
401        let v = compute_verdict(Some(100_000_000), false, false);
402        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
403    }
404
405    #[test]
406    fn parse_pg_row_estimate_from_sort_plan() {
407        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
408        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
409    }
410
411    #[test]
412    fn parse_pg_row_estimate_from_index_scan() {
413        let plan =
414            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
415        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
416    }
417
418    #[test]
419    fn extract_scan_type_detects_seq_scan() {
420        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
421        let st = extract_scan_type(plan);
422        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
423    }
424
425    #[test]
426    fn extract_scan_type_detects_index_scan() {
427        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
428        let st = extract_scan_type(plan);
429        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
430    }
431
432    #[test]
433    fn suggestion_for_efficient_verdict_is_none() {
434        let e = make_export("t", ExportMode::Full, None);
435        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
436        assert!(
437            s.is_none(),
438            "efficient verdict should produce no suggestion"
439        );
440    }
441
442    #[test]
443    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
444        let e = make_export("t", ExportMode::Full, None);
445        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
446        let msg = s.expect("degraded verdict should produce a suggestion");
447        assert!(
448            msg.contains("safe"),
449            "suggestion should recommend safe profile, got: {msg}"
450        );
451    }
452
453    fn src_err(msg: &str) -> &'static str {
454        categorize_source_error(&anyhow::anyhow!("{}", msg))
455    }
456
457    #[test]
458    fn source_password_rejected_is_auth_error() {
459        assert_eq!(
460            src_err("password authentication failed for user \"rivet\""),
461            "auth error"
462        );
463    }
464
465    #[test]
466    fn source_authentication_failed_is_auth_error() {
467        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
468    }
469
470    #[test]
471    fn source_access_denied_is_auth_error() {
472        assert_eq!(
473            src_err("Access denied for user 'rivet'@'localhost'"),
474            "auth error"
475        );
476    }
477
478    #[test]
479    fn source_connection_refused_is_connectivity() {
480        assert_eq!(
481            src_err("connection refused (os error 61)"),
482            "connectivity error"
483        );
484    }
485
486    #[test]
487    fn source_timed_out_is_connectivity() {
488        assert_eq!(src_err("connection timed out"), "connectivity error");
489    }
490
491    #[test]
492    fn source_dns_translate_host_is_connectivity() {
493        assert_eq!(
494            src_err("could not translate host name \"db.bad\" to address"),
495            "connectivity error"
496        );
497    }
498
499    #[test]
500    fn source_name_not_known_is_connectivity() {
501        assert_eq!(src_err("Name or service not known"), "connectivity error");
502    }
503
504    #[test]
505    fn source_unknown_error_is_generic() {
506        assert_eq!(src_err("something totally unexpected"), "error");
507    }
508
509    fn dest_config(dtype: DestinationType) -> DestinationConfig {
510        DestinationConfig {
511            destination_type: dtype,
512            bucket: Some("b".to_string()),
513            ..Default::default()
514        }
515    }
516
517    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
518        let cfg = dest_config(dtype);
519        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
520    }
521
522    fn local_dest(path: &str) -> DestinationConfig {
523        DestinationConfig {
524            destination_type: DestinationType::Local,
525            path: Some(path.to_string()),
526            ..Default::default()
527        }
528    }
529
530    // Regression (doctor-dedup): doctor's inline dedup key omitted `path`,
531    // so two local destinations with different paths collapsed to one entry
532    // and the second was never write-probed. The shared identity must keep
533    // them distinct.
534    #[test]
535    fn destination_identity_distinguishes_local_paths() {
536        assert_ne!(
537            destination_identity(&local_dest("/tmp/a")),
538            destination_identity(&local_dest("/tmp/b")),
539        );
540    }
541
542    #[test]
543    fn destination_identity_collapses_identical_local_destinations() {
544        assert_eq!(
545            destination_identity(&local_dest("/tmp/a")),
546            destination_identity(&local_dest("/tmp/a")),
547        );
548    }
549
550    #[test]
551    fn destination_identity_distinguishes_buckets() {
552        let a = DestinationConfig {
553            bucket: Some("bucket-a".to_string()),
554            ..dest_config(DestinationType::S3)
555        };
556        let b = DestinationConfig {
557            bucket: Some("bucket-b".to_string()),
558            ..dest_config(DestinationType::S3)
559        };
560        assert_ne!(destination_identity(&a), destination_identity(&b));
561    }
562
563    // Same bucket name on different endpoints (e.g. AWS vs MinIO) is two
564    // distinct destinations and must be probed separately.
565    #[test]
566    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
567        let aws = dest_config(DestinationType::S3);
568        let minio = DestinationConfig {
569            endpoint: Some("http://localhost:9000".to_string()),
570            ..dest_config(DestinationType::S3)
571        };
572        assert_ne!(destination_identity(&aws), destination_identity(&minio));
573    }
574
575    #[test]
576    fn dest_credential_loading_is_auth_error() {
577        assert_eq!(
578            dest_err(
579                "loading credential to sign http request",
580                DestinationType::Gcs
581            ),
582            "auth error"
583        );
584    }
585
586    #[test]
587    fn dest_permission_denied_is_auth_error() {
588        assert_eq!(
589            dest_err("permission denied on resource bucket", DestinationType::S3),
590            "auth error"
591        );
592    }
593
594    #[test]
595    fn dest_forbidden_is_auth_error() {
596        assert_eq!(
597            dest_err("403 Forbidden", DestinationType::Gcs),
598            "auth error"
599        );
600    }
601
602    #[test]
603    fn dest_unauthorized_is_auth_error() {
604        assert_eq!(
605            dest_err("401 Unauthorized", DestinationType::S3),
606            "auth error"
607        );
608    }
609
610    #[test]
611    fn dest_invalid_grant_is_auth_error() {
612        assert_eq!(
613            dest_err(
614                "invalid_grant: token has been revoked",
615                DestinationType::Gcs
616            ),
617            "auth error"
618        );
619    }
620
621    #[test]
622    fn dest_nosuchbucket_s3_is_bucket_not_found() {
623        assert_eq!(
624            dest_err(
625                "NoSuchBucket: the specified bucket does not exist",
626                DestinationType::S3
627            ),
628            "bucket not found"
629        );
630    }
631
632    #[test]
633    fn dest_not_found_gcs_is_bucket_not_found() {
634        assert_eq!(
635            dest_err("bucket not found (404)", DestinationType::Gcs),
636            "bucket not found"
637        );
638    }
639
640    #[test]
641    fn dest_not_found_local_is_path_not_found() {
642        assert_eq!(
643            dest_err("path not found: /tmp/missing", DestinationType::Local),
644            "path not found"
645        );
646    }
647
648    #[test]
649    fn dest_connection_refused_is_connectivity() {
650        assert_eq!(
651            dest_err("connection refused to endpoint", DestinationType::S3),
652            "connectivity error"
653        );
654    }
655
656    #[test]
657    fn dest_dns_error_is_connectivity() {
658        assert_eq!(
659            dest_err("dns error: failed to lookup address", DestinationType::S3),
660            "connectivity error"
661        );
662    }
663
664    #[test]
665    fn dest_timed_out_is_connectivity() {
666        assert_eq!(
667            dest_err("request timed out after 30s", DestinationType::Gcs),
668            "connectivity error"
669        );
670    }
671
672    #[test]
673    fn dest_unknown_error_is_generic() {
674        assert_eq!(
675            dest_err("something else entirely", DestinationType::S3),
676            "error"
677        );
678    }
679
680    #[test]
681    fn strategy_full_scan() {
682        let e = make_export("t", ExportMode::Full, None);
683        assert_eq!(derive_strategy(&e), "full-scan");
684    }
685
686    #[test]
687    fn strategy_full_parallel() {
688        let mut e = make_export("t", ExportMode::Full, None);
689        e.parallel = 4;
690        assert_eq!(derive_strategy(&e), "full-parallel(4)");
691    }
692
693    #[test]
694    fn strategy_incremental() {
695        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
696        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
697    }
698
699    #[test]
700    fn strategy_chunked() {
701        let mut e = make_export("t", ExportMode::Chunked, None);
702        e.chunk_column = Some("id".to_string());
703        e.chunk_size = 50_000;
704        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
705    }
706
707    #[test]
708    fn strategy_chunked_parallel() {
709        let mut e = make_export("t", ExportMode::Chunked, None);
710        e.chunk_column = Some("id".to_string());
711        e.chunk_size = 50_000;
712        e.parallel = 3;
713        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
714    }
715
716    #[test]
717    fn strategy_time_window() {
718        let mut e = make_export("t", ExportMode::TimeWindow, None);
719        e.time_column = Some("created_at".to_string());
720        e.days_window = Some(7);
721        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
722    }
723
724    #[test]
725    fn profile_small_indexed_is_fast() {
726        let e = make_export("t", ExportMode::Full, None);
727        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
728    }
729
730    #[test]
731    fn profile_medium_indexed_is_balanced() {
732        let e = make_export("t", ExportMode::Full, None);
733        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
734    }
735
736    #[test]
737    fn profile_large_indexed_is_safe() {
738        let e = make_export("t", ExportMode::Full, None);
739        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
740    }
741
742    #[test]
743    fn profile_small_no_index_is_balanced() {
744        let e = make_export("t", ExportMode::Full, None);
745        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
746    }
747
748    #[test]
749    fn profile_small_no_index_parallel_is_safe() {
750        let mut e = make_export("t", ExportMode::Full, None);
751        e.parallel = 4;
752        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
753    }
754
755    #[test]
756    fn profile_medium_no_index_is_balanced() {
757        let e = make_export("t", ExportMode::Full, None);
758        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
759    }
760
761    #[test]
762    fn profile_large_no_index_is_safe() {
763        let e = make_export("t", ExportMode::Full, None);
764        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
765    }
766
767    #[test]
768    fn sparse_range_warning_when_very_sparse() {
769        let mut e = make_export("t", ExportMode::Chunked, None);
770        e.chunk_column = Some("id".to_string());
771        e.chunk_size = 100_000;
772        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
773        assert!(w.is_some(), "should warn about sparse range");
774        let msg = w.unwrap();
775        assert!(msg.contains("Sparse key range"), "got: {msg}");
776        assert!(msg.contains("empty"), "got: {msg}");
777    }
778
779    #[test]
780    fn sparse_range_no_warning_when_dense() {
781        let mut e = make_export("t", ExportMode::Chunked, None);
782        e.chunk_column = Some("id".to_string());
783        e.chunk_size = 100_000;
784        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
785        assert!(w.is_none(), "should not warn for dense range");
786    }
787
788    #[test]
789    fn sparse_range_skipped_when_chunk_dense() {
790        let mut e = make_export("t", ExportMode::Chunked, None);
791        e.chunk_column = Some("id".to_string());
792        e.chunk_dense = true;
793        e.chunk_size = 100_000;
794        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
795        assert!(
796            w.is_none(),
797            "chunk_dense uses ordinals, not physical id span"
798        );
799    }
800
801    #[test]
802    fn dense_surrogate_warning_when_chunk_dense_builtin() {
803        let mut e = make_export("t", ExportMode::Chunked, None);
804        e.chunk_column = Some("id".to_string());
805        e.chunk_dense = true;
806        e.query = Some("SELECT id FROM orders".to_string());
807        let w = check_dense_surrogate_cost(&e);
808        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
809        assert!(w.unwrap().contains("global sort"));
810    }
811
812    #[test]
813    fn sparse_range_not_triggered_for_non_chunked() {
814        let e = make_export("t", ExportMode::Full, None);
815        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
816        assert!(w.is_none(), "should not warn for non-chunked mode");
817    }
818
819    #[test]
820    fn dense_surrogate_warning_with_row_number() {
821        let mut e = make_export("t", ExportMode::Chunked, None);
822        e.chunk_column = Some("rn".to_string());
823        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
824        let w = check_dense_surrogate_cost(&e);
825        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
826        assert!(w.unwrap().contains("global sort"));
827    }
828
829    #[test]
830    fn no_dense_surrogate_warning_without_row_number() {
831        let mut e = make_export("t", ExportMode::Chunked, None);
832        e.chunk_column = Some("id".to_string());
833        e.query = Some("SELECT * FROM orders".to_string());
834        let w = check_dense_surrogate_cost(&e);
835        assert!(w.is_none());
836    }
837
838    #[test]
839    fn no_dense_surrogate_warning_for_non_chunked() {
840        let mut e = make_export("t", ExportMode::Full, None);
841        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
842        let w = check_dense_surrogate_cost(&e);
843        assert!(w.is_none(), "should not warn for non-chunked mode");
844    }
845
846    #[test]
847    fn parallel_memory_warning_large_dataset() {
848        let mut e = make_export("t", ExportMode::Chunked, None);
849        e.parallel = 4;
850        let w = check_parallel_memory_risk(&e, Some(10_000_000));
851        assert!(w.is_some(), "should warn about memory risk");
852        let msg = w.unwrap();
853        assert!(msg.contains("Parallel=4"), "got: {msg}");
854        assert!(msg.contains("memory"), "got: {msg}");
855    }
856
857    #[test]
858    fn no_parallel_memory_warning_small_dataset() {
859        let mut e = make_export("t", ExportMode::Chunked, None);
860        e.parallel = 4;
861        let w = check_parallel_memory_risk(&e, Some(1_000));
862        assert!(w.is_none(), "should not warn for small dataset");
863    }
864
865    #[test]
866    fn no_parallel_memory_warning_single_worker() {
867        let e = make_export("t", ExportMode::Full, None);
868        let w = check_parallel_memory_risk(&e, Some(100_000_000));
869        assert!(w.is_none(), "should not warn when parallel=1");
870    }
871
872    #[test]
873    fn suggestion_degraded_full_recommends_incremental() {
874        let e = make_export("t", ExportMode::Full, None);
875        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
876        assert!(s.contains("incremental"), "got: {s}");
877    }
878
879    #[test]
880    fn suggestion_degraded_chunked_recommends_index() {
881        let mut e = make_export("t", ExportMode::Chunked, None);
882        e.chunk_column = Some("id".to_string());
883        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
884        assert!(s.contains("index on 'id'"), "got: {s}");
885    }
886
887    #[test]
888    fn suggestion_degraded_time_window_recommends_index() {
889        let mut e = make_export("t", ExportMode::TimeWindow, None);
890        e.time_column = Some("created_at".to_string());
891        e.days_window = Some(7);
892        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
893        assert!(s.contains("index on 'created_at'"), "got: {s}");
894    }
895
896    #[test]
897    fn suggestion_unsafe_full_recommends_incremental() {
898        let e = make_export("t", ExportMode::Full, None);
899        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
900        assert!(s.contains("incremental"), "got: {s}");
901    }
902
903    #[test]
904    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
905        let mut e = make_export("t", ExportMode::Chunked, None);
906        e.chunk_column = Some("id".to_string());
907        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
908        assert!(s.contains("index on 'id'"), "got: {s}");
909        assert!(s.contains("parallel"), "got: {s}");
910    }
911
912    #[test]
913    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
914        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
915        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
916        assert!(s.contains("index on 'updated_at'"), "got: {s}");
917    }
918
919    #[test]
920    fn suggestion_acceptable_large_full_recommends_incremental() {
921        let e = make_export("t", ExportMode::Full, None);
922        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
923        assert!(s.contains("incremental"), "got: {s}");
924    }
925
926    #[test]
927    fn parallel_only_for_chunked_mode() {
928        let e = make_export("t", ExportMode::Full, None);
929        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
930        assert_eq!(level, 1, "non-chunked mode should recommend 1");
931    }
932
933    #[test]
934    fn parallel_small_dataset_is_one() {
935        let mut e = make_export("t", ExportMode::Chunked, None);
936        e.chunk_column = Some("id".to_string());
937        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
938        assert_eq!(level, 1, "small dataset should recommend 1");
939    }
940
941    #[test]
942    fn parallel_moderate_indexed_is_two() {
943        let mut e = make_export("t", ExportMode::Chunked, None);
944        e.chunk_column = Some("id".to_string());
945        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
946        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
947    }
948
949    #[test]
950    fn parallel_large_indexed_is_four() {
951        let mut e = make_export("t", ExportMode::Chunked, None);
952        e.chunk_column = Some("id".to_string());
953        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
954        assert_eq!(level, 4, "large indexed dataset should recommend 4");
955    }
956
957    #[test]
958    fn parallel_no_index_large_is_one() {
959        let mut e = make_export("t", ExportMode::Chunked, None);
960        e.chunk_column = Some("id".to_string());
961        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
962        assert_eq!(level, 1, "no index + large should recommend 1");
963        assert!(reason.contains("no index"), "got: {reason}");
964    }
965
966    #[test]
967    fn parallel_no_index_moderate_is_conservative() {
968        let mut e = make_export("t", ExportMode::Chunked, None);
969        e.chunk_column = Some("id".to_string());
970        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
971        assert_eq!(
972            level, 2,
973            "no index + moderate should recommend 2 (conservative)"
974        );
975    }
976
977    #[test]
978    fn suggestion_acceptable_large_chunked_recommends_parallel() {
979        let mut e = make_export("t", ExportMode::Chunked, None);
980        e.chunk_column = Some("id".to_string());
981        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
982        assert!(s.contains("parallel"), "got: {s}");
983    }
984
985    #[test]
986    fn connection_limit_warn_when_parallel_meets_max() {
987        let w = check_connection_limit(20, Some(20));
988        assert!(w.is_some(), "should warn when parallel == max_connections");
989        let msg = w.unwrap();
990        assert!(msg.contains("max_connections=20"), "got: {msg}");
991        assert!(msg.contains("parallel=20"), "got: {msg}");
992    }
993
994    #[test]
995    fn connection_limit_warn_when_parallel_exceeds_max() {
996        let w = check_connection_limit(100, Some(20));
997        assert!(w.is_some(), "should warn when parallel > max_connections");
998        let msg = w.unwrap();
999        assert!(msg.contains("max_connections=20"), "got: {msg}");
1000    }
1001
1002    #[test]
1003    fn connection_limit_no_warn_when_parallel_below_max() {
1004        let w = check_connection_limit(4, Some(100));
1005        assert!(
1006            w.is_none(),
1007            "should not warn when parallel << max_connections"
1008        );
1009    }
1010
1011    #[test]
1012    fn connection_limit_no_warn_when_parallel_is_one() {
1013        let w = check_connection_limit(1, Some(5));
1014        assert!(
1015            w.is_none(),
1016            "single worker never triggers connection warning"
1017        );
1018    }
1019
1020    #[test]
1021    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
1022        let w = check_connection_limit(100, None);
1023        assert!(w.is_some(), "should note that check was skipped");
1024        let msg = w.unwrap();
1025        assert!(msg.contains("skipped"), "got: {msg}");
1026    }
1027
1028    #[test]
1029    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
1030        let w = check_connection_limit(1, None);
1031        assert!(
1032            w.is_none(),
1033            "single worker never triggers connection warning"
1034        );
1035    }
1036
1037    #[test]
1038    fn connection_limit_suggests_headroom() {
1039        let w = check_connection_limit(25, Some(20)).unwrap();
1040        // Suggested safe max should be max_connections - 3 = 17
1041        assert!(
1042            w.contains("17"),
1043            "should suggest leaving headroom, got: {w}"
1044        );
1045    }
1046
1047    // ── v0.7.4: actionable hints next to categorised errors ───────────
1048
1049    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1050        let err = anyhow::anyhow!("{}", msg);
1051        let cat = categorize_source_error(&err);
1052        source_error_hint(cat, &err, &st)
1053    }
1054
1055    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1056        let err = anyhow::anyhow!("{}", msg);
1057        let dest = DestinationConfig {
1058            destination_type: dt,
1059            bucket: Some("b".into()),
1060            ..Default::default()
1061        };
1062        let cat = categorize_dest_error(&err, &dest);
1063        destination_error_hint(cat, &dest)
1064    }
1065
1066    #[test]
1067    fn source_tls_handshake_returns_pg_specific_tls_hint() {
1068        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1069        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1070    }
1071
1072    #[test]
1073    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1074        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1075        assert!(h.contains("tls.mode"), "got: {h}");
1076    }
1077
1078    #[test]
1079    fn source_auth_error_postgres_mentions_pg_hba() {
1080        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1081        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1082    }
1083
1084    #[test]
1085    fn source_auth_error_mysql_mentions_grant() {
1086        let h = src_hint(
1087            "Access denied for user 'rivet'@'localhost'",
1088            SourceType::Mysql,
1089        )
1090        .expect("hint");
1091        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1092    }
1093
1094    #[test]
1095    fn source_connectivity_error_mentions_bastion_and_network() {
1096        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1097        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1098    }
1099
1100    #[test]
1101    fn source_unknown_error_returns_no_hint() {
1102        // Generic "error" category should yield no hint — better to
1103        // print the raw driver message than to mislead.
1104        let h = src_hint("totally unexpected", SourceType::Postgres);
1105        assert!(h.is_none(), "unknown errors should not produce a hint");
1106    }
1107
1108    #[test]
1109    fn dest_s3_auth_error_names_concrete_actions() {
1110        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1111        assert!(
1112            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1113            "got: {h}"
1114        );
1115    }
1116
1117    #[test]
1118    fn dest_gcs_auth_error_names_concrete_actions() {
1119        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1120        assert!(
1121            h.contains("storage.objects") && h.contains("cloud-permissions"),
1122            "got: {h}"
1123        );
1124    }
1125
1126    #[test]
1127    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1128        // Guard the load-bearing ordering in categorize_dest_error: the
1129        // "sas expired" early-return must fire before the generic "token"
1130        // branch, or destination_error_hint produces the wrong hint.
1131        // This test pins the *category string*, not just the final hint text.
1132        let err = anyhow::anyhow!(
1133            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1134        );
1135        let dest = DestinationConfig {
1136            destination_type: DestinationType::Azure,
1137            bucket: Some("c".into()),
1138            ..Default::default()
1139        };
1140        let cat = categorize_dest_error(&err, &dest);
1141        assert_eq!(
1142            cat, "sas expired",
1143            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1144        );
1145    }
1146
1147    #[test]
1148    fn dest_azure_sas_expired_returns_regenerate_hint() {
1149        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1150        // the hint must steer the operator to `az storage container
1151        // generate-sas` not "your IAM role is broken".
1152        let h = dest_hint(
1153            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1154            DestinationType::Azure,
1155        )
1156        .expect("hint");
1157        assert!(
1158            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1159            "got: {h}"
1160        );
1161    }
1162
1163    #[test]
1164    fn dest_s3_bucket_not_found_says_no_auto_create() {
1165        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1166        assert!(
1167            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1168            "got: {h}"
1169        );
1170    }
1171
1172    #[test]
1173    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1174        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1175        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1176    }
1177}