use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::time::Instant;
use clap::Args;
use colored::Colorize;
use serde_json::Value;
use tree_sitter::Node;
use crate::output::OutputFormat;
use super::ast_cache::AstCache;
use super::error::{RemainingError, RemainingResult};
use super::types::{SecureFinding, SecureReport, SecureSummary};
/// One security analysis pass that the report can include.
///
/// Each variant is dispatched to its own analyzer function by
/// `run_security_analysis` and identified in output by [`SecurityAnalysis::name`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityAnalysis {
    /// Dangerous-call heuristics (and `unsafe` block detection for Rust files).
    Taint,
    /// Resource-leak heuristics (and raw-pointer detection for Rust files).
    Resources,
    /// Bounds-related checks (`unwrap()` / `todo!` markers for Rust files).
    Bounds,
    /// Contract checks.
    Contracts,
    /// Behavioral checks such as bare `except` clauses.
    Behavioral,
    /// Mutability checks.
    Mutability,
}

impl SecurityAnalysis {
    /// Returns the stable, lowercase identifier of this analysis.
    ///
    /// The identifier is what `--detail` is compared against and the key used
    /// for the analysis in the report's sub-results.
    pub fn name(&self) -> &'static str {
        match *self {
            SecurityAnalysis::Taint => "taint",
            SecurityAnalysis::Resources => "resources",
            SecurityAnalysis::Bounds => "bounds",
            SecurityAnalysis::Contracts => "contracts",
            SecurityAnalysis::Behavioral => "behavioral",
            SecurityAnalysis::Mutability => "mutability",
        }
    }
}
/// Analyses executed when `--quick` is set: taint, resources, and bounds.
///
/// This is a strict prefix of `FULL_ANALYSES`; the order listed here is the
/// order in which the analyses run.
pub const QUICK_ANALYSES: &[SecurityAnalysis] = &[
SecurityAnalysis::Taint,
SecurityAnalysis::Resources,
SecurityAnalysis::Bounds,
];
/// Analyses executed by default (when `--quick` is not set): every
/// `SecurityAnalysis` variant, in the order they run.
pub const FULL_ANALYSES: &[SecurityAnalysis] = &[
SecurityAnalysis::Taint,
SecurityAnalysis::Resources,
SecurityAnalysis::Bounds,
SecurityAnalysis::Contracts,
SecurityAnalysis::Behavioral,
SecurityAnalysis::Mutability,
];
/// Run security analyses over a file or directory and report the findings.
// NOTE: clap's derive is expected to surface these `///` comments as
// `--help` text, so keep them short and user-facing.
#[derive(Debug, Args, Clone)]
pub struct SecureArgs {
    /// File or directory to analyze (.py and .rs files are scanned)
    pub path: PathBuf,

    /// Include the raw results of one analysis in the report, selected by
    /// name: taint, resources, bounds, contracts, behavioral, or mutability
    #[arg(long)]
    pub detail: Option<String>,

    /// Run only the quick analyses (taint, resources, bounds)
    #[arg(long)]
    pub quick: bool,

    /// Write the report to this file instead of standard output
    #[arg(long, short = 'o')]
    pub output: Option<PathBuf>,
}
impl SecureArgs {
    /// Executes the analysis described by these arguments.
    ///
    /// This is a convenience wrapper: it clones the arguments and forwards the
    /// owned copy, together with `format`, to the module-level `run` function.
    ///
    /// # Errors
    ///
    /// Propagates whatever error the module-level `run` returns.
    pub fn run(&self, format: OutputFormat) -> anyhow::Result<()> {
        let owned_args = self.clone();
        run(owned_args, format)
    }
}
pub fn run(args: SecureArgs, format: OutputFormat) -> anyhow::Result<()> {
let start = Instant::now();
if !args.path.exists() {
return Err(RemainingError::file_not_found(&args.path).into());
}
let mut report = SecureReport::new(args.path.display().to_string());
let mut cache = AstCache::default();
let analyses = if args.quick {
QUICK_ANALYSES
} else {
FULL_ANALYSES
};
let files = collect_files(&args.path)?;
let mut all_findings = Vec::new();
let mut sub_results: HashMap<String, Value> = HashMap::new();
for analysis in analyses {
let (findings, raw_result) = run_security_analysis(*analysis, &files, &mut cache)?;
update_summary(&mut report.summary, *analysis, &findings);
all_findings.extend(findings);
if args.detail.as_deref() == Some(analysis.name()) {
sub_results.insert(analysis.name().to_string(), raw_result);
}
}
all_findings.sort_by(|a, b| severity_order(&a.severity).cmp(&severity_order(&b.severity)));
report.findings = all_findings;
report.sub_results = sub_results;
report.total_elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
let output_str = match format {
OutputFormat::Json => serde_json::to_string_pretty(&report)?,
OutputFormat::Compact => serde_json::to_string(&report)?,
OutputFormat::Text => format_text_report(&report),
OutputFormat::Sarif | OutputFormat::Dot => {
serde_json::to_string_pretty(&report)?
}
};
if let Some(output_path) = &args.output {
fs::write(output_path, &output_str)?;
} else {
println!("{}", output_str);
}
Ok(())
}
/// Gathers the files under `path` that the security analyses can handle.
///
/// * If `path` is a regular file, the result holds that file when its
///   extension is supported, and is empty otherwise.
/// * If `path` is a directory, it is walked to a maximum depth of 10 and every
///   supported regular file is returned. Entries the walker fails to read are
///   skipped silently.
/// * Any other kind of path yields an empty list.
///
/// This function currently always returns `Ok`.
fn collect_files(path: &Path) -> RemainingResult<Vec<PathBuf>> {
    if path.is_file() {
        let single = if is_supported_secure_file(path) {
            vec![path.to_path_buf()]
        } else {
            Vec::new()
        };
        return Ok(single);
    }

    if !path.is_dir() {
        return Ok(Vec::new());
    }

    let discovered = walkdir::WalkDir::new(path)
        .max_depth(10)
        .into_iter()
        // Unreadable entries are dropped rather than aborting the whole walk.
        .filter_map(Result::ok)
        .filter(|entry| {
            let candidate = entry.path();
            candidate.is_file() && is_supported_secure_file(candidate)
        })
        .map(|entry| entry.path().to_path_buf())
        .collect();
    Ok(discovered)
}
/// Returns `true` when `path` has a `py` or `rs` extension.
///
/// The comparison is case-sensitive, and paths without an extension (or with
/// a non-UTF-8 extension) are not supported.
fn is_supported_secure_file(path: &std::path::Path) -> bool {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some(ext) => ext == "py" || ext == "rs",
        None => false,
    }
}
/// Returns `true` when `path` has exactly the extension `rs` (case-sensitive).
fn is_rust_file(path: &std::path::Path) -> bool {
    let extension = path.extension().and_then(|ext| ext.to_str());
    extension == Some("rs")
}
/// Heuristically decides whether `path` points at Rust test code.
///
/// A path counts as a test file when either
/// * any directory in it is named exactly `tests`, or
/// * its file name is `tests.rs` or ends in `_test.rs` / `_tests.rs`.
///
/// Matching is done on whole path segments rather than raw substrings, so a
/// relative `tests/foo.rs` is recognised and `src/contests.rs` is not mistaken
/// for a test file. Both `/` and `\` are treated as separators regardless of
/// the host platform, so Windows-style path strings are still recognised.
fn is_rust_test_file(path: &std::path::Path) -> bool {
    let lossy = path.to_string_lossy();
    // Walk the segments right-to-left: the first one is the file name, the
    // remaining ones are the directories above it.
    let mut segments = lossy.rsplit(|c: char| c == '/' || c == '\\');
    let file_name = segments.next().unwrap_or("");

    let named_like_test = file_name == "tests.rs"
        || file_name.ends_with("_test.rs")
        || file_name.ends_with("_tests.rs");

    named_like_test || segments.any(|dir| dir == "tests")
}
fn run_security_analysis(
analysis: SecurityAnalysis,
files: &[PathBuf],
cache: &mut AstCache,
) -> RemainingResult<(Vec<SecureFinding>, Value)> {
let mut findings = Vec::new();
for file in files {
let source = fs::read_to_string(file)?;
let tree = cache.get_or_parse(file, &source)?;
let file_findings = match analysis {
SecurityAnalysis::Taint => analyze_taint(tree.root_node(), &source, file),
SecurityAnalysis::Resources => analyze_resources(tree.root_node(), &source, file),
SecurityAnalysis::Bounds => analyze_bounds(tree.root_node(), &source, file),
SecurityAnalysis::Contracts => analyze_contracts(tree.root_node(), &source, file),
SecurityAnalysis::Behavioral => analyze_behavioral(tree.root_node(), &source, file),
SecurityAnalysis::Mutability => analyze_mutability(tree.root_node(), &source, file),
};
findings.extend(file_findings);
}
let raw_result = serde_json::to_value(&findings).unwrap_or(Value::Array(vec![]));
Ok((findings, raw_result))
}
/// Folds the findings of one analysis into the report `summary`.
///
/// Only the counters that belong to `analysis` are written; every other field
/// of `summary` is left untouched. Counters are derived from the finding
/// category so that one analysis can feed several counters without counting
/// a finding twice:
///
/// * `Taint` – `taint_count` / `taint_critical` count findings of category
///   `"taint"` only; `unsafe_block` findings produced by the same pass for
///   Rust files are reported solely through `unsafe_blocks`.
/// * `Resources` – `leak_count` and `raw_pointer_ops`.
/// * `Bounds` – `bounds_warnings`, `unwrap_calls`, and `todo_markers`.
/// * `Contracts` / `Mutability` – the total number of findings.
/// * `Behavioral` – no summary counter is updated for this analysis.
///
/// Counts that do not fit in `u32` saturate at `u32::MAX`.
fn update_summary(
    summary: &mut SecureSummary,
    analysis: SecurityAnalysis,
    findings: &[SecureFinding],
) {
    // Saturating conversion instead of a silently truncating `as` cast.
    fn to_u32(count: usize) -> u32 {
        u32::try_from(count).unwrap_or(u32::MAX)
    }
    let count_category = |category: &str| -> u32 {
        to_u32(findings.iter().filter(|f| f.category == category).count())
    };

    match analysis {
        SecurityAnalysis::Taint => {
            summary.taint_count = count_category("taint");
            summary.taint_critical = to_u32(
                findings
                    .iter()
                    .filter(|f| f.category == "taint" && f.severity == "critical")
                    .count(),
            );
            summary.unsafe_blocks = count_category("unsafe_block");
        }
        SecurityAnalysis::Resources => {
            summary.leak_count = count_category("resource_leak");
            summary.raw_pointer_ops = count_category("raw_pointer");
        }
        SecurityAnalysis::Bounds => {
            summary.bounds_warnings = count_category("bounds");
            summary.unwrap_calls = count_category("unwrap");
            summary.todo_markers = count_category("todo_marker");
        }
        SecurityAnalysis::Contracts => {
            summary.missing_contracts = to_u32(findings.len());
        }
        SecurityAnalysis::Behavioral => {
            // Behavioral findings appear in the findings list only.
        }
        SecurityAnalysis::Mutability => {
            summary.mutable_params = to_u32(findings.len());
        }
    }
}
/// Maps a severity label to its sort rank; lower ranks are more severe.
///
/// `"critical"` is 0, followed by `"high"`, `"medium"`, `"low"`, and `"info"`
/// (4). Any other label, including differently-cased ones, ranks last as 5.
fn severity_order(severity: &str) -> u8 {
    // Index in this table is the rank.
    const RANKED: [&str; 5] = ["critical", "high", "medium", "low", "info"];
    RANKED
        .iter()
        .position(|level| *level == severity)
        .map_or(5, |rank| rank as u8)
}
/// Python call targets treated as dangerous sinks by the taint heuristic.
///
/// Each entry is `(function pattern, vulnerability type, severity)`. The
/// pattern is compared against the text of a call's function expression, the
/// vulnerability type is used in the finding description, and the severity is
/// attached to the finding as-is.
const TAINT_SINKS: &[(&str, &str, &str)] = &[
("cursor.execute", "SQL Injection", "critical"),
("execute", "SQL Injection", "critical"),
("os.system", "Command Injection", "critical"),
("subprocess.call", "Command Injection", "critical"),
("subprocess.run", "Command Injection", "high"),
("subprocess.Popen", "Command Injection", "high"),
("eval", "Code Injection", "critical"),
("exec", "Code Injection", "critical"),
("pickle.loads", "Insecure Deserialization", "critical"),
("yaml.load", "Insecure Deserialization", "high"),
("open", "Path Traversal", "high"),
("render_template_string", "Template Injection", "high"),
];
/// Taint analysis entry point for a single file.
///
/// Rust files are not inspected through the syntax tree here; they are handed
/// to the line-based `unsafe` block scan instead. For every other file the
/// tree rooted at `root` is searched for dangerous calls that receive
/// dynamically built strings.
fn analyze_taint(root: Node, source: &str, file: &Path) -> Vec<SecureFinding> {
    if is_rust_file(file) {
        analyze_rust_unsafe_blocks(source, file)
    } else {
        let bytes = source.as_bytes();
        let mut collected = Vec::new();
        analyze_fstring_injection(root, bytes, file, &mut collected);
        analyze_string_concat_in_sinks(root, bytes, file, &mut collected);
        collected
    }
}
/// Searches the tree rooted at `root` for calls to known sinks whose
/// arguments contain dynamically built strings, appending any hits to
/// `findings`.
///
/// `source` is the file content as bytes and `file` is used for the reported
/// location.
fn analyze_fstring_injection(
root: Node,
source: &[u8],
file: &Path,
findings: &mut Vec<SecureFinding>,
) {
// The recursive walk does all the work; this is just its named entry point.
traverse_for_fstrings(root, source, file, findings);
}
/// Recursively walks `node` and reports calls to known sinks that receive a
/// dynamically built string.
///
/// A `call` node is reported when
/// * its function text matches an entry of `TAINT_SINKS` (the text contains
///   the pattern, or ends with the pattern's last dotted segment), and
/// * its argument text contains an f-string prefix, `.format(`, or ` + `.
///
/// At most one finding is produced per call: the first matching table entry
/// wins. Without that, overlapping patterns such as `cursor.execute`,
/// `execute`, and `exec` would each report the same call, inflating counts
/// and attaching the wrong vulnerability type to the duplicates.
///
/// Findings are located at the 1-based line on which the call starts.
fn traverse_for_fstrings(node: Node, source: &[u8], file: &Path, findings: &mut Vec<SecureFinding>) {
    if node.kind() == "call" {
        let callee = node.child_by_field_name("function");
        let arguments = node.child_by_field_name("arguments");
        if let (Some(func), Some(args)) = (callee, arguments) {
            let func_text = node_text(func, source);
            let args_text = node_text(args, source);

            // Textual markers of a string assembled at runtime.
            let builds_string = ["f\"", "f'", ".format(", " + "]
                .iter()
                .any(|marker| args_text.contains(*marker));

            if builds_string {
                // Table order puts the more specific patterns first, so the
                // first hit is the best label for this call.
                let first_sink = TAINT_SINKS.iter().find(|sink| {
                    let pattern = sink.0;
                    func_text.contains(pattern)
                        || func_text.ends_with(pattern.split('.').next_back().unwrap_or(pattern))
                });
                if let Some(&(_, vuln_type, severity)) = first_sink {
                    findings.push(
                        SecureFinding::new(
                            "taint",
                            severity,
                            format!(
                                "{}: Potential {} - user input may flow to dangerous function",
                                vuln_type,
                                vuln_type.to_lowercase()
                            ),
                        )
                        .with_location(
                            file.display().to_string(),
                            node.start_position().row as u32 + 1,
                        ),
                    );
                }
            }
        }
    }

    // Calls can be nested inside arguments, so always descend.
    for i in 0..node.child_count() {
        if let Some(child) = node.child(i) {
            traverse_for_fstrings(child, source, file, findings);
        }
    }
}
/// Placeholder for a dedicated string-concatenation check.
///
/// The body is intentionally empty: it never inspects its arguments and never
/// adds to `_findings`.
fn analyze_string_concat_in_sinks(
_root: Node,
_source: &[u8],
_file: &Path,
_findings: &mut Vec<SecureFinding>,
) {
}
/// Function names (last dotted segment of the callee) whose call result is
/// treated as a resource by the leak check.
const RESOURCE_CREATORS: &[&str] = &["open", "socket", "connect", "cursor", "urlopen"];
/// Resource analysis entry point for a single file.
///
/// Rust files go to the line-based raw-pointer scan. For every other file the
/// tree rooted at `root` is searched for resources that are assigned outside
/// of a `with` statement.
fn analyze_resources(root: Node, source: &str, file: &Path) -> Vec<SecureFinding> {
    if is_rust_file(file) {
        analyze_rust_raw_pointers(source, file)
    } else {
        let mut leaks = Vec::new();
        find_leaked_resources(root, source.as_bytes(), file, &mut leaks);
        leaks
    }
}
/// Recursively reports assignments whose right-hand side creates a resource
/// outside of any `with` statement.
///
/// An `assignment` node is reported when its `right` child is a `call`, the
/// last dotted segment of the callee is listed in `RESOURCE_CREATORS`, and no
/// ancestor of the assignment is a `with_statement`. The finding is located
/// at the 1-based line on which the assignment starts.
fn find_leaked_resources(
    node: Node,
    source: &[u8],
    file: &Path,
    findings: &mut Vec<SecureFinding>,
) {
    // Name of the resource-creating function, if this node is an unmanaged
    // `name = creator(...)` assignment.
    let unmanaged_creator = Some(node)
        .filter(|candidate| candidate.kind() == "assignment")
        .and_then(|assignment| assignment.child_by_field_name("right"))
        .filter(|rhs| rhs.kind() == "call")
        .and_then(|call| call.child_by_field_name("function"))
        .map(|callee| {
            let callee_text = node_text(callee, source);
            callee_text.split('.').next_back().unwrap_or(callee_text)
        })
        .filter(|creator| RESOURCE_CREATORS.contains(creator) && !is_inside_with(node));

    if let Some(creator) = unmanaged_creator {
        let line = node.start_position().row as u32 + 1;
        let description = format!(
            "Resource '{}' opened without context manager - may leak",
            creator
        );
        findings.push(
            SecureFinding::new("resource_leak", "high", description)
                .with_location(file.display().to_string(), line),
        );
    }

    for child_index in 0..node.child_count() {
        if let Some(child) = node.child(child_index) {
            find_leaked_resources(child, source, file, findings);
        }
    }
}
fn is_inside_with(node: Node) -> bool {
let mut current = node.parent();
while let Some(parent) = current {
if parent.kind() == "with_statement" {
return true;
}
current = parent.parent();
}
false
}
/// Bounds analysis entry point for a single file.
///
/// Only Rust files are checked (via the line-based `unwrap()` / `todo!`
/// scan); every other file yields no findings. The syntax tree is unused.
fn analyze_bounds(_root: Node, source: &str, file: &Path) -> Vec<SecureFinding> {
    if !is_rust_file(file) {
        return Vec::new();
    }
    analyze_rust_bounds(source, file)
}
/// Contract analysis entry point for a single file.
///
/// Currently a stub: all arguments are ignored and no findings are produced.
fn analyze_contracts(_root: Node, _source: &str, _file: &Path) -> Vec<SecureFinding> {
Vec::new()
}
/// Behavioral analysis entry point for a single file.
///
/// Walks the tree rooted at `root` looking for bare `except` clauses. Unlike
/// the taint, resource, and bounds passes there is no Rust-specific branch:
/// the same tree walk runs for every file.
fn analyze_behavioral(root: Node, source: &str, file: &Path) -> Vec<SecureFinding> {
    let mut bare_excepts = Vec::new();
    find_bare_except(root, source.as_bytes(), file, &mut bare_excepts);
    bare_excepts
}
/// Recursively reports bare `except:` clauses.
///
/// An `except_clause` node is reported when
/// * none of its direct children is an `as_pattern` or an identifier other
///   than `Exception`, and
/// * its text starts with `except:` or `except :`.
///
/// The finding is located at the 1-based line on which the clause starts.
fn find_bare_except(node: Node, source: &[u8], file: &Path, findings: &mut Vec<SecureFinding>) {
    let is_bare = node.kind() == "except_clause" && {
        let mut cursor = node.walk();
        let names_exception_type = node.children(&mut cursor).any(|child| match child.kind() {
            "as_pattern" => true,
            "identifier" => node_text(child, source) != "Exception",
            _ => false,
        });
        // Only look at the clause text when no exception type was named.
        !names_exception_type && {
            let clause = node_text(node, source);
            clause.starts_with("except:") || clause.starts_with("except :")
        }
    };

    if is_bare {
        let line = node.start_position().row as u32 + 1;
        findings.push(
            SecureFinding::new(
                "behavioral",
                "medium",
                "Bare except clause catches all exceptions including KeyboardInterrupt",
            )
            .with_location(file.display().to_string(), line),
        );
    }

    for child_index in 0..node.child_count() {
        if let Some(child) = node.child(child_index) {
            find_bare_except(child, source, file, findings);
        }
    }
}
/// Mutability analysis entry point for a single file.
///
/// Currently a stub: all arguments are ignored and no findings are produced.
fn analyze_mutability(_root: Node, _source: &str, _file: &Path) -> Vec<SecureFinding> {
Vec::new()
}
/// Returns the source text covered by `node`, or `""` when it is unavailable.
///
/// The empty string is returned both when the covered bytes are not valid
/// UTF-8 and when the node's byte range does not lie inside `source`. The
/// range is looked up with a checked `get` rather than by indexing, so a tree
/// that does not correspond to `source` cannot cause a panic here.
// NOTE(review): such a mismatch seems possible if the AST cache ever returns
// a tree parsed from earlier file content - confirm against `AstCache`.
fn node_text<'a>(node: Node, source: &'a [u8]) -> &'a str {
    source
        .get(node.start_byte()..node.end_byte())
        .and_then(|bytes| std::str::from_utf8(bytes).ok())
        .unwrap_or("")
}
/// Returns `true` when `line` contains the `unsafe` keyword directly followed
/// (after optional whitespace) by an opening brace.
///
/// The keyword must not be the tail of a longer identifier, so
/// `if is_unsafe {` does not match, while `let x = unsafe{ .. }` and
/// `unsafe  {` do.
fn line_opens_unsafe_block(line: &str) -> bool {
    line.match_indices("unsafe").any(|(start, keyword)| {
        let previous = line[..start].chars().next_back();
        let inside_identifier =
            matches!(previous, Some(c) if c.is_alphanumeric() || c == '_');
        let after_keyword = line[start + keyword.len()..].trim_start();
        !inside_identifier && after_keyword.starts_with('{')
    })
}

/// Line-based scan for `unsafe { .. }` blocks in Rust source.
///
/// Lines whose first non-blank characters are `//` are skipped. Every other
/// line that opens an unsafe block yields one `unsafe_block` finding of
/// severity `high`, located at that line (1-based).
///
/// This is a textual heuristic: `unsafe fn` / `unsafe impl` are not reported,
/// and a block whose `{` sits on the following line is not detected.
fn analyze_rust_unsafe_blocks(source: &str, file: &Path) -> Vec<SecureFinding> {
    let mut findings = Vec::new();
    for (idx, line) in source.lines().enumerate() {
        let trimmed = line.trim();
        if trimmed.starts_with("//") {
            continue;
        }
        if line_opens_unsafe_block(trimmed) {
            findings.push(
                SecureFinding::new(
                    "unsafe_block",
                    "high",
                    "unsafe block detected; verify invariants and safety rationale",
                )
                .with_location(file.display().to_string(), (idx + 1) as u32),
            );
        }
    }
    findings
}
/// Line-based scan for raw pointer operations in Rust source.
///
/// Lines whose first non-blank characters are `//` are skipped. Every other
/// line that mentions `std::ptr::`, `core::ptr::`, `ptr::read(`, or
/// `ptr::write(` yields one `raw_pointer` finding of severity `high`, located
/// at that line (1-based). A line produces at most one finding.
fn analyze_rust_raw_pointers(source: &str, file: &Path) -> Vec<SecureFinding> {
    const RAW_POINTER_MARKERS: [&str; 4] =
        ["std::ptr::", "core::ptr::", "ptr::read(", "ptr::write("];

    source
        .lines()
        .enumerate()
        .filter(|(_, line)| {
            let code = line.trim();
            !code.starts_with("//")
                && RAW_POINTER_MARKERS
                    .iter()
                    .any(|marker| code.contains(*marker))
        })
        .map(|(idx, _)| {
            SecureFinding::new(
                "raw_pointer",
                "high",
                "raw pointer operation detected; audit aliasing, lifetime, and bounds assumptions",
            )
            .with_location(file.display().to_string(), (idx + 1) as u32)
        })
        .collect()
}
/// Line-based scan for panicking shortcuts in non-test Rust source.
///
/// Files recognised as test files produce no findings at all. In every other
/// file, lines whose first non-blank characters are `//` are skipped, and each
/// remaining line may yield
/// * an `unwrap` finding (severity `medium`) when it contains `.unwrap()`, and
/// * a `todo_marker` finding (severity `low`) when it contains `todo!(` or
///   `unimplemented!(`.
///
/// Findings are located at the 1-based line number; when a line triggers
/// both, the `unwrap` finding comes first.
fn analyze_rust_bounds(source: &str, file: &Path) -> Vec<SecureFinding> {
    // Both checks apply to non-test code only, so test files exit early.
    if is_rust_test_file(file) {
        return Vec::new();
    }

    let mut findings = Vec::new();
    for (idx, raw_line) in source.lines().enumerate() {
        let code = raw_line.trim();
        if code.starts_with("//") {
            continue;
        }
        let line_number = (idx + 1) as u32;

        if code.contains(".unwrap()") {
            findings.push(
                SecureFinding::new(
                    "unwrap",
                    "medium",
                    "unwrap() call in non-test code may panic at runtime",
                )
                .with_location(file.display().to_string(), line_number),
            );
        }

        let has_stub_marker = code.contains("todo!(") || code.contains("unimplemented!(");
        if has_stub_marker {
            findings.push(
                SecureFinding::new(
                    "todo_marker",
                    "low",
                    "todo!/unimplemented! marker found in non-test Rust code",
                )
                .with_location(file.display().to_string(), line_number),
            );
        }
    }
    findings
}
fn format_text_report(report: &SecureReport) -> String {
let mut output = String::new();
output.push_str(&"=".repeat(60));
output.push('\n');
output.push_str(&format!(
"{}\n",
"SECURE - Security Analysis Dashboard".bold()
));
output.push_str(&"=".repeat(60));
output.push_str("\n\n");
output.push_str(&format!("Path: {}\n\n", report.path));
if report.findings.is_empty() {
output.push_str(&format!("{}\n", "No security issues found.".green()));
} else {
output.push_str(&format!(
"{}\n",
"Severity | Category | Description".bold()
));
output.push_str(&format!("{}\n", "-".repeat(60)));
for finding in &report.findings {
let severity_colored = match finding.severity.as_str() {
"critical" => finding.severity.red().bold().to_string(),
"high" => finding.severity.red().to_string(),
"medium" => finding.severity.yellow().to_string(),
"low" => finding.severity.blue().to_string(),
_ => finding.severity.clone(),
};
output.push_str(&format!(
"{:>8} | {:<14} | {}\n",
severity_colored, finding.category, finding.description
));
if !finding.file.is_empty() {
output.push_str(&format!(
" | | {}:{}\n",
finding.file, finding.line
));
}
}
}
output.push('\n');
output.push_str(&format!("{}\n", "Summary:".bold()));
output.push_str(&format!(
" Taint issues: {} ({} critical)\n",
report.summary.taint_count, report.summary.taint_critical
));
output.push_str(&format!(
" Resource leaks: {}\n",
report.summary.leak_count
));
output.push_str(&format!(
" Bounds warnings: {}\n",
report.summary.bounds_warnings
));
output.push_str(&format!(
" Missing contracts: {}\n",
report.summary.missing_contracts
));
output.push_str(&format!(
" Mutable params: {}\n",
report.summary.mutable_params
));
output.push_str(&format!(
" Unsafe blocks: {}\n",
report.summary.unsafe_blocks
));
output.push_str(&format!(
" Raw pointer ops: {}\n",
report.summary.raw_pointer_ops
));
output.push_str(&format!(
" Unwrap calls: {}\n",
report.summary.unwrap_calls
));
output.push_str(&format!(
" Todo markers: {}\n",
report.summary.todo_markers
));
output.push('\n');
output.push_str(&format!("Elapsed: {:.2}ms\n", report.total_elapsed_ms));
output
}
// Unit tests for the security analyses: argument construction, severity
// ranking, the Python taint/resource heuristics, file collection, and the
// line-based Rust checks.
#[cfg(test)]
mod tests {
use super::*;
use tree_sitter::Parser;
use tempfile::TempDir;
// Writes `content` to `name` inside the temporary directory and returns the
// resulting path.
fn create_test_file(dir: &TempDir, name: &str, content: &str) -> PathBuf {
let path = dir.path().join(name);
fs::write(&path, content).unwrap();
path
}
// The argument struct can be built by hand and keeps the values it was given.
#[test]
fn test_secure_args_default() {
let args = SecureArgs {
path: PathBuf::from("test.py"),
detail: None,
quick: false,
output: None,
};
assert!(!args.quick);
}
// More severe labels must rank strictly lower (sort earlier).
#[test]
fn test_severity_order() {
assert!(severity_order("critical") < severity_order("high"));
assert!(severity_order("high") < severity_order("medium"));
assert!(severity_order("medium") < severity_order("low"));
assert!(severity_order("low") < severity_order("info"));
}
// An f-string passed to `cursor.execute` is reported as a critical finding.
#[test]
fn test_taint_analysis_finds_sql_injection() {
let source = r#"
def query(user_input):
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
"#;
let mut parser = Parser::new();
parser
.set_language(&tree_sitter_python::LANGUAGE.into())
.unwrap();
let tree = parser.parse(source, None).unwrap();
let findings = analyze_taint(tree.root_node(), source, &PathBuf::from("test.py"));
assert!(!findings.is_empty(), "Should detect SQL injection");
assert!(findings.iter().any(|f| f.severity == "critical"));
}
// `f = open(...)` outside a `with` statement is reported as a leak.
#[test]
fn test_resource_analysis_finds_leak() {
let source = r#"
def read_file():
f = open("test.txt")
data = f.read()
return data
"#;
let mut parser = Parser::new();
parser
.set_language(&tree_sitter_python::LANGUAGE.into())
.unwrap();
let tree = parser.parse(source, None).unwrap();
let findings = analyze_resources(tree.root_node(), source, &PathBuf::from("test.py"));
assert!(!findings.is_empty(), "Should detect resource leak");
}
// Opening the file through `with ... as f` must not be reported.
#[test]
fn test_resource_analysis_no_leak_with_context() {
let source = r#"
def read_file():
with open("test.txt") as f:
data = f.read()
return data
"#;
let mut parser = Parser::new();
parser
.set_language(&tree_sitter_python::LANGUAGE.into())
.unwrap();
let tree = parser.parse(source, None).unwrap();
let findings = analyze_resources(tree.root_node(), source, &PathBuf::from("test.py"));
assert!(
findings.is_empty(),
"Should not detect leak with context manager"
);
}
// Directory collection keeps `.py` and `.rs` files and drops everything else.
#[test]
fn test_collect_files_includes_rust() {
let temp = TempDir::new().unwrap();
create_test_file(&temp, "sample.py", "print('ok')");
create_test_file(&temp, "lib.rs", "fn main() {}");
create_test_file(&temp, "notes.txt", "ignore");
let files = collect_files(temp.path()).unwrap();
assert!(files.iter().any(|f| f.ends_with("sample.py")));
assert!(files.iter().any(|f| f.ends_with("lib.rs")));
assert!(!files.iter().any(|f| f.ends_with("notes.txt")));
}
// For a Rust file, the taint/resources/bounds passes report unsafe blocks,
// raw pointer operations, `unwrap()` calls, and `todo!` markers respectively.
#[test]
fn test_rust_secure_metrics_detected() {
let source = r#"
use std::ptr;
fn risky(user: &str) {
unsafe { ptr::write(user.as_ptr() as *mut u8, b'x'); }
let _v = Some(user).unwrap();
todo!("finish hardening");
}
"#;
let mut parser = Parser::new();
parser
.set_language(&tree_sitter_rust::LANGUAGE.into())
.unwrap();
let tree = parser.parse(source, None).unwrap();
let file = PathBuf::from("src/lib.rs");
let taint_findings = analyze_taint(tree.root_node(), source, &file);
let resource_findings = analyze_resources(tree.root_node(), source, &file);
let bounds_findings = analyze_bounds(tree.root_node(), source, &file);
assert!(!taint_findings.is_empty(), "Should count unsafe blocks");
assert!(
!resource_findings.is_empty(),
"Should count raw pointer ops"
);
assert!(
bounds_findings.iter().any(|f| f.category == "unwrap"),
"Should count unwrap calls"
);
assert!(
bounds_findings.iter().any(|f| f.category == "todo_marker"),
"Should count todo markers"
);
}
}