Skip to main content

claude_wrapper/
duplex.rs

1//! Long-lived duplex stream-json sessions.
2//!
3//! [`DuplexSession`] holds a `claude` subprocess open in
4//! `--input-format stream-json --output-format stream-json` mode for
5//! the duration of a conversation. A single child is held open across
6//! many turns; user messages are written to its stdin, NDJSON events
7//! are read from its stdout and dispatched back to `send()` callers.
8//!
9//! # When to use
10//!
11//! [`DuplexSession`] is the recommended primitive for long-running
12//! hosts that drive multi-turn conversations: agent servers, IDE
13//! backends, daemons, chat UIs. Holding the child open across turns
14//! amortizes init cost and unlocks capabilities that are awkward or
15//! impossible from a transient subprocess: mid-turn permission
16//! decisions ([`PermissionHandler`]), clean
17//! [interrupts](DuplexSession::interrupt), and a typed
18//! [event subscriber stream](DuplexSession::subscribe) that fans out
19//! events to multiple consumers.
20//!
21//! For short-lived processes (CLIs, build scripts, batch jobs,
22//! lambdas) where each turn can stand on its own, prefer
23//! [`QueryCommand`] for one-off calls or [`Session`] for transient
24//! multi-turn with cumulative cost / history tracking.
25//!
26//! [`QueryCommand`]: crate::QueryCommand
27//! [`Session`]: crate::session::Session
28//!
29//! # Example
30//!
31//! ```no_run
32//! use claude_wrapper::Claude;
33//! use claude_wrapper::duplex::{DuplexOptions, DuplexSession};
34//!
35//! # async fn example() -> claude_wrapper::Result<()> {
36//! let claude = Claude::builder().build()?;
37//! let session = DuplexSession::spawn(
38//!     &claude,
39//!     DuplexOptions::default().model("haiku"),
40//! ).await?;
41//!
42//! let turn = session.send("hello").await?;
43//! if let Some(text) = turn.result_text() {
44//!     println!("{text}");
45//! }
46//!
47//! session.close().await?;
48//! # Ok(())
49//! # }
50//! ```
51//!
52//! # Subscribers
53//!
//! For event-driven UIs that want to react to assistant tokens,
//! tool-use blocks, or system events as they arrive, call
//! [`DuplexSession::subscribe`] before issuing a [`DuplexSession::send`];
//! events emitted before a receiver subscribes are not replayed to it.
//! Each receiver gets its own buffered view of the event stream;
//! slow consumers see [`tokio::sync::broadcast::error::RecvError::Lagged`]
//! rather than blocking the session task.
60//!
61//! ```no_run
62//! use claude_wrapper::Claude;
63//! use claude_wrapper::duplex::{DuplexOptions, DuplexSession, InboundEvent};
64//!
65//! # async fn example() -> claude_wrapper::Result<()> {
66//! let claude = Claude::builder().build()?;
67//! let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
68//!
69//! let mut rx = session.subscribe();
70//! let _turn = session.send("hello").await?;
71//!
72//! while let Ok(event) = rx.try_recv() {
73//!     match event {
74//!         InboundEvent::SystemInit { session_id } => {
75//!             println!("session id: {session_id}");
76//!         }
77//!         InboundEvent::Assistant(_) => {
78//!             // partial or complete assistant message
79//!         }
80//!         _ => {}
81//!     }
82//! }
83//!
84//! session.close().await?;
85//! # Ok(())
86//! # }
87//! ```
88//!
89//! For interleaved (concurrent) event handling while a turn is in
90//! flight, drive `rx.recv()` and the `send()` future together via
91//! `tokio::select!`. Pin the send future and use a block scope so
92//! its borrow of the session ends before [`DuplexSession::close`].
93//!
94//! # Mid-turn permission decisions
95//!
//! Configure a [`PermissionHandler`] at spawn time to answer the
//! CLI's permission prompts in-flight. When a handler is set, the
//! session adds `--permission-prompt-tool stdio` to the spawn command
//! line automatically, so the CLI emits `control_request` messages for
//! tool use over the duplex channel rather than blocking on a TUI prompt.
101//!
102//! ```no_run
103//! use claude_wrapper::Claude;
104//! use claude_wrapper::duplex::{
105//!     DuplexOptions, DuplexSession, PermissionDecision, PermissionHandler,
106//! };
107//!
108//! # async fn example() -> claude_wrapper::Result<()> {
109//! let handler = PermissionHandler::new(|req| async move {
110//!     if req.tool_name == "Bash" {
111//!         PermissionDecision::Deny { message: "bash is denied".into() }
112//!     } else {
113//!         PermissionDecision::Allow { updated_input: None }
114//!     }
115//! });
116//!
117//! let claude = Claude::builder().build()?;
118//! let session = DuplexSession::spawn(
119//!     &claude,
120//!     DuplexOptions::default().on_permission(handler),
121//! ).await?;
122//! # Ok(())
123//! # }
124//! ```
125//!
126//! For human-in-the-loop UIs, return [`PermissionDecision::Defer`]
127//! from the handler, capture the [`PermissionRequest::request_id`],
128//! and answer later via [`DuplexSession::respond_to_permission`].
129//!
130//! **Known limitation:** as of claude CLI 2.1.x,
131//! `--permission-prompt-tool stdio` does not cause the CLI to emit
132//! `control_request {subtype: "can_use_tool"}` in
133//! `--print --output-format stream-json` mode. The permission handler
134//! registered here is wire-correct and unit-tested, but will not be
135//! invoked end-to-end until the upstream CLI bug is resolved. Tracked
136//! upstream at
137//! <https://github.com/anthropics/claude-agent-sdk-python/issues/469>.
138//!
139//! # Mid-turn interrupt
140//!
141//! [`DuplexSession::interrupt`] sends a clean
142//! `control_request {subtype: "interrupt"}` to the CLI. The CLI
143//! stops generating, closes the in-flight turn (`send().await`
144//! resolves with the truncated [`TurnResult`]), and answers our
145//! interrupt with a `control_response`. Use this instead of dropping
146//! the session or killing the child when you want to cancel one
147//! turn but keep the conversation going.
148//!
149//! ```no_run
150//! use std::time::Duration;
151//! use claude_wrapper::Claude;
152//! use claude_wrapper::duplex::{DuplexOptions, DuplexSession};
153//!
154//! # async fn example() -> claude_wrapper::Result<()> {
155//! let claude = Claude::builder().build()?;
156//! let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
157//!
158//! let send_fut = session.send("write a long essay about rust");
159//! let interrupt_fut = async {
160//!     tokio::time::sleep(Duration::from_millis(500)).await;
161//!     session.interrupt().await
162//! };
163//!
164//! let (turn, interrupt_result) = tokio::join!(send_fut, interrupt_fut);
165//! let _truncated = turn?;
166//! interrupt_result?;
167//! # Ok(())
168//! # }
169//! ```
170//!
171//! # Phased rollout
172//!
//! This module rolled out in four PRs tracked in
//! <https://github.com/joshrotenberg/claude-wrapper/issues/561>:
//! `spawn`/`send`/`close` (PR 1), `subscribe` (PR 2), mid-turn
//! permission handling (PR 3), and `interrupt` (PR 4).
177
178use std::collections::HashMap;
179use std::future::Future;
180use std::pin::Pin;
181use std::process::Stdio;
182use std::sync::Arc;
183use std::time::Duration;
184
185use serde_json::Value;
186use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
187use tokio::process::{Child, ChildStdin, ChildStdout, Command};
188use tokio::sync::{broadcast, mpsc, oneshot, watch};
189use tokio::task::JoinHandle;
190use tracing::{debug, warn};
191
192use crate::Claude;
193use crate::error::{Error, Result};
194use crate::types::PermissionMode;
195
/// Number of events each [`DuplexSession::subscribe`] receiver can fall
/// behind by before it observes a lag, unless overridden.
///
/// This sizes the per-session [`broadcast::Sender`]; change it for a
/// single session with [`DuplexOptions::subscriber_capacity`].
pub const DEFAULT_SUBSCRIBER_CAPACITY: usize = 256;
201
202/// A mid-turn permission prompt from the CLI for a single tool
203/// invocation.
204///
205/// Forwarded to the [`PermissionHandler`] registered via
206/// [`DuplexOptions::on_permission`]. Capture
207/// [`Self::request_id`] inside your handler if you intend to return
208/// [`PermissionDecision::Defer`] and answer later via
209/// [`DuplexSession::respond_to_permission`].
210#[derive(Debug, Clone)]
211pub struct PermissionRequest {
212    /// CLI-assigned correlation id. Pass this to
213    /// [`DuplexSession::respond_to_permission`] when deferring.
214    pub request_id: String,
215    /// The tool the model wants to use (e.g. `"Bash"`, `"Edit"`).
216    pub tool_name: String,
217    /// The tool's `input` payload as the model produced it.
218    pub input: Value,
219    /// The full `request` object as sent by the CLI, for fields not
220    /// promoted to typed accessors.
221    pub raw: Value,
222}
223
224/// The decision returned from a [`PermissionHandler`] (or passed to
225/// [`DuplexSession::respond_to_permission`] for deferred decisions).
226///
227/// `Allow` and `Deny` both write a control response to the CLI
228/// immediately. `Defer` causes the run loop to skip writing a
229/// response; the caller is then expected to invoke
230/// [`DuplexSession::respond_to_permission`] later. Passing `Defer`
231/// to `respond_to_permission` is a no-op.
232#[derive(Debug, Clone)]
233pub enum PermissionDecision {
234    /// Allow the tool to run, optionally with rewritten input.
235    Allow {
236        /// Replace the model's input with this object before running
237        /// the tool. `None` keeps the original input.
238        updated_input: Option<Value>,
239    },
240    /// Deny the tool. The `message` is surfaced to the model.
241    Deny {
242        /// Human-readable explanation given back to the model.
243        message: String,
244    },
245    /// Decision pending; the caller will supply it later via
246    /// [`DuplexSession::respond_to_permission`].
247    Defer,
248}
249
250type PermissionFuture = Pin<Box<dyn Future<Output = PermissionDecision> + Send + 'static>>;
251type PermissionFn = dyn Fn(PermissionRequest) -> PermissionFuture + Send + Sync + 'static;
252
253/// A user-supplied async callback invoked when the CLI requests
254/// permission to use a tool.
255///
256/// Construct with [`Self::new`], passing an `async fn` or
257/// async-block closure. Cheap to clone (`Arc` under the hood).
258///
259/// The handler runs inline on the duplex session's task. The CLI is
260/// blocked on the response while the handler runs, so awaiting an
261/// async policy check (DB lookup, remote call) is fine. If the
262/// decision needs human input on a different timescale, return
263/// [`PermissionDecision::Defer`] and answer via
264/// [`DuplexSession::respond_to_permission`] when ready.
265#[derive(Clone)]
266pub struct PermissionHandler {
267    inner: Arc<PermissionFn>,
268}
269
270impl PermissionHandler {
271    /// Wrap an async closure as a permission handler.
272    ///
273    /// # Example
274    ///
275    /// ```
276    /// use claude_wrapper::duplex::{PermissionDecision, PermissionHandler};
277    ///
278    /// let _handler = PermissionHandler::new(|req| async move {
279    ///     if req.tool_name == "Bash" {
280    ///         PermissionDecision::Deny { message: "no bash".into() }
281    ///     } else {
282    ///         PermissionDecision::Allow { updated_input: None }
283    ///     }
284    /// });
285    /// ```
286    pub fn new<F, Fut>(f: F) -> Self
287    where
288        F: Fn(PermissionRequest) -> Fut + Send + Sync + 'static,
289        Fut: Future<Output = PermissionDecision> + Send + 'static,
290    {
291        Self {
292            inner: Arc::new(move |req| Box::pin(f(req))),
293        }
294    }
295
296    fn invoke(&self, req: PermissionRequest) -> PermissionFuture {
297        (self.inner)(req)
298    }
299}
300
301impl std::fmt::Debug for PermissionHandler {
302    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
303        f.debug_struct("PermissionHandler").finish_non_exhaustive()
304    }
305}
306
307/// Configuration for [`DuplexSession::spawn`].
308///
309/// Builder methods cover the most common spawn-time options. The
310/// spawn call always includes
311/// `--print --verbose --input-format stream-json --output-format stream-json`
312/// regardless of these options.
313#[derive(Debug, Default, Clone)]
314pub struct DuplexOptions {
315    model: Option<String>,
316    system_prompt: Option<String>,
317    append_system_prompt: Option<String>,
318    resume: Option<String>,
319    continue_session: bool,
320    worktree: bool,
321    worktree_name: Option<String>,
322    agent: Option<String>,
323    agents_json: Option<String>,
324    permission_mode: Option<PermissionMode>,
325    dangerously_skip_permissions: bool,
326    additional_args: Vec<String>,
327    subscriber_capacity: Option<usize>,
328    on_permission: Option<PermissionHandler>,
329}
330
331impl DuplexOptions {
332    /// Set the model for this session (`--model`).
333    #[must_use]
334    pub fn model(mut self, model: impl Into<String>) -> Self {
335        self.model = Some(model.into());
336        self
337    }
338
339    /// Set the system prompt for this session (`--system-prompt`).
340    #[must_use]
341    pub fn system_prompt(mut self, prompt: impl Into<String>) -> Self {
342        self.system_prompt = Some(prompt.into());
343        self
344    }
345
346    /// Append to the default system prompt (`--append-system-prompt`).
347    #[must_use]
348    pub fn append_system_prompt(mut self, prompt: impl Into<String>) -> Self {
349        self.append_system_prompt = Some(prompt.into());
350        self
351    }
352
353    /// Resume a prior session by id (`--resume <session_id>`).
354    ///
355    /// Mirrors [`QueryCommand::resume`](crate::QueryCommand::resume)
356    /// for the duplex path. The spawned `claude` process picks up the
357    /// conversation that produced `session_id` and continues it; turns
358    /// sent through [`DuplexSession::send`] append to the existing
359    /// history rather than starting fresh.
360    ///
361    /// Use case: a host (IDE, MCP server, agent backend) wants to
362    /// upgrade a passive on-disk session to a live duplex one --
363    /// pulls the `session_id` out of the existing JSONL log, opens a
364    /// duplex session here, and the next turn extends the same
365    /// conversation.
366    ///
367    /// `resume` and [`Self::continue_session`] are mutually exclusive
368    /// at the CLI; passing both lets the CLI decide (it errors today).
369    #[must_use]
370    pub fn resume(mut self, session_id: impl Into<String>) -> Self {
371        self.resume = Some(session_id.into());
372        self
373    }
374
375    /// Continue the most recent session in the current working
376    /// directory (`--continue`).
377    ///
378    /// Mirrors [`QueryCommand::continue_session`](crate::QueryCommand::continue_session)
379    /// for the duplex path. Use [`Self::resume`] to pick a specific
380    /// session id; use this when "the last one" is what you want.
381    #[must_use]
382    pub fn continue_session(mut self) -> Self {
383        self.continue_session = true;
384        self
385    }
386
387    /// Run this session in a fresh git worktree (`--worktree [name]`).
388    ///
389    /// `name` is the optional worktree name (the CLI auto-generates
390    /// one if omitted). Calling this method always enables the
391    /// worktree flag, with or without a name.
392    ///
393    /// Use case: an agent host wants the chat's writes isolated from
394    /// the current working tree -- the chat opens with a fresh
395    /// worktree, mutations land there, and the host can inspect or
396    /// merge later.
397    #[must_use]
398    pub fn worktree(mut self, name: Option<impl Into<String>>) -> Self {
399        self.worktree = true;
400        if let Some(n) = name {
401            self.worktree_name = Some(n.into());
402        }
403        self
404    }
405
406    /// Pin the session to a named subagent (`--agent <name>`).
407    ///
408    /// `name` is resolved by the CLI in this order: inline
409    /// definitions from [`Self::agents_json`], then user-level
410    /// `~/.claude/agents/<name>.md` files, then project-level dirs
411    /// loaded by the active `--setting-sources`.
412    ///
413    /// **Caveat**: as of Claude Code 2.1.143, the CLI silently
414    /// ignores an unknown `name` and falls back to the default
415    /// behavior -- no warning, no error. Callers that want a hard
416    /// "agent must exist" semantics should validate the name out of
417    /// band (e.g. via [`crate::artifacts::AgentsRoot::get`]) before
418    /// passing it here.
419    #[must_use]
420    pub fn agent(mut self, name: impl Into<String>) -> Self {
421        self.agent = Some(name.into());
422        self
423    }
424
425    /// Inline subagent definitions for this session
426    /// (`--agents <json>`).
427    ///
428    /// `json` is a JSON object keyed by agent name, with each value
429    /// carrying at least `description` and `prompt`. Inline
430    /// definitions take precedence over on-disk
431    /// `~/.claude/agents/*.md` of the same name. Pass [`Self::agent`]
432    /// to select which one to use as the session's persona.
433    ///
434    /// Example: `{"reviewer": {"description": "Reviews code",
435    /// "prompt": "You are a code reviewer"}}`.
436    #[must_use]
437    pub fn agents_json(mut self, json: impl Into<String>) -> Self {
438        self.agents_json = Some(json.into());
439        self
440    }
441
442    /// Set the permission mode for this session
443    /// (`--permission-mode <mode>`).
444    ///
445    /// Mirrors [`QueryCommand::permission_mode`](crate::QueryCommand::permission_mode)
446    /// for the duplex path. The default mode (when this method isn't
447    /// called) drops to the CLI's interactive prompt for every
448    /// tool-use approval, which is broken for non-interactive duplex
449    /// sessions -- nothing answers the prompts and the session stalls
450    /// or fails. Call this with [`PermissionMode::AcceptEdits`] for
451    /// the "edit files autonomously" pattern, [`PermissionMode::Plan`]
452    /// for read-only planning, etc.
453    ///
454    /// Bypass mode is a footgun; reach for [`Self::dangerously_skip_permissions`]
455    /// (or, for stricter discipline, [`crate::dangerous::DangerousClient`])
456    /// when you really need it.
457    #[must_use]
458    pub fn permission_mode(mut self, mode: PermissionMode) -> Self {
459        self.permission_mode = Some(mode);
460        self
461    }
462
463    /// Pass `--dangerously-skip-permissions` to the spawned session.
464    ///
465    /// Bypasses ALL permission checks -- file edits, bash, network,
466    /// the lot. Use only when you know the session runs in a trusted
467    /// sandbox (a fresh worktree, a container, etc.). For most "run
468    /// autonomously" cases you want [`Self::permission_mode`] with
469    /// [`PermissionMode::AcceptEdits`] instead.
470    #[must_use]
471    pub fn dangerously_skip_permissions(mut self) -> Self {
472        self.dangerously_skip_permissions = true;
473        self
474    }
475
476    /// Add a raw argument to the spawn command line.
477    ///
478    /// Escape hatch for flags not covered by the dedicated builder
479    /// methods.
480    #[must_use]
481    pub fn arg(mut self, arg: impl Into<String>) -> Self {
482        self.additional_args.push(arg.into());
483        self
484    }
485
486    /// Set the per-session [`broadcast::Sender`] capacity backing
487    /// [`DuplexSession::subscribe`].
488    ///
489    /// Defaults to [`DEFAULT_SUBSCRIBER_CAPACITY`] (256). Larger
490    /// values give slow subscribers more room before they
491    /// [`Lagged`](tokio::sync::broadcast::error::RecvError::Lagged);
492    /// smaller values reclaim memory if you do not subscribe.
493    #[must_use]
494    pub fn subscriber_capacity(mut self, capacity: usize) -> Self {
495        self.subscriber_capacity = Some(capacity);
496        self
497    }
498
499    /// Register a [`PermissionHandler`] to answer the CLI's tool-use
500    /// permission prompts in-flight.
501    ///
502    /// When set, the spawn command line includes
503    /// `--permission-prompt-tool stdio`, which configures the CLI to
504    /// emit `control_request` messages for tool use over the duplex
505    /// channel rather than blocking on a TUI prompt.
506    ///
507    /// Without a handler, the session does not pass
508    /// `--permission-prompt-tool` and the CLI applies its default
509    /// permission policy (driven by `--permission-mode`).
510    ///
511    /// **Known limitation:** as of claude CLI 2.1.x the CLI does not
512    /// emit `control_request {subtype: "can_use_tool"}` in stream-json
513    /// print mode, so this handler will not be invoked end-to-end until
514    /// an upstream fix lands. The wire handling is correct; see
515    /// <https://github.com/anthropics/claude-agent-sdk-python/issues/469>.
516    #[must_use]
517    pub fn on_permission(mut self, handler: PermissionHandler) -> Self {
518        self.on_permission = Some(handler);
519        self
520    }
521
522    fn into_args(self) -> Vec<String> {
523        let mut args = vec![
524            "--print".to_string(),
525            "--verbose".to_string(),
526            "--output-format".to_string(),
527            "stream-json".to_string(),
528            "--input-format".to_string(),
529            "stream-json".to_string(),
530        ];
531
532        if let Some(m) = self.model {
533            args.push("--model".to_string());
534            args.push(m);
535        }
536        if let Some(p) = self.system_prompt {
537            args.push("--system-prompt".to_string());
538            args.push(p);
539        }
540        if let Some(p) = self.append_system_prompt {
541            args.push("--append-system-prompt".to_string());
542            args.push(p);
543        }
544        if let Some(id) = self.resume {
545            args.push("--resume".to_string());
546            args.push(id);
547        }
548        if self.continue_session {
549            args.push("--continue".to_string());
550        }
551        if self.worktree {
552            args.push("--worktree".to_string());
553            if let Some(n) = self.worktree_name {
554                args.push(n);
555            }
556        }
557        if let Some(json) = self.agents_json {
558            args.push("--agents".to_string());
559            args.push(json);
560        }
561        if let Some(name) = self.agent {
562            args.push("--agent".to_string());
563            args.push(name);
564        }
565        if let Some(mode) = self.permission_mode {
566            args.push("--permission-mode".to_string());
567            args.push(mode.as_arg().to_string());
568        }
569        if self.dangerously_skip_permissions {
570            args.push("--dangerously-skip-permissions".to_string());
571        }
572        if self.on_permission.is_some() {
573            args.push("--permission-prompt-tool".to_string());
574            args.push("stdio".to_string());
575        }
576        args.extend(self.additional_args);
577
578        args
579    }
580}
581
582/// The result of one turn through a [`DuplexSession`].
583///
584/// `result` is the raw JSON of the `{"type": "result", ...}` message
585/// that closed the turn. `events` carries every other message
586/// received during the turn (system, assistant, stream_event, user)
587/// in arrival order, with the closing `result` excluded.
588#[derive(Debug, Clone)]
589pub struct TurnResult {
590    /// The raw `{"type": "result", ...}` message that ended the turn.
591    pub result: Value,
592    /// Every other message received during the turn, in order.
593    pub events: Vec<Value>,
594}
595
596impl TurnResult {
597    /// Extract `result.result` as a string, if present.
598    #[must_use]
599    pub fn result_text(&self) -> Option<&str> {
600        self.result.get("result").and_then(Value::as_str)
601    }
602
603    /// Extract `result.session_id`, if present.
604    #[must_use]
605    pub fn session_id(&self) -> Option<&str> {
606        self.result.get("session_id").and_then(Value::as_str)
607    }
608
609    /// Extract `total_cost_usd` (preferred) or the legacy `cost_usd`
610    /// field, if either is present.
611    #[must_use]
612    pub fn total_cost_usd(&self) -> Option<f64> {
613        self.result
614            .get("total_cost_usd")
615            .or_else(|| self.result.get("cost_usd"))
616            .and_then(Value::as_f64)
617    }
618
619    /// Extract `duration_ms`, if present.
620    #[must_use]
621    pub fn duration_ms(&self) -> Option<u64> {
622        self.result.get("duration_ms").and_then(Value::as_u64)
623    }
624}
625
626/// A classified inbound event broadcast to [`DuplexSession::subscribe`]
627/// receivers.
628///
629/// Every non-`result` message coming back from the CLI is broadcast as
630/// one of these variants. The closing `{"type": "result"}` message is
631/// not broadcast; it resolves the in-flight [`DuplexSession::send`]
632/// future and lands in [`TurnResult::result`].
633///
634/// Subscribers see the same set of events that accumulate in
635/// [`TurnResult::events`], in the same order, just classified. Adding
636/// a typed accessor for a new event type later (e.g. promoting a
637/// `system` subtype into its own variant) is non-breaking against the
638/// `Other` fallback.
639#[derive(Debug, Clone)]
640pub enum InboundEvent {
641    /// First `{"type": "system", "subtype": "init"}` event for the
642    /// session. Carries the CLI-assigned `session_id`.
643    SystemInit {
644        /// The CLI-assigned session id, useful for logging or
645        /// future resume support.
646        session_id: String,
647    },
648    /// `{"type": "assistant", ...}` -- either a complete assistant
649    /// message or, in stream-json mode, a partial chunk.
650    Assistant(Value),
651    /// `{"type": "stream_event", ...}` -- low-level streaming event
652    /// emitted while a turn is in progress.
653    StreamEvent(Value),
654    /// `{"type": "user", ...}` -- typically a tool result echo from
655    /// the CLI side.
656    User(Value),
657    /// Any other event type, including non-`init` `system` events
658    /// and any message types not yet recognised by this enum.
659    Other(Value),
660}
661
662fn classify(msg: &Value) -> InboundEvent {
663    match msg.get("type").and_then(Value::as_str) {
664        Some("system") => {
665            if msg.get("subtype").and_then(Value::as_str) == Some("init")
666                && let Some(id) = msg.get("session_id").and_then(Value::as_str)
667            {
668                return InboundEvent::SystemInit {
669                    session_id: id.to_string(),
670                };
671            }
672            InboundEvent::Other(msg.clone())
673        }
674        Some("assistant") => InboundEvent::Assistant(msg.clone()),
675        Some("stream_event") => InboundEvent::StreamEvent(msg.clone()),
676        Some("user") => InboundEvent::User(msg.clone()),
677        _ => InboundEvent::Other(msg.clone()),
678    }
679}
680
/// Whether a [`DuplexSession`]'s background task is still running, and
/// if not, how it ended.
///
/// Exposed through [`DuplexSession::is_alive`],
/// [`DuplexSession::exit_status`], and [`DuplexSession::wait_for_exit`]
/// so service-style hosts can check whether a session is still usable
/// without consuming it. The single caller that does consume the
/// session still receives the full [`Result`] from
/// [`DuplexSession::close`].
///
/// `Failed` stores a `String` rather than an [`Error`]: the watch
/// channel that publishes this status needs `Clone`, and `Error` is not
/// `Clone` because its `Io` variant wraps a `std::io::Error`. Use
/// [`DuplexSession::close`] to obtain the full error.
#[derive(Clone, Debug)]
pub enum SessionExitStatus {
    /// The background task has not finished.
    Running,
    /// The background task finished without error (closed, or stdout
    /// reached EOF cleanly).
    Completed,
    /// The background task finished with an error, rendered through its
    /// `Display` implementation.
    Failed(String),
}
706
707/// A long-lived `claude` subprocess in stream-json duplex mode.
708///
709/// Owns a background task that holds the child open, writes user
710/// messages to its stdin, and reads NDJSON events from its stdout.
711/// One turn at a time: calling [`Self::send`] while another turn is
712/// in flight returns [`Error::DuplexTurnInFlight`].
713///
714/// See the [module docs](crate::duplex) for the full design.
715#[derive(Debug)]
716pub struct DuplexSession {
717    outbound_tx: mpsc::UnboundedSender<OutboundMsg>,
718    events_tx: broadcast::Sender<InboundEvent>,
719    exit_rx: watch::Receiver<SessionExitStatus>,
720    join: JoinHandle<Result<()>>,
721}
722
723#[derive(Debug)]
724enum OutboundMsg {
725    Send {
726        prompt: String,
727        reply: oneshot::Sender<Result<TurnResult>>,
728    },
729    PermissionResponse {
730        request_id: String,
731        decision: PermissionDecision,
732    },
733    Interrupt {
734        reply: oneshot::Sender<Result<()>>,
735    },
736}
737
738impl DuplexSession {
739    /// Spawn a fresh `claude` subprocess in duplex mode.
740    ///
741    /// The child is started with
742    /// `--print --verbose --input-format stream-json --output-format stream-json`
743    /// plus any options applied via `opts`. The session task takes
744    /// ownership of the child; dropping the returned handle (or
745    /// calling [`Self::close`]) shuts the task down.
746    pub async fn spawn(claude: &Claude, opts: DuplexOptions) -> Result<Self> {
747        let capacity = opts
748            .subscriber_capacity
749            .unwrap_or(DEFAULT_SUBSCRIBER_CAPACITY);
750        let permission_handler = opts.on_permission.clone();
751
752        let mut command_args = Vec::new();
753        command_args.extend(claude.global_args.clone());
754        command_args.extend(opts.into_args());
755
756        debug!(
757            binary = %claude.binary.display(),
758            args = ?command_args,
759            "spawning duplex claude session"
760        );
761
762        let mut cmd = Command::new(&claude.binary);
763        cmd.args(&command_args)
764            .env_remove("CLAUDECODE")
765            .env_remove("CLAUDE_CODE_ENTRYPOINT")
766            .envs(&claude.env)
767            .stdin(Stdio::piped())
768            .stdout(Stdio::piped())
769            .stderr(Stdio::piped())
770            .kill_on_drop(true);
771
772        if let Some(ref dir) = claude.working_dir {
773            cmd.current_dir(dir);
774        }
775
776        let mut child = cmd.spawn().map_err(|e| Error::Io {
777            message: format!("failed to spawn claude: {e}"),
778            source: e,
779            working_dir: claude.working_dir.clone(),
780        })?;
781
782        let stdin = child.stdin.take().expect("stdin was piped");
783        let stdout = child.stdout.take().expect("stdout was piped");
784
785        let (outbound_tx, outbound_rx) = mpsc::unbounded_channel();
786        let (events_tx, _initial_rx) = broadcast::channel(capacity);
787        let (exit_tx, exit_rx) = watch::channel(SessionExitStatus::Running);
788
789        let join = tokio::spawn(run_session(
790            child,
791            stdin,
792            stdout,
793            outbound_rx,
794            events_tx.clone(),
795            permission_handler,
796            exit_tx,
797        ));
798
799        Ok(Self {
800            outbound_tx,
801            events_tx,
802            exit_rx,
803            join,
804        })
805    }
806
807    /// Send one user message and await the closing result event.
808    ///
809    /// Returns [`Error::DuplexTurnInFlight`] if another turn is
810    /// already pending, and [`Error::DuplexClosed`] if the session
811    /// task has already exited.
812    pub async fn send(&self, prompt: impl Into<String>) -> Result<TurnResult> {
813        let (reply_tx, reply_rx) = oneshot::channel();
814        self.outbound_tx
815            .send(OutboundMsg::Send {
816                prompt: prompt.into(),
817                reply: reply_tx,
818            })
819            .map_err(|_| Error::DuplexClosed)?;
820        reply_rx.await.map_err(|_| Error::DuplexClosed)?
821    }
822
823    /// Subscribe to the session's classified inbound event stream.
824    ///
825    /// Returns a [`broadcast::Receiver<InboundEvent>`] that receives
826    /// every non-`result` event as it arrives. Each subscriber gets
827    /// its own buffered view; subscribers added later miss earlier
828    /// events. Slow subscribers see
829    /// [`RecvError::Lagged`](tokio::sync::broadcast::error::RecvError::Lagged)
830    /// rather than blocking the session task.
831    ///
832    /// Subscribers see the same events that accumulate in
833    /// [`TurnResult::events`], in the same order.
834    ///
835    /// # Example
836    ///
837    /// ```no_run
838    /// use claude_wrapper::Claude;
839    /// use claude_wrapper::duplex::{DuplexOptions, DuplexSession, InboundEvent};
840    ///
841    /// # async fn example() -> claude_wrapper::Result<()> {
842    /// let claude = Claude::builder().build()?;
843    /// let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
844    /// let mut rx = session.subscribe();
845    ///
846    /// // Subscribe before send so we receive every event.
847    /// let _turn = session.send("hello").await?;
848    ///
849    /// while let Ok(event) = rx.try_recv() {
850    ///     if let InboundEvent::SystemInit { session_id } = event {
851    ///         println!("session id: {session_id}");
852    ///     }
853    /// }
854    /// # Ok(())
855    /// # }
856    /// ```
857    #[must_use]
858    pub fn subscribe(&self) -> broadcast::Receiver<InboundEvent> {
859        self.events_tx.subscribe()
860    }
861
862    /// Cheap, non-blocking liveness check.
863    ///
864    /// Returns `true` while the session task is running, `false` once
865    /// it has exited (whether normally or with an error). Multiple
866    /// concurrent callers are allowed, and the call does not consume
867    /// the session: [`Self::close`] still works after polling.
868    ///
869    /// Reads the latest value from a `tokio::sync::watch` channel
870    /// updated from inside the session task, so it never blocks and
871    /// reflects state set just before the task returns.
872    #[must_use]
873    pub fn is_alive(&self) -> bool {
874        matches!(*self.exit_rx.borrow(), SessionExitStatus::Running)
875    }
876
877    /// Snapshot the session task's [`SessionExitStatus`].
878    ///
879    /// Returns [`SessionExitStatus::Running`] while the task is still
880    /// alive, [`SessionExitStatus::Completed`] after a clean exit, or
881    /// [`SessionExitStatus::Failed`] with the underlying error
882    /// rendered to a string.
883    ///
884    /// Like [`Self::is_alive`], this is a cheap non-blocking read.
885    #[must_use]
886    pub fn exit_status(&self) -> SessionExitStatus {
887        self.exit_rx.borrow().clone()
888    }
889
890    /// Block until the session task transitions out of
891    /// [`SessionExitStatus::Running`] and return the terminal status.
892    ///
893    /// Returns immediately if the task has already exited. Multiple
894    /// concurrent callers are supported (each gets its own receiver
895    /// clone), and the call does not consume the session.
896    ///
897    /// If the underlying watch sender is dropped without ever
898    /// publishing a terminal state -- which should not happen in
899    /// practice, but is treated defensively -- this returns the last
900    /// observed value.
901    pub async fn wait_for_exit(&self) -> SessionExitStatus {
902        let mut rx = self.exit_rx.clone();
903        loop {
904            {
905                let value = rx.borrow_and_update();
906                if !matches!(*value, SessionExitStatus::Running) {
907                    return value.clone();
908                }
909            }
910            if rx.changed().await.is_err() {
911                return rx.borrow().clone();
912            }
913        }
914    }
915
916    /// Answer a deferred permission request from a different task.
917    ///
918    /// Use this after the [`PermissionHandler`] returned
919    /// [`PermissionDecision::Defer`] for the matching `request_id`.
920    /// Passing `decision = PermissionDecision::Defer` here is a
921    /// no-op (logged at `warn`); pass `Allow` or `Deny`.
922    ///
923    /// Returns [`Error::DuplexClosed`] if the session task has
924    /// already exited.
925    ///
926    /// # Example
927    ///
928    /// ```no_run
929    /// use claude_wrapper::Claude;
930    /// use claude_wrapper::duplex::{
931    ///     DuplexOptions, DuplexSession, PermissionDecision, PermissionHandler,
932    /// };
933    /// use tokio::sync::mpsc;
934    ///
935    /// # async fn example() -> claude_wrapper::Result<()> {
936    /// // Forward request_ids out to a UI thread; answer asynchronously.
937    /// let (tx, _rx) = mpsc::unbounded_channel::<String>();
938    /// let handler = PermissionHandler::new(move |req| {
939    ///     let tx = tx.clone();
940    ///     async move {
941    ///         let _ = tx.send(req.request_id);
942    ///         PermissionDecision::Defer
943    ///     }
944    /// });
945    ///
946    /// let claude = Claude::builder().build()?;
947    /// let session = DuplexSession::spawn(
948    ///     &claude,
949    ///     DuplexOptions::default().on_permission(handler),
950    /// ).await?;
951    ///
952    /// // ...later, from the UI thread:
953    /// session.respond_to_permission(
954    ///     "req-abc",
955    ///     PermissionDecision::Allow { updated_input: None },
956    /// )?;
957    /// # Ok(())
958    /// # }
959    /// ```
960    pub fn respond_to_permission(
961        &self,
962        request_id: impl Into<String>,
963        decision: PermissionDecision,
964    ) -> Result<()> {
965        if matches!(decision, PermissionDecision::Defer) {
966            warn!("respond_to_permission called with Defer; ignoring");
967            return Ok(());
968        }
969        self.outbound_tx
970            .send(OutboundMsg::PermissionResponse {
971                request_id: request_id.into(),
972                decision,
973            })
974            .map_err(|_| Error::DuplexClosed)?;
975        Ok(())
976    }
977
978    /// Send a clean interrupt to the CLI and wait for its
979    /// acknowledgment.
980    ///
981    /// Writes a `control_request {subtype: "interrupt"}` and resolves
982    /// when the matching `control_response` comes back. The
983    /// in-flight turn (if any) closes shortly after with a truncated
984    /// [`TurnResult`] -- the [`DuplexSession::send`] future for it
985    /// resolves independently. Either ordering is possible; await
986    /// both via `tokio::join!` if you care about both outcomes.
987    ///
988    /// Returns:
989    /// - `Ok(())` when the CLI acknowledges with `subtype: "success"`.
990    /// - [`Error::DuplexControlFailed`] when the CLI answers with an
991    ///   error payload.
992    /// - [`Error::DuplexClosed`] if the session task exited before
993    ///   the response arrived.
994    ///
995    /// # Example
996    ///
997    /// ```no_run
998    /// use std::time::Duration;
999    /// use claude_wrapper::Claude;
1000    /// use claude_wrapper::duplex::{DuplexOptions, DuplexSession};
1001    ///
1002    /// # async fn example() -> claude_wrapper::Result<()> {
1003    /// let claude = Claude::builder().build()?;
1004    /// let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
1005    ///
1006    /// let send_fut = session.send("a question that triggers tool use");
1007    /// let interrupt_fut = async {
1008    ///     tokio::time::sleep(Duration::from_millis(250)).await;
1009    ///     session.interrupt().await
1010    /// };
1011    ///
1012    /// let (turn, interrupt) = tokio::join!(send_fut, interrupt_fut);
1013    /// let _truncated = turn?;
1014    /// interrupt?;
1015    /// # Ok(())
1016    /// # }
1017    /// ```
1018    pub async fn interrupt(&self) -> Result<()> {
1019        let (reply_tx, reply_rx) = oneshot::channel();
1020        self.outbound_tx
1021            .send(OutboundMsg::Interrupt { reply: reply_tx })
1022            .map_err(|_| Error::DuplexClosed)?;
1023        reply_rx.await.map_err(|_| Error::DuplexClosed)?
1024    }
1025
1026    /// Close the session and wait for the underlying task to exit.
1027    ///
1028    /// Drops the outbound channel sender, which the session task
1029    /// observes as `recv() -> None`, then closes stdin and reaps the
1030    /// child.
1031    pub async fn close(self) -> Result<()> {
1032        drop(self.outbound_tx);
1033        drop(self.events_tx);
1034        match self.join.await {
1035            Ok(result) => result,
1036            Err(e) if e.is_cancelled() => Ok(()),
1037            Err(e) => Err(Error::Io {
1038                message: format!("duplex session task panicked: {e}"),
1039                source: std::io::Error::other(e.to_string()),
1040                working_dir: None,
1041            }),
1042        }
1043    }
1044}
1045
/// How long the session task lets the child exit on its own after the
/// run loop has finished. Past this deadline the child is killed, so that
/// `close()` cannot hang on a stuck subprocess.
const SHUTDOWN_BUDGET: Duration = Duration::from_millis(5_000);
1050
1051async fn run_session(
1052    mut child: Child,
1053    mut stdin: ChildStdin,
1054    stdout: ChildStdout,
1055    mut outbound_rx: mpsc::UnboundedReceiver<OutboundMsg>,
1056    events_tx: broadcast::Sender<InboundEvent>,
1057    permission_handler: Option<PermissionHandler>,
1058    exit_tx: watch::Sender<SessionExitStatus>,
1059) -> Result<()> {
1060    let mut lines = BufReader::new(stdout).lines();
1061    let mut pending: Option<(oneshot::Sender<Result<TurnResult>>, Vec<Value>)> = None;
1062    let mut pending_control: HashMap<String, oneshot::Sender<Result<()>>> = HashMap::new();
1063    let mut next_control_id: u64 = 0;
1064    let mut stream_err: Option<Error> = None;
1065
1066    loop {
1067        tokio::select! {
1068            biased;
1069
1070            line = lines.next_line() => match line {
1071                Ok(Some(l)) => {
1072                    if l.trim().is_empty() {
1073                        continue;
1074                    }
1075                    let parsed = match serde_json::from_str::<Value>(&l) {
1076                        Ok(v) => v,
1077                        Err(e) => {
1078                            debug!(line = %l, error = %e, "failed to parse duplex event, skipping");
1079                            continue;
1080                        }
1081                    };
1082                    match handle_inbound(parsed, &mut pending, &events_tx) {
1083                        InboundAction::None => {}
1084                        InboundAction::Permission(req) => {
1085                            let request_id = req.request_id.clone();
1086                            let decision = match permission_handler.as_ref() {
1087                                Some(h) => h.invoke(req).await,
1088                                None => {
1089                                    warn!(
1090                                        request_id = %request_id,
1091                                        "received can_use_tool with no permission handler; auto-denying"
1092                                    );
1093                                    PermissionDecision::Deny {
1094                                        message:
1095                                            "no permission handler configured on duplex session"
1096                                                .into(),
1097                                    }
1098                                }
1099                            };
1100                            if matches!(decision, PermissionDecision::Defer) {
1101                                debug!(
1102                                    request_id = %request_id,
1103                                    "permission handler deferred; waiting for respond_to_permission"
1104                                );
1105                            } else if let Err(e) =
1106                                write_permission_response(&mut stdin, &request_id, &decision).await
1107                            {
1108                                warn!(error = %e, "failed to write permission response");
1109                            }
1110                        }
1111                        InboundAction::ControlResponse { request_id, outcome } => {
1112                            if let Some(reply) = pending_control.remove(&request_id) {
1113                                let _ = reply.send(outcome);
1114                            } else {
1115                                debug!(
1116                                    request_id = %request_id,
1117                                    "received control_response with no pending request"
1118                                );
1119                            }
1120                        }
1121                    }
1122                }
1123                Ok(None) => break,
1124                Err(e) => {
1125                    stream_err = Some(Error::Io {
1126                        message: "failed to read duplex stdout".to_string(),
1127                        source: e,
1128                        working_dir: None,
1129                    });
1130                    break;
1131                }
1132            },
1133
1134            msg = outbound_rx.recv() => match msg {
1135                Some(OutboundMsg::Send { prompt, reply }) => {
1136                    if pending.is_some() {
1137                        let _ = reply.send(Err(Error::DuplexTurnInFlight));
1138                        continue;
1139                    }
1140                    if let Err(e) = write_user(&mut stdin, &prompt).await {
1141                        let _ = reply.send(Err(e));
1142                        continue;
1143                    }
1144                    pending = Some((reply, Vec::new()));
1145                }
1146                Some(OutboundMsg::PermissionResponse { request_id, decision }) => {
1147                    if let Err(e) =
1148                        write_permission_response(&mut stdin, &request_id, &decision).await
1149                    {
1150                        warn!(error = %e, "failed to write deferred permission response");
1151                    }
1152                }
1153                Some(OutboundMsg::Interrupt { reply }) => {
1154                    next_control_id += 1;
1155                    let request_id = format!("interrupt-{next_control_id}");
1156                    if let Err(e) =
1157                        write_control_request(&mut stdin, &request_id, "interrupt").await
1158                    {
1159                        let _ = reply.send(Err(e));
1160                        continue;
1161                    }
1162                    pending_control.insert(request_id, reply);
1163                }
1164                None => break,
1165            },
1166        }
1167    }
1168
1169    drop(stdin);
1170    match tokio::time::timeout(SHUTDOWN_BUDGET, child.wait()).await {
1171        Ok(Ok(_status)) => {}
1172        Ok(Err(e)) => {
1173            warn!(error = %e, "failed to wait for duplex child");
1174        }
1175        Err(_) => {
1176            warn!("duplex child did not exit within shutdown budget; killing");
1177            let _ = child.kill().await;
1178        }
1179    }
1180
1181    if let Some((reply, _)) = pending.take() {
1182        let _ = reply.send(Err(Error::DuplexClosed));
1183    }
1184    for (_, reply) in pending_control.drain() {
1185        let _ = reply.send(Err(Error::DuplexClosed));
1186    }
1187
1188    let result = match stream_err {
1189        Some(e) => Err(e),
1190        None => Ok(()),
1191    };
1192    let final_state = match &result {
1193        Ok(()) => SessionExitStatus::Completed,
1194        Err(e) => SessionExitStatus::Failed(e.to_string()),
1195    };
1196    let _ = exit_tx.send(final_state);
1197    result
1198}
1199
1200/// Action returned from [`handle_inbound`] for the run loop to act
1201/// on after the side-effects (broadcast, accumulate, resolve) are
1202/// done.
1203enum InboundAction {
1204    /// No further action -- side-effects were all handled inline.
1205    None,
1206    /// A `control_request {subtype: "can_use_tool"}` was received and
1207    /// needs the [`PermissionHandler`] invoked. The run loop awaits
1208    /// the handler and writes the response.
1209    Permission(PermissionRequest),
1210    /// A `control_response` matching one of our outbound
1211    /// `control_request`s arrived. The run loop matches `request_id`
1212    /// against its `pending_control` table and resolves the
1213    /// corresponding oneshot.
1214    ControlResponse {
1215        request_id: String,
1216        outcome: Result<()>,
1217    },
1218}
1219
1220fn handle_inbound(
1221    msg: Value,
1222    pending: &mut Option<(oneshot::Sender<Result<TurnResult>>, Vec<Value>)>,
1223    events_tx: &broadcast::Sender<InboundEvent>,
1224) -> InboundAction {
1225    match msg.get("type").and_then(Value::as_str) {
1226        Some("result") => {
1227            if let Some((reply, events)) = pending.take() {
1228                let _ = reply.send(Ok(TurnResult {
1229                    result: msg,
1230                    events,
1231                }));
1232            } else {
1233                debug!("dropping orphan result event with no pending turn");
1234            }
1235            InboundAction::None
1236        }
1237        Some("control_request") => {
1238            // can_use_tool flows through the permission handler;
1239            // anything else is logged + accumulated as Other for now.
1240            if msg
1241                .get("request")
1242                .and_then(|r| r.get("subtype"))
1243                .and_then(Value::as_str)
1244                == Some("can_use_tool")
1245                && let Some(req) = parse_permission_request(&msg)
1246            {
1247                if let Some((_, events)) = pending.as_mut() {
1248                    events.push(msg);
1249                }
1250                return InboundAction::Permission(req);
1251            }
1252            debug!(
1253                ?msg,
1254                "received unhandled control_request; treating as Other"
1255            );
1256            let _ = events_tx.send(InboundEvent::Other(msg.clone()));
1257            if let Some((_, events)) = pending.as_mut() {
1258                events.push(msg);
1259            }
1260            InboundAction::None
1261        }
1262        Some("control_response") => {
1263            if let Some((request_id, outcome)) = parse_control_response(&msg) {
1264                return InboundAction::ControlResponse {
1265                    request_id,
1266                    outcome,
1267                };
1268            }
1269            debug!(
1270                ?msg,
1271                "received malformed control_response; treating as Other"
1272            );
1273            let _ = events_tx.send(InboundEvent::Other(msg.clone()));
1274            if let Some((_, events)) = pending.as_mut() {
1275                events.push(msg);
1276            }
1277            InboundAction::None
1278        }
1279        _ => {
1280            // Broadcast a classified copy. Send error means no
1281            // subscribers, which is fine -- subscribers are optional.
1282            let _ = events_tx.send(classify(&msg));
1283
1284            if let Some((_, events)) = pending.as_mut() {
1285                events.push(msg);
1286            } else {
1287                debug!("dropping inbound event with no pending turn");
1288            }
1289            InboundAction::None
1290        }
1291    }
1292}
1293
1294fn parse_permission_request(msg: &Value) -> Option<PermissionRequest> {
1295    let request_id = msg.get("request_id").and_then(Value::as_str)?;
1296    let request = msg.get("request")?;
1297    let tool_name = request.get("tool_name").and_then(Value::as_str)?;
1298    let input = request.get("input").cloned().unwrap_or(Value::Null);
1299    Some(PermissionRequest {
1300        request_id: request_id.to_string(),
1301        tool_name: tool_name.to_string(),
1302        input,
1303        raw: request.clone(),
1304    })
1305}
1306
1307/// Pull `(request_id, outcome)` out of a `control_response` envelope.
1308///
1309/// Returns `None` if `request_id` is missing or the subtype is
1310/// unrecognised. `Some((id, Ok(())))` for `subtype: "success"`,
1311/// `Some((id, Err(DuplexControlFailed)))` for `subtype: "error"`.
1312fn parse_control_response(msg: &Value) -> Option<(String, Result<()>)> {
1313    let response = msg.get("response")?;
1314    let request_id = response.get("request_id").and_then(Value::as_str)?;
1315    let outcome = match response.get("subtype").and_then(Value::as_str) {
1316        Some("success") => Ok(()),
1317        Some("error") => {
1318            let message = response
1319                .get("error")
1320                .and_then(Value::as_str)
1321                .unwrap_or("unknown control_response error")
1322                .to_string();
1323            Err(Error::DuplexControlFailed { message })
1324        }
1325        _ => return None,
1326    };
1327    Some((request_id.to_string(), outcome))
1328}
1329
1330async fn write_user(stdin: &mut ChildStdin, prompt: &str) -> Result<()> {
1331    let user_msg = serde_json::json!({
1332        "type": "user",
1333        "message": {
1334            "role": "user",
1335            "content": prompt,
1336        },
1337        "parent_tool_use_id": null,
1338    });
1339    write_line(stdin, &user_msg, "user message").await
1340}
1341
1342async fn write_control_request(
1343    stdin: &mut ChildStdin,
1344    request_id: &str,
1345    subtype: &str,
1346) -> Result<()> {
1347    let envelope = serde_json::json!({
1348        "type": "control_request",
1349        "request_id": request_id,
1350        "request": { "subtype": subtype },
1351    });
1352    write_line(stdin, &envelope, "control_request").await
1353}
1354
1355async fn write_permission_response(
1356    stdin: &mut ChildStdin,
1357    request_id: &str,
1358    decision: &PermissionDecision,
1359) -> Result<()> {
1360    let inner = match decision {
1361        PermissionDecision::Allow { updated_input } => {
1362            let mut obj = serde_json::Map::new();
1363            obj.insert("behavior".to_string(), Value::String("allow".to_string()));
1364            if let Some(input) = updated_input {
1365                obj.insert("updatedInput".to_string(), input.clone());
1366            }
1367            Value::Object(obj)
1368        }
1369        PermissionDecision::Deny { message } => serde_json::json!({
1370            "behavior": "deny",
1371            "message": message,
1372        }),
1373        PermissionDecision::Defer => {
1374            // Caller path is supposed to filter this; defensive guard.
1375            return Ok(());
1376        }
1377    };
1378    let envelope = serde_json::json!({
1379        "type": "control_response",
1380        "response": {
1381            "request_id": request_id,
1382            "subtype": "success",
1383            "response": inner,
1384        },
1385    });
1386    write_line(stdin, &envelope, "control_response").await
1387}
1388
1389async fn write_line(stdin: &mut ChildStdin, value: &Value, what: &'static str) -> Result<()> {
1390    let mut line = serde_json::to_string(value).map_err(|e| Error::Json {
1391        message: format!("failed to serialize duplex {what}"),
1392        source: e,
1393    })?;
1394    line.push('\n');
1395    stdin
1396        .write_all(line.as_bytes())
1397        .await
1398        .map_err(|e| Error::Io {
1399            message: format!("failed to write {what} to duplex stdin"),
1400            source: e,
1401            working_dir: None,
1402        })?;
1403    stdin.flush().await.map_err(|e| Error::Io {
1404        message: "failed to flush duplex stdin".to_string(),
1405        source: e,
1406        working_dir: None,
1407    })?;
1408    Ok(())
1409}
1410
1411#[cfg(test)]
1412mod tests {
1413    use super::*;
1414    use serde_json::json;
1415
1416    #[test]
1417    fn into_args_default_includes_required_flags() {
1418        let args = DuplexOptions::default().into_args();
1419        assert!(args.contains(&"--print".to_string()));
1420        assert!(args.contains(&"--verbose".to_string()));
1421        assert!(
1422            args.windows(2)
1423                .any(|w| w == ["--output-format", "stream-json"])
1424        );
1425        assert!(
1426            args.windows(2)
1427                .any(|w| w == ["--input-format", "stream-json"])
1428        );
1429    }
1430
1431    #[test]
1432    fn into_args_includes_model() {
1433        let args = DuplexOptions::default().model("haiku").into_args();
1434        assert!(args.windows(2).any(|w| w == ["--model", "haiku"]));
1435    }
1436
1437    #[test]
1438    fn into_args_includes_system_prompts() {
1439        let args = DuplexOptions::default()
1440            .system_prompt("be concise")
1441            .append_system_prompt("also polite")
1442            .into_args();
1443        assert!(
1444            args.windows(2)
1445                .any(|w| w == ["--system-prompt", "be concise"])
1446        );
1447        assert!(
1448            args.windows(2)
1449                .any(|w| w == ["--append-system-prompt", "also polite"])
1450        );
1451    }
1452
1453    #[test]
1454    fn into_args_appends_raw_args_last() {
1455        let args = DuplexOptions::default()
1456            .arg("--add-dir")
1457            .arg("/tmp/foo")
1458            .into_args();
1459        // Last two entries should be the additional args, in order.
1460        assert_eq!(&args[args.len() - 2..], &["--add-dir", "/tmp/foo"]);
1461    }
1462
1463    #[test]
1464    fn into_args_includes_resume_when_set() {
1465        let args = DuplexOptions::default().resume("abc-123").into_args();
1466        assert!(args.windows(2).any(|w| w == ["--resume", "abc-123"]));
1467    }
1468
1469    #[test]
1470    fn into_args_omits_resume_by_default() {
1471        let args = DuplexOptions::default().into_args();
1472        assert!(
1473            !args.iter().any(|a| a == "--resume"),
1474            "--resume should not appear without an explicit resume(...) call; got {args:?}"
1475        );
1476    }
1477
1478    #[test]
1479    fn into_args_includes_continue_when_set() {
1480        let args = DuplexOptions::default().continue_session().into_args();
1481        assert!(args.iter().any(|a| a == "--continue"));
1482    }
1483
1484    #[test]
1485    fn into_args_omits_continue_by_default() {
1486        let args = DuplexOptions::default().into_args();
1487        assert!(!args.iter().any(|a| a == "--continue"));
1488    }
1489
1490    #[test]
1491    fn into_args_includes_worktree_flag_without_name() {
1492        let args = DuplexOptions::default().worktree(None::<&str>).into_args();
1493        assert!(args.iter().any(|a| a == "--worktree"));
1494        // No name means no positional follows --worktree.
1495        let pos = args.iter().position(|a| a == "--worktree").unwrap();
1496        assert!(
1497            args.get(pos + 1).is_none_or(|a| a.starts_with("--")),
1498            "--worktree without a name should not be followed by a positional; got {args:?}"
1499        );
1500    }
1501
1502    #[test]
1503    fn into_args_includes_worktree_flag_with_name() {
1504        let args = DuplexOptions::default()
1505            .worktree(Some("agent-xyz"))
1506            .into_args();
1507        let pos = args.iter().position(|a| a == "--worktree").unwrap();
1508        assert_eq!(args.get(pos + 1).map(String::as_str), Some("agent-xyz"));
1509    }
1510
1511    #[test]
1512    fn into_args_omits_worktree_by_default() {
1513        let args = DuplexOptions::default().into_args();
1514        assert!(
1515            !args.iter().any(|a| a == "--worktree"),
1516            "--worktree should not appear without an explicit worktree(...) call; got {args:?}"
1517        );
1518    }
1519
1520    #[test]
1521    fn worktree_lands_before_additional_args() {
1522        // Same `--` ordering bug class as resume.
1523        let args = DuplexOptions::default()
1524            .worktree(Some("foo"))
1525            .arg("--")
1526            .arg("trailing")
1527            .into_args();
1528        let wt_pos = args.iter().position(|a| a == "--worktree").unwrap();
1529        let dash_dash_pos = args.iter().position(|a| a == "--").unwrap();
1530        assert!(
1531            wt_pos < dash_dash_pos,
1532            "--worktree must precede `--` separator; got {args:?}"
1533        );
1534    }
1535
1536    #[test]
1537    fn into_args_includes_agent_when_set() {
1538        let args = DuplexOptions::default().agent("rust-qa").into_args();
1539        assert!(
1540            args.windows(2).any(|w| w == ["--agent", "rust-qa"]),
1541            "missing --agent rust-qa in {args:?}"
1542        );
1543    }
1544
1545    #[test]
1546    fn into_args_omits_agent_by_default() {
1547        let args = DuplexOptions::default().into_args();
1548        assert!(
1549            !args.iter().any(|a| a == "--agent"),
1550            "--agent should not appear without an explicit agent(...) call; got {args:?}"
1551        );
1552    }
1553
1554    #[test]
1555    fn into_args_includes_agents_json_when_set() {
1556        let json = r#"{"reviewer":{"description":"r","prompt":"p"}}"#;
1557        let args = DuplexOptions::default().agents_json(json).into_args();
1558        let pos = args.iter().position(|a| a == "--agents").unwrap();
1559        assert_eq!(args.get(pos + 1).map(String::as_str), Some(json));
1560    }
1561
1562    #[test]
1563    fn into_args_omits_agents_json_by_default() {
1564        let args = DuplexOptions::default().into_args();
1565        assert!(!args.iter().any(|a| a == "--agents"));
1566    }
1567
1568    #[test]
1569    fn agent_and_agents_json_compose() {
1570        let json = r#"{"reviewer":{"description":"r","prompt":"p"}}"#;
1571        let args = DuplexOptions::default()
1572            .agents_json(json)
1573            .agent("reviewer")
1574            .into_args();
1575        // Both flags present.
1576        assert!(args.iter().any(|a| a == "--agents"));
1577        assert!(args.iter().any(|a| a == "--agent"));
1578    }
1579
1580    #[test]
1581    fn agent_lands_before_additional_args() {
1582        let args = DuplexOptions::default()
1583            .agent("rust-qa")
1584            .arg("--")
1585            .arg("trailing")
1586            .into_args();
1587        let agent_pos = args.iter().position(|a| a == "--agent").unwrap();
1588        let dash_dash_pos = args.iter().position(|a| a == "--").unwrap();
1589        assert!(
1590            agent_pos < dash_dash_pos,
1591            "--agent must precede `--` separator; got {args:?}"
1592        );
1593    }
1594
1595    #[test]
1596    fn agents_json_lands_before_additional_args() {
1597        let args = DuplexOptions::default()
1598            .agents_json("{}")
1599            .arg("--")
1600            .arg("trailing")
1601            .into_args();
1602        let agents_pos = args.iter().position(|a| a == "--agents").unwrap();
1603        let dash_dash_pos = args.iter().position(|a| a == "--").unwrap();
1604        assert!(
1605            agents_pos < dash_dash_pos,
1606            "--agents must precede `--` separator; got {args:?}"
1607        );
1608    }
1609
1610    #[test]
1611    fn resume_lands_before_additional_args() {
1612        // Catches the same class of bug as QueryCommand::execute_json
1613        // had: a flag appended after the user-supplied raw args (which
1614        // typically include `--`) gets eaten as a positional. Resume
1615        // must precede any caller-injected `arg(...)`.
1616        let args = DuplexOptions::default()
1617            .resume("xyz")
1618            .arg("--")
1619            .arg("trailing")
1620            .into_args();
1621        let resume_pos = args.iter().position(|a| a == "--resume").unwrap();
1622        let dash_dash_pos = args.iter().position(|a| a == "--").unwrap();
1623        assert!(
1624            resume_pos < dash_dash_pos,
1625            "--resume must precede `--` separator; got {args:?}"
1626        );
1627    }
1628
1629    #[test]
1630    fn turn_result_accessors_pull_from_result() {
1631        let r = TurnResult {
1632            result: json!({
1633                "type": "result",
1634                "result": "hello",
1635                "session_id": "sess-123",
1636                "total_cost_usd": 0.0042,
1637                "duration_ms": 1234_u64,
1638            }),
1639            events: vec![],
1640        };
1641        assert_eq!(r.result_text(), Some("hello"));
1642        assert_eq!(r.session_id(), Some("sess-123"));
1643        assert_eq!(r.total_cost_usd(), Some(0.0042));
1644        assert_eq!(r.duration_ms(), Some(1234));
1645    }
1646
1647    #[test]
1648    fn turn_result_total_cost_falls_back_to_legacy_field() {
1649        let r = TurnResult {
1650            result: json!({ "cost_usd": 0.5 }),
1651            events: vec![],
1652        };
1653        assert_eq!(r.total_cost_usd(), Some(0.5));
1654    }
1655
1656    #[test]
1657    fn turn_result_accessors_return_none_when_missing() {
1658        let r = TurnResult {
1659            result: json!({}),
1660            events: vec![],
1661        };
1662        assert_eq!(r.result_text(), None);
1663        assert_eq!(r.session_id(), None);
1664        assert_eq!(r.total_cost_usd(), None);
1665        assert_eq!(r.duration_ms(), None);
1666    }
1667
1668    #[test]
1669    fn handle_inbound_appends_non_result_to_pending_events() {
1670        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
1671        let (events_tx, _events_rx) = broadcast::channel(16);
1672        let mut pending = Some((tx, Vec::new()));
1673        handle_inbound(
1674            json!({ "type": "assistant", "message": {} }),
1675            &mut pending,
1676            &events_tx,
1677        );
1678        let (_, events) = pending.as_ref().unwrap();
1679        assert_eq!(events.len(), 1);
1680        assert_eq!(
1681            events[0].get("type").and_then(Value::as_str),
1682            Some("assistant")
1683        );
1684    }
1685
1686    #[test]
1687    fn handle_inbound_resolves_pending_on_result() {
1688        let (tx, rx) = oneshot::channel::<Result<TurnResult>>();
1689        let (events_tx, _events_rx) = broadcast::channel(16);
1690        let mut pending = Some((tx, vec![json!({ "type": "assistant" })]));
1691        handle_inbound(
1692            json!({ "type": "result", "result": "ok" }),
1693            &mut pending,
1694            &events_tx,
1695        );
1696        assert!(pending.is_none());
1697        let received = rx.blocking_recv().unwrap().unwrap();
1698        assert_eq!(received.result_text(), Some("ok"));
1699        assert_eq!(received.events.len(), 1);
1700    }
1701
1702    #[test]
1703    fn handle_inbound_drops_orphans_without_pending_turn() {
1704        let (events_tx, _events_rx) = broadcast::channel(16);
1705        let mut pending: Option<(oneshot::Sender<Result<TurnResult>>, Vec<Value>)> = None;
1706        handle_inbound(json!({ "type": "assistant" }), &mut pending, &events_tx);
1707        handle_inbound(
1708            json!({ "type": "result", "result": "ok" }),
1709            &mut pending,
1710            &events_tx,
1711        );
1712        assert!(pending.is_none());
1713    }
1714
1715    #[test]
1716    fn handle_inbound_broadcasts_classified_event() {
1717        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
1718        let (events_tx, mut events_rx) = broadcast::channel(16);
1719        let mut pending = Some((tx, Vec::new()));
1720        handle_inbound(
1721            json!({ "type": "assistant", "message": { "role": "assistant" } }),
1722            &mut pending,
1723            &events_tx,
1724        );
1725        let event = events_rx.try_recv().expect("classified event broadcast");
1726        assert!(matches!(event, InboundEvent::Assistant(_)));
1727    }
1728
1729    #[test]
1730    fn handle_inbound_does_not_broadcast_result() {
1731        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
1732        let (events_tx, mut events_rx) = broadcast::channel(16);
1733        let mut pending = Some((tx, Vec::new()));
1734        handle_inbound(
1735            json!({ "type": "result", "result": "ok" }),
1736            &mut pending,
1737            &events_tx,
1738        );
1739        // Result is not broadcast -- it lands in TurnResult.result.
1740        assert!(events_rx.try_recv().is_err());
1741    }
1742
1743    #[test]
1744    fn classify_system_init_pulls_session_id() {
1745        let v = json!({
1746            "type": "system",
1747            "subtype": "init",
1748            "session_id": "sess-abc",
1749        });
1750        match classify(&v) {
1751            InboundEvent::SystemInit { session_id } => assert_eq!(session_id, "sess-abc"),
1752            other => panic!("expected SystemInit, got {other:?}"),
1753        }
1754    }
1755
1756    #[test]
1757    fn classify_system_without_init_subtype_is_other() {
1758        let v = json!({ "type": "system", "subtype": "compaction" });
1759        assert!(matches!(classify(&v), InboundEvent::Other(_)));
1760    }
1761
1762    #[test]
1763    fn classify_system_init_without_session_id_is_other() {
1764        let v = json!({ "type": "system", "subtype": "init" });
1765        assert!(matches!(classify(&v), InboundEvent::Other(_)));
1766    }
1767
1768    #[test]
1769    fn classify_assistant_stream_event_user() {
1770        assert!(matches!(
1771            classify(&json!({ "type": "assistant" })),
1772            InboundEvent::Assistant(_)
1773        ));
1774        assert!(matches!(
1775            classify(&json!({ "type": "stream_event" })),
1776            InboundEvent::StreamEvent(_)
1777        ));
1778        assert!(matches!(
1779            classify(&json!({ "type": "user" })),
1780            InboundEvent::User(_)
1781        ));
1782    }
1783
1784    #[test]
1785    fn classify_unknown_type_is_other() {
1786        assert!(matches!(
1787            classify(&json!({ "type": "control_request" })),
1788            InboundEvent::Other(_)
1789        ));
1790        assert!(matches!(
1791            classify(&json!({ "type": "future_thing" })),
1792            InboundEvent::Other(_)
1793        ));
1794        assert!(matches!(classify(&json!({})), InboundEvent::Other(_)));
1795    }
1796
1797    #[test]
1798    fn into_args_does_not_emit_subscriber_capacity_flag() {
1799        // subscriber_capacity is runtime config, not a CLI arg.
1800        let args = DuplexOptions::default().subscriber_capacity(64).into_args();
1801        assert!(!args.iter().any(|a| a.contains("subscriber")));
1802        assert!(!args.iter().any(|a| a.contains("capacity")));
1803    }
1804
1805    #[test]
1806    fn into_args_includes_permission_prompt_tool_when_handler_set() {
1807        let handler = PermissionHandler::new(|_req| async move {
1808            PermissionDecision::Allow {
1809                updated_input: None,
1810            }
1811        });
1812        let args = DuplexOptions::default().on_permission(handler).into_args();
1813        assert!(
1814            args.windows(2)
1815                .any(|w| w == ["--permission-prompt-tool", "stdio"])
1816        );
1817    }
1818
1819    #[test]
1820    fn into_args_omits_permission_prompt_tool_without_handler() {
1821        let args = DuplexOptions::default().into_args();
1822        assert!(!args.iter().any(|a| a == "--permission-prompt-tool"));
1823    }
1824
1825    #[test]
1826    fn into_args_emits_permission_mode_flag() {
1827        let args = DuplexOptions::default()
1828            .permission_mode(PermissionMode::AcceptEdits)
1829            .into_args();
1830        assert!(
1831            args.windows(2)
1832                .any(|w| w == ["--permission-mode", "acceptEdits"]),
1833            "missing --permission-mode acceptEdits in {args:?}"
1834        );
1835    }
1836
1837    #[test]
1838    fn into_args_emits_plan_mode() {
1839        let args = DuplexOptions::default()
1840            .permission_mode(PermissionMode::Plan)
1841            .into_args();
1842        assert!(args.windows(2).any(|w| w == ["--permission-mode", "plan"]));
1843    }
1844
1845    #[test]
1846    fn into_args_omits_permission_mode_by_default() {
1847        let args = DuplexOptions::default().into_args();
1848        assert!(!args.iter().any(|a| a == "--permission-mode"));
1849    }
1850
1851    #[test]
1852    fn into_args_emits_dangerously_skip_permissions_flag() {
1853        let args = DuplexOptions::default()
1854            .dangerously_skip_permissions()
1855            .into_args();
1856        assert!(args.iter().any(|a| a == "--dangerously-skip-permissions"));
1857    }
1858
1859    #[test]
1860    fn into_args_omits_dangerously_skip_by_default() {
1861        let args = DuplexOptions::default().into_args();
1862        assert!(!args.iter().any(|a| a == "--dangerously-skip-permissions"));
1863    }
1864
1865    #[test]
1866    fn parse_permission_request_extracts_fields() {
1867        let msg = json!({
1868            "type": "control_request",
1869            "request_id": "req-1",
1870            "request": {
1871                "subtype": "can_use_tool",
1872                "tool_name": "Bash",
1873                "input": { "command": "ls" }
1874            }
1875        });
1876        let req = parse_permission_request(&msg).expect("permission request");
1877        assert_eq!(req.request_id, "req-1");
1878        assert_eq!(req.tool_name, "Bash");
1879        assert_eq!(req.input, json!({ "command": "ls" }));
1880        assert_eq!(
1881            req.raw.get("subtype").and_then(Value::as_str),
1882            Some("can_use_tool")
1883        );
1884    }
1885
1886    #[test]
1887    fn parse_permission_request_returns_none_when_missing_request_id() {
1888        let msg = json!({
1889            "type": "control_request",
1890            "request": {
1891                "subtype": "can_use_tool",
1892                "tool_name": "Bash",
1893            }
1894        });
1895        assert!(parse_permission_request(&msg).is_none());
1896    }
1897
1898    #[test]
1899    fn parse_permission_request_returns_none_when_missing_tool_name() {
1900        let msg = json!({
1901            "type": "control_request",
1902            "request_id": "req-1",
1903            "request": { "subtype": "can_use_tool" }
1904        });
1905        assert!(parse_permission_request(&msg).is_none());
1906    }
1907
1908    #[test]
1909    fn parse_permission_request_handles_missing_input() {
1910        let msg = json!({
1911            "type": "control_request",
1912            "request_id": "req-1",
1913            "request": {
1914                "subtype": "can_use_tool",
1915                "tool_name": "Bash",
1916            }
1917        });
1918        let req = parse_permission_request(&msg).expect("request");
1919        assert_eq!(req.input, Value::Null);
1920    }
1921
1922    #[test]
1923    fn handle_inbound_returns_permission_for_can_use_tool() {
1924        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
1925        let (events_tx, _events_rx) = broadcast::channel(16);
1926        let mut pending = Some((tx, Vec::new()));
1927        let action = handle_inbound(
1928            json!({
1929                "type": "control_request",
1930                "request_id": "req-1",
1931                "request": {
1932                    "subtype": "can_use_tool",
1933                    "tool_name": "Bash",
1934                    "input": { "command": "ls" }
1935                }
1936            }),
1937            &mut pending,
1938            &events_tx,
1939        );
1940        match action {
1941            InboundAction::Permission(req) => {
1942                assert_eq!(req.request_id, "req-1");
1943                assert_eq!(req.tool_name, "Bash");
1944            }
1945            InboundAction::None | InboundAction::ControlResponse { .. } => {
1946                panic!("expected Permission action");
1947            }
1948        }
1949        // Event should also be accumulated in the pending turn.
1950        let (_, events) = pending.as_ref().unwrap();
1951        assert_eq!(events.len(), 1);
1952    }
1953
1954    #[test]
1955    fn handle_inbound_treats_unknown_control_request_as_other() {
1956        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
1957        let (events_tx, mut events_rx) = broadcast::channel(16);
1958        let mut pending = Some((tx, Vec::new()));
1959        let action = handle_inbound(
1960            json!({
1961                "type": "control_request",
1962                "request_id": "req-2",
1963                "request": { "subtype": "future_subtype" }
1964            }),
1965            &mut pending,
1966            &events_tx,
1967        );
1968        assert!(matches!(action, InboundAction::None));
1969        let event = events_rx.try_recv().expect("broadcast");
1970        assert!(matches!(event, InboundEvent::Other(_)));
1971    }
1972
1973    #[tokio::test]
1974    async fn permission_handler_invokes_closure_async() {
1975        let handler = PermissionHandler::new(|req| async move {
1976            if req.tool_name == "Bash" {
1977                PermissionDecision::Deny {
1978                    message: "no bash".into(),
1979                }
1980            } else {
1981                PermissionDecision::Allow {
1982                    updated_input: None,
1983                }
1984            }
1985        });
1986        let req = PermissionRequest {
1987            request_id: "r1".into(),
1988            tool_name: "Bash".into(),
1989            input: Value::Null,
1990            raw: Value::Null,
1991        };
1992        match handler.invoke(req).await {
1993            PermissionDecision::Deny { message } => assert_eq!(message, "no bash"),
1994            other => panic!("expected Deny, got {other:?}"),
1995        }
1996    }
1997
1998    #[test]
1999    fn parse_control_response_extracts_success() {
2000        let msg = json!({
2001            "type": "control_response",
2002            "response": {
2003                "request_id": "interrupt-1",
2004                "subtype": "success",
2005                "response": {}
2006            }
2007        });
2008        let (id, outcome) = parse_control_response(&msg).expect("parsed");
2009        assert_eq!(id, "interrupt-1");
2010        assert!(outcome.is_ok());
2011    }
2012
2013    #[test]
2014    fn parse_control_response_extracts_error_with_message() {
2015        let msg = json!({
2016            "type": "control_response",
2017            "response": {
2018                "request_id": "interrupt-2",
2019                "subtype": "error",
2020                "error": "no turn in flight"
2021            }
2022        });
2023        let (id, outcome) = parse_control_response(&msg).expect("parsed");
2024        assert_eq!(id, "interrupt-2");
2025        match outcome {
2026            Err(Error::DuplexControlFailed { message }) => {
2027                assert_eq!(message, "no turn in flight");
2028            }
2029            other => panic!("expected DuplexControlFailed, got {other:?}"),
2030        }
2031    }
2032
2033    #[test]
2034    fn parse_control_response_returns_none_on_missing_request_id() {
2035        let msg = json!({
2036            "type": "control_response",
2037            "response": { "subtype": "success" }
2038        });
2039        assert!(parse_control_response(&msg).is_none());
2040    }
2041
2042    #[test]
2043    fn parse_control_response_returns_none_on_unknown_subtype() {
2044        let msg = json!({
2045            "type": "control_response",
2046            "response": { "request_id": "x", "subtype": "future_subtype" }
2047        });
2048        assert!(parse_control_response(&msg).is_none());
2049    }
2050
2051    #[test]
2052    fn handle_inbound_returns_control_response_action() {
2053        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
2054        let (events_tx, _events_rx) = broadcast::channel(16);
2055        let mut pending = Some((tx, Vec::new()));
2056        let action = handle_inbound(
2057            json!({
2058                "type": "control_response",
2059                "response": {
2060                    "request_id": "interrupt-1",
2061                    "subtype": "success",
2062                    "response": {}
2063                }
2064            }),
2065            &mut pending,
2066            &events_tx,
2067        );
2068        match action {
2069            InboundAction::ControlResponse {
2070                request_id,
2071                outcome,
2072            } => {
2073                assert_eq!(request_id, "interrupt-1");
2074                assert!(outcome.is_ok());
2075            }
2076            InboundAction::None | InboundAction::Permission(_) => {
2077                panic!("expected ControlResponse action");
2078            }
2079        }
2080    }
2081
2082    #[test]
2083    fn handle_inbound_treats_malformed_control_response_as_other() {
2084        let (tx, _reply_rx) = oneshot::channel::<Result<TurnResult>>();
2085        let (events_tx, mut events_rx) = broadcast::channel(16);
2086        let mut pending = Some((tx, Vec::new()));
2087        let action = handle_inbound(
2088            json!({
2089                "type": "control_response",
2090                "response": { "subtype": "success" }
2091            }),
2092            &mut pending,
2093            &events_tx,
2094        );
2095        assert!(matches!(action, InboundAction::None));
2096        let event = events_rx.try_recv().expect("broadcast");
2097        assert!(matches!(event, InboundEvent::Other(_)));
2098    }
2099
2100    #[tokio::test]
2101    async fn permission_handler_clones_arc() {
2102        let handler = PermissionHandler::new(|_req| async move {
2103            PermissionDecision::Allow {
2104                updated_input: None,
2105            }
2106        });
2107        let cloned = handler.clone();
2108        let req = PermissionRequest {
2109            request_id: "r1".into(),
2110            tool_name: "Read".into(),
2111            input: Value::Null,
2112            raw: Value::Null,
2113        };
2114        // Both handles invoke the same underlying closure.
2115        let _ = handler.invoke(req.clone()).await;
2116        let _ = cloned.invoke(req).await;
2117    }
2118
2119    /// Build a `DuplexSession` whose channels are wired up but whose
2120    /// background task is a no-op. Tests can drive the watch state
2121    /// machine via the returned `exit_tx` and observe the public
2122    /// accessors. The fake task idles on a oneshot so it stays alive
2123    /// for the life of the test (no JoinHandle::abort handshake
2124    /// needed).
2125    fn fake_session(
2126        initial: SessionExitStatus,
2127    ) -> (
2128        DuplexSession,
2129        watch::Sender<SessionExitStatus>,
2130        oneshot::Sender<()>,
2131    ) {
2132        let (outbound_tx, outbound_rx) = mpsc::unbounded_channel::<OutboundMsg>();
2133        let (events_tx, _events_rx) = broadcast::channel::<InboundEvent>(16);
2134        let (exit_tx, exit_rx) = watch::channel(initial);
2135        let (stop_tx, stop_rx) = oneshot::channel::<()>();
2136
2137        let join = tokio::spawn(async move {
2138            let _outbound_rx = outbound_rx;
2139            let _ = stop_rx.await;
2140            Ok::<(), Error>(())
2141        });
2142
2143        let session = DuplexSession {
2144            outbound_tx,
2145            events_tx,
2146            exit_rx,
2147            join,
2148        };
2149        (session, exit_tx, stop_tx)
2150    }
2151
2152    #[tokio::test]
2153    async fn is_alive_true_while_running() {
2154        let (session, _exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2155        assert!(session.is_alive());
2156    }
2157
2158    #[tokio::test]
2159    async fn is_alive_false_after_completed() {
2160        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2161        exit_tx.send(SessionExitStatus::Completed).unwrap();
2162        assert!(!session.is_alive());
2163    }
2164
2165    #[tokio::test]
2166    async fn is_alive_false_after_failed() {
2167        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2168        exit_tx
2169            .send(SessionExitStatus::Failed("boom".into()))
2170            .unwrap();
2171        assert!(!session.is_alive());
2172    }
2173
2174    #[tokio::test]
2175    async fn exit_status_reports_running_initially() {
2176        let (session, _exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2177        assert!(matches!(session.exit_status(), SessionExitStatus::Running));
2178    }
2179
2180    #[tokio::test]
2181    async fn exit_status_reflects_completed() {
2182        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2183        exit_tx.send(SessionExitStatus::Completed).unwrap();
2184        assert!(matches!(
2185            session.exit_status(),
2186            SessionExitStatus::Completed
2187        ));
2188    }
2189
2190    #[tokio::test]
2191    async fn exit_status_reflects_failed_with_message() {
2192        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2193        exit_tx
2194            .send(SessionExitStatus::Failed("oh no".into()))
2195            .unwrap();
2196        match session.exit_status() {
2197            SessionExitStatus::Failed(msg) => assert_eq!(msg, "oh no"),
2198            other => panic!("expected Failed, got {other:?}"),
2199        }
2200    }
2201
2202    #[tokio::test]
2203    async fn wait_for_exit_returns_immediately_when_already_terminal() {
2204        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2205        exit_tx.send(SessionExitStatus::Completed).unwrap();
2206        let status = tokio::time::timeout(Duration::from_secs(1), session.wait_for_exit())
2207            .await
2208            .expect("wait_for_exit should not block when already terminal");
2209        assert!(matches!(status, SessionExitStatus::Completed));
2210    }
2211
2212    #[tokio::test]
2213    async fn wait_for_exit_blocks_until_state_transitions() {
2214        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2215
2216        let waiter = async { session.wait_for_exit().await };
2217        let driver = async {
2218            tokio::time::sleep(Duration::from_millis(20)).await;
2219            exit_tx.send(SessionExitStatus::Completed).unwrap();
2220        };
2221        let (status, ()) = tokio::join!(waiter, driver);
2222        assert!(matches!(status, SessionExitStatus::Completed));
2223    }
2224
2225    #[tokio::test]
2226    async fn wait_for_exit_supports_multiple_observers() {
2227        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2228
2229        let waiter1 = async { session.wait_for_exit().await };
2230        let waiter2 = async { session.wait_for_exit().await };
2231        let driver = async {
2232            tokio::time::sleep(Duration::from_millis(20)).await;
2233            exit_tx
2234                .send(SessionExitStatus::Failed("crash".into()))
2235                .unwrap();
2236        };
2237        let (s1, s2, ()) = tokio::join!(waiter1, waiter2, driver);
2238        match s1 {
2239            SessionExitStatus::Failed(msg) => assert_eq!(msg, "crash"),
2240            other => panic!("waiter1 expected Failed, got {other:?}"),
2241        }
2242        match s2 {
2243            SessionExitStatus::Failed(msg) => assert_eq!(msg, "crash"),
2244            other => panic!("waiter2 expected Failed, got {other:?}"),
2245        }
2246    }
2247
2248    #[tokio::test]
2249    async fn wait_for_exit_returns_last_value_when_sender_dropped() {
2250        // Defensive: if exit_tx is dropped without ever publishing a
2251        // terminal value, wait_for_exit should fall back to the last
2252        // observed state rather than hang.
2253        let (session, exit_tx, _stop) = fake_session(SessionExitStatus::Running);
2254        let waiter = async { session.wait_for_exit().await };
2255        let driver = async {
2256            tokio::time::sleep(Duration::from_millis(20)).await;
2257            drop(exit_tx);
2258        };
2259        let (status, ()) = tokio::time::timeout(Duration::from_secs(1), async {
2260            tokio::join!(waiter, driver)
2261        })
2262        .await
2263        .expect("wait_for_exit must not hang when sender is dropped");
2264        assert!(matches!(status, SessionExitStatus::Running));
2265    }
2266}