scud/attractor/handlers/
rho.rs1use anyhow::{Context as _, Result};
8use async_trait::async_trait;
9use std::collections::HashMap;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::process::Command;
12use tracing::{debug, trace, warn};
13
14use crate::attractor::context::Context;
15use crate::attractor::graph::{PipelineGraph, PipelineNode};
16use crate::attractor::outcome::{Outcome, StageStatus};
17use crate::attractor::run_directory::RunDirectory;
18use crate::commands::spawn::terminal::{find_harness_binary, Harness};
19
20use super::Handler;
21
/// Pipeline stage handler that runs a node's prompt through the external `rho-cli` harness.
///
/// The handler carries no state of its own; everything it needs is passed to
/// `Handler::execute`.
#[derive(Debug, Clone, Copy, Default)]
pub struct RhoHandler;
23
24#[async_trait]
25impl Handler for RhoHandler {
26 async fn execute(
27 &self,
28 node: &PipelineNode,
29 context: &Context,
30 graph: &PipelineGraph,
31 run_dir: &RunDirectory,
32 ) -> Result<Outcome> {
33 let binary_path = find_harness_binary(Harness::Rho)
35 .context("Could not find rho-cli binary")?;
36
37 let prompt = expand_prompt(&node.prompt, graph, context).await;
39
40 if prompt.is_empty() {
41 return Ok(Outcome::failure("No prompt specified for rho handler"));
42 }
43
44 run_dir.write_prompt(&node.id, &prompt)?;
46
47 let working_dir = std::env::current_dir().unwrap_or_default();
49
50 let mut cmd = Command::new(binary_path);
52 cmd.arg("--output-format").arg("stream-json");
53 cmd.arg("-p").arg(&prompt);
54 cmd.arg("-C").arg(&working_dir);
55
56 let model = node
58 .extra_attrs
59 .get("rho_model")
60 .map(|v| v.as_str())
61 .or_else(|| node.llm_model.clone());
62 if let Some(ref m) = model {
63 cmd.arg("--model").arg(m);
64 }
65
66 cmd.current_dir(&working_dir);
67 cmd.stdout(std::process::Stdio::piped());
68 cmd.stderr(std::process::Stdio::piped());
69
70 debug!(node_id = %node.id, "spawning rho-cli");
71
72 let mut child = cmd.spawn().context("Failed to spawn rho-cli")?;
73
74 let stdout = child
75 .stdout
76 .take()
77 .context("Failed to capture rho-cli stdout")?;
78
79 let reader = BufReader::new(stdout);
81 let mut lines = reader.lines();
82 let mut response_text = String::new();
83 let mut success = true;
84 let mut error_message: Option<String> = None;
85 let mut tool_count: u32 = 0;
86
87 while let Ok(Some(line)) = lines.next_line().await {
88 if line.trim().is_empty() {
89 continue;
90 }
91
92 let json: serde_json::Value = match serde_json::from_str(&line) {
93 Ok(v) => v,
94 Err(_) => {
95 trace!(node_id = %node.id, "rho: unparsed line: {}", if line.len() > 200 { &line[..200] } else { &line });
96 continue;
97 }
98 };
99
100 let event_type = match json.get("type").and_then(|v| v.as_str()) {
101 Some(t) => t,
102 None => continue,
103 };
104
105 match event_type {
106 "text_delta" => {
107 if let Some(text) = json.get("text").and_then(|v| v.as_str()) {
108 response_text.push_str(text);
109 }
110 }
111 "tool_start" => {
112 tool_count += 1;
113 let tool_name = json
114 .get("tool_name")
115 .and_then(|v| v.as_str())
116 .unwrap_or("unknown");
117 trace!(node_id = %node.id, tool = tool_name, "rho tool start");
118 }
119 "tool_result" => {
120 let tool_success = json
121 .get("success")
122 .and_then(|v| v.as_bool())
123 .unwrap_or(true);
124 if !tool_success {
125 debug!(node_id = %node.id, "rho tool reported failure");
126 }
127 }
128 "complete" => {
129 success = json
130 .get("success")
131 .and_then(|v| v.as_bool())
132 .unwrap_or(true);
133 }
134 "error" => {
135 let msg = json
136 .get("message")
137 .and_then(|v| v.as_str())
138 .unwrap_or("Unknown rho error")
139 .to_string();
140 warn!(node_id = %node.id, error = %msg, "rho error event");
141 error_message = Some(msg);
142 success = false;
143 }
144 "session" => {
145 if let Some(sid) = json.get("session_id").and_then(|v| v.as_str()) {
146 trace!(node_id = %node.id, session_id = sid, "rho session assigned");
147 }
148 }
149 _ => {
150 trace!(node_id = %node.id, event_type, "rho: unknown event type");
151 }
152 }
153 }
154
155 let exit_status = child.wait().await.context("Failed to wait for rho-cli")?;
157
158 if !exit_status.success() && success {
160 success = false;
161 if error_message.is_none() {
162 error_message = Some(format!(
163 "rho-cli exited with code {:?}",
164 exit_status.code()
165 ));
166 }
167 }
168
169 run_dir.write_response(&node.id, &response_text)?;
171
172 let status_json = serde_json::json!({
174 "node_id": node.id,
175 "status": if success { "success" } else { "failure" },
176 "tool_calls": tool_count,
177 });
178 run_dir.write_status(&node.id, &status_json)?;
179
180 let mut updates = HashMap::new();
182 updates.insert(
183 format!("{}.response", node.id),
184 serde_json::json!(response_text),
185 );
186 updates.insert(
187 format!("{}.tool_calls", node.id),
188 serde_json::json!(tool_count),
189 );
190
191 if success {
192 Ok(Outcome {
193 status: StageStatus::Success,
194 preferred_label: None,
195 suggested_next: vec![],
196 context_updates: updates,
197 response_text: Some(response_text),
198 summary: None,
199 })
200 } else {
201 let msg = error_message.unwrap_or_else(|| "rho-cli execution failed".into());
202 Ok(Outcome::failure(&msg)
203 .with_response(response_text)
204 .with_context(updates))
205 }
206 }
207}
208
209async fn expand_prompt(prompt: &str, graph: &PipelineGraph, context: &Context) -> String {
213 let mut result = prompt.to_string();
214
215 if let Some(ref goal) = graph.graph_attrs.goal {
217 result = result.replace("$goal", goal);
218 }
219
220 let snapshot = context.snapshot().await;
222 for (key, value) in &snapshot {
223 let pattern = format!("$context.{}", key);
224 let replacement = match value {
225 serde_json::Value::String(s) => s.clone(),
226 other => other.to_string(),
227 };
228 result = result.replace(&pattern, &replacement);
229 }
230
231 result
232}