claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
/// Client for bidirectional streaming interactions with Claude
///
/// This client provides the same functionality as Python's ClaudeSDKClient,
/// supporting bidirectional communication, streaming responses, and dynamic
/// control over the Claude session.
///
/// This implementation uses the Codex-style architecture with isolated queries,
/// where each query has its own completely independent QueryFull instance.
/// This ensures complete message isolation and prevents ResultMessage confusion.
///
/// # Example
///
/// ```no_run
/// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
///
///     // Connect to Claude
///     client.connect().await?;
///
///     // Send a query
///     client.query("Hello Claude!").await?;
///
///     // Receive response as a stream
///     {
///         let mut stream = client.receive_response();
///         while let Some(message) = stream.next().await {
///             println!("Received: {:?}", message?);
///         }
///     }
///
///     // Disconnect
///     client.disconnect().await?;
///     Ok(())
/// }
/// ```
pub struct ClaudeClient {
    /// Configuration supplied at construction time; `connect()` reads it and
    /// clones it for the transports it creates.
    options: ClaudeAgentOptions,
    /// QueryManager for creating isolated queries (Codex-style architecture).
    /// `None` when the client is constructed; populated by `connect()`.
    query_manager: Option<Arc<QueryManager>>,
    /// Current query_id for the active prompt. Set by `connect()` and updated
    /// by each `query*` call to the query serving the requested session.
    current_query_id: Option<String>,
    /// `true` once `connect()` has completed successfully; a further
    /// `connect()` call is then a no-op.
    connected: bool,
}
68
69impl ClaudeClient {
70 /// Create a new ClaudeClient
71 ///
72 /// # Arguments
73 ///
74 /// * `options` - Configuration options for the Claude client
75 ///
76 /// # Example
77 ///
78 /// ```no_run
79 /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
80 ///
81 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82 /// ```
83 pub fn new(options: ClaudeAgentOptions) -> Self {
84 Self {
85 options,
86 query_manager: None,
87 current_query_id: None,
88 connected: false,
89 }
90 }
91
92 /// Create a new ClaudeClient with early validation
93 ///
94 /// Unlike `new()`, this validates the configuration eagerly by attempting
95 /// to create the transport. This catches issues like invalid working directory
96 /// or missing CLI before `connect()` is called.
97 ///
98 /// # Arguments
99 ///
100 /// * `options` - Configuration options for the Claude client
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if:
105 /// - The working directory does not exist or is not a directory
106 /// - Claude CLI cannot be found
107 ///
108 /// # Example
109 ///
110 /// ```no_run
111 /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
112 ///
113 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114 /// # Ok::<(), claude_code_agent_sdk::ClaudeError>(())
115 /// ```
116 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117 // Validate by attempting to create transport (but don't keep it)
118 let prompt = QueryPrompt::Streaming;
119 let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121 Ok(Self {
122 options,
123 query_manager: None,
124 current_query_id: None,
125 connected: false,
126 })
127 }
128
129 /// Connect to Claude (analogous to Python's __aenter__)
130 ///
131 /// This establishes the connection to the Claude Code CLI and initializes
132 /// the bidirectional communication channel.
133 ///
134 /// Uses the Codex-style architecture with QueryManager for complete
135 /// message isolation between queries.
136 ///
137 /// # Errors
138 ///
139 /// Returns an error if:
140 /// - Claude CLI cannot be found or started
141 /// - The initialization handshake fails
142 /// - Hook registration fails
143 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144 #[instrument(
145 name = "claude.client.connect",
146 skip(self),
147 fields(
148 has_can_use_tool = self.options.can_use_tool.is_some(),
149 has_hooks = self.options.hooks.is_some(),
150 model = %self.options.model.as_deref().unwrap_or("default"),
151 )
152 )]
153 pub async fn connect(&mut self) -> Result<()> {
154 if self.connected {
155 debug!("Client already connected, skipping");
156 return Ok(());
157 }
158
159 info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162 // This matches Python SDK behavior (client.py lines 106-122)
163 // which ensures CLI uses control protocol for permission prompts
164 let mut options = self.options.clone();
165 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166 info!(
167 "can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'"
168 );
169 options.permission_prompt_tool_name = Some("stdio".to_string());
170 }
171
172 // Validate can_use_tool configuration (aligned with Python SDK behavior)
173 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
174 // to ensure the control protocol can handle permission requests
175 if options.can_use_tool.is_some()
176 && let Some(ref tool_name) = options.permission_prompt_tool_name
177 && tool_name != "stdio"
178 {
179 return Err(ClaudeError::InvalidConfig(
180 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
181 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
182 .to_string(),
183 ));
184 }
185
186 // Prepare hooks for initialization
187 // Build efficiency hooks if configured
188 let efficiency_hooks = self
189 .options
190 .efficiency
191 .as_ref()
192 .map(build_efficiency_hooks)
193 .unwrap_or_default();
194
195 // Merge user hooks with efficiency hooks
196 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
197
198 // Convert hooks to internal format
199 let hooks = merged_hooks.as_ref().map(|hooks_map| {
200 hooks_map
201 .iter()
202 .map(|(event, matchers)| {
203 let event_name = match event {
204 HookEvent::PreToolUse => "PreToolUse",
205 HookEvent::PostToolUse => "PostToolUse",
206 HookEvent::PostToolUseFailure => "PostToolUseFailure",
207 HookEvent::UserPromptSubmit => "UserPromptSubmit",
208 HookEvent::Stop => "Stop",
209 HookEvent::SubagentStop => "SubagentStop",
210 HookEvent::PreCompact => "PreCompact",
211 HookEvent::Notification => "Notification",
212 HookEvent::SubagentStart => "SubagentStart",
213 HookEvent::PermissionRequest => "PermissionRequest",
214 };
215 (event_name.to_string(), matchers.clone())
216 })
217 .collect()
218 });
219
220 // Extract SDK MCP servers from options
221 let sdk_mcp_servers =
222 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
223 servers_dict
224 .iter()
225 .filter_map(|(name, config)| {
226 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
227 Some((name.clone(), sdk_config.clone()))
228 } else {
229 None
230 }
231 })
232 .collect()
233 } else {
234 std::collections::HashMap::new()
235 };
236
237 // Clone options for the transport factory
238 let options_clone = options.clone();
239
240 // Create QueryManager with a transport factory
241 // The factory creates new transports for each isolated query
242 // Note: connect() is called later in create_query(), not here
243 let mut query_manager = QueryManager::new(move || {
244 let prompt = QueryPrompt::Streaming;
245 let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
246 Ok(Box::new(transport) as Box<dyn Transport>)
247 });
248
249 // Set control request timeout
250 query_manager.set_control_request_timeout(self.options.control_request_timeout);
251
252 // Set configuration on QueryManager
253 query_manager.set_hooks(hooks).await;
254 query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
255 query_manager
256 .set_can_use_tool(self.options.can_use_tool.clone())
257 .await;
258
259 // Serialize agents to JSON Value for the initialize request body
260 // This avoids OS ARG_MAX limits from passing large agent definitions via CLI flags
261 if let Some(ref agents) = self.options.agents {
262 let agents_value = serde_json::to_value(agents).map_err(|e| {
263 ClaudeError::InvalidConfig(format!("Failed to serialize agents: {}", e))
264 })?;
265 query_manager.set_agents(Some(agents_value)).await;
266 }
267
268 let query_manager = Arc::new(query_manager);
269
270 // Create the first query for initialization
271 let first_query_id = query_manager.create_query().await?;
272
273 // Start cleanup task for inactive queries
274 Arc::clone(&query_manager).start_cleanup_task();
275
276 self.query_manager = Some(query_manager);
277 self.current_query_id = Some(first_query_id);
278 self.connected = true;
279
280 info!("Successfully connected to Claude Code CLI with QueryManager");
281 Ok(())
282 }
283
284 /// Send a query to Claude
285 ///
286 /// This sends a new user prompt to Claude. Claude will remember the context
287 /// of previous queries within the same session.
288 ///
289 /// # Arguments
290 ///
291 /// * `prompt` - The user prompt to send
292 ///
293 /// # Errors
294 ///
295 /// Returns an error if the client is not connected or if sending fails.
296 ///
297 /// # Example
298 ///
299 /// ```no_run
300 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
301 /// # #[tokio::main]
302 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
303 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
304 /// # client.connect().await?;
305 /// client.query("What is 2 + 2?").await?;
306 /// # Ok(())
307 /// # }
308 /// ```
309 #[instrument(
310 name = "claude.client.query",
311 skip(self, prompt),
312 fields(session_id = "default",)
313 )]
314 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
315 self.query_with_session(prompt, "default").await
316 }
317
318 /// Send a query to Claude with a specific session ID
319 ///
320 /// This sends a new user prompt to Claude. Different session IDs maintain
321 /// separate conversation contexts.
322 ///
323 /// # Arguments
324 ///
325 /// * `prompt` - The user prompt to send
326 /// * `session_id` - Session identifier for the conversation
327 ///
328 /// # Errors
329 ///
330 /// Returns an error if the client is not connected or if sending fails.
331 ///
332 /// # Example
333 ///
334 /// ```no_run
335 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
336 /// # #[tokio::main]
337 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
338 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
339 /// # client.connect().await?;
340 /// // Separate conversation contexts
341 /// client.query_with_session("First question", "session-1").await?;
342 /// client.query_with_session("Different question", "session-2").await?;
343 /// # Ok(())
344 /// # }
345 /// ```
346 pub async fn query_with_session(
347 &mut self,
348 prompt: impl Into<String>,
349 session_id: impl Into<String>,
350 ) -> Result<()> {
351 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
352 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
353 })?;
354
355 let prompt_str = prompt.into();
356 let session_id_str = session_id.into();
357
358 // Get or create query for this session (reuses existing query to maintain context)
359 let query_id = query_manager
360 .get_or_create_session_query(&session_id_str)
361 .await?;
362 self.current_query_id = Some(query_id.clone());
363
364 // Get the isolated query
365 let query = query_manager.get_query(&query_id)?;
366
367 // Format as JSON message for stream-json input format
368 let user_message = serde_json::json!({
369 "type": "user",
370 "message": {
371 "role": "user",
372 "content": prompt_str
373 },
374 "session_id": session_id_str
375 });
376
377 let message_str = serde_json::to_string(&user_message).map_err(|e| {
378 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
379 })?;
380
381 // Write directly to stdin (bypasses transport lock)
382 let stdin = query.stdin.clone();
383
384 if let Some(stdin_arc) = stdin {
385 let mut stdin_guard = stdin_arc.lock().await;
386 if let Some(ref mut stdin_stream) = *stdin_guard {
387 stdin_stream
388 .write_all(message_str.as_bytes())
389 .await
390 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
391 stdin_stream.write_all(b"\n").await.map_err(|e| {
392 ClaudeError::Transport(format!("Failed to write newline: {}", e))
393 })?;
394 stdin_stream
395 .flush()
396 .await
397 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
398 } else {
399 return Err(ClaudeError::Transport("stdin not available".to_string()));
400 }
401 } else {
402 return Err(ClaudeError::Transport("stdin not set".to_string()));
403 }
404
405 debug!(
406 query_id = %query_id,
407 session_id = %session_id_str,
408 "Sent query to isolated query"
409 );
410
411 Ok(())
412 }
413
414 /// Send a query with structured content blocks (supports images)
415 ///
416 /// This method enables multimodal queries in bidirectional streaming mode.
417 /// Use it to send images alongside text for vision-related tasks.
418 ///
419 /// # Arguments
420 ///
421 /// * `content` - A vector of content blocks (text and/or images)
422 ///
423 /// # Errors
424 ///
425 /// Returns an error if:
426 /// - The content vector is empty (must include at least one text or image block)
427 /// - The client is not connected (call `connect()` first)
428 /// - Sending the message fails
429 ///
430 /// # Example
431 ///
432 /// ```no_run
433 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
434 /// # #[tokio::main]
435 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
436 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
437 /// # client.connect().await?;
438 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
439 /// client.query_with_content(vec![
440 /// UserContentBlock::text("What's in this image?"),
441 /// UserContentBlock::image_base64("image/png", base64_data)?,
442 /// ]).await?;
443 /// # Ok(())
444 /// # }
445 /// ```
446 pub async fn query_with_content(
447 &mut self,
448 content: impl Into<Vec<UserContentBlock>>,
449 ) -> Result<()> {
450 self.query_with_content_and_session(content, "default")
451 .await
452 }
453
454 /// Send a query with structured content blocks and a specific session ID
455 ///
456 /// This method enables multimodal queries with session management for
457 /// maintaining separate conversation contexts.
458 ///
459 /// # Arguments
460 ///
461 /// * `content` - A vector of content blocks (text and/or images)
462 /// * `session_id` - Session identifier for the conversation
463 ///
464 /// # Errors
465 ///
466 /// Returns an error if:
467 /// - The content vector is empty (must include at least one text or image block)
468 /// - The client is not connected (call `connect()` first)
469 /// - Sending the message fails
470 ///
471 /// # Example
472 ///
473 /// ```no_run
474 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
475 /// # #[tokio::main]
476 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
477 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
478 /// # client.connect().await?;
479 /// client.query_with_content_and_session(
480 /// vec![
481 /// UserContentBlock::text("Analyze this chart"),
482 /// UserContentBlock::image_url("https://example.com/chart.png"),
483 /// ],
484 /// "analysis-session",
485 /// ).await?;
486 /// # Ok(())
487 /// # }
488 /// ```
489 pub async fn query_with_content_and_session(
490 &mut self,
491 content: impl Into<Vec<UserContentBlock>>,
492 session_id: impl Into<String>,
493 ) -> Result<()> {
494 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
495 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
496 })?;
497
498 let content_blocks: Vec<UserContentBlock> = content.into();
499 UserContentBlock::validate_content(&content_blocks)?;
500
501 let session_id_str = session_id.into();
502
503 // Get or create query for this session (reuses existing query to maintain context)
504 let query_id = query_manager
505 .get_or_create_session_query(&session_id_str)
506 .await?;
507 self.current_query_id = Some(query_id.clone());
508
509 // Get the isolated query
510 let query = query_manager.get_query(&query_id)?;
511
512 // Format as JSON message for stream-json input format
513 // Content is an array of content blocks, not a simple string
514 let user_message = serde_json::json!({
515 "type": "user",
516 "message": {
517 "role": "user",
518 "content": content_blocks
519 },
520 "session_id": session_id_str
521 });
522
523 let message_str = serde_json::to_string(&user_message).map_err(|e| {
524 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
525 })?;
526
527 // Write directly to stdin (bypasses transport lock)
528 let stdin = query.stdin.clone();
529
530 if let Some(stdin_arc) = stdin {
531 let mut stdin_guard = stdin_arc.lock().await;
532 if let Some(ref mut stdin_stream) = *stdin_guard {
533 stdin_stream
534 .write_all(message_str.as_bytes())
535 .await
536 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
537 stdin_stream.write_all(b"\n").await.map_err(|e| {
538 ClaudeError::Transport(format!("Failed to write newline: {}", e))
539 })?;
540 stdin_stream
541 .flush()
542 .await
543 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
544 } else {
545 return Err(ClaudeError::Transport("stdin not available".to_string()));
546 }
547 } else {
548 return Err(ClaudeError::Transport("stdin not set".to_string()));
549 }
550
551 debug!(
552 query_id = %query_id,
553 session_id = %session_id_str,
554 "Sent content query to isolated query"
555 );
556
557 Ok(())
558 }
559
560 /// Receive all messages as a stream (continuous)
561 ///
562 /// This method returns a stream that yields all messages from Claude
563 /// indefinitely until the stream is closed or an error occurs.
564 ///
565 /// Use this when you want to process all messages, including multiple
566 /// responses and system events.
567 ///
568 /// # Returns
569 ///
570 /// A stream of `Result<Message>` that continues until the connection closes.
571 ///
572 /// # Example
573 ///
574 /// ```no_run
575 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
576 /// # use futures::StreamExt;
577 /// # #[tokio::main]
578 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
579 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
580 /// # client.connect().await?;
581 /// # client.query("Hello").await?;
582 /// let mut stream = client.receive_messages();
583 /// while let Some(message) = stream.next().await {
584 /// println!("Received: {:?}", message?);
585 /// }
586 /// # Ok(())
587 /// # }
588 /// ```
589 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
590 let query_manager = match &self.query_manager {
591 Some(qm) => Arc::clone(qm),
592 None => {
593 return Box::pin(futures::stream::once(async {
594 Err(ClaudeError::InvalidConfig(
595 "Client not connected. Call connect() first.".to_string(),
596 ))
597 }));
598 }
599 };
600
601 let query_id = match &self.current_query_id {
602 Some(id) => id.clone(),
603 None => {
604 return Box::pin(futures::stream::once(async {
605 Err(ClaudeError::InvalidConfig(
606 "No active query. Call query() first.".to_string(),
607 ))
608 }));
609 }
610 };
611
612 Box::pin(async_stream::stream! {
613 // Get the isolated query and its message receiver
614 let query = match query_manager.get_query(&query_id) {
615 Ok(q) => q,
616 Err(e) => {
617 yield Err(e);
618 return;
619 }
620 };
621
622 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
623 Arc::clone(&query.message_rx)
624 };
625
626 loop {
627 let message = {
628 let mut rx_guard = rx.lock().await;
629 rx_guard.recv().await
630 };
631
632 match message {
633 Some(json) => {
634 match MessageParser::parse(json) {
635 Ok(msg) => yield Ok(msg),
636 Err(e) => {
637 eprintln!("Failed to parse message: {}", e);
638 yield Err(e);
639 }
640 }
641 }
642 None => break,
643 }
644 }
645 })
646 }
647
648 /// Receive messages until a ResultMessage
649 ///
650 /// This method returns a stream that yields messages until it encounters
651 /// a `ResultMessage`, which signals the completion of a Claude response.
652 ///
653 /// This is the most common pattern for handling Claude responses, as it
654 /// processes one complete "turn" of the conversation.
655 ///
656 /// This method uses query-scoped message channels to ensure message isolation,
657 /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
658 ///
659 /// # Returns
660 ///
661 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
662 ///
663 /// # Example
664 ///
665 /// ```no_run
666 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
667 /// # use futures::StreamExt;
668 /// # #[tokio::main]
669 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
670 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
671 /// # client.connect().await?;
672 /// # client.query("Hello").await?;
673 /// let mut stream = client.receive_response();
674 /// while let Some(message) = stream.next().await {
675 /// match message? {
676 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
677 /// Message::Result(result) => {
678 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
679 /// break;
680 /// }
681 /// _ => {}
682 /// }
683 /// }
684 /// # Ok(())
685 /// # }
686 /// ```
687 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
688 let query_manager = match &self.query_manager {
689 Some(qm) => Arc::clone(qm),
690 None => {
691 return Box::pin(futures::stream::once(async {
692 Err(ClaudeError::InvalidConfig(
693 "Client not connected. Call connect() first.".to_string(),
694 ))
695 }));
696 }
697 };
698
699 let query_id = match &self.current_query_id {
700 Some(id) => id.clone(),
701 None => {
702 return Box::pin(futures::stream::once(async {
703 Err(ClaudeError::InvalidConfig(
704 "No active query. Call query() first.".to_string(),
705 ))
706 }));
707 }
708 };
709
710 Box::pin(async_stream::stream! {
711 // ====================================================================
712 // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
713 // ====================================================================
714 // In the Codex-style architecture, each query has its own completely
715 // isolated QueryFull instance. We get the message receiver directly
716 // from the isolated query, eliminating the need for routing logic.
717 //
718 // This provides:
719 // - Complete message isolation
720 // - No possibility of message confusion
721 // - Simpler architecture without routing overhead
722 //
723 // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
724 // which removes inactive queries whose channels have been closed.
725
726 debug!(
727 query_id = %query_id,
728 "Getting message receiver from isolated query"
729 );
730
731 // Get the isolated query and its message receiver
732 let query = match query_manager.get_query(&query_id) {
733 Ok(q) => q,
734 Err(e) => {
735 yield Err(e);
736 return;
737 }
738 };
739
740 // Get the message receiver from the isolated query
741 // In isolated mode, we directly access the message_rx without routing
742 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
743 Arc::clone(&query.message_rx)
744 };
745
746 loop {
747 let message = {
748 let mut rx_guard = rx.lock().await;
749 rx_guard.recv().await
750 };
751
752 match message {
753 Some(json) => {
754 match MessageParser::parse(json) {
755 Ok(msg) => {
756 let is_result = matches!(msg, Message::Result(_));
757 yield Ok(msg);
758 if is_result {
759 debug!(
760 query_id = %query_id,
761 "Received ResultMessage, ending stream"
762 );
763 // Cleanup will be handled by the periodic cleanup task
764 // when the query becomes inactive
765 break;
766 }
767 }
768 Err(e) => {
769 eprintln!("Failed to parse message: {}", e);
770 yield Err(e);
771 }
772 }
773 }
774 None => {
775 debug!(
776 query_id = %query_id,
777 "Isolated query channel closed"
778 );
779 // Cleanup will be handled by the periodic cleanup task
780 break;
781 }
782 }
783 }
784 })
785 }
786
787 /// Send an interrupt signal to stop the current Claude operation
788 ///
789 /// This is analogous to Python's `client.interrupt()`.
790 ///
791 /// # Errors
792 ///
793 /// Returns an error if the client is not connected or if sending fails.
794 pub async fn interrupt(&self) -> Result<()> {
795 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
796 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
797 })?;
798
799 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
800 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
801 })?;
802
803 let query = query_manager.get_query(query_id)?;
804 query.interrupt().await
805 }
806
807 /// Change the permission mode dynamically
808 ///
809 /// This is analogous to Python's `client.set_permission_mode()`.
810 ///
811 /// # Arguments
812 ///
813 /// * `mode` - The new permission mode to set
814 ///
815 /// # Errors
816 ///
817 /// Returns an error if the client is not connected or if sending fails.
818 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
819 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
820 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
821 })?;
822
823 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
824 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
825 })?;
826
827 let query = query_manager.get_query(query_id)?;
828 query.set_permission_mode(mode).await
829 }
830
831 /// Change the AI model dynamically
832 ///
833 /// This is analogous to Python's `client.set_model()`.
834 ///
835 /// # Arguments
836 ///
837 /// * `model` - The new model name, or None to use default
838 ///
839 /// # Errors
840 ///
841 /// Returns an error if the client is not connected or if sending fails.
842 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
843 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
844 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
845 })?;
846
847 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
848 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
849 })?;
850
851 let query = query_manager.get_query(query_id)?;
852 query.set_model(model).await
853 }
854
855 /// Rewind tracked files to their state at a specific user message.
856 ///
857 /// This is analogous to Python's `client.rewind_files()`.
858 ///
859 /// # Requirements
860 ///
861 /// - `enable_file_checkpointing=true` in options to track file changes
862 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
863 /// objects with `uuid` in the response stream
864 ///
865 /// # Arguments
866 ///
867 /// * `user_message_id` - UUID of the user message to rewind to. This should be
868 /// the `uuid` field from a `UserMessage` received during the conversation.
869 ///
870 /// # Errors
871 ///
872 /// Returns an error if the client is not connected or if sending fails.
873 ///
874 /// # Example
875 ///
876 /// ```no_run
877 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
878 /// # use std::collections::HashMap;
879 /// # #[tokio::main]
880 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
881 /// let options = ClaudeAgentOptions::builder()
882 /// .enable_file_checkpointing(true)
883 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
884 /// .build();
885 /// let mut client = ClaudeClient::new(options);
886 /// client.connect().await?;
887 ///
888 /// client.query("Make some changes to my files").await?;
889 /// let mut checkpoint_id = None;
890 /// {
891 /// let mut stream = client.receive_response();
892 /// use futures::StreamExt;
893 /// while let Some(Ok(msg)) = stream.next().await {
894 /// if let Message::User(user_msg) = &msg {
895 /// if let Some(uuid) = &user_msg.uuid {
896 /// checkpoint_id = Some(uuid.clone());
897 /// }
898 /// }
899 /// }
900 /// }
901 ///
902 /// // Later, rewind to that point
903 /// if let Some(id) = checkpoint_id {
904 /// client.rewind_files(&id).await?;
905 /// }
906 /// # Ok(())
907 /// # }
908 /// ```
909 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
910 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
911 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
912 })?;
913
914 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
915 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
916 })?;
917
918 let query = query_manager.get_query(query_id)?;
919 query.rewind_files(user_message_id).await
920 }
921
922 /// Get server initialization info including available commands and output styles
923 ///
924 /// Returns initialization information from the Claude Code server including:
925 /// - Available commands (slash commands, system commands, etc.)
926 /// - Current and available output styles
927 /// - Server capabilities
928 ///
929 /// This is analogous to Python's `client.get_server_info()`.
930 ///
931 /// # Returns
932 ///
933 /// Dictionary with server info, or None if not connected
934 ///
935 /// # Example
936 ///
937 /// ```no_run
938 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
939 /// # #[tokio::main]
940 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
941 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
942 /// # client.connect().await?;
943 /// if let Some(info) = client.get_server_info().await {
944 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
945 /// println!("Output style: {:?}", info.get("output_style"));
946 /// }
947 /// # Ok(())
948 /// # }
949 /// ```
950 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
951 let query_manager = self.query_manager.as_ref()?;
952 let query_id = self.current_query_id.as_ref()?;
953 let Ok(query) = query_manager.get_query(query_id) else {
954 return None;
955 };
956 query.get_initialization_result().await
957 }
958
959 /// Get current MCP server connection status
960 ///
961 /// Queries the Claude Code CLI for the live connection status of all
962 /// configured MCP servers.
963 ///
964 /// This is analogous to Python's `client.get_mcp_status()`.
965 ///
966 /// # Returns
967 ///
968 /// Dictionary with MCP server status information. Contains a
969 /// `mcpServers` key with a list of server status objects, each having:
970 /// - `name`: Server name
971 /// - `status`: Connection status (`connected`, `pending`, `failed`,
972 /// `needs-auth`, `disabled`)
973 ///
974 /// # Errors
975 ///
976 /// Returns an error if the client is not connected or if the request fails.
977 ///
978 /// # Example
979 ///
980 /// ```no_run
981 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
982 /// # #[tokio::main]
983 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
984 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
985 /// # client.connect().await?;
986 /// let status = client.get_mcp_status().await?;
987 /// println!("MCP status: {:?}", status);
988 /// # Ok(())
989 /// # }
990 /// ```
991 pub async fn get_mcp_status(&self) -> Result<serde_json::Value> {
992 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
993 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
994 })?;
995
996 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
997 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
998 })?;
999
1000 let query = query_manager.get_query(query_id)?;
1001 query.get_mcp_status().await
1002 }
1003
1004 /// Start a new session by switching to a different session ID
1005 ///
1006 /// This is a convenience method that creates a new conversation context.
1007 /// It's equivalent to calling `query_with_session()` with a new session ID.
1008 ///
1009 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1010 /// when creating a new client.
1011 ///
1012 /// # Arguments
1013 ///
1014 /// * `session_id` - The new session ID to use
1015 /// * `prompt` - Initial message for the new session
1016 ///
1017 /// # Errors
1018 ///
1019 /// Returns an error if the client is not connected or if sending fails.
1020 ///
1021 /// # Example
1022 ///
1023 /// ```no_run
1024 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
1025 /// # #[tokio::main]
1026 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1027 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1028 /// # client.connect().await?;
1029 /// // First conversation
1030 /// client.query("Hello").await?;
1031 ///
1032 /// // Start new conversation with different context
1033 /// client.new_session("session-2", "Tell me about Rust").await?;
1034 /// # Ok(())
1035 /// # }
1036 /// ```
1037 pub async fn new_session(
1038 &mut self,
1039 session_id: impl Into<String>,
1040 prompt: impl Into<String>,
1041 ) -> Result<()> {
1042 self.query_with_session(prompt, session_id).await
1043 }
1044
1045 /// Disconnect from Claude (analogous to Python's __aexit__)
1046 ///
1047 /// This cleanly shuts down the connection to Claude Code CLI.
1048 ///
1049 /// # Errors
1050 ///
1051 /// Returns an error if disconnection fails.
1052 #[instrument(name = "claude.client.disconnect", skip(self))]
1053 pub async fn disconnect(&mut self) -> Result<()> {
1054 if !self.connected {
1055 debug!("Client already disconnected");
1056 return Ok(());
1057 }
1058
1059 info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1060
1061 if let Some(query_manager) = self.query_manager.take() {
1062 // Get all queries for cleanup
1063 let queries = query_manager.get_all_queries();
1064
1065 // Close all isolated queries by closing their resources
1066 // This signals each CLI process to exit
1067 for (_query_id, query) in &queries {
1068 // Close stdin (if available)
1069 if let Some(ref stdin_arc) = query.stdin {
1070 let mut stdin_guard = stdin_arc.lock().await;
1071 if let Some(mut stdin_stream) = stdin_guard.take() {
1072 let _ = stdin_stream.shutdown().await;
1073 }
1074 }
1075
1076 // Close transport
1077 let transport = Arc::clone(&query.transport);
1078 let mut transport_guard = transport.lock().await;
1079 let _ = transport_guard.close().await;
1080 }
1081
1082 // Give background tasks a moment to finish reading and release locks
1083 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1084 }
1085
1086 self.current_query_id = None;
1087 self.connected = false;
1088 debug!("Disconnected successfully");
1089 Ok(())
1090 }
1091}
1092
1093impl Drop for ClaudeClient {
1094 fn drop(&mut self) {
1095 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1096 // Users should call disconnect() explicitly
1097 if self.connected {
1098 eprintln!(
1099 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1100 );
1101 }
1102 }
1103}
1104
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{PermissionResult, PermissionResultAllow};
    use std::sync::Arc;

    /// Builds a `can_use_tool` callback that allows every tool call.
    fn allow_all_callback() -> crate::types::permissions::CanUseToolCallback {
        // The explicit annotation gives the closure its expected signature.
        let callback: crate::types::permissions::CanUseToolCallback =
            Arc::new(|_tool_name, _tool_input, _context| {
                Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
            });
        callback
    }

    /// Connects `client` and asserts that any failure is not the
    /// `permission_prompt_tool_name` validation error.
    ///
    /// The connection is expected to fail later when the CLI is not installed.
    /// When it is installed and `connect()` succeeds, the client is
    /// disconnected again so it is not dropped while still connected.
    async fn assert_connect_passes_permission_validation(mut client: ClaudeClient) {
        match client.connect().await {
            // Should not be InvalidConfig error about permission_prompt_tool_name
            Err(err) => assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            ),
            Ok(_) => {
                // Best-effort cleanup; this test is only about validation.
                let _ = client.disconnect().await;
            }
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let result = client.connect().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        assert_connect_passes_permission_validation(ClaudeClient::new(opts)).await;
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        assert_connect_passes_permission_validation(ClaudeClient::new(opts)).await;
    }
}