use std::{collections::HashMap, path::Path};
use sqry_core::graph::unified::StagingGraph;
use sqry_core::graph::unified::build::GraphBuildHelper;
use sqry_core::graph::unified::edge::FfiConvention;
use sqry_core::graph::unified::node::NodeId as UnifiedNodeId;
use sqry_core::graph::{GraphBuilder, GraphBuilderError, GraphResult, Language, Span};
use tree_sitter::{Node, Tree};
use super::local_scopes;
/// Scope nesting limit used by `PythonGraphBuilder::default()`.
///
/// The value is handed to the context-collection pass, which stops descending
/// once more than this many class/function scopes are open.
const DEFAULT_SCOPE_DEPTH: usize = 4;
/// Standard-library module names that `is_native_extension_import` treats as
/// native extensions. Only the first dotted segment of an import is compared
/// against this list.
const STD_C_MODULES: &[&str] = &[
"_ctypes",
"_socket",
"_ssl",
"_hashlib",
"_json",
"_pickle",
"_struct",
"_sqlite3",
"_decimal",
"_lzma",
"_bz2",
"_zlib",
"_elementtree",
"_csv",
"_datetime",
"_heapq",
"_bisect",
"_random",
"_collections",
"_functools",
"_itertools",
"_operator",
"_io",
"_thread",
"_multiprocessing",
"_posixsubprocess",
"_asyncio",
"array",
"math",
"cmath",
];
/// Third-party package names that `is_native_extension_import` treats as
/// shipping native code. Matched against the first dotted segment of an
/// import, exactly like `STD_C_MODULES`.
const THIRD_PARTY_C_PACKAGES: &[&str] = &[
"numpy",
"pandas",
"scipy",
"sklearn",
"cv2",
"PIL",
"torch",
"tensorflow",
"lxml",
"psycopg2",
"MySQLdb",
"sqlite3",
"cryptography",
"bcrypt",
"regex",
"ujson",
"orjson",
"msgpack",
"greenlet",
"gevent",
"uvloop",
];
/// Graph builder for Python source files.
///
/// The only configuration is the scope nesting limit forwarded to the
/// callable-context pass when a graph is built.
#[derive(Debug, Clone, Copy)]
pub struct PythonGraphBuilder {
    /// Maximum number of open class/function scopes the context pass will
    /// descend through.
    max_scope_depth: usize,
}

impl PythonGraphBuilder {
    /// Creates a builder with an explicit scope nesting limit.
    #[must_use]
    pub fn new(max_scope_depth: usize) -> Self {
        Self { max_scope_depth }
    }
}

impl Default for PythonGraphBuilder {
    /// Builds with [`DEFAULT_SCOPE_DEPTH`] as the scope nesting limit.
    fn default() -> Self {
        Self::new(DEFAULT_SCOPE_DEPTH)
    }
}
impl GraphBuilder for PythonGraphBuilder {
    /// Populates `staging` with the nodes and edges found in one parsed
    /// Python file.
    ///
    /// The work happens in two passes: first the callable contexts are
    /// collected into an `ASTGraph`, then the tree is walked again to emit
    /// graph nodes/edges under a recursion guard.
    ///
    /// # Errors
    ///
    /// Returns a `GraphBuilderError::ParseError` (with a default span) when
    /// context collection fails, when the recursion limits cannot be loaded
    /// or are invalid, or when the recursion guard cannot be created. Errors
    /// from `local_scopes::build` and from the tree walk are propagated.
    fn build_graph(
        &self,
        tree: &Tree,
        content: &[u8],
        file: &Path,
        staging: &mut StagingGraph,
    ) -> GraphResult<()> {
        // All setup failures are reported the same way: a span-less parse error.
        let parse_error = |reason: String| GraphBuilderError::ParseError {
            span: Span::default(),
            reason,
        };
        let root = tree.root_node();

        let mut helper = GraphBuildHelper::new(staging, file, Language::Python);
        let ast_graph =
            ASTGraph::from_tree(tree, content, self.max_scope_depth).map_err(parse_error)?;
        let has_all = has_all_assignment(root, content);
        let mut scope_tree = local_scopes::build(root, content)?;

        let recursion_limits = sqry_core::config::RecursionLimits::load_or_default()
            .map_err(|e| parse_error(format!("Failed to load recursion limits: {e}")))?;
        let file_ops_depth = recursion_limits
            .effective_file_ops_depth()
            .map_err(|e| parse_error(format!("Invalid file_ops_depth configuration: {e}")))?;
        let mut guard = sqry_core::query::security::RecursionGuard::new(file_ops_depth)
            .map_err(|e| parse_error(format!("Failed to create recursion guard: {e}")))?;

        walk_tree_for_graph(
            root,
            content,
            &ast_graph,
            &mut helper,
            has_all,
            &mut guard,
            &mut scope_tree,
        )
    }

    /// This builder handles Python sources.
    fn language(&self) -> Language {
        Language::Python
    }
}
fn has_all_assignment(node: Node, content: &[u8]) -> bool {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if child.kind() == "expression_statement" {
let assignment = child
.children(&mut child.walk())
.find(|c| c.kind() == "assignment" || c.kind() == "augmented_assignment");
if let Some(assignment) = assignment
&& let Some(left) = assignment.child_by_field_name("left")
&& let Ok(left_text) = left.utf8_text(content)
&& left_text.trim() == "__all__"
{
return true;
}
}
}
false
}
/// Recursively walks the syntax tree rooted at `node` and emits graph nodes
/// and edges through `helper`.
///
/// Handled node kinds: class definitions, expression statements (`__all__`
/// and annotated assignments), function definitions, calls, import
/// statements and identifiers. Every node's children are visited afterwards,
/// whatever its kind.
///
/// `has_all` tells the walker that the module defines `__all__`; in that case
/// public module-level classes and functions are NOT exported implicitly.
///
/// # Errors
///
/// Returns a `ParseError` when `guard` refuses to enter another level, and
/// propagates errors from FFI call handling and from child recursion.
#[allow(clippy::too_many_lines)]
fn walk_tree_for_graph(
node: Node,
content: &[u8],
ast_graph: &ASTGraph,
helper: &mut GraphBuildHelper,
has_all: bool,
guard: &mut sqry_core::query::security::RecursionGuard,
scope_tree: &mut local_scopes::PythonScopeTree,
) -> GraphResult<()> {
// Register this level with the guard; the matching `exit` is at the bottom.
guard.enter().map_err(|e| GraphBuilderError::ParseError {
span: Span::default(),
reason: format!("Recursion limit exceeded: {e}"),
})?;
match node.kind() {
"class_definition" => {
if let Some(name_node) = node.child_by_field_name("name")
&& let Ok(class_name) = name_node.utf8_text(content)
{
let span = span_from_node(node);
// The class node is registered under its simple (unqualified) name.
let qualified_name = class_name.to_string();
let class_id = helper.add_class(&qualified_name, Some(span));
process_class_inheritance(node, content, class_id, helper);
// Implicit export: only without `__all__`, at module level, for public names.
if !has_all && is_module_level(node) && is_public_name(class_name) {
export_from_file_module(helper, class_id);
}
}
}
"expression_statement" => {
process_all_assignment(node, content, helper);
process_annotated_assignment(node, content, ast_graph, helper);
}
"function_definition" => {
// Functions without a recorded callable context are skipped here.
if let Some(call_context) = ast_graph.get_callable_context(node.id()) {
let span = span_from_node(node);
let func_name = node
.child_by_field_name("name")
.and_then(|n| n.utf8_text(content).ok())
.unwrap_or("");
let visibility = extract_visibility_from_name(func_name);
let is_property = has_property_decorator(node, content);
let return_type = extract_return_type_annotation(node, content);
// Node kind: property (decorated method), method, or plain function.
// The `*_with_signature` variants are used when a return annotation exists.
let function_id = if is_property && call_context.is_method {
helper.add_node_with_visibility(
&call_context.qualified_name,
Some(span),
sqry_core::graph::unified::node::NodeKind::Property,
Some(visibility),
)
} else if call_context.is_method {
if return_type.is_some() {
helper.add_method_with_signature(
&call_context.qualified_name,
Some(span),
call_context.is_async,
false, Some(visibility),
return_type.as_deref(),
)
} else {
helper.add_method_with_visibility(
&call_context.qualified_name,
Some(span),
call_context.is_async,
false,
Some(visibility),
)
}
} else {
if return_type.is_some() {
helper.add_function_with_signature(
&call_context.qualified_name,
Some(span),
call_context.is_async,
false, Some(visibility),
return_type.as_deref(),
)
} else {
helper.add_function_with_visibility(
&call_context.qualified_name,
Some(span),
call_context.is_async,
false,
Some(visibility),
)
}
};
// A route decorator yields an endpoint node named
// `route::<METHOD>::<path>` that contains the handler.
if let Some((http_method, route_path)) = extract_route_decorator_info(node, content)
{
let endpoint_name = format!("route::{http_method}::{route_path}");
let endpoint_id = helper.add_endpoint(&endpoint_name, Some(span));
helper.add_contains_edge(endpoint_id, function_id);
}
process_function_parameters(node, content, ast_graph, helper);
// Implicit export mirrors the class rule, and never applies to methods.
if !has_all
&& !call_context.is_method
&& is_module_level(node)
&& let Some(name_node) = node.child_by_field_name("name")
&& let Ok(func_name) = name_node.utf8_text(content)
&& is_public_name(func_name)
{
export_from_file_module(helper, function_id);
}
}
}
"call" => {
// FFI loader calls get an FFI edge and no ordinary call edge.
let is_ffi = build_ffi_call_edge(ast_graph, node, content, helper)?;
if !is_ffi {
// Errors from `build_call_for_staging` are dropped: no edge is emitted.
if let Ok(Some((caller_qname, callee_qname, argument_count, is_awaited))) =
build_call_for_staging(ast_graph, node, content)
{
let call_context = ast_graph.get_callable_context(node.id());
let is_async = call_context.is_some_and(|c| c.is_async);
let source_id = helper.ensure_function(&caller_qname, None, is_async, false);
let target_id = helper.ensure_function(&callee_qname, None, false, false);
let call_span = span_from_node(node);
// Counts that do not fit in a u8 saturate at u8::MAX.
let argument_count = u8::try_from(argument_count).unwrap_or(u8::MAX);
helper.add_call_edge_full_with_span(
source_id,
target_id,
argument_count,
is_awaited,
vec![call_span],
);
}
}
}
"import_statement" | "import_from_statement" => {
// Resolution errors are dropped: the import is skipped.
if let Ok(Some((from_qname, to_qname))) =
build_import_for_staging(node, content, helper)
{
let from_id = helper.add_import(&from_qname, None);
let to_id = helper.add_import(&to_qname, Some(span_from_node(node)));
helper.add_import_edge(from_id, to_id);
// Imports classified as native extensions also get an FFI edge.
if is_native_extension_import(&to_qname) {
build_native_import_ffi_edge(&to_qname, node, helper);
}
}
}
"identifier" => {
local_scopes::handle_identifier_for_reference(node, content, scope_tree, helper);
}
_ => {}
}
// Descend into every child, including children of the kinds handled above.
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
walk_tree_for_graph(
child, content, ast_graph, helper, has_all, guard, scope_tree,
)?;
}
// Reached only on success: an error above returns before the guard is exited.
guard.exit();
Ok(())
}
/// Derives the data needed for a call edge from a `call` node.
///
/// Returns `(caller_qname, callee_qname, argument_count, is_awaited)`, or
/// `Ok(None)` when the call has no `function` child or its text yields no
/// usable name. Calls outside any recorded callable are attributed to a
/// synthetic `<module>` caller.
///
/// A callee written as `self.<name>` inside a method is qualified with the
/// enclosing class name; every other callee is reduced to its last dotted
/// segment.
///
/// # Errors
///
/// Returns a `ParseError` when the callee text is not valid UTF-8.
fn build_call_for_staging(
    ast_graph: &ASTGraph,
    call_node: Node<'_>,
    content: &[u8],
) -> GraphResult<Option<(String, String, usize, bool)>> {
    // Declared up front so the fallback can be borrowed alongside a real context.
    let module_fallback;
    let context = match ast_graph.get_callable_context(call_node.id()) {
        Some(found) => found,
        None => {
            module_fallback = CallContext {
                qualified_name: "<module>".to_string(),
                span: (0, content.len()),
                is_async: false,
                is_method: false,
                class_name: None,
            };
            &module_fallback
        }
    };

    let Some(function_expr) = call_node.child_by_field_name("function") else {
        return Ok(None);
    };
    let raw_callee = function_expr
        .utf8_text(content)
        .map_err(|_| GraphBuilderError::ParseError {
            span: span_from_node(call_node),
            reason: "failed to read call expression".to_string(),
        })?;
    let callee_text = raw_callee.trim().to_string();
    if callee_text.is_empty() {
        return Ok(None);
    }

    let short_name = simple_name(&callee_text);
    if short_name.is_empty() {
        return Ok(None);
    }

    let caller_qname = context.qualified_name();
    let target_qname = match (callee_text.strip_prefix("self."), &context.class_name) {
        (Some(method_path), Some(class_name)) => {
            format!("{}.{}", class_name, simple_name(method_path))
        }
        _ => short_name.to_string(),
    };

    Ok(Some((
        caller_qname,
        target_qname,
        count_arguments(call_node),
        is_awaited_call(call_node),
    )))
}
/// Resolves an import statement to a `(importing_file, resolved_target)` pair.
///
/// The module name comes from the `name` field (`import x`) or the
/// `module_name` field (`from x import y`). For a `from` import without a
/// readable module name, the text between `from` and `import` is used when it
/// starts with a dot (a relative import).
///
/// Returns `Ok(None)` when no non-empty module name can be determined.
///
/// # Errors
///
/// Propagates failures from `sqry_core::graph::resolve_python_import`.
fn build_import_for_staging(
    import_node: Node<'_>,
    content: &[u8],
    helper: &GraphBuildHelper,
) -> GraphResult<Option<(String, String)>> {
    let is_from_import = import_node.kind() == "import_from_statement";
    let name_field = match import_node.kind() {
        "import_statement" => Some("name"),
        "import_from_statement" => Some("module_name"),
        _ => None,
    };
    let declared_name = name_field
        .and_then(|field| import_node.child_by_field_name(field))
        .and_then(|name_node| extract_module_name(name_node, content));

    let module_name = match declared_name {
        // Fallback: recover a relative module path from the raw statement text.
        None if is_from_import => import_node.utf8_text(content).ok().and_then(|text| {
            let from_idx = text.find("from")?;
            let import_idx = text.find("import")?;
            let between = text[from_idx + 4..import_idx].trim();
            if between.starts_with('.') {
                Some(between.to_string())
            } else {
                None
            }
        }),
        other => other,
    };

    let module_name = match module_name {
        Some(name) if !name.is_empty() => name,
        _ => return Ok(None),
    };

    let resolved_path = sqry_core::graph::resolve_python_import(
        std::path::Path::new(helper.file_path()),
        &module_name,
        is_from_import,
    )?;
    Ok(Some((helper.file_path().to_string(), resolved_path)))
}
/// Converts a tree-sitter node's start/end row-column positions into a `Span`.
fn span_from_node(node: Node<'_>) -> Span {
    let (begin, finish) = (node.start_position(), node.end_position());
    let span_start = sqry_core::graph::node::Position::new(begin.row, begin.column);
    let span_end = sqry_core::graph::node::Position::new(finish.row, finish.column);
    Span::new(span_start, span_end)
}
/// Counts the arguments passed in a `call` node.
///
/// Returns 0 when the call has no `arguments` field. A bare generator
/// expression (`f(x for x in xs)`) is a single argument. In a regular
/// argument list every named child counts except comments, which tree-sitter
/// attaches as named "extra" nodes and which would otherwise inflate the count.
fn count_arguments(call_node: Node<'_>) -> usize {
    let Some(args) = call_node.child_by_field_name("arguments") else {
        return 0;
    };
    // The `arguments` field is either an `argument_list` or, for an
    // unparenthesised generator, the `generator_expression` itself.
    if args.kind() == "generator_expression" {
        return 1;
    }
    let mut cursor = args.walk();
    // Punctuation tokens are anonymous and never appear among named children,
    // so only comments need to be filtered out.
    let count = args
        .named_children(&mut cursor)
        .filter(|child| child.kind() != "comment")
        .count();
    count
}
fn is_awaited_call(call_node: Node<'_>) -> bool {
let mut current = call_node.parent();
while let Some(node) = current {
let kind = node.kind();
if kind == "await" || kind == "await_expression" {
return true;
}
current = node.parent();
}
false
}
/// Returns the last dot-separated segment of `qualified`
/// (the whole string when it contains no dot).
fn simple_name(qualified: &str) -> &str {
    match qualified.rsplit('.').next() {
        Some(last_segment) => last_segment,
        None => qualified,
    }
}
/// Reduces a shared-library path to a bare library name.
///
/// The directory part is dropped first. Then:
/// * a versioned ELF name is cut at the first `.so.` (`libc.so.6` -> `libc`);
/// * otherwise a trailing `.so`, `.dll` or `.dylib` is removed, even when the
///   stem itself contains dots (`libpython3.11.so` -> `libpython3.11`);
/// * anything else is returned unchanged.
fn ffi_library_simple_name(library_path: &str) -> String {
    use std::path::Path;

    /// Recognised shared-library suffixes.
    const LIBRARY_SUFFIXES: [&str; 3] = [".so", ".dll", ".dylib"];

    let filename = Path::new(library_path)
        .file_name()
        .and_then(|f| f.to_str())
        .unwrap_or(library_path);

    // Versioned shared objects: everything from ".so." onwards is version noise.
    if let Some(so_pos) = filename.find(".so.") {
        return filename[..so_pos].to_string();
    }

    // Match on the END of the name so dots inside the stem do not hide the
    // extension.
    for suffix in LIBRARY_SUFFIXES.iter() {
        if let Some(stem) = filename.strip_suffix(*suffix) {
            return stem.to_string();
        }
    }

    filename.to_string()
}
/// A name is public unless its first character is an underscore.
fn is_public_name(name: &str) -> bool {
    name.chars().next() != Some('_')
}
fn is_module_level(node: Node<'_>) -> bool {
let mut current = node.parent();
while let Some(parent) = current {
match parent.kind() {
"module" => return true,
"function_definition" | "class_definition" => return false,
_ => current = parent.parent(),
}
}
false
}
/// Name of the synthetic module node that owns a file's exports.
const FILE_MODULE_NAME: &str = "<file_module>";

/// Adds an export edge from the file's synthetic module node to `exported`.
///
/// The module node is requested through `add_module` on every call.
fn export_from_file_module(
    helper: &mut GraphBuildHelper,
    exported: sqry_core::graph::unified::node::NodeId,
) {
    let file_module = helper.add_module(FILE_MODULE_NAME, None);
    helper.add_export_edge(file_module, exported);
}
/// Returns the module name text for an import target node.
///
/// For an `aliased_import` (`import x as y`) the text of its `name` field is
/// used; for any other node the node's own text is used. Returns `None` when
/// the relevant text is missing or not valid UTF-8.
fn extract_module_name(node: Node<'_>, content: &[u8]) -> Option<String> {
    let name_source = if node.kind() == "aliased_import" {
        node.child_by_field_name("name")?
    } else {
        node
    };
    name_source.utf8_text(content).ok().map(str::to_owned)
}
/// Handles `__all__ = [...]` / `__all__ = (...)` inside an expression
/// statement by exporting each listed name.
///
/// Nothing happens unless the statement contains an assignment (plain or
/// augmented) whose left side is exactly `__all__` and whose right side is a
/// list or tuple.
fn process_all_assignment(node: Node<'_>, content: &[u8], helper: &mut GraphBuildHelper) {
    let mut cursor = node.walk();
    let Some(assignment) = node
        .children(&mut cursor)
        .find(|c| matches!(c.kind(), "assignment" | "augmented_assignment"))
    else {
        return;
    };

    let assigns_all = assignment
        .child_by_field_name("left")
        .and_then(|target| target.utf8_text(content).ok())
        .is_some_and(|target_text| target_text.trim() == "__all__");
    if !assigns_all {
        return;
    }

    if let Some(value) = assignment.child_by_field_name("right") {
        if matches!(value.kind(), "list" | "tuple") {
            process_all_list(value, content, helper);
        }
    }
}
/// Exports every non-empty string literal found directly inside an
/// `__all__` list/tuple node.
///
/// Each name is registered through `add_function` (with the string literal's
/// span) and then exported from the file module.
fn process_all_list(list_node: Node<'_>, content: &[u8], helper: &mut GraphBuildHelper) {
    let mut cursor = list_node.walk();
    for element in list_node.children(&mut cursor) {
        if element.kind() != "string" {
            continue;
        }
        let Some(export_name) = extract_string_content(element, content) else {
            continue;
        };
        if export_name.is_empty() {
            continue;
        }
        let literal_span = span_from_node(element);
        let export_id = helper.add_function(&export_name, Some(literal_span), false, false);
        export_from_file_module(helper, export_id);
    }
}
/// Returns the text between the quotes of a Python string literal node.
///
/// Leading prefix letters (`r`, `b`, `f`, `u`, any case) are dropped, then
/// exactly one pair of delimiters is removed. Triple quotes are tried before
/// single quotes. Quote characters that are part of the content, as in
/// `'say "hi"'`, are preserved.
///
/// Returns `None` only when the node text is not valid UTF-8. Text without a
/// recognised opening quote is returned as-is (minus prefix letters), and a
/// missing closing delimiter is tolerated.
fn extract_string_content(string_node: Node<'_>, content: &[u8]) -> Option<String> {
    // Longest delimiters first so `'''` is not mistaken for `'`.
    const DELIMITERS: [&str; 4] = ["'''", "\"\"\"", "'", "\""];

    let Ok(text) = string_node.utf8_text(content) else {
        return None;
    };
    let unprefixed = text
        .trim()
        .trim_start_matches(|c: char| matches!(c, 'r' | 'b' | 'f' | 'u' | 'R' | 'B' | 'F' | 'U'));

    for delimiter in DELIMITERS.iter() {
        if let Some(after_open) = unprefixed.strip_prefix(*delimiter) {
            // Strip one matching closer only; inner quotes belong to the content.
            let inner = after_open.strip_suffix(*delimiter).unwrap_or(after_open);
            return Some(inner.to_string());
        }
    }
    Some(unprefixed.to_string())
}
/// Adds an inherits edge from `class_id` to each base class listed in the
/// class's `superclasses` field.
///
/// The base name is taken from:
/// * the node text for `identifier` and `attribute` bases;
/// * the `function` field for `call` bases (e.g. `Base(arg)`);
/// * the `value` field for `subscript` bases (e.g. `Generic[T]`).
///
/// Keyword arguments (such as `metaclass=...`) and every other node kind are
/// ignored. The span recorded for a base is always that of the whole base
/// expression.
fn process_class_inheritance(
    class_node: Node<'_>,
    content: &[u8],
    class_id: UnifiedNodeId,
    helper: &mut GraphBuildHelper,
) {
    let Some(superclasses) = class_node.child_by_field_name("superclasses") else {
        return;
    };
    let mut cursor = superclasses.walk();
    for base in superclasses.children(&mut cursor) {
        // Pick the node whose text names the base class.
        let name_node = match base.kind() {
            "identifier" | "attribute" => Some(base),
            "call" => base.child_by_field_name("function"),
            "subscript" => base.child_by_field_name("value"),
            _ => None,
        };
        let Some(name_node) = name_node else {
            continue;
        };
        let Ok(raw_name) = name_node.utf8_text(content) else {
            continue;
        };
        let base_name = raw_name.trim();
        if base_name.is_empty() {
            continue;
        }
        let base_span = span_from_node(base);
        let base_id = helper.add_class(base_name, Some(base_span));
        helper.add_inherits_edge(class_id, base_id);
    }
}
/// Description of one callable (function or method) found in the source.
#[derive(Debug, Clone)]
struct CallContext {
    /// Dotted name built from the enclosing scopes plus the callable's name.
    qualified_name: String,
    /// Byte range `(start, end)` of the definition.
    #[allow(dead_code)]
    span: (usize, usize),
    /// Whether the definition carries an `async` keyword.
    is_async: bool,
    /// Whether the definition was found while inside a class.
    is_method: bool,
    /// Qualified name of the innermost enclosing class, if any.
    class_name: Option<String>,
}

impl CallContext {
    /// Returns an owned copy of the qualified name.
    fn qualified_name(&self) -> String {
        self.qualified_name.to_owned()
    }
}
/// Index of the callables in a file plus a mapping from tree-sitter node ids
/// to the callable that contains them.
struct ASTGraph {
    /// All recorded callables, in discovery order.
    contexts: Vec<CallContext>,
    /// Node id -> index into `contexts`.
    node_to_context: HashMap<usize, usize>,
}

impl ASTGraph {
    /// Builds the index by walking `tree` from its root.
    ///
    /// # Errors
    ///
    /// Returns the error string produced by the walk (for example when a
    /// definition has no readable name).
    fn from_tree(tree: &Tree, content: &[u8], max_depth: usize) -> Result<Self, String> {
        let mut graph = Self {
            contexts: Vec::new(),
            node_to_context: HashMap::new(),
        };
        let mut scope_stack = Vec::new();
        let mut class_stack = Vec::new();
        walk_ast(
            tree.root_node(),
            content,
            &mut graph.contexts,
            &mut graph.node_to_context,
            &mut scope_stack,
            &mut class_stack,
            max_depth,
        )?;
        Ok(graph)
    }

    /// All recorded callable contexts.
    #[allow(dead_code)]
    fn contexts(&self) -> &[CallContext] {
        &self.contexts
    }

    /// Looks up the callable context associated with a node id, if any.
    fn get_callable_context(&self, node_id: usize) -> Option<&CallContext> {
        let context_idx = *self.node_to_context.get(&node_id)?;
        self.contexts.get(context_idx)
    }
}
/// Recursively collects callable contexts from the syntax tree.
///
/// * `contexts` receives one `CallContext` per function definition visited.
/// * `node_to_context` maps a function definition node, and every node inside
///   its body, to the index of its context in `contexts`.
/// * `scope_stack` holds the simple names of the enclosing classes/functions
///   and is used to build dotted qualified names.
/// * `class_stack` holds the qualified names of the enclosing classes.
/// * `max_depth` bounds nesting: a node is skipped (with `Ok`) when more than
///   `max_depth` scopes are already open.
///
/// # Errors
///
/// Returns an error string when a class or function definition has no `name`
/// field or the name is not valid UTF-8.
fn walk_ast(
node: Node,
content: &[u8],
contexts: &mut Vec<CallContext>,
node_to_context: &mut HashMap<usize, usize>,
scope_stack: &mut Vec<String>,
class_stack: &mut Vec<String>,
max_depth: usize,
) -> Result<(), String> {
// Depth cutoff: silently stop descending rather than failing.
if scope_stack.len() > max_depth {
return Ok(());
}
match node.kind() {
"class_definition" => {
let name_node = node
.child_by_field_name("name")
.ok_or_else(|| "class_definition missing name".to_string())?;
let class_name = name_node
.utf8_text(content)
.map_err(|_| "failed to read class name".to_string())?;
let qualified_class = if scope_stack.is_empty() {
class_name.to_string()
} else {
format!("{}.{}", scope_stack.join("."), class_name)
};
// class_stack stores the qualified name, scope_stack only the simple name.
class_stack.push(qualified_class.clone());
scope_stack.push(class_name.to_string());
// Only the class body is walked; other children are not visited.
if let Some(body) = node.child_by_field_name("body") {
let mut cursor = body.walk();
for child in body.children(&mut cursor) {
walk_ast(
child,
content,
contexts,
node_to_context,
scope_stack,
class_stack,
max_depth,
)?;
}
}
class_stack.pop();
scope_stack.pop();
}
"function_definition" => {
let name_node = node
.child_by_field_name("name")
.ok_or_else(|| "function_definition missing name".to_string())?;
let func_name = name_node
.utf8_text(content)
.map_err(|_| "failed to read function name".to_string())?;
// Async is detected by an `async` token among the direct children.
let is_async = node
.children(&mut node.walk())
.any(|child| child.kind() == "async");
let qualified_func = if scope_stack.is_empty() {
func_name.to_string()
} else {
format!("{}.{}", scope_stack.join("."), func_name)
};
// NOTE: class_stack stays non-empty for the whole class body, so a function
// nested inside a method is also flagged as a method.
let is_method = !class_stack.is_empty();
let class_name = class_stack.last().cloned();
let context_idx = contexts.len();
contexts.push(CallContext {
qualified_name: qualified_func.clone(),
span: (node.start_byte(), node.end_byte()),
is_async,
is_method,
class_name,
});
node_to_context.insert(node.id(), context_idx);
// Map every node of the body to this context first...
if let Some(body) = node.child_by_field_name("body") {
associate_descendants(body, context_idx, node_to_context);
}
scope_stack.push(func_name.to_string());
// ...then walk the body; nested definitions overwrite the mapping for
// their own nodes with their own context index.
if let Some(body) = node.child_by_field_name("body") {
let mut cursor = body.walk();
for child in body.children(&mut cursor) {
walk_ast(
child,
content,
contexts,
node_to_context,
scope_stack,
class_stack,
max_depth,
)?;
}
}
scope_stack.pop();
}
_ => {
// Any other node: just recurse into all children.
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
walk_ast(
child,
content,
contexts,
node_to_context,
scope_stack,
class_stack,
max_depth,
)?;
}
}
}
Ok(())
}
/// Maps `node` and every node beneath it to `context_idx`.
///
/// Uses an explicit work list instead of recursion; existing entries for the
/// same node ids are overwritten.
fn associate_descendants(
    node: Node,
    context_idx: usize,
    node_to_context: &mut HashMap<usize, usize>,
) {
    let mut pending = vec![node];
    while let Some(current) = pending.pop() {
        node_to_context.insert(current.id(), context_idx);
        let mut cursor = current.walk();
        pending.extend(current.children(&mut cursor));
    }
}
/// Emits an FFI edge when `call_node` is a recognised ctypes or cffi
/// library-loading call.
///
/// Returns `Ok(true)` when an FFI edge was emitted and `Ok(false)` when the
/// call is not an FFI loader (or has no `function` child).
///
/// # Errors
///
/// Returns a `ParseError` when the callee text is not valid UTF-8.
fn build_ffi_call_edge(
    ast_graph: &ASTGraph,
    call_node: Node<'_>,
    content: &[u8],
    helper: &mut GraphBuildHelper,
) -> GraphResult<bool> {
    let Some(function_expr) = call_node.child_by_field_name("function") else {
        return Ok(false);
    };
    let callee_text = function_expr
        .utf8_text(content)
        .map_err(|_| GraphBuilderError::ParseError {
            span: span_from_node(call_node),
            reason: "failed to read call expression".to_string(),
        })?
        .trim();

    // ctypes is checked before cffi.
    let emitted = if is_ctypes_load_call(callee_text) {
        build_ctypes_ffi_edge(ast_graph, call_node, content, callee_text, helper)
    } else if is_cffi_dlopen_call(callee_text) {
        build_cffi_ffi_edge(ast_graph, call_node, content, helper)
    } else {
        false
    };
    Ok(emitted)
}
/// Reports whether `callee_text` is one of the recognised ctypes
/// library-loading spellings (exact match, with or without the `ctypes.`
/// module prefix).
fn is_ctypes_load_call(callee_text: &str) -> bool {
    matches!(
        callee_text,
        "ctypes.CDLL"
            | "ctypes.WinDLL"
            | "ctypes.OleDLL"
            | "ctypes.PyDLL"
            | "ctypes.cdll.LoadLibrary"
            | "ctypes.windll.LoadLibrary"
            | "ctypes.oledll.LoadLibrary"
            | "CDLL"
            | "WinDLL"
            | "OleDLL"
            | "PyDLL"
            | "cdll.LoadLibrary"
            | "windll.LoadLibrary"
            | "oledll.LoadLibrary"
    )
}
/// Reports whether `callee_text` is a `dlopen` call on one of the recognised
/// cffi receivers: `ffi`, `cffi`, `_ffi` or `FFI()`.
fn is_cffi_dlopen_call(callee_text: &str) -> bool {
    match callee_text.strip_suffix(".dlopen") {
        Some(receiver) => matches!(receiver, "ffi" | "cffi" | "_ffi" | "FFI()"),
        None => false,
    }
}
/// Emits an FFI edge for a ctypes library-loading call and returns `true`.
///
/// The calling convention is `Stdcall` when the callee text mentions
/// `WinDLL`, `windll` or `OleDLL`, and `C` otherwise. The target is a module
/// node named `native::<library>`; when the library argument cannot be
/// determined the name is derived from `ctypes::unknown`.
fn build_ctypes_ffi_edge(
    ast_graph: &ASTGraph,
    call_node: Node<'_>,
    content: &[u8],
    callee_text: &str,
    helper: &mut GraphBuildHelper,
) -> bool {
    let caller_id = get_ffi_caller_node_id(ast_graph, call_node, content, helper);

    let uses_stdcall = ["WinDLL", "windll", "OleDLL"]
        .iter()
        .any(|marker| callee_text.contains(*marker));
    let convention = if uses_stdcall {
        FfiConvention::Stdcall
    } else {
        FfiConvention::C
    };

    let library_name = match extract_ffi_library_name(call_node, content) {
        Some(name) => name,
        None => "ctypes::unknown".to_string(),
    };
    let ffi_name = format!("native::{}", ffi_library_simple_name(&library_name));
    let ffi_node_id = helper.add_module(&ffi_name, Some(span_from_node(call_node)));
    helper.add_ffi_edge(caller_id, ffi_node_id, convention);
    true
}
/// Emits a C-convention FFI edge for a cffi `dlopen` call and returns `true`.
///
/// The target is a module node named `native::<library>`; when the library
/// argument cannot be determined the name is derived from `cffi::unknown`.
fn build_cffi_ffi_edge(
    ast_graph: &ASTGraph,
    call_node: Node<'_>,
    content: &[u8],
    helper: &mut GraphBuildHelper,
) -> bool {
    let caller_id = get_ffi_caller_node_id(ast_graph, call_node, content, helper);

    let library_name = match extract_ffi_library_name(call_node, content) {
        Some(name) => name,
        None => "cffi::unknown".to_string(),
    };
    let ffi_name = format!("native::{}", ffi_library_simple_name(&library_name));
    let call_span = span_from_node(call_node);
    let ffi_node_id = helper.add_module(&ffi_name, Some(call_span));
    helper.add_ffi_edge(caller_id, ffi_node_id, FfiConvention::C);
    true
}
/// Returns the graph node id of the callable that contains `node`, creating
/// it through `ensure_function` if needed.
///
/// Nodes outside any recorded callable are attributed to a synthetic
/// `<module>` function spanning the whole file content.
fn get_ffi_caller_node_id(
    ast_graph: &ASTGraph,
    node: Node<'_>,
    content: &[u8],
    helper: &mut GraphBuildHelper,
) -> UnifiedNodeId {
    // Declared up front so the fallback can be borrowed alongside a real context.
    let module_fallback;
    let context = match ast_graph.get_callable_context(node.id()) {
        Some(found) => found,
        None => {
            module_fallback = CallContext {
                qualified_name: "<module>".to_string(),
                span: (0, content.len()),
                is_async: false,
                is_method: false,
                class_name: None,
            };
            &module_fallback
        }
    };

    let (start_byte, end_byte) = context.span;
    helper.ensure_function(
        &context.qualified_name(),
        Some(Span::from_bytes(start_byte, end_byte)),
        context.is_async,
        false,
    )
}
/// Extracts the library name from the first argument of an FFI loader call.
///
/// * A string literal yields its content.
/// * An identifier yields `$<identifier>` to mark a variable reference.
/// * Anything else (or a call without arguments) yields `None`.
fn extract_ffi_library_name(call_node: Node<'_>, content: &[u8]) -> Option<String> {
    let args = call_node.child_by_field_name("arguments")?;
    let mut cursor = args.walk();
    // The first child that is not list punctuation is the first argument.
    let first_arg = args
        .children(&mut cursor)
        .find(|child| !matches!(child.kind(), "(" | ")" | ","))?;

    match first_arg.kind() {
        "string" => extract_string_content(first_arg, content),
        "identifier" => first_arg
            .utf8_text(content)
            .ok()
            .map(|variable| format!("${}", variable.trim())),
        _ => None,
    }
}
/// Heuristically decides whether an imported module is a native extension.
///
/// A name starting with exactly one underscore is always treated as native.
/// Otherwise the first dotted segment must appear in `STD_C_MODULES` or
/// `THIRD_PARTY_C_PACKAGES`.
fn is_native_extension_import(module_name: &str) -> bool {
    let single_underscore_prefix =
        module_name.starts_with('_') && !module_name.starts_with("__");
    if single_underscore_prefix {
        return true;
    }
    let top_level = match module_name.split_once('.') {
        Some((head, _)) => head,
        None => module_name,
    };
    [STD_C_MODULES, THIRD_PARTY_C_PACKAGES]
        .iter()
        .any(|known| known.contains(&top_level))
}
/// Emits a C-convention FFI edge from the importing file's module node to a
/// `native::<module>` module node.
///
/// The native node is named after the last dotted segment of `module_name`
/// and carries the span of the import statement.
fn build_native_import_ffi_edge(
    module_name: &str,
    import_node: Node<'_>,
    helper: &mut GraphBuildHelper,
) {
    let ffi_name = format!("native::{}", simple_name(module_name));
    let importing_file = helper.file_path().to_string();

    let importer_id = helper.add_module(&importing_file, None);
    let import_span = span_from_node(import_node);
    let ffi_node_id = helper.add_module(&ffi_name, Some(import_span));
    helper.add_ffi_edge(importer_id, ffi_node_id, FfiConvention::C);
}
/// Decorator method names (lower-case) that directly name an HTTP verb,
/// as in `@app.get(...)`.
const ROUTE_METHOD_NAMES: &[&str] = &["get", "post", "put", "delete", "patch"];
/// Receiver names on which a route decorator is recognised; compared against
/// the last dotted segment of the decorator's receiver expression.
const ROUTE_RECEIVER_NAMES: &[&str] = &["app", "router", "blueprint"];
fn extract_route_decorator_info(func_node: Node<'_>, content: &[u8]) -> Option<(String, String)> {
let parent = func_node.parent()?;
if parent.kind() != "decorated_definition" {
return None;
}
let mut cursor = parent.walk();
for child in parent.children(&mut cursor) {
if child.kind() != "decorator" {
continue;
}
let Ok(decorator_text) = child.utf8_text(content) else {
continue;
};
let decorator_text = decorator_text.trim();
let without_at = decorator_text.strip_prefix('@')?;
if let Some(result) = parse_route_decorator_text(without_at) {
return Some(result);
}
}
None
}
/// Parses decorator text (without the leading `@`) such as
/// `app.get("/users")` or `bp.route("/x", methods=["POST"])`.
///
/// Returns `(HTTP_METHOD, path)` when the last segment of the receiver is in
/// `ROUTE_RECEIVER_NAMES`, a path string is present, and the method is either
/// one of `ROUTE_METHOD_NAMES` or `route`. For `route` the verb comes from the
/// `methods=[...]` argument and defaults to `GET`.
fn parse_route_decorator_text(text: &str) -> Option<(String, String)> {
    let (accessor, args_text) = text.split_once('(')?;
    let (receiver, method_name) = accessor.rsplit_once('.')?;

    // Only the last segment of the receiver matters (`self.app` -> `app`).
    let receiver_base = match receiver.rsplit_once('.') {
        Some((_, last)) => last,
        None => receiver,
    };
    if !ROUTE_RECEIVER_NAMES.contains(&receiver_base) {
        return None;
    }

    let path = extract_path_from_decorator_args(args_text)?;
    let verb = method_name.to_ascii_lowercase();
    match verb.as_str() {
        known if ROUTE_METHOD_NAMES.contains(&known) => Some((known.to_ascii_uppercase(), path)),
        "route" => Some((extract_method_from_route_args(args_text), path)),
        _ => None,
    }
}
/// Returns the content of the first quoted string in a decorator's argument
/// text.
///
/// The string may use single or double quotes; whichever quote appears first
/// opens the literal and the next occurrence of the same quote closes it.
/// Returns `None` when there is no quote, no closing quote, or the content is
/// empty.
fn extract_path_from_decorator_args(args_text: &str) -> Option<String> {
    let trimmed = args_text.trim();
    let open_at = trimmed.find(|c: char| c == '\'' || c == '"')?;
    // Both quote characters are one byte, so the content starts right after.
    let quote = if trimmed[open_at..].starts_with('\'') {
        '\''
    } else {
        '"'
    };
    let remainder = &trimmed[open_at + 1..];
    let close_at = remainder.find(quote)?;
    match &remainder[..close_at] {
        "" => None,
        path => Some(path.to_string()),
    }
}
fn extract_method_from_route_args(args_text: &str) -> String {
let Some(methods_pos) = args_text.find("methods") else {
return "GET".to_string();
};
let after_methods = &args_text[methods_pos..];
let Some(bracket_pos) = after_methods.find('[') else {
return "GET".to_string();
};
let after_bracket = &after_methods[bracket_pos + 1..];
let method_str = extract_first_string_literal(after_bracket);
match method_str {
Some(m) => m.to_ascii_uppercase(),
None => "GET".to_string(),
}
}
/// Returns the content of the first single- or double-quoted literal in
/// `text`.
///
/// The first quote character found opens the literal; the next occurrence of
/// that same character closes it. Returns `None` when no complete, non-empty
/// literal exists.
fn extract_first_string_literal(text: &str) -> Option<String> {
    let trimmed = text.trim();
    let (open_at, quote) = trimmed
        .char_indices()
        .find(|&(_, c)| c == '\'' || c == '"')?;
    // `quote` is ASCII, so the content begins exactly one byte later.
    let after_open = &trimmed[open_at + quote.len_utf8()..];
    let literal = after_open.find(quote).map(|close_at| &after_open[..close_at])?;
    if literal.is_empty() {
        None
    } else {
        Some(literal.to_string())
    }
}
/// Reports whether a function is decorated with `@property`.
///
/// The function must be the child of a `decorated_definition`. A decorator
/// matches when its trimmed text is exactly `@property` or starts with
/// `@property(` / `@property (`.
fn has_property_decorator(func_node: Node<'_>, content: &[u8]) -> bool {
    let Some(wrapper) = func_node
        .parent()
        .filter(|parent| parent.kind() == "decorated_definition")
    else {
        return false;
    };
    let mut cursor = wrapper.walk();
    let found = wrapper
        .children(&mut cursor)
        .filter(|child| child.kind() == "decorator")
        .filter_map(|decorator| decorator.utf8_text(content).ok())
        .map(str::trim)
        .any(|text| {
            text == "@property"
                || text.starts_with("@property(")
                || text.starts_with("@property (")
        });
    found
}
/// Maps a Python identifier to a visibility label based on its underscores.
///
/// * `__name` (double leading underscore, no double trailing) -> `"private"`
/// * any other name starting with `_` (including dunders) -> `"protected"`
/// * everything else -> `"public"`
fn extract_visibility_from_name(name: &str) -> &'static str {
    let name_mangled = name.starts_with("__") && !name.ends_with("__");
    match (name_mangled, name.starts_with('_')) {
        (true, _) => "private",
        (false, true) => "protected",
        (false, false) => "public",
    }
}
fn find_containing_scope(node: Node<'_>, content: &[u8], ast_graph: &ASTGraph) -> String {
let mut current = node;
let mut found_class_name: Option<String> = None;
while let Some(parent) = current.parent() {
match parent.kind() {
"function_definition" => {
if let Some(ctx) = ast_graph.get_callable_context(parent.id()) {
return ctx.qualified_name.clone();
}
}
"class_definition" => {
if found_class_name.is_none() {
if let Some(name_node) = parent.child_by_field_name("name")
&& let Ok(class_name) = name_node.utf8_text(content)
{
found_class_name = Some(class_name.to_string());
}
}
}
_ => {}
}
current = parent;
}
found_class_name.unwrap_or_default()
}
/// Returns the simplified return type of a function definition, or `None`
/// when there is no `return_type` field or no type name can be extracted.
fn extract_return_type_annotation(func_node: Node<'_>, content: &[u8]) -> Option<String> {
    func_node
        .child_by_field_name("return_type")
        .and_then(|annotation| extract_type_from_node(annotation, content))
}
/// Emits variable/type nodes for every type-annotated parameter of a
/// function definition.
///
/// Parameters are qualified with the function's recorded qualified name (or
/// an empty prefix when the function has no callable context). Plain
/// identifiers and untyped defaults are skipped; any other parameter kind is
/// processed only if it exposes a `type` field.
fn process_function_parameters(
    func_node: Node<'_>,
    content: &[u8],
    ast_graph: &ASTGraph,
    helper: &mut GraphBuildHelper,
) {
    let Some(params_node) = func_node.child_by_field_name("parameters") else {
        return;
    };
    let scope_prefix = match ast_graph.get_callable_context(func_node.id()) {
        Some(ctx) => ctx.qualified_name.as_str(),
        None => "",
    };

    let mut cursor = params_node.walk();
    for param in params_node.children(&mut cursor) {
        let is_annotated = match param.kind() {
            "typed_parameter" | "typed_default_parameter" => true,
            "identifier" | "default_parameter" => false,
            _ => param.child_by_field_name("type").is_some(),
        };
        if is_annotated {
            process_typed_parameter(param, content, scope_prefix, helper);
        }
    }
}
/// Emits a variable node for one annotated parameter, plus type-of and
/// reference edges to its type node.
///
/// The parameter name comes from the `name` field, or from the first
/// `identifier` child when that field is absent. `self` and `cls` are
/// ignored. The variable is named `<scope_prefix>:<param>` (the prefix may be
/// empty, leaving a leading colon).
fn process_typed_parameter(
    param: Node<'_>,
    content: &[u8],
    scope_prefix: &str,
    helper: &mut GraphBuildHelper,
) {
    let param_name = match param.child_by_field_name("name") {
        Some(name_node) => name_node.utf8_text(content).ok(),
        None => {
            let mut cursor = param.walk();
            let first_identifier = param
                .children(&mut cursor)
                .find(|child| child.kind() == "identifier");
            first_identifier.and_then(|ident| ident.utf8_text(content).ok())
        }
    };
    let Some(param_name) = param_name else {
        return;
    };
    if matches!(param_name, "self" | "cls") {
        return;
    }

    let Some(type_name) = param
        .child_by_field_name("type")
        .and_then(|type_node| extract_type_from_node(type_node, content))
    else {
        return;
    };

    let qualified_param_name = if scope_prefix.is_empty() {
        format!(":{param_name}")
    } else {
        format!("{scope_prefix}:{param_name}")
    };

    let param_span = span_from_node(param);
    let param_id = helper.add_variable(&qualified_param_name, Some(param_span));
    let type_id = helper.add_type(&type_name, None);
    helper.add_typeof_edge(param_id, type_id);
    helper.add_reference_edge(param_id, type_id);
}
/// Processes every `assignment` child of an expression statement as a
/// potentially type-annotated assignment, qualified by the enclosing scope.
fn process_annotated_assignment(
    expr_stmt_node: Node<'_>,
    content: &[u8],
    ast_graph: &ASTGraph,
    helper: &mut GraphBuildHelper,
) {
    let scope_prefix = find_containing_scope(expr_stmt_node, content, ast_graph);
    let mut cursor = expr_stmt_node.walk();
    let assignments = expr_stmt_node
        .children(&mut cursor)
        .filter(|child| child.kind() == "assignment");
    for assignment in assignments {
        process_typed_assignment(assignment, content, &scope_prefix, helper);
    }
}
/// Emits a variable node for an annotated assignment (`x: T = ...`), plus
/// type-of and reference edges to its type node.
///
/// Assignments without a `left` or `type` field are ignored. The variable
/// name is the left-hand text, qualified as follows:
/// * empty prefix -> the bare name;
/// * prefix containing `.` but no `:` -> `<prefix>.<name>`;
/// * any other prefix -> `<prefix>:<name>`.
fn process_typed_assignment(
    assignment_node: Node<'_>,
    content: &[u8],
    scope_prefix: &str,
    helper: &mut GraphBuildHelper,
) {
    let (Some(target), Some(annotation)) = (
        assignment_node.child_by_field_name("left"),
        assignment_node.child_by_field_name("type"),
    ) else {
        return;
    };
    let Ok(var_name) = target.utf8_text(content) else {
        return;
    };
    let Some(type_name) = extract_type_from_node(annotation, content) else {
        return;
    };

    let qualified_var_name = if scope_prefix.is_empty() {
        var_name.to_string()
    } else {
        let dotted_scope = scope_prefix.contains('.') && !scope_prefix.contains(':');
        let separator = if dotted_scope { '.' } else { ':' };
        format!("{scope_prefix}{separator}{var_name}")
    };

    let assignment_span = span_from_node(assignment_node);
    let var_id = helper.add_variable(&qualified_var_name, Some(assignment_span));
    let type_id = helper.add_type(&type_name, None);
    helper.add_typeof_edge(var_id, type_id);
    helper.add_reference_edge(var_id, type_id);
}
/// Reduces a type annotation node to a single representative type name.
///
/// * `type` wrappers are unwrapped to their first named child.
/// * `identifier` and `attribute` yield their text.
/// * `string` (forward reference) yields the unquoted text, reduced to the
///   first member of a `|` union.
/// * `binary_operator` (a `|` union) yields the type of its left operand.
/// * `generic_type` / `subscript` yield the base type without parameters.
/// * `list` / `tuple` / `set` yield their full text.
/// * Any other node yields the text before the first `[`, or the first union
///   member of the whole text.
///
/// Returns `None` when the relevant text is missing or not valid UTF-8.
fn extract_type_from_node(type_node: Node<'_>, content: &[u8]) -> Option<String> {
    match type_node.kind() {
        "type" => type_node
            .named_child(0)
            .and_then(|child| extract_type_from_node(child, content)),
        "identifier" => type_node.utf8_text(content).ok().map(String::from),
        "string" => {
            let text = type_node.utf8_text(content).ok()?;
            let trimmed = text.trim();
            // Require at least two characters: a lone quote (for example from
            // an unterminated literal) both starts and ends with the quote
            // character and would make the `[1..len - 1]` slice panic.
            let is_quoted = trimmed.len() >= 2
                && ((trimmed.starts_with('"') && trimmed.ends_with('"'))
                    || (trimmed.starts_with('\'') && trimmed.ends_with('\'')));
            if is_quoted {
                let unquoted = &trimmed[1..trimmed.len() - 1];
                Some(normalize_union_type(unquoted))
            } else {
                Some(trimmed.to_string())
            }
        }
        "binary_operator" => {
            if let Some(left) = type_node.child_by_field_name("left") {
                extract_type_from_node(left, content)
            } else {
                type_node
                    .utf8_text(content)
                    .ok()
                    .map(|text| normalize_union_type(text.trim()))
            }
        }
        "generic_type" | "subscript" => {
            if let Some(value_node) = type_node.child_by_field_name("value") {
                extract_type_from_node(value_node, content)
            } else {
                // No `value` field: try the first named child, then fall back
                // to the raw text before the first '['.
                type_node
                    .named_child(0)
                    .and_then(|child| extract_type_from_node(child, content))
                    .or_else(|| {
                        type_node.utf8_text(content).ok().and_then(|text| {
                            text.split('[').next().map(|s| s.trim().to_string())
                        })
                    })
            }
        }
        "attribute" => type_node.utf8_text(content).ok().map(String::from),
        "list" | "tuple" | "set" => type_node.utf8_text(content).ok().map(String::from),
        _ => {
            let text = type_node.utf8_text(content).ok()?;
            let trimmed = text.trim();
            if trimmed.contains('[') {
                trimmed.split('[').next().map(|s| s.trim().to_string())
            } else {
                Some(normalize_union_type(trimmed))
            }
        }
    }
}
/// Reduces a `A | B | ...` union type string to its first member.
///
/// When `type_str` contains a `|`, the text before the first `|` is returned
/// with surrounding whitespace trimmed. When it contains no `|`, the input is
/// returned unchanged (no trimming is applied in that case).
fn normalize_union_type(type_str: &str) -> String {
    match type_str.split_once('|') {
        Some((first_member, _rest)) => first_member.trim().to_string(),
        None => type_str.to_string(),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the `Some((method, path))` value the route-decorator parser is
    /// expected to return for a recognised decorator.
    fn route(method: &str, path: &str) -> Option<(String, String)> {
        Some((method.to_owned(), path.to_owned()))
    }

    /// Checks `ffi_library_simple_name` against a table of `(input, expected)`
    /// pairs, reporting the offending input on failure.
    fn assert_ffi_simple_names(cases: &[(&str, &str)]) {
        for &(input, expected) in cases {
            assert_eq!(ffi_library_simple_name(input), expected, "input: {input:?}");
        }
    }

    // ---- simple_name -----------------------------------------------------

    #[test]
    fn test_simple_name_extracts_dotted_identifiers() {
        let cases = [
            ("module.func", "func"),
            ("obj.method", "method"),
            ("package.module.func", "func"),
            ("self.helper", "helper"),
            ("function", "function"),
            ("", ""),
        ];
        for &(input, expected) in &cases {
            assert_eq!(simple_name(input), expected, "input: {input:?}");
        }
    }

    // ---- ffi_library_simple_name -------------------------------------------

    #[test]
    fn test_ffi_library_simple_name_extracts_library_base_names() {
        assert_ffi_simple_names(&[
            ("libfoo.so", "libfoo"),
            ("lib1.so", "lib1"),
            ("lib2.so", "lib2"),
            ("kernel32.dll", "kernel32"),
            ("libSystem.dylib", "libSystem"),
            ("libc.so.6", "libc"),
            ("kernel32", "kernel32"),
            ("numpy", "numpy"),
            ("$libname", "$libname"),
            ("", ""),
            ("lib.so", "lib"),
        ]);
    }

    #[test]
    fn test_ffi_library_simple_name_prevents_duplicate_edges() {
        let first = ffi_library_simple_name("lib1.so");
        let second = ffi_library_simple_name("lib2.so");
        assert_ne!(
            first, second,
            "lib1.so and lib2.so must produce different simple names"
        );
        assert_eq!(first, "lib1");
        assert_eq!(second, "lib2");
    }

    #[test]
    fn test_ffi_library_simple_name_handles_directory_paths() {
        assert_ffi_simple_names(&[
            ("/opt/v1.2/libfoo.so", "libfoo"),
            ("/usr/lib/x86_64-linux-gnu/libc.so.6", "libc"),
            ("libs/lib1.so", "lib1"),
            ("./libs/kernel32.dll", "kernel32"),
            ("../lib/libSystem.dylib", "libSystem"),
        ]);
    }

    // ---- parse_route_decorator_text ----------------------------------------

    #[test]
    fn test_parse_route_decorator_app_route_default_get() {
        assert_eq!(
            parse_route_decorator_text("app.route('/api/users')"),
            route("GET", "/api/users")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_route_with_methods_post() {
        assert_eq!(
            parse_route_decorator_text("app.route('/api/users', methods=['POST'])"),
            route("POST", "/api/users")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_route_with_methods_put_double_quotes() {
        assert_eq!(
            parse_route_decorator_text("app.route(\"/api/items\", methods=[\"PUT\"])"),
            route("PUT", "/api/items")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_get() {
        assert_eq!(
            parse_route_decorator_text("app.get('/api/users')"),
            route("GET", "/api/users")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_post() {
        assert_eq!(
            parse_route_decorator_text("app.post('/api/items')"),
            route("POST", "/api/items")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_put() {
        assert_eq!(
            parse_route_decorator_text("app.put('/api/items/1')"),
            route("PUT", "/api/items/1")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_delete() {
        assert_eq!(
            parse_route_decorator_text("app.delete('/api/items/1')"),
            route("DELETE", "/api/items/1")
        );
    }

    #[test]
    fn test_parse_route_decorator_app_patch() {
        assert_eq!(
            parse_route_decorator_text("app.patch('/api/items/1')"),
            route("PATCH", "/api/items/1")
        );
    }

    #[test]
    fn test_parse_route_decorator_router_get_fastapi() {
        assert_eq!(
            parse_route_decorator_text("router.get('/api/users')"),
            route("GET", "/api/users")
        );
    }

    #[test]
    fn test_parse_route_decorator_router_post_fastapi() {
        assert_eq!(
            parse_route_decorator_text("router.post('/api/items')"),
            route("POST", "/api/items")
        );
    }

    #[test]
    fn test_parse_route_decorator_blueprint_route() {
        assert_eq!(
            parse_route_decorator_text("blueprint.route('/health')"),
            route("GET", "/health")
        );
    }

    #[test]
    fn test_parse_route_decorator_unknown_receiver_returns_none() {
        assert_eq!(parse_route_decorator_text("server.get('/api/users')"), None);
    }

    #[test]
    fn test_parse_route_decorator_unknown_method_returns_none() {
        assert_eq!(parse_route_decorator_text("app.options('/api/users')"), None);
    }

    #[test]
    fn test_parse_route_decorator_no_parens_returns_none() {
        assert_eq!(parse_route_decorator_text("app.route"), None);
    }

    #[test]
    fn test_parse_route_decorator_no_dot_returns_none() {
        assert_eq!(parse_route_decorator_text("route('/api/users')"), None);
    }

    // ---- extract_path_from_decorator_args ----------------------------------

    #[test]
    fn test_extract_path_from_decorator_args_single_quotes() {
        assert_eq!(
            extract_path_from_decorator_args("'/api/users')"),
            Some("/api/users".to_string())
        );
    }

    #[test]
    fn test_extract_path_from_decorator_args_double_quotes() {
        assert_eq!(
            extract_path_from_decorator_args("\"/api/items\")"),
            Some("/api/items".to_string())
        );
    }

    #[test]
    fn test_extract_path_from_decorator_args_empty_returns_none() {
        assert_eq!(extract_path_from_decorator_args("'')"), None);
    }

    #[test]
    fn test_extract_path_from_decorator_args_no_string_returns_none() {
        assert_eq!(extract_path_from_decorator_args("some_var)"), None);
    }

    // ---- extract_method_from_route_args ------------------------------------

    #[test]
    fn test_extract_method_from_route_args_with_methods_keyword() {
        let method = extract_method_from_route_args("'/api/users', methods=['POST'])");
        assert_eq!(method, "POST");
    }

    #[test]
    fn test_extract_method_from_route_args_without_methods_keyword() {
        let method = extract_method_from_route_args("'/api/users')");
        assert_eq!(method, "GET");
    }

    #[test]
    fn test_extract_method_from_route_args_delete() {
        let method = extract_method_from_route_args("'/api/items', methods=['DELETE'])");
        assert_eq!(method, "DELETE");
    }

    #[test]
    fn test_extract_method_from_route_args_lowercase_normalizes() {
        let method = extract_method_from_route_args("'/x', methods=['put'])");
        assert_eq!(method, "PUT");
    }

    // ---- extract_first_string_literal --------------------------------------

    #[test]
    fn test_extract_first_string_literal_single_quotes() {
        assert_eq!(
            extract_first_string_literal("'POST']"),
            Some("POST".to_string())
        );
    }

    #[test]
    fn test_extract_first_string_literal_double_quotes() {
        assert_eq!(
            extract_first_string_literal("\"DELETE\"]"),
            Some("DELETE".to_string())
        );
    }

    #[test]
    fn test_extract_first_string_literal_empty_returns_none() {
        assert_eq!(extract_first_string_literal("no quotes here"), None);
    }
}