use anyhow::Result;
use serde::Serialize;
use std::collections::HashMap;
use tree_sitter::Language;
use crate::core::graph::Edge;
use crate::core::parser::SupportedLanguage;
use crate::languages::{make_edge, resolve_scope_id, LanguagePlugin};
pub struct PythonPlugin;
impl LanguagePlugin for PythonPlugin {
fn language(&self) -> SupportedLanguage {
SupportedLanguage::Python
}
fn extensions(&self) -> &[&str] {
&["py"]
}
fn ts_language(&self) -> Language {
tree_sitter_python::language()
}
fn symbol_query_source(&self) -> &str {
include_str!("../queries/python/symbols.scm")
}
fn edge_query_source(&self) -> &str {
include_str!("../queries/python/edges.scm")
}
fn infer_symbol_kind(&self, node_kind: &str) -> &str {
match node_kind {
"function_definition" => "function",
"class_definition" => "class",
_ => "function",
}
}
fn scope_node_types(&self) -> &[&str] {
&[
"function_definition",
"class_definition",
"decorated_definition",
"module",
]
}
fn class_body_node_types(&self) -> &[&str] {
&["block"]
}
fn class_decl_node_types(&self) -> &[&str] {
&["class_definition"]
}
fn extract_metadata(
&self,
node: &tree_sitter::Node,
source: &str,
kind: &str,
) -> Result<String> {
extract_metadata(node, source, kind)
}
fn extract_edge(
&self,
pattern_index: usize,
captures: &HashMap<String, (String, u32)>,
file_path: &str,
enclosing_scope_id: Option<&str>,
) -> Vec<Edge> {
extract_py_edge(pattern_index, captures, file_path, enclosing_scope_id)
}
fn extract_docstring(&self, node: &tree_sitter::Node, source: &str) -> Option<String> {
extract_docstring(node, source)
}
fn generic_name_stopwords(&self) -> &[&str] {
&[
"__init__", "__str__", "__repr__", "__eq__", "__hash__", "__len__", "__iter__",
"__next__",
]
}
}
#[derive(Debug, Clone, Serialize, Default)]
pub struct PythonMetadata {
pub access: String,
pub is_async: bool,
pub is_static: bool,
pub is_classmethod: bool,
pub is_abstract: bool,
pub is_property: bool,
pub decorators: Vec<String>,
pub return_type: Option<String>,
pub parameters: Vec<PythonParameterInfo>,
}
#[derive(Debug, Clone, Serialize)]
pub struct PythonParameterInfo {
pub name: String,
#[serde(rename = "type")]
pub type_annotation: Option<String>,
pub has_default: bool,
}
pub fn extract_metadata(node: &tree_sitter::Node, source: &str, kind: &str) -> Result<String> {
let mut meta = PythonMetadata {
access: "public".to_string(),
..Default::default()
};
if let Some(parent) = node.parent() {
if parent.kind() == "decorated_definition" {
let mut cursor = parent.walk();
for child in parent.children(&mut cursor) {
if child.kind() == "decorator" {
if let Ok(text) = child.utf8_text(source.as_bytes()) {
let dec_name = text
.trim_start_matches('@')
.split('(')
.next()
.unwrap_or("")
.trim()
.to_string();
if !dec_name.is_empty() {
match dec_name.as_str() {
"staticmethod" => meta.is_static = true,
"classmethod" => meta.is_classmethod = true,
"abstractmethod" | "abc.abstractmethod" => meta.is_abstract = true,
"property" => meta.is_property = true,
_ => {}
}
meta.decorators.push(dec_name);
}
}
}
}
}
}
if node.kind() == "function_definition" {
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if child.kind() == "async" {
meta.is_async = true;
break;
}
}
}
if let Some(name_node) = node.child_by_field_name("name") {
if let Ok(name) = name_node.utf8_text(source.as_bytes()) {
meta.access = infer_access(name);
}
}
if let Some(return_type_node) = node.child_by_field_name("return_type") {
if let Ok(text) = return_type_node.utf8_text(source.as_bytes()) {
let clean = text.trim_start_matches("->").trim();
if !clean.is_empty() {
meta.return_type = Some(clean.to_string());
}
}
}
if kind == "function" || kind == "method" || kind == "class" {
if let Some(params_node) = node.child_by_field_name("parameters") {
meta.parameters = extract_parameters(¶ms_node, source);
}
}
let json = serde_json::to_string(&meta)?;
Ok(json)
}
fn infer_access(name: &str) -> String {
if name.starts_with("__") && !name.ends_with("__") {
"name_mangled".to_string()
} else if name.starts_with('_') {
"private".to_string()
} else {
"public".to_string()
}
}
fn extract_parameters(params_node: &tree_sitter::Node, source: &str) -> Vec<PythonParameterInfo> {
let mut params = Vec::new();
let mut cursor = params_node.walk();
for child in params_node.children(&mut cursor) {
match child.kind() {
"identifier" => {
if let Ok(name) = child.utf8_text(source.as_bytes()) {
let name = name.to_string();
if name != "self" && name != "cls" {
params.push(PythonParameterInfo {
name,
type_annotation: None,
has_default: false,
});
}
}
}
"typed_parameter" => {
let name_node = child.child_by_field_name("name").or_else(|| {
let mut c = child.walk();
let found = child.children(&mut c).find(|n| n.kind() == "identifier");
found
});
let name = name_node
.and_then(|n| n.utf8_text(source.as_bytes()).ok())
.unwrap_or_default()
.to_string();
if name == "self" || name == "cls" {
continue;
}
let type_annotation = child
.child_by_field_name("type")
.and_then(|n| n.utf8_text(source.as_bytes()).ok())
.map(|t| t.trim().to_string());
params.push(PythonParameterInfo {
name,
type_annotation,
has_default: false,
});
}
"default_parameter" => {
let name = child
.child_by_field_name("name")
.and_then(|n| n.utf8_text(source.as_bytes()).ok())
.unwrap_or_default()
.to_string();
if name == "self" || name == "cls" {
continue;
}
params.push(PythonParameterInfo {
name,
type_annotation: None,
has_default: true,
});
}
"typed_default_parameter" => {
let name = child
.child_by_field_name("name")
.and_then(|n| n.utf8_text(source.as_bytes()).ok())
.unwrap_or_default()
.to_string();
if name == "self" || name == "cls" {
continue;
}
let type_annotation = child
.child_by_field_name("type")
.and_then(|n| n.utf8_text(source.as_bytes()).ok())
.map(|t| t.trim().to_string());
params.push(PythonParameterInfo {
name,
type_annotation,
has_default: true,
});
}
_ => {}
}
}
params
}
pub fn extract_docstring(node: &tree_sitter::Node, source: &str) -> Option<String> {
let body = node.child_by_field_name("body")?;
let mut cursor = body.walk();
let first_child = body.children(&mut cursor).next()?;
if first_child.kind() != "expression_statement" {
return None;
}
let mut inner_cursor = first_child.walk();
for inner in first_child.children(&mut inner_cursor) {
if inner.kind() == "string" {
if let Ok(text) = inner.utf8_text(source.as_bytes()) {
let cleaned = text
.trim_start_matches("\"\"\"")
.trim_start_matches("'''")
.trim_end_matches("\"\"\"")
.trim_end_matches("'''")
.trim();
if !cleaned.is_empty() {
return Some(cleaned.to_string());
}
}
}
}
None
}
fn extract_py_edge(
pattern: usize,
captures: &HashMap<String, (String, u32)>,
file_path: &str,
enclosing_scope_id: Option<&str>,
) -> Vec<Edge> {
let mut edges = Vec::new();
let from_fn = resolve_scope_id(enclosing_scope_id, file_path, "function");
let from_cls = resolve_scope_id(enclosing_scope_id, file_path, "class");
match pattern {
0 => {
if let Some((imported_name, line)) = captures.get("imported_name") {
edges.push(make_edge(
format!("{file_path}::__module__::function"),
imported_name,
"imports",
file_path,
*line,
));
}
}
1 => {
if let (Some((imported_name, line)), Some((source_mod, _))) =
(captures.get("imported_name"), captures.get("source"))
{
edges.push(make_edge(
format!("{file_path}::__module__::function"),
format!("{source_mod}::{imported_name}"),
"imports",
file_path,
*line,
));
}
}
2 => {
if let Some((callee, line)) = captures.get("callee") {
edges.push(make_edge(
from_fn.clone(),
callee,
"calls",
file_path,
*line,
));
}
}
3 => {
if let (Some((object, line)), Some((method, _))) =
(captures.get("object"), captures.get("method"))
{
edges.push(make_edge(
from_fn.clone(),
format!("{object}.{method}"),
"calls",
file_path,
*line,
));
}
}
4 => {
if let Some((base_class, line)) = captures.get("base_class") {
edges.push(make_edge(
from_cls.clone(),
base_class,
"extends",
file_path,
*line,
));
}
}
_ => {}
}
edges
}