1use crate::registry::{AgentContract, RegisteredAgent};
8use serde::{Deserialize, Serialize};
9use std::time::{Duration, Instant};
10use tracing::{info, warn};
11
12#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct TraceEvent {
16 pub timestamp_ms: u64,
17 pub event_type: String, pub data: serde_json::Value,
19 #[serde(default)]
20 pub span_id: Option<String>,
21 #[serde(default)]
22 pub parent_span_id: Option<String>,
23 #[serde(default)]
24 pub latency_ms: Option<u64>,
25 #[serde(default)]
26 pub token_usage: Option<serde_json::Value>, #[serde(default)]
28 pub model: Option<String>,
29 #[serde(default)]
30 pub tool_name: Option<String>,
31 #[serde(default)]
32 pub cost_usd: Option<f64>,
33 #[serde(default)]
34 pub redacted: bool,
35 #[serde(default)]
36 pub candidate_id: Option<String>,
37}
38
39impl TraceEvent {
40 pub fn lifecycle(
41 timestamp_ms: u64,
42 event_type: impl Into<String>,
43 span_id: impl Into<String>,
44 parent_span_id: Option<String>,
45 latency_ms: Option<u64>,
46 data: serde_json::Value,
47 ) -> Self {
48 Self {
49 timestamp_ms,
50 event_type: event_type.into(),
51 data,
52 span_id: Some(span_id.into()),
53 parent_span_id,
54 latency_ms,
55 token_usage: None,
56 model: None,
57 tool_name: None,
58 cost_usd: None,
59 redacted: false,
60 candidate_id: None,
61 }
62 }
63}
64
65#[derive(Debug, Clone, Serialize, Deserialize)]
67pub struct AgentRunResult {
68 pub output: serde_json::Value,
69 pub duration_ms: u64,
70 pub success: bool,
71 pub error: Option<String>,
72 pub traces: Vec<TraceEvent>,
73}
74
75pub async fn run_agent(
79 agent: &RegisteredAgent,
80 input: serde_json::Value,
81) -> anyhow::Result<AgentRunResult> {
82 let start = Instant::now();
83 let mut traces = vec![];
84
85 info!(agent = %agent.name, "starting agent run");
86
87 match agent.contract {
88 AgentContract::Process | AgentContract::NativeRust => {
89 let result = run_process_agent(agent, input).await?;
92
93 let run_span_id = format!("run-{}", start.elapsed().as_nanos());
94 traces.push(TraceEvent::lifecycle(
95 0,
96 "run_started",
97 run_span_id.clone(),
98 None,
99 None,
100 serde_json::json!({
101 "agent": agent.name,
102 "contract": format!("{:?}", agent.contract)
103 }),
104 ));
105 traces.push(TraceEvent::lifecycle(
106 start.elapsed().as_millis() as u64,
107 "run_completed",
108 format!("{run_span_id}:completed"),
109 Some(run_span_id),
110 Some(result.duration_ms),
111 serde_json::json!({
112 "success": result.success,
113 "duration_ms": result.duration_ms
114 }),
115 ));
116
117 if !result.success {
118 warn!(agent = %agent.name, error = ?result.error, "agent run failed");
119 }
120
121 Ok(AgentRunResult {
122 output: result.output,
123 duration_ms: result.duration_ms,
124 success: result.success,
125 error: result.error,
126 traces,
127 })
128 }
129 }
130}
131
132async fn run_process_agent(
135 agent: &RegisteredAgent,
136 input: serde_json::Value,
137) -> anyhow::Result<AgentRunResult> {
138 use std::io::Write;
139 use std::process::{Command, Stdio};
140
141 let start = Instant::now();
142 const AGENT_RUN_TIMEOUT: Duration = Duration::from_secs(120);
143
144 let manifest = agent.path.join("Cargo.toml");
145 if !manifest.exists() {
146 return Err(anyhow::anyhow!("Cannot find Cargo.toml for Process agent"));
147 }
148
149 let mut command = Command::new("cargo");
150 command
151 .current_dir(&agent.path)
152 .args([
153 "run",
154 "-q",
155 "--manifest-path",
156 manifest.to_str().unwrap(),
157 "--",
158 ])
159 .stdin(Stdio::piped())
160 .stdout(Stdio::piped())
161 .stderr(Stdio::piped());
162 configure_process_group(&mut command);
163
164 let mut child = command.spawn()?;
165
166 {
167 let mut stdin = child
168 .stdin
169 .take()
170 .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
171 stdin.write_all(serde_json::to_string(&input)?.as_bytes())?;
172 stdin.write_all(b"\n")?;
173 }
174
175 let output = loop {
176 match child.try_wait() {
177 Ok(Some(_)) => break child.wait_with_output()?,
178 Ok(None) if start.elapsed() >= AGENT_RUN_TIMEOUT => {
179 terminate_process_group(child.id());
180 let _ = child.kill();
181 let _ = child.wait();
182 let duration = start.elapsed().as_millis() as u64;
183 return Ok(AgentRunResult {
184 output: serde_json::json!({"error": "agent timed out"}),
185 duration_ms: duration,
186 success: false,
187 error: Some(format!(
188 "Agent run exceeded {}s timeout and was terminated",
189 AGENT_RUN_TIMEOUT.as_secs()
190 )),
191 traces: vec![],
192 });
193 }
194 Ok(None) => std::thread::sleep(Duration::from_millis(20)),
195 Err(e) => {
196 terminate_process_group(child.id());
197 let _ = child.kill();
198 let _ = child.wait();
199 let duration = start.elapsed().as_millis() as u64;
200 return Ok(AgentRunResult {
201 output: serde_json::json!({"error": format!("wait error: {}", e)}),
202 duration_ms: duration,
203 success: false,
204 error: Some(e.to_string()),
205 traces: vec![],
206 });
207 }
208 }
209 };
210
211 let duration = start.elapsed().as_millis() as u64;
212
213 if !output.status.success() {
214 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
215 return Ok(AgentRunResult {
216 output: serde_json::json!({"error": stderr}),
217 duration_ms: duration,
218 success: false,
219 error: Some(stderr),
220 traces: vec![],
221 });
222 }
223
224 let stdout = String::from_utf8_lossy(&output.stdout);
225 let parsed: serde_json::Value = serde_json::from_str(stdout.trim())
226 .unwrap_or_else(|_| serde_json::json!({"raw": stdout.to_string()}));
227
228 Ok(AgentRunResult {
229 output: parsed,
230 duration_ms: duration,
231 success: true,
232 error: None,
233 traces: vec![],
234 })
235}
236
237#[cfg(unix)]
238fn configure_process_group(command: &mut std::process::Command) {
239 use std::os::unix::process::CommandExt;
240 command.process_group(0);
241}
242
243#[cfg(not(unix))]
244fn configure_process_group(_command: &mut std::process::Command) {}
245
246#[cfg(unix)]
247fn terminate_process_group(pid: u32) {
248 for signal in ["-TERM", "-KILL"] {
249 let _ = std::process::Command::new("kill")
250 .arg(signal)
251 .arg(format!("-{pid}"))
252 .stdin(std::process::Stdio::null())
253 .stdout(std::process::Stdio::null())
254 .stderr(std::process::Stdio::null())
255 .status();
256 std::thread::sleep(Duration::from_millis(50));
257 }
258}
259
260#[cfg(not(unix))]
261fn terminate_process_group(_pid: u32) {}
262
263#[cfg(test)]
264mod tests {
265 use super::*;
266 use crate::registry::{AgentContract, RegisteredAgent};
267 use tempfile::tempdir;
268
269 #[tokio::test]
270 async fn process_agent_receives_eof_after_json_input() {
271 let dir = tempdir().unwrap();
272 std::fs::create_dir_all(dir.path().join("src")).unwrap();
273 std::fs::write(
274 dir.path().join("Cargo.toml"),
275 "[package]\nname=\"stdin-eof-agent\"\nversion=\"0.1.0\"\nedition=\"2021\"\n",
276 )
277 .unwrap();
278 std::fs::write(
279 dir.path().join("src/main.rs"),
280 r#"
281use std::io::Read;
282
283fn main() {
284 let mut input = String::new();
285 std::io::stdin().read_to_string(&mut input).unwrap();
286 assert!(!input.trim().is_empty());
287 println!("{{\"answer\":\"read eof\",\"reasoning\":\"stdin closed\",\"confidence\":0.9}}");
288}
289"#,
290 )
291 .unwrap();
292
293 let agent = RegisteredAgent {
294 name: "stdin-eof-agent".to_string(),
295 path: dir.path().to_path_buf(),
296 contract: AgentContract::Process,
297 registered_at: "test".to_string(),
298 };
299
300 let result = run_agent(&agent, serde_json::json!({"query":"hello"}))
301 .await
302 .unwrap();
303
304 assert!(result.success, "{:?}", result.error);
305 assert_eq!(result.output["answer"], "read eof");
306 }
307}