claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41///     // Connect to Claude
42///     client.connect().await?;
43///
44///     // Send a query
45///     client.query("Hello Claude!").await?;
46///
47///     // Receive response as a stream
48///     {
49///         let mut stream = client.receive_response();
50///         while let Some(message) = stream.next().await {
51///             println!("Received: {:?}", message?);
52///         }
53///     }
54///
55///     // Disconnect
56///     client.disconnect().await?;
57///     Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61    options: ClaudeAgentOptions,
62    /// QueryManager for creating isolated queries (Codex-style architecture)
63    query_manager: Option<Arc<QueryManager>>,
64    /// Current query_id for the active prompt
65    current_query_id: Option<String>,
66    connected: bool,
67}
68
69impl ClaudeClient {
70    /// Create a new ClaudeClient
71    ///
72    /// # Arguments
73    ///
74    /// * `options` - Configuration options for the Claude client
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
80    ///
81    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82    /// ```
83    pub fn new(options: ClaudeAgentOptions) -> Self {
84        Self {
85            options,
86            query_manager: None,
87            current_query_id: None,
88            connected: false,
89        }
90    }
91
92    /// Create a new ClaudeClient with early validation
93    ///
94    /// Unlike `new()`, this validates the configuration eagerly by attempting
95    /// to create the transport. This catches issues like invalid working directory
96    /// or missing CLI before `connect()` is called.
97    ///
98    /// # Arguments
99    ///
100    /// * `options` - Configuration options for the Claude client
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if:
105    /// - The working directory does not exist or is not a directory
106    /// - Claude CLI cannot be found
107    ///
108    /// # Example
109    ///
110    /// ```no_run
111    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
112    ///
113    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114    /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
115    /// ```
116    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117        // Validate by attempting to create transport (but don't keep it)
118        let prompt = QueryPrompt::Streaming;
119        let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121        Ok(Self {
122            options,
123            query_manager: None,
124            current_query_id: None,
125            connected: false,
126        })
127    }
128
129    /// Connect to Claude (analogous to Python's __aenter__)
130    ///
131    /// This establishes the connection to the Claude Code CLI and initializes
132    /// the bidirectional communication channel.
133    ///
134    /// Uses the Codex-style architecture with QueryManager for complete
135    /// message isolation between queries.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - Claude CLI cannot be found or started
141    /// - The initialization handshake fails
142    /// - Hook registration fails
143    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144    #[instrument(
145        name = "claude.client.connect",
146        skip(self),
147        fields(
148            has_can_use_tool = self.options.can_use_tool.is_some(),
149            has_hooks = self.options.hooks.is_some(),
150            model = %self.options.model.as_deref().unwrap_or("default"),
151        )
152    )]
153    pub async fn connect(&mut self) -> Result<()> {
154        if self.connected {
155            debug!("Client already connected, skipping");
156            return Ok(());
157        }
158
159        info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162        // This matches Python SDK behavior (client.py lines 106-122)
163        // which ensures CLI uses control protocol for permission prompts
164        let mut options = self.options.clone();
165        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166            info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167            options.permission_prompt_tool_name = Some("stdio".to_string());
168        }
169
170        // Validate can_use_tool configuration (aligned with Python SDK behavior)
171        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172        // to ensure the control protocol can handle permission requests
173        if options.can_use_tool.is_some()
174            && let Some(ref tool_name) = options.permission_prompt_tool_name
175            && tool_name != "stdio"
176        {
177            return Err(ClaudeError::InvalidConfig(
178                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180                            .to_string(),
181                    ));
182        }
183
184        // Prepare hooks for initialization
185        // Build efficiency hooks if configured
186        let efficiency_hooks = self
187            .options
188            .efficiency
189            .as_ref()
190            .map(build_efficiency_hooks)
191            .unwrap_or_default();
192
193        // Merge user hooks with efficiency hooks
194        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196        // Convert hooks to internal format
197        let hooks = merged_hooks.as_ref().map(|hooks_map| {
198            hooks_map
199                .iter()
200                .map(|(event, matchers)| {
201                    let event_name = match event {
202                        HookEvent::PreToolUse => "PreToolUse",
203                        HookEvent::PostToolUse => "PostToolUse",
204                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
205                        HookEvent::Stop => "Stop",
206                        HookEvent::SubagentStop => "SubagentStop",
207                        HookEvent::PreCompact => "PreCompact",
208                    };
209                    (event_name.to_string(), matchers.clone())
210                })
211                .collect()
212        });
213
214        // Extract SDK MCP servers from options
215        let sdk_mcp_servers =
216            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217                servers_dict
218                    .iter()
219                    .filter_map(|(name, config)| {
220                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221                            Some((name.clone(), sdk_config.clone()))
222                        } else {
223                            None
224                        }
225                    })
226                    .collect()
227            } else {
228                std::collections::HashMap::new()
229            };
230
231        // Clone options for the transport factory
232        let options_clone = options.clone();
233
234        // Create QueryManager with a transport factory
235        // The factory creates new transports for each isolated query
236        // Note: connect() is called later in create_query(), not here
237        let mut query_manager = QueryManager::new(move || {
238            let prompt = QueryPrompt::Streaming;
239            let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240            Ok(Box::new(transport) as Box<dyn Transport>)
241        });
242
243        // Set control request timeout
244        query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246        // Set configuration on QueryManager
247        query_manager.set_hooks(hooks).await;
248        query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249        query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251        let query_manager = Arc::new(query_manager);
252
253        // Create the first query for initialization
254        let first_query_id = query_manager.create_query().await?;
255
256        // Start cleanup task for inactive queries
257        Arc::clone(&query_manager).start_cleanup_task();
258
259        self.query_manager = Some(query_manager);
260        self.current_query_id = Some(first_query_id);
261        self.connected = true;
262
263        info!("Successfully connected to Claude Code CLI with QueryManager");
264        Ok(())
265    }
266
267    /// Send a query to Claude
268    ///
269    /// This sends a new user prompt to Claude. Claude will remember the context
270    /// of previous queries within the same session.
271    ///
272    /// # Arguments
273    ///
274    /// * `prompt` - The user prompt to send
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the client is not connected or if sending fails.
279    ///
280    /// # Example
281    ///
282    /// ```no_run
283    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
284    /// # #[tokio::main]
285    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287    /// # client.connect().await?;
288    /// client.query("What is 2 + 2?").await?;
289    /// # Ok(())
290    /// # }
291    /// ```
292    #[instrument(
293        name = "claude.client.query",
294        skip(self, prompt),
295        fields(session_id = "default",)
296    )]
297    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298        self.query_with_session(prompt, "default").await
299    }
300
301    /// Send a query to Claude with a specific session ID
302    ///
303    /// This sends a new user prompt to Claude. Different session IDs maintain
304    /// separate conversation contexts.
305    ///
306    /// # Arguments
307    ///
308    /// * `prompt` - The user prompt to send
309    /// * `session_id` - Session identifier for the conversation
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the client is not connected or if sending fails.
314    ///
315    /// # Example
316    ///
317    /// ```no_run
318    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
319    /// # #[tokio::main]
320    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322    /// # client.connect().await?;
323    /// // Separate conversation contexts
324    /// client.query_with_session("First question", "session-1").await?;
325    /// client.query_with_session("Different question", "session-2").await?;
326    /// # Ok(())
327    /// # }
328    /// ```
329    pub async fn query_with_session(
330        &mut self,
331        prompt: impl Into<String>,
332        session_id: impl Into<String>,
333    ) -> Result<()> {
334        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336        })?;
337
338        let prompt_str = prompt.into();
339        let session_id_str = session_id.into();
340
341        // Create a new isolated query for each prompt (Codex-style architecture)
342        // This ensures complete message isolation between prompts
343        let query_id = query_manager.create_query().await?;
344        self.current_query_id = Some(query_id.clone());
345
346        // Get the isolated query
347        let query = query_manager.get_query(&query_id)?;
348
349        // Format as JSON message for stream-json input format
350        let user_message = serde_json::json!({
351            "type": "user",
352            "message": {
353                "role": "user",
354                "content": prompt_str
355            },
356            "session_id": session_id_str
357        });
358
359        let message_str = serde_json::to_string(&user_message).map_err(|e| {
360            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
361        })?;
362
363        // Write directly to stdin (bypasses transport lock)
364        let stdin = query.stdin.clone();
365
366        if let Some(stdin_arc) = stdin {
367            let mut stdin_guard = stdin_arc.lock().await;
368            if let Some(ref mut stdin_stream) = *stdin_guard {
369                stdin_stream
370                    .write_all(message_str.as_bytes())
371                    .await
372                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
373                stdin_stream.write_all(b"\n").await.map_err(|e| {
374                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
375                })?;
376                stdin_stream
377                    .flush()
378                    .await
379                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
380            } else {
381                return Err(ClaudeError::Transport("stdin not available".to_string()));
382            }
383        } else {
384            return Err(ClaudeError::Transport("stdin not set".to_string()));
385        }
386
387        debug!(
388            query_id = %query_id,
389            session_id = %session_id_str,
390            "Sent query to isolated query"
391        );
392
393        Ok(())
394    }
395
396    /// Send a query with structured content blocks (supports images)
397    ///
398    /// This method enables multimodal queries in bidirectional streaming mode.
399    /// Use it to send images alongside text for vision-related tasks.
400    ///
401    /// # Arguments
402    ///
403    /// * `content` - A vector of content blocks (text and/or images)
404    ///
405    /// # Errors
406    ///
407    /// Returns an error if:
408    /// - The content vector is empty (must include at least one text or image block)
409    /// - The client is not connected (call `connect()` first)
410    /// - Sending the message fails
411    ///
412    /// # Example
413    ///
414    /// ```no_run
415    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
416    /// # #[tokio::main]
417    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
418    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
419    /// # client.connect().await?;
420    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
421    /// client.query_with_content(vec![
422    ///     UserContentBlock::text("What's in this image?"),
423    ///     UserContentBlock::image_base64("image/png", base64_data)?,
424    /// ]).await?;
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn query_with_content(
429        &mut self,
430        content: impl Into<Vec<UserContentBlock>>,
431    ) -> Result<()> {
432        self.query_with_content_and_session(content, "default")
433            .await
434    }
435
436    /// Send a query with structured content blocks and a specific session ID
437    ///
438    /// This method enables multimodal queries with session management for
439    /// maintaining separate conversation contexts.
440    ///
441    /// # Arguments
442    ///
443    /// * `content` - A vector of content blocks (text and/or images)
444    /// * `session_id` - Session identifier for the conversation
445    ///
446    /// # Errors
447    ///
448    /// Returns an error if:
449    /// - The content vector is empty (must include at least one text or image block)
450    /// - The client is not connected (call `connect()` first)
451    /// - Sending the message fails
452    ///
453    /// # Example
454    ///
455    /// ```no_run
456    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
457    /// # #[tokio::main]
458    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
459    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
460    /// # client.connect().await?;
461    /// client.query_with_content_and_session(
462    ///     vec![
463    ///         UserContentBlock::text("Analyze this chart"),
464    ///         UserContentBlock::image_url("https://example.com/chart.png"),
465    ///     ],
466    ///     "analysis-session",
467    /// ).await?;
468    /// # Ok(())
469    /// # }
470    /// ```
471    pub async fn query_with_content_and_session(
472        &mut self,
473        content: impl Into<Vec<UserContentBlock>>,
474        session_id: impl Into<String>,
475    ) -> Result<()> {
476        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
477            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
478        })?;
479
480        let content_blocks: Vec<UserContentBlock> = content.into();
481        UserContentBlock::validate_content(&content_blocks)?;
482
483        let session_id_str = session_id.into();
484
485        // Create a new isolated query for each prompt (Codex-style architecture)
486        // This ensures complete message isolation between prompts
487        let query_id = query_manager.create_query().await?;
488        self.current_query_id = Some(query_id.clone());
489
490        // Get the isolated query
491        let query = query_manager.get_query(&query_id)?;
492
493        // Format as JSON message for stream-json input format
494        // Content is an array of content blocks, not a simple string
495        let user_message = serde_json::json!({
496            "type": "user",
497            "message": {
498                "role": "user",
499                "content": content_blocks
500            },
501            "session_id": session_id_str
502        });
503
504        let message_str = serde_json::to_string(&user_message).map_err(|e| {
505            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
506        })?;
507
508        // Write directly to stdin (bypasses transport lock)
509        let stdin = query.stdin.clone();
510
511        if let Some(stdin_arc) = stdin {
512            let mut stdin_guard = stdin_arc.lock().await;
513            if let Some(ref mut stdin_stream) = *stdin_guard {
514                stdin_stream
515                    .write_all(message_str.as_bytes())
516                    .await
517                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
518                stdin_stream.write_all(b"\n").await.map_err(|e| {
519                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
520                })?;
521                stdin_stream
522                    .flush()
523                    .await
524                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
525            } else {
526                return Err(ClaudeError::Transport("stdin not available".to_string()));
527            }
528        } else {
529            return Err(ClaudeError::Transport("stdin not set".to_string()));
530        }
531
532        debug!(
533            query_id = %query_id,
534            session_id = %session_id_str,
535            "Sent content query to isolated query"
536        );
537
538        Ok(())
539    }
540
541    /// Receive all messages as a stream (continuous)
542    ///
543    /// This method returns a stream that yields all messages from Claude
544    /// indefinitely until the stream is closed or an error occurs.
545    ///
546    /// Use this when you want to process all messages, including multiple
547    /// responses and system events.
548    ///
549    /// # Returns
550    ///
551    /// A stream of `Result<Message>` that continues until the connection closes.
552    ///
553    /// # Example
554    ///
555    /// ```no_run
556    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
557    /// # use futures::StreamExt;
558    /// # #[tokio::main]
559    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
560    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
561    /// # client.connect().await?;
562    /// # client.query("Hello").await?;
563    /// let mut stream = client.receive_messages();
564    /// while let Some(message) = stream.next().await {
565    ///     println!("Received: {:?}", message?);
566    /// }
567    /// # Ok(())
568    /// # }
569    /// ```
570    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
571        let query_manager = match &self.query_manager {
572            Some(qm) => Arc::clone(qm),
573            None => {
574                return Box::pin(futures::stream::once(async {
575                    Err(ClaudeError::InvalidConfig(
576                        "Client not connected. Call connect() first.".to_string(),
577                    ))
578                }));
579            }
580        };
581
582        let query_id = match &self.current_query_id {
583            Some(id) => id.clone(),
584            None => {
585                return Box::pin(futures::stream::once(async {
586                    Err(ClaudeError::InvalidConfig(
587                        "No active query. Call query() first.".to_string(),
588                    ))
589                }));
590            }
591        };
592
593        Box::pin(async_stream::stream! {
594            // Get the isolated query and its message receiver
595            let query = match query_manager.get_query(&query_id) {
596                Ok(q) => q,
597                Err(e) => {
598                    yield Err(e);
599                    return;
600                }
601            };
602
603            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
604                Arc::clone(&query.message_rx)
605            };
606
607            loop {
608                let message = {
609                    let mut rx_guard = rx.lock().await;
610                    rx_guard.recv().await
611                };
612
613                match message {
614                    Some(json) => {
615                        match MessageParser::parse(json) {
616                            Ok(msg) => yield Ok(msg),
617                            Err(e) => {
618                                eprintln!("Failed to parse message: {}", e);
619                                yield Err(e);
620                            }
621                        }
622                    }
623                    None => break,
624                }
625            }
626        })
627    }
628
629    /// Receive messages until a ResultMessage
630    ///
631    /// This method returns a stream that yields messages until it encounters
632    /// a `ResultMessage`, which signals the completion of a Claude response.
633    ///
634    /// This is the most common pattern for handling Claude responses, as it
635    /// processes one complete "turn" of the conversation.
636    ///
637    /// This method uses query-scoped message channels to ensure message isolation,
638    /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
639    ///
640    /// # Returns
641    ///
642    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
643    ///
644    /// # Example
645    ///
646    /// ```no_run
647    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
648    /// # use futures::StreamExt;
649    /// # #[tokio::main]
650    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
651    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
652    /// # client.connect().await?;
653    /// # client.query("Hello").await?;
654    /// let mut stream = client.receive_response();
655    /// while let Some(message) = stream.next().await {
656    ///     match message? {
657    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
658    ///         Message::Result(result) => {
659    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
660    ///             break;
661    ///         }
662    ///         _ => {}
663    ///     }
664    /// }
665    /// # Ok(())
666    /// # }
667    /// ```
668    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
669        let query_manager = match &self.query_manager {
670            Some(qm) => Arc::clone(qm),
671            None => {
672                return Box::pin(futures::stream::once(async {
673                    Err(ClaudeError::InvalidConfig(
674                        "Client not connected. Call connect() first.".to_string(),
675                    ))
676                }));
677            }
678        };
679
680        let query_id = match &self.current_query_id {
681            Some(id) => id.clone(),
682            None => {
683                return Box::pin(futures::stream::once(async {
684                    Err(ClaudeError::InvalidConfig(
685                        "No active query. Call query() first.".to_string(),
686                    ))
687                }));
688            }
689        };
690
691        Box::pin(async_stream::stream! {
692            // ====================================================================
693            // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
694            // ====================================================================
695            // In the Codex-style architecture, each query has its own completely
696            // isolated QueryFull instance. We get the message receiver directly
697            // from the isolated query, eliminating the need for routing logic.
698            //
699            // This provides:
700            // - Complete message isolation
701            // - No possibility of message confusion
702            // - Simpler architecture without routing overhead
703            //
704            // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
705            // which removes inactive queries whose channels have been closed.
706
707            debug!(
708                query_id = %query_id,
709                "Getting message receiver from isolated query"
710            );
711
712            // Get the isolated query and its message receiver
713            let query = match query_manager.get_query(&query_id) {
714                Ok(q) => q,
715                Err(e) => {
716                    yield Err(e);
717                    return;
718                }
719            };
720
721            // Get the message receiver from the isolated query
722            // In isolated mode, we directly access the message_rx without routing
723            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
724                Arc::clone(&query.message_rx)
725            };
726
727            loop {
728                let message = {
729                    let mut rx_guard = rx.lock().await;
730                    rx_guard.recv().await
731                };
732
733                match message {
734                    Some(json) => {
735                        match MessageParser::parse(json) {
736                            Ok(msg) => {
737                                let is_result = matches!(msg, Message::Result(_));
738                                yield Ok(msg);
739                                if is_result {
740                                    debug!(
741                                        query_id = %query_id,
742                                        "Received ResultMessage, ending stream"
743                                    );
744                                    // Cleanup will be handled by the periodic cleanup task
745                                    // when the query becomes inactive
746                                    break;
747                                }
748                            }
749                            Err(e) => {
750                                eprintln!("Failed to parse message: {}", e);
751                                yield Err(e);
752                            }
753                        }
754                    }
755                    None => {
756                        debug!(
757                            query_id = %query_id,
758                            "Isolated query channel closed"
759                        );
760                        // Cleanup will be handled by the periodic cleanup task
761                        break;
762                    }
763                }
764            }
765        })
766    }
767
768    /// Drain any leftover messages from the previous prompt
769    ///
770    /// This method removes any messages remaining in the channel from a previous
771    /// prompt. This should be called before starting a new prompt to ensure
772    /// that the new prompt doesn't receive stale messages.
773    ///
774    /// This is important when prompts are cancelled or end unexpectedly,
775    /// as there may be buffered messages that would otherwise be received
776    /// by the next prompt.
777    ///
778    /// # Returns
779    ///
780    /// The number of messages drained from the channel.
781    ///
782    /// # Example
783    ///
784    /// ```no_run
785    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
786    /// # #[tokio::main]
787    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
788    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
789    /// # client.connect().await?;
790    /// // Before starting a new prompt, drain any leftover messages
791    /// let drained = client.drain_messages().await;
792    /// if drained > 0 {
793    ///     eprintln!("Drained {} leftover messages from previous prompt", drained);
794    /// }
795    /// # Ok(())
796    /// # }
797    /// ```
798    pub async fn drain_messages(&self) -> usize {
799        let Some(query_manager) = &self.query_manager else {
800            return 0;
801        };
802
803        let Some(query_id) = &self.current_query_id else {
804            return 0;
805        };
806
807        let Ok(query) = query_manager.get_query(query_id) else {
808            return 0;
809        };
810
811        let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
812            Arc::clone(&query.message_rx)
813        };
814
815        let mut count = 0;
816        // Use try_recv to drain all currently available messages without blocking
817        loop {
818            let mut rx_guard = rx.lock().await;
819            match rx_guard.try_recv() {
820                Ok(_) => count += 1,
821                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
822                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
823            }
824        }
825
826        if count > 0 {
827            debug!(count, "Drained leftover messages from previous prompt");
828        }
829
830        count
831    }
832
833    /// Send an interrupt signal to stop the current Claude operation
834    ///
835    /// This is analogous to Python's `client.interrupt()`.
836    ///
837    /// # Errors
838    ///
839    /// Returns an error if the client is not connected or if sending fails.
840    pub async fn interrupt(&self) -> Result<()> {
841        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
842            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
843        })?;
844
845        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
846            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
847        })?;
848
849        let query = query_manager.get_query(query_id)?;
850        query.interrupt().await
851    }
852
853    /// Change the permission mode dynamically
854    ///
855    /// This is analogous to Python's `client.set_permission_mode()`.
856    ///
857    /// # Arguments
858    ///
859    /// * `mode` - The new permission mode to set
860    ///
861    /// # Errors
862    ///
863    /// Returns an error if the client is not connected or if sending fails.
864    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
865        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
866            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
867        })?;
868
869        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
870            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
871        })?;
872
873        let query = query_manager.get_query(query_id)?;
874        query.set_permission_mode(mode).await
875    }
876
877    /// Change the AI model dynamically
878    ///
879    /// This is analogous to Python's `client.set_model()`.
880    ///
881    /// # Arguments
882    ///
883    /// * `model` - The new model name, or None to use default
884    ///
885    /// # Errors
886    ///
887    /// Returns an error if the client is not connected or if sending fails.
888    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
889        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
890            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
891        })?;
892
893        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
894            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
895        })?;
896
897        let query = query_manager.get_query(query_id)?;
898        query.set_model(model).await
899    }
900
901    /// Rewind tracked files to their state at a specific user message.
902    ///
903    /// This is analogous to Python's `client.rewind_files()`.
904    ///
905    /// # Requirements
906    ///
907    /// - `enable_file_checkpointing=true` in options to track file changes
908    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
909    ///   objects with `uuid` in the response stream
910    ///
911    /// # Arguments
912    ///
913    /// * `user_message_id` - UUID of the user message to rewind to. This should be
914    ///   the `uuid` field from a `UserMessage` received during the conversation.
915    ///
916    /// # Errors
917    ///
918    /// Returns an error if the client is not connected or if sending fails.
919    ///
920    /// # Example
921    ///
922    /// ```no_run
923    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
924    /// # use std::collections::HashMap;
925    /// # #[tokio::main]
926    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
927    /// let options = ClaudeAgentOptions::builder()
928    ///     .enable_file_checkpointing(true)
929    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
930    ///     .build();
931    /// let mut client = ClaudeClient::new(options);
932    /// client.connect().await?;
933    ///
934    /// client.query("Make some changes to my files").await?;
935    /// let mut checkpoint_id = None;
936    /// {
937    ///     let mut stream = client.receive_response();
938    ///     use futures::StreamExt;
939    ///     while let Some(Ok(msg)) = stream.next().await {
940    ///         if let Message::User(user_msg) = &msg {
941    ///             if let Some(uuid) = &user_msg.uuid {
942    ///                 checkpoint_id = Some(uuid.clone());
943    ///             }
944    ///         }
945    ///     }
946    /// }
947    ///
948    /// // Later, rewind to that point
949    /// if let Some(id) = checkpoint_id {
950    ///     client.rewind_files(&id).await?;
951    /// }
952    /// # Ok(())
953    /// # }
954    /// ```
955    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
956        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
957            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
958        })?;
959
960        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
961            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
962        })?;
963
964        let query = query_manager.get_query(query_id)?;
965        query.rewind_files(user_message_id).await
966    }
967
968    /// Get server initialization info including available commands and output styles
969    ///
970    /// Returns initialization information from the Claude Code server including:
971    /// - Available commands (slash commands, system commands, etc.)
972    /// - Current and available output styles
973    /// - Server capabilities
974    ///
975    /// This is analogous to Python's `client.get_server_info()`.
976    ///
977    /// # Returns
978    ///
979    /// Dictionary with server info, or None if not connected
980    ///
981    /// # Example
982    ///
983    /// ```no_run
984    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
985    /// # #[tokio::main]
986    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
987    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
988    /// # client.connect().await?;
989    /// if let Some(info) = client.get_server_info().await {
990    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
991    ///     println!("Output style: {:?}", info.get("output_style"));
992    /// }
993    /// # Ok(())
994    /// # }
995    /// ```
996    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
997        let query_manager = self.query_manager.as_ref()?;
998        let query_id = self.current_query_id.as_ref()?;
999        let Ok(query) = query_manager.get_query(query_id) else {
1000            return None;
1001        };
1002        query.get_initialization_result().await
1003    }
1004
1005    /// Start a new session by switching to a different session ID
1006    ///
1007    /// This is a convenience method that creates a new conversation context.
1008    /// It's equivalent to calling `query_with_session()` with a new session ID.
1009    ///
1010    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1011    /// when creating a new client.
1012    ///
1013    /// # Arguments
1014    ///
1015    /// * `session_id` - The new session ID to use
1016    /// * `prompt` - Initial message for the new session
1017    ///
1018    /// # Errors
1019    ///
1020    /// Returns an error if the client is not connected or if sending fails.
1021    ///
1022    /// # Example
1023    ///
1024    /// ```no_run
1025    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
1026    /// # #[tokio::main]
1027    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1028    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1029    /// # client.connect().await?;
1030    /// // First conversation
1031    /// client.query("Hello").await?;
1032    ///
1033    /// // Start new conversation with different context
1034    /// client.new_session("session-2", "Tell me about Rust").await?;
1035    /// # Ok(())
1036    /// # }
1037    /// ```
1038    pub async fn new_session(
1039        &mut self,
1040        session_id: impl Into<String>,
1041        prompt: impl Into<String>,
1042    ) -> Result<()> {
1043        self.query_with_session(prompt, session_id).await
1044    }
1045
1046    /// Disconnect from Claude (analogous to Python's __aexit__)
1047    ///
1048    /// This cleanly shuts down the connection to Claude Code CLI.
1049    ///
1050    /// # Errors
1051    ///
1052    /// Returns an error if disconnection fails.
1053    #[instrument(name = "claude.client.disconnect", skip(self))]
1054    pub async fn disconnect(&mut self) -> Result<()> {
1055        if !self.connected {
1056            debug!("Client already disconnected");
1057            return Ok(());
1058        }
1059
1060        info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1061
1062        if let Some(query_manager) = self.query_manager.take() {
1063            // Get all queries for cleanup
1064            let queries = query_manager.get_all_queries();
1065
1066            // Close all isolated queries by closing their resources
1067            // This signals each CLI process to exit
1068            for (_query_id, query) in &queries {
1069                // Close stdin (if available)
1070                if let Some(ref stdin_arc) = query.stdin {
1071                    let mut stdin_guard = stdin_arc.lock().await;
1072                    if let Some(mut stdin_stream) = stdin_guard.take() {
1073                        let _ = stdin_stream.shutdown().await;
1074                    }
1075                }
1076
1077                // Close transport
1078                let transport = Arc::clone(&query.transport);
1079                let mut transport_guard = transport.lock().await;
1080                let _ = transport_guard.close().await;
1081            }
1082
1083            // Give background tasks a moment to finish reading and release locks
1084            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1085        }
1086
1087        self.current_query_id = None;
1088        self.connected = false;
1089        debug!("Disconnected successfully");
1090        Ok(())
1091    }
1092}
1093
1094impl Drop for ClaudeClient {
1095    fn drop(&mut self) {
1096        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1097        // Users should call disconnect() explicitly
1098        if self.connected {
1099            eprintln!(
1100                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1101            );
1102        }
1103    }
1104}
1105
1106#[cfg(test)]
1107mod tests {
1108    use super::*;
1109    use crate::types::permissions::{PermissionResult, PermissionResultAllow};
1110    use std::sync::Arc;
1111
1112    #[tokio::test]
1113    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
1114        let callback: crate::types::permissions::CanUseToolCallback =
1115            Arc::new(|_tool_name, _tool_input, _context| {
1116                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1117            });
1118
1119        let opts = ClaudeAgentOptions::builder()
1120            .can_use_tool(callback)
1121            .permission_prompt_tool_name("custom_tool") // Not "stdio"
1122            .build();
1123
1124        let mut client = ClaudeClient::new(opts);
1125        let result = client.connect().await;
1126
1127        assert!(result.is_err());
1128        let err = result.unwrap_err();
1129        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
1130        assert!(err.to_string().contains("permission_prompt_tool_name"));
1131    }
1132
1133    #[tokio::test]
1134    async fn test_connect_accepts_can_use_tool_with_stdio() {
1135        let callback: crate::types::permissions::CanUseToolCallback =
1136            Arc::new(|_tool_name, _tool_input, _context| {
1137                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1138            });
1139
1140        let opts = ClaudeAgentOptions::builder()
1141            .can_use_tool(callback)
1142            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
1143            .build();
1144
1145        let mut client = ClaudeClient::new(opts);
1146        // This will fail later (CLI not found), but should pass validation
1147        let result = client.connect().await;
1148
1149        // Should not be InvalidConfig error about permission_prompt_tool_name
1150        if let Err(ref err) = result {
1151            assert!(
1152                !err.to_string().contains("permission_prompt_tool_name"),
1153                "Should not fail on permission_prompt_tool_name validation"
1154            );
1155        }
1156    }
1157
1158    #[tokio::test]
1159    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
1160        let callback: crate::types::permissions::CanUseToolCallback =
1161            Arc::new(|_tool_name, _tool_input, _context| {
1162                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1163            });
1164
1165        let opts = ClaudeAgentOptions::builder()
1166            .can_use_tool(callback)
1167            // No permission_prompt_tool_name set - defaults to stdio
1168            .build();
1169
1170        let mut client = ClaudeClient::new(opts);
1171        // This will fail later (CLI not found), but should pass validation
1172        let result = client.connect().await;
1173
1174        // Should not be InvalidConfig error about permission_prompt_tool_name
1175        if let Err(ref err) = result {
1176            assert!(
1177                !err.to_string().contains("permission_prompt_tool_name"),
1178                "Should not fail on permission_prompt_tool_name validation"
1179            );
1180        }
1181    }
1182}