Skip to main content

scud/attractor/handlers/
rho.rs

1//! Rho handler — executes tasks via rho-cli subprocess.
2//!
3//! Spawns `rho-cli --output-format stream-json` with the node prompt,
4//! parses the streaming JSON events, and returns an Outcome with the
5//! collected response text.
6
7use anyhow::{Context as _, Result};
8use async_trait::async_trait;
9use std::collections::HashMap;
10use tokio::io::{AsyncBufReadExt, BufReader};
11use tokio::process::Command;
12use tracing::{debug, trace, warn};
13
14use crate::attractor::context::Context;
15use crate::attractor::graph::{PipelineGraph, PipelineNode};
16use crate::attractor::outcome::{Outcome, StageStatus};
17use crate::attractor::run_directory::RunDirectory;
18use crate::commands::spawn::terminal::{find_harness_binary, Harness};
19
20use super::Handler;
21
/// Pipeline handler that runs a node's prompt through the `rho-cli` binary.
///
/// The handler is a stateless unit struct: everything it needs is passed to
/// its `Handler::execute` implementation, so values are freely copyable and
/// can be created with `RhoHandler` or `RhoHandler::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct RhoHandler;
23
24#[async_trait]
25impl Handler for RhoHandler {
26    async fn execute(
27        &self,
28        node: &PipelineNode,
29        context: &Context,
30        graph: &PipelineGraph,
31        run_dir: &RunDirectory,
32    ) -> Result<Outcome> {
33        // Resolve the rho-cli binary
34        let binary_path = find_harness_binary(Harness::Rho)
35            .context("Could not find rho-cli binary")?;
36
37        // Expand variables in the prompt
38        let prompt = expand_prompt(&node.prompt, graph, context).await;
39
40        if prompt.is_empty() {
41            return Ok(Outcome::failure("No prompt specified for rho handler"));
42        }
43
44        // Write prompt to run directory
45        run_dir.write_prompt(&node.id, &prompt)?;
46
47        // Determine working directory
48        let working_dir = std::env::current_dir().unwrap_or_default();
49
50        // Build the rho-cli command
51        let mut cmd = Command::new(binary_path);
52        cmd.arg("--output-format").arg("stream-json");
53        cmd.arg("-p").arg(&prompt);
54        cmd.arg("-C").arg(&working_dir);
55
56        // Optional model override from node attributes
57        let model = node
58            .extra_attrs
59            .get("rho_model")
60            .map(|v| v.as_str())
61            .or_else(|| node.llm_model.clone());
62        if let Some(ref m) = model {
63            cmd.arg("--model").arg(m);
64        }
65
66        cmd.current_dir(&working_dir);
67        cmd.stdout(std::process::Stdio::piped());
68        cmd.stderr(std::process::Stdio::piped());
69
70        debug!(node_id = %node.id, "spawning rho-cli");
71
72        let mut child = cmd.spawn().context("Failed to spawn rho-cli")?;
73
74        let stdout = child
75            .stdout
76            .take()
77            .context("Failed to capture rho-cli stdout")?;
78
79        // Parse streaming JSON output
80        let reader = BufReader::new(stdout);
81        let mut lines = reader.lines();
82        let mut response_text = String::new();
83        let mut success = true;
84        let mut error_message: Option<String> = None;
85        let mut tool_count: u32 = 0;
86
87        while let Ok(Some(line)) = lines.next_line().await {
88            if line.trim().is_empty() {
89                continue;
90            }
91
92            let json: serde_json::Value = match serde_json::from_str(&line) {
93                Ok(v) => v,
94                Err(_) => {
95                    trace!(node_id = %node.id, "rho: unparsed line: {}", if line.len() > 200 { &line[..200] } else { &line });
96                    continue;
97                }
98            };
99
100            let event_type = match json.get("type").and_then(|v| v.as_str()) {
101                Some(t) => t,
102                None => continue,
103            };
104
105            match event_type {
106                "text_delta" => {
107                    if let Some(text) = json.get("text").and_then(|v| v.as_str()) {
108                        response_text.push_str(text);
109                    }
110                }
111                "tool_start" => {
112                    tool_count += 1;
113                    let tool_name = json
114                        .get("tool_name")
115                        .and_then(|v| v.as_str())
116                        .unwrap_or("unknown");
117                    trace!(node_id = %node.id, tool = tool_name, "rho tool start");
118                }
119                "tool_result" => {
120                    let tool_success = json
121                        .get("success")
122                        .and_then(|v| v.as_bool())
123                        .unwrap_or(true);
124                    if !tool_success {
125                        debug!(node_id = %node.id, "rho tool reported failure");
126                    }
127                }
128                "complete" => {
129                    success = json
130                        .get("success")
131                        .and_then(|v| v.as_bool())
132                        .unwrap_or(true);
133                }
134                "error" => {
135                    let msg = json
136                        .get("message")
137                        .and_then(|v| v.as_str())
138                        .unwrap_or("Unknown rho error")
139                        .to_string();
140                    warn!(node_id = %node.id, error = %msg, "rho error event");
141                    error_message = Some(msg);
142                    success = false;
143                }
144                "session" => {
145                    if let Some(sid) = json.get("session_id").and_then(|v| v.as_str()) {
146                        trace!(node_id = %node.id, session_id = sid, "rho session assigned");
147                    }
148                }
149                _ => {
150                    trace!(node_id = %node.id, event_type, "rho: unknown event type");
151                }
152            }
153        }
154
155        // Wait for the child process to exit
156        let exit_status = child.wait().await.context("Failed to wait for rho-cli")?;
157
158        // If process exited with non-zero and we haven't already flagged failure
159        if !exit_status.success() && success {
160            success = false;
161            if error_message.is_none() {
162                error_message = Some(format!(
163                    "rho-cli exited with code {:?}",
164                    exit_status.code()
165                ));
166            }
167        }
168
169        // Write response to run directory
170        run_dir.write_response(&node.id, &response_text)?;
171
172        // Write status
173        let status_json = serde_json::json!({
174            "node_id": node.id,
175            "status": if success { "success" } else { "failure" },
176            "tool_calls": tool_count,
177        });
178        run_dir.write_status(&node.id, &status_json)?;
179
180        // Build context updates
181        let mut updates = HashMap::new();
182        updates.insert(
183            format!("{}.response", node.id),
184            serde_json::json!(response_text),
185        );
186        updates.insert(
187            format!("{}.tool_calls", node.id),
188            serde_json::json!(tool_count),
189        );
190
191        if success {
192            Ok(Outcome {
193                status: StageStatus::Success,
194                preferred_label: None,
195                suggested_next: vec![],
196                context_updates: updates,
197                response_text: Some(response_text),
198                summary: None,
199            })
200        } else {
201            let msg = error_message.unwrap_or_else(|| "rho-cli execution failed".into());
202            Ok(Outcome::failure(&msg)
203                .with_response(response_text)
204                .with_context(updates))
205        }
206    }
207}
208
209/// Expand `$goal` and `$context.key` variables in a prompt string.
210///
211/// Follows the same variable expansion pattern as the codergen handler.
212async fn expand_prompt(prompt: &str, graph: &PipelineGraph, context: &Context) -> String {
213    let mut result = prompt.to_string();
214
215    // Replace $goal with graph-level goal
216    if let Some(ref goal) = graph.graph_attrs.goal {
217        result = result.replace("$goal", goal);
218    }
219
220    // Replace $context.key patterns
221    let snapshot = context.snapshot().await;
222    for (key, value) in &snapshot {
223        let pattern = format!("$context.{}", key);
224        let replacement = match value {
225            serde_json::Value::String(s) => s.clone(),
226            other => other.to_string(),
227        };
228        result = result.replace(&pattern, &replacement);
229    }
230
231    result
232}