1use std::collections::HashMap;
11use std::sync::Arc;
12use std::time::Duration;
13
14use futures::stream::BoxStream;
15use futures::{Stream, StreamExt};
16use serde_json::Value;
17use serde_json::json;
18
19use crate::client::InputPrompt;
20use crate::errors::{Error, Result};
21use crate::query::{Query, build_hooks_config};
22use crate::sdk_mcp::McpSdkServer;
23use crate::transport::Transport;
24use crate::transport::subprocess_cli::{Prompt as TransportPrompt, SubprocessCliTransport};
25use crate::types::{ClaudeAgentOptions, McpServerConfig, McpServersOption, Message};
26
27pub struct InternalClient;
33
34impl Default for InternalClient {
35 fn default() -> Self {
36 Self::new()
37 }
38}
39
40impl InternalClient {
41 pub fn new() -> Self {
51 Self
52 }
53
54 fn extract_sdk_mcp_servers(options: &ClaudeAgentOptions) -> HashMap<String, Arc<McpSdkServer>> {
56 let mut servers = HashMap::new();
57 if let McpServersOption::Servers(configs) = &options.mcp_servers {
58 for (name, config) in configs {
59 if let McpServerConfig::Sdk(sdk_config) = config {
60 servers.insert(name.clone(), sdk_config.instance.clone());
61 }
62 }
63 }
64 servers
65 }
66
67 fn configure_options(
68 options: ClaudeAgentOptions,
69 is_text_prompt: bool,
70 ) -> Result<ClaudeAgentOptions> {
71 if options.can_use_tool.is_some() && is_text_prompt {
72 return Err(Error::Other(
73 "can_use_tool callback requires streaming mode. Please provide prompt as messages."
74 .to_string(),
75 ));
76 }
77
78 if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_some() {
79 return Err(Error::Other(
80 "can_use_tool callback cannot be used with permission_prompt_tool_name."
81 .to_string(),
82 ));
83 }
84
85 let mut configured_options = options;
86 if configured_options.can_use_tool.is_some() {
87 configured_options.permission_prompt_tool_name = Some("stdio".to_string());
88 }
89 Ok(configured_options)
90 }
91
92 async fn initialize_query(
93 &self,
94 transport_prompt: TransportPrompt,
95 options: ClaudeAgentOptions,
96 transport: Option<Box<dyn Transport>>,
97 ) -> Result<Query> {
98 let mut chosen_transport: Box<dyn Transport> = if let Some(transport) = transport {
99 transport
100 } else {
101 Box::new(SubprocessCliTransport::new(
102 transport_prompt,
103 options.clone(),
104 )?)
105 };
106 chosen_transport.connect().await?;
107
108 let hooks = options.hooks.clone().unwrap_or_default();
109 let sdk_mcp_servers = Self::extract_sdk_mcp_servers(&options);
110 let (hooks_config, hook_callbacks) = build_hooks_config(&hooks);
111
112 let (reader, writer, close_handle) = chosen_transport.into_split()?;
113
114 let mut query = Query::start(
115 reader,
116 writer,
117 close_handle,
118 true,
119 options.can_use_tool.clone(),
120 hook_callbacks,
121 sdk_mcp_servers,
122 options.agents.clone(),
123 Duration::from_secs(60),
124 );
125 query.initialize(hooks_config).await?;
126 Ok(query)
127 }
128
129 async fn send_prompt(query: &Query, prompt: InputPrompt) -> Result<()> {
130 match prompt {
131 InputPrompt::Text(text) => {
132 query
133 .stream_input(vec![json!({
134 "type": "user",
135 "message": {"role": "user", "content": text},
136 "parent_tool_use_id": Value::Null,
137 "session_id": ""
138 })])
139 .await?;
140 }
141 InputPrompt::Messages(messages) => {
142 query.stream_input(messages).await?;
143 }
144 }
145 Ok(())
146 }
147
148 async fn collect_messages(mut query: Query) -> Result<Vec<Message>> {
149 let mut messages = Vec::new();
150 let read_result: Result<()> = async {
151 while let Some(message) = query.receive_next_message().await? {
152 messages.push(message);
153 }
154 Ok(())
155 }
156 .await;
157 let close_result = query.close().await;
158
159 match (read_result, close_result) {
160 (Err(err), _) => Err(err),
161 (Ok(()), Err(err)) => Err(err),
162 (Ok(()), Ok(())) => Ok(messages),
163 }
164 }
165
166 fn into_message_stream(mut query: Query) -> BoxStream<'static, Result<Message>> {
167 let rx = query.take_message_receiver();
168
169 if let Some(rx) = rx {
170 let close_handle_query = query;
172 futures::stream::unfold(
173 (rx, Some(close_handle_query)),
174 |(mut rx, query)| async move {
175 match rx.recv().await {
176 Some(msg) => Some((msg, (rx, query))),
177 None => {
178 if let Some(q) = query {
180 let _ = q.close().await;
181 }
182 None
183 }
184 }
185 },
186 )
187 .boxed()
188 } else {
189 futures::stream::empty().boxed()
191 }
192 }
193
194 pub async fn process_query(
215 &self,
216 prompt: InputPrompt,
217 options: ClaudeAgentOptions,
218 transport: Option<Box<dyn Transport>>,
219 ) -> Result<Vec<Message>> {
220 let configured_options =
221 Self::configure_options(options, matches!(prompt, InputPrompt::Text(_)))?;
222
223 let transport_prompt = match &prompt {
224 InputPrompt::Text(text) => TransportPrompt::Text(text.clone()),
225 InputPrompt::Messages(_) => TransportPrompt::Messages,
226 };
227
228 let query = self
229 .initialize_query(transport_prompt, configured_options, transport)
230 .await?;
231 Self::send_prompt(&query, prompt).await?;
232 Self::collect_messages(query).await
233 }
234
235 pub async fn process_query_from_stream<S>(
258 &self,
259 prompt: S,
260 options: ClaudeAgentOptions,
261 transport: Option<Box<dyn Transport>>,
262 ) -> Result<Vec<Message>>
263 where
264 S: Stream<Item = Value> + Unpin,
265 {
266 let configured_options = Self::configure_options(options, false)?;
267 let query = self
268 .initialize_query(TransportPrompt::Messages, configured_options, transport)
269 .await?;
270 query.stream_input_from_stream(prompt).await?;
271 Self::collect_messages(query).await
272 }
273
274 pub async fn process_query_as_stream(
300 &self,
301 prompt: InputPrompt,
302 options: ClaudeAgentOptions,
303 transport: Option<Box<dyn Transport>>,
304 ) -> Result<BoxStream<'static, Result<Message>>> {
305 let configured_options =
306 Self::configure_options(options, matches!(prompt, InputPrompt::Text(_)))?;
307 let transport_prompt = match &prompt {
308 InputPrompt::Text(text) => TransportPrompt::Text(text.clone()),
309 InputPrompt::Messages(_) => TransportPrompt::Messages,
310 };
311 let query = self
312 .initialize_query(transport_prompt, configured_options, transport)
313 .await?;
314 Self::send_prompt(&query, prompt).await?;
315 Ok(Self::into_message_stream(query))
316 }
317
318 pub async fn process_query_from_stream_as_stream<S>(
343 &self,
344 prompt: S,
345 options: ClaudeAgentOptions,
346 transport: Option<Box<dyn Transport>>,
347 ) -> Result<BoxStream<'static, Result<Message>>>
348 where
349 S: Stream<Item = Value> + Unpin,
350 {
351 let configured_options = Self::configure_options(options, false)?;
352 let query = self
353 .initialize_query(TransportPrompt::Messages, configured_options, transport)
354 .await?;
355 query.stream_input_from_stream(prompt).await?;
356 Ok(Self::into_message_stream(query))
357 }
358}