claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41/// // Connect to Claude
42/// client.connect().await?;
43///
44/// // Send a query
45/// client.query("Hello Claude!").await?;
46///
47/// // Receive response as a stream
48/// {
49/// let mut stream = client.receive_response();
50/// while let Some(message) = stream.next().await {
51/// println!("Received: {:?}", message?);
52/// }
53/// }
54///
55/// // Disconnect
56/// client.disconnect().await?;
57/// Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61 options: ClaudeAgentOptions,
62 /// QueryManager for creating isolated queries (Codex-style architecture)
63 query_manager: Option<Arc<QueryManager>>,
64 /// Current query_id for the active prompt
65 current_query_id: Option<String>,
66 connected: bool,
67}
68
69impl ClaudeClient {
70 /// Create a new ClaudeClient
71 ///
72 /// # Arguments
73 ///
74 /// * `options` - Configuration options for the Claude client
75 ///
76 /// # Example
77 ///
78 /// ```no_run
79 /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
80 ///
81 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82 /// ```
83 pub fn new(options: ClaudeAgentOptions) -> Self {
84 Self {
85 options,
86 query_manager: None,
87 current_query_id: None,
88 connected: false,
89 }
90 }
91
92 /// Create a new ClaudeClient with early validation
93 ///
94 /// Unlike `new()`, this validates the configuration eagerly by attempting
95 /// to create the transport. This catches issues like invalid working directory
96 /// or missing CLI before `connect()` is called.
97 ///
98 /// # Arguments
99 ///
100 /// * `options` - Configuration options for the Claude client
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if:
105 /// - The working directory does not exist or is not a directory
106 /// - Claude CLI cannot be found
107 ///
108 /// # Example
109 ///
110 /// ```no_run
111 /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
112 ///
113 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114 /// # Ok::<(), claude_code_agent_sdk::ClaudeError>(())
115 /// ```
116 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117 // Validate by attempting to create transport (but don't keep it)
118 let prompt = QueryPrompt::Streaming;
119 let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121 Ok(Self {
122 options,
123 query_manager: None,
124 current_query_id: None,
125 connected: false,
126 })
127 }
128
129 /// Connect to Claude (analogous to Python's __aenter__)
130 ///
131 /// This establishes the connection to the Claude Code CLI and initializes
132 /// the bidirectional communication channel.
133 ///
134 /// Uses the Codex-style architecture with QueryManager for complete
135 /// message isolation between queries.
136 ///
137 /// # Errors
138 ///
139 /// Returns an error if:
140 /// - Claude CLI cannot be found or started
141 /// - The initialization handshake fails
142 /// - Hook registration fails
143 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144 #[instrument(
145 name = "claude.client.connect",
146 skip(self),
147 fields(
148 has_can_use_tool = self.options.can_use_tool.is_some(),
149 has_hooks = self.options.hooks.is_some(),
150 model = %self.options.model.as_deref().unwrap_or("default"),
151 )
152 )]
153 pub async fn connect(&mut self) -> Result<()> {
154 if self.connected {
155 debug!("Client already connected, skipping");
156 return Ok(());
157 }
158
159 info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162 // This matches Python SDK behavior (client.py lines 106-122)
163 // which ensures CLI uses control protocol for permission prompts
164 let mut options = self.options.clone();
165 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167 options.permission_prompt_tool_name = Some("stdio".to_string());
168 }
169
170 // Validate can_use_tool configuration (aligned with Python SDK behavior)
171 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172 // to ensure the control protocol can handle permission requests
173 if options.can_use_tool.is_some()
174 && let Some(ref tool_name) = options.permission_prompt_tool_name
175 && tool_name != "stdio"
176 {
177 return Err(ClaudeError::InvalidConfig(
178 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180 .to_string(),
181 ));
182 }
183
184 // Prepare hooks for initialization
185 // Build efficiency hooks if configured
186 let efficiency_hooks = self
187 .options
188 .efficiency
189 .as_ref()
190 .map(build_efficiency_hooks)
191 .unwrap_or_default();
192
193 // Merge user hooks with efficiency hooks
194 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196 // Convert hooks to internal format
197 let hooks = merged_hooks.as_ref().map(|hooks_map| {
198 hooks_map
199 .iter()
200 .map(|(event, matchers)| {
201 let event_name = match event {
202 HookEvent::PreToolUse => "PreToolUse",
203 HookEvent::PostToolUse => "PostToolUse",
204 HookEvent::PostToolUseFailure => "PostToolUseFailure",
205 HookEvent::UserPromptSubmit => "UserPromptSubmit",
206 HookEvent::Stop => "Stop",
207 HookEvent::SubagentStop => "SubagentStop",
208 HookEvent::PreCompact => "PreCompact",
209 HookEvent::Notification => "Notification",
210 HookEvent::SubagentStart => "SubagentStart",
211 HookEvent::PermissionRequest => "PermissionRequest",
212 };
213 (event_name.to_string(), matchers.clone())
214 })
215 .collect()
216 });
217
218 // Extract SDK MCP servers from options
219 let sdk_mcp_servers =
220 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
221 servers_dict
222 .iter()
223 .filter_map(|(name, config)| {
224 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
225 Some((name.clone(), sdk_config.clone()))
226 } else {
227 None
228 }
229 })
230 .collect()
231 } else {
232 std::collections::HashMap::new()
233 };
234
235 // Clone options for the transport factory
236 let options_clone = options.clone();
237
238 // Create QueryManager with a transport factory
239 // The factory creates new transports for each isolated query
240 // Note: connect() is called later in create_query(), not here
241 let mut query_manager = QueryManager::new(move || {
242 let prompt = QueryPrompt::Streaming;
243 let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
244 Ok(Box::new(transport) as Box<dyn Transport>)
245 });
246
247 // Set control request timeout
248 query_manager.set_control_request_timeout(self.options.control_request_timeout);
249
250 // Set configuration on QueryManager
251 query_manager.set_hooks(hooks).await;
252 query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
253 query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
254
255 // Serialize agents to JSON Value for the initialize request body
256 // This avoids OS ARG_MAX limits from passing large agent definitions via CLI flags
257 if let Some(ref agents) = self.options.agents {
258 let agents_value = serde_json::to_value(agents).map_err(|e| {
259 ClaudeError::InvalidConfig(format!("Failed to serialize agents: {}", e))
260 })?;
261 query_manager.set_agents(Some(agents_value)).await;
262 }
263
264 let query_manager = Arc::new(query_manager);
265
266 // Create the first query for initialization
267 let first_query_id = query_manager.create_query().await?;
268
269 // Start cleanup task for inactive queries
270 Arc::clone(&query_manager).start_cleanup_task();
271
272 self.query_manager = Some(query_manager);
273 self.current_query_id = Some(first_query_id);
274 self.connected = true;
275
276 info!("Successfully connected to Claude Code CLI with QueryManager");
277 Ok(())
278 }
279
280 /// Send a query to Claude
281 ///
282 /// This sends a new user prompt to Claude. Claude will remember the context
283 /// of previous queries within the same session.
284 ///
285 /// # Arguments
286 ///
287 /// * `prompt` - The user prompt to send
288 ///
289 /// # Errors
290 ///
291 /// Returns an error if the client is not connected or if sending fails.
292 ///
293 /// # Example
294 ///
295 /// ```no_run
296 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
297 /// # #[tokio::main]
298 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
299 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
300 /// # client.connect().await?;
301 /// client.query("What is 2 + 2?").await?;
302 /// # Ok(())
303 /// # }
304 /// ```
305 #[instrument(
306 name = "claude.client.query",
307 skip(self, prompt),
308 fields(session_id = "default",)
309 )]
310 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
311 self.query_with_session(prompt, "default").await
312 }
313
314 /// Send a query to Claude with a specific session ID
315 ///
316 /// This sends a new user prompt to Claude. Different session IDs maintain
317 /// separate conversation contexts.
318 ///
319 /// # Arguments
320 ///
321 /// * `prompt` - The user prompt to send
322 /// * `session_id` - Session identifier for the conversation
323 ///
324 /// # Errors
325 ///
326 /// Returns an error if the client is not connected or if sending fails.
327 ///
328 /// # Example
329 ///
330 /// ```no_run
331 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
332 /// # #[tokio::main]
333 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
334 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
335 /// # client.connect().await?;
336 /// // Separate conversation contexts
337 /// client.query_with_session("First question", "session-1").await?;
338 /// client.query_with_session("Different question", "session-2").await?;
339 /// # Ok(())
340 /// # }
341 /// ```
342 pub async fn query_with_session(
343 &mut self,
344 prompt: impl Into<String>,
345 session_id: impl Into<String>,
346 ) -> Result<()> {
347 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
348 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
349 })?;
350
351 let prompt_str = prompt.into();
352 let session_id_str = session_id.into();
353
354 // Get or create query for this session (reuses existing query to maintain context)
355 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
356 self.current_query_id = Some(query_id.clone());
357
358 // Get the isolated query
359 let query = query_manager.get_query(&query_id)?;
360
361 // Format as JSON message for stream-json input format
362 let user_message = serde_json::json!({
363 "type": "user",
364 "message": {
365 "role": "user",
366 "content": prompt_str
367 },
368 "session_id": session_id_str
369 });
370
371 let message_str = serde_json::to_string(&user_message).map_err(|e| {
372 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
373 })?;
374
375 // Write directly to stdin (bypasses transport lock)
376 let stdin = query.stdin.clone();
377
378 if let Some(stdin_arc) = stdin {
379 let mut stdin_guard = stdin_arc.lock().await;
380 if let Some(ref mut stdin_stream) = *stdin_guard {
381 stdin_stream
382 .write_all(message_str.as_bytes())
383 .await
384 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
385 stdin_stream.write_all(b"\n").await.map_err(|e| {
386 ClaudeError::Transport(format!("Failed to write newline: {}", e))
387 })?;
388 stdin_stream
389 .flush()
390 .await
391 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
392 } else {
393 return Err(ClaudeError::Transport("stdin not available".to_string()));
394 }
395 } else {
396 return Err(ClaudeError::Transport("stdin not set".to_string()));
397 }
398
399 debug!(
400 query_id = %query_id,
401 session_id = %session_id_str,
402 "Sent query to isolated query"
403 );
404
405 Ok(())
406 }
407
408 /// Send a query with structured content blocks (supports images)
409 ///
410 /// This method enables multimodal queries in bidirectional streaming mode.
411 /// Use it to send images alongside text for vision-related tasks.
412 ///
413 /// # Arguments
414 ///
415 /// * `content` - A vector of content blocks (text and/or images)
416 ///
417 /// # Errors
418 ///
419 /// Returns an error if:
420 /// - The content vector is empty (must include at least one text or image block)
421 /// - The client is not connected (call `connect()` first)
422 /// - Sending the message fails
423 ///
424 /// # Example
425 ///
426 /// ```no_run
427 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
428 /// # #[tokio::main]
429 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
430 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
431 /// # client.connect().await?;
432 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
433 /// client.query_with_content(vec![
434 /// UserContentBlock::text("What's in this image?"),
435 /// UserContentBlock::image_base64("image/png", base64_data)?,
436 /// ]).await?;
437 /// # Ok(())
438 /// # }
439 /// ```
440 pub async fn query_with_content(
441 &mut self,
442 content: impl Into<Vec<UserContentBlock>>,
443 ) -> Result<()> {
444 self.query_with_content_and_session(content, "default")
445 .await
446 }
447
448 /// Send a query with structured content blocks and a specific session ID
449 ///
450 /// This method enables multimodal queries with session management for
451 /// maintaining separate conversation contexts.
452 ///
453 /// # Arguments
454 ///
455 /// * `content` - A vector of content blocks (text and/or images)
456 /// * `session_id` - Session identifier for the conversation
457 ///
458 /// # Errors
459 ///
460 /// Returns an error if:
461 /// - The content vector is empty (must include at least one text or image block)
462 /// - The client is not connected (call `connect()` first)
463 /// - Sending the message fails
464 ///
465 /// # Example
466 ///
467 /// ```no_run
468 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
469 /// # #[tokio::main]
470 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
471 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
472 /// # client.connect().await?;
473 /// client.query_with_content_and_session(
474 /// vec![
475 /// UserContentBlock::text("Analyze this chart"),
476 /// UserContentBlock::image_url("https://example.com/chart.png"),
477 /// ],
478 /// "analysis-session",
479 /// ).await?;
480 /// # Ok(())
481 /// # }
482 /// ```
483 pub async fn query_with_content_and_session(
484 &mut self,
485 content: impl Into<Vec<UserContentBlock>>,
486 session_id: impl Into<String>,
487 ) -> Result<()> {
488 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
489 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
490 })?;
491
492 let content_blocks: Vec<UserContentBlock> = content.into();
493 UserContentBlock::validate_content(&content_blocks)?;
494
495 let session_id_str = session_id.into();
496
497 // Get or create query for this session (reuses existing query to maintain context)
498 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
499 self.current_query_id = Some(query_id.clone());
500
501 // Get the isolated query
502 let query = query_manager.get_query(&query_id)?;
503
504 // Format as JSON message for stream-json input format
505 // Content is an array of content blocks, not a simple string
506 let user_message = serde_json::json!({
507 "type": "user",
508 "message": {
509 "role": "user",
510 "content": content_blocks
511 },
512 "session_id": session_id_str
513 });
514
515 let message_str = serde_json::to_string(&user_message).map_err(|e| {
516 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
517 })?;
518
519 // Write directly to stdin (bypasses transport lock)
520 let stdin = query.stdin.clone();
521
522 if let Some(stdin_arc) = stdin {
523 let mut stdin_guard = stdin_arc.lock().await;
524 if let Some(ref mut stdin_stream) = *stdin_guard {
525 stdin_stream
526 .write_all(message_str.as_bytes())
527 .await
528 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
529 stdin_stream.write_all(b"\n").await.map_err(|e| {
530 ClaudeError::Transport(format!("Failed to write newline: {}", e))
531 })?;
532 stdin_stream
533 .flush()
534 .await
535 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
536 } else {
537 return Err(ClaudeError::Transport("stdin not available".to_string()));
538 }
539 } else {
540 return Err(ClaudeError::Transport("stdin not set".to_string()));
541 }
542
543 debug!(
544 query_id = %query_id,
545 session_id = %session_id_str,
546 "Sent content query to isolated query"
547 );
548
549 Ok(())
550 }
551
552 /// Receive all messages as a stream (continuous)
553 ///
554 /// This method returns a stream that yields all messages from Claude
555 /// indefinitely until the stream is closed or an error occurs.
556 ///
557 /// Use this when you want to process all messages, including multiple
558 /// responses and system events.
559 ///
560 /// # Returns
561 ///
562 /// A stream of `Result<Message>` that continues until the connection closes.
563 ///
564 /// # Example
565 ///
566 /// ```no_run
567 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
568 /// # use futures::StreamExt;
569 /// # #[tokio::main]
570 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
571 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
572 /// # client.connect().await?;
573 /// # client.query("Hello").await?;
574 /// let mut stream = client.receive_messages();
575 /// while let Some(message) = stream.next().await {
576 /// println!("Received: {:?}", message?);
577 /// }
578 /// # Ok(())
579 /// # }
580 /// ```
581 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
582 let query_manager = match &self.query_manager {
583 Some(qm) => Arc::clone(qm),
584 None => {
585 return Box::pin(futures::stream::once(async {
586 Err(ClaudeError::InvalidConfig(
587 "Client not connected. Call connect() first.".to_string(),
588 ))
589 }));
590 }
591 };
592
593 let query_id = match &self.current_query_id {
594 Some(id) => id.clone(),
595 None => {
596 return Box::pin(futures::stream::once(async {
597 Err(ClaudeError::InvalidConfig(
598 "No active query. Call query() first.".to_string(),
599 ))
600 }));
601 }
602 };
603
604 Box::pin(async_stream::stream! {
605 // Get the isolated query and its message receiver
606 let query = match query_manager.get_query(&query_id) {
607 Ok(q) => q,
608 Err(e) => {
609 yield Err(e);
610 return;
611 }
612 };
613
614 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
615 Arc::clone(&query.message_rx)
616 };
617
618 loop {
619 let message = {
620 let mut rx_guard = rx.lock().await;
621 rx_guard.recv().await
622 };
623
624 match message {
625 Some(json) => {
626 match MessageParser::parse(json) {
627 Ok(msg) => yield Ok(msg),
628 Err(e) => {
629 eprintln!("Failed to parse message: {}", e);
630 yield Err(e);
631 }
632 }
633 }
634 None => break,
635 }
636 }
637 })
638 }
639
640 /// Receive messages until a ResultMessage
641 ///
642 /// This method returns a stream that yields messages until it encounters
643 /// a `ResultMessage`, which signals the completion of a Claude response.
644 ///
645 /// This is the most common pattern for handling Claude responses, as it
646 /// processes one complete "turn" of the conversation.
647 ///
648 /// This method uses query-scoped message channels to ensure message isolation,
649 /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
650 ///
651 /// # Returns
652 ///
653 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
654 ///
655 /// # Example
656 ///
657 /// ```no_run
658 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
659 /// # use futures::StreamExt;
660 /// # #[tokio::main]
661 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
662 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
663 /// # client.connect().await?;
664 /// # client.query("Hello").await?;
665 /// let mut stream = client.receive_response();
666 /// while let Some(message) = stream.next().await {
667 /// match message? {
668 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
669 /// Message::Result(result) => {
670 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
671 /// break;
672 /// }
673 /// _ => {}
674 /// }
675 /// }
676 /// # Ok(())
677 /// # }
678 /// ```
679 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
680 let query_manager = match &self.query_manager {
681 Some(qm) => Arc::clone(qm),
682 None => {
683 return Box::pin(futures::stream::once(async {
684 Err(ClaudeError::InvalidConfig(
685 "Client not connected. Call connect() first.".to_string(),
686 ))
687 }));
688 }
689 };
690
691 let query_id = match &self.current_query_id {
692 Some(id) => id.clone(),
693 None => {
694 return Box::pin(futures::stream::once(async {
695 Err(ClaudeError::InvalidConfig(
696 "No active query. Call query() first.".to_string(),
697 ))
698 }));
699 }
700 };
701
702 Box::pin(async_stream::stream! {
703 // ====================================================================
704 // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
705 // ====================================================================
706 // In the Codex-style architecture, each query has its own completely
707 // isolated QueryFull instance. We get the message receiver directly
708 // from the isolated query, eliminating the need for routing logic.
709 //
710 // This provides:
711 // - Complete message isolation
712 // - No possibility of message confusion
713 // - Simpler architecture without routing overhead
714 //
715 // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
716 // which removes inactive queries whose channels have been closed.
717
718 debug!(
719 query_id = %query_id,
720 "Getting message receiver from isolated query"
721 );
722
723 // Get the isolated query and its message receiver
724 let query = match query_manager.get_query(&query_id) {
725 Ok(q) => q,
726 Err(e) => {
727 yield Err(e);
728 return;
729 }
730 };
731
732 // Get the message receiver from the isolated query
733 // In isolated mode, we directly access the message_rx without routing
734 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
735 Arc::clone(&query.message_rx)
736 };
737
738 loop {
739 let message = {
740 let mut rx_guard = rx.lock().await;
741 rx_guard.recv().await
742 };
743
744 match message {
745 Some(json) => {
746 match MessageParser::parse(json) {
747 Ok(msg) => {
748 let is_result = matches!(msg, Message::Result(_));
749 yield Ok(msg);
750 if is_result {
751 debug!(
752 query_id = %query_id,
753 "Received ResultMessage, ending stream"
754 );
755 // Cleanup will be handled by the periodic cleanup task
756 // when the query becomes inactive
757 break;
758 }
759 }
760 Err(e) => {
761 eprintln!("Failed to parse message: {}", e);
762 yield Err(e);
763 }
764 }
765 }
766 None => {
767 debug!(
768 query_id = %query_id,
769 "Isolated query channel closed"
770 );
771 // Cleanup will be handled by the periodic cleanup task
772 break;
773 }
774 }
775 }
776 })
777 }
778
779 /// Send an interrupt signal to stop the current Claude operation
780 ///
781 /// This is analogous to Python's `client.interrupt()`.
782 ///
783 /// # Errors
784 ///
785 /// Returns an error if the client is not connected or if sending fails.
786 pub async fn interrupt(&self) -> Result<()> {
787 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
788 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
789 })?;
790
791 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
792 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
793 })?;
794
795 let query = query_manager.get_query(query_id)?;
796 query.interrupt().await
797 }
798
799 /// Change the permission mode dynamically
800 ///
801 /// This is analogous to Python's `client.set_permission_mode()`.
802 ///
803 /// # Arguments
804 ///
805 /// * `mode` - The new permission mode to set
806 ///
807 /// # Errors
808 ///
809 /// Returns an error if the client is not connected or if sending fails.
810 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
811 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
812 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
813 })?;
814
815 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
816 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
817 })?;
818
819 let query = query_manager.get_query(query_id)?;
820 query.set_permission_mode(mode).await
821 }
822
823 /// Change the AI model dynamically
824 ///
825 /// This is analogous to Python's `client.set_model()`.
826 ///
827 /// # Arguments
828 ///
829 /// * `model` - The new model name, or None to use default
830 ///
831 /// # Errors
832 ///
833 /// Returns an error if the client is not connected or if sending fails.
834 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
835 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
836 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
837 })?;
838
839 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
840 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
841 })?;
842
843 let query = query_manager.get_query(query_id)?;
844 query.set_model(model).await
845 }
846
847 /// Rewind tracked files to their state at a specific user message.
848 ///
849 /// This is analogous to Python's `client.rewind_files()`.
850 ///
851 /// # Requirements
852 ///
853 /// - `enable_file_checkpointing=true` in options to track file changes
854 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
855 /// objects with `uuid` in the response stream
856 ///
857 /// # Arguments
858 ///
859 /// * `user_message_id` - UUID of the user message to rewind to. This should be
860 /// the `uuid` field from a `UserMessage` received during the conversation.
861 ///
862 /// # Errors
863 ///
864 /// Returns an error if the client is not connected or if sending fails.
865 ///
866 /// # Example
867 ///
868 /// ```no_run
869 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
870 /// # use std::collections::HashMap;
871 /// # #[tokio::main]
872 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
873 /// let options = ClaudeAgentOptions::builder()
874 /// .enable_file_checkpointing(true)
875 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
876 /// .build();
877 /// let mut client = ClaudeClient::new(options);
878 /// client.connect().await?;
879 ///
880 /// client.query("Make some changes to my files").await?;
881 /// let mut checkpoint_id = None;
882 /// {
883 /// let mut stream = client.receive_response();
884 /// use futures::StreamExt;
885 /// while let Some(Ok(msg)) = stream.next().await {
886 /// if let Message::User(user_msg) = &msg {
887 /// if let Some(uuid) = &user_msg.uuid {
888 /// checkpoint_id = Some(uuid.clone());
889 /// }
890 /// }
891 /// }
892 /// }
893 ///
894 /// // Later, rewind to that point
895 /// if let Some(id) = checkpoint_id {
896 /// client.rewind_files(&id).await?;
897 /// }
898 /// # Ok(())
899 /// # }
900 /// ```
901 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
902 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
903 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
904 })?;
905
906 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
907 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
908 })?;
909
910 let query = query_manager.get_query(query_id)?;
911 query.rewind_files(user_message_id).await
912 }
913
914 /// Get server initialization info including available commands and output styles
915 ///
916 /// Returns initialization information from the Claude Code server including:
917 /// - Available commands (slash commands, system commands, etc.)
918 /// - Current and available output styles
919 /// - Server capabilities
920 ///
921 /// This is analogous to Python's `client.get_server_info()`.
922 ///
923 /// # Returns
924 ///
925 /// Dictionary with server info, or None if not connected
926 ///
927 /// # Example
928 ///
929 /// ```no_run
930 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
931 /// # #[tokio::main]
932 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
933 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
934 /// # client.connect().await?;
935 /// if let Some(info) = client.get_server_info().await {
936 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
937 /// println!("Output style: {:?}", info.get("output_style"));
938 /// }
939 /// # Ok(())
940 /// # }
941 /// ```
942 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
943 let query_manager = self.query_manager.as_ref()?;
944 let query_id = self.current_query_id.as_ref()?;
945 let Ok(query) = query_manager.get_query(query_id) else {
946 return None;
947 };
948 query.get_initialization_result().await
949 }
950
951 /// Get current MCP server connection status
952 ///
953 /// Queries the Claude Code CLI for the live connection status of all
954 /// configured MCP servers.
955 ///
956 /// This is analogous to Python's `client.get_mcp_status()`.
957 ///
958 /// # Returns
959 ///
960 /// Dictionary with MCP server status information. Contains a
961 /// `mcpServers` key with a list of server status objects, each having:
962 /// - `name`: Server name
963 /// - `status`: Connection status (`connected`, `pending`, `failed`,
964 /// `needs-auth`, `disabled`)
965 ///
966 /// # Errors
967 ///
968 /// Returns an error if the client is not connected or if the request fails.
969 ///
970 /// # Example
971 ///
972 /// ```no_run
973 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
974 /// # #[tokio::main]
975 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
976 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
977 /// # client.connect().await?;
978 /// let status = client.get_mcp_status().await?;
979 /// println!("MCP status: {:?}", status);
980 /// # Ok(())
981 /// # }
982 /// ```
983 pub async fn get_mcp_status(&self) -> Result<serde_json::Value> {
984 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
985 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
986 })?;
987
988 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
989 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
990 })?;
991
992 let query = query_manager.get_query(query_id)?;
993 query.get_mcp_status().await
994 }
995
996 /// Start a new session by switching to a different session ID
997 ///
998 /// This is a convenience method that creates a new conversation context.
999 /// It's equivalent to calling `query_with_session()` with a new session ID.
1000 ///
1001 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1002 /// when creating a new client.
1003 ///
1004 /// # Arguments
1005 ///
1006 /// * `session_id` - The new session ID to use
1007 /// * `prompt` - Initial message for the new session
1008 ///
1009 /// # Errors
1010 ///
1011 /// Returns an error if the client is not connected or if sending fails.
1012 ///
1013 /// # Example
1014 ///
1015 /// ```no_run
1016 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
1017 /// # #[tokio::main]
1018 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1019 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1020 /// # client.connect().await?;
1021 /// // First conversation
1022 /// client.query("Hello").await?;
1023 ///
1024 /// // Start new conversation with different context
1025 /// client.new_session("session-2", "Tell me about Rust").await?;
1026 /// # Ok(())
1027 /// # }
1028 /// ```
1029 pub async fn new_session(
1030 &mut self,
1031 session_id: impl Into<String>,
1032 prompt: impl Into<String>,
1033 ) -> Result<()> {
1034 self.query_with_session(prompt, session_id).await
1035 }
1036
1037 /// Disconnect from Claude (analogous to Python's __aexit__)
1038 ///
1039 /// This cleanly shuts down the connection to Claude Code CLI.
1040 ///
1041 /// # Errors
1042 ///
1043 /// Returns an error if disconnection fails.
1044 #[instrument(name = "claude.client.disconnect", skip(self))]
1045 pub async fn disconnect(&mut self) -> Result<()> {
1046 if !self.connected {
1047 debug!("Client already disconnected");
1048 return Ok(());
1049 }
1050
1051 info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1052
1053 if let Some(query_manager) = self.query_manager.take() {
1054 // Get all queries for cleanup
1055 let queries = query_manager.get_all_queries();
1056
1057 // Close all isolated queries by closing their resources
1058 // This signals each CLI process to exit
1059 for (_query_id, query) in &queries {
1060 // Close stdin (if available)
1061 if let Some(ref stdin_arc) = query.stdin {
1062 let mut stdin_guard = stdin_arc.lock().await;
1063 if let Some(mut stdin_stream) = stdin_guard.take() {
1064 let _ = stdin_stream.shutdown().await;
1065 }
1066 }
1067
1068 // Close transport
1069 let transport = Arc::clone(&query.transport);
1070 let mut transport_guard = transport.lock().await;
1071 let _ = transport_guard.close().await;
1072 }
1073
1074 // Give background tasks a moment to finish reading and release locks
1075 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1076 }
1077
1078 self.current_query_id = None;
1079 self.connected = false;
1080 debug!("Disconnected successfully");
1081 Ok(())
1082 }
1083}
1084
1085impl Drop for ClaudeClient {
1086 fn drop(&mut self) {
1087 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1088 // Users should call disconnect() explicitly
1089 if self.connected {
1090 eprintln!(
1091 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1092 );
1093 }
1094 }
1095}
1096
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{
        CanUseToolCallback, PermissionResult, PermissionResultAllow,
    };
    use std::sync::Arc;

    /// Builds a `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        })
    }

    /// Connects with `opts` and asserts that, if the connection fails, the
    /// failure is not the `permission_prompt_tool_name` validation error.
    ///
    /// When the CLI is installed the connection can succeed; in that case
    /// the client is disconnected so the test does not leave a connected
    /// client behind to be dropped.
    async fn assert_passes_permission_tool_validation(opts: ClaudeAgentOptions) {
        let mut client = ClaudeClient::new(opts);
        // This may fail later (e.g. CLI not found), but should pass validation
        let result = client.connect().await;

        // Should not be InvalidConfig error about permission_prompt_tool_name
        if let Err(err) = &result {
            assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }

        if result.is_ok() {
            // Best-effort cleanup; the outcome is irrelevant to this test.
            let _ = client.disconnect().await;
        }
    }

    /// A `can_use_tool` callback combined with a non-"stdio" permission
    /// prompt tool must be rejected as invalid configuration.
    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let err = client
            .connect()
            .await
            .expect_err("connect() should reject a custom permission_prompt_tool_name");

        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    /// Explicitly setting the permission prompt tool to "stdio" is allowed
    /// alongside a `can_use_tool` callback.
    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        assert_passes_permission_tool_validation(opts).await;
    }

    /// Leaving the permission prompt tool unset is allowed alongside a
    /// `can_use_tool` callback.
    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        assert_passes_permission_tool_validation(opts).await;
    }
}