claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_full::QueryFull;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::hooks::HookEvent;
18use crate::types::messages::{Message, UserContentBlock};
19
20/// Client for bidirectional streaming interactions with Claude
21///
22/// This client provides the same functionality as Python's ClaudeSDKClient,
23/// supporting bidirectional communication, streaming responses, and dynamic
24/// control over the Claude session.
25///
26/// # Example
27///
28/// ```no_run
29/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
30/// use futures::StreamExt;
31///
32/// #[tokio::main]
33/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
34/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
35///
36/// // Connect to Claude
37/// client.connect().await?;
38///
39/// // Send a query
40/// client.query("Hello Claude!").await?;
41///
42/// // Receive response as a stream
43/// {
44/// let mut stream = client.receive_response();
45/// while let Some(message) = stream.next().await {
46/// println!("Received: {:?}", message?);
47/// }
48/// }
49///
50/// // Disconnect
51/// client.disconnect().await?;
52/// Ok(())
53/// }
54/// ```
55pub struct ClaudeClient {
56 options: ClaudeAgentOptions,
57 query: Option<Arc<Mutex<QueryFull>>>,
58 connected: bool,
59}
60
61impl ClaudeClient {
62 /// Create a new ClaudeClient
63 ///
64 /// # Arguments
65 ///
66 /// * `options` - Configuration options for the Claude client
67 ///
68 /// # Example
69 ///
70 /// ```no_run
71 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
72 ///
73 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
74 /// ```
75 pub fn new(options: ClaudeAgentOptions) -> Self {
76 Self {
77 options,
78 query: None,
79 connected: false,
80 }
81 }
82
83 /// Create a new ClaudeClient with early validation
84 ///
85 /// Unlike `new()`, this validates the configuration eagerly by attempting
86 /// to create the transport. This catches issues like invalid working directory
87 /// or missing CLI before `connect()` is called.
88 ///
89 /// # Arguments
90 ///
91 /// * `options` - Configuration options for the Claude client
92 ///
93 /// # Errors
94 ///
95 /// Returns an error if:
96 /// - The working directory does not exist or is not a directory
97 /// - Claude CLI cannot be found
98 ///
99 /// # Example
100 ///
101 /// ```no_run
102 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
103 ///
104 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
105 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
106 /// ```
107 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
108 // Validate by attempting to create transport (but don't keep it)
109 let prompt = QueryPrompt::Streaming;
110 let _ = SubprocessTransport::new(prompt, options.clone())?;
111
112 Ok(Self {
113 options,
114 query: None,
115 connected: false,
116 })
117 }
118
119 /// Connect to Claude (analogous to Python's __aenter__)
120 ///
121 /// This establishes the connection to the Claude Code CLI and initializes
122 /// the bidirectional communication channel.
123 ///
124 /// # Errors
125 ///
126 /// Returns an error if:
127 /// - Claude CLI cannot be found or started
128 /// - The initialization handshake fails
129 /// - Hook registration fails
130 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
131 #[instrument(
132 name = "claude.client.connect",
133 skip(self),
134 fields(
135 has_can_use_tool = self.options.can_use_tool.is_some(),
136 has_hooks = self.options.hooks.is_some(),
137 model = %self.options.model.as_deref().unwrap_or("default"),
138 )
139 )]
140 pub async fn connect(&mut self) -> Result<()> {
141 if self.connected {
142 debug!("Client already connected, skipping");
143 return Ok(());
144 }
145
146 info!("Connecting to Claude Code CLI");
147
148 // Validate can_use_tool configuration (aligned with Python SDK behavior)
149 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
150 // to ensure the control protocol can handle permission requests
151 if self.options.can_use_tool.is_some()
152 && let Some(ref tool_name) = self.options.permission_prompt_tool_name
153 && tool_name != "stdio"
154 {
155 return Err(ClaudeError::InvalidConfig(
156 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
157 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
158 .to_string(),
159 ));
160 }
161
162 // Create transport in streaming mode (no initial prompt)
163 let prompt = QueryPrompt::Streaming;
164 let mut transport = SubprocessTransport::new(prompt, self.options.clone())?;
165
166 // Don't send initial prompt - we'll use query() for that
167 transport.connect().await?;
168
169 // Extract stdin for direct access (avoids transport lock deadlock)
170 let stdin = Arc::clone(&transport.stdin);
171
172 // Create Query with hooks
173 let mut query = QueryFull::new(Box::new(transport));
174 query.set_stdin(stdin);
175
176 // Set control request timeout from options
177 query.set_control_request_timeout(self.options.control_request_timeout);
178
179 // Extract SDK MCP servers from options
180 let sdk_mcp_servers =
181 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
182 servers_dict
183 .iter()
184 .filter_map(|(name, config)| {
185 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
186 Some((name.clone(), sdk_config.clone()))
187 } else {
188 None
189 }
190 })
191 .collect()
192 } else {
193 std::collections::HashMap::new()
194 };
195 query.set_sdk_mcp_servers(sdk_mcp_servers).await;
196
197 // Set can_use_tool callback if provided
198 if let Some(ref callback) = self.options.can_use_tool {
199 query.set_can_use_tool(Some(Arc::clone(callback))).await;
200 }
201
202 // Convert hooks to internal format
203 let hooks = self.options.hooks.as_ref().map(|hooks_map| {
204 hooks_map
205 .iter()
206 .map(|(event, matchers)| {
207 let event_name = match event {
208 HookEvent::PreToolUse => "PreToolUse",
209 HookEvent::PostToolUse => "PostToolUse",
210 HookEvent::UserPromptSubmit => "UserPromptSubmit",
211 HookEvent::Stop => "Stop",
212 HookEvent::SubagentStop => "SubagentStop",
213 HookEvent::PreCompact => "PreCompact",
214 };
215 (event_name.to_string(), matchers.clone())
216 })
217 .collect()
218 });
219
220 // Start reading messages in background FIRST
221 // This must happen before initialize() because initialize()
222 // sends a control request and waits for response
223 query.start().await?;
224
225 // Initialize with hooks (sends control request)
226 query.initialize(hooks).await?;
227
228 self.query = Some(Arc::new(Mutex::new(query)));
229 self.connected = true;
230
231 info!("Successfully connected to Claude Code CLI");
232 Ok(())
233 }
234
235 /// Send a query to Claude
236 ///
237 /// This sends a new user prompt to Claude. Claude will remember the context
238 /// of previous queries within the same session.
239 ///
240 /// # Arguments
241 ///
242 /// * `prompt` - The user prompt to send
243 ///
244 /// # Errors
245 ///
246 /// Returns an error if the client is not connected or if sending fails.
247 ///
248 /// # Example
249 ///
250 /// ```no_run
251 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
252 /// # #[tokio::main]
253 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
254 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
255 /// # client.connect().await?;
256 /// client.query("What is 2 + 2?").await?;
257 /// # Ok(())
258 /// # }
259 /// ```
260 #[instrument(
261 name = "claude.client.query",
262 skip(self, prompt),
263 fields(session_id = "default",)
264 )]
265 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
266 self.query_with_session(prompt, "default").await
267 }
268
269 /// Send a query to Claude with a specific session ID
270 ///
271 /// This sends a new user prompt to Claude. Different session IDs maintain
272 /// separate conversation contexts.
273 ///
274 /// # Arguments
275 ///
276 /// * `prompt` - The user prompt to send
277 /// * `session_id` - Session identifier for the conversation
278 ///
279 /// # Errors
280 ///
281 /// Returns an error if the client is not connected or if sending fails.
282 ///
283 /// # Example
284 ///
285 /// ```no_run
286 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
287 /// # #[tokio::main]
288 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
289 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
290 /// # client.connect().await?;
291 /// // Separate conversation contexts
292 /// client.query_with_session("First question", "session-1").await?;
293 /// client.query_with_session("Different question", "session-2").await?;
294 /// # Ok(())
295 /// # }
296 /// ```
297 pub async fn query_with_session(
298 &mut self,
299 prompt: impl Into<String>,
300 session_id: impl Into<String>,
301 ) -> Result<()> {
302 let query = self.query.as_ref().ok_or_else(|| {
303 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
304 })?;
305
306 let prompt_str = prompt.into();
307 let session_id_str = session_id.into();
308
309 // Format as JSON message for stream-json input format
310 let user_message = serde_json::json!({
311 "type": "user",
312 "message": {
313 "role": "user",
314 "content": prompt_str
315 },
316 "session_id": session_id_str
317 });
318
319 let message_str = serde_json::to_string(&user_message).map_err(|e| {
320 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
321 })?;
322
323 // Write directly to stdin (bypasses transport lock)
324 let query_guard = query.lock().await;
325 let stdin = query_guard.stdin.clone();
326 drop(query_guard);
327
328 if let Some(stdin_arc) = stdin {
329 let mut stdin_guard = stdin_arc.lock().await;
330 if let Some(ref mut stdin_stream) = *stdin_guard {
331 stdin_stream
332 .write_all(message_str.as_bytes())
333 .await
334 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
335 stdin_stream.write_all(b"\n").await.map_err(|e| {
336 ClaudeError::Transport(format!("Failed to write newline: {}", e))
337 })?;
338 stdin_stream
339 .flush()
340 .await
341 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
342 } else {
343 return Err(ClaudeError::Transport("stdin not available".to_string()));
344 }
345 } else {
346 return Err(ClaudeError::Transport("stdin not set".to_string()));
347 }
348
349 Ok(())
350 }
351
352 /// Send a query with structured content blocks (supports images)
353 ///
354 /// This method enables multimodal queries in bidirectional streaming mode.
355 /// Use it to send images alongside text for vision-related tasks.
356 ///
357 /// # Arguments
358 ///
359 /// * `content` - A vector of content blocks (text and/or images)
360 ///
361 /// # Errors
362 ///
363 /// Returns an error if:
364 /// - The content vector is empty (must include at least one text or image block)
365 /// - The client is not connected (call `connect()` first)
366 /// - Sending the message fails
367 ///
368 /// # Example
369 ///
370 /// ```no_run
371 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
372 /// # #[tokio::main]
373 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
374 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
375 /// # client.connect().await?;
376 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
377 /// client.query_with_content(vec![
378 /// UserContentBlock::text("What's in this image?"),
379 /// UserContentBlock::image_base64("image/png", base64_data)?,
380 /// ]).await?;
381 /// # Ok(())
382 /// # }
383 /// ```
384 pub async fn query_with_content(
385 &mut self,
386 content: impl Into<Vec<UserContentBlock>>,
387 ) -> Result<()> {
388 self.query_with_content_and_session(content, "default")
389 .await
390 }
391
392 /// Send a query with structured content blocks and a specific session ID
393 ///
394 /// This method enables multimodal queries with session management for
395 /// maintaining separate conversation contexts.
396 ///
397 /// # Arguments
398 ///
399 /// * `content` - A vector of content blocks (text and/or images)
400 /// * `session_id` - Session identifier for the conversation
401 ///
402 /// # Errors
403 ///
404 /// Returns an error if:
405 /// - The content vector is empty (must include at least one text or image block)
406 /// - The client is not connected (call `connect()` first)
407 /// - Sending the message fails
408 ///
409 /// # Example
410 ///
411 /// ```no_run
412 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
413 /// # #[tokio::main]
414 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
415 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
416 /// # client.connect().await?;
417 /// client.query_with_content_and_session(
418 /// vec![
419 /// UserContentBlock::text("Analyze this chart"),
420 /// UserContentBlock::image_url("https://example.com/chart.png"),
421 /// ],
422 /// "analysis-session",
423 /// ).await?;
424 /// # Ok(())
425 /// # }
426 /// ```
427 pub async fn query_with_content_and_session(
428 &mut self,
429 content: impl Into<Vec<UserContentBlock>>,
430 session_id: impl Into<String>,
431 ) -> Result<()> {
432 let query = self.query.as_ref().ok_or_else(|| {
433 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
434 })?;
435
436 let content_blocks: Vec<UserContentBlock> = content.into();
437 UserContentBlock::validate_content(&content_blocks)?;
438
439 let session_id_str = session_id.into();
440
441 // Format as JSON message for stream-json input format
442 // Content is an array of content blocks, not a simple string
443 let user_message = serde_json::json!({
444 "type": "user",
445 "message": {
446 "role": "user",
447 "content": content_blocks
448 },
449 "session_id": session_id_str
450 });
451
452 let message_str = serde_json::to_string(&user_message).map_err(|e| {
453 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
454 })?;
455
456 // Write directly to stdin (bypasses transport lock)
457 let query_guard = query.lock().await;
458 let stdin = query_guard.stdin.clone();
459 drop(query_guard);
460
461 if let Some(stdin_arc) = stdin {
462 let mut stdin_guard = stdin_arc.lock().await;
463 if let Some(ref mut stdin_stream) = *stdin_guard {
464 stdin_stream
465 .write_all(message_str.as_bytes())
466 .await
467 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
468 stdin_stream.write_all(b"\n").await.map_err(|e| {
469 ClaudeError::Transport(format!("Failed to write newline: {}", e))
470 })?;
471 stdin_stream
472 .flush()
473 .await
474 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
475 } else {
476 return Err(ClaudeError::Transport("stdin not available".to_string()));
477 }
478 } else {
479 return Err(ClaudeError::Transport("stdin not set".to_string()));
480 }
481
482 Ok(())
483 }
484
485 /// Receive all messages as a stream (continuous)
486 ///
487 /// This method returns a stream that yields all messages from Claude
488 /// indefinitely until the stream is closed or an error occurs.
489 ///
490 /// Use this when you want to process all messages, including multiple
491 /// responses and system events.
492 ///
493 /// # Returns
494 ///
495 /// A stream of `Result<Message>` that continues until the connection closes.
496 ///
497 /// # Example
498 ///
499 /// ```no_run
500 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
501 /// # use futures::StreamExt;
502 /// # #[tokio::main]
503 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
504 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
505 /// # client.connect().await?;
506 /// # client.query("Hello").await?;
507 /// let mut stream = client.receive_messages();
508 /// while let Some(message) = stream.next().await {
509 /// println!("Received: {:?}", message?);
510 /// }
511 /// # Ok(())
512 /// # }
513 /// ```
514 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
515 let query = match &self.query {
516 Some(q) => Arc::clone(q),
517 None => {
518 return Box::pin(futures::stream::once(async {
519 Err(ClaudeError::InvalidConfig(
520 "Client not connected. Call connect() first.".to_string(),
521 ))
522 }));
523 }
524 };
525
526 Box::pin(async_stream::stream! {
527 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
528 let query_guard = query.lock().await;
529 Arc::clone(&query_guard.message_rx)
530 };
531
532 loop {
533 let message = {
534 let mut rx_guard = rx.lock().await;
535 rx_guard.recv().await
536 };
537
538 match message {
539 Some(json) => {
540 match MessageParser::parse(json) {
541 Ok(msg) => yield Ok(msg),
542 Err(e) => {
543 eprintln!("Failed to parse message: {}", e);
544 yield Err(e);
545 }
546 }
547 }
548 None => break,
549 }
550 }
551 })
552 }
553
554 /// Receive messages until a ResultMessage
555 ///
556 /// This method returns a stream that yields messages until it encounters
557 /// a `ResultMessage`, which signals the completion of a Claude response.
558 ///
559 /// This is the most common pattern for handling Claude responses, as it
560 /// processes one complete "turn" of the conversation.
561 ///
562 /// # Returns
563 ///
564 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
565 ///
566 /// # Example
567 ///
568 /// ```no_run
569 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
570 /// # use futures::StreamExt;
571 /// # #[tokio::main]
572 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
573 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
574 /// # client.connect().await?;
575 /// # client.query("Hello").await?;
576 /// let mut stream = client.receive_response();
577 /// while let Some(message) = stream.next().await {
578 /// match message? {
579 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
580 /// Message::Result(result) => {
581 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
582 /// break;
583 /// }
584 /// _ => {}
585 /// }
586 /// }
587 /// # Ok(())
588 /// # }
589 /// ```
590 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
591 let query = match &self.query {
592 Some(q) => Arc::clone(q),
593 None => {
594 return Box::pin(futures::stream::once(async {
595 Err(ClaudeError::InvalidConfig(
596 "Client not connected. Call connect() first.".to_string(),
597 ))
598 }));
599 }
600 };
601
602 Box::pin(async_stream::stream! {
603 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
604 let query_guard = query.lock().await;
605 Arc::clone(&query_guard.message_rx)
606 };
607
608 loop {
609 let message = {
610 let mut rx_guard = rx.lock().await;
611 rx_guard.recv().await
612 };
613
614 match message {
615 Some(json) => {
616 match MessageParser::parse(json) {
617 Ok(msg) => {
618 let is_result = matches!(msg, Message::Result(_));
619 yield Ok(msg);
620 if is_result {
621 break;
622 }
623 }
624 Err(e) => {
625 eprintln!("Failed to parse message: {}", e);
626 yield Err(e);
627 }
628 }
629 }
630 None => break,
631 }
632 }
633 })
634 }
635
636 /// Send an interrupt signal to stop the current Claude operation
637 ///
638 /// This is analogous to Python's `client.interrupt()`.
639 ///
640 /// # Errors
641 ///
642 /// Returns an error if the client is not connected or if sending fails.
643 pub async fn interrupt(&self) -> Result<()> {
644 let query = self.query.as_ref().ok_or_else(|| {
645 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
646 })?;
647
648 let query_guard = query.lock().await;
649 query_guard.interrupt().await
650 }
651
652 /// Change the permission mode dynamically
653 ///
654 /// This is analogous to Python's `client.set_permission_mode()`.
655 ///
656 /// # Arguments
657 ///
658 /// * `mode` - The new permission mode to set
659 ///
660 /// # Errors
661 ///
662 /// Returns an error if the client is not connected or if sending fails.
663 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
664 let query = self.query.as_ref().ok_or_else(|| {
665 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
666 })?;
667
668 let query_guard = query.lock().await;
669 query_guard.set_permission_mode(mode).await
670 }
671
672 /// Change the AI model dynamically
673 ///
674 /// This is analogous to Python's `client.set_model()`.
675 ///
676 /// # Arguments
677 ///
678 /// * `model` - The new model name, or None to use default
679 ///
680 /// # Errors
681 ///
682 /// Returns an error if the client is not connected or if sending fails.
683 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
684 let query = self.query.as_ref().ok_or_else(|| {
685 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
686 })?;
687
688 let query_guard = query.lock().await;
689 query_guard.set_model(model).await
690 }
691
692 /// Rewind tracked files to their state at a specific user message.
693 ///
694 /// This is analogous to Python's `client.rewind_files()`.
695 ///
696 /// # Requirements
697 ///
698 /// - `enable_file_checkpointing=true` in options to track file changes
699 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
700 /// objects with `uuid` in the response stream
701 ///
702 /// # Arguments
703 ///
704 /// * `user_message_id` - UUID of the user message to rewind to. This should be
705 /// the `uuid` field from a `UserMessage` received during the conversation.
706 ///
707 /// # Errors
708 ///
709 /// Returns an error if the client is not connected or if sending fails.
710 ///
711 /// # Example
712 ///
713 /// ```no_run
714 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
715 /// # use std::collections::HashMap;
716 /// # #[tokio::main]
717 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
718 /// let options = ClaudeAgentOptions::builder()
719 /// .enable_file_checkpointing(true)
720 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
721 /// .build();
722 /// let mut client = ClaudeClient::new(options);
723 /// client.connect().await?;
724 ///
725 /// client.query("Make some changes to my files").await?;
726 /// let mut checkpoint_id = None;
727 /// {
728 /// let mut stream = client.receive_response();
729 /// use futures::StreamExt;
730 /// while let Some(Ok(msg)) = stream.next().await {
731 /// if let Message::User(user_msg) = &msg {
732 /// if let Some(uuid) = &user_msg.uuid {
733 /// checkpoint_id = Some(uuid.clone());
734 /// }
735 /// }
736 /// }
737 /// }
738 ///
739 /// // Later, rewind to that point
740 /// if let Some(id) = checkpoint_id {
741 /// client.rewind_files(&id).await?;
742 /// }
743 /// # Ok(())
744 /// # }
745 /// ```
746 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
747 let query = self.query.as_ref().ok_or_else(|| {
748 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
749 })?;
750
751 let query_guard = query.lock().await;
752 query_guard.rewind_files(user_message_id).await
753 }
754
755 /// Get server initialization info including available commands and output styles
756 ///
757 /// Returns initialization information from the Claude Code server including:
758 /// - Available commands (slash commands, system commands, etc.)
759 /// - Current and available output styles
760 /// - Server capabilities
761 ///
762 /// This is analogous to Python's `client.get_server_info()`.
763 ///
764 /// # Returns
765 ///
766 /// Dictionary with server info, or None if not connected
767 ///
768 /// # Example
769 ///
770 /// ```no_run
771 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
772 /// # #[tokio::main]
773 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
774 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
775 /// # client.connect().await?;
776 /// if let Some(info) = client.get_server_info().await {
777 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
778 /// println!("Output style: {:?}", info.get("output_style"));
779 /// }
780 /// # Ok(())
781 /// # }
782 /// ```
783 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
784 let query = self.query.as_ref()?;
785 let query_guard = query.lock().await;
786 query_guard.get_initialization_result().await
787 }
788
789 /// Start a new session by switching to a different session ID
790 ///
791 /// This is a convenience method that creates a new conversation context.
792 /// It's equivalent to calling `query_with_session()` with a new session ID.
793 ///
794 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
795 /// when creating a new client.
796 ///
797 /// # Arguments
798 ///
799 /// * `session_id` - The new session ID to use
800 /// * `prompt` - Initial message for the new session
801 ///
802 /// # Errors
803 ///
804 /// Returns an error if the client is not connected or if sending fails.
805 ///
806 /// # Example
807 ///
808 /// ```no_run
809 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
810 /// # #[tokio::main]
811 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
812 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
813 /// # client.connect().await?;
814 /// // First conversation
815 /// client.query("Hello").await?;
816 ///
817 /// // Start new conversation with different context
818 /// client.new_session("session-2", "Tell me about Rust").await?;
819 /// # Ok(())
820 /// # }
821 /// ```
822 pub async fn new_session(
823 &mut self,
824 session_id: impl Into<String>,
825 prompt: impl Into<String>,
826 ) -> Result<()> {
827 self.query_with_session(prompt, session_id).await
828 }
829
830 /// Disconnect from Claude (analogous to Python's __aexit__)
831 ///
832 /// This cleanly shuts down the connection to Claude Code CLI.
833 ///
834 /// # Errors
835 ///
836 /// Returns an error if disconnection fails.
837 #[instrument(name = "claude.client.disconnect", skip(self))]
838 pub async fn disconnect(&mut self) -> Result<()> {
839 if !self.connected {
840 debug!("Client already disconnected");
841 return Ok(());
842 }
843
844 info!("Disconnecting from Claude Code CLI");
845
846 if let Some(query) = self.query.take() {
847 // Close stdin first (using direct access) to signal CLI to exit
848 // This will cause the background task to finish and release transport lock
849 let query_guard = query.lock().await;
850 if let Some(ref stdin_arc) = query_guard.stdin {
851 let mut stdin_guard = stdin_arc.lock().await;
852 if let Some(mut stdin_stream) = stdin_guard.take() {
853 let _ = stdin_stream.shutdown().await;
854 }
855 }
856 let transport = Arc::clone(&query_guard.transport);
857 drop(query_guard);
858
859 // Give background task a moment to finish reading and release lock
860 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
861
862 let mut transport_guard = transport.lock().await;
863 transport_guard.close().await?;
864 }
865
866 self.connected = false;
867 debug!("Disconnected successfully");
868 Ok(())
869 }
870}
871
872impl Drop for ClaudeClient {
873 fn drop(&mut self) {
874 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
875 // Users should call disconnect() explicitly
876 if self.connected {
877 eprintln!(
878 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
879 );
880 }
881 }
882}
883
884#[cfg(test)]
885mod tests {
886 use super::*;
887 use crate::types::permissions::{PermissionResult, PermissionResultAllow};
888 use std::sync::Arc;
889
890 #[tokio::test]
891 async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
892 let callback: crate::types::permissions::CanUseToolCallback =
893 Arc::new(|_tool_name, _tool_input, _context| {
894 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
895 });
896
897 let opts = ClaudeAgentOptions::builder()
898 .can_use_tool(callback)
899 .permission_prompt_tool_name("custom_tool") // Not "stdio"
900 .build();
901
902 let mut client = ClaudeClient::new(opts);
903 let result = client.connect().await;
904
905 assert!(result.is_err());
906 let err = result.unwrap_err();
907 assert!(matches!(err, ClaudeError::InvalidConfig(_)));
908 assert!(err.to_string().contains("permission_prompt_tool_name"));
909 }
910
911 #[tokio::test]
912 async fn test_connect_accepts_can_use_tool_with_stdio() {
913 let callback: crate::types::permissions::CanUseToolCallback =
914 Arc::new(|_tool_name, _tool_input, _context| {
915 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
916 });
917
918 let opts = ClaudeAgentOptions::builder()
919 .can_use_tool(callback)
920 .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
921 .build();
922
923 let mut client = ClaudeClient::new(opts);
924 // This will fail later (CLI not found), but should pass validation
925 let result = client.connect().await;
926
927 // Should not be InvalidConfig error about permission_prompt_tool_name
928 if let Err(ref err) = result {
929 assert!(
930 !err.to_string().contains("permission_prompt_tool_name"),
931 "Should not fail on permission_prompt_tool_name validation"
932 );
933 }
934 }
935
936 #[tokio::test]
937 async fn test_connect_accepts_can_use_tool_without_permission_tool() {
938 let callback: crate::types::permissions::CanUseToolCallback =
939 Arc::new(|_tool_name, _tool_input, _context| {
940 Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
941 });
942
943 let opts = ClaudeAgentOptions::builder()
944 .can_use_tool(callback)
945 // No permission_prompt_tool_name set - defaults to stdio
946 .build();
947
948 let mut client = ClaudeClient::new(opts);
949 // This will fail later (CLI not found), but should pass validation
950 let result = client.connect().await;
951
952 // Should not be InvalidConfig error about permission_prompt_tool_name
953 if let Err(ref err) = result {
954 assert!(
955 !err.to_string().contains("permission_prompt_tool_name"),
956 "Should not fail on permission_prompt_tool_name validation"
957 );
958 }
959 }
960}