1use crate::registry::{AgentContract, RegisteredAgent};
8use schemars::JsonSchema;
9use serde::{Deserialize, Serialize};
10use std::time::{Duration, Instant};
11use tracing::{info, warn};
12
13#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
16pub struct TraceEvent {
17 pub timestamp_ms: u64,
18 pub event_type: String, pub data: serde_json::Value,
20 #[serde(default)]
21 pub span_id: Option<String>,
22 #[serde(default)]
23 pub parent_span_id: Option<String>,
24 #[serde(default)]
25 pub latency_ms: Option<u64>,
26 #[serde(default)]
27 pub token_usage: Option<serde_json::Value>, #[serde(default)]
29 pub model: Option<String>,
30 #[serde(default)]
31 pub tool_name: Option<String>,
32 #[serde(default)]
33 pub cost_usd: Option<f64>,
34 #[serde(default)]
35 pub redacted: bool,
36 #[serde(default)]
37 pub candidate_id: Option<String>,
38}
39
40impl TraceEvent {
41 pub fn lifecycle(
42 timestamp_ms: u64,
43 event_type: impl Into<String>,
44 span_id: impl Into<String>,
45 parent_span_id: Option<String>,
46 latency_ms: Option<u64>,
47 data: serde_json::Value,
48 ) -> Self {
49 Self {
50 timestamp_ms,
51 event_type: event_type.into(),
52 data,
53 span_id: Some(span_id.into()),
54 parent_span_id,
55 latency_ms,
56 token_usage: None,
57 model: None,
58 tool_name: None,
59 cost_usd: None,
60 redacted: false,
61 candidate_id: None,
62 }
63 }
64}
65
66#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
68pub struct AgentRunResult {
69 pub output: serde_json::Value,
70 pub duration_ms: u64,
71 pub success: bool,
72 pub error: Option<String>,
73 pub traces: Vec<TraceEvent>,
74}
75
76pub async fn run_agent(
80 agent: &RegisteredAgent,
81 input: serde_json::Value,
82) -> anyhow::Result<AgentRunResult> {
83 let start = Instant::now();
84 let mut traces = vec![];
85
86 info!(agent = %agent.name, "starting agent run");
87
88 match agent.contract {
89 AgentContract::Process | AgentContract::NativeRust => {
90 let result = run_process_agent(agent, input).await?;
93
94 let run_span_id = format!("run-{}", start.elapsed().as_nanos());
95 traces.push(TraceEvent::lifecycle(
96 0,
97 "run_started",
98 run_span_id.clone(),
99 None,
100 None,
101 serde_json::json!({
102 "agent": agent.name,
103 "contract": format!("{:?}", agent.contract)
104 }),
105 ));
106 traces.push(TraceEvent::lifecycle(
107 start.elapsed().as_millis() as u64,
108 "run_completed",
109 format!("{run_span_id}:completed"),
110 Some(run_span_id),
111 Some(result.duration_ms),
112 serde_json::json!({
113 "success": result.success,
114 "duration_ms": result.duration_ms
115 }),
116 ));
117
118 if !result.success {
119 warn!(agent = %agent.name, error = ?result.error, "agent run failed");
120 }
121
122 Ok(AgentRunResult {
123 output: result.output,
124 duration_ms: result.duration_ms,
125 success: result.success,
126 error: result.error,
127 traces,
128 })
129 }
130 }
131}
132
133async fn run_process_agent(
136 agent: &RegisteredAgent,
137 input: serde_json::Value,
138) -> anyhow::Result<AgentRunResult> {
139 use std::io::Write;
140 use std::process::{Command, Stdio};
141
142 let start = Instant::now();
143 const AGENT_RUN_TIMEOUT: Duration = Duration::from_secs(120);
144
145 let manifest = agent.path.join("Cargo.toml");
146 if !manifest.exists() {
147 return Err(anyhow::anyhow!("Cannot find Cargo.toml for Process agent"));
148 }
149
150 let mut command = Command::new("cargo");
151 command
152 .current_dir(&agent.path)
153 .args([
154 "run",
155 "-q",
156 "--manifest-path",
157 manifest.to_str().unwrap(),
158 "--",
159 ])
160 .stdin(Stdio::piped())
161 .stdout(Stdio::piped())
162 .stderr(Stdio::piped());
163 configure_process_group(&mut command);
164
165 let mut child = command.spawn()?;
166
167 {
168 let mut stdin = child
169 .stdin
170 .take()
171 .ok_or_else(|| anyhow::anyhow!("no stdin"))?;
172 stdin.write_all(serde_json::to_string(&input)?.as_bytes())?;
173 stdin.write_all(b"\n")?;
174 }
175
176 let output = loop {
177 match child.try_wait() {
178 Ok(Some(_)) => break child.wait_with_output()?,
179 Ok(None) if start.elapsed() >= AGENT_RUN_TIMEOUT => {
180 terminate_process_group(child.id());
181 let _ = child.kill();
182 let _ = child.wait();
183 let duration = start.elapsed().as_millis() as u64;
184 return Ok(AgentRunResult {
185 output: serde_json::json!({"error": "agent timed out"}),
186 duration_ms: duration,
187 success: false,
188 error: Some(format!(
189 "Agent run exceeded {}s timeout and was terminated",
190 AGENT_RUN_TIMEOUT.as_secs()
191 )),
192 traces: vec![],
193 });
194 }
195 Ok(None) => std::thread::sleep(Duration::from_millis(20)),
196 Err(e) => {
197 terminate_process_group(child.id());
198 let _ = child.kill();
199 let _ = child.wait();
200 let duration = start.elapsed().as_millis() as u64;
201 return Ok(AgentRunResult {
202 output: serde_json::json!({"error": format!("wait error: {}", e)}),
203 duration_ms: duration,
204 success: false,
205 error: Some(e.to_string()),
206 traces: vec![],
207 });
208 }
209 }
210 };
211
212 let duration = start.elapsed().as_millis() as u64;
213
214 if !output.status.success() {
215 let stderr = String::from_utf8_lossy(&output.stderr).to_string();
216 return Ok(AgentRunResult {
217 output: serde_json::json!({"error": stderr}),
218 duration_ms: duration,
219 success: false,
220 error: Some(stderr),
221 traces: vec![],
222 });
223 }
224
225 let stdout = String::from_utf8_lossy(&output.stdout);
226 let parsed: serde_json::Value = serde_json::from_str(stdout.trim())
227 .unwrap_or_else(|_| serde_json::json!({"raw": stdout.to_string()}));
228
229 Ok(AgentRunResult {
230 output: parsed,
231 duration_ms: duration,
232 success: true,
233 error: None,
234 traces: vec![],
235 })
236}
237
238#[cfg(unix)]
239fn configure_process_group(command: &mut std::process::Command) {
240 use std::os::unix::process::CommandExt;
241 command.process_group(0);
242}
243
244#[cfg(not(unix))]
245fn configure_process_group(_command: &mut std::process::Command) {}
246
247#[cfg(unix)]
248fn terminate_process_group(pid: u32) {
249 for signal in ["-TERM", "-KILL"] {
250 let _ = std::process::Command::new("kill")
251 .arg(signal)
252 .arg(format!("-{pid}"))
253 .stdin(std::process::Stdio::null())
254 .stdout(std::process::Stdio::null())
255 .stderr(std::process::Stdio::null())
256 .status();
257 std::thread::sleep(Duration::from_millis(50));
258 }
259}
260
261#[cfg(not(unix))]
262fn terminate_process_group(_pid: u32) {}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use crate::registry::{AgentContract, RegisteredAgent};
268 use tempfile::tempdir;
269
270 #[tokio::test]
271 async fn process_agent_receives_eof_after_json_input() {
272 let dir = tempdir().unwrap();
273 std::fs::create_dir_all(dir.path().join("src")).unwrap();
274 std::fs::write(
275 dir.path().join("Cargo.toml"),
276 "[package]\nname=\"stdin-eof-agent\"\nversion=\"0.1.0\"\nedition=\"2021\"\n",
277 )
278 .unwrap();
279 std::fs::write(
280 dir.path().join("src/main.rs"),
281 r#"
282use std::io::Read;
283
284fn main() {
285 let mut input = String::new();
286 std::io::stdin().read_to_string(&mut input).unwrap();
287 assert!(!input.trim().is_empty());
288 println!("{{\"answer\":\"read eof\",\"reasoning\":\"stdin closed\",\"confidence\":0.9}}");
289}
290"#,
291 )
292 .unwrap();
293
294 let agent = RegisteredAgent {
295 name: "stdin-eof-agent".to_string(),
296 path: dir.path().to_path_buf(),
297 contract: AgentContract::Process,
298 registered_at: "test".to_string(),
299 };
300
301 let result = run_agent(&agent, serde_json::json!({"query":"hello"}))
302 .await
303 .unwrap();
304
305 assert!(result.success, "{:?}", result.error);
306 assert_eq!(result.output["answer"], "read eof");
307 }
308}