#![cfg(unix)]
use std::fs;
use std::path::PathBuf;
use serde_json::json;
use tempfile::tempdir;
use zift::deep::config::{DeepMode, DeepRuntime};
use zift::types::{AuthCategory, Confidence, Finding, Language, ScanPass, Surface};
fn subprocess_runtime(cmd: &str, timeout_secs: u64) -> DeepRuntime {
DeepRuntime {
mode: DeepMode::Subprocess,
base_url: String::new(),
model: String::new(),
api_key: None,
max_cost_usd: None,
cost_per_1k_input: None,
cost_per_1k_output: None,
request_timeout_secs: 120,
max_candidates: 5,
max_concurrent: 1,
temperature: 0.0,
max_prompt_chars: 16_000,
excludes: Vec::new(),
language_filter: Vec::new(),
agent_cmd: Some(cmd.into()),
agent_timeout_secs: timeout_secs,
}
}
fn structural_finding(file: &str, line: usize) -> Finding {
Finding {
id: format!("structural-{file}-{line}"),
file: PathBuf::from(file),
line_start: line,
line_end: line + 2,
code_snippet: String::new(),
language: Language::TypeScript,
category: AuthCategory::Custom,
confidence: Confidence::Low,
description: "matched custom rule".into(),
pattern_rule: Some("ts-custom".into()),
policy_outputs: vec![],
pass: ScanPass::Structural,
surface: Surface::Backend,
provenance: None,
}
}
#[test]
fn happy_path_subprocess_emits_semantic_finding() {
let dir = tempdir().unwrap();
fs::write(
dir.path().join("auth.ts"),
"// imports\nfunction isAdmin(u) {\n return u.role === 'admin';\n}\n",
)
.unwrap();
let canned = json!({
"findings": [{
"line_start": 2,
"line_end": 4,
"category": "rbac",
"confidence": "high",
"description": "isAdmin role check",
"reasoning": "function name + role comparison",
"is_false_positive": false
}]
})
.to_string();
let cmd = format!(
"cat >/dev/null && printf '%s' '{}'",
canned.replace('\'', "'\"'\"'")
);
let runtime = subprocess_runtime(&cmd, 30);
let merged = zift::deep::run(Vec::new(), dir.path(), &runtime).unwrap();
let semantic: Vec<&Finding> = merged
.iter()
.filter(|f| f.pass == ScanPass::Semantic)
.collect();
assert_eq!(
semantic.len(),
1,
"expected one semantic finding, got {merged:?}"
);
assert_eq!(semantic[0].category, AuthCategory::Rbac);
assert_eq!(semantic[0].confidence, Confidence::High);
}
#[test]
fn nonzero_exit_skips_candidate_keeps_structural() {
let dir = tempdir().unwrap();
fs::write(
dir.path().join("auth.ts"),
"function isAdmin(u) { return u.role === 'admin'; }\n",
)
.unwrap();
let runtime = subprocess_runtime("cat >/dev/null && exit 7", 5);
let structural = vec![structural_finding("auth.ts", 1)];
let merged = zift::deep::run(structural.clone(), dir.path(), &runtime).unwrap();
assert_eq!(merged.len(), 1);
assert_eq!(merged[0].pass, ScanPass::Structural);
}
#[test]
fn malformed_stdout_skips_candidate() {
let dir = tempdir().unwrap();
fs::write(
dir.path().join("auth.ts"),
"function isAdmin(u) { return u.role === 'admin'; }\n",
)
.unwrap();
let runtime = subprocess_runtime("cat >/dev/null && printf 'not json'", 5);
let merged = zift::deep::run(Vec::new(), dir.path(), &runtime).unwrap();
assert!(merged.is_empty(), "expected empty merge, got {merged:?}");
}
#[test]
fn timeout_skips_candidate_and_returns_promptly() {
let dir = tempdir().unwrap();
fs::write(
dir.path().join("auth.ts"),
"function isAdmin(u) { return u.role === 'admin'; }\n",
)
.unwrap();
let runtime = subprocess_runtime("sleep 30", 1);
let start = std::time::Instant::now();
let merged = zift::deep::run(Vec::new(), dir.path(), &runtime).unwrap();
let elapsed = start.elapsed();
assert!(merged.is_empty());
assert!(
elapsed < std::time::Duration::from_secs(10),
"deep::run hung past the per-candidate timeout: {elapsed:?}",
);
}
#[test]
fn agent_receives_envelope_with_system_user_schema() {
let dir = tempdir().unwrap();
fs::write(
dir.path().join("auth.ts"),
"function isAdmin(u) { return u.role === 'admin'; }\n",
)
.unwrap();
let envelope_path = dir.path().join("captured-envelope.json");
let canned = r#"{"findings":[]}"#;
let cmd = format!(
"cat > '{}' && printf '%s' '{}'",
envelope_path.display(),
canned,
);
let runtime = subprocess_runtime(&cmd, 10);
let merged = zift::deep::run(Vec::new(), dir.path(), &runtime).unwrap();
assert!(merged.is_empty());
let raw = fs::read_to_string(&envelope_path)
.expect("agent_cmd should have written the envelope to the temp file");
let envelope: serde_json::Value =
serde_json::from_str(&raw).expect("envelope must be valid JSON");
let system = envelope
.get("system")
.and_then(|v| v.as_str())
.expect("envelope.system must be a string");
let user = envelope
.get("user")
.and_then(|v| v.as_str())
.expect("envelope.user must be a string");
assert!(!system.is_empty(), "envelope.system must be non-empty");
assert!(!user.is_empty(), "envelope.user must be non-empty");
assert!(
user.contains("auth.ts"),
"envelope.user must reference the candidate file (got: {user:?})",
);
let schema = envelope
.get("schema")
.expect("envelope.schema must be present");
assert!(
schema.is_object(),
"envelope.schema must be a JSON object (got: {schema:?})",
);
}
#[test]
fn agent_envelope_failure_path_is_observable() {
let dir = tempdir().unwrap();
fs::write(
dir.path().join("auth.ts"),
"function isAdmin(u) { return u.role === 'admin'; }\n",
)
.unwrap();
let cmd =
"in=$(cat) && echo \"$in\" | grep -q '\"definitely_not_present\"' && printf 'unreached'";
let runtime = subprocess_runtime(cmd, 5);
let structural = vec![structural_finding("auth.ts", 1)];
let merged = zift::deep::run(structural, dir.path(), &runtime).unwrap();
assert_eq!(merged.len(), 1);
assert_eq!(merged[0].pass, ScanPass::Structural);
}