claude_code_rs/query_fn.rs
1use tokio_stream::wrappers::ReceiverStream;
2
3use crate::error::{Error, Result};
4use crate::query::Query;
5use crate::transport::cli_discovery;
6use crate::transport::subprocess::SubprocessTransport;
7use crate::types::messages::Message;
8use crate::types::options::ClaudeAgentOptions;
9
10/// Execute a one-shot query against the Claude CLI and return a stream of messages.
11///
12/// This is the simplest API - it handles connection, init handshake, sending the prompt,
13/// and returns a stream of messages until the result message arrives.
14///
15/// # Example
16/// ```no_run
17/// use claude_code_rs::{ClaudeAgentOptions, Message};
18/// use claude_code_rs::query_fn::query;
19/// use tokio_stream::StreamExt;
20///
21/// # async fn example() -> claude_code_rs::Result<()> {
22/// let options = ClaudeAgentOptions {
23/// max_turns: Some(3),
24/// ..Default::default()
25/// };
26///
27/// let mut stream = query("What is 2+2?", options).await?;
28/// while let Some(msg) = stream.next().await {
29/// match msg? {
30/// Message::Assistant { message } => {
31/// if let Some(text) = message.content.iter()
32/// .find_map(|b| b.as_text()) {
33/// print!("{text}");
34/// }
35/// }
36/// Message::Result { result } => {
37/// println!("\n[done, cost: {:?}]", result.total_cost_usd);
38/// break;
39/// }
40/// _ => {}
41/// }
42/// }
43/// # Ok(())
44/// # }
45/// ```
46pub async fn query(
47 prompt: &str,
48 options: ClaudeAgentOptions,
49) -> Result<ReceiverStream<Result<Message>>> {
50 let cli_path = match options.cli_path {
51 Some(ref p) => p.clone(),
52 None => cli_discovery::find_cli()?,
53 };
54
55 let transport = SubprocessTransport::new(cli_path, &options);
56 let mut q = Query::new(
57 Box::new(transport),
58 options.hooks,
59 options.can_use_tool,
60 None, // MCP handler wired through client, not one-shot query
61 options.control_timeout,
62 );
63
64 let rx = q.connect().await?;
65
66 // Send the prompt.
67 q.send_message(prompt, None).await?;
68
69 // Return the message stream. The Query and transport are kept alive
70 // by the spawned tasks until the channel is dropped.
71 // We leak the Query intentionally - it will be cleaned up when
72 // the spawned tasks finish (transport closes, channels drop).
73 // This matches the Python SDK's query() behavior.
74 std::mem::forget(q);
75
76 Ok(ReceiverStream::new(rx))
77}
78
79/// Execute a query and collect all messages until the result.
80///
81/// Returns the full list of messages including the final ResultMessage.
82pub async fn query_collect(
83 prompt: &str,
84 options: ClaudeAgentOptions,
85) -> Result<Vec<Message>> {
86 use tokio_stream::StreamExt;
87
88 let mut stream = query(prompt, options).await?;
89 let mut messages = Vec::new();
90
91 while let Some(msg) = stream.next().await {
92 let msg = msg?;
93 let is_result = msg.is_result();
94 messages.push(msg);
95 if is_result {
96 break;
97 }
98 }
99
100 Ok(messages)
101}
102
103/// Execute a query and return just the text response.
104///
105/// Collects all assistant text blocks and joins them.
106pub async fn query_text(
107 prompt: &str,
108 options: ClaudeAgentOptions,
109) -> Result<String> {
110 let messages = query_collect(prompt, options).await?;
111 let mut text = String::new();
112
113 for msg in &messages {
114 if let Some(t) = msg.text() {
115 text.push_str(&t);
116 }
117 }
118
119 if text.is_empty() {
120 // Check if there was an error.
121 if let Some(Message::Result { result }) = messages.last() {
122 if result.is_error {
123 return Err(Error::Process(
124 result.error.clone().unwrap_or("unknown error".into()),
125 ));
126 }
127 }
128 }
129
130 Ok(text)
131}