Skip to main content

ralph_adapters/
acp_executor.rs

1//! ACP (Agent Client Protocol) executor for kiro-acp backend.
2//!
3//! Implements the ACP lifecycle: spawn → initialize → session/new → session/prompt.
4//! Uses `agent-client-protocol` crate for bidirectional JSON-RPC over stdio.
5//!
6//! The ACP `Client` trait is `!Send`, so the protocol runs on a dedicated
7//! single-threaded runtime inside `spawn_blocking`. Events are streamed back
8//! to the caller via an unbounded channel for handler dispatch.
9
10use std::cell::RefCell;
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::rc::Rc;
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16
17use agent_client_protocol::{
18    Agent, CancelNotification, ClientSideConnection, ContentBlock, CreateTerminalRequest,
19    CreateTerminalResponse, InitializeRequest, KillTerminalCommandRequest,
20    KillTerminalCommandResponse, NewSessionRequest, PromptRequest, ProtocolVersion,
21    ReleaseTerminalRequest, ReleaseTerminalResponse, RequestPermissionOutcome,
22    RequestPermissionRequest, RequestPermissionResponse, SelectedPermissionOutcome,
23    SessionNotification, SessionUpdate, StopReason, TerminalExitStatus, TerminalId,
24    TerminalOutputRequest, TerminalOutputResponse, TextContent, ToolCallStatus,
25    WaitForTerminalExitRequest, WaitForTerminalExitResponse,
26};
27use anyhow::{Context, Result};
28use std::time::Duration;
29use tokio::io::AsyncReadExt;
30use tokio::sync::mpsc;
31use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
32use tracing::{debug, warn};
33
34use crate::cli_backend::CliBackend;
35use crate::pty_executor::{PtyExecutionResult, TerminationType};
36use crate::stream_handler::{SessionResult, StreamHandler};
37
38/// Events dispatched from the ACP Client impl to the executor.
39enum AcpEvent {
40    Text(String),
41    ToolCall {
42        name: String,
43        id: String,
44        input: serde_json::Value,
45    },
46    ToolResult {
47        id: String,
48        output: String,
49    },
50    #[allow(dead_code)]
51    Error(String),
52    /// Prompt completed with a stop reason.
53    Done(StopReason),
54    /// ACP lifecycle failed.
55    Failed(String),
56}
57
58/// State for a single ACP terminal (child process + captured output).
59struct TerminalState {
60    child: tokio::process::Child,
61    output: Rc<RefCell<Vec<u8>>>,
62    exit_status: Rc<RefCell<Option<TerminalExitStatus>>>,
63    output_byte_limit: Option<u64>,
64}
65
66type Terminals = Rc<RefCell<HashMap<String, TerminalState>>>;
67
68/// Ralph's implementation of the ACP `Client` trait.
69///
70/// Auto-approves all permissions and forwards session notifications
71/// as `AcpEvent`s through a channel.
72struct RalphAcpClient {
73    tx: mpsc::UnboundedSender<AcpEvent>,
74    terminals: Terminals,
75}
76
77#[async_trait::async_trait(?Send)]
78impl agent_client_protocol::Client for RalphAcpClient {
79    async fn request_permission(
80        &self,
81        args: RequestPermissionRequest,
82    ) -> agent_client_protocol::Result<RequestPermissionResponse> {
83        let option_id = args
84            .options
85            .first()
86            .map(|o| o.option_id.clone())
87            .unwrap_or_else(|| "allowed".into());
88        Ok(RequestPermissionResponse::new(
89            RequestPermissionOutcome::Selected(SelectedPermissionOutcome::new(option_id)),
90        ))
91    }
92
93    async fn session_notification(
94        &self,
95        args: SessionNotification,
96    ) -> agent_client_protocol::Result<()> {
97        match args.update {
98            SessionUpdate::AgentMessageChunk(chunk) => {
99                if let ContentBlock::Text(text) = chunk.content {
100                    let _ = self.tx.send(AcpEvent::Text(text.text));
101                }
102            }
103            SessionUpdate::ToolCall(tc) => {
104                // ACP sends two ToolCall notifications per tool:
105                // 1. Initial: no raw_input, no locations (just "tool started")
106                // 2. Update: has raw_input with actual parameters and a descriptive title
107                // Skip the first one to avoid showing bare "[Tool] ls" with no details.
108                if tc.raw_input.is_none() && tc.locations.is_empty() {
109                    return Ok(());
110                }
111
112                let input = tc.raw_input.clone().unwrap_or_else(|| {
113                    if let Some(loc) = tc.locations.first() {
114                        serde_json::json!({"path": loc.path.display().to_string()})
115                    } else {
116                        serde_json::Value::Null
117                    }
118                });
119                let _ = self.tx.send(AcpEvent::ToolCall {
120                    name: tc.title.clone(),
121                    id: tc.tool_call_id.to_string(),
122                    input,
123                });
124            }
125            SessionUpdate::ToolCallUpdate(update) => {
126                if update.fields.status == Some(ToolCallStatus::Completed) {
127                    // Try structured content first, fall back to raw_output
128                    let output = update
129                        .fields
130                        .content
131                        .as_ref()
132                        .and_then(|c| {
133                            c.iter().find_map(|block| {
134                                if let agent_client_protocol::ToolCallContent::Content(content) =
135                                    block
136                                    && let ContentBlock::Text(t) = &content.content
137                                {
138                                    return Some(t.text.clone());
139                                }
140                                None
141                            })
142                        })
143                        .or_else(|| {
144                            update.fields.raw_output.as_ref().map(|v| match v {
145                                serde_json::Value::String(s) => s.clone(),
146                                other => other.to_string(),
147                            })
148                        })
149                        .unwrap_or_default();
150                    let _ = self.tx.send(AcpEvent::ToolResult {
151                        id: update.tool_call_id.to_string(),
152                        output,
153                    });
154                }
155            }
156            SessionUpdate::Plan(plan) => {
157                let text = plan
158                    .entries
159                    .iter()
160                    .map(|e| format!("- {}", e.content))
161                    .collect::<Vec<_>>()
162                    .join("\n");
163                if !text.is_empty() {
164                    let _ = self
165                        .tx
166                        .send(AcpEvent::Text(format!("\n## Plan\n{}\n", text)));
167                }
168            }
169            _ => {}
170        }
171        Ok(())
172    }
173
174    async fn create_terminal(
175        &self,
176        args: CreateTerminalRequest,
177    ) -> agent_client_protocol::Result<CreateTerminalResponse> {
178        debug!("ACP create_terminal: {} {:?}", args.command, args.args);
179        let mut cmd = tokio::process::Command::new(&args.command);
180        cmd.args(&args.args)
181            .stdout(std::process::Stdio::piped())
182            .stderr(std::process::Stdio::piped())
183            .stdin(std::process::Stdio::null());
184
185        if let Some(cwd) = &args.cwd {
186            cmd.current_dir(cwd);
187        }
188        for env_var in &args.env {
189            cmd.env(&env_var.name, &env_var.value);
190        }
191
192        let mut child = cmd.spawn().map_err(|e| {
193            let mut err = agent_client_protocol::Error::internal_error();
194            err.message = format!("spawn failed: {e}");
195            err
196        })?;
197
198        let id = format!("term-{}", child.id().unwrap_or(0));
199        let output_buf = Rc::new(RefCell::new(Vec::new()));
200        let exit_status = Rc::new(RefCell::new(None));
201
202        // Spawn background reader for stdout
203        let stdout = child.stdout.take();
204        let stderr = child.stderr.take();
205        let buf_clone = Rc::clone(&output_buf);
206        let exit_clone = Rc::clone(&exit_status);
207        let limit = args.output_byte_limit;
208
209        tokio::task::spawn_local(async move {
210            let mut combined = Vec::new();
211            if let Some(mut out) = stdout {
212                let mut tmp = vec![0u8; 8192];
213                loop {
214                    match out.read(&mut tmp).await {
215                        Ok(0) => break,
216                        Ok(n) => combined.extend_from_slice(&tmp[..n]),
217                        Err(_) => break,
218                    }
219                }
220            }
221            if let Some(mut err) = stderr {
222                let mut tmp = vec![0u8; 8192];
223                loop {
224                    match err.read(&mut tmp).await {
225                        Ok(0) => break,
226                        Ok(n) => combined.extend_from_slice(&tmp[..n]),
227                        Err(_) => break,
228                    }
229                }
230            }
231            // Apply byte limit (truncate from beginning)
232            if let Some(max) = limit {
233                let max = max as usize;
234                if combined.len() > max {
235                    // Find a valid UTF-8 boundary
236                    let start = combined.len() - max;
237                    let s = String::from_utf8_lossy(&combined[start..]);
238                    combined = s.into_owned().into_bytes();
239                }
240            }
241            *buf_clone.borrow_mut() = combined;
242            // Mark as "reader done" — exit_status set by wait
243            let _ = exit_clone;
244        });
245
246        self.terminals.borrow_mut().insert(
247            id.clone(),
248            TerminalState {
249                child,
250                output: output_buf,
251                exit_status,
252                output_byte_limit: args.output_byte_limit,
253            },
254        );
255
256        Ok(CreateTerminalResponse::new(TerminalId::new(id)))
257    }
258
259    async fn terminal_output(
260        &self,
261        args: TerminalOutputRequest,
262    ) -> agent_client_protocol::Result<TerminalOutputResponse> {
263        let terminals = self.terminals.borrow();
264        let state = terminals.get(args.terminal_id.0.as_ref()).ok_or_else(|| {
265            let mut err = agent_client_protocol::Error::invalid_params();
266            err.message = format!("unknown terminal: {}", args.terminal_id);
267            err
268        })?;
269
270        let buf = state.output.borrow();
271        let output = String::from_utf8_lossy(&buf).into_owned();
272        let truncated = state
273            .output_byte_limit
274            .is_some_and(|limit| buf.len() >= limit as usize);
275        let exit_status = state.exit_status.borrow().clone();
276
277        Ok(TerminalOutputResponse::new(output, truncated).exit_status(exit_status))
278    }
279
280    async fn wait_for_terminal_exit(
281        &self,
282        args: WaitForTerminalExitRequest,
283    ) -> agent_client_protocol::Result<WaitForTerminalExitResponse> {
284        // Take child out temporarily to await it (can't hold borrow across await)
285        let (mut child, exit_rc) = {
286            let mut terminals = self.terminals.borrow_mut();
287            let state = terminals
288                .get_mut(args.terminal_id.0.as_ref())
289                .ok_or_else(|| {
290                    let mut err = agent_client_protocol::Error::invalid_params();
291                    err.message = format!("unknown terminal: {}", args.terminal_id);
292                    err
293                })?;
294            let exit_rc = Rc::clone(&state.exit_status);
295            // Check if already exited
296            if let Some(status) = state.exit_status.borrow().as_ref() {
297                return Ok(WaitForTerminalExitResponse::new(status.clone()));
298            }
299            // Try non-blocking wait
300            if let Ok(Some(status)) = state.child.try_wait() {
301                let es = TerminalExitStatus::new().exit_code(status.code().map(|c| c as u32));
302                *state.exit_status.borrow_mut() = Some(es.clone());
303                return Ok(WaitForTerminalExitResponse::new(es));
304            }
305            // Need to actually await — swap in a placeholder
306            let placeholder_child = tokio::process::Command::new("true").spawn().map_err(|e| {
307                let mut err = agent_client_protocol::Error::internal_error();
308                err.message = format!("internal error: {e}");
309                err
310            })?;
311            let real_child = std::mem::replace(&mut state.child, placeholder_child);
312            (real_child, exit_rc)
313        };
314
315        let status = child.wait().await.map_err(|e| {
316            let mut err = agent_client_protocol::Error::internal_error();
317            err.message = format!("wait failed: {e}");
318            err
319        })?;
320
321        let es = TerminalExitStatus::new().exit_code(status.code().map(|c| c as u32));
322        *exit_rc.borrow_mut() = Some(es.clone());
323
324        Ok(WaitForTerminalExitResponse::new(es))
325    }
326
327    async fn release_terminal(
328        &self,
329        args: ReleaseTerminalRequest,
330    ) -> agent_client_protocol::Result<ReleaseTerminalResponse> {
331        let mut state = self
332            .terminals
333            .borrow_mut()
334            .remove(args.terminal_id.0.as_ref())
335            .ok_or_else(|| {
336                let mut err = agent_client_protocol::Error::invalid_params();
337                err.message = format!("unknown terminal: {}", args.terminal_id);
338                err
339            })?;
340
341        let _ = state.child.kill().await;
342        Ok(ReleaseTerminalResponse::new())
343    }
344
345    async fn kill_terminal_command(
346        &self,
347        args: KillTerminalCommandRequest,
348    ) -> agent_client_protocol::Result<KillTerminalCommandResponse> {
349        let terminal_id = args.terminal_id.0.to_string();
350        let mut state = self
351            .terminals
352            .borrow_mut()
353            .remove(terminal_id.as_str())
354            .ok_or_else(|| {
355                let mut err = agent_client_protocol::Error::invalid_params();
356                err.message = format!("unknown terminal: {}", args.terminal_id);
357                err
358            })?;
359
360        let _ = state.child.kill().await;
361        // Try to capture exit status after kill
362        if let Ok(status) = state.child.try_wait()
363            && let Some(s) = status
364        {
365            *state.exit_status.borrow_mut() =
366                Some(TerminalExitStatus::new().exit_code(s.code().map(|c| c as u32)));
367        }
368
369        // Keep terminal state addressable after kill for subsequent output/wait requests.
370        self.terminals.borrow_mut().insert(terminal_id, state);
371
372        Ok(KillTerminalCommandResponse::new())
373    }
374}
375
376/// Drop guard that terminates the ACP child process.
377///
378/// When the `execute` future is cancelled (e.g., by `tokio::select!` on
379/// interrupt), destructors still run. This ensures the child process tree
380/// is cleaned up even if the normal cleanup code is never reached.
381/// Sends SIGTERM first for graceful shutdown, then SIGKILL.
382struct ChildKillGuard(Arc<Mutex<Option<u32>>>);
383
384impl Drop for ChildKillGuard {
385    fn drop(&mut self) {
386        if let Ok(guard) = self.0.lock()
387            && let Some(pid) = *guard
388        {
389            // Kill the entire process group (negative PID) so grandchildren
390            // (e.g. MCP servers) are also terminated — not just the direct child.
391            let pgid = nix::unistd::Pid::from_raw(-(pid as i32));
392            let _ = nix::sys::signal::kill(pgid, nix::sys::signal::Signal::SIGTERM);
393            std::thread::sleep(Duration::from_millis(100));
394            let _ = nix::sys::signal::kill(pgid, nix::sys::signal::Signal::SIGKILL);
395        }
396    }
397}
398
399/// Executor for ACP-based backends (kiro-acp).
400pub struct AcpExecutor {
401    backend: CliBackend,
402    workspace_root: PathBuf,
403}
404
405impl AcpExecutor {
406    pub fn new(backend: CliBackend, workspace_root: PathBuf) -> Self {
407        Self {
408            backend,
409            workspace_root,
410        }
411    }
412
413    /// Execute a single prompt turn via ACP.
414    ///
415    /// The ACP protocol runs on a dedicated thread (Client trait is `!Send`).
416    /// Events stream back via channel for real-time handler dispatch.
417    pub async fn execute<H: StreamHandler>(
418        &self,
419        prompt: &str,
420        handler: &mut H,
421    ) -> Result<PtyExecutionResult> {
422        let start = Instant::now();
423        let mut text_output = String::new();
424
425        let (tx, mut rx) = mpsc::unbounded_channel::<AcpEvent>();
426        let backend = self.backend.clone();
427        let workspace_root = self.workspace_root.clone();
428        let prompt_owned = prompt.to_string();
429
430        // Shared child PID for cleanup. Wrapped in a drop guard so the child
431        // is killed even when this future is cancelled by tokio::select!.
432        let child_pid = Arc::new(Mutex::new(None::<u32>));
433        let child_pid_inner = Arc::clone(&child_pid);
434        let _kill_guard = ChildKillGuard(Arc::clone(&child_pid));
435
436        // Run ACP lifecycle on a blocking thread with its own runtime
437        // (ClientSideConnection / Client trait is !Send)
438        let join_handle = tokio::task::spawn_blocking(move || {
439            let rt = tokio::runtime::Builder::new_current_thread()
440                .enable_all()
441                .build()
442                .expect("Failed to build ACP runtime");
443            let local = tokio::task::LocalSet::new();
444            local.block_on(
445                &rt,
446                run_acp_lifecycle(backend, workspace_root, prompt_owned, tx, child_pid_inner),
447            );
448        });
449
450        // Process streamed events until Done/Failed
451        let mut stop_reason = None;
452        let mut error_msg = None;
453        while let Some(event) = rx.recv().await {
454            match event {
455                AcpEvent::Text(t) => {
456                    text_output.push_str(&t);
457                    handler.on_text(&t);
458                }
459                AcpEvent::ToolCall { name, id, input } => {
460                    handler.on_tool_call(&name, &id, &input);
461                }
462                AcpEvent::ToolResult { id, output } => {
463                    handler.on_tool_result(&id, &output);
464                }
465                AcpEvent::Error(e) => {
466                    handler.on_error(&e);
467                }
468                AcpEvent::Done(reason) => {
469                    stop_reason = Some(reason);
470                    break;
471                }
472                AcpEvent::Failed(msg) => {
473                    error_msg = Some(msg);
474                    break;
475                }
476            }
477        }
478
479        // Ensure the entire process tree is killed even if the blocking task is still running.
480        if let Ok(guard) = child_pid.lock()
481            && let Some(pid) = *guard
482        {
483            let pgid = nix::unistd::Pid::from_raw(-(pid as i32));
484            let _ = nix::sys::signal::kill(pgid, nix::sys::signal::Signal::SIGKILL);
485        }
486
487        // Wait for the blocking task to finish so it doesn't leak.
488        let _ = join_handle.await;
489
490        let duration_ms = start.elapsed().as_millis() as u64;
491        let (success, is_error) = if let Some(reason) = stop_reason {
492            match reason {
493                StopReason::EndTurn | StopReason::MaxTokens | StopReason::MaxTurnRequests => {
494                    (true, false)
495                }
496                _ => (false, true),
497            }
498        } else if let Some(msg) = error_msg {
499            handler.on_error(&format!("ACP session failed: {}", msg));
500            (false, true)
501        } else {
502            warn!("ACP channel closed without completion");
503            (false, true)
504        };
505
506        handler.on_complete(&SessionResult {
507            duration_ms,
508            total_cost_usd: 0.0,
509            num_turns: 1,
510            is_error,
511            input_tokens: 0,
512            output_tokens: 0,
513            cache_read_tokens: 0,
514            cache_write_tokens: 0,
515        });
516
517        Ok(PtyExecutionResult {
518            output: text_output.clone(),
519            stripped_output: text_output.clone(),
520            extracted_text: text_output,
521            success,
522            exit_code: if success { Some(0) } else { Some(1) },
523            termination: TerminationType::Natural,
524            total_cost_usd: 0.0,
525            input_tokens: 0,
526            output_tokens: 0,
527            cache_read_tokens: 0,
528            cache_write_tokens: 0,
529        })
530    }
531}
532
533/// Runs the full ACP lifecycle on a LocalSet (single-threaded).
534async fn run_acp_lifecycle(
535    backend: CliBackend,
536    workspace_root: PathBuf,
537    prompt: String,
538    tx: mpsc::UnboundedSender<AcpEvent>,
539    child_pid: Arc<Mutex<Option<u32>>>,
540) {
541    if let Err(e) =
542        run_acp_lifecycle_inner(&backend, &workspace_root, &prompt, &tx, &child_pid).await
543    {
544        let _ = tx.send(AcpEvent::Failed(e.to_string()));
545    }
546}
547
548async fn run_acp_lifecycle_inner(
549    backend: &CliBackend,
550    workspace_root: &PathBuf,
551    prompt: &str,
552    tx: &mpsc::UnboundedSender<AcpEvent>,
553    child_pid: &Arc<Mutex<Option<u32>>>,
554) -> Result<()> {
555    // Spawn child process in its own process group so we can kill the
556    // entire tree (including MCP servers) with a single group signal.
557    let mut cmd = tokio::process::Command::new(&backend.command);
558    cmd.args(&backend.args)
559        .stdin(std::process::Stdio::piped())
560        .stdout(std::process::Stdio::piped())
561        .stderr(std::process::Stdio::piped())
562        .kill_on_drop(true);
563
564    #[cfg(unix)]
565    cmd.process_group(0);
566
567    let mut child = cmd.spawn().context("Failed to spawn ACP process")?;
568
569    // Record PID so the caller can kill the process if needed.
570    if let Some(pid) = child.id()
571        && let Ok(mut guard) = child_pid.lock()
572    {
573        *guard = Some(pid);
574    }
575
576    let child_stdin = child.stdin.take().context("No stdin")?;
577    let child_stdout = child.stdout.take().context("No stdout")?;
578
579    // Log stderr from kiro-cli so we can see errors
580    if let Some(stderr) = child.stderr.take() {
581        tokio::task::spawn_local(async move {
582            let mut reader = tokio::io::BufReader::new(stderr);
583            let mut line = String::new();
584            use tokio::io::AsyncBufReadExt;
585            while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
586                warn!("kiro-cli stderr: {}", line.trim_end());
587                line.clear();
588            }
589        });
590    }
591
592    let terminals: Terminals = Rc::new(RefCell::new(HashMap::new()));
593    let client = RalphAcpClient {
594        tx: tx.clone(),
595        terminals: Rc::clone(&terminals),
596    };
597
598    let (conn, io_task) = ClientSideConnection::new(
599        client,
600        child_stdin.compat_write(),
601        child_stdout.compat(),
602        |fut| {
603            tokio::task::spawn_local(fut);
604        },
605    );
606
607    tokio::task::spawn_local(async move {
608        if let Err(e) = io_task.await {
609            debug!("ACP IO task ended: {}", e);
610        }
611    });
612
613    // Initialize
614    let init_req = InitializeRequest::new(ProtocolVersion::LATEST)
615        .client_info(agent_client_protocol::Implementation::new(
616            "ralph-orchestrator",
617            env!("CARGO_PKG_VERSION"),
618        ))
619        .client_capabilities(agent_client_protocol::ClientCapabilities::new().terminal(true));
620    conn.initialize(init_req)
621        .await
622        .context("ACP initialize failed")?;
623
624    debug!("ACP initialize succeeded");
625
626    // New session
627    let session = conn
628        .new_session(NewSessionRequest::new(workspace_root))
629        .await
630        .context("ACP session/new failed")?;
631
632    debug!("ACP session created: {}", session.session_id);
633
634    // Send prompt
635    let session_id = session.session_id.clone();
636    debug!("ACP sending prompt...");
637    let response = conn
638        .prompt(PromptRequest::new(
639            session.session_id,
640            vec![ContentBlock::Text(TextContent::new(prompt))],
641        ))
642        .await
643        .context("ACP session/prompt failed")?;
644
645    let _ = tx.send(AcpEvent::Done(response.stop_reason));
646
647    // Kill all active terminals before shutting down
648    let active_terminals: Vec<_> = terminals.borrow_mut().drain().collect();
649    for (_, mut state) in active_terminals {
650        let _ = state.child.kill().await;
651    }
652
653    // Graceful shutdown: cancel the session so kiro-cli can clean up MCP servers
654    let _ = conn.cancel(CancelNotification::new(session_id)).await;
655
656    // Give the process a moment to exit cleanly, then force-kill
657    match tokio::time::timeout(Duration::from_secs(2), child.wait()).await {
658        Ok(_) => {}
659        Err(_) => {
660            let _ = child.kill().await;
661        }
662    }
663
664    Ok(())
665}
666
#[cfg(test)]
mod tests {
    use super::*;
    use agent_client_protocol::Client;

    /// Session name passed to every terminal request in these tests.
    const SESSION: &str = "test-session";

    /// Stream handler stub that only remembers the errors reported to it.
    #[derive(Default)]
    struct TestHandler {
        errors: Vec<String>,
    }

    impl StreamHandler for TestHandler {
        fn on_text(&mut self, _: &str) {}
        fn on_tool_call(&mut self, _: &str, _: &str, _: &serde_json::Value) {}
        fn on_tool_result(&mut self, _: &str, _: &str) {}
        fn on_error(&mut self, error: &str) {
            self.errors.push(error.to_string());
        }
        fn on_complete(&mut self, _: &SessionResult) {}
    }

    /// Builds a client together with the receiving end of its event channel
    /// and a second handle to its terminal registry.
    fn test_client() -> (RalphAcpClient, mpsc::UnboundedReceiver<AcpEvent>, Terminals) {
        let (tx, rx) = mpsc::unbounded_channel();
        let terminals: Terminals = Rc::new(RefCell::new(HashMap::new()));
        let client = RalphAcpClient {
            tx,
            terminals: Rc::clone(&terminals),
        };
        (client, rx, terminals)
    }

    #[test]
    fn test_acp_executor_new() {
        let executor = AcpExecutor::new(CliBackend::kiro_acp(), PathBuf::from("/tmp"));

        assert_eq!(executor.backend.command, "kiro-cli");
        assert_eq!(executor.workspace_root, PathBuf::from("/tmp"));
    }

    /// AcpEvent::Failed should produce a graceful error, not crash the loop.
    #[tokio::test]
    async fn test_acp_failed_event_returns_error_not_panic() {
        let (tx, mut rx) = mpsc::unbounded_channel::<AcpEvent>();

        // A session that produced some text and then failed.
        let scripted = [
            AcpEvent::Text("partial output".to_string()),
            AcpEvent::Failed("session/prompt failed".to_string()),
        ];
        for event in scripted {
            tx.send(event).unwrap();
        }
        drop(tx);

        // Same event handling as in execute().
        let mut handler = TestHandler::default();
        let mut text_output = String::new();
        let mut stop_reason = None;
        let mut error_msg = None;

        while let Some(event) = rx.recv().await {
            match event {
                AcpEvent::Text(t) => {
                    text_output.push_str(&t);
                    handler.on_text(&t);
                }
                AcpEvent::ToolCall { name, id, input } => {
                    handler.on_tool_call(&name, &id, &input);
                }
                AcpEvent::ToolResult { id, output } => {
                    handler.on_tool_result(&id, &output);
                }
                AcpEvent::Error(e) => {
                    handler.on_error(&e);
                }
                AcpEvent::Done(reason) => {
                    stop_reason = Some(reason);
                    break;
                }
                AcpEvent::Failed(msg) => {
                    error_msg = Some(msg);
                    break;
                }
            }
        }

        // The failure is captured as a message; nothing panicked on the way.
        assert!(stop_reason.is_none());
        assert!(error_msg.is_some());
        assert!(error_msg.unwrap().contains("session/prompt failed"));
        assert!(text_output.contains("partial"));
    }

    #[tokio::test]
    async fn test_create_terminal_and_output() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (client, _rx, terminals) = test_client();

                let created = client
                    .create_terminal(
                        CreateTerminalRequest::new(SESSION, "echo")
                            .args(vec!["hello world".into()]),
                    )
                    .await
                    .unwrap();
                let tid = created.terminal_id;

                // The new terminal is registered under the returned id.
                assert!(terminals.borrow().contains_key(tid.0.as_ref()));

                // The command exits successfully.
                let waited = client
                    .wait_for_terminal_exit(WaitForTerminalExitRequest::new(SESSION, tid.clone()))
                    .await
                    .unwrap();
                assert_eq!(waited.exit_status.exit_code, Some(0));

                // Let the background reader catch up.
                tokio::time::sleep(Duration::from_millis(100)).await;
                tokio::task::yield_now().await;

                // Output and exit status are both reported.
                let report = client
                    .terminal_output(TerminalOutputRequest::new(SESSION, tid.clone()))
                    .await
                    .unwrap();
                assert!(
                    report.output.contains("hello world"),
                    "expected 'hello world' in output: {:?}",
                    report.output
                );
                assert!(report.exit_status.is_some());
            })
            .await;
    }

    #[tokio::test]
    async fn test_release_terminal_removes_from_map() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (client, _rx, terminals) = test_client();

                let created = client
                    .create_terminal(
                        CreateTerminalRequest::new(SESSION, "sleep").args(vec!["60".into()]),
                    )
                    .await
                    .unwrap();
                let tid = created.terminal_id;
                assert!(terminals.borrow().contains_key(tid.0.as_ref()));

                client
                    .release_terminal(ReleaseTerminalRequest::new(SESSION, tid.clone()))
                    .await
                    .unwrap();

                assert!(!terminals.borrow().contains_key(tid.0.as_ref()));
            })
            .await;
    }

    #[tokio::test]
    async fn test_kill_terminal_keeps_in_map() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (client, _rx, terminals) = test_client();

                let created = client
                    .create_terminal(
                        CreateTerminalRequest::new(SESSION, "sleep").args(vec!["60".into()]),
                    )
                    .await
                    .unwrap();
                let tid = created.terminal_id;

                client
                    .kill_terminal_command(KillTerminalCommandRequest::new(SESSION, tid.clone()))
                    .await
                    .unwrap();

                // Killing does not unregister the terminal.
                assert!(terminals.borrow().contains_key(tid.0.as_ref()));
            })
            .await;
    }

    #[tokio::test]
    async fn test_terminal_output_unknown_id_errors() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (client, _rx, _terminals) = test_client();

                let outcome = client
                    .terminal_output(TerminalOutputRequest::new(SESSION, "nonexistent"))
                    .await;
                assert!(outcome.is_err());
            })
            .await;
    }

    #[tokio::test]
    async fn test_terminal_failed_command_exit_code() {
        tokio::task::LocalSet::new()
            .run_until(async {
                let (client, _rx, _terminals) = test_client();

                let created = client
                    .create_terminal(CreateTerminalRequest::new(SESSION, "false"))
                    .await
                    .unwrap();

                let waited = client
                    .wait_for_terminal_exit(WaitForTerminalExitRequest::new(
                        SESSION,
                        created.terminal_id.clone(),
                    ))
                    .await
                    .unwrap();
                assert_ne!(waited.exit_status.exit_code, Some(0));
            })
            .await;
    }
}