holdon 0.2.1

Wait for anything. Know why if it doesn't.
Documentation
use std::time::Instant;

use url::Url;

use super::hint::hints;
use super::{AttemptCtx, err_stage, ok_stage};
use crate::diagnostic::{Stage, StageKind};
use crate::util::format_error_chain;

/// Path segment appended to the (possibly prefixed) target path to reach the ping route.
const PING_PATH: &str = "/ping";
/// Response header read first when an expected major version was requested.
const VERSION_HEADER: &str = "x-influxdb-version";
/// Response header consulted only when the version header is absent.
const BUILD_HEADER: &str = "x-influxdb-build";
/// Query key (matched ASCII case-insensitively) selecting the expected major version.
const EXPECT_VERSION_KEY: &str = "expect-version";
/// Query key (matched ASCII case-insensitively) carrying the API token.
const TOKEN_KEY: &str = "token";
/// Prefix placed before the token in the `authorization` request header value.
const TOKEN_AUTH_PREFIX: &str = "Token ";

/// Probes an InfluxDB endpoint through its ping route and reports the outcome
/// as a single-element list of stages.
///
/// The `influxdb://` / `influxdbs://` URL is first translated by [`prepare`];
/// a translation failure is reported immediately without any network I/O.
/// Otherwise a GET request bounded by `ctx.attempt_timeout` is sent, carrying
/// an `authorization` header when the URL supplied a token.
///
/// Outcomes:
/// * status other than 200/204 → error stage (auth hint for 401, not-ready hint otherwise);
/// * success status with an expected major version → result of [`check_version`];
/// * success status without an expectation → ok stage;
/// * timeout → error stage whose duration is the configured attempt timeout;
/// * any other transport error → error stage with the error chain, with the
///   token redacted from the message when one was supplied.
pub(super) async fn probe(url: &Url, ctx: AttemptCtx) -> Vec<Stage> {
    let started = Instant::now();

    // Bail out before touching the network when the URL itself is unusable.
    let (target, expected_major, token) = match prepare(url) {
        Ok(parts) => parts,
        Err(reason) => {
            let stage = err_stage(
                StageKind::Influxdb,
                started.elapsed(),
                reason,
                Some(hints::INFLUXDB_PARSE),
            );
            return vec![stage];
        }
    };

    let mut request = crate::checker::http::raw_client()
        .get(target)
        .timeout(ctx.attempt_timeout);
    if let Some(t) = token.as_deref() {
        request = request.header("authorization", format!("{TOKEN_AUTH_PREFIX}{t}"));
    }

    // Transport-level failures are turned into a stage right here so the rest
    // of the function only deals with an actual HTTP response.
    let response = match request.send().await {
        Ok(response) => response,
        Err(failure) => {
            let stage = if failure.is_timeout() {
                err_stage(
                    StageKind::Influxdb,
                    ctx.attempt_timeout,
                    hints::TIMED_OUT,
                    Some(hints::INFLUXDB_NOT_READY),
                )
            } else {
                let chain = format_error_chain(&failure);
                // Never let the token leak into a diagnostic message.
                let message = match token.as_deref() {
                    Some(secret) if !secret.is_empty() => crate::util::redact_in(&chain, secret),
                    _ => chain,
                };
                err_stage(
                    StageKind::Influxdb,
                    started.elapsed(),
                    message,
                    Some(hints::INFLUXDB_NOT_READY),
                )
            };
            return vec![stage];
        }
    };

    let status = response.status().as_u16();
    if !matches!(status, 200 | 204) {
        let hint = match status {
            401 => hints::INFLUXDB_AUTH,
            _ => hints::INFLUXDB_NOT_READY,
        };
        return vec![err_stage(
            StageKind::Influxdb,
            started.elapsed(),
            format!("/ping returned status {status}"),
            Some(hint),
        )];
    }

    let stage = match expected_major {
        None => ok_stage(StageKind::Influxdb, started.elapsed()),
        Some(major) => {
            // Header values that are not valid visible ASCII are treated as absent.
            let headers = response.headers();
            let text_of = |name: &str| {
                headers
                    .get(name)
                    .and_then(|raw| raw.to_str().ok())
                    .map(str::to_owned)
            };
            let version = text_of(VERSION_HEADER);
            let build = text_of(BUILD_HEADER);
            check_version(
                version.as_deref(),
                build.as_deref(),
                major,
                started.elapsed(),
            )
        }
    };
    vec![stage]
}

/// Compares the server-advertised version information against the wanted major version.
///
/// * `version` – value of the version header, if the server sent one; it takes
///   precedence over `build` whenever present.
/// * `build` – value of the build header; consulted only when `version` is absent.
/// * `want` – the major version the caller expects.
/// * `elapsed` – duration recorded on the returned stage.
///
/// Returns an ok stage on a match; otherwise an error stage carrying the
/// version hint, including the case where neither header was advertised.
fn check_version(
    version: Option<&str>,
    build: Option<&str>,
    want: u8,
    elapsed: std::time::Duration,
) -> Stage {
    // `None` means "matched"; `Some(text)` is the failure message to report.
    let failure = match (version, build) {
        (Some(v), _) => (!version_matches(v, want))
            .then(|| format!("server reports influxdb {v}, expected major {want}")),
        (None, Some(b)) => (!build_matches(b, want))
            .then(|| format!("server reports influxdb build `{b}`, expected major {want}")),
        (None, None) => Some(
            "server did not advertise X-Influxdb-Version or X-Influxdb-Build header".to_owned(),
        ),
    };

    match failure {
        None => ok_stage(StageKind::Influxdb, elapsed),
        Some(message) => err_stage(
            StageKind::Influxdb,
            elapsed,
            message,
            Some(hints::INFLUXDB_VERSION),
        ),
    }
}

/// Translates an `influxdb://` / `influxdbs://` URL into the HTTP(S) ping URL
/// and extracts the probe options carried in its query string.
///
/// Recognised query keys (ASCII case-insensitive): `expect-version` with a
/// value of `1`, `2` or `3`, and a non-empty `token`. Any other key or value is
/// rejected. When a key repeats, the last occurrence wins.
///
/// On success returns `(target, expected_major, token)` where `target` has the
/// scheme rewritten to `http`/`https`, the query removed, and the ping path
/// appended to the original path with trailing slashes trimmed.
///
/// # Errors
///
/// Returns a human-readable message for an unknown query key, an unsupported
/// `expect-version`, an empty token, an unexpected scheme, or a rewritten URL
/// that fails to parse.
fn prepare(url: &Url) -> Result<(Url, Option<u8>, Option<String>), String> {
    let mut expected_major: Option<u8> = None;
    let mut auth_token: Option<String> = None;

    for (key, value) in url.query_pairs() {
        match &*key {
            name if name.eq_ignore_ascii_case(EXPECT_VERSION_KEY) => {
                expected_major = match &*value {
                    "1" => Some(1),
                    "2" => Some(2),
                    "3" => Some(3),
                    other => {
                        return Err(format!(
                            "unknown influxdb:// expect-version `{other}` (only 1, 2, or 3)"
                        ));
                    }
                };
            }
            name if name.eq_ignore_ascii_case(TOKEN_KEY) => {
                if value.is_empty() {
                    return Err("influxdb:// token cannot be empty".to_owned());
                }
                auth_token = Some(value.into_owned());
            }
            k => {
                return Err(format!(
                    "unknown influxdb:// query key `{k}` (only `expect-version` and `token` supported)"
                ));
            }
        }
    }

    // The scheme is swapped on the serialized form and the result re-parsed.
    let raw = url.as_str();
    let rewritten = match (
        raw.strip_prefix("influxdb://"),
        raw.strip_prefix("influxdbs://"),
    ) {
        (Some(rest), _) => format!("http://{rest}"),
        (None, Some(rest)) => format!("https://{rest}"),
        (None, None) => {
            return Err(format!(
                "influxdb probe: unexpected scheme `{}`",
                url.scheme()
            ));
        }
    };

    let mut target =
        Url::parse(&rewritten).map_err(|e| format!("failed to rewrite influxdb:// URL: {e}"))?;
    // Probe options must not be forwarded to the server.
    target.set_query(None);
    let ping_path = [target.path().trim_end_matches('/'), PING_PATH].concat();
    target.set_path(&ping_path);

    Ok((target, expected_major, auth_token))
}

/// Reports whether the major component of `reported` equals `want_major`.
///
/// A single leading `v` is ignored, and the major component is everything
/// before the first `.` (or the whole string when there is no dot). Text that
/// does not parse as a `u8` never matches.
fn version_matches(reported: &str, want_major: u8) -> bool {
    let bare = reported.strip_prefix('v').unwrap_or(reported);
    let major_text = match bare.split_once('.') {
        Some((major, _rest)) => major,
        None => bare,
    };
    matches!(major_text.parse::<u8>(), Ok(major) if major == want_major)
}

/// Reports whether a build string is accepted as evidence of major version 3.
///
/// Only `want_major == 3` can ever match. The comparison is ASCII
/// case-insensitive and succeeds when the build string contains `core` or
/// `enterprise`, or starts with `v3`.
fn build_matches(reported: &str, want_major: u8) -> bool {
    want_major == 3 && {
        let lowered = reported.to_ascii_lowercase();
        ["core", "enterprise"]
            .iter()
            .any(|marker| lowered.contains(*marker))
            || lowered.starts_with("v3")
    }
}

// Unit tests for the pure helpers of the probe: version/build matching and
// URL preparation. Nothing here performs network I/O.
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
mod tests {
    use super::*;

    // Only the leading numeric component is compared; unparsable text never matches.
    #[test]
    fn version_matches_major_only() {
        assert!(version_matches("2.7.3", 2));
        assert!(version_matches("1.8.10", 1));
        assert!(version_matches("3.0.0", 3));
        assert!(!version_matches("2.7.3", 1));
        assert!(!version_matches("garbage", 2));
    }

    // A leading `v` is tolerated.
    #[test]
    fn version_matches_strips_v_prefix() {
        assert!(version_matches("v2.7.3", 2));
        assert!(version_matches("v1.8", 1));
        assert!(version_matches("v3.1.0", 3));
    }

    // Build names are matched case-insensitively and only ever satisfy major 3.
    #[test]
    fn build_matches_v3() {
        assert!(build_matches("Core", 3));
        assert!(build_matches("core", 3));
        assert!(build_matches("Enterprise", 3));
        assert!(!build_matches("Core", 2));
        assert!(!build_matches("Core", 1));
    }

    // The scheme becomes `http`, the query is dropped, and the path is the ping path.
    #[test]
    fn prepare_rewrites_to_http_and_strips_query() {
        let u: Url = "influxdb://host:8086?expect-version=2".parse().unwrap();
        let (out, want, token) = prepare(&u).unwrap();
        assert_eq!(out.scheme(), "http");
        assert_eq!(out.path(), PING_PATH);
        assert!(out.query().is_none());
        assert_eq!(want, Some(2));
        assert!(token.is_none());
    }

    // The `influxdbs` scheme maps to `https`.
    #[test]
    fn prepare_rewrites_influxdbs_to_https() {
        let u: Url = "influxdbs://host:8086".parse().unwrap();
        let (out, _, _) = prepare(&u).unwrap();
        assert_eq!(out.scheme(), "https");
    }

    // Query keys other than `expect-version` and `token` are an error.
    #[test]
    fn prepare_rejects_unknown_query_key() {
        let u: Url = "influxdb://host:8086?bucket=metrics".parse().unwrap();
        assert!(prepare(&u).is_err());
    }

    // Only 1, 2 and 3 are accepted as expected major versions.
    #[test]
    fn prepare_rejects_bad_version_value() {
        let u: Url = "influxdb://host:8086?expect-version=4".parse().unwrap();
        assert!(prepare(&u).is_err());
    }

    // Major version 3 is an accepted expectation.
    #[test]
    fn prepare_accepts_v3() {
        let u: Url = "influxdb://host:8086?expect-version=3".parse().unwrap();
        let (_, want, _) = prepare(&u).unwrap();
        assert_eq!(want, Some(3));
    }

    // The token is returned separately from the rewritten URL.
    #[test]
    fn prepare_extracts_token() {
        let u: Url = "influxdb://host:8086?token=abc123".parse().unwrap();
        let (_, _, token) = prepare(&u).unwrap();
        assert_eq!(token.as_deref(), Some("abc123"));
    }

    // A present-but-empty token is rejected rather than silently ignored.
    #[test]
    fn prepare_rejects_empty_token() {
        let u: Url = "influxdb://host:8086?token=".parse().unwrap();
        assert!(prepare(&u).is_err());
    }

    // An existing path is kept as a prefix in front of the ping path.
    #[test]
    fn prepare_preserves_path_prefix() {
        let u: Url = "influxdb://host:8086/proxy".parse().unwrap();
        let (out, _, _) = prepare(&u).unwrap();
        assert_eq!(out.path(), "/proxy/ping");
    }

    // A trailing slash on the prefix does not produce a doubled separator.
    #[test]
    fn prepare_trims_trailing_slash_in_prefix() {
        let u: Url = "influxdb://host:8086/proxy/".parse().unwrap();
        let (out, _, _) = prepare(&u).unwrap();
        assert_eq!(out.path(), "/proxy/ping");
    }
}