Skip to main content

DuplexSession

Struct DuplexSession 

Source
pub struct DuplexSession { /* private fields */ }
Expand description

A long-lived claude subprocess in stream-json duplex mode.

Owns a background task that holds the child open, writes user messages to its stdin, and reads NDJSON events from its stdout. One turn at a time: calling Self::send while another turn is in flight returns Error::DuplexTurnInFlight.

See the module docs for the full design.

Implementations§

Source§

impl DuplexSession

Source

pub async fn spawn(claude: &Claude, opts: DuplexOptions) -> Result<Self>

Spawn a fresh claude subprocess in duplex mode.

The child is started with --print --verbose --input-format stream-json --output-format stream-json plus any options applied via opts. The session task takes ownership of the child; dropping the returned handle (or calling Self::close) shuts the task down.

Source

pub async fn send(&self, prompt: impl Into<String>) -> Result<TurnResult>

Send one user message and await the closing result event.

Returns Error::DuplexTurnInFlight if another turn is already pending, and Error::DuplexClosed if the session task has already exited.

Source

pub fn subscribe(&self) -> Receiver<InboundEvent>

Subscribe to the session’s classified inbound event stream.

Returns a broadcast::Receiver<InboundEvent> that receives every non-result event as it arrives. Each subscriber gets its own buffered view; subscribers added later miss earlier events. Slow subscribers see RecvError::Lagged rather than blocking the session task.

Subscribers see the same events that accumulate in TurnResult::events, in the same order.

§Example
use claude_wrapper::Claude;
use claude_wrapper::duplex::{DuplexOptions, DuplexSession, InboundEvent};

let claude = Claude::builder().build()?;
let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
let mut rx = session.subscribe();

// Subscribe before send so we receive every event.
let _turn = session.send("hello").await?;

while let Ok(event) = rx.try_recv() {
    if let InboundEvent::SystemInit { session_id } = event {
        println!("session id: {session_id}");
    }
}
Source

pub fn is_alive(&self) -> bool

Cheap, non-blocking liveness check.

Returns true while the session task is running, false once it has exited (whether normally or with an error). Multiple concurrent callers are allowed, and the call does not consume the session: Self::close still works after polling.

Reads the latest value from a tokio::sync::watch channel updated from inside the session task, so it never blocks and reflects state set just before the task returns.

Source

pub fn exit_status(&self) -> SessionExitStatus

Snapshot the session task’s SessionExitStatus.

Returns SessionExitStatus::Running while the task is still alive, SessionExitStatus::Completed after a clean exit, or SessionExitStatus::Failed with the underlying error rendered to a string.

Like Self::is_alive, this is a cheap non-blocking read.

Source

pub async fn wait_for_exit(&self) -> SessionExitStatus

Block until the session task transitions out of SessionExitStatus::Running and return the terminal status.

Returns immediately if the task has already exited. Multiple concurrent callers are supported (each gets its own receiver clone), and the call does not consume the session.

If the underlying watch sender is dropped without ever publishing a terminal state – which should not happen in practice, but is treated defensively – this returns the last observed value.

Source

pub fn respond_to_permission( &self, request_id: impl Into<String>, decision: PermissionDecision, ) -> Result<()>

Answer a deferred permission request from a different task.

Use this after the PermissionHandler returned PermissionDecision::Defer for the matching request_id. Passing decision = PermissionDecision::Defer here is a no-op (logged at warn); pass Allow or Deny.

Returns Error::DuplexClosed if the session task has already exited.

§Example
use claude_wrapper::Claude;
use claude_wrapper::duplex::{
    DuplexOptions, DuplexSession, PermissionDecision, PermissionHandler,
};
use tokio::sync::mpsc;

// Forward request_ids out to a UI thread; answer asynchronously.
let (tx, _rx) = mpsc::unbounded_channel::<String>();
let handler = PermissionHandler::new(move |req| {
    let tx = tx.clone();
    async move {
        let _ = tx.send(req.request_id);
        PermissionDecision::Defer
    }
});

let claude = Claude::builder().build()?;
let session = DuplexSession::spawn(
    &claude,
    DuplexOptions::default().on_permission(handler),
).await?;

// ...later, from the UI thread:
session.respond_to_permission(
    "req-abc",
    PermissionDecision::Allow { updated_input: None },
)?;
Source

pub async fn interrupt(&self) -> Result<()>

Send a clean interrupt to the CLI and wait for its acknowledgment.

Writes a control_request {subtype: "interrupt"} and resolves when the matching control_response comes back. The in-flight turn (if any) closes shortly after with a truncated TurnResult – the DuplexSession::send future for it resolves independently. Either ordering is possible; await both via tokio::join! if you care about both outcomes.

Returns:

§Example
use std::time::Duration;
use claude_wrapper::Claude;
use claude_wrapper::duplex::{DuplexOptions, DuplexSession};

let claude = Claude::builder().build()?;
let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;

let send_fut = session.send("a question that triggers tool use");
let interrupt_fut = async {
    tokio::time::sleep(Duration::from_millis(250)).await;
    session.interrupt().await
};

let (turn, interrupt) = tokio::join!(send_fut, interrupt_fut);
let _truncated = turn?;
interrupt?;
Source

pub async fn close(self) -> Result<()>

Close the session and wait for the underlying task to exit.

Drops the outbound channel sender, which the session task observes as recv() -> None, then closes stdin and reaps the child.

Trait Implementations§

Source§

impl Debug for DuplexSession

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more