use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::detectors::ast_walk::AstWalkCtx;
use crate::detectors::base::{Detector, DetectorConfig};
use crate::detectors::fast_search::*;
use crate::detectors::security::ast_helpers::{
collect_named_args, node_text, python_kwarg_truthy, python_kwarg_value,
receiver_chain_label as receiver_chain_label_shared, unwrap_callee,
};
use crate::detectors::security::scan_inputs::{ScanAstInputs, ScanInputs};
use crate::graph::GraphQueryExt;
use crate::models::{Finding, Severity};
use crate::parsers::lightweight::Language;
use anyhow::Result;
use regex::Regex;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::LazyLock;
use tracing::{debug, info};
/// File extensions this detector scans; also returned from `file_extensions`.
const SUPPORTED_EXTS: &[&str] = &[
"py", "js", "ts", "jsx", "tsx", "rb", "php",
];
/// Extensions that `detect` routes through the AST scanner (`scan_file_ast`).
/// Every other supported extension goes through the line scanner.
const AST_EXTS: &[&str] = &["py", "js", "ts", "jsx", "tsx"];
/// Exclusion patterns used when the `exclude_patterns` option is not configured.
const DEFAULT_EXCLUDE_PATTERNS: &[&str] = &[
"tests/",
"test_",
"_test.py",
"migrations/",
"__pycache__/",
".git/",
"node_modules/",
"venv/",
".venv/",
];
/// Syntactic shape of the value handed to a deserialization call.
///
/// `severity_for` maps each shape to a severity, and `build_finding` maps it
/// to the "Argument shape" text of the report.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PickleArgKind {
// A literal with no interpolation; mapped to `Severity::Low`.
StaticLiteral,
// A literal with interpolation, or a concatenation involving a non-literal.
InterpolatedOrConcat,
// Identifier, attribute/member access, subscript or call result.
UserVariable,
// A lambda / function expression passed as the argument.
FunctionLike,
// Any node kind the classifiers do not recognise.
Unknown,
}
/// The deserialization API a finding refers to.
///
/// The Python variants and `JsUnserialize` are produced by the AST matchers;
/// `RubyMarshalLoad` and `PhpUnserialize` are produced by `match_line_pickle`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PickleApi {
PickleLoad,
DillLoad,
CloudPickleLoad,
UnpicklerLoad,
PandasReadPickle,
JoblibLoad,
NumpyLoadAllowPickle,
ShelveOpen,
TorchLoadUnsafe,
YamlLoadUnsafe,
MarshalLoad,
JsUnserialize,
RubyMarshalLoad,
PhpUnserialize,
}
impl PickleApi {
fn callee_label(self) -> &'static str {
match self {
PickleApi::PickleLoad => "pickle.load/loads",
PickleApi::DillLoad => "dill.load/loads",
PickleApi::CloudPickleLoad => "cloudpickle.load/loads",
PickleApi::UnpicklerLoad => "Unpickler.load",
PickleApi::PandasReadPickle => "pandas.read_pickle",
PickleApi::JoblibLoad => "joblib.load",
PickleApi::NumpyLoadAllowPickle => "numpy.load(allow_pickle=True)",
PickleApi::ShelveOpen => "shelve.open",
PickleApi::TorchLoadUnsafe => "torch.load (without weights_only=True)",
PickleApi::YamlLoadUnsafe => "yaml.load (without SafeLoader)",
PickleApi::MarshalLoad => "marshal.load/loads",
PickleApi::JsUnserialize => "unserialize",
PickleApi::RubyMarshalLoad => "Marshal.load",
PickleApi::PhpUnserialize => "unserialize",
}
}
}
/// Detector for unsafe deserialization calls (reported as CWE-502).
pub struct PickleDeserializationDetector {
// Configuration the detector was built from; exposed through `Detector::config`.
config: DetectorConfig,
// Stored at construction; not read anywhere in this file (hence the allow).
#[allow(dead_code)]
repository_path: PathBuf,
// Cap on findings, read from the `max_findings` option (default 100).
max_findings: usize,
// Raw exclusion patterns, from the `exclude_patterns` option or the defaults.
exclude_patterns: Vec<String>,
// Output of `compile_glob_patterns` for `exclude_patterns`.
compiled_globs: Vec<Regex>,
}
impl PickleDeserializationDetector {
/// Builds a detector with a fresh `DetectorConfig` and `"."` as the repository path.
pub fn new() -> Self {
Self::with_config(DetectorConfig::new(), PathBuf::from("."))
}
/// Builds a detector with a fresh `DetectorConfig` for the given repository path.
pub fn with_repository_path(repository_path: PathBuf) -> Self {
Self::with_config(DetectorConfig::new(), repository_path)
}
/// Builds a detector from an explicit configuration.
///
/// Reads two options from `config`: `max_findings` (default 100) and
/// `exclude_patterns` (default `DEFAULT_EXCLUDE_PATTERNS`). The exclusion
/// patterns are compiled once here so path checks do not recompile them.
pub fn with_config(config: DetectorConfig, repository_path: PathBuf) -> Self {
    let max_findings = config.get_option_or("max_findings", 100);
    let exclude_patterns: Vec<String> =
        match config.get_option::<Vec<String>>("exclude_patterns") {
            Some(configured) => configured,
            None => DEFAULT_EXCLUDE_PATTERNS
                .iter()
                .map(|pattern| pattern.to_string())
                .collect(),
        };
    let compiled_globs = crate::detectors::base::compile_glob_patterns(&exclude_patterns);
    Self {
        config,
        repository_path,
        max_findings,
        exclude_patterns,
        compiled_globs,
    }
}
/// Returns whether `path` is excluded, delegating to the shared
/// `should_exclude_path` helper with this detector's raw and compiled patterns.
fn should_exclude(&self, path: &str) -> bool {
crate::detectors::base::should_exclude_path(
path,
&self.exclude_patterns,
&self.compiled_globs,
)
}
/// Returns `true` when `path` contains a `cache/backends/` or
/// `sessions/backends/` segment; `detect` skips such files entirely.
fn is_trusted_serialization_context(path: &str) -> bool {
    const TRUSTED_SEGMENTS: [&str; 2] = ["cache/backends/", "sessions/backends/"];
    TRUSTED_SEGMENTS
        .iter()
        .any(|segment| path.contains(segment))
}
/// Scans one file through its syntax tree and returns a finding per
/// matched deserialization call.
///
/// Files containing a NUL byte or larger than 500,000 bytes are skipped.
/// The cached tree is used when supplied; otherwise the content is parsed
/// here, and a parse failure yields no findings. Calls whose line (or the
/// line before it) carries a suppression marker are ignored, and at most
/// `max_findings` findings are returned.
fn scan_file_ast(&self, inputs: &ScanAstInputs<'_>) -> Vec<Finding> {
    let path = inputs.path();
    let content = inputs.content();
    let ext = inputs.ext();
    let lang = inputs.lang;
    let cached_tree = inputs.cached_tree;
    let mut findings = Vec::new();

    if content.len() > 500_000 || content.contains('\0') {
        return findings;
    }

    // `parsed` is declared first so that a freshly parsed tree outlives
    // the root node borrowed from it.
    let parsed;
    let root = if let Some(tree) = cached_tree {
        tree.root_node()
    } else {
        let Some(tree) = parse_root_ext(content, lang, ext) else {
            return findings;
        };
        parsed = tree;
        parsed.root_node()
    };

    let bytes = content.as_bytes();
    let lines: Vec<&str> = content.lines().collect();

    // Import aliases are only collected for Python sources.
    let (alias_map, module_aliases) = if matches!(lang, Language::Python) {
        (
            super::python_imports::collect_python_from_imports(root, bytes),
            super::python_imports::collect_python_module_aliases(root, bytes),
        )
    } else {
        (HashMap::new(), HashMap::new())
    };
    let aliases = super::python_imports::PythonAliases::new(&alias_map, &module_aliases);

    let walk_ctx = AstWalkCtx {
        lang,
        source: bytes,
    };
    let mut sites: Vec<PickleSite> = Vec::new();
    collect_pickle_sites(&walk_ctx, root, &aliases, &mut sites);

    for site in sites {
        if findings.len() >= self.max_findings {
            break;
        }
        let row = site.call_node.start_position().row;
        let current = lines.get(row).copied();
        if let Some(line) = current {
            let previous = row.checked_sub(1).map(|above| lines[above]);
            if crate::detectors::is_line_suppressed(line, previous) {
                continue;
            }
        }
        let snippet = current.map(str::trim).unwrap_or("");
        let line_num = (row + 1) as u32;
        findings.push(self.build_finding(
            path,
            line_num,
            site.api,
            site.arg_kind,
            severity_for(site.api, site.arg_kind),
            snippet,
            ext,
        ));
    }
    findings
}
/// Scans one file line by line (the path used for extensions outside
/// `AST_EXTS`) and returns a finding per matching line.
///
/// Files larger than 500,000 bytes are skipped. Suppressed lines and
/// lines that start with `#` or `//` are ignored. At most `max_findings`
/// findings are returned.
fn scan_file_line(&self, inputs: &ScanInputs<'_>) -> Vec<Finding> {
    let content = inputs.content;
    let mut findings = Vec::new();
    if content.len() > 500_000 {
        return findings;
    }
    let path = inputs.path;
    let ext = inputs.ext;
    let lines: Vec<&str> = content.lines().collect();

    for (idx, &line) in lines.iter().enumerate() {
        if findings.len() >= self.max_findings {
            break;
        }
        let previous = idx.checked_sub(1).map(|above| lines[above]);
        if crate::detectors::is_line_suppressed(line, previous) {
            continue;
        }
        // Whole-line comments cannot contain a live call.
        let code = line.trim_start();
        if code.starts_with('#') || code.starts_with("//") {
            continue;
        }
        let Some((api, arg_kind)) = match_line_pickle(line, ext) else {
            continue;
        };
        let line_num = (idx + 1) as u32;
        findings.push(self.build_finding(
            path,
            line_num,
            api,
            arg_kind,
            severity_for(api, arg_kind),
            line.trim(),
            ext,
        ));
    }
    findings
}
/// Assembles the `Finding` for one detected call site.
///
/// `snippet` is embedded in a fenced code block whose language tag is
/// derived from `ext` (empty for unrecognised extensions). The suggested
/// fix comes from `recommend(api)`. Both `line_start` and `line_end` are
/// set to `line_num`.
fn build_finding(
    &self,
    path: &Path,
    line_num: u32,
    api: PickleApi,
    arg_kind: PickleArgKind,
    severity: Severity,
    snippet: &str,
    ext: &str,
) -> Finding {
    let fence_lang = match ext {
        "py" => "python",
        "js" | "jsx" => "javascript",
        "ts" | "tsx" => "typescript",
        "rb" => "ruby",
        "php" => "php",
        _ => "",
    };
    let shape = match arg_kind {
        PickleArgKind::StaticLiteral => "static literal (low risk)",
        PickleArgKind::InterpolatedOrConcat => "concatenated/interpolated value (RCE risk)",
        PickleArgKind::UserVariable => "non-literal expression (RCE risk)",
        PickleArgKind::FunctionLike => "function value (unusual for deserialization)",
        PickleArgKind::Unknown => "non-static argument",
    };
    let description = format!(
        "**Unsafe Deserialization Vulnerability**\n\n\
         **API**: `{}`\n\n\
         **Argument shape**: {}\n\n\
         **Location**: {}:{}\n\n\
         **Code snippet**:\n```{}\n{}\n```\n\n\
         Deserializing untrusted data can allow attackers to execute arbitrary code.\n\
         Pickle, dill, joblib, torch.load, yaml.load, and similar functions execute code\n\
         embedded in the serialized data. An attacker who controls the input can\n\
         achieve Remote Code Execution (RCE).\n\n\
         This vulnerability is classified as:\n\
         - **CWE-502**: Deserialization of Untrusted Data\n\
         - **OWASP A8:2017**: Insecure Deserialization",
        api.callee_label(),
        shape,
        path.display(),
        line_num,
        fence_lang,
        snippet,
    );
    let why_it_matters =
        "Insecure deserialization can lead to Remote Code Execution, allowing attackers \
         to take complete control of the application and server."
            .to_string();

    Finding {
        id: String::new(),
        detector: "PickleDeserializationDetector".to_string(),
        severity,
        title: "Unsafe Deserialization (CWE-502)".to_string(),
        description,
        affected_files: vec![path.to_path_buf()],
        line_start: Some(line_num),
        line_end: Some(line_num),
        suggested_fix: Some(self.recommend(api)),
        estimated_effort: Some("Medium (2-8 hours)".to_string()),
        category: Some("security".to_string()),
        cwe_id: Some("CWE-502".to_string()),
        why_it_matters: Some(why_it_matters),
        ..Default::default()
    }
}
/// Returns the remediation text for `api`, used as the finding's suggested fix.
///
/// The pickle-family variants share one text, as do the pandas and joblib
/// variants; every other variant has its own.
fn recommend(&self, api: PickleApi) -> String {
    let advice: &'static str = match api {
        PickleApi::PickleLoad
        | PickleApi::DillLoad
        | PickleApi::CloudPickleLoad
        | PickleApi::UnpicklerLoad => {
            "Avoid pickle/dill/cloudpickle on untrusted data.\n\n\
             - For data exchange: use `json.loads`.\n\
             - For binary data: use Protocol Buffers, msgpack (strict), or a\n\
             signed/encrypted container.\n\
             - If pickle is required, only load from sources you control and\n\
             verify their integrity (checksum or signature) first."
        }
        PickleApi::PandasReadPickle | PickleApi::JoblibLoad => {
            "Avoid loading pickle-backed model/dataframe artifacts from untrusted sources.\n\n\
             - For ML models, prefer `safetensors`, `ONNX`, or `skops` with an\n\
             explicit allowlist.\n\
             - For dataframes, prefer Parquet or CSV.\n\
             - Verify source integrity (checksum/signature) before loading."
        }
        PickleApi::NumpyLoadAllowPickle => {
            "Avoid `numpy.load(..., allow_pickle=True)` on untrusted files.\n\n\
             - Default to `allow_pickle=False` (numpy's default since 1.16.3).\n\
             - Use `.npz` files without object arrays.\n\
             - If you need Python objects, verify the file source first."
        }
        PickleApi::ShelveOpen => {
            "Avoid `shelve.open` on user-controlled paths — shelve uses pickle\n\
             internally.\n\n\
             - Use SQLite or another safe key-value store.\n\
             - Validate the path source before opening."
        }
        PickleApi::TorchLoadUnsafe => {
            "Avoid `torch.load` without `weights_only=True`.\n\n\
             - Pass `weights_only=True` (PyTorch 1.13+) — only loads tensors.\n\
             - Prefer `safetensors` for model weights.\n\
             - Validate the model source before loading."
        }
        PickleApi::YamlLoadUnsafe => {
            "Avoid `yaml.load` without a Safe loader.\n\n\
             - Use `yaml.safe_load(content)` for untrusted data.\n\
             - Or pass `Loader=yaml.SafeLoader`.\n\
             - Never call `yaml.unsafe_load` on untrusted input."
        }
        PickleApi::MarshalLoad => {
            "Avoid `marshal.load`/`marshal.loads` on untrusted data — marshal\n\
             deserializes Python bytecode and can execute arbitrary code.\n\n\
             - Use `json` for data exchange.\n\
             - If marshal is unavoidable, only load signed/verified bytecode."
        }
        PickleApi::JsUnserialize => {
            "Avoid `node-serialize` / `serialize-javascript` `unserialize` on\n\
             untrusted input — these libraries execute embedded code on load.\n\n\
             - Use `JSON.parse` for data exchange.\n\
             - For object graphs, use a structured-clone or a safe library."
        }
        PickleApi::RubyMarshalLoad => {
            "Avoid `Marshal.load` on untrusted data — Ruby Marshal can\n\
             instantiate arbitrary classes and trigger code execution.\n\n\
             - Use `JSON.parse` for data exchange.\n\
             - If Marshal is required, only load from trusted sources."
        }
        PickleApi::PhpUnserialize => {
            "Avoid `unserialize` on untrusted data — PHP `unserialize` can\n\
             trigger magic methods (POP-chain RCE).\n\n\
             - Use `json_decode` for data exchange.\n\
             - If unserialize is required, pass an `allowed_classes` allowlist\n\
             (PHP 7+) and validate the input source."
        }
    };
    advice.to_string()
}
}
fn severity_for(_api: PickleApi, arg_kind: PickleArgKind) -> Severity {
match arg_kind {
PickleArgKind::StaticLiteral => Severity::Low,
PickleArgKind::InterpolatedOrConcat | PickleArgKind::UserVariable => Severity::Critical,
PickleArgKind::FunctionLike | PickleArgKind::Unknown => Severity::High,
}
}
/// `Default` delegates to `PickleDeserializationDetector::new`.
impl Default for PickleDeserializationDetector {
fn default() -> Self {
Self::new()
}
}
impl Detector for PickleDeserializationDetector {
    /// Stable detector identifier; matches the `detector` field of emitted findings.
    fn name(&self) -> &'static str {
        "PickleDeserializationDetector"
    }

    /// One-line summary of what this detector reports.
    fn description(&self) -> &'static str {
        "Detects unsafe deserialization patterns (pickle, torch.load, yaml.load, ...)"
    }

    /// Always `true` for this detector.
    fn bypass_postprocessor(&self) -> bool {
        true
    }

    /// Findings from this detector belong to the `security` category.
    fn category(&self) -> &'static str {
        "security"
    }

    /// Reports `false`; `detect` still consults `ctx.graph` to refine severity.
    fn requires_graph(&self) -> bool {
        false
    }

    /// Exposes the configuration the detector was built with.
    fn config(&self) -> Option<&DetectorConfig> {
        Some(&self.config)
    }

    /// Extensions of the files this detector wants to see.
    fn file_extensions(&self) -> &'static [&'static str] {
        SUPPORTED_EXTS
    }

    /// Content flag declared by this detector: `HAS_SERIALIZE`.
    fn content_requirements(&self) -> crate::detectors::detector_context::ContentFlags {
        crate::detectors::detector_context::ContentFlags::HAS_SERIALIZE
    }

    /// Runs the detector over every supported file in `ctx`.
    ///
    /// Per file: excluded paths and trusted serialization backends are
    /// skipped, then files lacking every keyword in
    /// `PICKLE_KEYWORD_FINDERS` are skipped, then the file is scanned via
    /// the AST path (`AST_EXTS`) or the line path (everything else).
    ///
    /// Afterwards, `High`/`Medium` findings located inside a function whose
    /// name looks like a request handler are raised to `Critical`, and
    /// `Low` findings are dropped.
    ///
    /// The collected list never exceeds `max_findings`. The cap is applied
    /// before `Low` findings are dropped, so fewer than `max_findings`
    /// results may be returned even when more non-`Low` sites exist.
    fn detect(
        &self,
        ctx: &crate::detectors::analysis_context::AnalysisContext,
    ) -> Result<Vec<Finding>> {
        // Matches camelCase handler names such as `getUser` or `postOrder`.
        static HANDLER_VERB_RE: LazyLock<Regex> = LazyLock::new(|| {
            Regex::new(r"^(get|post|put|delete|patch|head|options)[A-Z]").expect("valid regex")
        });

        let graph = ctx.graph;
        let files = &ctx.as_file_provider();
        debug!("Starting unsafe-deserialization detection (AST-first)");
        let mut findings: Vec<Finding> = Vec::new();

        for path in files.files_with_extensions(SUPPORTED_EXTS) {
            if findings.len() >= self.max_findings {
                break;
            }
            let path_str = path.to_string_lossy().to_string();
            if self.should_exclude(&path_str) {
                continue;
            }
            if Self::is_trusted_serialization_context(&path_str) {
                continue;
            }
            let Some(content) = files.content(path) else {
                continue;
            };
            // Cheap substring prefilter before any parsing or regex work.
            if !contains_any(PICKLE_KEYWORD_FINDERS, &content) {
                continue;
            }
            let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
            let scan = ScanInputs::new(path, &content, ext);
            let new_findings = if AST_EXTS.contains(&ext) {
                let cached = files.tree(path);
                let lang = Language::from_path(path);
                let ast_inputs = ScanAstInputs::new(scan, lang, cached.as_deref());
                self.scan_file_ast(&ast_inputs)
            } else {
                self.scan_file_line(&scan)
            };
            findings.extend(new_findings);
            // Each per-file scan is capped on its own, so the combined list
            // could overshoot; enforce the global cap here.
            findings.truncate(self.max_findings);
        }

        for finding in &mut findings {
            if !matches!(finding.severity, Severity::High | Severity::Medium) {
                continue;
            }
            let (Some(file_path), Some(line)) =
                (finding.affected_files.first(), finding.line_start)
            else {
                continue;
            };
            let path_str = file_path.to_string_lossy().to_string();
            let interner = graph.interner();
            let Some(func) = graph.find_function_at(&path_str, line) else {
                continue;
            };
            let raw_name = func.node_name(interner);
            let name_lower = raw_name.to_lowercase();
            let is_route = name_lower.contains("handler")
                || name_lower.contains("route")
                || name_lower.contains("endpoint")
                || name_lower.contains("view")
                || name_lower.contains("controller")
                || name_lower.contains("middleware")
                || name_lower.contains("request")
                || name_lower.contains("response")
                || HANDLER_VERB_RE.is_match(raw_name);
            if is_route {
                finding.severity = Severity::Critical;
            }
        }

        findings.retain(|f| f.severity != Severity::Low);
        info!(
            "PickleDeserializationDetector found {} potential vulnerabilities",
            findings.len()
        );
        Ok(findings)
    }
}
/// Registry hook: builds the detector for the repository path carried by `init`.
impl crate::detectors::RegisteredDetector for PickleDeserializationDetector {
fn create(init: &crate::detectors::DetectorInit) -> std::sync::Arc<dyn Detector> {
std::sync::Arc::new(Self::with_repository_path(init.repo_path.to_path_buf()))
}
}
/// Substring finders used by `detect` as a prefilter: a file is only scanned
/// when `contains_any` reports a hit for at least one of these.
/// The individual `FIND_*` finders come from the `fast_search` glob import.
static PICKLE_KEYWORD_FINDERS: &[&LazyLock<memchr::memmem::Finder<'static>>] = &[
&FIND_PICKLE,
&FIND_CPICKLE,
&FIND_DILL,
&FIND_CLOUDPICKLE,
&FIND_READ_PICKLE,
&FIND_JOBLIB_LOAD,
&FIND_UNPICKLER,
&FIND_SHELVE,
&FIND_NUMPY_LOAD_DOT,
&FIND_NP_LOAD_DOT,
&FIND_TORCH_LOAD_DOT,
&FIND_YAML_LOAD_DOT,
&FIND_YAML_UNSAFE_LOAD,
&FIND_YAML_FULL_LOAD,
&FIND_MARSHAL_DOT,
&FIND_UNSERIALIZE,
&FIND_NODE_SERIALIZE,
&FIND_SERIALIZE_JS,
&FIND_DESERIALIZE,
&FIND_MARSHAL_LOAD_RB,
];
/// One matched deserialization call found by the AST walk.
struct PickleSite<'a> {
// The call node itself; its start row gives the reported line number.
call_node: tree_sitter::Node<'a>,
// Which deserialization API the call resolves to.
api: PickleApi,
// Shape of the argument chosen for classification.
arg_kind: PickleArgKind,
}
/// Walks the subtree rooted at `node` in pre-order and appends every
/// deserialization call site to `out`.
///
/// The traversal uses an explicit work stack instead of recursion, so a
/// pathologically deep syntax tree (for example thousands of nested
/// parentheses in a scanned file) cannot exhaust the call stack. Sites are
/// emitted in the same order a recursive pre-order walk would produce.
fn collect_pickle_sites<'a>(
    ctx: &AstWalkCtx<'a>,
    node: tree_sitter::Node<'a>,
    aliases: &super::python_imports::PythonAliases<'_>,
    out: &mut Vec<PickleSite<'a>>,
) {
    let mut pending = vec![node];
    while let Some(current) = pending.pop() {
        if let Some(site) = match_pickle_site(current, ctx.source, ctx.lang, aliases) {
            out.push(site);
        }
        let mut cursor = current.walk();
        let first_child_slot = pending.len();
        pending.extend(current.children(&mut cursor));
        // The stack pops from the back, so reverse the new children to
        // visit the first child next.
        pending[first_child_slot..].reverse();
    }
}
/// Dispatches a single node to the language-specific call matcher.
///
/// Only Python `call` nodes and JavaScript/TypeScript `call_expression`
/// nodes are considered; anything else yields `None`.
fn match_pickle_site<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    lang: Language,
    aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<PickleSite<'a>> {
    let kind = node.kind();
    match lang {
        Language::Python if kind == "call" => match_python_pickle_call(node, source, aliases),
        Language::JavaScript | Language::TypeScript if kind == "call_expression" => {
            match_js_unserialize_call(node, source)
        }
        _ => None,
    }
}
/// Resolves a Python `(module, name)` callee to the API it represents.
///
/// Checks run in a fixed order: the pickle-family modules first, then a
/// `.load` whose receiver is an `Unpickler(...)` construction, then the
/// remaining libraries. `numpy.load`, `torch.load` and `yaml.load` are
/// reported only when their keyword arguments leave the unsafe behaviour
/// enabled. Returns `None` for callees that are not of interest.
fn classify_python_pickle_callee(
    module: &str,
    name: &str,
    arg_nodes: &[tree_sitter::Node<'_>],
    source: &[u8],
    obj_for_unpickler: Option<tree_sitter::Node<'_>>,
) -> Option<PickleApi> {
    let is_load = name == "load";

    if is_load || name == "loads" {
        match module {
            "pickle" | "cpickle" | "_pickle" => return Some(PickleApi::PickleLoad),
            "dill" => return Some(PickleApi::DillLoad),
            "cloudpickle" => return Some(PickleApi::CloudPickleLoad),
            _ => {}
        }
    }

    // `<anything>.Unpickler(...).load(...)`, whatever the module label is.
    let receiver_is_unpickler = obj_for_unpickler
        .is_some_and(|recv| recv.kind() == "call" && call_is_unpickler_constructor(recv, source));
    if is_load && receiver_is_unpickler {
        return Some(PickleApi::UnpicklerLoad);
    }

    match (module, name) {
        ("pandas" | "pd", "read_pickle") => Some(PickleApi::PandasReadPickle),
        ("joblib", "load") => Some(PickleApi::JoblibLoad),
        ("numpy" | "np", "load") => numpy_load_allow_pickle_true(arg_nodes, source)
            .then_some(PickleApi::NumpyLoadAllowPickle),
        ("shelve", "open") => Some(PickleApi::ShelveOpen),
        ("torch", "load") => {
            (!torch_load_weights_only_true(arg_nodes, source)).then_some(PickleApi::TorchLoadUnsafe)
        }
        ("yaml", "load" | "unsafe_load" | "full_load") => {
            (!yaml_load_safe(arg_nodes, source)).then_some(PickleApi::YamlLoadUnsafe)
        }
        ("marshal", "load" | "loads") => Some(PickleApi::MarshalLoad),
        _ => None,
    }
}
/// Matches a Python `call` node against the known deserialization APIs.
///
/// Attribute callees (`obj.attr(...)`) are resolved through the module
/// alias table, trying the receiver's raw text first and its chain label
/// second. Bare identifier callees must appear in the `from ... import`
/// table. The first non-keyword argument is classified; a call without a
/// positional argument produces no site.
fn match_python_pickle_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
    aliases: &super::python_imports::PythonAliases<'_>,
) -> Option<PickleSite<'a>> {
    let callee = unwrap_callee(node.child_by_field_name("function")?);
    let arg_nodes = collect_named_args(node.child_by_field_name("arguments")?);

    let api = if callee.kind() == "attribute" {
        let receiver = callee.child_by_field_name("object")?;
        let member = callee.child_by_field_name("attribute")?;
        let member_name = node_text(member, source)?;
        let chain_label = receiver_chain_label(receiver, source);
        let receiver_text = node_text(receiver, source).unwrap_or("");
        let module_label = aliases
            .modules
            .get(receiver_text)
            .or_else(|| aliases.modules.get(chain_label.as_str()))
            .cloned()
            .unwrap_or(chain_label);
        classify_python_pickle_callee(
            module_label.as_str(),
            member_name,
            &arg_nodes,
            source,
            Some(receiver),
        )?
    } else if callee.kind() == "identifier" {
        let imported_name = node_text(callee, source)?;
        let origin_module = aliases.imports.get(imported_name)?;
        classify_python_pickle_callee(
            origin_module.as_str(),
            imported_name,
            &arg_nodes,
            source,
            None,
        )?
    } else {
        return None;
    };

    let payload = arg_nodes
        .iter()
        .copied()
        .find(|arg| arg.kind() != "keyword_argument")?;
    Some(PickleSite {
        call_node: node,
        api,
        arg_kind: classify_pickle_arg_python(payload, source),
    })
}
/// Returns `true` when `node` is a Python call whose callee is named
/// `Unpickler` or `_Unpickler`, either bare or as the final attribute of a
/// dotted path.
fn call_is_unpickler_constructor(node: tree_sitter::Node<'_>, source: &[u8]) -> bool {
    if node.kind() != "call" {
        return false;
    }
    let Some(callee) = node.child_by_field_name("function") else {
        return false;
    };
    // Pick the node that carries the constructor's simple name.
    let name_node = match callee.kind() {
        "identifier" => Some(callee),
        "attribute" => callee.child_by_field_name("attribute"),
        _ => None,
    };
    name_node
        .and_then(|n| node_text(n, source))
        .is_some_and(|name| matches!(name, "Unpickler" | "_Unpickler"))
}
/// Reports whether a `numpy.load` call enables pickling, by asking
/// `python_kwarg_truthy` about the `allow_pickle` keyword argument.
// NOTE(review): the trailing `true` is an option of `python_kwarg_truthy`
// whose meaning is not visible in this file — confirm against the helper.
fn numpy_load_allow_pickle_true(args: &[tree_sitter::Node<'_>], source: &[u8]) -> bool {
python_kwarg_truthy(
args,
"allow_pickle",
source,
true,
)
}
/// Reports whether a `torch.load` call restricts loading to weights, by asking
/// `python_kwarg_truthy` about the `weights_only` keyword argument.
// NOTE(review): the trailing `false` is an option of `python_kwarg_truthy`
// whose meaning is not visible in this file — confirm against the helper.
fn torch_load_weights_only_true(args: &[tree_sitter::Node<'_>], source: &[u8]) -> bool {
python_kwarg_truthy(
args,
"weights_only",
source,
false,
)
}
/// Reports whether a `yaml.load`-style call names a safe loader.
///
/// The loader is taken from the `Loader=` keyword argument or, when that
/// is absent, from the second positional argument, since PyYAML accepts
/// `yaml.load(stream, yaml.SafeLoader)` as well. A loader whose text
/// mentions `SafeLoader` (which also covers `CSafeLoader`) or `BaseLoader`
/// counts as safe. A call without any loader is not safe.
fn yaml_load_safe(args: &[tree_sitter::Node<'_>], source: &[u8]) -> bool {
    let loader = python_kwarg_value(args, "Loader", source).or_else(|| {
        args.iter()
            .copied()
            .filter(|arg| arg.kind() != "keyword_argument")
            .nth(1)
    });
    let Some(loader) = loader else {
        return false;
    };
    let text = node_text(loader, source).unwrap_or("");
    // `UnsafeLoader` does not match: the check is case-sensitive.
    text.contains("SafeLoader") || text.contains("BaseLoader")
}
#[allow(clippy::only_used_in_recursion)]
fn classify_pickle_arg_python(node: tree_sitter::Node<'_>, source: &[u8]) -> PickleArgKind {
match node.kind() {
"string" => {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if child.kind() == "interpolation" {
return PickleArgKind::InterpolatedOrConcat;
}
}
PickleArgKind::StaticLiteral
}
"concatenated_string" => {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if classify_pickle_arg_python(child, source) == PickleArgKind::InterpolatedOrConcat
{
return PickleArgKind::InterpolatedOrConcat;
}
}
PickleArgKind::StaticLiteral
}
"binary_operator" => {
let mut found_var = false;
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if !child.is_named() {
continue;
}
match classify_pickle_arg_python(child, source) {
PickleArgKind::UserVariable
| PickleArgKind::InterpolatedOrConcat
| PickleArgKind::Unknown => found_var = true,
_ => {}
}
}
if found_var {
PickleArgKind::InterpolatedOrConcat
} else {
PickleArgKind::StaticLiteral
}
}
"identifier" | "attribute" | "subscript" | "call" => PickleArgKind::UserVariable,
"lambda" => PickleArgKind::FunctionLike,
"parenthesized_expression" => {
for i in 0..node.named_child_count() {
if let Some(c) = node.named_child(i) {
return classify_pickle_arg_python(c, source);
}
}
PickleArgKind::Unknown
}
"await" => {
for i in 0..node.named_child_count() {
if let Some(c) = node.named_child(i) {
return classify_pickle_arg_python(c, source);
}
}
PickleArgKind::Unknown
}
"conditional_expression" => {
let mut strongest = PickleArgKind::StaticLiteral;
for i in 0..node.named_child_count() {
if let Some(c) = node.named_child(i) {
let k = classify_pickle_arg_python(c, source);
strongest = strongest_arg_kind(strongest, k);
}
}
strongest
}
_ => PickleArgKind::Unknown,
}
}
/// Matches a JavaScript/TypeScript `call_expression` against `unserialize`.
///
/// Accepts either a bare `unserialize(...)` call or a member call
/// `<recv>.unserialize(...)` whose receiver label is one of the known
/// serializer module names. The first argument is classified; a call
/// without arguments produces no site.
fn match_js_unserialize_call<'a>(
    node: tree_sitter::Node<'a>,
    source: &'a [u8],
) -> Option<PickleSite<'a>> {
    let callee = unwrap_callee(node.child_by_field_name("function")?);
    let arg_nodes = collect_named_args(node.child_by_field_name("arguments")?);

    let is_unserialize = match callee.kind() {
        "identifier" => node_text(callee, source)? == "unserialize",
        "member_expression" => {
            let receiver = callee.child_by_field_name("object")?;
            let property = callee.child_by_field_name("property")?;
            let property_name = node_text(property, source)?;
            let receiver_label = receiver_chain_label(receiver, source);
            let known_serializer = matches!(
                receiver_label.as_str(),
                "nodeserialize"
                    | "node-serialize"
                    | "serialize"
                    | "serialize-javascript"
                    | "serializejavascript"
            );
            known_serializer && property_name == "unserialize"
        }
        _ => false,
    };
    if !is_unserialize {
        return None;
    }

    let payload = arg_nodes.first().copied()?;
    Some(PickleSite {
        call_node: node,
        api: PickleApi::JsUnserialize,
        arg_kind: classify_pickle_arg_js(payload, source),
    })
}
#[allow(clippy::only_used_in_recursion)]
fn classify_pickle_arg_js(node: tree_sitter::Node<'_>, source: &[u8]) -> PickleArgKind {
match node.kind() {
"string" => PickleArgKind::StaticLiteral,
"template_string" => {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if child.kind() == "template_substitution" {
return PickleArgKind::InterpolatedOrConcat;
}
}
PickleArgKind::StaticLiteral
}
"binary_expression" => {
let left = node.child_by_field_name("left");
let right = node.child_by_field_name("right");
let mut found_var = false;
for opt in [left, right].iter().flatten() {
match classify_pickle_arg_js(*opt, source) {
PickleArgKind::UserVariable
| PickleArgKind::InterpolatedOrConcat
| PickleArgKind::Unknown => {
found_var = true;
}
_ => {}
}
}
if found_var {
PickleArgKind::InterpolatedOrConcat
} else {
PickleArgKind::StaticLiteral
}
}
"identifier" | "member_expression" | "subscript_expression" | "call_expression" => {
PickleArgKind::UserVariable
}
"arrow_function" | "function_expression" | "function" | "function_declaration" => {
PickleArgKind::FunctionLike
}
"parenthesized_expression"
| "await_expression"
| "as_expression"
| "type_assertion_expression"
| "non_null_expression"
| "satisfies_expression" => {
for i in 0..node.named_child_count() {
if let Some(c) = node.named_child(i) {
return classify_pickle_arg_js(c, source);
}
}
PickleArgKind::Unknown
}
"ternary_expression" => {
let consequence = node.child_by_field_name("consequence");
let alternative = node.child_by_field_name("alternative");
let mut strongest = PickleArgKind::StaticLiteral;
for opt in [consequence, alternative].iter().flatten() {
let k = classify_pickle_arg_js(*opt, source);
strongest = strongest_arg_kind(strongest, k);
}
strongest
}
_ => PickleArgKind::Unknown,
}
}
/// Returns whichever of `a` and `b` ranks higher, preferring `a` on a tie.
///
/// Ranking, strongest first: `UserVariable`, `InterpolatedOrConcat`,
/// `Unknown`, `FunctionLike`, `StaticLiteral`.
fn strongest_arg_kind(a: PickleArgKind, b: PickleArgKind) -> PickleArgKind {
    fn rank(kind: PickleArgKind) -> u8 {
        match kind {
            PickleArgKind::StaticLiteral => 0,
            PickleArgKind::FunctionLike => 1,
            PickleArgKind::Unknown => 2,
            PickleArgKind::InterpolatedOrConcat => 3,
            PickleArgKind::UserVariable => 4,
        }
    }
    if rank(b) > rank(a) {
        b
    } else {
        a
    }
}
/// Computes the receiver label for `node` via the shared helper, supplying
/// `call_expression_module_label` as the resolver for call receivers.
fn receiver_chain_label(node: tree_sitter::Node<'_>, source: &[u8]) -> String {
receiver_chain_label_shared(node, source, Some(&call_expression_module_label))
}
/// Resolves `require('<module>')` / `import('<module>')` to a module label.
///
/// Only the two serializer packages of interest are recognised. Any other
/// call, or a non-literal module argument, yields `None`.
fn call_expression_module_label(
    node: tree_sitter::Node<'_>,
    source: &[u8],
) -> Option<&'static str> {
    debug_assert_eq!(node.kind(), "call_expression");
    let callee = node.child_by_field_name("function")?;
    let callee_text = node_text(callee, source)?;

    let kind_ok = matches!(callee.kind(), "identifier" | "import");
    let name_ok = matches!(callee_text, "require" | "import");
    if !(kind_ok && name_ok) {
        return None;
    }

    let arg_nodes = collect_named_args(node.child_by_field_name("arguments")?);
    let module_arg = *arg_nodes.first()?;
    let module_name = js_string_literal_value(module_arg, source)?;
    ["node-serialize", "serialize-javascript"]
        .into_iter()
        .find(|known| *known == module_name.as_str())
}
/// Extracts the text of a JavaScript `string` node without its quotes.
///
/// Prefers the concatenation of the node's `string_fragment` children.
/// When there are none, it falls back to stripping one matching pair of
/// double or single quotes from the raw text. Returns `None` for
/// non-string nodes and for raw text that is not quoted that way.
fn js_string_literal_value(node: tree_sitter::Node<'_>, source: &[u8]) -> Option<String> {
    if node.kind() != "string" {
        return None;
    }

    let mut cursor = node.walk();
    let fragments: Vec<&str> = node
        .children(&mut cursor)
        .filter(|part| part.kind() == "string_fragment")
        .filter_map(|part| node_text(part, source))
        .collect();
    if !fragments.is_empty() {
        return Some(fragments.concat());
    }

    let raw = node_text(node, source)?;
    let unquote = |quote: char| {
        raw.strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
    };
    let inner = unquote('"').or_else(|| unquote('\''))?;
    Some(inner.to_string())
}
/// Line-based matcher for the languages without AST support.
///
/// For `rb` it looks for `Marshal.load(`. For `php` it looks for
/// `unserialize(` that is not preceded by `.` or `>`. The text following
/// the opening parenthesis is passed to `classify_line_arg`. Any other
/// extension yields `None`.
fn match_line_pickle(line: &str, ext: &str) -> Option<(PickleApi, PickleArgKind)> {
    static RUBY_MARSHAL_RE: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"\bMarshal\.load\s*\(").expect("valid regex"));
    static PHP_UNSERIALIZE_RE: LazyLock<Regex> =
        LazyLock::new(|| Regex::new(r"(?:^|[^.>])\bunserialize\s*\(").expect("valid regex"));

    let api = match ext {
        "rb" => PickleApi::RubyMarshalLoad,
        "php" => PickleApi::PhpUnserialize,
        _ => return None,
    };
    let pattern: &Regex = if api == PickleApi::RubyMarshalLoad {
        &RUBY_MARSHAL_RE
    } else {
        &PHP_UNSERIALIZE_RE
    };
    let call = pattern.find(line)?;
    let argument_text = &line[call.end()..];
    Some((api, classify_line_arg(argument_text)))
}
/// Classifies the first argument of a call from the raw text that follows
/// its opening parenthesis (line-based scanning for Ruby and PHP).
///
/// * A quoted literal is `StaticLiteral`, unless a double-quoted literal
///   contains `#{` or `$`, in which case it is `InterpolatedOrConcat`.
/// * A quoted literal followed by a concatenation operator is classified
///   by the rest of the chain: `'a:' . $_GET['x']` and `"x" + params[:y]`
///   are `InterpolatedOrConcat`, while `'a' . 'b'` stays `StaticLiteral`.
///   Recognised operators are `+` and a PHP-style `.` followed by
///   whitespace, `$` or a quote. A `.` directly followed by an identifier
///   is treated as a method call on the literal and left static.
/// * An immediately closing `)` is `Unknown`.
/// * Anything else is `UserVariable`.
///
/// The chain is followed with a loop rather than recursion, so a very long
/// line cannot exhaust the stack.
fn classify_line_arg(after_paren: &str) -> PickleArgKind {
    /// Returns the text of the right-hand operand when `tail` starts with a
    /// recognised string-concatenation operator.
    fn concat_operand(tail: &str) -> Option<&str> {
        if let Some(after_plus) = tail.strip_prefix('+') {
            return Some(after_plus.trim_start());
        }
        let after_dot = tail.strip_prefix('.')?;
        let looks_like_php_concat = after_dot
            .chars()
            .next()
            .is_some_and(|c| c.is_whitespace() || matches!(c, '$' | '"' | '\''));
        looks_like_php_concat.then(|| after_dot.trim_start())
    }

    let mut rest = after_paren.trim_start();
    let mut after_literal = false;
    loop {
        let bytes = rest.as_bytes();
        let quote = match bytes.first().copied() {
            Some(q @ (b'"' | b'\'')) => q,
            // The operand of a trailing operator sits on a following line.
            None if after_literal => return PickleArgKind::Unknown,
            // A literal joined with something that is not a literal.
            _ if after_literal => return PickleArgKind::InterpolatedOrConcat,
            Some(b')') => return PickleArgKind::Unknown,
            _ => return PickleArgKind::UserVariable,
        };

        let mut interpolated = false;
        let mut closing = None;
        let mut i = 1;
        while i < bytes.len() {
            let c = bytes[i];
            if c == b'\\' {
                // Skip the escaped byte so an escaped quote does not close.
                i += 2;
                continue;
            }
            if c == quote {
                closing = Some(i);
                break;
            }
            // Only double-quoted strings interpolate in Ruby and PHP.
            if quote == b'"' && (c == b'$' || (c == b'#' && bytes.get(i + 1) == Some(&b'{'))) {
                interpolated = true;
            }
            i += 1;
        }
        if interpolated {
            return PickleArgKind::InterpolatedOrConcat;
        }

        // An unterminated literal has nothing after it to inspect.
        let Some(end) = closing else {
            return PickleArgKind::StaticLiteral;
        };
        let tail = rest.get(end + 1..).unwrap_or("").trim_start();
        match concat_operand(tail) {
            Some(operand) => {
                after_literal = true;
                rest = operand;
            }
            None => return PickleArgKind::StaticLiteral,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::graph::builder::GraphBuilder;
#[test]
fn test_skips_cache_backend_paths() {
assert!(
PickleDeserializationDetector::is_trusted_serialization_context(
"cache/backends/redis.py"
)
);
assert!(
PickleDeserializationDetector::is_trusted_serialization_context(
"django/core/cache/backends/db.py"
)
);
assert!(
PickleDeserializationDetector::is_trusted_serialization_context(
"sessions/backends/db.py"
)
);
assert!(!PickleDeserializationDetector::is_trusted_serialization_context("myapp/views.py"));
}
#[test]
fn test_detects_pickle_loads_with_user_input_python() {
let content = "import pickle\ndef handle(user_data):\n return pickle.loads(user_data)\n";
let store = GraphBuilder::new().freeze();
let detector =
PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("handler.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings
.iter()
.any(|f| matches!(f.severity, Severity::Critical)),
"pickle.loads(user_data) should be Critical. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_detects_pickle_load_from_file_python() {
let content = "import pickle\ndef handle(user_path):\n return pickle.load(open(user_path, 'rb'))\n";
let store = GraphBuilder::new().freeze();
let detector =
PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("h.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"pickle.load(open(user_path)) must fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_detects_dill_loads_python() {
let content = "import dill\ndef handle(user_data):\n return dill.loads(user_data)\n";
let store = GraphBuilder::new().freeze();
let detector =
PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("h.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings
.iter()
.any(|f| matches!(f.severity, Severity::Critical)),
"dill.loads(user_data) should be Critical. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_skips_pickle_loads_with_static_bytes() {
let content = "import pickle\nsafe = pickle.loads(b'\\x80\\x04K\\x01.')\n";
let store = GraphBuilder::new().freeze();
let detector =
PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("safe.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"pickle.loads(staticBytes) must not fire post-filter. Got: {:?}",
findings
.iter()
.map(|f| (&f.title, f.severity))
.collect::<Vec<_>>()
);
}
#[test]
fn test_skips_pickle_word_in_string_literal() {
let content = "def doc():\n msg = \"use pickle.loads to deserialize\"\n return msg\n";
let store = GraphBuilder::new().freeze();
let detector =
PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("doc.py", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
findings.is_empty(),
"`pickle.loads` inside a string literal must not fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
#[test]
fn test_b1_pickle_loads_via_require_alias_js() {
let content = "function go(userData) {\n return require('node-serialize').unserialize(userData);\n}\n";
let store = GraphBuilder::new().freeze();
let detector =
PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
&store,
vec![("go.js", content)],
);
let findings = detector.detect(&ctx).expect("detection should succeed");
assert!(
!findings.is_empty(),
"B1: require('node-serialize').unserialize(userData) must fire. Got: {:?}",
findings.iter().map(|f| &f.title).collect::<Vec<_>>()
);
}
/// B1: a call whose receiver is a member-of-member chain
/// (`self.pickle.loads(data)`) inside a Python method must produce at least
/// one finding.
#[test]
fn test_b1_member_of_member_self_pickle_loads_python() {
    // Each nested block is indented one level deeper than its header so the
    // fixture is syntactically valid Python; otherwise the test would depend
    // on the parser's error recovery rather than on the call shape under test.
    let content =
        "class S:\n    def go(self, data):\n        return self.pickle.loads(data)\n";
    let store = GraphBuilder::new().freeze();
    let detector =
        PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![("s.py", content)],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    assert!(
        !findings.is_empty(),
        "B1: self.pickle.loads(data) must fire. Got: {:?}",
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    );
}
/// A user-defined class that happens to expose a method named `loads` is not
/// `pickle.loads`; calling it must not produce any finding.
#[test]
fn test_skips_unpickler_method_name() {
    // The method body is indented below its `def` so the fixture is valid
    // Python and the test exercises a real method definition plus a call on
    // an instance, not whatever an error-recovering parser salvages.
    let content = "class Pickler:\n    def loads(self, data):\n        return data\n\np = Pickler()\nresult = p.loads(b'x')\n";
    let store = GraphBuilder::new().freeze();
    let detector =
        PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![("p.py", content)],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");
    assert!(
        findings.is_empty(),
        "Method-name `loads` must not be confused with pickle.loads. Got: {:?}",
        findings.iter().map(|f| &f.title).collect::<Vec<_>>()
    );
}
/// `pickle.loads` fed a bytes literal concatenated with a function parameter
/// must yield at least one finding of `Severity::Critical`.
#[test]
fn test_detects_pickle_loads_with_concatenation() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "import pickle\ndef go(user_data):\n return pickle.loads(b'prefix' + user_data)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("c.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");

    // (title, severity) pairs double as the data under test and the diagnostic.
    let summary: Vec<_> = reported.iter().map(|f| (&f.title, f.severity)).collect();
    let has_critical = summary.iter().any(|(_, sev)| *sev == Severity::Critical);
    assert!(
        has_critical,
        "pickle.loads(b'prefix' + user_data) should be Critical. Got: {:?}",
        summary
    );
}
/// B4: when the argument to `pickle.loads` is an `await` expression, a finding
/// must be reported on line 3 of the snippet and it must be `Critical`.
#[test]
fn test_b4_classify_arg_through_await_python() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import pickle\nasync def go():\n return pickle.loads(await get_data())\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("a.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");

    // Line 3 is the `return pickle.loads(await ...)` statement.
    let Some(hit) = reported.iter().find(|f| matches!(f.line_start, Some(3))) else {
        panic!("B4: pickle.loads(await ...) must fire");
    };
    let severity = hit.severity;
    assert!(
        severity == Severity::Critical,
        "B4: pickle.loads(await get_data()) should be Critical (UserVariable), got {:?}",
        severity
    );
}
/// `np.load(..., allow_pickle=True)` on line 3 must be reported, while the
/// plain `np.load(user_path)` on line 4 must not.
#[test]
fn test_numpy_load_allow_pickle_true_fires() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source = "import numpy as np\ndef go(user_path):\n a = np.load(user_path, allow_pickle=True)\n b = np.load(user_path)\n return (a, b)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("n.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");

    // One (line, title) listing serves both checks and both failure messages.
    let located: Vec<_> = reported.iter().map(|f| (f.line_start, &f.title)).collect();

    let fires_on_line_3 = located.iter().any(|(line, _)| *line == Some(3));
    assert!(
        fires_on_line_3,
        "np.load(user_path, allow_pickle=True) must fire on line 3. Got: {:?}",
        located
    );

    let silent_on_line_4 = located.iter().all(|(line, _)| *line != Some(4));
    assert!(
        silent_on_line_4,
        "np.load(user_path) (default allow_pickle=False) must not fire. Got: {:?}",
        located
    );
}
/// Within one file, `pickle.loads(user_input)` (line 3) must be reported as
/// `Critical`, while `pickle.loads` of a static bytes literal (line 4) must
/// not appear in the findings at all.
#[test]
fn test_severity_critical_for_user_input_low_for_static_bytes() {
    let content = "import pickle\ndef both(user_input):\n a = pickle.loads(user_input)\n b = pickle.loads(b'\\x80\\x04K\\x01.')\n return (a, b)\n";
    let store = GraphBuilder::new().freeze();
    let detector =
        PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let ctx = crate::detectors::analysis_context::AnalysisContext::test_with_mock_files(
        &store,
        vec![("b.py", content)],
    );
    let findings = detector.detect(&ctx).expect("detection should succeed");

    // Resolve the line-3 finding once with `expect` instead of checking
    // `is_some()` and then unwrapping twice; a missing finding still fails
    // with the same message.
    let var_finding = findings
        .iter()
        .find(|f| f.line_start == Some(3))
        .expect("Variable-arg pickle.loads on line 3 must produce a finding");
    assert!(
        matches!(var_finding.severity, Severity::Critical),
        "Variable-arg pickle.loads should be Critical, got {:?}",
        var_finding.severity
    );

    // The static-bytes call on line 4 must not be reported.
    let lit_finding = findings.iter().find(|f| f.line_start == Some(4));
    assert!(
        lit_finding.is_none(),
        "Static-bytes pickle.loads must be filtered (Low). Got: {:?}",
        lit_finding.map(|f| (&f.title, f.severity))
    );
}
/// After `from pickle import loads`, a bare `loads(user_data)` call on line 4
/// must be reported with `Severity::Critical`.
#[test]
fn test_python_bare_pickle_loads_after_from_import() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "from pickle import loads\n\ndef parse(user_data):\n return loads(user_data)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("p.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");

    // (line, severity, title) triples: checked below and echoed on failure.
    let summary: Vec<_> = reported
        .iter()
        .map(|f| (f.line_start, f.severity, &f.title))
        .collect();
    let critical_on_line_4 = summary
        .iter()
        .any(|(line, sev, _)| *line == Some(4) && matches!(sev, Severity::Critical));
    assert!(
        critical_on_line_4,
        "Should fire Critical on `loads(user_data)` after `from pickle import loads`. Got: {:?}",
        summary
    );
}
/// After `import pickle as pkl`, the aliased call `pkl.loads(user_data)` on
/// line 4 must be reported with `Severity::Critical`.
#[test]
fn test_python_aliased_module_pickle_loads_detected() {
    use crate::detectors::analysis_context::AnalysisContext;

    let source =
        "import pickle as pkl\n\ndef parse(user_data):\n return pkl.loads(user_data)\n";
    let graph = GraphBuilder::new().freeze();
    let scanner = PickleDeserializationDetector::with_repository_path(PathBuf::from("/mock/repo"));
    let analysis = AnalysisContext::test_with_mock_files(&graph, vec![("p.py", source)]);
    let reported = scanner
        .detect(&analysis)
        .expect("detection should succeed");

    // (line, severity, title) triples: checked below and echoed on failure.
    let summary: Vec<_> = reported
        .iter()
        .map(|f| (f.line_start, f.severity, &f.title))
        .collect();
    let critical_on_line_4 = summary
        .iter()
        .any(|(line, sev, _)| *line == Some(4) && matches!(sev, Severity::Critical));
    assert!(
        critical_on_line_4,
        "Should fire Critical on `pkl.loads(user_data)` after `import pickle as pkl`. Got: {:?}",
        summary
    );
}
}