openclaw-scan 0.1.1

Security scanner for agentic AI framework installations (OpenClaw, Claude Code, and compatible)
Documentation
//! Network security scanner.
//!
//! Inspects MCP server endpoint URLs and OAuth configurations in settings
//! files for insecure or suspicious network settings.

use std::path::Path;

use anyhow::Result;
use serde_json::Value;

use crate::finding::{Category, Finding, Severity};
use crate::scanner::{ScanContext, Scanner};

pub struct NetworkScanner;

impl Scanner for NetworkScanner {
    fn name(&self) -> &'static str {
        "network"
    }

    fn scan(&self, ctx: &ScanContext) -> Result<Vec<Finding>> {
        let mut findings = Vec::new();

        for name in &["settings.json", "settings.local.json"] {
            let path = ctx.root.join(name);
            if path.exists() {
                if let Ok(content) = std::fs::read_to_string(&path) {
                    check_network_config(&content, &path, &mut findings);
                }
            }
        }

        Ok(findings)
    }
}

fn check_network_config(content: &str, path: &Path, findings: &mut Vec<Finding>) {
    let Ok(json): Result<Value, _> = serde_json::from_str(content) else {
        return;
    };

    check_mcp_urls(&json, path, findings);
}

fn check_mcp_urls(json: &Value, path: &Path, findings: &mut Vec<Finding>) {
    let Some(servers) = json
        .pointer("/mcpServers")
        .or_else(|| json.get("mcp_servers"))
        .and_then(Value::as_object)
    else {
        return;
    };

    for (server_name, server_cfg) in servers {
        // Check `url` field
        if let Some(url) = server_cfg.get("url").and_then(Value::as_str) {
            check_url(url, server_name, path, findings);
        }
        // Check `baseUrl` field
        if let Some(url) = server_cfg.get("baseUrl").and_then(Value::as_str) {
            check_url(url, server_name, path, findings);
        }
        // Check env vars for embedded URLs
        if let Some(env) = server_cfg.get("env").and_then(Value::as_object) {
            for (_, val) in env {
                if let Some(s) = val.as_str() {
                    if s.starts_with("http://") || s.starts_with("https://") {
                        check_url(s, server_name, path, findings);
                    }
                }
            }
        }
    }
}

fn check_url(url: &str, server_name: &str, path: &Path, findings: &mut Vec<Finding>) {
    let url_lower = url.to_lowercase();
    // Strip embedded credentials before storing as evidence (H-1).
    let safe_url = sanitize_url(url);

    // HTTP (non-TLS) to a non-localhost address is a risk
    if url_lower.starts_with("http://")
        && !url_lower.contains("localhost")
        && !url_lower.contains("127.0.0.1")
        && !url_lower.contains("::1")
    {
        findings.push(
            Finding::new(
                Severity::High,
                Category::NetworkSecurity,
                format!("MCP server '{}' uses unencrypted HTTP", server_name),
                format!(
                    "The MCP server '{}' in '{}' connects over HTTP. \
                     Credentials and tool outputs sent to this server are transmitted \
                     in plain text and can be intercepted.",
                    server_name,
                    path.display(),
                ),
                path,
                format!(
                    "Update the URL for '{}' to use HTTPS: replace `http://` with `https://`.",
                    server_name
                ),
            )
            .with_evidence(safe_url.clone()),
        );
    }

    // IP address instead of hostname (harder to audit intent)
    let is_ip = is_bare_ip_address(url);
    if is_ip && !url_lower.contains("127.0.0.1") && !url_lower.contains("::1") {
        findings.push(
            Finding::new(
                Severity::Low,
                Category::NetworkSecurity,
                format!("MCP server '{}' connects to a bare IP address", server_name),
                format!(
                    "Server '{}' in '{}' uses a bare IP address. \
                     This makes it harder to audit what service the agent is connecting to.",
                    server_name,
                    path.display()
                ),
                path,
                "Use a fully qualified domain name instead of a bare IP address.",
            )
            .with_evidence(safe_url),
        );
    }
}

/// Replace `user:password@` in a URL with `[credentials-redacted]@`
/// so embedded credentials are never stored in evidence fields (H-1).
fn sanitize_url(url: &str) -> String {
    if let (Some(scheme_end), Some(at)) = (url.find("://"), url.find('@')) {
        if at > scheme_end {
            let scheme = &url[..scheme_end + 3];
            let rest = &url[at + 1..];
            return format!("{}[credentials-redacted]@{}", scheme, rest);
        }
    }
    url.to_string()
}

/// Returns `true` if the URL host part looks like a raw IPv4/IPv6 address.
fn is_bare_ip_address(url: &str) -> bool {
    // Strip scheme
    let host_part = if let Some(s) = url.find("://") {
        &url[s + 3..]
    } else {
        url
    };
    // Strip path/port
    let host = host_part.split('/').next().unwrap_or(host_part);
    let host = host.split(':').next().unwrap_or(host);
    // IPv4: four dotted decimal octets
    let parts: Vec<&str> = host.split('.').collect();
    if parts.len() == 4 && parts.iter().all(|p| p.parse::<u8>().is_ok()) {
        return true;
    }
    false
}

// ── Tests ─────────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    fn check(json_str: &str) -> Vec<Finding> {
        let mut findings = Vec::new();
        check_network_config(
            json_str,
            &PathBuf::from("/test/settings.json"),
            &mut findings,
        );
        findings
    }

    #[test]
    fn detects_http_external_url() {
        let json = r#"{
            "mcpServers": {
                "my-server": {"url": "http://api.example.com/mcp"}
            }
        }"#;
        let f = check(json);
        assert!(f
            .iter()
            .any(|x| x.severity == Severity::High && x.title.contains("HTTP")));
    }

    #[test]
    fn no_finding_for_https_url() {
        let json = r#"{
            "mcpServers": {
                "my-server": {"url": "https://api.example.com/mcp"}
            }
        }"#;
        assert!(check(json).is_empty());
    }

    #[test]
    fn no_finding_for_http_localhost() {
        let json = r#"{
            "mcpServers": {
                "local": {"url": "http://localhost:3000/mcp"}
            }
        }"#;
        assert!(
            check(json).is_empty(),
            "localhost HTTP should not be flagged"
        );
    }

    #[test]
    fn detects_bare_ip_url() {
        let json = r#"{
            "mcpServers": {
                "remote": {"url": "https://192.168.1.100:8080/mcp"}
            }
        }"#;
        let f = check(json);
        assert!(f.iter().any(|x| x.title.contains("bare IP")));
    }

    #[test]
    fn no_finding_for_localhost_ip() {
        let json = r#"{
            "mcpServers": {
                "local": {"url": "http://127.0.0.1:3000/mcp"}
            }
        }"#;
        // 127.0.0.1 HTTP is low-risk (local only); should not fire the HTTP finding
        let f = check(json);
        assert!(!f.iter().any(|x| x.severity == Severity::High));
    }

    #[test]
    fn is_bare_ip_address_true() {
        assert!(is_bare_ip_address("https://192.168.1.1/path"));
        assert!(is_bare_ip_address("http://10.0.0.1:8080"));
    }

    #[test]
    fn is_bare_ip_address_false() {
        assert!(!is_bare_ip_address("https://api.example.com/mcp"));
        assert!(!is_bare_ip_address("http://localhost:3000"));
    }

    #[test]
    fn sanitize_url_strips_credentials() {
        assert_eq!(
            sanitize_url("http://user:pass@host.com/path"),
            "http://[credentials-redacted]@host.com/path"
        );
    }

    #[test]
    fn sanitize_url_no_credentials_unchanged() {
        let clean = "https://api.example.com/mcp";
        assert_eq!(sanitize_url(clean), clean);
    }
}