pub struct DuplexSession { /* private fields */ }
A long-lived claude subprocess in stream-json duplex mode.
Owns a background task that holds the child open, writes user
messages to its stdin, and reads NDJSON events from its stdout.
One turn at a time: calling Self::send while another turn is
in flight returns Error::DuplexTurnInFlight.
See the module docs for the full design.
Implementations§
impl DuplexSession
pub async fn spawn(claude: &Claude, opts: DuplexOptions) -> Result<Self>
Spawn a fresh claude subprocess in duplex mode.
The child is started with
--print --verbose --input-format stream-json --output-format stream-json
plus any options applied via opts. The session task takes
ownership of the child; dropping the returned handle (or
calling Self::close) shuts the task down.
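The fixed flag set above can be sketched with the standard library alone. This is only an illustration of the resulting command line, not the crate's actual spawn code: `duplex_command`, `binary`, and `extra_args` are hypothetical names standing in for whatever `Claude` and `DuplexOptions` contribute, and the command is built but never spawned.

```rust
use std::process::Command;

/// The flags the documentation above says every duplex child is started with.
const DUPLEX_FLAGS: [&str; 6] = [
    "--print",
    "--verbose",
    "--input-format",
    "stream-json",
    "--output-format",
    "stream-json",
];

/// Build (but do not spawn) a command line for a duplex child.
///
/// `binary` and `extra_args` are hypothetical: they stand in for the
/// executable path and the options that `Claude` and `DuplexOptions`
/// would supply in the real crate.
fn duplex_command(binary: &str, extra_args: &[&str]) -> Command {
    let mut cmd = Command::new(binary);
    // Fixed duplex flags first, then any caller-supplied options.
    cmd.args(DUPLEX_FLAGS).args(extra_args);
    cmd
}
```

Calling `duplex_command("claude", &[])` yields a `Command` whose arguments are exactly the six flags listed above; whether the real implementation orders caller options before or after them is not stated here.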
pub async fn send(&self, prompt: impl Into<String>) -> Result<TurnResult>
Send one user message and await the closing result event.
Returns Error::DuplexTurnInFlight if another turn is
already pending, and Error::DuplexClosed if the session
task has already exited.
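The one-turn-at-a-time rule can be modelled as a small gate that a `send` call must pass before writing to the child. The sketch below uses only the standard library and hypothetical names (`TurnGate`, `TurnPermit`, `SendError`); it is one plausible way to get the two documented errors, not the crate's internals. In particular, checking "closed" before "in flight" is an assumption.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical stand-ins for the two error cases named above.
#[derive(Debug, PartialEq)]
enum SendError {
    TurnInFlight,
    Closed,
}

/// Minimal state a session handle needs to enforce one turn at a time.
struct TurnGate {
    in_flight: AtomicBool,
    closed: AtomicBool,
}

/// Held for the duration of a turn; releases the gate when dropped.
struct TurnPermit<'a>(&'a TurnGate);

impl Drop for TurnPermit<'_> {
    fn drop(&mut self) {
        self.0.in_flight.store(false, Ordering::Release);
    }
}

impl TurnGate {
    fn new() -> Self {
        TurnGate {
            in_flight: AtomicBool::new(false),
            closed: AtomicBool::new(false),
        }
    }

    /// Mark the session task as exited.
    fn close(&self) {
        self.closed.store(true, Ordering::Release);
    }

    /// Try to start a turn. Assumption: a closed session is reported
    /// before an in-flight turn.
    fn begin_turn(&self) -> Result<TurnPermit<'_>, SendError> {
        if self.closed.load(Ordering::Acquire) {
            return Err(SendError::Closed);
        }
        // Atomically flip false -> true; failure means a turn is pending.
        self.in_flight
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .map(|_| TurnPermit(self))
            .map_err(|_| SendError::TurnInFlight)
    }
}
```

Tying the release to `Drop` means the gate reopens even if the caller abandons the turn early, which is the usual reason to prefer a permit object over a manual "end turn" call.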
pub fn subscribe(&self) -> Receiver<InboundEvent>
Subscribe to the session’s classified inbound event stream.
Returns a broadcast::Receiver<InboundEvent> that receives
every non-result event as it arrives. Each subscriber gets
its own buffered view; subscribers added later miss earlier
events. Slow subscribers see
RecvError::Lagged
rather than blocking the session task.
Subscribers see the same events that accumulate in
TurnResult::events, in the same order.
§Example
use claude_wrapper::Claude;
use claude_wrapper::duplex::{DuplexOptions, DuplexSession, InboundEvent};
let claude = Claude::builder().build()?;
let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
let mut rx = session.subscribe();
// Subscribe before send so we receive every event.
let _turn = session.send("hello").await?;
while let Ok(event) = rx.try_recv() {
    if let InboundEvent::SystemInit { session_id } = event {
        println!("session id: {session_id}");
    }
}
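To make the "late subscribers miss earlier events" and "slow subscribers see Lagged" behaviour concrete, here is a single-threaded model of a bounded broadcast buffer. It is a simplified sketch with hypothetical names (`Broadcast`, `Subscriber`, `TryRecv`), written against the standard library only; the real session uses a tokio broadcast channel, whose internals differ.

```rust
use std::collections::VecDeque;

/// Outcome of a non-blocking receive in this sketch.
#[derive(Debug, PartialEq)]
enum TryRecv<T> {
    Event(T),
    /// The subscriber fell behind; this many events were overwritten.
    Lagged(u64),
    Empty,
}

/// Single-threaded model of a bounded broadcast channel.
struct Broadcast<T> {
    capacity: usize,
    buffer: VecDeque<T>,
    /// Sequence number the next published event will receive.
    next_seq: u64,
}

/// A subscriber is just a cursor into the shared sequence.
struct Subscriber {
    next_seq: u64,
}

impl<T: Clone> Broadcast<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Broadcast { capacity, buffer: VecDeque::new(), next_seq: 0 }
    }

    /// New subscribers start at the current tail, so they miss history.
    fn subscribe(&self) -> Subscriber {
        Subscriber { next_seq: self.next_seq }
    }

    /// Publishing never waits: the oldest event is evicted when full.
    fn publish(&mut self, event: T) {
        if self.buffer.len() == self.capacity {
            self.buffer.pop_front();
        }
        self.buffer.push_back(event);
        self.next_seq += 1;
    }

    fn try_recv(&self, sub: &mut Subscriber) -> TryRecv<T> {
        let oldest = self.next_seq - self.buffer.len() as u64;
        if sub.next_seq < oldest {
            // Report the gap once, then resume from the oldest retained event.
            let missed = oldest - sub.next_seq;
            sub.next_seq = oldest;
            return TryRecv::Lagged(missed);
        }
        if sub.next_seq == self.next_seq {
            return TryRecv::Empty;
        }
        let index = (sub.next_seq - oldest) as usize;
        sub.next_seq += 1;
        TryRecv::Event(self.buffer[index].clone())
    }
}
```

The key property is in `publish`: the producer drops old events instead of waiting, so a stalled reader costs itself data rather than stalling the session task.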
pub fn is_alive(&self) -> bool
Cheap, non-blocking liveness check.
Returns true while the session task is running, false once
it has exited (whether normally or with an error). Multiple
concurrent callers are allowed, and the call does not consume
the session: Self::close still works after polling.
Reads the latest value from a tokio::sync::watch channel
updated from inside the session task, so it never blocks and
reflects state set just before the task returns.
pub fn exit_status(&self) -> SessionExitStatus
Snapshot the session task’s SessionExitStatus.
Returns SessionExitStatus::Running while the task is still
alive, SessionExitStatus::Completed after a clean exit, or
SessionExitStatus::Failed with the underlying error
rendered to a string.
Like Self::is_alive, this is a cheap non-blocking read.
pub async fn wait_for_exit(&self) -> SessionExitStatus
Wait asynchronously until the session task transitions out of
SessionExitStatus::Running, then return the terminal status.
Returns immediately if the task has already exited. Multiple concurrent callers are supported (each gets its own receiver clone), and the call does not consume the session.
If the underlying watch sender is dropped without ever publishing a terminal state – which should not happen in practice, but is treated defensively – this returns the last observed value.
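The three status methods (`is_alive`, `exit_status`, `wait_for_exit`) all read one "latest value" cell that the session task updates. The sketch below models that cell with a `Mutex` and `Condvar` from the standard library. It is a blocking, thread-based analogue of the tokio watch channel the documentation describes, and `StatusCell`, `ExitStatus`, and `publish` are hypothetical names, with `Failed(String)` assumed to carry the rendered error.

```rust
use std::sync::{Arc, Condvar, Mutex};

/// Hypothetical mirror of the three documented states.
#[derive(Clone, Debug, PartialEq)]
enum ExitStatus {
    Running,
    Completed,
    Failed(String),
}

/// Shared latest-value cell: one writer (the session task), many readers.
#[derive(Clone)]
struct StatusCell {
    inner: Arc<(Mutex<ExitStatus>, Condvar)>,
}

impl StatusCell {
    fn new() -> Self {
        StatusCell {
            inner: Arc::new((Mutex::new(ExitStatus::Running), Condvar::new())),
        }
    }

    /// Called by the task just before it returns.
    fn publish(&self, status: ExitStatus) {
        let (lock, cvar) = &*self.inner;
        *lock.lock().unwrap() = status;
        cvar.notify_all();
    }

    /// Cheap snapshot of the current state.
    fn snapshot(&self) -> ExitStatus {
        self.inner.0.lock().unwrap().clone()
    }

    fn is_alive(&self) -> bool {
        self.snapshot() == ExitStatus::Running
    }

    /// Wait until the state leaves `Running`; returns at once if it already has.
    fn wait_for_exit(&self) -> ExitStatus {
        let (lock, cvar) = &*self.inner;
        let guard = cvar
            .wait_while(lock.lock().unwrap(), |s| *s == ExitStatus::Running)
            .unwrap();
        guard.clone()
    }
}
```

Because readers only clone the current value, any number of callers can poll or wait concurrently and none of them consumes the session.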
pub fn respond_to_permission( &self, request_id: impl Into<String>, decision: PermissionDecision, ) -> Result<()>
Answer a deferred permission request from a different task.
Use this after the PermissionHandler returned
PermissionDecision::Defer for the matching request_id.
Passing decision = PermissionDecision::Defer here is a
no-op (logged at warn); pass Allow or Deny.
Returns Error::DuplexClosed if the session task has
already exited.
§Example
use claude_wrapper::Claude;
use claude_wrapper::duplex::{
    DuplexOptions, DuplexSession, PermissionDecision, PermissionHandler,
};
use tokio::sync::mpsc;
// Forward request_ids out to a UI thread; answer asynchronously.
let (tx, _rx) = mpsc::unbounded_channel::<String>();
let handler = PermissionHandler::new(move |req| {
    let tx = tx.clone();
    async move {
        let _ = tx.send(req.request_id);
        PermissionDecision::Defer
    }
});
let claude = Claude::builder().build()?;
let session = DuplexSession::spawn(
    &claude,
    DuplexOptions::default().on_permission(handler),
).await?;
// ...later, from the UI thread:
session.respond_to_permission(
    "req-abc",
    PermissionDecision::Allow { updated_input: None },
)?;
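The defer-then-answer flow amounts to bookkeeping over parked request ids. The following standard-library sketch shows one way that bookkeeping could look. All names (`DeferredPermissions`, `Decision`, `Respond`) are hypothetical, and the handling of unknown ids is an assumption, since the documentation above does not say what the real session does with them.

```rust
use std::collections::HashSet;

/// Hypothetical, payload-free mirror of the three decisions.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Decision {
    Allow,
    Deny,
    Defer,
}

/// What happened to a late answer in this sketch.
#[derive(Debug, PartialEq)]
enum Respond {
    Answered(Decision),
    /// Answering with `Defer` is a no-op, as documented above.
    IgnoredDefer,
    /// Assumption: ids that were never parked are reported, not forwarded.
    UnknownRequest,
}

#[derive(Default)]
struct DeferredPermissions {
    parked: HashSet<String>,
}

impl DeferredPermissions {
    /// Record the handler's verdict. Returns the decision to send right
    /// away, or `None` if the request was parked for a later answer.
    fn on_handler_result(&mut self, request_id: &str, decision: Decision) -> Option<Decision> {
        if decision == Decision::Defer {
            self.parked.insert(request_id.to_string());
            None
        } else {
            Some(decision)
        }
    }

    /// Answer a parked request from elsewhere (for example a UI thread).
    fn respond(&mut self, request_id: &str, decision: Decision) -> Respond {
        if decision == Decision::Defer {
            return Respond::IgnoredDefer;
        }
        if self.parked.remove(request_id) {
            Respond::Answered(decision)
        } else {
            Respond::UnknownRequest
        }
    }
}
```

Removing the id on the first real answer makes each deferred request answerable exactly once in this model.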
pub async fn interrupt(&self) -> Result<()>
Send a clean interrupt to the CLI and wait for its acknowledgment.
Writes a control_request {subtype: "interrupt"} and resolves
when the matching control_response comes back. The
in-flight turn (if any) closes shortly after with a truncated
TurnResult – the DuplexSession::send future for it
resolves independently. Either ordering is possible; await
both via tokio::join! if you care about both outcomes.
Returns:
- Ok(()) when the CLI acknowledges with subtype: "success".
- Error::DuplexControlFailed when the CLI answers with an error payload.
- Error::DuplexClosed if the session task exited before the response arrived.
§Example
use std::time::Duration;
use claude_wrapper::Claude;
use claude_wrapper::duplex::{DuplexOptions, DuplexSession};
let claude = Claude::builder().build()?;
let session = DuplexSession::spawn(&claude, DuplexOptions::default()).await?;
let send_fut = session.send("a question that triggers tool use");
let interrupt_fut = async {
    tokio::time::sleep(Duration::from_millis(250)).await;
    session.interrupt().await
};
let (turn, interrupt) = tokio::join!(send_fut, interrupt_fut);
let _truncated = turn?;
interrupt?;
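Matching a control_response to its control_request, and mapping the result onto the three documented outcomes, can be sketched with standard-library channels. This is a blocking model with hypothetical names (`ControlRouter`, `ControlReply`, `InterruptError`, `wait_for_ack`). Numeric request ids are an assumption made for brevity, and the real wire format is not shown here.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// What the CLI answered, reduced to the two cases described above.
#[derive(Debug, PartialEq)]
enum ControlReply {
    Success,
    Error(String),
}

/// Hypothetical mirror of the two documented error outcomes.
#[derive(Debug, PartialEq)]
enum InterruptError {
    ControlFailed(String),
    Closed,
}

/// Tracks control requests that are still waiting for a response.
#[derive(Default)]
struct ControlRouter {
    next_id: u64,
    pending: HashMap<u64, Sender<ControlReply>>,
}

impl ControlRouter {
    /// Register a new request; the caller waits on the returned receiver.
    fn register(&mut self) -> (u64, Receiver<ControlReply>) {
        let id = self.next_id;
        self.next_id += 1;
        let (tx, rx) = channel();
        self.pending.insert(id, tx);
        (id, rx)
    }

    /// Route a response to its waiter. Returns false for unknown ids
    /// or waiters that have gone away.
    fn deliver(&mut self, id: u64, reply: ControlReply) -> bool {
        match self.pending.remove(&id) {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }

    /// Session task exit: dropping the senders wakes every waiter.
    fn shutdown(&mut self) {
        self.pending.clear();
    }
}

/// Map the reply (or its absence) onto the documented outcomes.
fn wait_for_ack(rx: Receiver<ControlReply>) -> Result<(), InterruptError> {
    match rx.recv() {
        Ok(ControlReply::Success) => Ok(()),
        Ok(ControlReply::Error(message)) => Err(InterruptError::ControlFailed(message)),
        Err(_) => Err(InterruptError::Closed),
    }
}
```

Keying waiters by request id is what lets the acknowledgment resolve independently of the in-flight turn, so either may finish first.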