use crate::error::Result;
use crate::schema::{DataFlow, ProvenanceFinding, ScanResult, SourceLocation};
use once_cell::sync::Lazy;
use regex::Regex;
use std::collections::{HashMap, HashSet};
/// Taint sources, as `(label, regex source)` pairs.
///
/// `scan_file` tests every line against each regex. When a line matches and
/// also looks like an assignment, the assignment target becomes tainted and
/// the label is stored as the `source` of the recorded data flow.
///
/// The patterns appear to target Python code (the unit tests in this file
/// feed Python snippets); they are plain text heuristics, not a parser.
static SOURCE_PATTERNS: &[(&str, &str)] = &[
// Interactive console input.
("user_input", r"input\s*\("),
// Opening or reading a file. Note that `open(` also matches files opened for writing.
("file_read", r"open\s*\(|\.read\(|\.readlines\("),
// HTTP calls through `requests`.
("api_call", r"requests\.(get|post|put|delete)\("),
// HTTP calls through `httpx`.
("httpx_call", r"httpx\.(get|post)\("),
// Attribute access on a `request` object (form, JSON body, query args, uploads).
("form_data", r"request\.(form|json|args|files)"),
// Command-line arguments.
("command_args", r"sys\.argv"),
// Environment variables.
("env_var", r"os\.environ\[|os\.getenv\("),
];
/// Taint sinks, as `(label, regex source)` pairs.
///
/// `scan_file` reports a finding for every line that matches one of these
/// regexes and also mentions a tainted variable. The label is copied into the
/// finding's `sink_type` and is the key `determine_severity` matches on.
///
/// `requests.post(`/`requests.put(` and `httpx.post(` are listed both here and
/// in `SOURCE_PATTERNS`, so such a line can act as a source and a sink at once.
static SINK_PATTERNS: &[(&str, &str)] = &[
// `\b` keeps names that merely end in "print" (e.g. `pprint(`) from matching.
("print", r"\bprint\s*\("),
("file_write", r"\.write\(|\.write_text\("),
("api_send", r"requests\.(post|put)\("),
("httpx_send", r"httpx\.post\("),
("subprocess", r"subprocess\.(run|call|Popen)\("),
("database_insert", r"\.insert\(|\.execute\(|\.query\("),
("llm_call", r"\.chat\(|\.complete\(|\.generate\("),
];
/// Matches `<word> = <rest of line>`.
///
/// Capture group 1 is the run of word characters directly before the `=`
/// (the assignment target); group 2 is everything after it on the line.
/// Because the check is purely textual it also matches comparisons
/// (`a == b`) and keyword arguments (`f(key=value)`).
static ASSIGNMENT_PATTERN: Lazy<Regex> = Lazy::new(|| Regex::new(r"(\w+)\s*=\s*(.+)").unwrap());
/// Matches `<name>(<args>)`: group 1 is the callee name, group 2 is the text
/// up to the first closing parenthesis (so nested calls are cut short).
///
/// Nothing in the visible part of this file uses it, hence the `dead_code`
/// allowance.
#[allow(dead_code)]
static FUNCTION_CALL_PATTERN: Lazy<Regex> =
Lazy::new(|| Regex::new(r"(\w+)\s*\(([^)]*)\)").unwrap());
/// Bookkeeping accumulated while scanning one file (`scan_file` creates a
/// fresh instance per call).
#[derive(Debug, Clone)]
struct TaintState {
/// Names of variables considered tainted. Entries are only ever added.
tainted_vars: HashSet<String>,
/// Right-hand-side text of the latest assignment seen for each target.
/// Written by `scan_file`; not read anywhere in the visible code.
assignments: HashMap<String, String>,
/// One record per `taint_variable` call, in call order.
flows: Vec<DataFlow>,
}
impl TaintState {
    /// Creates an empty state: nothing tainted, no assignments, no flows.
    fn new() -> Self {
        Self {
            tainted_vars: HashSet::new(),
            assignments: HashMap::new(),
            flows: Vec::new(),
        }
    }

    /// Marks `var` as tainted and records a [`DataFlow`] naming `source` as
    /// its origin.
    ///
    /// Every call appends a new flow record, even if `var` was already
    /// tainted. The taint level is always recorded as `"high"`.
    fn taint_variable(&mut self, var: String, source: String) {
        self.tainted_vars.insert(var.clone());
        self.flows.push(DataFlow {
            variable: var,
            source,
            readers: Vec::new(),
            writers: Vec::new(),
            taint_level: "high".to_string(),
        });
    }

    /// Returns `true` if `var` has been tainted.
    // Not called from the visible code; kept as part of the state's API.
    #[allow(dead_code)]
    fn is_tainted(&self, var: &str) -> bool {
        self.tainted_vars.contains(var)
    }

    /// Taints `target` when `source_expr` mentions any tainted variable.
    ///
    /// A variable counts as mentioned only when its name appears as a whole
    /// identifier. Plain substring matching would let a tainted `s` taint
    /// every expression that merely contains the letter "s".
    ///
    /// Propagated variables are added to `tainted_vars` only; no flow record
    /// is created for them.
    fn propagate_taint(&mut self, target: &str, source_expr: &str) {
        let reads_tainted = self
            .tainted_vars
            .iter()
            .any(|var| Self::mentions_identifier(source_expr, var));
        if reads_tainted {
            self.tainted_vars.insert(target.to_string());
        }
    }

    /// Returns `true` if `ident` occurs in `text` without an identifier
    /// character (alphanumeric or `_`) directly before or after it.
    fn mentions_identifier(text: &str, ident: &str) -> bool {
        fn is_ident_char(c: char) -> bool {
            c.is_alphanumeric() || c == '_'
        }

        // An empty name would "match" between any two characters.
        if ident.is_empty() {
            return false;
        }
        text.match_indices(ident).any(|(start, hit)| {
            let preceded = text[..start]
                .chars()
                .next_back()
                .map_or(false, is_ident_char);
            let followed = text[start + hit.len()..]
                .chars()
                .next()
                .map_or(false, is_ident_char);
            !preceded && !followed
        })
    }
}
/// Runs the taint scan over every file listed in `result.manifest.files` and
/// returns all findings, in manifest order.
///
/// Files for which `std::fs::read_to_string` fails (for any reason) are
/// skipped silently.
///
/// # Errors
/// Propagates any error returned by `scan_file`.
pub fn analyze(result: &ScanResult) -> Result<Vec<ProvenanceFinding>> {
    let mut all_findings = Vec::new();
    for file_path in &result.manifest.files {
        let text = match std::fs::read_to_string(file_path) {
            Ok(text) => text,
            // Best effort: an unreadable file contributes no findings.
            Err(_) => continue,
        };
        all_findings.extend(scan_file(file_path, &text)?);
    }
    Ok(all_findings)
}
fn scan_file(file_path: &str, content: &str) -> Result<Vec<ProvenanceFinding>> {
let mut findings = Vec::new();
let mut taint_state = TaintState::new();
let lines: Vec<&str> = content.lines().collect();
for (line_num, line) in lines.iter().enumerate() {
let _line_number = (line_num + 1) as u32;
for (source_type, pattern_str) in SOURCE_PATTERNS {
let pattern = Regex::new(pattern_str).unwrap();
if pattern.is_match(line) {
if let Some(var_name) = extract_assignment_target(line) {
taint_state.taint_variable(var_name.clone(), source_type.to_string());
}
}
}
if let Some(captures) = ASSIGNMENT_PATTERN.captures(line) {
if let (Some(target), Some(expr)) = (captures.get(1), captures.get(2)) {
let target_var = target.as_str();
let source_expr = expr.as_str();
taint_state.propagate_taint(target_var, source_expr);
taint_state
.assignments
.insert(target_var.to_string(), source_expr.to_string());
}
}
}
for (line_num, line) in lines.iter().enumerate() {
let line_number = (line_num + 1) as u32;
for (sink_type, pattern_str) in SINK_PATTERNS {
let pattern = Regex::new(pattern_str).unwrap();
if pattern.is_match(line) {
let tainted_vars_in_line: Vec<String> = taint_state
.tainted_vars
.iter()
.filter(|var| line.contains(var.as_str()))
.cloned()
.collect();
if !tainted_vars_in_line.is_empty() {
findings.push(ProvenanceFinding {
id: format!(
"provenance_{}_{}",
file_path.replace(['/', '.'], "_"),
line_number
),
finding_type: "tainted_sink".to_string(),
source_type: "untrusted_input".to_string(),
sink_type: sink_type.to_string(),
tainted_variables: tainted_vars_in_line.clone(),
location: SourceLocation {
file: file_path.to_string(),
line: line_number,
end_line: Some(line_number),
function: None,
},
severity: determine_severity(sink_type),
message: format!(
"Tainted data from untrusted source reaches {}: {}",
sink_type,
tainted_vars_in_line.join(", ")
),
data_flow: extract_data_flow(&taint_state, &tainted_vars_in_line),
});
}
}
}
}
Ok(findings)
}
/// Returns the name on the left of the first `=` that `ASSIGNMENT_PATTERN`
/// finds in `line`, or `None` when the line does not look like an assignment.
fn extract_assignment_target(line: &str) -> Option<String> {
    let assignment = ASSIGNMENT_PATTERN.captures(line)?;
    let target = assignment.get(1)?;
    Some(target.as_str().trim().to_string())
}
/// Describes where the given variables were tainted.
///
/// Each flow recorded in `state` whose variable is listed in `vars` is
/// rendered as `name (source)`; the pieces are joined with `" → "` in the
/// order the flows were recorded. Returns `None` when `vars` is empty or none
/// of them has a recorded flow.
fn extract_data_flow(state: &TaintState, vars: &[String]) -> Option<String> {
    if vars.is_empty() {
        return None;
    }

    let mut steps: Vec<String> = Vec::new();
    for flow in &state.flows {
        if vars.iter().any(|name| *name == flow.variable) {
            steps.push(format!("{} ({})", flow.variable, flow.source));
        }
    }

    if steps.is_empty() {
        return None;
    }
    Some(steps.join(" → "))
}
/// Maps a sink label (the first element of a `SINK_PATTERNS` entry) to a
/// severity string: `"critical"`, `"high"`, `"medium"`, or `"low"` for any
/// label not listed here.
fn determine_severity(sink_type: &str) -> String {
    let level = match sink_type {
        "subprocess" | "database_insert" => "critical",
        "file_write" | "api_send" | "llm_call" => "high",
        // NOTE(review): `httpx_send` ranks below `api_send` although both are
        // outbound HTTP posts; confirm this is intended.
        "print" | "httpx_send" => "medium",
        _ => "low",
    };
    level.to_string()
}
/// Unit tests: each one feeds a small Python snippet to `scan_file` (or calls
/// a helper directly) and checks the reported findings.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_simple_taint_flow() {
        let code = r#"
user_input = input("Enter data: ")
print(user_input)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.finding_type == "tainted_sink"));
        assert!(findings.iter().any(|f| f.sink_type == "print"));
    }

    #[test]
    fn test_taint_propagation() {
        let code = r#"
user_input = input("Enter data: ")
processed = user_input.upper()
print(processed)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        assert!(findings
            .iter()
            .any(|f| f.tainted_variables.contains(&"processed".to_string())));
    }

    #[test]
    fn test_file_read_to_write() {
        let code = r#"
data = open("input.txt").read()
with open("output.txt", "w") as f:
f.write(data)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "file_write"));
    }

    #[test]
    fn test_api_to_database() {
        let code = r#"
response = requests.get(url).json()
db.insert(response)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "database_insert"));
        assert!(findings.iter().any(|f| f.severity == "critical"));
    }

    #[test]
    fn test_user_input_to_subprocess() {
        let code = r#"
command = input("Enter command: ")
subprocess.run(command)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "subprocess"));
        assert!(findings.iter().any(|f| f.severity == "critical"));
    }

    #[test]
    fn test_form_data_to_llm() {
        let code = r#"
user_query = request.form['query']
response = llm.chat(user_query)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        assert!(findings.iter().any(|f| f.sink_type == "llm_call"));
    }

    #[test]
    fn test_no_taint_safe_flow() {
        let code = r#"
safe_data = "hardcoded value"
print(safe_data)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(findings.is_empty());
    }

    #[test]
    fn test_multiple_transformations() {
        let code = r#"
user_input = input("Enter: ")
step1 = transform1(user_input)
step2 = transform2(step1)
print(step2)
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(!findings.is_empty());
        let finding = findings.first().unwrap();
        assert!(finding.tainted_variables.iter().any(|v| v == "step2"));
    }

    #[test]
    fn test_severity_classification() {
        assert_eq!(determine_severity("subprocess"), "critical");
        assert_eq!(determine_severity("database_insert"), "critical");
        assert_eq!(determine_severity("file_write"), "high");
        assert_eq!(determine_severity("print"), "medium");
    }

    #[test]
    fn test_data_flow_extraction() {
        // The snippet taints variables but contains no sink, so nothing is reported.
        let code = r#"
user_data = input("Enter: ")
processed = user_data.strip()
"#;
        let findings = scan_file("test.py", code).unwrap();
        assert!(findings.is_empty());

        // Exercise the flow formatter directly on a known state.
        let mut state = TaintState::new();
        state.taint_variable("user_data".to_string(), "user_input".to_string());
        assert_eq!(
            extract_data_flow(&state, &["user_data".to_string()]),
            Some("user_data (user_input)".to_string())
        );
        // Variables without a recorded flow, or an empty list, yield no description.
        assert_eq!(extract_data_flow(&state, &["processed".to_string()]), None);
        assert_eq!(extract_data_flow(&state, &[]), None);
    }
}