claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
21/// Client for bidirectional streaming interactions with Claude
22///
23/// This client provides the same functionality as Python's ClaudeSDKClient,
24/// supporting bidirectional communication, streaming responses, and dynamic
25/// control over the Claude session.
26///
27/// This implementation uses the Codex-style architecture with isolated queries,
28/// where each query has its own completely independent QueryFull instance.
29/// This ensures complete message isolation and prevents ResultMessage confusion.
30///
31/// # Example
32///
33/// ```no_run
34/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
35/// use futures::StreamExt;
36///
37/// #[tokio::main]
38/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
39/// let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
40///
41/// // Connect to Claude
42/// client.connect().await?;
43///
44/// // Send a query
45/// client.query("Hello Claude!").await?;
46///
47/// // Receive response as a stream
48/// {
49/// let mut stream = client.receive_response();
50/// while let Some(message) = stream.next().await {
51/// println!("Received: {:?}", message?);
52/// }
53/// }
54///
55/// // Disconnect
56/// client.disconnect().await?;
57/// Ok(())
58/// }
59/// ```
60pub struct ClaudeClient {
61 options: ClaudeAgentOptions,
62 /// QueryManager for creating isolated queries (Codex-style architecture)
63 query_manager: Option<Arc<QueryManager>>,
64 /// Current query_id for the active prompt
65 current_query_id: Option<String>,
66 connected: bool,
67}
68
69impl ClaudeClient {
70 /// Create a new ClaudeClient
71 ///
72 /// # Arguments
73 ///
74 /// * `options` - Configuration options for the Claude client
75 ///
76 /// # Example
77 ///
78 /// ```no_run
79 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
80 ///
81 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82 /// ```
83 pub fn new(options: ClaudeAgentOptions) -> Self {
84 Self {
85 options,
86 query_manager: None,
87 current_query_id: None,
88 connected: false,
89 }
90 }
91
92 /// Create a new ClaudeClient with early validation
93 ///
94 /// Unlike `new()`, this validates the configuration eagerly by attempting
95 /// to create the transport. This catches issues like invalid working directory
96 /// or missing CLI before `connect()` is called.
97 ///
98 /// # Arguments
99 ///
100 /// * `options` - Configuration options for the Claude client
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if:
105 /// - The working directory does not exist or is not a directory
106 /// - Claude CLI cannot be found
107 ///
108 /// # Example
109 ///
110 /// ```no_run
111 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
112 ///
113 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
115 /// ```
116 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117 // Validate by attempting to create transport (but don't keep it)
118 let prompt = QueryPrompt::Streaming;
119 let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121 Ok(Self {
122 options,
123 query_manager: None,
124 current_query_id: None,
125 connected: false,
126 })
127 }
128
129 /// Connect to Claude (analogous to Python's __aenter__)
130 ///
131 /// This establishes the connection to the Claude Code CLI and initializes
132 /// the bidirectional communication channel.
133 ///
134 /// Uses the Codex-style architecture with QueryManager for complete
135 /// message isolation between queries.
136 ///
137 /// # Errors
138 ///
139 /// Returns an error if:
140 /// - Claude CLI cannot be found or started
141 /// - The initialization handshake fails
142 /// - Hook registration fails
143 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144 #[instrument(
145 name = "claude.client.connect",
146 skip(self),
147 fields(
148 has_can_use_tool = self.options.can_use_tool.is_some(),
149 has_hooks = self.options.hooks.is_some(),
150 model = %self.options.model.as_deref().unwrap_or("default"),
151 )
152 )]
153 pub async fn connect(&mut self) -> Result<()> {
154 if self.connected {
155 debug!("Client already connected, skipping");
156 return Ok(());
157 }
158
159 info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162 // This matches Python SDK behavior (client.py lines 106-122)
163 // which ensures CLI uses control protocol for permission prompts
164 let mut options = self.options.clone();
165 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167 options.permission_prompt_tool_name = Some("stdio".to_string());
168 }
169
170 // Validate can_use_tool configuration (aligned with Python SDK behavior)
171 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172 // to ensure the control protocol can handle permission requests
173 if options.can_use_tool.is_some()
174 && let Some(ref tool_name) = options.permission_prompt_tool_name
175 && tool_name != "stdio"
176 {
177 return Err(ClaudeError::InvalidConfig(
178 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180 .to_string(),
181 ));
182 }
183
184 // Prepare hooks for initialization
185 // Build efficiency hooks if configured
186 let efficiency_hooks = self
187 .options
188 .efficiency
189 .as_ref()
190 .map(build_efficiency_hooks)
191 .unwrap_or_default();
192
193 // Merge user hooks with efficiency hooks
194 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196 // Convert hooks to internal format
197 let hooks = merged_hooks.as_ref().map(|hooks_map| {
198 hooks_map
199 .iter()
200 .map(|(event, matchers)| {
201 let event_name = match event {
202 HookEvent::PreToolUse => "PreToolUse",
203 HookEvent::PostToolUse => "PostToolUse",
204 HookEvent::UserPromptSubmit => "UserPromptSubmit",
205 HookEvent::Stop => "Stop",
206 HookEvent::SubagentStop => "SubagentStop",
207 HookEvent::PreCompact => "PreCompact",
208 };
209 (event_name.to_string(), matchers.clone())
210 })
211 .collect()
212 });
213
214 // Extract SDK MCP servers from options
215 let sdk_mcp_servers =
216 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217 servers_dict
218 .iter()
219 .filter_map(|(name, config)| {
220 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221 Some((name.clone(), sdk_config.clone()))
222 } else {
223 None
224 }
225 })
226 .collect()
227 } else {
228 std::collections::HashMap::new()
229 };
230
231 // Clone options for the transport factory
232 let options_clone = options.clone();
233
234 // Create QueryManager with a transport factory
235 // The factory creates new transports for each isolated query
236 // Note: connect() is called later in create_query(), not here
237 let mut query_manager = QueryManager::new(move || {
238 let prompt = QueryPrompt::Streaming;
239 let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240 Ok(Box::new(transport) as Box<dyn Transport>)
241 });
242
243 // Set control request timeout
244 query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246 // Set configuration on QueryManager
247 query_manager.set_hooks(hooks).await;
248 query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249 query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251 let query_manager = Arc::new(query_manager);
252
253 // Create the first query for initialization
254 let first_query_id = query_manager.create_query().await?;
255
256 // Start cleanup task for inactive queries
257 Arc::clone(&query_manager).start_cleanup_task();
258
259 self.query_manager = Some(query_manager);
260 self.current_query_id = Some(first_query_id);
261 self.connected = true;
262
263 info!("Successfully connected to Claude Code CLI with QueryManager");
264 Ok(())
265 }
266
267 /// Send a query to Claude
268 ///
269 /// This sends a new user prompt to Claude. Claude will remember the context
270 /// of previous queries within the same session.
271 ///
272 /// # Arguments
273 ///
274 /// * `prompt` - The user prompt to send
275 ///
276 /// # Errors
277 ///
278 /// Returns an error if the client is not connected or if sending fails.
279 ///
280 /// # Example
281 ///
282 /// ```no_run
283 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
284 /// # #[tokio::main]
285 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287 /// # client.connect().await?;
288 /// client.query("What is 2 + 2?").await?;
289 /// # Ok(())
290 /// # }
291 /// ```
292 #[instrument(
293 name = "claude.client.query",
294 skip(self, prompt),
295 fields(session_id = "default",)
296 )]
297 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298 self.query_with_session(prompt, "default").await
299 }
300
301 /// Send a query to Claude with a specific session ID
302 ///
303 /// This sends a new user prompt to Claude. Different session IDs maintain
304 /// separate conversation contexts.
305 ///
306 /// # Arguments
307 ///
308 /// * `prompt` - The user prompt to send
309 /// * `session_id` - Session identifier for the conversation
310 ///
311 /// # Errors
312 ///
313 /// Returns an error if the client is not connected or if sending fails.
314 ///
315 /// # Example
316 ///
317 /// ```no_run
318 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
319 /// # #[tokio::main]
320 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322 /// # client.connect().await?;
323 /// // Separate conversation contexts
324 /// client.query_with_session("First question", "session-1").await?;
325 /// client.query_with_session("Different question", "session-2").await?;
326 /// # Ok(())
327 /// # }
328 /// ```
329 pub async fn query_with_session(
330 &mut self,
331 prompt: impl Into<String>,
332 session_id: impl Into<String>,
333 ) -> Result<()> {
334 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336 })?;
337
338 let prompt_str = prompt.into();
339 let session_id_str = session_id.into();
340
341 // Create a new isolated query for each prompt (Codex-style architecture)
342 // This ensures complete message isolation between prompts
343 let query_id = query_manager.create_query().await?;
344 self.current_query_id = Some(query_id.clone());
345
346 // Get the isolated query
347 let query = query_manager.get_query(&query_id)?;
348
349 // Format as JSON message for stream-json input format
350 let user_message = serde_json::json!({
351 "type": "user",
352 "message": {
353 "role": "user",
354 "content": prompt_str
355 },
356 "session_id": session_id_str
357 });
358
359 let message_str = serde_json::to_string(&user_message).map_err(|e| {
360 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
361 })?;
362
363 // Write directly to stdin (bypasses transport lock)
364 let stdin = query.stdin.clone();
365
366 if let Some(stdin_arc) = stdin {
367 let mut stdin_guard = stdin_arc.lock().await;
368 if let Some(ref mut stdin_stream) = *stdin_guard {
369 stdin_stream
370 .write_all(message_str.as_bytes())
371 .await
372 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
373 stdin_stream.write_all(b"\n").await.map_err(|e| {
374 ClaudeError::Transport(format!("Failed to write newline: {}", e))
375 })?;
376 stdin_stream
377 .flush()
378 .await
379 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
380 } else {
381 return Err(ClaudeError::Transport("stdin not available".to_string()));
382 }
383 } else {
384 return Err(ClaudeError::Transport("stdin not set".to_string()));
385 }
386
387 debug!(
388 query_id = %query_id,
389 session_id = %session_id_str,
390 "Sent query to isolated query"
391 );
392
393 Ok(())
394 }
395
396 /// Send a query with structured content blocks (supports images)
397 ///
398 /// This method enables multimodal queries in bidirectional streaming mode.
399 /// Use it to send images alongside text for vision-related tasks.
400 ///
401 /// # Arguments
402 ///
403 /// * `content` - A vector of content blocks (text and/or images)
404 ///
405 /// # Errors
406 ///
407 /// Returns an error if:
408 /// - The content vector is empty (must include at least one text or image block)
409 /// - The client is not connected (call `connect()` first)
410 /// - Sending the message fails
411 ///
412 /// # Example
413 ///
414 /// ```no_run
415 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
416 /// # #[tokio::main]
417 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
418 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
419 /// # client.connect().await?;
420 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
421 /// client.query_with_content(vec![
422 /// UserContentBlock::text("What's in this image?"),
423 /// UserContentBlock::image_base64("image/png", base64_data)?,
424 /// ]).await?;
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub async fn query_with_content(
429 &mut self,
430 content: impl Into<Vec<UserContentBlock>>,
431 ) -> Result<()> {
432 self.query_with_content_and_session(content, "default")
433 .await
434 }
435
436 /// Send a query with structured content blocks and a specific session ID
437 ///
438 /// This method enables multimodal queries with session management for
439 /// maintaining separate conversation contexts.
440 ///
441 /// # Arguments
442 ///
443 /// * `content` - A vector of content blocks (text and/or images)
444 /// * `session_id` - Session identifier for the conversation
445 ///
446 /// # Errors
447 ///
448 /// Returns an error if:
449 /// - The content vector is empty (must include at least one text or image block)
450 /// - The client is not connected (call `connect()` first)
451 /// - Sending the message fails
452 ///
453 /// # Example
454 ///
455 /// ```no_run
456 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
457 /// # #[tokio::main]
458 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
459 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
460 /// # client.connect().await?;
461 /// client.query_with_content_and_session(
462 /// vec![
463 /// UserContentBlock::text("Analyze this chart"),
464 /// UserContentBlock::image_url("https://example.com/chart.png"),
465 /// ],
466 /// "analysis-session",
467 /// ).await?;
468 /// # Ok(())
469 /// # }
470 /// ```
471 pub async fn query_with_content_and_session(
472 &mut self,
473 content: impl Into<Vec<UserContentBlock>>,
474 session_id: impl Into<String>,
475 ) -> Result<()> {
476 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
477 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
478 })?;
479
480 let content_blocks: Vec<UserContentBlock> = content.into();
481 UserContentBlock::validate_content(&content_blocks)?;
482
483 let session_id_str = session_id.into();
484
485 // Create a new isolated query for each prompt (Codex-style architecture)
486 // This ensures complete message isolation between prompts
487 let query_id = query_manager.create_query().await?;
488 self.current_query_id = Some(query_id.clone());
489
490 // Get the isolated query
491 let query = query_manager.get_query(&query_id)?;
492
493 // Format as JSON message for stream-json input format
494 // Content is an array of content blocks, not a simple string
495 let user_message = serde_json::json!({
496 "type": "user",
497 "message": {
498 "role": "user",
499 "content": content_blocks
500 },
501 "session_id": session_id_str
502 });
503
504 let message_str = serde_json::to_string(&user_message).map_err(|e| {
505 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
506 })?;
507
508 // Write directly to stdin (bypasses transport lock)
509 let stdin = query.stdin.clone();
510
511 if let Some(stdin_arc) = stdin {
512 let mut stdin_guard = stdin_arc.lock().await;
513 if let Some(ref mut stdin_stream) = *stdin_guard {
514 stdin_stream
515 .write_all(message_str.as_bytes())
516 .await
517 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
518 stdin_stream.write_all(b"\n").await.map_err(|e| {
519 ClaudeError::Transport(format!("Failed to write newline: {}", e))
520 })?;
521 stdin_stream
522 .flush()
523 .await
524 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
525 } else {
526 return Err(ClaudeError::Transport("stdin not available".to_string()));
527 }
528 } else {
529 return Err(ClaudeError::Transport("stdin not set".to_string()));
530 }
531
532 debug!(
533 query_id = %query_id,
534 session_id = %session_id_str,
535 "Sent content query to isolated query"
536 );
537
538 Ok(())
539 }
540
541 /// Receive all messages as a stream (continuous)
542 ///
543 /// This method returns a stream that yields all messages from Claude
544 /// indefinitely until the stream is closed or an error occurs.
545 ///
546 /// Use this when you want to process all messages, including multiple
547 /// responses and system events.
548 ///
549 /// # Returns
550 ///
551 /// A stream of `Result<Message>` that continues until the connection closes.
552 ///
553 /// # Example
554 ///
555 /// ```no_run
556 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
557 /// # use futures::StreamExt;
558 /// # #[tokio::main]
559 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
560 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
561 /// # client.connect().await?;
562 /// # client.query("Hello").await?;
563 /// let mut stream = client.receive_messages();
564 /// while let Some(message) = stream.next().await {
565 /// println!("Received: {:?}", message?);
566 /// }
567 /// # Ok(())
568 /// # }
569 /// ```
570 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
571 let query_manager = match &self.query_manager {
572 Some(qm) => Arc::clone(qm),
573 None => {
574 return Box::pin(futures::stream::once(async {
575 Err(ClaudeError::InvalidConfig(
576 "Client not connected. Call connect() first.".to_string(),
577 ))
578 }));
579 }
580 };
581
582 let query_id = match &self.current_query_id {
583 Some(id) => id.clone(),
584 None => {
585 return Box::pin(futures::stream::once(async {
586 Err(ClaudeError::InvalidConfig(
587 "No active query. Call query() first.".to_string(),
588 ))
589 }));
590 }
591 };
592
593 Box::pin(async_stream::stream! {
594 // Get the isolated query and its message receiver
595 let query = match query_manager.get_query(&query_id) {
596 Ok(q) => q,
597 Err(e) => {
598 yield Err(e);
599 return;
600 }
601 };
602
603 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
604 Arc::clone(&query.message_rx)
605 };
606
607 loop {
608 let message = {
609 let mut rx_guard = rx.lock().await;
610 rx_guard.recv().await
611 };
612
613 match message {
614 Some(json) => {
615 match MessageParser::parse(json) {
616 Ok(msg) => yield Ok(msg),
617 Err(e) => {
618 eprintln!("Failed to parse message: {}", e);
619 yield Err(e);
620 }
621 }
622 }
623 None => break,
624 }
625 }
626 })
627 }
628
629 /// Receive messages until a ResultMessage
630 ///
631 /// This method returns a stream that yields messages until it encounters
632 /// a `ResultMessage`, which signals the completion of a Claude response.
633 ///
634 /// This is the most common pattern for handling Claude responses, as it
635 /// processes one complete "turn" of the conversation.
636 ///
637 /// This method uses query-scoped message channels to ensure message isolation,
638 /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
639 ///
640 /// # Returns
641 ///
642 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
643 ///
644 /// # Example
645 ///
646 /// ```no_run
647 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
648 /// # use futures::StreamExt;
649 /// # #[tokio::main]
650 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
651 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
652 /// # client.connect().await?;
653 /// # client.query("Hello").await?;
654 /// let mut stream = client.receive_response();
655 /// while let Some(message) = stream.next().await {
656 /// match message? {
657 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
658 /// Message::Result(result) => {
659 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
660 /// break;
661 /// }
662 /// _ => {}
663 /// }
664 /// }
665 /// # Ok(())
666 /// # }
667 /// ```
668 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
669 let query_manager = match &self.query_manager {
670 Some(qm) => Arc::clone(qm),
671 None => {
672 return Box::pin(futures::stream::once(async {
673 Err(ClaudeError::InvalidConfig(
674 "Client not connected. Call connect() first.".to_string(),
675 ))
676 }));
677 }
678 };
679
680 let query_id = match &self.current_query_id {
681 Some(id) => id.clone(),
682 None => {
683 return Box::pin(futures::stream::once(async {
684 Err(ClaudeError::InvalidConfig(
685 "No active query. Call query() first.".to_string(),
686 ))
687 }));
688 }
689 };
690
691 Box::pin(async_stream::stream! {
692 // ====================================================================
693 // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
694 // ====================================================================
695 // In the Codex-style architecture, each query has its own completely
696 // isolated QueryFull instance. We get the message receiver directly
697 // from the isolated query, eliminating the need for routing logic.
698 //
699 // This provides:
700 // - Complete message isolation
701 // - No possibility of message confusion
702 // - Simpler architecture without routing overhead
703 //
704 // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
705 // which removes inactive queries whose channels have been closed.
706
707 debug!(
708 query_id = %query_id,
709 "Getting message receiver from isolated query"
710 );
711
712 // Get the isolated query and its message receiver
713 let query = match query_manager.get_query(&query_id) {
714 Ok(q) => q,
715 Err(e) => {
716 yield Err(e);
717 return;
718 }
719 };
720
721 // Get the message receiver from the isolated query
722 // In isolated mode, we directly access the message_rx without routing
723 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
724 Arc::clone(&query.message_rx)
725 };
726
727 loop {
728 let message = {
729 let mut rx_guard = rx.lock().await;
730 rx_guard.recv().await
731 };
732
733 match message {
734 Some(json) => {
735 match MessageParser::parse(json) {
736 Ok(msg) => {
737 let is_result = matches!(msg, Message::Result(_));
738 yield Ok(msg);
739 if is_result {
740 debug!(
741 query_id = %query_id,
742 "Received ResultMessage, ending stream"
743 );
744 // Cleanup will be handled by the periodic cleanup task
745 // when the query becomes inactive
746 break;
747 }
748 }
749 Err(e) => {
750 eprintln!("Failed to parse message: {}", e);
751 yield Err(e);
752 }
753 }
754 }
755 None => {
756 debug!(
757 query_id = %query_id,
758 "Isolated query channel closed"
759 );
760 // Cleanup will be handled by the periodic cleanup task
761 break;
762 }
763 }
764 }
765 })
766 }
767
768 /// Drain any leftover messages from the previous prompt
769 ///
770 /// This method removes any messages remaining in the channel from a previous
771 /// prompt. This should be called before starting a new prompt to ensure
772 /// that the new prompt doesn't receive stale messages.
773 ///
774 /// This is important when prompts are cancelled or end unexpectedly,
775 /// as there may be buffered messages that would otherwise be received
776 /// by the next prompt.
777 ///
778 /// # Returns
779 ///
780 /// The number of messages drained from the channel.
781 ///
782 /// # Example
783 ///
784 /// ```no_run
785 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
786 /// # #[tokio::main]
787 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
788 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
789 /// # client.connect().await?;
790 /// // Before starting a new prompt, drain any leftover messages
791 /// let drained = client.drain_messages().await;
792 /// if drained > 0 {
793 /// eprintln!("Drained {} leftover messages from previous prompt", drained);
794 /// }
795 /// # Ok(())
796 /// # }
797 /// ```
798 pub async fn drain_messages(&self) -> usize {
799 let Some(query_manager) = &self.query_manager else {
800 return 0;
801 };
802
803 let Some(query_id) = &self.current_query_id else {
804 return 0;
805 };
806
807 let Ok(query) = query_manager.get_query(query_id) else {
808 return 0;
809 };
810
811 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
812 Arc::clone(&query.message_rx)
813 };
814
815 let mut count = 0;
816 // Use try_recv to drain all currently available messages without blocking
817 loop {
818 let mut rx_guard = rx.lock().await;
819 match rx_guard.try_recv() {
820 Ok(_) => count += 1,
821 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
822 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
823 }
824 }
825
826 if count > 0 {
827 debug!(count, "Drained leftover messages from previous prompt");
828 }
829
830 count
831 }
832
833 /// Send an interrupt signal to stop the current Claude operation
834 ///
835 /// This is analogous to Python's `client.interrupt()`.
836 ///
837 /// # Errors
838 ///
839 /// Returns an error if the client is not connected or if sending fails.
840 pub async fn interrupt(&self) -> Result<()> {
841 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
842 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
843 })?;
844
845 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
846 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
847 })?;
848
849 let query = query_manager.get_query(query_id)?;
850 query.interrupt().await
851 }
852
853 /// Change the permission mode dynamically
854 ///
855 /// This is analogous to Python's `client.set_permission_mode()`.
856 ///
857 /// # Arguments
858 ///
859 /// * `mode` - The new permission mode to set
860 ///
861 /// # Errors
862 ///
863 /// Returns an error if the client is not connected or if sending fails.
864 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
865 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
866 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
867 })?;
868
869 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
870 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
871 })?;
872
873 let query = query_manager.get_query(query_id)?;
874 query.set_permission_mode(mode).await
875 }
876
877 /// Change the AI model dynamically
878 ///
879 /// This is analogous to Python's `client.set_model()`.
880 ///
881 /// # Arguments
882 ///
883 /// * `model` - The new model name, or None to use default
884 ///
885 /// # Errors
886 ///
887 /// Returns an error if the client is not connected or if sending fails.
888 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
889 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
890 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
891 })?;
892
893 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
894 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
895 })?;
896
897 let query = query_manager.get_query(query_id)?;
898 query.set_model(model).await
899 }
900
901 /// Rewind tracked files to their state at a specific user message.
902 ///
903 /// This is analogous to Python's `client.rewind_files()`.
904 ///
905 /// # Requirements
906 ///
907 /// - `enable_file_checkpointing=true` in options to track file changes
908 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
909 /// objects with `uuid` in the response stream
910 ///
911 /// # Arguments
912 ///
913 /// * `user_message_id` - UUID of the user message to rewind to. This should be
914 /// the `uuid` field from a `UserMessage` received during the conversation.
915 ///
916 /// # Errors
917 ///
918 /// Returns an error if the client is not connected or if sending fails.
919 ///
920 /// # Example
921 ///
922 /// ```no_run
923 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
924 /// # use std::collections::HashMap;
925 /// # #[tokio::main]
926 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
927 /// let options = ClaudeAgentOptions::builder()
928 /// .enable_file_checkpointing(true)
929 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
930 /// .build();
931 /// let mut client = ClaudeClient::new(options);
932 /// client.connect().await?;
933 ///
934 /// client.query("Make some changes to my files").await?;
935 /// let mut checkpoint_id = None;
936 /// {
937 /// let mut stream = client.receive_response();
938 /// use futures::StreamExt;
939 /// while let Some(Ok(msg)) = stream.next().await {
940 /// if let Message::User(user_msg) = &msg {
941 /// if let Some(uuid) = &user_msg.uuid {
942 /// checkpoint_id = Some(uuid.clone());
943 /// }
944 /// }
945 /// }
946 /// }
947 ///
948 /// // Later, rewind to that point
949 /// if let Some(id) = checkpoint_id {
950 /// client.rewind_files(&id).await?;
951 /// }
952 /// # Ok(())
953 /// # }
954 /// ```
955 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
956 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
957 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
958 })?;
959
960 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
961 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
962 })?;
963
964 let query = query_manager.get_query(query_id)?;
965 query.rewind_files(user_message_id).await
966 }
967
968 /// Get server initialization info including available commands and output styles
969 ///
970 /// Returns initialization information from the Claude Code server including:
971 /// - Available commands (slash commands, system commands, etc.)
972 /// - Current and available output styles
973 /// - Server capabilities
974 ///
975 /// This is analogous to Python's `client.get_server_info()`.
976 ///
977 /// # Returns
978 ///
979 /// Dictionary with server info, or None if not connected
980 ///
981 /// # Example
982 ///
983 /// ```no_run
984 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
985 /// # #[tokio::main]
986 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
987 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
988 /// # client.connect().await?;
989 /// if let Some(info) = client.get_server_info().await {
990 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
991 /// println!("Output style: {:?}", info.get("output_style"));
992 /// }
993 /// # Ok(())
994 /// # }
995 /// ```
996 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
997 let query_manager = self.query_manager.as_ref()?;
998 let query_id = self.current_query_id.as_ref()?;
999 let Ok(query) = query_manager.get_query(query_id) else {
1000 return None;
1001 };
1002 query.get_initialization_result().await
1003 }
1004
1005 /// Start a new session by switching to a different session ID
1006 ///
1007 /// This is a convenience method that creates a new conversation context.
1008 /// It's equivalent to calling `query_with_session()` with a new session ID.
1009 ///
1010 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1011 /// when creating a new client.
1012 ///
1013 /// # Arguments
1014 ///
1015 /// * `session_id` - The new session ID to use
1016 /// * `prompt` - Initial message for the new session
1017 ///
1018 /// # Errors
1019 ///
1020 /// Returns an error if the client is not connected or if sending fails.
1021 ///
1022 /// # Example
1023 ///
1024 /// ```no_run
1025 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
1026 /// # #[tokio::main]
1027 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1028 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1029 /// # client.connect().await?;
1030 /// // First conversation
1031 /// client.query("Hello").await?;
1032 ///
1033 /// // Start new conversation with different context
1034 /// client.new_session("session-2", "Tell me about Rust").await?;
1035 /// # Ok(())
1036 /// # }
1037 /// ```
1038 pub async fn new_session(
1039 &mut self,
1040 session_id: impl Into<String>,
1041 prompt: impl Into<String>,
1042 ) -> Result<()> {
1043 self.query_with_session(prompt, session_id).await
1044 }
1045
1046 /// Disconnect from Claude (analogous to Python's __aexit__)
1047 ///
1048 /// This cleanly shuts down the connection to Claude Code CLI.
1049 ///
1050 /// # Errors
1051 ///
1052 /// Returns an error if disconnection fails.
1053 #[instrument(name = "claude.client.disconnect", skip(self))]
1054 pub async fn disconnect(&mut self) -> Result<()> {
1055 if !self.connected {
1056 debug!("Client already disconnected");
1057 return Ok(());
1058 }
1059
1060 info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1061
1062 if let Some(query_manager) = self.query_manager.take() {
1063 // Get all queries for cleanup
1064 let queries = query_manager.get_all_queries();
1065
1066 // Close all isolated queries by closing their resources
1067 // This signals each CLI process to exit
1068 for (_query_id, query) in &queries {
1069 // Close stdin (if available)
1070 if let Some(ref stdin_arc) = query.stdin {
1071 let mut stdin_guard = stdin_arc.lock().await;
1072 if let Some(mut stdin_stream) = stdin_guard.take() {
1073 let _ = stdin_stream.shutdown().await;
1074 }
1075 }
1076
1077 // Close transport
1078 let transport = Arc::clone(&query.transport);
1079 let mut transport_guard = transport.lock().await;
1080 let _ = transport_guard.close().await;
1081 }
1082
1083 // Give background tasks a moment to finish reading and release locks
1084 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1085 }
1086
1087 self.current_query_id = None;
1088 self.connected = false;
1089 debug!("Disconnected successfully");
1090 Ok(())
1091 }
1092}
1093
1094impl Drop for ClaudeClient {
1095 fn drop(&mut self) {
1096 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1097 // Users should call disconnect() explicitly
1098 if self.connected {
1099 eprintln!(
1100 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1101 );
1102 }
1103 }
1104}
1105
#[cfg(test)]
mod tests {
    //! Validation tests for the interaction between `can_use_tool` and
    //! `permission_prompt_tool_name` when connecting.

    use super::*;
    use crate::types::permissions::{
        CanUseToolCallback, PermissionResult, PermissionResultAllow,
    };
    use std::sync::Arc;

    /// Builds a `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        })
    }

    /// A `can_use_tool` callback combined with a custom permission prompt tool
    /// must be rejected with an `InvalidConfig` error.
    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(opts);
        let result = client.connect().await;

        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(matches!(err, ClaudeError::InvalidConfig(_)));
        assert!(err.to_string().contains("permission_prompt_tool_name"));
    }

    /// Explicitly selecting "stdio" as the permission prompt tool is compatible
    /// with a `can_use_tool` callback.
    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        let mut client = ClaudeClient::new(opts);
        // This may fail later (e.g. CLI not found), but should pass validation
        let result = client.connect().await;

        // Should not be InvalidConfig error about permission_prompt_tool_name
        if let Err(ref err) = result {
            assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }

        // When a CLI is actually installed the connection succeeds; shut it
        // down so the test does not leave a connected client behind.
        if result.is_ok() {
            let _ = client.disconnect().await;
        }
    }

    /// Leaving the permission prompt tool unset is compatible with a
    /// `can_use_tool` callback.
    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let opts = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        let mut client = ClaudeClient::new(opts);
        // This may fail later (e.g. CLI not found), but should pass validation
        let result = client.connect().await;

        // Should not be InvalidConfig error about permission_prompt_tool_name
        if let Err(ref err) = result {
            assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }

        // When a CLI is actually installed the connection succeeds; shut it
        // down so the test does not leave a connected client behind.
        if result.is_ok() {
            let _ = client.disconnect().await;
        }
    }
}