Skip to main content

rivet/preflight/
doctor.rs

1use crate::config::{Config, DestinationType, SourceType};
2use crate::error::Result;
3
/// One `rivet doctor` probe result.
///
/// `emit_check` renders each check as an `[OK]` / `[FAIL]` line in text mode
/// and collects it; in `--json` mode the collected checks are serialized as
/// the `checks` array of a `DoctorReport`.
#[derive(Debug, Clone, serde::Serialize)]
struct DoctorCheck {
    /// Label printed after the `[OK]` / `[FAIL]` marker.
    name: String,
    /// `true` when the probe passed.
    ok: bool,
    /// Failure detail printed after the name on a `[FAIL]` line; left out of
    /// the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    detail: Option<String>,
    /// Optional next-action hint printed under a `[FAIL]` line; left out of
    /// the JSON entirely when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    hint: Option<String>,
}
16
/// Top-level object printed by `print_doctor_json` for `doctor --json`.
#[derive(Debug, serde::Serialize)]
struct DoctorReport {
    /// The config path `doctor` was invoked with, echoed back verbatim.
    config_path: String,
    /// Overall verdict passed in by the caller.
    all_ok: bool,
    /// Every check recorded during the run, in emission order.
    checks: Vec<DoctorCheck>,
}
23
24fn print_doctor_json(config_path: &str, all_ok: bool, checks: &[DoctorCheck]) {
25    let report = DoctorReport {
26        config_path: config_path.to_string(),
27        all_ok,
28        checks: checks.to_vec(),
29    };
30    println!(
31        "{}",
32        serde_json::to_string_pretty(&report).unwrap_or_else(|_| "{}".into())
33    );
34}
35
36/// Record a check and, in text mode, render its `[OK]/[FAIL]` line **from the
37/// same struct**. One call per check — there is no second hand-written line to
38/// drift out of sync with the `--json` report (the dual-emit this replaced had
39/// a `println!` and a `push` maintained in parallel at every site). A `FAIL`
40/// always carries `detail` at its call sites; `unwrap_or("")` is a belt-and-
41/// braces fallback, not an expected path.
42fn emit_check(checks: &mut Vec<DoctorCheck>, json: bool, check: DoctorCheck) {
43    if !json {
44        if check.ok {
45            println!("[OK]  {}", check.name);
46        } else {
47            println!(
48                "[FAIL] {}: {}",
49                check.name,
50                check.detail.as_deref().unwrap_or("")
51            );
52            if let Some(h) = &check.hint {
53                println!("       Hint: {}", h);
54            }
55        }
56    }
57    checks.push(check);
58}
59
60pub fn doctor(config_path: &str, json: bool) -> Result<()> {
61    if !json {
62        println!("rivet doctor: verifying auth for config '{}'", config_path);
63        println!();
64    }
65    let mut checks: Vec<DoctorCheck> = Vec::new();
66
67    let config = match Config::load(config_path) {
68        Ok(c) => {
69            emit_check(
70                &mut checks,
71                json,
72                DoctorCheck {
73                    name: "Config parsed successfully".into(),
74                    ok: true,
75                    detail: None,
76                    hint: None,
77                },
78            );
79            c
80        }
81        Err(e) => {
82            // L4: surface the config error exactly once; exit stays non-zero.
83            emit_check(
84                &mut checks,
85                json,
86                DoctorCheck {
87                    name: "Config error".into(),
88                    ok: false,
89                    detail: Some(trim_probe_error(&e)),
90                    hint: None,
91                },
92            );
93            if json {
94                print_doctor_json(config_path, false, &checks);
95            }
96            anyhow::bail!("doctor: config check failed (see output above)")
97        }
98    };
99
100    let mut all_ok = true;
101
102    match check_source_auth(&config) {
103        Ok(()) => {
104            emit_check(
105                &mut checks,
106                json,
107                DoctorCheck {
108                    name: format!("Source auth ({:?})", config.source.source_type),
109                    ok: true,
110                    detail: None,
111                    hint: None,
112                },
113            );
114            // Text-only advisory (live probe, no JSON counterpart) — stays inline
115            // after the source line, so its position is unchanged.
116            if !json {
117                note_mssql_harm_permission(&config);
118            }
119        }
120        Err(e) => {
121            all_ok = false;
122            let category = categorize_source_error(&e);
123            let hint =
124                source_error_hint(category, &e, &config.source.source_type).map(|h| h.to_string());
125            emit_check(
126                &mut checks,
127                json,
128                DoctorCheck {
129                    name: format!("Source {}", category),
130                    ok: false,
131                    detail: Some(trim_probe_error(&e)),
132                    hint,
133                },
134            );
135        }
136    }
137
138    let mut seen_destinations: Vec<String> = Vec::new();
139    for export in &config.exports {
140        let dest_key = super::destination_identity(&export.destination);
141        if seen_destinations.contains(&dest_key) {
142            continue;
143        }
144        seen_destinations.push(dest_key);
145
146        let label = match export.destination.destination_type {
147            DestinationType::Local => format!(
148                "Local({})",
149                export.destination.path.as_deref().unwrap_or(".")
150            ),
151            DestinationType::S3 => format!(
152                "S3({})",
153                export.destination.bucket.as_deref().unwrap_or("?")
154            ),
155            DestinationType::Gcs => format!(
156                "GCS({})",
157                export.destination.bucket.as_deref().unwrap_or("?")
158            ),
159            DestinationType::Azure => format!(
160                "Azure({})",
161                export.destination.bucket.as_deref().unwrap_or("?")
162            ),
163            DestinationType::Stdout => {
164                // L23: stdout streams to the terminal — nothing to auth-probe —
165                // but say so explicitly so the operator sees it was considered.
166                emit_check(
167                    &mut checks,
168                    json,
169                    DoctorCheck {
170                        name: "Destination Stdout (streaming; no preflight needed)".into(),
171                        ok: true,
172                        detail: None,
173                        hint: None,
174                    },
175                );
176                continue;
177            }
178        };
179
180        // Apply `{date}`/`{export}`/`{table}` substitution so the probe write
181        // lands at the same prefix `run` would use.
182        let expanded_dest = crate::plan::build::expand_destination_templates(
183            export.destination.clone(),
184            &export.name,
185        );
186        match check_destination_auth(&expanded_dest) {
187            Ok(()) => {
188                emit_check(
189                    &mut checks,
190                    json,
191                    DoctorCheck {
192                        name: format!("Destination {}", label),
193                        ok: true,
194                        detail: None,
195                        hint: None,
196                    },
197                );
198            }
199            Err(e) => {
200                all_ok = false;
201                let category = categorize_dest_error(&e, &expanded_dest);
202                let hint = destination_error_hint(category, &expanded_dest).map(|h| h.to_string());
203                emit_check(
204                    &mut checks,
205                    json,
206                    DoctorCheck {
207                        name: format!("Destination {} -- {}", label, category),
208                        ok: false,
209                        detail: Some(trim_probe_error(&e)),
210                        hint,
211                    },
212                );
213            }
214        }
215    }
216
217    if json {
218        print_doctor_json(config_path, all_ok, &checks);
219    } else {
220        println!();
221        if all_ok {
222            println!("All checks passed.");
223            println!("Next: rivet check -c {config_path}   # column-type & schema report");
224        } else {
225            // F-NEW-A (0.7.5 audit): the exit code now matches the fail-line so
226            // CI / cron can tell a healthy environment from a broken one.
227            println!("Some checks failed. Fix the issues above before running exports.");
228        }
229    }
230    if all_ok {
231        Ok(())
232    } else {
233        anyhow::bail!("doctor: one or more preflight checks failed (see output above)")
234    }
235}
236
237fn check_source_auth(config: &Config) -> Result<()> {
238    let url = config.source.resolve_url()?;
239    let tls = config.source.tls.as_ref();
240    // `doctor` is meant to surface security misconfigurations *before*
241    // the operator runs a real export. Plaintext-source connections are
242    // a security misconfiguration; the warn was previously only
243    // emitted from `create_source` (the `run` path), so an operator
244    // could pass `doctor` clean and only learn about the TLS gap on
245    // first `rivet run`. The helper is `Once`-gated so duplicate calls
246    // from later phases don't restack the line.
247    crate::source::warn_if_tls_disabled(&config.source);
248    match config.source.source_type {
249        SourceType::Postgres => {
250            let mut client = crate::source::postgres::connect_client(&url, tls)?;
251            client.simple_query("SELECT 1")?;
252            Ok(())
253        }
254        SourceType::Mysql => {
255            let pool = crate::source::mysql::connect_pool(&url, tls)?;
256            let mut conn = pool.get_conn()?;
257            use mysql::prelude::Queryable;
258            conn.query_drop("SELECT 1")?;
259            Ok(())
260        }
261        SourceType::Mssql => {
262            // `connect_with_tls` runs a connect + `SELECT 1` round-trip itself,
263            // so a successful construction is a green health-check.
264            crate::source::mssql::MssqlSource::connect_with_tls(&url, tls)?;
265            Ok(())
266        }
267    }
268}
269
270/// Advisory (never a `[FAIL]`): a SQL Server login that can extract data fine
271/// but lacks `VIEW SERVER STATE` will have its source-harm metrics (lock waits,
272/// via `sys.dm_os_wait_stats`) silently skipped. Surface that at health-check
273/// time so the operator can grant it if they want the metrics — but it must
274/// never gate extraction, so this only prints a `[note]` and leaves `all_ok`
275/// untouched.
276///
277/// No-op for non-MSSQL sources, and when the permission is present or
278/// undeterminable (a probe failure stays silent rather than guess).
279fn note_mssql_harm_permission(config: &Config) {
280    if config.source.source_type != SourceType::Mssql {
281        return;
282    }
283    let Ok(url) = config.source.resolve_url() else {
284        return;
285    };
286    if let Some(false) =
287        crate::source::mssql::sample_view_server_state(&url, config.source.tls.as_ref())
288    {
289        println!(
290            "[note] Source-harm metrics need VIEW SERVER STATE — this SQL Server login lacks it, \
291             so lock-wait metrics will be skipped. Data extraction is unaffected. \
292             Grant with: GRANT VIEW SERVER STATE TO [your_login];"
293        );
294    }
295}
296
297fn check_destination_auth(dest: &crate::config::DestinationConfig) -> Result<()> {
298    use crate::destination::create_destination_for_probe;
299    // L20: a preflight connectivity probe must FAIL FAST against an
300    // unreachable cloud endpoint. The export-path `create_destination`
301    // inherits a 5-attempt escalating-backoff RetryLayer (~10s of WARN-noisy
302    // retries before the `[FAIL]`); the probe factory builds the destination
303    // with retries disabled so a closed port surfaces immediately.
304    let d = create_destination_for_probe(dest)?;
305    let probe_key = crate::manifest::DOCTOR_PROBE_FILENAME;
306    let tmp = std::env::temp_dir().join(probe_key);
307    std::fs::write(&tmp, b"ok")?;
308    match d.write(&tmp, probe_key) {
309        Ok(_) => {
310            log::debug!("doctor: probe write succeeded, cleaning up");
311        }
312        Err(e) => {
313            let _ = std::fs::remove_file(&tmp);
314            return Err(e);
315        }
316    }
317    let _ = std::fs::remove_file(&tmp);
318    // FINDING #26: the write-probe drops a `.rivet_doctor_probe` object at the
319    // destination prefix to verify write access; we must remove it so `doctor`
320    // leaves the prefix exactly as it found it (the local temp above is a
321    // *separate* file in `temp_dir`, not the destination object). The probe
322    // landed at the same key `write` resolved: `probe_key` joined to the
323    // destination root.
324    remove_destination_probe(dest, probe_key);
325    Ok(())
326}
327
328/// Resolve the local destination root the way `LocalDestination::new` does.
329///
330/// Kept inline rather than reaching into the backend so `doctor` depends only
331/// on the config, not on `LocalDestination`'s private field. MUST stay in sync
332/// with `destination/local.rs`: `path` wins, then `prefix`, then `.`.
333fn local_base_path(dest: &crate::config::DestinationConfig) -> String {
334    dest.path
335        .clone()
336        .or_else(|| dest.prefix.clone())
337        .unwrap_or_else(|| ".".to_string())
338}
339
340/// Best-effort removal of the destination-side write-probe (FINDING #26).
341///
342/// Local destinations are removed directly: `write` lands the probe at
343/// `<base>/<probe_key>` (an atomic temp-then-rename), so a plain
344/// `remove_file` restores the prefix to empty. A missing probe is benign
345/// (`NotFound` swallowed) — `doctor` already passed the write check, this is
346/// pure tidy-up and must never turn a healthy run into a failure.
347///
348/// Cloud backends (S3/GCS/Azure) expose no delete on the `Destination` trait,
349/// so the probe object persists; `manifest_reconcile` already filters
350/// `DOCTOR_PROBE_FILENAME` out of listings, so the residue never confuses
351/// verification. It is logged (not silently ignored) but at DEBUG, not WARN:
352/// on the happy path the leftover is benign and a WARN on every successful
353/// cloud `doctor` run would be alarming noise — a fully clean removal needs a
354/// `delete` on the `Destination` trait, tracked separately.
355fn remove_destination_probe(dest: &crate::config::DestinationConfig, probe_key: &str) {
356    match dest.destination_type {
357        DestinationType::Local => {
358            let probe_path = std::path::Path::new(&local_base_path(dest)).join(probe_key);
359            match std::fs::remove_file(&probe_path) {
360                Ok(()) => log::debug!("doctor: removed destination probe {}", probe_path.display()),
361                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
362                Err(e) => log::warn!(
363                    "doctor: could not remove destination probe {} (left at prefix): {e}",
364                    probe_path.display()
365                ),
366            }
367        }
368        DestinationType::Stdout => {} // streaming sink writes nothing to clean up
369        DestinationType::S3 | DestinationType::Gcs | DestinationType::Azure => {
370            log::debug!(
371                "doctor: destination probe '{probe_key}' left at the {:?} prefix \
372                 (no object-delete on this backend); manifest reconcile filters it from listings",
373                dest.destination_type
374            );
375        }
376    }
377}
378
379/// L21: trim a raw probe error to its root cause for the `[FAIL]` line.
380///
381/// opendal/reqwest errors dump the entire HTTP response — `... response: Parts
382/// { status: 403, version: HTTP/1.1, headers: {"x-amz-request-id": ...,
383/// "content-type": ..., ...} } ...` — into their `Display`. That multi-line
384/// header blob buries the one line the operator needs (the category is already
385/// shown separately). We keep the meaningful prefix and cut at the first noisy
386/// marker, collapse any embedded newlines, and cap the length as a backstop.
387/// Categorisation still runs on the full `{:#}` chain elsewhere — this only
388/// shapes what is *printed*, never what is matched.
389fn trim_probe_error(err: &anyhow::Error) -> String {
390    // Use the full `{:#}` cause chain, not the top-level Display: Postgres
391    // surfaces a wrong password as the bare wrapper `db error` and buries
392    // `password authentication failed for user …` in `.source()`. The alternate
393    // form joins the chain so the [FAIL] line shows the real reason.
394    let raw = format!("{err:#}");
395    // Detect an opendal/reqwest HTTP-response dump on a flattened copy (markers
396    // are ASCII, single-line fragments). `to_ascii_lowercase` keeps byte layout.
397    let flat = raw.replace(['\n', '\r'], " ");
398    let lower = flat.to_ascii_lowercase();
399    let cut = [
400        ", context: {",
401        " context: {",
402        " parts {",
403        ", headers: {",
404        " headers: {",
405        ", response:",
406    ]
407    .iter()
408    .filter_map(|m| lower.find(m))
409    .min();
410    let mut out = match cut {
411        // A structural HTTP dump IS present: flatten to ONE line and cut at the
412        // marker so the header blob doesn't bury the root cause.
413        Some(i) => flat[..i].trim_end_matches([' ', ',']).to_string(),
414        // No dump: a hand-authored message (a config-validation menu like the
415        // chunk_column/chunk_by_key options, or the auth detail). Preserve it
416        // VERBATIM, newlines included — flattening would mash a multi-line hint
417        // into a run-on line.
418        None => raw.trim().to_string(),
419    };
420    // Backstop against a runaway message: generous enough to keep a multi-line
421    // config hint intact, small enough to bound a 5000-char library dump.
422    const MAX: usize = 1200;
423    if out.chars().count() > MAX {
424        out = out.chars().take(MAX).collect::<String>();
425        out.push('…');
426    }
427    out
428}
429
430pub(crate) fn categorize_source_error(err: &anyhow::Error) -> &'static str {
431    // `{:#}` (alternate) walks the anyhow cause chain, not just the top
432    // Display. Postgres surfaces a wrong password as the bare top-level
433    // `"db error"` and buries `"password authentication failed for user …"`
434    // in `.source()`; `{}` would never see the real reason and the operator
435    // would get the useless generic "error" bucket. The alternate form joins
436    // the chain so the auth/connectivity needles below match the true cause.
437    let msg = format!("{err:#}").to_lowercase();
438    if msg.contains("password")
439        || msg.contains("authentication")
440        || msg.contains("access denied")
441        // MSSQL bad credentials: `"Login failed for user 'sa'"`.
442        || msg.contains("login failed")
443        // Postgres top-level Display when the real cause (auth) is nested and
444        // `{:#}` still collapses to the bare wrapper — a server-side `DbError`
445        // is never a connectivity failure (those say "connect"/"refused"), so
446        // mapping it to auth is the actionable bucket, not a misroute.
447        || msg.contains("db error")
448    {
449        "auth error"
450    } else if msg.contains("connect")
451        || msg.contains("refused")
452        || msg.contains("timed out")
453        || msg.contains("could not translate host")
454        || msg.contains("name or service not known")
455    {
456        "connectivity error"
457    } else {
458        "error"
459    }
460}
461
462pub(super) fn categorize_dest_error(
463    err: &anyhow::Error,
464    dest: &crate::config::DestinationConfig,
465) -> &'static str {
466    // `{:#}` walks the cause chain: opendal wraps the underlying
467    // reqwest/HTTP reason as a `.source()`, so the alternate form is what
468    // surfaces "error sending request" / "InvalidAccessKeyId" / "status: 403"
469    // when those are nested rather than on the top Display.
470    let msg = format!("{err:#}").to_lowercase();
471    // CONTRACT: the pattern below must match the error text emitted by
472    // `enforce_sas_expiry` in destination/azure.rs:
473    //   "Azure SAS token already expired (se=…)"
474    // If that message ever changes, update both places together.
475    if msg.contains("already expired") && msg.contains("sas") {
476        return "sas expired";
477    }
478    // L6: a local filesystem permission denial (`Permission denied (os error
479    // 13)`) is NOT an auth failure — there are no credentials involved, just
480    // directory mode bits. Route Local/Stdout permission denials to their own
481    // "permission error" category (the same FS-permissions hint applies) so the
482    // label stops misattributing an `os error 13` to credentials. Cloud
483    // backends keep falling through to "auth error" below, where a denied write
484    // genuinely is a credential/IAM problem.
485    if matches!(
486        dest.destination_type,
487        DestinationType::Local | DestinationType::Stdout
488    ) && (msg.contains("permission denied")
489        || msg.contains("permissiondenied")
490        || msg.contains("os error 13"))
491    {
492        return "permission error";
493    }
494    if msg.contains("credential")
495        || msg.contains("permission denied")
496        // opendal lowercases its `ErrorKind` with no space: `PermissionDenied`.
497        || msg.contains("permissiondenied")
498        || msg.contains("access denied")
499        || msg.contains("unauthorized")
500        || msg.contains("forbidden")
501        || msg.contains("invalid_grant")
502        || msg.contains("token")
503        // S3 rejects a bad/expired key with `InvalidAccessKeyId` + HTTP 403.
504        || msg.contains("invalidaccesskeyid")
505        || msg.contains("403")
506    {
507        "auth error"
508    } else if msg.contains("not found") || msg.contains("nosuchbucket") || msg.contains("404") {
509        match dest.destination_type {
510            DestinationType::S3 => "bucket not found",
511            DestinationType::Gcs => "bucket not found",
512            DestinationType::Azure => "container not found",
513            DestinationType::Local | DestinationType::Stdout => "path not found",
514        }
515    } else if msg.contains("connect")
516        || msg.contains("refused")
517        || msg.contains("timed out")
518        || msg.contains("dns")
519        || msg.contains("endpoint")
520        // reqwest/opendal phrasing for a transport-level failure reaching the
521        // endpoint at all (bad host, unreachable network, TLS reset) — none of
522        // the needles above match the bare "error sending request for url …".
523        || msg.contains("error sending request")
524        || msg.contains("send http request")
525    {
526        "connectivity error"
527    } else {
528        "error"
529    }
530}
531
532/// Map a categorised source error (+ raw text) to an actionable hint.
533///
534/// Returns `None` when nothing more useful than the underlying error
535/// itself can be said.  The output is intentionally short — `doctor`
536/// already prints the full driver message; the hint should add the
537/// *next action*, not re-explain the failure.
538///
539/// Categories come from [`categorize_source_error`].
540pub(crate) fn source_error_hint(
541    category: &'static str,
542    err: &anyhow::Error,
543    source_type: &crate::config::SourceType,
544) -> Option<&'static str> {
545    use crate::config::SourceType;
546    let msg = err.to_string().to_lowercase();
547
548    // TLS misconfig leaks through every category — check first so a
549    // generic "error" with a TLS root cause still gets the right hint.
550    if msg.contains("tls")
551        || msg.contains("ssl")
552        || msg.contains("certificate")
553        || msg.contains("handshake")
554    {
555        return Some(match source_type {
556            SourceType::Postgres => {
557                "TLS handshake failed. Try `tls.mode: prefer` (downgrade gracefully) or set `tls.ca_file: /path/to/ca-bundle.pem` if your DB uses a private CA."
558            }
559            SourceType::Mysql => {
560                "TLS handshake failed. Try `tls.mode: prefer` or set `tls.ca_file: /path/to/ca-bundle.pem` to trust the DB's certificate authority."
561            }
562            SourceType::Mssql => {
563                "TLS handshake failed. SQL Server forces TLS on the login handshake; set `tls.ca_file: /path/to/ca-bundle.pem` to trust a private CA, or `tls.accept_invalid_certs: true` for a self-signed dev cert."
564            }
565        });
566    }
567
568    match category {
569        "auth error" => Some(match source_type {
570            SourceType::Postgres => {
571                "Verify the user/password and that pg_hba.conf permits your client IP. The user also needs SELECT on the target tables and USAGE on the schema."
572            }
573            SourceType::Mysql => {
574                "Verify the user/password and that the user has SELECT grants on the target tables. MySQL `GRANT SELECT ON db.* TO 'user'@'host'` plus `FLUSH PRIVILEGES`."
575            }
576            SourceType::Mssql => {
577                "Verify the SQL login/password and that the login maps to a database user with SELECT on the target tables (`GRANT SELECT ON dbo.tbl TO [user]`). Check you are pointed at the right database — contained-DB users and server logins are resolved differently."
578            }
579        }),
580        "connectivity error" => Some(
581            "Verify host/port reachability from this machine. If the DB is behind a bastion or VPN, ensure the tunnel is up before running rivet. `rivet doctor` must run from the same network as `rivet run` will.",
582        ),
583        _ => None,
584    }
585}
586
587/// Map a categorised destination error (+ raw text) to an actionable hint.
588///
589/// Mirrors [`source_error_hint`] but with backend-specific guidance
590/// (S3 region / IAM / endpoint, GCS service account / ADC, Azure key /
591/// SAS / RBAC).  Returns `None` when nothing better than the raw error
592/// applies.
593pub(super) fn destination_error_hint(
594    category: &'static str,
595    dest: &crate::config::DestinationConfig,
596) -> Option<&'static str> {
597    match category {
598        "sas expired" => Some(
599            "Azure SAS token is expired or near-expiry. Generate a new SAS via `az storage container generate-sas --permissions rwdlc --expiry <future-date>` and re-export AZURE_STORAGE_SAS_TOKEN.",
600        ),
601        // L6: local FS permission denial — same actionable hint as the
602        // Local/Stdout auth branch below, just under the correct category.
603        "permission error" => Some("Verify filesystem permissions on the destination directory."),
604        "auth error" => Some(match dest.destination_type {
605            DestinationType::S3 => {
606                "Verify AWS credentials resolve (env / profile / instance role) and that the role has s3:PutObject + s3:GetObject + s3:ListBucket on the prefix. See docs/cloud-permissions.md."
607            }
608            DestinationType::Gcs => {
609                "Verify the service account credentials resolve (ADC / env / explicit credentials_file) and that the principal has storage.objects.{create,get,list} on the bucket. See docs/cloud-permissions.md."
610            }
611            DestinationType::Azure => {
612                "Verify Azure credentials. Account-key auth: check account_key_env. SAS auth: regenerate the SAS with rwdlc permissions and a future expiry. See docs/cloud-permissions.md."
613            }
614            DestinationType::Local | DestinationType::Stdout => {
615                "Verify filesystem permissions on the destination directory."
616            }
617        }),
618        "bucket not found" | "container not found" => Some(match dest.destination_type {
619            DestinationType::S3 => {
620                "Bucket must already exist; rivet does NOT auto-create. `aws s3 mb s3://<bucket>` (with the right region) before running."
621            }
622            DestinationType::Gcs => {
623                "Bucket must already exist; rivet does NOT auto-create. `gcloud storage buckets create gs://<bucket>` before running."
624            }
625            DestinationType::Azure => {
626                "Container must already exist; rivet does NOT auto-create. `az storage container create --account-name <acct> --name <container>` before running."
627            }
628            _ => "Path / bucket / container must already exist.",
629        }),
630        "connectivity error" => Some(match dest.destination_type {
631            DestinationType::S3 => {
632                "Verify endpoint and region. For non-AWS endpoints (MinIO / R2 / Wasabi) set `endpoint:` explicitly. For AWS, ensure `region:` matches the bucket's region — cross-region writes fail with a confusing redirect error."
633            }
634            DestinationType::Gcs => {
635                "Verify network reachability to storage.googleapis.com. If using a custom endpoint, set `endpoint:` explicitly."
636            }
637            DestinationType::Azure => {
638                "Verify network reachability to <account>.blob.core.windows.net. For Azurite or sovereign clouds, set `endpoint:` explicitly."
639            }
640            _ => "Verify network reachability to the destination.",
641        }),
642        "path not found" => Some(
643            "Parent directory must exist. Create it with `mkdir -p` before running, or use a different `path:` in your config.",
644        ),
645        _ => None,
646    }
647}
648
649#[cfg(test)]
650mod tests {
651    use super::*;
652
653    // doctor-dedup-path (regression): doctor's destination dedup key must
654    // include `path`, so two local destinations with different `path:` values
655    // are each probed independently — a buggy key of type+bucket+endpoint
656    // (omitting `path`) collapses both to "Local:-:-" and only probes the
657    // first, letting an unwritable second directory pass doctor and fail at
658    // run time. The dedup key now flows through `super::destination_identity`
659    // (preflight/mod.rs), which includes `path`; this drives the whole
660    // `doctor()` to prove the inline copy did not drift.
661    //
662    // The destination-side `.rivet_doctor_probe` is now cleaned up
663    // (FINDING #26), so a leftover probe FILE can no longer serve as the
664    // "was this path probed?" signal. Instead each destination uses a
665    // *nested* path that does not exist yet: probing it forces
666    // `LocalDestination::write` to `create_dir_all` the leaf directory, and
667    // cleanup removes only the probe file — so the created (now empty) leaf
668    // directory is a durable witness that the path was probed AND that #26
669    // cleanup ran. A buggy dedup that skipped the second path would leave its
670    // leaf directory absent.
671    //
672    // The source check fails fast and offline via `resolve_url` on an unset
673    // `url_env`, and `doctor` continues to the destination loop regardless.
674    #[test]
675    fn roast_doctor_write_probes_each_distinct_local_destination_path() {
676        let dir_a = tempfile::tempdir().unwrap();
677        let dir_b = tempfile::tempdir().unwrap();
678        let config_dir = tempfile::tempdir().unwrap();
679
680        // Nested leaves that doctor must create when it probes each path.
681        let leaf_a = dir_a.path().join("probe_here");
682        let leaf_b = dir_b.path().join("probe_here");
683
684        let yaml = format!(
685            r#"
686source:
687  type: postgres
688  url_env: RIVET_ROAST_DOCTOR_DEDUP_UNSET_URL_ENV
689exports:
690  - name: roast_dest_a
691    query: "SELECT 1"
692    format: csv
693    destination:
694      type: local
695      path: "{a}"
696  - name: roast_dest_b
697    query: "SELECT 1"
698    format: csv
699    destination:
700      type: local
701      path: "{b}"
702"#,
703            a = leaf_a.display(),
704            b = leaf_b.display(),
705        );
706        let config_path = config_dir.path().join("rivet.yaml");
707        std::fs::write(&config_path, yaml).unwrap();
708
709        // Returns Err (source auth fails on the unset env var); the
710        // destination probes are the observable under test.
711        let _ = doctor(config_path.to_str().unwrap(), false);
712
713        let probe = crate::manifest::DOCTOR_PROBE_FILENAME;
714
715        // Each distinct path was probed → its leaf directory now exists; and
716        // FINDING #26 cleanup ran → the probe file inside it is gone, leaving
717        // the leaf empty.
718        for (label, leaf) in [("first", &leaf_a), ("second", &leaf_b)] {
719            assert!(
720                leaf.exists(),
721                "doctor never write-probed the {label} local destination {} — its dedup key \
722                 must include `path`; a key that omits it collapses both local destinations to \
723                 one entry and only probes the first, so an unwritable second directory would \
724                 pass doctor and fail at run time",
725                leaf.display()
726            );
727            assert!(
728                !leaf.join(probe).exists(),
729                "doctor left its write-probe `{probe}` at the {label} destination {} \
730                 (FINDING #26: it must remove the destination-side probe, not only the local temp)",
731                leaf.display()
732            );
733            assert!(
734                std::fs::read_dir(leaf).unwrap().next().is_none(),
735                "doctor must leave the {label} destination {} exactly as it created it (empty)",
736                leaf.display()
737            );
738        }
739    }
740
741    // Build a bare DestinationConfig of the given type for hint dispatch.
742    fn dest_of(t: DestinationType) -> crate::config::DestinationConfig {
743        crate::config::DestinationConfig {
744            destination_type: t,
745            ..Default::default()
746        }
747    }
748
749    // AUDIT-RED doctor-categorizer: Postgres wrong-password Display is just "db error"
750    // (real cause nested in .source()); current auth needles miss it so it falls to
751    // generic "error" with no hint. Asserts CORRECT behavior; expected to FAIL until fixed.
752    #[test]
753    fn audit_pg_db_error_is_auth_with_hint() {
754        let err = anyhow::anyhow!("db error");
755        let cat = categorize_source_error(&err);
756        assert_eq!(
757            cat, "auth error",
758            "Postgres wrong-password surfaces as 'db error'; categorizer returned {:?} instead of 'auth error'",
759            cat
760        );
761        let hint = source_error_hint(cat, &err, &SourceType::Postgres);
762        assert!(
763            hint.is_some(),
764            "no actionable hint produced for Postgres 'db error' (category {:?}); operator gets no next step",
765            cat
766        );
767    }
768
769    // AUDIT-RED doctor-categorizer (#1): MSSQL wrong-login Display is
770    // "Login failed for user 'sa'" — current auth needles (password/authentication/
771    // access denied) miss it, so it falls to generic "error" with no hint.
772    // Asserts CORRECT behavior; expected to FAIL until fixed.
773    #[test]
774    fn audit_mssql_login_failed_is_auth_with_hint() {
775        let err = anyhow::anyhow!("login failed for user 'sa'");
776        let cat = categorize_source_error(&err);
777        assert_eq!(
778            cat, "auth error",
779            "MSSQL bad login surfaces as 'Login failed for user ...'; categorizer returned {:?} instead of 'auth error'",
780            cat
781        );
782        let hint = source_error_hint(cat, &err, &SourceType::Mssql);
783        assert!(
784            hint.is_some(),
785            "no actionable hint produced for MSSQL 'login failed for user' (category {:?})",
786            cat
787        );
788    }
789
790    // AUDIT-RED doctor-categorizer: MySQL 'Access denied for user' already
791    // categorizes as auth — guard so a fix for pg/mssql does not regress it.
792    // Asserts CORRECT behavior; expected to PASS today.
793    #[test]
794    fn audit_mysql_access_denied_is_auth_with_hint() {
795        let err = anyhow::anyhow!("access denied for user");
796        let cat = categorize_source_error(&err);
797        assert_eq!(
798            cat, "auth error",
799            "MySQL 'access denied for user' must stay auth; categorizer returned {:?}",
800            cat
801        );
802        let hint = source_error_hint(cat, &err, &SourceType::Mysql);
803        assert!(
804            hint.is_some(),
805            "no actionable hint produced for MySQL 'access denied for user' (category {:?})",
806            cat
807        );
808    }
809
810    // AUDIT-RED doctor-categorizer (#2): opendal S3 auth failure surfaces as
811    // "PermissionDenied ... InvalidAccessKeyId ... status: 403" — lowercased
812    // 'permissiondenied' has no space (never matches "permission denied"),
813    // '403'/'invalidaccesskeyid' are absent from the needles, so it falls to
814    // generic "error" with no hint. Asserts CORRECT behavior; expected to FAIL.
815    #[test]
816    fn audit_s3_permission_denied_403_is_auth_with_hint() {
817        let dest = dest_of(DestinationType::S3);
818        // NB: deliberately omits the words forbidden/unauthorized/credential/token
819        // and the spaced "permission denied"/"access denied" — those would match the
820        // existing needles and mask the real #2 gap (no-space PermissionDenied,
821        // InvalidAccessKeyId, 403). This is exactly opendal's S3 auth wording.
822        let err = anyhow::anyhow!(
823            "PermissionDenied at write => InvalidAccessKeyId, status: 403, https://bucket.s3.amazonaws.com/probe"
824        );
825        let cat = categorize_dest_error(&err, &dest);
826        assert_eq!(
827            cat, "auth error",
828            "S3 'PermissionDenied/InvalidAccessKeyId/403' must categorize as auth; categorizer returned {:?}",
829            cat
830        );
831        let hint = destination_error_hint(cat, &dest);
832        assert!(
833            hint.is_some(),
834            "no actionable hint produced for S3 auth failure (category {:?}); operator gets no next step",
835            cat
836        );
837    }
838
839    // AUDIT-RED doctor-categorizer (#13 Azure): opendal connectivity failure to
840    // a bad/unreachable Azure endpoint surfaces as "error sending request for url
841    // (https://x.blob.core.windows.net/...)" — none of connect/refused/timed out/
842    // dns/endpoint match, so it falls to generic "error" with no hint.
843    // Asserts CORRECT behavior; expected to FAIL until fixed.
844    #[test]
845    fn audit_azure_send_request_error_is_connectivity_with_hint() {
846        let dest = dest_of(DestinationType::Azure);
847        let err = anyhow::anyhow!(
848            "error sending request for url (https://x.blob.core.windows.net/probe)"
849        );
850        let cat = categorize_dest_error(&err, &dest);
851        assert_eq!(
852            cat, "connectivity error",
853            "Azure 'error sending request for url' must categorize as connectivity; categorizer returned {:?}",
854            cat
855        );
856        let hint = destination_error_hint(cat, &dest);
857        assert!(
858            hint.is_some(),
859            "no actionable hint produced for Azure connectivity failure (category {:?})",
860            cat
861        );
862    }
863
864    // AUDIT-RED doctor-categorizer (#27 guard): a literal "connection refused"
865    // already categorizes as connectivity — guard so a fix for the opendal
866    // send-request wording does not regress it. Asserts CORRECT behavior;
867    // expected to PASS today.
868    #[test]
869    fn audit_dest_connection_refused_is_connectivity_with_hint() {
870        let dest = dest_of(DestinationType::S3);
871        let err = anyhow::anyhow!("connection refused");
872        let cat = categorize_dest_error(&err, &dest);
873        assert_eq!(
874            cat, "connectivity error",
875            "'connection refused' must stay connectivity; categorizer returned {:?}",
876            cat
877        );
878        let hint = destination_error_hint(cat, &dest);
879        assert!(
880            hint.is_some(),
881            "no actionable hint produced for 'connection refused' (category {:?})",
882            cat
883        );
884    }
885
886    // ── regression coverage for the broadened needles (fixes #1/#2) ──────────
887
888    // The pg auth reason is nested in `.source()`; only `{:#}` surfaces it.
889    // This proves the categorizer reads the alternate form, not just the bare
890    // top-level "db error" Display — so a real run shows the true cause.
891    #[test]
892    fn source_pg_nested_password_cause_via_alternate_is_auth() {
893        let root = anyhow::anyhow!("password authentication failed for user \"rivet\"");
894        let wrapped = root.context("db error");
895        assert_eq!(categorize_source_error(&wrapped), "auth error");
896        // And the bare form (no nested cause) still maps via the "db error"
897        // needle — this is the exact shape the audit test pins.
898        assert_eq!(
899            categorize_source_error(&anyhow::anyhow!("db error")),
900            "auth error"
901        );
902    }
903
904    // Guard: a genuine connectivity failure (Display says "connect"/"refused",
905    // never "db error") must NOT be swallowed by the new "db error" auth
906    // needle. Postgres surfaces a refused connection as "error connecting to
907    // server", which carries no auth needle.
908    #[test]
909    fn source_connection_refused_stays_connectivity_not_auth() {
910        let err = anyhow::anyhow!("error connecting to server: Connection refused (os error 61)");
911        assert_eq!(categorize_source_error(&err), "connectivity error");
912    }
913
914    // opendal lowercases its `PermissionDenied` ErrorKind with no space — the
915    // pre-fix "permission denied" needle never matched it. Standalone case so
916    // coverage can't shrink back to the spaced-only needle.
917    #[test]
918    fn dest_no_space_permissiondenied_is_auth() {
919        let dest = dest_of(DestinationType::Gcs);
920        let err = anyhow::anyhow!("PermissionDenied (persistent) at write");
921        assert_eq!(categorize_dest_error(&err, &dest), "auth error");
922        assert!(destination_error_hint("auth error", &dest).is_some());
923    }
924
925    // reqwest's other transport phrasing — "failed to send http request" —
926    // must also categorize as connectivity (the audit test pins the sibling
927    // "error sending request for url" wording).
928    #[test]
929    fn dest_send_http_request_is_connectivity() {
930        let dest = dest_of(DestinationType::S3);
931        // No connect/refused/timed out/dns/endpoint substring — only the new
932        // "send http request" needle can route this to connectivity.
933        let err = anyhow::anyhow!("failed to send http request to the store");
934        assert_eq!(categorize_dest_error(&err, &dest), "connectivity error");
935    }
936
937    // Guard: an HTTP 404 (object/bucket missing) must stay "bucket not found",
938    // not get pulled into auth by the new 403 needle.
939    #[test]
940    fn dest_404_stays_bucket_not_found_after_403_needle_added() {
941        let dest = dest_of(DestinationType::S3);
942        let err = anyhow::anyhow!("NoSuchBucket, status: 404");
943        assert_eq!(categorize_dest_error(&err, &dest), "bucket not found");
944    }
945
946    // FINDING #26 (unit-level mirror of the live test): the destination-side
947    // probe must be removed, leaving the prefix exactly as doctor found it.
948    // Exercises the cleanup directly (no shared system-temp path) so it is
949    // immune to parallel-test contention on `std::env::temp_dir()`.
950    #[test]
951    fn remove_destination_probe_local_deletes_the_probe_object() {
952        let dir = tempfile::tempdir().unwrap();
953        let probe_key = crate::manifest::DOCTOR_PROBE_FILENAME;
954        // Stand in for what `LocalDestination::write` lands at the prefix.
955        std::fs::write(dir.path().join(probe_key), b"ok").unwrap();
956        let dest = crate::config::DestinationConfig {
957            destination_type: DestinationType::Local,
958            path: Some(dir.path().to_string_lossy().into_owned()),
959            ..Default::default()
960        };
961        remove_destination_probe(&dest, probe_key);
962        assert!(
963            std::fs::read_dir(dir.path()).unwrap().next().is_none(),
964            "destination prefix must be left exactly as doctor found it (empty)"
965        );
966    }
967
968    // `remove_destination_probe` resolves the prefix from `prefix` when `path`
969    // is unset (mirrors `LocalDestination::new`'s `path → prefix → "."` order).
970    #[test]
971    fn remove_destination_probe_local_uses_prefix_when_path_unset() {
972        let dir = tempfile::tempdir().unwrap();
973        let probe_key = crate::manifest::DOCTOR_PROBE_FILENAME;
974        std::fs::write(dir.path().join(probe_key), b"ok").unwrap();
975        let dest = crate::config::DestinationConfig {
976            destination_type: DestinationType::Local,
977            prefix: Some(dir.path().to_string_lossy().into_owned()),
978            ..Default::default()
979        };
980        remove_destination_probe(&dest, probe_key);
981        assert!(
982            !dir.path().join(probe_key).exists(),
983            "cleanup must follow the same base-path resolution as the writer"
984        );
985    }
986
987    // Benign no-op when the probe is already absent — cleanup runs after the
988    // write check passed, so it must never turn a healthy run into a failure.
989    #[test]
990    fn remove_destination_probe_missing_is_noop() {
991        let dir = tempfile::tempdir().unwrap();
992        let dest = crate::config::DestinationConfig {
993            destination_type: DestinationType::Local,
994            path: Some(dir.path().to_string_lossy().into_owned()),
995            ..Default::default()
996        };
997        // No probe present; must not panic and must leave the dir untouched.
998        remove_destination_probe(&dest, crate::manifest::DOCTOR_PROBE_FILENAME);
999        assert!(std::fs::read_dir(dir.path()).unwrap().next().is_none());
1000    }
1001
1002    // ── L6: local FS permission denial is "permission error", not "auth" ─────
1003
1004    // A local destination that fails with `Permission denied (os error 13)` is
1005    // a directory-mode problem, not a credential/auth one. It must categorize
1006    // as "permission error" (RED before the fix: the spaced "permission denied"
1007    // needle routed it to "auth error"), and still carry the FS-permissions
1008    // hint so the operator gets a next step.
1009    #[test]
1010    fn local_permission_denied_is_permission_error_not_auth() {
1011        let dest = dest_of(DestinationType::Local);
1012        let err = anyhow::anyhow!("Permission denied (os error 13)");
1013        let cat = categorize_dest_error(&err, &dest);
1014        assert_eq!(
1015            cat, "permission error",
1016            "a local FS `os error 13` is a directory-permission problem, not auth; got {cat:?}"
1017        );
1018        assert!(
1019            destination_error_hint(cat, &dest).is_some(),
1020            "permission error must still surface the filesystem-permissions hint"
1021        );
1022    }
1023
1024    // Guard: a CLOUD permission denial (S3/GCS/Azure) is a genuine
1025    // credential/IAM failure and must stay "auth error" — the L6 nuance is
1026    // scoped to Local/Stdout only.
1027    #[test]
1028    fn cloud_permission_denied_stays_auth_error() {
1029        for t in [
1030            DestinationType::S3,
1031            DestinationType::Gcs,
1032            DestinationType::Azure,
1033        ] {
1034            let dest = dest_of(t);
1035            let err = anyhow::anyhow!("PermissionDenied at write");
1036            assert_eq!(
1037                categorize_dest_error(&err, &dest),
1038                "auth error",
1039                "cloud permission denial must remain auth for {t:?}"
1040            );
1041        }
1042    }
1043
1044    // ── L21: trim_probe_error strips the verbose HTTP response dump ───────────
1045
1046    // An opendal/reqwest error whose Display embeds the full `Parts { ... headers:
1047    // { ... } }` response must be trimmed to the actionable prefix: no headers,
1048    // no `Parts {`, single line.
1049    #[test]
1050    fn trim_probe_error_strips_http_response_parts_and_headers() {
1051        let raw = "PermissionDenied (persistent) at write, context: { uri: https://b.s3.amazonaws.com/probe, response: Parts { status: 403, version: HTTP/1.1, headers: {\"x-amz-request-id\": \"ABC123\", \"content-type\": \"application/xml\"} }, service: s3 } => InvalidAccessKeyId";
1052        let err = anyhow::anyhow!(raw);
1053        let out = trim_probe_error(&err);
1054        assert!(
1055            !out.contains("Parts {") && !out.to_lowercase().contains("headers: {"),
1056            "trimmed error still leaks the HTTP response dump: {out:?}"
1057        );
1058        assert!(
1059            !out.contains('\n'),
1060            "trimmed error must be a single line: {out:?}"
1061        );
1062        assert!(
1063            out.starts_with("PermissionDenied (persistent) at write"),
1064            "trimmed error must keep the meaningful root-cause prefix: {out:?}"
1065        );
1066    }
1067
1068    // A clean, short error passes through essentially unchanged (no false cut).
1069    #[test]
1070    fn trim_probe_error_leaves_clean_message_intact() {
1071        let err = anyhow::anyhow!("error connecting to server: Connection refused (os error 61)");
1072        assert_eq!(
1073            trim_probe_error(&err),
1074            "error connecting to server: Connection refused (os error 61)"
1075        );
1076    }
1077
1078    // Backstop: a pathologically long single line is capped (char-boundary safe).
1079    #[test]
1080    fn trim_probe_error_caps_unbounded_line() {
1081        let err = anyhow::anyhow!("x".repeat(5000));
1082        let out = trim_probe_error(&err);
1083        assert!(
1084            out.chars().count() <= 1201,
1085            "line not capped: {} chars",
1086            out.chars().count()
1087        );
1088        assert!(
1089            out.ends_with('…'),
1090            "capped line must signal truncation: {out:?}"
1091        );
1092    }
1093
1094    #[test]
1095    fn trim_probe_error_preserves_multiline_hint_verbatim() {
1096        // A hand-authored config hint (no HTTP-dump marker) must keep its lines
1097        // — flattening or a tight cap used to drop options from the menu.
1098        let err = anyhow::anyhow!(
1099            "chunked mode needs one of:\n  - chunk_column: <int col>\n  - chunk_by_key: <col>\n  - chunk_count: <n>"
1100        );
1101        let out = trim_probe_error(&err);
1102        assert!(out.contains("chunk_column"), "got: {out:?}");
1103        assert!(out.contains("chunk_by_key"), "got: {out:?}");
1104        assert!(
1105            out.contains("chunk_count"),
1106            "all options preserved: {out:?}"
1107        );
1108        assert!(out.contains('\n'), "newlines preserved: {out:?}");
1109    }
1110
1111    // ── L4: config-load failure is surfaced exactly once ─────────────────────
1112
1113    // doctor must not return the raw config-error message (which `main` would
1114    // reprint as `Error: <msg>`, doubling it). It prints the `[FAIL]` line and
1115    // returns a distinct one-line pointer instead. RED before the fix: the
1116    // returned Err carried the same "missing field" text printed in `[FAIL]`.
1117    #[test]
1118    fn config_load_failure_returns_pointer_not_duplicate_message() {
1119        let dir = tempfile::tempdir().unwrap();
1120        let cfg = dir.path().join("rivet.yaml");
1121        // Invalid YAML structure → Config::load fails with a parse/missing-field
1122        // message; doctor must convert that to a pointer error.
1123        std::fs::write(&cfg, "source: not-a-mapping\n").unwrap();
1124        let err = doctor(cfg.to_str().unwrap(), false)
1125            .expect_err("doctor must return Err when the config fails to load");
1126        let msg = err.to_string();
1127        assert!(
1128            msg.contains("doctor: config check failed") && msg.contains("see output above"),
1129            "returned error must be the one-line pointer (so `main` does not double-print the \
1130             config error); got {msg:?}"
1131        );
1132    }
1133}