pub mod propagation;
pub mod sinks;
pub mod sources;
pub mod types;
pub use propagation::{
FunctionTaintSummary, ImplicitFlowContext, NodeId, PropagationConfig, PropagationEngine,
PropagationRule, PropagationRules, TaintFlow, TaintTraceResult, TaintedDFG, TaintedEdge,
trace_taint_from_line,
};
pub use sinks::{
get_go_sinks, get_python_sinks, get_rust_sinks, get_sinks_for_language, get_typescript_sinks,
SinkCategory, SinkRegistry, TaintSink,
};
pub use sources::{
get_go_sources, get_python_sources, get_rust_sources, get_sources_for_language,
get_typescript_sources, MatchStrategy, PythonSourceDetector, SourceRegistry, TaintSource,
TypeScriptSourceDetector,
};
pub use types::{
Location, PropagationStep, TaintLabel, TaintPropagation, TaintState, TaintedValue,
};
/// Runs taint analysis over the data-flow graph of a single function.
///
/// The source and sink registries for `language` are looked up, a
/// [`TaintedDFG`] is built from `dfg` and `file_path`, and the graph is
/// annotated in two passes over `dfg.edges` before propagation runs:
///
/// 1. For every registered source that matches an edge's variable, the edge's
///    `from_line` is marked as a source carrying that source's label.
/// 2. For every registered sink whose pattern is contained in an edge's
///    variable, the edge's `to_line` is marked as a sink.
///
/// # Returns
///
/// The flows reported by [`PropagationEngine::analyze_dfg`] for the annotated
/// graph.
pub fn analyze_function_taint(
    dfg: &crate::dfg::types::DFGInfo,
    language: &str,
    file_path: &str,
) -> Vec<TaintFlow> {
    let source_registry = get_sources_for_language(language);
    let sink_registry = get_sinks_for_language(language);
    let mut graph = TaintedDFG::from_dfg(dfg, file_path);

    // Pass 1: every source is marked before any sink is looked at.
    for edge in &dfg.edges {
        for candidate in source_registry.all_sources() {
            if !TaintSource::matches(candidate, &edge.variable) {
                continue;
            }
            graph.mark_source(edge.from_line, candidate.label.clone());
        }
    }

    // Pass 2: sinks are matched by containment of the sink pattern in the
    // edge's variable, and recorded at the edge's destination line.
    for edge in &dfg.edges {
        for candidate in sink_registry.all_sinks() {
            if !edge.variable.contains(candidate.pattern()) {
                continue;
            }
            graph.mark_sink(edge.to_line);
        }
    }

    let mut propagator = PropagationEngine::new();
    propagator.analyze_dfg(&mut graph)
}
/// Collects the taint labels of every source registered for `language` that
/// the registry's `find_matches` reports for `pattern`.
///
/// # Returns
///
/// One cloned label per match, in the order the registry returns them. The
/// vector is empty when the registry reports no match.
pub fn is_taint_source(pattern: &str, language: &str) -> Vec<TaintLabel> {
    let registry = get_sources_for_language(language);
    let matched = registry.find_matches(pattern);

    let mut labels = Vec::new();
    for source in matched.iter() {
        labels.push(source.label.clone());
    }
    labels
}
/// Collects the sink categories of every sink registered for `language` that
/// the registry's `find_matches` reports for `pattern`.
///
/// # Returns
///
/// One category per match, in the order the registry returns them. The vector
/// is empty when the registry reports no match.
pub fn is_taint_sink(pattern: &str, language: &str) -> Vec<SinkCategory> {
    let registry = get_sinks_for_language(language);
    let matched = registry.find_matches(pattern);

    let mut categories = Vec::new();
    for sink in matched.iter() {
        categories.push(sink.category);
    }
    categories
}
/// Returns the sanitizer names recommended for the given sink `category`.
///
/// This is a thin convenience wrapper: it forwards to
/// `SinkCategory::recommended_sanitizers` and returns that result unchanged.
pub fn get_sanitizers_for_sink(category: SinkCategory) -> &'static [&'static str] {
category.recommended_sanitizers()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Request accessors are reported as user input; an unrelated name
    /// produces no labels.
    #[test]
    fn test_is_taint_source() {
        let user_input_cases = [
            ("request.args.get('id')", "python"),
            ("req.body.username", "typescript"),
        ];
        for (pattern, language) in user_input_cases {
            let found = is_taint_source(pattern, language);
            assert!(!found.is_empty());
            assert!(found.contains(&TaintLabel::UserInput));
        }

        let found = is_taint_source("some_random_function", "python");
        assert!(found.is_empty());
    }

    /// Python database and subprocess calls map to their sink categories; an
    /// unrelated name produces none.
    #[test]
    fn test_is_taint_sink() {
        let sink_cases = [
            ("cursor.execute", SinkCategory::SqlInjection),
            ("subprocess.run", SinkCategory::CommandInjection),
        ];
        for (pattern, expected) in sink_cases {
            let found = is_taint_sink(pattern, "python");
            assert!(!found.is_empty());
            assert!(found.contains(&expected));
        }

        let found = is_taint_sink("safe_function", "python");
        assert!(found.is_empty());
    }

    /// Each sink category recommends the expected sanitizer name.
    #[test]
    fn test_get_sanitizers_for_sink() {
        let expectations = [
            (SinkCategory::SqlInjection, "parameterized_query"),
            (SinkCategory::XSS, "html_escape"),
            (SinkCategory::CommandInjection, "shell_escape"),
        ];
        for (category, sanitizer) in expectations {
            let recommended = get_sanitizers_for_sink(category);
            assert!(recommended.contains(&sanitizer));
        }
    }

    /// Taint introduced on one variable and propagated through an assignment
    /// is reported exactly once when the assigned variable reaches a sink.
    #[test]
    fn test_module_integration() {
        let mut engine = PropagationEngine::new();

        let source_loc = Location::new("test.py", 1, 1);
        let assign_loc = Location::new("test.py", 2, 1);
        let sink_loc = Location::new("test.py", 10, 1);

        engine.introduce_taint("user_input", TaintLabel::UserInput, source_loc, None);
        engine.propagate_assignment("query", "user_input", assign_loc);
        engine.check_sink("query", "sql_query", sink_loc);

        let findings = engine.findings();
        assert_eq!(findings.len(), 1);

        let reported = &findings[0];
        assert!(reported.labels.contains(&TaintLabel::UserInput));
        assert_eq!(reported.sink_type, "sql_query");
    }

    /// The Python registries recognise a request accessor and a SQL sink, and
    /// the SQL sink accepts only the matching sanitizer context.
    #[test]
    fn test_end_to_end_python() {
        let source_registry = get_python_sources();
        let sink_registry = get_python_sinks();

        let request_hits = source_registry.find_matches("request.args.get");
        assert!(!request_hits.is_empty());
        assert!(request_hits
            .iter()
            .any(|hit| hit.label == TaintLabel::UserInput));

        let sql_hits = sink_registry.find_matches("cursor.execute");
        assert!(!sql_hits.is_empty());
        assert!(sql_hits
            .iter()
            .any(|hit| hit.category == SinkCategory::SqlInjection));

        let sql_sink = sql_hits
            .iter()
            .find(|hit| hit.category == SinkCategory::SqlInjection)
            .unwrap();
        assert!(sql_sink.accepts_sanitizer(sinks::SanitizerContext::SqlParameterized));
        assert!(!sql_sink.accepts_sanitizer(sinks::SanitizerContext::HtmlEscape));
    }

    /// The TypeScript registries recognise a request body source and an
    /// `innerHTML` XSS sink.
    #[test]
    fn test_end_to_end_typescript() {
        let source_registry = get_typescript_sources();
        let sink_registry = get_typescript_sinks();

        let body_hits = source_registry.find_matches("req.body");
        assert!(!body_hits.is_empty());

        let xss_hits = sink_registry.find_matches("innerHTML");
        assert!(!xss_hits.is_empty());
        assert!(xss_hits
            .iter()
            .any(|hit| hit.category == SinkCategory::XSS));
    }

    /// Variable, property and collection taint can each be stored in and read
    /// back from a `TaintState`.
    #[test]
    fn test_taint_state_operations() {
        let mut state = TaintState::new();
        let origin = Location::new("test.py", 1, 1);

        // Variable-level taint.
        state.set_variable(
            "x",
            TaintedValue::new(TaintLabel::UserInput, origin.clone()),
        );
        assert!(state.is_variable_tainted("x"));
        assert!(!state.is_variable_tainted("y"));

        // Property-level taint.
        state.set_property(
            "obj",
            "data",
            TaintedValue::new(TaintLabel::FileContent, origin.clone()),
        );
        let property_taint = state.get_property("obj", "data");
        assert!(property_taint.is_some());
        assert!(property_taint.unwrap().has_label(&TaintLabel::FileContent));

        // Collection-level taint; the last use takes ownership of `origin`.
        state.set_collection("arr", TaintedValue::new(TaintLabel::NetworkData, origin));
        let collection_taint = state.get_collection("arr");
        assert!(collection_taint.is_some());
        assert!(collection_taint.unwrap().has_label(&TaintLabel::NetworkData));
    }
}