use crate::code_tree::models::{
ClassInfo, ConstantInfo, EnumInfo, FileInfo, FunctionInfo, InterfaceInfo,
};
use aho_corasick::{AhoCorasick, MatchKind};
use rayon::prelude::*;
use std::collections::{BTreeMap, HashMap, HashSet};
/// Returns the path separator used in module paths for `language`.
///
/// Languages not listed in the table fall back to `"/"`.
fn get_separator(language: &str) -> &'static str {
    // (languages sharing a separator, separator)
    const SEPARATORS: [(&[&str], &str); 3] = [
        (&["rust", "cpp"], "::"),
        (&["python", "java", "csharp", "dart"], "."),
        (&["php"], "\\"),
    ];
    SEPARATORS
        .iter()
        .find(|(languages, _)| languages.iter().any(|&known| known == language))
        .map_or("/", |&(_, separator)| separator)
}
/// Edge from a module to a submodule it declares.
pub struct ContainsEdge {
/// Module path of the file that declares the submodule.
pub parent: String,
/// Parent module path joined to the submodule name with the language's separator.
pub child: String,
}
/// Edge from a file to a known module that one of its imports resolves to.
pub struct ImportEdge {
/// Path of the importing file.
pub file_path: String,
/// Longest prefix of the import path that is a known module.
pub module: String,
}
/// Aggregated file-to-file import edge.
pub struct FileImportEdge {
/// Path of the importing file.
pub source: String,
/// Path of the file that the imported module maps to.
pub target: String,
/// Number of imports in `source` that resolved to `target`.
pub import_count: i64,
}
/// Edge from a function to a type named in its parameters, return type or signature.
pub struct UsesTypeEdge {
/// Qualified name of the function that mentions the type.
pub function: String,
/// Qualified name of the referenced type (not its simple name).
pub type_name: String,
/// Node type of the referenced type: the class node type, `"Enum"`, `"Trait"`,
/// `"Protocol"` or `"Interface"`.
pub target_node_type: &'static str,
/// Where the type appears: `"parameter"`, `"return"`, `"receiver"`, `"signature"`,
/// or `"both"` when it appears in more than one of the first three.
pub position: &'static str,
}
/// Edge from a PyO3 module function to an item exposed to Python.
pub struct FfiExposesEdge {
/// Qualified name of the function whose metadata flags it as `is_pymodule`.
pub module_fn: String,
/// Qualified name of the exposed class or function.
pub target_qname: String,
/// `"Struct"` for exposed classes, `"Function"` for exposed functions.
pub target_type: &'static str,
/// Python-visible name: the `py_name` metadata entry, or the item's own name if absent.
pub py_name: String,
}
/// Edge from a module to a file that belongs to it.
pub struct ModuleContainsFileEdge {
/// Module path of the file (never empty for emitted edges).
pub module: String,
/// Path of the file.
pub file_path: String,
}
/// Emits one module-to-file edge for every file that has a module path.
///
/// Files with an empty `module_path` are skipped. Output order follows `files`.
pub fn build_module_contains_file_edges(files: &[FileInfo]) -> Vec<ModuleContainsFileEdge> {
    files
        .iter()
        .filter_map(|file| {
            let has_module = !file.module_path.is_empty();
            has_module.then(|| ModuleContainsFileEdge {
                module: file.module_path.clone(),
                file_path: file.path.clone(),
            })
        })
        .collect()
}
/// Emits a parent-to-child module edge for every submodule declaration in `files`.
///
/// The child path is the file's module path followed by the language separator and
/// the declared submodule name. Output order follows `files`, then declaration order.
pub fn build_contains_edges(files: &[FileInfo]) -> Vec<ContainsEdge> {
    files
        .iter()
        .flat_map(|file| {
            let separator = get_separator(&file.language);
            file.submodule_declarations
                .iter()
                .map(move |submodule| ContainsEdge {
                    parent: file.module_path.clone(),
                    child: format!("{}{}{}", file.module_path, separator, submodule),
                })
        })
        .collect()
}
/// Resolves each import of each file to the longest matching known module.
///
/// For an import path, the candidates are the full path and then the path truncated
/// at each separator, from right to left. The first candidate found in
/// `known_modules` produces an edge; imports with no match produce nothing.
pub fn build_import_edges(files: &[FileInfo], known_modules: &HashSet<String>) -> Vec<ImportEdge> {
    let mut edges = Vec::new();
    for file in files {
        let separator = get_separator(&file.language);
        for import_path in &file.imports {
            // Byte offsets at which a separator starts; cutting there yields the same
            // prefixes as splitting on the separator and re-joining the leading parts.
            let cut_points: Vec<usize> = import_path
                .match_indices(separator)
                .map(|(offset, _)| offset)
                .collect();
            let resolved = std::iter::once(import_path.len())
                .chain(cut_points.into_iter().rev())
                .map(|end| &import_path[..end])
                .find(|prefix| known_modules.contains(*prefix));
            if let Some(module) = resolved {
                edges.push(ImportEdge {
                    file_path: file.path.clone(),
                    module: module.to_string(),
                });
            }
        }
    }
    edges
}
pub fn build_file_import_edges(
files: &[FileInfo],
module_to_file: &HashMap<String, String>,
) -> Vec<FileImportEdge> {
let mut counts: HashMap<(String, String), i64> = HashMap::new();
for f in files {
let sep = get_separator(&f.language);
for use_path in &f.imports {
let parts: Vec<&str> = use_path.split(sep).collect();
for end in (1..=parts.len()).rev() {
let candidate = parts[..end].join(sep);
if let Some(target_file) = module_to_file.get(&candidate) {
if target_file != &f.path {
*counts
.entry((f.path.clone(), target_file.clone()))
.or_insert(0) += 1;
}
break;
}
}
}
}
counts
.into_iter()
.map(|((source, target), import_count)| FileImportEdge {
source,
target,
import_count,
})
.collect()
}
/// Builds edges from functions to the known types named in their type annotations.
///
/// Every class, enum and interface whose simple name is longer than one character is
/// registered under that simple name. When several types share a simple name, the
/// one registered last wins (classes, then enums, then interfaces).
///
/// For each function, the parameter type annotations and the return type are scanned
/// for whole-identifier occurrences of the registered names. If no parameter carries
/// a type annotation, the raw signature text is scanned as a fallback.
///
/// Each edge's `position` is `"both"` when the type appears in at least two of
/// parameter / return / receiver, otherwise `"receiver"`, `"parameter"`, `"return"`
/// or `"signature"`.
///
/// Returns the edges grouped by target node type. Within a group, edges follow the
/// order of `functions`; one function's edges are ordered by name length (longest
/// first) and then alphabetically, so the result is reproducible across runs.
/// Returns an empty map when no type is registered or the matcher cannot be built.
pub fn build_uses_type_edges(
    functions: &[FunctionInfo],
    classes: &[ClassInfo],
    enums: &[EnumInfo],
    interfaces: &[InterfaceInfo],
) -> BTreeMap<&'static str, Vec<UsesTypeEdge>> {
    /// True for bytes that can be part of an ASCII identifier.
    fn is_ident_byte(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    // Simple name -> (qualified name, target node type).
    // Single-character names are skipped, presumably to avoid matching generic
    // parameters such as `T`.
    let mut type_lookup: HashMap<String, (String, &'static str)> = HashMap::new();
    for class in classes {
        if class.name.chars().count() > 1 {
            let target = super::class_node_type(&class.kind);
            type_lookup.insert(class.name.clone(), (class.qualified_name.clone(), target));
        }
    }
    for enum_info in enums {
        if enum_info.name.chars().count() > 1 {
            type_lookup.insert(
                enum_info.name.clone(),
                (enum_info.qualified_name.clone(), "Enum"),
            );
        }
    }
    for interface in interfaces {
        if interface.name.chars().count() > 1 {
            let target = match interface.kind.as_str() {
                "trait" => "Trait",
                "protocol" => "Protocol",
                _ => "Interface",
            };
            type_lookup.insert(
                interface.name.clone(),
                (interface.qualified_name.clone(), target),
            );
        }
    }
    if type_lookup.is_empty() {
        return BTreeMap::new();
    }

    // Fix the pattern order (longest name first, ties alphabetical) so pattern ids
    // are stable, then split into the pattern list and its parallel metadata table.
    let mut entries: Vec<(String, (String, &'static str))> = type_lookup.into_iter().collect();
    entries.sort_by(|(a, _), (b, _)| b.len().cmp(&a.len()).then_with(|| a.cmp(b)));
    let (names, pattern_meta): (Vec<String>, Vec<(String, &'static str)>) =
        entries.into_iter().unzip();

    let ac = match AhoCorasick::builder()
        .match_kind(MatchKind::LeftmostLongest)
        .build(&names)
    {
        Ok(ac) => ac,
        Err(_) => return BTreeMap::new(),
    };

    // Bit flags recording where in a function a type name was seen.
    const POS_PARAM: u8 = 1 << 0;
    const POS_RETURN: u8 = 1 << 1;
    const POS_SIGNATURE: u8 = 1 << 2;
    const POS_RECEIVER: u8 = 1 << 3;

    let per_fn: Vec<Vec<(&'static str, String, &'static str)>> = functions
        .par_iter()
        .map(|fn_info| {
            // Pattern id -> position bits. A BTreeMap yields the matches in pattern-id
            // order, so the edges of one function always come out in the same order.
            let mut seen: BTreeMap<u32, u8> = BTreeMap::new();
            let scan = |text: &str, pos_bit: u8, seen: &mut BTreeMap<u32, u8>| {
                if text.is_empty() {
                    return;
                }
                let bytes = text.as_bytes();
                for m in ac.find_iter(text) {
                    let (start, end) = (m.start(), m.end());
                    // Accept only whole identifiers: the neighbouring bytes must not
                    // be ASCII identifier characters.
                    let before_ok = start == 0 || !is_ident_byte(bytes[start - 1]);
                    let after_ok = end == bytes.len() || !is_ident_byte(bytes[end]);
                    if before_ok && after_ok {
                        let pat_id = m.pattern().as_usize() as u32;
                        *seen.entry(pat_id).or_insert(0) |= pos_bit;
                    }
                }
            };

            for p in &fn_info.parameters {
                if let Some(t) = &p.type_annotation {
                    let pos_bit = if p.kind == crate::code_tree::models::ParameterKind::Receiver {
                        POS_RECEIVER
                    } else {
                        POS_PARAM
                    };
                    scan(t, pos_bit, &mut seen);
                }
            }
            if let Some(rt) = &fn_info.return_type {
                scan(rt, POS_RETURN, &mut seen);
            }
            // Fall back to the raw signature only when no parameter is annotated.
            let has_param_types = fn_info
                .parameters
                .iter()
                .any(|p| p.type_annotation.is_some());
            if !has_param_types && !fn_info.signature.is_empty() {
                scan(&fn_info.signature, POS_SIGNATURE, &mut seen);
            }

            seen.into_iter()
                .map(|(pat_id, bits)| {
                    let has = |bit: u8| bits & bit != 0;
                    // Signature hits do not count towards "both".
                    let semantic_count =
                        has(POS_PARAM) as u8 + has(POS_RETURN) as u8 + has(POS_RECEIVER) as u8;
                    let position = if semantic_count >= 2 {
                        "both"
                    } else if has(POS_RECEIVER) {
                        "receiver"
                    } else if has(POS_PARAM) {
                        "parameter"
                    } else if has(POS_RETURN) {
                        "return"
                    } else if has(POS_SIGNATURE) {
                        "signature"
                    } else {
                        unreachable!("at least one position bit must be set");
                    };
                    let (qname, target) = &pattern_meta[pat_id as usize];
                    (*target, qname.clone(), position)
                })
                .collect()
        })
        .collect();

    let mut by_target_type: BTreeMap<&'static str, Vec<UsesTypeEdge>> = BTreeMap::new();
    for (fn_info, matches) in functions.iter().zip(per_fn) {
        for (target, qname, position) in matches {
            by_target_type
                .entry(target)
                .or_default()
                .push(UsesTypeEdge {
                    function: fn_info.qualified_name.clone(),
                    type_name: qname,
                    target_node_type: target,
                    position,
                });
        }
    }
    by_target_type
}
/// Edge from a function to a constant whose name appears among its references.
pub struct ReferencesEdge {
/// Qualified name of the referencing function.
pub function: String,
/// Qualified name of the referenced constant.
pub constant: String,
/// Line taken from the first reference entry that produced this edge.
pub line: u32,
}
/// Edge from a function to another function it references by name.
pub struct ReferencesFnEdge {
/// Qualified name of the referencing function.
pub caller: String,
/// Qualified name of the referenced function.
pub callee: String,
/// Line taken from the first reference entry that produced this edge.
pub line: u32,
}
/// Edge from a decorator function to a function it decorates.
pub struct DecoratesEdge {
/// Qualified name of the function the decorator resolved to.
pub decorator: String,
/// Qualified name of the decorated function.
pub function: String,
/// Raw decorator text as recorded on the decorated function.
pub decorator_name: String,
}
/// Links each function to the constants whose simple name it references.
///
/// A referenced identifier yields an edge to every constant with that simple name.
/// Each (function, constant) pair is emitted once, carrying the line of the first
/// reference entry that matched.
pub fn build_references_edges(
    functions: &[FunctionInfo],
    constants: &[ConstantInfo],
) -> Vec<ReferencesEdge> {
    if constants.is_empty() {
        return Vec::new();
    }

    // Simple constant name -> qualified names of all constants sharing it.
    let mut qnames_by_name: HashMap<&str, Vec<&str>> = HashMap::new();
    for constant in constants {
        qnames_by_name
            .entry(constant.name.as_str())
            .or_default()
            .push(constant.qualified_name.as_str());
    }

    let mut edges: Vec<ReferencesEdge> = Vec::new();
    for func in functions.iter().filter(|f| !f.references.is_empty()) {
        // Constants already linked from this function.
        let mut emitted: HashSet<&str> = HashSet::new();
        for (ident, line) in &func.references {
            if let Some(qnames) = qnames_by_name.get(ident.as_str()) {
                for &qname in qnames {
                    if !emitted.insert(qname) {
                        continue;
                    }
                    edges.push(ReferencesEdge {
                        function: func.qualified_name.clone(),
                        constant: qname.to_string(),
                        line: *line,
                    });
                }
            }
        }
    }
    edges
}
/// Links each function to the functions it references by simple name.
///
/// A reference resolves only when exactly one function has that simple name.
/// Self-references are dropped, and each (caller, callee) pair is emitted once with
/// the line of the first reference entry that matched.
pub fn build_references_fn_edges(functions: &[FunctionInfo]) -> Vec<ReferencesFnEdge> {
    if functions.is_empty() {
        return Vec::new();
    }

    // Simple function name -> qualified names of all functions sharing it.
    let mut qnames_by_name: HashMap<&str, Vec<&str>> = HashMap::new();
    for func in functions {
        qnames_by_name
            .entry(func.name.as_str())
            .or_default()
            .push(func.qualified_name.as_str());
    }

    let mut edges: Vec<ReferencesFnEdge> = Vec::new();
    for func in functions {
        let caller = func.qualified_name.as_str();
        // Callees already linked from this caller.
        let mut emitted: HashSet<&str> = HashSet::new();
        for (ident, line) in &func.function_refs {
            // Ambiguous names (several candidates) and unknown names are skipped.
            let callee = match qnames_by_name.get(ident.as_str()).map(|v| v.as_slice()) {
                Some(&[only]) if only != caller => only,
                _ => continue,
            };
            if emitted.insert(callee) {
                edges.push(ReferencesFnEdge {
                    caller: caller.to_string(),
                    callee: callee.to_string(),
                    line: *line,
                });
            }
        }
    }
    edges
}
/// Edge from a Python-side method to the Rust method that implements it.
pub struct PyO3BindsEdge {
/// Qualified name (dot-separated) of the Python-side method.
pub py_function: String,
/// Qualified name of the Rust function whose metadata flags it as `is_pymethod`.
pub rust_function: String,
}
/// Pairs Python-side methods with the Rust `is_pymethod` functions that back them.
///
/// Rust functions are indexed by the last two `::` segments of their qualified name
/// (owner, method). A method whose qualified name has at least three `.` segments is
/// matched on its last two segments, and an edge is emitted only when exactly one
/// Rust function has that (owner, method) pair.
pub fn build_pyo3_binds_edges(functions: &[FunctionInfo]) -> Vec<PyO3BindsEdge> {
    // (owner, method) -> qualified names of the Rust pymethods with that pair.
    let mut rust_methods: HashMap<(&str, &str), Vec<&str>> = HashMap::new();
    for func in functions {
        let is_pymethod = func
            .metadata
            .get("is_pymethod")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        if !is_pymethod {
            continue;
        }
        let segments: Vec<&str> = func.qualified_name.split("::").collect();
        if let [.., owner, method] = segments.as_slice() {
            rust_methods
                .entry((*owner, *method))
                .or_default()
                .push(func.qualified_name.as_str());
        }
    }

    functions
        .iter()
        .filter(|func| func.is_method)
        .filter_map(|func| {
            let segments: Vec<&str> = func.qualified_name.split('.').collect();
            // Needs at least `<module>.<class>.<method>`.
            let [_, .., class_name, method_name] = segments.as_slice() else {
                return None;
            };
            let candidates = rust_methods.get(&(*class_name, *method_name))?;
            match candidates.as_slice() {
                // Bind only when the Rust side is unambiguous.
                &[rust_qname] => Some(PyO3BindsEdge {
                    py_function: func.qualified_name.clone(),
                    rust_function: rust_qname.to_string(),
                }),
                _ => None,
            }
        })
        .collect()
}
/// Links decorator functions to the functions they decorate.
///
/// Each raw decorator string is cut at its first `(`, trimmed, and reduced to the
/// segment after the last `::` (or, failing that, the last `.`). The edge is emitted
/// only when exactly one function has that simple name and it is not the decorated
/// function itself. Each (decorator, function) pair is emitted once.
pub fn build_decorates_edges(functions: &[FunctionInfo]) -> Vec<DecoratesEdge> {
    if functions.is_empty() {
        return Vec::new();
    }

    // Simple function name -> qualified names of all functions sharing it.
    let mut qnames_by_name: HashMap<&str, Vec<&str>> = HashMap::new();
    for func in functions {
        qnames_by_name
            .entry(func.name.as_str())
            .or_default()
            .push(func.qualified_name.as_str());
    }

    let mut edges: Vec<DecoratesEdge> = Vec::new();
    for decorated in functions {
        let decorated_qname = decorated.qualified_name.as_str();
        // Decorators already linked to this function.
        let mut emitted: HashSet<&str> = HashSet::new();
        for raw in &decorated.decorators {
            // Drop any call arguments: keep the text before the first '('.
            let head = raw
                .split_once('(')
                .map_or(raw.as_str(), |(before, _)| before)
                .trim();
            if head.is_empty() {
                continue;
            }
            // Strip a path qualifier, preferring `::` over `.`.
            let bare = match head.rsplit_once("::").or_else(|| head.rsplit_once('.')) {
                Some((_, tail)) => tail,
                None => head,
            };
            // Unknown or ambiguous decorator names are skipped.
            let Some(&[decorator]) = qnames_by_name.get(bare).map(|v| v.as_slice()) else {
                continue;
            };
            if decorator == decorated_qname {
                continue;
            }
            if emitted.insert(decorator) {
                edges.push(DecoratesEdge {
                    decorator: decorator.to_string(),
                    function: decorated_qname.to_string(),
                    decorator_name: raw.clone(),
                });
            }
        }
    }
    edges
}
/// Links every PyO3 module function to every item exposed to Python.
///
/// Module functions are those flagged `is_pymodule` in their metadata. Exposed items
/// are classes flagged `is_pyclass` (emitted as `"Struct"`) and non-method,
/// non-module functions whose `ffi_kind` is `"pyo3"` (emitted as `"Function"`).
/// The Python name is the `py_name` metadata entry, falling back to the item's name.
///
/// For each module function, class edges come first and function edges second.
/// Returns an empty vector when there is no module function.
pub fn build_ffi_exposes_edges(
    functions: &[FunctionInfo],
    classes: &[ClassInfo],
) -> Vec<FfiExposesEdge> {
    let is_pymodule = |f: &FunctionInfo| {
        f.metadata
            .get("is_pymodule")
            .and_then(|v| v.as_bool())
            .unwrap_or(false)
    };

    let module_qnames: Vec<&str> = functions
        .iter()
        .filter(|&f| is_pymodule(f))
        .map(|f| f.qualified_name.as_str())
        .collect();
    if module_qnames.is_empty() {
        return Vec::new();
    }

    // Exposed items in emission order: (qualified name, target type, Python name).
    let mut targets: Vec<(&str, &'static str, String)> = Vec::new();
    for class in classes {
        let is_pyclass = class
            .metadata
            .get("is_pyclass")
            .and_then(|v| v.as_bool())
            .unwrap_or(false);
        if !is_pyclass {
            continue;
        }
        let py_name = class
            .metadata
            .get("py_name")
            .and_then(|v| v.as_str())
            .map_or_else(|| class.name.clone(), str::to_string);
        targets.push((class.qualified_name.as_str(), "Struct", py_name));
    }
    for func in functions {
        let is_exposed = !func.is_method
            && !is_pymodule(func)
            && func.metadata.get("ffi_kind").and_then(|v| v.as_str()) == Some("pyo3");
        if !is_exposed {
            continue;
        }
        let py_name = func
            .metadata
            .get("py_name")
            .and_then(|v| v.as_str())
            .map_or_else(|| func.name.clone(), str::to_string);
        targets.push((func.qualified_name.as_str(), "Function", py_name));
    }

    let mut edges = Vec::with_capacity(module_qnames.len() * targets.len());
    for &module_fn in &module_qnames {
        for (target_qname, target_type, py_name) in &targets {
            edges.push(FfiExposesEdge {
                module_fn: module_fn.to_string(),
                target_qname: (*target_qname).to_string(),
                target_type: *target_type,
                py_name: py_name.clone(),
            });
        }
    }
    edges
}