Skip to main content

claude_code/
internal_client.rs

1//! Internal client for executing single-query sessions.
2//!
3//! This module provides [`InternalClient`], a stateless helper that manages
4//! the full lifecycle of a single query: connect → initialize → send → receive → close.
5//!
6//! This is used internally by the [`query()`](crate::query_fn::query) convenience function.
7//! Most users should use [`query()`](crate::query_fn::query) or [`ClaudeSdkClient`](crate::ClaudeSdkClient)
8//! directly.
9
10use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use futures::stream::BoxStream;
15use futures::{Stream, StreamExt};
16use serde_json::Value;
17use serde_json::json;
18
19use crate::client::InputPrompt;
20use crate::errors::{Error, Result};
21use crate::query::{Query, build_hooks_config};
22use crate::sdk_mcp::McpSdkServer;
23use crate::transport::Transport;
24use crate::transport::subprocess_cli::{Prompt as TransportPrompt, SubprocessCliTransport};
25use crate::types::{ClaudeAgentOptions, McpServerConfig, McpServersOption, Message};
26
27/// Stateless internal client for executing single-query sessions.
28///
29/// Unlike [`ClaudeSdkClient`](crate::ClaudeSdkClient), this client does not maintain
30/// state between queries. Each call to [`process_query()`](Self::process_query) creates
31/// a fresh connection, executes the query, and tears down the connection.
32pub struct InternalClient;
33
34impl Default for InternalClient {
35    fn default() -> Self {
36        Self::new()
37    }
38}
39
40impl InternalClient {
41    /// Creates a new `InternalClient`.
42    ///
43    /// # Example
44    ///
45    /// ```rust
46    /// use claude_code::internal_client::InternalClient;
47    ///
48    /// let _client = InternalClient::new();
49    /// ```
50    pub fn new() -> Self {
51        Self
52    }
53
54    /// Extracts SDK MCP server instances from the options for in-process routing.
55    fn extract_sdk_mcp_servers(options: &ClaudeAgentOptions) -> HashMap<String, Arc<McpSdkServer>> {
56        let mut servers = HashMap::new();
57        if let McpServersOption::Servers(configs) = &options.mcp_servers {
58            for (name, config) in configs {
59                if let McpServerConfig::Sdk(sdk_config) = config {
60                    servers.insert(name.clone(), sdk_config.instance.clone());
61                }
62            }
63        }
64        servers
65    }
66
67    fn configure_options(
68        options: ClaudeAgentOptions,
69        is_text_prompt: bool,
70    ) -> Result<ClaudeAgentOptions> {
71        if options.can_use_tool.is_some() && is_text_prompt {
72            return Err(Error::Other(
73                "can_use_tool callback requires streaming mode. Please provide prompt as messages."
74                    .to_string(),
75            ));
76        }
77
78        if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_some() {
79            return Err(Error::Other(
80                "can_use_tool callback cannot be used with permission_prompt_tool_name."
81                    .to_string(),
82            ));
83        }
84
85        let mut configured_options = options;
86        if configured_options.can_use_tool.is_some() {
87            configured_options.permission_prompt_tool_name = Some("stdio".to_string());
88        }
89        Ok(configured_options)
90    }
91
92    async fn initialize_query(
93        &self,
94        transport_prompt: TransportPrompt,
95        options: ClaudeAgentOptions,
96        transport: Option<Box<dyn Transport>>,
97    ) -> Result<Query> {
98        let mut chosen_transport: Box<dyn Transport> = if let Some(transport) = transport {
99            transport
100        } else {
101            Box::new(SubprocessCliTransport::new(
102                transport_prompt,
103                options.clone(),
104            )?)
105        };
106        chosen_transport.connect().await?;
107
108        let hooks = options.hooks.clone().unwrap_or_default();
109        let sdk_mcp_servers = Self::extract_sdk_mcp_servers(&options);
110        let (hooks_config, hook_callbacks) = build_hooks_config(&hooks);
111
112        let (reader, writer, close_handle) = chosen_transport.into_split()?;
113
114        let mut query = Query::start(
115            reader,
116            writer,
117            close_handle,
118            true,
119            options.can_use_tool.clone(),
120            hook_callbacks,
121            sdk_mcp_servers,
122            options.agents.clone(),
123            Duration::from_secs(60),
124        );
125        query.initialize(hooks_config).await?;
126        Ok(query)
127    }
128
129    async fn send_prompt(query: &Query, prompt: InputPrompt) -> Result<()> {
130        match prompt {
131            InputPrompt::Text(text) => {
132                query
133                    .stream_input(vec![json!({
134                        "type": "user",
135                        "message": {"role": "user", "content": text},
136                        "parent_tool_use_id": Value::Null,
137                        "session_id": ""
138                    })])
139                    .await?;
140            }
141            InputPrompt::Messages(messages) => {
142                query.stream_input(messages).await?;
143            }
144        }
145        Ok(())
146    }
147
148    async fn collect_messages(mut query: Query) -> Result<Vec<Message>> {
149        let mut messages = Vec::new();
150        let read_result: Result<()> = async {
151            while let Some(message) = query.receive_next_message().await? {
152                messages.push(message);
153            }
154            Ok(())
155        }
156        .await;
157        let close_result = query.close().await;
158
159        match (read_result, close_result) {
160            (Err(err), _) => Err(err),
161            (Ok(()), Err(err)) => Err(err),
162            (Ok(()), Ok(())) => Ok(messages),
163        }
164    }
165
166    fn into_message_stream(mut query: Query) -> BoxStream<'static, Result<Message>> {
167        let rx = query.take_message_receiver();
168
169        if let Some(rx) = rx {
170            // Use the channel receiver directly — this is Send.
171            let close_handle_query = query;
172            futures::stream::unfold(
173                (rx, Some(close_handle_query)),
174                |(mut rx, query)| async move {
175                    match rx.recv().await {
176                        Some(msg) => Some((msg, (rx, query))),
177                        None => {
178                            // Channel closed — close the query.
179                            if let Some(q) = query {
180                                let _ = q.close().await;
181                            }
182                            None
183                        }
184                    }
185                },
186            )
187            .boxed()
188        } else {
189            // Fallback: empty stream.
190            futures::stream::empty().boxed()
191        }
192    }
193
194    /// Executes a complete query lifecycle: connect, send, receive all messages, and close.
195    ///
196    /// # Example
197    ///
198    /// ```rust,no_run
199    /// use claude_code::internal_client::InternalClient;
200    /// use claude_code::{InputPrompt, ClaudeAgentOptions};
201    ///
202    /// # async fn example() -> claude_code::Result<()> {
203    /// let client = InternalClient::new();
204    /// let _messages = client
205    ///     .process_query(
206    ///         InputPrompt::Text("hello".to_string()),
207    ///         ClaudeAgentOptions::default(),
208    ///         None,
209    ///     )
210    ///     .await?;
211    /// # Ok(())
212    /// # }
213    /// ```
214    pub async fn process_query(
215        &self,
216        prompt: InputPrompt,
217        options: ClaudeAgentOptions,
218        transport: Option<Box<dyn Transport>>,
219    ) -> Result<Vec<Message>> {
220        let configured_options =
221            Self::configure_options(options, matches!(prompt, InputPrompt::Text(_)))?;
222
223        let transport_prompt = match &prompt {
224            InputPrompt::Text(text) => TransportPrompt::Text(text.clone()),
225            InputPrompt::Messages(_) => TransportPrompt::Messages,
226        };
227
228        let query = self
229            .initialize_query(transport_prompt, configured_options, transport)
230            .await?;
231        Self::send_prompt(&query, prompt).await?;
232        Self::collect_messages(query).await
233    }
234
235    /// Executes a one-shot query where input messages are provided as a stream.
236    ///
237    /// # Example
238    ///
239    /// ```rust,no_run
240    /// use claude_code::internal_client::InternalClient;
241    /// use claude_code::ClaudeAgentOptions;
242    /// use futures::stream;
243    /// use serde_json::json;
244    ///
245    /// # async fn example() -> claude_code::Result<()> {
246    /// let client = InternalClient::new();
247    /// let _messages = client
248    ///     .process_query_from_stream(
249    ///         stream::iter(vec![json!({"type":"user","message":{"role":"user","content":"hello"}})]),
250    ///         ClaudeAgentOptions::default(),
251    ///         None,
252    ///     )
253    ///     .await?;
254    /// # Ok(())
255    /// # }
256    /// ```
257    pub async fn process_query_from_stream<S>(
258        &self,
259        prompt: S,
260        options: ClaudeAgentOptions,
261        transport: Option<Box<dyn Transport>>,
262    ) -> Result<Vec<Message>>
263    where
264        S: Stream<Item = Value> + Unpin,
265    {
266        let configured_options = Self::configure_options(options, false)?;
267        let query = self
268            .initialize_query(TransportPrompt::Messages, configured_options, transport)
269            .await?;
270        query.stream_input_from_stream(prompt).await?;
271        Self::collect_messages(query).await
272    }
273
274    /// Executes a one-shot query and returns a streaming response interface.
275    ///
276    /// The returned stream is `Send` and can be consumed from any tokio task.
277    ///
278    /// # Example
279    ///
280    /// ```rust,no_run
281    /// use claude_code::internal_client::InternalClient;
282    /// use claude_code::{InputPrompt, ClaudeAgentOptions};
283    /// use futures::StreamExt;
284    ///
285    /// # async fn example() -> claude_code::Result<()> {
286    /// let client = InternalClient::new();
287    /// let mut stream = client
288    ///     .process_query_as_stream(
289    ///         InputPrompt::Text("hello".to_string()),
290    ///         ClaudeAgentOptions::default(),
291    ///         None,
292    ///     )
293    ///     .await?;
294    ///
295    /// let _ = stream.next().await;
296    /// # Ok(())
297    /// # }
298    /// ```
299    pub async fn process_query_as_stream(
300        &self,
301        prompt: InputPrompt,
302        options: ClaudeAgentOptions,
303        transport: Option<Box<dyn Transport>>,
304    ) -> Result<BoxStream<'static, Result<Message>>> {
305        let configured_options =
306            Self::configure_options(options, matches!(prompt, InputPrompt::Text(_)))?;
307        let transport_prompt = match &prompt {
308            InputPrompt::Text(text) => TransportPrompt::Text(text.clone()),
309            InputPrompt::Messages(_) => TransportPrompt::Messages,
310        };
311        let query = self
312            .initialize_query(transport_prompt, configured_options, transport)
313            .await?;
314        Self::send_prompt(&query, prompt).await?;
315        Ok(Self::into_message_stream(query))
316    }
317
318    /// Executes a one-shot streamed-input query and returns a streaming response interface.
319    ///
320    /// # Example
321    ///
322    /// ```rust,no_run
323    /// use claude_code::internal_client::InternalClient;
324    /// use claude_code::ClaudeAgentOptions;
325    /// use futures::{stream, StreamExt};
326    /// use serde_json::json;
327    ///
328    /// # async fn example() -> claude_code::Result<()> {
329    /// let client = InternalClient::new();
330    /// let mut stream = client
331    ///     .process_query_from_stream_as_stream(
332    ///         stream::iter(vec![json!({"type":"user","message":{"role":"user","content":"hello"}})]),
333    ///         ClaudeAgentOptions::default(),
334    ///         None,
335    ///     )
336    ///     .await?;
337    ///
338    /// let _ = stream.next().await;
339    /// # Ok(())
340    /// # }
341    /// ```
342    pub async fn process_query_from_stream_as_stream<S>(
343        &self,
344        prompt: S,
345        options: ClaudeAgentOptions,
346        transport: Option<Box<dyn Transport>>,
347    ) -> Result<BoxStream<'static, Result<Message>>>
348    where
349        S: Stream<Item = Value> + Unpin,
350    {
351        let configured_options = Self::configure_options(options, false)?;
352        let query = self
353            .initialize_query(TransportPrompt::Messages, configured_options, transport)
354            .await?;
355        query.stream_input_from_stream(prompt).await?;
356        Ok(Self::into_message_stream(query))
357    }
358}