// claude_code_agent_sdk/client.rs
//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_full::QueryFull;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::hooks::HookEvent;
18use crate::types::messages::{Message, UserContentBlock};
19
20/// Client for bidirectional streaming interactions with Claude
21///
22/// This client provides the same functionality as Python's ClaudeSDKClient,
23/// supporting bidirectional communication, streaming responses, and dynamic
24/// control over the Claude session.
25///
26/// # Example
27///
28/// ```no_run
29/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
30/// use futures::StreamExt;
31///
32/// #[tokio::main]
33/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
34/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
35///
36/// // Connect to Claude
37/// client.connect().await?;
38///
39/// // Send a query
40/// client.query("Hello Claude!").await?;
41///
42/// // Receive response as a stream
43/// {
44/// let mut stream = client.receive_response();
45/// while let Some(message) = stream.next().await {
46/// println!("Received: {:?}", message?);
47/// }
48/// }
49///
50/// // Disconnect
51/// client.disconnect().await?;
52/// Ok(())
53/// }
54/// ```
55pub struct ClaudeClient {
56 options: ClaudeAgentOptions,
57 query: Option<Arc<Mutex<QueryFull>>>,
58 connected: bool,
59}
60
61impl ClaudeClient {
62 /// Create a new ClaudeClient
63 ///
64 /// # Arguments
65 ///
66 /// * `options` - Configuration options for the Claude client
67 ///
68 /// # Example
69 ///
70 /// ```no_run
71 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
72 ///
73 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
74 /// ```
75 pub fn new(options: ClaudeAgentOptions) -> Self {
76 Self {
77 options,
78 query: None,
79 connected: false,
80 }
81 }
82
83 /// Create a new ClaudeClient with early validation
84 ///
85 /// Unlike `new()`, this validates the configuration eagerly by attempting
86 /// to create the transport. This catches issues like invalid working directory
87 /// or missing CLI before `connect()` is called.
88 ///
89 /// # Arguments
90 ///
91 /// * `options` - Configuration options for the Claude client
92 ///
93 /// # Errors
94 ///
95 /// Returns an error if:
96 /// - The working directory does not exist or is not a directory
97 /// - Claude CLI cannot be found
98 ///
99 /// # Example
100 ///
101 /// ```no_run
102 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
103 ///
104 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
105 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
106 /// ```
107 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
108 // Validate by attempting to create transport (but don't keep it)
109 let prompt = QueryPrompt::Streaming;
110 let _ = SubprocessTransport::new(prompt, options.clone())?;
111
112 Ok(Self {
113 options,
114 query: None,
115 connected: false,
116 })
117 }
118
119 /// Connect to Claude (analogous to Python's __aenter__)
120 ///
121 /// This establishes the connection to the Claude Code CLI and initializes
122 /// the bidirectional communication channel.
123 ///
124 /// # Errors
125 ///
126 /// Returns an error if:
127 /// - Claude CLI cannot be found or started
128 /// - The initialization handshake fails
129 /// - Hook registration fails
130 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
131 #[instrument(
132 name = "claude.client.connect",
133 skip(self),
134 fields(
135 has_can_use_tool = self.options.can_use_tool.is_some(),
136 has_hooks = self.options.hooks.is_some(),
137 model = %self.options.model.as_deref().unwrap_or("default"),
138 )
139 )]
140 pub async fn connect(&mut self) -> Result<()> {
141 if self.connected {
142 debug!("Client already connected, skipping");
143 return Ok(());
144 }
145
146 info!("Connecting to Claude Code CLI");
147
148 // Validate can_use_tool configuration (aligned with Python SDK behavior)
149 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
150 // to ensure the control protocol can handle permission requests
151 if self.options.can_use_tool.is_some()
152 && let Some(ref tool_name) = self.options.permission_prompt_tool_name
153 && tool_name != "stdio" {
154 return Err(ClaudeError::InvalidConfig(
155 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
156 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
157 .to_string(),
158 ));
159 }
160
161 // Create transport in streaming mode (no initial prompt)
162 let prompt = QueryPrompt::Streaming;
163 let mut transport = SubprocessTransport::new(prompt, self.options.clone())?;
164
165 // Don't send initial prompt - we'll use query() for that
166 transport.connect().await?;
167
168 // Extract stdin for direct access (avoids transport lock deadlock)
169 let stdin = Arc::clone(&transport.stdin);
170
171 // Create Query with hooks
172 let mut query = QueryFull::new(Box::new(transport));
173 query.set_stdin(stdin);
174
175 // Set control request timeout from options
176 query.set_control_request_timeout(self.options.control_request_timeout);
177
178 // Extract SDK MCP servers from options
179 let sdk_mcp_servers =
180 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
181 servers_dict
182 .iter()
183 .filter_map(|(name, config)| {
184 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
185 Some((name.clone(), sdk_config.clone()))
186 } else {
187 None
188 }
189 })
190 .collect()
191 } else {
192 std::collections::HashMap::new()
193 };
194 query.set_sdk_mcp_servers(sdk_mcp_servers).await;
195
196 // Set can_use_tool callback if provided
197 if let Some(ref callback) = self.options.can_use_tool {
198 query.set_can_use_tool(Some(Arc::clone(callback))).await;
199 }
200
201 // Convert hooks to internal format
202 let hooks = self.options.hooks.as_ref().map(|hooks_map| {
203 hooks_map
204 .iter()
205 .map(|(event, matchers)| {
206 let event_name = match event {
207 HookEvent::PreToolUse => "PreToolUse",
208 HookEvent::PostToolUse => "PostToolUse",
209 HookEvent::UserPromptSubmit => "UserPromptSubmit",
210 HookEvent::Stop => "Stop",
211 HookEvent::SubagentStop => "SubagentStop",
212 HookEvent::PreCompact => "PreCompact",
213 };
214 (event_name.to_string(), matchers.clone())
215 })
216 .collect()
217 });
218
219 // Start reading messages in background FIRST
220 // This must happen before initialize() because initialize()
221 // sends a control request and waits for response
222 query.start().await?;
223
224 // Initialize with hooks (sends control request)
225 query.initialize(hooks).await?;
226
227 self.query = Some(Arc::new(Mutex::new(query)));
228 self.connected = true;
229
230 info!("Successfully connected to Claude Code CLI");
231 Ok(())
232 }
233
234 /// Send a query to Claude
235 ///
236 /// This sends a new user prompt to Claude. Claude will remember the context
237 /// of previous queries within the same session.
238 ///
239 /// # Arguments
240 ///
241 /// * `prompt` - The user prompt to send
242 ///
243 /// # Errors
244 ///
245 /// Returns an error if the client is not connected or if sending fails.
246 ///
247 /// # Example
248 ///
249 /// ```no_run
250 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
251 /// # #[tokio::main]
252 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
253 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
254 /// # client.connect().await?;
255 /// client.query("What is 2 + 2?").await?;
256 /// # Ok(())
257 /// # }
258 /// ```
259 #[instrument(
260 name = "claude.client.query",
261 skip(self, prompt),
262 fields(session_id = "default",)
263 )]
264 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
265 self.query_with_session(prompt, "default").await
266 }
267
268 /// Send a query to Claude with a specific session ID
269 ///
270 /// This sends a new user prompt to Claude. Different session IDs maintain
271 /// separate conversation contexts.
272 ///
273 /// # Arguments
274 ///
275 /// * `prompt` - The user prompt to send
276 /// * `session_id` - Session identifier for the conversation
277 ///
278 /// # Errors
279 ///
280 /// Returns an error if the client is not connected or if sending fails.
281 ///
282 /// # Example
283 ///
284 /// ```no_run
285 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
286 /// # #[tokio::main]
287 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
288 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
289 /// # client.connect().await?;
290 /// // Separate conversation contexts
291 /// client.query_with_session("First question", "session-1").await?;
292 /// client.query_with_session("Different question", "session-2").await?;
293 /// # Ok(())
294 /// # }
295 /// ```
296 pub async fn query_with_session(
297 &mut self,
298 prompt: impl Into<String>,
299 session_id: impl Into<String>,
300 ) -> Result<()> {
301 let query = self.query.as_ref().ok_or_else(|| {
302 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
303 })?;
304
305 let prompt_str = prompt.into();
306 let session_id_str = session_id.into();
307
308 // Format as JSON message for stream-json input format
309 let user_message = serde_json::json!({
310 "type": "user",
311 "message": {
312 "role": "user",
313 "content": prompt_str
314 },
315 "session_id": session_id_str
316 });
317
318 let message_str = serde_json::to_string(&user_message).map_err(|e| {
319 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
320 })?;
321
322 // Write directly to stdin (bypasses transport lock)
323 let query_guard = query.lock().await;
324 let stdin = query_guard.stdin.clone();
325 drop(query_guard);
326
327 if let Some(stdin_arc) = stdin {
328 let mut stdin_guard = stdin_arc.lock().await;
329 if let Some(ref mut stdin_stream) = *stdin_guard {
330 stdin_stream
331 .write_all(message_str.as_bytes())
332 .await
333 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
334 stdin_stream.write_all(b"\n").await.map_err(|e| {
335 ClaudeError::Transport(format!("Failed to write newline: {}", e))
336 })?;
337 stdin_stream
338 .flush()
339 .await
340 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
341 } else {
342 return Err(ClaudeError::Transport("stdin not available".to_string()));
343 }
344 } else {
345 return Err(ClaudeError::Transport("stdin not set".to_string()));
346 }
347
348 Ok(())
349 }
350
351 /// Send a query with structured content blocks (supports images)
352 ///
353 /// This method enables multimodal queries in bidirectional streaming mode.
354 /// Use it to send images alongside text for vision-related tasks.
355 ///
356 /// # Arguments
357 ///
358 /// * `content` - A vector of content blocks (text and/or images)
359 ///
360 /// # Errors
361 ///
362 /// Returns an error if:
363 /// - The content vector is empty (must include at least one text or image block)
364 /// - The client is not connected (call `connect()` first)
365 /// - Sending the message fails
366 ///
367 /// # Example
368 ///
369 /// ```no_run
370 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
371 /// # #[tokio::main]
372 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
373 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
374 /// # client.connect().await?;
375 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
376 /// client.query_with_content(vec![
377 /// UserContentBlock::text("What's in this image?"),
378 /// UserContentBlock::image_base64("image/png", base64_data)?,
379 /// ]).await?;
380 /// # Ok(())
381 /// # }
382 /// ```
383 pub async fn query_with_content(
384 &mut self,
385 content: impl Into<Vec<UserContentBlock>>,
386 ) -> Result<()> {
387 self.query_with_content_and_session(content, "default")
388 .await
389 }
390
391 /// Send a query with structured content blocks and a specific session ID
392 ///
393 /// This method enables multimodal queries with session management for
394 /// maintaining separate conversation contexts.
395 ///
396 /// # Arguments
397 ///
398 /// * `content` - A vector of content blocks (text and/or images)
399 /// * `session_id` - Session identifier for the conversation
400 ///
401 /// # Errors
402 ///
403 /// Returns an error if:
404 /// - The content vector is empty (must include at least one text or image block)
405 /// - The client is not connected (call `connect()` first)
406 /// - Sending the message fails
407 ///
408 /// # Example
409 ///
410 /// ```no_run
411 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
412 /// # #[tokio::main]
413 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
414 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
415 /// # client.connect().await?;
416 /// client.query_with_content_and_session(
417 /// vec![
418 /// UserContentBlock::text("Analyze this chart"),
419 /// UserContentBlock::image_url("https://example.com/chart.png"),
420 /// ],
421 /// "analysis-session",
422 /// ).await?;
423 /// # Ok(())
424 /// # }
425 /// ```
426 pub async fn query_with_content_and_session(
427 &mut self,
428 content: impl Into<Vec<UserContentBlock>>,
429 session_id: impl Into<String>,
430 ) -> Result<()> {
431 let query = self.query.as_ref().ok_or_else(|| {
432 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
433 })?;
434
435 let content_blocks: Vec<UserContentBlock> = content.into();
436 UserContentBlock::validate_content(&content_blocks)?;
437
438 let session_id_str = session_id.into();
439
440 // Format as JSON message for stream-json input format
441 // Content is an array of content blocks, not a simple string
442 let user_message = serde_json::json!({
443 "type": "user",
444 "message": {
445 "role": "user",
446 "content": content_blocks
447 },
448 "session_id": session_id_str
449 });
450
451 let message_str = serde_json::to_string(&user_message).map_err(|e| {
452 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
453 })?;
454
455 // Write directly to stdin (bypasses transport lock)
456 let query_guard = query.lock().await;
457 let stdin = query_guard.stdin.clone();
458 drop(query_guard);
459
460 if let Some(stdin_arc) = stdin {
461 let mut stdin_guard = stdin_arc.lock().await;
462 if let Some(ref mut stdin_stream) = *stdin_guard {
463 stdin_stream
464 .write_all(message_str.as_bytes())
465 .await
466 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
467 stdin_stream.write_all(b"\n").await.map_err(|e| {
468 ClaudeError::Transport(format!("Failed to write newline: {}", e))
469 })?;
470 stdin_stream
471 .flush()
472 .await
473 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
474 } else {
475 return Err(ClaudeError::Transport("stdin not available".to_string()));
476 }
477 } else {
478 return Err(ClaudeError::Transport("stdin not set".to_string()));
479 }
480
481 Ok(())
482 }
483
484 /// Receive all messages as a stream (continuous)
485 ///
486 /// This method returns a stream that yields all messages from Claude
487 /// indefinitely until the stream is closed or an error occurs.
488 ///
489 /// Use this when you want to process all messages, including multiple
490 /// responses and system events.
491 ///
492 /// # Returns
493 ///
494 /// A stream of `Result<Message>` that continues until the connection closes.
495 ///
496 /// # Example
497 ///
498 /// ```no_run
499 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
500 /// # use futures::StreamExt;
501 /// # #[tokio::main]
502 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
503 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
504 /// # client.connect().await?;
505 /// # client.query("Hello").await?;
506 /// let mut stream = client.receive_messages();
507 /// while let Some(message) = stream.next().await {
508 /// println!("Received: {:?}", message?);
509 /// }
510 /// # Ok(())
511 /// # }
512 /// ```
513 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
514 let query = match &self.query {
515 Some(q) => Arc::clone(q),
516 None => {
517 return Box::pin(futures::stream::once(async {
518 Err(ClaudeError::InvalidConfig(
519 "Client not connected. Call connect() first.".to_string(),
520 ))
521 }));
522 }
523 };
524
525 Box::pin(async_stream::stream! {
526 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
527 let query_guard = query.lock().await;
528 Arc::clone(&query_guard.message_rx)
529 };
530
531 loop {
532 let message = {
533 let mut rx_guard = rx.lock().await;
534 rx_guard.recv().await
535 };
536
537 match message {
538 Some(json) => {
539 match MessageParser::parse(json) {
540 Ok(msg) => yield Ok(msg),
541 Err(e) => {
542 eprintln!("Failed to parse message: {}", e);
543 yield Err(e);
544 }
545 }
546 }
547 None => break,
548 }
549 }
550 })
551 }
552
553 /// Receive messages until a ResultMessage
554 ///
555 /// This method returns a stream that yields messages until it encounters
556 /// a `ResultMessage`, which signals the completion of a Claude response.
557 ///
558 /// This is the most common pattern for handling Claude responses, as it
559 /// processes one complete "turn" of the conversation.
560 ///
561 /// # Returns
562 ///
563 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
564 ///
565 /// # Example
566 ///
567 /// ```no_run
568 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
569 /// # use futures::StreamExt;
570 /// # #[tokio::main]
571 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
572 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
573 /// # client.connect().await?;
574 /// # client.query("Hello").await?;
575 /// let mut stream = client.receive_response();
576 /// while let Some(message) = stream.next().await {
577 /// match message? {
578 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
579 /// Message::Result(result) => {
580 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
581 /// break;
582 /// }
583 /// _ => {}
584 /// }
585 /// }
586 /// # Ok(())
587 /// # }
588 /// ```
589 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
590 let query = match &self.query {
591 Some(q) => Arc::clone(q),
592 None => {
593 return Box::pin(futures::stream::once(async {
594 Err(ClaudeError::InvalidConfig(
595 "Client not connected. Call connect() first.".to_string(),
596 ))
597 }));
598 }
599 };
600
601 Box::pin(async_stream::stream! {
602 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
603 let query_guard = query.lock().await;
604 Arc::clone(&query_guard.message_rx)
605 };
606
607 loop {
608 let message = {
609 let mut rx_guard = rx.lock().await;
610 rx_guard.recv().await
611 };
612
613 match message {
614 Some(json) => {
615 match MessageParser::parse(json) {
616 Ok(msg) => {
617 let is_result = matches!(msg, Message::Result(_));
618 yield Ok(msg);
619 if is_result {
620 break;
621 }
622 }
623 Err(e) => {
624 eprintln!("Failed to parse message: {}", e);
625 yield Err(e);
626 }
627 }
628 }
629 None => break,
630 }
631 }
632 })
633 }
634
635 /// Send an interrupt signal to stop the current Claude operation
636 ///
637 /// This is analogous to Python's `client.interrupt()`.
638 ///
639 /// # Errors
640 ///
641 /// Returns an error if the client is not connected or if sending fails.
642 pub async fn interrupt(&self) -> Result<()> {
643 let query = self.query.as_ref().ok_or_else(|| {
644 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
645 })?;
646
647 let query_guard = query.lock().await;
648 query_guard.interrupt().await
649 }
650
651 /// Change the permission mode dynamically
652 ///
653 /// This is analogous to Python's `client.set_permission_mode()`.
654 ///
655 /// # Arguments
656 ///
657 /// * `mode` - The new permission mode to set
658 ///
659 /// # Errors
660 ///
661 /// Returns an error if the client is not connected or if sending fails.
662 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
663 let query = self.query.as_ref().ok_or_else(|| {
664 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
665 })?;
666
667 let query_guard = query.lock().await;
668 query_guard.set_permission_mode(mode).await
669 }
670
671 /// Change the AI model dynamically
672 ///
673 /// This is analogous to Python's `client.set_model()`.
674 ///
675 /// # Arguments
676 ///
677 /// * `model` - The new model name, or None to use default
678 ///
679 /// # Errors
680 ///
681 /// Returns an error if the client is not connected or if sending fails.
682 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
683 let query = self.query.as_ref().ok_or_else(|| {
684 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
685 })?;
686
687 let query_guard = query.lock().await;
688 query_guard.set_model(model).await
689 }
690
691 /// Rewind tracked files to their state at a specific user message.
692 ///
693 /// This is analogous to Python's `client.rewind_files()`.
694 ///
695 /// # Requirements
696 ///
697 /// - `enable_file_checkpointing=true` in options to track file changes
698 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
699 /// objects with `uuid` in the response stream
700 ///
701 /// # Arguments
702 ///
703 /// * `user_message_id` - UUID of the user message to rewind to. This should be
704 /// the `uuid` field from a `UserMessage` received during the conversation.
705 ///
706 /// # Errors
707 ///
708 /// Returns an error if the client is not connected or if sending fails.
709 ///
710 /// # Example
711 ///
712 /// ```no_run
713 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
714 /// # use std::collections::HashMap;
715 /// # #[tokio::main]
716 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
717 /// let options = ClaudeAgentOptions::builder()
718 /// .enable_file_checkpointing(true)
719 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
720 /// .build();
721 /// let mut client = ClaudeClient::new(options);
722 /// client.connect().await?;
723 ///
724 /// client.query("Make some changes to my files").await?;
725 /// let mut checkpoint_id = None;
726 /// {
727 /// let mut stream = client.receive_response();
728 /// use futures::StreamExt;
729 /// while let Some(Ok(msg)) = stream.next().await {
730 /// if let Message::User(user_msg) = &msg {
731 /// if let Some(uuid) = &user_msg.uuid {
732 /// checkpoint_id = Some(uuid.clone());
733 /// }
734 /// }
735 /// }
736 /// }
737 ///
738 /// // Later, rewind to that point
739 /// if let Some(id) = checkpoint_id {
740 /// client.rewind_files(&id).await?;
741 /// }
742 /// # Ok(())
743 /// # }
744 /// ```
745 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
746 let query = self.query.as_ref().ok_or_else(|| {
747 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
748 })?;
749
750 let query_guard = query.lock().await;
751 query_guard.rewind_files(user_message_id).await
752 }
753
754 /// Get server initialization info including available commands and output styles
755 ///
756 /// Returns initialization information from the Claude Code server including:
757 /// - Available commands (slash commands, system commands, etc.)
758 /// - Current and available output styles
759 /// - Server capabilities
760 ///
761 /// This is analogous to Python's `client.get_server_info()`.
762 ///
763 /// # Returns
764 ///
765 /// Dictionary with server info, or None if not connected
766 ///
767 /// # Example
768 ///
769 /// ```no_run
770 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
771 /// # #[tokio::main]
772 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
773 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
774 /// # client.connect().await?;
775 /// if let Some(info) = client.get_server_info().await {
776 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
777 /// println!("Output style: {:?}", info.get("output_style"));
778 /// }
779 /// # Ok(())
780 /// # }
781 /// ```
782 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
783 let query = self.query.as_ref()?;
784 let query_guard = query.lock().await;
785 query_guard.get_initialization_result().await
786 }
787
788 /// Start a new session by switching to a different session ID
789 ///
790 /// This is a convenience method that creates a new conversation context.
791 /// It's equivalent to calling `query_with_session()` with a new session ID.
792 ///
793 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
794 /// when creating a new client.
795 ///
796 /// # Arguments
797 ///
798 /// * `session_id` - The new session ID to use
799 /// * `prompt` - Initial message for the new session
800 ///
801 /// # Errors
802 ///
803 /// Returns an error if the client is not connected or if sending fails.
804 ///
805 /// # Example
806 ///
807 /// ```no_run
808 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
809 /// # #[tokio::main]
810 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
811 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
812 /// # client.connect().await?;
813 /// // First conversation
814 /// client.query("Hello").await?;
815 ///
816 /// // Start new conversation with different context
817 /// client.new_session("session-2", "Tell me about Rust").await?;
818 /// # Ok(())
819 /// # }
820 /// ```
821 pub async fn new_session(
822 &mut self,
823 session_id: impl Into<String>,
824 prompt: impl Into<String>,
825 ) -> Result<()> {
826 self.query_with_session(prompt, session_id).await
827 }
828
829 /// Disconnect from Claude (analogous to Python's __aexit__)
830 ///
831 /// This cleanly shuts down the connection to Claude Code CLI.
832 ///
833 /// # Errors
834 ///
835 /// Returns an error if disconnection fails.
836 #[instrument(name = "claude.client.disconnect", skip(self))]
837 pub async fn disconnect(&mut self) -> Result<()> {
838 if !self.connected {
839 debug!("Client already disconnected");
840 return Ok(());
841 }
842
843 info!("Disconnecting from Claude Code CLI");
844
845 if let Some(query) = self.query.take() {
846 // Close stdin first (using direct access) to signal CLI to exit
847 // This will cause the background task to finish and release transport lock
848 let query_guard = query.lock().await;
849 if let Some(ref stdin_arc) = query_guard.stdin {
850 let mut stdin_guard = stdin_arc.lock().await;
851 if let Some(mut stdin_stream) = stdin_guard.take() {
852 let _ = stdin_stream.shutdown().await;
853 }
854 }
855 let transport = Arc::clone(&query_guard.transport);
856 drop(query_guard);
857
858 // Give background task a moment to finish reading and release lock
859 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
860
861 let mut transport_guard = transport.lock().await;
862 transport_guard.close().await?;
863 }
864
865 self.connected = false;
866 debug!("Disconnected successfully");
867 Ok(())
868 }
869}
870
871impl Drop for ClaudeClient {
872 fn drop(&mut self) {
873 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
874 // Users should call disconnect() explicitly
875 if self.connected {
876 eprintln!(
877 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
878 );
879 }
880 }
881}
882
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{
        CanUseToolCallback, PermissionResult, PermissionResultAllow,
    };
    use std::sync::Arc;

    /// Permission callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        let callback: CanUseToolCallback = Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        });
        callback
    }

    /// Connects `client` and checks that, if connecting failed, it was not the
    /// `permission_prompt_tool_name` validation that rejected it.
    ///
    /// In most test environments connecting fails later (CLI not found). When
    /// a real CLI is installed the connection succeeds, so the client is
    /// disconnected again to avoid leaking the spawned subprocess.
    async fn assert_passes_permission_validation(mut client: ClaudeClient) {
        let result = client.connect().await;

        if result.is_ok() {
            // Best-effort cleanup; the outcome is irrelevant to this test.
            let _ = client.disconnect().await;
        }

        // Should not be InvalidConfig error about permission_prompt_tool_name
        if let Err(ref err) = result {
            assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let result = client.connect().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        assert_passes_permission_validation(ClaudeClient::new(opts)).await;
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        assert_passes_permission_validation(ClaudeClient::new(opts)).await;
    }
}