use crate::deep::analyzer::{AnalyzeResponse, Analyzer, TokenUsage};
use crate::deep::client::{strip_markdown_fence, truncate_for_log};
use crate::deep::config::DeepRuntime;
use crate::deep::error::DeepError;
use crate::deep::finding::SemanticFinding;
use crate::deep::prompt::RenderedPrompt;
use serde::Deserialize;
use std::io::{Read, Write};
use std::process::{Command, Stdio};
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
/// Analyzer backend that hands each prompt to an external shell command.
///
/// `Debug` is implemented by hand for this type so the command line is never
/// printed.
pub struct SubprocessClient {
    /// Shell command line to run; stored trimmed and non-empty by
    /// [`SubprocessClient::new`].
    cmd: String,
    /// Wall-clock budget for one invocation of `cmd`.
    timeout: Duration,
}
/// Hand-written `Debug` that prints a fixed `"<redacted>"` placeholder instead
/// of the real command line (presumably because the command may embed
/// credentials — confirm with the config owners). The timeout is printed as-is.
impl std::fmt::Debug for SubprocessClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("SubprocessClient");
        out.field("cmd", &"<redacted>");
        out.field("timeout", &self.timeout);
        out.finish()
    }
}
impl SubprocessClient {
    /// Creates a client from `runtime`, taking the command from `agent_cmd`
    /// and the per-invocation budget from `agent_timeout_secs`.
    ///
    /// Leading and trailing whitespace is trimmed from the command before it
    /// is stored.
    ///
    /// # Errors
    ///
    /// Returns [`DeepError::Config`] when `agent_cmd` is `None` or is empty
    /// after trimming.
    pub fn new(runtime: &DeepRuntime) -> Result<Self, DeepError> {
        // Borrow the configured command instead of cloning it: only the
        // trimmed copy is kept.
        let cmd = runtime
            .agent_cmd
            .as_deref()
            .ok_or_else(|| {
                DeepError::Config(
                    "subprocess analyzer constructed without agent_cmd \
                     (runtime invariant violated)"
                        .into(),
                )
            })?
            .trim()
            .to_string();
        if cmd.is_empty() {
            return Err(DeepError::Config(
                "subprocess agent_cmd is empty after trim".into(),
            ));
        }
        Ok(Self {
            cmd,
            timeout: Duration::from_secs(runtime.agent_timeout_secs),
        })
    }

    /// Runs the configured command once: writes the JSON envelope for
    /// `prompt` to its stdin, waits for it to exit within `self.timeout`, and
    /// parses its stdout as a findings document.
    ///
    /// The returned `usage` is always `TokenUsage::default()`.
    ///
    /// # Errors
    ///
    /// * [`DeepError::Config`] — the command could not be spawned.
    /// * [`DeepError::Timeout`] — the child did not exit within the budget, or
    ///   its stdout was not delivered within what remained of the budget after
    ///   it exited.
    /// * [`DeepError::Io`] — polling the child or reading its stdout failed.
    /// * [`DeepError::BadResponse`] — the child exited unsuccessfully, the
    ///   stdout reader went away without reporting, or stdout was not valid
    ///   findings JSON.
    fn run_once(&self, prompt: &RenderedPrompt) -> Result<AnalyzeResponse, DeepError> {
        let envelope = build_envelope(prompt);
        let mut child = shell_command(&self.cmd)
            .stdin(Stdio::piped())
            .stdout(Stdio::piped())
            .stderr(Stdio::piped())
            .spawn()
            .map_err(|e| DeepError::Config(format!("failed to spawn agent_cmd: {e}")))?;
        let mut stdin = child
            .stdin
            .take()
            .expect("stdin pipe was requested via Stdio::piped");
        let stdout = child
            .stdout
            .take()
            .expect("stdout pipe was requested via Stdio::piped");
        let stderr = child
            .stderr
            .take()
            .expect("stderr pipe was requested via Stdio::piped");

        // Feed stdin from its own thread so the polling loop below keeps
        // running regardless of how quickly the child reads.
        let envelope_bytes = envelope.into_bytes();
        let writer = thread::spawn(move || -> std::io::Result<()> {
            stdin.write_all(&envelope_bytes)?;
            stdin.flush()?;
            // Closing stdin signals end-of-input to the child.
            drop(stdin);
            Ok(())
        });

        // Drain stdout and stderr on background threads; each sends its whole
        // buffer exactly once, when the pipe reaches EOF (or the read fails).
        let (stdout_tx, stdout_rx) = mpsc::channel::<std::io::Result<String>>();
        let _stdout_thread = thread::spawn(move || {
            let mut buf = String::new();
            let mut handle = stdout;
            let res = handle.read_to_string(&mut buf).map(|_| buf);
            let _ = stdout_tx.send(res);
        });
        let (stderr_tx, stderr_rx) = mpsc::channel::<String>();
        let _stderr_thread = thread::spawn(move || {
            let mut buf = String::new();
            let mut handle = stderr;
            // stderr is diagnostic only: a read error just yields what was
            // collected so far.
            let _ = handle.read_to_string(&mut buf);
            let _ = stderr_tx.send(buf);
        });

        // Poll for exit every 50 ms until the budget is spent.
        let start = Instant::now();
        let exit = loop {
            match child.try_wait() {
                Ok(Some(status)) => break status,
                Ok(None) => {
                    if start.elapsed() >= self.timeout {
                        #[cfg(unix)]
                        kill_process_tree(&child);
                        #[cfg(not(unix))]
                        kill_process_tree(&mut child);
                        // Reap the child and give the helper threads a short
                        // window to finish before reporting the timeout.
                        let _ = child.wait();
                        let _ = writer.join();
                        let drain_timeout = Duration::from_millis(500);
                        let _ = stdout_rx.recv_timeout(drain_timeout);
                        let _ = stderr_rx.recv_timeout(drain_timeout);
                        return Err(DeepError::Timeout {
                            secs: self.timeout.as_secs(),
                        });
                    }
                    thread::sleep(Duration::from_millis(50));
                }
                Err(e) => {
                    // Do not leave the child (and the pipe threads blocked on
                    // it) running when polling fails: best-effort kill and
                    // reap, then surface the original error.
                    #[cfg(unix)]
                    kill_process_tree(&child);
                    #[cfg(not(unix))]
                    kill_process_tree(&mut child);
                    let _ = child.wait();
                    return Err(DeepError::Io(e));
                }
            }
        };

        if let Ok(Err(e)) = writer.join() {
            tracing::debug!("subprocess: writer error (likely EPIPE on early exit): {e}");
        }

        // The child has exited, but stdout is only delivered once the pipe is
        // closed by every holder; wait for it within the remaining budget.
        let remaining = self.timeout.saturating_sub(start.elapsed());
        let stdout_buf = match stdout_rx.recv_timeout(remaining) {
            Ok(res) => res.map_err(DeepError::Io)?,
            Err(mpsc::RecvTimeoutError::Timeout) => {
                #[cfg(unix)]
                kill_process_tree(&child);
                #[cfg(not(unix))]
                kill_process_tree(&mut child);
                let _ = stderr_rx.recv_timeout(Duration::from_millis(500));
                return Err(DeepError::Timeout {
                    secs: self.timeout.as_secs(),
                });
            }
            Err(mpsc::RecvTimeoutError::Disconnected) => {
                return Err(DeepError::BadResponse(
                    "subprocess stdout reader disconnected".into(),
                ));
            }
        };
        // stderr is only used for debug logging, so a missing buffer becomes
        // an empty string.
        let stderr_buf = stderr_rx
            .recv_timeout(Duration::from_millis(500))
            .unwrap_or_default();

        if !exit.success() {
            tracing::debug!(
                exit = ?exit,
                stderr = %truncate_for_log(&stderr_buf),
                stdout = %truncate_for_log(&stdout_buf),
                "subprocess: agent_cmd exited nonzero",
            );
            return Err(DeepError::BadResponse(format!(
                "agent_cmd exited with {} (no findings parsed)",
                exit_status_brief(&exit),
            )));
        }

        let cleaned = strip_markdown_fence(&stdout_buf);
        let parsed: FindingsEnvelope = serde_json::from_str(cleaned).map_err(|e| {
            // Raw output goes to the debug log only; the returned error
            // message carries none of it.
            tracing::debug!(
                error = %e,
                preview = %truncate_for_log(&stdout_buf),
                stderr_preview = %truncate_for_log(&stderr_buf),
                "subprocess: stdout was not valid findings JSON",
            );
            DeepError::BadResponse("agent_cmd output was not valid findings JSON".into())
        })?;
        Ok(AnalyzeResponse {
            findings: parsed.findings,
            usage: TokenUsage::default(),
        })
    }
}
/// [`Analyzer`] implementation backed by a single subprocess run per prompt.
impl Analyzer for SubprocessClient {
    /// Runs the configured command once for `prompt` and returns its result
    /// unchanged; no retry is attempted at this level.
    fn analyze(&self, prompt: &RenderedPrompt) -> Result<AnalyzeResponse, DeepError> {
        Self::run_once(self, prompt)
    }
}
/// Serializes `prompt` into the JSON text that is written to the agent's stdin.
///
/// The document is a single object with the keys `system`, `user` and
/// `schema`, each taken from the field of the same name on `prompt`.
fn build_envelope(prompt: &RenderedPrompt) -> String {
    let envelope = serde_json::json!({
        "system": prompt.system,
        "user": prompt.user,
        "schema": prompt.schema,
    });
    envelope.to_string()
}
/// Builds (without spawning) an `sh -c <cmd>` invocation whose child calls
/// `setsid` before exec.
///
/// Running in its own session makes the shell a process-group leader, which is
/// what lets the Unix `kill_process_tree` signal the whole group through the
/// negated child pid. If `setsid` fails, the OS error is returned from the
/// pre-exec hook.
#[cfg(unix)]
fn shell_command(cmd: &str) -> Command {
    use std::os::unix::process::CommandExt;

    let mut shell = Command::new("sh");
    shell.args(["-c", cmd]);
    // SAFETY: the hook runs in the forked child before exec. It only calls
    // `setsid(2)`, which POSIX lists as async-signal-safe, and on failure
    // reads errno into an `io::Error`; it touches no state shared with the
    // parent.
    unsafe {
        shell.pre_exec(|| match libc::setsid() {
            -1 => Err(std::io::Error::last_os_error()),
            _ => Ok(()),
        });
    }
    shell
}
/// Builds (without spawning) a `cmd /C <cmd>` invocation for Windows.
///
/// Unlike the Unix variant, no session or process-group setup is performed
/// here.
#[cfg(windows)]
fn shell_command(cmd: &str) -> Command {
    let mut shell = Command::new("cmd");
    shell.args(["/C", cmd]);
    shell
}
/// Sends `SIGKILL` to the child's entire process group (best effort).
///
/// The pid is negated so that `kill(2)` addresses a process group rather than
/// a single process. This relies on the Unix `shell_command` having called
/// `setsid` in the child, which makes the child's pid its group id. The return
/// value of `kill` is ignored.
#[cfg(unix)]
fn kill_process_tree(child: &std::process::Child) {
    let pgid = child.id() as libc::pid_t;
    // SAFETY: `kill(2)` takes plain integers and dereferences no memory; the
    // only effect is signal delivery to the addressed process group.
    unsafe {
        libc::kill(-pgid, libc::SIGKILL);
    }
}
/// Best-effort kill for non-Unix targets.
///
/// Only `Child::kill` is called here, i.e. the direct child is terminated;
/// nothing in this function reaches processes the child itself started. Any
/// error from `kill` is discarded.
#[cfg(not(unix))]
fn kill_process_tree(child: &mut std::process::Child) {
    let _ignored = child.kill();
}
/// Renders `status` through its `Display` implementation for use in error
/// messages.
fn exit_status_brief(status: &std::process::ExitStatus) -> String {
    ToString::to_string(status)
}
/// Shape that the agent's stdout is deserialized into: a JSON object carrying
/// a `findings` array.
#[derive(Deserialize)]
struct FindingsEnvelope {
    /// Findings reported by the agent; may be empty.
    findings: Vec<SemanticFinding>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::deep::candidate::{Candidate, CandidateKind};
    use crate::deep::config::DeepMode;
    use crate::deep::prompt::{PromptInputs, render};
    use crate::types::Language;
    use std::path::PathBuf;

    /// Builds a subprocess-mode runtime whose only interesting fields are the
    /// command and its timeout; everything else is filler.
    fn synth_runtime(cmd: &str, timeout_secs: u64) -> DeepRuntime {
        DeepRuntime {
            mode: DeepMode::Subprocess,
            base_url: String::new(),
            model: String::new(),
            api_key: None,
            max_cost_usd: None,
            cost_per_1k_input: None,
            cost_per_1k_output: None,
            request_timeout_secs: 120,
            max_candidates: 50,
            max_concurrent: 1,
            temperature: 0.0,
            max_prompt_chars: 16_000,
            excludes: Vec::new(),
            language_filter: Vec::new(),
            agent_cmd: Some(cmd.into()),
            agent_timeout_secs: timeout_secs,
        }
    }

    /// Renders a prompt for a small fixed TypeScript candidate.
    fn synth_prompt() -> RenderedPrompt {
        let cand = Candidate {
            kind: CandidateKind::ColdRegion,
            file: PathBuf::from("a.ts"),
            language: Language::TypeScript,
            line_start: 1,
            line_end: 5,
            source_snippet: "function isAdmin() { return true; }".into(),
            imports: Vec::new(),
            original_finding_id: None,
            seed_category: None,
        };
        render(&PromptInputs {
            candidate: &cand,
            structural_finding: None,
        })
    }

    #[test]
    fn new_rejects_empty_agent_cmd() {
        let mut rt = synth_runtime("nonempty", 60);
        rt.agent_cmd = Some("".into());
        let err = SubprocessClient::new(&rt).unwrap_err();
        assert!(matches!(err, DeepError::Config(_)));
    }

    #[test]
    fn new_rejects_whitespace_agent_cmd() {
        let mut rt = synth_runtime("nonempty", 60);
        rt.agent_cmd = Some(" ".into());
        let err = SubprocessClient::new(&rt).unwrap_err();
        assert!(matches!(err, DeepError::Config(_)));
    }

    #[test]
    fn new_rejects_runtime_without_agent_cmd() {
        let mut rt = synth_runtime("ignored", 60);
        rt.agent_cmd = None;
        let err = SubprocessClient::new(&rt).unwrap_err();
        assert!(
            matches!(err, DeepError::Config(ref msg) if msg.contains("invariant")),
            "expected Config(<invariant>), got: {err:?}",
        );
    }

    #[test]
    fn build_envelope_contains_system_user_and_schema() {
        let prompt = synth_prompt();
        let env = build_envelope(&prompt);
        let v: serde_json::Value = serde_json::from_str(&env).unwrap();
        assert!(v.get("system").is_some(), "envelope missing 'system'");
        assert!(v.get("user").is_some(), "envelope missing 'user'");
        assert!(v.get("schema").is_some(), "envelope missing 'schema'");
        assert_eq!(v.get("schema").unwrap(), &prompt.schema);
    }

    // The command ignores stdin and prints a canned findings document.
    #[cfg(unix)]
    #[test]
    fn happy_path_with_printf_returning_canned_json() {
        let canned = r#"{"findings": []}"#;
        let cmd = format!("printf '%s' '{}'", canned);
        let rt = synth_runtime(&cmd, 10);
        let client = SubprocessClient::new(&rt).unwrap();
        let prompt = synth_prompt();
        let resp = client.analyze(&prompt).unwrap();
        assert!(resp.findings.is_empty());
        assert_eq!(resp.usage, TokenUsage::default());
    }

    #[cfg(unix)]
    #[test]
    fn nonzero_exit_surfaces_as_bad_response_for_skip() {
        let rt = synth_runtime("exit 7", 5);
        let client = SubprocessClient::new(&rt).unwrap();
        let err = client.analyze(&synth_prompt()).unwrap_err();
        assert!(
            matches!(err, DeepError::BadResponse(_)),
            "expected BadResponse for nonzero exit, got: {err:?}",
        );
    }

    #[cfg(unix)]
    #[test]
    fn malformed_stdout_surfaces_as_bad_response() {
        let rt = synth_runtime("printf 'this is not json'", 5);
        let client = SubprocessClient::new(&rt).unwrap();
        let err = client.analyze(&synth_prompt()).unwrap_err();
        assert!(
            matches!(err, DeepError::BadResponse(_)),
            "expected BadResponse for malformed stdout, got: {err:?}",
        );
    }

    #[cfg(unix)]
    #[test]
    fn timeout_kills_long_running_subprocess() {
        let rt = synth_runtime("sleep 30", 1);
        let client = SubprocessClient::new(&rt).unwrap();
        let start = Instant::now();
        let err = client.analyze(&synth_prompt()).unwrap_err();
        let elapsed = start.elapsed();
        assert!(
            matches!(err, DeepError::Timeout { .. }),
            "expected Timeout, got: {err:?}",
        );
        assert!(
            elapsed < Duration::from_secs(5),
            "timeout took too long: {elapsed:?}",
        );
    }

    // `sh` itself always spawns, so an unknown command is not a spawn failure:
    // the shell exits nonzero and the client reports BadResponse, not Config.
    #[cfg(unix)]
    #[test]
    fn unknown_command_surfaces_as_bad_response_via_shell_exit() {
        let rt = synth_runtime("definitely-not-a-command-12345", 5);
        let client = SubprocessClient::new(&rt).unwrap();
        let err = client.analyze(&synth_prompt()).unwrap_err();
        assert!(
            matches!(err, DeepError::BadResponse(_)),
            "expected BadResponse (sh exited 127), got: {err:?}",
        );
    }

    // The raw string below spans three lines on purpose; its continuation
    // lines must stay at column 0 so no extra whitespace enters the payload.
    #[cfg(unix)]
    #[test]
    fn markdown_fence_around_stdout_is_stripped() {
        let cmd = r#"printf '%s' '```json
{"findings": []}
```'"#;
        let rt = synth_runtime(cmd, 5);
        let client = SubprocessClient::new(&rt).unwrap();
        let resp = client.analyze(&synth_prompt()).unwrap();
        assert!(resp.findings.is_empty());
    }
}