#![cfg(feature = "python")]
use pyrograph::{analyze, parse::parse_python};
/// Asserts that analyzing the Python snippet `source` yields at least one finding.
///
/// The snippet is parsed with `parse_python` under the file name `<name>.py`
/// and the resulting graph is handed to `analyze`.
///
/// # Panics
///
/// Panics if parsing or analysis fails (both results are unwrapped), or if
/// the analysis reports no findings.
fn assert_detects(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    let graph = parse_python(source, &file_name).unwrap();
    let findings = analyze(&graph).unwrap();
    let has_findings = !findings.is_empty();
    assert!(has_findings, "expected finding for {name}");
}
/// Asserts that analyzing the Python snippet `source` yields no findings.
///
/// The snippet is parsed with `parse_python` under the file name `<name>.py`
/// and the resulting graph is handed to `analyze`.
///
/// # Panics
///
/// Panics if parsing or analysis fails (both results are unwrapped), or if
/// any finding is reported; the findings are included in the panic message.
fn assert_clean(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    let graph = parse_python(source, &file_name).unwrap();
    let findings = analyze(&graph).unwrap();
    let is_clean = findings.is_empty();
    assert!(is_clean, "unexpected finding for {name}: {findings:?}");
}
/// Expects a finding when a command read from `os.environ.get` is passed to
/// `os.system` inside a helper function that the setup script then calls.
///
/// The bodies of the fixture's `def` and `if` statements are indented:
/// Python requires an indented block after each of them, so the snippet is
/// not valid Python otherwise.
#[test]
fn tp_01_setup_os_system_environ() {
    let fixture = r#"
import os
from setuptools import setup
def malicious_setup():
    cmd = os.environ.get("MALICIOUS_CMD")
    if cmd:
        os.system(cmd)
malicious_setup()
setup(name="innocent_pkg")
"#;
    assert_detects("tp_01_setup_os_system_environ", fixture);
}
/// Expects a finding when `sys.argv[1]` is placed in the argument list of
/// `subprocess.Popen`.
#[test]
fn tp_02_setup_subprocess_popen_sys_argv() {
    let fixture = r#"
import sys
import subprocess
from setuptools import setup
payload = sys.argv[1]
subprocess.Popen(["bash", "-c", payload])
setup(name="bad_pkg")
"#;
    assert_detects("tp_02_setup_subprocess_popen_sys_argv", fixture);
}
/// Expects a finding when the contents of a file read through `open(...)`
/// are passed to `subprocess.run` with `shell=True`.
///
/// The body of the fixture's `with` statement is indented: Python requires
/// an indented block after `with ...:`, so the snippet is not valid Python
/// otherwise. `code` stays bound after the `with` block ends, so the
/// `subprocess.run` call sits at top level.
#[test]
fn tp_03_setup_subprocess_run_open_read() {
    let fixture = r#"
import subprocess
from setuptools import setup
with open(".hidden_payload") as f:
    code = f.read()
subprocess.run(code, shell=True)
setup(name="test_pkg")
"#;
    assert_detects("tp_03_setup_subprocess_run_open_read", fixture);
}
/// Expects a finding when the result of `input(...)` is passed to `os.system`.
#[test]
fn tp_04_setup_os_system_input() {
    let fixture = r#"
import os
from setuptools import setup
user_cmd = input("Enter command: ")
os.system(user_cmd)
setup(name="test_pkg")
"#;
    assert_detects("tp_04_setup_os_system_input", fixture);
}
/// Expects a finding when a Flask `request.form` value is passed to
/// `subprocess.call` with `shell=True`.
#[test]
fn tp_05_setup_subprocess_call_request_form() {
    let fixture = r#"
from flask import request
import subprocess
from setuptools import setup
cmd = request.form['cmd']
subprocess.call(cmd, shell=True)
setup(name="test_pkg")
"#;
    assert_detects("tp_05_setup_subprocess_call_request_form", fixture);
}
/// Expects a finding when the result of `os.getenv` is passed to
/// `subprocess.check_output` with `shell=True`.
#[test]
fn tp_06_setup_subprocess_check_output_getenv() {
    let fixture = r#"
import os
import subprocess
from setuptools import setup
payload = os.getenv("PAYLOAD")
subprocess.check_output(payload, shell=True)
setup(name="test_pkg")
"#;
    assert_detects("tp_06_setup_subprocess_check_output_getenv", fixture);
}
/// Expects a finding when text read via `pathlib.Path(...).read_text()` is
/// passed to `os.popen`.
#[test]
fn tp_07_setup_os_popen_pathlib() {
    let fixture = r#"
import os
import pathlib
from setuptools import setup
payload = pathlib.Path("payload.sh").read_text()
os.popen(payload)
setup(name="test_pkg")
"#;
    assert_detects("tp_07_setup_os_popen_pathlib", fixture);
}
/// Expects a finding when a Flask `request.args` value is passed inline
/// (with no intermediate variable) to `subprocess.run` with `shell=True`.
#[test]
fn tp_08_setup_subprocess_run_request_args() {
    let fixture = r#"
import subprocess
from flask import request
from setuptools import setup
subprocess.run(request.args['cmd'], shell=True)
setup(name="test_pkg")
"#;
    assert_detects("tp_08_setup_subprocess_run_request_args", fixture);
}
/// Expects a finding when a Flask `request.json` value is decoded with
/// `base64.b64decode` and the result is passed to `exec`.
#[test]
fn tp_09_setup_exec_base64_request_json() {
    let fixture = r#"
from flask import request
import base64
from setuptools import setup
data = request.json['payload']
decoded = base64.b64decode(data)
exec(decoded)
setup(name="test_pkg")
"#;
    assert_detects("tp_09_setup_exec_base64_request_json", fixture);
}
/// Expects a finding when the contents of a file read through `open(...)`
/// are passed to `eval`.
///
/// The body of the fixture's `with` statement is indented: Python requires
/// an indented block after `with ...:`, so the snippet is not valid Python
/// otherwise. `config` stays bound after the `with` block ends, so the
/// `eval` call sits at top level.
#[test]
fn tp_10_setup_eval_open_read() {
    let fixture = r#"
from setuptools import setup
with open("config.txt") as f:
    config = f.read()
eval(config)
setup(name="test_pkg")
"#;
    assert_detects("tp_10_setup_eval_open_read", fixture);
}
/// Expects a finding when a Flask `request.args.get` value is compiled with
/// `compile(...)` and the compiled object is passed to `exec`.
#[test]
fn tp_11_setup_compile_request_args() {
    let fixture = r#"
from flask import request
from setuptools import setup
src = request.args.get("src")
compiled = compile(src, "<string>", "exec")
exec(compiled)
setup(name="test_pkg")
"#;
    assert_detects("tp_11_setup_compile_request_args", fixture);
}
/// Expects a finding when text read via `pathlib.Path(...).read_text()` is
/// passed to `exec`.
#[test]
fn tp_12_setup_exec_pathlib_read() {
    let fixture = r#"
import pathlib
from setuptools import setup
code = pathlib.Path("payload.py").read_text()
exec(code)
setup(name="test_pkg")
"#;
    assert_detects("tp_12_setup_exec_pathlib_read", fixture);
}
/// Expects a finding when `sys.argv[2]` is passed to `eval`.
#[test]
fn tp_13_setup_eval_sys_argv() {
    let fixture = r#"
import sys
from setuptools import setup
expr = sys.argv[2]
eval(expr)
setup(name="test_pkg")
"#;
    assert_detects("tp_13_setup_eval_sys_argv", fixture);
}
/// Expects a finding when the result of `input(...)` is passed to `exec`.
#[test]
fn tp_14_setup_exec_input() {
    let fixture = r#"
from setuptools import setup
cmd = input("Paste code here: ")
exec(cmd)
setup(name="test_pkg")
"#;
    assert_detects("tp_14_setup_exec_input", fixture);
}
/// Expects a finding when a host read from `os.environ.get` is used in the
/// address tuple passed to a socket's `connect`.
#[test]
fn tp_15_setup_socket_connect_environ() {
    let fixture = r#"
import os
import socket
from setuptools import setup
c2_ip = os.environ.get("C2_HOST")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((c2_ip, 1337))
setup(name="test_pkg")
"#;
    assert_detects("tp_15_setup_socket_connect_environ", fixture);
}
/// Expects a finding when `sys.argv[1]` is used as the host in the address
/// tuple passed to a socket's `connect`.
#[test]
fn tp_16_setup_socket_connect_sys_argv() {
    let fixture = r#"
import sys
import socket
from setuptools import setup
host = sys.argv[1]
s = socket.socket()
s.connect((host, 80))
setup(name="test_pkg")
"#;
    assert_detects("tp_16_setup_socket_connect_sys_argv", fixture);
}
/// Expects a finding when bytes read via `open(...).read()` are passed to
/// `pickle.loads`.
#[test]
fn tp_17_setup_pickle_loads_open_read() {
    let fixture = r#"
import pickle
from setuptools import setup
data = open("payload.pkl", "rb").read()
pickle.loads(data)
setup(name="test_pkg")
"#;
    assert_detects("tp_17_setup_pickle_loads_open_read", fixture);
}
/// Expects a finding when a Flask `request.form` value is passed to
/// `pickle.loads`.
#[test]
fn tp_18_setup_pickle_loads_request_form() {
    let fixture = r#"
import pickle
from flask import request
from setuptools import setup
blob = request.form['data']
pickle.loads(blob)
setup(name="test_pkg")
"#;
    assert_detects("tp_18_setup_pickle_loads_request_form", fixture);
}
/// Expects a finding when a URL read from `os.environ.get` is passed to
/// `requests.get`.
#[test]
fn tp_19_setup_requests_get_environ() {
    let fixture = r#"
import os
import requests
from setuptools import setup
exfil = os.environ.get("WEBHOOK_URL")
requests.get(exfil)
setup(name="test_pkg")
"#;
    assert_detects("tp_19_setup_requests_get_environ", fixture);
}
/// Expects a finding when a Flask `request.json` value is passed to
/// `urllib.request.urlopen`.
#[test]
fn tp_20_setup_urllib_request_json() {
    let fixture = r#"
from flask import request
import urllib.request
from setuptools import setup
url = request.json['webhook']
urllib.request.urlopen(url)
setup(name="test_pkg")
"#;
    assert_detects("tp_20_setup_urllib_request_json", fixture);
}
/// Expects no finding when `os.system` is called with a string literal.
#[test]
fn fp_01_setup_hardcoded_os_system() {
    let fixture = r#"
import os
from setuptools import setup
os.system("echo 'building module'")
setup(name="benign_pkg")
"#;
    assert_clean("fp_01_setup_hardcoded_os_system", fixture);
}
/// Expects no finding when `subprocess.run` is called with a list of string
/// literals.
#[test]
fn fp_02_setup_hardcoded_subprocess_run() {
    let fixture = r#"
import subprocess
from setuptools import setup
subprocess.run(["gcc", "-c", "math.c"])
setup(name="benign_pkg")
"#;
    assert_clean("fp_02_setup_hardcoded_subprocess_run", fixture);
}
/// Expects no finding when `exec` is called with a string literal.
#[test]
fn fp_03_setup_hardcoded_exec() {
    let fixture = r#"
from setuptools import setup
exec("x = 10")
setup(name="benign_pkg")
"#;
    assert_clean("fp_03_setup_hardcoded_exec", fixture);
}
/// Expects no finding when `sys.argv[1]` is read into one variable but
/// `os.system` is called with a different variable holding a string literal.
#[test]
fn fp_04_setup_disconnected_flow() {
    let fixture = r#"
import os
import sys
from setuptools import setup
source = sys.argv[1]
cmd = "ls -la"
os.system(cmd)
setup(name="benign_pkg")
"#;
    assert_clean("fp_04_setup_disconnected_flow", fixture);
}
/// Expects no finding when a socket connects to a literal host and port.
#[test]
fn fp_05_setup_hardcoded_socket() {
    let fixture = r#"
import socket
from setuptools import setup
s = socket.socket()
s.connect(("127.0.0.1", 8080))
setup(name="benign_pkg")
"#;
    assert_clean("fp_05_setup_hardcoded_socket", fixture);
}
/// Expects no finding when `requests.get` is called with a literal URL.
#[test]
fn fp_06_setup_hardcoded_requests() {
    let fixture = r#"
import requests
from setuptools import setup
requests.get("https://api.github.com")
setup(name="benign_pkg")
"#;
    assert_clean("fp_06_setup_hardcoded_requests", fixture);
}
/// Expects no finding when `pickle.loads` is called with a bytes literal.
#[test]
fn fp_07_setup_hardcoded_pickle() {
    let fixture = r#"
import pickle
from setuptools import setup
pickle.loads(b"\x80\x04\x95\x11\x00\x00\x00\x00\x00\x00\x00}\x94\x8c\x04test\x94\x8c\x04data\x94s.")
setup(name="benign_pkg")
"#;
    assert_clean("fp_07_setup_hardcoded_pickle", fixture);
}
/// Expects no finding when `sys.argv[1]` is converted with `int(...)` before
/// the result is passed to `os.system`.
#[test]
fn fp_08_setup_sanitized_int() {
    let fixture = r#"
import sys
import os
from setuptools import setup
val = int(sys.argv[1])
os.system(val)
setup(name="benign_pkg")
"#;
    assert_clean("fp_08_setup_sanitized_int", fixture);
}
/// Expects no finding when a Flask `request.args` value is converted with
/// `float(...)` before the result is passed to `subprocess.run`.
#[test]
fn fp_09_setup_sanitized_float() {
    let fixture = r#"
from flask import request
import subprocess
from setuptools import setup
timeout = float(request.args['timeout'])
subprocess.run(timeout)
setup(name="benign_pkg")
"#;
    assert_clean("fp_09_setup_sanitized_float", fixture);
}
/// Expects no finding when the result of `os.environ.get` is wrapped in
/// `str(...)` before being passed to `requests.get`.
#[test]
fn fp_10_setup_sanitized_str() {
    let fixture = r#"
import os
import requests
from setuptools import setup
port = str(os.environ.get("PORT", "8080"))
requests.get(port)
setup(name="benign_pkg")
"#;
    assert_clean("fp_10_setup_sanitized_str", fixture);
}
/// Expects no finding when `sys.argv[2]` is passed through `shlex.quote`
/// before reaching `os.system`.
#[test]
fn fp_11_setup_sanitized_shlex_quote() {
    let fixture = r#"
import sys
import os
import shlex
from setuptools import setup
user_input = sys.argv[2]
os.system(shlex.quote(user_input))
setup(name="benign_pkg")
"#;
    assert_clean("fp_11_setup_sanitized_shlex_quote", fixture);
}
/// Expects no finding when a Flask `request.form` value is passed through
/// `os.path.basename` before reaching `os.popen`.
#[test]
fn fp_12_setup_sanitized_basename() {
    let fixture = r#"
from flask import request
import os
from setuptools import setup
path = request.form['file']
os.popen(os.path.basename(path))
setup(name="benign_pkg")
"#;
    assert_clean("fp_12_setup_sanitized_basename", fixture);
}
/// Expects a finding when `os.environ` is passed as the `env` keyword
/// argument of `subprocess.Popen`, even though the command is a literal.
#[test]
fn tp_21_setup_popen_with_os_environ_is_flagged() {
    let fixture = r#"
import os
import subprocess
from setuptools import setup
subprocess.Popen("ls", env=os.environ)
setup(name="leaky_pkg")
"#;
    assert_detects("tp_21_setup_popen_with_os_environ", fixture);
}
/// Expects no finding when `sys.argv[1]` is passed through `bleach.clean`
/// before reaching `exec`.
#[test]
fn fp_13_setup_sanitized_bleach_clean() {
    let fixture = r#"
import bleach
import sys
from setuptools import setup
data = sys.argv[1]
exec(bleach.clean(data))
setup(name="benign_pkg")
"#;
    assert_clean("fp_13_setup_sanitized_bleach_clean", fixture);
}