1use std::path::{Path, PathBuf};
2
3use once_cell::sync::Lazy;
4use regex::Regex;
5
6use super::{CallSite, FunctionDef, LanguageParser, ParsedFile};
7use crate::analysis::cross_file::{sanitizer_category, sanitizer_label, SanitizerCategory};
8use crate::error::Result;
9use crate::ir::execution_surface::*;
10use crate::ir::{ArgumentSource, Language, SourceLocation};
11
/// Regex-driven [`LanguageParser`] implementation for Python source files.
///
/// The parser is a stateless unit struct: all of its pattern tables live in
/// module-level statics, so values are free to copy, compare by debug output,
/// and construct through [`Default`].
#[derive(Debug, Clone, Copy, Default)]
pub struct PythonParser;
13
14static SUBPROCESS_PATTERNS: Lazy<Vec<&str>> = Lazy::new(|| {
16 vec![
17 "subprocess.run",
18 "subprocess.call",
19 "subprocess.check_call",
20 "subprocess.check_output",
21 "subprocess.Popen",
22 "os.system",
23 "os.popen",
24 "os.exec",
25 "os.execv",
26 "os.execve",
27 "os.execvp",
28 ]
29});
30
31static GITPYTHON_RE: Lazy<Regex> =
34 Lazy::new(|| Regex::new(r"(?m)(\w+)\.git\.(\w+)\s*\(([^)]*)\)").unwrap());
35
36static NETWORK_PATTERNS: Lazy<Vec<&str>> = Lazy::new(|| {
37 vec![
38 "requests.get",
39 "requests.post",
40 "requests.put",
41 "requests.patch",
42 "requests.delete",
43 "requests.head",
44 "requests.request",
45 "urllib.request.urlopen",
46 "httpx.get",
47 "httpx.post",
48 "httpx.put",
49 ]
53});
54
55static HTTP_CLIENT_METHODS: Lazy<Vec<&str>> = Lazy::new(|| {
60 vec![
61 "get", "post", "put", "patch", "delete", "head", "options", "request", "fetch", "send",
62 ]
63});
64
65static HTTP_CLIENT_CTX_RE: Lazy<Regex> = Lazy::new(|| {
69 Regex::new(
70 r"(?m)async\s+with\s+(?:\w+\.)*(?:AsyncClient|ClientSession)\s*\([^)]*\)\s+as\s+(\w+)",
71 )
72 .unwrap()
73});
74
75static DYNAMIC_EXEC_PATTERNS: Lazy<Vec<&str>> =
76 Lazy::new(|| vec!["eval", "exec", "compile", "__import__"]);
77
78static SENSITIVE_ENV_VARS: Lazy<Regex> = Lazy::new(|| {
79 Regex::new(r"(?i)(AWS_|SECRET|TOKEN|PASSWORD|API_KEY|PRIVATE_KEY|CREDENTIALS|AUTH)").unwrap()
80});
81
82static FILE_READ_PATTERNS: Lazy<Vec<&str>> = Lazy::new(|| vec!["open", "pathlib.Path"]);
83
84static CALL_RE: Lazy<Regex> =
86 Lazy::new(|| Regex::new(r"(?m)(\w+(?:\.\w+)*)\s*\(([^)]*)\)").unwrap());
87
88static PARTIAL_CALL_RE: Lazy<Regex> =
92 Lazy::new(|| Regex::new(r"(\w+(?:\.\w+)*)\s*\(\s*$").unwrap());
93
94static ENV_ACCESS_RE: Lazy<Regex> = Lazy::new(|| {
96 Regex::new(
97 r#"(?m)os\.(?:environ\s*(?:\[\s*["']([^"']+)["']\s*\]|\.get\s*\(\s*["']([^"']+)["'])|getenv\s*\(\s*["']([^"']+)["']\s*\))"#,
98 )
99 .unwrap()
100});
101
102static FUNC_DEF_RE: Lazy<Regex> =
104 Lazy::new(|| Regex::new(r"(?m)^\s*(?:async\s+)?def\s+(\w+)\s*\(([^)]*)\)").unwrap());
105
106static SANITIZER_ASSIGN_RE: Lazy<Regex> =
108 Lazy::new(|| Regex::new(r"(\w+)\s*=\s*(?:await\s+)?(\w+(?:\.\w+)*)\s*\(").unwrap());
109
110impl LanguageParser for PythonParser {
111 fn language(&self) -> Language {
112 Language::Python
113 }
114
115 fn parse_file(&self, path: &Path, content: &str) -> Result<ParsedFile> {
116 let mut parsed = ParsedFile::default();
117 let file_path = PathBuf::from(path);
118
119 for cap in SANITIZER_ASSIGN_RE.captures_iter(content) {
121 let var_name = &cap[1];
122 let func_name = &cap[2];
123 if sanitizer_category(func_name)
124 .is_some_and(|category| !matches!(category, SanitizerCategory::Redaction))
125 {
126 parsed.sanitized_vars.insert(var_name.to_string());
127 if let Some(label) = sanitizer_label(func_name) {
128 parsed
129 .sanitized_vars
130 .insert(sanitized_var_marker(var_name, &label));
131 }
132 }
133 }
134
135 let mut param_names = std::collections::HashSet::new();
137 for cap in FUNC_DEF_RE.captures_iter(content) {
138 let func_name = &cap[1];
139 let params_str = &cap[2];
140 let is_exported = !func_name.starts_with('_');
142
143 let mut func_params = Vec::new();
144 for param in params_str.split(',') {
145 let param = param.trim().split(':').next().unwrap_or("").trim();
146 let param = param.split('=').next().unwrap_or("").trim();
147 if !param.is_empty() && param != "self" && param != "cls" {
148 param_names.insert(param.to_string());
149 func_params.push(param.to_string());
150 }
151 }
152
153 let func_line = content[..cap.get(0).map(|m| m.start()).unwrap_or(0)]
155 .lines()
156 .count()
157 + 1;
158
159 parsed.function_defs.push(FunctionDef {
160 name: func_name.to_string(),
161 params: func_params,
162 is_exported,
163 location: loc(&file_path, func_line),
164 });
165 }
166
167 let mut http_client_vars = std::collections::HashSet::new();
170 for cap in HTTP_CLIENT_CTX_RE.captures_iter(content) {
171 http_client_vars.insert(cap[1].to_string());
172 }
173
174 let lines: Vec<&str> = content.lines().collect();
176
177 for (line_idx, line) in lines.iter().enumerate() {
179 let line_num = line_idx + 1;
180 let trimmed = line.trim();
181
182 if trimmed.starts_with('#') {
184 continue;
185 }
186
187 for cap in ENV_ACCESS_RE.captures_iter(line) {
189 let var_name = cap
190 .get(1)
191 .or_else(|| cap.get(2))
192 .or_else(|| cap.get(3))
193 .map(|m| m.as_str().to_string())
194 .unwrap_or_default();
195 let is_sensitive = SENSITIVE_ENV_VARS.is_match(&var_name);
196 parsed.env_accesses.push(EnvAccess {
197 var_name: ArgumentSource::Literal(var_name),
198 is_sensitive,
199 location: loc(&file_path, line_num),
200 });
201 }
202
203 for cap in CALL_RE.captures_iter(line) {
205 let func_name = &cap[1];
206 let args_str = &cap[2];
207
208 let arg_source = classify_argument(args_str, ¶m_names, &parsed.sanitized_vars);
209
210 let all_args = args_str
212 .split(',')
213 .map(|a| classify_argument(a.trim(), ¶m_names, &parsed.sanitized_vars))
214 .collect::<Vec<_>>();
215 parsed.call_sites.push(CallSite {
216 callee: func_name.to_string(),
217 arguments: all_args,
218 caller: None, location: loc(&file_path, line_num),
220 });
221
222 if SUBPROCESS_PATTERNS
224 .iter()
225 .any(|p| func_name.ends_with(p) || func_name == *p)
226 {
227 parsed.commands.push(CommandInvocation {
228 function: func_name.to_string(),
229 command_arg: arg_source.clone(),
230 location: loc(&file_path, line_num),
231 });
232 }
233
234 if NETWORK_PATTERNS
236 .iter()
237 .any(|p| func_name.ends_with(p) || func_name == *p)
238 {
239 let sends_data = func_name.contains("post")
240 || func_name.contains("put")
241 || func_name.contains("patch")
242 || args_str.contains("data=")
243 || args_str.contains("json=");
244 let method = if func_name.contains("get") {
245 Some("GET".into())
246 } else if func_name.contains("post") {
247 Some("POST".into())
248 } else if func_name.contains("put") {
249 Some("PUT".into())
250 } else {
251 None
252 };
253 parsed.network_operations.push(NetworkOperation {
254 function: func_name.to_string(),
255 url_arg: arg_source.clone(),
256 method,
257 sends_data,
258 location: loc(&file_path, line_num),
259 });
260 }
261
262 if DYNAMIC_EXEC_PATTERNS.contains(&func_name) {
264 parsed.dynamic_exec.push(DynamicExec {
265 function: func_name.to_string(),
266 code_arg: arg_source.clone(),
267 location: loc(&file_path, line_num),
268 });
269 }
270
271 if FILE_READ_PATTERNS
273 .iter()
274 .any(|p| func_name.ends_with(p) || func_name == *p)
275 {
276 let op_type = if args_str.contains("'w")
277 || args_str.contains("\"w")
278 || args_str.contains("'a")
279 || args_str.contains("\"a")
280 {
281 FileOpType::Write
282 } else {
283 FileOpType::Read
284 };
285 parsed.file_operations.push(FileOperation {
286 operation: op_type,
287 path_arg: arg_source.clone(),
288 location: loc(&file_path, line_num),
289 });
290 }
291
292 if func_name.contains('.') {
296 let parts: Vec<&str> = func_name.rsplitn(2, '.').collect();
297 if parts.len() == 2 {
298 let method = parts[0];
299 let obj = parts[1];
300 if http_client_vars.contains(obj) && HTTP_CLIENT_METHODS.contains(&method) {
301 let sends_data = method == "post"
302 || method == "put"
303 || method == "patch"
304 || args_str.contains("data=")
305 || args_str.contains("json=");
306 let http_method = match method {
307 "get" => Some("GET".into()),
308 "post" => Some("POST".into()),
309 "put" => Some("PUT".into()),
310 "delete" => Some("DELETE".into()),
311 "head" => Some("HEAD".into()),
312 "patch" => Some("PATCH".into()),
313 _ => None,
314 };
315 parsed.network_operations.push(NetworkOperation {
316 function: func_name.to_string(),
317 url_arg: arg_source.clone(),
318 method: http_method,
319 sends_data,
320 location: loc(&file_path, line_num),
321 });
322 }
323 }
324 }
325 }
326
327 for cap in GITPYTHON_RE.captures_iter(line) {
330 let full_call = format!("{}.git.{}", &cap[1], &cap[2]);
331 let args_str = &cap[3];
332 let arg_source = classify_argument(args_str, ¶m_names, &parsed.sanitized_vars);
333 parsed.commands.push(CommandInvocation {
334 function: full_call,
335 command_arg: arg_source,
336 location: loc(&file_path, line_num),
337 });
338 }
339
340 if let Some(cap) = PARTIAL_CALL_RE.captures(trimmed) {
347 let func_name = &cap[1];
348 let first_arg_str = lines
350 .get(line_idx + 1)
351 .map(|l| l.trim().trim_end_matches(','))
352 .unwrap_or("");
353 let arg_source =
354 classify_argument(first_arg_str, ¶m_names, &parsed.sanitized_vars);
355
356 if SUBPROCESS_PATTERNS
358 .iter()
359 .any(|p| func_name.ends_with(p) || func_name == *p)
360 {
361 parsed.commands.push(CommandInvocation {
362 function: func_name.to_string(),
363 command_arg: arg_source.clone(),
364 location: loc(&file_path, line_num),
365 });
366 }
367 if NETWORK_PATTERNS
368 .iter()
369 .any(|p| func_name.ends_with(p) || func_name == *p)
370 {
371 let sends_data = func_name.contains("post")
372 || func_name.contains("put")
373 || func_name.contains("patch");
374 let method = if func_name.contains("get") {
375 Some("GET".into())
376 } else if func_name.contains("post") {
377 Some("POST".into())
378 } else if func_name.contains("put") {
379 Some("PUT".into())
380 } else {
381 None
382 };
383 parsed.network_operations.push(NetworkOperation {
384 function: func_name.to_string(),
385 url_arg: arg_source.clone(),
386 method,
387 sends_data,
388 location: loc(&file_path, line_num),
389 });
390 }
391 if DYNAMIC_EXEC_PATTERNS.contains(&func_name) {
392 parsed.dynamic_exec.push(DynamicExec {
393 function: func_name.to_string(),
394 code_arg: arg_source.clone(),
395 location: loc(&file_path, line_num),
396 });
397 }
398 if FILE_READ_PATTERNS
399 .iter()
400 .any(|p| func_name.ends_with(p) || func_name == *p)
401 {
402 parsed.file_operations.push(FileOperation {
403 operation: FileOpType::Read,
404 path_arg: arg_source.clone(),
405 location: loc(&file_path, line_num),
406 });
407 }
408
409 if func_name.contains('.') {
411 let parts: Vec<&str> = func_name.rsplitn(2, '.').collect();
412 if parts.len() == 2 {
413 let method = parts[0];
414 let obj = parts[1];
415 if http_client_vars.contains(obj) && HTTP_CLIENT_METHODS.contains(&method) {
416 let sends_data =
417 method == "post" || method == "put" || method == "patch";
418 let http_method = match method {
419 "get" => Some("GET".into()),
420 "post" => Some("POST".into()),
421 "put" => Some("PUT".into()),
422 "delete" => Some("DELETE".into()),
423 "head" => Some("HEAD".into()),
424 "patch" => Some("PATCH".into()),
425 _ => None,
426 };
427 parsed.network_operations.push(NetworkOperation {
428 function: func_name.to_string(),
429 url_arg: arg_source.clone(),
430 method: http_method,
431 sends_data,
432 location: loc(&file_path, line_num),
433 });
434 }
435 }
436 }
437 }
438 }
439
440 Ok(parsed)
441 }
442}
443
444fn classify_argument(
446 args_str: &str,
447 param_names: &std::collections::HashSet<String>,
448 sanitized_vars: &std::collections::HashSet<String>,
449) -> ArgumentSource {
450 let first_arg = args_str.split(',').next().unwrap_or("").trim();
451
452 if first_arg.is_empty() {
453 return ArgumentSource::Unknown;
454 }
455
456 let ident = first_arg.split('.').next().unwrap_or(first_arg);
458 let ident = ident.split('[').next().unwrap_or(ident);
459 if let Some(sanitizer) = sanitized_label_for_var(ident, sanitized_vars) {
460 return ArgumentSource::Sanitized { sanitizer };
461 }
462
463 if let Some(val) = strip_python_string_literal(first_arg) {
466 return ArgumentSource::Literal(val.to_string());
467 }
468
469 if first_arg.starts_with("f\"") || first_arg.starts_with("f'") || first_arg.contains(".format(")
471 {
472 return ArgumentSource::Interpolated;
473 }
474
475 if first_arg.contains("os.environ") || first_arg.contains("os.getenv") {
477 return ArgumentSource::EnvVar {
478 name: first_arg.to_string(),
479 };
480 }
481
482 if param_names.contains(ident) {
484 return ArgumentSource::Parameter {
485 name: ident.to_string(),
486 };
487 }
488
489 ArgumentSource::Unknown
490}
491
/// Returns the contents of `arg` when it is exactly one quoted Python string
/// literal, and `None` otherwise.
///
/// Single-quoted, double-quoted and triple-quoted forms are recognised;
/// prefixed literals (`f"..."`, `r"..."`, `b"..."`) are not. An expression
/// that merely starts and ends with a quote, such as `"rm " + path + " -rf"`,
/// is rejected: the text between the outer quotes must not contain an
/// unescaped closing quote. Backslash escapes are left untouched in the
/// returned slice.
fn strip_python_string_literal(arg: &str) -> Option<&str> {
    // True when `body` terminates the literal before its end: it contains an
    // unescaped `quote`, or ends in a backslash that would escape the final
    // quote.
    fn closes_early(body: &str, quote: char) -> bool {
        let mut escaped = false;
        for ch in body.chars() {
            if escaped {
                escaped = false;
            } else if ch == '\\' {
                escaped = true;
            } else if ch == quote {
                return true;
            }
        }
        escaped
    }

    for (quote, triple) in [('"', "\"\"\""), ('\'', "'''")] {
        // Triple-quoted literals may contain lone quote characters.
        if let Some(body) = arg
            .strip_prefix(triple)
            .and_then(|rest| rest.strip_suffix(triple))
        {
            return (!body.contains(triple)).then_some(body);
        }
        if let Some(body) = arg
            .strip_prefix(quote)
            .and_then(|rest| rest.strip_suffix(quote))
        {
            return (!closes_early(body, quote)).then_some(body);
        }
    }

    None
}
500
/// Builds the `"<var>::<label>"` key under which a variable's sanitizer label
/// is stored alongside the plain variable name.
fn sanitized_var_marker(var_name: &str, sanitizer_label: &str) -> String {
    [var_name, "::", sanitizer_label].concat()
}
504
505fn sanitized_label_for_var(
506 ident: &str,
507 sanitized_vars: &std::collections::HashSet<String>,
508) -> Option<String> {
509 for category in [
510 SanitizerCategory::Path,
511 SanitizerCategory::Network,
512 SanitizerCategory::TypeCoercion,
513 ] {
514 let prefix = format!("{}:", category.as_str());
515 if let Some(marker) = sanitized_vars
516 .iter()
517 .find(|value| value.starts_with(&format!("{ident}::{prefix}")))
518 {
519 return marker.split_once("::").map(|(_, label)| label.to_string());
520 }
521 }
522
523 sanitized_vars.contains(ident).then(|| ident.to_string())
524}
525
526fn loc(file: &Path, line: usize) -> SourceLocation {
527 SourceLocation {
528 file: file.to_path_buf(),
529 line,
530 column: 0,
531 end_line: None,
532 end_column: None,
533 }
534}
535
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the parser over `code`, labelling it with the file name `name`.
    fn parse_as(name: &str, code: &str) -> ParsedFile {
        PythonParser.parse_file(Path::new(name), code).unwrap()
    }

    /// Runs the parser over `code` as `test.py`.
    fn parse(code: &str) -> ParsedFile {
        parse_as("test.py", code)
    }

    #[test]
    fn detects_subprocess_with_param() {
        let result = parse(
            r#"
def handle(cmd: str):
    subprocess.run(cmd, shell=True)
"#,
        );
        assert_eq!(result.commands.len(), 1);
        let is_param = matches!(
            result.commands[0].command_arg,
            ArgumentSource::Parameter { .. }
        );
        assert!(is_param);
    }

    #[test]
    fn detects_requests_get_with_param() {
        let result = parse(
            r#"
def fetch(url: str):
    requests.get(url)
"#,
        );
        assert_eq!(result.network_operations.len(), 1);
        let is_param = matches!(
            result.network_operations[0].url_arg,
            ArgumentSource::Parameter { .. }
        );
        assert!(is_param);
    }

    #[test]
    fn safe_literal_not_flagged_as_param() {
        let result = parse(
            r#"
def fetch():
    requests.get("https://api.example.com")
"#,
        );
        assert_eq!(result.network_operations.len(), 1);
        let is_literal = matches!(
            result.network_operations[0].url_arg,
            ArgumentSource::Literal(_)
        );
        assert!(is_literal);
    }

    #[test]
    fn incomplete_quote_argument_is_unknown_not_panic() {
        let result = parse(
            r#"
def fetch():
    requests.get(
        "
    )
"#,
        );
        assert_eq!(result.network_operations.len(), 1);
        let is_unknown = matches!(
            result.network_operations[0].url_arg,
            ArgumentSource::Unknown
        );
        assert!(is_unknown);
    }

    #[test]
    fn detects_env_var_access() {
        let result = parse(
            r#"
key = os.environ["AWS_SECRET_ACCESS_KEY"]
"#,
        );
        assert_eq!(result.env_accesses.len(), 1);
        assert!(result.env_accesses[0].is_sensitive);
    }

    #[test]
    fn detects_eval() {
        let result = parse(
            r#"
def run(code):
    eval(code)
"#,
        );
        assert_eq!(result.dynamic_exec.len(), 1);
        let is_param = matches!(
            result.dynamic_exec[0].code_arg,
            ArgumentSource::Parameter { .. }
        );
        assert!(is_param);
    }

    #[test]
    fn detects_httpx_async_client_get() {
        let result = parse(
            r#"
async def fetch(url: str):
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
"#,
        );
        assert_eq!(result.network_operations.len(), 1);
        let op = &result.network_operations[0];
        assert_eq!(op.function, "client.get");
        assert!(matches!(op.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_aiohttp_client_session_post() {
        let result = parse(
            r#"
async def send_data(url: str, data):
    async with aiohttp.ClientSession() as session:
        await session.post(url, json=data)
"#,
        );
        assert_eq!(result.network_operations.len(), 1);
        let op = &result.network_operations[0];
        assert_eq!(op.function, "session.post");
        assert!(op.sends_data);
    }

    #[test]
    fn detects_gitpython_command_execution() {
        let result = parse(
            r#"
def git_log(repo, args):
    repo.git.log(*args)
"#,
        );
        assert_eq!(result.commands.len(), 1);
        assert_eq!(result.commands[0].function, "repo.git.log");
    }

    #[test]
    fn detects_gitpython_add_with_user_files() {
        let result = parse(
            r#"
def stage_files(repo, files):
    repo.git.add("--", *files)
"#,
        );
        assert_eq!(result.commands.len(), 1);
        assert_eq!(result.commands[0].function, "repo.git.add");
    }

    #[test]
    fn no_false_positive_on_non_client_get() {
        let result = parse(
            r#"
def process():
    result = cache.get("key")
"#,
        );
        assert!(result.network_operations.is_empty());
    }

    #[test]
    fn detects_multiline_async_client_get() {
        let result = parse(
            r#"
async def fetch_url(url: str):
    async with AsyncClient(proxies=proxy_url) as client:
        response = await client.get(
            url,
            follow_redirects=True,
            headers={"User-Agent": user_agent},
        )
"#,
        );
        assert_eq!(
            result.network_operations.len(),
            1,
            "should detect multi-line client.get() call"
        );
        let op = &result.network_operations[0];
        assert_eq!(op.function, "client.get");
        assert!(matches!(op.url_arg, ArgumentSource::Parameter { .. }));
    }

    #[test]
    fn detects_multiline_subprocess_run() {
        let result = parse(
            r#"
def execute(cmd: str):
    subprocess.run(
        cmd,
        shell=True,
        capture_output=True,
    )
"#,
        );
        assert_eq!(
            result.commands.len(),
            1,
            "should detect multi-line subprocess.run() call"
        );
    }

    #[test]
    fn extracts_python_function_defs() {
        let result = parse_as(
            "lib.py",
            r#"
def read_file(path: str) -> str:
    with open(path) as f:
        return f.read()

def _internal_helper(x):
    return x + 1
"#,
        );
        assert!(result.function_defs.len() >= 2);

        // Public function: exported, with its single parameter recorded.
        let read_file = result
            .function_defs
            .iter()
            .find(|def| def.name == "read_file");
        assert!(read_file.is_some());
        let read_file = read_file.unwrap();
        assert!(read_file.is_exported);
        assert_eq!(read_file.params, vec!["path"]);

        // Underscore-prefixed function: present but not exported.
        let helper = result
            .function_defs
            .iter()
            .find(|def| def.name == "_internal_helper");
        assert!(helper.is_some());
        assert!(!helper.unwrap().is_exported);
    }

    #[test]
    fn detects_python_sanitizer_assignment() {
        let result = parse(
            r#"
def handler(raw_path: str):
    safe_path = os.path.realpath(raw_path)
    with open(safe_path) as f:
        return f.read()
"#,
        );
        assert!(result.sanitized_vars.contains("safe_path"));
    }

    #[test]
    fn extracts_python_call_sites() {
        let result = parse(
            r#"
def handler(args):
    safe_path = os.path.realpath(args.path)
    content = read_file(safe_path)
    return content
"#,
        );
        let rf_call = result
            .call_sites
            .iter()
            .find(|site| site.callee == "read_file");
        assert!(rf_call.is_some(), "Should find read_file call site");
        let rf = rf_call.unwrap();
        assert!(!rf.arguments.is_empty());
        assert!(
            matches!(&rf.arguments[0], ArgumentSource::Sanitized { .. }),
            "safe_path should be Sanitized, got: {:?}",
            rf.arguments[0]
        );
    }

    #[test]
    fn urlparse_assignment_is_not_sanitized_for_ssrf() {
        let result = parse(
            r#"
from urllib.parse import urlparse
import requests

def handler(url: str):
    parsed_url = urlparse(url)
    return requests.get(parsed_url)
"#,
        );

        assert!(!result.sanitized_vars.contains("parsed_url"));
        assert_eq!(result.network_operations.len(), 1);
        assert!(
            result.network_operations[0].url_arg.is_tainted(),
            "urlparse output must remain tainted for network sinks"
        );
    }

    #[test]
    fn redaction_assignment_is_not_sanitized_for_file_paths() {
        let result = parse(
            r#"
def redactSecret(value: str) -> str:
    return value.replace("secret", "[REDACTED]")

def handler(path: str):
    redacted_path = redactSecret(path)
    return open(redacted_path).read()
"#,
        );

        assert!(!result.sanitized_vars.contains("redacted_path"));
        assert_eq!(result.file_operations.len(), 1);
        assert!(
            result.file_operations[0].path_arg.is_tainted(),
            "redaction output must remain tainted for file path sinks"
        );
    }
}