Skip to main content

agentshield/parser/
python.rs

1use std::path::{Path, PathBuf};
2
3use once_cell::sync::Lazy;
4use regex::Regex;
5
6use super::{CallSite, FunctionDef, LanguageParser, ParsedFile};
7use crate::analysis::cross_file::is_sanitizer;
8use crate::error::Result;
9use crate::ir::execution_surface::*;
10use crate::ir::{ArgumentSource, Language, SourceLocation};
11
/// Heuristic parser for Python source files.
///
/// Implements [`LanguageParser`]; the analysis is regex-driven rather than
/// based on a real Python AST.
pub struct PythonParser;
13
14// Dangerous subprocess/exec functions
15static SUBPROCESS_PATTERNS: Lazy<Vec<&str>> = Lazy::new(|| {
16    vec![
17        "subprocess.run",
18        "subprocess.call",
19        "subprocess.check_call",
20        "subprocess.check_output",
21        "subprocess.Popen",
22        "os.system",
23        "os.popen",
24        "os.exec",
25        "os.execv",
26        "os.execve",
27        "os.execvp",
28    ]
29});
30
31// GitPython's `repo.git.*` methods are dynamic dispatchers that execute
32// `git <method> ...` as shell commands. We match the `.git.` segment.
33static GITPYTHON_RE: Lazy<Regex> =
34    Lazy::new(|| Regex::new(r"(?m)(\w+)\.git\.(\w+)\s*\(([^)]*)\)").unwrap());
35
36static NETWORK_PATTERNS: Lazy<Vec<&str>> = Lazy::new(|| {
37    vec![
38        "requests.get",
39        "requests.post",
40        "requests.put",
41        "requests.patch",
42        "requests.delete",
43        "requests.head",
44        "requests.request",
45        "urllib.request.urlopen",
46        "httpx.get",
47        "httpx.post",
48        "httpx.put",
49        // httpx.AsyncClient and aiohttp.ClientSession are tracked via
50        // HTTP_CLIENT_CTX_RE + HTTP_CLIENT_METHODS instead, so their actual
51        // method calls (client.get, session.post) are detected as network ops.
52    ]
53});
54
55// HTTP method names used on client variables (e.g. `client.get(url)` where
56// `client` was bound from `httpx.AsyncClient()` or `aiohttp.ClientSession()`).
57// Checked separately from NETWORK_PATTERNS because the caller object is a
58// variable, not a known module.
59static HTTP_CLIENT_METHODS: Lazy<Vec<&str>> = Lazy::new(|| {
60    vec![
61        "get", "post", "put", "patch", "delete", "head", "options", "request", "fetch", "send",
62    ]
63});
64
65// Regex to detect async context managers that produce HTTP clients.
66// Matches: `async with httpx.AsyncClient(...) as <name>:`
67//          `async with aiohttp.ClientSession(...) as <name>:`
68static HTTP_CLIENT_CTX_RE: Lazy<Regex> = Lazy::new(|| {
69    Regex::new(
70        r"(?m)async\s+with\s+(?:\w+\.)*(?:AsyncClient|ClientSession)\s*\([^)]*\)\s+as\s+(\w+)",
71    )
72    .unwrap()
73});
74
75static DYNAMIC_EXEC_PATTERNS: Lazy<Vec<&str>> =
76    Lazy::new(|| vec!["eval", "exec", "compile", "__import__"]);
77
78static SENSITIVE_ENV_VARS: Lazy<Regex> = Lazy::new(|| {
79    Regex::new(r"(?i)(AWS_|SECRET|TOKEN|PASSWORD|API_KEY|PRIVATE_KEY|CREDENTIALS|AUTH)").unwrap()
80});
81
82static FILE_READ_PATTERNS: Lazy<Vec<&str>> = Lazy::new(|| vec!["open", "pathlib.Path"]);
83
84// Regex to find function calls with arguments: func_name(args)
85static CALL_RE: Lazy<Regex> =
86    Lazy::new(|| Regex::new(r"(?m)(\w+(?:\.\w+)*)\s*\(([^)]*)\)").unwrap());
87
88// Regex to find the start of a multi-line call: func_name( with no closing )
89// Captures the function name so we can match it against patterns, then look
90// ahead to the next line(s) for the first argument.
91static PARTIAL_CALL_RE: Lazy<Regex> =
92    Lazy::new(|| Regex::new(r"(\w+(?:\.\w+)*)\s*\(\s*$").unwrap());
93
94// Regex to find os.environ / os.getenv patterns
95static ENV_ACCESS_RE: Lazy<Regex> = Lazy::new(|| {
96    Regex::new(
97        r#"(?m)os\.(?:environ\s*(?:\[\s*["']([^"']+)["']\s*\]|\.get\s*\(\s*["']([^"']+)["'])|getenv\s*\(\s*["']([^"']+)["']\s*\))"#,
98    )
99    .unwrap()
100});
101
102// Regex to find function definitions and their parameters
103static FUNC_DEF_RE: Lazy<Regex> =
104    Lazy::new(|| Regex::new(r"(?m)^\s*(?:async\s+)?def\s+(\w+)\s*\(([^)]*)\)").unwrap());
105
106// Sanitizer assignment: valid_path = validate_path(x) or valid_path = await validate_path(x)
107static SANITIZER_ASSIGN_RE: Lazy<Regex> =
108    Lazy::new(|| Regex::new(r"(\w+)\s*=\s*(?:await\s+)?(\w+(?:\.\w+)*)\s*\(").unwrap());
109
110impl LanguageParser for PythonParser {
111    fn language(&self) -> Language {
112        Language::Python
113    }
114
115    fn parse_file(&self, path: &Path, content: &str) -> Result<ParsedFile> {
116        let mut parsed = ParsedFile::default();
117        let file_path = PathBuf::from(path);
118
119        // Detect sanitizer assignments: safe_path = validate_path(x)
120        for cap in SANITIZER_ASSIGN_RE.captures_iter(content) {
121            let var_name = &cap[1];
122            let func_name = &cap[2];
123            if is_sanitizer(func_name) {
124                parsed.sanitized_vars.insert(var_name.to_string());
125            }
126        }
127
128        // Collect function parameter names + FunctionDef entries
129        let mut param_names = std::collections::HashSet::new();
130        for cap in FUNC_DEF_RE.captures_iter(content) {
131            let func_name = &cap[1];
132            let params_str = &cap[2];
133            // In Python, functions starting with _ are conventionally private
134            let is_exported = !func_name.starts_with('_');
135
136            let mut func_params = Vec::new();
137            for param in params_str.split(',') {
138                let param = param.trim().split(':').next().unwrap_or("").trim();
139                let param = param.split('=').next().unwrap_or("").trim();
140                if !param.is_empty() && param != "self" && param != "cls" {
141                    param_names.insert(param.to_string());
142                    func_params.push(param.to_string());
143                }
144            }
145
146            // Find line number for this function def
147            let func_line = content[..cap.get(0).map(|m| m.start()).unwrap_or(0)]
148                .lines()
149                .count()
150                + 1;
151
152            parsed.function_defs.push(FunctionDef {
153                name: func_name.to_string(),
154                params: func_params,
155                is_exported,
156                location: loc(&file_path, func_line),
157            });
158        }
159
160        // Collect variable names bound to HTTP clients via async context managers
161        // e.g. `async with httpx.AsyncClient() as client:` → "client"
162        let mut http_client_vars = std::collections::HashSet::new();
163        for cap in HTTP_CLIENT_CTX_RE.captures_iter(content) {
164            http_client_vars.insert(cap[1].to_string());
165        }
166
167        // Collect lines for look-ahead on multi-line calls
168        let lines: Vec<&str> = content.lines().collect();
169
170        // Scan line by line for patterns
171        for (line_idx, line) in lines.iter().enumerate() {
172            let line_num = line_idx + 1;
173            let trimmed = line.trim();
174
175            // Skip comments
176            if trimmed.starts_with('#') {
177                continue;
178            }
179
180            // Check env var access
181            for cap in ENV_ACCESS_RE.captures_iter(line) {
182                let var_name = cap
183                    .get(1)
184                    .or_else(|| cap.get(2))
185                    .or_else(|| cap.get(3))
186                    .map(|m| m.as_str().to_string())
187                    .unwrap_or_default();
188                let is_sensitive = SENSITIVE_ENV_VARS.is_match(&var_name);
189                parsed.env_accesses.push(EnvAccess {
190                    var_name: ArgumentSource::Literal(var_name),
191                    is_sensitive,
192                    location: loc(&file_path, line_num),
193                });
194            }
195
196            // Check function calls
197            for cap in CALL_RE.captures_iter(line) {
198                let func_name = &cap[1];
199                let args_str = &cap[2];
200
201                let arg_source = classify_argument(args_str, &param_names, &parsed.sanitized_vars);
202
203                // Record CallSite for cross-file analysis
204                let all_args = args_str
205                    .split(',')
206                    .map(|a| classify_argument(a.trim(), &param_names, &parsed.sanitized_vars))
207                    .collect::<Vec<_>>();
208                parsed.call_sites.push(CallSite {
209                    callee: func_name.to_string(),
210                    arguments: all_args,
211                    caller: None, // Could be improved with indentation tracking
212                    location: loc(&file_path, line_num),
213                });
214
215                // Subprocess/command execution
216                if SUBPROCESS_PATTERNS
217                    .iter()
218                    .any(|p| func_name.ends_with(p) || func_name == *p)
219                {
220                    parsed.commands.push(CommandInvocation {
221                        function: func_name.to_string(),
222                        command_arg: arg_source.clone(),
223                        location: loc(&file_path, line_num),
224                    });
225                }
226
227                // Network operations
228                if NETWORK_PATTERNS
229                    .iter()
230                    .any(|p| func_name.ends_with(p) || func_name == *p)
231                {
232                    let sends_data = func_name.contains("post")
233                        || func_name.contains("put")
234                        || func_name.contains("patch")
235                        || args_str.contains("data=")
236                        || args_str.contains("json=");
237                    let method = if func_name.contains("get") {
238                        Some("GET".into())
239                    } else if func_name.contains("post") {
240                        Some("POST".into())
241                    } else if func_name.contains("put") {
242                        Some("PUT".into())
243                    } else {
244                        None
245                    };
246                    parsed.network_operations.push(NetworkOperation {
247                        function: func_name.to_string(),
248                        url_arg: arg_source.clone(),
249                        method,
250                        sends_data,
251                        location: loc(&file_path, line_num),
252                    });
253                }
254
255                // Dynamic exec
256                if DYNAMIC_EXEC_PATTERNS.contains(&func_name) {
257                    parsed.dynamic_exec.push(DynamicExec {
258                        function: func_name.to_string(),
259                        code_arg: arg_source.clone(),
260                        location: loc(&file_path, line_num),
261                    });
262                }
263
264                // File operations (open with write mode)
265                if FILE_READ_PATTERNS
266                    .iter()
267                    .any(|p| func_name.ends_with(p) || func_name == *p)
268                {
269                    let op_type = if args_str.contains("'w")
270                        || args_str.contains("\"w")
271                        || args_str.contains("'a")
272                        || args_str.contains("\"a")
273                    {
274                        FileOpType::Write
275                    } else {
276                        FileOpType::Read
277                    };
278                    parsed.file_operations.push(FileOperation {
279                        operation: op_type,
280                        path_arg: arg_source.clone(),
281                        location: loc(&file_path, line_num),
282                    });
283                }
284
285                // HTTP client variable method calls (FN-1 fix):
286                // Detect `client.get(url)` where `client` was bound from
287                // `async with AsyncClient() as client:`.
288                if func_name.contains('.') {
289                    let parts: Vec<&str> = func_name.rsplitn(2, '.').collect();
290                    if parts.len() == 2 {
291                        let method = parts[0];
292                        let obj = parts[1];
293                        if http_client_vars.contains(obj) && HTTP_CLIENT_METHODS.contains(&method) {
294                            let sends_data = method == "post"
295                                || method == "put"
296                                || method == "patch"
297                                || args_str.contains("data=")
298                                || args_str.contains("json=");
299                            let http_method = match method {
300                                "get" => Some("GET".into()),
301                                "post" => Some("POST".into()),
302                                "put" => Some("PUT".into()),
303                                "delete" => Some("DELETE".into()),
304                                "head" => Some("HEAD".into()),
305                                "patch" => Some("PATCH".into()),
306                                _ => None,
307                            };
308                            parsed.network_operations.push(NetworkOperation {
309                                function: func_name.to_string(),
310                                url_arg: arg_source.clone(),
311                                method: http_method,
312                                sends_data,
313                                location: loc(&file_path, line_num),
314                            });
315                        }
316                    }
317                }
318            }
319
320            // GitPython command execution (FN-2 fix):
321            // Detect `repo.git.log(...)`, `repo.git.add(...)`, etc.
322            for cap in GITPYTHON_RE.captures_iter(line) {
323                let full_call = format!("{}.git.{}", &cap[1], &cap[2]);
324                let args_str = &cap[3];
325                let arg_source = classify_argument(args_str, &param_names, &parsed.sanitized_vars);
326                parsed.commands.push(CommandInvocation {
327                    function: full_call,
328                    command_arg: arg_source,
329                    location: loc(&file_path, line_num),
330                });
331            }
332
333            // Multi-line call detection: handle calls like
334            //   client.get(
335            //       url,
336            //       follow_redirects=True,
337            //   )
338            // where CALL_RE fails because `(` and `)` are on different lines.
339            if let Some(cap) = PARTIAL_CALL_RE.captures(trimmed) {
340                let func_name = &cap[1];
341                // Look ahead to find the first argument on the next non-empty line
342                let first_arg_str = lines
343                    .get(line_idx + 1)
344                    .map(|l| l.trim().trim_end_matches(','))
345                    .unwrap_or("");
346                let arg_source =
347                    classify_argument(first_arg_str, &param_names, &parsed.sanitized_vars);
348
349                // Check all pattern categories for partial calls
350                if SUBPROCESS_PATTERNS
351                    .iter()
352                    .any(|p| func_name.ends_with(p) || func_name == *p)
353                {
354                    parsed.commands.push(CommandInvocation {
355                        function: func_name.to_string(),
356                        command_arg: arg_source.clone(),
357                        location: loc(&file_path, line_num),
358                    });
359                }
360                if NETWORK_PATTERNS
361                    .iter()
362                    .any(|p| func_name.ends_with(p) || func_name == *p)
363                {
364                    let sends_data = func_name.contains("post")
365                        || func_name.contains("put")
366                        || func_name.contains("patch");
367                    let method = if func_name.contains("get") {
368                        Some("GET".into())
369                    } else if func_name.contains("post") {
370                        Some("POST".into())
371                    } else if func_name.contains("put") {
372                        Some("PUT".into())
373                    } else {
374                        None
375                    };
376                    parsed.network_operations.push(NetworkOperation {
377                        function: func_name.to_string(),
378                        url_arg: arg_source.clone(),
379                        method,
380                        sends_data,
381                        location: loc(&file_path, line_num),
382                    });
383                }
384                if DYNAMIC_EXEC_PATTERNS.contains(&func_name) {
385                    parsed.dynamic_exec.push(DynamicExec {
386                        function: func_name.to_string(),
387                        code_arg: arg_source.clone(),
388                        location: loc(&file_path, line_num),
389                    });
390                }
391                if FILE_READ_PATTERNS
392                    .iter()
393                    .any(|p| func_name.ends_with(p) || func_name == *p)
394                {
395                    parsed.file_operations.push(FileOperation {
396                        operation: FileOpType::Read,
397                        path_arg: arg_source.clone(),
398                        location: loc(&file_path, line_num),
399                    });
400                }
401
402                // HTTP client variable methods (multi-line)
403                if func_name.contains('.') {
404                    let parts: Vec<&str> = func_name.rsplitn(2, '.').collect();
405                    if parts.len() == 2 {
406                        let method = parts[0];
407                        let obj = parts[1];
408                        if http_client_vars.contains(obj) && HTTP_CLIENT_METHODS.contains(&method) {
409                            let sends_data =
410                                method == "post" || method == "put" || method == "patch";
411                            let http_method = match method {
412                                "get" => Some("GET".into()),
413                                "post" => Some("POST".into()),
414                                "put" => Some("PUT".into()),
415                                "delete" => Some("DELETE".into()),
416                                "head" => Some("HEAD".into()),
417                                "patch" => Some("PATCH".into()),
418                                _ => None,
419                            };
420                            parsed.network_operations.push(NetworkOperation {
421                                function: func_name.to_string(),
422                                url_arg: arg_source.clone(),
423                                method: http_method,
424                                sends_data,
425                                location: loc(&file_path, line_num),
426                            });
427                        }
428                    }
429                }
430            }
431        }
432
433        Ok(parsed)
434    }
435}
436
437/// Classify a call argument string to determine its source.
438fn classify_argument(
439    args_str: &str,
440    param_names: &std::collections::HashSet<String>,
441    sanitized_vars: &std::collections::HashSet<String>,
442) -> ArgumentSource {
443    let first_arg = args_str.split(',').next().unwrap_or("").trim();
444
445    if first_arg.is_empty() {
446        return ArgumentSource::Unknown;
447    }
448
449    // Check if this is a sanitized variable first
450    let ident = first_arg.split('.').next().unwrap_or(first_arg);
451    let ident = ident.split('[').next().unwrap_or(ident);
452    if sanitized_vars.contains(ident) {
453        return ArgumentSource::Sanitized {
454            sanitizer: ident.to_string(),
455        };
456    }
457
458    // String literal
459    if (first_arg.starts_with('"') && first_arg.ends_with('"'))
460        || (first_arg.starts_with('\'') && first_arg.ends_with('\''))
461    {
462        let val = &first_arg[1..first_arg.len() - 1];
463        return ArgumentSource::Literal(val.to_string());
464    }
465
466    // f-string or format
467    if first_arg.starts_with("f\"") || first_arg.starts_with("f'") || first_arg.contains(".format(")
468    {
469        return ArgumentSource::Interpolated;
470    }
471
472    // os.environ / env var
473    if first_arg.contains("os.environ") || first_arg.contains("os.getenv") {
474        return ArgumentSource::EnvVar {
475            name: first_arg.to_string(),
476        };
477    }
478
479    // Known function parameter
480    if param_names.contains(ident) {
481        return ArgumentSource::Parameter {
482            name: ident.to_string(),
483        };
484    }
485
486    ArgumentSource::Unknown
487}
488
489fn loc(file: &Path, line: usize) -> SourceLocation {
490    SourceLocation {
491        file: file.to_path_buf(),
492        line,
493        column: 0,
494        end_line: None,
495        end_column: None,
496    }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `source` as if it were the file `file_name`, panicking on error.
    fn parse_as(file_name: &str, source: &str) -> ParsedFile {
        PythonParser
            .parse_file(Path::new(file_name), source)
            .unwrap()
    }

    /// Shorthand for the common case where the file name is irrelevant.
    fn parse(source: &str) -> ParsedFile {
        parse_as("test.py", source)
    }

    #[test]
    fn detects_subprocess_with_param() {
        let parsed = parse(
            r#"
def handle(cmd: str):
    subprocess.run(cmd, shell=True)
"#,
        );
        assert_eq!(parsed.commands.len(), 1);
        let command = &parsed.commands[0];
        assert!(matches!(
            command.command_arg,
            ArgumentSource::Parameter { .. }
        ));
    }

    #[test]
    fn detects_requests_get_with_param() {
        let parsed = parse(
            r#"
def fetch(url: str):
    requests.get(url)
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let op = &parsed.network_operations[0];
        assert!(matches!(op.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn safe_literal_not_flagged_as_param() {
        let parsed = parse(
            r#"
def fetch():
    requests.get("https://api.example.com")
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let op = &parsed.network_operations[0];
        assert!(matches!(op.url_arg, ArgumentSource::Literal(_)));
    }

    #[test]
    fn detects_env_var_access() {
        let parsed = parse(
            r#"
key = os.environ["AWS_SECRET_ACCESS_KEY"]
"#,
        );
        assert_eq!(parsed.env_accesses.len(), 1);
        assert!(parsed.env_accesses[0].is_sensitive);
    }

    #[test]
    fn detects_eval() {
        let parsed = parse(
            r#"
def run(code):
    eval(code)
"#,
        );
        assert_eq!(parsed.dynamic_exec.len(), 1);
        let exec = &parsed.dynamic_exec[0];
        assert!(matches!(exec.code_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_httpx_async_client_get() {
        let parsed = parse(
            r#"
async def fetch(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let op = &parsed.network_operations[0];
        assert_eq!(op.function, "client.get");
        assert!(matches!(op.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_aiohttp_client_session_post() {
        let parsed = parse(
            r#"
async def send_data(url: str, data):
    async with aiohttp.ClientSession() as session:
        await session.post(url, json=data)
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let op = &parsed.network_operations[0];
        assert_eq!(op.function, "session.post");
        assert!(op.sends_data);
    }

    #[test]
    fn detects_gitpython_command_execution() {
        let parsed = parse(
            r#"
def git_log(repo, args):
    repo.git.log(*args)
"#,
        );
        assert_eq!(parsed.commands.len(), 1);
        assert_eq!(parsed.commands[0].function, "repo.git.log");
    }

    #[test]
    fn detects_gitpython_add_with_user_files() {
        let parsed = parse(
            r#"
def stage_files(repo, files):
    repo.git.add("--", *files)
"#,
        );
        assert_eq!(parsed.commands.len(), 1);
        assert_eq!(parsed.commands[0].function, "repo.git.add");
    }

    #[test]
    fn no_false_positive_on_non_client_get() {
        let parsed = parse(
            r#"
def process():
    result = cache.get("key")
"#,
        );
        assert!(parsed.network_operations.is_empty());
    }

    #[test]
    fn detects_multiline_async_client_get() {
        // Real-world pattern from the MCP fetch server
        let parsed = parse(
            r#"
async def fetch_url(url: str):
    async with AsyncClient(proxies=proxy_url) as client:
        response = await client.get(
            url,
            follow_redirects=True,
            headers={"User-Agent": user_agent},
        )
"#,
        );
        assert_eq!(
            parsed.network_operations.len(),
            1,
            "should detect multi-line client.get() call"
        );
        let op = &parsed.network_operations[0];
        assert_eq!(op.function, "client.get");
        assert!(matches!(op.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_multiline_subprocess_run() {
        let parsed = parse(
            r#"
def execute(cmd: str):
    subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
    )
"#,
        );
        assert_eq!(
            parsed.commands.len(),
            1,
            "should detect multi-line subprocess.run() call"
        );
    }

    // ── Cross-file support tests ──

    #[test]
    fn extracts_python_function_defs() {
        let parsed = parse_as(
            "lib.py",
            r#"
def read_file(path: str) -> str:
    with open(path) as f:
        return f.read()

def _internal_helper(x):
    return x + 1
"#,
        );
        assert!(parsed.function_defs.len() >= 2);

        let find_def = |wanted: &str| parsed.function_defs.iter().find(|d| d.name == wanted);

        let read_file = find_def("read_file");
        assert!(read_file.is_some());
        let read_file = read_file.unwrap();
        assert!(read_file.is_exported); // no underscore prefix
        assert_eq!(read_file.params, vec!["path"]);

        let helper = find_def("_internal_helper");
        assert!(helper.is_some());
        assert!(!helper.unwrap().is_exported); // underscore prefix = private
    }

    #[test]
    fn detects_python_sanitizer_assignment() {
        let parsed = parse(
            r#"
def handler(raw_path: str):
    safe_path = os.path.realpath(raw_path)
    with open(safe_path) as f:
        return f.read()
"#,
        );
        assert!(parsed.sanitized_vars.contains("safe_path"));
    }

    #[test]
    fn extracts_python_call_sites() {
        let parsed = parse(
            r#"
def handler(args):
    safe_path = os.path.realpath(args.path)
    content = read_file(safe_path)
    return content
"#,
        );
        let rf_call = parsed.call_sites.iter().find(|cs| cs.callee == "read_file");
        assert!(rf_call.is_some(), "Should find read_file call site");
        let rf = rf_call.unwrap();
        assert!(!rf.arguments.is_empty());
        assert!(
            matches!(&rf.arguments[0], ArgumentSource::Sanitized { .. }),
            "safe_path should be Sanitized, got: {:?}",
            rf.arguments[0]
        );
    }
}