1use std::path::Path;
2
3use anyhow::Result;
4use serde::Serialize;
5
6use crate::graph::GraphStore;
7
8use super::interprocedural::detect_interprocedural_taint;
9
10#[derive(Debug, Clone, Serialize)]
11pub struct PathTraversalFlow {
12 pub kind: &'static str,
13 pub source_symbol: String,
14 pub sink_symbol: String,
15 pub source_kind: String,
16 pub depth: u32,
17 pub call_chain: Vec<String>,
18 pub sanitized: bool,
19}
20
21static PATH_TRAVERSAL_SOURCE_KINDS: &[&str] = &["HttpParam", "HttpBody", "HttpHeader", "UserInput"];
22
23static PATH_TRAVERSAL_SINK_CATEGORIES: &[&str] = &["PathTraversal"];
24
25static PATH_TRAVERSAL_SANITIZERS: &[&str] = &[
26 "realpath(",
27 "abspath(",
28 "canonicalize(",
29 "path.resolve(",
30 "secure_filename(",
31 "os.path.basename(",
32 "filepath.Clean(",
33 "os.path.normpath(",
34 "Path.normalize(",
35];
36
37pub fn detect_path_traversal(
38 store: &GraphStore,
39 root: &Path,
40 max_depth: u32,
41) -> Result<Vec<PathTraversalFlow>> {
42 let mut results = Vec::new();
43
44 let intra_flows = super::detect_taint_flows(store, root)?;
46 for flow in &intra_flows {
47 if flow.sink_category == "PathTraversal"
48 && PATH_TRAVERSAL_SOURCE_KINDS.contains(&flow.source_kind.as_str())
49 {
50 results.push(PathTraversalFlow {
51 kind: "intra-procedural",
52 source_symbol: flow.symbol_id.clone(),
53 sink_symbol: flow.symbol_id.clone(),
54 source_kind: flow.source_kind.clone(),
55 depth: 0,
56 call_chain: vec![flow.symbol_id.clone()],
57 sanitized: flow.sanitized,
58 });
59 }
60 }
61
62 let inter_flows = detect_interprocedural_taint(store, root, max_depth)?;
64 for flow in &inter_flows {
65 if PATH_TRAVERSAL_SINK_CATEGORIES.contains(&flow.sink_category.as_str())
66 && PATH_TRAVERSAL_SOURCE_KINDS.contains(&flow.source_kind.as_str())
67 {
68 let sanitized = check_chain_sanitized(store, root, &flow.call_chain);
69 results.push(PathTraversalFlow {
70 kind: "inter-procedural",
71 source_symbol: flow.source_symbol.clone(),
72 sink_symbol: flow.sink_symbol.clone(),
73 source_kind: flow.source_kind.clone(),
74 depth: flow.depth,
75 call_chain: flow.call_chain.clone(),
76 sanitized,
77 });
78 }
79 }
80
81 Ok(results)
82}
83
84fn check_chain_sanitized(store: &GraphStore, root: &Path, chain: &[String]) -> bool {
85 let conn = match store.connection() {
86 Ok(c) => c,
87 Err(_) => return false,
88 };
89
90 for symbol_id in chain {
91 let result = conn.query(&format!(
92 "MATCH (s:Symbol) WHERE s.id = '{}' RETURN s.file, s.start_line, s.end_line",
93 crate::escape_str(symbol_id)
94 ));
95
96 if let Ok(result) = result {
97 for row in result {
98 if row.len() < 3 {
99 continue;
100 }
101 let file = row[0].to_string();
102 let start: usize = row[1].to_string().parse().unwrap_or(0);
103 let end: usize = row[2].to_string().parse().unwrap_or(0);
104
105 if let Ok(content) = std::fs::read_to_string(root.join(&file)) {
106 let lines: Vec<&str> = content.lines().collect();
107 let start_idx = start.saturating_sub(1);
108 let end_idx = end.min(lines.len());
109
110 for line in &lines[start_idx..end_idx] {
111 let lower = line.to_lowercase();
112 for &pat in PATH_TRAVERSAL_SANITIZERS {
113 if lower.contains(&pat.to_lowercase()) {
114 return true;
115 }
116 }
117 }
118 }
119 }
120 }
121 }
122
123 false
124}
125
126pub fn format_path_traversal(flows: &[PathTraversalFlow]) -> String {
127 if flows.is_empty() {
128 return "No multi-layer path traversal vulnerabilities detected.".to_string();
129 }
130
131 let active: Vec<_> = flows.iter().filter(|f| !f.sanitized).collect();
132 let sanitized = flows.len() - active.len();
133
134 let mut out = format!(
135 "Path traversal analysis: {} total ({} active, {} sanitized)\n\n",
136 flows.len(),
137 active.len(),
138 sanitized
139 );
140
141 if !active.is_empty() {
142 let intra: Vec<_> = active
143 .iter()
144 .filter(|f| f.kind == "intra-procedural")
145 .collect();
146 let inter: Vec<_> = active
147 .iter()
148 .filter(|f| f.kind == "inter-procedural")
149 .collect();
150
151 if !intra.is_empty() {
152 out.push_str(&format!("## Intra-procedural ({} flows)\n", intra.len()));
153 for f in &intra {
154 out.push_str(&format!(
155 " {} ({}) — same function\n",
156 f.source_symbol, f.source_kind
157 ));
158 }
159 out.push('\n');
160 }
161
162 if !inter.is_empty() {
163 out.push_str(&format!("## Inter-procedural ({} flows)\n", inter.len()));
164 for f in &inter {
165 out.push_str(&format!(
166 " {} -> {} ({}, depth: {})\n Chain: {}\n",
167 f.source_symbol,
168 f.sink_symbol,
169 f.source_kind,
170 f.depth,
171 f.call_chain.join(" -> ")
172 ));
173 }
174 out.push('\n');
175 }
176 }
177
178 if sanitized > 0 {
179 out.push_str(&format!(
180 "\n--- {} flows sanitized (path normalization detected) ---\n",
181 sanitized
182 ));
183 }
184
185 out
186}
187
188#[cfg(test)]
189mod tests {
190 use super::*;
191
192 #[test]
193 fn test_format_empty() {
194 let result = format_path_traversal(&[]);
195 assert!(result.contains("No multi-layer"));
196 }
197
198 #[test]
199 fn test_format_with_flows() {
200 let flows = vec![
201 PathTraversalFlow {
202 kind: "intra-procedural",
203 source_symbol: "app.py::download".to_string(),
204 sink_symbol: "app.py::download".to_string(),
205 source_kind: "HttpParam".to_string(),
206 depth: 0,
207 call_chain: vec!["app.py::download".to_string()],
208 sanitized: false,
209 },
210 PathTraversalFlow {
211 kind: "inter-procedural",
212 source_symbol: "api.py::get_file".to_string(),
213 sink_symbol: "storage.py::read_file".to_string(),
214 source_kind: "HttpParam".to_string(),
215 depth: 1,
216 call_chain: vec![
217 "api.py::get_file".to_string(),
218 "storage.py::read_file".to_string(),
219 ],
220 sanitized: false,
221 },
222 ];
223 let result = format_path_traversal(&flows);
224 assert!(result.contains("Intra-procedural"));
225 assert!(result.contains("Inter-procedural"));
226 assert!(result.contains("2 active"));
227 }
228
229 #[test]
230 fn test_sanitized_flow() {
231 let flows = vec![PathTraversalFlow {
232 kind: "intra-procedural",
233 source_symbol: "app.py::download".to_string(),
234 sink_symbol: "app.py::download".to_string(),
235 source_kind: "HttpParam".to_string(),
236 depth: 0,
237 call_chain: vec!["app.py::download".to_string()],
238 sanitized: true,
239 }];
240 let result = format_path_traversal(&flows);
241 assert!(result.contains("0 active"));
242 assert!(result.contains("1 sanitized"));
243 }
244}