Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11// Re-exported so the plan layer's strategy explainer can ground its "≥ threshold"
12// narrative on the same constant `check`/`init` use, not a hard-coded copy.
13pub(crate) use analysis::SMALL_TABLE_ROW_THRESHOLD;
14#[cfg(test)]
15use analysis::{
16    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
17    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
18    recommend_parallelism, recommend_profile,
19};
20#[allow(unused_imports)]
21pub use doctor::doctor;
22// Reused at the run-time connect seam (src/pipeline/single.rs) so a failed
23// `rivet run` carries the same category + remediation hint `rivet doctor` gives.
24pub(crate) use doctor::{categorize_source_error, source_error_hint};
25#[cfg(test)]
26use postgres::{extract_scan_type, parse_pg_row_estimate};
27
28use serde::Serialize;
29
30use crate::config::{Config, ExportConfig, SourceType};
31use crate::error::Result;
32use crate::types::policy::TypePolicy;
33use crate::types::target::{ExportTarget, TargetStatus};
34
35/// Serializes lowercase ("efficient"/"acceptable"/"degraded"/"unsafe") so
36/// `rivet check --json` consumers (CI gates, orchestrators) match on a stable,
37/// case-insensitive token rather than the SHOUTING `Display` form used in the
38/// human-readable table.
39#[derive(Debug, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum HealthVerdict {
42    Efficient,
43    Acceptable,
44    Degraded,
45    Unsafe,
46}
47
48impl std::fmt::Display for HealthVerdict {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::Efficient => write!(f, "EFFICIENT"),
52            Self::Acceptable => write!(f, "ACCEPTABLE"),
53            Self::Degraded => write!(f, "DEGRADED"),
54            Self::Unsafe => write!(f, "UNSAFE"),
55        }
56    }
57}
58
59pub(crate) struct ExportDiagnostic {
60    pub export_name: String,
61    pub strategy: String,
62    pub mode: String,
63    pub cursor_column: Option<String>,
64    pub row_estimate: Option<i64>,
65    /// Average bytes per row from catalog/plan stats (PG EXPLAIN `width`,
66    /// MSSQL `dm_db_partition_stats` pages/row). `None` when unavailable
67    /// (e.g. MySQL, with no trustworthy scan-free estimate). Feeds the
68    /// oversized-chunk warning and is shown as the `Row width` line.
69    pub avg_row_bytes: Option<i64>,
70    pub cursor_min: Option<String>,
71    pub cursor_max: Option<String>,
72    pub scan_type: Option<String>,
73    pub uses_index: bool,
74    pub verdict: HealthVerdict,
75    pub recommended_profile: &'static str,
76    pub recommended_parallel: (u32, &'static str),
77    pub warnings: Vec<analysis::Warning>,
78    pub suggestion: Option<String>,
79}
80
81// Hand-rolled `Serialize` (rather than `#[derive]`) so the JSON shape stays
82// fully under our control without touching the three engine construction sites:
83//   - `recommended_parallel` (a raw `(u32, &str)`) becomes a self-describing
84//     `{ "level": N, "reason": "…" }` object instead of a positional 2-array;
85//   - a derived `capabilities` object ({uses_index, has_cursor, can_parallel})
86//     is computed from the sibling fields at serialization time — no stored
87//     field, no extra probe;
88//   - `None` optionals are skipped to keep the object lean for CI consumers.
89// `HealthVerdict` rides its own `#[derive(Serialize)]` (lowercase tokens).
90impl Serialize for ExportDiagnostic {
91    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
92    where
93        S: serde::Serializer,
94    {
95        use serde::ser::SerializeMap;
96
97        #[derive(Serialize)]
98        struct RecommendedParallel {
99            level: u32,
100            reason: &'static str,
101        }
102        #[derive(Serialize)]
103        struct Capabilities {
104            uses_index: bool,
105            has_cursor: bool,
106            can_parallel: bool,
107        }
108
109        let mut map = serializer.serialize_map(None)?;
110        map.serialize_entry("export_name", &self.export_name)?;
111        map.serialize_entry("strategy", &self.strategy)?;
112        map.serialize_entry("mode", &self.mode)?;
113        if let Some(v) = &self.cursor_column {
114            map.serialize_entry("cursor_column", v)?;
115        }
116        if let Some(v) = &self.row_estimate {
117            map.serialize_entry("row_estimate", v)?;
118        }
119        if let Some(v) = &self.avg_row_bytes {
120            map.serialize_entry("avg_row_bytes", v)?;
121        }
122        if let Some(v) = &self.cursor_min {
123            map.serialize_entry("cursor_min", v)?;
124        }
125        if let Some(v) = &self.cursor_max {
126            map.serialize_entry("cursor_max", v)?;
127        }
128        if let Some(v) = &self.scan_type {
129            map.serialize_entry("scan_type", v)?;
130        }
131        map.serialize_entry("uses_index", &self.uses_index)?;
132        map.serialize_entry("verdict", &self.verdict)?;
133        map.serialize_entry("recommended_profile", &self.recommended_profile)?;
134        map.serialize_entry(
135            "recommended_parallel",
136            &RecommendedParallel {
137                level: self.recommended_parallel.0,
138                reason: self.recommended_parallel.1,
139            },
140        )?;
141        map.serialize_entry("warnings", &self.warnings)?;
142        if let Some(v) = &self.suggestion {
143            map.serialize_entry("suggestion", v)?;
144        }
145        map.serialize_entry(
146            "capabilities",
147            &Capabilities {
148                uses_index: self.uses_index,
149                has_cursor: self.cursor_column.is_some(),
150                can_parallel: self.recommended_parallel.0 > 1,
151            },
152        )?;
153        map.end()
154    }
155}
156
157/// Return the diagnostic for a single export without printing anything.
158///
159/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
160pub(crate) fn get_export_diagnostic(
161    config: &Config,
162    export: &ExportConfig,
163) -> Result<ExportDiagnostic> {
164    let url = config.source.resolve_url()?;
165    let tls = config.source.tls.as_ref();
166    crate::source::warn_if_tls_disabled(&config.source);
167    match config.source.source_type {
168        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
169        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
170        SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
171    }
172}
173
174/// Dedup identity for a destination, shared by `check`'s credential probe
175/// and `doctor`'s write probe. Must include every field that changes where
176/// a probe lands — notably `path`, so two local destinations with different
177/// paths are probed separately. Keeping one helper prevents the two call
178/// sites from drifting apart (doctor's inline copy once omitted `path` and
179/// silently skipped the second local destination).
180fn destination_identity(d: &crate::config::DestinationConfig) -> String {
181    format!(
182        "{:?}:{}:{}:{}",
183        d.destination_type,
184        d.bucket.as_deref().unwrap_or("-"),
185        d.endpoint.as_deref().unwrap_or("-"),
186        d.path.as_deref().unwrap_or("-"),
187    )
188}
189
/// Build the one-line note for the "fail ✗ but rc 0" situation.
///
/// A column rendered `fail ✗` for `--target` does not affect the exit code
/// unless `--strict` is also passed (that is the gate by design). Without the
/// note, an operator or CI job reading only the glyph would wrongly expect a
/// non-zero exit. Kept pure so the exact wording can be unit-tested.
///
/// `n` is the number of failing columns (it selects "column" vs "columns");
/// `target_label` is the display name of the target.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let noun = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {n} {noun} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
    )
}
201
202/// Build one [`ExportDiagnostic`] per export via `diagnose`, collecting them (or
203/// short-circuiting on the first error). Single-sources the connect → loop →
204/// return contract every engine's `check_*` shares — only the per-export
205/// `diagnose` call differs — so the deferred print-vs-collect decision lives in
206/// one place rather than triplicated across postgres / mysql / mssql.
207pub(super) fn collect_diagnostics<F>(
208    exports: &[&ExportConfig],
209    mut diagnose: F,
210) -> Result<Vec<ExportDiagnostic>>
211where
212    F: FnMut(&ExportConfig) -> Result<ExportDiagnostic>,
213{
214    exports.iter().map(|&e| diagnose(e)).collect()
215}
216
217pub fn check(
218    config_path: &str,
219    export_name: Option<&str>,
220    params: Option<&std::collections::HashMap<String, String>>,
221    show_type_report: bool,
222    strict: bool,
223    json_output: bool,
224    target: Option<ExportTarget>,
225) -> Result<()> {
226    let config = Config::load_with_params(config_path, params)?;
227
228    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
229        let e = config
230            .exports
231            .iter()
232            .find(|e| e.name == name)
233            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
234        vec![e]
235    } else {
236        config.exports.iter().collect()
237    };
238
239    let url = config.source.resolve_url()?;
240    let tls = config.source.tls.as_ref();
241    // Surface the plaintext-transport warning at preflight time too —
242    // operators should hear it from `rivet check` before they wait
243    // through a full `rivet run` to learn the same thing. `Once` inside
244    // the helper keeps emission to one line per process even when both
245    // `check` and `run` flow through it.
246    crate::source::warn_if_tls_disabled(&config.source);
247    // Each engine connects once and returns one diagnostic per export without
248    // printing. Rendering is decided here: TEXT (the per-export table) for the
249    // default human path, or — under `--json` — the diagnostic is merged into
250    // each export's type-report JSON object below (see the `if show_type_report`
251    // block). `get_export_diagnostic` already proved the diag is computable
252    // without printing; this is the multi-export variant of that path.
253    let diagnostics: Vec<ExportDiagnostic> = match config.source.source_type {
254        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports)?,
255        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports)?,
256        SourceType::Mssql => mssql::check_mssql(&url, tls, &exports)?,
257    };
258    if !json_output {
259        for diag in &diagnostics {
260            print_diagnostic(diag);
261        }
262    } else if !show_type_report {
263        // `--json` WITHOUT a type report (the CLI forces `show_type_report` on
264        // under `--json`, so this only fires if `check` is called directly with
265        // `json_output=true, show_type_report=false`): there is no per-export
266        // type-report object to nest the diagnostic into, so emit the diagnostic
267        // alone — still NDJSON, one object per export per line. Keeps the
268        // verdict from being silently dropped regardless of caller.
269        for diag in &diagnostics {
270            println!("{}", serde_json::to_string(diag)?);
271        }
272    }
273    // Under `--json` WITH a type report, the diagnostics are emitted nested
274    // inside each export's type-report object (the `show_type_report` block)
275    // rather than as a standalone array — see the design note there. Built only
276    // on the `--json` path (empty otherwise) since the TEXT path printed above.
277    let diag_by_export: std::collections::HashMap<&str, &ExportDiagnostic> = if json_output {
278        diagnostics
279            .iter()
280            .map(|d| (d.export_name.as_str(), d))
281            .collect()
282    } else {
283        std::collections::HashMap::new()
284    };
285
286    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
287    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
288    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
289    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
290    // and has side effects) — but we *do* call `create_destination`, which
291    // resolves env vars / credentials_file existence at construction time.
292    // Each unique destination is probed once per `check` to keep multi-export
293    // configs cheap.
294    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
295    for export in &exports {
296        let dest_key = destination_identity(&export.destination);
297        if !seen_destinations.insert(dest_key) {
298            continue;
299        }
300        let expanded = crate::plan::build::expand_destination_templates(
301            export.destination.clone(),
302            &export.name,
303        );
304        crate::destination::create_destination(&expanded).map_err(|e| {
305            anyhow::anyhow!(
306                "export '{}': destination preflight failed: {:#}",
307                export.name,
308                e
309            )
310        })?;
311    }
312
313    // Whether the check ends clean (no strict failure, no target-fail column).
314    // Stays true for the default `rivet check` (no type report), so the "Next:
315    // rivet run" pointer still fires.
316    let mut clean = true;
317
318    if show_type_report {
319        let policy = if strict {
320            TypePolicy::strict()
321        } else {
322            TypePolicy::warn_only()
323        };
324
325        let mut any_fatal = false;
326        // Count hard target-FAIL columns (and remember which target) so that —
327        // when --strict was NOT passed and the exit code is therefore 0 — we can
328        // print a note. The "fail ✗" glyph in the table implies a hard failure,
329        // but exit is gated only by --strict; without this note an operator or CI
330        // reading the glyph alone would be misled into thinking rc != 0.
331        let mut target_fail_cols = 0usize;
332        let mut target_fail_label: Option<&'static str> = None;
333        for export in &exports {
334            let column_overrides =
335                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
336            // CLI `--target` wins; otherwise fall back to the per-export
337            // `target:` from the config (slice #2a). A declared-but-unknown
338            // target is a loud error — never silently ignored.
339            if let Some(t) = export.target.as_deref()
340                && crate::types::target::ExportTarget::parse(t).is_none()
341            {
342                anyhow::bail!(
343                    "export '{}': unknown target '{t}' (expected: {})",
344                    export.name,
345                    crate::types::target::ExportTarget::valid_target_names()
346                );
347            }
348            let eff_target = target.or_else(|| {
349                export
350                    .target
351                    .as_deref()
352                    .and_then(crate::types::target::ExportTarget::parse)
353            });
354            let config_dir = std::path::Path::new(config_path)
355                .parent()
356                .unwrap_or_else(|| std::path::Path::new("."));
357            match type_report::collect_report(
358                &config,
359                export,
360                &column_overrides,
361                &policy,
362                eff_target,
363                config_dir,
364                params,
365            ) {
366                Ok(report) => {
367                    if report.has_fatal() {
368                        any_fatal = true;
369                    }
370                    if let Some(t) = eff_target
371                        && report.has_target_fail()
372                    {
373                        any_fatal = true;
374                        target_fail_cols += report
375                            .columns
376                            .iter()
377                            .filter(|c| c.target_status == Some(TargetStatus::Fail))
378                            .count();
379                        target_fail_label.get_or_insert(t.label());
380                    }
381                    if json_output {
382                        // `--json` + `--type-report` interaction (DESIGN):
383                        // emit BOTH, nested. Each export gets ONE JSON object
384                        // (NDJSON, one per line, unchanged) keeping the
385                        // top-level type-report keys (`export`/`columns`/
386                        // `violations`) so existing consumers and the
387                        // `check_json_flag_outputs_type_report_as_json` test
388                        // stay green — and we attach the per-export DIAGNOSTIC
389                        // verdict under a new `"diagnostic"` key. This is the
390                        // least-surprising shape because `check --json` already
391                        // emitted one type-report object per export; we simply
392                        // enrich each with its verdict rather than printing a
393                        // second, separate JSON value (which would break a
394                        // single-`from_str` parse of stdout).
395                        print_report_json_with_diagnostic(
396                            &report,
397                            diag_by_export.get(export.name.as_str()).copied(),
398                        )?;
399                    } else {
400                        type_report::print_table(&report, eff_target);
401                    }
402                }
403                Err(e) => {
404                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
405                    // The type report could not be collected, but the diagnostic
406                    // was. Under --json the verdict must still reach the
407                    // consumer, so emit a diagnostic-only object (no `columns`/
408                    // `violations`) rather than silently dropping this export.
409                    if json_output
410                        && let Some(diag) = diag_by_export.get(export.name.as_str()).copied()
411                    {
412                        println!("{}", serde_json::to_string(diag)?);
413                    }
414                }
415            }
416        }
417
418        if strict && any_fatal {
419            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
420        } else if !strict && target_fail_cols > 0 && !json_output {
421            // The table showed "fail ✗" but rc is 0 — say so explicitly. Skipped
422            // under --json so NDJSON output stays one object per line.
423            clean = false;
424            println!();
425            println!(
426                "{}",
427                target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
428            );
429        }
430    }
431
432    if !json_output {
433        // Verdict legend — decode the EFFICIENT/ACCEPTABLE/DEGRADED/UNSAFE words
434        // printed above and reassure that `check` is advisory: never blocks a run.
435        println!();
436        println!(
437            "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
438        );
439        if clean {
440            // Keep the ladder going to the final rung instead of ending cold.
441            println!(
442                "Looks good. Next: rivet run -c {config_path} --validate   # export, then verify row counts"
443            );
444        }
445    }
446
447    Ok(())
448}
449
450/// Emit one export's `--json` line: the type report (`export`/`columns`/
451/// `violations`/…) with the per-export DIAGNOSTIC verdict attached under a new
452/// `"diagnostic"` key. NDJSON — exactly one JSON object, terminated by a
453/// newline, so a multi-export config prints one parseable object per line
454/// (preserving the prior `check --json` type-report wire shape, now enriched).
455///
456/// `diag` is `None` only if the diagnostic could not be paired by export name
457/// (it always can in practice); in that case the type report is emitted as
458/// before, so the worst case is a missing `diagnostic` key, never a panic.
459fn print_report_json_with_diagnostic(
460    report: &type_report::ExportTypeReport,
461    diag: Option<&ExportDiagnostic>,
462) -> Result<()> {
463    let mut value = serde_json::to_value(report)?;
464    if let (Some(obj), Some(diag)) = (value.as_object_mut(), diag) {
465        obj.insert("diagnostic".to_string(), serde_json::to_value(diag)?);
466    }
467    println!("{}", serde_json::to_string(&value)?);
468    Ok(())
469}
470
471fn print_diagnostic(diag: &ExportDiagnostic) {
472    println!();
473    println!("Export: {}", diag.export_name);
474    println!("  Strategy:     {}", diag.strategy);
475    println!("  Mode:         {}", diag.mode);
476    if let Some(est) = diag.row_estimate {
477        if est >= 1_000_000 {
478            println!("  Row estimate: ~{}M", est / 1_000_000);
479        } else if est >= 1_000 {
480            println!("  Row estimate: ~{}K", est / 1_000);
481        } else {
482            println!("  Row estimate: ~{}", est);
483        }
484    }
485    if let Some(w) = diag.avg_row_bytes {
486        println!("  Row width:    ~{} bytes", w);
487    }
488    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
489        println!("  Cursor range: {} .. {}", min_v, max_v);
490    }
491    if let Some(col) = &diag.cursor_column {
492        println!("  Cursor col:   {}", col);
493    }
494    // Plain-language access path instead of a raw EXPLAIN node dump
495    // (`Result (cost=0.00..0.01 rows=1 width=36)`). Keyed off the authoritative
496    // `uses_index` bool, gated on `scan_type.is_some()` so engines without an
497    // EXPLAIN probe (MSSQL) stay silent.
498    if diag.scan_type.is_some() {
499        let access = if diag.uses_index {
500            "index scan (the cursor/chunk column is indexed)"
501        } else {
502            "full table scan (no index on the read path)"
503        };
504        println!("  Access:       {access}");
505    }
506    println!("  Verdict:      {}", diag.verdict);
507    println!(
508        "  Recommended:  tuning.profile: {}",
509        diag.recommended_profile
510    );
511    let (par_level, par_reason) = diag.recommended_parallel;
512    if par_level > 1 {
513        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
514    } else {
515        println!("  Parallelism:  {} ({})", par_level, par_reason);
516    }
517    for w in &diag.warnings {
518        println!("  Warning:      [{}] {}", w.severity.label(), w.message);
519    }
520    if let Some(suggestion) = &diag.suggestion {
521        println!("  Suggestion:   {}", suggestion);
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use super::*;
528    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
529    use doctor::{
530        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
531    };
532    use serde_json::Value;
533
534    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
535        // Baseline from the canonical test fixture; override only the fields
536        // these preflight tests vary (mode, cursor, CSV format, query, dest).
537        ExportConfig {
538            mode,
539            cursor_column: cursor.map(|s| s.to_string()),
540            query: Some("SELECT * FROM t".to_string()),
541            format: FormatType::Csv,
542            destination: DestinationConfig {
543                destination_type: DestinationType::Local,
544                path: Some("./out".to_string()),
545                ..Default::default()
546            },
547            ..crate::config::sample_export(name)
548        }
549    }
550
551    /// A representative incremental diagnostic for the `--json` serialization
552    /// tests: a cursor column (so `has_cursor` is true), an index (so
553    /// `uses_index` is true), a >1 parallel recommendation (so `can_parallel`
554    /// is true), and a couple of warnings.
555    fn sample_diagnostic(name: &str) -> ExportDiagnostic {
556        ExportDiagnostic {
557            export_name: name.to_string(),
558            strategy: "incremental(updated_at)".to_string(),
559            mode: "incremental".to_string(),
560            cursor_column: Some("updated_at".to_string()),
561            row_estimate: Some(1_234_567),
562            avg_row_bytes: Some(96),
563            cursor_min: Some("2020-01-01".to_string()),
564            cursor_max: Some("2024-01-01".to_string()),
565            scan_type: Some("Index Scan".to_string()),
566            uses_index: true,
567            verdict: HealthVerdict::Degraded,
568            recommended_profile: "safe",
569            recommended_parallel: (4, "large indexed dataset"),
570            warnings: vec![
571                analysis::Warning::new(analysis::Severity::Medium, "Sparse key range".to_string()),
572                analysis::Warning::new(analysis::Severity::High, "memory risk".to_string()),
573            ],
574            suggestion: Some("create an index".to_string()),
575        }
576    }
577
578    // ── `rivet check --json`: the per-export DIAGNOSTIC verdict as JSON ───────
579
580    #[test]
581    fn diagnostic_json_has_lowercase_verdict_and_core_fields() {
582        let diag = sample_diagnostic("orders");
583        let v: serde_json::Value =
584            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
585
586        // Verdict serializes to a stable lowercase token (not the SHOUTING
587        // Display form), so CI can match on it case-sensitively.
588        assert_eq!(v["verdict"], "degraded", "got: {v}");
589        assert_eq!(v["strategy"], "incremental(updated_at)", "got: {v}");
590        assert_eq!(v["mode"], "incremental", "got: {v}");
591        assert_eq!(v["recommended_profile"], "safe", "got: {v}");
592        assert!(v["warnings"].is_array(), "warnings must be an array: {v}");
593        assert_eq!(v["warnings"].as_array().unwrap().len(), 2, "got: {v}");
594        // Each warning is a `{ severity, message }` object (per-warning severity).
595        assert_eq!(v["warnings"][0]["severity"], "medium", "got: {v}");
596        assert_eq!(v["warnings"][0]["message"], "Sparse key range", "got: {v}");
597        assert_eq!(v["warnings"][1]["severity"], "high", "got: {v}");
598        assert_eq!(v["export_name"], "orders", "got: {v}");
599    }
600
601    #[test]
602    fn diagnostic_json_verdict_tokens_are_all_lowercase() {
603        for (verdict, token) in [
604            (HealthVerdict::Efficient, "efficient"),
605            (HealthVerdict::Acceptable, "acceptable"),
606            (HealthVerdict::Degraded, "degraded"),
607            (HealthVerdict::Unsafe, "unsafe"),
608        ] {
609            let mut diag = sample_diagnostic("t");
610            diag.verdict = verdict;
611            let v: serde_json::Value =
612                serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
613            assert_eq!(v["verdict"], token, "verdict must lowercase to {token}");
614        }
615    }
616
617    #[test]
618    fn diagnostic_json_recommended_parallel_is_named_object_not_tuple() {
619        // The raw `(u32, &str)` must NOT leak as a positional 2-array; consumers
620        // read `recommended_parallel.level` / `.reason`.
621        let diag = sample_diagnostic("t");
622        let v: serde_json::Value =
623            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
624        assert!(
625            v["recommended_parallel"].is_object(),
626            "recommended_parallel must be an object, got: {}",
627            v["recommended_parallel"]
628        );
629        assert_eq!(v["recommended_parallel"]["level"], 4, "got: {v}");
630        assert_eq!(
631            v["recommended_parallel"]["reason"], "large indexed dataset",
632            "got: {v}"
633        );
634    }
635
636    #[test]
637    fn diagnostic_json_capabilities_are_derived_from_fields() {
638        let diag = sample_diagnostic("t");
639        let v: serde_json::Value =
640            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
641        let caps = &v["capabilities"];
642        assert_eq!(caps["uses_index"], true, "got: {caps}");
643        assert_eq!(caps["has_cursor"], true, "got: {caps}");
644        assert_eq!(caps["can_parallel"], true, "got: {caps}");
645    }
646
647    #[test]
648    fn diagnostic_json_capabilities_flip_with_fields() {
649        // A non-cursor, no-index, single-worker diagnostic flips all three.
650        let mut diag = sample_diagnostic("t");
651        diag.cursor_column = None;
652        diag.uses_index = false;
653        diag.recommended_parallel = (1, "small dataset");
654        let v: serde_json::Value =
655            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
656        let caps = &v["capabilities"];
657        assert_eq!(caps["uses_index"], false, "got: {caps}");
658        assert_eq!(caps["has_cursor"], false, "got: {caps}");
659        assert_eq!(caps["can_parallel"], false, "got: {caps}");
660    }
661
662    #[test]
663    fn diagnostic_json_skips_none_optionals() {
664        // `None` optionals are omitted (not `null`) to keep the object lean.
665        let mut diag = sample_diagnostic("t");
666        diag.suggestion = None;
667        diag.scan_type = None;
668        let v: serde_json::Value =
669            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
670        let obj = v.as_object().unwrap();
671        assert!(!obj.contains_key("suggestion"), "None must be omitted: {v}");
672        assert!(!obj.contains_key("scan_type"), "None must be omitted: {v}");
673    }
674
675    /// Build the same `Value` `print_report_json_with_diagnostic` prints, so the
676    /// merged shape is asserted without capturing stdout.
677    fn merged_check_json(report: &type_report::ExportTypeReport, diag: &ExportDiagnostic) -> Value {
678        let mut value = serde_json::to_value(report).unwrap();
679        value.as_object_mut().unwrap().insert(
680            "diagnostic".to_string(),
681            serde_json::to_value(diag).unwrap(),
682        );
683        value
684    }
685
686    fn empty_report(export: &str) -> type_report::ExportTypeReport {
687        type_report::ExportTypeReport {
688            export: export.to_string(),
689            columns: Vec::new(),
690            violations: Vec::new(),
691            target_failures: false,
692            recovery_sql: None,
693        }
694    }
695
696    #[test]
697    fn check_json_merges_diagnostic_into_type_report_object() {
698        // The `--json` + `--type-report` interaction: ONE object per export
699        // keeping the type-report keys (`export`/`columns`/`violations`) — so
700        // the existing `check_json_flag_outputs_type_report_as_json` contract
701        // holds — PLUS a nested `diagnostic` carrying the verdict.
702        let report = empty_report("orders");
703        let diag = sample_diagnostic("orders");
704        let v = merged_check_json(&report, &diag);
705
706        // Pre-existing type-report keys still at the root.
707        assert_eq!(v["export"], "orders", "got: {v}");
708        assert!(v["columns"].is_array(), "columns at root: {v}");
709        assert!(v["violations"].is_array(), "violations at root: {v}");
710
711        // The diagnostic is nested and carries the verdict + advice.
712        let d = &v["diagnostic"];
713        assert_eq!(d["verdict"], "degraded", "got: {d}");
714        assert_eq!(d["strategy"], "incremental(updated_at)", "got: {d}");
715        assert_eq!(d["mode"], "incremental", "got: {d}");
716        assert_eq!(d["recommended_profile"], "safe", "got: {d}");
717        assert!(d["warnings"].is_array(), "warnings array: {d}");
718        assert_eq!(d["capabilities"]["has_cursor"], true, "got: {d}");
719    }
720
721    #[test]
722    fn check_json_object_is_a_single_parseable_line() {
723        // NDJSON: serializing yields exactly one JSON value with no trailing
724        // data, so `serde_json::from_str(line.trim())` (as the live test does)
725        // parses it whole.
726        let report = empty_report("orders");
727        let diag = sample_diagnostic("orders");
728        let line = serde_json::to_string(&merged_check_json(&report, &diag)).unwrap();
729        assert!(!line.contains('\n'), "one object per line: {line}");
730        let parsed: Value = serde_json::from_str(line.trim()).expect("must parse whole");
731        assert_eq!(parsed["export"], "orders");
732    }
733
734    // ── L8: 'fail ✗' note when --target FAILs but --strict was not passed ─────
735    // The glyph implies a hard failure; exit is gated only by --strict. The note
736    // tells an operator/CI the exit is 0 so the glyph doesn't mislead.
737    #[test]
738    fn target_fail_note_names_count_target_and_strict_gate() {
739        let note = target_fail_note(2, "bigquery");
740        assert!(note.contains("2 columns FAIL"), "got: {note}");
741        assert!(note.contains("bigquery"), "got: {note}");
742        assert!(note.contains("--strict"), "got: {note}");
743        assert!(note.contains("exit 0"), "got: {note}");
744    }
745
746    #[test]
747    fn target_fail_note_singular_for_one_column() {
748        let note = target_fail_note(1, "duckdb");
749        assert!(note.contains("1 column FAIL"), "got: {note}");
750        assert!(!note.contains("1 columns"), "should be singular: {note}");
751    }
752
753    #[test]
754    fn verdict_small_indexed_with_cursor_is_efficient() {
755        let v = compute_verdict(Some(500_000), true, true, None, 1);
756        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
757    }
758
759    #[test]
760    fn verdict_large_indexed_with_cursor_is_acceptable() {
761        let v = compute_verdict(Some(20_000_000), true, true, None, 1);
762        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
763    }
764
765    #[test]
766    fn verdict_no_index_no_cursor_is_degraded() {
767        let v = compute_verdict(Some(500_000), false, false, None, 1);
768        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
769    }
770
771    #[test]
772    fn verdict_huge_no_index_is_unsafe() {
773        let v = compute_verdict(Some(100_000_000), false, false, None, 1);
774        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
775    }
776
777    #[test]
778    fn parse_pg_row_estimate_from_sort_plan() {
779        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
780        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
781    }
782
783    #[test]
784    fn parse_pg_row_estimate_from_index_scan() {
785        let plan =
786            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
787        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
788    }
789
790    #[test]
791    fn extract_scan_type_detects_seq_scan() {
792        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
793        let st = extract_scan_type(plan);
794        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
795    }
796
797    #[test]
798    fn extract_scan_type_detects_index_scan() {
799        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
800        let st = extract_scan_type(plan);
801        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
802    }
803
804    #[test]
805    fn suggestion_for_efficient_verdict_is_none() {
806        let e = make_export("t", ExportMode::Full, None);
807        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
808        assert!(
809            s.is_none(),
810            "efficient verdict should produce no suggestion"
811        );
812    }
813
814    #[test]
815    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
816        let e = make_export("t", ExportMode::Full, None);
817        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
818        let msg = s.expect("degraded verdict should produce a suggestion");
819        assert!(
820            msg.contains("safe"),
821            "suggestion should recommend safe profile, got: {msg}"
822        );
823    }
824
825    fn src_err(msg: &str) -> &'static str {
826        categorize_source_error(&anyhow::anyhow!("{}", msg))
827    }
828
829    #[test]
830    fn source_password_rejected_is_auth_error() {
831        assert_eq!(
832            src_err("password authentication failed for user \"rivet\""),
833            "auth error"
834        );
835    }
836
837    #[test]
838    fn source_authentication_failed_is_auth_error() {
839        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
840    }
841
842    #[test]
843    fn source_access_denied_is_auth_error() {
844        assert_eq!(
845            src_err("Access denied for user 'rivet'@'localhost'"),
846            "auth error"
847        );
848    }
849
850    #[test]
851    fn source_connection_refused_is_connectivity() {
852        assert_eq!(
853            src_err("connection refused (os error 61)"),
854            "connectivity error"
855        );
856    }
857
858    #[test]
859    fn source_timed_out_is_connectivity() {
860        assert_eq!(src_err("connection timed out"), "connectivity error");
861    }
862
863    #[test]
864    fn source_dns_translate_host_is_connectivity() {
865        assert_eq!(
866            src_err("could not translate host name \"db.bad\" to address"),
867            "connectivity error"
868        );
869    }
870
871    #[test]
872    fn source_name_not_known_is_connectivity() {
873        assert_eq!(src_err("Name or service not known"), "connectivity error");
874    }
875
876    #[test]
877    fn source_unknown_error_is_generic() {
878        assert_eq!(src_err("something totally unexpected"), "error");
879    }
880
881    fn dest_config(dtype: DestinationType) -> DestinationConfig {
882        DestinationConfig {
883            destination_type: dtype,
884            bucket: Some("b".to_string()),
885            ..Default::default()
886        }
887    }
888
889    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
890        let cfg = dest_config(dtype);
891        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
892    }
893
894    fn local_dest(path: &str) -> DestinationConfig {
895        DestinationConfig {
896            destination_type: DestinationType::Local,
897            path: Some(path.to_string()),
898            ..Default::default()
899        }
900    }
901
902    // Regression (doctor-dedup): doctor's inline dedup key omitted `path`,
903    // so two local destinations with different paths collapsed to one entry
904    // and the second was never write-probed. The shared identity must keep
905    // them distinct.
906    #[test]
907    fn destination_identity_distinguishes_local_paths() {
908        assert_ne!(
909            destination_identity(&local_dest("/tmp/a")),
910            destination_identity(&local_dest("/tmp/b")),
911        );
912    }
913
914    #[test]
915    fn destination_identity_collapses_identical_local_destinations() {
916        assert_eq!(
917            destination_identity(&local_dest("/tmp/a")),
918            destination_identity(&local_dest("/tmp/a")),
919        );
920    }
921
922    #[test]
923    fn destination_identity_distinguishes_buckets() {
924        let a = DestinationConfig {
925            bucket: Some("bucket-a".to_string()),
926            ..dest_config(DestinationType::S3)
927        };
928        let b = DestinationConfig {
929            bucket: Some("bucket-b".to_string()),
930            ..dest_config(DestinationType::S3)
931        };
932        assert_ne!(destination_identity(&a), destination_identity(&b));
933    }
934
935    // Same bucket name on different endpoints (e.g. AWS vs MinIO) is two
936    // distinct destinations and must be probed separately.
937    #[test]
938    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
939        let aws = dest_config(DestinationType::S3);
940        let minio = DestinationConfig {
941            endpoint: Some("http://localhost:9000".to_string()),
942            ..dest_config(DestinationType::S3)
943        };
944        assert_ne!(destination_identity(&aws), destination_identity(&minio));
945    }
946
947    #[test]
948    fn dest_credential_loading_is_auth_error() {
949        assert_eq!(
950            dest_err(
951                "loading credential to sign http request",
952                DestinationType::Gcs
953            ),
954            "auth error"
955        );
956    }
957
958    #[test]
959    fn dest_permission_denied_is_auth_error() {
960        assert_eq!(
961            dest_err("permission denied on resource bucket", DestinationType::S3),
962            "auth error"
963        );
964    }
965
966    #[test]
967    fn dest_forbidden_is_auth_error() {
968        assert_eq!(
969            dest_err("403 Forbidden", DestinationType::Gcs),
970            "auth error"
971        );
972    }
973
974    #[test]
975    fn dest_unauthorized_is_auth_error() {
976        assert_eq!(
977            dest_err("401 Unauthorized", DestinationType::S3),
978            "auth error"
979        );
980    }
981
982    #[test]
983    fn dest_invalid_grant_is_auth_error() {
984        assert_eq!(
985            dest_err(
986                "invalid_grant: token has been revoked",
987                DestinationType::Gcs
988            ),
989            "auth error"
990        );
991    }
992
993    #[test]
994    fn dest_nosuchbucket_s3_is_bucket_not_found() {
995        assert_eq!(
996            dest_err(
997                "NoSuchBucket: the specified bucket does not exist",
998                DestinationType::S3
999            ),
1000            "bucket not found"
1001        );
1002    }
1003
1004    #[test]
1005    fn dest_not_found_gcs_is_bucket_not_found() {
1006        assert_eq!(
1007            dest_err("bucket not found (404)", DestinationType::Gcs),
1008            "bucket not found"
1009        );
1010    }
1011
1012    #[test]
1013    fn dest_not_found_local_is_path_not_found() {
1014        assert_eq!(
1015            dest_err("path not found: /tmp/missing", DestinationType::Local),
1016            "path not found"
1017        );
1018    }
1019
1020    #[test]
1021    fn dest_connection_refused_is_connectivity() {
1022        assert_eq!(
1023            dest_err("connection refused to endpoint", DestinationType::S3),
1024            "connectivity error"
1025        );
1026    }
1027
1028    #[test]
1029    fn dest_dns_error_is_connectivity() {
1030        assert_eq!(
1031            dest_err("dns error: failed to lookup address", DestinationType::S3),
1032            "connectivity error"
1033        );
1034    }
1035
1036    #[test]
1037    fn dest_timed_out_is_connectivity() {
1038        assert_eq!(
1039            dest_err("request timed out after 30s", DestinationType::Gcs),
1040            "connectivity error"
1041        );
1042    }
1043
1044    #[test]
1045    fn dest_unknown_error_is_generic() {
1046        assert_eq!(
1047            dest_err("something else entirely", DestinationType::S3),
1048            "error"
1049        );
1050    }
1051
1052    #[test]
1053    fn strategy_full_scan() {
1054        let e = make_export("t", ExportMode::Full, None);
1055        assert_eq!(derive_strategy(&e), "full-scan");
1056    }
1057
1058    #[test]
1059    fn strategy_full_parallel() {
1060        let mut e = make_export("t", ExportMode::Full, None);
1061        e.parallel = 4;
1062        assert_eq!(derive_strategy(&e), "full-parallel(4)");
1063    }
1064
1065    #[test]
1066    fn strategy_incremental() {
1067        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1068        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
1069    }
1070
1071    #[test]
1072    fn strategy_chunked() {
1073        let mut e = make_export("t", ExportMode::Chunked, None);
1074        e.chunk_column = Some("id".to_string());
1075        e.chunk_size = 50_000;
1076        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
1077    }
1078
1079    #[test]
1080    fn strategy_chunked_parallel() {
1081        let mut e = make_export("t", ExportMode::Chunked, None);
1082        e.chunk_column = Some("id".to_string());
1083        e.chunk_size = 50_000;
1084        e.parallel = 3;
1085        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
1086    }
1087
1088    #[test]
1089    fn strategy_time_window() {
1090        let mut e = make_export("t", ExportMode::TimeWindow, None);
1091        e.time_column = Some("created_at".to_string());
1092        e.days_window = Some(7);
1093        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
1094    }
1095
1096    #[test]
1097    fn profile_small_indexed_is_fast() {
1098        let e = make_export("t", ExportMode::Full, None);
1099        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
1100    }
1101
1102    #[test]
1103    fn profile_medium_indexed_is_balanced() {
1104        let e = make_export("t", ExportMode::Full, None);
1105        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
1106    }
1107
1108    #[test]
1109    fn profile_large_indexed_is_safe() {
1110        let e = make_export("t", ExportMode::Full, None);
1111        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
1112    }
1113
1114    #[test]
1115    fn profile_small_no_index_is_balanced() {
1116        let e = make_export("t", ExportMode::Full, None);
1117        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
1118    }
1119
1120    #[test]
1121    fn profile_small_no_index_parallel_is_safe() {
1122        let mut e = make_export("t", ExportMode::Full, None);
1123        e.parallel = 4;
1124        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
1125    }
1126
1127    #[test]
1128    fn profile_medium_no_index_is_balanced() {
1129        let e = make_export("t", ExportMode::Full, None);
1130        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
1131    }
1132
1133    #[test]
1134    fn profile_large_no_index_is_safe() {
1135        let e = make_export("t", ExportMode::Full, None);
1136        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
1137    }
1138
1139    #[test]
1140    fn sparse_range_warning_when_very_sparse() {
1141        let mut e = make_export("t", ExportMode::Chunked, None);
1142        e.chunk_column = Some("id".to_string());
1143        e.chunk_size = 100_000;
1144        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1145        assert!(w.is_some(), "should warn about sparse range");
1146        let msg = w.unwrap();
1147        assert!(msg.contains("Sparse key range"), "got: {msg}");
1148        assert!(msg.contains("empty"), "got: {msg}");
1149    }
1150
1151    #[test]
1152    fn sparse_range_no_warning_when_dense() {
1153        let mut e = make_export("t", ExportMode::Chunked, None);
1154        e.chunk_column = Some("id".to_string());
1155        e.chunk_size = 100_000;
1156        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
1157        assert!(w.is_none(), "should not warn for dense range");
1158    }
1159
1160    #[test]
1161    fn sparse_range_skipped_when_chunk_dense() {
1162        let mut e = make_export("t", ExportMode::Chunked, None);
1163        e.chunk_column = Some("id".to_string());
1164        e.chunk_dense = true;
1165        e.chunk_size = 100_000;
1166        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1167        assert!(
1168            w.is_none(),
1169            "chunk_dense uses ordinals, not physical id span"
1170        );
1171    }
1172
1173    #[test]
1174    fn dense_surrogate_warning_when_chunk_dense_builtin() {
1175        let mut e = make_export("t", ExportMode::Chunked, None);
1176        e.chunk_column = Some("id".to_string());
1177        e.chunk_dense = true;
1178        e.query = Some("SELECT id FROM orders".to_string());
1179        let w = check_dense_surrogate_cost(&e);
1180        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
1181        assert!(w.unwrap().contains("global sort"));
1182    }
1183
1184    #[test]
1185    fn sparse_range_not_triggered_for_non_chunked() {
1186        let e = make_export("t", ExportMode::Full, None);
1187        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
1188        assert!(w.is_none(), "should not warn for non-chunked mode");
1189    }
1190
1191    #[test]
1192    fn dense_surrogate_warning_with_row_number() {
1193        let mut e = make_export("t", ExportMode::Chunked, None);
1194        e.chunk_column = Some("rn".to_string());
1195        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
1196        let w = check_dense_surrogate_cost(&e);
1197        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
1198        assert!(w.unwrap().contains("global sort"));
1199    }
1200
1201    #[test]
1202    fn no_dense_surrogate_warning_without_row_number() {
1203        let mut e = make_export("t", ExportMode::Chunked, None);
1204        e.chunk_column = Some("id".to_string());
1205        e.query = Some("SELECT * FROM orders".to_string());
1206        let w = check_dense_surrogate_cost(&e);
1207        assert!(w.is_none());
1208    }
1209
1210    #[test]
1211    fn no_dense_surrogate_warning_for_non_chunked() {
1212        let mut e = make_export("t", ExportMode::Full, None);
1213        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
1214        let w = check_dense_surrogate_cost(&e);
1215        assert!(w.is_none(), "should not warn for non-chunked mode");
1216    }
1217
1218    #[test]
1219    fn parallel_memory_warning_large_dataset() {
1220        let mut e = make_export("t", ExportMode::Chunked, None);
1221        e.parallel = 4;
1222        let w = check_parallel_memory_risk(&e, Some(10_000_000));
1223        assert!(w.is_some(), "should warn about memory risk");
1224        let msg = w.unwrap();
1225        assert!(msg.contains("Parallel=4"), "got: {msg}");
1226        assert!(msg.contains("memory"), "got: {msg}");
1227    }
1228
1229    #[test]
1230    fn no_parallel_memory_warning_small_dataset() {
1231        let mut e = make_export("t", ExportMode::Chunked, None);
1232        e.parallel = 4;
1233        let w = check_parallel_memory_risk(&e, Some(1_000));
1234        assert!(w.is_none(), "should not warn for small dataset");
1235    }
1236
1237    #[test]
1238    fn no_parallel_memory_warning_single_worker() {
1239        let e = make_export("t", ExportMode::Full, None);
1240        let w = check_parallel_memory_risk(&e, Some(100_000_000));
1241        assert!(w.is_none(), "should not warn when parallel=1");
1242    }
1243
1244    #[test]
1245    fn suggestion_degraded_full_recommends_incremental() {
1246        let e = make_export("t", ExportMode::Full, None);
1247        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1248        assert!(s.contains("incremental"), "got: {s}");
1249    }
1250
1251    #[test]
1252    fn suggestion_degraded_chunked_recommends_index() {
1253        let mut e = make_export("t", ExportMode::Chunked, None);
1254        e.chunk_column = Some("id".to_string());
1255        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1256        assert!(s.contains("index on 'id'"), "got: {s}");
1257    }
1258
1259    #[test]
1260    fn suggestion_degraded_time_window_recommends_index() {
1261        let mut e = make_export("t", ExportMode::TimeWindow, None);
1262        e.time_column = Some("created_at".to_string());
1263        e.days_window = Some(7);
1264        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1265        assert!(s.contains("index on 'created_at'"), "got: {s}");
1266    }
1267
1268    #[test]
1269    fn suggestion_unsafe_full_recommends_incremental() {
1270        let e = make_export("t", ExportMode::Full, None);
1271        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1272        assert!(s.contains("incremental"), "got: {s}");
1273    }
1274
1275    #[test]
1276    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
1277        let mut e = make_export("t", ExportMode::Chunked, None);
1278        e.chunk_column = Some("id".to_string());
1279        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1280        assert!(s.contains("index on 'id'"), "got: {s}");
1281        assert!(s.contains("parallel"), "got: {s}");
1282    }
1283
1284    #[test]
1285    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
1286        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1287        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1288        assert!(s.contains("index on 'updated_at'"), "got: {s}");
1289    }
1290
1291    #[test]
1292    fn suggestion_acceptable_large_full_recommends_incremental() {
1293        let e = make_export("t", ExportMode::Full, None);
1294        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1295        assert!(s.contains("incremental"), "got: {s}");
1296    }
1297
1298    #[test]
1299    fn parallel_only_for_chunked_mode() {
1300        let e = make_export("t", ExportMode::Full, None);
1301        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
1302        assert_eq!(level, 1, "non-chunked mode should recommend 1");
1303    }
1304
1305    #[test]
1306    fn parallel_small_dataset_is_one() {
1307        let mut e = make_export("t", ExportMode::Chunked, None);
1308        e.chunk_column = Some("id".to_string());
1309        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
1310        assert_eq!(level, 1, "small dataset should recommend 1");
1311    }
1312
1313    #[test]
1314    fn parallel_moderate_indexed_is_two() {
1315        let mut e = make_export("t", ExportMode::Chunked, None);
1316        e.chunk_column = Some("id".to_string());
1317        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
1318        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
1319    }
1320
1321    #[test]
1322    fn parallel_large_indexed_is_four() {
1323        let mut e = make_export("t", ExportMode::Chunked, None);
1324        e.chunk_column = Some("id".to_string());
1325        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
1326        assert_eq!(level, 4, "large indexed dataset should recommend 4");
1327    }
1328
1329    #[test]
1330    fn parallel_no_index_large_is_one() {
1331        let mut e = make_export("t", ExportMode::Chunked, None);
1332        e.chunk_column = Some("id".to_string());
1333        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
1334        assert_eq!(level, 1, "no index + large should recommend 1");
1335        assert!(reason.contains("no index"), "got: {reason}");
1336    }
1337
1338    #[test]
1339    fn parallel_no_index_moderate_is_conservative() {
1340        let mut e = make_export("t", ExportMode::Chunked, None);
1341        e.chunk_column = Some("id".to_string());
1342        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
1343        assert_eq!(
1344            level, 2,
1345            "no index + moderate should recommend 2 (conservative)"
1346        );
1347    }
1348
1349    #[test]
1350    fn suggestion_acceptable_large_chunked_recommends_parallel() {
1351        let mut e = make_export("t", ExportMode::Chunked, None);
1352        e.chunk_column = Some("id".to_string());
1353        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1354        assert!(s.contains("parallel"), "got: {s}");
1355    }
1356
1357    #[test]
1358    fn connection_limit_warn_when_parallel_meets_max() {
1359        let w = check_connection_limit(20, Some(20));
1360        assert!(w.is_some(), "should warn when parallel == max_connections");
1361        let msg = w.unwrap();
1362        assert!(msg.contains("max_connections=20"), "got: {msg}");
1363        assert!(msg.contains("parallel=20"), "got: {msg}");
1364    }
1365
1366    #[test]
1367    fn connection_limit_warn_when_parallel_exceeds_max() {
1368        let w = check_connection_limit(100, Some(20));
1369        assert!(w.is_some(), "should warn when parallel > max_connections");
1370        let msg = w.unwrap();
1371        assert!(msg.contains("max_connections=20"), "got: {msg}");
1372    }
1373
1374    #[test]
1375    fn connection_limit_no_warn_when_parallel_below_max() {
1376        let w = check_connection_limit(4, Some(100));
1377        assert!(
1378            w.is_none(),
1379            "should not warn when parallel << max_connections"
1380        );
1381    }
1382
1383    #[test]
1384    fn connection_limit_no_warn_when_parallel_is_one() {
1385        let w = check_connection_limit(1, Some(5));
1386        assert!(
1387            w.is_none(),
1388            "single worker never triggers connection warning"
1389        );
1390    }
1391
1392    #[test]
1393    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
1394        let w = check_connection_limit(100, None);
1395        assert!(w.is_some(), "should note that check was skipped");
1396        let msg = w.unwrap();
1397        assert!(msg.contains("skipped"), "got: {msg}");
1398    }
1399
1400    #[test]
1401    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
1402        let w = check_connection_limit(1, None);
1403        assert!(
1404            w.is_none(),
1405            "single worker never triggers connection warning"
1406        );
1407    }
1408
1409    #[test]
1410    fn connection_limit_suggests_headroom() {
1411        let w = check_connection_limit(25, Some(20)).unwrap();
1412        // Suggested safe max should be max_connections - 3 = 17
1413        assert!(
1414            w.contains("17"),
1415            "should suggest leaving headroom, got: {w}"
1416        );
1417    }
1418
1419    // ── v0.7.4: actionable hints next to categorised errors ───────────
1420
1421    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1422        let err = anyhow::anyhow!("{}", msg);
1423        let cat = categorize_source_error(&err);
1424        source_error_hint(cat, &err, &st)
1425    }
1426
1427    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1428        let err = anyhow::anyhow!("{}", msg);
1429        let dest = DestinationConfig {
1430            destination_type: dt,
1431            bucket: Some("b".into()),
1432            ..Default::default()
1433        };
1434        let cat = categorize_dest_error(&err, &dest);
1435        destination_error_hint(cat, &dest)
1436    }
1437
1438    #[test]
1439    fn source_tls_handshake_returns_pg_specific_tls_hint() {
1440        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1441        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1442    }
1443
1444    #[test]
1445    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1446        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1447        assert!(h.contains("tls.mode"), "got: {h}");
1448    }
1449
1450    #[test]
1451    fn source_auth_error_postgres_mentions_pg_hba() {
1452        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1453        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1454    }
1455
1456    #[test]
1457    fn source_auth_error_mysql_mentions_grant() {
1458        let h = src_hint(
1459            "Access denied for user 'rivet'@'localhost'",
1460            SourceType::Mysql,
1461        )
1462        .expect("hint");
1463        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1464    }
1465
1466    #[test]
1467    fn source_connectivity_error_mentions_bastion_and_network() {
1468        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1469        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1470    }
1471
1472    #[test]
1473    fn source_unknown_error_returns_no_hint() {
1474        // Generic "error" category should yield no hint — better to
1475        // print the raw driver message than to mislead.
1476        let h = src_hint("totally unexpected", SourceType::Postgres);
1477        assert!(h.is_none(), "unknown errors should not produce a hint");
1478    }
1479
1480    #[test]
1481    fn dest_s3_auth_error_names_concrete_actions() {
1482        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1483        assert!(
1484            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1485            "got: {h}"
1486        );
1487    }
1488
1489    #[test]
1490    fn dest_gcs_auth_error_names_concrete_actions() {
1491        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1492        assert!(
1493            h.contains("storage.objects") && h.contains("cloud-permissions"),
1494            "got: {h}"
1495        );
1496    }
1497
1498    #[test]
1499    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1500        // Guard the load-bearing ordering in categorize_dest_error: the
1501        // "sas expired" early-return must fire before the generic "token"
1502        // branch, or destination_error_hint produces the wrong hint.
1503        // This test pins the *category string*, not just the final hint text.
1504        let err = anyhow::anyhow!(
1505            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1506        );
1507        let dest = DestinationConfig {
1508            destination_type: DestinationType::Azure,
1509            bucket: Some("c".into()),
1510            ..Default::default()
1511        };
1512        let cat = categorize_dest_error(&err, &dest);
1513        assert_eq!(
1514            cat, "sas expired",
1515            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1516        );
1517    }
1518
1519    #[test]
1520    fn dest_azure_sas_expired_returns_regenerate_hint() {
1521        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1522        // the hint must steer the operator to `az storage container
1523        // generate-sas` not "your IAM role is broken".
1524        let h = dest_hint(
1525            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1526            DestinationType::Azure,
1527        )
1528        .expect("hint");
1529        assert!(
1530            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1531            "got: {h}"
1532        );
1533    }
1534
1535    #[test]
1536    fn dest_s3_bucket_not_found_says_no_auto_create() {
1537        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1538        assert!(
1539            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1540            "got: {h}"
1541        );
1542    }
1543
1544    #[test]
1545    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1546        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1547        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1548    }
1549}