use crate::error::Result;
use crate::schema::{NetworkFinding, ScanResult, SourceLocation};
use once_cell::sync::Lazy;
use regex::Regex;
/// Substrings that mark a source line as importing a Python HTTP library.
///
/// `scan_file` tests each line with a plain substring check, trying the
/// entries in the order listed here and reporting only the first one that hits.
static HTTP_LIBRARY_PATTERNS: &[&str] = &[
    // requests
    "import requests",
    "from requests import",
    // httpx
    "import httpx",
    "from httpx import",
    // urllib
    "from urllib.request import",
    "from urllib import",
    "import urllib",
    // aiohttp
    "import aiohttp",
    "from aiohttp import",
    // http.client
    "import http.client",
    "from http.client import",
];
/// Substrings that mark a source line as importing a hosted-API client SDK.
///
/// Like the HTTP library list, these are matched as plain substrings in the
/// order given, and only the first hit per line is reported by `scan_file`.
static API_CLIENT_PATTERNS: &[&str] = &[
    // OpenAI
    "from openai import",
    "import openai",
    // Anthropic
    "from anthropic import",
    "import anthropic",
    // Google Generative AI
    "from google.generativeai import",
    "import google.generativeai",
];
/// Regexes recognising direct HTTP calls and HTTP client constructions.
///
/// Each entry pairs a technology label with the compiled pattern. The table is
/// built lazily on first access; the patterns are constants, so a failure to
/// compile is a programming error and panics.
static HTTP_METHOD_PATTERNS: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("requests_get", r"requests\.get\s*\("),
        ("requests_post", r"requests\.post\s*\("),
        ("requests_put", r"requests\.put\s*\("),
        ("requests_delete", r"requests\.delete\s*\("),
        ("httpx_get", r"httpx\.get\s*\("),
        ("httpx_post", r"httpx\.post\s*\("),
        ("httpx_client", r"httpx\.(Client|AsyncClient)\s*\("),
        (
            "urllib_urlopen",
            r"urllib\.request\.urlopen\s*\(|urlopen\s*\(",
        ),
        ("aiohttp_session", r"aiohttp\.ClientSession\s*\("),
    ];

    TABLE
        .iter()
        .map(|&(label, source)| (label, Regex::new(source).unwrap()))
        .collect()
});
/// Regexes recognising construction of a hosted-API client object.
///
/// Each entry pairs a technology label with the compiled pattern; the table is
/// built lazily on first access.
static API_CLIENT_INSTANTIATION: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("openai_client", r"OpenAI\s*\(|openai\.OpenAI\s*\("),
        (
            "anthropic_client",
            r"Anthropic\s*\(|anthropic\.Anthropic\s*\(",
        ),
        ("google_genai", r"genai\.GenerativeModel\s*\("),
    ];

    TABLE
        .iter()
        .map(|&(label, source)| (label, Regex::new(source).unwrap()))
        .collect()
});
/// Regexes recognising URL literals, keyed by a category label.
///
/// Every pattern requires the URL to be followed (after optional whitespace)
/// by a closing quote, parenthesis or bracket, and that delimiter is part of
/// the match. The generic `http_url` entry also matches anything the more
/// specific `webhook_url` and `api_endpoint` entries match.
static URL_PATTERNS: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("http_url", r#"https?://[^\s'"]+\s*["')\]]"#),
        ("webhook_url", r#"https?://hooks\.[^\s'"]+\s*["')\]]"#),
        ("api_endpoint", r#"https?://api\.[^\s'"]+\s*["')\]]"#),
    ];

    TABLE
        .iter()
        .map(|&(label, source)| (label, Regex::new(source).unwrap()))
        .collect()
});
/// Regexes recognising low-level socket usage, keyed by an operation label.
///
/// Note that the `bind` and `listen` patterns match any `.bind(` / `.listen(`
/// method call, not only calls made on a socket object.
static SOCKET_PATTERNS: Lazy<Vec<(&str, Regex)>> = Lazy::new(|| {
    const TABLE: &[(&str, &str)] = &[
        ("socket_connect", r"socket\.socket\s*\("),
        ("socket_bind", r"\.bind\s*\("),
        ("socket_listen", r"\.listen\s*\("),
    ];

    TABLE
        .iter()
        .map(|&(label, source)| (label, Regex::new(source).unwrap()))
        .collect()
});
/// Scans every file listed in the scan manifest for network-related findings.
///
/// Files are processed in manifest order and their findings concatenated.
/// A file that cannot be read into a string (I/O error or invalid UTF-8) is
/// skipped silently rather than failing the whole analysis.
///
/// # Errors
///
/// Propagates any error returned by `scan_file`.
pub fn analyze(result: &ScanResult) -> Result<Vec<NetworkFinding>> {
    let mut collected = Vec::new();
    for path in result.manifest.files.iter() {
        match std::fs::read_to_string(path) {
            Ok(text) => collected.extend(scan_file(path, &text)?),
            // Best-effort: unreadable files contribute no findings.
            Err(_) => continue,
        }
    }
    Ok(collected)
}
/// Scans the text of one source file line by line for network-related findings.
///
/// Each line is checked, in this order, against:
///
/// 1. HTTP library imports (`HTTP_LIBRARY_PATTERNS`) — first matching entry only,
/// 2. API client imports (`API_CLIENT_PATTERNS`) — first matching entry only,
/// 3. HTTP calls (`HTTP_METHOD_PATTERNS`) — one finding per matching entry,
/// 4. API client instantiations (`API_CLIENT_INSTANTIATION`) — one per matching entry,
/// 5. URL literals (`URL_PATTERNS`) — one per matching entry,
/// 6. socket operations (`SOCKET_PATTERNS`) — one per matching entry.
///
/// `file_path` is only used for labelling (finding ids and source locations);
/// the file itself is not opened here. Finding ids are built from a category
/// prefix, the sanitized path and the line number, so several findings of the
/// same category on one line share the same id.
///
/// # Errors
///
/// The current implementation never returns `Err`; the `Result` return type is
/// kept so callers can continue to use `?`.
fn scan_file(file_path: &str, content: &str) -> Result<Vec<NetworkFinding>> {
    // Loop-invariant: the sanitized path is part of every finding id, so
    // compute it once instead of once per finding.
    let path_slug = file_path.replace(['/', '.'], "_");

    // Builds a finding with no endpoint/method; call sites that have those
    // override them with struct-update syntax.
    let finding = |line_number: u32,
                   id_prefix: &str,
                   network_type: &str,
                   technology: String,
                   message: String| NetworkFinding {
        id: format!("{id_prefix}_{path_slug}_{line_number}"),
        network_type: network_type.to_string(),
        technology,
        location: SourceLocation {
            file: file_path.to_string(),
            line: line_number,
            end_line: Some(line_number),
            function: None,
        },
        endpoint: None,
        method: None,
        message,
    };

    let mut findings = Vec::new();
    for (index, line) in content.lines().enumerate() {
        // 1-based line number. The cast can only truncate for inputs with
        // more than `u32::MAX` lines.
        let line_number = (index + 1) as u32;

        // Imports: report at most one finding per list per line.
        if let Some(pattern) = HTTP_LIBRARY_PATTERNS.iter().find(|p| line.contains(**p)) {
            findings.push(finding(
                line_number,
                "network_import",
                "http_library_import",
                extract_technology_from_import(line),
                format!("HTTP library import detected: {pattern}"),
            ));
        }
        if let Some(pattern) = API_CLIENT_PATTERNS.iter().find(|p| line.contains(**p)) {
            findings.push(finding(
                line_number,
                "network_api_import",
                "api_client_import",
                extract_technology_from_import(line),
                format!("API client import detected: {pattern}"),
            ));
        }

        // HTTP calls carry the quoted URL (if any) and the HTTP verb.
        for (tech, pattern) in HTTP_METHOD_PATTERNS.iter() {
            if pattern.is_match(line) {
                findings.push(NetworkFinding {
                    endpoint: extract_url_from_line(line),
                    method: Some(extract_http_method(tech)),
                    ..finding(
                        line_number,
                        "network_http",
                        "http_call",
                        tech.to_string(),
                        format!("HTTP call: {tech}"),
                    )
                });
            }
        }

        for (tech, pattern) in API_CLIENT_INSTANTIATION.iter() {
            if pattern.is_match(line) {
                findings.push(finding(
                    line_number,
                    "network_client",
                    "api_client",
                    tech.to_string(),
                    format!("API client instantiation: {tech}"),
                ));
            }
        }

        for (tech, pattern) in URL_PATTERNS.iter() {
            if let Some(url_match) = pattern.find(line) {
                // The match includes the closing delimiter and any whitespace
                // the pattern allows before it; strip both so the endpoint is
                // just the URL.
                let endpoint = url_match
                    .as_str()
                    .trim_end_matches(|c: char| {
                        c.is_whitespace() || matches!(c, '\'' | '"' | ')' | ']')
                    })
                    .to_string();
                findings.push(NetworkFinding {
                    endpoint: Some(endpoint),
                    ..finding(
                        line_number,
                        "network_url",
                        "url_reference",
                        tech.to_string(),
                        format!("URL reference: {tech}"),
                    )
                });
            }
        }

        for (tech, pattern) in SOCKET_PATTERNS.iter() {
            if pattern.is_match(line) {
                findings.push(finding(
                    line_number,
                    "network_socket",
                    "socket_connection",
                    tech.to_string(),
                    format!("Socket operation: {tech}"),
                ));
            }
        }
    }
    Ok(findings)
}
/// Maps an import line to the name of the networking technology it pulls in.
///
/// The line is lower-cased and searched for each known module name in a fixed
/// order; the first module found decides the result. Returns `"unknown"` when
/// no known module name occurs in the line.
///
/// `http.client` is recognised (as `"http_client"`) because the HTTP library
/// import list detects it; it is checked last so that lines which already
/// resolved to another technology keep their result.
fn extract_technology_from_import(line: &str) -> String {
    // (substring searched for, technology name reported), in priority order.
    const KNOWN_MODULES: &[(&str, &str)] = &[
        ("requests", "requests"),
        ("httpx", "httpx"),
        ("urllib", "urllib"),
        ("aiohttp", "aiohttp"),
        ("openai", "openai"),
        ("anthropic", "anthropic"),
        ("google.generativeai", "google_genai"),
        ("http.client", "http_client"),
    ];

    let lowered = line.to_lowercase();
    KNOWN_MODULES
        .iter()
        .find(|&&(needle, _)| lowered.contains(needle))
        .map_or("unknown", |&(_, technology)| technology)
        .to_string()
}
fn extract_url_from_line(line: &str) -> Option<String> {
let url_pattern = Regex::new(r#"["'](https?://[^"']+)["']"#).ok()?;
url_pattern
.captures(line)
.and_then(|cap| cap.get(1))
.map(|m| m.as_str().to_string())
}
/// Derives the HTTP verb from a technology label such as `requests_get`.
///
/// The label is searched for `_get`, `_post`, `_put` and `_delete`, in that
/// order; the first one present decides the verb. Labels containing none of
/// them yield `"UNKNOWN"`.
fn extract_http_method(tech: &str) -> String {
    let verb = match tech {
        label if label.contains("_get") => "GET",
        label if label.contains("_post") => "POST",
        label if label.contains("_put") => "PUT",
        label if label.contains("_delete") => "DELETE",
        _ => "UNKNOWN",
    };
    verb.to_string()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the scanner over `code` as if it were the contents of `test.py`.
    fn scan(code: &str) -> Vec<NetworkFinding> {
        scan_file("test.py", code).unwrap()
    }

    /// True when at least one finding has the given `network_type`.
    fn has_type(findings: &[NetworkFinding], network_type: &str) -> bool {
        findings.iter().any(|f| f.network_type == network_type)
    }

    /// True when at least one finding has the given `technology`.
    fn has_technology(findings: &[NetworkFinding], technology: &str) -> bool {
        findings.iter().any(|f| f.technology == technology)
    }

    /// True when at least one finding carries the given HTTP method.
    fn has_method(findings: &[NetworkFinding], method: &str) -> bool {
        findings.iter().any(|f| f.method.as_deref() == Some(method))
    }

    #[test]
    fn test_requests_import_detection() {
        let findings = scan("import requests");
        assert!(!findings.is_empty());
        assert!(has_type(&findings, "http_library_import"));
        assert!(has_technology(&findings, "requests"));
    }

    #[test]
    fn test_requests_get_call() {
        let findings = scan(r#"response = requests.get("https://api.example.com/data")"#);
        assert!(!findings.is_empty());
        assert!(has_type(&findings, "http_call"));
        assert!(has_method(&findings, "GET"));
    }

    #[test]
    fn test_requests_post_call() {
        let findings = scan(r#"response = requests.post(url, json=data)"#);
        assert!(!findings.is_empty());
        assert!(has_method(&findings, "POST"));
    }

    #[test]
    fn test_httpx_async_client() {
        let findings = scan("async with httpx.AsyncClient() as client:");
        assert!(!findings.is_empty());
        assert!(has_technology(&findings, "httpx_client"));
    }

    #[test]
    fn test_openai_client() {
        let findings = scan(r#"client = OpenAI(api_key="sk-...")"#);
        assert!(!findings.is_empty());
        assert!(has_type(&findings, "api_client"));
        assert!(has_technology(&findings, "openai_client"));
    }

    #[test]
    fn test_anthropic_client() {
        let findings = scan("client = Anthropic()");
        assert!(!findings.is_empty());
        assert!(has_technology(&findings, "anthropic_client"));
    }

    #[test]
    fn test_webhook_url_detection() {
        let findings = scan(r#"webhook = "https://hooks.slack.com/services/...""#);
        assert!(!findings.is_empty());
        assert!(has_type(&findings, "url_reference"));
    }

    #[test]
    fn test_url_extraction() {
        let findings = scan(r#"url = "https://api.example.com/endpoint""#);
        assert!(!findings.is_empty());
        let url_finding = findings
            .iter()
            .find(|f| f.network_type == "url_reference")
            .unwrap();
        assert!(url_finding.endpoint.is_some());
    }

    #[test]
    fn test_no_false_positives() {
        let findings = scan("print('Hello World')");
        assert!(findings.is_empty());
    }
}