1use std::path::{Path, PathBuf};
2
3use once_cell::sync::Lazy;
4use regex::Regex;
5
6use super::{CallSite, FunctionDef, LanguageParser, ParsedFile};
7use crate::analysis::cross_file::is_sanitizer;
8use crate::error::Result;
9use crate::ir::execution_surface::*;
10use crate::ir::{ArgumentSource, Language, SourceLocation};
11
/// Stateless, regex-driven parser for Python source files.
///
/// The type carries no data; all behaviour lives in its `LanguageParser`
/// implementation. The common traits are derived so the parser can be
/// printed, copied and default-constructed like any other unit value.
#[derive(Debug, Clone, Copy, Default)]
pub struct PythonParser;
13
/// Dotted names of Python callables that `parse_file` records as command
/// invocations.
///
/// Stored as a plain static slice: the list is known at compile time, so no
/// lazy initialisation or heap allocation is needed.
static SUBPROCESS_PATTERNS: &[&str] = &[
    "subprocess.run",
    "subprocess.call",
    "subprocess.check_call",
    "subprocess.check_output",
    "subprocess.Popen",
    "os.system",
    "os.popen",
    "os.exec",
    "os.execv",
    "os.execve",
    "os.execvp",
];
30
31static GITPYTHON_RE: Lazy<Regex> =
34 Lazy::new(|| Regex::new(r"(?m)(\w+)\.git\.(\w+)\s*\(([^)]*)\)").unwrap());
35
/// Dotted names of module-level HTTP helpers that `parse_file` records as
/// network operations.
///
/// Stored as a plain static slice: the list is known at compile time, so no
/// lazy initialisation or heap allocation is needed.
static NETWORK_PATTERNS: &[&str] = &[
    "requests.get",
    "requests.post",
    "requests.put",
    "requests.patch",
    "requests.delete",
    "requests.head",
    "requests.request",
    "urllib.request.urlopen",
    "httpx.get",
    "httpx.post",
    "httpx.put",
];
54
/// Method names that count as a network operation when invoked on a variable
/// bound by an `async with ...AsyncClient(...)`/`ClientSession(...)` block.
///
/// Stored as a plain static slice: the list is known at compile time, so no
/// lazy initialisation or heap allocation is needed.
static HTTP_CLIENT_METHODS: &[&str] = &[
    "get", "post", "put", "patch", "delete", "head", "options", "request", "fetch", "send",
];
64
65static HTTP_CLIENT_CTX_RE: Lazy<Regex> = Lazy::new(|| {
69 Regex::new(
70 r"(?m)async\s+with\s+(?:\w+\.)*(?:AsyncClient|ClientSession)\s*\([^)]*\)\s+as\s+(\w+)",
71 )
72 .unwrap()
73});
74
/// Names of callables that `parse_file` records as dynamic code execution.
/// They are compared against the full callee name.
///
/// Stored as a plain static slice: the list is known at compile time, so no
/// lazy initialisation or heap allocation is needed.
static DYNAMIC_EXEC_PATTERNS: &[&str] = &["eval", "exec", "compile", "__import__"];
77
78static SENSITIVE_ENV_VARS: Lazy<Regex> = Lazy::new(|| {
79 Regex::new(r"(?i)(AWS_|SECRET|TOKEN|PASSWORD|API_KEY|PRIVATE_KEY|CREDENTIALS|AUTH)").unwrap()
80});
81
/// Names of callables that `parse_file` records as file operations.
///
/// Stored as a plain static slice: the list is known at compile time, so no
/// lazy initialisation or heap allocation is needed.
static FILE_READ_PATTERNS: &[&str] = &["open", "pathlib.Path"];
83
84static CALL_RE: Lazy<Regex> =
86 Lazy::new(|| Regex::new(r"(?m)(\w+(?:\.\w+)*)\s*\(([^)]*)\)").unwrap());
87
88static PARTIAL_CALL_RE: Lazy<Regex> =
92 Lazy::new(|| Regex::new(r"(\w+(?:\.\w+)*)\s*\(\s*$").unwrap());
93
94static ENV_ACCESS_RE: Lazy<Regex> = Lazy::new(|| {
96 Regex::new(
97 r#"(?m)os\.(?:environ\s*(?:\[\s*["']([^"']+)["']\s*\]|\.get\s*\(\s*["']([^"']+)["'])|getenv\s*\(\s*["']([^"']+)["']\s*\))"#,
98 )
99 .unwrap()
100});
101
102static FUNC_DEF_RE: Lazy<Regex> =
104 Lazy::new(|| Regex::new(r"(?m)^\s*(?:async\s+)?def\s+(\w+)\s*\(([^)]*)\)").unwrap());
105
106static SANITIZER_ASSIGN_RE: Lazy<Regex> =
108 Lazy::new(|| Regex::new(r"(\w+)\s*=\s*(?:await\s+)?(\w+(?:\.\w+)*)\s*\(").unwrap());
109
110impl LanguageParser for PythonParser {
111 fn language(&self) -> Language {
112 Language::Python
113 }
114
115 fn parse_file(&self, path: &Path, content: &str) -> Result<ParsedFile> {
116 let mut parsed = ParsedFile::default();
117 let file_path = PathBuf::from(path);
118
119 for cap in SANITIZER_ASSIGN_RE.captures_iter(content) {
121 let var_name = &cap[1];
122 let func_name = &cap[2];
123 if is_sanitizer(func_name) {
124 parsed.sanitized_vars.insert(var_name.to_string());
125 }
126 }
127
128 let mut param_names = std::collections::HashSet::new();
130 for cap in FUNC_DEF_RE.captures_iter(content) {
131 let func_name = &cap[1];
132 let params_str = &cap[2];
133 let is_exported = !func_name.starts_with('_');
135
136 let mut func_params = Vec::new();
137 for param in params_str.split(',') {
138 let param = param.trim().split(':').next().unwrap_or("").trim();
139 let param = param.split('=').next().unwrap_or("").trim();
140 if !param.is_empty() && param != "self" && param != "cls" {
141 param_names.insert(param.to_string());
142 func_params.push(param.to_string());
143 }
144 }
145
146 let func_line = content[..cap.get(0).map(|m| m.start()).unwrap_or(0)]
148 .lines()
149 .count()
150 + 1;
151
152 parsed.function_defs.push(FunctionDef {
153 name: func_name.to_string(),
154 params: func_params,
155 is_exported,
156 location: loc(&file_path, func_line),
157 });
158 }
159
160 let mut http_client_vars = std::collections::HashSet::new();
163 for cap in HTTP_CLIENT_CTX_RE.captures_iter(content) {
164 http_client_vars.insert(cap[1].to_string());
165 }
166
167 let lines: Vec<&str> = content.lines().collect();
169
170 for (line_idx, line) in lines.iter().enumerate() {
172 let line_num = line_idx + 1;
173 let trimmed = line.trim();
174
175 if trimmed.starts_with('#') {
177 continue;
178 }
179
180 for cap in ENV_ACCESS_RE.captures_iter(line) {
182 let var_name = cap
183 .get(1)
184 .or_else(|| cap.get(2))
185 .or_else(|| cap.get(3))
186 .map(|m| m.as_str().to_string())
187 .unwrap_or_default();
188 let is_sensitive = SENSITIVE_ENV_VARS.is_match(&var_name);
189 parsed.env_accesses.push(EnvAccess {
190 var_name: ArgumentSource::Literal(var_name),
191 is_sensitive,
192 location: loc(&file_path, line_num),
193 });
194 }
195
196 for cap in CALL_RE.captures_iter(line) {
198 let func_name = &cap[1];
199 let args_str = &cap[2];
200
201 let arg_source = classify_argument(args_str, ¶m_names, &parsed.sanitized_vars);
202
203 let all_args = args_str
205 .split(',')
206 .map(|a| classify_argument(a.trim(), ¶m_names, &parsed.sanitized_vars))
207 .collect::<Vec<_>>();
208 parsed.call_sites.push(CallSite {
209 callee: func_name.to_string(),
210 arguments: all_args,
211 caller: None, location: loc(&file_path, line_num),
213 });
214
215 if SUBPROCESS_PATTERNS
217 .iter()
218 .any(|p| func_name.ends_with(p) || func_name == *p)
219 {
220 parsed.commands.push(CommandInvocation {
221 function: func_name.to_string(),
222 command_arg: arg_source.clone(),
223 location: loc(&file_path, line_num),
224 });
225 }
226
227 if NETWORK_PATTERNS
229 .iter()
230 .any(|p| func_name.ends_with(p) || func_name == *p)
231 {
232 let sends_data = func_name.contains("post")
233 || func_name.contains("put")
234 || func_name.contains("patch")
235 || args_str.contains("data=")
236 || args_str.contains("json=");
237 let method = if func_name.contains("get") {
238 Some("GET".into())
239 } else if func_name.contains("post") {
240 Some("POST".into())
241 } else if func_name.contains("put") {
242 Some("PUT".into())
243 } else {
244 None
245 };
246 parsed.network_operations.push(NetworkOperation {
247 function: func_name.to_string(),
248 url_arg: arg_source.clone(),
249 method,
250 sends_data,
251 location: loc(&file_path, line_num),
252 });
253 }
254
255 if DYNAMIC_EXEC_PATTERNS.contains(&func_name) {
257 parsed.dynamic_exec.push(DynamicExec {
258 function: func_name.to_string(),
259 code_arg: arg_source.clone(),
260 location: loc(&file_path, line_num),
261 });
262 }
263
264 if FILE_READ_PATTERNS
266 .iter()
267 .any(|p| func_name.ends_with(p) || func_name == *p)
268 {
269 let op_type = if args_str.contains("'w")
270 || args_str.contains("\"w")
271 || args_str.contains("'a")
272 || args_str.contains("\"a")
273 {
274 FileOpType::Write
275 } else {
276 FileOpType::Read
277 };
278 parsed.file_operations.push(FileOperation {
279 operation: op_type,
280 path_arg: arg_source.clone(),
281 location: loc(&file_path, line_num),
282 });
283 }
284
285 if func_name.contains('.') {
289 let parts: Vec<&str> = func_name.rsplitn(2, '.').collect();
290 if parts.len() == 2 {
291 let method = parts[0];
292 let obj = parts[1];
293 if http_client_vars.contains(obj) && HTTP_CLIENT_METHODS.contains(&method) {
294 let sends_data = method == "post"
295 || method == "put"
296 || method == "patch"
297 || args_str.contains("data=")
298 || args_str.contains("json=");
299 let http_method = match method {
300 "get" => Some("GET".into()),
301 "post" => Some("POST".into()),
302 "put" => Some("PUT".into()),
303 "delete" => Some("DELETE".into()),
304 "head" => Some("HEAD".into()),
305 "patch" => Some("PATCH".into()),
306 _ => None,
307 };
308 parsed.network_operations.push(NetworkOperation {
309 function: func_name.to_string(),
310 url_arg: arg_source.clone(),
311 method: http_method,
312 sends_data,
313 location: loc(&file_path, line_num),
314 });
315 }
316 }
317 }
318 }
319
320 for cap in GITPYTHON_RE.captures_iter(line) {
323 let full_call = format!("{}.git.{}", &cap[1], &cap[2]);
324 let args_str = &cap[3];
325 let arg_source = classify_argument(args_str, ¶m_names, &parsed.sanitized_vars);
326 parsed.commands.push(CommandInvocation {
327 function: full_call,
328 command_arg: arg_source,
329 location: loc(&file_path, line_num),
330 });
331 }
332
333 if let Some(cap) = PARTIAL_CALL_RE.captures(trimmed) {
340 let func_name = &cap[1];
341 let first_arg_str = lines
343 .get(line_idx + 1)
344 .map(|l| l.trim().trim_end_matches(','))
345 .unwrap_or("");
346 let arg_source =
347 classify_argument(first_arg_str, ¶m_names, &parsed.sanitized_vars);
348
349 if SUBPROCESS_PATTERNS
351 .iter()
352 .any(|p| func_name.ends_with(p) || func_name == *p)
353 {
354 parsed.commands.push(CommandInvocation {
355 function: func_name.to_string(),
356 command_arg: arg_source.clone(),
357 location: loc(&file_path, line_num),
358 });
359 }
360 if NETWORK_PATTERNS
361 .iter()
362 .any(|p| func_name.ends_with(p) || func_name == *p)
363 {
364 let sends_data = func_name.contains("post")
365 || func_name.contains("put")
366 || func_name.contains("patch");
367 let method = if func_name.contains("get") {
368 Some("GET".into())
369 } else if func_name.contains("post") {
370 Some("POST".into())
371 } else if func_name.contains("put") {
372 Some("PUT".into())
373 } else {
374 None
375 };
376 parsed.network_operations.push(NetworkOperation {
377 function: func_name.to_string(),
378 url_arg: arg_source.clone(),
379 method,
380 sends_data,
381 location: loc(&file_path, line_num),
382 });
383 }
384 if DYNAMIC_EXEC_PATTERNS.contains(&func_name) {
385 parsed.dynamic_exec.push(DynamicExec {
386 function: func_name.to_string(),
387 code_arg: arg_source.clone(),
388 location: loc(&file_path, line_num),
389 });
390 }
391 if FILE_READ_PATTERNS
392 .iter()
393 .any(|p| func_name.ends_with(p) || func_name == *p)
394 {
395 parsed.file_operations.push(FileOperation {
396 operation: FileOpType::Read,
397 path_arg: arg_source.clone(),
398 location: loc(&file_path, line_num),
399 });
400 }
401
402 if func_name.contains('.') {
404 let parts: Vec<&str> = func_name.rsplitn(2, '.').collect();
405 if parts.len() == 2 {
406 let method = parts[0];
407 let obj = parts[1];
408 if http_client_vars.contains(obj) && HTTP_CLIENT_METHODS.contains(&method) {
409 let sends_data =
410 method == "post" || method == "put" || method == "patch";
411 let http_method = match method {
412 "get" => Some("GET".into()),
413 "post" => Some("POST".into()),
414 "put" => Some("PUT".into()),
415 "delete" => Some("DELETE".into()),
416 "head" => Some("HEAD".into()),
417 "patch" => Some("PATCH".into()),
418 _ => None,
419 };
420 parsed.network_operations.push(NetworkOperation {
421 function: func_name.to_string(),
422 url_arg: arg_source.clone(),
423 method: http_method,
424 sends_data,
425 location: loc(&file_path, line_num),
426 });
427 }
428 }
429 }
430 }
431 }
432
433 Ok(parsed)
434 }
435}
436
437fn classify_argument(
439 args_str: &str,
440 param_names: &std::collections::HashSet<String>,
441 sanitized_vars: &std::collections::HashSet<String>,
442) -> ArgumentSource {
443 let first_arg = args_str.split(',').next().unwrap_or("").trim();
444
445 if first_arg.is_empty() {
446 return ArgumentSource::Unknown;
447 }
448
449 let ident = first_arg.split('.').next().unwrap_or(first_arg);
451 let ident = ident.split('[').next().unwrap_or(ident);
452 if sanitized_vars.contains(ident) {
453 return ArgumentSource::Sanitized {
454 sanitizer: ident.to_string(),
455 };
456 }
457
458 if (first_arg.starts_with('"') && first_arg.ends_with('"'))
460 || (first_arg.starts_with('\'') && first_arg.ends_with('\''))
461 {
462 let val = &first_arg[1..first_arg.len() - 1];
463 return ArgumentSource::Literal(val.to_string());
464 }
465
466 if first_arg.starts_with("f\"") || first_arg.starts_with("f'") || first_arg.contains(".format(")
468 {
469 return ArgumentSource::Interpolated;
470 }
471
472 if first_arg.contains("os.environ") || first_arg.contains("os.getenv") {
474 return ArgumentSource::EnvVar {
475 name: first_arg.to_string(),
476 };
477 }
478
479 if param_names.contains(ident) {
481 return ArgumentSource::Parameter {
482 name: ident.to_string(),
483 };
484 }
485
486 ArgumentSource::Unknown
487}
488
489fn loc(file: &Path, line: usize) -> SourceLocation {
490 SourceLocation {
491 file: file.to_path_buf(),
492 line,
493 column: 0,
494 end_line: None,
495 end_column: None,
496 }
497}
498
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `source` as though it had been read from `file_name`.
    fn parse_named(file_name: &str, source: &str) -> ParsedFile {
        PythonParser
            .parse_file(Path::new(file_name), source)
            .unwrap()
    }

    /// Parses `source` under the placeholder file name `test.py`.
    fn parse(source: &str) -> ParsedFile {
        parse_named("test.py", source)
    }

    #[test]
    fn detects_subprocess_with_param() {
        let parsed = parse(
            r#"
def handle(cmd: str):
    subprocess.run(cmd, shell=True)
"#,
        );
        assert_eq!(parsed.commands.len(), 1);
        let first = &parsed.commands[0];
        assert!(matches!(first.command_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_requests_get_with_param() {
        let parsed = parse(
            r#"
def fetch(url: str):
    requests.get(url)
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let first = &parsed.network_operations[0];
        assert!(matches!(first.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn safe_literal_not_flagged_as_param() {
        let parsed = parse(
            r#"
def fetch():
    requests.get("https://api.example.com")
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let first = &parsed.network_operations[0];
        assert!(matches!(first.url_arg, ArgumentSource::Literal(_)));
    }

    #[test]
    fn detects_env_var_access() {
        let parsed = parse(
            r#"
key = os.environ["AWS_SECRET_ACCESS_KEY"]
"#,
        );
        assert_eq!(parsed.env_accesses.len(), 1);
        assert!(parsed.env_accesses[0].is_sensitive);
    }

    #[test]
    fn detects_eval() {
        let parsed = parse(
            r#"
def run(code):
    eval(code)
"#,
        );
        assert_eq!(parsed.dynamic_exec.len(), 1);
        let first = &parsed.dynamic_exec[0];
        assert!(matches!(first.code_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_httpx_async_client_get() {
        let parsed = parse(
            r#"
async def fetch(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let first = &parsed.network_operations[0];
        assert_eq!(first.function, "client.get");
        assert!(matches!(first.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_aiohttp_client_session_post() {
        let parsed = parse(
            r#"
async def send_data(url: str, data):
    async with aiohttp.ClientSession() as session:
        await session.post(url, json=data)
"#,
        );
        assert_eq!(parsed.network_operations.len(), 1);
        let first = &parsed.network_operations[0];
        assert_eq!(first.function, "session.post");
        assert!(first.sends_data);
    }

    #[test]
    fn detects_gitpython_command_execution() {
        let parsed = parse(
            r#"
def git_log(repo, args):
    repo.git.log(*args)
"#,
        );
        assert_eq!(parsed.commands.len(), 1);
        assert_eq!(parsed.commands[0].function, "repo.git.log");
    }

    #[test]
    fn detects_gitpython_add_with_user_files() {
        let parsed = parse(
            r#"
def stage_files(repo, files):
    repo.git.add("--", *files)
"#,
        );
        assert_eq!(parsed.commands.len(), 1);
        assert_eq!(parsed.commands[0].function, "repo.git.add");
    }

    #[test]
    fn no_false_positive_on_non_client_get() {
        let parsed = parse(
            r#"
def process():
    result = cache.get("key")
"#,
        );
        assert!(parsed.network_operations.is_empty());
    }

    #[test]
    fn detects_multiline_async_client_get() {
        let parsed = parse(
            r#"
async def fetch_url(url: str):
    async with AsyncClient(proxies=proxy_url) as client:
        response = await client.get(
            url,
            follow_redirects=True,
            headers={"User-Agent": user_agent},
        )
"#,
        );
        assert_eq!(
            parsed.network_operations.len(),
            1,
            "should detect multi-line client.get() call"
        );
        let first = &parsed.network_operations[0];
        assert_eq!(first.function, "client.get");
        assert!(matches!(first.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_multiline_subprocess_run() {
        let parsed = parse(
            r#"
def execute(cmd: str):
    subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
    )
"#,
        );
        assert_eq!(
            parsed.commands.len(),
            1,
            "should detect multi-line subprocess.run() call"
        );
    }

    #[test]
    fn extracts_python_function_defs() {
        let parsed = parse_named(
            "lib.py",
            r#"
def read_file(path: str) -> str:
    with open(path) as f:
        return f.read()

def _internal_helper(x):
    return x + 1
"#,
        );
        assert!(parsed.function_defs.len() >= 2);

        // A public function is exported and keeps its parameter list.
        let read_file = parsed.function_defs.iter().find(|d| d.name == "read_file");
        assert!(read_file.is_some());
        let read_file = read_file.unwrap();
        assert!(read_file.is_exported);
        assert_eq!(read_file.params, vec!["path"]);

        // A leading underscore marks the function as not exported.
        let helper = parsed
            .function_defs
            .iter()
            .find(|d| d.name == "_internal_helper");
        assert!(helper.is_some());
        assert!(!helper.unwrap().is_exported);
    }

    #[test]
    fn detects_python_sanitizer_assignment() {
        let parsed = parse(
            r#"
def handler(raw_path: str):
    safe_path = os.path.realpath(raw_path)
    with open(safe_path) as f:
        return f.read()
"#,
        );
        assert!(parsed.sanitized_vars.contains("safe_path"));
    }

    #[test]
    fn extracts_python_call_sites() {
        let parsed = parse(
            r#"
def handler(args):
    safe_path = os.path.realpath(args.path)
    content = read_file(safe_path)
    return content
"#,
        );
        let rf_call = parsed.call_sites.iter().find(|cs| cs.callee == "read_file");
        assert!(rf_call.is_some(), "Should find read_file call site");
        let rf = rf_call.unwrap();
        assert!(!rf.arguments.is_empty());
        assert!(
            matches!(&rf.arguments[0], ArgumentSource::Sanitized { .. }),
            "safe_path should be Sanitized, got: {:?}",
            rf.arguments[0]
        );
    }
}