claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41///     // Connect to Claude
42///     client.connect().await?;
43///
44///     // Send a query
45///     client.query("Hello Claude!").await?;
46///
47///     // Receive response as a stream
48///     {
49///         let mut stream = client.receive_response();
50///         while let Some(message) = stream.next().await {
51///             println!("Received: {:?}", message?);
52///         }
53///     }
54///
55///     // Disconnect
56///     client.disconnect().await?;
57///     Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61    options: ClaudeAgentOptions,
62    /// QueryManager for creating isolated queries (Codex-style architecture)
63    query_manager: Option<Arc<QueryManager>>,
64    /// Current query_id for the active prompt
65    current_query_id: Option<String>,
66    connected: bool,
67}
68
69impl ClaudeClient {
70    /// Create a new ClaudeClient
71    ///
72    /// # Arguments
73    ///
74    /// * `options` - Configuration options for the Claude client
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
80    ///
81    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82    /// ```
83    pub fn new(options: ClaudeAgentOptions) -> Self {
84        Self {
85            options,
86            query_manager: None,
87            current_query_id: None,
88            connected: false,
89        }
90    }
91
92    /// Create a new ClaudeClient with early validation
93    ///
94    /// Unlike `new()`, this validates the configuration eagerly by attempting
95    /// to create the transport. This catches issues like invalid working directory
96    /// or missing CLI before `connect()` is called.
97    ///
98    /// # Arguments
99    ///
100    /// * `options` - Configuration options for the Claude client
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if:
105    /// - The working directory does not exist or is not a directory
106    /// - Claude CLI cannot be found
107    ///
108    /// # Example
109    ///
110    /// ```no_run
111    /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
112    ///
113    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114    /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
115    /// ```
116    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117        // Validate by attempting to create transport (but don't keep it)
118        let prompt = QueryPrompt::Streaming;
119        let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121        Ok(Self {
122            options,
123            query_manager: None,
124            current_query_id: None,
125            connected: false,
126        })
127    }
128
129    /// Connect to Claude (analogous to Python's __aenter__)
130    ///
131    /// This establishes the connection to the Claude Code CLI and initializes
132    /// the bidirectional communication channel.
133    ///
134    /// Uses the Codex-style architecture with QueryManager for complete
135    /// message isolation between queries.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - Claude CLI cannot be found or started
141    /// - The initialization handshake fails
142    /// - Hook registration fails
143    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144    #[instrument(
145        name = "claude.client.connect",
146        skip(self),
147        fields(
148            has_can_use_tool = self.options.can_use_tool.is_some(),
149            has_hooks = self.options.hooks.is_some(),
150            model = %self.options.model.as_deref().unwrap_or("default"),
151        )
152    )]
153    pub async fn connect(&mut self) -> Result<()> {
154        if self.connected {
155            debug!("Client already connected, skipping");
156            return Ok(());
157        }
158
159        info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162        // This matches Python SDK behavior (client.py lines 106-122)
163        // which ensures CLI uses control protocol for permission prompts
164        let mut options = self.options.clone();
165        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166            info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167            options.permission_prompt_tool_name = Some("stdio".to_string());
168        }
169
170        // Validate can_use_tool configuration (aligned with Python SDK behavior)
171        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172        // to ensure the control protocol can handle permission requests
173        if options.can_use_tool.is_some()
174            && let Some(ref tool_name) = options.permission_prompt_tool_name
175            && tool_name != "stdio"
176        {
177            return Err(ClaudeError::InvalidConfig(
178                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180                            .to_string(),
181                    ));
182        }
183
184        // Prepare hooks for initialization
185        // Build efficiency hooks if configured
186        let efficiency_hooks = self
187            .options
188            .efficiency
189            .as_ref()
190            .map(build_efficiency_hooks)
191            .unwrap_or_default();
192
193        // Merge user hooks with efficiency hooks
194        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196        // Convert hooks to internal format
197        let hooks = merged_hooks.as_ref().map(|hooks_map| {
198            hooks_map
199                .iter()
200                .map(|(event, matchers)| {
201                    let event_name = match event {
202                        HookEvent::PreToolUse => "PreToolUse",
203                        HookEvent::PostToolUse => "PostToolUse",
204                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
205                        HookEvent::Stop => "Stop",
206                        HookEvent::SubagentStop => "SubagentStop",
207                        HookEvent::PreCompact => "PreCompact",
208                    };
209                    (event_name.to_string(), matchers.clone())
210                })
211                .collect()
212        });
213
214        // Extract SDK MCP servers from options
215        let sdk_mcp_servers =
216            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217                servers_dict
218                    .iter()
219                    .filter_map(|(name, config)| {
220                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221                            Some((name.clone(), sdk_config.clone()))
222                        } else {
223                            None
224                        }
225                    })
226                    .collect()
227            } else {
228                std::collections::HashMap::new()
229            };
230
231        // Clone options for the transport factory
232        let options_clone = options.clone();
233
234        // Create QueryManager with a transport factory
235        // The factory creates new transports for each isolated query
236        // Note: connect() is called later in create_query(), not here
237        let mut query_manager = QueryManager::new(move || {
238            let prompt = QueryPrompt::Streaming;
239            let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240            Ok(Box::new(transport) as Box<dyn Transport>)
241        });
242
243        // Set control request timeout
244        query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246        // Set configuration on QueryManager
247        query_manager.set_hooks(hooks).await;
248        query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249        query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251        let query_manager = Arc::new(query_manager);
252
253        // Create the first query for initialization
254        let first_query_id = query_manager.create_query().await?;
255
256        // Start cleanup task for inactive queries
257        Arc::clone(&query_manager).start_cleanup_task();
258
259        self.query_manager = Some(query_manager);
260        self.current_query_id = Some(first_query_id);
261        self.connected = true;
262
263        info!("Successfully connected to Claude Code CLI with QueryManager");
264        Ok(())
265    }
266
267    /// Send a query to Claude
268    ///
269    /// This sends a new user prompt to Claude. Claude will remember the context
270    /// of previous queries within the same session.
271    ///
272    /// # Arguments
273    ///
274    /// * `prompt` - The user prompt to send
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the client is not connected or if sending fails.
279    ///
280    /// # Example
281    ///
282    /// ```no_run
283    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
284    /// # #[tokio::main]
285    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287    /// # client.connect().await?;
288    /// client.query("What is 2 + 2?").await?;
289    /// # Ok(())
290    /// # }
291    /// ```
292    #[instrument(
293        name = "claude.client.query",
294        skip(self, prompt),
295        fields(session_id = "default",)
296    )]
297    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298        self.query_with_session(prompt, "default").await
299    }
300
301    /// Send a query to Claude with a specific session ID
302    ///
303    /// This sends a new user prompt to Claude. Different session IDs maintain
304    /// separate conversation contexts.
305    ///
306    /// # Arguments
307    ///
308    /// * `prompt` - The user prompt to send
309    /// * `session_id` - Session identifier for the conversation
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the client is not connected or if sending fails.
314    ///
315    /// # Example
316    ///
317    /// ```no_run
318    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
319    /// # #[tokio::main]
320    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322    /// # client.connect().await?;
323    /// // Separate conversation contexts
324    /// client.query_with_session("First question", "session-1").await?;
325    /// client.query_with_session("Different question", "session-2").await?;
326    /// # Ok(())
327    /// # }
328    /// ```
329    pub async fn query_with_session(
330        &mut self,
331        prompt: impl Into<String>,
332        session_id: impl Into<String>,
333    ) -> Result<()> {
334        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336        })?;
337
338        let prompt_str = prompt.into();
339        let session_id_str = session_id.into();
340
341        // Get or create query for this session (reuses existing query to maintain context)
342        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
343        self.current_query_id = Some(query_id.clone());
344
345        // Get the isolated query
346        let query = query_manager.get_query(&query_id)?;
347
348        // Drain any pending messages from previous (cancelled) prompts
349        // This prevents receiving stale messages when reusing a query
350        query.drain_pending_messages().await;
351
352        // Format as JSON message for stream-json input format
353        let user_message = serde_json::json!({
354            "type": "user",
355            "message": {
356                "role": "user",
357                "content": prompt_str
358            },
359            "session_id": session_id_str
360        });
361
362        let message_str = serde_json::to_string(&user_message).map_err(|e| {
363            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
364        })?;
365
366        // Write directly to stdin (bypasses transport lock)
367        let stdin = query.stdin.clone();
368
369        if let Some(stdin_arc) = stdin {
370            let mut stdin_guard = stdin_arc.lock().await;
371            if let Some(ref mut stdin_stream) = *stdin_guard {
372                stdin_stream
373                    .write_all(message_str.as_bytes())
374                    .await
375                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
376                stdin_stream.write_all(b"\n").await.map_err(|e| {
377                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
378                })?;
379                stdin_stream
380                    .flush()
381                    .await
382                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
383            } else {
384                return Err(ClaudeError::Transport("stdin not available".to_string()));
385            }
386        } else {
387            return Err(ClaudeError::Transport("stdin not set".to_string()));
388        }
389
390        debug!(
391            query_id = %query_id,
392            session_id = %session_id_str,
393            "Sent query to isolated query"
394        );
395
396        Ok(())
397    }
398
399    /// Send a query with structured content blocks (supports images)
400    ///
401    /// This method enables multimodal queries in bidirectional streaming mode.
402    /// Use it to send images alongside text for vision-related tasks.
403    ///
404    /// # Arguments
405    ///
406    /// * `content` - A vector of content blocks (text and/or images)
407    ///
408    /// # Errors
409    ///
410    /// Returns an error if:
411    /// - The content vector is empty (must include at least one text or image block)
412    /// - The client is not connected (call `connect()` first)
413    /// - Sending the message fails
414    ///
415    /// # Example
416    ///
417    /// ```no_run
418    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
419    /// # #[tokio::main]
420    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
421    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
422    /// # client.connect().await?;
423    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
424    /// client.query_with_content(vec![
425    ///     UserContentBlock::text("What's in this image?"),
426    ///     UserContentBlock::image_base64("image/png", base64_data)?,
427    /// ]).await?;
428    /// # Ok(())
429    /// # }
430    /// ```
431    pub async fn query_with_content(
432        &mut self,
433        content: impl Into<Vec<UserContentBlock>>,
434    ) -> Result<()> {
435        self.query_with_content_and_session(content, "default")
436            .await
437    }
438
439    /// Send a query with structured content blocks and a specific session ID
440    ///
441    /// This method enables multimodal queries with session management for
442    /// maintaining separate conversation contexts.
443    ///
444    /// # Arguments
445    ///
446    /// * `content` - A vector of content blocks (text and/or images)
447    /// * `session_id` - Session identifier for the conversation
448    ///
449    /// # Errors
450    ///
451    /// Returns an error if:
452    /// - The content vector is empty (must include at least one text or image block)
453    /// - The client is not connected (call `connect()` first)
454    /// - Sending the message fails
455    ///
456    /// # Example
457    ///
458    /// ```no_run
459    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
460    /// # #[tokio::main]
461    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
462    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
463    /// # client.connect().await?;
464    /// client.query_with_content_and_session(
465    ///     vec![
466    ///         UserContentBlock::text("Analyze this chart"),
467    ///         UserContentBlock::image_url("https://example.com/chart.png"),
468    ///     ],
469    ///     "analysis-session",
470    /// ).await?;
471    /// # Ok(())
472    /// # }
473    /// ```
474    pub async fn query_with_content_and_session(
475        &mut self,
476        content: impl Into<Vec<UserContentBlock>>,
477        session_id: impl Into<String>,
478    ) -> Result<()> {
479        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
480            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
481        })?;
482
483        let content_blocks: Vec<UserContentBlock> = content.into();
484        UserContentBlock::validate_content(&content_blocks)?;
485
486        let session_id_str = session_id.into();
487
488        // Get or create query for this session (reuses existing query to maintain context)
489        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
490        self.current_query_id = Some(query_id.clone());
491
492        // Get the isolated query
493        let query = query_manager.get_query(&query_id)?;
494
495        // Drain any pending messages from previous (cancelled) prompts
496        // This prevents receiving stale messages when reusing a query
497        query.drain_pending_messages().await;
498
499        // Format as JSON message for stream-json input format
500        // Content is an array of content blocks, not a simple string
501        let user_message = serde_json::json!({
502            "type": "user",
503            "message": {
504                "role": "user",
505                "content": content_blocks
506            },
507            "session_id": session_id_str
508        });
509
510        let message_str = serde_json::to_string(&user_message).map_err(|e| {
511            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
512        })?;
513
514        // Write directly to stdin (bypasses transport lock)
515        let stdin = query.stdin.clone();
516
517        if let Some(stdin_arc) = stdin {
518            let mut stdin_guard = stdin_arc.lock().await;
519            if let Some(ref mut stdin_stream) = *stdin_guard {
520                stdin_stream
521                    .write_all(message_str.as_bytes())
522                    .await
523                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
524                stdin_stream.write_all(b"\n").await.map_err(|e| {
525                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
526                })?;
527                stdin_stream
528                    .flush()
529                    .await
530                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
531            } else {
532                return Err(ClaudeError::Transport("stdin not available".to_string()));
533            }
534        } else {
535            return Err(ClaudeError::Transport("stdin not set".to_string()));
536        }
537
538        debug!(
539            query_id = %query_id,
540            session_id = %session_id_str,
541            "Sent content query to isolated query"
542        );
543
544        Ok(())
545    }
546
547    /// Receive all messages as a stream (continuous)
548    ///
549    /// This method returns a stream that yields all messages from Claude
550    /// indefinitely until the stream is closed or an error occurs.
551    ///
552    /// Use this when you want to process all messages, including multiple
553    /// responses and system events.
554    ///
555    /// # Returns
556    ///
557    /// A stream of `Result<Message>` that continues until the connection closes.
558    ///
559    /// # Example
560    ///
561    /// ```no_run
562    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
563    /// # use futures::StreamExt;
564    /// # #[tokio::main]
565    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
566    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
567    /// # client.connect().await?;
568    /// # client.query("Hello").await?;
569    /// let mut stream = client.receive_messages();
570    /// while let Some(message) = stream.next().await {
571    ///     println!("Received: {:?}", message?);
572    /// }
573    /// # Ok(())
574    /// # }
575    /// ```
576    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
577        let query_manager = match &self.query_manager {
578            Some(qm) => Arc::clone(qm),
579            None => {
580                return Box::pin(futures::stream::once(async {
581                    Err(ClaudeError::InvalidConfig(
582                        "Client not connected. Call connect() first.".to_string(),
583                    ))
584                }));
585            }
586        };
587
588        let query_id = match &self.current_query_id {
589            Some(id) => id.clone(),
590            None => {
591                return Box::pin(futures::stream::once(async {
592                    Err(ClaudeError::InvalidConfig(
593                        "No active query. Call query() first.".to_string(),
594                    ))
595                }));
596            }
597        };
598
599        Box::pin(async_stream::stream! {
600            // Get the isolated query and its message receiver
601            let query = match query_manager.get_query(&query_id) {
602                Ok(q) => q,
603                Err(e) => {
604                    yield Err(e);
605                    return;
606                }
607            };
608
609            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
610                Arc::clone(&query.message_rx)
611            };
612
613            loop {
614                let message = {
615                    let mut rx_guard = rx.lock().await;
616                    rx_guard.recv().await
617                };
618
619                match message {
620                    Some(json) => {
621                        match MessageParser::parse(json) {
622                            Ok(msg) => yield Ok(msg),
623                            Err(e) => {
624                                eprintln!("Failed to parse message: {}", e);
625                                yield Err(e);
626                            }
627                        }
628                    }
629                    None => break,
630                }
631            }
632        })
633    }
634
635    /// Receive messages until a ResultMessage
636    ///
637    /// This method returns a stream that yields messages until it encounters
638    /// a `ResultMessage`, which signals the completion of a Claude response.
639    ///
640    /// This is the most common pattern for handling Claude responses, as it
641    /// processes one complete "turn" of the conversation.
642    ///
643    /// This method uses query-scoped message channels to ensure message isolation,
644    /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
645    ///
646    /// # Returns
647    ///
648    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
649    ///
650    /// # Example
651    ///
652    /// ```no_run
653    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
654    /// # use futures::StreamExt;
655    /// # #[tokio::main]
656    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
657    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
658    /// # client.connect().await?;
659    /// # client.query("Hello").await?;
660    /// let mut stream = client.receive_response();
661    /// while let Some(message) = stream.next().await {
662    ///     match message? {
663    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
664    ///         Message::Result(result) => {
665    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
666    ///             break;
667    ///         }
668    ///         _ => {}
669    ///     }
670    /// }
671    /// # Ok(())
672    /// # }
673    /// ```
674    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
675        let query_manager = match &self.query_manager {
676            Some(qm) => Arc::clone(qm),
677            None => {
678                return Box::pin(futures::stream::once(async {
679                    Err(ClaudeError::InvalidConfig(
680                        "Client not connected. Call connect() first.".to_string(),
681                    ))
682                }));
683            }
684        };
685
686        let query_id = match &self.current_query_id {
687            Some(id) => id.clone(),
688            None => {
689                return Box::pin(futures::stream::once(async {
690                    Err(ClaudeError::InvalidConfig(
691                        "No active query. Call query() first.".to_string(),
692                    ))
693                }));
694            }
695        };
696
697        Box::pin(async_stream::stream! {
698            // ====================================================================
699            // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
700            // ====================================================================
701            // In the Codex-style architecture, each query has its own completely
702            // isolated QueryFull instance. We get the message receiver directly
703            // from the isolated query, eliminating the need for routing logic.
704            //
705            // This provides:
706            // - Complete message isolation
707            // - No possibility of message confusion
708            // - Simpler architecture without routing overhead
709            //
710            // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
711            // which removes inactive queries whose channels have been closed.
712
713            debug!(
714                query_id = %query_id,
715                "Getting message receiver from isolated query"
716            );
717
718            // Get the isolated query and its message receiver
719            let query = match query_manager.get_query(&query_id) {
720                Ok(q) => q,
721                Err(e) => {
722                    yield Err(e);
723                    return;
724                }
725            };
726
727            // Get the message receiver from the isolated query
728            // In isolated mode, we directly access the message_rx without routing
729            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
730                Arc::clone(&query.message_rx)
731            };
732
733            loop {
734                let message = {
735                    let mut rx_guard = rx.lock().await;
736                    rx_guard.recv().await
737                };
738
739                match message {
740                    Some(json) => {
741                        match MessageParser::parse(json) {
742                            Ok(msg) => {
743                                let is_result = matches!(msg, Message::Result(_));
744                                yield Ok(msg);
745                                if is_result {
746                                    debug!(
747                                        query_id = %query_id,
748                                        "Received ResultMessage, ending stream"
749                                    );
750                                    // Cleanup will be handled by the periodic cleanup task
751                                    // when the query becomes inactive
752                                    break;
753                                }
754                            }
755                            Err(e) => {
756                                eprintln!("Failed to parse message: {}", e);
757                                yield Err(e);
758                            }
759                        }
760                    }
761                    None => {
762                        debug!(
763                            query_id = %query_id,
764                            "Isolated query channel closed"
765                        );
766                        // Cleanup will be handled by the periodic cleanup task
767                        break;
768                    }
769                }
770            }
771        })
772    }
773
774    /// Drain any leftover messages from the previous prompt
775    ///
776    /// This method removes any messages remaining in the channel from a previous
777    /// prompt. This should be called before starting a new prompt to ensure
778    /// that the new prompt doesn't receive stale messages.
779    ///
780    /// This is important when prompts are cancelled or end unexpectedly,
781    /// as there may be buffered messages that would otherwise be received
782    /// by the next prompt.
783    ///
784    /// # Returns
785    ///
786    /// The number of messages drained from the channel.
787    ///
788    /// # Example
789    ///
790    /// ```no_run
791    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
792    /// # #[tokio::main]
793    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
794    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
795    /// # client.connect().await?;
796    /// // Before starting a new prompt, drain any leftover messages
797    /// let drained = client.drain_messages().await;
798    /// if drained > 0 {
799    ///     eprintln!("Drained {} leftover messages from previous prompt", drained);
800    /// }
801    /// # Ok(())
802    /// # }
803    /// ```
804    pub async fn drain_messages(&self) -> usize {
805        let Some(query_manager) = &self.query_manager else {
806            return 0;
807        };
808
809        let Some(query_id) = &self.current_query_id else {
810            return 0;
811        };
812
813        let Ok(query) = query_manager.get_query(query_id) else {
814            return 0;
815        };
816
817        let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
818            Arc::clone(&query.message_rx)
819        };
820
821        let mut count = 0;
822        // Use try_recv to drain all currently available messages without blocking
823        loop {
824            let mut rx_guard = rx.lock().await;
825            match rx_guard.try_recv() {
826                Ok(_) => count += 1,
827                Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
828                Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
829            }
830        }
831
832        if count > 0 {
833            debug!(count, "Drained leftover messages from previous prompt");
834        }
835
836        count
837    }
838
839    /// Send an interrupt signal to stop the current Claude operation
840    ///
841    /// This is analogous to Python's `client.interrupt()`.
842    ///
843    /// # Errors
844    ///
845    /// Returns an error if the client is not connected or if sending fails.
846    pub async fn interrupt(&self) -> Result<()> {
847        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
848            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
849        })?;
850
851        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
852            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
853        })?;
854
855        let query = query_manager.get_query(query_id)?;
856        query.interrupt().await
857    }
858
859    /// Change the permission mode dynamically
860    ///
861    /// This is analogous to Python's `client.set_permission_mode()`.
862    ///
863    /// # Arguments
864    ///
865    /// * `mode` - The new permission mode to set
866    ///
867    /// # Errors
868    ///
869    /// Returns an error if the client is not connected or if sending fails.
870    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
871        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
872            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
873        })?;
874
875        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
876            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
877        })?;
878
879        let query = query_manager.get_query(query_id)?;
880        query.set_permission_mode(mode).await
881    }
882
883    /// Change the AI model dynamically
884    ///
885    /// This is analogous to Python's `client.set_model()`.
886    ///
887    /// # Arguments
888    ///
889    /// * `model` - The new model name, or None to use default
890    ///
891    /// # Errors
892    ///
893    /// Returns an error if the client is not connected or if sending fails.
894    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
895        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
896            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
897        })?;
898
899        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
900            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
901        })?;
902
903        let query = query_manager.get_query(query_id)?;
904        query.set_model(model).await
905    }
906
907    /// Rewind tracked files to their state at a specific user message.
908    ///
909    /// This is analogous to Python's `client.rewind_files()`.
910    ///
911    /// # Requirements
912    ///
913    /// - `enable_file_checkpointing=true` in options to track file changes
914    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
915    ///   objects with `uuid` in the response stream
916    ///
917    /// # Arguments
918    ///
919    /// * `user_message_id` - UUID of the user message to rewind to. This should be
920    ///   the `uuid` field from a `UserMessage` received during the conversation.
921    ///
922    /// # Errors
923    ///
924    /// Returns an error if the client is not connected or if sending fails.
925    ///
926    /// # Example
927    ///
928    /// ```no_run
929    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
930    /// # use std::collections::HashMap;
931    /// # #[tokio::main]
932    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
933    /// let options = ClaudeAgentOptions::builder()
934    ///     .enable_file_checkpointing(true)
935    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
936    ///     .build();
937    /// let mut client = ClaudeClient::new(options);
938    /// client.connect().await?;
939    ///
940    /// client.query("Make some changes to my files").await?;
941    /// let mut checkpoint_id = None;
942    /// {
943    ///     let mut stream = client.receive_response();
944    ///     use futures::StreamExt;
945    ///     while let Some(Ok(msg)) = stream.next().await {
946    ///         if let Message::User(user_msg) = &msg {
947    ///             if let Some(uuid) = &user_msg.uuid {
948    ///                 checkpoint_id = Some(uuid.clone());
949    ///             }
950    ///         }
951    ///     }
952    /// }
953    ///
954    /// // Later, rewind to that point
955    /// if let Some(id) = checkpoint_id {
956    ///     client.rewind_files(&id).await?;
957    /// }
958    /// # Ok(())
959    /// # }
960    /// ```
961    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
962        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
963            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
964        })?;
965
966        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
967            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
968        })?;
969
970        let query = query_manager.get_query(query_id)?;
971        query.rewind_files(user_message_id).await
972    }
973
974    /// Get server initialization info including available commands and output styles
975    ///
976    /// Returns initialization information from the Claude Code server including:
977    /// - Available commands (slash commands, system commands, etc.)
978    /// - Current and available output styles
979    /// - Server capabilities
980    ///
981    /// This is analogous to Python's `client.get_server_info()`.
982    ///
983    /// # Returns
984    ///
985    /// Dictionary with server info, or None if not connected
986    ///
987    /// # Example
988    ///
989    /// ```no_run
990    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
991    /// # #[tokio::main]
992    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
993    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
994    /// # client.connect().await?;
995    /// if let Some(info) = client.get_server_info().await {
996    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
997    ///     println!("Output style: {:?}", info.get("output_style"));
998    /// }
999    /// # Ok(())
1000    /// # }
1001    /// ```
1002    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
1003        let query_manager = self.query_manager.as_ref()?;
1004        let query_id = self.current_query_id.as_ref()?;
1005        let Ok(query) = query_manager.get_query(query_id) else {
1006            return None;
1007        };
1008        query.get_initialization_result().await
1009    }
1010
1011    /// Start a new session by switching to a different session ID
1012    ///
1013    /// This is a convenience method that creates a new conversation context.
1014    /// It's equivalent to calling `query_with_session()` with a new session ID.
1015    ///
1016    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1017    /// when creating a new client.
1018    ///
1019    /// # Arguments
1020    ///
1021    /// * `session_id` - The new session ID to use
1022    /// * `prompt` - Initial message for the new session
1023    ///
1024    /// # Errors
1025    ///
1026    /// Returns an error if the client is not connected or if sending fails.
1027    ///
1028    /// # Example
1029    ///
1030    /// ```no_run
1031    /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
1032    /// # #[tokio::main]
1033    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1034    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1035    /// # client.connect().await?;
1036    /// // First conversation
1037    /// client.query("Hello").await?;
1038    ///
1039    /// // Start new conversation with different context
1040    /// client.new_session("session-2", "Tell me about Rust").await?;
1041    /// # Ok(())
1042    /// # }
1043    /// ```
1044    pub async fn new_session(
1045        &mut self,
1046        session_id: impl Into<String>,
1047        prompt: impl Into<String>,
1048    ) -> Result<()> {
1049        self.query_with_session(prompt, session_id).await
1050    }
1051
1052    /// Disconnect from Claude (analogous to Python's __aexit__)
1053    ///
1054    /// This cleanly shuts down the connection to Claude Code CLI.
1055    ///
1056    /// # Errors
1057    ///
1058    /// Returns an error if disconnection fails.
1059    #[instrument(name = "claude.client.disconnect", skip(self))]
1060    pub async fn disconnect(&mut self) -> Result<()> {
1061        if !self.connected {
1062            debug!("Client already disconnected");
1063            return Ok(());
1064        }
1065
1066        info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1067
1068        if let Some(query_manager) = self.query_manager.take() {
1069            // Get all queries for cleanup
1070            let queries = query_manager.get_all_queries();
1071
1072            // Close all isolated queries by closing their resources
1073            // This signals each CLI process to exit
1074            for (_query_id, query) in &queries {
1075                // Close stdin (if available)
1076                if let Some(ref stdin_arc) = query.stdin {
1077                    let mut stdin_guard = stdin_arc.lock().await;
1078                    if let Some(mut stdin_stream) = stdin_guard.take() {
1079                        let _ = stdin_stream.shutdown().await;
1080                    }
1081                }
1082
1083                // Close transport
1084                let transport = Arc::clone(&query.transport);
1085                let mut transport_guard = transport.lock().await;
1086                let _ = transport_guard.close().await;
1087            }
1088
1089            // Give background tasks a moment to finish reading and release locks
1090            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1091        }
1092
1093        self.current_query_id = None;
1094        self.connected = false;
1095        debug!("Disconnected successfully");
1096        Ok(())
1097    }
1098}
1099
1100impl Drop for ClaudeClient {
1101    fn drop(&mut self) {
1102        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1103        // Users should call disconnect() explicitly
1104        if self.connected {
1105            eprintln!(
1106                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1107            );
1108        }
1109    }
1110}
1111
1112#[cfg(test)]
1113mod tests {
1114    use super::*;
1115    use crate::types::permissions::{PermissionResult, PermissionResultAllow};
1116    use std::sync::Arc;
1117
1118    #[tokio::test]
1119    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
1120        let callback: crate::types::permissions::CanUseToolCallback =
1121            Arc::new(|_tool_name, _tool_input, _context| {
1122                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1123            });
1124
1125        let opts = ClaudeAgentOptions::builder()
1126            .can_use_tool(callback)
1127            .permission_prompt_tool_name("custom_tool") // Not "stdio"
1128            .build();
1129
1130        let mut client = ClaudeClient::new(opts);
1131        let result = client.connect().await;
1132
1133        assert!(result.is_err());
1134        let err = result.unwrap_err();
1135        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
1136        assert!(err.to_string().contains("permission_prompt_tool_name"));
1137    }
1138
1139    #[tokio::test]
1140    async fn test_connect_accepts_can_use_tool_with_stdio() {
1141        let callback: crate::types::permissions::CanUseToolCallback =
1142            Arc::new(|_tool_name, _tool_input, _context| {
1143                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1144            });
1145
1146        let opts = ClaudeAgentOptions::builder()
1147            .can_use_tool(callback)
1148            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
1149            .build();
1150
1151        let mut client = ClaudeClient::new(opts);
1152        // This will fail later (CLI not found), but should pass validation
1153        let result = client.connect().await;
1154
1155        // Should not be InvalidConfig error about permission_prompt_tool_name
1156        if let Err(ref err) = result {
1157            assert!(
1158                !err.to_string().contains("permission_prompt_tool_name"),
1159                "Should not fail on permission_prompt_tool_name validation"
1160            );
1161        }
1162    }
1163
1164    #[tokio::test]
1165    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
1166        let callback: crate::types::permissions::CanUseToolCallback =
1167            Arc::new(|_tool_name, _tool_input, _context| {
1168                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1169            });
1170
1171        let opts = ClaudeAgentOptions::builder()
1172            .can_use_tool(callback)
1173            // No permission_prompt_tool_name set - defaults to stdio
1174            .build();
1175
1176        let mut client = ClaudeClient::new(opts);
1177        // This will fail later (CLI not found), but should pass validation
1178        let result = client.connect().await;
1179
1180        // Should not be InvalidConfig error about permission_prompt_tool_name
1181        if let Err(ref err) = result {
1182            assert!(
1183                !err.to_string().contains("permission_prompt_tool_name"),
1184                "Should not fail on permission_prompt_tool_name validation"
1185            );
1186        }
1187    }
1188}