use rustpython_parser::ast::{self, Expr, Pattern, Stmt};
use std::collections::HashMap;
use crate::ir::{EdgeKind, NodeId, NodeKind, TaintGraph};
use super::scope::Scope;
/// Graph nodes created for the parameters of one callable.
#[derive(Clone)]
pub(super) struct FunctionParams {
/// Nodes for positional-only and regular parameters, in declaration order.
pub(super) positional: Vec<NodeId>,
/// Every named parameter (including `*args` and `**kwargs`) keyed by its name.
pub(super) by_name: HashMap<String, NodeId>,
/// Node bound for the `**kwargs` parameter, if one is declared.
pub(super) kwarg: Option<NodeId>,
/// Node bound for the `*args` parameter, if one is declared.
pub(super) vararg: Option<NodeId>,
}
/// Walks a Python AST and records definitions, bindings and data flow in a `TaintGraph`.
pub(super) struct PythonParser {
/// Graph being built; handed to the caller by `into_graph`.
pub(super) graph: TaintGraph,
/// Stack of scopes; the last entry is the innermost scope.
pub(super) scopes: Vec<Scope>,
/// Node of the function whose body is being visited; `return` values are linked to it.
pub(super) current_function_node: Option<NodeId>,
/// Node of the class whose body is being visited, if any.
pub(super) current_class_node: Option<NodeId>,
/// NOTE(review): only zero-initialised in this part of the file; presumably the
/// comprehension nesting level — confirm against the expression visitor.
pub(super) comprehension_depth: usize,
/// Parameter nodes of each visited function, keyed by the bare function name.
pub(super) function_params: HashMap<String, FunctionParams>,
/// NOTE(review): only initialised in this part of the file; looks like a
/// per-dict-literal map from key to value node — confirm against its users.
pub(super) dict_literals: HashMap<NodeId, HashMap<String, NodeId>>,
/// For each variable node that was bound from a source, the node it was bound from.
pub(super) var_sources: HashMap<NodeId, NodeId>,
/// `(method name, node)` pairs recorded per class name, including inherited entries.
class_methods: HashMap<String, Vec<(String, NodeId)>>,
}
impl PythonParser {
pub(super) fn new() -> Self {
Self {
graph: TaintGraph::new(),
scopes: Vec::new(),
current_function_node: None,
current_class_node: None,
comprehension_depth: 0,
function_params: HashMap::new(),
dict_literals: HashMap::new(),
var_sources: HashMap::new(),
class_methods: HashMap::new(),
}
}
pub(super) fn into_graph(self) -> TaintGraph {
self.graph
}
/// Returns the names that a `from <module> import *` is expanded to for a
/// handful of security-relevant modules.
///
/// Modules that are not in the table yield an empty slice.
fn dangerous_module_exports(module: &str) -> &'static [&'static str] {
    const OS: &[&str] = &[
        "system", "popen", "spawn", "exec", "fork", "kill",
        "environ", "getenv", "putenv", "remove", "rename",
        "rmdir", "mkdir", "makedirs", "walk",
    ];
    const SUBPROCESS: &[&str] = &["Popen", "call", "run", "check_output", "check_call"];
    const SOCKET: &[&str] = &["socket", "create_connection", "gethostbyname", "getaddrinfo"];
    const BASE64: &[&str] = &["b64decode", "b64encode", "decodestring", "decodebytes"];
    const PICKLE: &[&str] = &["load", "loads", "dump", "dumps", "Pickler", "Unpickler"];
    const MARSHAL: &[&str] = &["load", "loads", "dump", "dumps"];
    const UNKNOWN: &[&str] = &[];

    match module {
        "os" => OS,
        "subprocess" => SUBPROCESS,
        "socket" => SOCKET,
        "base64" => BASE64,
        "pickle" => PICKLE,
        "marshal" => MARSHAL,
        _ => UNKNOWN,
    }
}
/// Visits a list of statements inside a freshly pushed scope, which is
/// popped again once the last statement has been processed.
pub(super) fn parse_suite(&mut self, suite: &[Stmt]) {
    let suite_scope = Scope::new();
    self.scopes.push(suite_scope);
    for stmt in suite {
        self.visit_stmt(stmt);
    }
    let _ = self.scopes.pop();
}
/// Visits each statement of `stmts` in source order.
fn visit_stmts(&mut self, stmts: &[Stmt]) {
    stmts.iter().for_each(|stmt| self.visit_stmt(stmt));
}
/// Translates a single Python statement into nodes and edges of the graph.
///
/// Compound statements recurse into their bodies; definitions, imports and
/// assignments register bindings on the scope stack.
fn visit_stmt(&mut self, stmt: &Stmt) {
    match stmt {
        Stmt::FunctionDef(node) => {
            self.visit_function_def(&node.name, &node.args, &node.body, &node.decorator_list)
        }
        Stmt::AsyncFunctionDef(node) => {
            self.visit_function_def(&node.name, &node.args, &node.body, &node.decorator_list)
        }
        Stmt::ClassDef(node) => {
            let class_id = self.graph.add_node(NodeKind::Variable, node.name.to_string(), None);
            self.cur().define(node.name.to_string(), class_id);

            // Base classes and class keywords all flow into the class node.
            for base in &node.bases {
                let base_id = self.eval_expr(base);
                self.flow(base_id, class_id);
            }
            for keyword in &node.keywords {
                let kw_id = self.eval_expr(&keyword.value);
                self.flow(kw_id, class_id);
            }

            // Decorators are applied innermost-first; every application gets
            // its own node fed by the previous result and the decorator.
            let mut decorated_id = class_id;
            for decorator in node.decorator_list.iter().rev() {
                let dec_id = self.eval_expr(decorator);
                let next_id = self
                    .graph
                    .add_node(NodeKind::Variable, format!("<decorated {}>", node.name), None);
                self.flow(decorated_id, next_id);
                self.flow(dec_id, next_id);
                decorated_id = next_id;
            }
            if !node.decorator_list.is_empty() {
                if let Some(graph_node) = self.graph.node_mut(decorated_id) {
                    graph_node.alias = Some(node.name.to_string());
                }
                // The class name now refers to the outermost decorated node.
                self.cur().define(node.name.to_string(), decorated_id);
            }

            let previous_class = self.current_class_node.replace(class_id);
            self.scopes.push(Scope::new());
            self.visit_stmts(&node.body);

            let class_name = node.name.to_string();
            // Scope directly below the class body scope. The class scope was
            // just pushed, so this index is always in bounds.
            let parent_scope = self.scopes.len().saturating_sub(2);

            // Publish the methods recorded for this class as `Class.method`.
            if let Some(methods) = self.class_methods.get(&class_name).cloned() {
                for (method_name, method_id) in methods {
                    let full = format!("{}.{}", class_name, method_name);
                    self.scopes[parent_scope].define(full, method_id);
                    self.flow(method_id, class_id);
                }
            }

            // Methods of bases referenced by a plain name are inherited unless
            // `Class.method` is already resolvable.
            for base in &node.bases {
                let Expr::Name(base_name) = base else {
                    continue;
                };
                let Some(base_methods) = self.class_methods.get(base_name.id.as_str()).cloned()
                else {
                    continue;
                };
                for (method_name, method_id) in base_methods {
                    let full = format!("{}.{}", class_name, method_name);
                    if self.scopes[parent_scope].resolve(&full).is_some() {
                        continue;
                    }
                    let inherited_id =
                        self.graph.add_node(NodeKind::Variable, full.clone(), None);
                    if let Some(graph_node) = self.graph.node_mut(inherited_id) {
                        graph_node.alias = Some(full.clone());
                    }
                    self.flow(method_id, inherited_id);
                    self.scopes[parent_scope].define(full, inherited_id);
                    self.class_methods
                        .entry(class_name.clone())
                        .or_default()
                        .push((method_name, inherited_id));
                }
            }

            self.scopes.pop();
            self.current_class_node = previous_class;
        }
        Stmt::Return(node) => {
            // A `return` outside of any function is ignored.
            if let (Some(value), Some(function_id)) = (&node.value, self.current_function_node) {
                let value_id = self.eval_expr(value);
                self.graph.add_edge(value_id, function_id, EdgeKind::Return);
            }
        }
        Stmt::Assign(node) => {
            // The value is evaluated once and bound to every target.
            let value_id = self.eval_expr(&node.value);
            for target in &node.targets {
                self.bind_target(target, value_id);
            }
        }
        Stmt::AnnAssign(node) => {
            // A bare annotation (`x: int`) binds the target to a placeholder literal.
            let value_id = match &node.value {
                Some(value) => self.eval_expr(value),
                None => self.literal("<annassign>"),
            };
            self.bind_target(&node.target, value_id);
        }
        Stmt::AugAssign(node) => {
            let target_id = self.eval_expr(&node.target);
            let value_id = self.eval_expr(&node.value);
            // NOTE(review): the self-edge is kept as-is; it looks redundant —
            // confirm whether anything downstream relies on it.
            self.flow(target_id, target_id);
            self.flow(value_id, target_id);
        }
        Stmt::For(node) => self.visit_for(&node.target, &node.iter, &node.body, &node.orelse),
        Stmt::AsyncFor(node) => {
            self.visit_for(&node.target, &node.iter, &node.body, &node.orelse)
        }
        Stmt::While(node) => {
            self.eval_expr(&node.test);
            self.visit_stmts(&node.body);
            self.visit_stmts(&node.orelse);
        }
        Stmt::If(node) => {
            self.eval_expr(&node.test);
            self.visit_stmts(&node.body);
            self.visit_stmts(&node.orelse);
        }
        Stmt::With(node) => self.visit_with(&node.items, &node.body),
        Stmt::AsyncWith(node) => self.visit_with(&node.items, &node.body),
        Stmt::Try(node) => {
            self.visit_try(&node.body, &node.handlers, &node.orelse, &node.finalbody)
        }
        Stmt::TryStar(node) => {
            self.visit_try(&node.body, &node.handlers, &node.orelse, &node.finalbody)
        }
        Stmt::Import(node) => {
            for alias in &node.names {
                // `import a.b.c` binds only `a`; `import a.b as x` binds `x`
                // while the node keeps the full dotted name.
                let (binding, node_name) = match &alias.asname {
                    Some(asname) => (asname.to_string(), alias.name.to_string()),
                    None => {
                        let root =
                            alias.name.split('.').next().unwrap_or(&alias.name).to_string();
                        (root.clone(), root)
                    }
                };
                let import_id = self.graph.add_node(NodeKind::Import, node_name.clone(), None);
                if let Some(graph_node) = self.graph.node_mut(import_id) {
                    graph_node.alias = Some(node_name);
                }
                self.cur().define(binding, import_id);
            }
        }
        Stmt::ImportFrom(node) => {
            let level = node.level.map(|l| l.to_usize()).unwrap_or(0);
            let module = node.module.as_ref().map(ToString::to_string);
            for alias in &node.names {
                if alias.name.as_str() == "*" {
                    let qualified = module.as_deref().unwrap_or("<unknown>");
                    let expanded = Self::dangerous_module_exports(qualified);
                    if expanded.is_empty() {
                        // Unknown module: keep one opaque `module.*` import.
                        let star = format!("{}.*", qualified);
                        let import_id =
                            self.graph.add_node(NodeKind::Import, star.clone(), None);
                        if let Some(graph_node) = self.graph.node_mut(import_id) {
                            graph_node.alias = Some(star.clone());
                        }
                        self.cur().define(star, import_id);
                    } else {
                        // Known module: bind each listed export by its bare name.
                        for name in expanded {
                            let item_qualified = format!("{}.{}", qualified, name);
                            let import_id = self.graph.add_node(
                                NodeKind::Import,
                                item_qualified.clone(),
                                None,
                            );
                            if let Some(graph_node) = self.graph.node_mut(import_id) {
                                graph_node.alias = Some(item_qualified);
                            }
                            self.cur().define((*name).to_string(), import_id);
                        }
                    }
                    continue;
                }

                let binding = alias.asname.as_ref().unwrap_or(&alias.name).to_string();
                // One dot per relative-import level; empty for absolute imports.
                let prefix = ".".repeat(level);
                let mut qualified = match (&module, prefix.is_empty()) {
                    (Some(m), true) => format!("{m}.{}", alias.name),
                    (Some(m), false) => format!("{prefix}{m}.{}", alias.name),
                    (None, _) => format!("{prefix}{}", alias.name),
                };
                // `from flask import request` is recorded under the bare name `request`.
                if module.as_deref() == Some("flask") && alias.name.as_str() == "request" {
                    qualified = "request".to_string();
                }
                let import_id = self.graph.add_node(NodeKind::Import, qualified.clone(), None);
                if let Some(graph_node) = self.graph.node_mut(import_id) {
                    graph_node.alias = Some(qualified);
                }
                self.cur().define(binding, import_id);
            }
        }
        Stmt::Expr(node) => {
            self.eval_expr(&node.value);
        }
        Stmt::Raise(node) => {
            if let Some(expr) = &node.exc {
                self.eval_expr(expr);
            }
            if let Some(expr) = &node.cause {
                self.eval_expr(expr);
            }
        }
        Stmt::Assert(node) => {
            self.eval_expr(&node.test);
            if let Some(expr) = &node.msg {
                self.eval_expr(expr);
            }
        }
        Stmt::Delete(node) => {
            for target in &node.targets {
                self.eval_expr(target);
            }
        }
        Stmt::Match(node) => {
            let subject_id = self.eval_expr(&node.subject);
            for case in &node.cases {
                self.scopes.push(Scope::new());
                self.bind_pattern(&case.pattern, subject_id);
                // Python evaluates the guard after the pattern has bound its
                // names, so it must be evaluated inside the case scope.
                if let Some(guard) = &case.guard {
                    self.eval_expr(guard);
                }
                self.visit_stmts(&case.body);
                self.scopes.pop();
            }
        }
        Stmt::Global(node) => {
            for name in &node.names {
                self.cur().add_global(name.to_string());
            }
        }
        Stmt::Nonlocal(node) => {
            for name in &node.names {
                self.cur().add_nonlocal(name.to_string());
            }
        }
        Stmt::TypeAlias(node) => {
            let value_id = self.eval_expr(&node.value);
            self.bind_target(&node.name, value_id);
        }
        Stmt::Pass(_) | Stmt::Break(_) | Stmt::Continue(_) => {}
    }
}
fn visit_function_def(
&mut self,
name: &ast::Identifier,
args: &ast::Arguments,
body: &[Stmt],
decorator_list: &[Expr],
) {
let function_id = self.graph.add_node(NodeKind::Variable, name.to_string(), None);
self.cur().define(name.to_string(), function_id);
let class_name = self.current_class_node.and_then(|cid| {
self.graph.node(cid).map(|n| n.name.clone())
});
if let Some(ref cname) = class_name {
self.class_methods
.entry(cname.clone())
.or_default()
.push((name.to_string(), function_id));
if self.scopes.len() >= 2 {
let class_scope = self.scopes.len() - 2;
self.scopes[class_scope].define(name.to_string(), function_id);
}
}
let mut decorated_id = function_id;
for decorator in decorator_list.iter().rev() {
let dec_id = self.eval_expr(decorator);
let next_id = self
.graph
.add_node(NodeKind::Variable, format!("<decorated {}>", name), None);
self.flow(decorated_id, next_id);
self.flow(dec_id, next_id);
decorated_id = next_id;
}
if !decorator_list.is_empty() {
if let Some(graph_node) = self.graph.node_mut(decorated_id) {
graph_node.alias = Some(name.to_string());
}
let scope_idx = self.resolve_scope_for_name(name.as_str());
self.scopes[scope_idx].define(name.to_string(), decorated_id);
}
let previous = self.current_function_node.replace(function_id);
self.scopes.push(Scope::new());
let params = self.bind_arguments(args);
self.function_params.insert(name.to_string(), params.clone());
if let Some(ref cname) = class_name {
if let Some(first_arg) = args.args.first() {
let self_name = first_arg.def.arg.to_string();
if self_name == "self" || self_name == "cls" {
if let Some(param_id) = params.positional.first().copied() {
if let Some(node) = self.graph.node_mut(param_id) {
node.alias = Some(cname.clone());
}
}
}
}
}
self.visit_stmts(body);
self.scopes.pop();
self.current_function_node = previous;
}
/// Handles `for` / `async for`: the iterable is evaluated and bound to the
/// loop target, then the loop body and the `else` block are visited.
fn visit_for(&mut self, target: &Expr, iter: &Expr, body: &[Stmt], orelse: &[Stmt]) {
    let iterable_id = self.eval_expr(iter);
    self.bind_target(target, iterable_id);
    for block in [body, orelse] {
        self.visit_stmts(block);
    }
}
/// Handles `with` / `async with`: every context expression is evaluated and,
/// when an `as` target is present, bound to it; the body is visited afterwards.
fn visit_with(&mut self, items: &[ast::WithItem], body: &[Stmt]) {
    for item in items {
        let manager_id = self.eval_expr(&item.context_expr);
        if let Some(as_target) = item.optional_vars.as_deref() {
            self.bind_target(as_target, manager_id);
        }
    }
    self.visit_stmts(body);
}
/// Handles `try` / `try*` statements.
///
/// The body is visited first. Every expression collected by `raised_exprs`
/// from the body is then evaluated again, and the resulting nodes (together
/// with the handler's exception type) flow into each named handler variable.
/// Each handler runs in its own scope; `else` is visited before the
/// handlers and `finally` after them.
fn visit_try(
    &mut self,
    body: &[Stmt],
    handlers: &[ast::ExceptHandler],
    orelse: &[Stmt],
    finalbody: &[Stmt],
) {
    self.visit_stmts(body);

    let mut raised_ids: Vec<NodeId> = Vec::new();
    for raised in self.raised_exprs(body) {
        raised_ids.push(self.eval_expr(raised));
    }

    self.visit_stmts(orelse);

    for ast::ExceptHandler::ExceptHandler(clause) in handlers {
        self.scopes.push(Scope::new());

        // The exception type is evaluated even when the handler binds no name.
        let type_id = match &clause.type_ {
            Some(exc_type) => Some(self.eval_expr(exc_type)),
            None => None,
        };

        if let Some(bound_name) = &clause.name {
            let bound_id = self
                .graph
                .add_node(NodeKind::Variable, bound_name.to_string(), None);
            let sources = type_id.into_iter().chain(raised_ids.iter().copied());
            for source_id in sources {
                self.flow(source_id, bound_id);
            }
            self.cur().define(bound_name.to_string(), bound_id);
        }

        self.visit_stmts(&clause.body);
        self.scopes.pop();
    }

    self.visit_stmts(finalbody);
}
fn raised_exprs<'a>(&self, stmts: &'a [Stmt]) -> Vec<&'a Expr> {
let mut out = Vec::new();
for stmt in stmts {
match stmt {
Stmt::Raise(node) => {
if let Some(exc) = &node.exc {
out.push(exc.as_ref());
}
}
Stmt::If(node) => {
out.extend(self.raised_exprs(&node.body));
out.extend(self.raised_exprs(&node.orelse));
}
Stmt::For(node) => {
out.extend(self.raised_exprs(&node.body));
out.extend(self.raised_exprs(&node.orelse));
}
Stmt::While(node) => {
out.extend(self.raised_exprs(&node.body));
out.extend(self.raised_exprs(&node.orelse));
}
Stmt::With(node) => out.extend(self.raised_exprs(&node.body)),
Stmt::AsyncWith(node) => out.extend(self.raised_exprs(&node.body)),
Stmt::ClassDef(node) => out.extend(self.raised_exprs(&node.body)),
Stmt::Match(node) => {
for case in &node.cases {
out.extend(self.raised_exprs(&case.body));
}
}
_ => {}
}
}
out
}
/// Binds every parameter of `args` in the current scope and returns the
/// nodes that were created.
///
/// Positional-only and regular parameters are appended to `positional` in
/// declaration order; every parameter (including `*args` and `**kwargs`) is
/// also registered in `by_name`. A default expression is evaluated right
/// after its parameter has been bound and flows into that parameter, so
/// data carried by a default reaches the parameter node.
pub(super) fn bind_arguments(&mut self, args: &ast::Arguments) -> FunctionParams {
    let mut params = FunctionParams {
        positional: Vec::new(),
        by_name: HashMap::new(),
        kwarg: None,
        vararg: None,
    };

    // Positional-only parameters come first, followed by the regular ones.
    for arg in args.posonlyargs.iter().chain(args.args.iter()) {
        let id = self.bind_name(arg.def.arg.as_str(), None);
        params.positional.push(id);
        params.by_name.insert(arg.def.arg.to_string(), id);
        if let Some(default) = &arg.default {
            let default_id = self.eval_expr(default);
            self.flow(default_id, id);
        }
    }

    if let Some(vararg) = &args.vararg {
        let id = self.bind_name(vararg.arg.as_str(), None);
        params.vararg = Some(id);
        params.by_name.insert(vararg.arg.to_string(), id);
    }

    // Keyword-only parameters can only be passed by name.
    for arg in &args.kwonlyargs {
        let id = self.bind_name(arg.def.arg.as_str(), None);
        params.by_name.insert(arg.def.arg.to_string(), id);
        if let Some(default) = &arg.default {
            let default_id = self.eval_expr(default);
            self.flow(default_id, id);
        }
    }

    if let Some(kwarg) = &args.kwarg {
        let id = self.bind_name(kwarg.arg.as_str(), None);
        params.kwarg = Some(id);
        params.by_name.insert(kwarg.arg.to_string(), id);
    }

    params
}
/// Binds an assignment target to `source` and returns the node that now
/// stands for the target.
///
/// * Names get a fresh binding.
/// * Attribute targets flow into the member node of the evaluated base.
/// * Subscript targets get a fresh `<base>[]` node fed by base and source.
/// * Tuples and lists bind every element to the same `source` and return
///   the node of the last element (or `source` when empty).
/// * Starred targets bind their inner target.
/// * Anything else is left alone and `source` is returned.
pub(super) fn bind_target(&mut self, target: &Expr, source: NodeId) -> NodeId {
    match target {
        Expr::Name(name) => self.bind_name(name.id.as_str(), Some(source)),
        Expr::Attribute(attribute) => {
            let owner_id = self.eval_expr(&attribute.value);
            let qualified = format!("{}.{}", self.canonical_name(owner_id), attribute.attr);
            let member_id = self.member_node(owner_id, &qualified);
            self.flow(source, member_id);
            member_id
        }
        Expr::Subscript(subscript) => {
            let container_id = self.eval_expr(&subscript.value);
            self.eval_expr(&subscript.slice);
            let label = format!("{}[]", self.canonical_name(container_id));
            let element_id = self.graph.add_node(NodeKind::Variable, label.clone(), None);
            if let Some(graph_node) = self.graph.node_mut(element_id) {
                graph_node.alias = Some(label);
            }
            self.flow(container_id, element_id);
            self.flow(source, element_id);
            element_id
        }
        Expr::Tuple(tuple) => tuple
            .elts
            .iter()
            .fold(source, |_, element| self.bind_target(element, source)),
        Expr::List(list) => list
            .elts
            .iter()
            .fold(source, |_, element| self.bind_target(element, source)),
        Expr::Starred(starred) => self.bind_target(&starred.value, source),
        _ => source,
    }
}
/// Binds every name captured by a `match` pattern to `source`.
///
/// Sub-patterns are walked recursively and all captures receive the same
/// `source` node. Patterns that capture nothing are ignored.
fn bind_pattern(&mut self, pattern: &Pattern, source: NodeId) {
    match pattern {
        Pattern::MatchAs(capture) => {
            if let Some(name) = &capture.name {
                self.bind_name(name.as_str(), Some(source));
            }
            if let Some(inner) = capture.pattern.as_deref() {
                self.bind_pattern(inner, source);
            }
        }
        Pattern::MatchStar(star) => {
            if let Some(name) = &star.name {
                self.bind_name(name.as_str(), Some(source));
            }
        }
        Pattern::MatchSequence(sequence) => sequence
            .patterns
            .iter()
            .for_each(|sub| self.bind_pattern(sub, source)),
        Pattern::MatchMapping(mapping) => {
            // The `**rest` capture is bound before the value patterns.
            if let Some(rest) = &mapping.rest {
                self.bind_name(rest.as_str(), Some(source));
            }
            mapping
                .patterns
                .iter()
                .for_each(|sub| self.bind_pattern(sub, source));
        }
        Pattern::MatchClass(class) => class
            .patterns
            .iter()
            .chain(class.kwd_patterns.iter())
            .for_each(|sub| self.bind_pattern(sub, source)),
        Pattern::MatchOr(alternatives) => alternatives
            .patterns
            .iter()
            .for_each(|sub| self.bind_pattern(sub, source)),
        _ => {}
    }
}
/// Creates a new variable node called `name` and binds it in the scope
/// selected by `resolve_scope_for_name`.
///
/// When `source` is given it flows into the new node and is remembered in
/// `var_sources`; if the source node is not itself a variable, the alias
/// reported by `alias_from` (if any) is copied onto the new node.
fn bind_name(&mut self, name: &str, source: Option<NodeId>) -> NodeId {
    let id = self.graph.add_node(NodeKind::Variable, name.to_string(), None);

    if let Some(source_id) = source {
        self.flow(source_id, id);
        self.var_sources.insert(id, source_id);

        let source_is_not_variable = matches!(
            self.graph.node(source_id),
            Some(source_node) if source_node.kind != NodeKind::Variable
        );
        if source_is_not_variable {
            let inherited_alias = self.alias_from(source_id);
            if let Some(alias) = inherited_alias {
                if let Some(graph_node) = self.graph.node_mut(id) {
                    graph_node.alias = Some(alias);
                }
            }
        }
    }

    let target_scope = self.resolve_scope_for_name(name);
    self.scopes[target_scope].define(name.to_string(), id);
    id
}
/// Returns the index of the scope in which `name` should be bound.
///
/// * Declared `global` in the innermost scope: index 0.
/// * Declared `nonlocal` there: the nearest outer scope that already
///   resolves the name, or the scope directly below the innermost one.
/// * Otherwise: the innermost scope (0 when the stack is empty).
fn resolve_scope_for_name(&self, name: &str) -> usize {
    let depth = self.scopes.len();
    let Some(innermost) = self.scopes.last() else {
        return depth.saturating_sub(1);
    };

    if innermost.is_global(name) {
        return 0;
    }
    if !innermost.is_nonlocal(name) {
        return depth.saturating_sub(1);
    }

    // Search outwards, starting just below the innermost scope.
    (0..depth - 1)
        .rev()
        .find(|&idx| self.scopes[idx].resolve(name).is_some())
        .unwrap_or_else(|| depth.saturating_sub(2))
}
/// Evaluates a lambda's body expression and links its value to `lambda_id`
/// with a `Return` edge. The parameter table is accepted but not used.
pub(super) fn visit_lambda_body(
    &mut self,
    body: &Expr,
    lambda_id: NodeId,
    _params: &FunctionParams,
) {
    let result_id = self.eval_expr(body);
    self.graph.add_edge(result_id, lambda_id, EdgeKind::Return);
}
/// Returns the nodes of every binding on the scope stack, outermost scope
/// first, skipping bindings whose name contains a `.`.
pub(super) fn all_scope_vars(&self) -> Vec<NodeId> {
    self.scopes
        .iter()
        .flat_map(|scope| scope.bindings())
        .filter(|(name, _)| !name.contains('.'))
        .map(|(_, &id)| id)
        .collect()
}
}