#![warn(missing_docs)]
pub mod callback;
pub mod client;
pub mod config;
pub mod discovery;
pub mod errors;
pub mod hooks;
pub mod mcp;
pub mod permissions;
pub mod transport;
pub mod types;
pub(crate) mod util;
#[cfg(feature = "testing")]
pub mod testing;
pub use client::Client;
pub use config::{ClientConfig, PermissionMode, SystemPrompt};
pub use errors::{Error, Result};
pub use types::content::{
ALLOWED_IMAGE_MIME_TYPES, Base64ImageSource, ContentBlock, ImageBlock, ImageSource,
MAX_IMAGE_BASE64_BYTES, TextBlock, ThinkingBlock, ToolResultBlock, ToolResultContent,
ToolUseBlock, UrlImageSource, UserContent,
};
pub use types::messages::{
AssistantMessage, AssistantMessageInner, McpServerStatus, Message, ResultMessage, SessionInfo,
StreamEvent, SystemMessage, Usage, UserMessage, UserMessageInner,
};
pub use permissions::{CanUseToolCallback, PermissionContext, PermissionDecision};
pub use hooks::{
HookCallback, HookContext, HookDecision, HookEvent, HookInput, HookMatcher, HookOutput,
};
pub use mcp::{McpServerConfig, McpServers};
pub use discovery::{check_cli_version, find_cli};
pub use transport::Transport;
pub use tokio_util::sync::CancellationToken;
pub use callback::MessageCallback;
use futures_core::Stream;
async fn collect_stream(stream: impl Stream<Item = Result<Message>>) -> Result<Vec<Message>> {
use tokio_stream::StreamExt;
tokio::pin!(stream);
let mut messages = Vec::new();
while let Some(msg) = stream.next().await {
messages.push(msg?);
}
Ok(messages)
}
#[must_use = "the future must be awaited to run the query"]
pub async fn query(config: ClientConfig) -> Result<Vec<Message>> {
let stream = query_stream(config).await?;
collect_stream(stream).await
}
#[must_use = "the future must be awaited to run the query"]
pub async fn query_stream(config: ClientConfig) -> Result<impl Stream<Item = Result<Message>>> {
let cancel = config.cancellation_token.clone();
let mut client = Client::new(config)?;
client.connect().await?;
let read_timeout = client.read_timeout();
let rx = client.take_message_rx().ok_or(Error::NotConnected)?;
Ok(async_stream::stream! {
loop {
match crate::client::recv_with_timeout(&rx, read_timeout, cancel.as_ref()).await {
Ok(msg) => yield Ok(msg),
Err(ref e) if matches!(e, crate::errors::Error::Transport(_)) => break,
Err(e) => {
yield Err(e);
break;
}
}
}
let _ = client.close().await;
})
}
#[must_use = "the future must be awaited to run the query"]
pub async fn query_with_content(
content: Vec<UserContent>,
config: ClientConfig,
) -> Result<Vec<Message>> {
let stream = query_stream_with_content(content, config).await?;
collect_stream(stream).await
}
#[must_use = "the future must be awaited to run the query"]
pub async fn query_stream_with_content(
content: Vec<UserContent>,
config: ClientConfig,
) -> Result<impl Stream<Item = Result<Message>>> {
if content.is_empty() {
return Err(Error::Config("content must not be empty".into()));
}
let cancel = config.cancellation_token.clone();
let mut client = Client::new(config)?;
client.connect().await?;
let read_timeout = client.read_timeout();
let user_message = serde_json::json!({
"type": "user",
"message": {
"role": "user",
"content": content
}
});
let json = serde_json::to_string(&user_message).map_err(Error::Json)?;
client.transport_write(&json).await?;
let rx = client.take_message_rx().ok_or(Error::NotConnected)?;
Ok(async_stream::stream! {
loop {
match crate::client::recv_with_timeout(&rx, read_timeout, cancel.as_ref()).await {
Ok(msg) => yield Ok(msg),
Err(ref e) if matches!(e, crate::errors::Error::Transport(_)) => break,
Err(e) => {
yield Err(e);
break;
}
}
}
let _ = client.close().await;
})
}