use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use futures::stream::BoxStream;
use futures::{Stream, StreamExt};
use serde_json::Value;
use serde_json::json;
use crate::client::InputPrompt;
use crate::errors::{Error, Result};
use crate::query::{Query, build_hooks_config};
use crate::sdk_mcp::McpSdkServer;
use crate::transport::Transport;
use crate::transport::subprocess_cli::{Prompt as TransportPrompt, SubprocessCliTransport};
use crate::types::{ClaudeAgentOptions, McpServerConfig, McpServersOption, Message};
pub struct InternalClient;
impl Default for InternalClient {
fn default() -> Self {
Self::new()
}
}
impl InternalClient {
pub fn new() -> Self {
Self
}
fn extract_sdk_mcp_servers(options: &ClaudeAgentOptions) -> HashMap<String, Arc<McpSdkServer>> {
let mut servers = HashMap::new();
if let McpServersOption::Servers(configs) = &options.mcp_servers {
for (name, config) in configs {
if let McpServerConfig::Sdk(sdk_config) = config {
servers.insert(name.clone(), sdk_config.instance.clone());
}
}
}
servers
}
fn configure_options(
options: ClaudeAgentOptions,
is_text_prompt: bool,
) -> Result<ClaudeAgentOptions> {
if options.can_use_tool.is_some() && is_text_prompt {
return Err(Error::Other(
"can_use_tool callback requires streaming mode. Please provide prompt as messages."
.to_string(),
));
}
if options.can_use_tool.is_some() && options.permission_prompt_tool_name.is_some() {
return Err(Error::Other(
"can_use_tool callback cannot be used with permission_prompt_tool_name."
.to_string(),
));
}
let mut configured_options = options;
if configured_options.can_use_tool.is_some() {
configured_options.permission_prompt_tool_name = Some("stdio".to_string());
}
Ok(configured_options)
}
async fn initialize_query(
&self,
transport_prompt: TransportPrompt,
options: ClaudeAgentOptions,
transport: Option<Box<dyn Transport>>,
) -> Result<Query> {
let mut chosen_transport: Box<dyn Transport> = if let Some(transport) = transport {
transport
} else {
Box::new(SubprocessCliTransport::new(
transport_prompt,
options.clone(),
)?)
};
chosen_transport.connect().await?;
let hooks = options.hooks.clone().unwrap_or_default();
let sdk_mcp_servers = Self::extract_sdk_mcp_servers(&options);
let (hooks_config, hook_callbacks) = build_hooks_config(&hooks);
let (reader, writer, close_handle) = chosen_transport.into_split()?;
let mut query = Query::start(
reader,
writer,
close_handle,
true,
options.can_use_tool.clone(),
hook_callbacks,
sdk_mcp_servers,
options.agents.clone(),
Duration::from_secs(60),
);
query.initialize(hooks_config).await?;
Ok(query)
}
async fn send_prompt(query: &Query, prompt: InputPrompt) -> Result<()> {
match prompt {
InputPrompt::Text(text) => {
query
.stream_input(vec![json!({
"type": "user",
"message": {"role": "user", "content": text},
"parent_tool_use_id": Value::Null,
"session_id": ""
})])
.await?;
}
InputPrompt::Messages(messages) => {
query.stream_input(messages).await?;
}
}
Ok(())
}
async fn collect_messages(mut query: Query) -> Result<Vec<Message>> {
let mut messages = Vec::new();
let read_result: Result<()> = async {
while let Some(message) = query.receive_next_message().await? {
messages.push(message);
}
Ok(())
}
.await;
let close_result = query.close().await;
match (read_result, close_result) {
(Err(err), _) => Err(err),
(Ok(()), Err(err)) => Err(err),
(Ok(()), Ok(())) => Ok(messages),
}
}
fn into_message_stream(mut query: Query) -> BoxStream<'static, Result<Message>> {
let rx = query.take_message_receiver();
if let Some(rx) = rx {
let close_handle_query = query;
futures::stream::unfold(
(rx, Some(close_handle_query)),
|(mut rx, query)| async move {
match rx.recv().await {
Some(msg) => Some((msg, (rx, query))),
None => {
if let Some(q) = query {
let _ = q.close().await;
}
None
}
}
},
)
.boxed()
} else {
futures::stream::empty().boxed()
}
}
pub async fn process_query(
&self,
prompt: InputPrompt,
options: ClaudeAgentOptions,
transport: Option<Box<dyn Transport>>,
) -> Result<Vec<Message>> {
let configured_options =
Self::configure_options(options, matches!(prompt, InputPrompt::Text(_)))?;
let transport_prompt = match &prompt {
InputPrompt::Text(text) => TransportPrompt::Text(text.clone()),
InputPrompt::Messages(_) => TransportPrompt::Messages,
};
let query = self
.initialize_query(transport_prompt, configured_options, transport)
.await?;
Self::send_prompt(&query, prompt).await?;
Self::collect_messages(query).await
}
pub async fn process_query_from_stream<S>(
&self,
prompt: S,
options: ClaudeAgentOptions,
transport: Option<Box<dyn Transport>>,
) -> Result<Vec<Message>>
where
S: Stream<Item = Value> + Unpin,
{
let configured_options = Self::configure_options(options, false)?;
let query = self
.initialize_query(TransportPrompt::Messages, configured_options, transport)
.await?;
query.stream_input_from_stream(prompt).await?;
Self::collect_messages(query).await
}
pub async fn process_query_as_stream(
&self,
prompt: InputPrompt,
options: ClaudeAgentOptions,
transport: Option<Box<dyn Transport>>,
) -> Result<BoxStream<'static, Result<Message>>> {
let configured_options =
Self::configure_options(options, matches!(prompt, InputPrompt::Text(_)))?;
let transport_prompt = match &prompt {
InputPrompt::Text(text) => TransportPrompt::Text(text.clone()),
InputPrompt::Messages(_) => TransportPrompt::Messages,
};
let query = self
.initialize_query(transport_prompt, configured_options, transport)
.await?;
Self::send_prompt(&query, prompt).await?;
Ok(Self::into_message_stream(query))
}
pub async fn process_query_from_stream_as_stream<S>(
&self,
prompt: S,
options: ClaudeAgentOptions,
transport: Option<Box<dyn Transport>>,
) -> Result<BoxStream<'static, Result<Message>>>
where
S: Stream<Item = Value> + Unpin,
{
let configured_options = Self::configure_options(options, false)?;
let query = self
.initialize_query(TransportPrompt::Messages, configured_options, transport)
.await?;
query.stream_input_from_stream(prompt).await?;
Ok(Self::into_message_stream(query))
}
}