sonda-server 1.9.0

HTTP control plane for Sonda — synthetic telemetry generator
//! Sink loopback pre-flight warnings shared by `POST /scenarios` and `POST /events`.

use sonda_core::config::ScenarioEntry;
use sonda_core::sink::SinkConfig;
use tracing::warn;

/// Documentation pointer appended to the end of every loopback warning message
/// built by `format_loopback_warning`.
pub(crate) const LOOPBACK_HINT_DOC: &str = "See docs/deployment/endpoints.md.";

/// Host spellings that are treated as loopback targets.
pub(crate) const LOOPBACK_HOSTS: &[&str] = &["localhost", "127.0.0.1", "::1"];

/// Reports whether `host` is one of [`LOOPBACK_HOSTS`], compared ASCII case-insensitively.
///
/// The comparison is an exact match against the listed spellings; `host` is expected
/// to be a bare host without port or IPv6 brackets.
pub(crate) fn is_loopback_host(host: &str) -> bool {
    for known in LOOPBACK_HOSTS {
        if known.eq_ignore_ascii_case(host) {
            return true;
        }
    }
    false
}

/// Pulls the host out of a URL or bare `host:port` authority.
///
/// Processing order: surrounding whitespace is trimmed, an optional leading `scheme://`
/// is dropped, everything from the first `/`, `?`, or `#` onward is discarded, userinfo
/// up to the last `@` is removed, and finally the port (text after the last `:`) is
/// split off. A bracketed IPv6 literal such as `[::1]:9000` yields the text between
/// the brackets.
///
/// Returns `None` when the input is blank, when the authority or the host is empty,
/// or when an opening `[` has no matching `]`.
pub(crate) fn extract_host(input: &str) -> Option<&str> {
    let trimmed = input.trim();
    if trimmed.is_empty() {
        return None;
    }

    // `://` only marks a scheme when no path, query, or fragment delimiter comes
    // before it. Otherwise a bare authority such as `host:80/cb?next=http://other`
    // would be mis-read as targeting `other`.
    let after_scheme = match trimmed.split_once("://") {
        Some((scheme, rest)) if !scheme.contains(['/', '?', '#']) => rest,
        _ => trimmed,
    };

    // The authority ends at the first path, query, or fragment delimiter.
    let authority_end = after_scheme
        .find(['/', '?', '#'])
        .unwrap_or(after_scheme.len());
    let authority = &after_scheme[..authority_end];

    // Drop `user:pass@` userinfo; the last `@` is taken as the separator.
    let authority = authority
        .rsplit_once('@')
        .map_or(authority, |(_userinfo, host_port)| host_port);

    if authority.is_empty() {
        return None;
    }

    // Bracketed IPv6 literal: the host is whatever sits between `[` and `]`. An empty
    // pair of brackets is reported as `None`, like every other empty host.
    if let Some(rest) = authority.strip_prefix('[') {
        return rest
            .split_once(']')
            .map(|(host, _port)| host)
            .filter(|host| !host.is_empty());
    }

    // Everything before the last `:` is the host; without a `:` the whole authority is.
    let host = authority
        .rsplit_once(':')
        .map_or(authority, |(host, _port)| host);

    if host.is_empty() {
        None
    } else {
        Some(host)
    }
}

/// Builds the human-readable warning for a sink that targets a loopback host.
///
/// * `entry_name` - name of the scenario entry that owns the sink.
/// * `sink_tag` - sink type label shown in the message (e.g. `tcp`).
/// * `offender` - the configured target string, echoed back verbatim.
///
/// The message ends with [`LOOPBACK_HINT_DOC`].
pub(crate) fn format_loopback_warning(entry_name: &str, sink_tag: &str, offender: &str) -> String {
    let subject = format!("scenario entry '{entry_name}' sink `{sink_tag}` targets `{offender}`");
    let advice = "this host resolves to the sonda-server container's own loopback, not your \
                  host. Use a Docker Compose service name (e.g. `victoriametrics:8428`) or a \
                  Kubernetes Service DNS name instead.";
    format!("{subject} — {advice} {LOOPBACK_HINT_DOC}")
}

/// Collects loopback warnings for the sink of every entry in `entries`.
///
/// Entries are visited in slice order and each entry's base name is used to label its
/// warnings. The result is empty when no sink targets a loopback host.
pub(crate) fn sink_loopback_warnings(entries: &[ScenarioEntry]) -> Vec<String> {
    entries.iter().fold(Vec::new(), |mut collected, entry| {
        let base = entry.base();
        collect_warnings_for_sink(&base.sink, base.name.as_str(), &mut collected);
        collected
    })
}

/// Appends a loopback warning to `out` for each target of `sink` whose host is a
/// loopback host.
///
/// * `sink` - sink configuration to inspect; variants without a handled network target
///   produce nothing.
/// * `entry_name` - label used for the owning entry in the warning text.
/// * `out` - destination for the warnings; existing contents are left in place.
///
/// Kafka broker lists are split on `,` and each non-blank broker is checked on its own,
/// so one sink can contribute several warnings.
pub(crate) fn collect_warnings_for_sink(
    sink: &SinkConfig,
    entry_name: &str,
    out: &mut Vec<String>,
) {
    // Shared check: push one warning when the host part of `target` is loopback.
    fn flag_if_loopback(entry_name: &str, sink_tag: &str, target: &str, out: &mut Vec<String>) {
        match extract_host(target) {
            Some(host) if is_loopback_host(host) => {
                out.push(format_loopback_warning(entry_name, sink_tag, target));
            }
            _ => {}
        }
    }

    match sink {
        #[cfg(feature = "http")]
        SinkConfig::HttpPush { url, .. } => flag_if_loopback(entry_name, "http_push", url, out),
        #[cfg(feature = "http")]
        SinkConfig::Loki { url, .. } => flag_if_loopback(entry_name, "loki", url, out),
        #[cfg(feature = "remote-write")]
        SinkConfig::RemoteWrite { url, .. } => {
            flag_if_loopback(entry_name, "remote_write", url, out)
        }
        #[cfg(feature = "otlp")]
        SinkConfig::OtlpGrpc { endpoint, .. } => {
            flag_if_loopback(entry_name, "otlp_grpc", endpoint, out)
        }
        #[cfg(feature = "kafka")]
        SinkConfig::Kafka { brokers, .. } => {
            // Each comma-separated broker is reported individually; blanks are skipped.
            let candidates = brokers
                .split(',')
                .map(str::trim)
                .filter(|broker| !broker.is_empty());
            for broker in candidates {
                flag_if_loopback(entry_name, "kafka", broker, out);
            }
        }
        SinkConfig::Tcp { address, .. } => flag_if_loopback(entry_name, "tcp", address, out),
        SinkConfig::Udp { address } => flag_if_loopback(entry_name, "udp", address, out),
        _ => {}
    }
}

/// Emits one `warn!` tracing event per entry in `warnings`, tagged with `route`.
///
/// NOTE(review): the explicit `message` field appears to share its name with the field
/// tracing uses for the format-string message — confirm the rendered output is as intended.
pub(crate) fn log_warnings(route: &str, warnings: &[String]) {
    warnings.iter().for_each(|message| {
        warn!(message = %message, route = %route, "{}: sink pre-flight warning", route);
    });
}

// Unit tests for host classification, host extraction, and the per-sink warning
// collectors. Sink-specific cases are gated on the same cargo features as the
// corresponding match arms in `collect_warnings_for_sink`.
#[cfg(test)]
mod tests {
    use super::*;
    use sonda_core::compile_scenario_file;
    use sonda_core::compiler::expand::InMemoryPackResolver;

    // --- is_loopback_host ---

    #[test]
    fn is_loopback_host_matches_canonical_hosts() {
        assert!(is_loopback_host("localhost"));
        assert!(is_loopback_host("127.0.0.1"));
        assert!(is_loopback_host("::1"));
    }

    #[test]
    fn is_loopback_host_is_case_insensitive() {
        assert!(is_loopback_host("LOCALHOST"));
        assert!(is_loopback_host("LocalHost"));
    }

    // Pins the exact-match behavior: only the listed spellings count, so another
    // 127.x address such as 127.0.0.2 is not flagged.
    #[test]
    fn is_loopback_host_rejects_real_hostnames() {
        assert!(!is_loopback_host("victoriametrics"));
        assert!(!is_loopback_host("loki"));
        assert!(!is_loopback_host("10.0.0.1"));
        assert!(!is_loopback_host("192.168.1.10"));
        assert!(!is_loopback_host("127.0.0.2"));
    }

    // --- extract_host ---

    #[test]
    fn extract_host_parses_http_url() {
        assert_eq!(
            extract_host("http://localhost:8428/api/v1/write"),
            Some("localhost")
        );
        assert_eq!(
            extract_host("https://victoriametrics:8428/write"),
            Some("victoriametrics")
        );
    }

    #[test]
    fn extract_host_parses_url_without_port() {
        assert_eq!(extract_host("http://localhost/push"), Some("localhost"));
    }

    // Bare `host:port` strings (no scheme), as used by the TCP/UDP/Kafka cases below.
    #[test]
    fn extract_host_parses_bare_authority() {
        assert_eq!(extract_host("localhost:9094"), Some("localhost"));
        assert_eq!(
            extract_host("broker.example.com:9092"),
            Some("broker.example.com")
        );
    }

    // Bracketed IPv6 literals are returned without the brackets.
    #[test]
    fn extract_host_parses_ipv6_literal() {
        assert_eq!(extract_host("http://[::1]:8428/write"), Some("::1"));
        assert_eq!(extract_host("[::1]:9000"), Some("::1"));
        assert_eq!(
            extract_host("http://[2001:db8::1]/push"),
            Some("2001:db8::1")
        );
    }

    #[test]
    fn extract_host_handles_userinfo() {
        assert_eq!(
            extract_host("http://user:pass@localhost:8428/write"),
            Some("localhost")
        );
    }

    #[test]
    fn extract_host_rejects_empty_input() {
        assert_eq!(extract_host(""), None);
        assert_eq!(extract_host("   "), None);
    }

    // --- sink_loopback_warnings (via compiled scenario files) ---

    /// Compiles a minimal one-scenario YAML document whose `defaults` section contains
    /// `sink_yaml`, and returns the single resulting entry.
    ///
    /// `sink_yaml` is spliced in directly after the `encoder` default, so callers pass it
    /// already indented by two spaces. Indentation is spelled as `\x20` because a
    /// trailing `\` in a string literal discards the leading whitespace of the next line.
    ///
    /// Panics if compilation fails or does not yield exactly one entry.
    fn compile_single_entry_with_sink(sink_yaml: &str) -> ScenarioEntry {
        let yaml = format!(
            "version: 2\n\
             kind: runnable\n\
             defaults:\n\
             \x20\x20rate: 10\n\
             \x20\x20duration: 500ms\n\
             \x20\x20encoder:\n\
             \x20\x20\x20\x20type: prometheus_text\n\
             {sink_yaml}\n\
             scenarios:\n\
             \x20\x20- id: loopback_test\n\
             \x20\x20\x20\x20signal_type: metrics\n\
             \x20\x20\x20\x20name: loopback_test\n\
             \x20\x20\x20\x20generator:\n\
             \x20\x20\x20\x20\x20\x20type: constant\n\
             \x20\x20\x20\x20\x20\x20value: 1.0\n"
        );
        let resolver = InMemoryPackResolver::new();
        let mut entries = compile_scenario_file(&yaml, &resolver).expect("compile must succeed");
        assert_eq!(entries.len(), 1, "test fixture must compile to one entry");
        entries.pop().unwrap()
    }

    // The warning must name the entry, the sink tag, the offending target, and the
    // documentation pointer.
    #[test]
    fn sink_loopback_warnings_flags_tcp_localhost() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: tcp\n\x20\x20\x20\x20address: localhost:9000",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("loopback_test"));
        assert!(warnings[0].contains("tcp"));
        assert!(warnings[0].contains("localhost:9000"));
        assert!(warnings[0].contains("deployment/endpoints"));
    }

    #[test]
    fn sink_loopback_warnings_flags_udp_127_0_0_1() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: udp\n\x20\x20\x20\x20address: 127.0.0.1:9000",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("udp"));
        assert!(warnings[0].contains("127.0.0.1:9000"));
    }

    // Sinks without a network target produce no warnings.
    #[test]
    fn sink_loopback_warnings_skips_stdout() {
        let entry = compile_single_entry_with_sink("\x20\x20sink:\n\x20\x20\x20\x20type: stdout");
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert!(warnings.is_empty());
    }

    #[test]
    fn sink_loopback_warnings_skips_real_tcp_host() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: tcp\n\x20\x20\x20\x20address: syslog.example.com:514",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert!(warnings.is_empty());
    }

    // The full URL, not just the host, is echoed in the warning.
    #[cfg(feature = "http")]
    #[test]
    fn sink_loopback_warnings_flags_http_push_localhost() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: http_push\n\x20\x20\x20\x20url: http://localhost:8428/api/v1/write",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("http_push"));
        assert!(warnings[0].contains("http://localhost:8428/api/v1/write"));
    }

    #[cfg(feature = "http")]
    #[test]
    fn sink_loopback_warnings_flags_http_push_ipv6_loopback() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: http_push\n\x20\x20\x20\x20url: http://[::1]:8428/api/v1/write",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("[::1]"));
    }

    #[cfg(feature = "http")]
    #[test]
    fn sink_loopback_warnings_skips_http_push_service_name() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: http_push\n\x20\x20\x20\x20url: http://victoriametrics:8428/api/v1/write",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert!(warnings.is_empty());
    }

    #[cfg(feature = "remote-write")]
    #[test]
    fn sink_loopback_warnings_flags_remote_write_localhost() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: remote_write\n\x20\x20\x20\x20url: http://localhost:8428/api/v1/write",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("remote_write"));
    }

    // Brokers are checked one by one: only the loopback broker is reported and the
    // non-loopback broker does not appear in the warning text.
    #[cfg(feature = "kafka")]
    #[test]
    fn sink_loopback_warnings_flags_one_localhost_broker_in_mixed_list() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: kafka\n\
             \x20\x20\x20\x20brokers: \"localhost:9094,real-broker:9092\"\n\
             \x20\x20\x20\x20topic: logs",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("kafka"));
        assert!(warnings[0].contains("localhost:9094"));
        assert!(!warnings[0].contains("real-broker"));
    }

    #[cfg(feature = "otlp")]
    #[test]
    fn sink_loopback_warnings_flags_otlp_grpc_localhost() {
        let entry = compile_single_entry_with_sink(
            "\x20\x20sink:\n\x20\x20\x20\x20type: otlp_grpc\n\
             \x20\x20\x20\x20endpoint: http://localhost:4317\n\
             \x20\x20\x20\x20signal_type: metrics",
        );
        let warnings = sink_loopback_warnings(std::slice::from_ref(&entry));
        assert_eq!(warnings.len(), 1);
        assert!(warnings[0].contains("otlp_grpc"));
        assert!(warnings[0].contains("http://localhost:4317"));
    }

    // --- collect_warnings_for_sink (called directly with a hand-built sink) ---

    // The caller-supplied entry name ("events") is quoted in the message.
    #[test]
    fn collect_warnings_for_sink_flags_tcp_localhost() {
        let sink = SinkConfig::Tcp {
            address: "127.0.0.1:9000".to_string(),
            retry: None,
        };
        let mut out = Vec::new();
        collect_warnings_for_sink(&sink, "events", &mut out);
        assert_eq!(out.len(), 1);
        assert!(out[0].contains("'events'"));
        assert!(out[0].contains("tcp"));
    }

    #[test]
    fn collect_warnings_for_sink_skips_stdout() {
        let sink = SinkConfig::Stdout;
        let mut out = Vec::new();
        collect_warnings_for_sink(&sink, "events", &mut out);
        assert!(out.is_empty());
    }
}