1use std::cell::RefCell;
11use std::collections::HashMap;
12use std::path::PathBuf;
13use std::rc::Rc;
14use std::sync::{Arc, Mutex};
15use std::time::Instant;
16
17use agent_client_protocol::{
18 Agent, CancelNotification, ClientSideConnection, ContentBlock, CreateTerminalRequest,
19 CreateTerminalResponse, InitializeRequest, KillTerminalCommandRequest,
20 KillTerminalCommandResponse, NewSessionRequest, PromptRequest, ProtocolVersion,
21 ReleaseTerminalRequest, ReleaseTerminalResponse, RequestPermissionOutcome,
22 RequestPermissionRequest, RequestPermissionResponse, SelectedPermissionOutcome,
23 SessionNotification, SessionUpdate, StopReason, TerminalExitStatus, TerminalId,
24 TerminalOutputRequest, TerminalOutputResponse, TextContent, ToolCallStatus,
25 WaitForTerminalExitRequest, WaitForTerminalExitResponse,
26};
27use anyhow::{Context, Result};
28use std::time::Duration;
29use tokio::io::AsyncReadExt;
30use tokio::sync::mpsc;
31use tokio_util::compat::{TokioAsyncReadCompatExt, TokioAsyncWriteCompatExt};
32use tracing::{debug, warn};
33
34use crate::cli_backend::CliBackend;
35use crate::pty_executor::{PtyExecutionResult, TerminationType};
36use crate::stream_handler::{SessionResult, StreamHandler};
37
38enum AcpEvent {
40 Text(String),
41 ToolCall {
42 name: String,
43 id: String,
44 input: serde_json::Value,
45 },
46 ToolResult {
47 id: String,
48 output: String,
49 },
50 #[allow(dead_code)]
51 Error(String),
52 Done(StopReason),
54 Failed(String),
56}
57
58struct TerminalState {
60 child: tokio::process::Child,
61 output: Rc<RefCell<Vec<u8>>>,
62 exit_status: Rc<RefCell<Option<TerminalExitStatus>>>,
63 output_byte_limit: Option<u64>,
64}
65
66type Terminals = Rc<RefCell<HashMap<String, TerminalState>>>;
67
68struct RalphAcpClient {
73 tx: mpsc::UnboundedSender<AcpEvent>,
74 terminals: Terminals,
75}
76
77#[async_trait::async_trait(?Send)]
78impl agent_client_protocol::Client for RalphAcpClient {
79 async fn request_permission(
80 &self,
81 args: RequestPermissionRequest,
82 ) -> agent_client_protocol::Result<RequestPermissionResponse> {
83 let option_id = args
84 .options
85 .first()
86 .map(|o| o.option_id.clone())
87 .unwrap_or_else(|| "allowed".into());
88 Ok(RequestPermissionResponse::new(
89 RequestPermissionOutcome::Selected(SelectedPermissionOutcome::new(option_id)),
90 ))
91 }
92
93 async fn session_notification(
94 &self,
95 args: SessionNotification,
96 ) -> agent_client_protocol::Result<()> {
97 match args.update {
98 SessionUpdate::AgentMessageChunk(chunk) => {
99 if let ContentBlock::Text(text) = chunk.content {
100 let _ = self.tx.send(AcpEvent::Text(text.text));
101 }
102 }
103 SessionUpdate::ToolCall(tc) => {
104 if tc.raw_input.is_none() && tc.locations.is_empty() {
109 return Ok(());
110 }
111
112 let input = tc.raw_input.clone().unwrap_or_else(|| {
113 if let Some(loc) = tc.locations.first() {
114 serde_json::json!({"path": loc.path.display().to_string()})
115 } else {
116 serde_json::Value::Null
117 }
118 });
119 let _ = self.tx.send(AcpEvent::ToolCall {
120 name: tc.title.clone(),
121 id: tc.tool_call_id.to_string(),
122 input,
123 });
124 }
125 SessionUpdate::ToolCallUpdate(update) => {
126 if update.fields.status == Some(ToolCallStatus::Completed) {
127 let output = update
129 .fields
130 .content
131 .as_ref()
132 .and_then(|c| {
133 c.iter().find_map(|block| {
134 if let agent_client_protocol::ToolCallContent::Content(content) =
135 block
136 && let ContentBlock::Text(t) = &content.content
137 {
138 return Some(t.text.clone());
139 }
140 None
141 })
142 })
143 .or_else(|| {
144 update.fields.raw_output.as_ref().map(|v| match v {
145 serde_json::Value::String(s) => s.clone(),
146 other => other.to_string(),
147 })
148 })
149 .unwrap_or_default();
150 let _ = self.tx.send(AcpEvent::ToolResult {
151 id: update.tool_call_id.to_string(),
152 output,
153 });
154 }
155 }
156 SessionUpdate::Plan(plan) => {
157 let text = plan
158 .entries
159 .iter()
160 .map(|e| format!("- {}", e.content))
161 .collect::<Vec<_>>()
162 .join("\n");
163 if !text.is_empty() {
164 let _ = self
165 .tx
166 .send(AcpEvent::Text(format!("\n## Plan\n{}\n", text)));
167 }
168 }
169 _ => {}
170 }
171 Ok(())
172 }
173
174 async fn create_terminal(
175 &self,
176 args: CreateTerminalRequest,
177 ) -> agent_client_protocol::Result<CreateTerminalResponse> {
178 debug!("ACP create_terminal: {} {:?}", args.command, args.args);
179 let mut cmd = tokio::process::Command::new(&args.command);
180 cmd.args(&args.args)
181 .stdout(std::process::Stdio::piped())
182 .stderr(std::process::Stdio::piped())
183 .stdin(std::process::Stdio::null());
184
185 if let Some(cwd) = &args.cwd {
186 cmd.current_dir(cwd);
187 }
188 for env_var in &args.env {
189 cmd.env(&env_var.name, &env_var.value);
190 }
191
192 let mut child = cmd.spawn().map_err(|e| {
193 let mut err = agent_client_protocol::Error::internal_error();
194 err.message = format!("spawn failed: {e}");
195 err
196 })?;
197
198 let id = format!("term-{}", child.id().unwrap_or(0));
199 let output_buf = Rc::new(RefCell::new(Vec::new()));
200 let exit_status = Rc::new(RefCell::new(None));
201
202 let stdout = child.stdout.take();
204 let stderr = child.stderr.take();
205 let buf_clone = Rc::clone(&output_buf);
206 let exit_clone = Rc::clone(&exit_status);
207 let limit = args.output_byte_limit;
208
209 tokio::task::spawn_local(async move {
210 let mut combined = Vec::new();
211 if let Some(mut out) = stdout {
212 let mut tmp = vec![0u8; 8192];
213 loop {
214 match out.read(&mut tmp).await {
215 Ok(0) => break,
216 Ok(n) => combined.extend_from_slice(&tmp[..n]),
217 Err(_) => break,
218 }
219 }
220 }
221 if let Some(mut err) = stderr {
222 let mut tmp = vec![0u8; 8192];
223 loop {
224 match err.read(&mut tmp).await {
225 Ok(0) => break,
226 Ok(n) => combined.extend_from_slice(&tmp[..n]),
227 Err(_) => break,
228 }
229 }
230 }
231 if let Some(max) = limit {
233 let max = max as usize;
234 if combined.len() > max {
235 let start = combined.len() - max;
237 let s = String::from_utf8_lossy(&combined[start..]);
238 combined = s.into_owned().into_bytes();
239 }
240 }
241 *buf_clone.borrow_mut() = combined;
242 let _ = exit_clone;
244 });
245
246 self.terminals.borrow_mut().insert(
247 id.clone(),
248 TerminalState {
249 child,
250 output: output_buf,
251 exit_status,
252 output_byte_limit: args.output_byte_limit,
253 },
254 );
255
256 Ok(CreateTerminalResponse::new(TerminalId::new(id)))
257 }
258
259 async fn terminal_output(
260 &self,
261 args: TerminalOutputRequest,
262 ) -> agent_client_protocol::Result<TerminalOutputResponse> {
263 let terminals = self.terminals.borrow();
264 let state = terminals.get(args.terminal_id.0.as_ref()).ok_or_else(|| {
265 let mut err = agent_client_protocol::Error::invalid_params();
266 err.message = format!("unknown terminal: {}", args.terminal_id);
267 err
268 })?;
269
270 let buf = state.output.borrow();
271 let output = String::from_utf8_lossy(&buf).into_owned();
272 let truncated = state
273 .output_byte_limit
274 .is_some_and(|limit| buf.len() >= limit as usize);
275 let exit_status = state.exit_status.borrow().clone();
276
277 Ok(TerminalOutputResponse::new(output, truncated).exit_status(exit_status))
278 }
279
280 async fn wait_for_terminal_exit(
281 &self,
282 args: WaitForTerminalExitRequest,
283 ) -> agent_client_protocol::Result<WaitForTerminalExitResponse> {
284 let (mut child, exit_rc) = {
286 let mut terminals = self.terminals.borrow_mut();
287 let state = terminals
288 .get_mut(args.terminal_id.0.as_ref())
289 .ok_or_else(|| {
290 let mut err = agent_client_protocol::Error::invalid_params();
291 err.message = format!("unknown terminal: {}", args.terminal_id);
292 err
293 })?;
294 let exit_rc = Rc::clone(&state.exit_status);
295 if let Some(status) = state.exit_status.borrow().as_ref() {
297 return Ok(WaitForTerminalExitResponse::new(status.clone()));
298 }
299 if let Ok(Some(status)) = state.child.try_wait() {
301 let es = TerminalExitStatus::new().exit_code(status.code().map(|c| c as u32));
302 *state.exit_status.borrow_mut() = Some(es.clone());
303 return Ok(WaitForTerminalExitResponse::new(es));
304 }
305 let placeholder_child = tokio::process::Command::new("true").spawn().map_err(|e| {
307 let mut err = agent_client_protocol::Error::internal_error();
308 err.message = format!("internal error: {e}");
309 err
310 })?;
311 let real_child = std::mem::replace(&mut state.child, placeholder_child);
312 (real_child, exit_rc)
313 };
314
315 let status = child.wait().await.map_err(|e| {
316 let mut err = agent_client_protocol::Error::internal_error();
317 err.message = format!("wait failed: {e}");
318 err
319 })?;
320
321 let es = TerminalExitStatus::new().exit_code(status.code().map(|c| c as u32));
322 *exit_rc.borrow_mut() = Some(es.clone());
323
324 Ok(WaitForTerminalExitResponse::new(es))
325 }
326
327 async fn release_terminal(
328 &self,
329 args: ReleaseTerminalRequest,
330 ) -> agent_client_protocol::Result<ReleaseTerminalResponse> {
331 let mut state = self
332 .terminals
333 .borrow_mut()
334 .remove(args.terminal_id.0.as_ref())
335 .ok_or_else(|| {
336 let mut err = agent_client_protocol::Error::invalid_params();
337 err.message = format!("unknown terminal: {}", args.terminal_id);
338 err
339 })?;
340
341 let _ = state.child.kill().await;
342 Ok(ReleaseTerminalResponse::new())
343 }
344
345 async fn kill_terminal_command(
346 &self,
347 args: KillTerminalCommandRequest,
348 ) -> agent_client_protocol::Result<KillTerminalCommandResponse> {
349 let terminal_id = args.terminal_id.0.to_string();
350 let mut state = self
351 .terminals
352 .borrow_mut()
353 .remove(terminal_id.as_str())
354 .ok_or_else(|| {
355 let mut err = agent_client_protocol::Error::invalid_params();
356 err.message = format!("unknown terminal: {}", args.terminal_id);
357 err
358 })?;
359
360 let _ = state.child.kill().await;
361 if let Ok(status) = state.child.try_wait()
363 && let Some(s) = status
364 {
365 *state.exit_status.borrow_mut() =
366 Some(TerminalExitStatus::new().exit_code(s.code().map(|c| c as u32)));
367 }
368
369 self.terminals.borrow_mut().insert(terminal_id, state);
371
372 Ok(KillTerminalCommandResponse::new())
373 }
374}
375
376struct ChildKillGuard(Arc<Mutex<Option<u32>>>);
383
384impl Drop for ChildKillGuard {
385 fn drop(&mut self) {
386 if let Ok(guard) = self.0.lock()
387 && let Some(pid) = *guard
388 {
389 let pgid = nix::unistd::Pid::from_raw(-(pid as i32));
392 let _ = nix::sys::signal::kill(pgid, nix::sys::signal::Signal::SIGTERM);
393 std::thread::sleep(Duration::from_millis(100));
394 let _ = nix::sys::signal::kill(pgid, nix::sys::signal::Signal::SIGKILL);
395 }
396 }
397}
398
399pub struct AcpExecutor {
401 backend: CliBackend,
402 workspace_root: PathBuf,
403}
404
405impl AcpExecutor {
406 pub fn new(backend: CliBackend, workspace_root: PathBuf) -> Self {
407 Self {
408 backend,
409 workspace_root,
410 }
411 }
412
413 pub async fn execute<H: StreamHandler>(
418 &self,
419 prompt: &str,
420 handler: &mut H,
421 ) -> Result<PtyExecutionResult> {
422 let start = Instant::now();
423 let mut text_output = String::new();
424
425 let (tx, mut rx) = mpsc::unbounded_channel::<AcpEvent>();
426 let backend = self.backend.clone();
427 let workspace_root = self.workspace_root.clone();
428 let prompt_owned = prompt.to_string();
429
430 let child_pid = Arc::new(Mutex::new(None::<u32>));
433 let child_pid_inner = Arc::clone(&child_pid);
434 let _kill_guard = ChildKillGuard(Arc::clone(&child_pid));
435
436 let join_handle = tokio::task::spawn_blocking(move || {
439 let rt = tokio::runtime::Builder::new_current_thread()
440 .enable_all()
441 .build()
442 .expect("Failed to build ACP runtime");
443 let local = tokio::task::LocalSet::new();
444 local.block_on(
445 &rt,
446 run_acp_lifecycle(backend, workspace_root, prompt_owned, tx, child_pid_inner),
447 );
448 });
449
450 let mut stop_reason = None;
452 let mut error_msg = None;
453 while let Some(event) = rx.recv().await {
454 match event {
455 AcpEvent::Text(t) => {
456 text_output.push_str(&t);
457 handler.on_text(&t);
458 }
459 AcpEvent::ToolCall { name, id, input } => {
460 handler.on_tool_call(&name, &id, &input);
461 }
462 AcpEvent::ToolResult { id, output } => {
463 handler.on_tool_result(&id, &output);
464 }
465 AcpEvent::Error(e) => {
466 handler.on_error(&e);
467 }
468 AcpEvent::Done(reason) => {
469 stop_reason = Some(reason);
470 break;
471 }
472 AcpEvent::Failed(msg) => {
473 error_msg = Some(msg);
474 break;
475 }
476 }
477 }
478
479 if let Ok(guard) = child_pid.lock()
481 && let Some(pid) = *guard
482 {
483 let pgid = nix::unistd::Pid::from_raw(-(pid as i32));
484 let _ = nix::sys::signal::kill(pgid, nix::sys::signal::Signal::SIGKILL);
485 }
486
487 let _ = join_handle.await;
489
490 let duration_ms = start.elapsed().as_millis() as u64;
491 let (success, is_error) = if let Some(reason) = stop_reason {
492 match reason {
493 StopReason::EndTurn | StopReason::MaxTokens | StopReason::MaxTurnRequests => {
494 (true, false)
495 }
496 _ => (false, true),
497 }
498 } else if let Some(msg) = error_msg {
499 handler.on_error(&format!("ACP session failed: {}", msg));
500 (false, true)
501 } else {
502 warn!("ACP channel closed without completion");
503 (false, true)
504 };
505
506 handler.on_complete(&SessionResult {
507 duration_ms,
508 total_cost_usd: 0.0,
509 num_turns: 1,
510 is_error,
511 input_tokens: 0,
512 output_tokens: 0,
513 cache_read_tokens: 0,
514 cache_write_tokens: 0,
515 });
516
517 Ok(PtyExecutionResult {
518 output: text_output.clone(),
519 stripped_output: text_output.clone(),
520 extracted_text: text_output,
521 success,
522 exit_code: if success { Some(0) } else { Some(1) },
523 termination: TerminationType::Natural,
524 total_cost_usd: 0.0,
525 input_tokens: 0,
526 output_tokens: 0,
527 cache_read_tokens: 0,
528 cache_write_tokens: 0,
529 })
530 }
531}
532
533async fn run_acp_lifecycle(
535 backend: CliBackend,
536 workspace_root: PathBuf,
537 prompt: String,
538 tx: mpsc::UnboundedSender<AcpEvent>,
539 child_pid: Arc<Mutex<Option<u32>>>,
540) {
541 if let Err(e) =
542 run_acp_lifecycle_inner(&backend, &workspace_root, &prompt, &tx, &child_pid).await
543 {
544 let _ = tx.send(AcpEvent::Failed(e.to_string()));
545 }
546}
547
548async fn run_acp_lifecycle_inner(
549 backend: &CliBackend,
550 workspace_root: &PathBuf,
551 prompt: &str,
552 tx: &mpsc::UnboundedSender<AcpEvent>,
553 child_pid: &Arc<Mutex<Option<u32>>>,
554) -> Result<()> {
555 let mut cmd = tokio::process::Command::new(&backend.command);
558 cmd.args(&backend.args)
559 .stdin(std::process::Stdio::piped())
560 .stdout(std::process::Stdio::piped())
561 .stderr(std::process::Stdio::piped())
562 .kill_on_drop(true);
563
564 #[cfg(unix)]
565 cmd.process_group(0);
566
567 let mut child = cmd.spawn().context("Failed to spawn ACP process")?;
568
569 if let Some(pid) = child.id()
571 && let Ok(mut guard) = child_pid.lock()
572 {
573 *guard = Some(pid);
574 }
575
576 let child_stdin = child.stdin.take().context("No stdin")?;
577 let child_stdout = child.stdout.take().context("No stdout")?;
578
579 if let Some(stderr) = child.stderr.take() {
581 tokio::task::spawn_local(async move {
582 let mut reader = tokio::io::BufReader::new(stderr);
583 let mut line = String::new();
584 use tokio::io::AsyncBufReadExt;
585 while reader.read_line(&mut line).await.unwrap_or(0) > 0 {
586 warn!("kiro-cli stderr: {}", line.trim_end());
587 line.clear();
588 }
589 });
590 }
591
592 let terminals: Terminals = Rc::new(RefCell::new(HashMap::new()));
593 let client = RalphAcpClient {
594 tx: tx.clone(),
595 terminals: Rc::clone(&terminals),
596 };
597
598 let (conn, io_task) = ClientSideConnection::new(
599 client,
600 child_stdin.compat_write(),
601 child_stdout.compat(),
602 |fut| {
603 tokio::task::spawn_local(fut);
604 },
605 );
606
607 tokio::task::spawn_local(async move {
608 if let Err(e) = io_task.await {
609 debug!("ACP IO task ended: {}", e);
610 }
611 });
612
613 let init_req = InitializeRequest::new(ProtocolVersion::LATEST)
615 .client_info(agent_client_protocol::Implementation::new(
616 "ralph-orchestrator",
617 env!("CARGO_PKG_VERSION"),
618 ))
619 .client_capabilities(agent_client_protocol::ClientCapabilities::new().terminal(true));
620 conn.initialize(init_req)
621 .await
622 .context("ACP initialize failed")?;
623
624 debug!("ACP initialize succeeded");
625
626 let session = conn
628 .new_session(NewSessionRequest::new(workspace_root))
629 .await
630 .context("ACP session/new failed")?;
631
632 debug!("ACP session created: {}", session.session_id);
633
634 let session_id = session.session_id.clone();
636 debug!("ACP sending prompt...");
637 let response = conn
638 .prompt(PromptRequest::new(
639 session.session_id,
640 vec![ContentBlock::Text(TextContent::new(prompt))],
641 ))
642 .await
643 .context("ACP session/prompt failed")?;
644
645 let _ = tx.send(AcpEvent::Done(response.stop_reason));
646
647 let active_terminals: Vec<_> = terminals.borrow_mut().drain().collect();
649 for (_, mut state) in active_terminals {
650 let _ = state.child.kill().await;
651 }
652
653 let _ = conn.cancel(CancelNotification::new(session_id)).await;
655
656 match tokio::time::timeout(Duration::from_secs(2), child.wait()).await {
658 Ok(_) => {}
659 Err(_) => {
660 let _ = child.kill().await;
661 }
662 }
663
664 Ok(())
665}
666
667#[cfg(test)]
668mod tests {
669 use super::*;
670 use agent_client_protocol::Client;
671
672 #[test]
673 fn test_acp_executor_new() {
674 let backend = CliBackend::kiro_acp();
675 let executor = AcpExecutor::new(backend, PathBuf::from("/tmp"));
676 assert_eq!(executor.backend.command, "kiro-cli");
677 assert_eq!(executor.workspace_root, PathBuf::from("/tmp"));
678 }
679
680 #[tokio::test]
682 async fn test_acp_failed_event_returns_error_not_panic() {
683 let (tx, rx) = mpsc::unbounded_channel::<AcpEvent>();
684
685 tx.send(AcpEvent::Text("partial output".to_string()))
687 .unwrap();
688 tx.send(AcpEvent::Failed("session/prompt failed".to_string()))
689 .unwrap();
690 drop(tx);
691
692 let mut handler = TestHandler::default();
694 let mut text_output = String::new();
695 let mut stop_reason = None;
696 let mut error_msg = None;
697 let mut rx = rx;
698
699 while let Some(event) = rx.recv().await {
700 match event {
701 AcpEvent::Text(t) => {
702 text_output.push_str(&t);
703 handler.on_text(&t);
704 }
705 AcpEvent::ToolCall { name, id, input } => {
706 handler.on_tool_call(&name, &id, &input);
707 }
708 AcpEvent::ToolResult { id, output } => {
709 handler.on_tool_result(&id, &output);
710 }
711 AcpEvent::Error(e) => {
712 handler.on_error(&e);
713 }
714 AcpEvent::Done(reason) => {
715 stop_reason = Some(reason);
716 break;
717 }
718 AcpEvent::Failed(msg) => {
719 error_msg = Some(msg);
720 break;
721 }
722 }
723 }
724
725 assert!(stop_reason.is_none());
727 assert!(error_msg.is_some());
728 assert!(error_msg.unwrap().contains("session/prompt failed"));
729 assert!(text_output.contains("partial"));
730 }
731
732 #[derive(Default)]
733 struct TestHandler {
734 errors: Vec<String>,
735 }
736
737 impl StreamHandler for TestHandler {
738 fn on_text(&mut self, _: &str) {}
739 fn on_tool_call(&mut self, _: &str, _: &str, _: &serde_json::Value) {}
740 fn on_tool_result(&mut self, _: &str, _: &str) {}
741 fn on_error(&mut self, error: &str) {
742 self.errors.push(error.to_string());
743 }
744 fn on_complete(&mut self, _: &SessionResult) {}
745 }
746
747 fn test_client() -> (RalphAcpClient, mpsc::UnboundedReceiver<AcpEvent>, Terminals) {
749 let (tx, rx) = mpsc::unbounded_channel();
750 let terminals: Terminals = Rc::new(RefCell::new(HashMap::new()));
751 let client = RalphAcpClient {
752 tx,
753 terminals: Rc::clone(&terminals),
754 };
755 (client, rx, terminals)
756 }
757
758 #[tokio::test]
759 async fn test_create_terminal_and_output() {
760 let local = tokio::task::LocalSet::new();
761 local
762 .run_until(async {
763 let (client, _rx, terminals) = test_client();
764
765 let req = CreateTerminalRequest::new("test-session", "echo")
766 .args(vec!["hello world".into()]);
767 let resp = client.create_terminal(req).await.unwrap();
768
769 assert!(terminals.borrow().contains_key(resp.terminal_id.0.as_ref()));
771
772 let wait_req =
774 WaitForTerminalExitRequest::new("test-session", resp.terminal_id.clone());
775 let wait_resp = client.wait_for_terminal_exit(wait_req).await.unwrap();
776 assert_eq!(wait_resp.exit_status.exit_code, Some(0));
777
778 tokio::time::sleep(Duration::from_millis(100)).await;
780 tokio::task::yield_now().await;
781
782 let out_req = TerminalOutputRequest::new("test-session", resp.terminal_id.clone());
784 let out_resp = client.terminal_output(out_req).await.unwrap();
785 assert!(
786 out_resp.output.contains("hello world"),
787 "expected 'hello world' in output: {:?}",
788 out_resp.output
789 );
790 assert!(out_resp.exit_status.is_some());
791 })
792 .await;
793 }
794
795 #[tokio::test]
796 async fn test_release_terminal_removes_from_map() {
797 let local = tokio::task::LocalSet::new();
798 local
799 .run_until(async {
800 let (client, _rx, terminals) = test_client();
801
802 let req =
803 CreateTerminalRequest::new("test-session", "sleep").args(vec!["60".into()]);
804 let resp = client.create_terminal(req).await.unwrap();
805 let tid = resp.terminal_id.clone();
806
807 assert!(terminals.borrow().contains_key(tid.0.as_ref()));
808
809 let rel_req = ReleaseTerminalRequest::new("test-session", tid.clone());
810 client.release_terminal(rel_req).await.unwrap();
811
812 assert!(!terminals.borrow().contains_key(tid.0.as_ref()));
813 })
814 .await;
815 }
816
817 #[tokio::test]
818 async fn test_kill_terminal_keeps_in_map() {
819 let local = tokio::task::LocalSet::new();
820 local
821 .run_until(async {
822 let (client, _rx, terminals) = test_client();
823
824 let req =
825 CreateTerminalRequest::new("test-session", "sleep").args(vec!["60".into()]);
826 let resp = client.create_terminal(req).await.unwrap();
827 let tid = resp.terminal_id.clone();
828
829 let kill_req = KillTerminalCommandRequest::new("test-session", tid.clone());
830 client.kill_terminal_command(kill_req).await.unwrap();
831
832 assert!(terminals.borrow().contains_key(tid.0.as_ref()));
834 })
835 .await;
836 }
837
838 #[tokio::test]
839 async fn test_terminal_output_unknown_id_errors() {
840 let local = tokio::task::LocalSet::new();
841 local
842 .run_until(async {
843 let (client, _rx, _terminals) = test_client();
844
845 let req = TerminalOutputRequest::new("test-session", "nonexistent");
846 let result = client.terminal_output(req).await;
847 assert!(result.is_err());
848 })
849 .await;
850 }
851
852 #[tokio::test]
853 async fn test_terminal_failed_command_exit_code() {
854 let local = tokio::task::LocalSet::new();
855 local
856 .run_until(async {
857 let (client, _rx, _terminals) = test_client();
858
859 let req = CreateTerminalRequest::new("test-session", "false");
860 let resp = client.create_terminal(req).await.unwrap();
861
862 let wait_req =
863 WaitForTerminalExitRequest::new("test-session", resp.terminal_id.clone());
864 let wait_resp = client.wait_for_terminal_exit(wait_req).await.unwrap();
865 assert_ne!(wait_resp.exit_status.exit_code, Some(0));
866 })
867 .await;
868 }
869}