use std::collections::{HashMap, HashSet};
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use anyhow::Result;
use clap::Args;
use serde_json::{json, Value};
use tree_sitter::{Node, Parser};
use walkdir::WalkDir;
use super::error::RemainingError;
use super::types::{Severity, TaintFlow, VulnFinding, VulnReport, VulnSummary, VulnType};
use crate::output::OutputFormat;
/// Upper bound on the number of propagations `TaintTracker::propagate` will perform.
const MAX_TAINT_DEPTH: usize = 5;
/// Largest file size, in bytes (10 MiB), that `collect_files` will accept.
const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024;
/// Maximum number of files `collect_files` gathers from a directory walk.
const MAX_DIRECTORY_FILES: u32 = 1000;
// Command-line arguments for the vulnerability scan.
// NOTE: plain `//` comments are used deliberately here; with clap's derive,
// `///` doc comments would be picked up as user-visible help text.
#[derive(Debug, Args)]
pub struct VulnArgs {
// File or directory to scan.
pub path: PathBuf,
// When set, `run` keeps only findings whose `severity.order()` is <= this value's `order()`.
#[arg(long)]
pub severity: Option<Severity>,
// When set, `run` keeps only findings whose type appears in this list.
#[arg(long, value_name = "TYPE")]
pub vuln_type: Option<Vec<VulnType>>,
// When false, `run` drops findings with `Severity::Info`.
#[arg(long)]
pub include_informational: bool,
// When set, the report is written to this path instead of stdout.
#[arg(long, short = 'O')]
pub output: Option<PathBuf>,
}
/// Describes one textual pattern that `is_taint_source` treats as a source of untrusted data.
#[derive(Debug, Clone)]
struct TaintSource {
// Qualifier before the dot; empty means the source is matched as a bare call `attr(`.
module: &'static str,
// Attribute or function name that follows the module.
attr: &'static str,
// Human-readable label copied into `TaintInfo::source_desc`.
description: &'static str,
}
/// Table of Python taint sources consulted by `is_taint_source`, checked in order.
const PYTHON_SOURCES: &[TaintSource] = &[
// Flask `request` object accessors.
TaintSource {
module: "request",
attr: "args",
description: "Flask request.args (GET parameters)",
},
TaintSource {
module: "request",
attr: "form",
description: "Flask request.form (POST data)",
},
TaintSource {
module: "request",
attr: "get",
description: "Flask request.get() method",
},
TaintSource {
module: "request",
attr: "values",
description: "Flask request.values",
},
TaintSource {
module: "request",
attr: "data",
description: "Flask request.data (raw body)",
},
TaintSource {
module: "request",
attr: "json",
description: "Flask request.json",
},
TaintSource {
module: "request",
attr: "cookies",
description: "Flask request.cookies",
},
TaintSource {
module: "request",
attr: "headers",
description: "Flask request.headers",
},
// Process-level inputs.
TaintSource {
module: "sys",
attr: "argv",
description: "Command line arguments",
},
// Empty module: matched as the bare call text `input(`.
TaintSource {
module: "",
attr: "input",
description: "Python input() builtin",
},
TaintSource {
module: "os",
attr: "environ",
description: "Environment variables",
},
];
/// Describes one callable that `is_taint_sink` treats as dangerous when it receives tainted data.
#[derive(Debug, Clone)]
struct TaintSink {
// Qualifier before the dot; empty means the sink is matched by function name alone.
module: &'static str,
// Function or method name.
function: &'static str,
// Vulnerability category reported when tainted data reaches this sink.
vuln_type: VulnType,
// Text used as the finding's description.
description: &'static str,
// Text used as the finding's remediation advice.
remediation: &'static str,
}
/// Table of Python sinks consulted by `is_taint_sink`; the first matching entry wins,
/// so the order of entries matters.
const PYTHON_SINKS: &[TaintSink] = &[
// --- SQL injection ---
TaintSink {
module: "cursor",
function: "execute",
vuln_type: VulnType::SqlInjection,
description: "SQL query execution with unsanitized input",
remediation: "Use parameterized queries: cursor.execute(\"SELECT * FROM users WHERE id = ?\", (user_id,))",
},
TaintSink {
module: "cursor",
function: "executemany",
vuln_type: VulnType::SqlInjection,
description: "SQL batch execution with unsanitized input",
remediation: "Use parameterized queries with placeholders",
},
TaintSink {
module: "",
function: "raw",
vuln_type: VulnType::SqlInjection,
description: "Django raw SQL query",
remediation: "Use Django ORM methods or parameterized raw queries",
},
// --- Command injection / dynamic code execution ---
TaintSink {
module: "os",
function: "system",
vuln_type: VulnType::CommandInjection,
description: "Shell command execution with unsanitized input",
remediation: "Use subprocess with shell=False and a list of arguments",
},
TaintSink {
module: "os",
function: "popen",
vuln_type: VulnType::CommandInjection,
description: "Shell command via os.popen",
remediation: "Use subprocess.run with shell=False",
},
TaintSink {
module: "subprocess",
function: "run",
vuln_type: VulnType::CommandInjection,
description: "Subprocess execution (dangerous with shell=True)",
remediation: "Use subprocess.run with shell=False and pass arguments as a list",
},
TaintSink {
module: "subprocess",
function: "call",
vuln_type: VulnType::CommandInjection,
description: "Subprocess call (dangerous with shell=True)",
remediation: "Use subprocess.call with shell=False and pass arguments as a list",
},
TaintSink {
module: "subprocess",
function: "Popen",
vuln_type: VulnType::CommandInjection,
description: "Subprocess Popen (dangerous with shell=True)",
remediation: "Use subprocess.Popen with shell=False",
},
TaintSink {
module: "",
function: "eval",
vuln_type: VulnType::CommandInjection,
description: "Python eval() with user input",
remediation: "Avoid eval() entirely; use ast.literal_eval() for safe parsing",
},
TaintSink {
module: "",
function: "exec",
vuln_type: VulnType::CommandInjection,
description: "Python exec() with user input",
remediation: "Avoid exec() entirely; refactor to avoid dynamic code execution",
},
// --- Cross-site scripting ---
TaintSink {
module: "",
function: "render_template_string",
vuln_type: VulnType::Xss,
description: "Template rendering with unsanitized input",
remediation: "Use render_template with separate .html files and auto-escaping",
},
TaintSink {
module: "",
function: "Markup",
vuln_type: VulnType::Xss,
description: "Marking string as safe HTML",
remediation: "Never mark user input as safe; let Jinja2 auto-escape",
},
// --- Path traversal ---
TaintSink {
module: "",
function: "open",
vuln_type: VulnType::PathTraversal,
description: "File open with user-controlled path",
remediation: "Validate and sanitize file paths; use os.path.basename()",
},
TaintSink {
module: "os.path",
function: "join",
vuln_type: VulnType::PathTraversal,
description: "Path construction with user input",
remediation: "Validate that the result is within allowed directories",
},
// --- Server-side request forgery ---
TaintSink {
module: "requests",
function: "get",
vuln_type: VulnType::Ssrf,
description: "HTTP request with user-controlled URL",
remediation: "Validate URLs against an allowlist of permitted hosts",
},
TaintSink {
module: "requests",
function: "post",
vuln_type: VulnType::Ssrf,
description: "HTTP POST with user-controlled URL",
remediation: "Validate URLs against an allowlist of permitted hosts",
},
TaintSink {
module: "urllib",
function: "urlopen",
vuln_type: VulnType::Ssrf,
description: "URL open with user-controlled input",
remediation: "Validate URLs against an allowlist",
},
];
/// Per-function record of which variable texts are currently considered tainted.
#[derive(Debug, Default)]
struct TaintTracker {
// Maps the source text of an assignment target to the taint that reached it.
tainted: HashMap<String, TaintInfo>,
// Number of successful `propagate` calls so far; capped by `MAX_TAINT_DEPTH`.
depth: usize,
}
/// Where a tainted value came from; used to build the source step of a `TaintFlow`.
#[derive(Debug, Clone)]
struct TaintInfo {
// Description of the originating taint source.
source_desc: String,
// 1-based line of the expression that introduced the taint.
source_line: u32,
// Column of the expression that introduced the taint, as reported by the parser.
source_column: u32,
// Full text of the source line, for display.
code_snippet: String,
}
impl TaintTracker {
    /// Builds a tracker with no tainted variables and a propagation count of zero.
    fn new() -> Self {
        Self::default()
    }

    /// Records `var` as tainted, replacing any earlier record for the same name.
    fn mark_tainted(&mut self, var: String, info: TaintInfo) {
        self.tainted.insert(var, info);
    }

    /// Looks up the taint record stored under exactly `var` (test builds only).
    #[cfg(test)]
    fn is_tainted(&self, var: &str) -> Option<&TaintInfo> {
        self.tainted.get(var)
    }

    /// Copies the taint recorded under exactly `from` onto `to`.
    ///
    /// Does nothing once `MAX_TAINT_DEPTH` propagations have been performed, or
    /// when `from` has no taint record.
    fn propagate(&mut self, from: &str, to: String) {
        let budget_left = self.depth < MAX_TAINT_DEPTH;
        if !budget_left {
            return;
        }
        match self.tainted.get(from).cloned() {
            Some(inherited) => {
                self.tainted.insert(to, inherited);
                self.depth += 1;
            }
            None => {}
        }
    }

    /// Returns the taint record of some tracked name that occurs as a substring
    /// of `expr`, or `None` when no tracked name occurs in it.
    fn expression_is_tainted(&self, expr: &str) -> Option<&TaintInfo> {
        self.tainted
            .iter()
            .find(|(name, _)| expr.contains(name.as_str()))
            .map(|(_, record)| record)
    }
}
impl VulnArgs {
    /// Scans `self.path`, applies the CLI filters, and emits the report.
    ///
    /// The report is rendered as SARIF, text, or JSON depending on `format`, and is
    /// written to `self.output` when given, otherwise printed to stdout.
    ///
    /// # Errors
    ///
    /// Returns an error when the path does not exist, when file collection,
    /// serialization or writing fails, and — after the report has been emitted —
    /// when at least one finding survived the filters.
    pub fn run(&self, format: OutputFormat) -> Result<()> {
        let start = Instant::now();
        if !self.path.exists() {
            return Err(RemainingError::file_not_found(&self.path).into());
        }
        let files = collect_files(&self.path)?;

        let mut findings: Vec<VulnFinding> = Vec::new();
        let mut files_scanned: u32 = 0;
        for file_path in &files {
            // Best effort: a file that fails to analyze contributes no findings
            // but still counts as scanned.
            if let Ok(found) = analyze_file(file_path) {
                findings.extend(found);
            }
            files_scanned += 1;
        }

        if let Some(min_severity) = &self.severity {
            findings.retain(|f| f.severity.order() <= min_severity.order());
        }
        if let Some(types) = &self.vuln_type {
            findings.retain(|f| types.contains(&f.vuln_type));
        }
        if !self.include_informational {
            findings.retain(|f| f.severity != Severity::Info);
        }

        // Count affected files AFTER filtering so the summary agrees with the
        // findings that are actually reported.
        let files_with_vulns = findings
            .iter()
            .map(|f| f.file.as_str())
            .collect::<HashSet<_>>()
            .len() as u32;
        let summary = build_summary(&findings, files_with_vulns);
        let report = VulnReport {
            findings,
            summary: Some(summary),
            scan_duration_ms: start.elapsed().as_millis() as u64,
            files_scanned,
        };

        let output_str = match format {
            OutputFormat::Sarif => {
                let sarif = generate_sarif(&report);
                serde_json::to_string_pretty(&sarif)?
            }
            OutputFormat::Text => format_vuln_text(&report),
            _ => serde_json::to_string_pretty(&report)?,
        };
        if let Some(ref output_path) = self.output {
            fs::write(output_path, &output_str)?;
        } else {
            println!("{}", output_str);
        }

        // A non-empty report is surfaced as an error so the caller can signal it.
        if !report.findings.is_empty() {
            return Err(RemainingError::findings_detected(report.findings.len() as u32).into());
        }
        Ok(())
    }
}
/// Gathers the files to analyze under `path`.
///
/// A single file is returned as-is unless it exceeds `MAX_FILE_SIZE`, which is an
/// error. A directory is walked to a depth of 10; supported source files within the
/// size limit are kept, up to `MAX_DIRECTORY_FILES`. Any other kind of path yields
/// an empty list.
fn collect_files(path: &Path) -> Result<Vec<PathBuf>, RemainingError> {
    if path.is_file() {
        let metadata = fs::metadata(path).map_err(|_| RemainingError::file_not_found(path))?;
        let size = metadata.len();
        if size > MAX_FILE_SIZE {
            return Err(RemainingError::file_too_large(path, size));
        }
        return Ok(vec![path.to_path_buf()]);
    }

    let mut collected = Vec::new();
    if path.is_dir() {
        // Unreadable directory entries are skipped silently.
        let walker = WalkDir::new(path)
            .max_depth(10)
            .into_iter()
            .filter_map(|entry| entry.ok());
        for entry in walker {
            if collected.len() >= MAX_DIRECTORY_FILES as usize {
                break;
            }
            let candidate = entry.path();
            if !(candidate.is_file() && is_supported_source_file(candidate)) {
                continue;
            }
            let within_limit = match fs::metadata(candidate) {
                Ok(metadata) => metadata.len() <= MAX_FILE_SIZE,
                Err(_) => false,
            };
            if within_limit {
                collected.push(candidate.to_path_buf());
            }
        }
    }
    Ok(collected)
}
/// Returns true when the path's extension is exactly `py` or `rs`.
fn is_supported_source_file(path: &Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext == "py" || ext == "rs",
        None => false,
    }
}
/// Analyzes one file and returns its findings.
///
/// `.rs` files go to the line-based Rust heuristics and `.py` files to the
/// tree-sitter taint analysis. Any other file is handed to
/// `tldr_core::security::vuln::scan_vulnerabilities`, whose findings are converted
/// into local `VulnFinding`s; a failure of that scanner yields an empty list.
///
/// # Errors
///
/// Returns a file-not-found error when the file cannot be read as text, and
/// forwards errors from the Python analysis.
fn analyze_file(path: &Path) -> Result<Vec<VulnFinding>, RemainingError> {
    let source = fs::read_to_string(path).map_err(|_| RemainingError::file_not_found(path))?;
    match path.extension().and_then(|e| e.to_str()) {
        Some("rs") => return Ok(analyze_rust_file(path, &source)),
        Some("py") => return analyze_python_file(path, &source),
        _ => {}
    }

    let report = match tldr_core::security::vuln::scan_vulnerabilities(path, None, None) {
        Ok(report) => report,
        Err(_) => return Ok(Vec::new()),
    };

    let mut findings = Vec::new();
    for f in report.findings {
        // The upstream type is bridged through its Debug name. Every local variant
        // with a plausible upstream counterpart is mapped so that such findings are
        // not mislabelled; anything still unrecognized keeps the historical
        // SqlInjection fallback.
        let upstream_type = format!("{:?}", f.vuln_type);
        let vuln_type = match upstream_type.as_str() {
            "SqlInjection" => VulnType::SqlInjection,
            "CommandInjection" => VulnType::CommandInjection,
            "Xss" => VulnType::Xss,
            "PathTraversal" => VulnType::PathTraversal,
            "Ssrf" => VulnType::Ssrf,
            "Deserialization" => VulnType::Deserialization,
            "Xxe" => VulnType::Xxe,
            "OpenRedirect" => VulnType::OpenRedirect,
            "LdapInjection" => VulnType::LdapInjection,
            "XpathInjection" => VulnType::XpathInjection,
            _ => VulnType::SqlInjection,
        };
        let severity = match f.severity.to_uppercase().as_str() {
            "CRITICAL" => Severity::Critical,
            "HIGH" => Severity::High,
            "MEDIUM" => Severity::Medium,
            "LOW" => Severity::Low,
            // Keep informational findings informational so the
            // `include_informational` filter applies to them.
            "INFO" => Severity::Info,
            _ => Severity::Medium,
        };
        let file_str = f.file.display().to_string();
        findings.push(VulnFinding {
            vuln_type,
            severity,
            cwe_id: f.cwe_id.unwrap_or_default(),
            title: upstream_type,
            description: format!("{} with unsanitized input", f.sink.sink_type),
            file: file_str.clone(),
            line: f.sink.line,
            column: 0,
            taint_flow: vec![
                TaintFlow {
                    file: file_str.clone(),
                    line: f.source.line,
                    column: 0,
                    code_snippet: f.source.expression.clone(),
                    description: format!("Source: {}", f.source.source_type),
                },
                TaintFlow {
                    file: file_str,
                    line: f.sink.line,
                    column: 0,
                    code_snippet: f.sink.expression.clone(),
                    description: format!("Sink: {}", f.sink.sink_type),
                },
            ],
            remediation: f.remediation.clone(),
            confidence: 0.85,
        });
    }
    Ok(findings)
}
/// Parses Python `source` with tree-sitter and runs the taint analysis over the tree.
///
/// # Errors
///
/// Fails when the parser cannot be created or when parsing produces no tree.
fn analyze_python_file(path: &Path, source: &str) -> Result<Vec<VulnFinding>, RemainingError> {
    let mut ts_parser = get_python_parser()?;
    let tree = match ts_parser.parse(source, None) {
        Some(tree) => tree,
        None => return Err(RemainingError::parse_error(path, "Failed to parse file")),
    };
    let display_path = path.display().to_string();
    let mut collected = Vec::new();
    analyze_node(
        tree.root_node(),
        source.as_bytes(),
        &display_path,
        &mut collected,
    );
    Ok(collected)
}
/// Scans Rust `source` line by line with textual heuristics.
///
/// Each heuristic that matches a line pushes one finding, so a single line can
/// produce several findings. Blank lines and lines starting with `//` are skipped.
/// The `unwrap()` heuristic is disabled for files that `is_rust_test_file` accepts.
///
/// Reported columns are 0-based byte offsets into the original line, i.e. they
/// include leading indentation.
fn analyze_rust_file(path: &Path, source: &str) -> Vec<VulnFinding> {
    let file_path = path.display().to_string();
    let is_test_file = is_rust_test_file(path);
    let mut findings = Vec::new();
    let lines: Vec<&str> = source.lines().collect();
    // True from a line containing `Command::new(` until a line that ends the statement.
    let mut in_command_block = false;

    for (idx, line) in lines.iter().enumerate() {
        let line_number = (idx + 1) as u32;
        let trimmed = line.trim();
        if trimmed.is_empty() || trimmed.starts_with("//") {
            continue;
        }
        // Patterns are searched in the trimmed text, so the stripped indentation
        // has to be added back to obtain a column in the real line.
        let indent = line.len() - line.trim_start().len();
        let column_of = |needle: &str| (indent + trimmed.find(needle).unwrap_or(0)) as u32;

        if (trimmed.contains("unsafe {") || trimmed.starts_with("unsafe{"))
            && !has_nearby_safety_comment(&lines, idx)
        {
            findings.push(rust_finding(
                VulnType::UnsafeCode,
                Severity::High,
                RustFindingMeta {
                    cwe_id: "CWE-242",
                    title: "Unsafe Block Without Safety Rationale",
                    description: "unsafe block found without nearby SAFETY: justification comment",
                },
                RustFindingLocation {
                    file: &file_path,
                    line: line_number,
                    column: column_of("unsafe"),
                },
                "Document invariants with // SAFETY: ... or avoid unsafe when possible",
                0.80,
            ));
        }

        // "mem::transmute(" also covers the fully qualified "std::mem::transmute(".
        if trimmed.contains("std::mem::transmute(") || trimmed.contains("mem::transmute(") {
            findings.push(rust_finding(
                VulnType::MemorySafety,
                Severity::Critical,
                RustFindingMeta {
                    cwe_id: "CWE-119",
                    title: "Risky transmute Usage",
                    description: "std::mem::transmute can violate type and memory safety guarantees",
                },
                RustFindingLocation {
                    file: &file_path,
                    line: line_number,
                    column: column_of("transmute"),
                },
                "Prefer safe conversions (From/TryFrom, bytemuck) and explicit layout checks",
                0.90,
            ));
        }

        if trimmed.contains("std::ptr::")
            || trimmed.contains("core::ptr::")
            || trimmed.contains("ptr::read(")
            || trimmed.contains("ptr::write(")
        {
            findings.push(rust_finding(
                VulnType::MemorySafety,
                Severity::High,
                RustFindingMeta {
                    cwe_id: "CWE-119",
                    title: "Raw Pointer Operation",
                    description: "raw pointer operation detected; verify lifetimes, aliasing, and bounds",
                },
                RustFindingLocation {
                    file: &file_path,
                    line: line_number,
                    column: column_of("ptr::"),
                },
                "Use safe abstractions/slices where possible and document pointer invariants",
                0.85,
            ));
        }

        if !is_test_file && trimmed.contains(".unwrap()") {
            findings.push(rust_finding(
                VulnType::Panic,
                Severity::Medium,
                RustFindingMeta {
                    cwe_id: "CWE-703",
                    title: "Potential Panic From unwrap()",
                    description: "unwrap() in non-test Rust code can panic in production paths",
                },
                RustFindingLocation {
                    file: &file_path,
                    line: line_number,
                    column: column_of(".unwrap()"),
                },
                "Handle Result/Option explicitly or use expect() with actionable context",
                0.70,
            ));
        }

        // A `format!` on a line that mentions SQL and has a placeholder or `+`.
        if trimmed.contains("format!(")
            && contains_sql_keyword(trimmed)
            && (trimmed.contains('{') || trimmed.contains('+'))
        {
            findings.push(rust_finding(
                VulnType::SqlInjection,
                Severity::Critical,
                RustFindingMeta {
                    cwe_id: "CWE-89",
                    title: "SQL String Interpolation",
                    description: "SQL query appears to be built via string formatting/interpolation",
                },
                RustFindingLocation {
                    file: &file_path,
                    line: line_number,
                    column: column_of("format!("),
                },
                "Use parameterized queries via your DB client instead of format!/concatenation",
                0.88,
            ));
        }

        if trimmed.contains("from_utf8_unchecked(")
            || trimmed.contains(".as_bytes()[")
            || trimmed.contains(".as_bytes().get_unchecked(")
        {
            let offset = trimmed
                .find("as_bytes")
                .or_else(|| trimmed.find("from_utf8_unchecked"))
                .unwrap_or(0);
            findings.push(rust_finding(
                VulnType::MemorySafety,
                Severity::High,
                RustFindingMeta {
                    cwe_id: "CWE-20",
                    title: "Unchecked Byte/String Conversion",
                    description: "unchecked UTF-8 or byte indexing detected without visible validation",
                },
                RustFindingLocation {
                    file: &file_path,
                    line: line_number,
                    column: (indent + offset) as u32,
                },
                "Validate lengths/UTF-8 before conversion or use checked APIs",
                0.82,
            ));
        }

        // "Command::new(" also covers "std::process::Command::new(".
        if trimmed.contains("Command::new(") {
            in_command_block = true;
        }

        // Inside a command chain, an `.arg(` whose argument is not a literal.
        if in_command_block
            && trimmed.contains(".arg(")
            && !trimmed.contains(".arg(\"")
            && !trimmed.contains(".arg('")
        {
            findings.push(rust_finding(
                VulnType::CommandInjection,
                Severity::Critical,
                RustFindingMeta {
                    cwe_id: "CWE-78",
                    title: "Unsanitized Process Argument",
                    description: "Command argument appears to be variable-driven without visible sanitization",
                },
                RustFindingLocation {
                    file: &file_path,
                    // The finding is reported on the `.arg(` line itself.
                    line: line_number,
                    column: column_of(".arg("),
                },
                "Validate/allowlist user-controlled arguments before passing to Command",
                0.80,
            ));
        }

        // The command chain ends with the statement.
        if in_command_block && (trimmed.ends_with(';') || trimmed.contains(");")) {
            in_command_block = false;
        }
    }
    findings
}
/// Descriptive text for a Rust finding, grouped so `rust_finding` takes fewer arguments.
struct RustFindingMeta<'a> {
// CWE identifier string, e.g. "CWE-89".
cwe_id: &'a str,
// Short title of the finding.
title: &'a str,
// Longer explanation of the finding.
description: &'a str,
}
/// Source location of a Rust finding, grouped so `rust_finding` takes fewer arguments.
struct RustFindingLocation<'a> {
// Display path of the file.
file: &'a str,
// 1-based line number.
line: u32,
// Column value copied verbatim into the finding.
column: u32,
}
fn rust_finding(
vuln_type: VulnType,
severity: Severity,
meta: RustFindingMeta<'_>,
location: RustFindingLocation<'_>,
remediation: &str,
confidence: f64,
) -> VulnFinding {
VulnFinding {
vuln_type,
severity,
cwe_id: meta.cwe_id.to_string(),
title: meta.title.to_string(),
description: meta.description.to_string(),
file: location.file.to_string(),
line: location.line,
column: location.column,
taint_flow: Vec::new(),
remediation: remediation.to_string(),
confidence,
}
}
/// Reports whether either of the (up to) two lines directly above `index`
/// contains the text `SAFETY:`. The line at `index` itself is not inspected.
fn has_nearby_safety_comment(lines: &[&str], index: usize) -> bool {
    let first_checked = index.saturating_sub(2);
    for row in first_checked..index {
        if lines[row].contains("SAFETY:") {
            return true;
        }
    }
    false
}
/// Reports whether `text` contains a SQL keyword as a whole word.
///
/// Matching is case-insensitive. Words are runs of alphanumeric characters and
/// underscores, so identifiers that merely embed a keyword (`updated_at`,
/// `from_utf8`, `selected`) do not count.
fn contains_sql_keyword(text: &str) -> bool {
    const KEYWORDS: [&str; 6] = ["SELECT", "INSERT", "UPDATE", "DELETE", "FROM", "WHERE"];
    text.split(|c: char| !(c.is_alphanumeric() || c == '_'))
        .any(|word| KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw)))
}
/// Reports whether `path` looks like Rust test code.
///
/// A path qualifies when any of its directories is named `tests` (this also
/// covers relative paths such as `tests/api.rs`), or when its file name is
/// `tests.rs` or ends in `_test.rs` / `_tests.rs`. File names that merely end in
/// the letters "tests" (e.g. `contests.rs`) do not qualify.
fn is_rust_test_file(path: &Path) -> bool {
    let in_tests_dir = path.parent().map_or(false, |dir| {
        dir.components().any(|part| part.as_os_str() == "tests")
    });
    if in_tests_dir {
        return true;
    }

    let path_str = path.to_string_lossy();
    // Backslash-separated paths are not split into components on every platform,
    // so they keep the textual check.
    if path_str.contains("\\tests\\") || path_str.ends_with("\\tests.rs") {
        return true;
    }

    let file_name = path
        .file_name()
        .map(|name| name.to_string_lossy())
        .unwrap_or_default();
    let file_name: &str = &file_name;
    file_name == "tests.rs" || file_name.ends_with("_test.rs") || file_name.ends_with("_tests.rs")
}
/// Walks the syntax tree and runs `analyze_function` on every function definition
/// that is reached.
///
/// The walk does not continue below a function node from here; nested statements
/// are handled by the function analysis. Every other node, including
/// `decorated_definition`, is descended into generically, so decorated functions
/// as well as the methods of decorated classes are found.
fn analyze_node(node: Node, source: &[u8], file_path: &str, findings: &mut Vec<VulnFinding>) {
    if matches!(
        node.kind(),
        "function_definition" | "async_function_definition"
    ) {
        analyze_function(node, source, file_path, findings);
        return;
    }
    for child in node.children(&mut node.walk()) {
        analyze_node(child, source, file_path, findings);
    }
}
/// Runs the taint analysis over one function body with a fresh `TaintTracker`.
///
/// Functions without a `body` field are ignored. Source that is not valid UTF-8
/// is treated as having no lines for snippet purposes.
fn analyze_function(
    func_node: Node,
    source: &[u8],
    file_path: &str,
    findings: &mut Vec<VulnFinding>,
) {
    let mut tracker = TaintTracker::new();
    let text = std::str::from_utf8(source).unwrap_or("");
    let source_lines: Vec<&str> = text.lines().collect();
    let body = match func_node.child_by_field_name("body") {
        Some(body) => body,
        None => return,
    };
    analyze_block(
        body,
        source,
        file_path,
        &mut tracker,
        findings,
        &source_lines,
    );
}
/// Feeds every child of `block`, in order, to `analyze_statement`.
fn analyze_block(
    block: Node,
    source: &[u8],
    file_path: &str,
    tracker: &mut TaintTracker,
    findings: &mut Vec<VulnFinding>,
    source_lines: &[&str],
) {
    let mut cursor = block.walk();
    for statement in block.children(&mut cursor) {
        analyze_statement(statement, source, file_path, tracker, findings, source_lines);
    }
}
/// Analyzes one statement node, updating taint state and recording findings.
///
/// * Expression statements: assignments update the tracker, then the expression
///   is searched for sink calls.
/// * Assignments / augmented assignments: update the tracker.
/// * Return statements: checked for tainted HTML f-strings.
/// * Blocks: every contained statement is analyzed in order.
/// * Anything else (including `if`/`for`/`while`/`with`/`try` and their
///   `elif`/`else`/`except`/`finally` clauses): all children are visited
///   recursively, so the bodies of every branch are analyzed, not just the first.
fn analyze_statement(
    stmt: Node,
    source: &[u8],
    file_path: &str,
    tracker: &mut TaintTracker,
    findings: &mut Vec<VulnFinding>,
    source_lines: &[&str],
) {
    match stmt.kind() {
        "expression_statement" => {
            if let Some(expr) = stmt.child(0) {
                // Record taint first so a sink call on the right-hand side sees it.
                if expr.kind() == "assignment" {
                    analyze_assignment(expr, source, tracker, source_lines);
                }
                analyze_expression(expr, source, file_path, tracker, findings, source_lines);
            }
        }
        "assignment" => {
            analyze_assignment(stmt, source, tracker, source_lines);
        }
        "augmented_assignment" => {
            analyze_augmented_assignment(stmt, source, tracker);
        }
        "block" => {
            analyze_block(stmt, source, file_path, tracker, findings, source_lines);
        }
        "return_statement" => {
            if let Some(value) = stmt.child_by_field_name("value").or_else(|| stmt.child(1)) {
                let value_text = node_text(value, source);
                check_xss_return(
                    value_text,
                    value,
                    file_path,
                    tracker,
                    findings,
                    source_lines,
                );
            }
        }
        _ => {
            // Compound statements keep their else/elif/except bodies inside clause
            // nodes rather than as direct `block` children, so descend into every
            // child instead of looking only at direct blocks.
            for child in stmt.children(&mut stmt.walk()) {
                analyze_statement(child, source, file_path, tracker, findings, source_lines);
            }
        }
    }
}
/// Updates the tracker for `target = value`.
///
/// The target text is marked tainted when the value text matches a known taint
/// source, and (checked afterwards) when the value text mentions any name that is
/// already tracked. In both cases the recorded location is that of the value
/// expression. Existing taint on the target is never cleared.
fn analyze_assignment(
    assignment: Node,
    source: &[u8],
    tracker: &mut TaintTracker,
    source_lines: &[&str],
) {
    let target = assignment
        .child_by_field_name("left")
        .or_else(|| assignment.child(0));
    let value = assignment
        .child_by_field_name("right")
        .or_else(|| assignment.child(2));
    let (target, value) = match (target, value) {
        (Some(target), Some(value)) => (target, value),
        _ => return,
    };

    let target_text = node_text(target, source);
    let value_text = node_text(value, source);
    let start = value.start_position();
    let line = start.row as u32 + 1;
    let column = start.column as u32;
    // Text of the line holding the value expression; empty if out of range.
    let snippet_for_line = || {
        source_lines
            .get(line as usize - 1)
            .map(|s| s.to_string())
            .unwrap_or_default()
    };

    if let Some(source_desc) = is_taint_source(value_text) {
        let info = TaintInfo {
            source_desc,
            source_line: line,
            source_column: column,
            code_snippet: snippet_for_line(),
        };
        tracker.mark_tainted(target_text.to_string(), info);
    }

    // Evaluated after the step above, so a freshly marked target is visible here.
    let inherited_desc = tracker
        .expression_is_tainted(value_text)
        .map(|info| info.source_desc.clone());
    if let Some(source_desc) = inherited_desc {
        let info = TaintInfo {
            source_desc,
            source_line: line,
            source_column: column,
            code_snippet: snippet_for_line(),
        };
        tracker.mark_tainted(target_text.to_string(), info);
    }
}
/// Updates the tracker for an augmented assignment such as `target += value`.
///
/// When the value is exactly a tracked variable, its taint is propagated through
/// `TaintTracker::propagate` (subject to the propagation limit). When the value
/// is a larger expression that merely mentions a tracked variable, e.g.
/// `query += " WHERE id = " + user_id`, the target inherits that variable's
/// taint record directly, mirroring what plain assignments do.
fn analyze_augmented_assignment(assignment: Node, source: &[u8], tracker: &mut TaintTracker) {
    let lhs = assignment
        .child_by_field_name("left")
        .or_else(|| assignment.child(0));
    let rhs = assignment
        .child_by_field_name("right")
        .or_else(|| assignment.child(2));
    if let (Some(lhs_node), Some(rhs_node)) = (lhs, rhs) {
        let lhs_text = node_text(lhs_node, source);
        let rhs_text = node_text(rhs_node, source);
        if tracker.tainted.contains_key(rhs_text) {
            tracker.propagate(rhs_text, lhs_text.to_string());
        } else if let Some(info) = tracker.expression_is_tainted(rhs_text).cloned() {
            // `propagate` looks the source up by exact name and would silently do
            // nothing for a compound right-hand side.
            tracker.mark_tainted(lhs_text.to_string(), info);
        }
    }
}
/// Searches an expression for call nodes and hands each one to `analyze_call`.
///
/// The search stops at the first call on each path: the children of a call node
/// are not visited from here.
fn analyze_expression(
    expr: Node,
    source: &[u8],
    file_path: &str,
    tracker: &mut TaintTracker,
    findings: &mut Vec<VulnFinding>,
    source_lines: &[&str],
) {
    if expr.kind() == "call" {
        analyze_call(expr, source, file_path, tracker, findings, source_lines);
        return;
    }
    let mut cursor = expr.walk();
    for sub_expr in expr.children(&mut cursor) {
        analyze_expression(sub_expr, source, file_path, tracker, findings, source_lines);
    }
}
/// Inspects one call node and records findings when tainted data reaches a sink.
///
/// Three independent checks run, each of which may push its own finding:
/// 1. the callee is a known sink and the argument text mentions a tainted name;
/// 2. the callee is a known sink and the arguments build a string from a tainted name;
/// 3. the callee looks like a subprocess call with `shell=True` and tainted arguments.
///
/// SQL sinks whose arguments look like a parameterized query are skipped entirely
/// (the function returns before any check can report).
///
/// NOTE(review): checks 1 and 2 are not mutually exclusive, so a single call can
/// yield two findings for the same sink (and a third from check 3) — confirm
/// whether consumers expect that.
fn analyze_call(
call: Node,
source: &[u8],
file_path: &str,
tracker: &mut TaintTracker,
findings: &mut Vec<VulnFinding>,
source_lines: &[&str],
) {
let func = call
.child_by_field_name("function")
.or_else(|| call.child(0));
let args = call.child_by_field_name("arguments");
if let Some(func_node) = func {
let func_text = node_text(func_node, source);
// Location of the call itself: 1-based line, parser-reported column.
let line = call.start_position().row as u32 + 1;
let column = call.start_position().column as u32;
if let Some(sink) = is_taint_sink(func_text) {
if let Some(args_node) = args {
let args_text = node_text(args_node, source);
// Parameterized SQL is considered safe; nothing else is checked for this call.
if sink.vuln_type == VulnType::SqlInjection && is_parameterized_query(args_text) {
return; }
// Check 1: the argument text mentions a tracked (tainted) name.
if let Some(taint_info) = tracker.expression_is_tainted(args_text) {
let code_snippet = source_lines
.get(line as usize - 1)
.map(|s| s.to_string())
.unwrap_or_default();
// Two-step flow: where the taint was introduced, then the sink call.
let taint_flow = vec![
TaintFlow {
file: file_path.to_string(),
line: taint_info.source_line,
column: taint_info.source_column,
code_snippet: taint_info.code_snippet.clone(),
description: format!("Source: {}", taint_info.source_desc),
},
TaintFlow {
file: file_path.to_string(),
line,
column,
code_snippet: code_snippet.clone(),
description: format!("Sink: {} call", func_text),
},
];
findings.push(VulnFinding {
vuln_type: sink.vuln_type,
severity: sink.vuln_type.default_severity(),
cwe_id: sink.vuln_type.cwe_id().to_string(),
title: format!("{} Vulnerability", vuln_type_name(sink.vuln_type)),
description: sink.description.to_string(),
file: file_path.to_string(),
line,
column,
taint_flow,
remediation: sink.remediation.to_string(),
confidence: 0.85,
});
}
// Check 2: the arguments build a string (f-string, concatenation, `%`,
// `.format(`) that involves a tracked name.
if is_string_interpolation_tainted(args_text, tracker) {
let code_snippet = source_lines
.get(line as usize - 1)
.map(|s| s.to_string())
.unwrap_or_default();
let taint_info = find_taint_in_string(args_text, tracker);
// Fall back to a sink-only flow when no tracked name can be located.
let taint_flow = if let Some(info) = taint_info {
vec![
TaintFlow {
file: file_path.to_string(),
line: info.source_line,
column: info.source_column,
code_snippet: info.code_snippet.clone(),
description: format!("Source: {}", info.source_desc),
},
TaintFlow {
file: file_path.to_string(),
line,
column,
code_snippet: code_snippet.clone(),
description: format!(
"Sink: {} call with string interpolation",
func_text
),
},
]
} else {
vec![TaintFlow {
file: file_path.to_string(),
line,
column,
code_snippet,
description: format!("Sink: {} call", func_text),
}]
};
findings.push(VulnFinding {
vuln_type: sink.vuln_type,
severity: sink.vuln_type.default_severity(),
cwe_id: sink.vuln_type.cwe_id().to_string(),
title: format!("{} Vulnerability", vuln_type_name(sink.vuln_type)),
description: sink.description.to_string(),
file: file_path.to_string(),
line,
column,
taint_flow,
remediation: sink.remediation.to_string(),
confidence: 0.8,
});
}
}
}
// Check 3: subprocess-style callee (qualified, or imported as a bare name)
// invoked with shell=True and tainted arguments.
if func_text.contains("subprocess")
|| func_text == "run"
|| func_text == "call"
|| func_text == "Popen"
{
if let Some(args_node) = args {
let args_text = node_text(args_node, source);
if args_text.contains("shell=True") || args_text.contains("shell = True") {
if let Some(taint_info) = tracker.expression_is_tainted(args_text) {
let code_snippet = source_lines
.get(line as usize - 1)
.map(|s| s.to_string())
.unwrap_or_default();
let taint_flow = vec![
TaintFlow {
file: file_path.to_string(),
line: taint_info.source_line,
column: taint_info.source_column,
code_snippet: taint_info.code_snippet.clone(),
description: format!("Source: {}", taint_info.source_desc),
},
TaintFlow {
file: file_path.to_string(),
line,
column,
code_snippet,
description: "Sink: subprocess with shell=True".to_string(),
},
];
findings.push(VulnFinding {
vuln_type: VulnType::CommandInjection,
severity: Severity::Critical,
cwe_id: "CWE-78".to_string(),
title: "Command Injection Vulnerability".to_string(),
description:
"Subprocess executed with shell=True and user-controlled input"
.to_string(),
file: file_path.to_string(),
line,
column,
taint_flow,
remediation:
"Use subprocess.run with shell=False and pass arguments as a list"
.to_string(),
confidence: 0.9,
});
}
}
}
}
}
}
/// Flags a returned f-string that mentions a tainted name and contains both `<`
/// and `>` as a possible XSS sink.
///
/// Values that are not f-strings, mention no tracked name, or lack angle brackets
/// produce nothing.
fn check_xss_return(
    value_text: &str,
    value_node: Node,
    file_path: &str,
    tracker: &mut TaintTracker,
    findings: &mut Vec<VulnFinding>,
    source_lines: &[&str],
) {
    let is_fstring = value_text.starts_with("f\"") || value_text.starts_with("f'");
    if !is_fstring {
        return;
    }
    let taint_info = match find_taint_in_string(value_text, tracker) {
        Some(info) => info,
        None => return,
    };
    let looks_like_html = value_text.contains('<') && value_text.contains('>');
    if !looks_like_html {
        return;
    }

    let position = value_node.start_position();
    let line = position.row as u32 + 1;
    let column = position.column as u32;
    let code_snippet = source_lines
        .get(line as usize - 1)
        .map(|s| s.to_string())
        .unwrap_or_default();

    let source_step = TaintFlow {
        file: file_path.to_string(),
        line: taint_info.source_line,
        column: taint_info.source_column,
        code_snippet: taint_info.code_snippet.clone(),
        description: format!("Source: {}", taint_info.source_desc),
    };
    let sink_step = TaintFlow {
        file: file_path.to_string(),
        line,
        column,
        code_snippet,
        description: "Sink: Returning HTML with user input".to_string(),
    };
    findings.push(VulnFinding {
        vuln_type: VulnType::Xss,
        severity: Severity::High,
        cwe_id: "CWE-79".to_string(),
        title: "Cross-Site Scripting (XSS) Vulnerability".to_string(),
        description: "User input embedded in HTML response without escaping".to_string(),
        file: file_path.to_string(),
        line,
        column,
        taint_flow: vec![source_step, sink_step],
        remediation: "Use a templating engine with auto-escaping or escape user input"
            .to_string(),
        confidence: 0.75,
    });
}
/// Creates a tree-sitter parser configured with the Python grammar.
///
/// # Errors
///
/// Returns a parse error (with an empty path) when the grammar cannot be loaded.
fn get_python_parser() -> Result<Parser, RemainingError> {
    let language = tree_sitter_python::LANGUAGE;
    let mut python_parser = Parser::new();
    match python_parser.set_language(&language.into()) {
        Ok(()) => Ok(python_parser),
        Err(e) => Err(RemainingError::parse_error(
            PathBuf::new(),
            format!("Failed to set language: {}", e),
        )),
    }
}
/// Returns the source text covered by `node`, or an empty string when that text
/// cannot be decoded as UTF-8.
fn node_text<'a>(node: Node, source: &'a [u8]) -> &'a str {
    match node.utf8_text(source) {
        Ok(text) => text,
        Err(_) => "",
    }
}
/// Returns the description of the first entry in `PYTHON_SOURCES` that `expr`
/// textually matches, or `None`.
///
/// An entry matches when `expr` contains `module.attr` (or `attr(` for entries
/// without a module), or contains `.attr.get` or `.attr[`.
fn is_taint_source(expr: &str) -> Option<String> {
    PYTHON_SOURCES
        .iter()
        .find(|candidate| {
            let primary = if candidate.module.is_empty() {
                format!("{}(", candidate.attr)
            } else {
                format!("{}.{}", candidate.module, candidate.attr)
            };
            expr.contains(&primary)
                || expr.contains(&format!(".{}.get", candidate.attr))
                || expr.contains(&format!(".{}[", candidate.attr))
        })
        .map(|candidate| candidate.description.to_string())
}
/// Returns the first entry in `PYTHON_SINKS` that the callee text `func_expr`
/// matches, or `None`.
///
/// Entries with a module match when `func_expr` mentions `module.function` ending
/// on an identifier boundary, or — unless the function name is too generic — when
/// `func_expr` ends in `.function`. Entries without a module match on the bare
/// function name or a `.function` suffix.
///
/// The identifier-boundary rule keeps `cursor.executemany` from being reported as
/// the earlier `cursor.execute` entry.
fn is_taint_sink(func_expr: &str) -> Option<&'static TaintSink> {
    // Method names shared by many unrelated APIs; never matched by suffix alone
    // for module-qualified sinks.
    const AMBIGUOUS_METHODS: &[&str] = &["get", "post", "put", "delete", "read", "write", "open"];

    /// True when `qualified` occurs in `haystack` and is not immediately followed
    /// by another identifier character.
    fn mentions_qualified_name(haystack: &str, qualified: &str) -> bool {
        haystack.match_indices(qualified).any(|(start, matched)| {
            haystack[start + matched.len()..]
                .chars()
                .next()
                .map_or(true, |c| !(c.is_alphanumeric() || c == '_'))
        })
    }

    PYTHON_SINKS.iter().find(|sink| {
        let method_suffix = format!(".{}", sink.function);
        if sink.module.is_empty() {
            func_expr == sink.function || func_expr.ends_with(&method_suffix)
        } else {
            let qualified = format!("{}.{}", sink.module, sink.function);
            mentions_qualified_name(func_expr, &qualified)
                || (!AMBIGUOUS_METHODS.contains(&sink.function)
                    && func_expr.ends_with(&method_suffix))
        }
    })
}
/// Heuristically decides whether the argument text of a SQL call is a safely
/// parameterized query.
///
/// The text must contain a placeholder (`?`, `%s`, `:param`, `$1`) and a separate
/// parameter collection introduced by `, (`, `,(`, `, [` or `,[`. The part before
/// that collection — the query itself — must not be an f-string and must not be
/// built by concatenation (unless it starts as a parenthesized string literal).
/// The query part is checked for tuple and list parameters alike.
fn is_parameterized_query(args_text: &str) -> bool {
    const PLACEHOLDERS: [&str; 4] = ["?", "%s", ":param", "$1"];
    // Tuple separators are tried first, then list separators.
    const PARAM_SEPARATORS: [&str; 4] = [", (", ",(", ", [", ",["];

    if !PLACEHOLDERS.iter().any(|p| args_text.contains(p)) {
        return false;
    }
    let split_at = match PARAM_SEPARATORS
        .iter()
        .find_map(|sep| args_text.find(sep))
    {
        Some(pos) => pos,
        None => return false,
    };

    let query_part = &args_text[..split_at];
    let starts_with_literal = {
        let head = query_part.trim_start();
        head.starts_with("(\"") || head.starts_with("('")
    };
    let is_unsafe = query_part.contains("f\"")
        || query_part.contains("f'")
        || (query_part.contains(" + ") && !starts_with_literal);
    !is_unsafe
}
/// Reports whether `text` builds a string out of a tainted name.
///
/// True when `text` contains an f-string marker and a `{name}` placeholder for a
/// tracked name, or when it contains a concatenation, `%` formatting or
/// `.format(` marker and mentions any tracked name at all.
fn is_string_interpolation_tainted(text: &str, tracker: &TaintTracker) -> bool {
    let has_fstring = text.contains("f\"") || text.contains("f'");
    if has_fstring {
        // A doubled-brace form `{{name}}` always contains `{name}` as well, so a
        // single placeholder check covers both spellings.
        let placeholder_hit = tracker
            .tainted
            .keys()
            .any(|name| text.contains(&format!("{{{}}}", name)));
        if placeholder_hit {
            return true;
        }
    }

    let builds_string = text.contains(" + ")
        || text.contains("\" +")
        || text.contains("' +")
        || text.contains(" % ")
        || text.contains(".format(");
    builds_string
        && tracker
            .tainted
            .keys()
            .any(|name| text.contains(name.as_str()))
}
/// Returns the taint record of some tracked name that occurs as a substring of
/// `text`, or `None` when no tracked name occurs in it.
fn find_taint_in_string<'a>(text: &str, tracker: &'a TaintTracker) -> Option<&'a TaintInfo> {
    tracker
        .tainted
        .iter()
        .find(|(name, _)| text.contains(name.as_str()))
        .map(|(_, record)| record)
}
/// Maps a vulnerability type to the human-readable name used in finding titles.
fn vuln_type_name(vt: VulnType) -> &'static str {
    let label = match vt {
        // Injection family.
        VulnType::SqlInjection => "SQL Injection",
        VulnType::CommandInjection => "Command Injection",
        VulnType::LdapInjection => "LDAP Injection",
        VulnType::XpathInjection => "XPath Injection",
        // Web and request handling.
        VulnType::Xss => "Cross-Site Scripting (XSS)",
        VulnType::Ssrf => "Server-Side Request Forgery (SSRF)",
        VulnType::OpenRedirect => "Open Redirect",
        VulnType::PathTraversal => "Path Traversal",
        // Data parsing.
        VulnType::Deserialization => "Insecure Deserialization",
        VulnType::Xxe => "XML External Entity (XXE)",
        // Rust-specific heuristics.
        VulnType::UnsafeCode => "Unsafe Code Risk",
        VulnType::MemorySafety => "Memory Safety Violation",
        VulnType::Panic => "Unchecked Panic Path",
    };
    label
}
/// Tallies findings per severity and per vulnerability type.
///
/// Severity keys use the severity's `to_string()` form; type keys use the
/// lower-cased Debug name of the type. `files_with_vulns` is stored as given.
fn build_summary(findings: &[VulnFinding], files_with_vulns: u32) -> VulnSummary {
    let mut by_severity = HashMap::<String, u32>::new();
    let mut by_type = HashMap::<String, u32>::new();
    for item in findings {
        let severity_key = item.severity.to_string();
        *by_severity.entry(severity_key).or_default() += 1;

        let type_key = format!("{:?}", item.vuln_type).to_lowercase();
        *by_type.entry(type_key).or_default() += 1;
    }
    let total_findings = findings.len() as u32;
    VulnSummary {
        total_findings,
        by_severity,
        by_type,
        files_with_vulns,
    }
}
/// Renders the report as plain text: a numbered list of findings (with taint flow
/// and remediation), an optional summary, and scan statistics.
///
/// The "By Severity" rows are emitted in sorted key order so that the same report
/// always produces the same text.
fn format_vuln_text(report: &VulnReport) -> String {
    let mut out = String::new();
    out.push_str("=== Vulnerability Scan Results ===\n\n");
    if report.findings.is_empty() {
        out.push_str("No vulnerabilities found.\n");
    } else {
        out.push_str(&format!(
            "Found {} vulnerabilities:\n\n",
            report.findings.len()
        ));
        for (i, finding) in report.findings.iter().enumerate() {
            out.push_str(&format!(
                "{}. [{}] {} ({})\n",
                i + 1,
                finding.severity.to_string().to_uppercase(),
                finding.title,
                finding.cwe_id
            ));
            out.push_str(&format!(" File: {}:{}\n", finding.file, finding.line));
            out.push_str(&format!(" {}\n", finding.description));
            if !finding.taint_flow.is_empty() {
                out.push_str(" Taint Flow:\n");
                for (j, flow) in finding.taint_flow.iter().enumerate() {
                    out.push_str(&format!(
                        " {}. {}:{} - {}\n",
                        j + 1,
                        flow.file,
                        flow.line,
                        flow.description
                    ));
                    if !flow.code_snippet.is_empty() {
                        out.push_str(&format!(" {}\n", flow.code_snippet.trim()));
                    }
                }
            }
            out.push_str(&format!(" Remediation: {}\n\n", finding.remediation));
        }
    }
    if let Some(summary) = &report.summary {
        out.push_str("=== Summary ===\n");
        out.push_str(&format!(
            "Total: {} vulnerabilities\n",
            summary.total_findings
        ));
        out.push_str(&format!(
            "Files with vulnerabilities: {}\n",
            summary.files_with_vulns
        ));
        if !summary.by_severity.is_empty() {
            out.push_str("By Severity:\n");
            // Map iteration order is unspecified; sort for reproducible output.
            let mut severity_rows: Vec<_> = summary.by_severity.iter().collect();
            severity_rows.sort();
            for (sev, count) in severity_rows {
                out.push_str(&format!(" {}: {}\n", sev, count));
            }
        }
    }
    out.push_str(&format!("\nScan duration: {}ms\n", report.scan_duration_ms));
    out.push_str(&format!("Files scanned: {}\n", report.files_scanned));
    out
}
fn generate_sarif(report: &VulnReport) -> Value {
let results: Vec<Value> = report
.findings
.iter()
.map(|f| {
json!({
"ruleId": f.cwe_id,
"level": match f.severity {
Severity::Critical | Severity::High => "error",
Severity::Medium => "warning",
Severity::Low | Severity::Info => "note",
},
"message": {
"text": f.description
},
"locations": [{
"physicalLocation": {
"artifactLocation": {
"uri": f.file
},
"region": {
"startLine": f.line,
"startColumn": f.column
}
}
}],
"codeFlows": if f.taint_flow.is_empty() { None } else {
Some(vec![{
json!({
"threadFlows": [{
"locations": f.taint_flow.iter().map(|tf| {
json!({
"location": {
"physicalLocation": {
"artifactLocation": {
"uri": tf.file
},
"region": {
"startLine": tf.line,
"startColumn": tf.column
}
},
"message": {
"text": tf.description
}
}
})
}).collect::<Vec<_>>()
}]
})
}])
}
})
})
.collect();
let rules: Vec<Value> = report
.findings
.iter()
.map(|f| &f.vuln_type)
.collect::<HashSet<_>>()
.into_iter()
.map(|vt| {
json!({
"id": vt.cwe_id(),
"name": vuln_type_name(*vt),
"shortDescription": {
"text": vuln_type_name(*vt)
},
"defaultConfiguration": {
"level": match vt.default_severity() {
Severity::Critical | Severity::High => "error",
Severity::Medium => "warning",
Severity::Low | Severity::Info => "note",
}
}
})
})
.collect();
json!({
"$schema": "https://raw.githubusercontent.com/oasis-tcs/sarif-spec/master/Schemata/sarif-schema-2.1.0.json",
"version": "2.1.0",
"runs": [{
"tool": {
"driver": {
"name": "tldr-vuln",
"version": env!("CARGO_PKG_VERSION"),
"informationUri": "https://github.com/tldr-code/tldr-rs",
"rules": rules
}
},
"results": results
}]
})
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::TempDir;

    /// Known source expressions are recognised; a plain identifier is not.
    #[test]
    fn test_is_taint_source() {
        for expr in ["request.args.get('q')", "request.form", "input()", "sys.argv"] {
            assert!(
                is_taint_source(expr).is_some(),
                "expected {} to be recognised as a taint source",
                expr
            );
        }
        assert!(is_taint_source("clean_var").is_none());
    }

    /// Known sink call names are recognised; a harmless call is not.
    #[test]
    fn test_is_taint_sink() {
        for call in ["cursor.execute", "os.system", "eval"] {
            assert!(
                is_taint_sink(call).is_some(),
                "expected {} to be recognised as a taint sink",
                call
            );
        }
        assert!(is_taint_sink("print").is_none());
    }

    /// A marked variable is reported as tainted, both directly and when it is
    /// interpolated into an f-string expression.
    #[test]
    fn test_taint_tracker() {
        let info = TaintInfo {
            source_desc: "request.args".to_string(),
            source_line: 5,
            source_column: 0,
            code_snippet: "user_input = request.args.get('q')".to_string(),
        };
        let mut tracker = TaintTracker::new();
        tracker.mark_tainted(String::from("user_input"), info);

        assert!(tracker.is_tainted("user_input").is_some());
        assert!(tracker.is_tainted("clean_var").is_none());

        let interpolated = "f\"SELECT * FROM t WHERE x = {user_input}\"";
        assert!(tracker.expression_is_tainted(interpolated).is_some());
    }

    /// Spot-checks the vulnerability-type to CWE id mapping.
    #[test]
    fn test_vuln_type_cwe_mapping() {
        let expectations = [
            (VulnType::SqlInjection, "CWE-89"),
            (VulnType::Xss, "CWE-79"),
            (VulnType::CommandInjection, "CWE-78"),
        ];
        for (vuln_type, cwe) in expectations {
            assert_eq!(vuln_type.cwe_id(), cwe);
        }
    }

    /// Spot-checks the default severity assigned to vulnerability types.
    #[test]
    fn test_vuln_type_severity() {
        let expectations = [
            (VulnType::SqlInjection, Severity::Critical),
            (VulnType::Xss, Severity::High),
            (VulnType::OpenRedirect, Severity::Medium),
        ];
        for (vuln_type, severity) in expectations {
            assert_eq!(vuln_type.default_severity(), severity);
        }
    }

    /// f-string interpolation and `+` concatenation of a tainted variable are
    /// flagged; a parameterised query is not.
    #[test]
    fn test_string_interpolation_detection() {
        let mut tracker = TaintTracker::new();
        tracker.mark_tainted(
            String::from("user_query"),
            TaintInfo {
                source_desc: "test".to_string(),
                source_line: 1,
                source_column: 0,
                code_snippet: "".to_string(),
            },
        );

        let fstring = r#"f"SELECT * FROM t WHERE x = '{user_query}'"#;
        let concatenation = r#""SELECT * FROM t WHERE x = '" + user_query + "'"#;
        let parameterised = r#""SELECT * FROM t WHERE x = ?""#;

        assert!(is_string_interpolation_tainted(fstring, &tracker));
        assert!(is_string_interpolation_tainted(concatenation, &tracker));
        assert!(!is_string_interpolation_tainted(parameterised, &tracker));
    }

    /// Python and Rust files are collected; other extensions are ignored.
    #[test]
    fn test_collect_files_includes_rust() {
        let temp = TempDir::new().unwrap();
        let root = temp.path();
        let fixtures = [
            ("a.py", "print('ok')"),
            ("b.rs", "fn main() {}"),
            ("c.txt", "ignore"),
        ];
        for (name, contents) in fixtures {
            std::fs::write(root.join(name), contents).unwrap();
        }

        let files = collect_files(root).unwrap();
        let collected = |suffix: &str| files.iter().any(|f| f.ends_with(suffix));

        assert!(collected("a.py"));
        assert!(collected("b.rs"));
        assert!(!collected("c.txt"));
    }

    /// An `unsafe` block with no accompanying safety comment is reported.
    #[test]
    fn test_analyze_rust_detects_unsafe_without_safety_comment() {
        let source = r#"
pub fn raw_copy(ptr: *mut u8) {
unsafe { *ptr = 7; }
}
"#;
        let findings = analyze_rust_file(Path::new("src/lib.rs"), source);
        let flagged = findings.iter().any(|f| f.vuln_type == VulnType::UnsafeCode);
        assert!(flagged);
    }

    /// Formatted SQL strings and `Command` arguments are both reported.
    #[test]
    fn test_analyze_rust_detects_command_and_sql_patterns() {
        let source = r#"
use std::process::Command;
pub fn run(user: &str, name: &str) {
let q = format!("SELECT * FROM users WHERE name = '{}'", name);
let _ = Command::new("sh").arg(user).output();
}
"#;
        let findings = analyze_rust_file(Path::new("src/lib.rs"), source);

        let has_sql = findings
            .iter()
            .any(|f| f.vuln_type == VulnType::SqlInjection);
        assert!(has_sql);

        let has_command = findings
            .iter()
            .any(|f| f.vuln_type == VulnType::CommandInjection);
        assert!(has_command);
    }

    /// `mem::transmute` is reported as a memory-safety finding.
    #[test]
    fn test_analyze_rust_detects_transmute_usage() {
        let source = r#"
use std::mem;
pub fn cast(x: u32) -> i32 {
unsafe { mem::transmute(x) }
}
"#;
        let findings = analyze_rust_file(Path::new("src/lib.rs"), source);
        let flagged = findings
            .iter()
            .any(|f| f.vuln_type == VulnType::MemorySafety && f.title.contains("transmute"));
        assert!(flagged);
    }

    /// `std::ptr::read` is reported as a raw-pointer memory-safety finding.
    #[test]
    fn test_analyze_rust_detects_raw_pointer_operation() {
        let source = r#"
pub unsafe fn read_ptr(p: *const u8) -> u8 {
std::ptr::read(p)
}
"#;
        let findings = analyze_rust_file(Path::new("src/lib.rs"), source);
        let flagged = findings
            .iter()
            .any(|f| f.vuln_type == VulnType::MemorySafety && f.title.contains("Raw Pointer"));
        assert!(flagged);
    }

    /// `.unwrap()` in a non-test source path is reported as a panic finding.
    #[test]
    fn test_analyze_rust_detects_unwrap_in_non_test_code() {
        let source = r#"
pub fn parse(s: &str) -> i32 {
s.parse::<i32>().unwrap()
}
"#;
        let findings = analyze_rust_file(Path::new("src/lib.rs"), source);
        let flagged = findings
            .iter()
            .any(|f| f.vuln_type == VulnType::Panic && f.title.contains("unwrap"));
        assert!(flagged);
    }

    /// `from_utf8_unchecked` is reported as an unchecked conversion.
    #[test]
    fn test_analyze_rust_detects_unchecked_bytes_patterns() {
        let source = r#"
pub fn from_raw(bytes: &[u8]) -> &str {
unsafe { std::str::from_utf8_unchecked(bytes) }
}
"#;
        let findings = analyze_rust_file(Path::new("src/lib.rs"), source);
        let flagged = findings.iter().any(|f| {
            f.vuln_type == VulnType::MemorySafety
                && f.title.contains("Unchecked Byte/String Conversion")
        });
        assert!(flagged);
    }
}