use std::collections::HashMap;
use rustpython_parser::ast::{self, Constant, Expr, ExprCall};
use crate::ir::{EdgeKind, NodeId, NodeKind};
use super::scope::Scope;
use super::visitor::PythonParser;
fn is_locals_or_globals_call(expr: &Expr) -> bool {
match expr {
Expr::Call(call) => match call.func.as_ref() {
Expr::Name(name) => name.id.as_str() == "locals" || name.id.as_str() == "globals",
_ => false,
},
_ => false,
}
}
fn is_os_environ(expr: &Expr) -> bool {
match expr {
Expr::Attribute(attr) => {
matches!(attr.value.as_ref(), Expr::Name(name) if name.id.as_str() == "os")
&& attr.attr.as_str() == "environ"
}
Expr::Name(name) => name.id.as_str() == "environ",
_ => false,
}
}
impl PythonParser {
pub(super) fn eval_expr(&mut self, expr: &Expr) -> NodeId {
match expr {
Expr::Name(node) => self.var(&node.id.to_string()),
Expr::Attribute(node) => {
let base = self.eval_expr(&node.value);
let full = format!("{}.{}", self.canonical_name(base), node.attr);
self.member_node(base, &full)
}
Expr::Call(node) => self.eval_call(node),
Expr::Constant(node) => self.constant_node(&node.value),
Expr::Subscript(node) => {
let value_id = self.eval_expr(&node.value);
self.eval_expr(&node.slice);
let name = format!("{}[]", self.canonical_name(value_id));
let id = self.graph.add_node(NodeKind::Variable, name.clone(), None);
if let Some(graph_node) = self.graph.node_mut(id) {
graph_node.alias = Some(name);
}
self.flow(value_id, id);
id
}
Expr::Tuple(node) => self.aggregate("<tuple>", &node.elts),
Expr::List(node) => self.aggregate("<list>", &node.elts),
Expr::Set(node) => self.aggregate("<set>", &node.elts),
Expr::Dict(node) => {
let id = self.graph.add_node(NodeKind::Literal, "<dict>".to_string(), None);
let mut entries = HashMap::new();
for (key_opt, value) in node.keys.iter().zip(&node.values) {
let value_id = self.eval_expr(value);
self.flow(value_id, id);
if let Some(key) = key_opt {
let key_id = self.eval_expr(key);
self.flow(key_id, id);
if let Expr::Constant(c) = key {
if let Constant::Str(s) = &c.value {
entries.insert(s.clone(), value_id);
}
}
}
}
self.dict_literals.insert(id, entries);
id
}
Expr::BinOp(node) => self.combine("<binop>", [&*node.left, &*node.right]),
Expr::BoolOp(node) => self.aggregate("<boolop>", &node.values),
Expr::UnaryOp(node) => {
let operand_id = self.eval_expr(&node.operand);
let id = self.graph.add_node(NodeKind::Call, "<unary>".to_string(), None);
self.flow(operand_id, id);
id
}
Expr::Compare(node) => {
let id = self.graph.add_node(NodeKind::Call, "<compare>".to_string(), None);
let left_id = self.eval_expr(&node.left);
self.flow(left_id, id);
for comparator in &node.comparators {
let comparator_id = self.eval_expr(comparator);
self.flow(comparator_id, id);
}
id
}
Expr::IfExp(node) => self.combine("<ifexp>", [&*node.test, &*node.body, &*node.orelse]),
Expr::NamedExpr(node) => {
let value_id = self.eval_expr(&node.value);
if self.comprehension_depth > 0 {
let scope_idx = self.scopes.len().saturating_sub(2);
let id = self.graph.add_node(NodeKind::Variable, "<walrus>".to_string(), None);
self.flow(value_id, id);
if let Expr::Name(target_name) = node.target.as_ref() {
let name = target_name.id.to_string();
if let Some(node_mut) = self.graph.node_mut(id) {
node_mut.alias = Some(name.clone());
}
if scope_idx < self.scopes.len() {
self.scopes[scope_idx].define(name, id);
} else {
self.cur().define(name, id);
}
}
id
} else {
self.bind_target(&node.target, value_id)
}
}
Expr::Lambda(node) => {
let lambda_id = self.graph.add_node(NodeKind::Variable, "<lambda>".to_string(), None);
self.scopes.push(Scope::new());
let params = self.bind_arguments(&node.args);
self.visit_lambda_body(&node.body, lambda_id, ¶ms);
self.scopes.pop();
lambda_id
}
Expr::Await(node) => self.eval_expr(&node.value),
Expr::Yield(node) => {
let value_id = node
.value
.as_ref()
.map(|v| self.eval_expr(v))
.unwrap_or_else(|| self.literal("<yield>"));
if let Some(fn_node) = self.current_function_node {
self.graph.add_edge(value_id, fn_node, EdgeKind::Return);
}
value_id
}
Expr::YieldFrom(node) => {
let value_id = self.eval_expr(&node.value);
if let Some(fn_node) = self.current_function_node {
self.graph.add_edge(value_id, fn_node, EdgeKind::Return);
}
value_id
}
Expr::JoinedStr(node) => self.aggregate("<fstring>", &node.values),
Expr::FormattedValue(node) => {
let value_id = self.eval_expr(&node.value);
let id = self.graph.add_node(NodeKind::Literal, "<fmtval>".to_string(), None);
self.flow(value_id, id);
if let Some(format_spec) = &node.format_spec {
let spec_id = self.eval_expr(format_spec);
self.flow(spec_id, id);
}
id
}
Expr::ListComp(node) => {
self.eval_comprehension("<listcomp>", &node.elt, &node.generators)
}
Expr::SetComp(node) => {
self.eval_comprehension("<setcomp>", &node.elt, &node.generators)
}
Expr::DictComp(node) => {
let id = self.graph.add_node(NodeKind::Literal, "<dictcomp>".to_string(), None);
self.comprehension_depth += 1;
self.scopes.push(Scope::new());
for generator in &node.generators {
let iter_id = self.eval_expr(&generator.iter);
self.bind_target(&generator.target, iter_id);
for if_expr in &generator.ifs {
self.eval_expr(if_expr);
}
}
let key_id = self.eval_expr(&node.key);
let value_id = self.eval_expr(&node.value);
self.scopes.pop();
self.comprehension_depth -= 1;
self.flow(key_id, id);
self.flow(value_id, id);
id
}
Expr::GeneratorExp(node) => {
self.eval_comprehension("<genexpr>", &node.elt, &node.generators)
}
Expr::Starred(node) => self.eval_expr(&node.value),
Expr::Slice(node) => self.combine_optional("<slice>", [&node.lower, &node.upper, &node.step]),
}
}
fn eval_call(&mut self, node: &ExprCall) -> NodeId {
let receiver_obj = match node.func.as_ref() {
Expr::Attribute(attr) => Some(self.eval_expr(&attr.value)),
_ => None,
};
let func_id = self.eval_expr(&node.func);
let call_name = self.canonical_name(func_id);
let call_id = self.graph.add_node(NodeKind::Call, call_name.clone(), None);
if let Some(graph_node) = self.graph.node_mut(call_id) {
graph_node.alias = Some(call_name.clone());
}
self.graph.add_edge(func_id, call_id, EdgeKind::Call);
self.graph.add_edge(func_id, call_id, EdgeKind::Assignment);
if let Some(recv) = receiver_obj {
self.flow(recv, call_id);
}
let mut arg_ids = Vec::with_capacity(node.args.len());
for arg in &node.args {
match arg {
Expr::Starred(starred) => {
let inner_id = self.eval_expr(&starred.value);
let callee_key = match node.func.as_ref() {
Expr::Name(name) => Some(name.id.to_string()),
Expr::Attribute(attr) => Some(attr.attr.to_string()),
_ => None,
};
if let Some(key) = callee_key {
if let Some(params) = self.function_params.get(&key).cloned() {
if let Some(vararg_id) = params.vararg {
self.flow(inner_id, vararg_id);
}
for ¶m_id in ¶ms.positional {
self.flow(inner_id, param_id);
}
}
}
self.graph.add_edge(inner_id, call_id, EdgeKind::Argument);
}
_ => {
let arg_id = self.eval_expr(arg);
self.graph.add_edge(arg_id, call_id, EdgeKind::Argument);
arg_ids.push(arg_id);
}
}
}
for keyword in &node.keywords {
let keyword_id = self.eval_expr(&keyword.value);
self.graph.add_edge(keyword_id, call_id, EdgeKind::Argument);
let callee_key = match node.func.as_ref() {
Expr::Name(name) => Some(name.id.to_string()),
Expr::Attribute(attr) => Some(attr.attr.to_string()),
_ => None,
};
if let Some(key) = callee_key {
if let Some(params) = self.function_params.get(&key).cloned() {
match &keyword.arg {
Some(arg_name) => {
if let Some(¶m_id) = params.by_name.get(arg_name.as_str()) {
self.flow(keyword_id, param_id);
} else if let Some(kwarg_id) = params.kwarg {
self.flow(keyword_id, kwarg_id);
}
}
None => {
if let Some(kwarg_id) = params.kwarg {
self.flow(keyword_id, kwarg_id);
}
for ¶m_id in params.by_name.values() {
self.flow(keyword_id, param_id);
}
let mut current = keyword_id;
loop {
if let Some(entries) = self.dict_literals.get(¤t).cloned() {
for (key, value_id) in entries {
if let Some(¶m_id) = params.by_name.get(&key) {
self.flow(value_id, param_id);
}
}
break;
}
if let Some(&next_id) = self.var_sources.get(¤t) {
current = next_id;
} else {
break;
}
}
}
}
}
}
}
let callee_key = match node.func.as_ref() {
Expr::Name(name) => Some(name.id.to_string()),
Expr::Attribute(attr) => Some(attr.attr.to_string()),
_ => None,
};
if let Some(key) = &callee_key {
if let Some(params) = self.function_params.get(key.as_str()).cloned() {
let positional_skip = if receiver_obj.is_some() { 1 } else { 0 };
let mut arg_iter = arg_ids.into_iter();
for param_id in params.positional.iter().skip(positional_skip).copied() {
if let Some(arg_id) = arg_iter.next() {
self.flow(arg_id, param_id);
}
}
for arg_id in arg_iter {
if let Some(vararg_id) = params.vararg {
self.flow(arg_id, vararg_id);
}
}
}
}
if let Some(key) = &callee_key {
if let Some(fn_id) = self.resolve(key) {
self.graph.add_edge(fn_id, call_id, EdgeKind::Return);
}
}
let is_format_call = match node.func.as_ref() {
Expr::Attribute(attr) => {
attr.attr.as_str() == "format" || attr.attr.as_str() == "format_map"
}
_ => false,
};
if is_format_call {
for arg in &node.args {
if is_locals_or_globals_call(arg) {
for var_id in self.all_scope_vars() {
self.graph.add_edge(var_id, call_id, EdgeKind::Argument);
}
} else if is_os_environ(arg) {
let env_id = self.eval_expr(arg);
if let Some(entries) = self.dict_literals.get(&env_id).cloned() {
for (_, value_id) in entries {
self.graph.add_edge(value_id, call_id, EdgeKind::Argument);
}
}
}
}
for keyword in &node.keywords {
if keyword.arg.is_none() {
if is_locals_or_globals_call(&keyword.value) {
for var_id in self.all_scope_vars() {
self.graph.add_edge(var_id, call_id, EdgeKind::Argument);
}
} else if is_os_environ(&keyword.value) {
let env_id = self.eval_expr(&keyword.value);
if let Some(entries) = self.dict_literals.get(&env_id).cloned() {
for (_, value_id) in entries {
self.graph.add_edge(value_id, call_id, EdgeKind::Argument);
}
}
}
}
}
}
call_id
}
fn eval_comprehension(
&mut self,
name: &str,
elt: &Expr,
generators: &[ast::Comprehension],
) -> NodeId {
let id = self.graph.add_node(NodeKind::Call, name.to_string(), None);
self.comprehension_depth += 1;
self.scopes.push(Scope::new());
for generator in generators {
let iter_id = self.eval_expr(&generator.iter);
self.bind_target(&generator.target, iter_id);
for if_expr in &generator.ifs {
self.eval_expr(if_expr);
}
}
let elt_id = self.eval_expr(elt);
self.scopes.pop();
self.comprehension_depth -= 1;
self.flow(elt_id, id);
id
}
fn aggregate(&mut self, name: &str, exprs: &[Expr]) -> NodeId {
let id = self.graph.add_node(NodeKind::Literal, name.to_string(), None);
for expr in exprs {
let expr_id = self.eval_expr(expr);
self.flow(expr_id, id);
}
id
}
fn combine<const N: usize>(&mut self, name: &str, exprs: [&Expr; N]) -> NodeId {
let id = self.graph.add_node(NodeKind::Call, name.to_string(), None);
for expr in exprs {
let expr_id = self.eval_expr(expr);
self.flow(expr_id, id);
}
id
}
fn combine_optional<const N: usize>(&mut self, name: &str, exprs: [&Option<Box<Expr>>; N]) -> NodeId {
let id = self.graph.add_node(NodeKind::Call, name.to_string(), None);
for expr in exprs.into_iter().flatten() {
let expr_id = self.eval_expr(expr);
self.flow(expr_id, id);
}
id
}
fn constant_node(&mut self, constant: &Constant) -> NodeId {
match constant {
Constant::Str(value) => self.graph.add_node(NodeKind::Literal, value.clone(), None),
_ => self.literal("<const>"),
}
}
}