Skip to main content

mdx_rust_core/
runner.rs

1//! Agent execution harness with tracing support.
2//!
3//! This module is responsible for actually invoking registered agents
4//! (either as separate processes or, later, as native Rust entrypoints)
5//! while collecting rich traces for diagnosis and optimization.
6
7use crate::registry::{AgentContract, RegisteredAgent};
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10use std::time::{Duration, Instant};
11use tracing::{info, warn};
12
/// A single trace event captured during an agent run.
/// Made first-class for trace-to-patch optimization.
// NOTE: the field notes below deliberately use `//` instead of `///`: schemars copies
// `///` doc comments into the generated JSON schema, which would change its output.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct TraceEvent {
    // When the event happened, in milliseconds. The lifecycle events built by
    // `run_agent` use offsets from the start of the run (0 for `run_started`).
    pub timestamp_ms: u64,
    pub event_type: String, // "llm_call", "tool_call", "error", "decision", etc.
    // Free-form, event-specific JSON payload.
    pub data: serde_json::Value,
    // All remaining fields are `#[serde(default)]`: when a key is absent from the
    // input it deserializes to `None` (or `false` for `redacted`).
    //
    // Identifier of this event's span; `run_agent` uses ids of the form `run-<nanos>`.
    #[serde(default)]
    pub span_id: Option<String>,
    // Span this event is nested under (e.g. `run_completed` points at the `run_started` span).
    #[serde(default)]
    pub parent_span_id: Option<String>,
    // Duration attached to the event, in milliseconds; `run_completed` carries the
    // agent's total run duration here.
    #[serde(default)]
    pub latency_ms: Option<u64>,
    #[serde(default)]
    pub token_usage: Option<serde_json::Value>, // {prompt: , completion: , total: }
    // Presumably the model used by an LLM-related event; not set anywhere in this file.
    #[serde(default)]
    pub model: Option<String>,
    // Presumably the tool invoked by a "tool_call" event; not set anywhere in this file.
    #[serde(default)]
    pub tool_name: Option<String>,
    // Cost of the event, in US dollars judging by the name — TODO confirm with producers.
    #[serde(default)]
    pub cost_usd: Option<f64>,
    // NOTE(review): presumably marks events whose payload was scrubbed of sensitive
    // data; nothing in this file ever sets it to `true`.
    #[serde(default)]
    pub redacted: bool,
    // NOTE(review): presumably the optimization candidate that produced this event — confirm.
    #[serde(default)]
    pub candidate_id: Option<String>,
}
39
40impl TraceEvent {
41    pub fn lifecycle(
42        timestamp_ms: u64,
43        event_type: impl Into<String>,
44        span_id: impl Into<String>,
45        parent_span_id: Option<String>,
46        latency_ms: Option<u64>,
47        data: serde_json::Value,
48    ) -> Self {
49        Self {
50            timestamp_ms,
51            event_type: event_type.into(),
52            data,
53            span_id: Some(span_id.into()),
54            parent_span_id,
55            latency_ms,
56            token_usage: None,
57            model: None,
58            tool_name: None,
59            cost_usd: None,
60            redacted: false,
61            candidate_id: None,
62        }
63    }
64}
65
/// The result of running an agent on a single input, including traces.
// Field notes use `//` (not `///`) so the JsonSchema derive output stays unchanged.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct AgentRunResult {
    // Agent output. For process agents, this is one of:
    // - stdout parsed as JSON;
    // - `{"raw": <stdout>}` when stdout is not valid JSON;
    // - `{"error": ...}` when the run failed or timed out.
    pub output: serde_json::Value,
    // Wall-clock run time in milliseconds, as measured by the runner.
    pub duration_ms: u64,
    // `false` when the agent exited non-zero, timed out, or could not be waited on.
    pub success: bool,
    // Failure message (agent stderr, timeout notice, or wait error); `None` on success.
    pub error: Option<String>,
    // Trace events for the run; `run_agent` fills this with `run_started` /
    // `run_completed` lifecycle events.
    pub traces: Vec<TraceEvent>,
}
75
76/// Run a registered agent with the given input.
77/// Currently supports Process contracts (spawns the agent binary and pipes JSON).
78/// NativeRust support will be added when we generate harnesses.
79pub async fn run_agent(
80    agent: &RegisteredAgent,
81    input: serde_json::Value,
82) -> anyhow::Result<AgentRunResult> {
83    let start = Instant::now();
84    let mut traces = vec![];
85
86    info!(agent = %agent.name, "starting agent run");
87
88    match agent.contract {
89        AgentContract::Process | AgentContract::NativeRust => {
90            // For development and the example agent, we treat both as "runnable via cargo".
91            // Real NativeRust support (in-process or harness) will come later.
92            let result = run_process_agent(agent, input).await?;
93
94            let run_span_id = format!("run-{}", start.elapsed().as_nanos());
95            traces.push(TraceEvent::lifecycle(
96                0,
97                "run_started",
98                run_span_id.clone(),
99                None,
100                None,
101                serde_json::json!({
102                    "agent": agent.name,
103                    "contract": format!("{:?}", agent.contract)
104                }),
105            ));
106            traces.push(TraceEvent::lifecycle(
107                start.elapsed().as_millis() as u64,
108                "run_completed",
109                format!("{run_span_id}:completed"),
110                Some(run_span_id),
111                Some(result.duration_ms),
112                serde_json::json!({
113                    "success": result.success,
114                    "duration_ms": result.duration_ms
115                }),
116            ));
117
118            if !result.success {
119                warn!(agent = %agent.name, error = ?result.error, "agent run failed");
120            }
121
122            Ok(AgentRunResult {
123                output: result.output,
124                duration_ms: result.duration_ms,
125                success: result.success,
126                error: result.error,
127                traces,
128            })
129        }
130    }
131}
132
133// Internal helper – the actual process invocation with timeout.
134// A broken/hanging agent must fail with a structured error, never hang the optimizer (P0 requirement).
135async fn run_process_agent(
136    agent: &RegisteredAgent,
137    input: serde_json::Value,
138) -> anyhow::Result<AgentRunResult> {
139    use std::io::Write;
140    use std::process::{Command, Stdio};
141
142    let start = Instant::now();
143    const AGENT_RUN_TIMEOUT: Duration = Duration::from_secs(120);
144
145    let manifest = agent.path.join("Cargo.toml");
146    if !manifest.exists() {
147        return Err(anyhow::anyhow!("Cannot find Cargo.toml for Process agent"));
148    }
149
150    let mut command = Command::new("cargo");
151    command
152        .current_dir(&agent.path)
153        .args([
154            "run",
155            "-q",
156            "--manifest-path",
157            manifest.to_str().unwrap(),
158            "--",
159        ])
160        .stdin(Stdio::piped())
161        .stdout(Stdio::piped())
162        .stderr(Stdio::piped());
163    configure_process_group(&mut command);
164
165    let mut child = command.spawn()?;
166
167    {
168        let mut stdin = child
169            .stdin
170            .take()
171            .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
172        stdin.write_all(serde_json::to_string(&input)?.as_bytes())?;
173        stdin.write_all(b"\n")?;
174    }
175
176    let output = loop {
177        match child.try_wait() {
178            Ok(Some(_)) => break child.wait_with_output()?,
179            Ok(None) if start.elapsed() >= AGENT_RUN_TIMEOUT => {
180                terminate_process_group(child.id());
181                let _ = child.kill();
182                let _ = child.wait();
183                let duration = start.elapsed().as_millis() as u64;
184                return Ok(AgentRunResult {
185                    output: serde_json::json!({"error": "agent timed out"}),
186                    duration_ms: duration,
187                    success: false,
188                    error: Some(format!(
189                        "Agent run exceeded {}s timeout and was terminated",
190                        AGENT_RUN_TIMEOUT.as_secs()
191                    )),
192                    traces: vec![],
193                });
194            }
195            Ok(None) => std::thread::sleep(Duration::from_millis(20)),
196            Err(e) => {
197                terminate_process_group(child.id());
198                let _ = child.kill();
199                let _ = child.wait();
200                let duration = start.elapsed().as_millis() as u64;
201                return Ok(AgentRunResult {
202                    output: serde_json::json!({"error": format!("wait error: {}", e)}),
203                    duration_ms: duration,
204                    success: false,
205                    error: Some(e.to_string()),
206                    traces: vec![],
207                });
208            }
209        }
210    };
211
212    let duration = start.elapsed().as_millis() as u64;
213
214    if !output.status.success() {
215        let stderr = String::from_utf8_lossy(&output.stderr).to_string();
216        return Ok(AgentRunResult {
217            output: serde_json::json!({"error": stderr}),
218            duration_ms: duration,
219            success: false,
220            error: Some(stderr),
221            traces: vec![],
222        });
223    }
224
225    let stdout = String::from_utf8_lossy(&output.stdout);
226    let parsed: serde_json::Value = serde_json::from_str(stdout.trim())
227        .unwrap_or_else(|_| serde_json::json!({"raw": stdout.to_string()}));
228
229    Ok(AgentRunResult {
230        output: parsed,
231        duration_ms: duration,
232        success: true,
233        error: None,
234        traces: vec![],
235    })
236}
237
/// Prepares `command` so that it starts in a fresh process group (Unix only).
///
/// The whole tree it spawns can then be signalled as one unit by
/// `terminate_process_group`. On other platforms this does nothing.
fn configure_process_group(command: &mut std::process::Command) {
    #[cfg(unix)]
    {
        // A process group id of 0 makes the child the leader of a new group whose id
        // equals its own pid.
        std::os::unix::process::CommandExt::process_group(command, 0);
    }
    #[cfg(not(unix))]
    let _ = command;
}
246
/// Best-effort kill of the process group led by `pid` (Unix only).
///
/// Sends SIGTERM and then SIGKILL to the group through the external `kill`
/// utility, pausing 50ms after each signal. Every failure is ignored, including
/// a missing `kill` binary or a group that is already gone. On other platforms
/// this does nothing.
fn terminate_process_group(pid: u32) {
    #[cfg(unix)]
    {
        use std::process::{Command, Stdio};

        // A negative id addresses the whole process group rather than a single process.
        let group = format!("-{pid}");
        let pause = Duration::from_millis(50);

        for signal in ["-TERM", "-KILL"] {
            let mut kill = Command::new("kill");
            kill.args([signal, group.as_str()])
                .stdin(Stdio::null())
                .stdout(Stdio::null())
                .stderr(Stdio::null());
            let _ = kill.status();
            std::thread::sleep(pause);
        }
    }
    #[cfg(not(unix))]
    let _ = pid;
}
263
#[cfg(test)]
mod tests {
    use super::*;
    use crate::registry::{AgentContract, RegisteredAgent};
    use tempfile::tempdir;

    /// Manifest of the throwaway agent crate built by the test.
    const AGENT_MANIFEST: &str =
        "[package]\nname=\"stdin-eof-agent\"\nversion=\"0.1.0\"\nedition=\"2021\"\n";

    /// Agent that reads stdin to EOF before printing its answer.
    ///
    /// It can only finish if the runner closes stdin after writing the JSON input.
    /// It panics if that input is empty.
    const AGENT_MAIN: &str = r#"
use std::io::Read;

fn main() {
    let mut input = String::new();
    std::io::stdin().read_to_string(&mut input).unwrap();
    assert!(!input.trim().is_empty());
    println!("{{\"answer\":\"read eof\",\"reasoning\":\"stdin closed\",\"confidence\":0.9}}");
}
"#;

    #[tokio::test]
    async fn process_agent_receives_eof_after_json_input() {
        let workspace = tempdir().unwrap();
        let crate_root = workspace.path();
        let src_dir = crate_root.join("src");

        std::fs::create_dir_all(&src_dir).unwrap();
        std::fs::write(crate_root.join("Cargo.toml"), AGENT_MANIFEST).unwrap();
        std::fs::write(src_dir.join("main.rs"), AGENT_MAIN).unwrap();

        let agent = RegisteredAgent {
            name: String::from("stdin-eof-agent"),
            path: crate_root.to_path_buf(),
            contract: AgentContract::Process,
            registered_at: String::from("test"),
        };
        let input = serde_json::json!({"query":"hello"});

        let outcome = run_agent(&agent, input).await.unwrap();

        assert!(outcome.success, "{:?}", outcome.error);
        assert_eq!(outcome.output["answer"], "read eof");
    }
}