//! Secure Command - Security Analysis Dashboard
//!
//! Aggregates security sub-analyses (taint, resources, bounds, contracts,
//! behavioral, mutability) into a severity-sorted security report.
//!
//! # Sub-analyses
//!
//! - `taint`: Detect data flow from untrusted sources to sensitive sinks
//! - `resources`: Detect resource leaks (files, connections)
//! - `bounds`: Detect potential buffer overflows and bounds issues
//! - `contracts`: Analyze pre/postconditions (full mode only)
//! - `behavioral`: Analyze exception handling and state transitions (full mode only)
//! - `mutability`: Detect mutable parameter issues (full mode only)
//!
//! # Quick Mode
//!
//! Quick mode (`--quick`) runs only the fast analyses:
//! - taint, resources, bounds
//!
//! Full mode adds:
//! - contracts, behavioral, mutability
//!
//! # Example
//!
//! ```bash
//! # Analyze a file
//! tldr secure src/app.py
//!
//! # Quick mode (faster)
//! tldr secure src/app.py --quick
//!
//! # Show detail for sub-analysis
//! tldr secure src/app.py --detail taint
//!
//! # Text output
//! tldr secure src/app.py -f text
//! ```
use std::collections::HashMap;
use std::fs;
use std::path::PathBuf;
use std::time::Instant;
use clap::Args;
use colored::Colorize;
use serde_json::Value;
use tree_sitter::{Node, Parser};
use crate::output::OutputFormat;
use super::ast_cache::AstCache;
use super::error::{RemainingError, RemainingResult};
use super::types::{SecureFinding, SecureReport, SecureSummary, Severity};
// =============================================================================
// Security Analysis Types
// =============================================================================
/// Security sub-analysis types
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SecurityAnalysis {
Taint,
Resources,
Bounds,
Contracts,
Behavioral,
Mutability,
}
impl SecurityAnalysis {
/// Get the analysis name
pub fn name(&self) -> &'static str {
match self {
Self::Taint => "taint",
Self::Resources => "resources",
Self::Bounds => "bounds",
Self::Contracts => "contracts",
Self::Behavioral => "behavioral",
Self::Mutability => "mutability",
}
}
}
/// Quick mode analyses (fast)
pub const QUICK_ANALYSES: &[SecurityAnalysis] = &[
SecurityAnalysis::Taint,
SecurityAnalysis::Resources,
SecurityAnalysis::Bounds,
];
/// Full mode analyses (all)
pub const FULL_ANALYSES: &[SecurityAnalysis] = &[
SecurityAnalysis::Taint,
SecurityAnalysis::Resources,
SecurityAnalysis::Bounds,
SecurityAnalysis::Contracts,
SecurityAnalysis::Behavioral,
SecurityAnalysis::Mutability,
];
// =============================================================================
// CLI Arguments
// =============================================================================
/// Security analysis dashboard aggregating multiple security checks
#[derive(Debug, Args, Clone)]
pub struct SecureArgs {
/// File path or directory to analyze
pub path: PathBuf,
/// Show details for specific sub-analysis
#[arg(long)]
pub detail: Option<String>,
/// Run quick mode (taint, resources, bounds only)
#[arg(long)]
pub quick: bool,
/// Write output to file instead of stdout
#[arg(long, short = 'o')]
pub output: Option<PathBuf>,
}
impl SecureArgs {
/// Run the secure command with CLI-provided format
pub fn run(&self, format: OutputFormat) -> anyhow::Result<()> {
run(self.clone(), format)
}
}
// =============================================================================
// Implementation
// =============================================================================
/// Run the secure analysis
pub fn run(args: SecureArgs, format: OutputFormat) -> anyhow::Result<()> {
let start = Instant::now();
// Validate path exists
if !args.path.exists() {
return Err(RemainingError::file_not_found(&args.path).into());
}
// Create report
let mut report = SecureReport::new(args.path.display().to_string());
// Initialize AST cache for shared parsing
let mut cache = AstCache::default();
// Determine which analyses to run
let analyses = if args.quick {
QUICK_ANALYSES
} else {
FULL_ANALYSES
};
// Collect files to analyze (auto-detect Python files)
let files = collect_files(&args.path)?;
// Run sub-analyses and collect findings
let mut all_findings = Vec::new();
let mut sub_results: HashMap<String, Value> = HashMap::new();
for analysis in analyses {
let (findings, raw_result) = run_security_analysis(*analysis, &files, &mut cache)?;
// Update summary
update_summary(&mut report.summary, *analysis, &findings);
// Collect findings
all_findings.extend(findings);
// Store raw result if requested
if args.detail.as_deref() == Some(analysis.name()) {
sub_results.insert(analysis.name().to_string(), raw_result);
}
}
// Sort findings by severity (critical first)
all_findings.sort_by(|a, b| {
severity_order(&a.severity).cmp(&severity_order(&b.severity))
});
report.findings = all_findings;
report.sub_results = sub_results;
report.total_elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;
// Output
let output_str = match format {
OutputFormat::Json => serde_json::to_string_pretty(&report)?,
OutputFormat::Compact => serde_json::to_string(&report)?,
OutputFormat::Text => format_text_report(&report),
OutputFormat::Sarif | OutputFormat::Dot => {
// SARIF/DOT not fully supported for secure, fall back to JSON
serde_json::to_string_pretty(&report)?
}
};
// Write output
if let Some(output_path) = &args.output {
fs::write(output_path, &output_str)?;
} else {
println!("{}", output_str);
}
Ok(())
}
/// Collect Python files to analyze
fn collect_files(path: &PathBuf) -> RemainingResult<Vec<PathBuf>> {
let mut files = Vec::new();
if path.is_file() {
files.push(path.clone());
} else if path.is_dir() {
// Walk directory and collect Python files
for entry in walkdir::WalkDir::new(path)
.max_depth(10)
.into_iter()
.filter_map(|e| e.ok())
{
let p = entry.path();
if p.is_file() {
let ext = p.extension().and_then(|e| e.to_str()).unwrap_or("");
// Auto-detect Python files by extension
if ext == "py" {
files.push(p.to_path_buf());
}
}
}
}
if files.is_empty() {
return Err(RemainingError::file_not_found(path));
}
Ok(files)
}
/// Run a specific security analysis on files
fn run_security_analysis(
analysis: SecurityAnalysis,
files: &[PathBuf],
cache: &mut AstCache,
) -> RemainingResult<(Vec<SecureFinding>, Value)> {
let mut findings = Vec::new();
for file in files {
let source = fs::read_to_string(file)?;
// Get or parse the AST
let tree = cache.get_or_parse(file, &source)?;
// Run analysis
let file_findings = match analysis {
SecurityAnalysis::Taint => analyze_taint(tree.root_node(), &source, file),
SecurityAnalysis::Resources => analyze_resources(tree.root_node(), &source, file),
SecurityAnalysis::Bounds => analyze_bounds(tree.root_node(), &source, file),
SecurityAnalysis::Contracts => analyze_contracts(tree.root_node(), &source, file),
SecurityAnalysis::Behavioral => analyze_behavioral(tree.root_node(), &source, file),
SecurityAnalysis::Mutability => analyze_mutability(tree.root_node(), &source, file),
};
findings.extend(file_findings);
}
// Create raw result
let raw_result = serde_json::to_value(&findings).unwrap_or(Value::Array(vec![]));
Ok((findings, raw_result))
}
/// Update summary based on findings
fn update_summary(summary: &mut SecureSummary, analysis: SecurityAnalysis, findings: &[SecureFinding]) {
match analysis {
SecurityAnalysis::Taint => {
summary.taint_count = findings.len() as u32;
summary.taint_critical = findings
.iter()
.filter(|f| f.severity == "critical")
.count() as u32;
}
SecurityAnalysis::Resources => {
summary.leak_count = findings.len() as u32;
}
SecurityAnalysis::Bounds => {
summary.bounds_warnings = findings.len() as u32;
}
SecurityAnalysis::Contracts => {
summary.missing_contracts = findings.len() as u32;
}
SecurityAnalysis::Behavioral => {
// Not tracked in summary
}
SecurityAnalysis::Mutability => {
summary.mutable_params = findings.len() as u32;
}
}
}
/// Get severity order (lower = more severe)
fn severity_order(severity: &str) -> u8 {
match severity {
"critical" => 0,
"high" => 1,
"medium" => 2,
"low" => 3,
"info" => 4,
_ => 5,
}
}
// =============================================================================
// Taint Analysis
// =============================================================================
/// Known taint sources in Python
const TAINT_SOURCES: &[&str] = &[
"request.args",
"request.form",
"request.data",
"request.json",
"request.values",
"input",
"raw_input",
"sys.stdin",
"os.environ",
];
/// Known taint sinks in Python
const TAINT_SINKS: &[(&str, &str, &str)] = &[
// (pattern, vuln_type, severity)
("cursor.execute", "SQL Injection", "critical"),
("execute", "SQL Injection", "critical"),
("os.system", "Command Injection", "critical"),
("subprocess.call", "Command Injection", "critical"),
("subprocess.run", "Command Injection", "high"),
("subprocess.Popen", "Command Injection", "high"),
("eval", "Code Injection", "critical"),
("exec", "Code Injection", "critical"),
("pickle.loads", "Insecure Deserialization", "critical"),
("yaml.load", "Insecure Deserialization", "high"),
("open", "Path Traversal", "high"),
("render_template_string", "Template Injection", "high"),
];
/// Analyze taint flows in a file
fn analyze_taint(root: Node, source: &str, file: &PathBuf) -> Vec<SecureFinding> {
let mut findings = Vec::new();
let source_bytes = source.as_bytes();
// Simple pattern matching for dangerous patterns
// In a full implementation, this would do data flow analysis
// Find f-strings and format strings used in dangerous contexts
analyze_fstring_injection(root, source_bytes, file, &mut findings);
// Find direct concatenation in dangerous sinks
analyze_string_concat_in_sinks(root, source_bytes, file, &mut findings);
findings
}
fn analyze_fstring_injection(root: Node, source: &[u8], file: &PathBuf, findings: &mut Vec<SecureFinding>) {
let mut cursor = root.walk();
traverse_for_fstrings(root, source, file, findings, &mut cursor);
}
fn traverse_for_fstrings(node: Node, source: &[u8], file: &PathBuf, findings: &mut Vec<SecureFinding>, cursor: &mut tree_sitter::TreeCursor) {
// Check if this is a call to a dangerous function with an f-string
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
let func_text = node_text(func, source);
// Check if it's a dangerous sink
for (pattern, vuln_type, severity) in TAINT_SINKS {
if func_text.contains(pattern) || func_text.ends_with(pattern.split('.').last().unwrap_or(pattern)) {
// Check if arguments contain f-strings or format
if let Some(args) = node.child_by_field_name("arguments") {
let args_text = node_text(args, source);
if args_text.contains("f\"") || args_text.contains("f'") ||
args_text.contains(".format(") || args_text.contains(" + ") {
findings.push(SecureFinding::new(
"taint",
*severity,
format!("{}: Potential {} - user input may flow to dangerous function",
vuln_type, vuln_type.to_lowercase()),
).with_location(file.display().to_string(), node.start_position().row as u32 + 1));
}
}
}
}
}
}
// Recurse
for i in 0..node.child_count() {
if let Some(child) = node.child(i) {
traverse_for_fstrings(child, source, file, findings, cursor);
}
}
}
fn analyze_string_concat_in_sinks(_root: Node, _source: &[u8], _file: &PathBuf, _findings: &mut Vec<SecureFinding>) {
// Placeholder for string concatenation analysis
// In a full implementation, this would track string operations
}
// =============================================================================
// Resource Analysis
// =============================================================================
/// Known resource creators
const RESOURCE_CREATORS: &[&str] = &[
"open",
"socket",
"connect",
"cursor",
"urlopen",
];
/// Analyze resource leaks in a file
fn analyze_resources(root: Node, source: &str, file: &PathBuf) -> Vec<SecureFinding> {
let mut findings = Vec::new();
let source_bytes = source.as_bytes();
// Find resource assignments outside of `with` statements
find_leaked_resources(root, source_bytes, file, &mut findings);
findings
}
fn find_leaked_resources(node: Node, source: &[u8], file: &PathBuf, findings: &mut Vec<SecureFinding>) {
// Check if this is an assignment with a resource creator
if node.kind() == "assignment" {
if let Some(right) = node.child_by_field_name("right") {
if right.kind() == "call" {
if let Some(func) = right.child_by_field_name("function") {
let func_text = node_text(func, source);
let func_name = func_text.split('.').last().unwrap_or(&func_text);
if RESOURCE_CREATORS.contains(&func_name) {
// Check if this is inside a with statement
if !is_inside_with(node) {
findings.push(SecureFinding::new(
"resource_leak",
"high",
format!("Resource '{}' opened without context manager - may leak", func_name),
).with_location(file.display().to_string(), node.start_position().row as u32 + 1));
}
}
}
}
}
}
// Recurse
for i in 0..node.child_count() {
if let Some(child) = node.child(i) {
find_leaked_resources(child, source, file, findings);
}
}
}
fn is_inside_with(node: Node) -> bool {
let mut current = node.parent();
while let Some(parent) = current {
if parent.kind() == "with_statement" {
return true;
}
current = parent.parent();
}
false
}
// =============================================================================
// Bounds Analysis
// =============================================================================
/// Analyze bounds/overflow issues in a file
fn analyze_bounds(_root: Node, _source: &str, _file: &PathBuf) -> Vec<SecureFinding> {
// Placeholder - would analyze array indexing, integer operations
Vec::new()
}
// =============================================================================
// Contracts Analysis
// =============================================================================
/// Analyze missing contracts in a file
fn analyze_contracts(_root: Node, _source: &str, _file: &PathBuf) -> Vec<SecureFinding> {
// Placeholder - would check for functions without type hints, docstrings, or assertions
Vec::new()
}
// =============================================================================
// Behavioral Analysis
// =============================================================================
/// Analyze behavioral issues (exception handling, state) in a file
fn analyze_behavioral(root: Node, source: &str, file: &PathBuf) -> Vec<SecureFinding> {
let mut findings = Vec::new();
let source_bytes = source.as_bytes();
// Find bare except clauses
find_bare_except(root, source_bytes, file, &mut findings);
findings
}
fn find_bare_except(node: Node, source: &[u8], file: &PathBuf, findings: &mut Vec<SecureFinding>) {
// Check for except clauses without exception type
if node.kind() == "except_clause" {
let has_type = node.children(&mut node.walk())
.any(|c| c.kind() == "as_pattern" ||
(c.kind() == "identifier" && node_text(c, source) != "Exception"));
if !has_type {
let text = node_text(node, source);
if text.starts_with("except:") || text.starts_with("except :") {
findings.push(SecureFinding::new(
"behavioral",
"medium",
"Bare except clause catches all exceptions including KeyboardInterrupt",
).with_location(file.display().to_string(), node.start_position().row as u32 + 1));
}
}
}
// Recurse
for i in 0..node.child_count() {
if let Some(child) = node.child(i) {
find_bare_except(child, source, file, findings);
}
}
}
// =============================================================================
// Mutability Analysis
// =============================================================================
/// Analyze mutability issues in a file
fn analyze_mutability(_root: Node, _source: &str, _file: &PathBuf) -> Vec<SecureFinding> {
// Placeholder - would check for mutable default arguments, etc.
Vec::new()
}
// =============================================================================
// Utilities
// =============================================================================
fn node_text<'a>(node: Node, source: &'a [u8]) -> &'a str {
std::str::from_utf8(&source[node.start_byte()..node.end_byte()]).unwrap_or("")
}
// =============================================================================
// Text Output
// =============================================================================
fn format_text_report(report: &SecureReport) -> String {
let mut output = String::new();
output.push_str(&"=".repeat(60));
output.push('\n');
output.push_str(&format!("{}\n", "SECURE - Security Analysis Dashboard".bold()));
output.push_str(&"=".repeat(60));
output.push_str("\n\n");
output.push_str(&format!("Path: {}\n\n", report.path));
if report.findings.is_empty() {
output.push_str(&format!("{}\n", "No security issues found.".green()));
} else {
output.push_str(&format!("{}\n", "Severity | Category | Description".bold()));
output.push_str(&format!("{}\n", "-".repeat(60)));
for finding in &report.findings {
let severity_colored = match finding.severity.as_str() {
"critical" => finding.severity.red().bold().to_string(),
"high" => finding.severity.red().to_string(),
"medium" => finding.severity.yellow().to_string(),
"low" => finding.severity.blue().to_string(),
_ => finding.severity.clone(),
};
output.push_str(&format!(
"{:>8} | {:<14} | {}\n",
severity_colored,
finding.category,
finding.description
));
if !finding.file.is_empty() {
output.push_str(&format!(" | | {}:{}\n", finding.file, finding.line));
}
}
}
output.push_str("\n");
output.push_str(&format!("{}\n", "Summary:".bold()));
output.push_str(&format!(" Taint issues: {} ({} critical)\n",
report.summary.taint_count, report.summary.taint_critical));
output.push_str(&format!(" Resource leaks: {}\n", report.summary.leak_count));
output.push_str(&format!(" Bounds warnings: {}\n", report.summary.bounds_warnings));
output.push_str(&format!(" Missing contracts: {}\n", report.summary.missing_contracts));
output.push_str(&format!(" Mutable params: {}\n", report.summary.mutable_params));
output.push_str("\n");
output.push_str(&format!("Elapsed: {:.2}ms\n", report.total_elapsed_ms));
output
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
fn create_test_file(dir: &TempDir, name: &str, content: &str) -> PathBuf {
let path = dir.path().join(name);
fs::write(&path, content).unwrap();
path
}
#[test]
fn test_secure_args_default() {
// Test that default values are set correctly
let args = SecureArgs {
path: PathBuf::from("test.py"),
detail: None,
quick: false,
output: None,
};
assert!(!args.quick);
}
#[test]
fn test_severity_order() {
assert!(severity_order("critical") < severity_order("high"));
assert!(severity_order("high") < severity_order("medium"));
assert!(severity_order("medium") < severity_order("low"));
assert!(severity_order("low") < severity_order("info"));
}
#[test]
fn test_taint_analysis_finds_sql_injection() {
let source = r#"
def query(user_input):
cursor.execute(f"SELECT * FROM users WHERE name = '{user_input}'")
"#;
let mut parser = Parser::new();
parser.set_language(&tree_sitter_python::LANGUAGE.into()).unwrap();
let tree = parser.parse(source, None).unwrap();
let findings = analyze_taint(tree.root_node(), source, &PathBuf::from("test.py"));
assert!(!findings.is_empty(), "Should detect SQL injection");
assert!(findings.iter().any(|f| f.severity == "critical"));
}
#[test]
fn test_resource_analysis_finds_leak() {
let source = r#"
def read_file():
f = open("test.txt")
data = f.read()
return data
"#;
let mut parser = Parser::new();
parser.set_language(&tree_sitter_python::LANGUAGE.into()).unwrap();
let tree = parser.parse(source, None).unwrap();
let findings = analyze_resources(tree.root_node(), source, &PathBuf::from("test.py"));
assert!(!findings.is_empty(), "Should detect resource leak");
}
#[test]
fn test_resource_analysis_no_leak_with_context() {
let source = r#"
def read_file():
with open("test.txt") as f:
data = f.read()
return data
"#;
let mut parser = Parser::new();
parser.set_language(&tree_sitter_python::LANGUAGE.into()).unwrap();
let tree = parser.parse(source, None).unwrap();
let findings = analyze_resources(tree.root_node(), source, &PathBuf::from("test.py"));
assert!(findings.is_empty(), "Should not detect leak with context manager");
}
}