claude_code_rs/query_fn.rs
1use tokio_stream::wrappers::ReceiverStream;
2
3use crate::error::{Error, Result};
4use crate::query::Query;
5use crate::transport::subprocess::SubprocessTransport;
6use crate::types::messages::Message;
7use crate::types::options::ClaudeAgentOptions;
8
9/// Execute a one-shot query against the Claude CLI and return a stream of messages.
10///
11/// This is the simplest API - it handles connection, init handshake, sending the prompt,
12/// and returns a stream of messages until the result message arrives.
13///
14/// # Example
15/// ```no_run
16/// use claude_code_rs::{ClaudeAgentOptions, Message};
17/// use claude_code_rs::query_fn::query;
18/// use tokio_stream::StreamExt;
19///
20/// # async fn example() -> claude_code_rs::Result<()> {
21/// let options = ClaudeAgentOptions {
22/// max_turns: Some(3),
23/// ..Default::default()
24/// };
25///
26/// let mut stream = query("What is 2+2?", options).await?;
27/// while let Some(msg) = stream.next().await {
28/// match msg? {
29/// Message::Assistant { message } => {
30/// if let Some(text) = message.content.iter()
31/// .find_map(|b| b.as_text()) {
32/// print!("{text}");
33/// }
34/// }
35/// Message::Result { result } => {
36/// println!("\n[done, cost: {:?}]", result.total_cost_usd);
37/// break;
38/// }
39/// _ => {}
40/// }
41/// }
42/// # Ok(())
43/// # }
44/// ```
45pub async fn query(
46 prompt: &str,
47 options: ClaudeAgentOptions,
48) -> Result<ReceiverStream<Result<Message>>> {
49 let cli_path = options.resolve_cli_path()?;
50 let transport = SubprocessTransport::new(cli_path, &options);
51 let mut q = Query::new(
52 Box::new(transport),
53 options.hooks,
54 options.can_use_tool,
55 None, // MCP handler wired through client, not one-shot query
56 options.control_timeout,
57 );
58
59 let rx = q.connect().await?;
60
61 // Send the prompt.
62 q.send_message(prompt, None).await?;
63
64 // Keep Query alive in a background task until the consumer channel closes.
65 // When rx is dropped by the consumer, consumer_tx.send() fails and the
66 // router exits, then this task drops q — triggering proper cleanup.
67 tokio::spawn(async move {
68 q.closed().await;
69 });
70
71 Ok(ReceiverStream::new(rx))
72}
73
74/// Execute a query and collect all messages until the result.
75///
76/// Returns the full list of messages including the final ResultMessage.
77pub async fn query_collect(
78 prompt: &str,
79 options: ClaudeAgentOptions,
80) -> Result<Vec<Message>> {
81 use crate::types::messages::collect_until_result;
82
83 let mut stream = query(prompt, options).await?;
84 collect_until_result(&mut stream).await
85}
86
87/// Execute a query and return just the text response.
88///
89/// Collects all assistant text blocks and joins them.
90pub async fn query_text(
91 prompt: &str,
92 options: ClaudeAgentOptions,
93) -> Result<String> {
94 let messages = query_collect(prompt, options).await?;
95 let mut text = String::new();
96
97 for msg in &messages {
98 if let Some(t) = msg.text() {
99 text.push_str(&t);
100 }
101 }
102
103 if text.is_empty() {
104 // Check if there was an error.
105 if let Some(Message::Result { result }) = messages.last() {
106 if result.is_error {
107 return Err(Error::Process(
108 result.error.clone().unwrap_or("unknown error".into()),
109 ));
110 }
111 }
112 }
113
114 Ok(text)
115}