claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41///     // Connect to Claude
42///     client.connect().await?;
43///
44///     // Send a query
45///     client.query("Hello Claude!").await?;
46///
47///     // Receive response as a stream
48///     {
49///         let mut stream = client.receive_response();
50///         while let Some(message) = stream.next().await {
51///             println!("Received: {:?}", message?);
52///         }
53///     }
54///
55///     // Disconnect
56///     client.disconnect().await?;
57///     Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61    options: ClaudeAgentOptions,
62    /// QueryManager for creating isolated queries (Codex-style architecture)
63    query_manager: Option<Arc<QueryManager>>,
64    /// Current query_id for the active prompt
65    current_query_id: Option<String>,
66    connected: bool,
67}
68
69impl ClaudeClient {
70    /// Create a new ClaudeClient
71    ///
72    /// # Arguments
73    ///
74    /// * `options` - Configuration options for the Claude client
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
80    ///
81    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82    /// ```
83    pub fn new(options: ClaudeAgentOptions) -> Self {
84        Self {
85            options,
86            query_manager: None,
87            current_query_id: None,
88            connected: false,
89        }
90    }
91
92    /// Create a new ClaudeClient with early validation
93    ///
94    /// Unlike `new()`, this validates the configuration eagerly by attempting
95    /// to create the transport. This catches issues like invalid working directory
96    /// or missing CLI before `connect()` is called.
97    ///
98    /// # Arguments
99    ///
100    /// * `options` - Configuration options for the Claude client
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if:
105    /// - The working directory does not exist or is not a directory
106    /// - Claude CLI cannot be found
107    ///
108    /// # Example
109    ///
110    /// ```no_run
111    /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
112    ///
113    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114    /// # Ok::<(), claude_code_agent_sdk::ClaudeError>(())
115    /// ```
116    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117        // Validate by attempting to create transport (but don't keep it)
118        let prompt = QueryPrompt::Streaming;
119        let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121        Ok(Self {
122            options,
123            query_manager: None,
124            current_query_id: None,
125            connected: false,
126        })
127    }
128
129    /// Connect to Claude (analogous to Python's __aenter__)
130    ///
131    /// This establishes the connection to the Claude Code CLI and initializes
132    /// the bidirectional communication channel.
133    ///
134    /// Uses the Codex-style architecture with QueryManager for complete
135    /// message isolation between queries.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - Claude CLI cannot be found or started
141    /// - The initialization handshake fails
142    /// - Hook registration fails
143    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144    #[instrument(
145        name = "claude.client.connect",
146        skip(self),
147        fields(
148            has_can_use_tool = self.options.can_use_tool.is_some(),
149            has_hooks = self.options.hooks.is_some(),
150            model = %self.options.model.as_deref().unwrap_or("default"),
151        )
152    )]
153    pub async fn connect(&mut self) -> Result<()> {
154        if self.connected {
155            debug!("Client already connected, skipping");
156            return Ok(());
157        }
158
159        info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162        // This matches Python SDK behavior (client.py lines 106-122)
163        // which ensures CLI uses control protocol for permission prompts
164        let mut options = self.options.clone();
165        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166            info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167            options.permission_prompt_tool_name = Some("stdio".to_string());
168        }
169
170        // Validate can_use_tool configuration (aligned with Python SDK behavior)
171        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172        // to ensure the control protocol can handle permission requests
173        if options.can_use_tool.is_some()
174            && let Some(ref tool_name) = options.permission_prompt_tool_name
175            && tool_name != "stdio"
176        {
177            return Err(ClaudeError::InvalidConfig(
178                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180                            .to_string(),
181                    ));
182        }
183
184        // Prepare hooks for initialization
185        // Build efficiency hooks if configured
186        let efficiency_hooks = self
187            .options
188            .efficiency
189            .as_ref()
190            .map(build_efficiency_hooks)
191            .unwrap_or_default();
192
193        // Merge user hooks with efficiency hooks
194        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196        // Convert hooks to internal format
197        let hooks = merged_hooks.as_ref().map(|hooks_map| {
198            hooks_map
199                .iter()
200                .map(|(event, matchers)| {
201                    let event_name = match event {
202                        HookEvent::PreToolUse => "PreToolUse",
203                        HookEvent::PostToolUse => "PostToolUse",
204                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
205                        HookEvent::Stop => "Stop",
206                        HookEvent::SubagentStop => "SubagentStop",
207                        HookEvent::PreCompact => "PreCompact",
208                    };
209                    (event_name.to_string(), matchers.clone())
210                })
211                .collect()
212        });
213
214        // Extract SDK MCP servers from options
215        let sdk_mcp_servers =
216            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217                servers_dict
218                    .iter()
219                    .filter_map(|(name, config)| {
220                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221                            Some((name.clone(), sdk_config.clone()))
222                        } else {
223                            None
224                        }
225                    })
226                    .collect()
227            } else {
228                std::collections::HashMap::new()
229            };
230
231        // Clone options for the transport factory
232        let options_clone = options.clone();
233
234        // Create QueryManager with a transport factory
235        // The factory creates new transports for each isolated query
236        // Note: connect() is called later in create_query(), not here
237        let mut query_manager = QueryManager::new(move || {
238            let prompt = QueryPrompt::Streaming;
239            let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240            Ok(Box::new(transport) as Box<dyn Transport>)
241        });
242
243        // Set control request timeout
244        query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246        // Set configuration on QueryManager
247        query_manager.set_hooks(hooks).await;
248        query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249        query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251        let query_manager = Arc::new(query_manager);
252
253        // Create the first query for initialization
254        let first_query_id = query_manager.create_query().await?;
255
256        // Start cleanup task for inactive queries
257        Arc::clone(&query_manager).start_cleanup_task();
258
259        self.query_manager = Some(query_manager);
260        self.current_query_id = Some(first_query_id);
261        self.connected = true;
262
263        info!("Successfully connected to Claude Code CLI with QueryManager");
264        Ok(())
265    }
266
267    /// Send a query to Claude
268    ///
269    /// This sends a new user prompt to Claude. Claude will remember the context
270    /// of previous queries within the same session.
271    ///
272    /// # Arguments
273    ///
274    /// * `prompt` - The user prompt to send
275    ///
276    /// # Errors
277    ///
278    /// Returns an error if the client is not connected or if sending fails.
279    ///
280    /// # Example
281    ///
282    /// ```no_run
283    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
284    /// # #[tokio::main]
285    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287    /// # client.connect().await?;
288    /// client.query("What is 2 + 2?").await?;
289    /// # Ok(())
290    /// # }
291    /// ```
292    #[instrument(
293        name = "claude.client.query",
294        skip(self, prompt),
295        fields(session_id = "default",)
296    )]
297    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298        self.query_with_session(prompt, "default").await
299    }
300
301    /// Send a query to Claude with a specific session ID
302    ///
303    /// This sends a new user prompt to Claude. Different session IDs maintain
304    /// separate conversation contexts.
305    ///
306    /// # Arguments
307    ///
308    /// * `prompt` - The user prompt to send
309    /// * `session_id` - Session identifier for the conversation
310    ///
311    /// # Errors
312    ///
313    /// Returns an error if the client is not connected or if sending fails.
314    ///
315    /// # Example
316    ///
317    /// ```no_run
318    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
319    /// # #[tokio::main]
320    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322    /// # client.connect().await?;
323    /// // Separate conversation contexts
324    /// client.query_with_session("First question", "session-1").await?;
325    /// client.query_with_session("Different question", "session-2").await?;
326    /// # Ok(())
327    /// # }
328    /// ```
329    pub async fn query_with_session(
330        &mut self,
331        prompt: impl Into<String>,
332        session_id: impl Into<String>,
333    ) -> Result<()> {
334        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336        })?;
337
338        let prompt_str = prompt.into();
339        let session_id_str = session_id.into();
340
341        // Get or create query for this session (reuses existing query to maintain context)
342        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
343        self.current_query_id = Some(query_id.clone());
344
345        // Get the isolated query
346        let query = query_manager.get_query(&query_id)?;
347
348        // Format as JSON message for stream-json input format
349        let user_message = serde_json::json!({
350            "type": "user",
351            "message": {
352                "role": "user",
353                "content": prompt_str
354            },
355            "session_id": session_id_str
356        });
357
358        let message_str = serde_json::to_string(&user_message).map_err(|e| {
359            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
360        })?;
361
362        // Write directly to stdin (bypasses transport lock)
363        let stdin = query.stdin.clone();
364
365        if let Some(stdin_arc) = stdin {
366            let mut stdin_guard = stdin_arc.lock().await;
367            if let Some(ref mut stdin_stream) = *stdin_guard {
368                stdin_stream
369                    .write_all(message_str.as_bytes())
370                    .await
371                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
372                stdin_stream.write_all(b"\n").await.map_err(|e| {
373                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
374                })?;
375                stdin_stream
376                    .flush()
377                    .await
378                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
379            } else {
380                return Err(ClaudeError::Transport("stdin not available".to_string()));
381            }
382        } else {
383            return Err(ClaudeError::Transport("stdin not set".to_string()));
384        }
385
386        debug!(
387            query_id = %query_id,
388            session_id = %session_id_str,
389            "Sent query to isolated query"
390        );
391
392        Ok(())
393    }
394
395    /// Send a query with structured content blocks (supports images)
396    ///
397    /// This method enables multimodal queries in bidirectional streaming mode.
398    /// Use it to send images alongside text for vision-related tasks.
399    ///
400    /// # Arguments
401    ///
402    /// * `content` - A vector of content blocks (text and/or images)
403    ///
404    /// # Errors
405    ///
406    /// Returns an error if:
407    /// - The content vector is empty (must include at least one text or image block)
408    /// - The client is not connected (call `connect()` first)
409    /// - Sending the message fails
410    ///
411    /// # Example
412    ///
413    /// ```no_run
414    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
415    /// # #[tokio::main]
416    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
417    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
418    /// # client.connect().await?;
419    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
420    /// client.query_with_content(vec![
421    ///     UserContentBlock::text("What's in this image?"),
422    ///     UserContentBlock::image_base64("image/png", base64_data)?,
423    /// ]).await?;
424    /// # Ok(())
425    /// # }
426    /// ```
427    pub async fn query_with_content(
428        &mut self,
429        content: impl Into<Vec<UserContentBlock>>,
430    ) -> Result<()> {
431        self.query_with_content_and_session(content, "default")
432            .await
433    }
434
435    /// Send a query with structured content blocks and a specific session ID
436    ///
437    /// This method enables multimodal queries with session management for
438    /// maintaining separate conversation contexts.
439    ///
440    /// # Arguments
441    ///
442    /// * `content` - A vector of content blocks (text and/or images)
443    /// * `session_id` - Session identifier for the conversation
444    ///
445    /// # Errors
446    ///
447    /// Returns an error if:
448    /// - The content vector is empty (must include at least one text or image block)
449    /// - The client is not connected (call `connect()` first)
450    /// - Sending the message fails
451    ///
452    /// # Example
453    ///
454    /// ```no_run
455    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
456    /// # #[tokio::main]
457    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
458    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
459    /// # client.connect().await?;
460    /// client.query_with_content_and_session(
461    ///     vec![
462    ///         UserContentBlock::text("Analyze this chart"),
463    ///         UserContentBlock::image_url("https://example.com/chart.png"),
464    ///     ],
465    ///     "analysis-session",
466    /// ).await?;
467    /// # Ok(())
468    /// # }
469    /// ```
470    pub async fn query_with_content_and_session(
471        &mut self,
472        content: impl Into<Vec<UserContentBlock>>,
473        session_id: impl Into<String>,
474    ) -> Result<()> {
475        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
476            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
477        })?;
478
479        let content_blocks: Vec<UserContentBlock> = content.into();
480        UserContentBlock::validate_content(&content_blocks)?;
481
482        let session_id_str = session_id.into();
483
484        // Get or create query for this session (reuses existing query to maintain context)
485        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
486        self.current_query_id = Some(query_id.clone());
487
488        // Get the isolated query
489        let query = query_manager.get_query(&query_id)?;
490
491        // Format as JSON message for stream-json input format
492        // Content is an array of content blocks, not a simple string
493        let user_message = serde_json::json!({
494            "type": "user",
495            "message": {
496                "role": "user",
497                "content": content_blocks
498            },
499            "session_id": session_id_str
500        });
501
502        let message_str = serde_json::to_string(&user_message).map_err(|e| {
503            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
504        })?;
505
506        // Write directly to stdin (bypasses transport lock)
507        let stdin = query.stdin.clone();
508
509        if let Some(stdin_arc) = stdin {
510            let mut stdin_guard = stdin_arc.lock().await;
511            if let Some(ref mut stdin_stream) = *stdin_guard {
512                stdin_stream
513                    .write_all(message_str.as_bytes())
514                    .await
515                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
516                stdin_stream.write_all(b"\n").await.map_err(|e| {
517                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
518                })?;
519                stdin_stream
520                    .flush()
521                    .await
522                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
523            } else {
524                return Err(ClaudeError::Transport("stdin not available".to_string()));
525            }
526        } else {
527            return Err(ClaudeError::Transport("stdin not set".to_string()));
528        }
529
530        debug!(
531            query_id = %query_id,
532            session_id = %session_id_str,
533            "Sent content query to isolated query"
534        );
535
536        Ok(())
537    }
538
539    /// Receive all messages as a stream (continuous)
540    ///
541    /// This method returns a stream that yields all messages from Claude
542    /// indefinitely until the stream is closed or an error occurs.
543    ///
544    /// Use this when you want to process all messages, including multiple
545    /// responses and system events.
546    ///
547    /// # Returns
548    ///
549    /// A stream of `Result<Message>` that continues until the connection closes.
550    ///
551    /// # Example
552    ///
553    /// ```no_run
554    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
555    /// # use futures::StreamExt;
556    /// # #[tokio::main]
557    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
558    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
559    /// # client.connect().await?;
560    /// # client.query("Hello").await?;
561    /// let mut stream = client.receive_messages();
562    /// while let Some(message) = stream.next().await {
563    ///     println!("Received: {:?}", message?);
564    /// }
565    /// # Ok(())
566    /// # }
567    /// ```
568    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
569        let query_manager = match &self.query_manager {
570            Some(qm) => Arc::clone(qm),
571            None => {
572                return Box::pin(futures::stream::once(async {
573                    Err(ClaudeError::InvalidConfig(
574                        "Client not connected. Call connect() first.".to_string(),
575                    ))
576                }));
577            }
578        };
579
580        let query_id = match &self.current_query_id {
581            Some(id) => id.clone(),
582            None => {
583                return Box::pin(futures::stream::once(async {
584                    Err(ClaudeError::InvalidConfig(
585                        "No active query. Call query() first.".to_string(),
586                    ))
587                }));
588            }
589        };
590
591        Box::pin(async_stream::stream! {
592            // Get the isolated query and its message receiver
593            let query = match query_manager.get_query(&query_id) {
594                Ok(q) => q,
595                Err(e) => {
596                    yield Err(e);
597                    return;
598                }
599            };
600
601            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
602                Arc::clone(&query.message_rx)
603            };
604
605            loop {
606                let message = {
607                    let mut rx_guard = rx.lock().await;
608                    rx_guard.recv().await
609                };
610
611                match message {
612                    Some(json) => {
613                        match MessageParser::parse(json) {
614                            Ok(msg) => yield Ok(msg),
615                            Err(e) => {
616                                eprintln!("Failed to parse message: {}", e);
617                                yield Err(e);
618                            }
619                        }
620                    }
621                    None => break,
622                }
623            }
624        })
625    }
626
627    /// Receive messages until a ResultMessage
628    ///
629    /// This method returns a stream that yields messages until it encounters
630    /// a `ResultMessage`, which signals the completion of a Claude response.
631    ///
632    /// This is the most common pattern for handling Claude responses, as it
633    /// processes one complete "turn" of the conversation.
634    ///
635    /// This method uses query-scoped message channels to ensure message isolation,
636    /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
637    ///
638    /// # Returns
639    ///
640    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
641    ///
642    /// # Example
643    ///
644    /// ```no_run
645    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
646    /// # use futures::StreamExt;
647    /// # #[tokio::main]
648    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
649    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
650    /// # client.connect().await?;
651    /// # client.query("Hello").await?;
652    /// let mut stream = client.receive_response();
653    /// while let Some(message) = stream.next().await {
654    ///     match message? {
655    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
656    ///         Message::Result(result) => {
657    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
658    ///             break;
659    ///         }
660    ///         _ => {}
661    ///     }
662    /// }
663    /// # Ok(())
664    /// # }
665    /// ```
666    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
667        let query_manager = match &self.query_manager {
668            Some(qm) => Arc::clone(qm),
669            None => {
670                return Box::pin(futures::stream::once(async {
671                    Err(ClaudeError::InvalidConfig(
672                        "Client not connected. Call connect() first.".to_string(),
673                    ))
674                }));
675            }
676        };
677
678        let query_id = match &self.current_query_id {
679            Some(id) => id.clone(),
680            None => {
681                return Box::pin(futures::stream::once(async {
682                    Err(ClaudeError::InvalidConfig(
683                        "No active query. Call query() first.".to_string(),
684                    ))
685                }));
686            }
687        };
688
689        Box::pin(async_stream::stream! {
690            // ====================================================================
691            // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
692            // ====================================================================
693            // In the Codex-style architecture, each query has its own completely
694            // isolated QueryFull instance. We get the message receiver directly
695            // from the isolated query, eliminating the need for routing logic.
696            //
697            // This provides:
698            // - Complete message isolation
699            // - No possibility of message confusion
700            // - Simpler architecture without routing overhead
701            //
702            // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
703            // which removes inactive queries whose channels have been closed.
704
705            debug!(
706                query_id = %query_id,
707                "Getting message receiver from isolated query"
708            );
709
710            // Get the isolated query and its message receiver
711            let query = match query_manager.get_query(&query_id) {
712                Ok(q) => q,
713                Err(e) => {
714                    yield Err(e);
715                    return;
716                }
717            };
718
719            // Get the message receiver from the isolated query
720            // In isolated mode, we directly access the message_rx without routing
721            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
722                Arc::clone(&query.message_rx)
723            };
724
725            loop {
726                let message = {
727                    let mut rx_guard = rx.lock().await;
728                    rx_guard.recv().await
729                };
730
731                match message {
732                    Some(json) => {
733                        match MessageParser::parse(json) {
734                            Ok(msg) => {
735                                let is_result = matches!(msg, Message::Result(_));
736                                yield Ok(msg);
737                                if is_result {
738                                    debug!(
739                                        query_id = %query_id,
740                                        "Received ResultMessage, ending stream"
741                                    );
742                                    // Cleanup will be handled by the periodic cleanup task
743                                    // when the query becomes inactive
744                                    break;
745                                }
746                            }
747                            Err(e) => {
748                                eprintln!("Failed to parse message: {}", e);
749                                yield Err(e);
750                            }
751                        }
752                    }
753                    None => {
754                        debug!(
755                            query_id = %query_id,
756                            "Isolated query channel closed"
757                        );
758                        // Cleanup will be handled by the periodic cleanup task
759                        break;
760                    }
761                }
762            }
763        })
764    }
765
766    /// Send an interrupt signal to stop the current Claude operation
767    ///
768    /// This is analogous to Python's `client.interrupt()`.
769    ///
770    /// # Errors
771    ///
772    /// Returns an error if the client is not connected or if sending fails.
773    pub async fn interrupt(&self) -> Result<()> {
774        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
775            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
776        })?;
777
778        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
779            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
780        })?;
781
782        let query = query_manager.get_query(query_id)?;
783        query.interrupt().await
784    }
785
786    /// Change the permission mode dynamically
787    ///
788    /// This is analogous to Python's `client.set_permission_mode()`.
789    ///
790    /// # Arguments
791    ///
792    /// * `mode` - The new permission mode to set
793    ///
794    /// # Errors
795    ///
796    /// Returns an error if the client is not connected or if sending fails.
797    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
798        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
799            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
800        })?;
801
802        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
803            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
804        })?;
805
806        let query = query_manager.get_query(query_id)?;
807        query.set_permission_mode(mode).await
808    }
809
810    /// Change the AI model dynamically
811    ///
812    /// This is analogous to Python's `client.set_model()`.
813    ///
814    /// # Arguments
815    ///
816    /// * `model` - The new model name, or None to use default
817    ///
818    /// # Errors
819    ///
820    /// Returns an error if the client is not connected or if sending fails.
821    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
822        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
823            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
824        })?;
825
826        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
827            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
828        })?;
829
830        let query = query_manager.get_query(query_id)?;
831        query.set_model(model).await
832    }
833
834    /// Rewind tracked files to their state at a specific user message.
835    ///
836    /// This is analogous to Python's `client.rewind_files()`.
837    ///
838    /// # Requirements
839    ///
840    /// - `enable_file_checkpointing=true` in options to track file changes
841    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
842    ///   objects with `uuid` in the response stream
843    ///
844    /// # Arguments
845    ///
846    /// * `user_message_id` - UUID of the user message to rewind to. This should be
847    ///   the `uuid` field from a `UserMessage` received during the conversation.
848    ///
849    /// # Errors
850    ///
851    /// Returns an error if the client is not connected or if sending fails.
852    ///
853    /// # Example
854    ///
855    /// ```no_run
856    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
857    /// # use std::collections::HashMap;
858    /// # #[tokio::main]
859    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
860    /// let options = ClaudeAgentOptions::builder()
861    ///     .enable_file_checkpointing(true)
862    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
863    ///     .build();
864    /// let mut client = ClaudeClient::new(options);
865    /// client.connect().await?;
866    ///
867    /// client.query("Make some changes to my files").await?;
868    /// let mut checkpoint_id = None;
869    /// {
870    ///     let mut stream = client.receive_response();
871    ///     use futures::StreamExt;
872    ///     while let Some(Ok(msg)) = stream.next().await {
873    ///         if let Message::User(user_msg) = &msg {
874    ///             if let Some(uuid) = &user_msg.uuid {
875    ///                 checkpoint_id = Some(uuid.clone());
876    ///             }
877    ///         }
878    ///     }
879    /// }
880    ///
881    /// // Later, rewind to that point
882    /// if let Some(id) = checkpoint_id {
883    ///     client.rewind_files(&id).await?;
884    /// }
885    /// # Ok(())
886    /// # }
887    /// ```
888    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
889        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
890            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
891        })?;
892
893        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
894            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
895        })?;
896
897        let query = query_manager.get_query(query_id)?;
898        query.rewind_files(user_message_id).await
899    }
900
901    /// Get server initialization info including available commands and output styles
902    ///
903    /// Returns initialization information from the Claude Code server including:
904    /// - Available commands (slash commands, system commands, etc.)
905    /// - Current and available output styles
906    /// - Server capabilities
907    ///
908    /// This is analogous to Python's `client.get_server_info()`.
909    ///
910    /// # Returns
911    ///
912    /// Dictionary with server info, or None if not connected
913    ///
914    /// # Example
915    ///
916    /// ```no_run
917    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
918    /// # #[tokio::main]
919    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
920    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
921    /// # client.connect().await?;
922    /// if let Some(info) = client.get_server_info().await {
923    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
924    ///     println!("Output style: {:?}", info.get("output_style"));
925    /// }
926    /// # Ok(())
927    /// # }
928    /// ```
929    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
930        let query_manager = self.query_manager.as_ref()?;
931        let query_id = self.current_query_id.as_ref()?;
932        let Ok(query) = query_manager.get_query(query_id) else {
933            return None;
934        };
935        query.get_initialization_result().await
936    }
937
938    /// Start a new session by switching to a different session ID
939    ///
940    /// This is a convenience method that creates a new conversation context.
941    /// It's equivalent to calling `query_with_session()` with a new session ID.
942    ///
943    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
944    /// when creating a new client.
945    ///
946    /// # Arguments
947    ///
948    /// * `session_id` - The new session ID to use
949    /// * `prompt` - Initial message for the new session
950    ///
951    /// # Errors
952    ///
953    /// Returns an error if the client is not connected or if sending fails.
954    ///
955    /// # Example
956    ///
957    /// ```no_run
958    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
959    /// # #[tokio::main]
960    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
961    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
962    /// # client.connect().await?;
963    /// // First conversation
964    /// client.query("Hello").await?;
965    ///
966    /// // Start new conversation with different context
967    /// client.new_session("session-2", "Tell me about Rust").await?;
968    /// # Ok(())
969    /// # }
970    /// ```
971    pub async fn new_session(
972        &mut self,
973        session_id: impl Into<String>,
974        prompt: impl Into<String>,
975    ) -> Result<()> {
976        self.query_with_session(prompt, session_id).await
977    }
978
979    /// Disconnect from Claude (analogous to Python's __aexit__)
980    ///
981    /// This cleanly shuts down the connection to Claude Code CLI.
982    ///
983    /// # Errors
984    ///
985    /// Returns an error if disconnection fails.
986    #[instrument(name = "claude.client.disconnect", skip(self))]
987    pub async fn disconnect(&mut self) -> Result<()> {
988        if !self.connected {
989            debug!("Client already disconnected");
990            return Ok(());
991        }
992
993        info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
994
995        if let Some(query_manager) = self.query_manager.take() {
996            // Get all queries for cleanup
997            let queries = query_manager.get_all_queries();
998
999            // Close all isolated queries by closing their resources
1000            // This signals each CLI process to exit
1001            for (_query_id, query) in &queries {
1002                // Close stdin (if available)
1003                if let Some(ref stdin_arc) = query.stdin {
1004                    let mut stdin_guard = stdin_arc.lock().await;
1005                    if let Some(mut stdin_stream) = stdin_guard.take() {
1006                        let _ = stdin_stream.shutdown().await;
1007                    }
1008                }
1009
1010                // Close transport
1011                let transport = Arc::clone(&query.transport);
1012                let mut transport_guard = transport.lock().await;
1013                let _ = transport_guard.close().await;
1014            }
1015
1016            // Give background tasks a moment to finish reading and release locks
1017            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1018        }
1019
1020        self.current_query_id = None;
1021        self.connected = false;
1022        debug!("Disconnected successfully");
1023        Ok(())
1024    }
1025}
1026
1027impl Drop for ClaudeClient {
1028    fn drop(&mut self) {
1029        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1030        // Users should call disconnect() explicitly
1031        if self.connected {
1032            eprintln!(
1033                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1034            );
1035        }
1036    }
1037}
1038
1039#[cfg(test)]
1040mod tests {
1041    use super::*;
1042    use crate::types::permissions::{PermissionResult, PermissionResultAllow};
1043    use std::sync::Arc;
1044
1045    #[tokio::test]
1046    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
1047        let callback: crate::types::permissions::CanUseToolCallback =
1048            Arc::new(|_tool_name, _tool_input, _context| {
1049                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1050            });
1051
1052        let opts = ClaudeAgentOptions::builder()
1053            .can_use_tool(callback)
1054            .permission_prompt_tool_name("custom_tool") // Not "stdio"
1055            .build();
1056
1057        let mut client = ClaudeClient::new(opts);
1058        let result = client.connect().await;
1059
1060        assert!(result.is_err());
1061        let err = result.unwrap_err();
1062        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
1063        assert!(err.to_string().contains("permission_prompt_tool_name"));
1064    }
1065
1066    #[tokio::test]
1067    async fn test_connect_accepts_can_use_tool_with_stdio() {
1068        let callback: crate::types::permissions::CanUseToolCallback =
1069            Arc::new(|_tool_name, _tool_input, _context| {
1070                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1071            });
1072
1073        let opts = ClaudeAgentOptions::builder()
1074            .can_use_tool(callback)
1075            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
1076            .build();
1077
1078        let mut client = ClaudeClient::new(opts);
1079        // This will fail later (CLI not found), but should pass validation
1080        let result = client.connect().await;
1081
1082        // Should not be InvalidConfig error about permission_prompt_tool_name
1083        if let Err(ref err) = result {
1084            assert!(
1085                !err.to_string().contains("permission_prompt_tool_name"),
1086                "Should not fail on permission_prompt_tool_name validation"
1087            );
1088        }
1089    }
1090
1091    #[tokio::test]
1092    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
1093        let callback: crate::types::permissions::CanUseToolCallback =
1094            Arc::new(|_tool_name, _tool_input, _context| {
1095                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
1096            });
1097
1098        let opts = ClaudeAgentOptions::builder()
1099            .can_use_tool(callback)
1100            // No permission_prompt_tool_name set - defaults to stdio
1101            .build();
1102
1103        let mut client = ClaudeClient::new(opts);
1104        // This will fail later (CLI not found), but should pass validation
1105        let result = client.connect().await;
1106
1107        // Should not be InvalidConfig error about permission_prompt_tool_name
1108        if let Err(ref err) = result {
1109            assert!(
1110                !err.to_string().contains("permission_prompt_tool_name"),
1111                "Should not fail on permission_prompt_tool_name validation"
1112            );
1113        }
1114    }
1115}