claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_full::QueryFull;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
/// Client for bidirectional streaming interactions with Claude
///
/// This client provides the same functionality as Python's ClaudeSDKClient,
/// supporting bidirectional communication, streaming responses, and dynamic
/// control over the Claude session.
///
/// A client starts out disconnected. `connect()` must succeed before any of
/// the query, receive, or control methods can do useful work; until then they
/// report a "Client not connected" error.
///
/// # Example
///
/// ```no_run
/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
///
///     // Connect to Claude
///     client.connect().await?;
///
///     // Send a query
///     client.query("Hello Claude!").await?;
///
///     // Receive response as a stream
///     {
///         let mut stream = client.receive_response();
///         while let Some(message) = stream.next().await {
///             println!("Received: {:?}", message?);
///         }
///     }
///
///     // Disconnect
///     client.disconnect().await?;
///     Ok(())
/// }
/// ```
pub struct ClaudeClient {
    /// Configuration supplied at construction; `connect()` clones it to build the transport.
    options: ClaudeAgentOptions,
    /// Shared handle to the running query; `None` until `connect()` succeeds and
    /// taken back out by `disconnect()`.
    query: Option<Arc<Mutex<QueryFull>>>,
    /// Set to `true` at the end of a successful `connect()`; consulted by
    /// `connect()`, `disconnect()` and `Drop`.
    connected: bool,
}
61
62impl ClaudeClient {
63 /// Create a new ClaudeClient
64 ///
65 /// # Arguments
66 ///
67 /// * `options` - Configuration options for the Claude client
68 ///
69 /// # Example
70 ///
71 /// ```no_run
72 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
73 ///
74 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
75 /// ```
76 pub fn new(options: ClaudeAgentOptions) -> Self {
77 Self {
78 options,
79 query: None,
80 connected: false,
81 }
82 }
83
84 /// Create a new ClaudeClient with early validation
85 ///
86 /// Unlike `new()`, this validates the configuration eagerly by attempting
87 /// to create the transport. This catches issues like invalid working directory
88 /// or missing CLI before `connect()` is called.
89 ///
90 /// # Arguments
91 ///
92 /// * `options` - Configuration options for the Claude client
93 ///
94 /// # Errors
95 ///
96 /// Returns an error if:
97 /// - The working directory does not exist or is not a directory
98 /// - Claude CLI cannot be found
99 ///
100 /// # Example
101 ///
102 /// ```no_run
103 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
104 ///
105 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
106 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
107 /// ```
108 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
109 // Validate by attempting to create transport (but don't keep it)
110 let prompt = QueryPrompt::Streaming;
111 let _ = SubprocessTransport::new(prompt, options.clone())?;
112
113 Ok(Self {
114 options,
115 query: None,
116 connected: false,
117 })
118 }
119
120 /// Connect to Claude (analogous to Python's __aenter__)
121 ///
122 /// This establishes the connection to the Claude Code CLI and initializes
123 /// the bidirectional communication channel.
124 ///
125 /// # Errors
126 ///
127 /// Returns an error if:
128 /// - Claude CLI cannot be found or started
129 /// - The initialization handshake fails
130 /// - Hook registration fails
131 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
132 #[instrument(
133 name = "claude.client.connect",
134 skip(self),
135 fields(
136 has_can_use_tool = self.options.can_use_tool.is_some(),
137 has_hooks = self.options.hooks.is_some(),
138 model = %self.options.model.as_deref().unwrap_or("default"),
139 )
140 )]
141 pub async fn connect(&mut self) -> Result<()> {
142 if self.connected {
143 debug!("Client already connected, skipping");
144 return Ok(());
145 }
146
147 info!("Connecting to Claude Code CLI");
148
149 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
150 // This matches Python SDK behavior (client.py lines 106-122)
151 // which ensures CLI uses control protocol for permission prompts
152 let mut options = self.options.clone();
153 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
154 info!(
155 "can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'"
156 );
157 options.permission_prompt_tool_name = Some("stdio".to_string());
158 }
159
160 // Validate can_use_tool configuration (aligned with Python SDK behavior)
161 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
162 // to ensure the control protocol can handle permission requests
163 if options.can_use_tool.is_some()
164 && let Some(ref tool_name) = options.permission_prompt_tool_name
165 && tool_name != "stdio"
166 {
167 return Err(ClaudeError::InvalidConfig(
168 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
169 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
170 .to_string(),
171 ));
172 }
173
174 // Create transport in streaming mode (no initial prompt)
175 let prompt = QueryPrompt::Streaming;
176 let mut transport = SubprocessTransport::new(prompt, options)?;
177
178 // Don't send initial prompt - we'll use query() for that
179 transport.connect().await?;
180
181 // Extract stdin for direct access (avoids transport lock deadlock)
182 let stdin = Arc::clone(&transport.stdin);
183
184 // Create Query with hooks
185 let mut query = QueryFull::new(Box::new(transport));
186 query.set_stdin(stdin);
187
188 // Set control request timeout from options
189 query.set_control_request_timeout(self.options.control_request_timeout);
190
191 // Extract SDK MCP servers from options
192 let sdk_mcp_servers =
193 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
194 servers_dict
195 .iter()
196 .filter_map(|(name, config)| {
197 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
198 Some((name.clone(), sdk_config.clone()))
199 } else {
200 None
201 }
202 })
203 .collect()
204 } else {
205 std::collections::HashMap::new()
206 };
207 query.set_sdk_mcp_servers(sdk_mcp_servers).await;
208
209 // Set can_use_tool callback if provided
210 if let Some(ref callback) = self.options.can_use_tool {
211 query.set_can_use_tool(Some(Arc::clone(callback))).await;
212 }
213
214 // Build efficiency hooks if configured
215 let efficiency_hooks = self
216 .options
217 .efficiency
218 .as_ref()
219 .map(build_efficiency_hooks)
220 .unwrap_or_default();
221
222 // Merge user hooks with efficiency hooks
223 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
224
225 // Convert hooks to internal format
226 let hooks = merged_hooks.as_ref().map(|hooks_map| {
227 hooks_map
228 .iter()
229 .map(|(event, matchers)| {
230 let event_name = match event {
231 HookEvent::PreToolUse => "PreToolUse",
232 HookEvent::PostToolUse => "PostToolUse",
233 HookEvent::UserPromptSubmit => "UserPromptSubmit",
234 HookEvent::Stop => "Stop",
235 HookEvent::SubagentStop => "SubagentStop",
236 HookEvent::PreCompact => "PreCompact",
237 };
238 (event_name.to_string(), matchers.clone())
239 })
240 .collect()
241 });
242
243 // Start reading messages in background FIRST
244 // This must happen before initialize() because initialize()
245 // sends a control request and waits for response
246 query.start().await?;
247
248 // Initialize with hooks (sends control request)
249 query.initialize(hooks).await?;
250
251 self.query = Some(Arc::new(Mutex::new(query)));
252 self.connected = true;
253
254 info!("Successfully connected to Claude Code CLI");
255 Ok(())
256 }
257
258 /// Send a query to Claude
259 ///
260 /// This sends a new user prompt to Claude. Claude will remember the context
261 /// of previous queries within the same session.
262 ///
263 /// # Arguments
264 ///
265 /// * `prompt` - The user prompt to send
266 ///
267 /// # Errors
268 ///
269 /// Returns an error if the client is not connected or if sending fails.
270 ///
271 /// # Example
272 ///
273 /// ```no_run
274 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
275 /// # #[tokio::main]
276 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
277 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
278 /// # client.connect().await?;
279 /// client.query("What is 2 + 2?").await?;
280 /// # Ok(())
281 /// # }
282 /// ```
283 #[instrument(
284 name = "claude.client.query",
285 skip(self, prompt),
286 fields(session_id = "default",)
287 )]
288 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
289 self.query_with_session(prompt, "default").await
290 }
291
292 /// Send a query to Claude with a specific session ID
293 ///
294 /// This sends a new user prompt to Claude. Different session IDs maintain
295 /// separate conversation contexts.
296 ///
297 /// # Arguments
298 ///
299 /// * `prompt` - The user prompt to send
300 /// * `session_id` - Session identifier for the conversation
301 ///
302 /// # Errors
303 ///
304 /// Returns an error if the client is not connected or if sending fails.
305 ///
306 /// # Example
307 ///
308 /// ```no_run
309 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
310 /// # #[tokio::main]
311 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
312 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
313 /// # client.connect().await?;
314 /// // Separate conversation contexts
315 /// client.query_with_session("First question", "session-1").await?;
316 /// client.query_with_session("Different question", "session-2").await?;
317 /// # Ok(())
318 /// # }
319 /// ```
320 pub async fn query_with_session(
321 &mut self,
322 prompt: impl Into<String>,
323 session_id: impl Into<String>,
324 ) -> Result<()> {
325 let query = self.query.as_ref().ok_or_else(|| {
326 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
327 })?;
328
329 let prompt_str = prompt.into();
330 let session_id_str = session_id.into();
331
332 // Format as JSON message for stream-json input format
333 let user_message = serde_json::json!({
334 "type": "user",
335 "message": {
336 "role": "user",
337 "content": prompt_str
338 },
339 "session_id": session_id_str
340 });
341
342 let message_str = serde_json::to_string(&user_message).map_err(|e| {
343 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
344 })?;
345
346 // Write directly to stdin (bypasses transport lock)
347 let query_guard = query.lock().await;
348 let stdin = query_guard.stdin.clone();
349 drop(query_guard);
350
351 if let Some(stdin_arc) = stdin {
352 let mut stdin_guard = stdin_arc.lock().await;
353 if let Some(ref mut stdin_stream) = *stdin_guard {
354 stdin_stream
355 .write_all(message_str.as_bytes())
356 .await
357 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
358 stdin_stream.write_all(b"\n").await.map_err(|e| {
359 ClaudeError::Transport(format!("Failed to write newline: {}", e))
360 })?;
361 stdin_stream
362 .flush()
363 .await
364 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
365 } else {
366 return Err(ClaudeError::Transport("stdin not available".to_string()));
367 }
368 } else {
369 return Err(ClaudeError::Transport("stdin not set".to_string()));
370 }
371
372 Ok(())
373 }
374
375 /// Send a query with structured content blocks (supports images)
376 ///
377 /// This method enables multimodal queries in bidirectional streaming mode.
378 /// Use it to send images alongside text for vision-related tasks.
379 ///
380 /// # Arguments
381 ///
382 /// * `content` - A vector of content blocks (text and/or images)
383 ///
384 /// # Errors
385 ///
386 /// Returns an error if:
387 /// - The content vector is empty (must include at least one text or image block)
388 /// - The client is not connected (call `connect()` first)
389 /// - Sending the message fails
390 ///
391 /// # Example
392 ///
393 /// ```no_run
394 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
395 /// # #[tokio::main]
396 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
397 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
398 /// # client.connect().await?;
399 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
400 /// client.query_with_content(vec![
401 /// UserContentBlock::text("What's in this image?"),
402 /// UserContentBlock::image_base64("image/png", base64_data)?,
403 /// ]).await?;
404 /// # Ok(())
405 /// # }
406 /// ```
407 pub async fn query_with_content(
408 &mut self,
409 content: impl Into<Vec<UserContentBlock>>,
410 ) -> Result<()> {
411 self.query_with_content_and_session(content, "default")
412 .await
413 }
414
415 /// Send a query with structured content blocks and a specific session ID
416 ///
417 /// This method enables multimodal queries with session management for
418 /// maintaining separate conversation contexts.
419 ///
420 /// # Arguments
421 ///
422 /// * `content` - A vector of content blocks (text and/or images)
423 /// * `session_id` - Session identifier for the conversation
424 ///
425 /// # Errors
426 ///
427 /// Returns an error if:
428 /// - The content vector is empty (must include at least one text or image block)
429 /// - The client is not connected (call `connect()` first)
430 /// - Sending the message fails
431 ///
432 /// # Example
433 ///
434 /// ```no_run
435 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
436 /// # #[tokio::main]
437 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
438 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
439 /// # client.connect().await?;
440 /// client.query_with_content_and_session(
441 /// vec![
442 /// UserContentBlock::text("Analyze this chart"),
443 /// UserContentBlock::image_url("https://example.com/chart.png"),
444 /// ],
445 /// "analysis-session",
446 /// ).await?;
447 /// # Ok(())
448 /// # }
449 /// ```
450 pub async fn query_with_content_and_session(
451 &mut self,
452 content: impl Into<Vec<UserContentBlock>>,
453 session_id: impl Into<String>,
454 ) -> Result<()> {
455 let query = self.query.as_ref().ok_or_else(|| {
456 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
457 })?;
458
459 let content_blocks: Vec<UserContentBlock> = content.into();
460 UserContentBlock::validate_content(&content_blocks)?;
461
462 let session_id_str = session_id.into();
463
464 // Format as JSON message for stream-json input format
465 // Content is an array of content blocks, not a simple string
466 let user_message = serde_json::json!({
467 "type": "user",
468 "message": {
469 "role": "user",
470 "content": content_blocks
471 },
472 "session_id": session_id_str
473 });
474
475 let message_str = serde_json::to_string(&user_message).map_err(|e| {
476 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
477 })?;
478
479 // Write directly to stdin (bypasses transport lock)
480 let query_guard = query.lock().await;
481 let stdin = query_guard.stdin.clone();
482 drop(query_guard);
483
484 if let Some(stdin_arc) = stdin {
485 let mut stdin_guard = stdin_arc.lock().await;
486 if let Some(ref mut stdin_stream) = *stdin_guard {
487 stdin_stream
488 .write_all(message_str.as_bytes())
489 .await
490 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
491 stdin_stream.write_all(b"\n").await.map_err(|e| {
492 ClaudeError::Transport(format!("Failed to write newline: {}", e))
493 })?;
494 stdin_stream
495 .flush()
496 .await
497 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
498 } else {
499 return Err(ClaudeError::Transport("stdin not available".to_string()));
500 }
501 } else {
502 return Err(ClaudeError::Transport("stdin not set".to_string()));
503 }
504
505 Ok(())
506 }
507
508 /// Receive all messages as a stream (continuous)
509 ///
510 /// This method returns a stream that yields all messages from Claude
511 /// indefinitely until the stream is closed or an error occurs.
512 ///
513 /// Use this when you want to process all messages, including multiple
514 /// responses and system events.
515 ///
516 /// # Returns
517 ///
518 /// A stream of `Result<Message>` that continues until the connection closes.
519 ///
520 /// # Example
521 ///
522 /// ```no_run
523 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
524 /// # use futures::StreamExt;
525 /// # #[tokio::main]
526 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
527 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
528 /// # client.connect().await?;
529 /// # client.query("Hello").await?;
530 /// let mut stream = client.receive_messages();
531 /// while let Some(message) = stream.next().await {
532 /// println!("Received: {:?}", message?);
533 /// }
534 /// # Ok(())
535 /// # }
536 /// ```
537 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
538 let query = match &self.query {
539 Some(q) => Arc::clone(q),
540 None => {
541 return Box::pin(futures::stream::once(async {
542 Err(ClaudeError::InvalidConfig(
543 "Client not connected. Call connect() first.".to_string(),
544 ))
545 }));
546 }
547 };
548
549 Box::pin(async_stream::stream! {
550 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
551 let query_guard = query.lock().await;
552 Arc::clone(&query_guard.message_rx)
553 };
554
555 loop {
556 let message = {
557 let mut rx_guard = rx.lock().await;
558 rx_guard.recv().await
559 };
560
561 match message {
562 Some(json) => {
563 match MessageParser::parse(json) {
564 Ok(msg) => yield Ok(msg),
565 Err(e) => {
566 eprintln!("Failed to parse message: {}", e);
567 yield Err(e);
568 }
569 }
570 }
571 None => break,
572 }
573 }
574 })
575 }
576
577 /// Receive messages until a ResultMessage
578 ///
579 /// This method returns a stream that yields messages until it encounters
580 /// a `ResultMessage`, which signals the completion of a Claude response.
581 ///
582 /// This is the most common pattern for handling Claude responses, as it
583 /// processes one complete "turn" of the conversation.
584 ///
585 /// # Returns
586 ///
587 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
588 ///
589 /// # Example
590 ///
591 /// ```no_run
592 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
593 /// # use futures::StreamExt;
594 /// # #[tokio::main]
595 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
596 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
597 /// # client.connect().await?;
598 /// # client.query("Hello").await?;
599 /// let mut stream = client.receive_response();
600 /// while let Some(message) = stream.next().await {
601 /// match message? {
602 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
603 /// Message::Result(result) => {
604 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
605 /// break;
606 /// }
607 /// _ => {}
608 /// }
609 /// }
610 /// # Ok(())
611 /// # }
612 /// ```
613 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
614 let query = match &self.query {
615 Some(q) => Arc::clone(q),
616 None => {
617 return Box::pin(futures::stream::once(async {
618 Err(ClaudeError::InvalidConfig(
619 "Client not connected. Call connect() first.".to_string(),
620 ))
621 }));
622 }
623 };
624
625 Box::pin(async_stream::stream! {
626 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
627 let query_guard = query.lock().await;
628 Arc::clone(&query_guard.message_rx)
629 };
630
631 loop {
632 let message = {
633 let mut rx_guard = rx.lock().await;
634 rx_guard.recv().await
635 };
636
637 match message {
638 Some(json) => {
639 match MessageParser::parse(json) {
640 Ok(msg) => {
641 let is_result = matches!(msg, Message::Result(_));
642 yield Ok(msg);
643 if is_result {
644 break;
645 }
646 }
647 Err(e) => {
648 eprintln!("Failed to parse message: {}", e);
649 yield Err(e);
650 }
651 }
652 }
653 None => break,
654 }
655 }
656 })
657 }
658
659 /// Send an interrupt signal to stop the current Claude operation
660 ///
661 /// This is analogous to Python's `client.interrupt()`.
662 ///
663 /// # Errors
664 ///
665 /// Returns an error if the client is not connected or if sending fails.
666 pub async fn interrupt(&self) -> Result<()> {
667 let query = self.query.as_ref().ok_or_else(|| {
668 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
669 })?;
670
671 let query_guard = query.lock().await;
672 query_guard.interrupt().await
673 }
674
675 /// Change the permission mode dynamically
676 ///
677 /// This is analogous to Python's `client.set_permission_mode()`.
678 ///
679 /// # Arguments
680 ///
681 /// * `mode` - The new permission mode to set
682 ///
683 /// # Errors
684 ///
685 /// Returns an error if the client is not connected or if sending fails.
686 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
687 let query = self.query.as_ref().ok_or_else(|| {
688 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
689 })?;
690
691 let query_guard = query.lock().await;
692 query_guard.set_permission_mode(mode).await
693 }
694
695 /// Change the AI model dynamically
696 ///
697 /// This is analogous to Python's `client.set_model()`.
698 ///
699 /// # Arguments
700 ///
701 /// * `model` - The new model name, or None to use default
702 ///
703 /// # Errors
704 ///
705 /// Returns an error if the client is not connected or if sending fails.
706 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
707 let query = self.query.as_ref().ok_or_else(|| {
708 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
709 })?;
710
711 let query_guard = query.lock().await;
712 query_guard.set_model(model).await
713 }
714
715 /// Rewind tracked files to their state at a specific user message.
716 ///
717 /// This is analogous to Python's `client.rewind_files()`.
718 ///
719 /// # Requirements
720 ///
721 /// - `enable_file_checkpointing=true` in options to track file changes
722 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
723 /// objects with `uuid` in the response stream
724 ///
725 /// # Arguments
726 ///
727 /// * `user_message_id` - UUID of the user message to rewind to. This should be
728 /// the `uuid` field from a `UserMessage` received during the conversation.
729 ///
730 /// # Errors
731 ///
732 /// Returns an error if the client is not connected or if sending fails.
733 ///
734 /// # Example
735 ///
736 /// ```no_run
737 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
738 /// # use std::collections::HashMap;
739 /// # #[tokio::main]
740 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
741 /// let options = ClaudeAgentOptions::builder()
742 /// .enable_file_checkpointing(true)
743 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
744 /// .build();
745 /// let mut client = ClaudeClient::new(options);
746 /// client.connect().await?;
747 ///
748 /// client.query("Make some changes to my files").await?;
749 /// let mut checkpoint_id = None;
750 /// {
751 /// let mut stream = client.receive_response();
752 /// use futures::StreamExt;
753 /// while let Some(Ok(msg)) = stream.next().await {
754 /// if let Message::User(user_msg) = &msg {
755 /// if let Some(uuid) = &user_msg.uuid {
756 /// checkpoint_id = Some(uuid.clone());
757 /// }
758 /// }
759 /// }
760 /// }
761 ///
762 /// // Later, rewind to that point
763 /// if let Some(id) = checkpoint_id {
764 /// client.rewind_files(&id).await?;
765 /// }
766 /// # Ok(())
767 /// # }
768 /// ```
769 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
770 let query = self.query.as_ref().ok_or_else(|| {
771 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
772 })?;
773
774 let query_guard = query.lock().await;
775 query_guard.rewind_files(user_message_id).await
776 }
777
778 /// Get server initialization info including available commands and output styles
779 ///
780 /// Returns initialization information from the Claude Code server including:
781 /// - Available commands (slash commands, system commands, etc.)
782 /// - Current and available output styles
783 /// - Server capabilities
784 ///
785 /// This is analogous to Python's `client.get_server_info()`.
786 ///
787 /// # Returns
788 ///
789 /// Dictionary with server info, or None if not connected
790 ///
791 /// # Example
792 ///
793 /// ```no_run
794 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
795 /// # #[tokio::main]
796 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
797 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
798 /// # client.connect().await?;
799 /// if let Some(info) = client.get_server_info().await {
800 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
801 /// println!("Output style: {:?}", info.get("output_style"));
802 /// }
803 /// # Ok(())
804 /// # }
805 /// ```
806 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
807 let query = self.query.as_ref()?;
808 let query_guard = query.lock().await;
809 query_guard.get_initialization_result().await
810 }
811
812 /// Start a new session by switching to a different session ID
813 ///
814 /// This is a convenience method that creates a new conversation context.
815 /// It's equivalent to calling `query_with_session()` with a new session ID.
816 ///
817 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
818 /// when creating a new client.
819 ///
820 /// # Arguments
821 ///
822 /// * `session_id` - The new session ID to use
823 /// * `prompt` - Initial message for the new session
824 ///
825 /// # Errors
826 ///
827 /// Returns an error if the client is not connected or if sending fails.
828 ///
829 /// # Example
830 ///
831 /// ```no_run
832 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
833 /// # #[tokio::main]
834 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
835 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
836 /// # client.connect().await?;
837 /// // First conversation
838 /// client.query("Hello").await?;
839 ///
840 /// // Start new conversation with different context
841 /// client.new_session("session-2", "Tell me about Rust").await?;
842 /// # Ok(())
843 /// # }
844 /// ```
845 pub async fn new_session(
846 &mut self,
847 session_id: impl Into<String>,
848 prompt: impl Into<String>,
849 ) -> Result<()> {
850 self.query_with_session(prompt, session_id).await
851 }
852
853 /// Disconnect from Claude (analogous to Python's __aexit__)
854 ///
855 /// This cleanly shuts down the connection to Claude Code CLI.
856 ///
857 /// # Errors
858 ///
859 /// Returns an error if disconnection fails.
860 #[instrument(name = "claude.client.disconnect", skip(self))]
861 pub async fn disconnect(&mut self) -> Result<()> {
862 if !self.connected {
863 debug!("Client already disconnected");
864 return Ok(());
865 }
866
867 info!("Disconnecting from Claude Code CLI");
868
869 if let Some(query) = self.query.take() {
870 // Close stdin first (using direct access) to signal CLI to exit
871 // This will cause the background task to finish and release transport lock
872 let query_guard = query.lock().await;
873 if let Some(ref stdin_arc) = query_guard.stdin {
874 let mut stdin_guard = stdin_arc.lock().await;
875 if let Some(mut stdin_stream) = stdin_guard.take() {
876 let _ = stdin_stream.shutdown().await;
877 }
878 }
879 let transport = Arc::clone(&query_guard.transport);
880 drop(query_guard);
881
882 // Give background task a moment to finish reading and release lock
883 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
884
885 let mut transport_guard = transport.lock().await;
886 transport_guard.close().await?;
887 }
888
889 self.connected = false;
890 debug!("Disconnected successfully");
891 Ok(())
892 }
893}
894
895impl Drop for ClaudeClient {
896 fn drop(&mut self) {
897 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
898 // Users should call disconnect() explicitly
899 if self.connected {
900 eprintln!(
901 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
902 );
903 }
904 }
905}
906
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{CanUseToolCallback, PermissionResult, PermissionResultAllow};
    use std::sync::Arc;

    /// A `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        // The annotated binding gives the closure its expected signature.
        let callback: CanUseToolCallback = Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        });
        callback
    }

    /// Assert that `outcome`, if it is an error at all, is not the
    /// `permission_prompt_tool_name` validation error.
    fn assert_passed_permission_tool_validation(outcome: &Result<()>) {
        if let Err(err) = outcome {
            assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let err = match client.connect().await {
            Ok(()) => panic!("connect() should reject a custom permission_prompt_tool_name"),
            Err(err) => err,
        };

        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        let mut client = ClaudeClient::new(opts);
        // connect() may still fail later (e.g. CLI not found), but it must get
        // past the permission_prompt_tool_name validation.
        let result = client.connect().await;

        assert_passed_permission_tool_validation(&result);
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        let mut client = ClaudeClient::new(opts);
        // connect() may still fail later (e.g. CLI not found), but it must get
        // past the permission_prompt_tool_name validation.
        let result = client.connect().await;

        assert_passed_permission_tool_validation(&result);
    }
}
983}