Skip to main content

infigraph_core/taint/
path_traversal.rs

1use std::path::Path;
2
3use anyhow::Result;
4use serde::Serialize;
5
6use crate::graph::GraphStore;
7
8use super::interprocedural::detect_interprocedural_taint;
9
10#[derive(Debug, Clone, Serialize)]
11pub struct PathTraversalFlow {
12    pub kind: &'static str,
13    pub source_symbol: String,
14    pub sink_symbol: String,
15    pub source_kind: String,
16    pub depth: u32,
17    pub call_chain: Vec<String>,
18    pub sanitized: bool,
19}
20
/// Taint source kinds treated as attacker-controlled for path traversal.
static PATH_TRAVERSAL_SOURCE_KINDS: &[&str] = &[
    "HttpParam",
    "HttpBody",
    "HttpHeader",
    "UserInput",
];

/// Sink categories that count as path-traversal sinks.
static PATH_TRAVERSAL_SINK_CATEGORIES: &[&str] = &["PathTraversal"];

/// Call prefixes whose presence in a function body is taken as evidence of
/// path normalisation. They are matched case-insensitively as plain
/// substrings, so the trailing `(` keeps them anchored to call sites.
static PATH_TRAVERSAL_SANITIZERS: &[&str] = &[
    "realpath(",
    "abspath(",
    "canonicalize(",
    "path.resolve(",
    "secure_filename(",
    "os.path.basename(",
    "filepath.Clean(",
    "os.path.normpath(",
    "Path.normalize(",
];
36
37pub fn detect_path_traversal(
38    store: &GraphStore,
39    root: &Path,
40    max_depth: u32,
41) -> Result<Vec<PathTraversalFlow>> {
42    let mut results = Vec::new();
43
44    // Intra-procedural: check existing taint flows
45    let intra_flows = super::detect_taint_flows(store, root)?;
46    for flow in &intra_flows {
47        if flow.sink_category == "PathTraversal"
48            && PATH_TRAVERSAL_SOURCE_KINDS.contains(&flow.source_kind.as_str())
49        {
50            results.push(PathTraversalFlow {
51                kind: "intra-procedural",
52                source_symbol: flow.symbol_id.clone(),
53                sink_symbol: flow.symbol_id.clone(),
54                source_kind: flow.source_kind.clone(),
55                depth: 0,
56                call_chain: vec![flow.symbol_id.clone()],
57                sanitized: flow.sanitized,
58            });
59        }
60    }
61
62    // Inter-procedural: trace across function boundaries
63    let inter_flows = detect_interprocedural_taint(store, root, max_depth)?;
64    for flow in &inter_flows {
65        if PATH_TRAVERSAL_SINK_CATEGORIES.contains(&flow.sink_category.as_str())
66            && PATH_TRAVERSAL_SOURCE_KINDS.contains(&flow.source_kind.as_str())
67        {
68            let sanitized = check_chain_sanitized(store, root, &flow.call_chain);
69            results.push(PathTraversalFlow {
70                kind: "inter-procedural",
71                source_symbol: flow.source_symbol.clone(),
72                sink_symbol: flow.sink_symbol.clone(),
73                source_kind: flow.source_kind.clone(),
74                depth: flow.depth,
75                call_chain: flow.call_chain.clone(),
76                sanitized,
77            });
78        }
79    }
80
81    Ok(results)
82}
83
84fn check_chain_sanitized(store: &GraphStore, root: &Path, chain: &[String]) -> bool {
85    let conn = match store.connection() {
86        Ok(c) => c,
87        Err(_) => return false,
88    };
89
90    for symbol_id in chain {
91        let result = conn.query(&format!(
92            "MATCH (s:Symbol) WHERE s.id = '{}' RETURN s.file, s.start_line, s.end_line",
93            crate::escape_str(symbol_id)
94        ));
95
96        if let Ok(result) = result {
97            for row in result {
98                if row.len() < 3 {
99                    continue;
100                }
101                let file = row[0].to_string();
102                let start: usize = row[1].to_string().parse().unwrap_or(0);
103                let end: usize = row[2].to_string().parse().unwrap_or(0);
104
105                if let Ok(content) = std::fs::read_to_string(root.join(&file)) {
106                    let lines: Vec<&str> = content.lines().collect();
107                    let start_idx = start.saturating_sub(1);
108                    let end_idx = end.min(lines.len());
109
110                    for line in &lines[start_idx..end_idx] {
111                        let lower = line.to_lowercase();
112                        for &pat in PATH_TRAVERSAL_SANITIZERS {
113                            if lower.contains(&pat.to_lowercase()) {
114                                return true;
115                            }
116                        }
117                    }
118                }
119            }
120        }
121    }
122
123    false
124}
125
126pub fn format_path_traversal(flows: &[PathTraversalFlow]) -> String {
127    if flows.is_empty() {
128        return "No multi-layer path traversal vulnerabilities detected.".to_string();
129    }
130
131    let active: Vec<_> = flows.iter().filter(|f| !f.sanitized).collect();
132    let sanitized = flows.len() - active.len();
133
134    let mut out = format!(
135        "Path traversal analysis: {} total ({} active, {} sanitized)\n\n",
136        flows.len(),
137        active.len(),
138        sanitized
139    );
140
141    if !active.is_empty() {
142        let intra: Vec<_> = active
143            .iter()
144            .filter(|f| f.kind == "intra-procedural")
145            .collect();
146        let inter: Vec<_> = active
147            .iter()
148            .filter(|f| f.kind == "inter-procedural")
149            .collect();
150
151        if !intra.is_empty() {
152            out.push_str(&format!("## Intra-procedural ({} flows)\n", intra.len()));
153            for f in &intra {
154                out.push_str(&format!(
155                    "  {} ({}) — same function\n",
156                    f.source_symbol, f.source_kind
157                ));
158            }
159            out.push('\n');
160        }
161
162        if !inter.is_empty() {
163            out.push_str(&format!("## Inter-procedural ({} flows)\n", inter.len()));
164            for f in &inter {
165                out.push_str(&format!(
166                    "  {} -> {} ({}, depth: {})\n    Chain: {}\n",
167                    f.source_symbol,
168                    f.sink_symbol,
169                    f.source_kind,
170                    f.depth,
171                    f.call_chain.join(" -> ")
172                ));
173            }
174            out.push('\n');
175        }
176    }
177
178    if sanitized > 0 {
179        out.push_str(&format!(
180            "\n--- {} flows sanitized (path normalization detected) ---\n",
181            sanitized
182        ));
183    }
184
185    out
186}
187
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an `HttpParam` flow whose source is the first element of `chain`
    /// and whose sink is the last.
    fn http_param_flow(
        kind: &'static str,
        chain: &[&str],
        depth: u32,
        sanitized: bool,
    ) -> PathTraversalFlow {
        let call_chain: Vec<String> = chain.iter().map(|s| s.to_string()).collect();
        PathTraversalFlow {
            kind,
            source_symbol: call_chain.first().cloned().expect("chain must not be empty"),
            sink_symbol: call_chain.last().cloned().expect("chain must not be empty"),
            source_kind: "HttpParam".to_string(),
            depth,
            call_chain,
            sanitized,
        }
    }

    // An empty slice produces the "nothing detected" message.
    #[test]
    fn test_format_empty() {
        let report = format_path_traversal(&[]);
        assert!(report.contains("No multi-layer"));
    }

    // Unsanitized flows of both kinds each get their own section.
    #[test]
    fn test_format_with_flows() {
        let flows = [
            http_param_flow("intra-procedural", &["app.py::download"], 0, false),
            http_param_flow(
                "inter-procedural",
                &["api.py::get_file", "storage.py::read_file"],
                1,
                false,
            ),
        ];
        let report = format_path_traversal(&flows);
        assert!(report.contains("Intra-procedural"));
        assert!(report.contains("Inter-procedural"));
        assert!(report.contains("2 active"));
    }

    // A sanitized flow is counted in the totals but not as active.
    #[test]
    fn test_sanitized_flow() {
        let flows = [http_param_flow(
            "intra-procedural",
            &["app.py::download"],
            0,
            true,
        )];
        let report = format_path_traversal(&flows);
        assert!(report.contains("0 active"));
        assert!(report.contains("1 sanitized"));
    }
}