claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_full::QueryFull;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::hooks::HookEvent;
18use crate::types::messages::{Message, UserContentBlock};
19
20/// Client for bidirectional streaming interactions with Claude
21///
22/// This client provides the same functionality as Python's ClaudeSDKClient,
23/// supporting bidirectional communication, streaming responses, and dynamic
24/// control over the Claude session.
25///
26/// # Example
27///
28/// ```no_run
29/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
30/// use futures::StreamExt;
31///
32/// #[tokio::main]
33/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
34/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
35///
36/// // Connect to Claude
37/// client.connect().await?;
38///
39/// // Send a query
40/// client.query("Hello Claude!").await?;
41///
42/// // Receive response as a stream
43/// {
44/// let mut stream = client.receive_response();
45/// while let Some(message) = stream.next().await {
46/// println!("Received: {:?}", message?);
47/// }
48/// }
49///
50/// // Disconnect
51/// client.disconnect().await?;
52/// Ok(())
53/// }
54/// ```
55pub struct ClaudeClient {
56 options: ClaudeAgentOptions,
57 query: Option<Arc<Mutex<QueryFull>>>,
58 connected: bool,
59}
60
61impl ClaudeClient {
62 /// Create a new ClaudeClient
63 ///
64 /// # Arguments
65 ///
66 /// * `options` - Configuration options for the Claude client
67 ///
68 /// # Example
69 ///
70 /// ```no_run
71 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
72 ///
73 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
74 /// ```
75 pub fn new(options: ClaudeAgentOptions) -> Self {
76 Self {
77 options,
78 query: None,
79 connected: false,
80 }
81 }
82
83 /// Create a new ClaudeClient with early validation
84 ///
85 /// Unlike `new()`, this validates the configuration eagerly by attempting
86 /// to create the transport. This catches issues like invalid working directory
87 /// or missing CLI before `connect()` is called.
88 ///
89 /// # Arguments
90 ///
91 /// * `options` - Configuration options for the Claude client
92 ///
93 /// # Errors
94 ///
95 /// Returns an error if:
96 /// - The working directory does not exist or is not a directory
97 /// - Claude CLI cannot be found
98 ///
99 /// # Example
100 ///
101 /// ```no_run
102 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
103 ///
104 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
105 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
106 /// ```
107 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
108 // Validate by attempting to create transport (but don't keep it)
109 let prompt = QueryPrompt::Streaming;
110 let _ = SubprocessTransport::new(prompt, options.clone())?;
111
112 Ok(Self {
113 options,
114 query: None,
115 connected: false,
116 })
117 }
118
119 /// Connect to Claude (analogous to Python's __aenter__)
120 ///
121 /// This establishes the connection to the Claude Code CLI and initializes
122 /// the bidirectional communication channel.
123 ///
124 /// # Errors
125 ///
126 /// Returns an error if:
127 /// - Claude CLI cannot be found or started
128 /// - The initialization handshake fails
129 /// - Hook registration fails
130 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
131 #[instrument(
132 name = "claude.client.connect",
133 skip(self),
134 fields(
135 has_can_use_tool = self.options.can_use_tool.is_some(),
136 has_hooks = self.options.hooks.is_some(),
137 model = %self.options.model.as_deref().unwrap_or("default"),
138 )
139 )]
140 pub async fn connect(&mut self) -> Result<()> {
141 if self.connected {
142 debug!("Client already connected, skipping");
143 return Ok(());
144 }
145
146 info!("Connecting to Claude Code CLI");
147
148 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
149 // This matches Python SDK behavior (client.py lines 106-122)
150 // which ensures CLI uses control protocol for permission prompts
151 let mut options = self.options.clone();
152 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
153 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
154 options.permission_prompt_tool_name = Some("stdio".to_string());
155 }
156
157 // Validate can_use_tool configuration (aligned with Python SDK behavior)
158 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
159 // to ensure the control protocol can handle permission requests
160 if options.can_use_tool.is_some()
161 && let Some(ref tool_name) = options.permission_prompt_tool_name
162 && tool_name != "stdio"
163 {
164 return Err(ClaudeError::InvalidConfig(
165 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
166 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
167 .to_string(),
168 ));
169 }
170
171 // Create transport in streaming mode (no initial prompt)
172 let prompt = QueryPrompt::Streaming;
173 let mut transport = SubprocessTransport::new(prompt, options)?;
174
175 // Don't send initial prompt - we'll use query() for that
176 transport.connect().await?;
177
178 // Extract stdin for direct access (avoids transport lock deadlock)
179 let stdin = Arc::clone(&transport.stdin);
180
181 // Create Query with hooks
182 let mut query = QueryFull::new(Box::new(transport));
183 query.set_stdin(stdin);
184
185 // Set control request timeout from options
186 query.set_control_request_timeout(self.options.control_request_timeout);
187
188 // Extract SDK MCP servers from options
189 let sdk_mcp_servers =
190 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
191 servers_dict
192 .iter()
193 .filter_map(|(name, config)| {
194 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
195 Some((name.clone(), sdk_config.clone()))
196 } else {
197 None
198 }
199 })
200 .collect()
201 } else {
202 std::collections::HashMap::new()
203 };
204 query.set_sdk_mcp_servers(sdk_mcp_servers).await;
205
206 // Set can_use_tool callback if provided
207 if let Some(ref callback) = self.options.can_use_tool {
208 query.set_can_use_tool(Some(Arc::clone(callback))).await;
209 }
210
211 // Convert hooks to internal format
212 let hooks = self.options.hooks.as_ref().map(|hooks_map| {
213 hooks_map
214 .iter()
215 .map(|(event, matchers)| {
216 let event_name = match event {
217 HookEvent::PreToolUse => "PreToolUse",
218 HookEvent::PostToolUse => "PostToolUse",
219 HookEvent::UserPromptSubmit => "UserPromptSubmit",
220 HookEvent::Stop => "Stop",
221 HookEvent::SubagentStop => "SubagentStop",
222 HookEvent::PreCompact => "PreCompact",
223 };
224 (event_name.to_string(), matchers.clone())
225 })
226 .collect()
227 });
228
229 // Start reading messages in background FIRST
230 // This must happen before initialize() because initialize()
231 // sends a control request and waits for response
232 query.start().await?;
233
234 // Initialize with hooks (sends control request)
235 query.initialize(hooks).await?;
236
237 self.query = Some(Arc::new(Mutex::new(query)));
238 self.connected = true;
239
240 info!("Successfully connected to Claude Code CLI");
241 Ok(())
242 }
243
244 /// Send a query to Claude
245 ///
246 /// This sends a new user prompt to Claude. Claude will remember the context
247 /// of previous queries within the same session.
248 ///
249 /// # Arguments
250 ///
251 /// * `prompt` - The user prompt to send
252 ///
253 /// # Errors
254 ///
255 /// Returns an error if the client is not connected or if sending fails.
256 ///
257 /// # Example
258 ///
259 /// ```no_run
260 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
261 /// # #[tokio::main]
262 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
263 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
264 /// # client.connect().await?;
265 /// client.query("What is 2 + 2?").await?;
266 /// # Ok(())
267 /// # }
268 /// ```
269 #[instrument(
270 name = "claude.client.query",
271 skip(self, prompt),
272 fields(session_id = "default",)
273 )]
274 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
275 self.query_with_session(prompt, "default").await
276 }
277
278 /// Send a query to Claude with a specific session ID
279 ///
280 /// This sends a new user prompt to Claude. Different session IDs maintain
281 /// separate conversation contexts.
282 ///
283 /// # Arguments
284 ///
285 /// * `prompt` - The user prompt to send
286 /// * `session_id` - Session identifier for the conversation
287 ///
288 /// # Errors
289 ///
290 /// Returns an error if the client is not connected or if sending fails.
291 ///
292 /// # Example
293 ///
294 /// ```no_run
295 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
296 /// # #[tokio::main]
297 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
298 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
299 /// # client.connect().await?;
300 /// // Separate conversation contexts
301 /// client.query_with_session("First question", "session-1").await?;
302 /// client.query_with_session("Different question", "session-2").await?;
303 /// # Ok(())
304 /// # }
305 /// ```
306 pub async fn query_with_session(
307 &mut self,
308 prompt: impl Into<String>,
309 session_id: impl Into<String>,
310 ) -> Result<()> {
311 let query = self.query.as_ref().ok_or_else(|| {
312 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
313 })?;
314
315 let prompt_str = prompt.into();
316 let session_id_str = session_id.into();
317
318 // Format as JSON message for stream-json input format
319 let user_message = serde_json::json!({
320 "type": "user",
321 "message": {
322 "role": "user",
323 "content": prompt_str
324 },
325 "session_id": session_id_str
326 });
327
328 let message_str = serde_json::to_string(&user_message).map_err(|e| {
329 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
330 })?;
331
332 // Write directly to stdin (bypasses transport lock)
333 let query_guard = query.lock().await;
334 let stdin = query_guard.stdin.clone();
335 drop(query_guard);
336
337 if let Some(stdin_arc) = stdin {
338 let mut stdin_guard = stdin_arc.lock().await;
339 if let Some(ref mut stdin_stream) = *stdin_guard {
340 stdin_stream
341 .write_all(message_str.as_bytes())
342 .await
343 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
344 stdin_stream.write_all(b"\n").await.map_err(|e| {
345 ClaudeError::Transport(format!("Failed to write newline: {}", e))
346 })?;
347 stdin_stream
348 .flush()
349 .await
350 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
351 } else {
352 return Err(ClaudeError::Transport("stdin not available".to_string()));
353 }
354 } else {
355 return Err(ClaudeError::Transport("stdin not set".to_string()));
356 }
357
358 Ok(())
359 }
360
361 /// Send a query with structured content blocks (supports images)
362 ///
363 /// This method enables multimodal queries in bidirectional streaming mode.
364 /// Use it to send images alongside text for vision-related tasks.
365 ///
366 /// # Arguments
367 ///
368 /// * `content` - A vector of content blocks (text and/or images)
369 ///
370 /// # Errors
371 ///
372 /// Returns an error if:
373 /// - The content vector is empty (must include at least one text or image block)
374 /// - The client is not connected (call `connect()` first)
375 /// - Sending the message fails
376 ///
377 /// # Example
378 ///
379 /// ```no_run
380 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
381 /// # #[tokio::main]
382 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
383 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
384 /// # client.connect().await?;
385 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
386 /// client.query_with_content(vec![
387 /// UserContentBlock::text("What's in this image?"),
388 /// UserContentBlock::image_base64("image/png", base64_data)?,
389 /// ]).await?;
390 /// # Ok(())
391 /// # }
392 /// ```
393 pub async fn query_with_content(
394 &mut self,
395 content: impl Into<Vec<UserContentBlock>>,
396 ) -> Result<()> {
397 self.query_with_content_and_session(content, "default")
398 .await
399 }
400
401 /// Send a query with structured content blocks and a specific session ID
402 ///
403 /// This method enables multimodal queries with session management for
404 /// maintaining separate conversation contexts.
405 ///
406 /// # Arguments
407 ///
408 /// * `content` - A vector of content blocks (text and/or images)
409 /// * `session_id` - Session identifier for the conversation
410 ///
411 /// # Errors
412 ///
413 /// Returns an error if:
414 /// - The content vector is empty (must include at least one text or image block)
415 /// - The client is not connected (call `connect()` first)
416 /// - Sending the message fails
417 ///
418 /// # Example
419 ///
420 /// ```no_run
421 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
422 /// # #[tokio::main]
423 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
424 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
425 /// # client.connect().await?;
426 /// client.query_with_content_and_session(
427 /// vec![
428 /// UserContentBlock::text("Analyze this chart"),
429 /// UserContentBlock::image_url("https://example.com/chart.png"),
430 /// ],
431 /// "analysis-session",
432 /// ).await?;
433 /// # Ok(())
434 /// # }
435 /// ```
436 pub async fn query_with_content_and_session(
437 &mut self,
438 content: impl Into<Vec<UserContentBlock>>,
439 session_id: impl Into<String>,
440 ) -> Result<()> {
441 let query = self.query.as_ref().ok_or_else(|| {
442 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
443 })?;
444
445 let content_blocks: Vec<UserContentBlock> = content.into();
446 UserContentBlock::validate_content(&content_blocks)?;
447
448 let session_id_str = session_id.into();
449
450 // Format as JSON message for stream-json input format
451 // Content is an array of content blocks, not a simple string
452 let user_message = serde_json::json!({
453 "type": "user",
454 "message": {
455 "role": "user",
456 "content": content_blocks
457 },
458 "session_id": session_id_str
459 });
460
461 let message_str = serde_json::to_string(&user_message).map_err(|e| {
462 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
463 })?;
464
465 // Write directly to stdin (bypasses transport lock)
466 let query_guard = query.lock().await;
467 let stdin = query_guard.stdin.clone();
468 drop(query_guard);
469
470 if let Some(stdin_arc) = stdin {
471 let mut stdin_guard = stdin_arc.lock().await;
472 if let Some(ref mut stdin_stream) = *stdin_guard {
473 stdin_stream
474 .write_all(message_str.as_bytes())
475 .await
476 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
477 stdin_stream.write_all(b"\n").await.map_err(|e| {
478 ClaudeError::Transport(format!("Failed to write newline: {}", e))
479 })?;
480 stdin_stream
481 .flush()
482 .await
483 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
484 } else {
485 return Err(ClaudeError::Transport("stdin not available".to_string()));
486 }
487 } else {
488 return Err(ClaudeError::Transport("stdin not set".to_string()));
489 }
490
491 Ok(())
492 }
493
494 /// Receive all messages as a stream (continuous)
495 ///
496 /// This method returns a stream that yields all messages from Claude
497 /// indefinitely until the stream is closed or an error occurs.
498 ///
499 /// Use this when you want to process all messages, including multiple
500 /// responses and system events.
501 ///
502 /// # Returns
503 ///
504 /// A stream of `Result<Message>` that continues until the connection closes.
505 ///
506 /// # Example
507 ///
508 /// ```no_run
509 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
510 /// # use futures::StreamExt;
511 /// # #[tokio::main]
512 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
513 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
514 /// # client.connect().await?;
515 /// # client.query("Hello").await?;
516 /// let mut stream = client.receive_messages();
517 /// while let Some(message) = stream.next().await {
518 /// println!("Received: {:?}", message?);
519 /// }
520 /// # Ok(())
521 /// # }
522 /// ```
523 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
524 let query = match &self.query {
525 Some(q) => Arc::clone(q),
526 None => {
527 return Box::pin(futures::stream::once(async {
528 Err(ClaudeError::InvalidConfig(
529 "Client not connected. Call connect() first.".to_string(),
530 ))
531 }));
532 }
533 };
534
535 Box::pin(async_stream::stream! {
536 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
537 let query_guard = query.lock().await;
538 Arc::clone(&query_guard.message_rx)
539 };
540
541 loop {
542 let message = {
543 let mut rx_guard = rx.lock().await;
544 rx_guard.recv().await
545 };
546
547 match message {
548 Some(json) => {
549 match MessageParser::parse(json) {
550 Ok(msg) => yield Ok(msg),
551 Err(e) => {
552 eprintln!("Failed to parse message: {}", e);
553 yield Err(e);
554 }
555 }
556 }
557 None => break,
558 }
559 }
560 })
561 }
562
563 /// Receive messages until a ResultMessage
564 ///
565 /// This method returns a stream that yields messages until it encounters
566 /// a `ResultMessage`, which signals the completion of a Claude response.
567 ///
568 /// This is the most common pattern for handling Claude responses, as it
569 /// processes one complete "turn" of the conversation.
570 ///
571 /// # Returns
572 ///
573 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
574 ///
575 /// # Example
576 ///
577 /// ```no_run
578 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
579 /// # use futures::StreamExt;
580 /// # #[tokio::main]
581 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
582 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
583 /// # client.connect().await?;
584 /// # client.query("Hello").await?;
585 /// let mut stream = client.receive_response();
586 /// while let Some(message) = stream.next().await {
587 /// match message? {
588 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
589 /// Message::Result(result) => {
590 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
591 /// break;
592 /// }
593 /// _ => {}
594 /// }
595 /// }
596 /// # Ok(())
597 /// # }
598 /// ```
599 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
600 let query = match &self.query {
601 Some(q) => Arc::clone(q),
602 None => {
603 return Box::pin(futures::stream::once(async {
604 Err(ClaudeError::InvalidConfig(
605 "Client not connected. Call connect() first.".to_string(),
606 ))
607 }));
608 }
609 };
610
611 Box::pin(async_stream::stream! {
612 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
613 let query_guard = query.lock().await;
614 Arc::clone(&query_guard.message_rx)
615 };
616
617 loop {
618 let message = {
619 let mut rx_guard = rx.lock().await;
620 rx_guard.recv().await
621 };
622
623 match message {
624 Some(json) => {
625 match MessageParser::parse(json) {
626 Ok(msg) => {
627 let is_result = matches!(msg, Message::Result(_));
628 yield Ok(msg);
629 if is_result {
630 break;
631 }
632 }
633 Err(e) => {
634 eprintln!("Failed to parse message: {}", e);
635 yield Err(e);
636 }
637 }
638 }
639 None => break,
640 }
641 }
642 })
643 }
644
645 /// Send an interrupt signal to stop the current Claude operation
646 ///
647 /// This is analogous to Python's `client.interrupt()`.
648 ///
649 /// # Errors
650 ///
651 /// Returns an error if the client is not connected or if sending fails.
652 pub async fn interrupt(&self) -> Result<()> {
653 let query = self.query.as_ref().ok_or_else(|| {
654 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
655 })?;
656
657 let query_guard = query.lock().await;
658 query_guard.interrupt().await
659 }
660
661 /// Change the permission mode dynamically
662 ///
663 /// This is analogous to Python's `client.set_permission_mode()`.
664 ///
665 /// # Arguments
666 ///
667 /// * `mode` - The new permission mode to set
668 ///
669 /// # Errors
670 ///
671 /// Returns an error if the client is not connected or if sending fails.
672 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
673 let query = self.query.as_ref().ok_or_else(|| {
674 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
675 })?;
676
677 let query_guard = query.lock().await;
678 query_guard.set_permission_mode(mode).await
679 }
680
681 /// Change the AI model dynamically
682 ///
683 /// This is analogous to Python's `client.set_model()`.
684 ///
685 /// # Arguments
686 ///
687 /// * `model` - The new model name, or None to use default
688 ///
689 /// # Errors
690 ///
691 /// Returns an error if the client is not connected or if sending fails.
692 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
693 let query = self.query.as_ref().ok_or_else(|| {
694 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
695 })?;
696
697 let query_guard = query.lock().await;
698 query_guard.set_model(model).await
699 }
700
701 /// Rewind tracked files to their state at a specific user message.
702 ///
703 /// This is analogous to Python's `client.rewind_files()`.
704 ///
705 /// # Requirements
706 ///
707 /// - `enable_file_checkpointing=true` in options to track file changes
708 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
709 /// objects with `uuid` in the response stream
710 ///
711 /// # Arguments
712 ///
713 /// * `user_message_id` - UUID of the user message to rewind to. This should be
714 /// the `uuid` field from a `UserMessage` received during the conversation.
715 ///
716 /// # Errors
717 ///
718 /// Returns an error if the client is not connected or if sending fails.
719 ///
720 /// # Example
721 ///
722 /// ```no_run
723 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
724 /// # use std::collections::HashMap;
725 /// # #[tokio::main]
726 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
727 /// let options = ClaudeAgentOptions::builder()
728 /// .enable_file_checkpointing(true)
729 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
730 /// .build();
731 /// let mut client = ClaudeClient::new(options);
732 /// client.connect().await?;
733 ///
734 /// client.query("Make some changes to my files").await?;
735 /// let mut checkpoint_id = None;
736 /// {
737 /// let mut stream = client.receive_response();
738 /// use futures::StreamExt;
739 /// while let Some(Ok(msg)) = stream.next().await {
740 /// if let Message::User(user_msg) = &msg {
741 /// if let Some(uuid) = &user_msg.uuid {
742 /// checkpoint_id = Some(uuid.clone());
743 /// }
744 /// }
745 /// }
746 /// }
747 ///
748 /// // Later, rewind to that point
749 /// if let Some(id) = checkpoint_id {
750 /// client.rewind_files(&id).await?;
751 /// }
752 /// # Ok(())
753 /// # }
754 /// ```
755 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
756 let query = self.query.as_ref().ok_or_else(|| {
757 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
758 })?;
759
760 let query_guard = query.lock().await;
761 query_guard.rewind_files(user_message_id).await
762 }
763
764 /// Get server initialization info including available commands and output styles
765 ///
766 /// Returns initialization information from the Claude Code server including:
767 /// - Available commands (slash commands, system commands, etc.)
768 /// - Current and available output styles
769 /// - Server capabilities
770 ///
771 /// This is analogous to Python's `client.get_server_info()`.
772 ///
773 /// # Returns
774 ///
775 /// Dictionary with server info, or None if not connected
776 ///
777 /// # Example
778 ///
779 /// ```no_run
780 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
781 /// # #[tokio::main]
782 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
783 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
784 /// # client.connect().await?;
785 /// if let Some(info) = client.get_server_info().await {
786 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
787 /// println!("Output style: {:?}", info.get("output_style"));
788 /// }
789 /// # Ok(())
790 /// # }
791 /// ```
792 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
793 let query = self.query.as_ref()?;
794 let query_guard = query.lock().await;
795 query_guard.get_initialization_result().await
796 }
797
798 /// Start a new session by switching to a different session ID
799 ///
800 /// This is a convenience method that creates a new conversation context.
801 /// It's equivalent to calling `query_with_session()` with a new session ID.
802 ///
803 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
804 /// when creating a new client.
805 ///
806 /// # Arguments
807 ///
808 /// * `session_id` - The new session ID to use
809 /// * `prompt` - Initial message for the new session
810 ///
811 /// # Errors
812 ///
813 /// Returns an error if the client is not connected or if sending fails.
814 ///
815 /// # Example
816 ///
817 /// ```no_run
818 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
819 /// # #[tokio::main]
820 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
821 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
822 /// # client.connect().await?;
823 /// // First conversation
824 /// client.query("Hello").await?;
825 ///
826 /// // Start new conversation with different context
827 /// client.new_session("session-2", "Tell me about Rust").await?;
828 /// # Ok(())
829 /// # }
830 /// ```
831 pub async fn new_session(
832 &mut self,
833 session_id: impl Into<String>,
834 prompt: impl Into<String>,
835 ) -> Result<()> {
836 self.query_with_session(prompt, session_id).await
837 }
838
839 /// Disconnect from Claude (analogous to Python's __aexit__)
840 ///
841 /// This cleanly shuts down the connection to Claude Code CLI.
842 ///
843 /// # Errors
844 ///
845 /// Returns an error if disconnection fails.
846 #[instrument(name = "claude.client.disconnect", skip(self))]
847 pub async fn disconnect(&mut self) -> Result<()> {
848 if !self.connected {
849 debug!("Client already disconnected");
850 return Ok(());
851 }
852
853 info!("Disconnecting from Claude Code CLI");
854
855 if let Some(query) = self.query.take() {
856 // Close stdin first (using direct access) to signal CLI to exit
857 // This will cause the background task to finish and release transport lock
858 let query_guard = query.lock().await;
859 if let Some(ref stdin_arc) = query_guard.stdin {
860 let mut stdin_guard = stdin_arc.lock().await;
861 if let Some(mut stdin_stream) = stdin_guard.take() {
862 let _ = stdin_stream.shutdown().await;
863 }
864 }
865 let transport = Arc::clone(&query_guard.transport);
866 drop(query_guard);
867
868 // Give background task a moment to finish reading and release lock
869 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
870
871 let mut transport_guard = transport.lock().await;
872 transport_guard.close().await?;
873 }
874
875 self.connected = false;
876 debug!("Disconnected successfully");
877 Ok(())
878 }
879}
880
881impl Drop for ClaudeClient {
882 fn drop(&mut self) {
883 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
884 // Users should call disconnect() explicitly
885 if self.connected {
886 eprintln!(
887 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
888 );
889 }
890 }
891}
892
893#[cfg(test)]
894mod tests {
895 use super::*;
896 use crate::types::permissions::{PermissionResult, PermissionResultAllow};
897 use std::sync::Arc;
898
899 #[tokio::test]
900 async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
901 let callback: crate::types::permissions::CanUseToolCallback =
902 Arc::new(|_tool_name, _tool_input, _context| {
903 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
904 });
905
906 let opts = ClaudeAgentOptions::builder()
907 .can_use_tool(callback)
908 .permission_prompt_tool_name("custom_tool") // Not "stdio"
909 .build();
910
911 let mut client = ClaudeClient::new(opts);
912 let result = client.connect().await;
913
914 assert!(result.is_err());
915 let err = result.unwrap_err();
916 assert!(matches!(err, ClaudeError::InvalidConfig(_)));
917 assert!(err.to_string().contains("permission_prompt_tool_name"));
918 }
919
920 #[tokio::test]
921 async fn test_connect_accepts_can_use_tool_with_stdio() {
922 let callback: crate::types::permissions::CanUseToolCallback =
923 Arc::new(|_tool_name, _tool_input, _context| {
924 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
925 });
926
927 let opts = ClaudeAgentOptions::builder()
928 .can_use_tool(callback)
929 .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
930 .build();
931
932 let mut client = ClaudeClient::new(opts);
933 // This will fail later (CLI not found), but should pass validation
934 let result = client.connect().await;
935
936 // Should not be InvalidConfig error about permission_prompt_tool_name
937 if let Err(ref err) = result {
938 assert!(
939 !err.to_string().contains("permission_prompt_tool_name"),
940 "Should not fail on permission_prompt_tool_name validation"
941 );
942 }
943 }
944
945 #[tokio::test]
946 async fn test_connect_accepts_can_use_tool_without_permission_tool() {
947 let callback: crate::types::permissions::CanUseToolCallback =
948 Arc::new(|_tool_name, _tool_input, _context| {
949 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
950 });
951
952 let opts = ClaudeAgentOptions::builder()
953 .can_use_tool(callback)
954 // No permission_prompt_tool_name set - defaults to stdio
955 .build();
956
957 let mut client = ClaudeClient::new(opts);
958 // This will fail later (CLI not found), but should pass validation
959 let result = client.connect().await;
960
961 // Should not be InvalidConfig error about permission_prompt_tool_name
962 if let Err(ref err) = result {
963 assert!(
964 !err.to_string().contains("permission_prompt_tool_name"),
965 "Should not fail on permission_prompt_tool_name validation"
966 );
967 }
968 }
969}