use sqry_core::graph::GraphBuilder;
use sqry_core::graph::unified::build::staging::StagingGraph;
use sqry_lang_python::relations::PythonGraphBuilder;
use std::path::Path;
use tree_sitter::Tree;
fn parse_python(source: &str) -> Tree {
let mut parser = tree_sitter::Parser::new();
parser
.set_language(&tree_sitter_python::LANGUAGE.into())
.expect("set Python language");
parser.parse(source, None).expect("parse Python")
}
fn build_graph(source: &str) -> StagingGraph {
let tree = parse_python(source);
let mut staging = StagingGraph::new();
let builder = PythonGraphBuilder::default();
builder
.build_graph(&tree, source.as_bytes(), Path::new("test.py"), &mut staging)
.expect("build_graph should not fail");
staging
}
fn has_edge_tag(staging: &StagingGraph, tag: &str) -> bool {
use sqry_core::graph::unified::build::staging::StagingOp;
staging
.operations()
.iter()
.any(|op| matches!(op, StagingOp::AddEdge { kind, .. } if kind.tag() == tag))
}
fn all_edge_tags(staging: &StagingGraph) -> Vec<String> {
use sqry_core::graph::unified::build::staging::StagingOp;
staging
.operations()
.iter()
.filter_map(|op| {
if let StagingOp::AddEdge { kind, .. } = op {
Some(kind.tag().to_string())
} else {
None
}
})
.collect()
}
#[test]
fn all_assignment_list_creates_export_edges() {
let source = r"
def public_func():
pass
def _private_func():
pass
__all__ = ['public_func']
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "exports"),
"Expected exports edge from __all__. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn all_assignment_tuple_creates_export_edges() {
let source = r"
def alpha():
pass
def beta():
pass
__all__ = ('alpha', 'beta')
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "exports"),
"Expected exports edge from __all__ tuple. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn all_augmented_assignment() {
let source = r"
def extra():
pass
__all__ = ['extra']
__all__ += ['more']
";
let staging = build_graph(source);
assert!(
staging.stats().nodes_staged >= 1,
"Expected at least one node from augmented assignment source"
);
}
#[test]
fn class_inherits_identifier_base() {
let source = r"
class Animal:
def speak(self): pass
class Dog(Animal):
def bark(self): pass
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "inherits"),
"Expected inherits edge. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn class_inherits_attribute_base() {
let source = r"
import abc
class MyInterface(abc.ABC):
def method(self): pass
";
let staging = build_graph(source);
assert!(
staging.stats().nodes_staged >= 1,
"Expected class and method nodes from attribute base class"
);
}
#[test]
fn class_inherits_call_base() {
let source = r"
def make_mixin():
class M:
pass
return M
class Concrete(make_mixin()):
pass
";
let staging = build_graph(source);
assert!(
staging.stats().nodes_staged >= 1,
"Expected nodes from call-form base class"
);
}
#[test]
fn class_inherits_subscript_base() {
let source = r"
from typing import Generic, TypeVar
T = TypeVar('T')
class Box(Generic[T]):
def get(self) -> T: ...
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "imports"),
"Expected imports edge from 'from typing import'. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn class_inherits_skips_keyword_arguments() {
let source = r"
import abc
class MyABC(abc.ABC, metaclass=abc.ABCMeta):
def do_it(self): pass
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn property_decorator_creates_property_node() {
let source = r"
class Temperature:
def __init__(self, value: float):
self._value = value
@property
def celsius(self) -> float:
return self._value
@property
def fahrenheit(self) -> float:
return self._value * 9 / 5 + 32
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 2, "Expected property nodes");
}
#[test]
fn async_function_creates_function_node() {
let source = r"
import asyncio
async def fetch(url: str) -> str:
await asyncio.sleep(0)
return url
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn async_generator_function() {
let source = r"
async def async_range(n: int):
for i in range(n):
yield i
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn awaited_call_recorded() {
let source = r"
import asyncio
async def helper():
return 42
async def runner():
result = await helper()
return result
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "calls"),
"Expected calls edge for awaited call. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn flask_route_decorator() {
let source = r"
class Flask:
def route(self, path, methods=None):
def decorator(f):
return f
return decorator
app = Flask()
@app.route('/users', methods=['GET'])
def get_users():
return []
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn fastapi_get_route_decorator() {
let source = r"
class APIRouter:
def get(self, path: str):
def decorator(f):
return f
return decorator
router = APIRouter()
@router.get('/items')
async def list_items():
return []
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn aliased_import() {
let source = r"
import numpy as np
def use_numpy():
return np.array([1, 2, 3])
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn from_import() {
let source = r"
from os.path import join
def make_path(base: str, name: str) -> str:
return join(base, name)
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "imports"),
"Expected imports edge. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn relative_import() {
let source = r"
from . import utils
def use_utils():
return utils.helper()
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "imports"),
"Expected imports edge for relative import. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn relative_import_with_module() {
let source = r"
from .models import User
def get_user(user_id: int) -> User:
return User(user_id)
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "imports"),
"Expected imports edge for relative import with module. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn wildcard_import() {
let source = r"
from typing import *
def foo(x: Optional[int]) -> List[str]:
return []
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "imports"),
"Expected imports edge for wildcard import. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn function_with_return_type_annotation() {
let source = r"
from typing import List
def get_names() -> List[str]:
return ['alice', 'bob']
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn method_with_typed_parameters() {
let source = r"
class Calculator:
def add(self, a: int, b: int) -> int:
return a + b
def subtract(self, a: float, b: float) -> float:
return a - b
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 2);
}
#[test]
fn lambda_creates_scope() {
let source = r"
def make_adder(n):
return lambda x: x + n
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn list_comprehension_scope() {
let source = r"
def squares(n):
return [x * x for x in range(n)]
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn set_comprehension_scope() {
let source = r"
def unique_squares(items):
return {x * x for x in items}
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn dict_comprehension_scope() {
let source = r"
def invert(d: dict) -> dict:
return {v: k for k, v in d.items()}
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn generator_expression_scope() {
let source = r"
def total(items):
return sum(x * 2 for x in items)
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn walrus_operator_binding() {
let source = r"
import re
def find_match(pattern: str, text: str):
if (m := re.search(pattern, text)):
return m.group(0)
return None
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn except_clause_binding() {
let source = r"
def safe_parse(text: str):
try:
return int(text)
except ValueError as e:
print(e)
return None
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn with_statement_binding() {
let source = r"
def read_file(path: str) -> str:
with open(path) as f:
return f.read()
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn for_statement_binding() {
let source = r"
def sum_all(numbers):
total = 0
for n in numbers:
total += n
return total
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn splat_args_parameter() {
let source = r"
def variadic(*args):
return sum(args)
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn kwargs_parameter() {
let source = r"
def flexible(**kwargs):
return kwargs
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn typed_splat_args_parameter() {
let source = r"
def typed_variadic(*args: int) -> int:
return sum(args)
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn default_parameter() {
let source = r"
def greet(name='world'):
return f'Hello, {name}!'
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn typed_default_parameter() {
let source = r"
def power(base: int, exp: int = 2) -> int:
return base ** exp
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn native_c_module_import_creates_ffi_edge() {
let source = r"
import math
def circle_area(radius: float) -> float:
return math.pi * radius ** 2
";
let staging = build_graph(source);
assert!(
staging.stats().nodes_staged >= 1,
"Expected at least one node from native C module import source"
);
}
#[test]
fn ctypes_cdll_usage() {
let source = r#"
import ctypes
libc = ctypes.CDLL("libc.so.6")
def call_printf():
libc.printf(b"Hello\n")
"#;
let staging = build_graph(source);
assert!(
staging.stats().nodes_staged >= 1,
"Expected at least one node from ctypes CDLL usage source"
);
}
#[test]
fn annotated_assignment_module_level() {
let source = r"
from typing import Optional
MAX_SIZE: int = 100
name: Optional[str] = None
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 1);
}
#[test]
fn public_function_exported_without_all() {
let source = r"
def public_helper():
return 42
def _internal_helper():
return 0
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "exports"),
"Expected exports edge for public function. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn private_function_not_exported() {
let source = r"
def _private():
pass
";
let staging = build_graph(source);
assert!(
!has_edge_tag(&staging, "exports"),
"Private function should not be exported. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn class_with_dunder_methods() {
let source = r"
class Node:
def __init__(self, value: int):
self.value = value
def __repr__(self) -> str:
return f'Node({self.value})'
def __eq__(self, other: object) -> bool:
if isinstance(other, Node):
return self.value == other.value
return NotImplemented
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 3);
}
#[test]
fn self_method_call_resolved() {
let source = r"
class Worker:
def prepare(self):
self.validate()
self.process()
def validate(self):
pass
def process(self):
pass
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "calls"),
"Expected calls edges for self method calls. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn module_level_call() {
let source = r"
def setup():
pass
def teardown():
pass
setup()
teardown()
";
let staging = build_graph(source);
assert!(
has_edge_tag(&staging, "calls"),
"Expected calls from module-level calls. Tags: {:?}",
all_edge_tags(&staging)
);
}
#[test]
fn nested_class_in_function() {
let source = r"
def make_counter():
class Counter:
def __init__(self):
self.n = 0
def increment(self):
self.n += 1
return Counter()
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 2);
}
#[test]
fn nested_function_in_class() {
let source = r"
class Processor:
def process(self, items):
def transform(item):
return item * 2
return [transform(x) for x in items]
";
let staging = build_graph(source);
assert!(staging.stats().nodes_staged >= 2);
}