pub mod go;
pub mod python;
pub mod rust_lang;
pub mod typescript;
pub use go::get_go_sinks;
pub use python::get_python_sinks;
pub use rust_lang::get_rust_sinks;
pub use typescript::get_typescript_sinks;
pub type TaintSink = Sink;
use crate::security::taint::types::{Location, TaintLabel};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Classes of security-sensitive sinks that tainted data can reach.
///
/// Each variant carries metadata (CWE identifier, description, base severity,
/// OWASP category, recommended sanitizer names) exposed through the inherent
/// methods on this type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SinkCategory {
/// SQL injection (CWE-89).
SqlInjection,
/// OS command injection (CWE-78).
CommandInjection,
/// Cross-site scripting (CWE-79).
XSS,
/// Path traversal (CWE-22).
PathTraversal,
/// Code injection (CWE-94).
CodeExecution,
/// Server-side request forgery (CWE-918).
SSRF,
/// Log injection (CWE-117).
LogInjection,
/// HTTP header injection (CWE-113).
HeaderInjection,
/// XPath injection (CWE-643).
XPathInjection,
/// LDAP injection (CWE-90).
LdapInjection,
/// Server-side template injection (CWE-1336).
TemplateInjection,
/// Insecure deserialization (CWE-502).
Deserialization,
/// XML external entity (XXE) injection (CWE-611).
XmlInjection,
/// Open redirect (CWE-601).
OpenRedirect,
/// Regular expression injection / ReDoS (CWE-1333).
RegexInjection,
/// Buffer overflow / memory corruption (CWE-120).
MemoryCorruption,
/// Email header injection (CWE-93).
EmailHeaderInjection,
/// NoSQL injection (CWE-943).
NoSqlInjection,
}
impl SinkCategory {
    /// Returns the CWE identifier (for example `"CWE-89"`) for this category.
    #[inline]
    pub const fn cwe_id(&self) -> &'static str {
        match *self {
            Self::SqlInjection => "CWE-89",
            Self::CommandInjection => "CWE-78",
            Self::XSS => "CWE-79",
            Self::PathTraversal => "CWE-22",
            Self::CodeExecution => "CWE-94",
            Self::SSRF => "CWE-918",
            Self::LogInjection => "CWE-117",
            Self::HeaderInjection => "CWE-113",
            Self::XPathInjection => "CWE-643",
            Self::LdapInjection => "CWE-90",
            Self::TemplateInjection => "CWE-1336",
            Self::Deserialization => "CWE-502",
            Self::XmlInjection => "CWE-611",
            Self::OpenRedirect => "CWE-601",
            Self::RegexInjection => "CWE-1333",
            Self::MemoryCorruption => "CWE-120",
            Self::EmailHeaderInjection => "CWE-93",
            Self::NoSqlInjection => "CWE-943",
        }
    }

    /// Returns the human-readable name of this category.
    ///
    /// This is also the text produced by the `Display` implementation.
    #[inline]
    pub const fn description(&self) -> &'static str {
        match *self {
            Self::SqlInjection => "SQL Injection",
            Self::CommandInjection => "OS Command Injection",
            Self::XSS => "Cross-Site Scripting (XSS)",
            Self::PathTraversal => "Path Traversal",
            Self::CodeExecution => "Code Injection",
            Self::SSRF => "Server-Side Request Forgery (SSRF)",
            Self::LogInjection => "Log Injection",
            Self::HeaderInjection => "HTTP Header Injection",
            Self::XPathInjection => "XPath Injection",
            Self::LdapInjection => "LDAP Injection",
            Self::TemplateInjection => "Server-Side Template Injection (SSTI)",
            Self::Deserialization => "Insecure Deserialization",
            Self::XmlInjection => "XML External Entity (XXE) Injection",
            Self::OpenRedirect => "Open Redirect",
            Self::RegexInjection => "Regular Expression Injection (ReDoS)",
            Self::MemoryCorruption => "Buffer Overflow / Memory Corruption",
            Self::EmailHeaderInjection => "Email Header Injection",
            Self::NoSqlInjection => "NoSQL Injection",
        }
    }

    /// Returns the default severity of this category; values range from 3
    /// (log injection) to 10 (code execution, command injection, memory
    /// corruption).
    #[inline]
    pub const fn base_severity(&self) -> u8 {
        match *self {
            Self::CodeExecution | Self::CommandInjection | Self::MemoryCorruption => 10,
            Self::Deserialization | Self::SqlInjection | Self::TemplateInjection => 9,
            Self::SSRF | Self::PathTraversal | Self::XmlInjection => 8,
            Self::XSS | Self::LdapInjection | Self::NoSqlInjection => 7,
            Self::XPathInjection | Self::HeaderInjection => 6,
            Self::OpenRedirect | Self::EmailHeaderInjection => 5,
            Self::RegexInjection => 4,
            Self::LogInjection => 3,
        }
    }

    /// Returns the OWASP Top 10 (2021) category label for this sink category.
    ///
    /// The mapping follows the CWE lists published with each OWASP 2021
    /// category where one exists: CWE-611 (XXE) belongs to A05 and CWE-117
    /// (log injection) to A09.
    #[inline]
    pub const fn owasp_category(&self) -> &'static str {
        match *self {
            Self::PathTraversal | Self::OpenRedirect => "A01:2021 - Broken Access Control",
            Self::SqlInjection
            | Self::CommandInjection
            | Self::XPathInjection
            | Self::LdapInjection
            | Self::NoSqlInjection
            | Self::CodeExecution
            | Self::XSS
            | Self::TemplateInjection
            | Self::HeaderInjection
            | Self::EmailHeaderInjection
            | Self::RegexInjection
            // No OWASP 2021 category lists CWE-120; the existing label is kept.
            | Self::MemoryCorruption => "A03:2021 - Injection",
            // XXE was folded into Security Misconfiguration in the 2021 edition.
            Self::XmlInjection => "A05:2021 - Security Misconfiguration",
            Self::Deserialization => "A08:2021 - Software and Data Integrity Failures",
            // CWE-117 is listed under the logging and monitoring category.
            Self::LogInjection => "A09:2021 - Security Logging and Monitoring Failures",
            Self::SSRF => "A10:2021 - Server-Side Request Forgery",
        }
    }

    /// Returns every category, in declaration order.
    pub const fn all() -> &'static [Self] {
        &[
            Self::SqlInjection,
            Self::CommandInjection,
            Self::XSS,
            Self::PathTraversal,
            Self::CodeExecution,
            Self::SSRF,
            Self::LogInjection,
            Self::HeaderInjection,
            Self::XPathInjection,
            Self::LdapInjection,
            Self::TemplateInjection,
            Self::Deserialization,
            Self::XmlInjection,
            Self::OpenRedirect,
            Self::RegexInjection,
            Self::MemoryCorruption,
            Self::EmailHeaderInjection,
            Self::NoSqlInjection,
        ]
    }

    /// Returns the names of sanitizer functions recommended for this category.
    ///
    /// SQL and NoSQL injection share one list, as do XPath and LDAP injection.
    pub const fn recommended_sanitizers(&self) -> &'static [&'static str] {
        match *self {
            Self::SqlInjection | Self::NoSqlInjection => &[
                "parameterized_query",
                "prepared_statement",
                "sqlalchemy_text",
                "django_orm",
                "escape_string",
                "quote_literal",
            ],
            Self::CommandInjection => &[
                "shlex_quote",
                "shlex_split",
                "subprocess_list",
                "shell_escape",
                "escapeshellarg",
                "escapeshellcmd",
            ],
            Self::XSS => &[
                "html_escape",
                "escape",
                "sanitize_html",
                "DOMPurify",
                "bleach_clean",
                "markupsafe_escape",
                "cgi_escape",
            ],
            Self::PathTraversal => &[
                "path_join",
                "realpath",
                "abspath",
                "normpath",
                "secure_filename",
                "basename",
                "safe_join",
            ],
            Self::CodeExecution => &["ast_literal_eval", "json_parse", "safe_eval"],
            Self::SSRF => &[
                "url_validate",
                "domain_allowlist",
                "scheme_validate",
                "is_private_ip",
                "url_parse",
            ],
            Self::LogInjection => &["log_escape", "replace_newlines", "sanitize_log"],
            Self::HeaderInjection => &["header_escape", "remove_crlf", "validate_header"],
            Self::XPathInjection | Self::LdapInjection => {
                &["escape_filter", "parameterized_xpath", "ldap_escape"]
            }
            Self::TemplateInjection => &["autoescape", "sandbox_template", "safe_render"],
            Self::Deserialization => &["json_loads", "safe_load", "restricted_deserialize"],
            Self::XmlInjection => {
                &["defusedxml", "disable_external_entities", "lxml_safe_parse"]
            }
            Self::OpenRedirect => &["url_for", "is_safe_url", "validate_redirect"],
            Self::RegexInjection => &["re_escape", "regex_escape", "literal_pattern"],
            Self::MemoryCorruption => &["bounds_check", "safe_alloc", "size_validate"],
            Self::EmailHeaderInjection => &["email_escape", "validate_email", "header_encode"],
        }
    }
}
/// Displays a category as its human-readable description.
impl std::fmt::Display for SinkCategory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Written verbatim: width/padding flags on the outer formatter are not applied.
        f.write_str(self.description())
    }
}
/// Kinds of sanitization that can be applied to tainted data.
///
/// `effective_against` reports which sink categories each context covers and
/// `strength` gives a relative score for it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum SanitizerContext {
// SQL injection contexts.
SqlParameterized,
SqlEscape,
SqlOrm,
// Command injection contexts.
ShellEscape,
CommandList,
CommandAllowlist,
// XSS contexts (`UrlEncode` is also reported effective against open redirects).
HtmlEscape,
DomSanitize,
ContentSecurityPolicy,
JavaScriptEscape,
UrlEncode,
CssEscape,
// Path traversal contexts.
PathCanonicalize,
PathBasename,
PathAllowlist,
PathSafeJoin,
// LDAP injection contexts.
LdapDnEscape,
LdapFilterEscape,
// XPath injection contexts.
XPathEscape,
XPathParameterized,
// URL validation contexts (SSRF and open redirect).
UrlSchemeValidation,
UrlDomainAllowlist,
UrlPrivateIpBlock,
// HTTP header injection contexts.
HeaderCrlfRemove,
HeaderEncode,
// Log injection contexts.
LogNewlineRemove,
LogEncode,
// XML injection contexts.
XmlEntityEncode,
XmlDisableExternalEntities,
// Template injection contexts.
TemplateSandbox,
TemplateAutoescape,
// Deserialization contexts.
DeserializeTypeAllowlist,
DeserializeSignatureVerify,
DeserializeSafeFormat,
// Regex injection contexts.
RegexEscape,
RegexTimeout,
// Email header injection context.
EmailHeaderSanitize,
// General-purpose input validation contexts.
InputAllowlist,
InputDenylist,
TypeValidation,
LengthLimit,
}
impl SanitizerContext {
    /// Returns the sink categories this sanitizer context protects against.
    ///
    /// Every context that `get_default_sanitizers` recommends for a category
    /// is reported as effective against that category here. Otherwise
    /// `check_sanitization` would flag data as unsanitized and then recommend
    /// the very sanitizer that was already applied.
    pub fn effective_against(&self) -> &'static [SinkCategory] {
        match *self {
            Self::SqlParameterized | Self::SqlEscape | Self::SqlOrm => {
                &[SinkCategory::SqlInjection]
            }
            Self::ShellEscape | Self::CommandList | Self::CommandAllowlist => {
                &[SinkCategory::CommandInjection]
            }
            Self::HtmlEscape
            | Self::DomSanitize
            | Self::ContentSecurityPolicy
            | Self::JavaScriptEscape
            | Self::CssEscape => &[SinkCategory::XSS],
            Self::UrlEncode => &[SinkCategory::XSS, SinkCategory::OpenRedirect],
            Self::PathCanonicalize
            | Self::PathBasename
            | Self::PathAllowlist
            | Self::PathSafeJoin => &[SinkCategory::PathTraversal],
            Self::LdapDnEscape | Self::LdapFilterEscape => &[SinkCategory::LdapInjection],
            Self::XPathEscape | Self::XPathParameterized => &[SinkCategory::XPathInjection],
            Self::UrlSchemeValidation | Self::UrlDomainAllowlist | Self::UrlPrivateIpBlock => {
                &[SinkCategory::SSRF, SinkCategory::OpenRedirect]
            }
            Self::HeaderCrlfRemove | Self::HeaderEncode => &[SinkCategory::HeaderInjection],
            Self::LogNewlineRemove | Self::LogEncode => &[SinkCategory::LogInjection],
            Self::XmlEntityEncode | Self::XmlDisableExternalEntities => {
                &[SinkCategory::XmlInjection]
            }
            Self::TemplateSandbox | Self::TemplateAutoescape => {
                &[SinkCategory::TemplateInjection]
            }
            Self::DeserializeTypeAllowlist
            | Self::DeserializeSignatureVerify
            | Self::DeserializeSafeFormat => &[SinkCategory::Deserialization],
            Self::RegexEscape | Self::RegexTimeout => &[SinkCategory::RegexInjection],
            Self::EmailHeaderSanitize => &[SinkCategory::EmailHeaderInjection],
            // A strict allowlist is treated as effective for every category.
            Self::InputAllowlist => SinkCategory::all(),
            // Denylists are never treated as effective.
            Self::InputDenylist => &[],
            // NoSQL injection and memory corruption are included because
            // `get_default_sanitizers` recommends this context for them.
            Self::TypeValidation => &[
                SinkCategory::SqlInjection,
                SinkCategory::CommandInjection,
                SinkCategory::PathTraversal,
                SinkCategory::NoSqlInjection,
                SinkCategory::MemoryCorruption,
            ],
            // Memory corruption is included for the same consistency reason.
            Self::LengthLimit => &[
                SinkCategory::RegexInjection,
                SinkCategory::MemoryCorruption,
            ],
        }
    }

    /// Returns `true` when this context is listed as effective against `category`.
    #[inline]
    pub fn is_effective_against(&self, category: SinkCategory) -> bool {
        self.effective_against().contains(&category)
    }

    /// Returns a relative strength score for this context, from 2
    /// (`InputDenylist`) up to 10.
    #[inline]
    pub const fn strength(&self) -> u8 {
        match *self {
            Self::SqlParameterized | Self::CommandList | Self::XmlDisableExternalEntities => 10,
            Self::DomSanitize
            | Self::PathCanonicalize
            | Self::DeserializeSafeFormat
            | Self::InputAllowlist => 9,
            Self::SqlOrm
            | Self::HtmlEscape
            | Self::ShellEscape
            | Self::PathAllowlist
            | Self::UrlDomainAllowlist
            | Self::TemplateSandbox
            | Self::DeserializeTypeAllowlist
            | Self::XPathParameterized => 8,
            Self::CommandAllowlist
            | Self::ContentSecurityPolicy
            | Self::PathBasename
            | Self::PathSafeJoin
            | Self::LdapDnEscape
            | Self::LdapFilterEscape
            | Self::XPathEscape
            | Self::HeaderCrlfRemove
            | Self::HeaderEncode
            | Self::TemplateAutoescape
            | Self::DeserializeSignatureVerify
            | Self::RegexEscape
            | Self::TypeValidation => 7,
            Self::SqlEscape
            | Self::RegexTimeout
            | Self::JavaScriptEscape
            | Self::CssEscape
            | Self::UrlPrivateIpBlock
            | Self::EmailHeaderSanitize => 6,
            Self::UrlEncode
            | Self::UrlSchemeValidation
            | Self::LogNewlineRemove
            | Self::LogEncode
            | Self::XmlEntityEncode => 5,
            Self::LengthLimit => 4,
            Self::InputDenylist => 2,
        }
    }
}
/// Displays a sanitizer context using its `Debug` (variant-name) form.
impl std::fmt::Display for SanitizerContext {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
/// Description of one taint sink: a function or property pattern together with
/// the vulnerability category it belongs to and matching metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Sink {
/// Vulnerability category of this sink.
pub category: SinkCategory,
/// Pattern compared (by substring) against call names in `SinkRegistry`.
pub function_pattern: String,
/// Argument positions considered dangerous; an empty list means every
/// position is treated as dangerous (see `is_dangerous_param`).
pub dangerous_params: Vec<usize>,
/// Sanitizer contexts this sink accepts (see `accepts_sanitizer`).
pub sanitizers: Vec<SanitizerContext>,
/// Human-readable description; defaults to the category description.
pub description: String,
/// When set, replaces the category's base severity in `severity()`.
pub severity_override: Option<u8>,
/// Confidence value; `with_confidence` clamps it to `0.0..=1.0`.
pub confidence: f64,
/// Optional receiver type name, set through `with_receiver`.
pub receiver_type: Option<String>,
/// Set through `as_property`; presumably marks property-style sinks rather
/// than calls -- confirm against the language-specific sink tables.
pub is_property: bool,
/// Free-form tags appended through `with_tag`.
pub tags: Vec<String>,
}
impl Sink {
    /// Creates a sink for `pattern` in `category` with the category's default
    /// sanitizers and description, full confidence, and no dangerous-parameter
    /// restriction.
    pub fn new(pattern: impl Into<String>, category: SinkCategory) -> Self {
        let function_pattern = pattern.into();
        let sanitizers = get_default_sanitizers(category);
        let description = category.description().to_string();
        Self {
            category,
            function_pattern,
            dangerous_params: Vec::new(),
            sanitizers,
            description,
            severity_override: None,
            confidence: 1.0,
            receiver_type: None,
            is_property: false,
            tags: Vec::new(),
        }
    }

    /// SQL injection sink; only the first argument is dangerous.
    pub fn sql(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::SqlInjection);
        Self {
            description: description.into(),
            dangerous_params: vec![0],
            sanitizers: vec![
                SanitizerContext::SqlParameterized,
                SanitizerContext::SqlOrm,
                SanitizerContext::SqlEscape,
            ],
            ..base
        }
    }

    /// Command injection sink; every argument position is dangerous.
    pub fn command(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::CommandInjection);
        Self {
            description: description.into(),
            sanitizers: vec![
                SanitizerContext::CommandList,
                SanitizerContext::ShellEscape,
                SanitizerContext::CommandAllowlist,
            ],
            ..base
        }
    }

    /// XSS sink accepting HTML escaping, DOM sanitization, or a CSP.
    pub fn xss(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::XSS);
        Self {
            description: description.into(),
            sanitizers: vec![
                SanitizerContext::HtmlEscape,
                SanitizerContext::DomSanitize,
                SanitizerContext::ContentSecurityPolicy,
            ],
            ..base
        }
    }

    /// Path traversal sink; only the first argument is dangerous.
    pub fn path(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::PathTraversal);
        Self {
            description: description.into(),
            dangerous_params: vec![0],
            sanitizers: vec![
                SanitizerContext::PathCanonicalize,
                SanitizerContext::PathAllowlist,
                SanitizerContext::PathBasename,
                SanitizerContext::PathSafeJoin,
            ],
            ..base
        }
    }

    /// SSRF sink; only the first argument is dangerous.
    pub fn ssrf(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::SSRF);
        Self {
            description: description.into(),
            dangerous_params: vec![0],
            sanitizers: vec![
                SanitizerContext::UrlDomainAllowlist,
                SanitizerContext::UrlSchemeValidation,
                SanitizerContext::UrlPrivateIpBlock,
            ],
            ..base
        }
    }

    /// Code execution sink with severity pinned to 10; only an input
    /// allowlist is accepted as a sanitizer.
    pub fn code_exec(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::CodeExecution);
        Self {
            description: description.into(),
            severity_override: Some(10),
            sanitizers: vec![SanitizerContext::InputAllowlist],
            ..base
        }
    }

    /// LDAP injection sink.
    pub fn ldap(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::LdapInjection);
        Self {
            description: description.into(),
            sanitizers: vec![
                SanitizerContext::LdapDnEscape,
                SanitizerContext::LdapFilterEscape,
            ],
            ..base
        }
    }

    /// XPath injection sink.
    pub fn xpath(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::XPathInjection);
        Self {
            description: description.into(),
            sanitizers: vec![
                SanitizerContext::XPathParameterized,
                SanitizerContext::XPathEscape,
            ],
            ..base
        }
    }

    /// Log injection sink with severity pinned to 3.
    pub fn log(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::LogInjection);
        Self {
            description: description.into(),
            severity_override: Some(3),
            sanitizers: vec![
                SanitizerContext::LogNewlineRemove,
                SanitizerContext::LogEncode,
            ],
            ..base
        }
    }

    /// HTTP header injection sink.
    pub fn header(pattern: impl Into<String>, description: impl Into<String>) -> Self {
        let base = Self::new(pattern, SinkCategory::HeaderInjection);
        Self {
            description: description.into(),
            sanitizers: vec![
                SanitizerContext::HeaderCrlfRemove,
                SanitizerContext::HeaderEncode,
            ],
            ..base
        }
    }

    /// Replaces the list of dangerous argument positions.
    pub fn with_dangerous_params(self, params: Vec<usize>) -> Self {
        Self {
            dangerous_params: params,
            ..self
        }
    }

    /// Overrides the severity; values above 10 are capped at 10.
    pub fn with_severity(self, severity: u8) -> Self {
        Self {
            severity_override: Some(severity.min(10)),
            ..self
        }
    }

    /// Sets the confidence, clamped to `0.0..=1.0`.
    pub fn with_confidence(self, confidence: f64) -> Self {
        Self {
            confidence: confidence.clamp(0.0, 1.0),
            ..self
        }
    }

    /// Records the receiver type for this sink.
    pub fn with_receiver(self, receiver: impl Into<String>) -> Self {
        Self {
            receiver_type: Some(receiver.into()),
            ..self
        }
    }

    /// Flags this sink as a property.
    pub fn as_property(self) -> Self {
        Self {
            is_property: true,
            ..self
        }
    }

    /// Replaces the accepted sanitizer contexts.
    pub fn with_sanitizers(self, sanitizers: Vec<SanitizerContext>) -> Self {
        Self { sanitizers, ..self }
    }

    /// Appends one tag.
    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
        let tag = tag.into();
        self.tags.push(tag);
        self
    }

    /// Returns the override severity if one is set, else the category's base severity.
    #[inline]
    pub fn severity(&self) -> u8 {
        match self.severity_override {
            Some(level) => level,
            None => self.category.base_severity(),
        }
    }

    /// Returns `true` when `position` is a dangerous argument position.
    ///
    /// An empty `dangerous_params` list means every position is dangerous.
    #[inline]
    pub fn is_dangerous_param(&self, position: usize) -> bool {
        match self.dangerous_params.as_slice() {
            [] => true,
            listed => listed.contains(&position),
        }
    }

    /// Returns `true` when `context` is among this sink's accepted sanitizers.
    #[inline]
    pub fn accepts_sanitizer(&self, context: SanitizerContext) -> bool {
        self.sanitizers.iter().any(|accepted| *accepted == context)
    }

    /// Returns `true` when data carrying `label` is dangerous for this sink's category.
    pub fn is_dangerous_label(&self, label: &TaintLabel) -> bool {
        match self.category {
            // Any taint label is dangerous for these two categories.
            SinkCategory::CodeExecution | SinkCategory::Deserialization => true,
            SinkCategory::SqlInjection | SinkCategory::NoSqlInjection => matches!(
                label,
                TaintLabel::UserInput
                    | TaintLabel::HttpHeader
                    | TaintLabel::Cookie
                    | TaintLabel::UrlData
                    | TaintLabel::NetworkData
                    | TaintLabel::DeserializedData
            ),
            SinkCategory::CommandInjection => matches!(
                label,
                TaintLabel::UserInput
                    | TaintLabel::ProcessArgs
                    | TaintLabel::Environment
                    | TaintLabel::FileContent
                    | TaintLabel::NetworkData
            ),
            SinkCategory::XSS => matches!(
                label,
                TaintLabel::UserInput
                    | TaintLabel::UrlData
                    | TaintLabel::NetworkData
                    | TaintLabel::DatabaseQuery
                    | TaintLabel::DeserializedData
            ),
            // Path traversal and all remaining categories share one label set.
            SinkCategory::PathTraversal => matches!(
                label,
                TaintLabel::UserInput | TaintLabel::UrlData | TaintLabel::NetworkData
            ),
            _ => matches!(
                label,
                TaintLabel::UserInput | TaintLabel::NetworkData | TaintLabel::UrlData
            ),
        }
    }

    /// Returns the function pattern as a string slice.
    #[inline]
    pub fn pattern(&self) -> &str {
        self.function_pattern.as_str()
    }
}
/// A call site to be matched against registered sinks.
#[derive(Debug, Clone)]
pub struct CallInfo {
    /// Name of the called function or method.
    pub function_name: String,
    /// Receiver expression for method calls; `None` for free functions.
    pub receiver: Option<String>,
    /// Arguments as strings, in call order.
    pub arguments: Vec<String>,
    /// File containing the call, if known.
    pub file_path: Option<String>,
    /// Line of the call, if known.
    pub line: Option<usize>,
}

impl CallInfo {
    /// Creates a free-function call with no receiver, arguments, or location.
    pub fn function(name: impl Into<String>) -> Self {
        let function_name: String = name.into();
        Self {
            function_name,
            receiver: None,
            arguments: Vec::new(),
            file_path: None,
            line: None,
        }
    }

    /// Creates a method call on `receiver` with no arguments or location.
    pub fn method(receiver: impl Into<String>, name: impl Into<String>) -> Self {
        let function_name: String = name.into();
        let receiver: String = receiver.into();
        Self {
            function_name,
            receiver: Some(receiver),
            arguments: Vec::new(),
            file_path: None,
            line: None,
        }
    }

    /// Replaces the argument list.
    pub fn with_args(self, args: Vec<String>) -> Self {
        Self {
            arguments: args,
            ..self
        }
    }

    /// Sets the file path and line together.
    pub fn with_location(self, file: impl Into<String>, line: usize) -> Self {
        Self {
            file_path: Some(file.into()),
            line: Some(line),
            ..self
        }
    }

    /// Returns `receiver.function_name` for method calls, or just the
    /// function name when there is no receiver.
    pub fn qualified_name(&self) -> String {
        if let Some(recv) = self.receiver.as_ref() {
            format!("{}.{}", recv, self.function_name)
        } else {
            self.function_name.clone()
        }
    }
}
/// Collection of sinks with secondary indexes by category and by pattern key.
#[derive(Debug, Default)]
pub struct SinkRegistry {
// All registered sinks, in insertion order.
sinks: Vec<Sink>,
// Category -> indices into `sinks`.
by_category: HashMap<SinkCategory, Vec<usize>>,
// First segment of each sink's pattern (split on '.', '(' or ' ') -> indices
// into `sinks`. Populated by `add`; not read by the lookup methods here.
by_pattern: HashMap<String, Vec<usize>>,
}
impl SinkRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Registers `sink` and indexes it by category and by the first segment
    /// of its pattern (the text before the first `.`, `(` or space).
    pub fn add(&mut self, sink: Sink) {
        let idx = self.sinks.len();
        self.by_category.entry(sink.category).or_default().push(idx);
        // The key is computed from a borrow, so the pattern is not cloned.
        let pattern_key = sink
            .function_pattern
            .split(|c: char| matches!(c, '.' | '(' | ' '))
            .next()
            .unwrap_or(sink.function_pattern.as_str())
            .to_string();
        self.by_pattern.entry(pattern_key).or_default().push(idx);
        self.sinks.push(sink);
    }

    /// Returns every sink whose pattern contains `pattern` or is contained in it.
    ///
    /// An empty `pattern` yields no matches and sinks with an empty pattern
    /// never match. Without these guards `str::contains("")` is always true,
    /// so an empty string on either side would match every sink.
    pub fn find_matches(&self, pattern: &str) -> Vec<&Sink> {
        if pattern.is_empty() {
            return Vec::new();
        }
        self.sinks
            .iter()
            .filter(|s| {
                let sink_pattern = s.function_pattern.as_str();
                !sink_pattern.is_empty()
                    && (pattern.contains(sink_pattern) || sink_pattern.contains(pattern))
            })
            .collect()
    }

    /// Returns every sink whose pattern overlaps (by substring, in either
    /// direction) with the call's qualified name or its bare function name.
    ///
    /// A call with an empty function name yields no matches and sinks with an
    /// empty pattern never match, for the same reason as in `find_matches`.
    pub fn find_matches_for_call(&self, call: &CallInfo) -> Vec<&Sink> {
        let func_name = call.function_name.as_str();
        if func_name.is_empty() {
            return Vec::new();
        }
        let qualified = call.qualified_name();
        self.sinks
            .iter()
            .filter(|s| {
                let sink_pattern = s.function_pattern.as_str();
                !sink_pattern.is_empty()
                    && (qualified.contains(sink_pattern)
                        || sink_pattern.contains(qualified.as_str())
                        || func_name.contains(sink_pattern)
                        || sink_pattern.contains(func_name))
            })
            .collect()
    }

    /// Returns the sinks registered under `category`, in insertion order.
    pub fn sinks_for_category(&self, category: SinkCategory) -> Vec<&Sink> {
        match self.by_category.get(&category) {
            // Indices are produced by `add` and are always in range; `get`
            // just avoids a panic path.
            Some(indices) => indices.iter().filter_map(|&i| self.sinks.get(i)).collect(),
            None => Vec::new(),
        }
    }

    /// Returns all registered sinks in insertion order.
    pub fn all_sinks(&self) -> &[Sink] {
        &self.sinks
    }

    /// Returns the number of registered sinks.
    pub fn len(&self) -> usize {
        self.sinks.len()
    }

    /// Returns `true` when no sinks are registered.
    pub fn is_empty(&self) -> bool {
        self.sinks.is_empty()
    }

    /// Returns the categories with at least one registered sink, in
    /// unspecified order.
    pub fn categories(&self) -> Vec<SinkCategory> {
        self.by_category.keys().copied().collect()
    }
}
/// Record of one sanitizer application observed on a data flow.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SanitizationEvent {
/// Kind of sanitization that was applied.
pub context: SanitizerContext,
/// Source location associated with the event.
pub location: Location,
/// Name of the sanitizing function.
pub function_name: String,
}
/// Outcome of `check_sanitization` for one target sink category.
#[derive(Debug, Clone)]
pub struct SanitizationResult {
/// `true` when at least one applied sanitizer is effective against the target.
pub is_sanitized: bool,
/// Copy of the sanitization events that were checked.
pub applied_sanitizers: Vec<SanitizationEvent>,
/// Default sanitizers for the target category that were not applied.
pub missing_sanitizers: Vec<SanitizerContext>,
/// Set when sanitizers were applied but none is effective against the target.
pub partial_sanitization_warning: Option<String>,
}
/// Returns owned copies of every sink registered for `lang` that matches `call`.
///
/// Unknown languages have an empty registry, so the result is empty.
pub fn find_sinks(call: &CallInfo, lang: &str) -> Vec<Sink> {
    let registry = get_sinks_for_language(lang);
    let mut found = Vec::new();
    for sink in registry.find_matches_for_call(call) {
        found.push(sink.clone());
    }
    found
}
/// Checks whether the recorded sanitization events protect `target_sink`.
///
/// The data counts as sanitized when at least one applied context is
/// effective against the target category. `missing_sanitizers` lists the
/// category's default sanitizers that were not applied. A warning is produced
/// when sanitizers were applied but none is effective against the target.
pub fn check_sanitization(
    sanitization_events: &[SanitizationEvent],
    target_sink: SinkCategory,
) -> SanitizationResult {
    let applied: Vec<SanitizerContext> = sanitization_events
        .iter()
        .map(|event| event.context)
        .collect();
    let is_sanitized = applied
        .iter()
        .any(|ctx| ctx.is_effective_against(target_sink));

    let recommended = get_default_sanitizers(target_sink);
    let mut missing_sanitizers = Vec::new();
    for candidate in &recommended {
        if !applied.contains(candidate) {
            missing_sanitizers.push(*candidate);
        }
    }

    let partial_sanitization_warning = if applied.is_empty() || is_sanitized {
        None
    } else {
        let applied_names = applied
            .iter()
            .map(|ctx| format!("{:?}", ctx))
            .collect::<Vec<String>>()
            .join(", ");
        Some(format!(
            "Partial sanitization detected: {} applied but not effective against {}. Consider using: {:?}",
            applied_names,
            target_sink.description(),
            &recommended
        ))
    };

    SanitizationResult {
        is_sanitized,
        applied_sanitizers: sanitization_events.to_vec(),
        missing_sanitizers,
        partial_sanitization_warning,
    }
}
/// Returns `true` when at least one of `applied_contexts` is effective
/// against `target`.
pub fn is_sanitized_for_context(
    applied_contexts: &[SanitizerContext],
    target: SinkCategory,
) -> bool {
    for context in applied_contexts {
        if context.is_effective_against(target) {
            return true;
        }
    }
    false
}
pub fn get_sinks_for_language(language: &str) -> SinkRegistry {
match language.to_lowercase().as_str() {
"python" | "py" => python::get_python_sinks(),
"typescript" | "ts" | "javascript" | "js" | "tsx" | "jsx" => {
typescript::get_typescript_sinks()
}
"go" | "golang" => go::get_go_sinks(),
"rust" | "rs" => rust_lang::get_rust_sinks(),
_ => SinkRegistry::new(),
}
}
fn get_default_sanitizers(category: SinkCategory) -> Vec<SanitizerContext> {
match category {
SinkCategory::SqlInjection => vec![
SanitizerContext::SqlParameterized,
SanitizerContext::SqlOrm,
SanitizerContext::SqlEscape,
],
SinkCategory::CommandInjection => vec![
SanitizerContext::CommandList,
SanitizerContext::ShellEscape,
SanitizerContext::CommandAllowlist,
],
SinkCategory::XSS => vec![
SanitizerContext::HtmlEscape,
SanitizerContext::DomSanitize,
SanitizerContext::ContentSecurityPolicy,
SanitizerContext::JavaScriptEscape,
],
SinkCategory::PathTraversal => vec![
SanitizerContext::PathCanonicalize,
SanitizerContext::PathAllowlist,
SanitizerContext::PathBasename,
SanitizerContext::PathSafeJoin,
],
SinkCategory::CodeExecution => vec![SanitizerContext::InputAllowlist],
SinkCategory::SSRF => vec![
SanitizerContext::UrlDomainAllowlist,
SanitizerContext::UrlSchemeValidation,
SanitizerContext::UrlPrivateIpBlock,
],
SinkCategory::LogInjection => vec![
SanitizerContext::LogNewlineRemove,
SanitizerContext::LogEncode,
],
SinkCategory::HeaderInjection => vec![
SanitizerContext::HeaderCrlfRemove,
SanitizerContext::HeaderEncode,
],
SinkCategory::XPathInjection => vec![
SanitizerContext::XPathParameterized,
SanitizerContext::XPathEscape,
],
SinkCategory::LdapInjection => vec![
SanitizerContext::LdapDnEscape,
SanitizerContext::LdapFilterEscape,
],
SinkCategory::TemplateInjection => vec![
SanitizerContext::TemplateSandbox,
SanitizerContext::TemplateAutoescape,
],
SinkCategory::Deserialization => vec![
SanitizerContext::DeserializeSafeFormat,
SanitizerContext::DeserializeTypeAllowlist,
SanitizerContext::DeserializeSignatureVerify,
],
SinkCategory::XmlInjection => vec![
SanitizerContext::XmlDisableExternalEntities,
SanitizerContext::XmlEntityEncode,
],
SinkCategory::OpenRedirect => vec![
SanitizerContext::UrlDomainAllowlist,
SanitizerContext::UrlSchemeValidation,
],
SinkCategory::RegexInjection => vec![
SanitizerContext::RegexEscape,
SanitizerContext::RegexTimeout,
SanitizerContext::LengthLimit,
],
SinkCategory::MemoryCorruption => vec![
SanitizerContext::TypeValidation,
SanitizerContext::LengthLimit,
],
SinkCategory::EmailHeaderInjection => vec![SanitizerContext::EmailHeaderSanitize],
SinkCategory::NoSqlInjection => vec![
SanitizerContext::InputAllowlist,
SanitizerContext::TypeValidation,
],
}
}
/// Maps a function name to the sanitizer contexts it is recognized as
/// providing in `lang`.
///
/// Both the function name and the language are matched case-insensitively.
/// The language aliases mirror `get_sinks_for_language`, including `tsx` and
/// `jsx`, so a language that has sinks also has its sanitizers recognized.
/// Unknown languages yield an empty list.
pub fn recognize_sanitizer(function_name: &str, lang: &str) -> Vec<SanitizerContext> {
    let name_lower = function_name.to_lowercase();
    match lang.to_lowercase().as_str() {
        "python" | "py" => recognize_python_sanitizer(&name_lower),
        "typescript" | "ts" | "javascript" | "js" | "tsx" | "jsx" => {
            recognize_typescript_sanitizer(&name_lower)
        }
        "go" | "golang" => recognize_go_sanitizer(&name_lower),
        "rust" | "rs" => recognize_rust_sanitizer(&name_lower),
        _ => Vec::new(),
    }
}
/// Recognizes Python sanitizers from a lowercased function name.
///
/// Each context is reported at most once. URL quoting helpers (`quote_plus`,
/// `urllib...quote`) are classified as URL encoding only: percent-encoding
/// does not make a value safe for a shell, so they must not count as
/// `ShellEscape`.
fn recognize_python_sanitizer(name: &str) -> Vec<SanitizerContext> {
    let mut contexts = Vec::new();

    let is_parameterized = ["parameterized", "prepared", "bind", "mogrify"]
        .iter()
        .any(|needle| name.contains(*needle));
    if is_parameterized {
        contexts.push(SanitizerContext::SqlParameterized);
    }

    // One combined test, so names such as "html.escape" (which satisfy both
    // of the original conditions) no longer yield HtmlEscape twice.
    let is_html_escape = name == "escape"
        || name == "e"
        || (name.contains("escape") && (name.contains("html") || name.contains("markup")));
    if is_html_escape {
        contexts.push(SanitizerContext::HtmlEscape);
    }

    if name.contains("bleach") || name.contains("sanitize") {
        contexts.push(SanitizerContext::DomSanitize);
    }

    let is_url_quote =
        name.contains("quote_plus") || (name.contains("urllib") && name.contains("quote"));
    if name.contains("quote") && !is_url_quote {
        contexts.push(SanitizerContext::ShellEscape);
    }

    let is_canonicalize = ["realpath", "abspath", "normpath", "resolve"]
        .iter()
        .any(|needle| name.contains(*needle));
    if is_canonicalize {
        contexts.push(SanitizerContext::PathCanonicalize);
    }

    if name.contains("basename") {
        contexts.push(SanitizerContext::PathBasename);
    }

    if name.contains("urlencode") || is_url_quote {
        contexts.push(SanitizerContext::UrlEncode);
    }

    contexts
}
/// Recognizes TypeScript/JavaScript sanitizers from a lowercased function name.
fn recognize_typescript_sanitizer(name: &str) -> Vec<SanitizerContext> {
    /// True when `haystack` contains at least one of `needles`.
    fn contains_any(haystack: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| haystack.contains(*needle))
    }

    // (matched?, context) pairs in the order results are reported.
    let checks = [
        (
            contains_any(
                name,
                &["dompurify", "sanitize", "createtextnode", "textcontent"],
            ),
            SanitizerContext::DomSanitize,
        ),
        (
            contains_any(name, &["encodeuricomponent", "encodeuri"]),
            SanitizerContext::UrlEncode,
        ),
        (
            name.contains("escape") && name.contains("html"),
            SanitizerContext::HtmlEscape,
        ),
        (
            contains_any(name, &["path.normalize", "path.resolve"]),
            SanitizerContext::PathCanonicalize,
        ),
        (
            name.contains("path.basename"),
            SanitizerContext::PathBasename,
        ),
        (
            contains_any(name, &["prepare", "parameterized"]),
            SanitizerContext::SqlParameterized,
        ),
    ];

    let mut contexts = Vec::new();
    for &(matched, context) in checks.iter() {
        if matched {
            contexts.push(context);
        }
    }
    contexts
}
/// Recognizes Go sanitizers from a lowercased function name.
fn recognize_go_sanitizer(name: &str) -> Vec<SanitizerContext> {
    // Substring needles and the context reported when any of them occurs.
    const RULES: &[(&[&str], SanitizerContext)] = &[
        (
            &["template.htmlescapestring", "html.escapestring", "htmlescape"],
            SanitizerContext::HtmlEscape,
        ),
        (
            &["filepath.clean", "filepath.abs"],
            SanitizerContext::PathCanonicalize,
        ),
        (&["filepath.base"], SanitizerContext::PathBasename),
        (
            &["url.queryescape", "url.pathescape"],
            SanitizerContext::UrlEncode,
        ),
    ];

    RULES
        .iter()
        .filter(|rule| rule.0.iter().any(|needle| name.contains(*needle)))
        .map(|rule| rule.1)
        .collect()
}
/// Recognizes Rust sanitizers from a lowercased function name.
fn recognize_rust_sanitizer(name: &str) -> Vec<SanitizerContext> {
    // Substring needles and the context reported when any of them occurs.
    const RULES: &[(&[&str], SanitizerContext)] = &[
        (
            &["encode_safe", "encode_minimal", "html_escape"],
            SanitizerContext::HtmlEscape,
        ),
        (&["ammonia", "sanitize"], SanitizerContext::DomSanitize),
        (&["canonicalize"], SanitizerContext::PathCanonicalize),
        (&["file_name", "filename"], SanitizerContext::PathBasename),
        (
            &["sqlx::query!", "query_as!"],
            SanitizerContext::SqlParameterized,
        ),
    ];

    let mut contexts = Vec::new();
    for rule in RULES {
        let needles = rule.0;
        if needles.iter().any(|needle| name.contains(*needle)) {
            contexts.push(rule.1);
        }
    }
    contexts
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sanitization event at a fixed dummy location.
    fn event_at_test_location(context: SanitizerContext, function_name: &str) -> SanitizationEvent {
        SanitizationEvent {
            context,
            location: Location::new("test.py", 10, 1),
            function_name: function_name.to_string(),
        }
    }

    /// Category metadata accessors return the expected constants.
    #[test]
    fn test_sink_category_metadata() {
        assert_eq!(SinkCategory::SqlInjection.cwe_id(), "CWE-89");
        assert_eq!(SinkCategory::CommandInjection.base_severity(), 10);
        assert!(SinkCategory::XSS.base_severity() > 0);
    }

    /// Sanitizers are effective only against their own category.
    #[test]
    fn test_sanitizer_effectiveness() {
        let html = SanitizerContext::HtmlEscape;
        let sql = SanitizerContext::SqlParameterized;
        assert!(html.is_effective_against(SinkCategory::XSS));
        assert!(!html.is_effective_against(SinkCategory::SqlInjection));
        assert!(sql.is_effective_against(SinkCategory::SqlInjection));
        assert!(!sql.is_effective_against(SinkCategory::XSS));
    }

    /// A sanitizer for the wrong category triggers the partial warning; the
    /// right one does not.
    #[test]
    fn test_partial_sanitization_detection() {
        let wrong_context = vec![event_at_test_location(
            SanitizerContext::HtmlEscape,
            "html.escape",
        )];
        let mismatched = check_sanitization(&wrong_context, SinkCategory::SqlInjection);
        assert!(!mismatched.is_sanitized);
        assert!(mismatched.partial_sanitization_warning.is_some());

        let right_context = vec![event_at_test_location(
            SanitizerContext::SqlParameterized,
            "cursor.execute",
        )];
        let matched = check_sanitization(&right_context, SinkCategory::SqlInjection);
        assert!(matched.is_sanitized);
        assert!(matched.partial_sanitization_warning.is_none());
    }

    /// `cursor.execute` is found as a SQL injection sink for Python.
    #[test]
    fn test_find_sinks() {
        let call = CallInfo::method("cursor", "execute");
        let found = find_sinks(&call, "python");
        assert!(!found.is_empty());
        assert!(found
            .iter()
            .any(|sink| sink.category == SinkCategory::SqlInjection));
    }

    /// SQL sinks restrict danger to position 0; command sinks do not restrict.
    #[test]
    fn test_sink_dangerous_params() {
        let sql_sink = Sink::sql("execute", "test");
        assert!(sql_sink.is_dangerous_param(0));
        assert!(!sql_sink.is_dangerous_param(1));

        let cmd_sink = Sink::command("exec", "test");
        assert!(cmd_sink.is_dangerous_param(0));
        assert!(cmd_sink.is_dangerous_param(99));
    }

    /// Sinks accept only sanitizers configured for them.
    #[test]
    fn test_sink_accepts_sanitizer() {
        let sql_sink = Sink::sql("execute", "test");
        assert!(sql_sink.accepts_sanitizer(SanitizerContext::SqlParameterized));
        assert!(!sql_sink.accepts_sanitizer(SanitizerContext::HtmlEscape));

        let xss_sink = Sink::xss("innerHTML", "test");
        assert!(xss_sink.accepts_sanitizer(SanitizerContext::HtmlEscape));
        assert!(!xss_sink.accepts_sanitizer(SanitizerContext::SqlParameterized));
    }

    /// Qualified names include the receiver when present.
    #[test]
    fn test_call_info_qualified_name() {
        let free_call = CallInfo::function("eval");
        assert_eq!(free_call.qualified_name(), "eval");

        let bound_call = CallInfo::method("cursor", "execute");
        assert_eq!(bound_call.qualified_name(), "cursor.execute");
    }

    /// Context-based sanitization check follows sanitizer effectiveness.
    #[test]
    fn test_is_sanitized_for_context() {
        let applied = vec![SanitizerContext::HtmlEscape];
        assert!(is_sanitized_for_context(&applied, SinkCategory::XSS));
        assert!(!is_sanitized_for_context(
            &applied,
            SinkCategory::SqlInjection
        ));
    }

    /// Well-known Python sanitizers are recognized.
    #[test]
    fn test_recognize_python_sanitizer() {
        let html_contexts = recognize_sanitizer("html.escape", "python");
        assert!(html_contexts.contains(&SanitizerContext::HtmlEscape));

        let shell_contexts = recognize_sanitizer("shlex.quote", "python");
        assert!(shell_contexts.contains(&SanitizerContext::ShellEscape));
    }

    /// Added sinks are counted and indexed by category.
    #[test]
    fn test_sink_registry() {
        let mut registry = SinkRegistry::new();
        registry.add(Sink::sql("execute", "test"));
        registry.add(Sink::command("system", "test"));
        assert_eq!(registry.len(), 2);

        let sql_count = registry
            .sinks_for_category(SinkCategory::SqlInjection)
            .len();
        assert_eq!(sql_count, 1);

        let cmd_count = registry
            .sinks_for_category(SinkCategory::CommandInjection)
            .len();
        assert_eq!(cmd_count, 1);
    }

    /// Supported languages have sinks; unknown ones get an empty registry.
    #[test]
    fn test_get_sinks_for_language() {
        let python_registry = get_sinks_for_language("python");
        assert!(!python_registry.is_empty());

        let typescript_registry = get_sinks_for_language("TypeScript");
        assert!(!typescript_registry.is_empty());

        let go_registry = get_sinks_for_language("go");
        assert!(!go_registry.is_empty());

        let rust_registry = get_sinks_for_language("rust");
        assert!(!rust_registry.is_empty());

        let unknown_registry = get_sinks_for_language("cobol");
        assert!(unknown_registry.is_empty());
    }

    /// `SinkCategory::all` lists the known categories.
    #[test]
    fn test_all_categories() {
        let every_category = SinkCategory::all();
        assert!(every_category.len() >= 10);
        assert!(every_category.contains(&SinkCategory::SqlInjection));
        assert!(every_category.contains(&SinkCategory::XSS));
    }
}