use super::dominators::{self, dominates};
use super::rules;
use super::{AnalysisContext, CfgAnalysis, CfgFinding, Confidence, is_entry_point_func};
use crate::cfg::StmtKind;
use crate::labels::{Cap, DataLabel, RuntimeLabelRule};
use crate::patterns::Severity;
use petgraph::graph::NodeIndex;
/// CFG analysis pass registered under the name `"unguarded-sink"`.
///
/// Its [`CfgAnalysis`] implementation reports sink nodes that have no
/// dominating guard call or sanitizer node within the same function.
pub struct UnguardedSink;
/// Returns `true` when every identifier used by the `sink` node looks constant.
///
/// An identifier is accepted when either:
/// * it is one of the segments of the sink's callee path (the callee string
///   split on `.` and `:`), or
/// * some node in the same enclosing function defines it, uses no identifiers
///   itself, and is not labelled as a source.
///
/// A single such definition is enough, even if other definitions of the same
/// name exist in the function. A sink with no uses at all is vacuously
/// constant.
fn is_all_args_constant(ctx: &AnalysisContext, sink: NodeIndex) -> bool {
    let sink_info = &ctx.cfg[sink];
    let sink_func = sink_info.enclosing_func.as_deref();
    let callee_parts: Vec<&str> = sink_info
        .callee
        .as_deref()
        .unwrap_or("")
        .split(['.', ':'])
        .collect();

    sink_info.uses.iter().all(|used| {
        let name = used.as_str();
        // Identifiers that are merely part of the callee path are not arguments.
        callee_parts.contains(&name)
            || ctx.cfg.node_indices().any(|idx| {
                let def = &ctx.cfg[idx];
                def.enclosing_func.as_deref() == sink_func
                    && def.defines.as_deref() == Some(name)
                    && def.uses.is_empty()
                    && !matches!(def.label, Some(DataLabel::Source(_)))
            })
    })
}
/// Looks up `callee` in the configured label rules and returns the capability
/// of the first sanitizer rule that matches it.
///
/// Only rules whose label is `DataLabel::Sanitizer` are considered. Matching
/// is ASCII case-insensitive: a matcher ending in `_` is compared as a prefix
/// of the callee, any other matcher as a suffix. Returns `None` when no
/// sanitizer rule matches.
fn match_config_sanitizer(callee: &str, extra: &[RuntimeLabelRule]) -> Option<Cap> {
    let needle = callee.to_ascii_lowercase();

    extra.iter().find_map(|rule| {
        let DataLabel::Sanitizer(cap) = rule.label else {
            return None;
        };

        rule.matchers
            .iter()
            .any(|matcher| {
                let pattern = matcher.to_ascii_lowercase();
                if pattern.ends_with('_') {
                    needle.starts_with(&pattern)
                } else {
                    needle.ends_with(&pattern)
                }
            })
            .then_some(cap)
    })
}
/// Collects every call node in the CFG that acts as a guard, paired with the
/// capabilities it covers.
///
/// For each node of kind `StmtKind::Call` that has a callee:
/// 1. if a configured sanitizer rule matches the callee, the node is recorded
///    with that rule's capability;
/// 2. otherwise the first built-in guard rule for `ctx.lang` with a matching
///    matcher records the node with the rule's `applies_to_sink_caps`.
///
/// Built-in matchers are compared ASCII case-insensitively: a matcher ending in
/// `_` is a prefix match, any other matcher a suffix match.
fn find_guard_nodes(ctx: &AnalysisContext) -> Vec<(NodeIndex, Cap)> {
    let builtin_rules = rules::guard_rules(ctx.lang);
    let extra_rules = ctx
        .analysis_rules
        .map(|r| r.extra_labels.as_slice())
        .unwrap_or(&[]);

    ctx.cfg
        .node_indices()
        .filter_map(|idx| {
            let info = &ctx.cfg[idx];
            if info.kind != StmtKind::Call {
                return None;
            }
            let callee = info.callee.as_ref()?;

            // Configured sanitizers take precedence over the built-in guard rules.
            if let Some(cap) = match_config_sanitizer(callee, extra_rules) {
                return Some((idx, cap));
            }

            let lowered = callee.to_ascii_lowercase();
            builtin_rules
                .iter()
                .find(|rule| {
                    rule.matchers.iter().any(|matcher| {
                        let pattern = matcher.to_ascii_lowercase();
                        if pattern.ends_with('_') {
                            lowered.starts_with(&pattern)
                        } else {
                            lowered.ends_with(&pattern)
                        }
                    })
                })
                .map(|rule| (idx, rule.applies_to_sink_caps))
        })
        .collect()
}
/// Reports whether any entry in `ctx.taint_findings` names `sink` as its sink
/// node.
fn taint_confirms_sink(ctx: &AnalysisContext, sink: NodeIndex) -> bool {
    ctx.taint_findings
        .iter()
        .any(|finding| finding.sink == sink)
}
/// Reports whether one of the identifiers used by `sink` is defined directly
/// by a source-labelled node in the same enclosing function.
///
/// Only direct definitions are considered; no data flow through intermediate
/// variables is followed here. A sink with no uses is never source-derived.
fn sink_arg_is_source_derived(ctx: &AnalysisContext, sink: NodeIndex) -> bool {
    let sink_info = &ctx.cfg[sink];
    if sink_info.uses.is_empty() {
        return false;
    }
    let sink_func = sink_info.enclosing_func.as_deref();

    ctx.cfg.node_indices().any(|idx| {
        let candidate = &ctx.cfg[idx];
        candidate.enclosing_func.as_deref() == sink_func
            && matches!(candidate.label, Some(DataLabel::Source(_)))
            && candidate
                .defines
                .as_ref()
                .is_some_and(|def| sink_info.uses.iter().any(|used| used == def))
    })
}
/// Reports whether every identifier used by `sink` is a parameter name of the
/// function that encloses it.
///
/// Parameter names are gathered from every function summary whose entry node
/// has the same enclosing function as the sink.
///
/// * A sink with no uses returns `true`.
/// * If no parameter names are found for the enclosing function, returns
///   `false`.
fn sink_arg_is_parameter_only(ctx: &AnalysisContext, sink: NodeIndex) -> bool {
    let sink_info = &ctx.cfg[sink];
    if sink_info.uses.is_empty() {
        return true;
    }
    let sink_func = sink_info.enclosing_func.as_deref();

    let params: Vec<&str> = ctx
        .func_summaries
        .values()
        .filter(|summary| ctx.cfg[summary.entry].enclosing_func.as_deref() == sink_func)
        .flat_map(|summary| summary.param_names.iter().map(|p| p.as_str()))
        .collect();

    !params.is_empty()
        && sink_info
            .uses
            .iter()
            .all(|used| params.contains(&used.as_str()))
}
/// Reports whether the function enclosing `sink` is an entry point for
/// `ctx.lang`, as decided by `is_entry_point_func`.
///
/// A sink with no enclosing function is not considered to be in an entry point.
fn sink_in_entrypoint(ctx: &AnalysisContext, sink: NodeIndex) -> bool {
    match &ctx.cfg[sink].enclosing_func {
        Some(func_name) => is_entry_point_func(func_name, ctx.lang),
        None => false,
    }
}
impl CfgAnalysis for UnguardedSink {
fn name(&self) -> &'static str {
"unguarded-sink"
}
fn run(&self, ctx: &AnalysisContext) -> Vec<CfgFinding> {
let doms = dominators::compute_dominators(ctx.cfg, ctx.entry);
let sink_nodes = dominators::find_sink_nodes(ctx.cfg);
let guard_nodes = find_guard_nodes(ctx);
let mut findings = Vec::new();
for sink in &sink_nodes {
let sink_info = &ctx.cfg[*sink];
let sink_caps = match sink_info.label {
Some(DataLabel::Sink(caps)) => caps,
_ => continue,
};
let sink_func = sink_info.enclosing_func.as_deref();
let is_guarded = guard_nodes.iter().any(|(guard_idx, guard_caps)| {
let guard_func = ctx.cfg[*guard_idx].enclosing_func.as_deref();
(*guard_caps & sink_caps) != Cap::empty()
&& guard_func == sink_func
&& dominates(&doms, *guard_idx, *sink)
});
let has_sanitizer = ctx.cfg.node_indices().any(|idx| {
let node_func = ctx.cfg[idx].enclosing_func.as_deref();
if let Some(DataLabel::Sanitizer(san_caps)) = ctx.cfg[idx].label {
(san_caps & sink_caps) != Cap::empty()
&& node_func == sink_func
&& dominates(&doms, idx, *sink)
} else {
false
}
});
if is_guarded || has_sanitizer {
continue;
}
let callee_desc = sink_info.callee.as_deref().unwrap_or("(unknown sink)");
let has_taint = taint_confirms_sink(ctx, *sink);
let source_derived = sink_arg_is_source_derived(ctx, *sink);
if is_all_args_constant(ctx, *sink) && !has_taint && !source_derived {
continue;
}
let param_only = sink_arg_is_parameter_only(ctx, *sink);
let in_entrypoint = sink_in_entrypoint(ctx, *sink);
let (severity, confidence) = if has_taint || source_derived {
(Severity::High, Confidence::High)
} else if param_only && !in_entrypoint {
(Severity::Low, Confidence::Low)
} else if !ctx.taint_active && !source_derived {
(Severity::Low, Confidence::Low)
} else if in_entrypoint && !param_only {
(Severity::Medium, Confidence::Medium)
} else {
(Severity::Medium, Confidence::Medium)
};
findings.push(CfgFinding {
rule_id: "cfg-unguarded-sink".to_string(),
title: "Unguarded sink".to_string(),
severity,
confidence,
span: sink_info.span,
message: format!("Sink `{callee_desc}` has no dominating guard or sanitizer"),
evidence: vec![*sink],
score: None,
});
}
findings
}
}