claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41/// // Connect to Claude
42/// client.connect().await?;
43///
44/// // Send a query
45/// client.query("Hello Claude!").await?;
46///
47/// // Receive response as a stream
48/// {
49/// let mut stream = client.receive_response();
50/// while let Some(message) = stream.next().await {
51/// println!("Received: {:?}", message?);
52/// }
53/// }
54///
55/// // Disconnect
56/// client.disconnect().await?;
57/// Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61 options: ClaudeAgentOptions,
62 /// QueryManager for creating isolated queries (Codex-style architecture)
63 query_manager: Option<Arc<QueryManager>>,
64 /// Current query_id for the active prompt
65 current_query_id: Option<String>,
66 connected: bool,
67}
68
69impl ClaudeClient {
70 /// Create a new ClaudeClient
71 ///
72 /// # Arguments
73 ///
74 /// * `options` - Configuration options for the Claude client
75 ///
76 /// # Example
77 ///
78 /// ```no_run
79 /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
80 ///
81 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82 /// ```
83 pub fn new(options: ClaudeAgentOptions) -> Self {
84 Self {
85 options,
86 query_manager: None,
87 current_query_id: None,
88 connected: false,
89 }
90 }
91
92 /// Create a new ClaudeClient with early validation
93 ///
94 /// Unlike `new()`, this validates the configuration eagerly by attempting
95 /// to create the transport. This catches issues like invalid working directory
96 /// or missing CLI before `connect()` is called.
97 ///
98 /// # Arguments
99 ///
100 /// * `options` - Configuration options for the Claude client
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if:
105 /// - The working directory does not exist or is not a directory
106 /// - Claude CLI cannot be found
107 ///
108 /// # Example
109 ///
110 /// ```no_run
111 /// use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
112 ///
113 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114 /// # Ok::<(), claude_code_agent_sdk::ClaudeError>(())
115 /// ```
116 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117 // Validate by attempting to create transport (but don't keep it)
118 let prompt = QueryPrompt::Streaming;
119 let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121 Ok(Self {
122 options,
123 query_manager: None,
124 current_query_id: None,
125 connected: false,
126 })
127 }
128
129 /// Connect to Claude (analogous to Python's __aenter__)
130 ///
131 /// This establishes the connection to the Claude Code CLI and initializes
132 /// the bidirectional communication channel.
133 ///
134 /// Uses the Codex-style architecture with QueryManager for complete
135 /// message isolation between queries.
136 ///
137 /// # Errors
138 ///
139 /// Returns an error if:
140 /// - Claude CLI cannot be found or started
141 /// - The initialization handshake fails
142 /// - Hook registration fails
143 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144 #[instrument(
145 name = "claude.client.connect",
146 skip(self),
147 fields(
148 has_can_use_tool = self.options.can_use_tool.is_some(),
149 has_hooks = self.options.hooks.is_some(),
150 model = %self.options.model.as_deref().unwrap_or("default"),
151 )
152 )]
153 pub async fn connect(&mut self) -> Result<()> {
154 if self.connected {
155 debug!("Client already connected, skipping");
156 return Ok(());
157 }
158
159 info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162 // This matches Python SDK behavior (client.py lines 106-122)
163 // which ensures CLI uses control protocol for permission prompts
164 let mut options = self.options.clone();
165 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167 options.permission_prompt_tool_name = Some("stdio".to_string());
168 }
169
170 // Validate can_use_tool configuration (aligned with Python SDK behavior)
171 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172 // to ensure the control protocol can handle permission requests
173 if options.can_use_tool.is_some()
174 && let Some(ref tool_name) = options.permission_prompt_tool_name
175 && tool_name != "stdio"
176 {
177 return Err(ClaudeError::InvalidConfig(
178 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180 .to_string(),
181 ));
182 }
183
184 // Prepare hooks for initialization
185 // Build efficiency hooks if configured
186 let efficiency_hooks = self
187 .options
188 .efficiency
189 .as_ref()
190 .map(build_efficiency_hooks)
191 .unwrap_or_default();
192
193 // Merge user hooks with efficiency hooks
194 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196 // Convert hooks to internal format
197 let hooks = merged_hooks.as_ref().map(|hooks_map| {
198 hooks_map
199 .iter()
200 .map(|(event, matchers)| {
201 let event_name = match event {
202 HookEvent::PreToolUse => "PreToolUse",
203 HookEvent::PostToolUse => "PostToolUse",
204 HookEvent::UserPromptSubmit => "UserPromptSubmit",
205 HookEvent::Stop => "Stop",
206 HookEvent::SubagentStop => "SubagentStop",
207 HookEvent::PreCompact => "PreCompact",
208 };
209 (event_name.to_string(), matchers.clone())
210 })
211 .collect()
212 });
213
214 // Extract SDK MCP servers from options
215 let sdk_mcp_servers =
216 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217 servers_dict
218 .iter()
219 .filter_map(|(name, config)| {
220 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221 Some((name.clone(), sdk_config.clone()))
222 } else {
223 None
224 }
225 })
226 .collect()
227 } else {
228 std::collections::HashMap::new()
229 };
230
231 // Clone options for the transport factory
232 let options_clone = options.clone();
233
234 // Create QueryManager with a transport factory
235 // The factory creates new transports for each isolated query
236 // Note: connect() is called later in create_query(), not here
237 let mut query_manager = QueryManager::new(move || {
238 let prompt = QueryPrompt::Streaming;
239 let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240 Ok(Box::new(transport) as Box<dyn Transport>)
241 });
242
243 // Set control request timeout
244 query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246 // Set configuration on QueryManager
247 query_manager.set_hooks(hooks).await;
248 query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249 query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251 let query_manager = Arc::new(query_manager);
252
253 // Create the first query for initialization
254 let first_query_id = query_manager.create_query().await?;
255
256 // Start cleanup task for inactive queries
257 Arc::clone(&query_manager).start_cleanup_task();
258
259 self.query_manager = Some(query_manager);
260 self.current_query_id = Some(first_query_id);
261 self.connected = true;
262
263 info!("Successfully connected to Claude Code CLI with QueryManager");
264 Ok(())
265 }
266
267 /// Send a query to Claude
268 ///
269 /// This sends a new user prompt to Claude. Claude will remember the context
270 /// of previous queries within the same session.
271 ///
272 /// # Arguments
273 ///
274 /// * `prompt` - The user prompt to send
275 ///
276 /// # Errors
277 ///
278 /// Returns an error if the client is not connected or if sending fails.
279 ///
280 /// # Example
281 ///
282 /// ```no_run
283 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
284 /// # #[tokio::main]
285 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287 /// # client.connect().await?;
288 /// client.query("What is 2 + 2?").await?;
289 /// # Ok(())
290 /// # }
291 /// ```
292 #[instrument(
293 name = "claude.client.query",
294 skip(self, prompt),
295 fields(session_id = "default",)
296 )]
297 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298 self.query_with_session(prompt, "default").await
299 }
300
301 /// Send a query to Claude with a specific session ID
302 ///
303 /// This sends a new user prompt to Claude. Different session IDs maintain
304 /// separate conversation contexts.
305 ///
306 /// # Arguments
307 ///
308 /// * `prompt` - The user prompt to send
309 /// * `session_id` - Session identifier for the conversation
310 ///
311 /// # Errors
312 ///
313 /// Returns an error if the client is not connected or if sending fails.
314 ///
315 /// # Example
316 ///
317 /// ```no_run
318 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
319 /// # #[tokio::main]
320 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322 /// # client.connect().await?;
323 /// // Separate conversation contexts
324 /// client.query_with_session("First question", "session-1").await?;
325 /// client.query_with_session("Different question", "session-2").await?;
326 /// # Ok(())
327 /// # }
328 /// ```
329 pub async fn query_with_session(
330 &mut self,
331 prompt: impl Into<String>,
332 session_id: impl Into<String>,
333 ) -> Result<()> {
334 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336 })?;
337
338 let prompt_str = prompt.into();
339 let session_id_str = session_id.into();
340
341 // Get or create query for this session (reuses existing query to maintain context)
342 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
343 self.current_query_id = Some(query_id.clone());
344
345 // Get the isolated query
346 let query = query_manager.get_query(&query_id)?;
347
348 // Format as JSON message for stream-json input format
349 let user_message = serde_json::json!({
350 "type": "user",
351 "message": {
352 "role": "user",
353 "content": prompt_str
354 },
355 "session_id": session_id_str
356 });
357
358 let message_str = serde_json::to_string(&user_message).map_err(|e| {
359 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
360 })?;
361
362 // Write directly to stdin (bypasses transport lock)
363 let stdin = query.stdin.clone();
364
365 if let Some(stdin_arc) = stdin {
366 let mut stdin_guard = stdin_arc.lock().await;
367 if let Some(ref mut stdin_stream) = *stdin_guard {
368 stdin_stream
369 .write_all(message_str.as_bytes())
370 .await
371 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
372 stdin_stream.write_all(b"\n").await.map_err(|e| {
373 ClaudeError::Transport(format!("Failed to write newline: {}", e))
374 })?;
375 stdin_stream
376 .flush()
377 .await
378 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
379 } else {
380 return Err(ClaudeError::Transport("stdin not available".to_string()));
381 }
382 } else {
383 return Err(ClaudeError::Transport("stdin not set".to_string()));
384 }
385
386 debug!(
387 query_id = %query_id,
388 session_id = %session_id_str,
389 "Sent query to isolated query"
390 );
391
392 Ok(())
393 }
394
395 /// Send a query with structured content blocks (supports images)
396 ///
397 /// This method enables multimodal queries in bidirectional streaming mode.
398 /// Use it to send images alongside text for vision-related tasks.
399 ///
400 /// # Arguments
401 ///
402 /// * `content` - A vector of content blocks (text and/or images)
403 ///
404 /// # Errors
405 ///
406 /// Returns an error if:
407 /// - The content vector is empty (must include at least one text or image block)
408 /// - The client is not connected (call `connect()` first)
409 /// - Sending the message fails
410 ///
411 /// # Example
412 ///
413 /// ```no_run
414 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
415 /// # #[tokio::main]
416 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
417 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
418 /// # client.connect().await?;
419 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
420 /// client.query_with_content(vec![
421 /// UserContentBlock::text("What's in this image?"),
422 /// UserContentBlock::image_base64("image/png", base64_data)?,
423 /// ]).await?;
424 /// # Ok(())
425 /// # }
426 /// ```
427 pub async fn query_with_content(
428 &mut self,
429 content: impl Into<Vec<UserContentBlock>>,
430 ) -> Result<()> {
431 self.query_with_content_and_session(content, "default")
432 .await
433 }
434
435 /// Send a query with structured content blocks and a specific session ID
436 ///
437 /// This method enables multimodal queries with session management for
438 /// maintaining separate conversation contexts.
439 ///
440 /// # Arguments
441 ///
442 /// * `content` - A vector of content blocks (text and/or images)
443 /// * `session_id` - Session identifier for the conversation
444 ///
445 /// # Errors
446 ///
447 /// Returns an error if:
448 /// - The content vector is empty (must include at least one text or image block)
449 /// - The client is not connected (call `connect()` first)
450 /// - Sending the message fails
451 ///
452 /// # Example
453 ///
454 /// ```no_run
455 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
456 /// # #[tokio::main]
457 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
458 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
459 /// # client.connect().await?;
460 /// client.query_with_content_and_session(
461 /// vec![
462 /// UserContentBlock::text("Analyze this chart"),
463 /// UserContentBlock::image_url("https://example.com/chart.png"),
464 /// ],
465 /// "analysis-session",
466 /// ).await?;
467 /// # Ok(())
468 /// # }
469 /// ```
470 pub async fn query_with_content_and_session(
471 &mut self,
472 content: impl Into<Vec<UserContentBlock>>,
473 session_id: impl Into<String>,
474 ) -> Result<()> {
475 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
476 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
477 })?;
478
479 let content_blocks: Vec<UserContentBlock> = content.into();
480 UserContentBlock::validate_content(&content_blocks)?;
481
482 let session_id_str = session_id.into();
483
484 // Get or create query for this session (reuses existing query to maintain context)
485 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
486 self.current_query_id = Some(query_id.clone());
487
488 // Get the isolated query
489 let query = query_manager.get_query(&query_id)?;
490
491 // Format as JSON message for stream-json input format
492 // Content is an array of content blocks, not a simple string
493 let user_message = serde_json::json!({
494 "type": "user",
495 "message": {
496 "role": "user",
497 "content": content_blocks
498 },
499 "session_id": session_id_str
500 });
501
502 let message_str = serde_json::to_string(&user_message).map_err(|e| {
503 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
504 })?;
505
506 // Write directly to stdin (bypasses transport lock)
507 let stdin = query.stdin.clone();
508
509 if let Some(stdin_arc) = stdin {
510 let mut stdin_guard = stdin_arc.lock().await;
511 if let Some(ref mut stdin_stream) = *stdin_guard {
512 stdin_stream
513 .write_all(message_str.as_bytes())
514 .await
515 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
516 stdin_stream.write_all(b"\n").await.map_err(|e| {
517 ClaudeError::Transport(format!("Failed to write newline: {}", e))
518 })?;
519 stdin_stream
520 .flush()
521 .await
522 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
523 } else {
524 return Err(ClaudeError::Transport("stdin not available".to_string()));
525 }
526 } else {
527 return Err(ClaudeError::Transport("stdin not set".to_string()));
528 }
529
530 debug!(
531 query_id = %query_id,
532 session_id = %session_id_str,
533 "Sent content query to isolated query"
534 );
535
536 Ok(())
537 }
538
539 /// Receive all messages as a stream (continuous)
540 ///
541 /// This method returns a stream that yields all messages from Claude
542 /// indefinitely until the stream is closed or an error occurs.
543 ///
544 /// Use this when you want to process all messages, including multiple
545 /// responses and system events.
546 ///
547 /// # Returns
548 ///
549 /// A stream of `Result<Message>` that continues until the connection closes.
550 ///
551 /// # Example
552 ///
553 /// ```no_run
554 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
555 /// # use futures::StreamExt;
556 /// # #[tokio::main]
557 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
558 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
559 /// # client.connect().await?;
560 /// # client.query("Hello").await?;
561 /// let mut stream = client.receive_messages();
562 /// while let Some(message) = stream.next().await {
563 /// println!("Received: {:?}", message?);
564 /// }
565 /// # Ok(())
566 /// # }
567 /// ```
568 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
569 let query_manager = match &self.query_manager {
570 Some(qm) => Arc::clone(qm),
571 None => {
572 return Box::pin(futures::stream::once(async {
573 Err(ClaudeError::InvalidConfig(
574 "Client not connected. Call connect() first.".to_string(),
575 ))
576 }));
577 }
578 };
579
580 let query_id = match &self.current_query_id {
581 Some(id) => id.clone(),
582 None => {
583 return Box::pin(futures::stream::once(async {
584 Err(ClaudeError::InvalidConfig(
585 "No active query. Call query() first.".to_string(),
586 ))
587 }));
588 }
589 };
590
591 Box::pin(async_stream::stream! {
592 // Get the isolated query and its message receiver
593 let query = match query_manager.get_query(&query_id) {
594 Ok(q) => q,
595 Err(e) => {
596 yield Err(e);
597 return;
598 }
599 };
600
601 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
602 Arc::clone(&query.message_rx)
603 };
604
605 loop {
606 let message = {
607 let mut rx_guard = rx.lock().await;
608 rx_guard.recv().await
609 };
610
611 match message {
612 Some(json) => {
613 match MessageParser::parse(json) {
614 Ok(msg) => yield Ok(msg),
615 Err(e) => {
616 eprintln!("Failed to parse message: {}", e);
617 yield Err(e);
618 }
619 }
620 }
621 None => break,
622 }
623 }
624 })
625 }
626
627 /// Receive messages until a ResultMessage
628 ///
629 /// This method returns a stream that yields messages until it encounters
630 /// a `ResultMessage`, which signals the completion of a Claude response.
631 ///
632 /// This is the most common pattern for handling Claude responses, as it
633 /// processes one complete "turn" of the conversation.
634 ///
635 /// This method uses query-scoped message channels to ensure message isolation,
636 /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
637 ///
638 /// # Returns
639 ///
640 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
641 ///
642 /// # Example
643 ///
644 /// ```no_run
645 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
646 /// # use futures::StreamExt;
647 /// # #[tokio::main]
648 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
649 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
650 /// # client.connect().await?;
651 /// # client.query("Hello").await?;
652 /// let mut stream = client.receive_response();
653 /// while let Some(message) = stream.next().await {
654 /// match message? {
655 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
656 /// Message::Result(result) => {
657 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
658 /// break;
659 /// }
660 /// _ => {}
661 /// }
662 /// }
663 /// # Ok(())
664 /// # }
665 /// ```
666 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
667 let query_manager = match &self.query_manager {
668 Some(qm) => Arc::clone(qm),
669 None => {
670 return Box::pin(futures::stream::once(async {
671 Err(ClaudeError::InvalidConfig(
672 "Client not connected. Call connect() first.".to_string(),
673 ))
674 }));
675 }
676 };
677
678 let query_id = match &self.current_query_id {
679 Some(id) => id.clone(),
680 None => {
681 return Box::pin(futures::stream::once(async {
682 Err(ClaudeError::InvalidConfig(
683 "No active query. Call query() first.".to_string(),
684 ))
685 }));
686 }
687 };
688
689 Box::pin(async_stream::stream! {
690 // ====================================================================
691 // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
692 // ====================================================================
693 // In the Codex-style architecture, each query has its own completely
694 // isolated QueryFull instance. We get the message receiver directly
695 // from the isolated query, eliminating the need for routing logic.
696 //
697 // This provides:
698 // - Complete message isolation
699 // - No possibility of message confusion
700 // - Simpler architecture without routing overhead
701 //
702 // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
703 // which removes inactive queries whose channels have been closed.
704
705 debug!(
706 query_id = %query_id,
707 "Getting message receiver from isolated query"
708 );
709
710 // Get the isolated query and its message receiver
711 let query = match query_manager.get_query(&query_id) {
712 Ok(q) => q,
713 Err(e) => {
714 yield Err(e);
715 return;
716 }
717 };
718
719 // Get the message receiver from the isolated query
720 // In isolated mode, we directly access the message_rx without routing
721 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
722 Arc::clone(&query.message_rx)
723 };
724
725 loop {
726 let message = {
727 let mut rx_guard = rx.lock().await;
728 rx_guard.recv().await
729 };
730
731 match message {
732 Some(json) => {
733 match MessageParser::parse(json) {
734 Ok(msg) => {
735 let is_result = matches!(msg, Message::Result(_));
736 yield Ok(msg);
737 if is_result {
738 debug!(
739 query_id = %query_id,
740 "Received ResultMessage, ending stream"
741 );
742 // Cleanup will be handled by the periodic cleanup task
743 // when the query becomes inactive
744 break;
745 }
746 }
747 Err(e) => {
748 eprintln!("Failed to parse message: {}", e);
749 yield Err(e);
750 }
751 }
752 }
753 None => {
754 debug!(
755 query_id = %query_id,
756 "Isolated query channel closed"
757 );
758 // Cleanup will be handled by the periodic cleanup task
759 break;
760 }
761 }
762 }
763 })
764 }
765
766 /// Send an interrupt signal to stop the current Claude operation
767 ///
768 /// This is analogous to Python's `client.interrupt()`.
769 ///
770 /// # Errors
771 ///
772 /// Returns an error if the client is not connected or if sending fails.
773 pub async fn interrupt(&self) -> Result<()> {
774 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
775 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
776 })?;
777
778 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
779 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
780 })?;
781
782 let query = query_manager.get_query(query_id)?;
783 query.interrupt().await
784 }
785
786 /// Change the permission mode dynamically
787 ///
788 /// This is analogous to Python's `client.set_permission_mode()`.
789 ///
790 /// # Arguments
791 ///
792 /// * `mode` - The new permission mode to set
793 ///
794 /// # Errors
795 ///
796 /// Returns an error if the client is not connected or if sending fails.
797 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
798 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
799 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
800 })?;
801
802 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
803 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
804 })?;
805
806 let query = query_manager.get_query(query_id)?;
807 query.set_permission_mode(mode).await
808 }
809
810 /// Change the AI model dynamically
811 ///
812 /// This is analogous to Python's `client.set_model()`.
813 ///
814 /// # Arguments
815 ///
816 /// * `model` - The new model name, or None to use default
817 ///
818 /// # Errors
819 ///
820 /// Returns an error if the client is not connected or if sending fails.
821 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
822 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
823 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
824 })?;
825
826 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
827 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
828 })?;
829
830 let query = query_manager.get_query(query_id)?;
831 query.set_model(model).await
832 }
833
834 /// Rewind tracked files to their state at a specific user message.
835 ///
836 /// This is analogous to Python's `client.rewind_files()`.
837 ///
838 /// # Requirements
839 ///
840 /// - `enable_file_checkpointing=true` in options to track file changes
841 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
842 /// objects with `uuid` in the response stream
843 ///
844 /// # Arguments
845 ///
846 /// * `user_message_id` - UUID of the user message to rewind to. This should be
847 /// the `uuid` field from a `UserMessage` received during the conversation.
848 ///
849 /// # Errors
850 ///
851 /// Returns an error if the client is not connected or if sending fails.
852 ///
853 /// # Example
854 ///
855 /// ```no_run
856 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions, Message};
857 /// # use std::collections::HashMap;
858 /// # #[tokio::main]
859 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
860 /// let options = ClaudeAgentOptions::builder()
861 /// .enable_file_checkpointing(true)
862 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
863 /// .build();
864 /// let mut client = ClaudeClient::new(options);
865 /// client.connect().await?;
866 ///
867 /// client.query("Make some changes to my files").await?;
868 /// let mut checkpoint_id = None;
869 /// {
870 /// let mut stream = client.receive_response();
871 /// use futures::StreamExt;
872 /// while let Some(Ok(msg)) = stream.next().await {
873 /// if let Message::User(user_msg) = &msg {
874 /// if let Some(uuid) = &user_msg.uuid {
875 /// checkpoint_id = Some(uuid.clone());
876 /// }
877 /// }
878 /// }
879 /// }
880 ///
881 /// // Later, rewind to that point
882 /// if let Some(id) = checkpoint_id {
883 /// client.rewind_files(&id).await?;
884 /// }
885 /// # Ok(())
886 /// # }
887 /// ```
888 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
889 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
890 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
891 })?;
892
893 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
894 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
895 })?;
896
897 let query = query_manager.get_query(query_id)?;
898 query.rewind_files(user_message_id).await
899 }
900
901 /// Get server initialization info including available commands and output styles
902 ///
903 /// Returns initialization information from the Claude Code server including:
904 /// - Available commands (slash commands, system commands, etc.)
905 /// - Current and available output styles
906 /// - Server capabilities
907 ///
908 /// This is analogous to Python's `client.get_server_info()`.
909 ///
910 /// # Returns
911 ///
912 /// Dictionary with server info, or None if not connected
913 ///
914 /// # Example
915 ///
916 /// ```no_run
917 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
918 /// # #[tokio::main]
919 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
920 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
921 /// # client.connect().await?;
922 /// if let Some(info) = client.get_server_info().await {
923 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
924 /// println!("Output style: {:?}", info.get("output_style"));
925 /// }
926 /// # Ok(())
927 /// # }
928 /// ```
929 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
930 let query_manager = self.query_manager.as_ref()?;
931 let query_id = self.current_query_id.as_ref()?;
932 let Ok(query) = query_manager.get_query(query_id) else {
933 return None;
934 };
935 query.get_initialization_result().await
936 }
937
938 /// Start a new session by switching to a different session ID
939 ///
940 /// This is a convenience method that creates a new conversation context.
941 /// It's equivalent to calling `query_with_session()` with a new session ID.
942 ///
943 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
944 /// when creating a new client.
945 ///
946 /// # Arguments
947 ///
948 /// * `session_id` - The new session ID to use
949 /// * `prompt` - Initial message for the new session
950 ///
951 /// # Errors
952 ///
953 /// Returns an error if the client is not connected or if sending fails.
954 ///
955 /// # Example
956 ///
957 /// ```no_run
958 /// # use claude_code_agent_sdk::{ClaudeClient, ClaudeAgentOptions};
959 /// # #[tokio::main]
960 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
961 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
962 /// # client.connect().await?;
963 /// // First conversation
964 /// client.query("Hello").await?;
965 ///
966 /// // Start new conversation with different context
967 /// client.new_session("session-2", "Tell me about Rust").await?;
968 /// # Ok(())
969 /// # }
970 /// ```
971 pub async fn new_session(
972 &mut self,
973 session_id: impl Into<String>,
974 prompt: impl Into<String>,
975 ) -> Result<()> {
976 self.query_with_session(prompt, session_id).await
977 }
978
979 /// Disconnect from Claude (analogous to Python's __aexit__)
980 ///
981 /// This cleanly shuts down the connection to Claude Code CLI.
982 ///
983 /// # Errors
984 ///
985 /// Returns an error if disconnection fails.
986 #[instrument(name = "claude.client.disconnect", skip(self))]
987 pub async fn disconnect(&mut self) -> Result<()> {
988 if !self.connected {
989 debug!("Client already disconnected");
990 return Ok(());
991 }
992
993 info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
994
995 if let Some(query_manager) = self.query_manager.take() {
996 // Get all queries for cleanup
997 let queries = query_manager.get_all_queries();
998
999 // Close all isolated queries by closing their resources
1000 // This signals each CLI process to exit
1001 for (_query_id, query) in &queries {
1002 // Close stdin (if available)
1003 if let Some(ref stdin_arc) = query.stdin {
1004 let mut stdin_guard = stdin_arc.lock().await;
1005 if let Some(mut stdin_stream) = stdin_guard.take() {
1006 let _ = stdin_stream.shutdown().await;
1007 }
1008 }
1009
1010 // Close transport
1011 let transport = Arc::clone(&query.transport);
1012 let mut transport_guard = transport.lock().await;
1013 let _ = transport_guard.close().await;
1014 }
1015
1016 // Give background tasks a moment to finish reading and release locks
1017 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1018 }
1019
1020 self.current_query_id = None;
1021 self.connected = false;
1022 debug!("Disconnected successfully");
1023 Ok(())
1024 }
1025}
1026
1027impl Drop for ClaudeClient {
1028 fn drop(&mut self) {
1029 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1030 // Users should call disconnect() explicitly
1031 if self.connected {
1032 eprintln!(
1033 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1034 );
1035 }
1036 }
1037}
1038
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{CanUseToolCallback, PermissionResult, PermissionResultAllow};
    use std::sync::Arc;

    /// Option name that the validation error message is expected to mention.
    const TOOL_NAME_OPTION: &str = "permission_prompt_tool_name";

    /// Builds a `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        })
    }

    /// Asserts that `outcome`, if it is an error, is not the
    /// `permission_prompt_tool_name` validation failure. Success is accepted too.
    #[track_caller]
    fn assert_passed_tool_name_validation<T, E: std::fmt::Display>(
        outcome: &std::result::Result<T, E>,
    ) {
        if let Err(err) = outcome {
            assert!(
                !err.to_string().contains(TOOL_NAME_OPTION),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        // A callback combined with a prompt tool other than "stdio" must be rejected.
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool")
            .build();

        let mut client = ClaudeClient::new(opts);
        let result = client.connect().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains(TOOL_NAME_OPTION));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        // Explicitly selecting "stdio" is compatible with a callback.
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio")
            .build();

        let mut client = ClaudeClient::new(opts);
        // connect() may still fail later (e.g. CLI not found); only the
        // validation step is under test here.
        let outcome = client.connect().await;

        assert_passed_tool_name_validation(&outcome);
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        // No permission_prompt_tool_name set - defaults to stdio.
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .build();

        let mut client = ClaudeClient::new(opts);
        // connect() may still fail later (e.g. CLI not found); only the
        // validation step is under test here.
        let outcome = client.connect().await;

        assert_passed_tool_name_validation(&outcome);
    }
}