pyrograph 0.1.0

GPU-accelerated taint analysis for supply chain malware detection
Documentation
//! Property-based tests verifying invariants across randomly generated taint graphs.

use proptest::prelude::*;
use pyrograph::ir::{EdgeKind, NodeKind, TaintGraph};
use pyrograph::labels::TaintLabel;
use pyrograph::analyze;

/// Builds a proptest strategy that yields arbitrary [`TaintGraph`]s.
///
/// Each generated graph has between 1 and `max_nodes` nodes, all of kind
/// `NodeKind::Variable` and named `n0`, `n1`, ... in insertion order:
///
/// * the first `min(max_sources, node_count)` nodes are labelled
///   `TaintLabel::Source(i)`, where `i` is the node's index;
/// * up to `max_sinks` of the nodes that follow are labelled
///   `TaintLabel::Sink(j)`, with `j` counted from the first sink;
/// * every remaining node is unlabelled.
///
/// Candidate edges are endpoint pairs drawn from `0..max_nodes`, with the
/// number of candidates drawn from `0..max_edges`. Pairs that mention an index
/// at or beyond the generated node count are dropped; the rest are added as
/// `EdgeKind::Assignment` edges.
fn random_graph(
    max_nodes: usize,
    max_edges: usize,
    max_sources: usize,
    max_sinks: usize,
) -> impl Strategy<Value = TaintGraph> {
    let node_count = 1..=max_nodes;
    let endpoint_pairs = proptest::collection::vec((0..max_nodes, 0..max_nodes), 0..max_edges);

    (node_count, endpoint_pairs).prop_map(move |(total, candidates)| {
        let source_count = max_sources.min(total);
        let sink_count = max_sinks.min(total.saturating_sub(source_count));
        // Index of the first unlabelled node: sources come first, then sinks.
        let sink_end = source_count + sink_count;

        let mut graph = TaintGraph::new();
        for idx in 0..total {
            let label = if idx >= sink_end {
                None
            } else if idx >= source_count {
                Some(TaintLabel::Sink(idx - source_count))
            } else {
                Some(TaintLabel::Source(idx))
            };
            graph.add_node(NodeKind::Variable, format!("n{idx}"), label);
        }

        // Endpoints were sampled against `max_nodes`, so keep only the pairs
        // that refer to nodes this particular graph actually contains.
        let in_range = candidates
            .into_iter()
            .filter(|&(from, to)| from < total && to < total);
        for (from, to) in in_range {
            graph.add_edge(from as u32, to as u32, EdgeKind::Assignment);
        }

        graph
    })
}

proptest! {
    // Smoke test: the value returned by `analyze` is deliberately ignored, so
    // both `Ok` and `Err` are accepted. Only a panic fails the property.
    #[test]
    fn analyze_never_panics(graph in random_graph(50, 100, 5, 5)) {
        let _outcome = analyze(&graph);
    }
}

proptest! {
    // The graph holds only sink-labelled nodes (the first three indices, when
    // present) and unlabelled nodes, so however they are wired together the
    // analysis is expected to report nothing.
    #[test]
    fn no_sources_zero_findings(
        num_nodes in 1..20usize,
        edges in proptest::collection::vec((0..20usize, 0..20usize), 0..30),
    ) {
        let mut graph = TaintGraph::new();

        for idx in 0..num_nodes {
            let label = (idx < 3).then(|| TaintLabel::Sink(idx));
            graph.add_node(NodeKind::Variable, format!("n{idx}"), label);
        }

        // Endpoints are drawn from 0..20 independently of `num_nodes`; skip
        // any pair that points at a node this graph does not have.
        let usable = edges
            .into_iter()
            .filter(|&(from, to)| from.max(to) < num_nodes);
        for (from, to) in usable {
            graph.add_edge(from as u32, to as u32, EdgeKind::Assignment);
        }

        prop_assert_eq!(analyze(&graph).unwrap().len(), 0, "No sources means no findings");
    }
}

proptest! {
    // The graph holds only source-labelled nodes (the first three indices,
    // when present) and unlabelled nodes, so with nowhere for taint to land
    // the analysis is expected to report nothing.
    #[test]
    fn no_sinks_zero_findings(
        num_nodes in 1..20usize,
        edges in proptest::collection::vec((0..20usize, 0..20usize), 0..30),
    ) {
        let mut graph = TaintGraph::new();

        for idx in 0..num_nodes {
            let label = (idx < 3).then(|| TaintLabel::Source(idx));
            graph.add_node(NodeKind::Variable, format!("n{idx}"), label);
        }

        // Endpoints are drawn from 0..20 independently of `num_nodes`; skip
        // any pair that points at a node this graph does not have.
        let usable = edges
            .into_iter()
            .filter(|&(from, to)| from.max(to) < num_nodes);
        for (from, to) in usable {
            graph.add_edge(from as u32, to as u32, EdgeKind::Assignment);
        }

        prop_assert_eq!(analyze(&graph).unwrap().len(), 0, "No sinks means no findings");
    }
}

proptest! {
    // Structural check on every reported finding: its path is non-empty,
    // begins at a source-labelled node, ends at a sink-labelled node, and each
    // consecutive pair of path nodes is joined by an edge in the graph.
    #[test]
    fn findings_have_valid_paths(graph in random_graph(30, 60, 3, 3)) {
        let findings = analyze(&graph).unwrap();

        for finding in &findings {
            let path = &finding.path;
            prop_assert!(!path.is_empty(), "Finding path must not be empty");

            // Head of the path must carry a source label.
            let head = graph.node(path[0]).unwrap();
            let starts_at_source = matches!(head.label.as_ref(), Some(l) if l.is_source());
            prop_assert!(
                starts_at_source,
                "Path must start at a source node, got {:?}", head.label
            );

            // Tail of the path must carry a sink label.
            let tail = graph.node(*path.last().unwrap()).unwrap();
            let ends_at_sink = matches!(tail.label.as_ref(), Some(l) if l.is_sink());
            prop_assert!(
                ends_at_sink,
                "Path must end at a sink node, got {:?}", tail.label
            );

            // Every hop must correspond to an outgoing edge of its start node.
            for hop in path.windows(2) {
                let (from, to) = (hop[0], hop[1]);
                let has_edge = graph.neighbors(from).any(|n| n == to);
                prop_assert!(has_edge, "Path must follow graph edges: {} -> {}", from, to);
            }
        }
    }
}

proptest! {
    // Running the analysis twice on the same graph must give the same number
    // of findings, and the findings must agree pairwise (in order) on source,
    // sink and path.
    #[test]
    fn analysis_is_deterministic(graph in random_graph(20, 40, 2, 2)) {
        let first_run = analyze(&graph).unwrap();
        let second_run = analyze(&graph).unwrap();

        prop_assert_eq!(
            first_run.len(),
            second_run.len(),
            "Same graph must produce same number of findings"
        );

        // Lengths match at this point, so the zip visits every finding.
        let paired = first_run.iter().zip(second_run.iter());
        for (earlier, later) in paired {
            prop_assert_eq!(earlier.source, later.source);
            prop_assert_eq!(earlier.sink, later.sink);
            prop_assert_eq!(&earlier.path, &later.path);
        }
    }
}

/// Adding the same source-to-sink edge many times must still produce exactly
/// one finding.
#[test]
fn duplicate_edges_no_extra_findings() {
    // How many copies of the identical edge are inserted.
    const DUPLICATES: usize = 100;

    let mut graph = TaintGraph::new();
    let src = graph.add_node(
        NodeKind::Variable,
        String::from("src"),
        Some(TaintLabel::Source(0)),
    );
    let snk = graph.add_node(
        NodeKind::Call,
        String::from("snk"),
        Some(TaintLabel::Sink(0)),
    );

    (0..DUPLICATES).for_each(|_| {
        graph.add_edge(src, snk, EdgeKind::Argument);
    });

    let findings = analyze(&graph).unwrap();
    assert_eq!(findings.len(), 1, "Duplicate edges must not multiply findings");
}

/// A freshly constructed graph with no nodes or edges must analyze
/// successfully and report no findings.
#[test]
fn empty_graph_zero_findings() {
    let graph = TaintGraph::new();
    assert_eq!(analyze(&graph).unwrap().len(), 0);
}