claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_full::QueryFull;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// # Example
28///
29/// ```no_run
30/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
31/// use futures::StreamExt;
32///
33/// #[tokio::main]
34/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
35///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
36///
37///     // Connect to Claude
38///     client.connect().await?;
39///
40///     // Send a query
41///     client.query("Hello Claude!").await?;
42///
43///     // Receive response as a stream
44///     {
45///         let mut stream = client.receive_response();
46///         while let Some(message) = stream.next().await {
47///             println!("Received: {:?}", message?);
48///         }
49///     }
50///
51///     // Disconnect
52///     client.disconnect().await?;
53///     Ok(())
54/// }
55/// ```
56pub struct ClaudeClient {
57    options: ClaudeAgentOptions,
58    query: Option<Arc<Mutex<QueryFull>>>,
59    connected: bool,
60}
61
62impl ClaudeClient {
63    /// Create a new ClaudeClient
64    ///
65    /// # Arguments
66    ///
67    /// * `options` - Configuration options for the Claude client
68    ///
69    /// # Example
70    ///
71    /// ```no_run
72    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
73    ///
74    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
75    /// ```
76    pub fn new(options: ClaudeAgentOptions) -> Self {
77        Self {
78            options,
79            query: None,
80            connected: false,
81        }
82    }
83
84    /// Create a new ClaudeClient with early validation
85    ///
86    /// Unlike `new()`, this validates the configuration eagerly by attempting
87    /// to create the transport. This catches issues like invalid working directory
88    /// or missing CLI before `connect()` is called.
89    ///
90    /// # Arguments
91    ///
92    /// * `options` - Configuration options for the Claude client
93    ///
94    /// # Errors
95    ///
96    /// Returns an error if:
97    /// - The working directory does not exist or is not a directory
98    /// - Claude CLI cannot be found
99    ///
100    /// # Example
101    ///
102    /// ```no_run
103    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
104    ///
105    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
106    /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
107    /// ```
108    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
109        // Validate by attempting to create transport (but don't keep it)
110        let prompt = QueryPrompt::Streaming;
111        let _ = SubprocessTransport::new(prompt, options.clone())?;
112
113        Ok(Self {
114            options,
115            query: None,
116            connected: false,
117        })
118    }
119
120    /// Connect to Claude (analogous to Python's __aenter__)
121    ///
122    /// This establishes the connection to the Claude Code CLI and initializes
123    /// the bidirectional communication channel.
124    ///
125    /// # Errors
126    ///
127    /// Returns an error if:
128    /// - Claude CLI cannot be found or started
129    /// - The initialization handshake fails
130    /// - Hook registration fails
131    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
132    #[instrument(
133        name = "claude.client.connect",
134        skip(self),
135        fields(
136            has_can_use_tool = self.options.can_use_tool.is_some(),
137            has_hooks = self.options.hooks.is_some(),
138            model = %self.options.model.as_deref().unwrap_or("default"),
139        )
140    )]
141    pub async fn connect(&mut self) -> Result<()> {
142        if self.connected {
143            debug!("Client already connected, skipping");
144            return Ok(());
145        }
146
147        info!("Connecting to Claude Code CLI");
148
149        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
150        // This matches Python SDK behavior (client.py lines 106-122)
151        // which ensures CLI uses control protocol for permission prompts
152        let mut options = self.options.clone();
153        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
154            info!(
155                "can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'"
156            );
157            options.permission_prompt_tool_name = Some("stdio".to_string());
158        }
159
160        // Validate can_use_tool configuration (aligned with Python SDK behavior)
161        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
162        // to ensure the control protocol can handle permission requests
163        if options.can_use_tool.is_some()
164            && let Some(ref tool_name) = options.permission_prompt_tool_name
165            && tool_name != "stdio"
166        {
167            return Err(ClaudeError::InvalidConfig(
168                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
169                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
170                            .to_string(),
171                    ));
172        }
173
174        // Create transport in streaming mode (no initial prompt)
175        let prompt = QueryPrompt::Streaming;
176        let mut transport = SubprocessTransport::new(prompt, options)?;
177
178        // Don't send initial prompt - we'll use query() for that
179        transport.connect().await?;
180
181        // Extract stdin for direct access (avoids transport lock deadlock)
182        let stdin = Arc::clone(&transport.stdin);
183
184        // Create Query with hooks
185        let mut query = QueryFull::new(Box::new(transport));
186        query.set_stdin(stdin);
187
188        // Set control request timeout from options
189        query.set_control_request_timeout(self.options.control_request_timeout);
190
191        // Extract SDK MCP servers from options
192        let sdk_mcp_servers =
193            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
194                servers_dict
195                    .iter()
196                    .filter_map(|(name, config)| {
197                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
198                            Some((name.clone(), sdk_config.clone()))
199                        } else {
200                            None
201                        }
202                    })
203                    .collect()
204            } else {
205                std::collections::HashMap::new()
206            };
207        query.set_sdk_mcp_servers(sdk_mcp_servers).await;
208
209        // Set can_use_tool callback if provided
210        if let Some(ref callback) = self.options.can_use_tool {
211            query.set_can_use_tool(Some(Arc::clone(callback))).await;
212        }
213
214        // Build efficiency hooks if configured
215        let efficiency_hooks = self
216            .options
217            .efficiency
218            .as_ref()
219            .map(build_efficiency_hooks)
220            .unwrap_or_default();
221
222        // Merge user hooks with efficiency hooks
223        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
224
225        // Convert hooks to internal format
226        let hooks = merged_hooks.as_ref().map(|hooks_map| {
227            hooks_map
228                .iter()
229                .map(|(event, matchers)| {
230                    let event_name = match event {
231                        HookEvent::PreToolUse => "PreToolUse",
232                        HookEvent::PostToolUse => "PostToolUse",
233                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
234                        HookEvent::Stop => "Stop",
235                        HookEvent::SubagentStop => "SubagentStop",
236                        HookEvent::PreCompact => "PreCompact",
237                    };
238                    (event_name.to_string(), matchers.clone())
239                })
240                .collect()
241        });
242
243        // Start reading messages in background FIRST
244        // This must happen before initialize() because initialize()
245        // sends a control request and waits for response
246        query.start().await?;
247
248        // Initialize with hooks (sends control request)
249        query.initialize(hooks).await?;
250
251        self.query = Some(Arc::new(Mutex::new(query)));
252        self.connected = true;
253
254        info!("Successfully connected to Claude Code CLI");
255        Ok(())
256    }
257
258    /// Send a query to Claude
259    ///
260    /// This sends a new user prompt to Claude. Claude will remember the context
261    /// of previous queries within the same session.
262    ///
263    /// # Arguments
264    ///
265    /// * `prompt` - The user prompt to send
266    ///
267    /// # Errors
268    ///
269    /// Returns an error if the client is not connected or if sending fails.
270    ///
271    /// # Example
272    ///
273    /// ```no_run
274    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
275    /// # #[tokio::main]
276    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
277    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
278    /// # client.connect().await?;
279    /// client.query("What is 2 + 2?").await?;
280    /// # Ok(())
281    /// # }
282    /// ```
283    #[instrument(
284        name = "claude.client.query",
285        skip(self, prompt),
286        fields(session_id = "default",)
287    )]
288    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
289        self.query_with_session(prompt, "default").await
290    }
291
292    /// Send a query to Claude with a specific session ID
293    ///
294    /// This sends a new user prompt to Claude. Different session IDs maintain
295    /// separate conversation contexts.
296    ///
297    /// # Arguments
298    ///
299    /// * `prompt` - The user prompt to send
300    /// * `session_id` - Session identifier for the conversation
301    ///
302    /// # Errors
303    ///
304    /// Returns an error if the client is not connected or if sending fails.
305    ///
306    /// # Example
307    ///
308    /// ```no_run
309    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
310    /// # #[tokio::main]
311    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
312    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
313    /// # client.connect().await?;
314    /// // Separate conversation contexts
315    /// client.query_with_session("First question", "session-1").await?;
316    /// client.query_with_session("Different question", "session-2").await?;
317    /// # Ok(())
318    /// # }
319    /// ```
320    pub async fn query_with_session(
321        &mut self,
322        prompt: impl Into<String>,
323        session_id: impl Into<String>,
324    ) -> Result<()> {
325        let query = self.query.as_ref().ok_or_else(|| {
326            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
327        })?;
328
329        let prompt_str = prompt.into();
330        let session_id_str = session_id.into();
331
332        // Format as JSON message for stream-json input format
333        let user_message = serde_json::json!({
334            "type": "user",
335            "message": {
336                "role": "user",
337                "content": prompt_str
338            },
339            "session_id": session_id_str
340        });
341
342        let message_str = serde_json::to_string(&user_message).map_err(|e| {
343            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
344        })?;
345
346        // Write directly to stdin (bypasses transport lock)
347        let query_guard = query.lock().await;
348        let stdin = query_guard.stdin.clone();
349        drop(query_guard);
350
351        if let Some(stdin_arc) = stdin {
352            let mut stdin_guard = stdin_arc.lock().await;
353            if let Some(ref mut stdin_stream) = *stdin_guard {
354                stdin_stream
355                    .write_all(message_str.as_bytes())
356                    .await
357                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
358                stdin_stream.write_all(b"\n").await.map_err(|e| {
359                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
360                })?;
361                stdin_stream
362                    .flush()
363                    .await
364                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
365            } else {
366                return Err(ClaudeError::Transport("stdin not available".to_string()));
367            }
368        } else {
369            return Err(ClaudeError::Transport("stdin not set".to_string()));
370        }
371
372        Ok(())
373    }
374
375    /// Send a query with structured content blocks (supports images)
376    ///
377    /// This method enables multimodal queries in bidirectional streaming mode.
378    /// Use it to send images alongside text for vision-related tasks.
379    ///
380    /// # Arguments
381    ///
382    /// * `content` - A vector of content blocks (text and/or images)
383    ///
384    /// # Errors
385    ///
386    /// Returns an error if:
387    /// - The content vector is empty (must include at least one text or image block)
388    /// - The client is not connected (call `connect()` first)
389    /// - Sending the message fails
390    ///
391    /// # Example
392    ///
393    /// ```no_run
394    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
395    /// # #[tokio::main]
396    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
397    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
398    /// # client.connect().await?;
399    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
400    /// client.query_with_content(vec![
401    ///     UserContentBlock::text("What's in this image?"),
402    ///     UserContentBlock::image_base64("image/png", base64_data)?,
403    /// ]).await?;
404    /// # Ok(())
405    /// # }
406    /// ```
407    pub async fn query_with_content(
408        &mut self,
409        content: impl Into<Vec<UserContentBlock>>,
410    ) -> Result<()> {
411        self.query_with_content_and_session(content, "default")
412            .await
413    }
414
415    /// Send a query with structured content blocks and a specific session ID
416    ///
417    /// This method enables multimodal queries with session management for
418    /// maintaining separate conversation contexts.
419    ///
420    /// # Arguments
421    ///
422    /// * `content` - A vector of content blocks (text and/or images)
423    /// * `session_id` - Session identifier for the conversation
424    ///
425    /// # Errors
426    ///
427    /// Returns an error if:
428    /// - The content vector is empty (must include at least one text or image block)
429    /// - The client is not connected (call `connect()` first)
430    /// - Sending the message fails
431    ///
432    /// # Example
433    ///
434    /// ```no_run
435    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
436    /// # #[tokio::main]
437    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
438    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
439    /// # client.connect().await?;
440    /// client.query_with_content_and_session(
441    ///     vec![
442    ///         UserContentBlock::text("Analyze this chart"),
443    ///         UserContentBlock::image_url("https://example.com/chart.png"),
444    ///     ],
445    ///     "analysis-session",
446    /// ).await?;
447    /// # Ok(())
448    /// # }
449    /// ```
450    pub async fn query_with_content_and_session(
451        &mut self,
452        content: impl Into<Vec<UserContentBlock>>,
453        session_id: impl Into<String>,
454    ) -> Result<()> {
455        let query = self.query.as_ref().ok_or_else(|| {
456            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
457        })?;
458
459        let content_blocks: Vec<UserContentBlock> = content.into();
460        UserContentBlock::validate_content(&content_blocks)?;
461
462        let session_id_str = session_id.into();
463
464        // Format as JSON message for stream-json input format
465        // Content is an array of content blocks, not a simple string
466        let user_message = serde_json::json!({
467            "type": "user",
468            "message": {
469                "role": "user",
470                "content": content_blocks
471            },
472            "session_id": session_id_str
473        });
474
475        let message_str = serde_json::to_string(&user_message).map_err(|e| {
476            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
477        })?;
478
479        // Write directly to stdin (bypasses transport lock)
480        let query_guard = query.lock().await;
481        let stdin = query_guard.stdin.clone();
482        drop(query_guard);
483
484        if let Some(stdin_arc) = stdin {
485            let mut stdin_guard = stdin_arc.lock().await;
486            if let Some(ref mut stdin_stream) = *stdin_guard {
487                stdin_stream
488                    .write_all(message_str.as_bytes())
489                    .await
490                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
491                stdin_stream.write_all(b"\n").await.map_err(|e| {
492                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
493                })?;
494                stdin_stream
495                    .flush()
496                    .await
497                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
498            } else {
499                return Err(ClaudeError::Transport("stdin not available".to_string()));
500            }
501        } else {
502            return Err(ClaudeError::Transport("stdin not set".to_string()));
503        }
504
505        Ok(())
506    }
507
508    /// Receive all messages as a stream (continuous)
509    ///
510    /// This method returns a stream that yields all messages from Claude
511    /// indefinitely until the stream is closed or an error occurs.
512    ///
513    /// Use this when you want to process all messages, including multiple
514    /// responses and system events.
515    ///
516    /// # Returns
517    ///
518    /// A stream of `Result<Message>` that continues until the connection closes.
519    ///
520    /// # Example
521    ///
522    /// ```no_run
523    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
524    /// # use futures::StreamExt;
525    /// # #[tokio::main]
526    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
527    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
528    /// # client.connect().await?;
529    /// # client.query("Hello").await?;
530    /// let mut stream = client.receive_messages();
531    /// while let Some(message) = stream.next().await {
532    ///     println!("Received: {:?}", message?);
533    /// }
534    /// # Ok(())
535    /// # }
536    /// ```
537    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
538        let query = match &self.query {
539            Some(q) => Arc::clone(q),
540            None => {
541                return Box::pin(futures::stream::once(async {
542                    Err(ClaudeError::InvalidConfig(
543                        "Client not connected. Call connect() first.".to_string(),
544                    ))
545                }));
546            }
547        };
548
549        Box::pin(async_stream::stream! {
550            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
551                let query_guard = query.lock().await;
552                Arc::clone(&query_guard.message_rx)
553            };
554
555            loop {
556                let message = {
557                    let mut rx_guard = rx.lock().await;
558                    rx_guard.recv().await
559                };
560
561                match message {
562                    Some(json) => {
563                        match MessageParser::parse(json) {
564                            Ok(msg) => yield Ok(msg),
565                            Err(e) => {
566                                eprintln!("Failed to parse message: {}", e);
567                                yield Err(e);
568                            }
569                        }
570                    }
571                    None => break,
572                }
573            }
574        })
575    }
576
577    /// Receive messages until a ResultMessage
578    ///
579    /// This method returns a stream that yields messages until it encounters
580    /// a `ResultMessage`, which signals the completion of a Claude response.
581    ///
582    /// This is the most common pattern for handling Claude responses, as it
583    /// processes one complete "turn" of the conversation.
584    ///
585    /// # Returns
586    ///
587    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
588    ///
589    /// # Example
590    ///
591    /// ```no_run
592    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
593    /// # use futures::StreamExt;
594    /// # #[tokio::main]
595    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
596    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
597    /// # client.connect().await?;
598    /// # client.query("Hello").await?;
599    /// let mut stream = client.receive_response();
600    /// while let Some(message) = stream.next().await {
601    ///     match message? {
602    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
603    ///         Message::Result(result) => {
604    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
605    ///             break;
606    ///         }
607    ///         _ => {}
608    ///     }
609    /// }
610    /// # Ok(())
611    /// # }
612    /// ```
613    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
614        let query = match &self.query {
615            Some(q) => Arc::clone(q),
616            None => {
617                return Box::pin(futures::stream::once(async {
618                    Err(ClaudeError::InvalidConfig(
619                        "Client not connected. Call connect() first.".to_string(),
620                    ))
621                }));
622            }
623        };
624
625        Box::pin(async_stream::stream! {
626            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
627                let query_guard = query.lock().await;
628                Arc::clone(&query_guard.message_rx)
629            };
630
631            loop {
632                let message = {
633                    let mut rx_guard = rx.lock().await;
634                    rx_guard.recv().await
635                };
636
637                match message {
638                    Some(json) => {
639                        match MessageParser::parse(json) {
640                            Ok(msg) => {
641                                let is_result = matches!(msg, Message::Result(_));
642                                yield Ok(msg);
643                                if is_result {
644                                    break;
645                                }
646                            }
647                            Err(e) => {
648                                eprintln!("Failed to parse message: {}", e);
649                                yield Err(e);
650                            }
651                        }
652                    }
653                    None => break,
654                }
655            }
656        })
657    }
658
659    /// Send an interrupt signal to stop the current Claude operation
660    ///
661    /// This is analogous to Python's `client.interrupt()`.
662    ///
663    /// # Errors
664    ///
665    /// Returns an error if the client is not connected or if sending fails.
666    pub async fn interrupt(&self) -> Result<()> {
667        let query = self.query.as_ref().ok_or_else(|| {
668            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
669        })?;
670
671        let query_guard = query.lock().await;
672        query_guard.interrupt().await
673    }
674
675    /// Change the permission mode dynamically
676    ///
677    /// This is analogous to Python's `client.set_permission_mode()`.
678    ///
679    /// # Arguments
680    ///
681    /// * `mode` - The new permission mode to set
682    ///
683    /// # Errors
684    ///
685    /// Returns an error if the client is not connected or if sending fails.
686    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
687        let query = self.query.as_ref().ok_or_else(|| {
688            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
689        })?;
690
691        let query_guard = query.lock().await;
692        query_guard.set_permission_mode(mode).await
693    }
694
695    /// Change the AI model dynamically
696    ///
697    /// This is analogous to Python's `client.set_model()`.
698    ///
699    /// # Arguments
700    ///
701    /// * `model` - The new model name, or None to use default
702    ///
703    /// # Errors
704    ///
705    /// Returns an error if the client is not connected or if sending fails.
706    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
707        let query = self.query.as_ref().ok_or_else(|| {
708            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
709        })?;
710
711        let query_guard = query.lock().await;
712        query_guard.set_model(model).await
713    }
714
715    /// Rewind tracked files to their state at a specific user message.
716    ///
717    /// This is analogous to Python's `client.rewind_files()`.
718    ///
719    /// # Requirements
720    ///
721    /// - `enable_file_checkpointing=true` in options to track file changes
722    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
723    ///   objects with `uuid` in the response stream
724    ///
725    /// # Arguments
726    ///
727    /// * `user_message_id` - UUID of the user message to rewind to. This should be
728    ///   the `uuid` field from a `UserMessage` received during the conversation.
729    ///
730    /// # Errors
731    ///
732    /// Returns an error if the client is not connected or if sending fails.
733    ///
734    /// # Example
735    ///
736    /// ```no_run
737    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
738    /// # use std::collections::HashMap;
739    /// # #[tokio::main]
740    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
741    /// let options = ClaudeAgentOptions::builder()
742    ///     .enable_file_checkpointing(true)
743    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
744    ///     .build();
745    /// let mut client = ClaudeClient::new(options);
746    /// client.connect().await?;
747    ///
748    /// client.query("Make some changes to my files").await?;
749    /// let mut checkpoint_id = None;
750    /// {
751    ///     let mut stream = client.receive_response();
752    ///     use futures::StreamExt;
753    ///     while let Some(Ok(msg)) = stream.next().await {
754    ///         if let Message::User(user_msg) = &msg {
755    ///             if let Some(uuid) = &user_msg.uuid {
756    ///                 checkpoint_id = Some(uuid.clone());
757    ///             }
758    ///         }
759    ///     }
760    /// }
761    ///
762    /// // Later, rewind to that point
763    /// if let Some(id) = checkpoint_id {
764    ///     client.rewind_files(&id).await?;
765    /// }
766    /// # Ok(())
767    /// # }
768    /// ```
769    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
770        let query = self.query.as_ref().ok_or_else(|| {
771            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
772        })?;
773
774        let query_guard = query.lock().await;
775        query_guard.rewind_files(user_message_id).await
776    }
777
778    /// Get server initialization info including available commands and output styles
779    ///
780    /// Returns initialization information from the Claude Code server including:
781    /// - Available commands (slash commands, system commands, etc.)
782    /// - Current and available output styles
783    /// - Server capabilities
784    ///
785    /// This is analogous to Python's `client.get_server_info()`.
786    ///
787    /// # Returns
788    ///
    /// A JSON value with server info, or `None` if not connected
790    ///
791    /// # Example
792    ///
793    /// ```no_run
794    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
795    /// # #[tokio::main]
796    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
797    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
798    /// # client.connect().await?;
799    /// if let Some(info) = client.get_server_info().await {
800    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
801    ///     println!("Output style: {:?}", info.get("output_style"));
802    /// }
803    /// # Ok(())
804    /// # }
805    /// ```
806    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
807        let query = self.query.as_ref()?;
808        let query_guard = query.lock().await;
809        query_guard.get_initialization_result().await
810    }
811
812    /// Start a new session by switching to a different session ID
813    ///
814    /// This is a convenience method that creates a new conversation context.
815    /// It's equivalent to calling `query_with_session()` with a new session ID.
816    ///
817    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
818    /// when creating a new client.
819    ///
820    /// # Arguments
821    ///
822    /// * `session_id` - The new session ID to use
823    /// * `prompt` - Initial message for the new session
824    ///
825    /// # Errors
826    ///
827    /// Returns an error if the client is not connected or if sending fails.
828    ///
829    /// # Example
830    ///
831    /// ```no_run
832    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
833    /// # #[tokio::main]
834    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
835    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
836    /// # client.connect().await?;
837    /// // First conversation
838    /// client.query("Hello").await?;
839    ///
840    /// // Start new conversation with different context
841    /// client.new_session("session-2", "Tell me about Rust").await?;
842    /// # Ok(())
843    /// # }
844    /// ```
845    pub async fn new_session(
846        &mut self,
847        session_id: impl Into<String>,
848        prompt: impl Into<String>,
849    ) -> Result<()> {
850        self.query_with_session(prompt, session_id).await
851    }
852
853    /// Disconnect from Claude (analogous to Python's __aexit__)
854    ///
855    /// This cleanly shuts down the connection to Claude Code CLI.
856    ///
857    /// # Errors
858    ///
859    /// Returns an error if disconnection fails.
860    #[instrument(name = "claude.client.disconnect", skip(self))]
861    pub async fn disconnect(&mut self) -> Result<()> {
862        if !self.connected {
863            debug!("Client already disconnected");
864            return Ok(());
865        }
866
867        info!("Disconnecting from Claude Code CLI");
868
869        if let Some(query) = self.query.take() {
870            // Close stdin first (using direct access) to signal CLI to exit
871            // This will cause the background task to finish and release transport lock
872            let query_guard = query.lock().await;
873            if let Some(ref stdin_arc) = query_guard.stdin {
874                let mut stdin_guard = stdin_arc.lock().await;
875                if let Some(mut stdin_stream) = stdin_guard.take() {
876                    let _ = stdin_stream.shutdown().await;
877                }
878            }
879            let transport = Arc::clone(&query_guard.transport);
880            drop(query_guard);
881
882            // Give background task a moment to finish reading and release lock
883            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
884
885            let mut transport_guard = transport.lock().await;
886            transport_guard.close().await?;
887        }
888
889        self.connected = false;
890        debug!("Disconnected successfully");
891        Ok(())
892    }
893}
894
895impl Drop for ClaudeClient {
896    fn drop(&mut self) {
897        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
898        // Users should call disconnect() explicitly
899        if self.connected {
900            eprintln!(
901                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
902            );
903        }
904    }
905}
906
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{
        CanUseToolCallback, PermissionResult, PermissionResultAllow,
    };
    use std::sync::Arc;

    /// Build a permission callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        })
    }

    /// Connect with `opts` and assert that, if connecting fails, the failure is
    /// not the `permission_prompt_tool_name` validation error.
    ///
    /// Connecting normally fails later (CLI not found). When a CLI is available
    /// and the connection succeeds, the client is disconnected again so the test
    /// does not leave a live session behind.
    async fn assert_passes_permission_validation(opts: ClaudeAgentOptions) {
        let mut client = ClaudeClient::new(opts);

        match client.connect().await {
            Ok(_) => {
                // Best-effort cleanup; the outcome is irrelevant to what is tested.
                let _ = client.disconnect().await;
            }
            Err(err) => assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            ),
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let err = client
            .connect()
            .await
            .expect_err("connect() should reject can_use_tool with a custom permission tool");

        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        assert_passes_permission_validation(opts).await;
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        assert_passes_permission_validation(opts).await;
    }
}