claude_code_agent_sdk/client.rs
1//! ClaudeClient for bidirectional streaming interactions with hook support
2
3use tracing::{debug, info, instrument};
4
5use futures::stream::Stream;
6use std::pin::Pin;
7use std::sync::Arc;
8use tokio::io::AsyncWriteExt;
9use tokio::sync::Mutex;
10
11use crate::errors::{ClaudeError, Result};
12use crate::internal::message_parser::MessageParser;
13use crate::internal::query_manager::QueryManager;
14use crate::internal::transport::subprocess::QueryPrompt;
15use crate::internal::transport::{SubprocessTransport, Transport};
16use crate::types::config::{ClaudeAgentOptions, PermissionMode};
17use crate::types::efficiency::{build_efficiency_hooks, merge_hooks};
18use crate::types::hooks::HookEvent;
19use crate::types::messages::{Message, UserContentBlock};
20
/// Client for bidirectional streaming interactions with Claude
///
/// This client provides the same functionality as Python's ClaudeSDKClient,
/// supporting bidirectional communication, streaming responses, and dynamic
/// control over the Claude session.
///
/// This implementation uses the Codex-style architecture with isolated queries,
/// where each query has its own completely independent QueryFull instance.
/// This ensures complete message isolation and prevents ResultMessage confusion.
///
/// # Example
///
/// ```no_run
/// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
/// use futures::StreamExt;
///
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
///
///     // Connect to Claude
///     client.connect().await?;
///
///     // Send a query
///     client.query("Hello Claude!").await?;
///
///     // Receive response as a stream
///     {
///         let mut stream = client.receive_response();
///         while let Some(message) = stream.next().await {
///             println!("Received: {:?}", message?);
///         }
///     }
///
///     // Disconnect
///     client.disconnect().await?;
///     Ok(())
/// }
/// ```
pub struct ClaudeClient {
    /// Configuration supplied at construction; `connect()` works from a clone of it
    options: ClaudeAgentOptions,
    /// QueryManager for creating isolated queries (Codex-style architecture);
    /// `None` until `connect()` has succeeded
    query_manager: Option<Arc<QueryManager>>,
    /// Current query_id for the active prompt; set by `connect()` and replaced
    /// by every session-aware query call
    current_query_id: Option<String>,
    /// Set to `true` at the end of a successful `connect()`
    connected: bool,
}
68
69impl ClaudeClient {
70 /// Create a new ClaudeClient
71 ///
72 /// # Arguments
73 ///
74 /// * `options` - Configuration options for the Claude client
75 ///
76 /// # Example
77 ///
78 /// ```no_run
79 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
80 ///
81 /// let client = ClaudeClient::new(ClaudeAgentOptions::default());
82 /// ```
83 pub fn new(options: ClaudeAgentOptions) -> Self {
84 Self {
85 options,
86 query_manager: None,
87 current_query_id: None,
88 connected: false,
89 }
90 }
91
92 /// Create a new ClaudeClient with early validation
93 ///
94 /// Unlike `new()`, this validates the configuration eagerly by attempting
95 /// to create the transport. This catches issues like invalid working directory
96 /// or missing CLI before `connect()` is called.
97 ///
98 /// # Arguments
99 ///
100 /// * `options` - Configuration options for the Claude client
101 ///
102 /// # Errors
103 ///
104 /// Returns an error if:
105 /// - The working directory does not exist or is not a directory
106 /// - Claude CLI cannot be found
107 ///
108 /// # Example
109 ///
110 /// ```no_run
111 /// use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
112 ///
113 /// let client = ClaudeClient::try_new(ClaudeAgentOptions::default())?;
114 /// # Ok::<(), claude_agent_sdk_rs::ClaudeError>(())
115 /// ```
116 pub fn try_new(options: ClaudeAgentOptions) -> Result<Self> {
117 // Validate by attempting to create transport (but don't keep it)
118 let prompt = QueryPrompt::Streaming;
119 let _ = SubprocessTransport::new(prompt, options.clone())?;
120
121 Ok(Self {
122 options,
123 query_manager: None,
124 current_query_id: None,
125 connected: false,
126 })
127 }
128
129 /// Connect to Claude (analogous to Python's __aenter__)
130 ///
131 /// This establishes the connection to the Claude Code CLI and initializes
132 /// the bidirectional communication channel.
133 ///
134 /// Uses the Codex-style architecture with QueryManager for complete
135 /// message isolation between queries.
136 ///
137 /// # Errors
138 ///
139 /// Returns an error if:
140 /// - Claude CLI cannot be found or started
141 /// - The initialization handshake fails
142 /// - Hook registration fails
143 /// - `can_use_tool` callback is set with incompatible `permission_prompt_tool_name`
144 #[instrument(
145 name = "claude.client.connect",
146 skip(self),
147 fields(
148 has_can_use_tool = self.options.can_use_tool.is_some(),
149 has_hooks = self.options.hooks.is_some(),
150 model = %self.options.model.as_deref().unwrap_or("default"),
151 )
152 )]
153 pub async fn connect(&mut self) -> Result<()> {
154 if self.connected {
155 debug!("Client already connected, skipping");
156 return Ok(());
157 }
158
159 info!("Connecting to Claude Code CLI (using QueryManager for isolated queries)");
160
161 // Automatically set permission_prompt_tool_name to "stdio" when can_use_tool is provided
162 // This matches Python SDK behavior (client.py lines 106-122)
163 // which ensures CLI uses control protocol for permission prompts
164 let mut options = self.options.clone();
165 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_none() {
166 info!("can_use_tool callback is set, automatically setting permission_prompt_tool_name to 'stdio'");
167 options.permission_prompt_tool_name = Some("stdio".to_string());
168 }
169
170 // Validate can_use_tool configuration (aligned with Python SDK behavior)
171 // When can_use_tool callback is set, permission_prompt_tool_name must be "stdio"
172 // to ensure the control protocol can handle permission requests
173 if options.can_use_tool.is_some()
174 && let Some(ref tool_name) = options.permission_prompt_tool_name
175 && tool_name != "stdio"
176 {
177 return Err(ClaudeError::InvalidConfig(
178 "can_use_tool callback requires permission_prompt_tool_name to be 'stdio' or unset. \
179 Custom permission_prompt_tool_name is incompatible with can_use_tool callback."
180 .to_string(),
181 ));
182 }
183
184 // Prepare hooks for initialization
185 // Build efficiency hooks if configured
186 let efficiency_hooks = self
187 .options
188 .efficiency
189 .as_ref()
190 .map(build_efficiency_hooks)
191 .unwrap_or_default();
192
193 // Merge user hooks with efficiency hooks
194 let merged_hooks = merge_hooks(self.options.hooks.clone(), efficiency_hooks);
195
196 // Convert hooks to internal format
197 let hooks = merged_hooks.as_ref().map(|hooks_map| {
198 hooks_map
199 .iter()
200 .map(|(event, matchers)| {
201 let event_name = match event {
202 HookEvent::PreToolUse => "PreToolUse",
203 HookEvent::PostToolUse => "PostToolUse",
204 HookEvent::UserPromptSubmit => "UserPromptSubmit",
205 HookEvent::Stop => "Stop",
206 HookEvent::SubagentStop => "SubagentStop",
207 HookEvent::PreCompact => "PreCompact",
208 };
209 (event_name.to_string(), matchers.clone())
210 })
211 .collect()
212 });
213
214 // Extract SDK MCP servers from options
215 let sdk_mcp_servers =
216 if let crate::types::mcp::McpServers::Dict(servers_dict) = &self.options.mcp_servers {
217 servers_dict
218 .iter()
219 .filter_map(|(name, config)| {
220 if let crate::types::mcp::McpServerConfig::Sdk(sdk_config) = config {
221 Some((name.clone(), sdk_config.clone()))
222 } else {
223 None
224 }
225 })
226 .collect()
227 } else {
228 std::collections::HashMap::new()
229 };
230
231 // Clone options for the transport factory
232 let options_clone = options.clone();
233
234 // Create QueryManager with a transport factory
235 // The factory creates new transports for each isolated query
236 // Note: connect() is called later in create_query(), not here
237 let mut query_manager = QueryManager::new(move || {
238 let prompt = QueryPrompt::Streaming;
239 let transport = SubprocessTransport::new(prompt, options_clone.clone())?;
240 Ok(Box::new(transport) as Box<dyn Transport>)
241 });
242
243 // Set control request timeout
244 query_manager.set_control_request_timeout(self.options.control_request_timeout);
245
246 // Set configuration on QueryManager
247 query_manager.set_hooks(hooks).await;
248 query_manager.set_sdk_mcp_servers(sdk_mcp_servers).await;
249 query_manager.set_can_use_tool(self.options.can_use_tool.clone()).await;
250
251 let query_manager = Arc::new(query_manager);
252
253 // Create the first query for initialization
254 let first_query_id = query_manager.create_query().await?;
255
256 // Start cleanup task for inactive queries
257 Arc::clone(&query_manager).start_cleanup_task();
258
259 self.query_manager = Some(query_manager);
260 self.current_query_id = Some(first_query_id);
261 self.connected = true;
262
263 info!("Successfully connected to Claude Code CLI with QueryManager");
264 Ok(())
265 }
266
267 /// Send a query to Claude
268 ///
269 /// This sends a new user prompt to Claude. Claude will remember the context
270 /// of previous queries within the same session.
271 ///
272 /// # Arguments
273 ///
274 /// * `prompt` - The user prompt to send
275 ///
276 /// # Errors
277 ///
278 /// Returns an error if the client is not connected or if sending fails.
279 ///
280 /// # Example
281 ///
282 /// ```no_run
283 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
284 /// # #[tokio::main]
285 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
286 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
287 /// # client.connect().await?;
288 /// client.query("What is 2 + 2?").await?;
289 /// # Ok(())
290 /// # }
291 /// ```
292 #[instrument(
293 name = "claude.client.query",
294 skip(self, prompt),
295 fields(session_id = "default",)
296 )]
297 pub async fn query(&mut self, prompt: impl Into<String>) -> Result<()> {
298 self.query_with_session(prompt, "default").await
299 }
300
301 /// Send a query to Claude with a specific session ID
302 ///
303 /// This sends a new user prompt to Claude. Different session IDs maintain
304 /// separate conversation contexts.
305 ///
306 /// # Arguments
307 ///
308 /// * `prompt` - The user prompt to send
309 /// * `session_id` - Session identifier for the conversation
310 ///
311 /// # Errors
312 ///
313 /// Returns an error if the client is not connected or if sending fails.
314 ///
315 /// # Example
316 ///
317 /// ```no_run
318 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
319 /// # #[tokio::main]
320 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
321 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
322 /// # client.connect().await?;
323 /// // Separate conversation contexts
324 /// client.query_with_session("First question", "session-1").await?;
325 /// client.query_with_session("Different question", "session-2").await?;
326 /// # Ok(())
327 /// # }
328 /// ```
329 pub async fn query_with_session(
330 &mut self,
331 prompt: impl Into<String>,
332 session_id: impl Into<String>,
333 ) -> Result<()> {
334 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
335 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
336 })?;
337
338 let prompt_str = prompt.into();
339 let session_id_str = session_id.into();
340
341 // Get or create query for this session (reuses existing query to maintain context)
342 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
343 self.current_query_id = Some(query_id.clone());
344
345 // Get the isolated query
346 let query = query_manager.get_query(&query_id)?;
347
348 // Drain any pending messages from previous (cancelled) prompts
349 // This prevents receiving stale messages when reusing a query
350 query.drain_pending_messages().await;
351
352 // Format as JSON message for stream-json input format
353 let user_message = serde_json::json!({
354 "type": "user",
355 "message": {
356 "role": "user",
357 "content": prompt_str
358 },
359 "session_id": session_id_str
360 });
361
362 let message_str = serde_json::to_string(&user_message).map_err(|e| {
363 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
364 })?;
365
366 // Write directly to stdin (bypasses transport lock)
367 let stdin = query.stdin.clone();
368
369 if let Some(stdin_arc) = stdin {
370 let mut stdin_guard = stdin_arc.lock().await;
371 if let Some(ref mut stdin_stream) = *stdin_guard {
372 stdin_stream
373 .write_all(message_str.as_bytes())
374 .await
375 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
376 stdin_stream.write_all(b"\n").await.map_err(|e| {
377 ClaudeError::Transport(format!("Failed to write newline: {}", e))
378 })?;
379 stdin_stream
380 .flush()
381 .await
382 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
383 } else {
384 return Err(ClaudeError::Transport("stdin not available".to_string()));
385 }
386 } else {
387 return Err(ClaudeError::Transport("stdin not set".to_string()));
388 }
389
390 debug!(
391 query_id = %query_id,
392 session_id = %session_id_str,
393 "Sent query to isolated query"
394 );
395
396 Ok(())
397 }
398
399 /// Send a query with structured content blocks (supports images)
400 ///
401 /// This method enables multimodal queries in bidirectional streaming mode.
402 /// Use it to send images alongside text for vision-related tasks.
403 ///
404 /// # Arguments
405 ///
406 /// * `content` - A vector of content blocks (text and/or images)
407 ///
408 /// # Errors
409 ///
410 /// Returns an error if:
411 /// - The content vector is empty (must include at least one text or image block)
412 /// - The client is not connected (call `connect()` first)
413 /// - Sending the message fails
414 ///
415 /// # Example
416 ///
417 /// ```no_run
418 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
419 /// # #[tokio::main]
420 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
421 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
422 /// # client.connect().await?;
423 /// let base64_data = "iVBORw0KGgo..."; // base64 encoded image
424 /// client.query_with_content(vec![
425 /// UserContentBlock::text("What's in this image?"),
426 /// UserContentBlock::image_base64("image/png", base64_data)?,
427 /// ]).await?;
428 /// # Ok(())
429 /// # }
430 /// ```
431 pub async fn query_with_content(
432 &mut self,
433 content: impl Into<Vec<UserContentBlock>>,
434 ) -> Result<()> {
435 self.query_with_content_and_session(content, "default")
436 .await
437 }
438
439 /// Send a query with structured content blocks and a specific session ID
440 ///
441 /// This method enables multimodal queries with session management for
442 /// maintaining separate conversation contexts.
443 ///
444 /// # Arguments
445 ///
446 /// * `content` - A vector of content blocks (text and/or images)
447 /// * `session_id` - Session identifier for the conversation
448 ///
449 /// # Errors
450 ///
451 /// Returns an error if:
452 /// - The content vector is empty (must include at least one text or image block)
453 /// - The client is not connected (call `connect()` first)
454 /// - Sending the message fails
455 ///
456 /// # Example
457 ///
458 /// ```no_run
459 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, UserContentBlock};
460 /// # #[tokio::main]
461 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
462 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
463 /// # client.connect().await?;
464 /// client.query_with_content_and_session(
465 /// vec![
466 /// UserContentBlock::text("Analyze this chart"),
467 /// UserContentBlock::image_url("https://example.com/chart.png"),
468 /// ],
469 /// "analysis-session",
470 /// ).await?;
471 /// # Ok(())
472 /// # }
473 /// ```
474 pub async fn query_with_content_and_session(
475 &mut self,
476 content: impl Into<Vec<UserContentBlock>>,
477 session_id: impl Into<String>,
478 ) -> Result<()> {
479 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
480 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
481 })?;
482
483 let content_blocks: Vec<UserContentBlock> = content.into();
484 UserContentBlock::validate_content(&content_blocks)?;
485
486 let session_id_str = session_id.into();
487
488 // Get or create query for this session (reuses existing query to maintain context)
489 let query_id = query_manager.get_or_create_session_query(&session_id_str).await?;
490 self.current_query_id = Some(query_id.clone());
491
492 // Get the isolated query
493 let query = query_manager.get_query(&query_id)?;
494
495 // Drain any pending messages from previous (cancelled) prompts
496 // This prevents receiving stale messages when reusing a query
497 query.drain_pending_messages().await;
498
499 // Format as JSON message for stream-json input format
500 // Content is an array of content blocks, not a simple string
501 let user_message = serde_json::json!({
502 "type": "user",
503 "message": {
504 "role": "user",
505 "content": content_blocks
506 },
507 "session_id": session_id_str
508 });
509
510 let message_str = serde_json::to_string(&user_message).map_err(|e| {
511 ClaudeError::Transport(format!("Failed to serialize user message: {}", e))
512 })?;
513
514 // Write directly to stdin (bypasses transport lock)
515 let stdin = query.stdin.clone();
516
517 if let Some(stdin_arc) = stdin {
518 let mut stdin_guard = stdin_arc.lock().await;
519 if let Some(ref mut stdin_stream) = *stdin_guard {
520 stdin_stream
521 .write_all(message_str.as_bytes())
522 .await
523 .map_err(|e| ClaudeError::Transport(format!("Failed to write query: {}", e)))?;
524 stdin_stream.write_all(b"\n").await.map_err(|e| {
525 ClaudeError::Transport(format!("Failed to write newline: {}", e))
526 })?;
527 stdin_stream
528 .flush()
529 .await
530 .map_err(|e| ClaudeError::Transport(format!("Failed to flush: {}", e)))?;
531 } else {
532 return Err(ClaudeError::Transport("stdin not available".to_string()));
533 }
534 } else {
535 return Err(ClaudeError::Transport("stdin not set".to_string()));
536 }
537
538 debug!(
539 query_id = %query_id,
540 session_id = %session_id_str,
541 "Sent content query to isolated query"
542 );
543
544 Ok(())
545 }
546
547 /// Receive all messages as a stream (continuous)
548 ///
549 /// This method returns a stream that yields all messages from Claude
550 /// indefinitely until the stream is closed or an error occurs.
551 ///
552 /// Use this when you want to process all messages, including multiple
553 /// responses and system events.
554 ///
555 /// # Returns
556 ///
557 /// A stream of `Result<Message>` that continues until the connection closes.
558 ///
559 /// # Example
560 ///
561 /// ```no_run
562 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
563 /// # use futures::StreamExt;
564 /// # #[tokio::main]
565 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
566 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
567 /// # client.connect().await?;
568 /// # client.query("Hello").await?;
569 /// let mut stream = client.receive_messages();
570 /// while let Some(message) = stream.next().await {
571 /// println!("Received: {:?}", message?);
572 /// }
573 /// # Ok(())
574 /// # }
575 /// ```
576 pub fn receive_messages(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
577 let query_manager = match &self.query_manager {
578 Some(qm) => Arc::clone(qm),
579 None => {
580 return Box::pin(futures::stream::once(async {
581 Err(ClaudeError::InvalidConfig(
582 "Client not connected. Call connect() first.".to_string(),
583 ))
584 }));
585 }
586 };
587
588 let query_id = match &self.current_query_id {
589 Some(id) => id.clone(),
590 None => {
591 return Box::pin(futures::stream::once(async {
592 Err(ClaudeError::InvalidConfig(
593 "No active query. Call query() first.".to_string(),
594 ))
595 }));
596 }
597 };
598
599 Box::pin(async_stream::stream! {
600 // Get the isolated query and its message receiver
601 let query = match query_manager.get_query(&query_id) {
602 Ok(q) => q,
603 Err(e) => {
604 yield Err(e);
605 return;
606 }
607 };
608
609 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
610 Arc::clone(&query.message_rx)
611 };
612
613 loop {
614 let message = {
615 let mut rx_guard = rx.lock().await;
616 rx_guard.recv().await
617 };
618
619 match message {
620 Some(json) => {
621 match MessageParser::parse(json) {
622 Ok(msg) => yield Ok(msg),
623 Err(e) => {
624 eprintln!("Failed to parse message: {}", e);
625 yield Err(e);
626 }
627 }
628 }
629 None => break,
630 }
631 }
632 })
633 }
634
635 /// Receive messages until a ResultMessage
636 ///
637 /// This method returns a stream that yields messages until it encounters
638 /// a `ResultMessage`, which signals the completion of a Claude response.
639 ///
640 /// This is the most common pattern for handling Claude responses, as it
641 /// processes one complete "turn" of the conversation.
642 ///
643 /// This method uses query-scoped message channels to ensure message isolation,
644 /// preventing late-arriving ResultMessages from being consumed by the wrong prompt.
645 ///
646 /// # Returns
647 ///
648 /// A stream of `Result<Message>` that ends when a ResultMessage is received.
649 ///
650 /// # Example
651 ///
652 /// ```no_run
653 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
654 /// # use futures::StreamExt;
655 /// # #[tokio::main]
656 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
657 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
658 /// # client.connect().await?;
659 /// # client.query("Hello").await?;
660 /// let mut stream = client.receive_response();
661 /// while let Some(message) = stream.next().await {
662 /// match message? {
663 /// Message::Assistant(msg) => println!("Assistant: {:?}", msg),
664 /// Message::Result(result) => {
665 /// println!("Done! Cost: ${:?}", result.total_cost_usd);
666 /// break;
667 /// }
668 /// _ => {}
669 /// }
670 /// }
671 /// # Ok(())
672 /// # }
673 /// ```
674 pub fn receive_response(&self) -> Pin<Box<dyn Stream<Item = Result<Message>> + Send + '_>> {
675 let query_manager = match &self.query_manager {
676 Some(qm) => Arc::clone(qm),
677 None => {
678 return Box::pin(futures::stream::once(async {
679 Err(ClaudeError::InvalidConfig(
680 "Client not connected. Call connect() first.".to_string(),
681 ))
682 }));
683 }
684 };
685
686 let query_id = match &self.current_query_id {
687 Some(id) => id.clone(),
688 None => {
689 return Box::pin(futures::stream::once(async {
690 Err(ClaudeError::InvalidConfig(
691 "No active query. Call query() first.".to_string(),
692 ))
693 }));
694 }
695 };
696
697 Box::pin(async_stream::stream! {
698 // ====================================================================
699 // ISOLATED QUERY MESSAGE CHANNEL (Codex-style)
700 // ====================================================================
701 // In the Codex-style architecture, each query has its own completely
702 // isolated QueryFull instance. We get the message receiver directly
703 // from the isolated query, eliminating the need for routing logic.
704 //
705 // This provides:
706 // - Complete message isolation
707 // - No possibility of message confusion
708 // - Simpler architecture without routing overhead
709 //
710 // Note: Cleanup is handled by the periodic cleanup task in QueryManager,
711 // which removes inactive queries whose channels have been closed.
712
713 debug!(
714 query_id = %query_id,
715 "Getting message receiver from isolated query"
716 );
717
718 // Get the isolated query and its message receiver
719 let query = match query_manager.get_query(&query_id) {
720 Ok(q) => q,
721 Err(e) => {
722 yield Err(e);
723 return;
724 }
725 };
726
727 // Get the message receiver from the isolated query
728 // In isolated mode, we directly access the message_rx without routing
729 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
730 Arc::clone(&query.message_rx)
731 };
732
733 loop {
734 let message = {
735 let mut rx_guard = rx.lock().await;
736 rx_guard.recv().await
737 };
738
739 match message {
740 Some(json) => {
741 match MessageParser::parse(json) {
742 Ok(msg) => {
743 let is_result = matches!(msg, Message::Result(_));
744 yield Ok(msg);
745 if is_result {
746 debug!(
747 query_id = %query_id,
748 "Received ResultMessage, ending stream"
749 );
750 // Cleanup will be handled by the periodic cleanup task
751 // when the query becomes inactive
752 break;
753 }
754 }
755 Err(e) => {
756 eprintln!("Failed to parse message: {}", e);
757 yield Err(e);
758 }
759 }
760 }
761 None => {
762 debug!(
763 query_id = %query_id,
764 "Isolated query channel closed"
765 );
766 // Cleanup will be handled by the periodic cleanup task
767 break;
768 }
769 }
770 }
771 })
772 }
773
774 /// Drain any leftover messages from the previous prompt
775 ///
776 /// This method removes any messages remaining in the channel from a previous
777 /// prompt. This should be called before starting a new prompt to ensure
778 /// that the new prompt doesn't receive stale messages.
779 ///
780 /// This is important when prompts are cancelled or end unexpectedly,
781 /// as there may be buffered messages that would otherwise be received
782 /// by the next prompt.
783 ///
784 /// # Returns
785 ///
786 /// The number of messages drained from the channel.
787 ///
788 /// # Example
789 ///
790 /// ```no_run
791 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
792 /// # #[tokio::main]
793 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
794 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
795 /// # client.connect().await?;
796 /// // Before starting a new prompt, drain any leftover messages
797 /// let drained = client.drain_messages().await;
798 /// if drained > 0 {
799 /// eprintln!("Drained {} leftover messages from previous prompt", drained);
800 /// }
801 /// # Ok(())
802 /// # }
803 /// ```
804 pub async fn drain_messages(&self) -> usize {
805 let Some(query_manager) = &self.query_manager else {
806 return 0;
807 };
808
809 let Some(query_id) = &self.current_query_id else {
810 return 0;
811 };
812
813 let Ok(query) = query_manager.get_query(query_id) else {
814 return 0;
815 };
816
817 let rx: Arc<Mutex<tokio::sync::mpsc::UnboundedReceiver<serde_json::Value>>> = {
818 Arc::clone(&query.message_rx)
819 };
820
821 let mut count = 0;
822 // Use try_recv to drain all currently available messages without blocking
823 loop {
824 let mut rx_guard = rx.lock().await;
825 match rx_guard.try_recv() {
826 Ok(_) => count += 1,
827 Err(tokio::sync::mpsc::error::TryRecvError::Empty) => break,
828 Err(tokio::sync::mpsc::error::TryRecvError::Disconnected) => break,
829 }
830 }
831
832 if count > 0 {
833 debug!(count, "Drained leftover messages from previous prompt");
834 }
835
836 count
837 }
838
839 /// Send an interrupt signal to stop the current Claude operation
840 ///
841 /// This is analogous to Python's `client.interrupt()`.
842 ///
843 /// # Errors
844 ///
845 /// Returns an error if the client is not connected or if sending fails.
846 pub async fn interrupt(&self) -> Result<()> {
847 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
848 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
849 })?;
850
851 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
852 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
853 })?;
854
855 let query = query_manager.get_query(query_id)?;
856 query.interrupt().await
857 }
858
859 /// Change the permission mode dynamically
860 ///
861 /// This is analogous to Python's `client.set_permission_mode()`.
862 ///
863 /// # Arguments
864 ///
865 /// * `mode` - The new permission mode to set
866 ///
867 /// # Errors
868 ///
869 /// Returns an error if the client is not connected or if sending fails.
870 pub async fn set_permission_mode(&self, mode: PermissionMode) -> Result<()> {
871 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
872 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
873 })?;
874
875 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
876 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
877 })?;
878
879 let query = query_manager.get_query(query_id)?;
880 query.set_permission_mode(mode).await
881 }
882
883 /// Change the AI model dynamically
884 ///
885 /// This is analogous to Python's `client.set_model()`.
886 ///
887 /// # Arguments
888 ///
889 /// * `model` - The new model name, or None to use default
890 ///
891 /// # Errors
892 ///
893 /// Returns an error if the client is not connected or if sending fails.
894 pub async fn set_model(&self, model: Option<&str>) -> Result<()> {
895 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
896 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
897 })?;
898
899 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
900 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
901 })?;
902
903 let query = query_manager.get_query(query_id)?;
904 query.set_model(model).await
905 }
906
907 /// Rewind tracked files to their state at a specific user message.
908 ///
909 /// This is analogous to Python's `client.rewind_files()`.
910 ///
911 /// # Requirements
912 ///
913 /// - `enable_file_checkpointing=true` in options to track file changes
914 /// - `extra_args={"replay-user-messages": None}` to receive UserMessage
915 /// objects with `uuid` in the response stream
916 ///
917 /// # Arguments
918 ///
919 /// * `user_message_id` - UUID of the user message to rewind to. This should be
920 /// the `uuid` field from a `UserMessage` received during the conversation.
921 ///
922 /// # Errors
923 ///
924 /// Returns an error if the client is not connected or if sending fails.
925 ///
926 /// # Example
927 ///
928 /// ```no_run
929 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions, Message};
930 /// # use std::collections::HashMap;
931 /// # #[tokio::main]
932 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
933 /// let options = ClaudeAgentOptions::builder()
934 /// .enable_file_checkpointing(true)
935 /// .extra_args(HashMap::from([("replay-user-messages".to_string(), None)]))
936 /// .build();
937 /// let mut client = ClaudeClient::new(options);
938 /// client.connect().await?;
939 ///
940 /// client.query("Make some changes to my files").await?;
941 /// let mut checkpoint_id = None;
942 /// {
943 /// let mut stream = client.receive_response();
944 /// use futures::StreamExt;
945 /// while let Some(Ok(msg)) = stream.next().await {
946 /// if let Message::User(user_msg) = &msg {
947 /// if let Some(uuid) = &user_msg.uuid {
948 /// checkpoint_id = Some(uuid.clone());
949 /// }
950 /// }
951 /// }
952 /// }
953 ///
954 /// // Later, rewind to that point
955 /// if let Some(id) = checkpoint_id {
956 /// client.rewind_files(&id).await?;
957 /// }
958 /// # Ok(())
959 /// # }
960 /// ```
961 pub async fn rewind_files(&self, user_message_id: &str) -> Result<()> {
962 let query_manager = self.query_manager.as_ref().ok_or_else(|| {
963 ClaudeError::InvalidConfig("Client not connected. Call connect() first.".to_string())
964 })?;
965
966 let query_id = self.current_query_id.as_ref().ok_or_else(|| {
967 ClaudeError::InvalidConfig("No active query. Call query() first.".to_string())
968 })?;
969
970 let query = query_manager.get_query(query_id)?;
971 query.rewind_files(user_message_id).await
972 }
973
974 /// Get server initialization info including available commands and output styles
975 ///
976 /// Returns initialization information from the Claude Code server including:
977 /// - Available commands (slash commands, system commands, etc.)
978 /// - Current and available output styles
979 /// - Server capabilities
980 ///
981 /// This is analogous to Python's `client.get_server_info()`.
982 ///
983 /// # Returns
984 ///
985 /// Dictionary with server info, or None if not connected
986 ///
987 /// # Example
988 ///
989 /// ```no_run
990 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
991 /// # #[tokio::main]
992 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
993 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
994 /// # client.connect().await?;
995 /// if let Some(info) = client.get_server_info().await {
996 /// println!("Commands available: {}", info.get("commands").map(|c| c.as_array().map(|a| a.len()).unwrap_or(0)).unwrap_or(0));
997 /// println!("Output style: {:?}", info.get("output_style"));
998 /// }
999 /// # Ok(())
1000 /// # }
1001 /// ```
1002 pub async fn get_server_info(&self) -> Option<serde_json::Value> {
1003 let query_manager = self.query_manager.as_ref()?;
1004 let query_id = self.current_query_id.as_ref()?;
1005 let Ok(query) = query_manager.get_query(query_id) else {
1006 return None;
1007 };
1008 query.get_initialization_result().await
1009 }
1010
1011 /// Start a new session by switching to a different session ID
1012 ///
1013 /// This is a convenience method that creates a new conversation context.
1014 /// It's equivalent to calling `query_with_session()` with a new session ID.
1015 ///
1016 /// To completely clear memory and start fresh, use `ClaudeAgentOptions::builder().fork_session(true).build()`
1017 /// when creating a new client.
1018 ///
1019 /// # Arguments
1020 ///
1021 /// * `session_id` - The new session ID to use
1022 /// * `prompt` - Initial message for the new session
1023 ///
1024 /// # Errors
1025 ///
1026 /// Returns an error if the client is not connected or if sending fails.
1027 ///
1028 /// # Example
1029 ///
1030 /// ```no_run
1031 /// # use claude_agent_sdk_rs::{ClaudeClient, ClaudeAgentOptions};
1032 /// # #[tokio::main]
1033 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
1034 /// # let mut client = ClaudeClient::new(ClaudeAgentOptions::default());
1035 /// # client.connect().await?;
1036 /// // First conversation
1037 /// client.query("Hello").await?;
1038 ///
1039 /// // Start new conversation with different context
1040 /// client.new_session("session-2", "Tell me about Rust").await?;
1041 /// # Ok(())
1042 /// # }
1043 /// ```
1044 pub async fn new_session(
1045 &mut self,
1046 session_id: impl Into<String>,
1047 prompt: impl Into<String>,
1048 ) -> Result<()> {
1049 self.query_with_session(prompt, session_id).await
1050 }
1051
1052 /// Disconnect from Claude (analogous to Python's __aexit__)
1053 ///
1054 /// This cleanly shuts down the connection to Claude Code CLI.
1055 ///
1056 /// # Errors
1057 ///
1058 /// Returns an error if disconnection fails.
1059 #[instrument(name = "claude.client.disconnect", skip(self))]
1060 pub async fn disconnect(&mut self) -> Result<()> {
1061 if !self.connected {
1062 debug!("Client already disconnected");
1063 return Ok(());
1064 }
1065
1066 info!("Disconnecting from Claude Code CLI (closing all isolated queries)");
1067
1068 if let Some(query_manager) = self.query_manager.take() {
1069 // Get all queries for cleanup
1070 let queries = query_manager.get_all_queries();
1071
1072 // Close all isolated queries by closing their resources
1073 // This signals each CLI process to exit
1074 for (_query_id, query) in &queries {
1075 // Close stdin (if available)
1076 if let Some(ref stdin_arc) = query.stdin {
1077 let mut stdin_guard = stdin_arc.lock().await;
1078 if let Some(mut stdin_stream) = stdin_guard.take() {
1079 let _ = stdin_stream.shutdown().await;
1080 }
1081 }
1082
1083 // Close transport
1084 let transport = Arc::clone(&query.transport);
1085 let mut transport_guard = transport.lock().await;
1086 let _ = transport_guard.close().await;
1087 }
1088
1089 // Give background tasks a moment to finish reading and release locks
1090 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
1091 }
1092
1093 self.current_query_id = None;
1094 self.connected = false;
1095 debug!("Disconnected successfully");
1096 Ok(())
1097 }
1098}
1099
1100impl Drop for ClaudeClient {
1101 fn drop(&mut self) {
1102 // Note: We can't run async code in Drop, so we can't guarantee clean shutdown
1103 // Users should call disconnect() explicitly
1104 if self.connected {
1105 eprintln!(
1106 "Warning: ClaudeClient dropped without calling disconnect(). Resources may not be cleaned up properly."
1107 );
1108 }
1109 }
1110}
1111
#[cfg(test)]
mod tests {
    use super::*;
    use crate::types::permissions::{
        CanUseToolCallback, PermissionResult, PermissionResultAllow,
    };
    use std::sync::Arc;

    /// Build a `can_use_tool` callback that allows every tool invocation.
    fn allow_all_callback() -> CanUseToolCallback {
        Arc::new(|_tool_name, _tool_input, _context| {
            Box::pin(async move { PermissionResult::Allow(PermissionResultAllow::default()) })
        })
    }

    /// Assert that `outcome` is not an error mentioning `permission_prompt_tool_name`.
    ///
    /// Any other error (or success) is accepted: connecting may still fail
    /// later, e.g. because the CLI is not found.
    fn assert_passed_permission_tool_validation<T>(outcome: &Result<T>) {
        if let Err(err) = outcome {
            assert!(
                !err.to_string().contains("permission_prompt_tool_name"),
                "Should not fail on permission_prompt_tool_name validation"
            );
        }
    }

    #[tokio::test]
    async fn test_connect_rejects_can_use_tool_with_custom_permission_tool() {
        let options = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("custom_tool") // Not "stdio"
            .build();

        let mut client = ClaudeClient::new(options);
        let outcome = client.connect().await;

        // The combination must be rejected as a configuration error that
        // names the offending option.
        assert!(outcome.is_err());
        let failure = outcome.unwrap_err();
        assert!(matches!(failure, ClaudeError::InvalidConfig(_)));
        assert!(failure.to_string().contains("permission_prompt_tool_name"));
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_with_stdio() {
        let options = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            .permission_prompt_tool_name("stdio") // Explicitly "stdio" is OK
            .build();

        let mut client = ClaudeClient::new(options);
        // This will fail later (CLI not found), but should pass validation
        let outcome = client.connect().await;

        // Should not be InvalidConfig error about permission_prompt_tool_name
        assert_passed_permission_tool_validation(&outcome);
    }

    #[tokio::test]
    async fn test_connect_accepts_can_use_tool_without_permission_tool() {
        let options = ClaudeAgentOptions::builder()
            .can_use_tool(allow_all_callback())
            // No permission_prompt_tool_name set - defaults to stdio
            .build();

        let mut client = ClaudeClient::new(options);
        // This will fail later (CLI not found), but should pass validation
        let outcome = client.connect().await;

        // Should not be InvalidConfig error about permission_prompt_tool_name
        assert_passed_permission_tool_validation(&outcome);
    }
}