Skip to main content

rivet/preflight/
mod.rs

1mod analysis;
2pub(crate) mod cursor_expr;
3mod doctor;
4mod mssql;
5mod mysql;
6mod postgres;
7mod schema_error;
8pub mod type_report;
9
10pub(crate) use analysis::chunk_sparsity_from_counts;
11// Re-exported so the plan layer's strategy explainer can ground its "≥ threshold"
12// narrative on the same constant `check`/`init` use, not a hard-coded copy.
13pub(crate) use analysis::SMALL_TABLE_ROW_THRESHOLD;
14#[cfg(test)]
15use analysis::{
16    build_suggestion, check_connection_limit, check_dense_surrogate_cost,
17    check_parallel_memory_risk, check_sparse_range, compute_verdict, derive_strategy,
18    recommend_parallelism, recommend_profile,
19};
20#[allow(unused_imports)]
21pub use doctor::doctor;
22// Reused at the run-time connect seam (src/pipeline/single.rs) so a failed
23// `rivet run` carries the same category + remediation hint `rivet doctor` gives.
24pub(crate) use doctor::{categorize_source_error, source_error_hint};
25#[cfg(test)]
26use postgres::{extract_scan_type, parse_pg_row_estimate};
27
28use serde::Serialize;
29
30use crate::config::{Config, ExportConfig, SourceType};
31use crate::error::Result;
32use crate::types::policy::TypePolicy;
33use crate::types::target::{ExportTarget, TargetStatus};
34
35/// Serializes lowercase ("efficient"/"acceptable"/"degraded"/"unsafe") so
36/// `rivet check --json` consumers (CI gates, orchestrators) match on a stable,
37/// case-insensitive token rather than the SHOUTING `Display` form used in the
38/// human-readable table.
39#[derive(Debug, Serialize)]
40#[serde(rename_all = "lowercase")]
41pub enum HealthVerdict {
42    Efficient,
43    Acceptable,
44    Degraded,
45    Unsafe,
46}
47
48impl std::fmt::Display for HealthVerdict {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Self::Efficient => write!(f, "EFFICIENT"),
52            Self::Acceptable => write!(f, "ACCEPTABLE"),
53            Self::Degraded => write!(f, "DEGRADED"),
54            Self::Unsafe => write!(f, "UNSAFE"),
55        }
56    }
57}
58
59pub(crate) struct ExportDiagnostic {
60    pub export_name: String,
61    pub strategy: String,
62    pub mode: String,
63    pub cursor_column: Option<String>,
64    pub row_estimate: Option<i64>,
65    /// Average bytes per row from catalog/plan stats (PG EXPLAIN `width`,
66    /// MSSQL `dm_db_partition_stats` pages/row). `None` when unavailable
67    /// (e.g. MySQL, with no trustworthy scan-free estimate). Feeds the
68    /// oversized-chunk warning and is shown as the `Row width` line.
69    pub avg_row_bytes: Option<i64>,
70    pub cursor_min: Option<String>,
71    pub cursor_max: Option<String>,
72    pub scan_type: Option<String>,
73    pub uses_index: bool,
74    pub verdict: HealthVerdict,
75    pub recommended_profile: &'static str,
76    pub recommended_parallel: (u32, &'static str),
77    pub warnings: Vec<String>,
78    pub suggestion: Option<String>,
79}
80
81// Hand-rolled `Serialize` (rather than `#[derive]`) so the JSON shape stays
82// fully under our control without touching the three engine construction sites:
83//   - `recommended_parallel` (a raw `(u32, &str)`) becomes a self-describing
84//     `{ "level": N, "reason": "…" }` object instead of a positional 2-array;
85//   - a derived `capabilities` object ({uses_index, has_cursor, can_parallel})
86//     is computed from the sibling fields at serialization time — no stored
87//     field, no extra probe;
88//   - `None` optionals are skipped to keep the object lean for CI consumers.
89// `HealthVerdict` rides its own `#[derive(Serialize)]` (lowercase tokens).
90impl Serialize for ExportDiagnostic {
91    fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
92    where
93        S: serde::Serializer,
94    {
95        use serde::ser::SerializeMap;
96
97        #[derive(Serialize)]
98        struct RecommendedParallel {
99            level: u32,
100            reason: &'static str,
101        }
102        #[derive(Serialize)]
103        struct Capabilities {
104            uses_index: bool,
105            has_cursor: bool,
106            can_parallel: bool,
107        }
108
109        let mut map = serializer.serialize_map(None)?;
110        map.serialize_entry("export_name", &self.export_name)?;
111        map.serialize_entry("strategy", &self.strategy)?;
112        map.serialize_entry("mode", &self.mode)?;
113        if let Some(v) = &self.cursor_column {
114            map.serialize_entry("cursor_column", v)?;
115        }
116        if let Some(v) = &self.row_estimate {
117            map.serialize_entry("row_estimate", v)?;
118        }
119        if let Some(v) = &self.avg_row_bytes {
120            map.serialize_entry("avg_row_bytes", v)?;
121        }
122        if let Some(v) = &self.cursor_min {
123            map.serialize_entry("cursor_min", v)?;
124        }
125        if let Some(v) = &self.cursor_max {
126            map.serialize_entry("cursor_max", v)?;
127        }
128        if let Some(v) = &self.scan_type {
129            map.serialize_entry("scan_type", v)?;
130        }
131        map.serialize_entry("uses_index", &self.uses_index)?;
132        map.serialize_entry("verdict", &self.verdict)?;
133        map.serialize_entry("recommended_profile", &self.recommended_profile)?;
134        map.serialize_entry(
135            "recommended_parallel",
136            &RecommendedParallel {
137                level: self.recommended_parallel.0,
138                reason: self.recommended_parallel.1,
139            },
140        )?;
141        map.serialize_entry("warnings", &self.warnings)?;
142        if let Some(v) = &self.suggestion {
143            map.serialize_entry("suggestion", v)?;
144        }
145        map.serialize_entry(
146            "capabilities",
147            &Capabilities {
148                uses_index: self.uses_index,
149                has_cursor: self.cursor_column.is_some(),
150                can_parallel: self.recommended_parallel.0 > 1,
151            },
152        )?;
153        map.end()
154    }
155}
156
157/// Return the diagnostic for a single export without printing anything.
158///
159/// Used by `rivet plan` to capture preflight data into a `PlanArtifact`.
160pub(crate) fn get_export_diagnostic(
161    config: &Config,
162    export: &ExportConfig,
163) -> Result<ExportDiagnostic> {
164    let url = config.source.resolve_url()?;
165    let tls = config.source.tls.as_ref();
166    crate::source::warn_if_tls_disabled(&config.source);
167    match config.source.source_type {
168        SourceType::Postgres => postgres::diagnose_export_pg(&url, tls, export),
169        SourceType::Mysql => mysql::diagnose_export_mysql(&url, tls, export),
170        SourceType::Mssql => mssql::diagnose_export_mssql(&url, tls, export),
171    }
172}
173
174/// Dedup identity for a destination, shared by `check`'s credential probe
175/// and `doctor`'s write probe. Must include every field that changes where
176/// a probe lands — notably `path`, so two local destinations with different
177/// paths are probed separately. Keeping one helper prevents the two call
178/// sites from drifting apart (doctor's inline copy once omitted `path` and
179/// silently skipped the second local destination).
180fn destination_identity(d: &crate::config::DestinationConfig) -> String {
181    format!(
182        "{:?}:{}:{}:{}",
183        d.destination_type,
184        d.bucket.as_deref().unwrap_or("-"),
185        d.endpoint.as_deref().unwrap_or("-"),
186        d.path.as_deref().unwrap_or("-"),
187    )
188}
189
/// Build the one-line note for the "fail ✗ but rc 0" case.
///
/// A column rendered `fail ✗` for `--target` does NOT gate the exit code
/// unless `--strict` is passed as well (that is the gate by design). Without
/// the note, an operator or CI job reading the glyph alone would wrongly
/// expect a non-zero exit. Kept pure so the exact text can be unit-tested.
///
/// `n` is the number of failing columns (it selects singular vs. plural) and
/// `target_label` names the target they fail for.
fn target_fail_note(n: usize, target_label: &str) -> String {
    let col = match n {
        1 => "column",
        _ => "columns",
    };
    format!(
        "Note: {n} {col} FAIL {target_label} compatibility; exit code is gated only with --strict (currently exit 0)"
    )
}
201
202/// Build one [`ExportDiagnostic`] per export via `diagnose`, collecting them (or
203/// short-circuiting on the first error). Single-sources the connect → loop →
204/// return contract every engine's `check_*` shares — only the per-export
205/// `diagnose` call differs — so the deferred print-vs-collect decision lives in
206/// one place rather than triplicated across postgres / mysql / mssql.
207pub(super) fn collect_diagnostics<F>(
208    exports: &[&ExportConfig],
209    mut diagnose: F,
210) -> Result<Vec<ExportDiagnostic>>
211where
212    F: FnMut(&ExportConfig) -> Result<ExportDiagnostic>,
213{
214    exports.iter().map(|&e| diagnose(e)).collect()
215}
216
217pub fn check(
218    config_path: &str,
219    export_name: Option<&str>,
220    params: Option<&std::collections::HashMap<String, String>>,
221    show_type_report: bool,
222    strict: bool,
223    json_output: bool,
224    target: Option<ExportTarget>,
225) -> Result<()> {
226    let config = Config::load_with_params(config_path, params)?;
227
228    let exports: Vec<&ExportConfig> = if let Some(name) = export_name {
229        let e = config
230            .exports
231            .iter()
232            .find(|e| e.name == name)
233            .ok_or_else(|| anyhow::anyhow!("export '{}' not found in config", name))?;
234        vec![e]
235    } else {
236        config.exports.iter().collect()
237    };
238
239    let url = config.source.resolve_url()?;
240    let tls = config.source.tls.as_ref();
241    // Surface the plaintext-transport warning at preflight time too —
242    // operators should hear it from `rivet check` before they wait
243    // through a full `rivet run` to learn the same thing. `Once` inside
244    // the helper keeps emission to one line per process even when both
245    // `check` and `run` flow through it.
246    crate::source::warn_if_tls_disabled(&config.source);
247    // Each engine connects once and returns one diagnostic per export without
248    // printing. Rendering is decided here: TEXT (the per-export table) for the
249    // default human path, or — under `--json` — the diagnostic is merged into
250    // each export's type-report JSON object below (see the `if show_type_report`
251    // block). `get_export_diagnostic` already proved the diag is computable
252    // without printing; this is the multi-export variant of that path.
253    let diagnostics: Vec<ExportDiagnostic> = match config.source.source_type {
254        SourceType::Postgres => postgres::check_postgres(&url, tls, &exports)?,
255        SourceType::Mysql => mysql::check_mysql(&url, tls, &exports)?,
256        SourceType::Mssql => mssql::check_mssql(&url, tls, &exports)?,
257    };
258    if !json_output {
259        for diag in &diagnostics {
260            print_diagnostic(diag);
261        }
262    } else if !show_type_report {
263        // `--json` WITHOUT a type report (the CLI forces `show_type_report` on
264        // under `--json`, so this only fires if `check` is called directly with
265        // `json_output=true, show_type_report=false`): there is no per-export
266        // type-report object to nest the diagnostic into, so emit the diagnostic
267        // alone — still NDJSON, one object per export per line. Keeps the
268        // verdict from being silently dropped regardless of caller.
269        for diag in &diagnostics {
270            println!("{}", serde_json::to_string(diag)?);
271        }
272    }
273    // Under `--json` WITH a type report, the diagnostics are emitted nested
274    // inside each export's type-report object (the `show_type_report` block)
275    // rather than as a standalone array — see the design note there. Built only
276    // on the `--json` path (empty otherwise) since the TEXT path printed above.
277    let diag_by_export: std::collections::HashMap<&str, &ExportDiagnostic> = if json_output {
278        diagnostics
279            .iter()
280            .map(|d| (d.export_name.as_str(), d))
281            .collect()
282    } else {
283        std::collections::HashMap::new()
284    };
285
286    // Destination credential-resolution preflight.  Until 0.7.6 `check` only
287    // probed the source: a config with `AWS_ACCESS_KEY_ID` unset would pass
288    // `rivet check` (rc=0) and then explode on `run`, while `rivet doctor`
289    // caught it.  We don't issue a write-probe here (that is `doctor`'s job
290    // and has side effects) — but we *do* call `create_destination`, which
291    // resolves env vars / credentials_file existence at construction time.
292    // Each unique destination is probed once per `check` to keep multi-export
293    // configs cheap.
294    let mut seen_destinations: std::collections::HashSet<String> = std::collections::HashSet::new();
295    for export in &exports {
296        let dest_key = destination_identity(&export.destination);
297        if !seen_destinations.insert(dest_key) {
298            continue;
299        }
300        let expanded = crate::plan::build::expand_destination_templates(
301            export.destination.clone(),
302            &export.name,
303        );
304        crate::destination::create_destination(&expanded).map_err(|e| {
305            anyhow::anyhow!(
306                "export '{}': destination preflight failed: {:#}",
307                export.name,
308                e
309            )
310        })?;
311    }
312
313    // Whether the check ends clean (no strict failure, no target-fail column).
314    // Stays true for the default `rivet check` (no type report), so the "Next:
315    // rivet run" pointer still fires.
316    let mut clean = true;
317
318    if show_type_report {
319        let policy = if strict {
320            TypePolicy::strict()
321        } else {
322            TypePolicy::warn_only()
323        };
324
325        let mut any_fatal = false;
326        // Count hard target-FAIL columns (and remember which target) so that —
327        // when --strict was NOT passed and the exit code is therefore 0 — we can
328        // print a note. The "fail ✗" glyph in the table implies a hard failure,
329        // but exit is gated only by --strict; without this note an operator or CI
330        // reading the glyph alone would be misled into thinking rc != 0.
331        let mut target_fail_cols = 0usize;
332        let mut target_fail_label: Option<&'static str> = None;
333        for export in &exports {
334            let column_overrides =
335                crate::plan::parse_column_overrides_pub(&export.columns, &export.name)?;
336            // CLI `--target` wins; otherwise fall back to the per-export
337            // `target:` from the config (slice #2a). A declared-but-unknown
338            // target is a loud error — never silently ignored.
339            if let Some(t) = export.target.as_deref()
340                && crate::types::target::ExportTarget::parse(t).is_none()
341            {
342                anyhow::bail!(
343                    "export '{}': unknown target '{t}' (expected: {})",
344                    export.name,
345                    crate::types::target::ExportTarget::valid_target_names()
346                );
347            }
348            let eff_target = target.or_else(|| {
349                export
350                    .target
351                    .as_deref()
352                    .and_then(crate::types::target::ExportTarget::parse)
353            });
354            let config_dir = std::path::Path::new(config_path)
355                .parent()
356                .unwrap_or_else(|| std::path::Path::new("."));
357            match type_report::collect_report(
358                &config,
359                export,
360                &column_overrides,
361                &policy,
362                eff_target,
363                config_dir,
364                params,
365            ) {
366                Ok(report) => {
367                    if report.has_fatal() {
368                        any_fatal = true;
369                    }
370                    if let Some(t) = eff_target
371                        && report.has_target_fail()
372                    {
373                        any_fatal = true;
374                        target_fail_cols += report
375                            .columns
376                            .iter()
377                            .filter(|c| c.target_status == Some(TargetStatus::Fail))
378                            .count();
379                        target_fail_label.get_or_insert(t.label());
380                    }
381                    if json_output {
382                        // `--json` + `--type-report` interaction (DESIGN):
383                        // emit BOTH, nested. Each export gets ONE JSON object
384                        // (NDJSON, one per line, unchanged) keeping the
385                        // top-level type-report keys (`export`/`columns`/
386                        // `violations`) so existing consumers and the
387                        // `check_json_flag_outputs_type_report_as_json` test
388                        // stay green — and we attach the per-export DIAGNOSTIC
389                        // verdict under a new `"diagnostic"` key. This is the
390                        // least-surprising shape because `check --json` already
391                        // emitted one type-report object per export; we simply
392                        // enrich each with its verdict rather than printing a
393                        // second, separate JSON value (which would break a
394                        // single-`from_str` parse of stdout).
395                        print_report_json_with_diagnostic(
396                            &report,
397                            diag_by_export.get(export.name.as_str()).copied(),
398                        )?;
399                    } else {
400                        type_report::print_table(&report, eff_target);
401                    }
402                }
403                Err(e) => {
404                    log::warn!("type report for '{}' failed: {:#}", export.name, e);
405                    // The type report could not be collected, but the diagnostic
406                    // was. Under --json the verdict must still reach the
407                    // consumer, so emit a diagnostic-only object (no `columns`/
408                    // `violations`) rather than silently dropping this export.
409                    if json_output
410                        && let Some(diag) = diag_by_export.get(export.name.as_str()).copied()
411                    {
412                        println!("{}", serde_json::to_string(diag)?);
413                    }
414                }
415            }
416        }
417
418        if strict && any_fatal {
419            anyhow::bail!("strict mode: unsafe type mappings found (see report above)");
420        } else if !strict && target_fail_cols > 0 && !json_output {
421            // The table showed "fail ✗" but rc is 0 — say so explicitly. Skipped
422            // under --json so NDJSON output stays one object per line.
423            clean = false;
424            println!();
425            println!(
426                "{}",
427                target_fail_note(target_fail_cols, target_fail_label.unwrap_or("target"))
428            );
429        }
430    }
431
432    if !json_output {
433        // Verdict legend — decode the EFFICIENT/ACCEPTABLE/DEGRADED/UNSAFE words
434        // printed above and reassure that `check` is advisory: never blocks a run.
435        println!();
436        println!(
437            "Verdicts: EFFICIENT > ACCEPTABLE > DEGRADED > UNSAFE — advisory only; the run is never blocked."
438        );
439        if clean {
440            // Keep the ladder going to the final rung instead of ending cold.
441            println!(
442                "Looks good. Next: rivet run -c {config_path} --validate   # export, then verify row counts"
443            );
444        }
445    }
446
447    Ok(())
448}
449
450/// Emit one export's `--json` line: the type report (`export`/`columns`/
451/// `violations`/…) with the per-export DIAGNOSTIC verdict attached under a new
452/// `"diagnostic"` key. NDJSON — exactly one JSON object, terminated by a
453/// newline, so a multi-export config prints one parseable object per line
454/// (preserving the prior `check --json` type-report wire shape, now enriched).
455///
456/// `diag` is `None` only if the diagnostic could not be paired by export name
457/// (it always can in practice); in that case the type report is emitted as
458/// before, so the worst case is a missing `diagnostic` key, never a panic.
459fn print_report_json_with_diagnostic(
460    report: &type_report::ExportTypeReport,
461    diag: Option<&ExportDiagnostic>,
462) -> Result<()> {
463    let mut value = serde_json::to_value(report)?;
464    if let (Some(obj), Some(diag)) = (value.as_object_mut(), diag) {
465        obj.insert("diagnostic".to_string(), serde_json::to_value(diag)?);
466    }
467    println!("{}", serde_json::to_string(&value)?);
468    Ok(())
469}
470
471fn print_diagnostic(diag: &ExportDiagnostic) {
472    println!();
473    println!("Export: {}", diag.export_name);
474    println!("  Strategy:     {}", diag.strategy);
475    println!("  Mode:         {}", diag.mode);
476    if let Some(est) = diag.row_estimate {
477        if est >= 1_000_000 {
478            println!("  Row estimate: ~{}M", est / 1_000_000);
479        } else if est >= 1_000 {
480            println!("  Row estimate: ~{}K", est / 1_000);
481        } else {
482            println!("  Row estimate: ~{}", est);
483        }
484    }
485    if let Some(w) = diag.avg_row_bytes {
486        println!("  Row width:    ~{} bytes", w);
487    }
488    if let (Some(min_v), Some(max_v)) = (&diag.cursor_min, &diag.cursor_max) {
489        println!("  Cursor range: {} .. {}", min_v, max_v);
490    }
491    if let Some(col) = &diag.cursor_column {
492        println!("  Cursor col:   {}", col);
493    }
494    // Plain-language access path instead of a raw EXPLAIN node dump
495    // (`Result (cost=0.00..0.01 rows=1 width=36)`). Keyed off the authoritative
496    // `uses_index` bool, gated on `scan_type.is_some()` so engines without an
497    // EXPLAIN probe (MSSQL) stay silent.
498    if diag.scan_type.is_some() {
499        let access = if diag.uses_index {
500            "index scan (the cursor/chunk column is indexed)"
501        } else {
502            "full table scan (no index on the read path)"
503        };
504        println!("  Access:       {access}");
505    }
506    println!("  Verdict:      {}", diag.verdict);
507    println!(
508        "  Recommended:  tuning.profile: {}",
509        diag.recommended_profile
510    );
511    let (par_level, par_reason) = diag.recommended_parallel;
512    if par_level > 1 {
513        println!("  Recommended:  parallel: {} ({})", par_level, par_reason);
514    } else {
515        println!("  Parallelism:  {} ({})", par_level, par_reason);
516    }
517    for w in &diag.warnings {
518        println!("  Warning:      {}", w);
519    }
520    if let Some(suggestion) = &diag.suggestion {
521        println!("  Suggestion:   {}", suggestion);
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use super::*;
528    use crate::config::{DestinationConfig, DestinationType, ExportConfig, ExportMode, FormatType};
529    use doctor::{
530        categorize_dest_error, categorize_source_error, destination_error_hint, source_error_hint,
531    };
532    use serde_json::Value;
533
534    fn make_export(name: &str, mode: ExportMode, cursor: Option<&str>) -> ExportConfig {
535        // Baseline from the canonical test fixture; override only the fields
536        // these preflight tests vary (mode, cursor, CSV format, query, dest).
537        ExportConfig {
538            mode,
539            cursor_column: cursor.map(|s| s.to_string()),
540            query: Some("SELECT * FROM t".to_string()),
541            format: FormatType::Csv,
542            destination: DestinationConfig {
543                destination_type: DestinationType::Local,
544                path: Some("./out".to_string()),
545                ..Default::default()
546            },
547            ..crate::config::sample_export(name)
548        }
549    }
550
551    /// A representative incremental diagnostic for the `--json` serialization
552    /// tests: a cursor column (so `has_cursor` is true), an index (so
553    /// `uses_index` is true), a >1 parallel recommendation (so `can_parallel`
554    /// is true), and a couple of warnings.
555    fn sample_diagnostic(name: &str) -> ExportDiagnostic {
556        ExportDiagnostic {
557            export_name: name.to_string(),
558            strategy: "incremental(updated_at)".to_string(),
559            mode: "incremental".to_string(),
560            cursor_column: Some("updated_at".to_string()),
561            row_estimate: Some(1_234_567),
562            avg_row_bytes: Some(96),
563            cursor_min: Some("2020-01-01".to_string()),
564            cursor_max: Some("2024-01-01".to_string()),
565            scan_type: Some("Index Scan".to_string()),
566            uses_index: true,
567            verdict: HealthVerdict::Degraded,
568            recommended_profile: "safe",
569            recommended_parallel: (4, "large indexed dataset"),
570            warnings: vec!["Sparse key range".to_string(), "memory risk".to_string()],
571            suggestion: Some("create an index".to_string()),
572        }
573    }
574
575    // ── `rivet check --json`: the per-export DIAGNOSTIC verdict as JSON ───────
576
577    #[test]
578    fn diagnostic_json_has_lowercase_verdict_and_core_fields() {
579        let diag = sample_diagnostic("orders");
580        let v: serde_json::Value =
581            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
582
583        // Verdict serializes to a stable lowercase token (not the SHOUTING
584        // Display form), so CI can match on it case-sensitively.
585        assert_eq!(v["verdict"], "degraded", "got: {v}");
586        assert_eq!(v["strategy"], "incremental(updated_at)", "got: {v}");
587        assert_eq!(v["mode"], "incremental", "got: {v}");
588        assert_eq!(v["recommended_profile"], "safe", "got: {v}");
589        assert!(v["warnings"].is_array(), "warnings must be an array: {v}");
590        assert_eq!(v["warnings"].as_array().unwrap().len(), 2, "got: {v}");
591        assert_eq!(v["export_name"], "orders", "got: {v}");
592    }
593
594    #[test]
595    fn diagnostic_json_verdict_tokens_are_all_lowercase() {
596        for (verdict, token) in [
597            (HealthVerdict::Efficient, "efficient"),
598            (HealthVerdict::Acceptable, "acceptable"),
599            (HealthVerdict::Degraded, "degraded"),
600            (HealthVerdict::Unsafe, "unsafe"),
601        ] {
602            let mut diag = sample_diagnostic("t");
603            diag.verdict = verdict;
604            let v: serde_json::Value =
605                serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
606            assert_eq!(v["verdict"], token, "verdict must lowercase to {token}");
607        }
608    }
609
610    #[test]
611    fn diagnostic_json_recommended_parallel_is_named_object_not_tuple() {
612        // The raw `(u32, &str)` must NOT leak as a positional 2-array; consumers
613        // read `recommended_parallel.level` / `.reason`.
614        let diag = sample_diagnostic("t");
615        let v: serde_json::Value =
616            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
617        assert!(
618            v["recommended_parallel"].is_object(),
619            "recommended_parallel must be an object, got: {}",
620            v["recommended_parallel"]
621        );
622        assert_eq!(v["recommended_parallel"]["level"], 4, "got: {v}");
623        assert_eq!(
624            v["recommended_parallel"]["reason"], "large indexed dataset",
625            "got: {v}"
626        );
627    }
628
629    #[test]
630    fn diagnostic_json_capabilities_are_derived_from_fields() {
631        let diag = sample_diagnostic("t");
632        let v: serde_json::Value =
633            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
634        let caps = &v["capabilities"];
635        assert_eq!(caps["uses_index"], true, "got: {caps}");
636        assert_eq!(caps["has_cursor"], true, "got: {caps}");
637        assert_eq!(caps["can_parallel"], true, "got: {caps}");
638    }
639
640    #[test]
641    fn diagnostic_json_capabilities_flip_with_fields() {
642        // A non-cursor, no-index, single-worker diagnostic flips all three.
643        let mut diag = sample_diagnostic("t");
644        diag.cursor_column = None;
645        diag.uses_index = false;
646        diag.recommended_parallel = (1, "small dataset");
647        let v: serde_json::Value =
648            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
649        let caps = &v["capabilities"];
650        assert_eq!(caps["uses_index"], false, "got: {caps}");
651        assert_eq!(caps["has_cursor"], false, "got: {caps}");
652        assert_eq!(caps["can_parallel"], false, "got: {caps}");
653    }
654
655    #[test]
656    fn diagnostic_json_skips_none_optionals() {
657        // `None` optionals are omitted (not `null`) to keep the object lean.
658        let mut diag = sample_diagnostic("t");
659        diag.suggestion = None;
660        diag.scan_type = None;
661        let v: serde_json::Value =
662            serde_json::from_str(&serde_json::to_string(&diag).unwrap()).unwrap();
663        let obj = v.as_object().unwrap();
664        assert!(!obj.contains_key("suggestion"), "None must be omitted: {v}");
665        assert!(!obj.contains_key("scan_type"), "None must be omitted: {v}");
666    }
667
668    /// Build the same `Value` `print_report_json_with_diagnostic` prints, so the
669    /// merged shape is asserted without capturing stdout.
670    fn merged_check_json(report: &type_report::ExportTypeReport, diag: &ExportDiagnostic) -> Value {
671        let mut value = serde_json::to_value(report).unwrap();
672        value.as_object_mut().unwrap().insert(
673            "diagnostic".to_string(),
674            serde_json::to_value(diag).unwrap(),
675        );
676        value
677    }
678
679    fn empty_report(export: &str) -> type_report::ExportTypeReport {
680        type_report::ExportTypeReport {
681            export: export.to_string(),
682            columns: Vec::new(),
683            violations: Vec::new(),
684            target_failures: false,
685            recovery_sql: None,
686        }
687    }
688
689    #[test]
690    fn check_json_merges_diagnostic_into_type_report_object() {
691        // The `--json` + `--type-report` interaction: ONE object per export
692        // keeping the type-report keys (`export`/`columns`/`violations`) — so
693        // the existing `check_json_flag_outputs_type_report_as_json` contract
694        // holds — PLUS a nested `diagnostic` carrying the verdict.
695        let report = empty_report("orders");
696        let diag = sample_diagnostic("orders");
697        let v = merged_check_json(&report, &diag);
698
699        // Pre-existing type-report keys still at the root.
700        assert_eq!(v["export"], "orders", "got: {v}");
701        assert!(v["columns"].is_array(), "columns at root: {v}");
702        assert!(v["violations"].is_array(), "violations at root: {v}");
703
704        // The diagnostic is nested and carries the verdict + advice.
705        let d = &v["diagnostic"];
706        assert_eq!(d["verdict"], "degraded", "got: {d}");
707        assert_eq!(d["strategy"], "incremental(updated_at)", "got: {d}");
708        assert_eq!(d["mode"], "incremental", "got: {d}");
709        assert_eq!(d["recommended_profile"], "safe", "got: {d}");
710        assert!(d["warnings"].is_array(), "warnings array: {d}");
711        assert_eq!(d["capabilities"]["has_cursor"], true, "got: {d}");
712    }
713
714    #[test]
715    fn check_json_object_is_a_single_parseable_line() {
716        // NDJSON: serializing yields exactly one JSON value with no trailing
717        // data, so `serde_json::from_str(line.trim())` (as the live test does)
718        // parses it whole.
719        let report = empty_report("orders");
720        let diag = sample_diagnostic("orders");
721        let line = serde_json::to_string(&merged_check_json(&report, &diag)).unwrap();
722        assert!(!line.contains('\n'), "one object per line: {line}");
723        let parsed: Value = serde_json::from_str(line.trim()).expect("must parse whole");
724        assert_eq!(parsed["export"], "orders");
725    }
726
727    // ── L8: 'fail ✗' note when --target FAILs but --strict was not passed ─────
728    // The glyph implies a hard failure; exit is gated only by --strict. The note
729    // tells an operator/CI the exit is 0 so the glyph doesn't mislead.
730    #[test]
731    fn target_fail_note_names_count_target_and_strict_gate() {
732        let note = target_fail_note(2, "bigquery");
733        assert!(note.contains("2 columns FAIL"), "got: {note}");
734        assert!(note.contains("bigquery"), "got: {note}");
735        assert!(note.contains("--strict"), "got: {note}");
736        assert!(note.contains("exit 0"), "got: {note}");
737    }
738
739    #[test]
740    fn target_fail_note_singular_for_one_column() {
741        let note = target_fail_note(1, "duckdb");
742        assert!(note.contains("1 column FAIL"), "got: {note}");
743        assert!(!note.contains("1 columns"), "should be singular: {note}");
744    }
745
746    #[test]
747    fn verdict_small_indexed_with_cursor_is_efficient() {
748        let v = compute_verdict(Some(500_000), true, true, None, 1);
749        assert!(matches!(v, HealthVerdict::Efficient), "got: {v}");
750    }
751
752    #[test]
753    fn verdict_large_indexed_with_cursor_is_acceptable() {
754        let v = compute_verdict(Some(20_000_000), true, true, None, 1);
755        assert!(matches!(v, HealthVerdict::Acceptable), "got: {v}");
756    }
757
758    #[test]
759    fn verdict_no_index_no_cursor_is_degraded() {
760        let v = compute_verdict(Some(500_000), false, false, None, 1);
761        assert!(matches!(v, HealthVerdict::Degraded), "got: {v}");
762    }
763
764    #[test]
765    fn verdict_huge_no_index_is_unsafe() {
766        let v = compute_verdict(Some(100_000_000), false, false, None, 1);
767        assert!(matches!(v, HealthVerdict::Unsafe), "got: {v}");
768    }
769
770    #[test]
771    fn parse_pg_row_estimate_from_sort_plan() {
772        let plan = "Sort  (cost=12345.67..12456.78 rows=1000455 width=50)\n  ->  Seq Scan on orders  (cost=0.00..8765.43 rows=1000455 width=50)";
773        assert_eq!(parse_pg_row_estimate(plan), Some(1_000_455));
774    }
775
776    #[test]
777    fn parse_pg_row_estimate_from_index_scan() {
778        let plan =
779            "Index Scan using idx_updated on orders  (cost=0.42..81676.36 rows=500000 width=50)";
780        assert_eq!(parse_pg_row_estimate(plan), Some(500_000));
781    }
782
783    #[test]
784    fn extract_scan_type_detects_seq_scan() {
785        let plan = "Sort  (cost=...)\n  ->  Seq Scan on users  (cost=...)";
786        let st = extract_scan_type(plan);
787        assert!(st.contains("Seq Scan"), "expected Seq Scan, got: {st}");
788    }
789
790    #[test]
791    fn extract_scan_type_detects_index_scan() {
792        let plan = "Index Scan using users_pkey on users  (cost=0.42..123.45 rows=100 width=50)";
793        let st = extract_scan_type(plan);
794        assert!(st.contains("Index Scan"), "expected Index Scan, got: {st}");
795    }
796
797    #[test]
798    fn suggestion_for_efficient_verdict_is_none() {
799        let e = make_export("t", ExportMode::Full, None);
800        let s = build_suggestion(&HealthVerdict::Efficient, Some(1000), true, &e);
801        assert!(
802            s.is_none(),
803            "efficient verdict should produce no suggestion"
804        );
805    }
806
807    #[test]
808    fn suggestion_for_degraded_verdict_recommends_safe_profile() {
809        let e = make_export("t", ExportMode::Full, None);
810        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e);
811        let msg = s.expect("degraded verdict should produce a suggestion");
812        assert!(
813            msg.contains("safe"),
814            "suggestion should recommend safe profile, got: {msg}"
815        );
816    }
817
818    fn src_err(msg: &str) -> &'static str {
819        categorize_source_error(&anyhow::anyhow!("{}", msg))
820    }
821
822    #[test]
823    fn source_password_rejected_is_auth_error() {
824        assert_eq!(
825            src_err("password authentication failed for user \"rivet\""),
826            "auth error"
827        );
828    }
829
830    #[test]
831    fn source_authentication_failed_is_auth_error() {
832        assert_eq!(src_err("FATAL: authentication failed"), "auth error");
833    }
834
835    #[test]
836    fn source_access_denied_is_auth_error() {
837        assert_eq!(
838            src_err("Access denied for user 'rivet'@'localhost'"),
839            "auth error"
840        );
841    }
842
843    #[test]
844    fn source_connection_refused_is_connectivity() {
845        assert_eq!(
846            src_err("connection refused (os error 61)"),
847            "connectivity error"
848        );
849    }
850
851    #[test]
852    fn source_timed_out_is_connectivity() {
853        assert_eq!(src_err("connection timed out"), "connectivity error");
854    }
855
856    #[test]
857    fn source_dns_translate_host_is_connectivity() {
858        assert_eq!(
859            src_err("could not translate host name \"db.bad\" to address"),
860            "connectivity error"
861        );
862    }
863
864    #[test]
865    fn source_name_not_known_is_connectivity() {
866        assert_eq!(src_err("Name or service not known"), "connectivity error");
867    }
868
869    #[test]
870    fn source_unknown_error_is_generic() {
871        assert_eq!(src_err("something totally unexpected"), "error");
872    }
873
874    fn dest_config(dtype: DestinationType) -> DestinationConfig {
875        DestinationConfig {
876            destination_type: dtype,
877            bucket: Some("b".to_string()),
878            ..Default::default()
879        }
880    }
881
882    fn dest_err(msg: &str, dtype: DestinationType) -> &'static str {
883        let cfg = dest_config(dtype);
884        categorize_dest_error(&anyhow::anyhow!("{}", msg), &cfg)
885    }
886
887    fn local_dest(path: &str) -> DestinationConfig {
888        DestinationConfig {
889            destination_type: DestinationType::Local,
890            path: Some(path.to_string()),
891            ..Default::default()
892        }
893    }
894
895    // Regression (doctor-dedup): doctor's inline dedup key omitted `path`,
896    // so two local destinations with different paths collapsed to one entry
897    // and the second was never write-probed. The shared identity must keep
898    // them distinct.
899    #[test]
900    fn destination_identity_distinguishes_local_paths() {
901        assert_ne!(
902            destination_identity(&local_dest("/tmp/a")),
903            destination_identity(&local_dest("/tmp/b")),
904        );
905    }
906
907    #[test]
908    fn destination_identity_collapses_identical_local_destinations() {
909        assert_eq!(
910            destination_identity(&local_dest("/tmp/a")),
911            destination_identity(&local_dest("/tmp/a")),
912        );
913    }
914
915    #[test]
916    fn destination_identity_distinguishes_buckets() {
917        let a = DestinationConfig {
918            bucket: Some("bucket-a".to_string()),
919            ..dest_config(DestinationType::S3)
920        };
921        let b = DestinationConfig {
922            bucket: Some("bucket-b".to_string()),
923            ..dest_config(DestinationType::S3)
924        };
925        assert_ne!(destination_identity(&a), destination_identity(&b));
926    }
927
928    // Same bucket name on different endpoints (e.g. AWS vs MinIO) is two
929    // distinct destinations and must be probed separately.
930    #[test]
931    fn destination_identity_distinguishes_endpoints_for_same_bucket() {
932        let aws = dest_config(DestinationType::S3);
933        let minio = DestinationConfig {
934            endpoint: Some("http://localhost:9000".to_string()),
935            ..dest_config(DestinationType::S3)
936        };
937        assert_ne!(destination_identity(&aws), destination_identity(&minio));
938    }
939
940    #[test]
941    fn dest_credential_loading_is_auth_error() {
942        assert_eq!(
943            dest_err(
944                "loading credential to sign http request",
945                DestinationType::Gcs
946            ),
947            "auth error"
948        );
949    }
950
951    #[test]
952    fn dest_permission_denied_is_auth_error() {
953        assert_eq!(
954            dest_err("permission denied on resource bucket", DestinationType::S3),
955            "auth error"
956        );
957    }
958
959    #[test]
960    fn dest_forbidden_is_auth_error() {
961        assert_eq!(
962            dest_err("403 Forbidden", DestinationType::Gcs),
963            "auth error"
964        );
965    }
966
967    #[test]
968    fn dest_unauthorized_is_auth_error() {
969        assert_eq!(
970            dest_err("401 Unauthorized", DestinationType::S3),
971            "auth error"
972        );
973    }
974
975    #[test]
976    fn dest_invalid_grant_is_auth_error() {
977        assert_eq!(
978            dest_err(
979                "invalid_grant: token has been revoked",
980                DestinationType::Gcs
981            ),
982            "auth error"
983        );
984    }
985
986    #[test]
987    fn dest_nosuchbucket_s3_is_bucket_not_found() {
988        assert_eq!(
989            dest_err(
990                "NoSuchBucket: the specified bucket does not exist",
991                DestinationType::S3
992            ),
993            "bucket not found"
994        );
995    }
996
997    #[test]
998    fn dest_not_found_gcs_is_bucket_not_found() {
999        assert_eq!(
1000            dest_err("bucket not found (404)", DestinationType::Gcs),
1001            "bucket not found"
1002        );
1003    }
1004
1005    #[test]
1006    fn dest_not_found_local_is_path_not_found() {
1007        assert_eq!(
1008            dest_err("path not found: /tmp/missing", DestinationType::Local),
1009            "path not found"
1010        );
1011    }
1012
1013    #[test]
1014    fn dest_connection_refused_is_connectivity() {
1015        assert_eq!(
1016            dest_err("connection refused to endpoint", DestinationType::S3),
1017            "connectivity error"
1018        );
1019    }
1020
1021    #[test]
1022    fn dest_dns_error_is_connectivity() {
1023        assert_eq!(
1024            dest_err("dns error: failed to lookup address", DestinationType::S3),
1025            "connectivity error"
1026        );
1027    }
1028
1029    #[test]
1030    fn dest_timed_out_is_connectivity() {
1031        assert_eq!(
1032            dest_err("request timed out after 30s", DestinationType::Gcs),
1033            "connectivity error"
1034        );
1035    }
1036
1037    #[test]
1038    fn dest_unknown_error_is_generic() {
1039        assert_eq!(
1040            dest_err("something else entirely", DestinationType::S3),
1041            "error"
1042        );
1043    }
1044
1045    #[test]
1046    fn strategy_full_scan() {
1047        let e = make_export("t", ExportMode::Full, None);
1048        assert_eq!(derive_strategy(&e), "full-scan");
1049    }
1050
1051    #[test]
1052    fn strategy_full_parallel() {
1053        let mut e = make_export("t", ExportMode::Full, None);
1054        e.parallel = 4;
1055        assert_eq!(derive_strategy(&e), "full-parallel(4)");
1056    }
1057
1058    #[test]
1059    fn strategy_incremental() {
1060        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1061        assert_eq!(derive_strategy(&e), "incremental(updated_at)");
1062    }
1063
1064    #[test]
1065    fn strategy_chunked() {
1066        let mut e = make_export("t", ExportMode::Chunked, None);
1067        e.chunk_column = Some("id".to_string());
1068        e.chunk_size = 50_000;
1069        assert_eq!(derive_strategy(&e), "chunked(id, size=50000)");
1070    }
1071
1072    #[test]
1073    fn strategy_chunked_parallel() {
1074        let mut e = make_export("t", ExportMode::Chunked, None);
1075        e.chunk_column = Some("id".to_string());
1076        e.chunk_size = 50_000;
1077        e.parallel = 3;
1078        assert_eq!(derive_strategy(&e), "chunked-parallel(id, size=50000, p=3)");
1079    }
1080
1081    #[test]
1082    fn strategy_time_window() {
1083        let mut e = make_export("t", ExportMode::TimeWindow, None);
1084        e.time_column = Some("created_at".to_string());
1085        e.days_window = Some(7);
1086        assert_eq!(derive_strategy(&e), "time-window(created_at, 7d)");
1087    }
1088
1089    #[test]
1090    fn profile_small_indexed_is_fast() {
1091        let e = make_export("t", ExportMode::Full, None);
1092        assert_eq!(recommend_profile(Some(500_000), true, &e), "fast");
1093    }
1094
1095    #[test]
1096    fn profile_medium_indexed_is_balanced() {
1097        let e = make_export("t", ExportMode::Full, None);
1098        assert_eq!(recommend_profile(Some(5_000_000), true, &e), "balanced");
1099    }
1100
1101    #[test]
1102    fn profile_large_indexed_is_safe() {
1103        let e = make_export("t", ExportMode::Full, None);
1104        assert_eq!(recommend_profile(Some(50_000_000), true, &e), "safe");
1105    }
1106
1107    #[test]
1108    fn profile_small_no_index_is_balanced() {
1109        let e = make_export("t", ExportMode::Full, None);
1110        assert_eq!(recommend_profile(Some(50_000), false, &e), "balanced");
1111    }
1112
1113    #[test]
1114    fn profile_small_no_index_parallel_is_safe() {
1115        let mut e = make_export("t", ExportMode::Full, None);
1116        e.parallel = 4;
1117        assert_eq!(recommend_profile(Some(50_000), false, &e), "safe");
1118    }
1119
1120    #[test]
1121    fn profile_medium_no_index_is_balanced() {
1122        let e = make_export("t", ExportMode::Full, None);
1123        assert_eq!(recommend_profile(Some(500_000), false, &e), "balanced");
1124    }
1125
1126    #[test]
1127    fn profile_large_no_index_is_safe() {
1128        let e = make_export("t", ExportMode::Full, None);
1129        assert_eq!(recommend_profile(Some(5_000_000), false, &e), "safe");
1130    }
1131
1132    #[test]
1133    fn sparse_range_warning_when_very_sparse() {
1134        let mut e = make_export("t", ExportMode::Chunked, None);
1135        e.chunk_column = Some("id".to_string());
1136        e.chunk_size = 100_000;
1137        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1138        assert!(w.is_some(), "should warn about sparse range");
1139        let msg = w.unwrap();
1140        assert!(msg.contains("Sparse key range"), "got: {msg}");
1141        assert!(msg.contains("empty"), "got: {msg}");
1142    }
1143
1144    #[test]
1145    fn sparse_range_no_warning_when_dense() {
1146        let mut e = make_export("t", ExportMode::Chunked, None);
1147        e.chunk_column = Some("id".to_string());
1148        e.chunk_size = 100_000;
1149        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("100000"));
1150        assert!(w.is_none(), "should not warn for dense range");
1151    }
1152
1153    #[test]
1154    fn sparse_range_skipped_when_chunk_dense() {
1155        let mut e = make_export("t", ExportMode::Chunked, None);
1156        e.chunk_column = Some("id".to_string());
1157        e.chunk_dense = true;
1158        e.chunk_size = 100_000;
1159        let w = check_sparse_range(&e, Some(100_000), Some("1"), Some("10000000"));
1160        assert!(
1161            w.is_none(),
1162            "chunk_dense uses ordinals, not physical id span"
1163        );
1164    }
1165
1166    #[test]
1167    fn dense_surrogate_warning_when_chunk_dense_builtin() {
1168        let mut e = make_export("t", ExportMode::Chunked, None);
1169        e.chunk_column = Some("id".to_string());
1170        e.chunk_dense = true;
1171        e.query = Some("SELECT id FROM orders".to_string());
1172        let w = check_dense_surrogate_cost(&e);
1173        assert!(w.is_some(), "should warn about built-in ROW_NUMBER cost");
1174        assert!(w.unwrap().contains("global sort"));
1175    }
1176
1177    #[test]
1178    fn sparse_range_not_triggered_for_non_chunked() {
1179        let e = make_export("t", ExportMode::Full, None);
1180        let w = check_sparse_range(&e, Some(100), Some("1"), Some("1000000"));
1181        assert!(w.is_none(), "should not warn for non-chunked mode");
1182    }
1183
1184    #[test]
1185    fn dense_surrogate_warning_with_row_number() {
1186        let mut e = make_export("t", ExportMode::Chunked, None);
1187        e.chunk_column = Some("rn".to_string());
1188        e.query = Some("SELECT *, ROW_NUMBER() OVER (ORDER BY id) AS rn FROM orders".to_string());
1189        let w = check_dense_surrogate_cost(&e);
1190        assert!(w.is_some(), "should warn about ROW_NUMBER cost");
1191        assert!(w.unwrap().contains("global sort"));
1192    }
1193
1194    #[test]
1195    fn no_dense_surrogate_warning_without_row_number() {
1196        let mut e = make_export("t", ExportMode::Chunked, None);
1197        e.chunk_column = Some("id".to_string());
1198        e.query = Some("SELECT * FROM orders".to_string());
1199        let w = check_dense_surrogate_cost(&e);
1200        assert!(w.is_none());
1201    }
1202
1203    #[test]
1204    fn no_dense_surrogate_warning_for_non_chunked() {
1205        let mut e = make_export("t", ExportMode::Full, None);
1206        e.query = Some("SELECT ROW_NUMBER() OVER () AS rn FROM t".to_string());
1207        let w = check_dense_surrogate_cost(&e);
1208        assert!(w.is_none(), "should not warn for non-chunked mode");
1209    }
1210
1211    #[test]
1212    fn parallel_memory_warning_large_dataset() {
1213        let mut e = make_export("t", ExportMode::Chunked, None);
1214        e.parallel = 4;
1215        let w = check_parallel_memory_risk(&e, Some(10_000_000));
1216        assert!(w.is_some(), "should warn about memory risk");
1217        let msg = w.unwrap();
1218        assert!(msg.contains("Parallel=4"), "got: {msg}");
1219        assert!(msg.contains("memory"), "got: {msg}");
1220    }
1221
1222    #[test]
1223    fn no_parallel_memory_warning_small_dataset() {
1224        let mut e = make_export("t", ExportMode::Chunked, None);
1225        e.parallel = 4;
1226        let w = check_parallel_memory_risk(&e, Some(1_000));
1227        assert!(w.is_none(), "should not warn for small dataset");
1228    }
1229
1230    #[test]
1231    fn no_parallel_memory_warning_single_worker() {
1232        let e = make_export("t", ExportMode::Full, None);
1233        let w = check_parallel_memory_risk(&e, Some(100_000_000));
1234        assert!(w.is_none(), "should not warn when parallel=1");
1235    }
1236
1237    #[test]
1238    fn suggestion_degraded_full_recommends_incremental() {
1239        let e = make_export("t", ExportMode::Full, None);
1240        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1241        assert!(s.contains("incremental"), "got: {s}");
1242    }
1243
1244    #[test]
1245    fn suggestion_degraded_chunked_recommends_index() {
1246        let mut e = make_export("t", ExportMode::Chunked, None);
1247        e.chunk_column = Some("id".to_string());
1248        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1249        assert!(s.contains("index on 'id'"), "got: {s}");
1250    }
1251
1252    #[test]
1253    fn suggestion_degraded_time_window_recommends_index() {
1254        let mut e = make_export("t", ExportMode::TimeWindow, None);
1255        e.time_column = Some("created_at".to_string());
1256        e.days_window = Some(7);
1257        let s = build_suggestion(&HealthVerdict::Degraded, Some(500_000), false, &e).unwrap();
1258        assert!(s.contains("index on 'created_at'"), "got: {s}");
1259    }
1260
1261    #[test]
1262    fn suggestion_unsafe_full_recommends_incremental() {
1263        let e = make_export("t", ExportMode::Full, None);
1264        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1265        assert!(s.contains("incremental"), "got: {s}");
1266    }
1267
1268    #[test]
1269    fn suggestion_unsafe_chunked_recommends_index_and_parallel() {
1270        let mut e = make_export("t", ExportMode::Chunked, None);
1271        e.chunk_column = Some("id".to_string());
1272        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1273        assert!(s.contains("index on 'id'"), "got: {s}");
1274        assert!(s.contains("parallel"), "got: {s}");
1275    }
1276
1277    #[test]
1278    fn suggestion_unsafe_incremental_recommends_index_on_cursor() {
1279        let e = make_export("t", ExportMode::Incremental, Some("updated_at"));
1280        let s = build_suggestion(&HealthVerdict::Unsafe, Some(100_000_000), false, &e).unwrap();
1281        assert!(s.contains("index on 'updated_at'"), "got: {s}");
1282    }
1283
1284    #[test]
1285    fn suggestion_acceptable_large_full_recommends_incremental() {
1286        let e = make_export("t", ExportMode::Full, None);
1287        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1288        assert!(s.contains("incremental"), "got: {s}");
1289    }
1290
1291    #[test]
1292    fn parallel_only_for_chunked_mode() {
1293        let e = make_export("t", ExportMode::Full, None);
1294        let (level, _) = recommend_parallelism(&e, Some(1_000_000), true);
1295        assert_eq!(level, 1, "non-chunked mode should recommend 1");
1296    }
1297
1298    #[test]
1299    fn parallel_small_dataset_is_one() {
1300        let mut e = make_export("t", ExportMode::Chunked, None);
1301        e.chunk_column = Some("id".to_string());
1302        let (level, _) = recommend_parallelism(&e, Some(10_000), true);
1303        assert_eq!(level, 1, "small dataset should recommend 1");
1304    }
1305
1306    #[test]
1307    fn parallel_moderate_indexed_is_two() {
1308        let mut e = make_export("t", ExportMode::Chunked, None);
1309        e.chunk_column = Some("id".to_string());
1310        let (level, _) = recommend_parallelism(&e, Some(200_000), true);
1311        assert_eq!(level, 2, "moderate indexed dataset should recommend 2");
1312    }
1313
1314    #[test]
1315    fn parallel_large_indexed_is_four() {
1316        let mut e = make_export("t", ExportMode::Chunked, None);
1317        e.chunk_column = Some("id".to_string());
1318        let (level, _) = recommend_parallelism(&e, Some(2_000_000), true);
1319        assert_eq!(level, 4, "large indexed dataset should recommend 4");
1320    }
1321
1322    #[test]
1323    fn parallel_no_index_large_is_one() {
1324        let mut e = make_export("t", ExportMode::Chunked, None);
1325        e.chunk_column = Some("id".to_string());
1326        let (level, reason) = recommend_parallelism(&e, Some(10_000_000), false);
1327        assert_eq!(level, 1, "no index + large should recommend 1");
1328        assert!(reason.contains("no index"), "got: {reason}");
1329    }
1330
1331    #[test]
1332    fn parallel_no_index_moderate_is_conservative() {
1333        let mut e = make_export("t", ExportMode::Chunked, None);
1334        e.chunk_column = Some("id".to_string());
1335        let (level, _) = recommend_parallelism(&e, Some(200_000), false);
1336        assert_eq!(
1337            level, 2,
1338            "no index + moderate should recommend 2 (conservative)"
1339        );
1340    }
1341
1342    #[test]
1343    fn suggestion_acceptable_large_chunked_recommends_parallel() {
1344        let mut e = make_export("t", ExportMode::Chunked, None);
1345        e.chunk_column = Some("id".to_string());
1346        let s = build_suggestion(&HealthVerdict::Acceptable, Some(20_000_000), true, &e).unwrap();
1347        assert!(s.contains("parallel"), "got: {s}");
1348    }
1349
1350    #[test]
1351    fn connection_limit_warn_when_parallel_meets_max() {
1352        let w = check_connection_limit(20, Some(20));
1353        assert!(w.is_some(), "should warn when parallel == max_connections");
1354        let msg = w.unwrap();
1355        assert!(msg.contains("max_connections=20"), "got: {msg}");
1356        assert!(msg.contains("parallel=20"), "got: {msg}");
1357    }
1358
1359    #[test]
1360    fn connection_limit_warn_when_parallel_exceeds_max() {
1361        let w = check_connection_limit(100, Some(20));
1362        assert!(w.is_some(), "should warn when parallel > max_connections");
1363        let msg = w.unwrap();
1364        assert!(msg.contains("max_connections=20"), "got: {msg}");
1365    }
1366
1367    #[test]
1368    fn connection_limit_no_warn_when_parallel_below_max() {
1369        let w = check_connection_limit(4, Some(100));
1370        assert!(
1371            w.is_none(),
1372            "should not warn when parallel << max_connections"
1373        );
1374    }
1375
1376    #[test]
1377    fn connection_limit_no_warn_when_parallel_is_one() {
1378        let w = check_connection_limit(1, Some(5));
1379        assert!(
1380            w.is_none(),
1381            "single worker never triggers connection warning"
1382        );
1383    }
1384
1385    #[test]
1386    fn connection_limit_skipped_note_when_max_unknown_and_parallel_gt_one() {
1387        let w = check_connection_limit(100, None);
1388        assert!(w.is_some(), "should note that check was skipped");
1389        let msg = w.unwrap();
1390        assert!(msg.contains("skipped"), "got: {msg}");
1391    }
1392
1393    #[test]
1394    fn connection_limit_no_note_when_max_unknown_and_parallel_is_one() {
1395        let w = check_connection_limit(1, None);
1396        assert!(
1397            w.is_none(),
1398            "single worker never triggers connection warning"
1399        );
1400    }
1401
1402    #[test]
1403    fn connection_limit_suggests_headroom() {
1404        let w = check_connection_limit(25, Some(20)).unwrap();
1405        // Suggested safe max should be max_connections - 3 = 17
1406        assert!(
1407            w.contains("17"),
1408            "should suggest leaving headroom, got: {w}"
1409        );
1410    }
1411
1412    // ── v0.7.4: actionable hints next to categorised errors ───────────
1413
1414    fn src_hint(msg: &str, st: SourceType) -> Option<&'static str> {
1415        let err = anyhow::anyhow!("{}", msg);
1416        let cat = categorize_source_error(&err);
1417        source_error_hint(cat, &err, &st)
1418    }
1419
1420    fn dest_hint(msg: &str, dt: DestinationType) -> Option<&'static str> {
1421        let err = anyhow::anyhow!("{}", msg);
1422        let dest = DestinationConfig {
1423            destination_type: dt,
1424            bucket: Some("b".into()),
1425            ..Default::default()
1426        };
1427        let cat = categorize_dest_error(&err, &dest);
1428        destination_error_hint(cat, &dest)
1429    }
1430
1431    #[test]
1432    fn source_tls_handshake_returns_pg_specific_tls_hint() {
1433        let h = src_hint("TLS handshake failed", SourceType::Postgres).expect("hint");
1434        assert!(h.contains("tls.mode") && h.contains("ca_file"), "got: {h}");
1435    }
1436
1437    #[test]
1438    fn source_tls_handshake_returns_mysql_specific_tls_hint() {
1439        let h = src_hint("certificate verify failed", SourceType::Mysql).expect("hint");
1440        assert!(h.contains("tls.mode"), "got: {h}");
1441    }
1442
1443    #[test]
1444    fn source_auth_error_postgres_mentions_pg_hba() {
1445        let h = src_hint("password authentication failed", SourceType::Postgres).expect("hint");
1446        assert!(h.contains("pg_hba") && h.contains("SELECT"), "got: {h}");
1447    }
1448
1449    #[test]
1450    fn source_auth_error_mysql_mentions_grant() {
1451        let h = src_hint(
1452            "Access denied for user 'rivet'@'localhost'",
1453            SourceType::Mysql,
1454        )
1455        .expect("hint");
1456        assert!(h.contains("GRANT") && h.contains("FLUSH"), "got: {h}");
1457    }
1458
1459    #[test]
1460    fn source_connectivity_error_mentions_bastion_and_network() {
1461        let h = src_hint("connection refused", SourceType::Postgres).expect("hint");
1462        assert!(h.contains("bastion") || h.contains("VPN"), "got: {h}");
1463    }
1464
1465    #[test]
1466    fn source_unknown_error_returns_no_hint() {
1467        // Generic "error" category should yield no hint — better to
1468        // print the raw driver message than to mislead.
1469        let h = src_hint("totally unexpected", SourceType::Postgres);
1470        assert!(h.is_none(), "unknown errors should not produce a hint");
1471    }
1472
1473    #[test]
1474    fn dest_s3_auth_error_names_concrete_actions() {
1475        let h = dest_hint("permission denied", DestinationType::S3).expect("hint");
1476        assert!(
1477            h.contains("s3:PutObject") && h.contains("cloud-permissions"),
1478            "got: {h}"
1479        );
1480    }
1481
1482    #[test]
1483    fn dest_gcs_auth_error_names_concrete_actions() {
1484        let h = dest_hint("403 Forbidden", DestinationType::Gcs).expect("hint");
1485        assert!(
1486            h.contains("storage.objects") && h.contains("cloud-permissions"),
1487            "got: {h}"
1488        );
1489    }
1490
1491    #[test]
1492    fn categorize_dest_error_sas_expired_message_returns_sas_expired_category() {
1493        // Guard the load-bearing ordering in categorize_dest_error: the
1494        // "sas expired" early-return must fire before the generic "token"
1495        // branch, or destination_error_hint produces the wrong hint.
1496        // This test pins the *category string*, not just the final hint text.
1497        let err = anyhow::anyhow!(
1498            "Azure SAS token already expired (se=2024-01-01T00:00:00Z). Generate a new SAS and re-export."
1499        );
1500        let dest = DestinationConfig {
1501            destination_type: DestinationType::Azure,
1502            bucket: Some("c".into()),
1503            ..Default::default()
1504        };
1505        let cat = categorize_dest_error(&err, &dest);
1506        assert_eq!(
1507            cat, "sas expired",
1508            "expired-SAS error must categorise as 'sas expired', not '{cat}' — ordering in categorize_dest_error is load-bearing"
1509        );
1510    }
1511
1512    #[test]
1513    fn dest_azure_sas_expired_returns_regenerate_hint() {
1514        // The Azure preflight (v0.7.4) bails with "expired (se=…)" —
1515        // the hint must steer the operator to `az storage container
1516        // generate-sas` not "your IAM role is broken".
1517        let h = dest_hint(
1518            "Azure SAS token already expired (se=2024-01-01T00:00:00Z)",
1519            DestinationType::Azure,
1520        )
1521        .expect("hint");
1522        assert!(
1523            h.contains("generate-sas") && h.contains("AZURE_STORAGE_SAS_TOKEN"),
1524            "got: {h}"
1525        );
1526    }
1527
1528    #[test]
1529    fn dest_s3_bucket_not_found_says_no_auto_create() {
1530        let h = dest_hint("NoSuchBucket", DestinationType::S3).expect("hint");
1531        assert!(
1532            h.contains("does NOT auto-create") && h.contains("aws s3 mb"),
1533            "got: {h}"
1534        );
1535    }
1536
1537    #[test]
1538    fn dest_s3_connectivity_error_warns_about_region_mismatch() {
1539        let h = dest_hint("dns error", DestinationType::S3).expect("hint");
1540        assert!(h.contains("region") || h.contains("endpoint"), "got: {h}");
1541    }
1542}