claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41/// // Connect to Claude
42/// client.connect().await?;
43///
44/// // Send a query
45/// client.query("Hello Claude!").await?;
46///
47/// // Receive response as a stream
48/// {
49/// let mut stream = client.receive_response();
50/// while let Some(message) = stream.next().await {
51/// println!("Received: {:?}", message?);
52/// }
53/// }
54///
55/// // Disconnect
56/// client.disconnect().await?;
57/// Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61 options: ClaudeAgentOptions,
62 /// QueryManager for creating isolated queries (Codex-style architecture)
63 query_manager: Option<Arc<QueryManager>>,
64 /// Current query_id for the active prompt
65 current_query_id: Option<String>,
66 connected: bool,
67}
68
69impl ClaudeClient {
70 /// Create a new ClaudeClient
71 ///
72 /// # Arguments
73 ///
74 /// * `options` - Configuration options for the Claude client
75 ///
76 /// # Example
77 ///
78 /// ```no_run
79 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
80 ///
81 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82 /// ```
83 pub fn new(options: ClaudeAgentOptions) -> Self {
84 Self {
85 options,
86 query_manager: None,
87 current_query_id: None,
88 connected: false,
89 }
90 }
91
92 /// Create a new ClaudeClient with early validation
93 ///
94 /// Unlike `new()`, this validates the configuration eagerly by attempting
95 /// to create the transport. This catches issues like invalid working directory
96 /// or missing CLI before `connect()` is called.
97 ///
98 /// # Arguments
99 ///
100 /// * `options` - Configuration options for the Claude client
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if:
105 /// - The working directory does not exist or is not a directory
106 /// - Claude CLI cannot be found
107 ///
108 /// # Example
109 ///
110 /// ```no_run
111 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
112 ///
113 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
115 /// ```
116 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117 // Validate by attempting to create transport (but don't keep it)
118 let prompt = QueryPrompt::Streaming;
119 let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121 Ok(Self {
122 options,
123 query_manager: None,
124 current_query_id: None,
125 connected: false,
126 })
127 }
128
129 /// Connect to Claude (analogous to Python's __aenter__)
130 ///
131 /// This establishes the connection to the Claude Code CLI and initializes
132 /// the bidirectional communication channel.
133 ///
134 /// Uses the Codex-style architecture with QueryManager for complete
135 /// message isolation between queries.
136 ///
137 /// # Errors
138 ///
139 /// Returns an error if:
140 /// - Claude CLI cannot be found or started
141 /// - The initialization handshake fails
142 /// - Hook registration fails
143 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144 #[instrument(
145 name = "claude.client.connect",
146 skip(self),
147 fields(
148 has_can_use_tool = self.options.can_use_tool.is_some(),
149 has_hooks = self.options.hooks.is_some(),
150 model = %self.options.model.as_deref().unwrap_or("default"),
151 )
152 )]
153 pub async fn connect(&mut self) -> Result<()> {
154 if self.connected {
155 debug!("Client already connected, skipping");
156 return Ok(());
157 }
158
159 info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162 // This matches Python SDK behavior (client.py lines 106-122)
163 // which ensures CLI uses control protocol for permission prompts
164 let mut options = self.options.clone();
165 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167 options.permission_prompt_tool_name = Some("stdio".to_string());
168 }
169
170 // Validate can_use_tool configuration (aligned with Python SDK behavior)
171 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172 // to ensure the control protocol can handle permission requests
173 if options.can_use_tool.is_some()
174 && let Some(ref tool_name) = options.permission_prompt_tool_name
175 && tool_name != "stdio"
176 {
177 return Err(ClaudeError::InvalidConfig(
178 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180 .to_string(),
181 ));
182 }
183
184 // Prepare hooks for initialization
185 // Build efficiency hooks if configured
186 let efficiency_hooks = self
187 .options
188 .efficiency
189 .as_ref()
190 .map(build_efficiency_hooks)
191 .unwrap_or_default();
192
193 // Merge user hooks with efficiency hooks
194 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196 // Convert hooks to internal format
197 let hooks = merged_hooks.as_ref().map(|hooks_map| {
198 hooks_map
199 .iter()
200 .map(|(event, matchers)| {
201 let event_name = match event {
202 HookEvent::PreToolUse => "PreToolUse",
203 HookEvent::PostToolUse => "PostToolUse",
204 HookEvent::UserPromptSubmit => "UserPromptSubmit",
205 HookEvent::Stop => "Stop",
206 HookEvent::SubagentStop => "SubagentStop",
207 HookEvent::PreCompact => "PreCompact",
208 };
209 (event_name.to_string(), matchers.clone())
210 })
211 .collect()
212 });
213
214 // Extract SDK MCP servers from options
215 let sdk_mcp_servers =
216 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217 servers_dict
218 .iter()
219 .filter_map(|(name, config)| {
220 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221 Some((name.clone(), sdk_config.clone()))
222 } else {
223 None
224 }
225 })
226 .collect()
227 } else {
228 std::collections::HashMap::new()
229 };
230
231 // Clone options for the transport factory
232 let options_clone = options.clone();
233
234 // Create QueryManager with a transport factory
235 // The factory creates new transports for each isolated query
236 // Note: connect() is called later in create_query(), not here
237 let mut query_manager = QueryManager::new(move || {
238 let prompt = QueryPrompt::Streaming;
239 let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240 Ok(Box::new(transport) as Box<dyn Transport>)
241 });
242
243 // Set control request timeout
244 query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246 // Set configuration on QueryManager
247 query_manager.set_hooks(hooks).await;
248 query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249 query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251 let query_manager = Arc::new(query_manager);
252
253 // Create the first query for initialization
254 let first_query_id = query_manager.create_query().await?;
255
256 // Start cleanup task for inactive queries
257 Arc::clone(&query_manager).start_cleanup_task();
258
259 self.query_manager = Some(query_manager);
260 self.current_query_id = Some(first_query_id);
261 self.connected = true;
262
263 info!("Successfully connected to Claude Code CLI with QueryManager");
264 Ok(())
265 }
266
267 /// Send a query to Claude
268 ///
269 /// This sends a new user prompt to Claude. Claude will remember the context
270 /// of previous queries within the same session.
271 ///
272 /// # Arguments
273 ///
274 /// * `prompt` - The user prompt to send
275 ///
276 /// # Errors
277 ///
278 /// Returns an error if the client is not connected or if sending fails.
279 ///
280 /// # Example
281 ///
282 /// ```no_run
283 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
284 /// # #[tokio::main]
285 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287 /// # client.connect().await?;
288 /// client.query("What is 2 + 2?").await?;
289 /// # Ok(())
290 /// # }
291 /// ```
292 #[instrument(
293 name = "claude.client.query",
294 skip(self, prompt),
295 fields(session_id = "default",)
296 )]
297 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298 self.query_with_session(prompt, "default").await
299 }
300
301 /// Send a query to Claude with a specific session ID
302 ///
303 /// This sends a new user prompt to Claude. Different session IDs maintain
304 /// separate conversation contexts.
305 ///
306 /// # Arguments
307 ///
308 /// * `prompt` - The user prompt to send
309 /// * `session_id` - Session identifier for the conversation
310 ///
311 /// # Errors
312 ///
313 /// Returns an error if the client is not connected or if sending fails.
314 ///
315 /// # Example
316 ///
317 /// ```no_run
318 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
319 /// # #[tokio::main]
320 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322 /// # client.connect().await?;
323 /// // Separate conversation contexts
324 /// client.query_with_session("First question", "session-1").await?;
325 /// client.query_with_session("Different question", "session-2").await?;
326 /// # Ok(())
327 /// # }
328 /// ```
329 pub async fn query_with_session(
330 &mut self,
331 prompt: impl Into<String>,
332 session_id: impl Into<String>,
333 ) -> Result<()> {
334 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336 })?;
337
338 let prompt_str = prompt.into();
339 let session_id_str = session_id.into();
340
341 // Get or create query for this session (reuses existing query to maintain context)
342 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
343 self.current_query_id = Some(query_id.clone());
344
345 // Get the isolated query
346 let query = query_manager.get_query(&query_id)?;
347
348 // Format as JSON message for stream-json input format
349 let user_message = serde_json::json!({
350 "type": "user",
351 "message": {
352 "role": "user",
353 "content": prompt_str
354 },
355 "session_id": session_id_str
356 });
357
358 let message_str = serde_json::to_string(&user_message).map_err(|e| {
359 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
360 })?;
361
362 // Write directly to stdin (bypasses transport lock)
363 let stdin = query.stdin.clone();
364
365 if let Some(stdin_arc) = stdin {
366 let mut stdin_guard = stdin_arc.lock().await;
367 if let Some(ref mut stdin_stream) = *stdin_guard {
368 stdin_stream
369 .write_all(message_str.as_bytes())
370 .await
371 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
372 stdin_stream.write_all(b"\n").await.map_err(|e| {
373 ClaudeError::Transport(format!("Failed to write newline: {}", e))
374 })?;
375 stdin_stream
376 .flush()
377 .await
378 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
379 } else {
380 return Err(ClaudeError::Transport("stdin not available".to_string()));
381 }
382 } else {
383 return Err(ClaudeError::Transport("stdin not set".to_string()));
384 }
385
386 debug!(
387 query_id = %query_id,
388 session_id = %session_id_str,
389 "Sent query to isolated query"
390 );
391
392 Ok(())
393 }
394
395 /// Send a query with structured content blocks (supports images)
396 ///
397 /// This method enables multimodal queries in bidirectional streaming mode.
398 /// Use it to send images alongside text for vision-related tasks.
399 ///
400 /// # Arguments
401 ///
402 /// * `content` - A vector of content blocks (text and/or images)
403 ///
404 /// # Errors
405 ///
406 /// Returns an error if:
407 /// - The content vector is empty (must include at least one text or image block)
408 /// - The client is not connected (call `connect()` first)
409 /// - Sending the message fails
410 ///
411 /// # Example
412 ///
413 /// ```no_run
414 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
415 /// # #[tokio::main]
416 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
417 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
418 /// # client.connect().await?;
419 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
420 /// client.query_with_content(vec![
421 /// UserContentBlock::text("What's in this image?"),
422 /// UserContentBlock::image_base64("image/png", base64_data)?,
423 /// ]).await?;
424 /// # Ok(())
425 /// # }
426 /// ```
427 pub async fn query_with_content(
428 &mut self,
429 content: impl Into<Vec<UserContentBlock>>,
430 ) -> Result<()> {
431 self.query_with_content_and_session(content, "default")
432 .await
433 }
434
435 /// Send a query with structured content blocks and a specific session ID
436 ///
437 /// This method enables multimodal queries with session management for
438 /// maintaining separate conversation contexts.
439 ///
440 /// # Arguments
441 ///
442 /// * `content` - A vector of content blocks (text and/or images)
443 /// * `session_id` - Session identifier for the conversation
444 ///
445 /// # Errors
446 ///
447 /// Returns an error if:
448 /// - The content vector is empty (must include at least one text or image block)
449 /// - The client is not connected (call `connect()` first)
450 /// - Sending the message fails
451 ///
452 /// # Example
453 ///
454 /// ```no_run
455 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
456 /// # #[tokio::main]
457 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
458 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
459 /// # client.connect().await?;
460 /// client.query_with_content_and_session(
461 /// vec![
462 /// UserContentBlock::text("Analyze this chart"),
463 /// UserContentBlock::image_url("https://example.com/chart.png"),
464 /// ],
465 /// "analysis-session",
466 /// ).await?;
467 /// # Ok(())
468 /// # }
469 /// ```
470 pub async fn query_with_content_and_session(
471 &mut self,
472 content: impl Into<Vec<UserContentBlock>>,
473 session_id: impl Into<String>,
474 ) -> Result<()> {
475 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
476 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
477 })?;
478
479 let content_blocks: Vec<UserContentBlock> = content.into();
480 UserContentBlock::validate_content(&content_blocks)?;
481
482 let session_id_str = session_id.into();
483
484 // Create a new isolated query for each prompt (Codex-style architecture)
485 // This ensures complete message isolation between prompts
486 let query_id = query_manager.create_query().await?;
487 self.current_query_id = Some(query_id.clone());
488
489 // Get the isolated query
490 let query = query_manager.get_query(&query_id)?;
491
492 // Format as JSON message for stream-json input format
493 // Content is an array of content blocks, not a simple string
494 let user_message = serde_json::json!({
495 "type": "user",
496 "message": {
497 "role": "user",
498 "content": content_blocks
499 },
500 "session_id": session_id_str
501 });
502
503 let message_str = serde_json::to_string(&user_message).map_err(|e| {
504 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
505 })?;
506
507 // Write directly to stdin (bypasses transport lock)
508 let stdin = query.stdin.clone();
509
510 if let Some(stdin_arc) = stdin {
511 let mut stdin_guard = stdin_arc.lock().await;
512 if let Some(ref mut stdin_stream) = *stdin_guard {
513 stdin_stream
514 .write_all(message_str.as_bytes())
515 .await
516 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
517 stdin_stream.write_all(b"\n").await.map_err(|e| {
518 ClaudeError::Transport(format!("Failed to write newline: {}", e))
519 })?;
520 stdin_stream
521 .flush()
522 .await
523 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
524 } else {
525 return Err(ClaudeError::Transport("stdin not available".to_string()));
526 }
527 } else {
528 return Err(ClaudeError::Transport("stdin not set".to_string()));
529 }
530
531 debug!(
532 query_id = %query_id,
533 session_id = %session_id_str,
534 "Sent content query to isolated query"
535 );
536
537 Ok(())
538 }
539
540 /// Receive all messages as a stream (continuous)
541 ///
542 /// This method returns a stream that yields all messages from Claude
543 /// indefinitely until the stream is closed or an error occurs.
544 ///
545 /// Use this when you want to process all messages, including multiple
546 /// responses and system events.
547 ///
548 /// # Returns
549 ///
550 /// A stream of `Result<Message>` that continues until the connection closes.
551 ///
552 /// # Example
553 ///
554 /// ```no_run
555 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
556 /// # use futures::StreamExt;
557 /// # #[tokio::main]
558 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
559 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
560 /// # client.connect().await?;
561 /// # client.query("Hello").await?;
562 /// let mut stream = client.receive_messages();
563 /// while let Some(message) = stream.next().await {
564 /// println!("Received: {:?}", message?);
565 /// }
566 /// # Ok(())
567 /// # }
568 /// ```
569 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
570 let query_manager = match &self.query_manager {
571 Some(qm) => Arc::clone(qm),
572 None => {
573 return Box::pin(futures::stream::once(async {
574 Err(ClaudeError::InvalidConfig(
575 "Client not connected. Call connect() first.".to_string(),
576 ))
577 }));
578 }
579 };
580
581 let query_id = match &self.current_query_id {
582 Some(id) => id.clone(),
583 None => {
584 return Box::pin(futures::stream::once(async {
585 Err(ClaudeError::InvalidConfig(
586 "No active query. Call query() first.".to_string(),
587 ))
588 }));
589 }
590 };
591
592 Box::pin(async_stream::stream! {
593 // Get the isolated query and its message receiver
594 let query = match query_manager.get_query(&query_id) {
595 Ok(q) => q,
596 Err(e) => {
597 yield Err(e);
598 return;
599 }
600 };
601
602 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
603 Arc::clone(&query.message_rx)
604 };
605
606 loop {
607 let message = {
608 let mut rx_guard = rx.lock().await;
609 rx_guard.recv().await
610 };
611
612 match message {
613 Some(json) => {
614 match MessageParser::parse(json) {
615 Ok(msg) => yield Ok(msg),
616 Err(e) => {
617 eprintln!("Failed to parse message: {}", e);
618 yield Err(e);
619 }
620 }
621 }
622 None => break,
623 }
624 }
625 })
626 }
627
628 /// Receive messages until a ResultMessage
629 ///
630 /// This method returns a stream that yields messages until it encounters
631 /// a `ResultMessage`, which signals the completion of a Claude response.
632 ///
633 /// This is the most common pattern for handling Claude responses, as it
634 /// processes one complete "turn" of the conversation.
635 ///
636 /// This method uses query-scoped message channels to ensure message isolation,
637 /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
638 ///
639 /// # Returns
640 ///
641 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
642 ///
643 /// # Example
644 ///
645 /// ```no_run
646 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
647 /// # use futures::StreamExt;
648 /// # #[tokio::main]
649 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
650 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
651 /// # client.connect().await?;
652 /// # client.query("Hello").await?;
653 /// let mut stream = client.receive_response();
654 /// while let Some(message) = stream.next().await {
655 /// match message? {
656 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
657 /// Message::Result(result) => {
658 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
659 /// break;
660 /// }
661 /// _ => {}
662 /// }
663 /// }
664 /// # Ok(())
665 /// # }
666 /// ```
667 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
668 let query_manager = match &self.query_manager {
669 Some(qm) => Arc::clone(qm),
670 None => {
671 return Box::pin(futures::stream::once(async {
672 Err(ClaudeError::InvalidConfig(
673 "Client not connected. Call connect() first.".to_string(),
674 ))
675 }));
676 }
677 };
678
679 let query_id = match &self.current_query_id {
680 Some(id) => id.clone(),
681 None => {
682 return Box::pin(futures::stream::once(async {
683 Err(ClaudeError::InvalidConfig(
684 "No active query. Call query() first.".to_string(),
685 ))
686 }));
687 }
688 };
689
690 Box::pin(async_stream::stream! {
691 // ====================================================================
692 // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
693 // ====================================================================
694 // In the Codex-style architecture, each query has its own completely
695 // isolated QueryFull instance. We get the message receiver directly
696 // from the isolated query, eliminating the need for routing logic.
697 //
698 // This provides:
699 // - Complete message isolation
700 // - No possibility of message confusion
701 // - Simpler architecture without routing overhead
702 //
703 // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
704 // which removes inactive queries whose channels have been closed.
705
706 debug!(
707 query_id = %query_id,
708 "Getting message receiver from isolated query"
709 );
710
711 // Get the isolated query and its message receiver
712 let query = match query_manager.get_query(&query_id) {
713 Ok(q) => q,
714 Err(e) => {
715 yield Err(e);
716 return;
717 }
718 };
719
720 // Get the message receiver from the isolated query
721 // In isolated mode, we directly access the message_rx without routing
722 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
723 Arc::clone(&query.message_rx)
724 };
725
726 loop {
727 let message = {
728 let mut rx_guard = rx.lock().await;
729 rx_guard.recv().await
730 };
731
732 match message {
733 Some(json) => {
734 match MessageParser::parse(json) {
735 Ok(msg) => {
736 let is_result = matches!(msg, Message::Result(_));
737 yield Ok(msg);
738 if is_result {
739 debug!(
740 query_id = %query_id,
741 "Received ResultMessage, ending stream"
742 );
743 // Cleanup will be handled by the periodic cleanup task
744 // when the query becomes inactive
745 break;
746 }
747 }
748 Err(e) => {
749 eprintln!("Failed to parse message: {}", e);
750 yield Err(e);
751 }
752 }
753 }
754 None => {
755 debug!(
756 query_id = %query_id,
757 "Isolated query channel closed"
758 );
759 // Cleanup will be handled by the periodic cleanup task
760 break;
761 }
762 }
763 }
764 })
765 }
766
767 /// Drain any leftover messages from the previous prompt
768 ///
769 /// This method removes any messages remaining in the channel from a previous
770 /// prompt. This should be called before starting a new prompt to ensure
771 /// that the new prompt doesn't receive stale messages.
772 ///
773 /// This is important when prompts are cancelled or end unexpectedly,
774 /// as there may be buffered messages that would otherwise be received
775 /// by the next prompt.
776 ///
777 /// # Returns
778 ///
779 /// The number of messages drained from the channel.
780 ///
781 /// # Example
782 ///
783 /// ```no_run
784 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
785 /// # #[tokio::main]
786 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
787 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
788 /// # client.connect().await?;
789 /// // Before starting a new prompt, drain any leftover messages
790 /// let drained = client.drain_messages().await;
791 /// if drained > 0 {
792 /// eprintln!("Drained {} leftover messages from previous prompt", drained);
793 /// }
794 /// # Ok(())
795 /// # }
796 /// ```
797 pub async fn drain_messages(&self) -> usize {
798 let Some(query_manager) = &self.query_manager else {
799 return 0;
800 };
801
802 let Some(query_id) = &self.current_query_id else {
803 return 0;
804 };
805
806 let Ok(query) = query_manager.get_query(query_id) else {
807 return 0;
808 };
809
810 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
811 Arc::clone(&query.message_rx)
812 };
813
814 let mut count = 0;
815 // Use try_recv to drain all currently available messages without blocking
816 loop {
817 let mut rx_guard = rx.lock().await;
818 match rx_guard.try_recv() {
819 Ok(_) => count += 1,
820 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
821 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
822 }
823 }
824
825 if count > 0 {
826 debug!(count, "Drained leftover messages from previous prompt");
827 }
828
829 count
830 }
831
832 /// Send an interrupt signal to stop the current Claude operation
833 ///
834 /// This is analogous to Python's `client.interrupt()`.
835 ///
836 /// # Errors
837 ///
838 /// Returns an error if the client is not connected or if sending fails.
839 pub async fn interrupt(&self) -> Result<()> {
840 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
841 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
842 })?;
843
844 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
845 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
846 })?;
847
848 let query = query_manager.get_query(query_id)?;
849 query.interrupt().await
850 }
851
852 /// Change the permission mode dynamically
853 ///
854 /// This is analogous to Python's `client.set_permission_mode()`.
855 ///
856 /// # Arguments
857 ///
858 /// * `mode` - The new permission mode to set
859 ///
860 /// # Errors
861 ///
862 /// Returns an error if the client is not connected or if sending fails.
863 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
864 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
865 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
866 })?;
867
868 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
869 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
870 })?;
871
872 let query = query_manager.get_query(query_id)?;
873 query.set_permission_mode(mode).await
874 }
875
876 /// Change the AI model dynamically
877 ///
878 /// This is analogous to Python's `client.set_model()`.
879 ///
880 /// # Arguments
881 ///
882 /// * `model` - The new model name, or None to use default
883 ///
884 /// # Errors
885 ///
886 /// Returns an error if the client is not connected or if sending fails.
887 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
888 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
889 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
890 })?;
891
892 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
893 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
894 })?;
895
896 let query = query_manager.get_query(query_id)?;
897 query.set_model(model).await
898 }
899
900 /// Rewind tracked files to their state at a specific user message.
901 ///
902 /// This is analogous to Python's `client.rewind_files()`.
903 ///
904 /// # Requirements
905 ///
906 /// - `enable_file_checkpointing=true` in options to track file changes
907 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
908 /// objects with `uuid` in the response stream
909 ///
910 /// # Arguments
911 ///
912 /// * `user_message_id` - UUID of the user message to rewind to. This should be
913 /// the `uuid` field from a `UserMessage` received during the conversation.
914 ///
915 /// # Errors
916 ///
917 /// Returns an error if the client is not connected or if sending fails.
918 ///
919 /// # Example
920 ///
921 /// ```no_run
922 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
923 /// # use std::collections::HashMap;
924 /// # #[tokio::main]
925 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
926 /// let options = ClaudeAgentOptions::builder()
927 /// .enable_file_checkpointing(true)
928 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
929 /// .build();
930 /// let mut client = ClaudeClient::new(options);
931 /// client.connect().await?;
932 ///
933 /// client.query("Make some changes to my files").await?;
934 /// let mut checkpoint_id = None;
935 /// {
936 /// let mut stream = client.receive_response();
937 /// use futures::StreamExt;
938 /// while let Some(Ok(msg)) = stream.next().await {
939 /// if let Message::User(user_msg) = &msg {
940 /// if let Some(uuid) = &user_msg.uuid {
941 /// checkpoint_id = Some(uuid.clone());
942 /// }
943 /// }
944 /// }
945 /// }
946 ///
947 /// // Later, rewind to that point
948 /// if let Some(id) = checkpoint_id {
949 /// client.rewind_files(&id).await?;
950 /// }
951 /// # Ok(())
952 /// # }
953 /// ```
954 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
955 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
956 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
957 })?;
958
959 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
960 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
961 })?;
962
963 let query = query_manager.get_query(query_id)?;
964 query.rewind_files(user_message_id).await
965 }
966
967 /// Get server initialization info including available commands and output styles
968 ///
969 /// Returns initialization information from the Claude Code server including:
970 /// - Available commands (slash commands, system commands, etc.)
971 /// - Current and available output styles
972 /// - Server capabilities
973 ///
974 /// This is analogous to Python's `client.get_server_info()`.
975 ///
976 /// # Returns
977 ///
978 /// Dictionary with server info, or None if not connected
979 ///
980 /// # Example
981 ///
982 /// ```no_run
983 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
984 /// # #[tokio::main]
985 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
986 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
987 /// # client.connect().await?;
988 /// if let Some(info) = client.get_server_info().await {
989 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
990 /// println!("Output style: {:?}", info.get("output_style"));
991 /// }
992 /// # Ok(())
993 /// # }
994 /// ```
995 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
996 let query_manager = self.query_manager.as_ref()?;
997 let query_id = self.current_query_id.as_ref()?;
998 let Ok(query) = query_manager.get_query(query_id) else {
999 return None;
1000 };
1001 query.get_initialization_result().await
1002 }
1003
1004 /// Start a new session by switching to a different session ID
1005 ///
1006 /// This is a convenience method that creates a new conversation context.
1007 /// It's equivalent to calling `query_with_session()` with a new session ID.
1008 ///
1009 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1010 /// when creating a new client.
1011 ///
1012 /// # Arguments
1013 ///
1014 /// * `session_id` - The new session ID to use
1015 /// * `prompt` - Initial message for the new session
1016 ///
1017 /// # Errors
1018 ///
1019 /// Returns an error if the client is not connected or if sending fails.
1020 ///
1021 /// # Example
1022 ///
1023 /// ```no_run
1024 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
1025 /// # #[tokio::main]
1026 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1027 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1028 /// # client.connect().await?;
1029 /// // First conversation
1030 /// client.query("Hello").await?;
1031 ///
1032 /// // Start new conversation with different context
1033 /// client.new_session("session-2", "Tell me about Rust").await?;
1034 /// # Ok(())
1035 /// # }
1036 /// ```
1037 pub async fn new_session(
1038 &mut self,
1039 session_id: impl Into<String>,
1040 prompt: impl Into<String>,
1041 ) -> Result<()> {
1042 self.query_with_session(prompt, session_id).await
1043 }
1044
1045 /// Disconnect from Claude (analogous to Python's __aexit__)
1046 ///
1047 /// This cleanly shuts down the connection to Claude Code CLI.
1048 ///
1049 /// # Errors
1050 ///
1051 /// Returns an error if disconnection fails.
1052 #[instrument(name = "claude.client.disconnect", skip(self))]
1053 pub async fn disconnect(&mut self) -> Result<()> {
1054 if !self.connected {
1055 debug!("Client already disconnected");
1056 return Ok(());
1057 }
1058
1059 info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1060
1061 if let Some(query_manager) = self.query_manager.take() {
1062 // Get all queries for cleanup
1063 let queries = query_manager.get_all_queries();
1064
1065 // Close all isolated queries by closing their resources
1066 // This signals each CLI process to exit
1067 for (_query_id, query) in &queries {
1068 // Close stdin (if available)
1069 if let Some(ref stdin_arc) = query.stdin {
1070 let mut stdin_guard = stdin_arc.lock().await;
1071 if let Some(mut stdin_stream) = stdin_guard.take() {
1072 let _ = stdin_stream.shutdown().await;
1073 }
1074 }
1075
1076 // Close transport
1077 let transport = Arc::clone(&query.transport);
1078 let mut transport_guard = transport.lock().await;
1079 let _ = transport_guard.close().await;
1080 }
1081
1082 // Give background tasks a moment to finish reading and release locks
1083 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1084 }
1085
1086 self.current_query_id = None;
1087 self.connected = false;
1088 debug!("Disconnected successfully");
1089 Ok(())
1090 }
1091}
1092
1093impl Drop for ClaudeClient {
1094 fn drop(&mut self) {
1095 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1096 // Users should call disconnect() explicitly
1097 if self.connected {
1098 eprintln!(
1099 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1100 );
1101 }
1102 }
1103}
1104
1105#[cfg(test)]
1106mod tests {
1107 use super::*;
1108 use crate::types::permissions::{PermissionResult, PermissionResultAllow};
1109 use std::sync::Arc;
1110
1111 #[tokio::test]
1112 async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
1113 let callback: crate::types::permissions::CanUseToolCallback =
1114 Arc::new(|_tool_name, _tool_input, _context| {
1115 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1116 });
1117
1118 let opts = ClaudeAgentOptions::builder()
1119 .can_use_tool(callback)
1120 .permission_prompt_tool_name("custom_tool") // Not "stdio"
1121 .build();
1122
1123 let mut client = ClaudeClient::new(opts);
1124 let result = client.connect().await;
1125
1126 assert!(result.is_err());
1127 let err = result.unwrap_err();
1128 assert!(matches!(err, ClaudeError::InvalidConfig(_)));
1129 assert!(err.to_string().contains("permission_prompt_tool_name"));
1130 }
1131
1132 #[tokio::test]
1133 async fn test_connect_accepts_can_use_tool_with_stdio() {
1134 let callback: crate::types::permissions::CanUseToolCallback =
1135 Arc::new(|_tool_name, _tool_input, _context| {
1136 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1137 });
1138
1139 let opts = ClaudeAgentOptions::builder()
1140 .can_use_tool(callback)
1141 .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
1142 .build();
1143
1144 let mut client = ClaudeClient::new(opts);
1145 // This will fail later (CLI not found), but should pass validation
1146 let result = client.connect().await;
1147
1148 // Should not be InvalidConfig error about permission_prompt_tool_name
1149 if let Err(ref err) = result {
1150 assert!(
1151 !err.to_string().contains("permission_prompt_tool_name"),
1152 "Should not fail on permission_prompt_tool_name validation"
1153 );
1154 }
1155 }
1156
1157 #[tokio::test]
1158 async fn test_connect_accepts_can_use_tool_without_permission_tool() {
1159 let callback: crate::types::permissions::CanUseToolCallback =
1160 Arc::new(|_tool_name, _tool_input, _context| {
1161 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1162 });
1163
1164 let opts = ClaudeAgentOptions::builder()
1165 .can_use_tool(callback)
1166 // No permission_prompt_tool_name set - defaults to stdio
1167 .build();
1168
1169 let mut client = ClaudeClient::new(opts);
1170 // This will fail later (CLI not found), but should pass validation
1171 let result = client.connect().await;
1172
1173 // Should not be InvalidConfig error about permission_prompt_tool_name
1174 if let Err(ref err) = result {
1175 assert!(
1176 !err.to_string().contains("permission_prompt_tool_name"),
1177 "Should not fail on permission_prompt_tool_name validation"
1178 );
1179 }
1180 }
1181}