use std::path::Path;
use anyhow::Result;
use serde_json::Value;
use crate::finding::{Category, Finding, Severity};
use crate::scanner::{ScanContext, Scanner};
/// Scanner that inspects settings files for MCP server endpoints with risky
/// network configuration (plain HTTP to non-local hosts, bare IP addresses).
pub struct NetworkScanner;
impl Scanner for NetworkScanner {
    /// Identifier under which this scanner is registered.
    fn name(&self) -> &'static str {
        "network"
    }

    /// Looks for `settings.json` and `settings.local.json` directly under
    /// `ctx.root` and collects network findings from each one that exists.
    ///
    /// This is a best-effort scan: a file that cannot be read is skipped
    /// silently, so the function always returns `Ok`.
    fn scan(&self, ctx: &ScanContext) -> Result<Vec<Finding>> {
        const SETTINGS_FILES: [&str; 2] = ["settings.json", "settings.local.json"];

        let mut findings = Vec::new();
        let present = SETTINGS_FILES
            .iter()
            .map(|file_name| ctx.root.join(file_name))
            .filter(|candidate| candidate.exists());
        for settings_path in present {
            if let Ok(text) = std::fs::read_to_string(&settings_path) {
                check_network_config(&text, &settings_path, &mut findings);
            }
        }
        Ok(findings)
    }
}
/// Parses `content` as JSON and runs the MCP URL checks on it.
///
/// Content that is not valid JSON is ignored without producing a finding.
/// `path` is only forwarded so findings can name the file they came from.
fn check_network_config(content: &str, path: &Path, findings: &mut Vec<Finding>) {
    if let Ok(document) = serde_json::from_str::<Value>(content) {
        check_mcp_urls(&document, path, findings);
    }
}
/// Walks every MCP server entry in `json` and checks each URL it declares.
///
/// The server table is read from `mcpServers`, falling back to `mcp_servers`
/// when the former key is absent. For each server the `url` and `baseUrl`
/// string fields are checked, followed by every string value in `env` that
/// starts with `http://` or `https://`.
fn check_mcp_urls(json: &Value, path: &Path, findings: &mut Vec<Finding>) {
    let servers = match json
        .pointer("/mcpServers")
        .or_else(|| json.get("mcp_servers"))
        .and_then(Value::as_object)
    {
        Some(table) => table,
        None => return,
    };

    for (server_name, server_cfg) in servers {
        // Direct endpoint fields, in the same order they were always checked.
        for key in ["url", "baseUrl"] {
            if let Some(endpoint) = server_cfg.get(key).and_then(Value::as_str) {
                check_url(endpoint, server_name, path, findings);
            }
        }

        // Environment variables may also carry endpoint URLs.
        let env_endpoints = server_cfg
            .get("env")
            .and_then(Value::as_object)
            .into_iter()
            .flat_map(|env| env.values())
            .filter_map(Value::as_str)
            .filter(|value| value.starts_with("http://") || value.starts_with("https://"));
        for endpoint in env_endpoints {
            check_url(endpoint, server_name, path, findings);
        }
    }
}
/// Records findings for a single MCP server endpoint URL.
///
/// Two independent checks are performed:
///
/// * **High** - the URL uses plain `http://` and its host is not loopback.
/// * **Low** - the host is a bare IP address (per `is_bare_ip_address`) that
///   is not loopback.
///
/// Loopback is decided from the URL's *host* only, never from a substring of
/// the whole URL. Otherwise `http://localhost.evil.example/` or
/// `http://evil.example/?next=127.0.0.1` would be exempted from the checks.
///
/// The evidence attached to each finding is the URL after `sanitize_url`, so
/// embedded credentials are not copied into the report.
fn check_url(url: &str, server_name: &str, path: &Path, findings: &mut Vec<Finding>) {
    let url_lower = url.to_lowercase();
    let safe_url = sanitize_url(url);
    let targets_loopback = is_loopback_host(url_host(&url_lower));

    if url_lower.starts_with("http://") && !targets_loopback {
        findings.push(
            Finding::new(
                Severity::High,
                Category::NetworkSecurity,
                format!("MCP server '{}' uses unencrypted HTTP", server_name),
                format!(
                    "The MCP server '{}' in '{}' connects over HTTP. \
                     Credentials and tool outputs sent to this server are transmitted \
                     in plain text and can be intercepted.",
                    server_name,
                    path.display(),
                ),
                path,
                format!(
                    "Update the URL for '{}' to use HTTPS: replace `http://` with `https://`.",
                    server_name
                ),
            )
            .with_evidence(safe_url.clone()),
        );
    }

    if is_bare_ip_address(url) && !targets_loopback {
        findings.push(
            Finding::new(
                Severity::Low,
                Category::NetworkSecurity,
                format!("MCP server '{}' connects to a bare IP address", server_name),
                format!(
                    "Server '{}' in '{}' uses a bare IP address. \
                     This makes it harder to audit what service the agent is connecting to.",
                    server_name,
                    path.display()
                ),
                path,
                "Use a fully qualified domain name instead of a bare IP address.",
            )
            .with_evidence(safe_url),
        );
    }
}

/// Extracts the host component from `url`: scheme, userinfo, port, path,
/// query and fragment are stripped, as are the brackets of an IPv6 literal.
fn url_host(url: &str) -> &str {
    let after_scheme = url.find("://").map_or(url, |idx| &url[idx + 3..]);
    // The authority ends at the first path, query or fragment delimiter.
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);
    // Everything up to the last '@' in the authority is userinfo.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);
    match host_port.strip_prefix('[') {
        Some(bracketed) => bracketed.split(']').next().unwrap_or(bracketed),
        None => host_port.split(':').next().unwrap_or(host_port),
    }
}

/// Returns true when `host` (expected lowercase) names the local machine:
/// `localhost`, any `*.localhost` name, or a loopback IP address.
fn is_loopback_host(host: &str) -> bool {
    // A fully qualified name may carry a trailing root dot.
    let host = host.trim_end_matches('.');
    host == "localhost"
        || host.ends_with(".localhost")
        || host
            .parse::<std::net::IpAddr>()
            .map_or(false, |ip| ip.is_loopback())
}
/// Returns `url` with any embedded userinfo (`user:password@`) replaced by
/// the marker `[credentials-redacted]`, so it is safe to show as evidence.
///
/// Only the authority component (between `://` and the first `/`, `?` or
/// `#`) is searched for `@`, so an `@` in a path or query string is left
/// untouched. The *last* `@` in the authority is used as the separator, so
/// a password that itself contains `@` is removed in full.
///
/// URLs without a `://` separator or without userinfo are returned unchanged.
fn sanitize_url(url: &str) -> String {
    let Some(scheme_end) = url.find("://") else {
        return url.to_string();
    };
    let authority_start = scheme_end + 3;
    let after_scheme = &url[authority_start..];
    let authority_len = after_scheme
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(after_scheme.len());

    match after_scheme[..authority_len].rfind('@') {
        Some(at) => format!(
            "{}[credentials-redacted]@{}",
            &url[..authority_start],
            &after_scheme[at + 1..]
        ),
        None => url.to_string(),
    }
}
/// Reports whether the host of `url` is a literal IP address rather than a
/// DNS name.
///
/// The host is taken from the authority component: the scheme is optional,
/// the authority ends at the first `/`, `?` or `#`, and any `userinfo@`
/// prefix and `:port` suffix are ignored. A host counts as an IP address
/// when it is either
///
/// * four dot-separated fields that each parse as `u8` (IPv4), or
/// * a bracketed literal such as `[2001:db8::1]` whose contents parse as an
///   IPv6 address.
fn is_bare_ip_address(url: &str) -> bool {
    let after_scheme = url.find("://").map_or(url, |idx| &url[idx + 3..]);
    let authority = after_scheme
        .split(|c: char| matches!(c, '/' | '?' | '#'))
        .next()
        .unwrap_or(after_scheme);
    // Drop credentials so `user:pass@10.0.0.1` is judged by its host.
    let host_port = authority.rsplit('@').next().unwrap_or(authority);

    // IPv6 literals are bracketed so their colons are not mistaken for a port.
    if let Some(bracketed) = host_port.strip_prefix('[') {
        return bracketed
            .split_once(']')
            .map_or(false, |(addr, _)| addr.parse::<std::net::Ipv6Addr>().is_ok());
    }

    let host = host_port.split(':').next().unwrap_or(host_port);
    let octets: Vec<&str> = host.split('.').collect();
    octets.len() == 4 && octets.iter().all(|part| part.parse::<u8>().is_ok())
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Runs the network-config check on `json_str` as though it had been read
    /// from `/test/settings.json` and returns the findings it produced.
    fn check(json_str: &str) -> Vec<Finding> {
        let settings_path = PathBuf::from("/test/settings.json");
        let mut collected = Vec::new();
        check_network_config(json_str, &settings_path, &mut collected);
        collected
    }

    // Plain HTTP to an external host must raise a High finding.
    #[test]
    fn detects_http_external_url() {
        let findings = check(
            r#"{
"mcpServers": {
"my-server": {"url": "http://api.example.com/mcp"}
}
}"#,
        );
        let flagged = findings
            .iter()
            .any(|finding| finding.severity == Severity::High && finding.title.contains("HTTP"));
        assert!(flagged);
    }

    // HTTPS to a named host is clean.
    #[test]
    fn no_finding_for_https_url() {
        let findings = check(
            r#"{
"mcpServers": {
"my-server": {"url": "https://api.example.com/mcp"}
}
}"#,
        );
        assert!(findings.is_empty());
    }

    // HTTP is acceptable when the target is localhost.
    #[test]
    fn no_finding_for_http_localhost() {
        let findings = check(
            r#"{
"mcpServers": {
"local": {"url": "http://localhost:3000/mcp"}
}
}"#,
        );
        assert!(findings.is_empty(), "localhost HTTP should not be flagged");
    }

    // A non-loopback IP host is reported even over HTTPS.
    #[test]
    fn detects_bare_ip_url() {
        let findings = check(
            r#"{
"mcpServers": {
"remote": {"url": "https://192.168.1.100:8080/mcp"}
}
}"#,
        );
        assert!(findings
            .iter()
            .any(|finding| finding.title.contains("bare IP")));
    }

    // HTTP to 127.0.0.1 must not produce a High finding.
    #[test]
    fn no_finding_for_localhost_ip() {
        let findings = check(
            r#"{
"mcpServers": {
"local": {"url": "http://127.0.0.1:3000/mcp"}
}
}"#,
        );
        assert!(findings
            .iter()
            .all(|finding| finding.severity != Severity::High));
    }

    #[test]
    fn is_bare_ip_address_true() {
        for url in ["https://192.168.1.1/path", "http://10.0.0.1:8080"] {
            assert!(is_bare_ip_address(url));
        }
    }

    #[test]
    fn is_bare_ip_address_false() {
        for url in ["https://api.example.com/mcp", "http://localhost:3000"] {
            assert!(!is_bare_ip_address(url));
        }
    }

    #[test]
    fn sanitize_url_strips_credentials() {
        let redacted = sanitize_url("http://user:pass@host.com/path");
        assert_eq!(redacted, "http://[credentials-redacted]@host.com/path");
    }

    #[test]
    fn sanitize_url_no_credentials_unchanged() {
        let clean = "https://api.example.com/mcp";
        let result = sanitize_url(clean);
        assert_eq!(result, clean);
    }
}