Skip to main content

claude_code_agent_sdk/
client.rs

1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41///     // Connect to Claude
42///     client.connect().await?;
43///
44///     // Send a query
45///     client.query("Hello Claude!").await?;
46///
47///     // Receive response as a stream
48///     {
49///         let mut stream = client.receive_response();
50///         while let Some(message) = stream.next().await {
51///             println!("Received: {:?}", message?);
52///         }
53///     }
54///
55///     // Disconnect
56///     client.disconnect().await?;
57///     Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61    options: ClaudeAgentOptions,
62    /// QueryManager for creating isolated queries (Codex-style architecture)
63    query_manager: Option<Arc<QueryManager>>,
64    /// Current query_id for the active prompt
65    current_query_id: Option<String>,
66    connected: bool,
67}
68
69impl ClaudeClient {
70    /// Create a new ClaudeClient
71    ///
72    /// # Arguments
73    ///
74    /// * `options` - Configuration options for the Claude client
75    ///
76    /// # Example
77    ///
78    /// ```no_run
79    /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
80    ///
81    /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82    /// ```
83    pub fn new(options: ClaudeAgentOptions) -> Self {
84        Self {
85            options,
86            query_manager: None,
87            current_query_id: None,
88            connected: false,
89        }
90    }
91
92    /// Create a new ClaudeClient with early validation
93    ///
94    /// Unlike `new()`, this validates the configuration eagerly by attempting
95    /// to create the transport. This catches issues like invalid working directory
96    /// or missing CLI before `connect()` is called.
97    ///
98    /// # Arguments
99    ///
100    /// * `options` - Configuration options for the Claude client
101    ///
102    /// # Errors
103    ///
104    /// Returns an error if:
105    /// - The working directory does not exist or is not a directory
106    /// - Claude CLI cannot be found
107    ///
108    /// # Example
109    ///
110    /// ```no_run
111    /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
112    ///
113    /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114    /// # Ok::<(), claude_code_agent_sdk::ClaudeError>(())
115    /// ```
116    pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117        // Validate by attempting to create transport (but don't keep it)
118        let prompt = QueryPrompt::Streaming;
119        let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121        Ok(Self {
122            options,
123            query_manager: None,
124            current_query_id: None,
125            connected: false,
126        })
127    }
128
129    /// Connect to Claude (analogous to Python's __aenter__)
130    ///
131    /// This establishes the connection to the Claude Code CLI and initializes
132    /// the bidirectional communication channel.
133    ///
134    /// Uses the Codex-style architecture with QueryManager for complete
135    /// message isolation between queries.
136    ///
137    /// # Errors
138    ///
139    /// Returns an error if:
140    /// - Claude CLI cannot be found or started
141    /// - The initialization handshake fails
142    /// - Hook registration fails
143    /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144    #[instrument(
145        name = "claude.client.connect",
146        skip(self),
147        fields(
148            has_can_use_tool = self.options.can_use_tool.is_some(),
149            has_hooks = self.options.hooks.is_some(),
150            model = %self.options.model.as_deref().unwrap_or("default"),
151        )
152    )]
153    pub async fn connect(&mut self) -> Result<()> {
154        if self.connected {
155            debug!("Client already connected, skipping");
156            return Ok(());
157        }
158
159        info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161        // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162        // This matches Python SDK behavior (client.py lines 106-122)
163        // which ensures CLI uses control protocol for permission prompts
164        let mut options = self.options.clone();
165        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166            info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167            options.permission_prompt_tool_name = Some("stdio".to_string());
168        }
169
170        // Validate can_use_tool configuration (aligned with Python SDK behavior)
171        // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172        // to ensure the control protocol can handle permission requests
173        if options.can_use_tool.is_some()
174            && let Some(ref tool_name) = options.permission_prompt_tool_name
175            && tool_name != "stdio"
176        {
177            return Err(ClaudeError::InvalidConfig(
178                        "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179                        Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180                            .to_string(),
181                    ));
182        }
183
184        // Prepare hooks for initialization
185        // Build efficiency hooks if configured
186        let efficiency_hooks = self
187            .options
188            .efficiency
189            .as_ref()
190            .map(build_efficiency_hooks)
191            .unwrap_or_default();
192
193        // Merge user hooks with efficiency hooks
194        let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196        // Convert hooks to internal format
197        let hooks = merged_hooks.as_ref().map(|hooks_map| {
198            hooks_map
199                .iter()
200                .map(|(event, matchers)| {
201                    let event_name = match event {
202                        HookEvent::PreToolUse => "PreToolUse",
203                        HookEvent::PostToolUse => "PostToolUse",
204                        HookEvent::PostToolUseFailure => "PostToolUseFailure",
205                        HookEvent::UserPromptSubmit => "UserPromptSubmit",
206                        HookEvent::Stop => "Stop",
207                        HookEvent::SubagentStop => "SubagentStop",
208                        HookEvent::PreCompact => "PreCompact",
209                        HookEvent::Notification => "Notification",
210                        HookEvent::SubagentStart => "SubagentStart",
211                        HookEvent::PermissionRequest => "PermissionRequest",
212                    };
213                    (event_name.to_string(), matchers.clone())
214                })
215                .collect()
216        });
217
218        // Extract SDK MCP servers from options
219        let sdk_mcp_servers =
220            if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
221                servers_dict
222                    .iter()
223                    .filter_map(|(name, config)| {
224                        if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
225                            Some((name.clone(), sdk_config.clone()))
226                        } else {
227                            None
228                        }
229                    })
230                    .collect()
231            } else {
232                std::collections::HashMap::new()
233            };
234
235        // Clone options for the transport factory
236        let options_clone = options.clone();
237
238        // Create QueryManager with a transport factory
239        // The factory creates new transports for each isolated query
240        // Note: connect() is called later in create_query(), not here
241        let mut query_manager = QueryManager::new(move || {
242            let prompt = QueryPrompt::Streaming;
243            let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
244            Ok(Box::new(transport) as Box<dyn Transport>)
245        });
246
247        // Set control request timeout
248        query_manager.set_control_request_timeout(self.options.control_request_timeout);
249
250        // Set configuration on QueryManager
251        query_manager.set_hooks(hooks).await;
252        query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
253        query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
254
255        // Serialize agents to JSON Value for the initialize request body
256        // This avoids OS ARG_MAX limits from passing large agent definitions via CLI flags
257        if let Some(ref agents) = self.options.agents {
258            let agents_value = serde_json::to_value(agents).map_err(|e| {
259                ClaudeError::InvalidConfig(format!("Failed to serialize agents: {}", e))
260            })?;
261            query_manager.set_agents(Some(agents_value)).await;
262        }
263
264        let query_manager = Arc::new(query_manager);
265
266        // Create the first query for initialization
267        let first_query_id = query_manager.create_query().await?;
268
269        // Start cleanup task for inactive queries
270        Arc::clone(&query_manager).start_cleanup_task();
271
272        self.query_manager = Some(query_manager);
273        self.current_query_id = Some(first_query_id);
274        self.connected = true;
275
276        info!("Successfully connected to Claude Code CLI with QueryManager");
277        Ok(())
278    }
279
280    /// Send a query to Claude
281    ///
282    /// This sends a new user prompt to Claude. Claude will remember the context
283    /// of previous queries within the same session.
284    ///
285    /// # Arguments
286    ///
287    /// * `prompt` - The user prompt to send
288    ///
289    /// # Errors
290    ///
291    /// Returns an error if the client is not connected or if sending fails.
292    ///
293    /// # Example
294    ///
295    /// ```no_run
296    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
297    /// # #[tokio::main]
298    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
299    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
300    /// # client.connect().await?;
301    /// client.query("What is 2 + 2?").await?;
302    /// # Ok(())
303    /// # }
304    /// ```
305    #[instrument(
306        name = "claude.client.query",
307        skip(self, prompt),
308        fields(session_id = "default",)
309    )]
310    pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
311        self.query_with_session(prompt, "default").await
312    }
313
314    /// Send a query to Claude with a specific session ID
315    ///
316    /// This sends a new user prompt to Claude. Different session IDs maintain
317    /// separate conversation contexts.
318    ///
319    /// # Arguments
320    ///
321    /// * `prompt` - The user prompt to send
322    /// * `session_id` - Session identifier for the conversation
323    ///
324    /// # Errors
325    ///
326    /// Returns an error if the client is not connected or if sending fails.
327    ///
328    /// # Example
329    ///
330    /// ```no_run
331    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
332    /// # #[tokio::main]
333    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
334    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
335    /// # client.connect().await?;
336    /// // Separate conversation contexts
337    /// client.query_with_session("First question", "session-1").await?;
338    /// client.query_with_session("Different question", "session-2").await?;
339    /// # Ok(())
340    /// # }
341    /// ```
342    pub async fn query_with_session(
343        &mut self,
344        prompt: impl Into<String>,
345        session_id: impl Into<String>,
346    ) -> Result<()> {
347        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
348            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
349        })?;
350
351        let prompt_str = prompt.into();
352        let session_id_str = session_id.into();
353
354        // Get or create query for this session (reuses existing query to maintain context)
355        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
356        self.current_query_id = Some(query_id.clone());
357
358        // Get the isolated query
359        let query = query_manager.get_query(&query_id)?;
360
361        // Format as JSON message for stream-json input format
362        let user_message = serde_json::json!({
363            "type": "user",
364            "message": {
365                "role": "user",
366                "content": prompt_str
367            },
368            "session_id": session_id_str
369        });
370
371        let message_str = serde_json::to_string(&user_message).map_err(|e| {
372            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
373        })?;
374
375        // Write directly to stdin (bypasses transport lock)
376        let stdin = query.stdin.clone();
377
378        if let Some(stdin_arc) = stdin {
379            let mut stdin_guard = stdin_arc.lock().await;
380            if let Some(ref mut stdin_stream) = *stdin_guard {
381                stdin_stream
382                    .write_all(message_str.as_bytes())
383                    .await
384                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
385                stdin_stream.write_all(b"\n").await.map_err(|e| {
386                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
387                })?;
388                stdin_stream
389                    .flush()
390                    .await
391                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
392            } else {
393                return Err(ClaudeError::Transport("stdin not available".to_string()));
394            }
395        } else {
396            return Err(ClaudeError::Transport("stdin not set".to_string()));
397        }
398
399        debug!(
400            query_id = %query_id,
401            session_id = %session_id_str,
402            "Sent query to isolated query"
403        );
404
405        Ok(())
406    }
407
408    /// Send a query with structured content blocks (supports images)
409    ///
410    /// This method enables multimodal queries in bidirectional streaming mode.
411    /// Use it to send images alongside text for vision-related tasks.
412    ///
413    /// # Arguments
414    ///
415    /// * `content` - A vector of content blocks (text and/or images)
416    ///
417    /// # Errors
418    ///
419    /// Returns an error if:
420    /// - The content vector is empty (must include at least one text or image block)
421    /// - The client is not connected (call `connect()` first)
422    /// - Sending the message fails
423    ///
424    /// # Example
425    ///
426    /// ```no_run
427    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
428    /// # #[tokio::main]
429    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
430    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
431    /// # client.connect().await?;
432    /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
433    /// client.query_with_content(vec![
434    ///     UserContentBlock::text("What's in this image?"),
435    ///     UserContentBlock::image_base64("image/png", base64_data)?,
436    /// ]).await?;
437    /// # Ok(())
438    /// # }
439    /// ```
440    pub async fn query_with_content(
441        &mut self,
442        content: impl Into<Vec<UserContentBlock>>,
443    ) -> Result<()> {
444        self.query_with_content_and_session(content, "default")
445            .await
446    }
447
448    /// Send a query with structured content blocks and a specific session ID
449    ///
450    /// This method enables multimodal queries with session management for
451    /// maintaining separate conversation contexts.
452    ///
453    /// # Arguments
454    ///
455    /// * `content` - A vector of content blocks (text and/or images)
456    /// * `session_id` - Session identifier for the conversation
457    ///
458    /// # Errors
459    ///
460    /// Returns an error if:
461    /// - The content vector is empty (must include at least one text or image block)
462    /// - The client is not connected (call `connect()` first)
463    /// - Sending the message fails
464    ///
465    /// # Example
466    ///
467    /// ```no_run
468    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
469    /// # #[tokio::main]
470    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
471    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
472    /// # client.connect().await?;
473    /// client.query_with_content_and_session(
474    ///     vec![
475    ///         UserContentBlock::text("Analyze this chart"),
476    ///         UserContentBlock::image_url("https://example.com/chart.png"),
477    ///     ],
478    ///     "analysis-session",
479    /// ).await?;
480    /// # Ok(())
481    /// # }
482    /// ```
483    pub async fn query_with_content_and_session(
484        &mut self,
485        content: impl Into<Vec<UserContentBlock>>,
486        session_id: impl Into<String>,
487    ) -> Result<()> {
488        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
489            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
490        })?;
491
492        let content_blocks: Vec<UserContentBlock> = content.into();
493        UserContentBlock::validate_content(&content_blocks)?;
494
495        let session_id_str = session_id.into();
496
497        // Get or create query for this session (reuses existing query to maintain context)
498        let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
499        self.current_query_id = Some(query_id.clone());
500
501        // Get the isolated query
502        let query = query_manager.get_query(&query_id)?;
503
504        // Format as JSON message for stream-json input format
505        // Content is an array of content blocks, not a simple string
506        let user_message = serde_json::json!({
507            "type": "user",
508            "message": {
509                "role": "user",
510                "content": content_blocks
511            },
512            "session_id": session_id_str
513        });
514
515        let message_str = serde_json::to_string(&user_message).map_err(|e| {
516            ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
517        })?;
518
519        // Write directly to stdin (bypasses transport lock)
520        let stdin = query.stdin.clone();
521
522        if let Some(stdin_arc) = stdin {
523            let mut stdin_guard = stdin_arc.lock().await;
524            if let Some(ref mut stdin_stream) = *stdin_guard {
525                stdin_stream
526                    .write_all(message_str.as_bytes())
527                    .await
528                    .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
529                stdin_stream.write_all(b"\n").await.map_err(|e| {
530                    ClaudeError::Transport(format!("Failed to write newline: {}", e))
531                })?;
532                stdin_stream
533                    .flush()
534                    .await
535                    .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
536            } else {
537                return Err(ClaudeError::Transport("stdin not available".to_string()));
538            }
539        } else {
540            return Err(ClaudeError::Transport("stdin not set".to_string()));
541        }
542
543        debug!(
544            query_id = %query_id,
545            session_id = %session_id_str,
546            "Sent content query to isolated query"
547        );
548
549        Ok(())
550    }
551
552    /// Receive all messages as a stream (continuous)
553    ///
554    /// This method returns a stream that yields all messages from Claude
555    /// indefinitely until the stream is closed or an error occurs.
556    ///
557    /// Use this when you want to process all messages, including multiple
558    /// responses and system events.
559    ///
560    /// # Returns
561    ///
562    /// A stream of `Result<Message>` that continues until the connection closes.
563    ///
564    /// # Example
565    ///
566    /// ```no_run
567    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
568    /// # use futures::StreamExt;
569    /// # #[tokio::main]
570    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
571    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
572    /// # client.connect().await?;
573    /// # client.query("Hello").await?;
574    /// let mut stream = client.receive_messages();
575    /// while let Some(message) = stream.next().await {
576    ///     println!("Received: {:?}", message?);
577    /// }
578    /// # Ok(())
579    /// # }
580    /// ```
581    pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
582        let query_manager = match &self.query_manager {
583            Some(qm) => Arc::clone(qm),
584            None => {
585                return Box::pin(futures::stream::once(async {
586                    Err(ClaudeError::InvalidConfig(
587                        "Client not connected. Call connect() first.".to_string(),
588                    ))
589                }));
590            }
591        };
592
593        let query_id = match &self.current_query_id {
594            Some(id) => id.clone(),
595            None => {
596                return Box::pin(futures::stream::once(async {
597                    Err(ClaudeError::InvalidConfig(
598                        "No active query. Call query() first.".to_string(),
599                    ))
600                }));
601            }
602        };
603
604        Box::pin(async_stream::stream! {
605            // Get the isolated query and its message receiver
606            let query = match query_manager.get_query(&query_id) {
607                Ok(q) => q,
608                Err(e) => {
609                    yield Err(e);
610                    return;
611                }
612            };
613
614            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
615                Arc::clone(&query.message_rx)
616            };
617
618            loop {
619                let message = {
620                    let mut rx_guard = rx.lock().await;
621                    rx_guard.recv().await
622                };
623
624                match message {
625                    Some(json) => {
626                        match MessageParser::parse(json) {
627                            Ok(msg) => yield Ok(msg),
628                            Err(e) => {
629                                eprintln!("Failed to parse message: {}", e);
630                                yield Err(e);
631                            }
632                        }
633                    }
634                    None => break,
635                }
636            }
637        })
638    }
639
640    /// Receive messages until a ResultMessage
641    ///
642    /// This method returns a stream that yields messages until it encounters
643    /// a `ResultMessage`, which signals the completion of a Claude response.
644    ///
645    /// This is the most common pattern for handling Claude responses, as it
646    /// processes one complete "turn" of the conversation.
647    ///
648    /// This method uses query-scoped message channels to ensure message isolation,
649    /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
650    ///
651    /// # Returns
652    ///
653    /// A stream of `Result<Message>` that ends when a ResultMessage is received.
654    ///
655    /// # Example
656    ///
657    /// ```no_run
658    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
659    /// # use futures::StreamExt;
660    /// # #[tokio::main]
661    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
662    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
663    /// # client.connect().await?;
664    /// # client.query("Hello").await?;
665    /// let mut stream = client.receive_response();
666    /// while let Some(message) = stream.next().await {
667    ///     match message? {
668    ///         Message::Assistant(msg) => println!("Assistant: {:?}", msg),
669    ///         Message::Result(result) => {
670    ///             println!("Done! Cost: ${:?}", result.total_cost_usd);
671    ///             break;
672    ///         }
673    ///         _ => {}
674    ///     }
675    /// }
676    /// # Ok(())
677    /// # }
678    /// ```
679    pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
680        let query_manager = match &self.query_manager {
681            Some(qm) => Arc::clone(qm),
682            None => {
683                return Box::pin(futures::stream::once(async {
684                    Err(ClaudeError::InvalidConfig(
685                        "Client not connected. Call connect() first.".to_string(),
686                    ))
687                }));
688            }
689        };
690
691        let query_id = match &self.current_query_id {
692            Some(id) => id.clone(),
693            None => {
694                return Box::pin(futures::stream::once(async {
695                    Err(ClaudeError::InvalidConfig(
696                        "No active query. Call query() first.".to_string(),
697                    ))
698                }));
699            }
700        };
701
702        Box::pin(async_stream::stream! {
703            // ====================================================================
704            // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
705            // ====================================================================
706            // In the Codex-style architecture, each query has its own completely
707            // isolated QueryFull instance. We get the message receiver directly
708            // from the isolated query, eliminating the need for routing logic.
709            //
710            // This provides:
711            // - Complete message isolation
712            // - No possibility of message confusion
713            // - Simpler architecture without routing overhead
714            //
715            // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
716            // which removes inactive queries whose channels have been closed.
717
718            debug!(
719                query_id = %query_id,
720                "Getting message receiver from isolated query"
721            );
722
723            // Get the isolated query and its message receiver
724            let query = match query_manager.get_query(&query_id) {
725                Ok(q) => q,
726                Err(e) => {
727                    yield Err(e);
728                    return;
729                }
730            };
731
732            // Get the message receiver from the isolated query
733            // In isolated mode, we directly access the message_rx without routing
734            let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
735                Arc::clone(&query.message_rx)
736            };
737
738            loop {
739                let message = {
740                    let mut rx_guard = rx.lock().await;
741                    rx_guard.recv().await
742                };
743
744                match message {
745                    Some(json) => {
746                        match MessageParser::parse(json) {
747                            Ok(msg) => {
748                                let is_result = matches!(msg, Message::Result(_));
749                                yield Ok(msg);
750                                if is_result {
751                                    debug!(
752                                        query_id = %query_id,
753                                        "Received ResultMessage, ending stream"
754                                    );
755                                    // Cleanup will be handled by the periodic cleanup task
756                                    // when the query becomes inactive
757                                    break;
758                                }
759                            }
760                            Err(e) => {
761                                eprintln!("Failed to parse message: {}", e);
762                                yield Err(e);
763                            }
764                        }
765                    }
766                    None => {
767                        debug!(
768                            query_id = %query_id,
769                            "Isolated query channel closed"
770                        );
771                        // Cleanup will be handled by the periodic cleanup task
772                        break;
773                    }
774                }
775            }
776        })
777    }
778
779    /// Send an interrupt signal to stop the current Claude operation
780    ///
781    /// This is analogous to Python's `client.interrupt()`.
782    ///
783    /// # Errors
784    ///
785    /// Returns an error if the client is not connected or if sending fails.
786    pub async fn interrupt(&self) -> Result<()> {
787        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
788            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
789        })?;
790
791        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
792            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
793        })?;
794
795        let query = query_manager.get_query(query_id)?;
796        query.interrupt().await
797    }
798
799    /// Change the permission mode dynamically
800    ///
801    /// This is analogous to Python's `client.set_permission_mode()`.
802    ///
803    /// # Arguments
804    ///
805    /// * `mode` - The new permission mode to set
806    ///
807    /// # Errors
808    ///
809    /// Returns an error if the client is not connected or if sending fails.
810    pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
811        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
812            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
813        })?;
814
815        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
816            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
817        })?;
818
819        let query = query_manager.get_query(query_id)?;
820        query.set_permission_mode(mode).await
821    }
822
823    /// Change the AI model dynamically
824    ///
825    /// This is analogous to Python's `client.set_model()`.
826    ///
827    /// # Arguments
828    ///
829    /// * `model` - The new model name, or None to use default
830    ///
831    /// # Errors
832    ///
833    /// Returns an error if the client is not connected or if sending fails.
834    pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
835        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
836            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
837        })?;
838
839        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
840            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
841        })?;
842
843        let query = query_manager.get_query(query_id)?;
844        query.set_model(model).await
845    }
846
847    /// Rewind tracked files to their state at a specific user message.
848    ///
849    /// This is analogous to Python's `client.rewind_files()`.
850    ///
851    /// # Requirements
852    ///
853    /// - `enable_file_checkpointing=true` in options to track file changes
854    /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
855    ///   objects with `uuid` in the response stream
856    ///
857    /// # Arguments
858    ///
859    /// * `user_message_id` - UUID of the user message to rewind to. This should be
860    ///   the `uuid` field from a `UserMessage` received during the conversation.
861    ///
862    /// # Errors
863    ///
864    /// Returns an error if the client is not connected or if sending fails.
865    ///
866    /// # Example
867    ///
868    /// ```no_run
869    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
870    /// # use std::collections::HashMap;
871    /// # #[tokio::main]
872    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
873    /// let options = ClaudeAgentOptions::builder()
874    ///     .enable_file_checkpointing(true)
875    ///     .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
876    ///     .build();
877    /// let mut client = ClaudeClient::new(options);
878    /// client.connect().await?;
879    ///
880    /// client.query("Make some changes to my files").await?;
881    /// let mut checkpoint_id = None;
882    /// {
883    ///     let mut stream = client.receive_response();
884    ///     use futures::StreamExt;
885    ///     while let Some(Ok(msg)) = stream.next().await {
886    ///         if let Message::User(user_msg) = &msg {
887    ///             if let Some(uuid) = &user_msg.uuid {
888    ///                 checkpoint_id = Some(uuid.clone());
889    ///             }
890    ///         }
891    ///     }
892    /// }
893    ///
894    /// // Later, rewind to that point
895    /// if let Some(id) = checkpoint_id {
896    ///     client.rewind_files(&id).await?;
897    /// }
898    /// # Ok(())
899    /// # }
900    /// ```
901    pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
902        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
903            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
904        })?;
905
906        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
907            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
908        })?;
909
910        let query = query_manager.get_query(query_id)?;
911        query.rewind_files(user_message_id).await
912    }
913
914    /// Get server initialization info including available commands and output styles
915    ///
916    /// Returns initialization information from the Claude Code server including:
917    /// - Available commands (slash commands, system commands, etc.)
918    /// - Current and available output styles
919    /// - Server capabilities
920    ///
921    /// This is analogous to Python's `client.get_server_info()`.
922    ///
923    /// # Returns
924    ///
925    /// Dictionary with server info, or None if not connected
926    ///
927    /// # Example
928    ///
929    /// ```no_run
930    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
931    /// # #[tokio::main]
932    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
933    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
934    /// # client.connect().await?;
935    /// if let Some(info) = client.get_server_info().await {
936    ///     println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
937    ///     println!("Output style: {:?}", info.get("output_style"));
938    /// }
939    /// # Ok(())
940    /// # }
941    /// ```
942    pub async fn get_server_info(&self) -> Option<serde_json::Value> {
943        let query_manager = self.query_manager.as_ref()?;
944        let query_id = self.current_query_id.as_ref()?;
945        let Ok(query) = query_manager.get_query(query_id) else {
946            return None;
947        };
948        query.get_initialization_result().await
949    }
950
951    /// Get current MCP server connection status
952    ///
953    /// Queries the Claude Code CLI for the live connection status of all
954    /// configured MCP servers.
955    ///
956    /// This is analogous to Python's `client.get_mcp_status()`.
957    ///
958    /// # Returns
959    ///
960    /// Dictionary with MCP server status information. Contains a
961    /// `mcpServers` key with a list of server status objects, each having:
962    /// - `name`: Server name
963    /// - `status`: Connection status (`connected`, `pending`, `failed`,
964    ///   `needs-auth`, `disabled`)
965    ///
966    /// # Errors
967    ///
968    /// Returns an error if the client is not connected or if the request fails.
969    ///
970    /// # Example
971    ///
972    /// ```no_run
973    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
974    /// # #[tokio::main]
975    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
976    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
977    /// # client.connect().await?;
978    /// let status = client.get_mcp_status().await?;
979    /// println!("MCP status: {:?}", status);
980    /// # Ok(())
981    /// # }
982    /// ```
983    pub async fn get_mcp_status(&self) -> Result<serde_json::Value> {
984        let query_manager = self.query_manager.as_ref().ok_or_else(|| {
985            ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
986        })?;
987
988        let query_id = self.current_query_id.as_ref().ok_or_else(|| {
989            ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
990        })?;
991
992        let query = query_manager.get_query(query_id)?;
993        query.get_mcp_status().await
994    }
995
996    /// Start a new session by switching to a different session ID
997    ///
998    /// This is a convenience method that creates a new conversation context.
999    /// It's equivalent to calling `query_with_session()` with a new session ID.
1000    ///
1001    /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1002    /// when creating a new client.
1003    ///
1004    /// # Arguments
1005    ///
1006    /// * `session_id` - The new session ID to use
1007    /// * `prompt` - Initial message for the new session
1008    ///
1009    /// # Errors
1010    ///
1011    /// Returns an error if the client is not connected or if sending fails.
1012    ///
1013    /// # Example
1014    ///
1015    /// ```no_run
1016    /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
1017    /// # #[tokio::main]
1018    /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1019    /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1020    /// # client.connect().await?;
1021    /// // First conversation
1022    /// client.query("Hello").await?;
1023    ///
1024    /// // Start new conversation with different context
1025    /// client.new_session("session-2", "Tell me about Rust").await?;
1026    /// # Ok(())
1027    /// # }
1028    /// ```
1029    pub async fn new_session(
1030        &mut self,
1031        session_id: impl Into<String>,
1032        prompt: impl Into<String>,
1033    ) -> Result<()> {
1034        self.query_with_session(prompt, session_id).await
1035    }
1036
1037    /// Disconnect from Claude (analogous to Python's __aexit__)
1038    ///
1039    /// This cleanly shuts down the connection to Claude Code CLI.
1040    ///
1041    /// # Errors
1042    ///
1043    /// Returns an error if disconnection fails.
1044    #[instrument(name = "claude.client.disconnect", skip(self))]
1045    pub async fn disconnect(&mut self) -> Result<()> {
1046        if !self.connected {
1047            debug!("Client already disconnected");
1048            return Ok(());
1049        }
1050
1051        info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1052
1053        if let Some(query_manager) = self.query_manager.take() {
1054            // Get all queries for cleanup
1055            let queries = query_manager.get_all_queries();
1056
1057            // Close all isolated queries by closing their resources
1058            // This signals each CLI process to exit
1059            for (_query_id, query) in &queries {
1060                // Close stdin (if available)
1061                if let Some(ref stdin_arc) = query.stdin {
1062                    let mut stdin_guard = stdin_arc.lock().await;
1063                    if let Some(mut stdin_stream) = stdin_guard.take() {
1064                        let _ = stdin_stream.shutdown().await;
1065                    }
1066                }
1067
1068                // Close transport
1069                let transport = Arc::clone(&query.transport);
1070                let mut transport_guard = transport.lock().await;
1071                let _ = transport_guard.close().await;
1072            }
1073
1074            // Give background tasks a moment to finish reading and release locks
1075            tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1076        }
1077
1078        self.current_query_id = None;
1079        self.connected = false;
1080        debug!("Disconnected successfully");
1081        Ok(())
1082    }
1083}
1084
1085impl Drop for ClaudeClient {
1086    fn drop(&mut self) {
1087        // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1088        // Users should call disconnect() explicitly
1089        if self.connected {
1090            eprintln!(
1091                "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1092            );
1093        }
1094    }
1095}
1096
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{CanUseToolCallback, PermissionResult, PermissionResultAllow};
    use std::sync::Arc;

    /// Build a `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        let callback: CanUseToolCallback = Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        });
        callback
    }

    /// Connect with `opts` and assert that a failure, if any, does not come
    /// from the `permission_prompt_tool_name` validation.
    ///
    /// `connect()` is expected to fail later when the CLI is not installed. On
    /// machines where it is installed the connection may succeed, so the client
    /// is disconnected again instead of being dropped while still connected.
    async fn assert_passes_permission_tool_validation(opts: ClaudeAgentOptions) {
        let mut client = ClaudeClient::new(opts);

        match client.connect().await {
            Err(err) => assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            ),
            Ok(_) => {
                // Best-effort cleanup; a disconnect failure is not what these tests check.
                let _ = client.disconnect().await;
            }
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let result = client.connect().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        assert_passes_permission_tool_validation(opts).await;
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        assert_passes_permission_tool_validation(opts).await;
    }
}