Skip to main content

claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41///     // Connect to Claude
42///     client.connect().await?;
43///
44///     // Send a query
45///     client.query("Hello Claude!").await?;
46///
47///     // Receive response as a stream
48///     {
49///         let mut stream = client.receive_response();
50///         while let Some(message) = stream.next().await {
51///             println!("Received: {:?}", message?);
52///         }
53///     }
54///
55///     // Disconnect
56///     client.disconnect().await?;
57///     Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61    options: ClaudeAgentOptions, // configuration captured at construction; `connect()` works from a clone of it
62    /// QueryManager for creating isolated queries (Codex-style architecture); `None` until `connect()` succeeds
63    query_manager: Option<Arc<QueryManager>>,
64    /// Id of the query the receive/interrupt/permission methods act on; set by `connect()` and updated by every `query*` call
65    current_query_id: Option<String>,
66    connected: bool, // set to `true` at the end of a successful `connect()`; makes a repeated `connect()` return early
67}
68
69impl ClaudeClient {
70    /// Create a new ClaudeClient
71    ///
72    /// # Arguments
73    ///
74    /// * `options` - Configuration options for the Claude client
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
80    ///
81    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82    /// ```
83    pub fn new(options: ClaudeAgentOptions) -> Self {
84        Self {
85            options,
86            query_manager: None,
87            current_query_id: None,
88            connected: false,
89        }
90    }
91
92    /// Create a new ClaudeClient with early validation
93    ///
94    /// Unlike `new()`, this validates the configuration eagerly by attempting
95    /// to create the transport. This catches issues like invalid working directory
96    /// or missing CLI before `connect()` is called.
97    ///
98    /// # Arguments
99    ///
100    /// * `options` - Configuration options for the Claude client
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if:
105    /// - The working directory does not exist or is not a directory
106    /// - Claude CLI cannot be found
107    ///
108    /// # Example
109    ///
110    /// ```no_run
111    /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
112    ///
113    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114    /// # Ok::<(), claude_code_agent_sdk::ClaudeError>(())
115    /// ```
116    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117        // Validate by attempting to create transport (but don't keep it)
118        let prompt = QueryPrompt::Streaming;
119        let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121        Ok(Self {
122            options,
123            query_manager: None,
124            current_query_id: None,
125            connected: false,
126        })
127    }
128
129    /// Connect to Claude (analogous to Python's __aenter__)
130    ///
131    /// This establishes the connection to the Claude Code CLI and initializes
132    /// the bidirectional communication channel.
133    ///
134    /// Uses the Codex-style architecture with QueryManager for complete
135    /// message isolation between queries.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - Claude CLI cannot be found or started
141    /// - The initialization handshake fails
142    /// - Hook registration fails
143    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144    #[instrument(
145        name = "claude.client.connect",
146        skip(self),
147        fields(
148            has_can_use_tool = self.options.can_use_tool.is_some(),
149            has_hooks = self.options.hooks.is_some(),
150            model = %self.options.model.as_deref().unwrap_or("default"),
151        )
152    )]
153    pub async fn connect(&mut self) -> Result<()> {
154        if self.connected {
155            debug!("Client already connected, skipping");
156            return Ok(());
157        }
158
159        info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162        // This matches Python SDK behavior (client.py lines 106-122)
163        // which ensures CLI uses control protocol for permission prompts
164        let mut options = self.options.clone();
165        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166            info!(
167                "can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'"
168            );
169            options.permission_prompt_tool_name = Some("stdio".to_string());
170        }
171
172        // Validate can_use_tool configuration (aligned with Python SDK behavior)
173        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
174        // to ensure the control protocol can handle permission requests
175        if options.can_use_tool.is_some()
176            && let Some(ref tool_name) = options.permission_prompt_tool_name
177            && tool_name != "stdio"
178        {
179            return Err(ClaudeError::InvalidConfig(
180                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
181                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
182                            .to_string(),
183                    ));
184        }
185
186        // Prepare hooks for initialization
187        // Build efficiency hooks if configured
188        let efficiency_hooks = self
189            .options
190            .efficiency
191            .as_ref()
192            .map(build_efficiency_hooks)
193            .unwrap_or_default();
194
195        // Merge user hooks with efficiency hooks
196        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
197
198        // Convert hooks to internal format
199        let hooks = merged_hooks.as_ref().map(|hooks_map| {
200            hooks_map
201                .iter()
202                .map(|(event, matchers)| {
203                    let event_name = match event {
204                        HookEvent::PreToolUse => "PreToolUse",
205                        HookEvent::PostToolUse => "PostToolUse",
206                        HookEvent::PostToolUseFailure => "PostToolUseFailure",
207                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
208                        HookEvent::Stop => "Stop",
209                        HookEvent::SubagentStop => "SubagentStop",
210                        HookEvent::PreCompact => "PreCompact",
211                        HookEvent::Notification => "Notification",
212                        HookEvent::SubagentStart => "SubagentStart",
213                        HookEvent::PermissionRequest => "PermissionRequest",
214                    };
215                    (event_name.to_string(), matchers.clone())
216                })
217                .collect()
218        });
219
220        // Extract SDK MCP servers from options
221        let sdk_mcp_servers =
222            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
223                servers_dict
224                    .iter()
225                    .filter_map(|(name, config)| {
226                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
227                            Some((name.clone(), sdk_config.clone()))
228                        } else {
229                            None
230                        }
231                    })
232                    .collect()
233            } else {
234                std::collections::HashMap::new()
235            };
236
237        // Clone options for the transport factory
238        let options_clone = options.clone();
239
240        // Create QueryManager with a transport factory
241        // The factory creates new transports for each isolated query
242        // Note: connect() is called later in create_query(), not here
243        let mut query_manager = QueryManager::new(move || {
244            let prompt = QueryPrompt::Streaming;
245            let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
246            Ok(Box::new(transport) as Box<dyn Transport>)
247        });
248
249        // Set control request timeout
250        query_manager.set_control_request_timeout(self.options.control_request_timeout);
251
252        // Set configuration on QueryManager
253        query_manager.set_hooks(hooks).await;
254        query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
255        query_manager
256            .set_can_use_tool(self.options.can_use_tool.clone())
257            .await;
258
259        // Serialize agents to JSON Value for the initialize request body
260        // This avoids OS ARG_MAX limits from passing large agent definitions via CLI flags
261        if let Some(ref agents) = self.options.agents {
262            let agents_value = serde_json::to_value(agents).map_err(|e| {
263                ClaudeError::InvalidConfig(format!("Failed to serialize agents: {}", e))
264            })?;
265            query_manager.set_agents(Some(agents_value)).await;
266        }
267
268        let query_manager = Arc::new(query_manager);
269
270        // Create the first query for initialization
271        let first_query_id = query_manager.create_query().await?;
272
273        // Start cleanup task for inactive queries
274        Arc::clone(&query_manager).start_cleanup_task();
275
276        self.query_manager = Some(query_manager);
277        self.current_query_id = Some(first_query_id);
278        self.connected = true;
279
280        info!("Successfully connected to Claude Code CLI with QueryManager");
281        Ok(())
282    }
283
284    /// Send a query to Claude
285    ///
286    /// This sends a new user prompt to Claude. Claude will remember the context
287    /// of previous queries within the same session.
288    ///
289    /// # Arguments
290    ///
291    /// * `prompt` - The user prompt to send
292    ///
293    /// # Errors
294    ///
295    /// Returns an error if the client is not connected or if sending fails.
296    ///
297    /// # Example
298    ///
299    /// ```no_run
300    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
301    /// # #[tokio::main]
302    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
303    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
304    /// # client.connect().await?;
305    /// client.query("What is 2 + 2?").await?;
306    /// # Ok(())
307    /// # }
308    /// ```
309    #[instrument(
310        name = "claude.client.query",
311        skip(self, prompt),
312        fields(session_id = "default",)
313    )]
314    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
315        self.query_with_session(prompt, "default").await
316    }
317
318    /// Send a query to Claude with a specific session ID
319    ///
320    /// This sends a new user prompt to Claude. Different session IDs maintain
321    /// separate conversation contexts.
322    ///
323    /// # Arguments
324    ///
325    /// * `prompt` - The user prompt to send
326    /// * `session_id` - Session identifier for the conversation
327    ///
328    /// # Errors
329    ///
330    /// Returns an error if the client is not connected or if sending fails.
331    ///
332    /// # Example
333    ///
334    /// ```no_run
335    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
336    /// # #[tokio::main]
337    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
338    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
339    /// # client.connect().await?;
340    /// // Separate conversation contexts
341    /// client.query_with_session("First question", "session-1").await?;
342    /// client.query_with_session("Different question", "session-2").await?;
343    /// # Ok(())
344    /// # }
345    /// ```
346    pub async fn query_with_session(
347        &mut self,
348        prompt: impl Into<String>,
349        session_id: impl Into<String>,
350    ) -> Result<()> {
351        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
352            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
353        })?;
354
355        let prompt_str = prompt.into();
356        let session_id_str = session_id.into();
357
358        // Get or create query for this session (reuses existing query to maintain context)
359        let query_id = query_manager
360            .get_or_create_session_query(&session_id_str)
361            .await?;
362        self.current_query_id = Some(query_id.clone());
363
364        // Get the isolated query
365        let query = query_manager.get_query(&query_id)?;
366
367        // Format as JSON message for stream-json input format
368        let user_message = serde_json::json!({
369            "type": "user",
370            "message": {
371                "role": "user",
372                "content": prompt_str
373            },
374            "session_id": session_id_str
375        });
376
377        let message_str = serde_json::to_string(&user_message).map_err(|e| {
378            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
379        })?;
380
381        // Write directly to stdin (bypasses transport lock)
382        let stdin = query.stdin.clone();
383
384        if let Some(stdin_arc) = stdin {
385            let mut stdin_guard = stdin_arc.lock().await;
386            if let Some(ref mut stdin_stream) = *stdin_guard {
387                stdin_stream
388                    .write_all(message_str.as_bytes())
389                    .await
390                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
391                stdin_stream.write_all(b"\n").await.map_err(|e| {
392                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
393                })?;
394                stdin_stream
395                    .flush()
396                    .await
397                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
398            } else {
399                return Err(ClaudeError::Transport("stdin not available".to_string()));
400            }
401        } else {
402            return Err(ClaudeError::Transport("stdin not set".to_string()));
403        }
404
405        debug!(
406            query_id = %query_id,
407            session_id = %session_id_str,
408            "Sent query to isolated query"
409        );
410
411        Ok(())
412    }
413
414    /// Send a query with structured content blocks (supports images)
415    ///
416    /// This method enables multimodal queries in bidirectional streaming mode.
417    /// Use it to send images alongside text for vision-related tasks.
418    ///
419    /// # Arguments
420    ///
421    /// * `content` - A vector of content blocks (text and/or images)
422    ///
423    /// # Errors
424    ///
425    /// Returns an error if:
426    /// - The content vector is empty (must include at least one text or image block)
427    /// - The client is not connected (call `connect()` first)
428    /// - Sending the message fails
429    ///
430    /// # Example
431    ///
432    /// ```no_run
433    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
434    /// # #[tokio::main]
435    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
436    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
437    /// # client.connect().await?;
438    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
439    /// client.query_with_content(vec![
440    ///     UserContentBlock::text("What's in this image?"),
441    ///     UserContentBlock::image_base64("image/png", base64_data)?,
442    /// ]).await?;
443    /// # Ok(())
444    /// # }
445    /// ```
446    pub async fn query_with_content(
447        &mut self,
448        content: impl Into<Vec<UserContentBlock>>,
449    ) -> Result<()> {
450        self.query_with_content_and_session(content, "default")
451            .await
452    }
453
454    /// Send a query with structured content blocks and a specific session ID
455    ///
456    /// This method enables multimodal queries with session management for
457    /// maintaining separate conversation contexts.
458    ///
459    /// # Arguments
460    ///
461    /// * `content` - A vector of content blocks (text and/or images)
462    /// * `session_id` - Session identifier for the conversation
463    ///
464    /// # Errors
465    ///
466    /// Returns an error if:
467    /// - The content vector is empty (must include at least one text or image block)
468    /// - The client is not connected (call `connect()` first)
469    /// - Sending the message fails
470    ///
471    /// # Example
472    ///
473    /// ```no_run
474    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
475    /// # #[tokio::main]
476    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
477    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
478    /// # client.connect().await?;
479    /// client.query_with_content_and_session(
480    ///     vec![
481    ///         UserContentBlock::text("Analyze this chart"),
482    ///         UserContentBlock::image_url("https://example.com/chart.png"),
483    ///     ],
484    ///     "analysis-session",
485    /// ).await?;
486    /// # Ok(())
487    /// # }
488    /// ```
489    pub async fn query_with_content_and_session(
490        &mut self,
491        content: impl Into<Vec<UserContentBlock>>,
492        session_id: impl Into<String>,
493    ) -> Result<()> {
494        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
495            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
496        })?;
497
498        let content_blocks: Vec<UserContentBlock> = content.into();
499        UserContentBlock::validate_content(&content_blocks)?;
500
501        let session_id_str = session_id.into();
502
503        // Get or create query for this session (reuses existing query to maintain context)
504        let query_id = query_manager
505            .get_or_create_session_query(&session_id_str)
506            .await?;
507        self.current_query_id = Some(query_id.clone());
508
509        // Get the isolated query
510        let query = query_manager.get_query(&query_id)?;
511
512        // Format as JSON message for stream-json input format
513        // Content is an array of content blocks, not a simple string
514        let user_message = serde_json::json!({
515            "type": "user",
516            "message": {
517                "role": "user",
518                "content": content_blocks
519            },
520            "session_id": session_id_str
521        });
522
523        let message_str = serde_json::to_string(&user_message).map_err(|e| {
524            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
525        })?;
526
527        // Write directly to stdin (bypasses transport lock)
528        let stdin = query.stdin.clone();
529
530        if let Some(stdin_arc) = stdin {
531            let mut stdin_guard = stdin_arc.lock().await;
532            if let Some(ref mut stdin_stream) = *stdin_guard {
533                stdin_stream
534                    .write_all(message_str.as_bytes())
535                    .await
536                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
537                stdin_stream.write_all(b"\n").await.map_err(|e| {
538                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
539                })?;
540                stdin_stream
541                    .flush()
542                    .await
543                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
544            } else {
545                return Err(ClaudeError::Transport("stdin not available".to_string()));
546            }
547        } else {
548            return Err(ClaudeError::Transport("stdin not set".to_string()));
549        }
550
551        debug!(
552            query_id = %query_id,
553            session_id = %session_id_str,
554            "Sent content query to isolated query"
555        );
556
557        Ok(())
558    }
559
560    /// Receive all messages as a stream (continuous)
561    ///
562    /// This method returns a stream that yields all messages from Claude
563    /// indefinitely until the stream is closed or an error occurs.
564    ///
565    /// Use this when you want to process all messages, including multiple
566    /// responses and system events.
567    ///
568    /// # Returns
569    ///
570    /// A stream of `Result<Message>` that continues until the connection closes.
571    ///
572    /// # Example
573    ///
574    /// ```no_run
575    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
576    /// # use futures::StreamExt;
577    /// # #[tokio::main]
578    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
579    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
580    /// # client.connect().await?;
581    /// # client.query("Hello").await?;
582    /// let mut stream = client.receive_messages();
583    /// while let Some(message) = stream.next().await {
584    ///     println!("Received: {:?}", message?);
585    /// }
586    /// # Ok(())
587    /// # }
588    /// ```
589    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
590        let query_manager = match &self.query_manager {
591            Some(qm) => Arc::clone(qm),
592            None => {
593                return Box::pin(futures::stream::once(async {
594                    Err(ClaudeError::InvalidConfig(
595                        "Client not connected. Call connect() first.".to_string(),
596                    ))
597                }));
598            }
599        };
600
601        let query_id = match &self.current_query_id {
602            Some(id) => id.clone(),
603            None => {
604                return Box::pin(futures::stream::once(async {
605                    Err(ClaudeError::InvalidConfig(
606                        "No active query. Call query() first.".to_string(),
607                    ))
608                }));
609            }
610        };
611
612        Box::pin(async_stream::stream! {
613            // Get the isolated query and its message receiver
614            let query = match query_manager.get_query(&query_id) {
615                Ok(q) => q,
616                Err(e) => {
617                    yield Err(e);
618                    return;
619                }
620            };
621
622            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
623                Arc::clone(&query.message_rx)
624            };
625
626            loop {
627                let message = {
628                    let mut rx_guard = rx.lock().await;
629                    rx_guard.recv().await
630                };
631
632                match message {
633                    Some(json) => {
634                        match MessageParser::parse(json) {
635                            Ok(msg) => yield Ok(msg),
636                            Err(e) => {
637                                eprintln!("Failed to parse message: {}", e);
638                                yield Err(e);
639                            }
640                        }
641                    }
642                    None => break,
643                }
644            }
645        })
646    }
647
648    /// Receive messages until a ResultMessage
649    ///
650    /// This method returns a stream that yields messages until it encounters
651    /// a `ResultMessage`, which signals the completion of a Claude response.
652    ///
653    /// This is the most common pattern for handling Claude responses, as it
654    /// processes one complete "turn" of the conversation.
655    ///
656    /// This method uses query-scoped message channels to ensure message isolation,
657    /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
658    ///
659    /// # Returns
660    ///
661    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
662    ///
663    /// # Example
664    ///
665    /// ```no_run
666    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
667    /// # use futures::StreamExt;
668    /// # #[tokio::main]
669    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
670    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
671    /// # client.connect().await?;
672    /// # client.query("Hello").await?;
673    /// let mut stream = client.receive_response();
674    /// while let Some(message) = stream.next().await {
675    ///     match message? {
676    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
677    ///         Message::Result(result) => {
678    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
679    ///             break;
680    ///         }
681    ///         _ => {}
682    ///     }
683    /// }
684    /// # Ok(())
685    /// # }
686    /// ```
687    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
688        let query_manager = match &self.query_manager {
689            Some(qm) => Arc::clone(qm),
690            None => {
691                return Box::pin(futures::stream::once(async {
692                    Err(ClaudeError::InvalidConfig(
693                        "Client not connected. Call connect() first.".to_string(),
694                    ))
695                }));
696            }
697        };
698
699        let query_id = match &self.current_query_id {
700            Some(id) => id.clone(),
701            None => {
702                return Box::pin(futures::stream::once(async {
703                    Err(ClaudeError::InvalidConfig(
704                        "No active query. Call query() first.".to_string(),
705                    ))
706                }));
707            }
708        };
709
710        Box::pin(async_stream::stream! {
711            // ====================================================================
712            // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
713            // ====================================================================
714            // In the Codex-style architecture, each query has its own completely
715            // isolated QueryFull instance. We get the message receiver directly
716            // from the isolated query, eliminating the need for routing logic.
717            //
718            // This provides:
719            // - Complete message isolation
720            // - No possibility of message confusion
721            // - Simpler architecture without routing overhead
722            //
723            // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
724            // which removes inactive queries whose channels have been closed.
725
726            debug!(
727                query_id = %query_id,
728                "Getting message receiver from isolated query"
729            );
730
731            // Get the isolated query and its message receiver
732            let query = match query_manager.get_query(&query_id) {
733                Ok(q) => q,
734                Err(e) => {
735                    yield Err(e);
736                    return;
737                }
738            };
739
740            // Get the message receiver from the isolated query
741            // In isolated mode, we directly access the message_rx without routing
742            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
743                Arc::clone(&query.message_rx)
744            };
745
746            loop {
747                let message = {
748                    let mut rx_guard = rx.lock().await;
749                    rx_guard.recv().await
750                };
751
752                match message {
753                    Some(json) => {
754                        match MessageParser::parse(json) {
755                            Ok(msg) => {
756                                let is_result = matches!(msg, Message::Result(_));
757                                yield Ok(msg);
758                                if is_result {
759                                    debug!(
760                                        query_id = %query_id,
761                                        "Received ResultMessage, ending stream"
762                                    );
763                                    // Cleanup will be handled by the periodic cleanup task
764                                    // when the query becomes inactive
765                                    break;
766                                }
767                            }
768                            Err(e) => {
769                                eprintln!("Failed to parse message: {}", e);
770                                yield Err(e);
771                            }
772                        }
773                    }
774                    None => {
775                        debug!(
776                            query_id = %query_id,
777                            "Isolated query channel closed"
778                        );
779                        // Cleanup will be handled by the periodic cleanup task
780                        break;
781                    }
782                }
783            }
784        })
785    }
786
787    /// Send an interrupt signal to stop the current Claude operation
788    ///
789    /// This is analogous to Python's `client.interrupt()`.
790    ///
791    /// # Errors
792    ///
793    /// Returns an error if the client is not connected or if sending fails.
794    pub async fn interrupt(&self) -> Result<()> {
795        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
796            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
797        })?;
798
799        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
800            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
801        })?;
802
803        let query = query_manager.get_query(query_id)?;
804        query.interrupt().await
805    }
806
807    /// Change the permission mode dynamically
808    ///
809    /// This is analogous to Python's `client.set_permission_mode()`.
810    ///
811    /// # Arguments
812    ///
813    /// * `mode` - The new permission mode to set
814    ///
815    /// # Errors
816    ///
817    /// Returns an error if the client is not connected or if sending fails.
818    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
819        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
820            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
821        })?;
822
823        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
824            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
825        })?;
826
827        let query = query_manager.get_query(query_id)?;
828        query.set_permission_mode(mode).await
829    }
830
831    /// Change the AI model dynamically
832    ///
833    /// This is analogous to Python's `client.set_model()`.
834    ///
835    /// # Arguments
836    ///
837    /// * `model` - The new model name, or None to use default
838    ///
839    /// # Errors
840    ///
841    /// Returns an error if the client is not connected or if sending fails.
842    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
843        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
844            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
845        })?;
846
847        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
848            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
849        })?;
850
851        let query = query_manager.get_query(query_id)?;
852        query.set_model(model).await
853    }
854
855    /// Rewind tracked files to their state at a specific user message.
856    ///
857    /// This is analogous to Python's `client.rewind_files()`.
858    ///
859    /// # Requirements
860    ///
861    /// - `enable_file_checkpointing=true` in options to track file changes
862    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
863    ///   objects with `uuid` in the response stream
864    ///
865    /// # Arguments
866    ///
867    /// * `user_message_id` - UUID of the user message to rewind to. This should be
868    ///   the `uuid` field from a `UserMessage` received during the conversation.
869    ///
870    /// # Errors
871    ///
872    /// Returns an error if the client is not connected or if sending fails.
873    ///
874    /// # Example
875    ///
876    /// ```no_run
877    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
878    /// # use std::collections::HashMap;
879    /// # #[tokio::main]
880    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
881    /// let options = ClaudeAgentOptions::builder()
882    ///     .enable_file_checkpointing(true)
883    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
884    ///     .build();
885    /// let mut client = ClaudeClient::new(options);
886    /// client.connect().await?;
887    ///
888    /// client.query("Make some changes to my files").await?;
889    /// let mut checkpoint_id = None;
890    /// {
891    ///     let mut stream = client.receive_response();
892    ///     use futures::StreamExt;
893    ///     while let Some(Ok(msg)) = stream.next().await {
894    ///         if let Message::User(user_msg) = &msg {
895    ///             if let Some(uuid) = &user_msg.uuid {
896    ///                 checkpoint_id = Some(uuid.clone());
897    ///             }
898    ///         }
899    ///     }
900    /// }
901    ///
902    /// // Later, rewind to that point
903    /// if let Some(id) = checkpoint_id {
904    ///     client.rewind_files(&id).await?;
905    /// }
906    /// # Ok(())
907    /// # }
908    /// ```
909    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
910        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
911            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
912        })?;
913
914        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
915            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
916        })?;
917
918        let query = query_manager.get_query(query_id)?;
919        query.rewind_files(user_message_id).await
920    }
921
922    /// Get server initialization info including available commands and output styles
923    ///
924    /// Returns initialization information from the Claude Code server including:
925    /// - Available commands (slash commands, system commands, etc.)
926    /// - Current and available output styles
927    /// - Server capabilities
928    ///
929    /// This is analogous to Python's `client.get_server_info()`.
930    ///
931    /// # Returns
932    ///
933    /// Dictionary with server info, or None if not connected
934    ///
935    /// # Example
936    ///
937    /// ```no_run
938    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
939    /// # #[tokio::main]
940    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
941    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
942    /// # client.connect().await?;
943    /// if let Some(info) = client.get_server_info().await {
944    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
945    ///     println!("Output style: {:?}", info.get("output_style"));
946    /// }
947    /// # Ok(())
948    /// # }
949    /// ```
950    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
951        let query_manager = self.query_manager.as_ref()?;
952        let query_id = self.current_query_id.as_ref()?;
953        let Ok(query) = query_manager.get_query(query_id) else {
954            return None;
955        };
956        query.get_initialization_result().await
957    }
958
959    /// Get current MCP server connection status
960    ///
961    /// Queries the Claude Code CLI for the live connection status of all
962    /// configured MCP servers.
963    ///
964    /// This is analogous to Python's `client.get_mcp_status()`.
965    ///
966    /// # Returns
967    ///
968    /// Dictionary with MCP server status information. Contains a
969    /// `mcpServers` key with a list of server status objects, each having:
970    /// - `name`: Server name
971    /// - `status`: Connection status (`connected`, `pending`, `failed`,
972    ///   `needs-auth`, `disabled`)
973    ///
974    /// # Errors
975    ///
976    /// Returns an error if the client is not connected or if the request fails.
977    ///
978    /// # Example
979    ///
980    /// ```no_run
981    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
982    /// # #[tokio::main]
983    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
984    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
985    /// # client.connect().await?;
986    /// let status = client.get_mcp_status().await?;
987    /// println!("MCP status: {:?}", status);
988    /// # Ok(())
989    /// # }
990    /// ```
991    pub async fn get_mcp_status(&self) -> Result<serde_json::Value> {
992        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
993            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
994        })?;
995
996        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
997            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
998        })?;
999
1000        let query = query_manager.get_query(query_id)?;
1001        query.get_mcp_status().await
1002    }
1003
1004    /// Start a new session by switching to a different session ID
1005    ///
1006    /// This is a convenience method that creates a new conversation context.
1007    /// It's equivalent to calling `query_with_session()` with a new session ID.
1008    ///
1009    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1010    /// when creating a new client.
1011    ///
1012    /// # Arguments
1013    ///
1014    /// * `session_id` - The new session ID to use
1015    /// * `prompt` - Initial message for the new session
1016    ///
1017    /// # Errors
1018    ///
1019    /// Returns an error if the client is not connected or if sending fails.
1020    ///
1021    /// # Example
1022    ///
1023    /// ```no_run
1024    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
1025    /// # #[tokio::main]
1026    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1027    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1028    /// # client.connect().await?;
1029    /// // First conversation
1030    /// client.query("Hello").await?;
1031    ///
1032    /// // Start new conversation with different context
1033    /// client.new_session("session-2", "Tell me about Rust").await?;
1034    /// # Ok(())
1035    /// # }
1036    /// ```
1037    pub async fn new_session(
1038        &mut self,
1039        session_id: impl Into<String>,
1040        prompt: impl Into<String>,
1041    ) -> Result<()> {
1042        self.query_with_session(prompt, session_id).await
1043    }
1044
1045    /// Disconnect from Claude (analogous to Python's __aexit__)
1046    ///
1047    /// This cleanly shuts down the connection to Claude Code CLI.
1048    ///
1049    /// # Errors
1050    ///
1051    /// Returns an error if disconnection fails.
1052    #[instrument(name = "claude.client.disconnect", skip(self))]
1053    pub async fn disconnect(&mut self) -> Result<()> {
1054        if !self.connected {
1055            debug!("Client already disconnected");
1056            return Ok(());
1057        }
1058
1059        info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1060
1061        if let Some(query_manager) = self.query_manager.take() {
1062            // Get all queries for cleanup
1063            let queries = query_manager.get_all_queries();
1064
1065            // Close all isolated queries by closing their resources
1066            // This signals each CLI process to exit
1067            for (_query_id, query) in &queries {
1068                // Close stdin (if available)
1069                if let Some(ref stdin_arc) = query.stdin {
1070                    let mut stdin_guard = stdin_arc.lock().await;
1071                    if let Some(mut stdin_stream) = stdin_guard.take() {
1072                        let _ = stdin_stream.shutdown().await;
1073                    }
1074                }
1075
1076                // Close transport
1077                let transport = Arc::clone(&query.transport);
1078                let mut transport_guard = transport.lock().await;
1079                let _ = transport_guard.close().await;
1080            }
1081
1082            // Give background tasks a moment to finish reading and release locks
1083            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1084        }
1085
1086        self.current_query_id = None;
1087        self.connected = false;
1088        debug!("Disconnected successfully");
1089        Ok(())
1090    }
1091}
1092
1093impl Drop for ClaudeClient {
1094    fn drop(&mut self) {
1095        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1096        // Users should call disconnect() explicitly
1097        if self.connected {
1098            eprintln!(
1099                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1100            );
1101        }
1102    }
1103}
1104
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{CanUseToolCallback, PermissionResult, PermissionResultAllow};
    use std::sync::Arc;

    /// Builds a `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        })
    }

    /// Connects `client` and checks that a failure, if any, is not the
    /// `permission_prompt_tool_name` validation error.
    ///
    /// The connection normally fails later (CLI not found). On machines where
    /// the CLI is installed it can succeed, in which case the client is
    /// disconnected again so the test does not leave a CLI process behind.
    async fn assert_connect_passes_permission_validation(mut client: ClaudeClient) {
        match client.connect().await {
            Ok(_) => {
                // Best-effort cleanup; the outcome is irrelevant to what is tested.
                let _ = client.disconnect().await;
            }
            Err(err) => assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            ),
        }
    }

    /// A `can_use_tool` callback combined with a non-"stdio" permission prompt
    /// tool must be rejected as an invalid configuration.
    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let result = client.connect().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    /// Explicitly choosing "stdio" as the permission prompt tool is compatible
    /// with a `can_use_tool` callback.
    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        assert_connect_passes_permission_validation(ClaudeClient::new(opts)).await;
    }

    /// Leaving the permission prompt tool unset is compatible with a
    /// `can_use_tool` callback.
    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        assert_connect_passes_permission_validation(ClaudeClient::new(opts)).await;
    }
}
1181}