Skip to main content

zeph_acp/
terminal.rs

1// SPDX-FileCopyrightText: 2026 Andrei G <bug-ops>
2// SPDX-License-Identifier: MIT OR Apache-2.0
3
4//! IDE-proxied shell executor via ACP `terminal/*` methods.
5//!
6//! When the IDE advertises `terminal` capability, the agent routes `bash` tool
7//! calls through the IDE's integrated terminal instead of spawning a local process.
8//! This keeps the terminal visible in the IDE UI and allows live output streaming.
9//!
10//! # Security
11//!
12//! All terminal commands require an [`AcpPermissionGate`] to request IDE confirmation.
13//! Stdin writes are rate-limited and capped at 64 KiB (REQ-P23-1). Commands that
14//! resolve to shell interpreters (`bash`, `sh`, `zsh`, etc.) trigger an explicit
15//! warning in the permission prompt.
16//!
17//! # Terminal lifecycle
18//!
19//! ACP requires the terminal to remain alive until after the `tool_call_update`
20//! notification containing `ToolCallContent::Terminal(terminal_id)` is emitted.
21//! Call [`AcpShellExecutor::release_terminal`] only after that notification is sent.
22
23use std::path::PathBuf;
24use std::sync::Arc;
25use std::time::Duration;
26
27use agent_client_protocol as acp;
28use schemars::JsonSchema;
29use serde::Deserialize;
30use tokio::sync::{mpsc, oneshot};
31use tokio_util::sync::CancellationToken;
32use zeph_tools::{
33    ToolCall, ToolError, ToolOutput,
34    executor::deserialize_params,
35    registry::{InvocationHint, ToolDef},
36};
37
38use crate::{error::AcpError, permission::AcpPermissionGate};
39
/// How long to wait for a terminal to exit after a kill request has been sent.
///
/// Used by `kill_terminal` as the upper bound of its post-kill wait; when the
/// wait does not finish in time the result is simply discarded.
const KILL_GRACE_TIMEOUT: Duration = Duration::from_secs(5);

/// Maximum stdin payload size (64 KiB). REQ-P23-1.
///
/// Checked against the byte length of the `data` string of a `bash_stdin` call.
const MAX_STDIN_BYTES: usize = 65_536;

/// Bounded stdin channel capacity (back-pressure). MED-02.
///
/// Capacity of the per-terminal channel feeding the stdin pump task.
const STDIN_CHANNEL_CAPACITY: usize = 16;

/// Stdin rate-limit interval — 100 msg/sec. MED-02.
///
/// Tick period of the interval the stdin pump waits on before each forward.
const STDIN_RATE_INTERVAL: Duration = Duration::from_millis(10);

/// Shell interpreters that require explicit warning in permission prompt. REQ-P23-5.
///
/// Matched as substrings of the terminal ID when a `bash_stdin` prompt is built.
const SHELL_INTERPRETERS: &[&str] = &["bash", "sh", "zsh", "fish", "dash"];
53
/// Transparent prefixes that wrap another command without changing its semantics.
const TRANSPARENT_PREFIXES: &[&str] = &["env", "command", "exec", "nice", "nohup", "time"];

/// Return `true` when `token` is a shell assignment word (`NAME=value`).
///
/// The part before the first `=` must be a valid variable name: a letter or
/// underscore followed by letters, digits or underscores. Tokens such as
/// `./out=release/tool` or `=x` merely contain `=`; a shell would run them as
/// commands, so they must not be skipped as assignments.
fn is_env_assignment(token: &str) -> bool {
    let Some((name, _value)) = token.split_once('=') else {
        return false;
    };
    let mut chars = name.chars();
    matches!(chars.next(), Some(c) if c.is_ascii_alphabetic() || c == '_')
        && chars.all(|c| c.is_ascii_alphanumeric() || c == '_')
}

/// Extract the effective command binary name from a shell command string.
///
/// Walks the whitespace-separated tokens, skipping leading env-var assignments
/// (`FOO=bar`) and transparent prefixes (`env`, `command`, `exec`, etc., with or
/// without a directory part), and returns the basename of the first remaining
/// token. Falls back to `"bash"` when no such token exists (empty command, or
/// only assignments/prefixes).
///
/// Options passed to a transparent prefix (e.g. `env -i`) are not interpreted;
/// the option token itself is returned in that case.
fn extract_command_binary(command: &str) -> &str {
    for token in command.split_whitespace() {
        if is_env_assignment(token) {
            continue;
        }
        // Compare and report by basename so `/usr/bin/env` and `env` behave alike.
        let base = token.rsplit('/').next().unwrap_or(token);
        if TRANSPARENT_PREFIXES.contains(&base) {
            continue;
        }
        return base;
    }
    "bash"
}
87
/// Outcome of one command run through an IDE terminal.
struct ShellResult {
    /// Terminal output fetched once the command has exited (or been killed on timeout).
    output: String,
    /// Exit code reported by the IDE; `Some(124)` when the command hit the timeout and was killed.
    exit_code: Option<u32>,
    /// ID of the terminal that ran the command. The terminal has not been released yet.
    terminal_id: String,
}
93
/// Request to create a terminal, run a command in it and report the result.
struct TerminalRequest {
    /// Session the terminal is created in.
    session_id: acp::schema::SessionId,
    /// Command passed to the terminal creation request.
    command: String,
    /// Arguments passed alongside `command`.
    args: Vec<String>,
    /// Optional working directory for the terminal.
    cwd: Option<PathBuf>,
    /// Maximum time to wait for the command to exit before it is killed.
    timeout: Duration,
    /// One-shot channel on which the handler delivers the execution result.
    reply: oneshot::Sender<Result<ShellResult, AcpError>>,
    /// When `Some`, intermediate terminal output chunks are sent as `ToolCallUpdate`
    /// notifications on this channel so the IDE can stream output live.
    /// The `tool_call_id` is the ACP tool call ID to update.
    stream_tx: Option<(mpsc::Sender<acp::schema::SessionNotification>, String)>,
}
106
/// Request to release a terminal that is no longer needed.
struct TerminalReleaseRequest {
    /// Session that owns the terminal.
    session_id: acp::schema::SessionId,
    /// ID of the terminal to release; also the key of its stdin pump, if one was started.
    terminal_id: String,
}
111
/// Request to queue bytes for the stdin of a terminal.
struct StdinWriteRequest {
    /// Session that owns the terminal.
    session_id: acp::schema::SessionId,
    /// Terminal whose stdin receives the data.
    terminal_id: acp::schema::TerminalId,
    /// Raw bytes to write; the sender has already enforced the size cap.
    data: Vec<u8>,
    /// One-shot channel reporting whether the data was accepted into the pump queue.
    reply: oneshot::Sender<Result<(), AcpError>>,
}
118
/// Messages sent from [`AcpShellExecutor`] handles to the background terminal handler.
enum TerminalMessage {
    /// Create a terminal, run a command and reply with the result.
    Execute(TerminalRequest),
    /// Release a terminal (and cancel its stdin pump, if any).
    Release(TerminalReleaseRequest),
    /// Queue data for a terminal's stdin.
    WriteStdin(StdinWriteRequest),
}
124
/// IDE-proxied shell executor.
///
/// Routes `bash` tool calls to the IDE terminal via ACP `terminal/*` methods.
/// Only constructed when the IDE advertises `terminal` capability.
///
/// The executor is a cheap, cloneable handle: all terminal work is forwarded
/// over a channel to the handler future returned by the constructors.
#[derive(Clone)]
pub struct AcpShellExecutor {
    /// Session every request issued by this executor belongs to.
    session_id: acp::schema::SessionId,
    /// Channel to the background terminal handler.
    request_tx: mpsc::UnboundedSender<TerminalMessage>,
    /// Gate used to ask the IDE for confirmation; `bash_stdin` is unavailable without it.
    permission_gate: Option<AcpPermissionGate>,
    /// Per-command timeout applied to `bash` executions.
    timeout: Duration,
}
136
137impl AcpShellExecutor {
138    /// Create the executor and its background handler future.
139    ///
140    /// Spawn the returned future with `tokio::spawn`; it drives terminal
141    /// create/execute/release requests forwarded from the `bash` and
142    /// `bash_stdin` tools.
143    pub fn new(
144        conn: Arc<acp::ConnectionTo<acp::Client>>,
145        session_id: acp::schema::SessionId,
146        permission_gate: Option<AcpPermissionGate>,
147        timeout_secs: u64,
148    ) -> (Self, impl std::future::Future<Output = ()>) {
149        Self::with_timeout(
150            conn,
151            session_id,
152            permission_gate,
153            Duration::from_secs(timeout_secs),
154        )
155    }
156
157    /// Create the executor with a configurable command timeout.
158    pub fn with_timeout(
159        conn: Arc<acp::ConnectionTo<acp::Client>>,
160        session_id: acp::schema::SessionId,
161        permission_gate: Option<AcpPermissionGate>,
162        timeout: Duration,
163    ) -> (Self, impl std::future::Future<Output = ()>) {
164        let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
165        let handler = async move { run_terminal_handler(conn, rx).await };
166        (
167            Self {
168                session_id,
169                request_tx: tx,
170                permission_gate,
171                timeout,
172            },
173            handler,
174        )
175    }
176
177    /// Release a terminal by ID after the `tool_call_update` notification has been sent.
178    ///
179    /// This must be called after the ACP `tool_call_update` containing
180    /// `ToolCallContent::Terminal(terminal_id)` is emitted so that the IDE can
181    /// still display the terminal output when it processes the notification.
182    pub fn release_terminal(&self, terminal_id: String) {
183        self.request_tx
184            .send(TerminalMessage::Release(TerminalReleaseRequest {
185                session_id: self.session_id.clone(),
186                terminal_id,
187            }))
188            .ok();
189    }
190
191    async fn handle_bash_stdin(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
192        // REQ-P23-2: blocked if no permission gate
193        let gate = self
194            .permission_gate
195            .as_ref()
196            .ok_or_else(|| ToolError::Blocked {
197                command: "bash_stdin: permission gate required".into(),
198            })?;
199
200        let params: BashStdinParams = deserialize_params(&call.params)?;
201
202        if params.data.len() > MAX_STDIN_BYTES {
203            return Err(ToolError::InvalidParams {
204                message: AcpError::StdinTooLarge {
205                    size: params.data.len(),
206                }
207                .to_string(),
208            });
209        }
210        let data = params.data.as_bytes().to_vec();
211
212        // REQ-P23-5: warn when writing to a shell interpreter terminal.
213        // Terminal IDs are opaque strings, but common practice is to include
214        // the command name. We always request permission explicitly for stdin writes.
215        let is_shell = SHELL_INTERPRETERS
216            .iter()
217            .any(|s| params.terminal_id.contains(s));
218        let title = if is_shell {
219            "bash_stdin [WARNING: stdin to shell interpreter — data will be executed as commands]"
220                .to_string()
221        } else {
222            "bash_stdin".to_owned()
223        };
224        let fields = acp::schema::ToolCallUpdateFields::new()
225            .title(title)
226            .raw_input(serde_json::json!({
227                "terminal_id": params.terminal_id,
228                "data_length": params.data.len(),
229            }));
230        let tool_call = acp::schema::ToolCallUpdate::new("bash_stdin".to_owned(), fields);
231        let allowed = gate
232            .check_permission(self.session_id.clone(), tool_call)
233            .await
234            .map_err(|e| ToolError::InvalidParams {
235                message: e.to_string(),
236            })?;
237        if !allowed {
238            return Err(ToolError::Blocked {
239                command: "bash_stdin: permission denied".into(),
240            });
241        }
242
243        let terminal_id: acp::schema::TerminalId = params.terminal_id.clone().into();
244        let (reply_tx, reply_rx) = oneshot::channel();
245        self.request_tx
246            .send(TerminalMessage::WriteStdin(StdinWriteRequest {
247                session_id: self.session_id.clone(),
248                terminal_id,
249                data,
250                reply: reply_tx,
251            }))
252            .map_err(|_| ToolError::InvalidParams {
253                message: "terminal handler closed".into(),
254            })?;
255        reply_rx
256            .await
257            .map_err(|_| ToolError::InvalidParams {
258                message: "terminal handler closed".into(),
259            })?
260            .map_err(|e| ToolError::InvalidParams {
261                message: e.to_string(),
262            })?;
263
264        Ok(Some(ToolOutput {
265            tool_name: zeph_tools::ToolName::new("bash_stdin"),
266            summary: format!(
267                "wrote {} bytes to stdin of {}",
268                params.data.len(),
269                params.terminal_id
270            ),
271            blocks_executed: 1,
272            filter_stats: None,
273            diff: None,
274            streamed: false,
275            terminal_id: Some(params.terminal_id),
276            locations: None,
277            raw_response: None,
278            claim_source: Some(zeph_tools::ClaimSource::Shell),
279        }))
280    }
281
282    async fn execute_shell(
283        &self,
284        command: String,
285        args: Vec<String>,
286        cwd: Option<PathBuf>,
287        stream_tx: Option<(mpsc::Sender<acp::schema::SessionNotification>, String)>,
288    ) -> Result<ShellResult, AcpError> {
289        let (reply_tx, reply_rx) = oneshot::channel();
290        self.request_tx
291            .send(TerminalMessage::Execute(TerminalRequest {
292                session_id: self.session_id.clone(),
293                command,
294                args,
295                cwd,
296                timeout: self.timeout,
297                reply: reply_tx,
298                stream_tx,
299            }))
300            .map_err(|_| AcpError::ChannelClosed)?;
301        reply_rx.await.map_err(|_| AcpError::ChannelClosed)?
302    }
303}
304
// Parameters accepted by the `bash` tool.
//
// Plain `//` comments are used here on purpose: `///` doc comments would be
// picked up by the `JsonSchema` derive as schema descriptions and change the
// tool schema that is generated from this type.
#[derive(Deserialize, JsonSchema)]
struct BashParams {
    // Command handed to the terminal creation request (required).
    command: String,
    // Extra arguments forwarded with the command; empty when omitted.
    #[serde(default)]
    args: Vec<String>,
    // Optional working directory, converted to a `PathBuf` before use.
    #[serde(default)]
    cwd: Option<String>,
}
313
// Parameters accepted by the `bash_stdin` tool.
//
// Plain `//` comments are used here on purpose: `///` doc comments would be
// picked up by the `JsonSchema` derive as schema descriptions and change the
// tool schema that is generated from this type.
#[derive(Deserialize, JsonSchema)]
struct BashStdinParams {
    // ID of the terminal whose stdin receives the data.
    terminal_id: String,
    // Text to write; its byte length is capped at MAX_STDIN_BYTES by the handler.
    data: String,
}
319
320impl zeph_tools::ToolExecutor for AcpShellExecutor {
    /// Free-form response execution is not handled by this executor.
    ///
    /// Always returns `Ok(None)`; the shell tools are only reachable through
    /// `execute_tool_call`.
    async fn execute(&self, _response: &str) -> Result<Option<ToolOutput>, ToolError> {
        Ok(None)
    }
324
325    fn tool_definitions(&self) -> Vec<ToolDef> {
326        let mut defs = vec![ToolDef {
327            id: "bash".into(),
328            description: "Execute a shell command in the IDE terminal.\n\nParameters: command (string, required) - shell command to run\nReturns: stdout/stderr combined with exit code\nErrors: Timeout; permission denied by IDE; command blocked by policy\nExample: {\"command\": \"cargo build\"}".into(),
329            schema: schemars::schema_for!(BashParams),
330            invocation: InvocationHint::ToolCall,
331            output_schema: None,
332        }];
333        // REQ-P23-2: bash_stdin only available when a permission gate is present.
334        if self.permission_gate.is_some() {
335            defs.push(ToolDef {
336                id: "bash_stdin".into(),
337                description: "Write data to stdin of a running terminal process.\n\nParameters: terminal_id (string, required) - terminal to write to; data (string, required) - stdin data\nReturns: confirmation\nErrors: terminal not found; terminal process exited\nExample: {\"terminal_id\": \"term-1\", \"data\": \"yes\\n\"}".into(),
338                schema: schemars::schema_for!(BashStdinParams),
339                invocation: InvocationHint::ToolCall,
340                output_schema: None,
341            });
342        }
343        defs
344    }
345
346    async fn execute_tool_call(&self, call: &ToolCall) -> Result<Option<ToolOutput>, ToolError> {
347        if call.tool_id == "bash_stdin" {
348            return self.handle_bash_stdin(call).await;
349        }
350        if call.tool_id != "bash" {
351            return Ok(None);
352        }
353
354        let params: BashParams = deserialize_params(&call.params)?;
355        let cwd = params.cwd.map(PathBuf::from);
356
357        let blocklist: Vec<String> = zeph_tools::DEFAULT_BLOCKED_COMMANDS
358            .iter()
359            .map(|s| (*s).to_owned())
360            .collect();
361
362        // Blocklist check — reject dangerous commands before hitting the permission gate.
363        if let Some(pattern) = zeph_tools::check_blocklist(&params.command, &blocklist) {
364            return Err(ToolError::Blocked { command: pattern });
365        }
366        // Also check args when the command is a shell interpreter (e.g. bash -c "rm -rf /").
367        // This prevents args-field bypass: { command: "bash", args: ["-c", "blocked cmd"] }.
368        if let Some(script) = zeph_tools::effective_shell_command(&params.command, &params.args)
369            && let Some(pattern) = zeph_tools::check_blocklist(script, &blocklist)
370        {
371            return Err(ToolError::Blocked { command: pattern });
372        }
373
374        if self.permission_gate.is_none() {
375            tracing::warn!(
376                "AcpShellExecutor has no permission gate — only blocklist applies. \
377                 Do not use in production without a permission gate."
378            );
379        }
380
381        if let Some(gate) = &self.permission_gate {
382            // Use the command binary as the cache key, not the tool_id ("bash").
383            // This makes "Allow always" apply per binary (git, cargo, etc.).
384            let cmd_binary = extract_command_binary(&params.command);
385            let fields = acp::schema::ToolCallUpdateFields::new()
386                .title(cmd_binary.to_owned())
387                .raw_input(serde_json::json!({ "command": params.command }));
388            let tool_call = acp::schema::ToolCallUpdate::new(cmd_binary.to_owned(), fields);
389            let allowed = gate
390                .check_permission(self.session_id.clone(), tool_call)
391                .await
392                .map_err(|e| ToolError::InvalidParams {
393                    message: e.to_string(),
394                })?;
395            if !allowed {
396                return Err(ToolError::Blocked {
397                    command: params.command,
398                });
399            }
400        }
401
402        let result = self
403            .execute_shell(params.command, params.args, cwd, None)
404            .await
405            .map_err(|e| ToolError::InvalidParams {
406                message: e.to_string(),
407            })?;
408
409        let is_error = !matches!(result.exit_code, Some(0) | None);
410        let summary = if is_error {
411            format!(
412                "[exit {}]\n{}",
413                result.exit_code.unwrap_or(1),
414                result.output
415            )
416        } else {
417            result.output.clone()
418        };
419        let raw_response = Some(serde_json::json!({
420            "stdout": result.output,
421            "stderr": "",
422            "interrupted": false,
423            "isImage": false,
424            "noOutputExpected": false
425        }));
426
427        Ok(Some(ToolOutput {
428            tool_name: zeph_tools::ToolName::new("bash"),
429            summary,
430            blocks_executed: 1,
431            filter_stats: None,
432            diff: None,
433            streamed: false,
434            terminal_id: Some(result.terminal_id),
435            locations: None,
436            raw_response,
437            claim_source: Some(zeph_tools::ClaimSource::Shell),
438        }))
439    }
440}
441
442async fn forward_stdin_via_ext(
443    conn: &Arc<acp::ConnectionTo<acp::Client>>,
444    session_id: &acp::schema::SessionId,
445    terminal_id: &acp::schema::TerminalId,
446    data: Vec<u8>,
447) -> Result<(), AcpError> {
448    use base64::Engine as _;
449    let encoded = base64::engine::general_purpose::STANDARD.encode(&data);
450    let params_json = serde_json::json!({
451        "session_id": session_id.to_string(),
452        "terminal_id": terminal_id.to_string(),
453        "data": encoded,
454    });
455    let req = acp::UntypedMessage::new("terminal/write_stdin", params_json)
456        .map_err(|e| AcpError::ClientError(e.to_string()))?;
457    conn.send_request(req)
458        .block_task()
459        .await
460        .map(|_| ())
461        .map_err(|e| AcpError::ClientError(e.to_string()))
462}
463
464/// Background pump: drains bounded stdin channel at ≤100 msg/sec (MED-02).
465///
466/// REQ-P23-3: on any error from `ext_method`, cancels the token and exits.
467async fn run_stdin_pump(
468    conn: Arc<acp::ConnectionTo<acp::Client>>,
469    session_id: acp::schema::SessionId,
470    terminal_id: acp::schema::TerminalId,
471    mut data_rx: mpsc::Receiver<Vec<u8>>,
472    cancel: CancellationToken,
473) {
474    let mut interval = tokio::time::interval(STDIN_RATE_INTERVAL);
475    interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
476    loop {
477        let data = tokio::select! {
478            () = cancel.cancelled() => break,
479            msg = data_rx.recv() => match msg {
480                Some(d) => d,
481                None => break,
482            },
483        };
484        // Rate-limit: wait for tick before forwarding. MED-02.
485        tokio::select! {
486            () = cancel.cancelled() => break,
487            _ = interval.tick() => {}
488        }
489        if let Err(e) = forward_stdin_via_ext(&conn, &session_id, &terminal_id, data).await {
490            // REQ-P23-3: no panics, log and cancel.
491            tracing::warn!(%terminal_id, error = %e, "stdin pump error — cancelling");
492            cancel.cancel();
493            break;
494        }
495    }
496}
497
498async fn run_terminal_handler(
499    conn: Arc<acp::ConnectionTo<acp::Client>>,
500    mut rx: mpsc::UnboundedReceiver<TerminalMessage>,
501) {
502    // Maps terminal_id -> (bounded stdin sender, CancellationToken). MED-02, REQ-P23-4.
503    let mut stdin_pumps: std::collections::HashMap<
504        String,
505        (mpsc::Sender<Vec<u8>>, CancellationToken),
506    > = std::collections::HashMap::new();
507
508    while let Some(msg) = rx.recv().await {
509        match msg {
510            TerminalMessage::Execute(req) => {
511                let result = execute_in_terminal(
512                    &conn,
513                    req.session_id,
514                    req.command,
515                    req.args,
516                    req.cwd,
517                    req.timeout,
518                    req.stream_tx,
519                )
520                .await;
521                // Cancel stdin pump when terminal completes. REQ-P23-4.
522                if let Ok(ref shell_result) = result
523                    && let Some((_, token)) = stdin_pumps.remove(&shell_result.terminal_id)
524                {
525                    token.cancel();
526                }
527                req.reply.send(result).ok();
528            }
529            TerminalMessage::Release(req) => {
530                // Cancel stdin pump on release. REQ-P23-4.
531                if let Some((_, token)) = stdin_pumps.remove(&req.terminal_id) {
532                    token.cancel();
533                }
534                let tid = req.terminal_id.clone();
535                let release_req =
536                    acp::schema::ReleaseTerminalRequest::new(req.session_id, req.terminal_id);
537                if let Err(e) = conn.send_request(release_req).block_task().await {
538                    tracing::warn!(
539                        terminal_id = %tid,
540                        error = %e,
541                        "failed to release terminal"
542                    );
543                }
544            }
545            TerminalMessage::WriteStdin(req) => {
546                let tid_str = req.terminal_id.to_string();
547
548                // Lazily start a bounded pump task per terminal. MED-02.
549                let (data_tx, cancel) = stdin_pumps.entry(tid_str).or_insert_with(|| {
550                    let (tx, rx) = mpsc::channel::<Vec<u8>>(STDIN_CHANNEL_CAPACITY);
551                    let token = CancellationToken::new();
552                    tokio::spawn(run_stdin_pump(
553                        conn.clone(),
554                        req.session_id.clone(),
555                        req.terminal_id.clone(),
556                        rx,
557                        token.clone(),
558                    ));
559                    (tx, token)
560                });
561
562                let result = if cancel.is_cancelled() {
563                    Err(AcpError::BrokenPipe)
564                } else {
565                    // Bounded send — returns Err if channel is full (back-pressure).
566                    data_tx.try_send(req.data).map_err(|_| AcpError::BrokenPipe)
567                };
568
569                req.reply.send(result).ok();
570            }
571        }
572    }
573}
574
/// Polling interval for terminal output streaming.
///
/// `stream_until_exit` sleeps this long between output polls; the command
/// deadline is also only checked at these poll points.
const STREAM_POLL_INTERVAL: Duration = Duration::from_millis(200);
577
578/// Kill a terminal, then wait up to [`KILL_GRACE_TIMEOUT`] for it to exit.
579async fn kill_terminal(
580    conn: &Arc<acp::ConnectionTo<acp::Client>>,
581    session_id: &acp::schema::SessionId,
582    terminal_id: &acp::schema::TerminalId,
583) -> Result<(), AcpError> {
584    tracing::warn!(%terminal_id, "terminal command timed out — sending kill");
585    let kill_req = acp::schema::KillTerminalRequest::new(session_id.clone(), terminal_id.clone());
586    conn.send_request(kill_req)
587        .block_task()
588        .await
589        .map_err(|e| AcpError::ClientError(e.to_string()))?;
590    let wait_again =
591        acp::schema::WaitForTerminalExitRequest::new(session_id.clone(), terminal_id.clone());
592    let _ = tokio::time::timeout(
593        KILL_GRACE_TIMEOUT,
594        conn.send_request(wait_again).block_task(),
595    )
596    .await;
597    Ok(())
598}
599
/// Stream terminal output chunks to `notify_tx` while polling for process exit.
///
/// A single wait-for-exit request is kept pending for the whole call and raced
/// against a poll timer. On every timer tick the deadline is checked first; if
/// it has passed the terminal is killed and `Some(124)` is returned. Otherwise
/// the current output is fetched and any text beyond what was already forwarded
/// is sent as a `ToolCallUpdate` notification whose `meta` carries a
/// `terminal_output` entry.
///
/// Returns the exit code once the process terminates or the timeout is reached.
///
/// # Errors
///
/// `AcpError::ClientError` when the wait-for-exit request fails, or whatever
/// `kill_terminal` returns when the kill on timeout fails. Failed output polls
/// and undeliverable notifications are silently skipped.
async fn stream_until_exit(
    conn: &Arc<acp::ConnectionTo<acp::Client>>,
    session_id: &acp::schema::SessionId,
    terminal_id: &acp::schema::TerminalId,
    timeout: Duration,
    notify_tx: &mpsc::Sender<acp::schema::SessionNotification>,
    tool_call_id: &str,
) -> Result<Option<u32>, AcpError> {
    let wait_req =
        acp::schema::WaitForTerminalExitRequest::new(session_id.clone(), terminal_id.clone());
    // Pinned so the same pending request can be polled again on every loop iteration.
    let exit_future = conn.send_request(wait_req).block_task();
    tokio::pin!(exit_future);
    let deadline = tokio::time::Instant::now() + timeout;
    // Byte length of the output that has already been forwarded to the IDE.
    let mut last_output_len = 0usize;

    loop {
        tokio::select! {
            result = &mut exit_future => {
                return match result {
                    Ok(resp) => Ok(resp.exit_status.exit_code),
                    Err(e) => Err(AcpError::ClientError(e.to_string())),
                };
            }
            // A fresh sleep is created each iteration, so the deadline is only
            // observed at poll granularity.
            () = tokio::time::sleep(STREAM_POLL_INTERVAL) => {
                if tokio::time::Instant::now() >= deadline {
                    kill_terminal(conn, session_id, terminal_id).await?;
                    return Ok(Some(124u32));
                }
                let output_req =
                    acp::schema::TerminalOutputRequest::new(session_id.clone(), terminal_id.clone());
                // NOTE(review): this request is awaited inside the branch body, so
                // neither the exit future nor the deadline is observed while it is
                // pending — confirm the IDE always answers promptly.
                if let Ok(resp) = conn.send_request(output_req).block_task().await {
                    // `get` yields `None` (treated as "nothing new") when the stored
                    // offset is past the end of the output or not on a char boundary.
                    // NOTE(review): if the IDE ever returns a shorter or truncated
                    // buffer, streaming would stall here — verify against the client.
                    let new_data = resp.output.get(last_output_len..).unwrap_or("");
                    if !new_data.is_empty() {
                        last_output_len = resp.output.len();
                        let mut meta = serde_json::Map::new();
                        meta.insert(
                            "terminal_output".to_owned(),
                            serde_json::json!({
                                "terminal_id": terminal_id.to_string(),
                                "data": new_data,
                            }),
                        );
                        let update = acp::schema::ToolCallUpdate::new(
                            tool_call_id.to_owned(),
                            acp::schema::ToolCallUpdateFields::new(),
                        )
                        .meta(meta);
                        let notif = acp::schema::SessionNotification::new(
                            session_id.clone(),
                            acp::schema::SessionUpdate::ToolCallUpdate(update),
                        );
                        // Non-blocking send: the chunk is dropped if the channel is
                        // full or closed.
                        let _ = notify_tx.try_send(notif);
                    }
                }
            }
        }
    }
}
661
662async fn execute_in_terminal(
663    conn: &Arc<acp::ConnectionTo<acp::Client>>,
664    session_id: acp::schema::SessionId,
665    command: String,
666    args: Vec<String>,
667    cwd: Option<PathBuf>,
668    timeout: Duration,
669    stream_tx: Option<(mpsc::Sender<acp::schema::SessionNotification>, String)>,
670) -> Result<ShellResult, AcpError> {
671    // 1. Create terminal.
672    let create_req = acp::schema::CreateTerminalRequest::new(session_id.clone(), command)
673        .args(args)
674        .cwd(cwd);
675    let create_resp = conn
676        .send_request(create_req)
677        .block_task()
678        .await
679        .map_err(|e| AcpError::ClientError(e.to_string()))?;
680    let terminal_id = create_resp.terminal_id;
681
682    // 2. Wait for exit with timeout; kill if exceeded.
683    let exit_code = if let Some((ref notify_tx, ref tool_call_id)) = stream_tx {
684        stream_until_exit(
685            conn,
686            &session_id,
687            &terminal_id,
688            timeout,
689            notify_tx,
690            tool_call_id,
691        )
692        .await?
693    } else {
694        let wait_req =
695            acp::schema::WaitForTerminalExitRequest::new(session_id.clone(), terminal_id.clone());
696        match tokio::time::timeout(timeout, conn.send_request(wait_req).block_task()).await {
697            Ok(Ok(resp)) => resp.exit_status.exit_code,
698            Ok(Err(e)) => return Err(AcpError::ClientError(e.to_string())),
699            Err(_) => {
700                kill_terminal(conn, &session_id, &terminal_id).await?;
701                Some(124u32)
702            }
703        }
704    };
705
706    // 3. Get final output. Terminal is NOT released here — the caller releases it
707    //    after the ACP `tool_call_update` notification carrying `ToolCallContent::Terminal`
708    //    has been sent, so the IDE can still display the terminal output.
709    let output_req =
710        acp::schema::TerminalOutputRequest::new(session_id.clone(), terminal_id.clone());
711    let output_resp = conn
712        .send_request(output_req)
713        .block_task()
714        .await
715        .map_err(|e| AcpError::ClientError(e.to_string()))?;
716
717    // 4. Emit terminal_exit notification if streaming is active.
718    if let Some((ref notify_tx, ref tool_call_id)) = stream_tx {
719        let mut meta = serde_json::Map::new();
720        meta.insert(
721            "terminal_exit".to_owned(),
722            serde_json::json!({ "terminal_id": terminal_id.to_string(), "exit_code": exit_code }),
723        );
724        let update = acp::schema::ToolCallUpdate::new(
725            tool_call_id.clone(),
726            acp::schema::ToolCallUpdateFields::new(),
727        )
728        .meta(meta);
729        let notif = acp::schema::SessionNotification::new(
730            session_id.clone(),
731            acp::schema::SessionUpdate::ToolCallUpdate(update),
732        );
733        let _ = notify_tx.try_send(notif);
734    }
735
736    // Terminal release is handled by AcpShellExecutor::release_terminal via TerminalMessage::Release.
737    Ok(ShellResult {
738        output: output_resp.output,
739        exit_code,
740        terminal_id: terminal_id.to_string(),
741    })
742}
743
744// Tests disabled pending ACP 0.11 test infrastructure update (issue #3267 PR3)
745#[cfg(any())] // ACP 0.10 tests disabled — pending PR3 test infrastructure
746mod tests {
747    use std::rc::Rc;
748
749    use zeph_tools::ToolExecutor as _;
750
751    use super::*;
752
753    struct FakeTerminalClient;
754
755    #[async_trait::async_trait(?Send)]
756    impl acp::Client for FakeTerminalClient {
757        async fn request_permission(
758            &self,
759            _args: acp::schema::RequestPermissionRequest,
760        ) -> acp::Result<acp::RequestPermissionResponse> {
761            Err(acp::Error::method_not_found())
762        }
763
764        async fn create_terminal(
765            &self,
766            _args: acp::schema::CreateTerminalRequest,
767        ) -> acp::Result<acp::schema::CreateTerminalResponse> {
768            Ok(acp::schema::CreateTerminalResponse::new("term-1"))
769        }
770
771        async fn wait_for_terminal_exit(
772            &self,
773            _args: acp::schema::WaitForTerminalExitRequest,
774        ) -> acp::Result<acp::WaitForTerminalExitResponse> {
775            Ok(acp::WaitForTerminalExitResponse::new(
776                acp::TerminalExitStatus::new().exit_code(0u32),
777            ))
778        }
779
780        async fn terminal_output(
781            &self,
782            _args: acp::schema::TerminalOutputRequest,
783        ) -> acp::Result<acp::TerminalOutputResponse> {
784            Ok(acp::TerminalOutputResponse::new("hello\n", false))
785        }
786
787        async fn release_terminal(
788            &self,
789            _args: acp::schema::ReleaseTerminalRequest,
790        ) -> acp::Result<acp::ReleaseTerminalResponse> {
791            Ok(acp::ReleaseTerminalResponse::new())
792        }
793
794        async fn kill_terminal(
795            &self,
796            _args: acp::schema::KillTerminalRequest,
797        ) -> acp::Result<acp::KillTerminalResponse> {
798            Ok(acp::KillTerminalResponse::new())
799        }
800
801        async fn session_notification(
802            &self,
803            _args: acp::schema::SessionNotification,
804        ) -> acp::Result<()> {
805            Ok(())
806        }
807    }
808
809    #[tokio::test]
810    async fn bash_tool_call_returns_output() {
811        let local = tokio::task::LocalSet::new();
812        local
813            .run_until(async {
814                let conn = Rc::new(FakeTerminalClient);
815                let sid = acp::schema::SessionId::new("s1");
816                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
817                tokio::task::spawn_local(handler);
818
819                let mut params = serde_json::Map::new();
820                params.insert("command".to_owned(), serde_json::json!("echo"));
821                params.insert("args".to_owned(), serde_json::json!(["hello"]));
822                let call = ToolCall {
823                    tool_id: zeph_tools::ToolName::new("bash"),
824                    params,
825                    caller_id: None,
826                };
827
828                let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
829                assert_eq!(result.summary, "hello\n");
830                assert_eq!(result.tool_name, "bash");
831            })
832            .await;
833    }
834
835    #[tokio::test]
836    async fn unknown_tool_returns_none() {
837        let local = tokio::task::LocalSet::new();
838        local
839            .run_until(async {
840                let conn = Rc::new(FakeTerminalClient);
841                let sid = acp::schema::SessionId::new("s1");
842                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
843                tokio::task::spawn_local(handler);
844
845                let call = ToolCall {
846                    tool_id: zeph_tools::ToolName::new("unknown"),
847                    params: serde_json::Map::new(),
848                    caller_id: None,
849                };
850                let result = exec.execute_tool_call(&call).await.unwrap();
851                assert!(result.is_none());
852            })
853            .await;
854    }
855
856    #[test]
857    fn tool_definitions_registers_bash() {
858        let (tx, _rx) = mpsc::unbounded_channel::<TerminalMessage>();
859        let exec = AcpShellExecutor {
860            session_id: acp::schema::SessionId::new("s"),
861            request_tx: tx,
862            permission_gate: None,
863            timeout: Duration::from_mins(2),
864        };
865        let defs = exec.tool_definitions();
866        assert_eq!(defs.len(), 1);
867        assert_eq!(defs[0].id, "bash");
868    }
869
870    struct NonZeroExitClient;
871
872    #[async_trait::async_trait(?Send)]
873    impl acp::Client for NonZeroExitClient {
874        async fn request_permission(
875            &self,
876            _args: acp::schema::RequestPermissionRequest,
877        ) -> acp::Result<acp::RequestPermissionResponse> {
878            Err(acp::Error::method_not_found())
879        }
880
881        async fn create_terminal(
882            &self,
883            _args: acp::schema::CreateTerminalRequest,
884        ) -> acp::Result<acp::schema::CreateTerminalResponse> {
885            Ok(acp::schema::CreateTerminalResponse::new("term-fail"))
886        }
887
888        async fn wait_for_terminal_exit(
889            &self,
890            _args: acp::schema::WaitForTerminalExitRequest,
891        ) -> acp::Result<acp::WaitForTerminalExitResponse> {
892            Ok(acp::WaitForTerminalExitResponse::new(
893                acp::TerminalExitStatus::new().exit_code(1u32),
894            ))
895        }
896
897        async fn terminal_output(
898            &self,
899            _args: acp::schema::TerminalOutputRequest,
900        ) -> acp::Result<acp::TerminalOutputResponse> {
901            Ok(acp::TerminalOutputResponse::new("error output\n", false))
902        }
903
904        async fn release_terminal(
905            &self,
906            _args: acp::schema::ReleaseTerminalRequest,
907        ) -> acp::Result<acp::ReleaseTerminalResponse> {
908            Ok(acp::ReleaseTerminalResponse::new())
909        }
910
911        async fn kill_terminal(
912            &self,
913            _args: acp::schema::KillTerminalRequest,
914        ) -> acp::Result<acp::KillTerminalResponse> {
915            Ok(acp::KillTerminalResponse::new())
916        }
917
918        async fn session_notification(
919            &self,
920            _args: acp::schema::SessionNotification,
921        ) -> acp::Result<()> {
922            Ok(())
923        }
924    }
925
926    #[tokio::test]
927    async fn nonzero_exit_code_prefixes_output() {
928        let local = tokio::task::LocalSet::new();
929        local
930            .run_until(async {
931                let conn = Rc::new(NonZeroExitClient);
932                let sid = acp::schema::SessionId::new("s1");
933                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
934                tokio::task::spawn_local(handler);
935
936                let mut params = serde_json::Map::new();
937                params.insert("command".to_owned(), serde_json::json!("false"));
938                let call = ToolCall {
939                    tool_id: zeph_tools::ToolName::new("bash"),
940                    params,
941                    caller_id: None,
942                };
943
944                let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
945                assert!(
946                    result.summary.starts_with("[exit 1]"),
947                    "got: {}",
948                    result.summary
949                );
950                assert!(result.summary.contains("error output\n"));
951            })
952            .await;
953    }
954
955    struct RejectPermissionClient;
956
957    #[async_trait::async_trait(?Send)]
958    impl acp::Client for RejectPermissionClient {
959        async fn request_permission(
960            &self,
961            _args: acp::schema::RequestPermissionRequest,
962        ) -> acp::Result<acp::RequestPermissionResponse> {
963            Ok(acp::RequestPermissionResponse::new(
964                acp::schema::RequestPermissionOutcome::Selected(
965                    acp::SelectedPermissionOutcome::new("reject_once"),
966                ),
967            ))
968        }
969
970        async fn create_terminal(
971            &self,
972            _args: acp::schema::CreateTerminalRequest,
973        ) -> acp::Result<acp::schema::CreateTerminalResponse> {
974            panic!("should not be called when permission denied")
975        }
976
977        async fn wait_for_terminal_exit(
978            &self,
979            _args: acp::schema::WaitForTerminalExitRequest,
980        ) -> acp::Result<acp::WaitForTerminalExitResponse> {
981            panic!("should not be called when permission denied")
982        }
983
984        async fn terminal_output(
985            &self,
986            _args: acp::schema::TerminalOutputRequest,
987        ) -> acp::Result<acp::TerminalOutputResponse> {
988            panic!("should not be called when permission denied")
989        }
990
991        async fn release_terminal(
992            &self,
993            _args: acp::schema::ReleaseTerminalRequest,
994        ) -> acp::Result<acp::ReleaseTerminalResponse> {
995            panic!("should not be called when permission denied")
996        }
997
998        async fn kill_terminal(
999            &self,
1000            _args: acp::schema::KillTerminalRequest,
1001        ) -> acp::Result<acp::KillTerminalResponse> {
1002            panic!("should not be called when permission denied")
1003        }
1004
1005        async fn session_notification(
1006            &self,
1007            _args: acp::schema::SessionNotification,
1008        ) -> acp::Result<()> {
1009            Ok(())
1010        }
1011    }
1012
1013    #[tokio::test]
1014    async fn permission_denied_returns_blocked_error() {
1015        let local = tokio::task::LocalSet::new();
1016        local
1017            .run_until(async {
1018                let perm_conn = Rc::new(RejectPermissionClient);
1019                let sid = acp::schema::SessionId::new("s1");
1020                let tmp_dir = tempfile::tempdir().unwrap();
1021                let perm_file = tmp_dir.path().join("perms.toml");
1022                let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1023                tokio::task::spawn_local(perm_handler);
1024
1025                let term_conn = Rc::new(FakeTerminalClient);
1026                let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1027                tokio::task::spawn_local(term_handler);
1028
1029                let mut params = serde_json::Map::new();
1030                params.insert("command".to_owned(), serde_json::json!("rm"));
1031                params.insert("args".to_owned(), serde_json::json!(["-rf", "/important"]));
1032                let call = ToolCall {
1033                    tool_id: zeph_tools::ToolName::new("bash"),
1034                    params,
1035                    caller_id: None,
1036                };
1037
1038                let err = exec.execute_tool_call(&call).await.unwrap_err();
1039                assert!(matches!(err, ToolError::Blocked { .. }));
1040            })
1041            .await;
1042    }
1043
1044    #[tokio::test]
1045    async fn streaming_mode_emits_terminal_exit_notification() {
1046        let local = tokio::task::LocalSet::new();
1047        local
1048            .run_until(async {
1049                let conn = Rc::new(FakeTerminalClient);
1050                let sid = acp::schema::SessionId::new("s1");
1051                let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
1052                let handler = async move { run_terminal_handler(conn, rx).await };
1053                tokio::task::spawn_local(handler);
1054
1055                let (stream_tx, mut stream_rx) = mpsc::channel(8);
1056                let (reply_tx, reply_rx) = oneshot::channel();
1057                tx.send(TerminalMessage::Execute(TerminalRequest {
1058                    session_id: sid,
1059                    command: "echo".to_owned(),
1060                    args: vec!["hi".to_owned()],
1061                    cwd: None,
1062                    timeout: Duration::from_secs(5),
1063                    reply: reply_tx,
1064                    stream_tx: Some((stream_tx, "tool-1".to_owned())),
1065                }))
1066                .unwrap();
1067
1068                let result = reply_rx.await.unwrap().unwrap();
1069                assert_eq!(result.output, "hello\n");
1070
1071                // At least a terminal_exit notification must arrive.
1072                let mut got_exit = false;
1073                while let Ok(notif) = stream_rx.try_recv() {
1074                    if let acp::schema::SessionUpdate::ToolCallUpdate(update) = notif.update
1075                        && let Some(meta) = update.meta
1076                        && meta.contains_key("terminal_exit")
1077                    {
1078                        got_exit = true;
1079                    }
1080                }
1081                assert!(got_exit, "expected terminal_exit notification");
1082            })
1083            .await;
1084    }
1085
1086    #[test]
1087    fn extract_command_binary_bare() {
1088        assert_eq!(extract_command_binary("git status"), "git");
1089        assert_eq!(extract_command_binary("cargo build --release"), "cargo");
1090        assert_eq!(extract_command_binary("  cat file.txt  "), "cat");
1091    }
1092
1093    #[test]
1094    fn extract_command_binary_env_prefix() {
1095        assert_eq!(extract_command_binary("env FOO=bar git status"), "git");
1096        assert_eq!(extract_command_binary("command git push"), "git");
1097        assert_eq!(extract_command_binary("exec cargo test"), "cargo");
1098    }
1099
1100    #[test]
1101    fn extract_command_binary_env_var_assignments() {
1102        assert_eq!(extract_command_binary("FOO=bar BAZ=qux git log"), "git");
1103    }
1104
1105    #[test]
1106    fn extract_command_binary_path() {
1107        assert_eq!(extract_command_binary("/usr/bin/git status"), "git");
1108        assert_eq!(
1109            extract_command_binary("/usr/local/bin/cargo build"),
1110            "cargo"
1111        );
1112    }
1113
1114    #[test]
1115    fn extract_command_binary_empty_fallback() {
1116        assert_eq!(extract_command_binary(""), "bash");
1117        assert_eq!(extract_command_binary("   "), "bash");
1118    }
1119
1120    #[tokio::test]
1121    async fn blocklist_blocked_before_permission_gate() {
1122        // rm -rf / must be blocked before the permission gate is consulted.
1123        // FakeTerminalClient panics if create_terminal is called — so if
1124        // we reach the terminal, the test fails.
1125        let local = tokio::task::LocalSet::new();
1126        local
1127            .run_until(async {
1128                let conn = Rc::new(FakeTerminalClient);
1129                let sid = acp::schema::SessionId::new("s1");
1130                // No permission gate — blocklist runs independently.
1131                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1132                tokio::task::spawn_local(handler);
1133
1134                let mut params = serde_json::Map::new();
1135                params.insert("command".to_owned(), serde_json::json!("rm -rf /"));
1136                let call = ToolCall {
1137                    tool_id: zeph_tools::ToolName::new("bash"),
1138                    params,
1139                    caller_id: None,
1140                };
1141
1142                let err = exec.execute_tool_call(&call).await.unwrap_err();
1143                assert!(matches!(err, ToolError::Blocked { .. }));
1144            })
1145            .await;
1146    }
1147
1148    #[tokio::test]
1149    async fn blocklist_sudo_blocked() {
1150        let local = tokio::task::LocalSet::new();
1151        local
1152            .run_until(async {
1153                let conn = Rc::new(FakeTerminalClient);
1154                let sid = acp::schema::SessionId::new("s1");
1155                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1156                tokio::task::spawn_local(handler);
1157
1158                let mut params = serde_json::Map::new();
1159                params.insert(
1160                    "command".to_owned(),
1161                    serde_json::json!("sudo apt install vim"),
1162                );
1163                let call = ToolCall {
1164                    tool_id: zeph_tools::ToolName::new("bash"),
1165                    params,
1166                    caller_id: None,
1167                };
1168
1169                let err = exec.execute_tool_call(&call).await.unwrap_err();
1170                assert!(matches!(err, ToolError::Blocked { .. }));
1171            })
1172            .await;
1173    }
1174
1175    #[tokio::test]
1176    async fn args_field_bypass_blocked_for_shell_interpreter() {
1177        // SEC-ACP-C2: { command: "bash", args: ["-c", "rm -rf /"] } must be blocked.
1178        let local = tokio::task::LocalSet::new();
1179        local
1180            .run_until(async {
1181                let conn = Rc::new(FakeTerminalClient);
1182                let sid = acp::schema::SessionId::new("s1");
1183                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1184                tokio::task::spawn_local(handler);
1185
1186                let mut params = serde_json::Map::new();
1187                params.insert("command".to_owned(), serde_json::json!("bash"));
1188                params.insert(
1189                    "args".to_owned(),
1190                    serde_json::json!(["-c", "sudo rm -rf /"]),
1191                );
1192                let call = ToolCall {
1193                    tool_id: zeph_tools::ToolName::new("bash"),
1194                    params,
1195                    caller_id: None,
1196                };
1197
1198                let err = exec.execute_tool_call(&call).await.unwrap_err();
1199                assert!(matches!(err, ToolError::Blocked { .. }));
1200            })
1201            .await;
1202    }
1203
1204    #[tokio::test]
1205    async fn args_field_bypass_sh_minus_c_blocked() {
1206        let local = tokio::task::LocalSet::new();
1207        local
1208            .run_until(async {
1209                let conn = Rc::new(FakeTerminalClient);
1210                let sid = acp::schema::SessionId::new("s1");
1211                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1212                tokio::task::spawn_local(handler);
1213
1214                let mut params = serde_json::Map::new();
1215                params.insert("command".to_owned(), serde_json::json!("sh"));
1216                params.insert(
1217                    "args".to_owned(),
1218                    serde_json::json!(["-c", "shutdown -h now"]),
1219                );
1220                let call = ToolCall {
1221                    tool_id: zeph_tools::ToolName::new("bash"),
1222                    params,
1223                    caller_id: None,
1224                };
1225
1226                let err = exec.execute_tool_call(&call).await.unwrap_err();
1227                assert!(matches!(err, ToolError::Blocked { .. }));
1228            })
1229            .await;
1230    }
1231
1232    #[test]
1233    fn extract_command_binary_chained_transparent_prefixes() {
1234        // SEC-ACP-I1: "env command exec sudo rm" -> "sudo", not "command"
1235        assert_eq!(
1236            extract_command_binary("env command exec sudo rm -rf /"),
1237            "sudo"
1238        );
1239        assert_eq!(extract_command_binary("nice nohup time git status"), "git");
1240    }
1241
1242    #[test]
1243    fn extract_command_binary_env_var_then_prefix_then_binary() {
1244        assert_eq!(extract_command_binary("FOO=bar env BAZ=qux git log"), "git");
1245    }
1246
1247    #[tokio::test]
1248    async fn bash_stdin_blocked_without_permission_gate() {
1249        let local = tokio::task::LocalSet::new();
1250        local
1251            .run_until(async {
1252                let conn = Rc::new(FakeTerminalClient);
1253                let sid = acp::schema::SessionId::new("s1");
1254                let (exec, handler) = AcpShellExecutor::new(conn, sid, None, 120);
1255                tokio::task::spawn_local(handler);
1256
1257                let mut params = serde_json::Map::new();
1258                params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1259                params.insert("data".to_owned(), serde_json::json!("hello\n"));
1260                let call = ToolCall {
1261                    tool_id: zeph_tools::ToolName::new("bash_stdin"),
1262                    params,
1263                    caller_id: None,
1264                };
1265                let err = exec.execute_tool_call(&call).await.unwrap_err();
1266                assert!(matches!(err, ToolError::Blocked { .. }));
1267            })
1268            .await;
1269    }
1270
1271    #[test]
1272    fn bash_stdin_not_in_tool_definitions_without_gate() {
1273        let (tx, _rx) = mpsc::unbounded_channel::<TerminalMessage>();
1274        let exec = AcpShellExecutor {
1275            session_id: acp::schema::SessionId::new("s"),
1276            request_tx: tx,
1277            permission_gate: None,
1278            timeout: Duration::from_mins(2),
1279        };
1280        let defs = exec.tool_definitions();
1281        assert!(!defs.iter().any(|d| d.id == "bash_stdin"));
1282    }
1283
1284    #[tokio::test]
1285    async fn bash_stdin_size_limit_rejected() {
1286        let local = tokio::task::LocalSet::new();
1287        local
1288            .run_until(async {
1289                let perm_conn = Rc::new(RejectPermissionClient);
1290                let sid = acp::schema::SessionId::new("s1");
1291                let tmp_dir = tempfile::tempdir().unwrap();
1292                let perm_file = tmp_dir.path().join("perms.toml");
1293                let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1294                tokio::task::spawn_local(perm_handler);
1295
1296                let term_conn = Rc::new(FakeTerminalClient);
1297                let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1298                tokio::task::spawn_local(term_handler);
1299
1300                let oversized = "x".repeat(MAX_STDIN_BYTES + 1);
1301                let mut params = serde_json::Map::new();
1302                params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1303                params.insert("data".to_owned(), serde_json::json!(oversized));
1304                let call = ToolCall {
1305                    tool_id: zeph_tools::ToolName::new("bash_stdin"),
1306                    params,
1307                    caller_id: None,
1308                };
1309                let err = exec.execute_tool_call(&call).await.unwrap_err();
1310                assert!(matches!(err, ToolError::InvalidParams { .. }));
1311            })
1312            .await;
1313    }
1314
1315    struct AllowPermissionClient;
1316
1317    #[async_trait::async_trait(?Send)]
1318    impl acp::Client for AllowPermissionClient {
1319        async fn request_permission(
1320            &self,
1321            _args: acp::schema::RequestPermissionRequest,
1322        ) -> acp::Result<acp::RequestPermissionResponse> {
1323            Ok(acp::RequestPermissionResponse::new(
1324                acp::schema::RequestPermissionOutcome::Selected(
1325                    acp::SelectedPermissionOutcome::new("allow_once"),
1326                ),
1327            ))
1328        }
1329
1330        async fn session_notification(
1331            &self,
1332            _args: acp::schema::SessionNotification,
1333        ) -> acp::Result<()> {
1334            Ok(())
1335        }
1336    }
1337
1338    #[tokio::test]
1339    async fn bash_stdin_with_permission_gate_succeeds() {
1340        let local = tokio::task::LocalSet::new();
1341        local
1342            .run_until(async {
1343                let perm_conn = Rc::new(AllowPermissionClient);
1344                let sid = acp::schema::SessionId::new("s1");
1345                let tmp_dir = tempfile::tempdir().unwrap();
1346                let perm_file = tmp_dir.path().join("perms.toml");
1347                let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1348                tokio::task::spawn_local(perm_handler);
1349
1350                let term_conn = Rc::new(FakeTerminalClient);
1351                let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1352                tokio::task::spawn_local(term_handler);
1353
1354                let mut params = serde_json::Map::new();
1355                params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1356                params.insert("data".to_owned(), serde_json::json!("echo hello\n"));
1357                let call = ToolCall {
1358                    tool_id: zeph_tools::ToolName::new("bash_stdin"),
1359                    params,
1360                    caller_id: None,
1361                };
1362                let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
1363                assert_eq!(result.tool_name, "bash_stdin");
1364                assert!(result.summary.contains("term-1"));
1365            })
1366            .await;
1367    }
1368
1369    #[test]
1370    fn bash_stdin_in_tool_definitions_with_gate() {
1371        let (tx, _rx) = mpsc::unbounded_channel::<TerminalMessage>();
1372        let tmp_dir = tempfile::tempdir().unwrap();
1373        let perm_file = tmp_dir.path().join("perms.toml");
1374        let perm_conn = Rc::new(AllowPermissionClient);
1375        let (gate, _handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1376        let exec = AcpShellExecutor {
1377            session_id: acp::schema::SessionId::new("s"),
1378            request_tx: tx,
1379            permission_gate: Some(gate),
1380            timeout: Duration::from_mins(2),
1381        };
1382        let defs = exec.tool_definitions();
1383        assert!(defs.iter().any(|d| d.id == "bash_stdin"));
1384        assert!(defs.iter().any(|d| d.id == "bash"));
1385    }
1386
1387    #[tokio::test]
1388    async fn bash_stdin_exactly_64kib_boundary_accepted() {
1389        let local = tokio::task::LocalSet::new();
1390        local
1391            .run_until(async {
1392                let perm_conn = Rc::new(AllowPermissionClient);
1393                let sid = acp::schema::SessionId::new("s1");
1394                let tmp_dir = tempfile::tempdir().unwrap();
1395                let perm_file = tmp_dir.path().join("perms.toml");
1396                let (gate, perm_handler) = AcpPermissionGate::new(perm_conn, Some(perm_file));
1397                tokio::task::spawn_local(perm_handler);
1398
1399                let term_conn = Rc::new(FakeTerminalClient);
1400                let (exec, term_handler) = AcpShellExecutor::new(term_conn, sid, Some(gate), 120);
1401                tokio::task::spawn_local(term_handler);
1402
1403                // Exactly at the limit must succeed.
1404                let at_limit = "x".repeat(MAX_STDIN_BYTES);
1405                let mut params = serde_json::Map::new();
1406                params.insert("terminal_id".to_owned(), serde_json::json!("term-1"));
1407                params.insert("data".to_owned(), serde_json::json!(at_limit));
1408                let call = ToolCall {
1409                    tool_id: zeph_tools::ToolName::new("bash_stdin"),
1410                    params,
1411                    caller_id: None,
1412                };
1413                let result = exec.execute_tool_call(&call).await.unwrap().unwrap();
1414                assert_eq!(result.tool_name, "bash_stdin");
1415            })
1416            .await;
1417    }
1418
1419    #[tokio::test]
1420    async fn bash_stdin_broken_pipe_fast_fail() {
1421        // After the CancellationToken is cancelled, WriteStdin must return BrokenPipe immediately.
1422        let local = tokio::task::LocalSet::new();
1423        local
1424            .run_until(async {
1425                let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
1426                let conn = Rc::new(FakeTerminalClient);
1427                let handler = async move { run_terminal_handler(conn, rx).await };
1428                tokio::task::spawn_local(handler);
1429
1430                let sid = acp::schema::SessionId::new("s1");
1431                let tid: acp::schema::TerminalId = "term-bp".to_owned().into();
1432
1433                // First WriteStdin: establishes the pump and cancels via a pre-cancelled token.
1434                // We simulate a broken pump by sending two WriteStdin messages to the same
1435                // terminal: the first establishes the pump, then we fill the channel beyond
1436                // capacity so the next try_send returns Err (BrokenPipe).
1437                let mut replies = Vec::new();
1438                for _ in 0..=STDIN_CHANNEL_CAPACITY {
1439                    let (reply_tx, reply_rx) = oneshot::channel();
1440                    tx.send(TerminalMessage::WriteStdin(StdinWriteRequest {
1441                        session_id: sid.clone(),
1442                        terminal_id: tid.clone(),
1443                        data: b"x".to_vec(),
1444                        reply: reply_tx,
1445                    }))
1446                    .unwrap();
1447                    replies.push(reply_rx);
1448                }
1449                // Collect results: at least one must be BrokenPipe (channel overflow).
1450                let mut got_broken_pipe = false;
1451                for reply_rx in replies {
1452                    if let Ok(Err(AcpError::BrokenPipe)) = reply_rx.await {
1453                        got_broken_pipe = true;
1454                    }
1455                }
1456                assert!(
1457                    got_broken_pipe,
1458                    "expected at least one BrokenPipe from overflow"
1459                );
1460            })
1461            .await;
1462    }
1463
1464    #[tokio::test]
1465    async fn bash_stdin_pump_cancelled_on_release() {
1466        // After Release, the pump's CancellationToken must be cancelled.
1467        // Subsequent WriteStdin to the same terminal_id starts a fresh pump (no persistent state).
1468        let local = tokio::task::LocalSet::new();
1469        local
1470            .run_until(async {
1471                let (tx, rx) = mpsc::unbounded_channel::<TerminalMessage>();
1472                let conn = Rc::new(FakeTerminalClient);
1473                let handler = async move { run_terminal_handler(conn, rx).await };
1474                tokio::task::spawn_local(handler);
1475
1476                let sid = acp::schema::SessionId::new("s1");
1477                let tid: acp::schema::TerminalId = "term-rel".to_owned().into();
1478
1479                // Establish a pump by writing stdin.
1480                let (reply_tx, reply_rx) = oneshot::channel();
1481                tx.send(TerminalMessage::WriteStdin(StdinWriteRequest {
1482                    session_id: sid.clone(),
1483                    terminal_id: tid.clone(),
1484                    data: b"hello\n".to_vec(),
1485                    reply: reply_tx,
1486                }))
1487                .unwrap();
1488                reply_rx.await.unwrap().unwrap(); // pump established, write queued
1489
1490                // Release the terminal — must cancel the pump.
1491                tx.send(TerminalMessage::Release(TerminalReleaseRequest {
1492                    session_id: sid.clone(),
1493                    terminal_id: tid.to_string(),
1494                }))
1495                .unwrap();
1496
1497                // Allow the handler to process the Release.
1498                tokio::task::yield_now().await;
1499
1500                // Writing again after release starts a fresh pump — should succeed.
1501                let (fresh_reply, write_result) = oneshot::channel();
1502                tx.send(TerminalMessage::WriteStdin(StdinWriteRequest {
1503                    session_id: sid.clone(),
1504                    terminal_id: tid.clone(),
1505                    data: b"after release\n".to_vec(),
1506                    reply: fresh_reply,
1507                }))
1508                .unwrap();
1509                // Fresh pump: send must succeed (Ok).
1510                write_result.await.unwrap().unwrap();
1511            })
1512            .await;
1513    }
1514}