claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41///     // Connect to Claude
42///     client.connect().await?;
43///
44///     // Send a query
45///     client.query("Hello Claude!").await?;
46///
47///     // Receive response as a stream
48///     {
49///         let mut stream = client.receive_response();
50///         while let Some(message) = stream.next().await {
51///             println!("Received: {:?}", message?);
52///         }
53///     }
54///
55///     // Disconnect
56///     client.disconnect().await?;
57///     Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61    options: ClaudeAgentOptions,
62    /// QueryManager for creating isolated queries (Codex-style architecture)
63    query_manager: Option<Arc<QueryManager>>,
64    /// Current query_id for the active prompt
65    current_query_id: Option<String>,
66    connected: bool,
67}
68
69impl ClaudeClient {
70    /// Create a new ClaudeClient
71    ///
72    /// # Arguments
73    ///
74    /// * `options` - Configuration options for the Claude client
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
80    ///
81    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82    /// ```
83    pub fn new(options: ClaudeAgentOptions) -> Self {
84        Self {
85            options,
86            query_manager: None,
87            current_query_id: None,
88            connected: false,
89        }
90    }
91
92    /// Create a new ClaudeClient with early validation
93    ///
94    /// Unlike `new()`, this validates the configuration eagerly by attempting
95    /// to create the transport. This catches issues like invalid working directory
96    /// or missing CLI before `connect()` is called.
97    ///
98    /// # Arguments
99    ///
100    /// * `options` - Configuration options for the Claude client
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if:
105    /// - The working directory does not exist or is not a directory
106    /// - Claude CLI cannot be found
107    ///
108    /// # Example
109    ///
110    /// ```no_run
111    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
112    ///
113    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114    /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
115    /// ```
116    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117        // Validate by attempting to create transport (but don't keep it)
118        let prompt = QueryPrompt::Streaming;
119        let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121        Ok(Self {
122            options,
123            query_manager: None,
124            current_query_id: None,
125            connected: false,
126        })
127    }
128
129    /// Connect to Claude (analogous to Python's __aenter__)
130    ///
131    /// This establishes the connection to the Claude Code CLI and initializes
132    /// the bidirectional communication channel.
133    ///
134    /// Uses the Codex-style architecture with QueryManager for complete
135    /// message isolation between queries.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - Claude CLI cannot be found or started
141    /// - The initialization handshake fails
142    /// - Hook registration fails
143    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144    #[instrument(
145        name = "claude.client.connect",
146        skip(self),
147        fields(
148            has_can_use_tool = self.options.can_use_tool.is_some(),
149            has_hooks = self.options.hooks.is_some(),
150            model = %self.options.model.as_deref().unwrap_or("default"),
151        )
152    )]
153    pub async fn connect(&mut self) -> Result<()> {
154        if self.connected {
155            debug!("Client already connected, skipping");
156            return Ok(());
157        }
158
159        info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162        // This matches Python SDK behavior (client.py lines 106-122)
163        // which ensures CLI uses control protocol for permission prompts
164        let mut options = self.options.clone();
165        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166            info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167            options.permission_prompt_tool_name = Some("stdio".to_string());
168        }
169
170        // Validate can_use_tool configuration (aligned with Python SDK behavior)
171        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172        // to ensure the control protocol can handle permission requests
173        if options.can_use_tool.is_some()
174            && let Some(ref tool_name) = options.permission_prompt_tool_name
175            && tool_name != "stdio"
176        {
177            return Err(ClaudeError::InvalidConfig(
178                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180                            .to_string(),
181                    ));
182        }
183
184        // Prepare hooks for initialization
185        // Build efficiency hooks if configured
186        let efficiency_hooks = self
187            .options
188            .efficiency
189            .as_ref()
190            .map(build_efficiency_hooks)
191            .unwrap_or_default();
192
193        // Merge user hooks with efficiency hooks
194        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196        // Convert hooks to internal format
197        let hooks = merged_hooks.as_ref().map(|hooks_map| {
198            hooks_map
199                .iter()
200                .map(|(event, matchers)| {
201                    let event_name = match event {
202                        HookEvent::PreToolUse => "PreToolUse",
203                        HookEvent::PostToolUse => "PostToolUse",
204                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
205                        HookEvent::Stop => "Stop",
206                        HookEvent::SubagentStop => "SubagentStop",
207                        HookEvent::PreCompact => "PreCompact",
208                    };
209                    (event_name.to_string(), matchers.clone())
210                })
211                .collect()
212        });
213
214        // Extract SDK MCP servers from options
215        let sdk_mcp_servers =
216            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217                servers_dict
218                    .iter()
219                    .filter_map(|(name, config)| {
220                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221                            Some((name.clone(), sdk_config.clone()))
222                        } else {
223                            None
224                        }
225                    })
226                    .collect()
227            } else {
228                std::collections::HashMap::new()
229            };
230
231        // Clone options for the transport factory
232        let options_clone = options.clone();
233
234        // Create QueryManager with a transport factory
235        // The factory creates new transports for each isolated query
236        // Note: connect() is called later in create_query(), not here
237        let mut query_manager = QueryManager::new(move || {
238            let prompt = QueryPrompt::Streaming;
239            let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240            Ok(Box::new(transport) as Box<dyn Transport>)
241        });
242
243        // Set control request timeout
244        query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246        // Set configuration on QueryManager
247        query_manager.set_hooks(hooks).await;
248        query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249        query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251        let query_manager = Arc::new(query_manager);
252
253        // Create the first query for initialization
254        let first_query_id = query_manager.create_query().await?;
255
256        // Start cleanup task for inactive queries
257        Arc::clone(&query_manager).start_cleanup_task();
258
259        self.query_manager = Some(query_manager);
260        self.current_query_id = Some(first_query_id);
261        self.connected = true;
262
263        info!("Successfully connected to Claude Code CLI with QueryManager");
264        Ok(())
265    }
266
267    /// Send a query to Claude
268    ///
269    /// This sends a new user prompt to Claude. Claude will remember the context
270    /// of previous queries within the same session.
271    ///
272    /// # Arguments
273    ///
274    /// * `prompt` - The user prompt to send
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the client is not connected or if sending fails.
279    ///
280    /// # Example
281    ///
282    /// ```no_run
283    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
284    /// # #[tokio::main]
285    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287    /// # client.connect().await?;
288    /// client.query("What is 2 + 2?").await?;
289    /// # Ok(())
290    /// # }
291    /// ```
292    #[instrument(
293        name = "claude.client.query",
294        skip(self, prompt),
295        fields(session_id = "default",)
296    )]
297    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298        self.query_with_session(prompt, "default").await
299    }
300
301    /// Send a query to Claude with a specific session ID
302    ///
303    /// This sends a new user prompt to Claude. Different session IDs maintain
304    /// separate conversation contexts.
305    ///
306    /// # Arguments
307    ///
308    /// * `prompt` - The user prompt to send
309    /// * `session_id` - Session identifier for the conversation
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the client is not connected or if sending fails.
314    ///
315    /// # Example
316    ///
317    /// ```no_run
318    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
319    /// # #[tokio::main]
320    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322    /// # client.connect().await?;
323    /// // Separate conversation contexts
324    /// client.query_with_session("First question", "session-1").await?;
325    /// client.query_with_session("Different question", "session-2").await?;
326    /// # Ok(())
327    /// # }
328    /// ```
329    pub async fn query_with_session(
330        &mut self,
331        prompt: impl Into<String>,
332        session_id: impl Into<String>,
333    ) -> Result<()> {
334        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336        })?;
337
338        let prompt_str = prompt.into();
339        let session_id_str = session_id.into();
340
341        // Get or create query for this session (reuses existing query to maintain context)
342        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
343        self.current_query_id = Some(query_id.clone());
344
345        // Get the isolated query
346        let query = query_manager.get_query(&query_id)?;
347
348        // Format as JSON message for stream-json input format
349        let user_message = serde_json::json!({
350            "type": "user",
351            "message": {
352                "role": "user",
353                "content": prompt_str
354            },
355            "session_id": session_id_str
356        });
357
358        let message_str = serde_json::to_string(&user_message).map_err(|e| {
359            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
360        })?;
361
362        // Write directly to stdin (bypasses transport lock)
363        let stdin = query.stdin.clone();
364
365        if let Some(stdin_arc) = stdin {
366            let mut stdin_guard = stdin_arc.lock().await;
367            if let Some(ref mut stdin_stream) = *stdin_guard {
368                stdin_stream
369                    .write_all(message_str.as_bytes())
370                    .await
371                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
372                stdin_stream.write_all(b"\n").await.map_err(|e| {
373                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
374                })?;
375                stdin_stream
376                    .flush()
377                    .await
378                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
379            } else {
380                return Err(ClaudeError::Transport("stdin not available".to_string()));
381            }
382        } else {
383            return Err(ClaudeError::Transport("stdin not set".to_string()));
384        }
385
386        debug!(
387            query_id = %query_id,
388            session_id = %session_id_str,
389            "Sent query to isolated query"
390        );
391
392        Ok(())
393    }
394
395    /// Send a query with structured content blocks (supports images)
396    ///
397    /// This method enables multimodal queries in bidirectional streaming mode.
398    /// Use it to send images alongside text for vision-related tasks.
399    ///
400    /// # Arguments
401    ///
402    /// * `content` - A vector of content blocks (text and/or images)
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if:
407    /// - The content vector is empty (must include at least one text or image block)
408    /// - The client is not connected (call `connect()` first)
409    /// - Sending the message fails
410    ///
411    /// # Example
412    ///
413    /// ```no_run
414    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
415    /// # #[tokio::main]
416    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
417    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
418    /// # client.connect().await?;
419    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
420    /// client.query_with_content(vec![
421    ///     UserContentBlock::text("What's in this image?"),
422    ///     UserContentBlock::image_base64("image/png", base64_data)?,
423    /// ]).await?;
424    /// # Ok(())
425    /// # }
426    /// ```
427    pub async fn query_with_content(
428        &mut self,
429        content: impl Into<Vec<UserContentBlock>>,
430    ) -> Result<()> {
431        self.query_with_content_and_session(content, "default")
432            .await
433    }
434
435    /// Send a query with structured content blocks and a specific session ID
436    ///
437    /// This method enables multimodal queries with session management for
438    /// maintaining separate conversation contexts.
439    ///
440    /// # Arguments
441    ///
442    /// * `content` - A vector of content blocks (text and/or images)
443    /// * `session_id` - Session identifier for the conversation
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if:
448    /// - The content vector is empty (must include at least one text or image block)
449    /// - The client is not connected (call `connect()` first)
450    /// - Sending the message fails
451    ///
452    /// # Example
453    ///
454    /// ```no_run
455    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
456    /// # #[tokio::main]
457    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
458    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
459    /// # client.connect().await?;
460    /// client.query_with_content_and_session(
461    ///     vec![
462    ///         UserContentBlock::text("Analyze this chart"),
463    ///         UserContentBlock::image_url("https://example.com/chart.png"),
464    ///     ],
465    ///     "analysis-session",
466    /// ).await?;
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub async fn query_with_content_and_session(
471        &mut self,
472        content: impl Into<Vec<UserContentBlock>>,
473        session_id: impl Into<String>,
474    ) -> Result<()> {
475        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
476            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
477        })?;
478
479        let content_blocks: Vec<UserContentBlock> = content.into();
480        UserContentBlock::validate_content(&content_blocks)?;
481
482        let session_id_str = session_id.into();
483
484        // Create a new isolated query for each prompt (Codex-style architecture)
485        // This ensures complete message isolation between prompts
486        let query_id = query_manager.create_query().await?;
487        self.current_query_id = Some(query_id.clone());
488
489        // Get the isolated query
490        let query = query_manager.get_query(&query_id)?;
491
492        // Format as JSON message for stream-json input format
493        // Content is an array of content blocks, not a simple string
494        let user_message = serde_json::json!({
495            "type": "user",
496            "message": {
497                "role": "user",
498                "content": content_blocks
499            },
500            "session_id": session_id_str
501        });
502
503        let message_str = serde_json::to_string(&user_message).map_err(|e| {
504            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
505        })?;
506
507        // Write directly to stdin (bypasses transport lock)
508        let stdin = query.stdin.clone();
509
510        if let Some(stdin_arc) = stdin {
511            let mut stdin_guard = stdin_arc.lock().await;
512            if let Some(ref mut stdin_stream) = *stdin_guard {
513                stdin_stream
514                    .write_all(message_str.as_bytes())
515                    .await
516                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
517                stdin_stream.write_all(b"\n").await.map_err(|e| {
518                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
519                })?;
520                stdin_stream
521                    .flush()
522                    .await
523                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
524            } else {
525                return Err(ClaudeError::Transport("stdin not available".to_string()));
526            }
527        } else {
528            return Err(ClaudeError::Transport("stdin not set".to_string()));
529        }
530
531        debug!(
532            query_id = %query_id,
533            session_id = %session_id_str,
534            "Sent content query to isolated query"
535        );
536
537        Ok(())
538    }
539
540    /// Receive all messages as a stream (continuous)
541    ///
542    /// This method returns a stream that yields all messages from Claude
543    /// indefinitely until the stream is closed or an error occurs.
544    ///
545    /// Use this when you want to process all messages, including multiple
546    /// responses and system events.
547    ///
548    /// # Returns
549    ///
550    /// A stream of `Result<Message>` that continues until the connection closes.
551    ///
552    /// # Example
553    ///
554    /// ```no_run
555    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
556    /// # use futures::StreamExt;
557    /// # #[tokio::main]
558    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
559    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
560    /// # client.connect().await?;
561    /// # client.query("Hello").await?;
562    /// let mut stream = client.receive_messages();
563    /// while let Some(message) = stream.next().await {
564    ///     println!("Received: {:?}", message?);
565    /// }
566    /// # Ok(())
567    /// # }
568    /// ```
569    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
570        let query_manager = match &self.query_manager {
571            Some(qm) => Arc::clone(qm),
572            None => {
573                return Box::pin(futures::stream::once(async {
574                    Err(ClaudeError::InvalidConfig(
575                        "Client not connected. Call connect() first.".to_string(),
576                    ))
577                }));
578            }
579        };
580
581        let query_id = match &self.current_query_id {
582            Some(id) => id.clone(),
583            None => {
584                return Box::pin(futures::stream::once(async {
585                    Err(ClaudeError::InvalidConfig(
586                        "No active query. Call query() first.".to_string(),
587                    ))
588                }));
589            }
590        };
591
592        Box::pin(async_stream::stream! {
593            // Get the isolated query and its message receiver
594            let query = match query_manager.get_query(&query_id) {
595                Ok(q) => q,
596                Err(e) => {
597                    yield Err(e);
598                    return;
599                }
600            };
601
602            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
603                Arc::clone(&query.message_rx)
604            };
605
606            loop {
607                let message = {
608                    let mut rx_guard = rx.lock().await;
609                    rx_guard.recv().await
610                };
611
612                match message {
613                    Some(json) => {
614                        match MessageParser::parse(json) {
615                            Ok(msg) => yield Ok(msg),
616                            Err(e) => {
617                                eprintln!("Failed to parse message: {}", e);
618                                yield Err(e);
619                            }
620                        }
621                    }
622                    None => break,
623                }
624            }
625        })
626    }
627
628    /// Receive messages until a ResultMessage
629    ///
630    /// This method returns a stream that yields messages until it encounters
631    /// a `ResultMessage`, which signals the completion of a Claude response.
632    ///
633    /// This is the most common pattern for handling Claude responses, as it
634    /// processes one complete "turn" of the conversation.
635    ///
636    /// This method uses query-scoped message channels to ensure message isolation,
637    /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
638    ///
639    /// # Returns
640    ///
641    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
642    ///
643    /// # Example
644    ///
645    /// ```no_run
646    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
647    /// # use futures::StreamExt;
648    /// # #[tokio::main]
649    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
650    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
651    /// # client.connect().await?;
652    /// # client.query("Hello").await?;
653    /// let mut stream = client.receive_response();
654    /// while let Some(message) = stream.next().await {
655    ///     match message? {
656    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
657    ///         Message::Result(result) => {
658    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
659    ///             break;
660    ///         }
661    ///         _ => {}
662    ///     }
663    /// }
664    /// # Ok(())
665    /// # }
666    /// ```
667    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
668        let query_manager = match &self.query_manager {
669            Some(qm) => Arc::clone(qm),
670            None => {
671                return Box::pin(futures::stream::once(async {
672                    Err(ClaudeError::InvalidConfig(
673                        "Client not connected. Call connect() first.".to_string(),
674                    ))
675                }));
676            }
677        };
678
679        let query_id = match &self.current_query_id {
680            Some(id) => id.clone(),
681            None => {
682                return Box::pin(futures::stream::once(async {
683                    Err(ClaudeError::InvalidConfig(
684                        "No active query. Call query() first.".to_string(),
685                    ))
686                }));
687            }
688        };
689
690        Box::pin(async_stream::stream! {
691            // ====================================================================
692            // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
693            // ====================================================================
694            // In the Codex-style architecture, each query has its own completely
695            // isolated QueryFull instance. We get the message receiver directly
696            // from the isolated query, eliminating the need for routing logic.
697            //
698            // This provides:
699            // - Complete message isolation
700            // - No possibility of message confusion
701            // - Simpler architecture without routing overhead
702            //
703            // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
704            // which removes inactive queries whose channels have been closed.
705
706            debug!(
707                query_id = %query_id,
708                "Getting message receiver from isolated query"
709            );
710
711            // Get the isolated query and its message receiver
712            let query = match query_manager.get_query(&query_id) {
713                Ok(q) => q,
714                Err(e) => {
715                    yield Err(e);
716                    return;
717                }
718            };
719
720            // Get the message receiver from the isolated query
721            // In isolated mode, we directly access the message_rx without routing
722            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
723                Arc::clone(&query.message_rx)
724            };
725
726            loop {
727                let message = {
728                    let mut rx_guard = rx.lock().await;
729                    rx_guard.recv().await
730                };
731
732                match message {
733                    Some(json) => {
734                        match MessageParser::parse(json) {
735                            Ok(msg) => {
736                                let is_result = matches!(msg, Message::Result(_));
737                                yield Ok(msg);
738                                if is_result {
739                                    debug!(
740                                        query_id = %query_id,
741                                        "Received ResultMessage, ending stream"
742                                    );
743                                    // Cleanup will be handled by the periodic cleanup task
744                                    // when the query becomes inactive
745                                    break;
746                                }
747                            }
748                            Err(e) => {
749                                eprintln!("Failed to parse message: {}", e);
750                                yield Err(e);
751                            }
752                        }
753                    }
754                    None => {
755                        debug!(
756                            query_id = %query_id,
757                            "Isolated query channel closed"
758                        );
759                        // Cleanup will be handled by the periodic cleanup task
760                        break;
761                    }
762                }
763            }
764        })
765    }
766
767    /// Drain any leftover messages from the previous prompt
768    ///
769    /// This method removes any messages remaining in the channel from a previous
770    /// prompt. This should be called before starting a new prompt to ensure
771    /// that the new prompt doesn't receive stale messages.
772    ///
773    /// This is important when prompts are cancelled or end unexpectedly,
774    /// as there may be buffered messages that would otherwise be received
775    /// by the next prompt.
776    ///
777    /// # Returns
778    ///
779    /// The number of messages drained from the channel.
780    ///
781    /// # Example
782    ///
783    /// ```no_run
784    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
785    /// # #[tokio::main]
786    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
787    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
788    /// # client.connect().await?;
789    /// // Before starting a new prompt, drain any leftover messages
790    /// let drained = client.drain_messages().await;
791    /// if drained > 0 {
792    ///     eprintln!("Drained {} leftover messages from previous prompt", drained);
793    /// }
794    /// # Ok(())
795    /// # }
796    /// ```
797    pub async fn drain_messages(&self) -> usize {
798        let Some(query_manager) = &self.query_manager else {
799            return 0;
800        };
801
802        let Some(query_id) = &self.current_query_id else {
803            return 0;
804        };
805
806        let Ok(query) = query_manager.get_query(query_id) else {
807            return 0;
808        };
809
810        let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
811            Arc::clone(&query.message_rx)
812        };
813
814        let mut count = 0;
815        // Use try_recv to drain all currently available messages without blocking
816        loop {
817            let mut rx_guard = rx.lock().await;
818            match rx_guard.try_recv() {
819                Ok(_) => count += 1,
820                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
821                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
822            }
823        }
824
825        if count > 0 {
826            debug!(count, "Drained leftover messages from previous prompt");
827        }
828
829        count
830    }
831
832    /// Send an interrupt signal to stop the current Claude operation
833    ///
834    /// This is analogous to Python's `client.interrupt()`.
835    ///
836    /// # Errors
837    ///
838    /// Returns an error if the client is not connected or if sending fails.
839    pub async fn interrupt(&self) -> Result<()> {
840        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
841            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
842        })?;
843
844        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
845            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
846        })?;
847
848        let query = query_manager.get_query(query_id)?;
849        query.interrupt().await
850    }
851
852    /// Change the permission mode dynamically
853    ///
854    /// This is analogous to Python's `client.set_permission_mode()`.
855    ///
856    /// # Arguments
857    ///
858    /// * `mode` - The new permission mode to set
859    ///
860    /// # Errors
861    ///
862    /// Returns an error if the client is not connected or if sending fails.
863    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
864        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
865            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
866        })?;
867
868        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
869            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
870        })?;
871
872        let query = query_manager.get_query(query_id)?;
873        query.set_permission_mode(mode).await
874    }
875
876    /// Change the AI model dynamically
877    ///
878    /// This is analogous to Python's `client.set_model()`.
879    ///
880    /// # Arguments
881    ///
882    /// * `model` - The new model name, or None to use default
883    ///
884    /// # Errors
885    ///
886    /// Returns an error if the client is not connected or if sending fails.
887    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
888        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
889            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
890        })?;
891
892        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
893            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
894        })?;
895
896        let query = query_manager.get_query(query_id)?;
897        query.set_model(model).await
898    }
899
900    /// Rewind tracked files to their state at a specific user message.
901    ///
902    /// This is analogous to Python's `client.rewind_files()`.
903    ///
904    /// # Requirements
905    ///
906    /// - `enable_file_checkpointing=true` in options to track file changes
907    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
908    ///   objects with `uuid` in the response stream
909    ///
910    /// # Arguments
911    ///
912    /// * `user_message_id` - UUID of the user message to rewind to. This should be
913    ///   the `uuid` field from a `UserMessage` received during the conversation.
914    ///
915    /// # Errors
916    ///
917    /// Returns an error if the client is not connected or if sending fails.
918    ///
919    /// # Example
920    ///
921    /// ```no_run
922    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
923    /// # use std::collections::HashMap;
924    /// # #[tokio::main]
925    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
926    /// let options = ClaudeAgentOptions::builder()
927    ///     .enable_file_checkpointing(true)
928    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
929    ///     .build();
930    /// let mut client = ClaudeClient::new(options);
931    /// client.connect().await?;
932    ///
933    /// client.query("Make some changes to my files").await?;
934    /// let mut checkpoint_id = None;
935    /// {
936    ///     let mut stream = client.receive_response();
937    ///     use futures::StreamExt;
938    ///     while let Some(Ok(msg)) = stream.next().await {
939    ///         if let Message::User(user_msg) = &msg {
940    ///             if let Some(uuid) = &user_msg.uuid {
941    ///                 checkpoint_id = Some(uuid.clone());
942    ///             }
943    ///         }
944    ///     }
945    /// }
946    ///
947    /// // Later, rewind to that point
948    /// if let Some(id) = checkpoint_id {
949    ///     client.rewind_files(&id).await?;
950    /// }
951    /// # Ok(())
952    /// # }
953    /// ```
954    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
955        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
956            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
957        })?;
958
959        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
960            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
961        })?;
962
963        let query = query_manager.get_query(query_id)?;
964        query.rewind_files(user_message_id).await
965    }
966
967    /// Get server initialization info including available commands and output styles
968    ///
969    /// Returns initialization information from the Claude Code server including:
970    /// - Available commands (slash commands, system commands, etc.)
971    /// - Current and available output styles
972    /// - Server capabilities
973    ///
974    /// This is analogous to Python's `client.get_server_info()`.
975    ///
976    /// # Returns
977    ///
978    /// Dictionary with server info, or None if not connected
979    ///
980    /// # Example
981    ///
982    /// ```no_run
983    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
984    /// # #[tokio::main]
985    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
986    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
987    /// # client.connect().await?;
988    /// if let Some(info) = client.get_server_info().await {
989    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
990    ///     println!("Output style: {:?}", info.get("output_style"));
991    /// }
992    /// # Ok(())
993    /// # }
994    /// ```
995    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
996        let query_manager = self.query_manager.as_ref()?;
997        let query_id = self.current_query_id.as_ref()?;
998        let Ok(query) = query_manager.get_query(query_id) else {
999            return None;
1000        };
1001        query.get_initialization_result().await
1002    }
1003
1004    /// Start a new session by switching to a different session ID
1005    ///
1006    /// This is a convenience method that creates a new conversation context.
1007    /// It's equivalent to calling `query_with_session()` with a new session ID.
1008    ///
1009    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1010    /// when creating a new client.
1011    ///
1012    /// # Arguments
1013    ///
1014    /// * `session_id` - The new session ID to use
1015    /// * `prompt` - Initial message for the new session
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the client is not connected or if sending fails.
1020    ///
1021    /// # Example
1022    ///
1023    /// ```no_run
1024    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
1025    /// # #[tokio::main]
1026    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1027    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1028    /// # client.connect().await?;
1029    /// // First conversation
1030    /// client.query("Hello").await?;
1031    ///
1032    /// // Start new conversation with different context
1033    /// client.new_session("session-2", "Tell me about Rust").await?;
1034    /// # Ok(())
1035    /// # }
1036    /// ```
1037    pub async fn new_session(
1038        &mut self,
1039        session_id: impl Into<String>,
1040        prompt: impl Into<String>,
1041    ) -> Result<()> {
1042        self.query_with_session(prompt, session_id).await
1043    }
1044
1045    /// Disconnect from Claude (analogous to Python's __aexit__)
1046    ///
1047    /// This cleanly shuts down the connection to Claude Code CLI.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if disconnection fails.
1052    #[instrument(name = "claude.client.disconnect", skip(self))]
1053    pub async fn disconnect(&mut self) -> Result<()> {
1054        if !self.connected {
1055            debug!("Client already disconnected");
1056            return Ok(());
1057        }
1058
1059        info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1060
1061        if let Some(query_manager) = self.query_manager.take() {
1062            // Get all queries for cleanup
1063            let queries = query_manager.get_all_queries();
1064
1065            // Close all isolated queries by closing their resources
1066            // This signals each CLI process to exit
1067            for (_query_id, query) in &queries {
1068                // Close stdin (if available)
1069                if let Some(ref stdin_arc) = query.stdin {
1070                    let mut stdin_guard = stdin_arc.lock().await;
1071                    if let Some(mut stdin_stream) = stdin_guard.take() {
1072                        let _ = stdin_stream.shutdown().await;
1073                    }
1074                }
1075
1076                // Close transport
1077                let transport = Arc::clone(&query.transport);
1078                let mut transport_guard = transport.lock().await;
1079                let _ = transport_guard.close().await;
1080            }
1081
1082            // Give background tasks a moment to finish reading and release locks
1083            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1084        }
1085
1086        self.current_query_id = None;
1087        self.connected = false;
1088        debug!("Disconnected successfully");
1089        Ok(())
1090    }
1091}
1092
1093impl Drop for ClaudeClient {
1094    fn drop(&mut self) {
1095        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1096        // Users should call disconnect() explicitly
1097        if self.connected {
1098            eprintln!(
1099                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1100            );
1101        }
1102    }
1103}
1104
1105#[cfg(test)]
1106mod tests {
1107    use super::*;
1108    use crate::types::permissions::{PermissionResult, PermissionResultAllow};
1109    use std::sync::Arc;
1110
1111    #[tokio::test]
1112    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
1113        let callback: crate::types::permissions::CanUseToolCallback =
1114            Arc::new(|_tool_name, _tool_input, _context| {
1115                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1116            });
1117
1118        let opts = ClaudeAgentOptions::builder()
1119            .can_use_tool(callback)
1120            .permission_prompt_tool_name("custom_tool") // Not "stdio"
1121            .build();
1122
1123        let mut client = ClaudeClient::new(opts);
1124        let result = client.connect().await;
1125
1126        assert!(result.is_err());
1127        let err = result.unwrap_err();
1128        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
1129        assert!(err.to_string().contains("permission_prompt_tool_name"));
1130    }
1131
1132    #[tokio::test]
1133    async fn test_connect_accepts_can_use_tool_with_stdio() {
1134        let callback: crate::types::permissions::CanUseToolCallback =
1135            Arc::new(|_tool_name, _tool_input, _context| {
1136                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1137            });
1138
1139        let opts = ClaudeAgentOptions::builder()
1140            .can_use_tool(callback)
1141            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
1142            .build();
1143
1144        let mut client = ClaudeClient::new(opts);
1145        // This will fail later (CLI not found), but should pass validation
1146        let result = client.connect().await;
1147
1148        // Should not be InvalidConfig error about permission_prompt_tool_name
1149        if let Err(ref err) = result {
1150            assert!(
1151                !err.to_string().contains("permission_prompt_tool_name"),
1152                "Should not fail on permission_prompt_tool_name validation"
1153            );
1154        }
1155    }
1156
1157    #[tokio::test]
1158    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
1159        let callback: crate::types::permissions::CanUseToolCallback =
1160            Arc::new(|_tool_name, _tool_input, _context| {
1161                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1162            });
1163
1164        let opts = ClaudeAgentOptions::builder()
1165            .can_use_tool(callback)
1166            // No permission_prompt_tool_name set - defaults to stdio
1167            .build();
1168
1169        let mut client = ClaudeClient::new(opts);
1170        // This will fail later (CLI not found), but should pass validation
1171        let result = client.connect().await;
1172
1173        // Should not be InvalidConfig error about permission_prompt_tool_name
1174        if let Err(ref err) = result {
1175            assert!(
1176                !err.to_string().contains("permission_prompt_tool_name"),
1177                "Should not fail on permission_prompt_tool_name validation"
1178            );
1179        }
1180    }
1181}