Skip to main content

mdx_rust_core/
runner.rs

//! Agent execution harness with tracing support.
//!
//! This module is responsible for actually invoking registered agents
//! (either as separate processes or, later, as native Rust entrypoints)
//! while collecting rich traces for diagnosis and optimization.
6
7use crate::registry::{AgentContract, RegisteredAgent};
8use serde::{Deserialize, Serialize};
9use std::time::{Duration, Instant};
10use tracing::{info, warn};
11
12/// A single trace event captured during an agent run.
13/// Made first-class for trace-to-patch optimization (per handoff).
14#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct TraceEvent {
16    pub timestamp_ms: u64,
17    pub event_type: String, // "llm_call", "tool_call", "error", "decision", etc.
18    pub data: serde_json::Value,
19    #[serde(default)]
20    pub span_id: Option<String>,
21    #[serde(default)]
22    pub parent_span_id: Option<String>,
23    #[serde(default)]
24    pub latency_ms: Option<u64>,
25    #[serde(default)]
26    pub token_usage: Option<serde_json::Value>, // {prompt: , completion: , total: }
27    #[serde(default)]
28    pub model: Option<String>,
29    #[serde(default)]
30    pub tool_name: Option<String>,
31    #[serde(default)]
32    pub cost_usd: Option<f64>,
33    #[serde(default)]
34    pub redacted: bool,
35    #[serde(default)]
36    pub candidate_id: Option<String>,
37}
38
39impl TraceEvent {
40    pub fn lifecycle(
41        timestamp_ms: u64,
42        event_type: impl Into<String>,
43        span_id: impl Into<String>,
44        parent_span_id: Option<String>,
45        latency_ms: Option<u64>,
46        data: serde_json::Value,
47    ) -> Self {
48        Self {
49            timestamp_ms,
50            event_type: event_type.into(),
51            data,
52            span_id: Some(span_id.into()),
53            parent_span_id,
54            latency_ms,
55            token_usage: None,
56            model: None,
57            tool_name: None,
58            cost_usd: None,
59            redacted: false,
60            candidate_id: None,
61        }
62    }
63}
64
65/// The result of running an agent on a single input, including traces.
66#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct AgentRunResult {
68    pub output: serde_json::Value,
69    pub duration_ms: u64,
70    pub success: bool,
71    pub error: Option<String>,
72    pub traces: Vec<TraceEvent>,
73}
74
75/// Run a registered agent with the given input.
76/// Currently supports Process contracts (spawns the agent binary and pipes JSON).
77/// NativeRust support will be added when we generate harnesses.
78pub async fn run_agent(
79    agent: &RegisteredAgent,
80    input: serde_json::Value,
81) -> anyhow::Result<AgentRunResult> {
82    let start = Instant::now();
83    let mut traces = vec![];
84
85    info!(agent = %agent.name, "starting agent run");
86
87    match agent.contract {
88        AgentContract::Process | AgentContract::NativeRust => {
89            // For development and the example agent, we treat both as "runnable via cargo".
90            // Real NativeRust support (in-process or harness) will come later.
91            let result = run_process_agent(agent, input).await?;
92
93            let run_span_id = format!("run-{}", start.elapsed().as_nanos());
94            traces.push(TraceEvent::lifecycle(
95                0,
96                "run_started",
97                run_span_id.clone(),
98                None,
99                None,
100                serde_json::json!({
101                    "agent": agent.name,
102                    "contract": format!("{:?}", agent.contract)
103                }),
104            ));
105            traces.push(TraceEvent::lifecycle(
106                start.elapsed().as_millis() as u64,
107                "run_completed",
108                format!("{run_span_id}:completed"),
109                Some(run_span_id),
110                Some(result.duration_ms),
111                serde_json::json!({
112                    "success": result.success,
113                    "duration_ms": result.duration_ms
114                }),
115            ));
116
117            if !result.success {
118                warn!(agent = %agent.name, error = ?result.error, "agent run failed");
119            }
120
121            Ok(AgentRunResult {
122                output: result.output,
123                duration_ms: result.duration_ms,
124                success: result.success,
125                error: result.error,
126                traces,
127            })
128        }
129    }
130}
131
132// Internal helper – the actual process invocation with timeout.
133// A broken/hanging agent must fail with a structured error, never hang the optimizer (P0 requirement).
134async fn run_process_agent(
135    agent: &RegisteredAgent,
136    input: serde_json::Value,
137) -> anyhow::Result<AgentRunResult> {
138    use std::io::Write;
139    use std::process::{Command, Stdio};
140
141    let start = Instant::now();
142    const AGENT_RUN_TIMEOUT: Duration = Duration::from_secs(120);
143
144    let manifest = agent.path.join("Cargo.toml");
145    if !manifest.exists() {
146        return Err(anyhow::anyhow!("Cannot find Cargo.toml for Process agent"));
147    }
148
149    let mut command = Command::new("cargo");
150    command
151        .current_dir(&agent.path)
152        .args([
153            "run",
154            "-q",
155            "--manifest-path",
156            manifest.to_str().unwrap(),
157            "--",
158        ])
159        .stdin(Stdio::piped())
160        .stdout(Stdio::piped())
161        .stderr(Stdio::piped());
162    configure_process_group(&mut command);
163
164    let mut child = command.spawn()?;
165
166    {
167        let mut stdin = child
168            .stdin
169            .take()
170            .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
171        stdin.write_all(serde_json::to_string(&input)?.as_bytes())?;
172        stdin.write_all(b"\n")?;
173    }
174
175    let output = loop {
176        match child.try_wait() {
177            Ok(Some(_)) => break child.wait_with_output()?,
178            Ok(None) if start.elapsed() >= AGENT_RUN_TIMEOUT => {
179                terminate_process_group(child.id());
180                let _ = child.kill();
181                let _ = child.wait();
182                let duration = start.elapsed().as_millis() as u64;
183                return Ok(AgentRunResult {
184                    output: serde_json::json!({"error": "agent timed out"}),
185                    duration_ms: duration,
186                    success: false,
187                    error: Some(format!(
188                        "Agent run exceeded {}s timeout and was terminated",
189                        AGENT_RUN_TIMEOUT.as_secs()
190                    )),
191                    traces: vec![],
192                });
193            }
194            Ok(None) => std::thread::sleep(Duration::from_millis(20)),
195            Err(e) => {
196                terminate_process_group(child.id());
197                let _ = child.kill();
198                let _ = child.wait();
199                let duration = start.elapsed().as_millis() as u64;
200                return Ok(AgentRunResult {
201                    output: serde_json::json!({"error": format!("wait error: {}", e)}),
202                    duration_ms: duration,
203                    success: false,
204                    error: Some(e.to_string()),
205                    traces: vec![],
206                });
207            }
208        }
209    };
210
211    let duration = start.elapsed().as_millis() as u64;
212
213    if !output.status.success() {
214        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
215        return Ok(AgentRunResult {
216            output: serde_json::json!({"error": stderr}),
217            duration_ms: duration,
218            success: false,
219            error: Some(stderr),
220            traces: vec![],
221        });
222    }
223
224    let stdout = String::from_utf8_lossy(&output.stdout);
225    let parsed: serde_json::Value = serde_json::from_str(stdout.trim())
226        .unwrap_or_else(|_| serde_json::json!({"raw": stdout.to_string()}));
227
228    Ok(AgentRunResult {
229        output: parsed,
230        duration_ms: duration,
231        success: true,
232        error: None,
233        traces: vec![],
234    })
235}
236
/// Arranges for the spawned command to lead its own process group.
///
/// On Unix this calls `process_group(0)` on the command, which lets the
/// timeout path signal the whole group rather than only the direct child.
/// On other platforms it does nothing.
fn configure_process_group(command: &mut std::process::Command) {
    #[cfg(unix)]
    {
        std::os::unix::process::CommandExt::process_group(command, 0);
    }

    // Nothing to configure elsewhere; consume the argument to avoid an
    // unused-variable warning.
    #[cfg(not(unix))]
    let _ = command;
}
245
/// Signals every process in the group led by `pid`.
///
/// On Unix this runs the external `kill` utility twice against `-<pid>`,
/// first with `-TERM` and then with `-KILL`, pausing 50 ms after each attempt.
/// Failures to run `kill` are ignored (best effort). On other platforms this
/// is a no-op.
fn terminate_process_group(pid: u32) {
    #[cfg(unix)]
    {
        use std::process::{Command, Stdio};

        // A negative pid addresses the whole process group.
        let group = format!("-{pid}");
        let pause = Duration::from_millis(50);

        for signal in ["-TERM", "-KILL"] {
            let mut kill = Command::new("kill");
            kill.arg(signal)
                .arg(&group)
                .stdin(Stdio::null())
                .stdout(Stdio::null())
                .stderr(Stdio::null());
            let _ = kill.status();
            // Give the group a moment to react before escalating or returning.
            std::thread::sleep(pause);
        }
    }

    #[cfg(not(unix))]
    let _ = pid;
}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::{AgentContract, RegisteredAgent};
    use tempfile::tempdir;

    /// Manifest of the throwaway agent crate built by the test.
    const FIXTURE_MANIFEST: &str =
        "[package]\nname=\"stdin-eof-agent\"\nversion=\"0.1.0\"\nedition=\"2021\"\n";

    /// Agent source: reads stdin to EOF, so it only terminates if the runner
    /// closes the pipe after sending the JSON input.
    const FIXTURE_MAIN: &str = r#"
use std::io::Read;

fn main() {
    let mut input = String::new();
    std::io::stdin().read_to_string(&mut input).unwrap();
    assert!(!input.trim().is_empty());
    println!("{{\"answer\":\"read eof\",\"reasoning\":\"stdin closed\",\"confidence\":0.9}}");
}
"#;

    /// The runner must close the agent's stdin after writing the input;
    /// otherwise an agent that reads to EOF would never finish.
    #[tokio::test]
    async fn process_agent_receives_eof_after_json_input() {
        let workspace = tempdir().unwrap();
        let root = workspace.path();

        std::fs::create_dir_all(root.join("src")).unwrap();
        std::fs::write(root.join("Cargo.toml"), FIXTURE_MANIFEST).unwrap();
        std::fs::write(root.join("src/main.rs"), FIXTURE_MAIN).unwrap();

        let agent = RegisteredAgent {
            name: "stdin-eof-agent".to_string(),
            path: root.to_path_buf(),
            contract: AgentContract::Process,
            registered_at: "test".to_string(),
        };

        let query = serde_json::json!({"query":"hello"});
        let result = run_agent(&agent, query).await.unwrap();

        assert!(result.success, "{:?}", result.error);
        assert_eq!(result.output["answer"], "read eof");
    }
}