use std::collections::{HashMap, HashSet};
use tempfile::TempDir;
use tldr_core::security::ast_utils::{
assignment_node_kinds, call_node_kinds, string_node_kinds,
};
use tldr_core::security::taint::{
build_line_to_block, build_predecessors, build_successors, compute_taint,
detect_sanitizer, detect_sinks, detect_sources, validate_cfg, SanitizerType,
TaintInfo, TaintSink, TaintSinkType, TaintSourceType,
};
use tldr_core::security::{
scan_secrets, scan_vulnerabilities, Severity as SecretSeverity, VulnType,
};
use tldr_core::types::{BlockType, CfgBlock, CfgEdge, CfgInfo, EdgeType, RefType, VarRef};
use tldr_core::Language;
fn create_test_cfg() -> CfgInfo {
CfgInfo {
function: "test_func".to_string(),
blocks: vec![
CfgBlock {
id: 0,
block_type: BlockType::Entry,
lines: (1, 3),
calls: vec![],
},
CfgBlock {
id: 1,
block_type: BlockType::Body,
lines: (4, 6),
calls: vec![],
},
CfgBlock {
id: 2,
block_type: BlockType::Exit,
lines: (7, 8),
calls: vec![],
},
],
edges: vec![
CfgEdge {
from: 0,
to: 1,
edge_type: EdgeType::Unconditional,
condition: None,
},
CfgEdge {
from: 1,
to: 2,
edge_type: EdgeType::Unconditional,
condition: None,
},
],
entry_block: 0,
exit_blocks: vec![2],
cyclomatic_complexity: 1,
nested_functions: HashMap::new(),
}
}
fn make_def(name: &str, line: u32) -> VarRef {
VarRef {
name: name.to_string(),
ref_type: RefType::Definition,
line,
column: 0,
context: None,
group_id: None,
}
}
fn make_use(name: &str, line: u32) -> VarRef {
VarRef {
name: name.to_string(),
ref_type: RefType::Use,
line,
column: 0,
context: None,
group_id: None,
}
}
#[test]
fn test_secrets_scan_empty_directory() {
let temp_dir = TempDir::new().unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(result.is_ok());
let report = result.unwrap();
assert_eq!(report.findings.len(), 0);
}
#[test]
fn test_secrets_scan_aws_key() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("config.py");
std::fs::write(
&test_file,
r#"
# AWS Configuration
AWS_ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
AWS_SECRET_ACCESS_KEY = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
"#,
)
.unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(result.is_ok());
let report = result.unwrap();
let aws_findings: Vec<_> = report
.findings
.iter()
.filter(|f| f.pattern == "AWS Access Key" || f.pattern == "AWS Secret Key")
.collect();
assert!(!aws_findings.is_empty(), "Should detect AWS keys");
}
#[test]
#[ignore = "BUG: Private key pattern may not detect all PEM formats"]
fn test_secrets_scan_private_key() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("private.pem");
std::fs::write(
&test_file,
r#"
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEA0Z3VS5JJcds3xfn/ygWyF8PbnGy0AHB7MQ0sL52/luJ1LhJv
-----END RSA PRIVATE KEY-----
"#,
)
.unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(result.is_ok());
let report = result.unwrap();
let key_findings: Vec<_> = report
.findings
.iter()
.filter(|f| f.pattern == "Private Key")
.collect();
println!("Found {} private key findings", key_findings.len());
}
#[test]
fn test_secrets_scan_github_token() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("tokens.txt");
std::fs::write(
&test_file,
r#"
GITHUB_TOKEN=ghp_xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
"#,
)
.unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(result.is_ok());
let report = result.unwrap();
let _token_findings: Vec<_> = report
.findings
.iter()
.filter(|f| f.pattern == "GitHub Token")
.collect();
}
#[test]
fn test_secrets_scan_password_assignment() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("config.env");
std::fs::write(
&test_file,
r#"
DB_PASSWORD = "super_secret_password_123"
API_KEY = "sk-abcdefghijklmnop"
"#,
)
.unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(result.is_ok());
let report = result.unwrap();
let pwd_findings: Vec<_> = report
.findings
.iter()
.filter(|f| f.pattern == "Password")
.collect();
assert!(
!pwd_findings.is_empty(),
"Should detect password assignment"
);
}
#[test]
#[ignore = "BUG: Database URL pattern may not detect all connection strings"]
fn test_secrets_scan_database_url() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join(".env");
std::fs::write(
&test_file,
r#"
DATABASE_URL=postgres://user:password@localhost:5432/mydb
"#,
)
.unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(result.is_ok());
let report = result.unwrap();
let db_findings: Vec<_> = report
.findings
.iter()
.filter(|f| f.pattern == "Database URL")
.collect();
println!("Found {} database URL findings", db_findings.len());
}
#[test]
fn test_secrets_scan_with_severity_filter() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("mixed.txt");
std::fs::write(
&test_file,
r#"
# Critical: AWS Key
AWS_ACCESS_KEY_ID = "AKIAIOSFODNN7EXAMPLE"
# Medium: JWT
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.test"
"#,
)
.unwrap();
let result = scan_secrets(temp_dir.path(), 4.5, false, Some(SecretSeverity::Critical));
assert!(result.is_ok());
let report = result.unwrap();
for finding in &report.findings {
assert!(
finding.severity >= SecretSeverity::Critical,
"Should only have Critical or higher severity"
);
}
}
#[test]
fn test_secrets_severity_ordering() {
assert!(SecretSeverity::Critical > SecretSeverity::High);
assert!(SecretSeverity::High > SecretSeverity::Medium);
assert!(SecretSeverity::Medium > SecretSeverity::Low);
}
#[test]
fn test_taint_source_type_variants() {
let _variants = [TaintSourceType::UserInput,
TaintSourceType::Stdin,
TaintSourceType::HttpParam,
TaintSourceType::HttpBody,
TaintSourceType::EnvVar,
TaintSourceType::FileRead];
}
#[test]
fn test_taint_sink_type_variants() {
let _variants = [TaintSinkType::SqlQuery,
TaintSinkType::CodeEval,
TaintSinkType::CodeExec,
TaintSinkType::CodeCompile,
TaintSinkType::ShellExec,
TaintSinkType::FileWrite];
}
#[test]
fn test_taint_info_new() {
let info = TaintInfo::new("test_function");
assert_eq!(info.function_name, "test_function");
assert!(info.tainted_vars.is_empty());
assert!(info.sources.is_empty());
assert!(info.sinks.is_empty());
assert!(info.flows.is_empty());
assert!(info.sanitized_vars.is_empty());
}
#[test]
fn test_taint_info_is_tainted() {
let mut info = TaintInfo::new("test");
let mut block_vars = HashSet::new();
block_vars.insert("user_input".to_string());
info.tainted_vars.insert(0, block_vars);
assert!(info.is_tainted(0, "user_input"));
assert!(!info.is_tainted(0, "other_var"));
assert!(!info.is_tainted(1, "user_input")); }
#[test]
fn test_taint_info_get_vulnerabilities() {
let mut info = TaintInfo::new("test");
info.sinks.push(TaintSink {
var: "query".to_string(),
line: 5,
sink_type: TaintSinkType::SqlQuery,
tainted: true,
statement: Some("cursor.execute(query)".to_string()),
});
info.sinks.push(TaintSink {
var: "safe_query".to_string(),
line: 10,
sink_type: TaintSinkType::SqlQuery,
tainted: false,
statement: Some("cursor.execute(safe_query)".to_string()),
});
let vulns = info.get_vulnerabilities();
assert_eq!(vulns.len(), 1);
assert_eq!(vulns[0].var, "query");
}
#[test]
fn test_detect_sources_python_input() {
let sources = detect_sources("user_input = input()", 1, Language::Python);
assert_eq!(sources.len(), 1);
assert_eq!(sources[0].var, "user_input");
assert!(matches!(sources[0].source_type, TaintSourceType::UserInput));
}
#[test]
fn test_detect_sources_python_request_args() {
let sources = detect_sources("user_id = request.args.get('id')", 1, Language::Python);
assert!(!sources.is_empty());
assert!(sources
.iter()
.any(|s| matches!(s.source_type, TaintSourceType::HttpParam)));
}
#[test]
fn test_detect_sources_python_os_environ() {
let sources = detect_sources("value = os.environ['SECRET']", 1, Language::Python);
assert!(!sources.is_empty());
assert!(sources
.iter()
.any(|s| matches!(s.source_type, TaintSourceType::EnvVar)));
}
#[test]
fn test_detect_sinks_python_sql_execute() {
let sinks = detect_sinks("cursor.execute(query)", 5, Language::Python);
assert!(!sinks.is_empty());
assert!(sinks
.iter()
.any(|s| matches!(s.sink_type, TaintSinkType::SqlQuery)));
}
#[test]
fn test_detect_sinks_python_eval() {
let sinks = detect_sinks("result = eval(user_code)", 10, Language::Python);
assert!(!sinks.is_empty());
assert!(sinks
.iter()
.any(|s| matches!(s.sink_type, TaintSinkType::CodeEval)));
}
#[test]
fn test_detect_sinks_python_exec() {
let sinks = detect_sinks("exec(dynamic_code)", 10, Language::Python);
assert!(!sinks.is_empty());
assert!(sinks
.iter()
.any(|s| matches!(s.sink_type, TaintSinkType::CodeExec)));
}
#[test]
fn test_detect_sinks_python_subprocess() {
let sinks = detect_sinks("subprocess.run(cmd, shell=True)", 15, Language::Python);
assert!(!sinks.is_empty());
assert!(sinks
.iter()
.any(|s| matches!(s.sink_type, TaintSinkType::ShellExec)));
}
#[test]
fn test_detect_sanitizer_python_int() {
let sanitizer = detect_sanitizer("safe_id = int(user_id)", Language::Python);
assert_eq!(sanitizer, Some(SanitizerType::Numeric));
}
#[test]
fn test_detect_sanitizer_python_shlex() {
let sanitizer = detect_sanitizer("safe_cmd = shlex.quote(user_input)", Language::Python);
assert_eq!(sanitizer, Some(SanitizerType::Shell));
}
#[test]
fn test_detect_sanitizer_python_html_escape() {
let sanitizer = detect_sanitizer("safe_html = html.escape(user_content)", Language::Python);
assert_eq!(sanitizer, Some(SanitizerType::Html));
}
#[test]
fn test_build_predecessors() {
let cfg = create_test_cfg();
let preds = build_predecessors(&cfg);
assert_eq!(preds.get(&0).unwrap().len(), 0); assert_eq!(preds.get(&1).unwrap(), &vec![0]);
assert_eq!(preds.get(&2).unwrap(), &vec![1]);
}
#[test]
fn test_build_successors() {
let cfg = create_test_cfg();
let succs = build_successors(&cfg);
assert_eq!(succs.get(&0).unwrap(), &vec![1]);
assert_eq!(succs.get(&1).unwrap(), &vec![2]);
assert_eq!(succs.get(&2).unwrap().len(), 0); }
#[test]
fn test_build_line_to_block() {
let cfg = create_test_cfg();
let mapping = build_line_to_block(&cfg);
assert_eq!(mapping.get(&1), Some(&0));
assert_eq!(mapping.get(&5), Some(&1));
assert_eq!(mapping.get(&7), Some(&2));
}
#[test]
fn test_validate_cfg_valid() {
let cfg = create_test_cfg();
assert!(validate_cfg(&cfg).is_ok());
}
#[test]
fn test_validate_cfg_empty_blocks() {
let cfg = CfgInfo {
function: "empty".to_string(),
blocks: vec![],
edges: vec![],
entry_block: 0,
exit_blocks: vec![],
cyclomatic_complexity: 0,
nested_functions: HashMap::new(),
};
let result = validate_cfg(&cfg);
assert!(result.is_err());
}
#[test]
fn test_validate_cfg_invalid_entry() {
let mut cfg = create_test_cfg();
cfg.entry_block = 999;
let result = validate_cfg(&cfg);
assert!(result.is_err());
}
#[test]
fn test_validate_cfg_invalid_edge() {
let mut cfg = create_test_cfg();
cfg.edges.push(CfgEdge {
from: 999, to: 1,
edge_type: EdgeType::Unconditional,
condition: None,
});
let result = validate_cfg(&cfg);
assert!(result.is_err());
}
#[test]
fn test_compute_taint_simple_propagation() {
let cfg = create_test_cfg();
let refs = vec![make_def("x", 1), make_use("x", 4), make_def("y", 4)];
let mut statements = HashMap::new();
statements.insert(1, "x = input()".to_string());
statements.insert(4, "y = x".to_string());
let result = compute_taint(&cfg, &refs, &statements, Language::Python);
assert!(result.is_ok());
let taint_info = result.unwrap();
assert!(taint_info.is_tainted(0, "x"));
}
#[test]
fn test_compute_taint_sanitizer_removes_taint() {
let cfg = create_test_cfg();
let refs = vec![
make_def("user_id", 1),
make_use("user_id", 4),
make_def("safe_id", 4),
];
let mut statements = HashMap::new();
statements.insert(1, "user_id = request.args.get('id')".to_string());
statements.insert(4, "safe_id = int(user_id)".to_string());
let result = compute_taint(&cfg, &refs, &statements, Language::Python);
assert!(result.is_ok());
let taint_info = result.unwrap();
assert!(taint_info.sanitized_vars.contains("safe_id"));
}
#[test]
fn test_compute_taint_no_sources() {
let cfg = create_test_cfg();
let refs = vec![make_def("x", 1), make_def("y", 4)];
let mut statements = HashMap::new();
statements.insert(1, "x = 42".to_string()); statements.insert(4, "y = x + 1".to_string());
let result = compute_taint(&cfg, &refs, &statements, Language::Python);
assert!(result.is_ok());
let taint_info = result.unwrap();
assert!(taint_info.sources.is_empty());
}
#[test]
fn test_scan_vulnerabilities_empty() {
let temp_dir = TempDir::new().unwrap();
let result = scan_vulnerabilities(temp_dir.path(), None, None);
assert!(result.is_ok());
let report = result.unwrap();
assert_eq!(report.findings.len(), 0);
}
#[test]
fn test_scan_vulnerabilities_sql_injection() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("db.py");
std::fs::write(
&test_file,
r#"
from flask import request
def get_user():
user_id = request.args.get('id')
cursor.execute("SELECT * FROM users WHERE id = " + user_id)
"#,
)
.unwrap();
let result = scan_vulnerabilities(temp_dir.path(), Some(Language::Python), None);
assert!(result.is_ok());
let report = result.unwrap();
let _sql_injections: Vec<_> = report
.findings
.iter()
.filter(|f| matches!(f.vuln_type, VulnType::SqlInjection))
.collect();
}
#[test]
fn test_scan_vulnerabilities_command_injection() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("cmd.py");
std::fs::write(
&test_file,
r#"
from flask import request
import os
def run_command():
cmd = request.args.get('cmd')
os.system(cmd)
"#,
)
.unwrap();
let result = scan_vulnerabilities(temp_dir.path(), Some(Language::Python), None);
assert!(result.is_ok());
let report = result.unwrap();
let _cmd_injections: Vec<_> = report
.findings
.iter()
.filter(|f| matches!(f.vuln_type, VulnType::CommandInjection))
.collect();
}
#[test]
fn test_call_node_kinds() {
let python_kinds = call_node_kinds(Language::Python);
assert!(python_kinds.contains(&"call"));
let ts_kinds = call_node_kinds(Language::TypeScript);
assert!(ts_kinds.contains(&"call_expression"));
let go_kinds = call_node_kinds(Language::Go);
assert!(go_kinds.contains(&"call_expression"));
let rust_kinds = call_node_kinds(Language::Rust);
assert!(rust_kinds.contains(&"call_expression"));
assert!(rust_kinds.contains(&"macro_invocation"));
}
#[test]
fn test_string_node_kinds() {
let python_kinds = string_node_kinds(Language::Python);
assert!(python_kinds.contains(&"string"));
let rust_kinds = string_node_kinds(Language::Rust);
assert!(rust_kinds.contains(&"string_literal"));
assert!(rust_kinds.contains(&"raw_string_literal"));
}
#[test]
fn test_assignment_node_kinds() {
let python_kinds = assignment_node_kinds(Language::Python);
assert!(python_kinds.contains(&"assignment"));
let rust_kinds = assignment_node_kinds(Language::Rust);
assert!(rust_kinds.contains(&"let_declaration"));
}
#[test]
fn test_detect_sources_typescript() {
let sources = detect_sources("const data = req.body", 1, Language::TypeScript);
let _ = sources;
}
#[test]
fn test_detect_sources_go() {
let sources = detect_sources("name := r.FormValue(\"name\")", 1, Language::Go);
let _ = sources;
}
#[test]
fn test_detect_sinks_typescript() {
let sinks = detect_sinks("eval(userInput)", 1, Language::TypeScript);
let _ = sinks;
}
#[test]
fn test_detect_sanitizer_typescript() {
let sanitizer = detect_sanitizer("parseInt(val)", Language::TypeScript);
assert_eq!(sanitizer, Some(SanitizerType::Numeric));
}
#[test]
fn test_taint_source_type_serialization() {
let source_type = TaintSourceType::HttpParam;
let json = serde_json::to_string(&source_type).unwrap();
assert_eq!(json, "\"http_param\"");
}
#[test]
fn test_taint_sink_type_serialization() {
let sink_type = TaintSinkType::SqlQuery;
let json = serde_json::to_string(&sink_type).unwrap();
assert_eq!(json, "\"sql_query\"");
}
#[test]
fn test_sanitizer_type_serialization() {
let sanitizer = SanitizerType::Numeric;
let json = serde_json::to_string(&sanitizer).unwrap();
assert_eq!(json, "\"numeric\"");
}
#[test]
fn test_full_security_scan() {
let temp_dir = TempDir::new().unwrap();
let test_file = temp_dir.path().join("app.py");
std::fs::write(
&test_file,
r#"
from flask import request
import os
API_KEY = "AKIAIOSFODNN7EXAMPLE"
def get_user():
user_id = request.args.get('id')
cursor.execute("SELECT * FROM users WHERE id = " + user_id)
def run_cmd():
cmd = request.form.get('cmd')
os.system(cmd)
"#,
)
.unwrap();
let secrets_result = scan_secrets(temp_dir.path(), 4.5, false, None);
assert!(secrets_result.is_ok());
let vuln_result = scan_vulnerabilities(temp_dir.path(), Some(Language::Python), None);
assert!(vuln_result.is_ok());
}
#[test]
#[ignore = "Documents potential issue with extract_call_name on invalid nodes"]
fn test_extract_call_name_invalid_node() {
}
#[test]
fn test_compute_taint_malformed_cfg() {
let empty_cfg = CfgInfo {
function: "empty".to_string(),
blocks: vec![],
edges: vec![],
entry_block: 0,
exit_blocks: vec![],
cyclomatic_complexity: 0,
nested_functions: HashMap::new(),
};
let refs: Vec<VarRef> = vec![];
let statements: HashMap<u32, String> = HashMap::new();
let result = compute_taint(&empty_cfg, &refs, &statements, Language::Python);
assert!(result.is_err());
}
#[test]
fn test_detect_sources_edge_cases() {
let sources = detect_sources("", 1, Language::Python);
assert!(sources.is_empty());
let sources = detect_sources("print('hello')", 1, Language::Python);
assert!(sources.is_empty());
let sources = detect_sources("x = input() + os.environ['KEY']", 1, Language::Python);
let _ = sources;
}