agentic_codebase/semantic/
ffi_tracer.rs1use crate::parse::ReferenceKind;
7use crate::types::{AcbResult, Language};
8
9use super::resolver::ResolvedUnit;
10
11pub struct FfiTracer {
13 detectors: Vec<Box<dyn FfiDetector>>,
15}
16
17#[derive(Debug, Clone)]
19pub struct FfiEdge {
20 pub source_id: u64,
22 pub target_id: Option<u64>,
24 pub ffi_type: FfiPatternType,
26 pub binding_info: String,
28}
29
30#[derive(Debug, Clone, Copy, PartialEq, Eq)]
32pub enum FfiPatternType {
33 PyO3,
35 Ctypes,
37 Cffi,
39 RustCFfi,
41 NodeNapi,
43 Wasm,
45 HttpRpc,
47}
48
49trait FfiDetector: Send + Sync {
51 fn detect(&self, unit: &ResolvedUnit, all_units: &[ResolvedUnit]) -> Vec<FfiEdge>;
53}
54
55impl FfiTracer {
56 pub fn new() -> Self {
58 let detectors: Vec<Box<dyn FfiDetector>> = vec![
59 Box::new(PyO3Detector),
60 Box::new(CtypesDetector),
61 Box::new(HttpRpcDetector),
62 ];
63 Self { detectors }
64 }
65
66 pub fn trace(&self, units: &[ResolvedUnit]) -> AcbResult<Vec<FfiEdge>> {
68 let mut edges = Vec::new();
69
70 for unit in units {
71 for detector in &self.detectors {
72 let calls = detector.detect(unit, units);
73 edges.extend(calls);
74 }
75 }
76
77 Ok(edges)
78 }
79}
80
81impl Default for FfiTracer {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87struct PyO3Detector;
89
90impl FfiDetector for PyO3Detector {
91 fn detect(&self, unit: &ResolvedUnit, all_units: &[ResolvedUnit]) -> Vec<FfiEdge> {
92 let mut calls = Vec::new();
93
94 if unit.unit.language != Language::Python {
95 return calls;
96 }
97
98 for ref_info in &unit.resolved_refs {
99 if ref_info.raw.kind == ReferenceKind::Import {
100 if let Some(rust_id) = find_pyo3_module(&ref_info.raw.name, all_units) {
101 calls.push(FfiEdge {
102 source_id: unit.unit.temp_id,
103 target_id: Some(rust_id),
104 ffi_type: FfiPatternType::PyO3,
105 binding_info: format!("import {}", ref_info.raw.name),
106 });
107 }
108 }
109 }
110
111 calls
112 }
113}
114
115struct CtypesDetector;
117
118impl FfiDetector for CtypesDetector {
119 fn detect(&self, unit: &ResolvedUnit, _all_units: &[ResolvedUnit]) -> Vec<FfiEdge> {
120 let mut calls = Vec::new();
121
122 if unit.unit.language != Language::Python {
123 return calls;
124 }
125
126 for ref_info in &unit.resolved_refs {
127 if ref_info.raw.kind == ReferenceKind::Import {
128 if ref_info.raw.name.contains("ctypes") {
129 calls.push(FfiEdge {
130 source_id: unit.unit.temp_id,
131 target_id: None,
132 ffi_type: FfiPatternType::Ctypes,
133 binding_info: format!("ctypes usage: {}", ref_info.raw.name),
134 });
135 } else if ref_info.raw.name.contains("cffi") {
136 calls.push(FfiEdge {
137 source_id: unit.unit.temp_id,
138 target_id: None,
139 ffi_type: FfiPatternType::Cffi,
140 binding_info: format!("cffi usage: {}", ref_info.raw.name),
141 });
142 }
143 }
144 }
145
146 calls
147 }
148}
149
150struct HttpRpcDetector;
152
153impl FfiDetector for HttpRpcDetector {
154 fn detect(&self, unit: &ResolvedUnit, _all_units: &[ResolvedUnit]) -> Vec<FfiEdge> {
155 let mut calls = Vec::new();
156
157 for ref_info in &unit.resolved_refs {
158 if ref_info.raw.kind == ReferenceKind::Call {
159 let name_lower = ref_info.raw.name.to_lowercase();
160 if name_lower.contains("fetch")
161 || name_lower.contains("request")
162 || name_lower.contains("http")
163 || name_lower.contains("axios")
164 {
165 calls.push(FfiEdge {
166 source_id: unit.unit.temp_id,
167 target_id: None,
168 ffi_type: FfiPatternType::HttpRpc,
169 binding_info: format!("HTTP call: {}", ref_info.raw.name),
170 });
171 }
172 }
173 }
174
175 calls
176 }
177}
178
179fn find_pyo3_module(name: &str, units: &[ResolvedUnit]) -> Option<u64> {
181 for unit in units {
182 if unit.unit.language == Language::Rust {
183 if let Some(meta) = unit.unit.metadata.get("pymodule") {
184 if meta == name {
185 return Some(unit.unit.temp_id);
186 }
187 }
188 }
189 }
190 None
191}