1use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26
27use agent_client_protocol as acp;
28use schemars::JsonSchema;
29use serde::Deserialize;
30use tokio::sync::{mpsc, oneshot};
31use tokio_util::sync::CancellationToken;
32use zeph_tools::{
33 ToolCall, ToolError, ToolOutput,
34 executor::deserialize_params,
35 registry::{InvocationHint, ToolDef},
36};
37
38use crate::{error::AcpError, permission::AcpPermissionGate};
39
40const KILL_GRACE_TIMEOUT: Duration = Duration::from_secs(5);
41
42const MAX_STDIN_BYTES: usize = 65_536;
44
45const STDIN_CHANNEL_CAPACITY: usize = 16;
47
48const STDIN_RATE_INTERVAL: Duration = Duration::from_millis(10);
50
51const SHELL_INTERPRETERS: &[&str] = &["bash", "sh", "zsh", "fish", "dash"];
53
54const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time"];
56
57fn extract_command_binary(command: &str) -> &str {
63 let mut tokens = command.split_whitespace().peekable();
65 loop {
66 match tokens.peek() {
67 None => return "bash",
68 Some(tok) => {
69 if tok.contains('=') {
71 tokens.next();
72 continue;
73 }
74 let base = tok.rsplit('/').next().unwrap_or(tok);
76 if TRANSPARENT_PREFIXES.contains(&base) {
77 tokens.next();
78 continue;
79 }
80 let binary = tok.rsplit('/').next().unwrap_or(tok);
82 return binary;
83 }
84 }
85 }
86}
87
88struct ShellResult {
89 output: String,
90 exit_code: Option<u32>,
91 terminal_id: String,
92}
93
94struct TerminalRequest {
95 session_id: acp::schema::SessionId,
96 command: String,
97 args: Vec<String>,
98 cwd: Option<PathBuf>,
99 timeout: Duration,
100 reply: oneshot::Sender<Result<ShellResult, AcpError>>,
101 stream_tx: Option<(mpsc::Sender<acp::schema::SessionNotification>, String)>,
105}
106
107struct TerminalReleaseRequest {
108 session_id: acp::schema::SessionId,
109 terminal_id: String,
110}
111
112struct StdinWriteRequest {
113 session_id: acp::schema::SessionId,
114 terminal_id: acp::schema::TerminalId,
115 data: Vec<u8>,
116 reply: oneshot::Sender<Result<(), AcpError>>,
117}
118
119enum TerminalMessage {
120 Execute(TerminalRequest),
121 Release(TerminalReleaseRequest),
122 WriteStdin(StdinWriteRequest),
123}
124
125#[derive(Clone)]
130pub struct AcpShellExecutor {
131 session_id: acp::schema::SessionId,
132 request_tx: mpsc::UnboundedSender<TerminalMessage>,
133 permission_gate: Option<AcpPermissionGate>,
134 timeout: Duration,
135}
136
137impl AcpShellExecutor {
138 pub fn new(
144 conn: Arc<acp::ConnectionTo<acp::Client>>,
145 session_id: acp::schema::SessionId,
146 permission_gate: Option<AcpPermissionGate>,
147 timeout_secs: u64,
148 ) -> (Self, impl std::future::Future<Output = ()>) {
149 Self::with_timeout(
150 conn,
151 session_id,
152 permission_gate,
153 Duration::from_secs(timeout_secs),
154 )
155 }
156
157 pub fn with_timeout(
159 conn: Arc<acp::ConnectionTo<acp::Client>>,
160 session_id: acp::schema::SessionId,
161 permission_gate: Option<AcpPermissionGate>,
162 timeout: Duration,
163 ) -> (Self, impl std::future::Future<Output = ()>) {
164 let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
165 let handler = async move { run_terminal_handler(conn, rx).await };
166 (
167 Self {
168 session_id,
169 request_tx: tx,
170 permission_gate,
171 timeout,
172 },
173 handler,
174 )
175 }
176
177 pub fn release_terminal(&self, terminal_id: String) {
183 self.request_tx
184 .send(TerminalMessage::Release(TerminalReleaseRequest {
185 session_id: self.session_id.clone(),
186 terminal_id,
187 }))
188 .ok();
189 }
190
191 async fn handle_bash_stdin(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
192 let gate = self
194 .permission_gate
195 .as_ref()
196 .ok_or_else(|| ToolError::Blocked {
197 command: "bash_stdin: permission gate required".into(),
198 })?;
199
200 let params: BashStdinParams = deserialize_params(&call.params)?;
201
202 if params.data.len() > MAX_STDIN_BYTES {
203 return Err(ToolError::InvalidParams {
204 message: AcpError::StdinTooLarge {
205 size: params.data.len(),
206 }
207 .to_string(),
208 });
209 }
210 let data = params.data.as_bytes().to_vec();
211
212 let is_shell = SHELL_INTERPRETERS
216 .iter()
217 .any(|s| params.terminal_id.contains(s));
218 let title = if is_shell {
219 "bash_stdin [WARNING: stdin to shell interpreter — data will be executed as commands]"
220 .to_string()
221 } else {
222 "bash_stdin".to_owned()
223 };
224 let fields = acp::schema::ToolCallUpdateFields::new()
225 .title(title)
226 .raw_input(serde_json::json!({
227 "terminal_id": params.terminal_id,
228 "data_length": params.data.len(),
229 }));
230 let tool_call = acp::schema::ToolCallUpdate::new("bash_stdin".to_owned(), fields);
231 let allowed = gate
232 .check_permission(self.session_id.clone(), tool_call)
233 .await
234 .map_err(|e| ToolError::InvalidParams {
235 message: e.to_string(),
236 })?;
237 if !allowed {
238 return Err(ToolError::Blocked {
239 command: "bash_stdin: permission denied".into(),
240 });
241 }
242
243 let terminal_id: acp::schema::TerminalId = params.terminal_id.clone().into();
244 let (reply_tx, reply_rx) = oneshot::channel();
245 self.request_tx
246 .send(TerminalMessage::WriteStdin(StdinWriteRequest {
247 session_id: self.session_id.clone(),
248 terminal_id,
249 data,
250 reply: reply_tx,
251 }))
252 .map_err(|_| ToolError::InvalidParams {
253 message: "terminal handler closed".into(),
254 })?;
255 reply_rx
256 .await
257 .map_err(|_| ToolError::InvalidParams {
258 message: "terminal handler closed".into(),
259 })?
260 .map_err(|e| ToolError::InvalidParams {
261 message: e.to_string(),
262 })?;
263
264 Ok(Some(ToolOutput {
265 tool_name: zeph_tools::ToolName::new("bash_stdin"),
266 summary: format!(
267 "wrote {} bytes to stdin of {}",
268 params.data.len(),
269 params.terminal_id
270 ),
271 blocks_executed: 1,
272 filter_stats: None,
273 diff: None,
274 streamed: false,
275 terminal_id: Some(params.terminal_id),
276 locations: None,
277 raw_response: None,
278 claim_source: Some(zeph_tools::ClaimSource::Shell),
279 }))
280 }
281
282 async fn execute_shell(
283 &self,
284 command: String,
285 args: Vec<String>,
286 cwd: Option<PathBuf>,
287 stream_tx: Option<(mpsc::Sender<acp::schema::SessionNotification>, String)>,
288 ) -> Result<ShellResult, AcpError> {
289 let (reply_tx, reply_rx) = oneshot::channel();
290 self.request_tx
291 .send(TerminalMessage::Execute(TerminalRequest {
292 session_id: self.session_id.clone(),
293 command,
294 args,
295 cwd,
296 timeout: self.timeout,
297 reply: reply_tx,
298 stream_tx,
299 }))
300 .map_err(|_| AcpError::ChannelClosed)?;
301 reply_rx.await.map_err(|_| AcpError::ChannelClosed)?
302 }
303}
304
305#[derive(Deserialize, JsonSchema)]
306struct BashParams {
307 command: String,
308 #[serde(default)]
309 args: Vec<String>,
310 #[serde(default)]
311 cwd: Option<String>,
312}
313
314#[derive(Deserialize, JsonSchema)]
315struct BashStdinParams {
316 terminal_id: String,
317 data: String,
318}
319
320impl zeph_tools::ToolExecutor for AcpShellExecutor {
321 async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
322 Ok(None)
323 }
324
325 fn tool_definitions(&self) -> Vec<ToolDef> {
326 let mut defs = vec![ToolDef {
327 id: "bash".into(),
328 description: "Execute a shell command in the IDE terminal.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout/stderr combined with exit code\nErrors: Timeout; permission denied by IDE; command blocked by policy\nExample: {\"command\": \"cargo build\"}".into(),
329 schema: schemars::schema_for!(BashParams),
330 invocation: InvocationHint::ToolCall,
331 output_schema: None,
332 }];
333 if self.permission_gate.is_some() {
335 defs.push(ToolDef {
336 id: "bash_stdin".into(),
337 description: "Write data to stdin of a running terminal process.\n\nParameters: terminal_id (string, required) - terminal to write to; data (string, required) - stdin data\nReturns: confirmation\nErrors: terminal not found; terminal process exited\nExample: {\"terminal_id\": \"term-1\", \"data\": \"yes\\n\"}".into(),
338 schema: schemars::schema_for!(BashStdinParams),
339 invocation: InvocationHint::ToolCall,
340 output_schema: None,
341 });
342 }
343 defs
344 }
345
346 async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
347 if call.tool_id == "bash_stdin" {
348 return self.handle_bash_stdin(call).await;
349 }
350 if call.tool_id != "bash" {
351 return Ok(None);
352 }
353
354 let params: BashParams = deserialize_params(&call.params)?;
355 let cwd = params.cwd.map(PathBuf::from);
356
357 let blocklist: Vec<String> = zeph_tools::DEFAULT_BLOCKED_COMMANDS
358 .iter()
359 .map(|s| (*s).to_owned())
360 .collect();
361
362 if let Some(pattern) = zeph_tools::check_blocklist(¶ms.command, &blocklist) {
364 return Err(ToolError::Blocked { command: pattern });
365 }
366 if let Some(script) = zeph_tools::effective_shell_command(¶ms.command, ¶ms.args)
369 && let Some(pattern) = zeph_tools::check_blocklist(script, &blocklist)
370 {
371 return Err(ToolError::Blocked { command: pattern });
372 }
373
374 if self.permission_gate.is_none() {
375 tracing::warn!(
376 "AcpShellExecutor has no permission gate — only blocklist applies. \
377 Do not use in production without a permission gate."
378 );
379 }
380
381 if let Some(gate) = &self.permission_gate {
382 let cmd_binary = extract_command_binary(¶ms.command);
385 let fields = acp::schema::ToolCallUpdateFields::new()
386 .title(cmd_binary.to_owned())
387 .raw_input(serde_json::json!({ "command": params.command }));
388 let tool_call = acp::schema::ToolCallUpdate::new(cmd_binary.to_owned(), fields);
389 let allowed = gate
390 .check_permission(self.session_id.clone(), tool_call)
391 .await
392 .map_err(|e| ToolError::InvalidParams {
393 message: e.to_string(),
394 })?;
395 if !allowed {
396 return Err(ToolError::Blocked {
397 command: params.command,
398 });
399 }
400 }
401
402 let result = self
403 .execute_shell(params.command, params.args, cwd, None)
404 .await
405 .map_err(|e| ToolError::InvalidParams {
406 message: e.to_string(),
407 })?;
408
409 let is_error = !matches!(result.exit_code, Some(0) | None);
410 let summary = if is_error {
411 format!(
412 "[exit {}]\n{}",
413 result.exit_code.unwrap_or(1),
414 result.output
415 )
416 } else {
417 result.output.clone()
418 };
419 let raw_response = Some(serde_json::json!({
420 "stdout": result.output,
421 "stderr": "",
422 "interrupted": false,
423 "isImage": false,
424 "noOutputExpected": false
425 }));
426
427 Ok(Some(ToolOutput {
428 tool_name: zeph_tools::ToolName::new("bash"),
429 summary,
430 blocks_executed: 1,
431 filter_stats: None,
432 diff: None,
433 streamed: false,
434 terminal_id: Some(result.terminal_id),
435 locations: None,
436 raw_response,
437 claim_source: Some(zeph_tools::ClaimSource::Shell),
438 }))
439 }
440}
441
442async fn forward_stdin_via_ext(
443 conn: &Arc<acp::ConnectionTo<acp::Client>>,
444 session_id: &acp::schema::SessionId,
445 terminal_id: &acp::schema::TerminalId,
446 data: Vec<u8>,
447) -> Result<(), AcpError> {
448 use base64::Engine as _;
449 let encoded = base64::engine::general_purpose::STANDARD.encode(&data);
450 let params_json = serde_json::json!({
451 "session_id": session_id.to_string(),
452 "terminal_id": terminal_id.to_string(),
453 "data": encoded,
454 });
455 let req = acp::UntypedMessage::new("terminal/write_stdin", params_json)
456 .map_err(|e| AcpError::ClientError(e.to_string()))?;
457 conn.send_request(req)
458 .block_task()
459 .await
460 .map(|_| ())
461 .map_err(|e| AcpError::ClientError(e.to_string()))
462}
463
464async fn run_stdin_pump(
468 conn: Arc<acp::ConnectionTo<acp::Client>>,
469 session_id: acp::schema::SessionId,
470 terminal_id: acp::schema::TerminalId,
471 mut data_rx: mpsc::Receiver<Vec<u8>>,
472 cancel: CancellationToken,
473) {
474 let mut interval = tokio::time::interval(STDIN_RATE_INTERVAL);
475 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
476 loop {
477 let data = tokio::select! {
478 () = cancel.cancelled() => break,
479 msg = data_rx.recv() => match msg {
480 Some(d) => d,
481 None => break,
482 },
483 };
484 tokio::select! {
486 () = cancel.cancelled() => break,
487 _ = interval.tick() => {}
488 }
489 if let Err(e) = forward_stdin_via_ext(&conn, &session_id, &terminal_id, data).await {
490 tracing::warn!(%terminal_id, error = %e, "stdin pump error — cancelling");
492 cancel.cancel();
493 break;
494 }
495 }
496}
497
498async fn run_terminal_handler(
499 conn: Arc<acp::ConnectionTo<acp::Client>>,
500 mut rx: mpsc::UnboundedReceiver<TerminalMessage>,
501) {
502 let mut stdin_pumps: std::collections::HashMap<
504 String,
505 (mpsc::Sender<Vec<u8>>, CancellationToken),
506 > = std::collections::HashMap::new();
507
508 while let Some(msg) = rx.recv().await {
509 match msg {
510 TerminalMessage::Execute(req) => {
511 let result = execute_in_terminal(
512 &conn,
513 req.session_id,
514 req.command,
515 req.args,
516 req.cwd,
517 req.timeout,
518 req.stream_tx,
519 )
520 .await;
521 if let Ok(ref shell_result) = result
523 && let Some((_, token)) = stdin_pumps.remove(&shell_result.terminal_id)
524 {
525 token.cancel();
526 }
527 req.reply.send(result).ok();
528 }
529 TerminalMessage::Release(req) => {
530 if let Some((_, token)) = stdin_pumps.remove(&req.terminal_id) {
532 token.cancel();
533 }
534 let tid = req.terminal_id.clone();
535 let release_req =
536 acp::schema::ReleaseTerminalRequest::new(req.session_id, req.terminal_id);
537 if let Err(e) = conn.send_request(release_req).block_task().await {
538 tracing::warn!(
539 terminal_id = %tid,
540 error = %e,
541 "failed to release terminal"
542 );
543 }
544 }
545 TerminalMessage::WriteStdin(req) => {
546 let tid_str = req.terminal_id.to_string();
547
548 let (data_tx, cancel) = stdin_pumps.entry(tid_str).or_insert_with(|| {
550 let (tx, rx) = mpsc::channel::<Vec<u8>>(STDIN_CHANNEL_CAPACITY);
551 let token = CancellationToken::new();
552 tokio::spawn(run_stdin_pump(
553 conn.clone(),
554 req.session_id.clone(),
555 req.terminal_id.clone(),
556 rx,
557 token.clone(),
558 ));
559 (tx, token)
560 });
561
562 let result = if cancel.is_cancelled() {
563 Err(AcpError::BrokenPipe)
564 } else {
565 data_tx.try_send(req.data).map_err(|_| AcpError::BrokenPipe)
567 };
568
569 req.reply.send(result).ok();
570 }
571 }
572 }
573}
574
575const STREAM_POLL_INTERVAL: Duration = Duration::from_millis(200);
577
578async fn kill_terminal(
580 conn: &Arc<acp::ConnectionTo<acp::Client>>,
581 session_id: &acp::schema::SessionId,
582 terminal_id: &acp::schema::TerminalId,
583) -> Result<(), AcpError> {
584 tracing::warn!(%terminal_id, "terminal command timed out — sending kill");
585 let kill_req = acp::schema::KillTerminalRequest::new(session_id.clone(), terminal_id.clone());
586 conn.send_request(kill_req)
587 .block_task()
588 .await
589 .map_err(|e| AcpError::ClientError(e.to_string()))?;
590 let wait_again =
591 acp::schema::WaitForTerminalExitRequest::new(session_id.clone(), terminal_id.clone());
592 let _ = tokio::time::timeout(
593 KILL_GRACE_TIMEOUT,
594 conn.send_request(wait_again).block_task(),
595 )
596 .await;
597 Ok(())
598}
599
600async fn stream_until_exit(
604 conn: &Arc<acp::ConnectionTo<acp::Client>>,
605 session_id: &acp::schema::SessionId,
606 terminal_id: &acp::schema::TerminalId,
607 timeout: Duration,
608 notify_tx: &mpsc::Sender<acp::schema::SessionNotification>,
609 tool_call_id: &str,
610) -> Result<Option<u32>, AcpError> {
611 let wait_req =
612 acp::schema::WaitForTerminalExitRequest::new(session_id.clone(), terminal_id.clone());
613 let exit_future = conn.send_request(wait_req).block_task();
614 tokio::pin!(exit_future);
615 let deadline = tokio::time::Instant::now() + timeout;
616 let mut last_output_len = 0usize;
617
618 loop {
619 tokio::select! {
620 result = &mut exit_future => {
621 return match result {
622 Ok(resp) => Ok(resp.exit_status.exit_code),
623 Err(e) => Err(AcpError::ClientError(e.to_string())),
624 };
625 }
626 () = tokio::time::sleep(STREAM_POLL_INTERVAL) => {
627 if tokio::time::Instant::now() >= deadline {
628 kill_terminal(conn, session_id, terminal_id).await?;
629 return Ok(Some(124u32));
630 }
631 let output_req =
632 acp::schema::TerminalOutputRequest::new(session_id.clone(), terminal_id.clone());
633 if let Ok(resp) = conn.send_request(output_req).block_task().await {
634 let new_data = resp.output.get(last_output_len..).unwrap_or("");
635 if !new_data.is_empty() {
636 last_output_len = resp.output.len();
637 let mut meta = serde_json::Map::new();
638 meta.insert(
639 "terminal_output".to_owned(),
640 serde_json::json!({
641 "terminal_id": terminal_id.to_string(),
642 "data": new_data,
643 }),
644 );
645 let update = acp::schema::ToolCallUpdate::new(
646 tool_call_id.to_owned(),
647 acp::schema::ToolCallUpdateFields::new(),
648 )
649 .meta(meta);
650 let notif = acp::schema::SessionNotification::new(
651 session_id.clone(),
652 acp::schema::SessionUpdate::ToolCallUpdate(update),
653 );
654 let _ = notify_tx.try_send(notif);
655 }
656 }
657 }
658 }
659 }
660}
661
662async fn execute_in_terminal(
663 conn: &Arc<acp::ConnectionTo<acp::Client>>,
664 session_id: acp::schema::SessionId,
665 command: String,
666 args: Vec<String>,
667 cwd: Option<PathBuf>,
668 timeout: Duration,
669 stream_tx: Option<(mpsc::Sender<acp::schema::SessionNotification>, String)>,
670) -> Result<ShellResult, AcpError> {
671 let create_req = acp::schema::CreateTerminalRequest::new(session_id.clone(), command)
673 .args(args)
674 .cwd(cwd);
675 let create_resp = conn
676 .send_request(create_req)
677 .block_task()
678 .await
679 .map_err(|e| AcpError::ClientError(e.to_string()))?;
680 let terminal_id = create_resp.terminal_id;
681
682 let exit_code = if let Some((ref notify_tx, ref tool_call_id)) = stream_tx {
684 stream_until_exit(
685 conn,
686 &session_id,
687 &terminal_id,
688 timeout,
689 notify_tx,
690 tool_call_id,
691 )
692 .await?
693 } else {
694 let wait_req =
695 acp::schema::WaitForTerminalExitRequest::new(session_id.clone(), terminal_id.clone());
696 match tokio::time::timeout(timeout, conn.send_request(wait_req).block_task()).await {
697 Ok(Ok(resp)) => resp.exit_status.exit_code,
698 Ok(Err(e)) => return Err(AcpError::ClientError(e.to_string())),
699 Err(_) => {
700 kill_terminal(conn, &session_id, &terminal_id).await?;
701 Some(124u32)
702 }
703 }
704 };
705
706 let output_req =
710 acp::schema::TerminalOutputRequest::new(session_id.clone(), terminal_id.clone());
711 let output_resp = conn
712 .send_request(output_req)
713 .block_task()
714 .await
715 .map_err(|e| AcpError::ClientError(e.to_string()))?;
716
717 if let Some((ref notify_tx, ref tool_call_id)) = stream_tx {
719 let mut meta = serde_json::Map::new();
720 meta.insert(
721 "terminal_exit".to_owned(),
722 serde_json::json!({ "terminal_id": terminal_id.to_string(), "exit_code": exit_code }),
723 );
724 let update = acp::schema::ToolCallUpdate::new(
725 tool_call_id.clone(),
726 acp::schema::ToolCallUpdateFields::new(),
727 )
728 .meta(meta);
729 let notif = acp::schema::SessionNotification::new(
730 session_id.clone(),
731 acp::schema::SessionUpdate::ToolCallUpdate(update),
732 );
733 let _ = notify_tx.try_send(notif);
734 }
735
736 Ok(ShellResult {
738 output: output_resp.output,
739 exit_code,
740 terminal_id: terminal_id.to_string(),
741 })
742}
743
744#[cfg(any())] mod tests {
747 use std::rc::Rc;
748
749 use zeph_tools::ToolExecutor as _;
750
751 use super::*;
752
753 struct FakeTerminalClient;
754
755 #[async_trait::async_trait(?Send)]
756 impl acp::Client for FakeTerminalClient {
757 async fn request_permission(
758 &self,
759 _args: acp::schema::RequestPermissionRequest,
760 ) -> acp::Result<acp::RequestPermissionResponse> {
761 Err(acp::Error::method_not_found())
762 }
763
764 async fn create_terminal(
765 &self,
766 _args: acp::schema::CreateTerminalRequest,
767 ) -> acp::Result<acp::schema::CreateTerminalResponse> {
768 Ok(acp::schema::CreateTerminalResponse::new("term-1"))
769 }
770
771 async fn wait_for_terminal_exit(
772 &self,
773 _args: acp::schema::WaitForTerminalExitRequest,
774 ) -> acp::Result<acp::WaitForTerminalExitResponse> {
775 Ok(acp::WaitForTerminalExitResponse::new(
776 acp::TerminalExitStatus::new().exit_code(0u32),
777 ))
778 }
779
780 async fn terminal_output(
781 &self,
782 _args: acp::schema::TerminalOutputRequest,
783 ) -> acp::Result<acp::TerminalOutputResponse> {
784 Ok(acp::TerminalOutputResponse::new("hello\n", false))
785 }
786
787 async fn release_terminal(
788 &self,
789 _args: acp::schema::ReleaseTerminalRequest,
790 ) -> acp::Result<acp::ReleaseTerminalResponse> {
791 Ok(acp::ReleaseTerminalResponse::new())
792 }
793
794 async fn kill_terminal(
795 &self,
796 _args: acp::schema::KillTerminalRequest,
797 ) -> acp::Result<acp::KillTerminalResponse> {
798 Ok(acp::KillTerminalResponse::new())
799 }
800
801 async fn session_notification(
802 &self,
803 _args: acp::schema::SessionNotification,
804 ) -> acp::Result<()> {
805 Ok(())
806 }
807 }
808
809 #[tokio::test]
810 async fn bash_tool_call_returns_output() {
811 let local = tokio::task::LocalSet::new();
812 local
813 .run_until(async {
814 let conn = Rc::new(FakeTerminalClient);
815 let sid = acp::schema::SessionId::new("s1");
816 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
817 tokio::task::spawn_local(handler);
818
819 let mut params = serde_json::Map::new();
820 params.insert("command".to_owned(), serde_json::json!("echo"));
821 params.insert("args".to_owned(), serde_json::json!(["hello"]));
822 let call = ToolCall {
823 tool_id: zeph_tools::ToolName::new("bash"),
824 params,
825 caller_id: None,
826 };
827
828 let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
829 assert_eq!(result.summary, "hello\n");
830 assert_eq!(result.tool_name, "bash");
831 })
832 .await;
833 }
834
835 #[tokio::test]
836 async fn unknown_tool_returns_none() {
837 let local = tokio::task::LocalSet::new();
838 local
839 .run_until(async {
840 let conn = Rc::new(FakeTerminalClient);
841 let sid = acp::schema::SessionId::new("s1");
842 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
843 tokio::task::spawn_local(handler);
844
845 let call = ToolCall {
846 tool_id: zeph_tools::ToolName::new("unknown"),
847 params: serde_json::Map::new(),
848 caller_id: None,
849 };
850 let result = exec.execute_tool_call(&call).await.unwrap();
851 assert!(result.is_none());
852 })
853 .await;
854 }
855
856 #[test]
857 fn tool_definitions_registers_bash() {
858 let (tx, _rx) = mpsc::unbounded_channel::<TerminalMessage>();
859 let exec = AcpShellExecutor {
860 session_id: acp::schema::SessionId::new("s"),
861 request_tx: tx,
862 permission_gate: None,
863 timeout: Duration::from_mins(2),
864 };
865 let defs = exec.tool_definitions();
866 assert_eq!(defs.len(), 1);
867 assert_eq!(defs[0].id, "bash");
868 }
869
870 struct NonZeroExitClient;
871
872 #[async_trait::async_trait(?Send)]
873 impl acp::Client for NonZeroExitClient {
874 async fn request_permission(
875 &self,
876 _args: acp::schema::RequestPermissionRequest,
877 ) -> acp::Result<acp::RequestPermissionResponse> {
878 Err(acp::Error::method_not_found())
879 }
880
881 async fn create_terminal(
882 &self,
883 _args: acp::schema::CreateTerminalRequest,
884 ) -> acp::Result<acp::schema::CreateTerminalResponse> {
885 Ok(acp::schema::CreateTerminalResponse::new("term-fail"))
886 }
887
888 async fn wait_for_terminal_exit(
889 &self,
890 _args: acp::schema::WaitForTerminalExitRequest,
891 ) -> acp::Result<acp::WaitForTerminalExitResponse> {
892 Ok(acp::WaitForTerminalExitResponse::new(
893 acp::TerminalExitStatus::new().exit_code(1u32),
894 ))
895 }
896
897 async fn terminal_output(
898 &self,
899 _args: acp::schema::TerminalOutputRequest,
900 ) -> acp::Result<acp::TerminalOutputResponse> {
901 Ok(acp::TerminalOutputResponse::new("error output\n", false))
902 }
903
904 async fn release_terminal(
905 &self,
906 _args: acp::schema::ReleaseTerminalRequest,
907 ) -> acp::Result<acp::ReleaseTerminalResponse> {
908 Ok(acp::ReleaseTerminalResponse::new())
909 }
910
911 async fn kill_terminal(
912 &self,
913 _args: acp::schema::KillTerminalRequest,
914 ) -> acp::Result<acp::KillTerminalResponse> {
915 Ok(acp::KillTerminalResponse::new())
916 }
917
918 async fn session_notification(
919 &self,
920 _args: acp::schema::SessionNotification,
921 ) -> acp::Result<()> {
922 Ok(())
923 }
924 }
925
926 #[tokio::test]
927 async fn nonzero_exit_code_prefixes_output() {
928 let local = tokio::task::LocalSet::new();
929 local
930 .run_until(async {
931 let conn = Rc::new(NonZeroExitClient);
932 let sid = acp::schema::SessionId::new("s1");
933 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
934 tokio::task::spawn_local(handler);
935
936 let mut params = serde_json::Map::new();
937 params.insert("command".to_owned(), serde_json::json!("false"));
938 let call = ToolCall {
939 tool_id: zeph_tools::ToolName::new("bash"),
940 params,
941 caller_id: None,
942 };
943
944 let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
945 assert!(
946 result.summary.starts_with("[exit 1]"),
947 "got: {}",
948 result.summary
949 );
950 assert!(result.summary.contains("error output\n"));
951 })
952 .await;
953 }
954
955 struct RejectPermissionClient;
956
957 #[async_trait::async_trait(?Send)]
958 impl acp::Client for RejectPermissionClient {
959 async fn request_permission(
960 &self,
961 _args: acp::schema::RequestPermissionRequest,
962 ) -> acp::Result<acp::RequestPermissionResponse> {
963 Ok(acp::RequestPermissionResponse::new(
964 acp::schema::RequestPermissionOutcome::Selected(
965 acp::SelectedPermissionOutcome::new("reject_once"),
966 ),
967 ))
968 }
969
970 async fn create_terminal(
971 &self,
972 _args: acp::schema::CreateTerminalRequest,
973 ) -> acp::Result<acp::schema::CreateTerminalResponse> {
974 panic!("should not be called when permission denied")
975 }
976
977 async fn wait_for_terminal_exit(
978 &self,
979 _args: acp::schema::WaitForTerminalExitRequest,
980 ) -> acp::Result<acp::WaitForTerminalExitResponse> {
981 panic!("should not be called when permission denied")
982 }
983
984 async fn terminal_output(
985 &self,
986 _args: acp::schema::TerminalOutputRequest,
987 ) -> acp::Result<acp::TerminalOutputResponse> {
988 panic!("should not be called when permission denied")
989 }
990
991 async fn release_terminal(
992 &self,
993 _args: acp::schema::ReleaseTerminalRequest,
994 ) -> acp::Result<acp::ReleaseTerminalResponse> {
995 panic!("should not be called when permission denied")
996 }
997
998 async fn kill_terminal(
999 &self,
1000 _args: acp::schema::KillTerminalRequest,
1001 ) -> acp::Result<acp::KillTerminalResponse> {
1002 panic!("should not be called when permission denied")
1003 }
1004
1005 async fn session_notification(
1006 &self,
1007 _args: acp::schema::SessionNotification,
1008 ) -> acp::Result<()> {
1009 Ok(())
1010 }
1011 }
1012
1013 #[tokio::test]
1014 async fn permission_denied_returns_blocked_error() {
1015 let local = tokio::task::LocalSet::new();
1016 local
1017 .run_until(async {
1018 let perm_conn = Rc::new(RejectPermissionClient);
1019 let sid = acp::schema::SessionId::new("s1");
1020 let tmp_dir = tempfile::tempdir().unwrap();
1021 let perm_file = tmp_dir.path().join("perms.toml");
1022 let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1023 tokio::task::spawn_local(perm_handler);
1024
1025 let term_conn = Rc::new(FakeTerminalClient);
1026 let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1027 tokio::task::spawn_local(term_handler);
1028
1029 let mut params = serde_json::Map::new();
1030 params.insert("command".to_owned(), serde_json::json!("rm"));
1031 params.insert("args".to_owned(), serde_json::json!(["-rf", "/important"]));
1032 let call = ToolCall {
1033 tool_id: zeph_tools::ToolName::new("bash"),
1034 params,
1035 caller_id: None,
1036 };
1037
1038 let err = exec.execute_tool_call(&call).await.unwrap_err();
1039 assert!(matches!(err, ToolError::Blocked { .. }));
1040 })
1041 .await;
1042 }
1043
1044 #[tokio::test]
1045 async fn streaming_mode_emits_terminal_exit_notification() {
1046 let local = tokio::task::LocalSet::new();
1047 local
1048 .run_until(async {
1049 let conn = Rc::new(FakeTerminalClient);
1050 let sid = acp::schema::SessionId::new("s1");
1051 let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
1052 let handler = async move { run_terminal_handler(conn, rx).await };
1053 tokio::task::spawn_local(handler);
1054
1055 let (stream_tx, mut stream_rx) = mpsc::channel(8);
1056 let (reply_tx, reply_rx) = oneshot::channel();
1057 tx.send(TerminalMessage::Execute(TerminalRequest {
1058 session_id: sid,
1059 command: "echo".to_owned(),
1060 args: vec!["hi".to_owned()],
1061 cwd: None,
1062 timeout: Duration::from_secs(5),
1063 reply: reply_tx,
1064 stream_tx: Some((stream_tx, "tool-1".to_owned())),
1065 }))
1066 .unwrap();
1067
1068 let result = reply_rx.await.unwrap().unwrap();
1069 assert_eq!(result.output, "hello\n");
1070
1071 let mut got_exit = false;
1073 while let Ok(notif) = stream_rx.try_recv() {
1074 if let acp::schema::SessionUpdate::ToolCallUpdate(update) = notif.update
1075 && let Some(meta) = update.meta
1076 && meta.contains_key("terminal_exit")
1077 {
1078 got_exit = true;
1079 }
1080 }
1081 assert!(got_exit, "expected terminal_exit notification");
1082 })
1083 .await;
1084 }
1085
1086 #[test]
1087 fn extract_command_binary_bare() {
1088 assert_eq!(extract_command_binary("git status"), "git");
1089 assert_eq!(extract_command_binary("cargo build --release"), "cargo");
1090 assert_eq!(extract_command_binary(" cat file.txt "), "cat");
1091 }
1092
1093 #[test]
1094 fn extract_command_binary_env_prefix() {
1095 assert_eq!(extract_command_binary("env FOO=bar git status"), "git");
1096 assert_eq!(extract_command_binary("command git push"), "git");
1097 assert_eq!(extract_command_binary("exec cargo test"), "cargo");
1098 }
1099
1100 #[test]
1101 fn extract_command_binary_env_var_assignments() {
1102 assert_eq!(extract_command_binary("FOO=bar BAZ=qux git log"), "git");
1103 }
1104
1105 #[test]
1106 fn extract_command_binary_path() {
1107 assert_eq!(extract_command_binary("/usr/bin/git status"), "git");
1108 assert_eq!(
1109 extract_command_binary("/usr/local/bin/cargo build"),
1110 "cargo"
1111 );
1112 }
1113
1114 #[test]
1115 fn extract_command_binary_empty_fallback() {
1116 assert_eq!(extract_command_binary(""), "bash");
1117 assert_eq!(extract_command_binary(" "), "bash");
1118 }
1119
1120 #[tokio::test]
1121 async fn blocklist_blocked_before_permission_gate() {
1122 let local = tokio::task::LocalSet::new();
1126 local
1127 .run_until(async {
1128 let conn = Rc::new(FakeTerminalClient);
1129 let sid = acp::schema::SessionId::new("s1");
1130 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1132 tokio::task::spawn_local(handler);
1133
1134 let mut params = serde_json::Map::new();
1135 params.insert("command".to_owned(), serde_json::json!("rm -rf /"));
1136 let call = ToolCall {
1137 tool_id: zeph_tools::ToolName::new("bash"),
1138 params,
1139 caller_id: None,
1140 };
1141
1142 let err = exec.execute_tool_call(&call).await.unwrap_err();
1143 assert!(matches!(err, ToolError::Blocked { .. }));
1144 })
1145 .await;
1146 }
1147
1148 #[tokio::test]
1149 async fn blocklist_sudo_blocked() {
1150 let local = tokio::task::LocalSet::new();
1151 local
1152 .run_until(async {
1153 let conn = Rc::new(FakeTerminalClient);
1154 let sid = acp::schema::SessionId::new("s1");
1155 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1156 tokio::task::spawn_local(handler);
1157
1158 let mut params = serde_json::Map::new();
1159 params.insert(
1160 "command".to_owned(),
1161 serde_json::json!("sudo apt install vim"),
1162 );
1163 let call = ToolCall {
1164 tool_id: zeph_tools::ToolName::new("bash"),
1165 params,
1166 caller_id: None,
1167 };
1168
1169 let err = exec.execute_tool_call(&call).await.unwrap_err();
1170 assert!(matches!(err, ToolError::Blocked { .. }));
1171 })
1172 .await;
1173 }
1174
1175 #[tokio::test]
1176 async fn args_field_bypass_blocked_for_shell_interpreter() {
1177 let local = tokio::task::LocalSet::new();
1179 local
1180 .run_until(async {
1181 let conn = Rc::new(FakeTerminalClient);
1182 let sid = acp::schema::SessionId::new("s1");
1183 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1184 tokio::task::spawn_local(handler);
1185
1186 let mut params = serde_json::Map::new();
1187 params.insert("command".to_owned(), serde_json::json!("bash"));
1188 params.insert(
1189 "args".to_owned(),
1190 serde_json::json!(["-c", "sudo rm -rf /"]),
1191 );
1192 let call = ToolCall {
1193 tool_id: zeph_tools::ToolName::new("bash"),
1194 params,
1195 caller_id: None,
1196 };
1197
1198 let err = exec.execute_tool_call(&call).await.unwrap_err();
1199 assert!(matches!(err, ToolError::Blocked { .. }));
1200 })
1201 .await;
1202 }
1203
1204 #[tokio::test]
1205 async fn args_field_bypass_sh_minus_c_blocked() {
1206 let local = tokio::task::LocalSet::new();
1207 local
1208 .run_until(async {
1209 let conn = Rc::new(FakeTerminalClient);
1210 let sid = acp::schema::SessionId::new("s1");
1211 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1212 tokio::task::spawn_local(handler);
1213
1214 let mut params = serde_json::Map::new();
1215 params.insert("command".to_owned(), serde_json::json!("sh"));
1216 params.insert(
1217 "args".to_owned(),
1218 serde_json::json!(["-c", "shutdown -h now"]),
1219 );
1220 let call = ToolCall {
1221 tool_id: zeph_tools::ToolName::new("bash"),
1222 params,
1223 caller_id: None,
1224 };
1225
1226 let err = exec.execute_tool_call(&call).await.unwrap_err();
1227 assert!(matches!(err, ToolError::Blocked { .. }));
1228 })
1229 .await;
1230 }
1231
1232 #[test]
1233 fn extract_command_binary_chained_transparent_prefixes() {
1234 assert_eq!(
1236 extract_command_binary("env command exec sudo rm -rf /"),
1237 "sudo"
1238 );
1239 assert_eq!(extract_command_binary("nice nohup time git status"), "git");
1240 }
1241
1242 #[test]
1243 fn extract_command_binary_env_var_then_prefix_then_binary() {
1244 assert_eq!(extract_command_binary("FOO=bar env BAZ=qux git log"), "git");
1245 }
1246
1247 #[tokio::test]
1248 async fn bash_stdin_blocked_without_permission_gate() {
1249 let local = tokio::task::LocalSet::new();
1250 local
1251 .run_until(async {
1252 let conn = Rc::new(FakeTerminalClient);
1253 let sid = acp::schema::SessionId::new("s1");
1254 let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1255 tokio::task::spawn_local(handler);
1256
1257 let mut params = serde_json::Map::new();
1258 params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1259 params.insert("data".to_owned(), serde_json::json!("hello\n"));
1260 let call = ToolCall {
1261 tool_id: zeph_tools::ToolName::new("bash_stdin"),
1262 params,
1263 caller_id: None,
1264 };
1265 let err = exec.execute_tool_call(&call).await.unwrap_err();
1266 assert!(matches!(err, ToolError::Blocked { .. }));
1267 })
1268 .await;
1269 }
1270
1271 #[test]
1272 fn bash_stdin_not_in_tool_definitions_without_gate() {
1273 let (tx, _rx) = mpsc::unbounded_channel::<TerminalMessage>();
1274 let exec = AcpShellExecutor {
1275 session_id: acp::schema::SessionId::new("s"),
1276 request_tx: tx,
1277 permission_gate: None,
1278 timeout: Duration::from_mins(2),
1279 };
1280 let defs = exec.tool_definitions();
1281 assert!(!defs.iter().any(|d| d.id == "bash_stdin"));
1282 }
1283
1284 #[tokio::test]
1285 async fn bash_stdin_size_limit_rejected() {
1286 let local = tokio::task::LocalSet::new();
1287 local
1288 .run_until(async {
1289 let perm_conn = Rc::new(RejectPermissionClient);
1290 let sid = acp::schema::SessionId::new("s1");
1291 let tmp_dir = tempfile::tempdir().unwrap();
1292 let perm_file = tmp_dir.path().join("perms.toml");
1293 let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1294 tokio::task::spawn_local(perm_handler);
1295
1296 let term_conn = Rc::new(FakeTerminalClient);
1297 let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1298 tokio::task::spawn_local(term_handler);
1299
1300 let oversized = "x".repeat(MAX_STDIN_BYTES + 1);
1301 let mut params = serde_json::Map::new();
1302 params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1303 params.insert("data".to_owned(), serde_json::json!(oversized));
1304 let call = ToolCall {
1305 tool_id: zeph_tools::ToolName::new("bash_stdin"),
1306 params,
1307 caller_id: None,
1308 };
1309 let err = exec.execute_tool_call(&call).await.unwrap_err();
1310 assert!(matches!(err, ToolError::InvalidParams { .. }));
1311 })
1312 .await;
1313 }
1314
1315 struct AllowPermissionClient;
1316
1317 #[async_trait::async_trait(?Send)]
1318 impl acp::Client for AllowPermissionClient {
1319 async fn request_permission(
1320 &self,
1321 _args: acp::schema::RequestPermissionRequest,
1322 ) -> acp::Result<acp::RequestPermissionResponse> {
1323 Ok(acp::RequestPermissionResponse::new(
1324 acp::schema::RequestPermissionOutcome::Selected(
1325 acp::SelectedPermissionOutcome::new("allow_once"),
1326 ),
1327 ))
1328 }
1329
1330 async fn session_notification(
1331 &self,
1332 _args: acp::schema::SessionNotification,
1333 ) -> acp::Result<()> {
1334 Ok(())
1335 }
1336 }
1337
1338 #[tokio::test]
1339 async fn bash_stdin_with_permission_gate_succeeds() {
1340 let local = tokio::task::LocalSet::new();
1341 local
1342 .run_until(async {
1343 let perm_conn = Rc::new(AllowPermissionClient);
1344 let sid = acp::schema::SessionId::new("s1");
1345 let tmp_dir = tempfile::tempdir().unwrap();
1346 let perm_file = tmp_dir.path().join("perms.toml");
1347 let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1348 tokio::task::spawn_local(perm_handler);
1349
1350 let term_conn = Rc::new(FakeTerminalClient);
1351 let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1352 tokio::task::spawn_local(term_handler);
1353
1354 let mut params = serde_json::Map::new();
1355 params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1356 params.insert("data".to_owned(), serde_json::json!("echo hello\n"));
1357 let call = ToolCall {
1358 tool_id: zeph_tools::ToolName::new("bash_stdin"),
1359 params,
1360 caller_id: None,
1361 };
1362 let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
1363 assert_eq!(result.tool_name, "bash_stdin");
1364 assert!(result.summary.contains("term-1"));
1365 })
1366 .await;
1367 }
1368
1369 #[test]
1370 fn bash_stdin_in_tool_definitions_with_gate() {
1371 let (tx, _rx) = mpsc::unbounded_channel::<TerminalMessage>();
1372 let tmp_dir = tempfile::tempdir().unwrap();
1373 let perm_file = tmp_dir.path().join("perms.toml");
1374 let perm_conn = Rc::new(AllowPermissionClient);
1375 let (gate, _handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1376 let exec = AcpShellExecutor {
1377 session_id: acp::schema::SessionId::new("s"),
1378 request_tx: tx,
1379 permission_gate: Some(gate),
1380 timeout: Duration::from_mins(2),
1381 };
1382 let defs = exec.tool_definitions();
1383 assert!(defs.iter().any(|d| d.id == "bash_stdin"));
1384 assert!(defs.iter().any(|d| d.id == "bash"));
1385 }
1386
1387 #[tokio::test]
1388 async fn bash_stdin_exactly_64kib_boundary_accepted() {
1389 let local = tokio::task::LocalSet::new();
1390 local
1391 .run_until(async {
1392 let perm_conn = Rc::new(AllowPermissionClient);
1393 let sid = acp::schema::SessionId::new("s1");
1394 let tmp_dir = tempfile::tempdir().unwrap();
1395 let perm_file = tmp_dir.path().join("perms.toml");
1396 let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1397 tokio::task::spawn_local(perm_handler);
1398
1399 let term_conn = Rc::new(FakeTerminalClient);
1400 let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1401 tokio::task::spawn_local(term_handler);
1402
1403 let at_limit = "x".repeat(MAX_STDIN_BYTES);
1405 let mut params = serde_json::Map::new();
1406 params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1407 params.insert("data".to_owned(), serde_json::json!(at_limit));
1408 let call = ToolCall {
1409 tool_id: zeph_tools::ToolName::new("bash_stdin"),
1410 params,
1411 caller_id: None,
1412 };
1413 let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
1414 assert_eq!(result.tool_name, "bash_stdin");
1415 })
1416 .await;
1417 }
1418
1419 #[tokio::test]
1420 async fn bash_stdin_broken_pipe_fast_fail() {
1421 let local = tokio::task::LocalSet::new();
1423 local
1424 .run_until(async {
1425 let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
1426 let conn = Rc::new(FakeTerminalClient);
1427 let handler = async move { run_terminal_handler(conn, rx).await };
1428 tokio::task::spawn_local(handler);
1429
1430 let sid = acp::schema::SessionId::new("s1");
1431 let tid: acp::schema::TerminalId = "term-bp".to_owned().into();
1432
1433 let mut replies = Vec::new();
1438 for _ in 0..=STDIN_CHANNEL_CAPACITY {
1439 let (reply_tx, reply_rx) = oneshot::channel();
1440 tx.send(TerminalMessage::WriteStdin(StdinWriteRequest {
1441 session_id: sid.clone(),
1442 terminal_id: tid.clone(),
1443 data: b"x".to_vec(),
1444 reply: reply_tx,
1445 }))
1446 .unwrap();
1447 replies.push(reply_rx);
1448 }
1449 let mut got_broken_pipe = false;
1451 for reply_rx in replies {
1452 if let Ok(Err(AcpError::BrokenPipe)) = reply_rx.await {
1453 got_broken_pipe = true;
1454 }
1455 }
1456 assert!(
1457 got_broken_pipe,
1458 "expected at least one BrokenPipe from overflow"
1459 );
1460 })
1461 .await;
1462 }
1463
1464 #[tokio::test]
1465 async fn bash_stdin_pump_cancelled_on_release() {
1466 let local = tokio::task::LocalSet::new();
1469 local
1470 .run_until(async {
1471 let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
1472 let conn = Rc::new(FakeTerminalClient);
1473 let handler = async move { run_terminal_handler(conn, rx).await };
1474 tokio::task::spawn_local(handler);
1475
1476 let sid = acp::schema::SessionId::new("s1");
1477 let tid: acp::schema::TerminalId = "term-rel".to_owned().into();
1478
1479 let (reply_tx, reply_rx) = oneshot::channel();
1481 tx.send(TerminalMessage::WriteStdin(StdinWriteRequest {
1482 session_id: sid.clone(),
1483 terminal_id: tid.clone(),
1484 data: b"hello\n".to_vec(),
1485 reply: reply_tx,
1486 }))
1487 .unwrap();
1488 reply_rx.await.unwrap().unwrap(); tx.send(TerminalMessage::Release(TerminalReleaseRequest {
1492 session_id: sid.clone(),
1493 terminal_id: tid.to_string(),
1494 }))
1495 .unwrap();
1496
1497 tokio::task::yield_now().await;
1499
1500 let (fresh_reply, write_result) = oneshot::channel();
1502 tx.send(TerminalMessage::WriteStdin(StdinWriteRequest {
1503 session_id: sid.clone(),
1504 terminal_id: tid.clone(),
1505 data: b"after release\n".to_vec(),
1506 reply: fresh_reply,
1507 }))
1508 .unwrap();
1509 write_result.await.unwrap().unwrap();
1511 })
1512 .await;
1513 }
1514}