claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_full::QueryFull;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// # Example
28///
29/// ```no_run
30/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
31/// use futures::StreamExt;
32///
33/// #[tokio::main]
34/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
35/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
36///
37/// // Connect to Claude
38/// client.connect().await?;
39///
40/// // Send a query
41/// client.query("Hello Claude!").await?;
42///
43/// // Receive response as a stream
44/// {
45/// let mut stream = client.receive_response();
46/// while let Some(message) = stream.next().await {
47/// println!("Received: {:?}", message?);
48/// }
49/// }
50///
51/// // Disconnect
52/// client.disconnect().await?;
53/// Ok(())
54/// }
55/// ```
56pub struct ClaudeClient {
57 options: ClaudeAgentOptions,
58 query: Option<Arc<Mutex<QueryFull>>>,
59 connected: bool,
60}
61
62impl ClaudeClient {
63 /// Create a new ClaudeClient
64 ///
65 /// # Arguments
66 ///
67 /// * `options` - Configuration options for the Claude client
68 ///
69 /// # Example
70 ///
71 /// ```no_run
72 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
73 ///
74 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
75 /// ```
76 pub fn new(options: ClaudeAgentOptions) -> Self {
77 Self {
78 options,
79 query: None,
80 connected: false,
81 }
82 }
83
84 /// Create a new ClaudeClient with early validation
85 ///
86 /// Unlike `new()`, this validates the configuration eagerly by attempting
87 /// to create the transport. This catches issues like invalid working directory
88 /// or missing CLI before `connect()` is called.
89 ///
90 /// # Arguments
91 ///
92 /// * `options` - Configuration options for the Claude client
93 ///
94 /// # Errors
95 ///
96 /// Returns an error if:
97 /// - The working directory does not exist or is not a directory
98 /// - Claude CLI cannot be found
99 ///
100 /// # Example
101 ///
102 /// ```no_run
103 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
104 ///
105 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
106 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
107 /// ```
108 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
109 // Validate by attempting to create transport (but don't keep it)
110 let prompt = QueryPrompt::Streaming;
111 let _ = SubprocessTransport::new(prompt, options.clone())?;
112
113 Ok(Self {
114 options,
115 query: None,
116 connected: false,
117 })
118 }
119
120 /// Connect to Claude (analogous to Python's __aenter__)
121 ///
122 /// This establishes the connection to the Claude Code CLI and initializes
123 /// the bidirectional communication channel.
124 ///
125 /// # Errors
126 ///
127 /// Returns an error if:
128 /// - Claude CLI cannot be found or started
129 /// - The initialization handshake fails
130 /// - Hook registration fails
131 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
132 #[instrument(
133 name = "claude.client.connect",
134 skip(self),
135 fields(
136 has_can_use_tool = self.options.can_use_tool.is_some(),
137 has_hooks = self.options.hooks.is_some(),
138 model = %self.options.model.as_deref().unwrap_or("default"),
139 )
140 )]
141 pub async fn connect(&mut self) -> Result<()> {
142 if self.connected {
143 debug!("Client already connected, skipping");
144 return Ok(());
145 }
146
147 info!("Connecting to Claude Code CLI");
148
149 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
150 // This matches Python SDK behavior (client.py lines 106-122)
151 // which ensures CLI uses control protocol for permission prompts
152 let mut options = self.options.clone();
153 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
154 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
155 options.permission_prompt_tool_name = Some("stdio".to_string());
156 }
157
158 // Validate can_use_tool configuration (aligned with Python SDK behavior)
159 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
160 // to ensure the control protocol can handle permission requests
161 if options.can_use_tool.is_some()
162 && let Some(ref tool_name) = options.permission_prompt_tool_name
163 && tool_name != "stdio"
164 {
165 return Err(ClaudeError::InvalidConfig(
166 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
167 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
168 .to_string(),
169 ));
170 }
171
172 // Create transport in streaming mode (no initial prompt)
173 let prompt = QueryPrompt::Streaming;
174 let mut transport = SubprocessTransport::new(prompt, options)?;
175
176 // Don't send initial prompt - we'll use query() for that
177 transport.connect().await?;
178
179 // Extract stdin for direct access (avoids transport lock deadlock)
180 let stdin = Arc::clone(&transport.stdin);
181
182 // Create Query with hooks
183 let mut query = QueryFull::new(Box::new(transport));
184 query.set_stdin(stdin);
185
186 // Set control request timeout from options
187 query.set_control_request_timeout(self.options.control_request_timeout);
188
189 // Extract SDK MCP servers from options
190 let sdk_mcp_servers =
191 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
192 servers_dict
193 .iter()
194 .filter_map(|(name, config)| {
195 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
196 Some((name.clone(), sdk_config.clone()))
197 } else {
198 None
199 }
200 })
201 .collect()
202 } else {
203 std::collections::HashMap::new()
204 };
205 query.set_sdk_mcp_servers(sdk_mcp_servers).await;
206
207 // Set can_use_tool callback if provided
208 if let Some(ref callback) = self.options.can_use_tool {
209 query.set_can_use_tool(Some(Arc::clone(callback))).await;
210 }
211
212 // Build efficiency hooks if configured
213 let efficiency_hooks = self
214 .options
215 .efficiency
216 .as_ref()
217 .map(build_efficiency_hooks)
218 .unwrap_or_default();
219
220 // Merge user hooks with efficiency hooks
221 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
222
223 // Convert hooks to internal format
224 let hooks = merged_hooks.as_ref().map(|hooks_map| {
225 hooks_map
226 .iter()
227 .map(|(event, matchers)| {
228 let event_name = match event {
229 HookEvent::PreToolUse => "PreToolUse",
230 HookEvent::PostToolUse => "PostToolUse",
231 HookEvent::UserPromptSubmit => "UserPromptSubmit",
232 HookEvent::Stop => "Stop",
233 HookEvent::SubagentStop => "SubagentStop",
234 HookEvent::PreCompact => "PreCompact",
235 };
236 (event_name.to_string(), matchers.clone())
237 })
238 .collect()
239 });
240
241 // Start reading messages in background FIRST
242 // This must happen before initialize() because initialize()
243 // sends a control request and waits for response
244 query.start().await?;
245
246 // Initialize with hooks (sends control request)
247 query.initialize(hooks).await?;
248
249 self.query = Some(Arc::new(Mutex::new(query)));
250 self.connected = true;
251
252 info!("Successfully connected to Claude Code CLI");
253 Ok(())
254 }
255
256 /// Send a query to Claude
257 ///
258 /// This sends a new user prompt to Claude. Claude will remember the context
259 /// of previous queries within the same session.
260 ///
261 /// # Arguments
262 ///
263 /// * `prompt` - The user prompt to send
264 ///
265 /// # Errors
266 ///
267 /// Returns an error if the client is not connected or if sending fails.
268 ///
269 /// # Example
270 ///
271 /// ```no_run
272 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
273 /// # #[tokio::main]
274 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
275 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
276 /// # client.connect().await?;
277 /// client.query("What is 2 + 2?").await?;
278 /// # Ok(())
279 /// # }
280 /// ```
281 #[instrument(
282 name = "claude.client.query",
283 skip(self, prompt),
284 fields(session_id = "default",)
285 )]
286 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
287 self.query_with_session(prompt, "default").await
288 }
289
290 /// Send a query to Claude with a specific session ID
291 ///
292 /// This sends a new user prompt to Claude. Different session IDs maintain
293 /// separate conversation contexts.
294 ///
295 /// # Arguments
296 ///
297 /// * `prompt` - The user prompt to send
298 /// * `session_id` - Session identifier for the conversation
299 ///
300 /// # Errors
301 ///
302 /// Returns an error if the client is not connected or if sending fails.
303 ///
304 /// # Example
305 ///
306 /// ```no_run
307 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
308 /// # #[tokio::main]
309 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
310 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
311 /// # client.connect().await?;
312 /// // Separate conversation contexts
313 /// client.query_with_session("First question", "session-1").await?;
314 /// client.query_with_session("Different question", "session-2").await?;
315 /// # Ok(())
316 /// # }
317 /// ```
318 pub async fn query_with_session(
319 &mut self,
320 prompt: impl Into<String>,
321 session_id: impl Into<String>,
322 ) -> Result<()> {
323 let query = self.query.as_ref().ok_or_else(|| {
324 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
325 })?;
326
327 let prompt_str = prompt.into();
328 let session_id_str = session_id.into();
329
330 // Format as JSON message for stream-json input format
331 let user_message = serde_json::json!({
332 "type": "user",
333 "message": {
334 "role": "user",
335 "content": prompt_str
336 },
337 "session_id": session_id_str
338 });
339
340 let message_str = serde_json::to_string(&user_message).map_err(|e| {
341 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
342 })?;
343
344 // Write directly to stdin (bypasses transport lock)
345 let query_guard = query.lock().await;
346 let stdin = query_guard.stdin.clone();
347 drop(query_guard);
348
349 if let Some(stdin_arc) = stdin {
350 let mut stdin_guard = stdin_arc.lock().await;
351 if let Some(ref mut stdin_stream) = *stdin_guard {
352 stdin_stream
353 .write_all(message_str.as_bytes())
354 .await
355 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
356 stdin_stream.write_all(b"\n").await.map_err(|e| {
357 ClaudeError::Transport(format!("Failed to write newline: {}", e))
358 })?;
359 stdin_stream
360 .flush()
361 .await
362 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
363 } else {
364 return Err(ClaudeError::Transport("stdin not available".to_string()));
365 }
366 } else {
367 return Err(ClaudeError::Transport("stdin not set".to_string()));
368 }
369
370 Ok(())
371 }
372
373 /// Send a query with structured content blocks (supports images)
374 ///
375 /// This method enables multimodal queries in bidirectional streaming mode.
376 /// Use it to send images alongside text for vision-related tasks.
377 ///
378 /// # Arguments
379 ///
380 /// * `content` - A vector of content blocks (text and/or images)
381 ///
382 /// # Errors
383 ///
384 /// Returns an error if:
385 /// - The content vector is empty (must include at least one text or image block)
386 /// - The client is not connected (call `connect()` first)
387 /// - Sending the message fails
388 ///
389 /// # Example
390 ///
391 /// ```no_run
392 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
393 /// # #[tokio::main]
394 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
395 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
396 /// # client.connect().await?;
397 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
398 /// client.query_with_content(vec![
399 /// UserContentBlock::text("What's in this image?"),
400 /// UserContentBlock::image_base64("image/png", base64_data)?,
401 /// ]).await?;
402 /// # Ok(())
403 /// # }
404 /// ```
405 pub async fn query_with_content(
406 &mut self,
407 content: impl Into<Vec<UserContentBlock>>,
408 ) -> Result<()> {
409 self.query_with_content_and_session(content, "default")
410 .await
411 }
412
413 /// Send a query with structured content blocks and a specific session ID
414 ///
415 /// This method enables multimodal queries with session management for
416 /// maintaining separate conversation contexts.
417 ///
418 /// # Arguments
419 ///
420 /// * `content` - A vector of content blocks (text and/or images)
421 /// * `session_id` - Session identifier for the conversation
422 ///
423 /// # Errors
424 ///
425 /// Returns an error if:
426 /// - The content vector is empty (must include at least one text or image block)
427 /// - The client is not connected (call `connect()` first)
428 /// - Sending the message fails
429 ///
430 /// # Example
431 ///
432 /// ```no_run
433 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
434 /// # #[tokio::main]
435 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
436 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
437 /// # client.connect().await?;
438 /// client.query_with_content_and_session(
439 /// vec![
440 /// UserContentBlock::text("Analyze this chart"),
441 /// UserContentBlock::image_url("https://example.com/chart.png"),
442 /// ],
443 /// "analysis-session",
444 /// ).await?;
445 /// # Ok(())
446 /// # }
447 /// ```
448 pub async fn query_with_content_and_session(
449 &mut self,
450 content: impl Into<Vec<UserContentBlock>>,
451 session_id: impl Into<String>,
452 ) -> Result<()> {
453 let query = self.query.as_ref().ok_or_else(|| {
454 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
455 })?;
456
457 let content_blocks: Vec<UserContentBlock> = content.into();
458 UserContentBlock::validate_content(&content_blocks)?;
459
460 let session_id_str = session_id.into();
461
462 // Format as JSON message for stream-json input format
463 // Content is an array of content blocks, not a simple string
464 let user_message = serde_json::json!({
465 "type": "user",
466 "message": {
467 "role": "user",
468 "content": content_blocks
469 },
470 "session_id": session_id_str
471 });
472
473 let message_str = serde_json::to_string(&user_message).map_err(|e| {
474 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
475 })?;
476
477 // Write directly to stdin (bypasses transport lock)
478 let query_guard = query.lock().await;
479 let stdin = query_guard.stdin.clone();
480 drop(query_guard);
481
482 if let Some(stdin_arc) = stdin {
483 let mut stdin_guard = stdin_arc.lock().await;
484 if let Some(ref mut stdin_stream) = *stdin_guard {
485 stdin_stream
486 .write_all(message_str.as_bytes())
487 .await
488 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
489 stdin_stream.write_all(b"\n").await.map_err(|e| {
490 ClaudeError::Transport(format!("Failed to write newline: {}", e))
491 })?;
492 stdin_stream
493 .flush()
494 .await
495 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
496 } else {
497 return Err(ClaudeError::Transport("stdin not available".to_string()));
498 }
499 } else {
500 return Err(ClaudeError::Transport("stdin not set".to_string()));
501 }
502
503 Ok(())
504 }
505
506 /// Receive all messages as a stream (continuous)
507 ///
508 /// This method returns a stream that yields all messages from Claude
509 /// indefinitely until the stream is closed or an error occurs.
510 ///
511 /// Use this when you want to process all messages, including multiple
512 /// responses and system events.
513 ///
514 /// # Returns
515 ///
516 /// A stream of `Result<Message>` that continues until the connection closes.
517 ///
518 /// # Example
519 ///
520 /// ```no_run
521 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
522 /// # use futures::StreamExt;
523 /// # #[tokio::main]
524 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
525 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
526 /// # client.connect().await?;
527 /// # client.query("Hello").await?;
528 /// let mut stream = client.receive_messages();
529 /// while let Some(message) = stream.next().await {
530 /// println!("Received: {:?}", message?);
531 /// }
532 /// # Ok(())
533 /// # }
534 /// ```
535 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
536 let query = match &self.query {
537 Some(q) => Arc::clone(q),
538 None => {
539 return Box::pin(futures::stream::once(async {
540 Err(ClaudeError::InvalidConfig(
541 "Client not connected. Call connect() first.".to_string(),
542 ))
543 }));
544 }
545 };
546
547 Box::pin(async_stream::stream! {
548 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
549 let query_guard = query.lock().await;
550 Arc::clone(&query_guard.message_rx)
551 };
552
553 loop {
554 let message = {
555 let mut rx_guard = rx.lock().await;
556 rx_guard.recv().await
557 };
558
559 match message {
560 Some(json) => {
561 match MessageParser::parse(json) {
562 Ok(msg) => yield Ok(msg),
563 Err(e) => {
564 eprintln!("Failed to parse message: {}", e);
565 yield Err(e);
566 }
567 }
568 }
569 None => break,
570 }
571 }
572 })
573 }
574
575 /// Receive messages until a ResultMessage
576 ///
577 /// This method returns a stream that yields messages until it encounters
578 /// a `ResultMessage`, which signals the completion of a Claude response.
579 ///
580 /// This is the most common pattern for handling Claude responses, as it
581 /// processes one complete "turn" of the conversation.
582 ///
583 /// # Returns
584 ///
585 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
586 ///
587 /// # Example
588 ///
589 /// ```no_run
590 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
591 /// # use futures::StreamExt;
592 /// # #[tokio::main]
593 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
594 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
595 /// # client.connect().await?;
596 /// # client.query("Hello").await?;
597 /// let mut stream = client.receive_response();
598 /// while let Some(message) = stream.next().await {
599 /// match message? {
600 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
601 /// Message::Result(result) => {
602 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
603 /// break;
604 /// }
605 /// _ => {}
606 /// }
607 /// }
608 /// # Ok(())
609 /// # }
610 /// ```
611 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
612 let query = match &self.query {
613 Some(q) => Arc::clone(q),
614 None => {
615 return Box::pin(futures::stream::once(async {
616 Err(ClaudeError::InvalidConfig(
617 "Client not connected. Call connect() first.".to_string(),
618 ))
619 }));
620 }
621 };
622
623 Box::pin(async_stream::stream! {
624 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
625 let query_guard = query.lock().await;
626 Arc::clone(&query_guard.message_rx)
627 };
628
629 loop {
630 let message = {
631 let mut rx_guard = rx.lock().await;
632 rx_guard.recv().await
633 };
634
635 match message {
636 Some(json) => {
637 match MessageParser::parse(json) {
638 Ok(msg) => {
639 let is_result = matches!(msg, Message::Result(_));
640 yield Ok(msg);
641 if is_result {
642 break;
643 }
644 }
645 Err(e) => {
646 eprintln!("Failed to parse message: {}", e);
647 yield Err(e);
648 }
649 }
650 }
651 None => break,
652 }
653 }
654 })
655 }
656
657 /// Send an interrupt signal to stop the current Claude operation
658 ///
659 /// This is analogous to Python's `client.interrupt()`.
660 ///
661 /// # Errors
662 ///
663 /// Returns an error if the client is not connected or if sending fails.
664 pub async fn interrupt(&self) -> Result<()> {
665 let query = self.query.as_ref().ok_or_else(|| {
666 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
667 })?;
668
669 let query_guard = query.lock().await;
670 query_guard.interrupt().await
671 }
672
673 /// Change the permission mode dynamically
674 ///
675 /// This is analogous to Python's `client.set_permission_mode()`.
676 ///
677 /// # Arguments
678 ///
679 /// * `mode` - The new permission mode to set
680 ///
681 /// # Errors
682 ///
683 /// Returns an error if the client is not connected or if sending fails.
684 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
685 let query = self.query.as_ref().ok_or_else(|| {
686 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
687 })?;
688
689 let query_guard = query.lock().await;
690 query_guard.set_permission_mode(mode).await
691 }
692
693 /// Change the AI model dynamically
694 ///
695 /// This is analogous to Python's `client.set_model()`.
696 ///
697 /// # Arguments
698 ///
699 /// * `model` - The new model name, or None to use default
700 ///
701 /// # Errors
702 ///
703 /// Returns an error if the client is not connected or if sending fails.
704 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
705 let query = self.query.as_ref().ok_or_else(|| {
706 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
707 })?;
708
709 let query_guard = query.lock().await;
710 query_guard.set_model(model).await
711 }
712
713 /// Rewind tracked files to their state at a specific user message.
714 ///
715 /// This is analogous to Python's `client.rewind_files()`.
716 ///
717 /// # Requirements
718 ///
719 /// - `enable_file_checkpointing=true` in options to track file changes
720 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
721 /// objects with `uuid` in the response stream
722 ///
723 /// # Arguments
724 ///
725 /// * `user_message_id` - UUID of the user message to rewind to. This should be
726 /// the `uuid` field from a `UserMessage` received during the conversation.
727 ///
728 /// # Errors
729 ///
730 /// Returns an error if the client is not connected or if sending fails.
731 ///
732 /// # Example
733 ///
734 /// ```no_run
735 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
736 /// # use std::collections::HashMap;
737 /// # #[tokio::main]
738 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
739 /// let options = ClaudeAgentOptions::builder()
740 /// .enable_file_checkpointing(true)
741 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
742 /// .build();
743 /// let mut client = ClaudeClient::new(options);
744 /// client.connect().await?;
745 ///
746 /// client.query("Make some changes to my files").await?;
747 /// let mut checkpoint_id = None;
748 /// {
749 /// let mut stream = client.receive_response();
750 /// use futures::StreamExt;
751 /// while let Some(Ok(msg)) = stream.next().await {
752 /// if let Message::User(user_msg) = &msg {
753 /// if let Some(uuid) = &user_msg.uuid {
754 /// checkpoint_id = Some(uuid.clone());
755 /// }
756 /// }
757 /// }
758 /// }
759 ///
760 /// // Later, rewind to that point
761 /// if let Some(id) = checkpoint_id {
762 /// client.rewind_files(&id).await?;
763 /// }
764 /// # Ok(())
765 /// # }
766 /// ```
767 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
768 let query = self.query.as_ref().ok_or_else(|| {
769 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
770 })?;
771
772 let query_guard = query.lock().await;
773 query_guard.rewind_files(user_message_id).await
774 }
775
776 /// Get server initialization info including available commands and output styles
777 ///
778 /// Returns initialization information from the Claude Code server including:
779 /// - Available commands (slash commands, system commands, etc.)
780 /// - Current and available output styles
781 /// - Server capabilities
782 ///
783 /// This is analogous to Python's `client.get_server_info()`.
784 ///
785 /// # Returns
786 ///
787 /// Dictionary with server info, or None if not connected
788 ///
789 /// # Example
790 ///
791 /// ```no_run
792 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
793 /// # #[tokio::main]
794 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
795 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
796 /// # client.connect().await?;
797 /// if let Some(info) = client.get_server_info().await {
798 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
799 /// println!("Output style: {:?}", info.get("output_style"));
800 /// }
801 /// # Ok(())
802 /// # }
803 /// ```
804 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
805 let query = self.query.as_ref()?;
806 let query_guard = query.lock().await;
807 query_guard.get_initialization_result().await
808 }
809
810 /// Start a new session by switching to a different session ID
811 ///
812 /// This is a convenience method that creates a new conversation context.
813 /// It's equivalent to calling `query_with_session()` with a new session ID.
814 ///
815 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
816 /// when creating a new client.
817 ///
818 /// # Arguments
819 ///
820 /// * `session_id` - The new session ID to use
821 /// * `prompt` - Initial message for the new session
822 ///
823 /// # Errors
824 ///
825 /// Returns an error if the client is not connected or if sending fails.
826 ///
827 /// # Example
828 ///
829 /// ```no_run
830 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
831 /// # #[tokio::main]
832 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
833 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
834 /// # client.connect().await?;
835 /// // First conversation
836 /// client.query("Hello").await?;
837 ///
838 /// // Start new conversation with different context
839 /// client.new_session("session-2", "Tell me about Rust").await?;
840 /// # Ok(())
841 /// # }
842 /// ```
843 pub async fn new_session(
844 &mut self,
845 session_id: impl Into<String>,
846 prompt: impl Into<String>,
847 ) -> Result<()> {
848 self.query_with_session(prompt, session_id).await
849 }
850
851 /// Disconnect from Claude (analogous to Python's __aexit__)
852 ///
853 /// This cleanly shuts down the connection to Claude Code CLI.
854 ///
855 /// # Errors
856 ///
857 /// Returns an error if disconnection fails.
858 #[instrument(name = "claude.client.disconnect", skip(self))]
859 pub async fn disconnect(&mut self) -> Result<()> {
860 if !self.connected {
861 debug!("Client already disconnected");
862 return Ok(());
863 }
864
865 info!("Disconnecting from Claude Code CLI");
866
867 if let Some(query) = self.query.take() {
868 // Close stdin first (using direct access) to signal CLI to exit
869 // This will cause the background task to finish and release transport lock
870 let query_guard = query.lock().await;
871 if let Some(ref stdin_arc) = query_guard.stdin {
872 let mut stdin_guard = stdin_arc.lock().await;
873 if let Some(mut stdin_stream) = stdin_guard.take() {
874 let _ = stdin_stream.shutdown().await;
875 }
876 }
877 let transport = Arc::clone(&query_guard.transport);
878 drop(query_guard);
879
880 // Give background task a moment to finish reading and release lock
881 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
882
883 let mut transport_guard = transport.lock().await;
884 transport_guard.close().await?;
885 }
886
887 self.connected = false;
888 debug!("Disconnected successfully");
889 Ok(())
890 }
891}
892
893impl Drop for ClaudeClient {
894 fn drop(&mut self) {
895 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
896 // Users should call disconnect() explicitly
897 if self.connected {
898 eprintln!(
899 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
900 );
901 }
902 }
903}
904
905#[cfg(test)]
906mod tests {
907 use super::*;
908 use crate::types::permissions::{PermissionResult, PermissionResultAllow};
909 use std::sync::Arc;
910
911 #[tokio::test]
912 async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
913 let callback: crate::types::permissions::CanUseToolCallback =
914 Arc::new(|_tool_name, _tool_input, _context| {
915 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
916 });
917
918 let opts = ClaudeAgentOptions::builder()
919 .can_use_tool(callback)
920 .permission_prompt_tool_name("custom_tool") // Not "stdio"
921 .build();
922
923 let mut client = ClaudeClient::new(opts);
924 let result = client.connect().await;
925
926 assert!(result.is_err());
927 let err = result.unwrap_err();
928 assert!(matches!(err, ClaudeError::InvalidConfig(_)));
929 assert!(err.to_string().contains("permission_prompt_tool_name"));
930 }
931
932 #[tokio::test]
933 async fn test_connect_accepts_can_use_tool_with_stdio() {
934 let callback: crate::types::permissions::CanUseToolCallback =
935 Arc::new(|_tool_name, _tool_input, _context| {
936 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
937 });
938
939 let opts = ClaudeAgentOptions::builder()
940 .can_use_tool(callback)
941 .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
942 .build();
943
944 let mut client = ClaudeClient::new(opts);
945 // This will fail later (CLI not found), but should pass validation
946 let result = client.connect().await;
947
948 // Should not be InvalidConfig error about permission_prompt_tool_name
949 if let Err(ref err) = result {
950 assert!(
951 !err.to_string().contains("permission_prompt_tool_name"),
952 "Should not fail on permission_prompt_tool_name validation"
953 );
954 }
955 }
956
957 #[tokio::test]
958 async fn test_connect_accepts_can_use_tool_without_permission_tool() {
959 let callback: crate::types::permissions::CanUseToolCallback =
960 Arc::new(|_tool_name, _tool_input, _context| {
961 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
962 });
963
964 let opts = ClaudeAgentOptions::builder()
965 .can_use_tool(callback)
966 // No permission_prompt_tool_name set - defaults to stdio
967 .build();
968
969 let mut client = ClaudeClient::new(opts);
970 // This will fail later (CLI not found), but should pass validation
971 let result = client.connect().await;
972
973 // Should not be InvalidConfig error about permission_prompt_tool_name
974 if let Err(ref err) = result {
975 assert!(
976 !err.to_string().contains("permission_prompt_tool_name"),
977 "Should not fail on permission_prompt_tool_name validation"
978 );
979 }
980 }
981}