Skip to main content

claude_code_sdk_rust/
query.rs

1use crate::error::{ClaudeSDKError, Result};
2use crate::internal::control::{
3    initialize_request, initialize_timeout_duration, respond_to_control_request,
4    send_control_request_with_callbacks_and_timeout, ControlCallbacks,
5};
6use crate::internal::parser::parse_message_line;
7use crate::internal::session_resume::{apply_materialized_options, materialize_resume_session};
8use crate::internal::session_store_validation::validate_session_store_options;
9use crate::internal::transcript_mirror::TranscriptMirrorBatcher;
10use crate::internal::transport::{SubprocessCLITransport, Transport, TransportOptions};
11use crate::types::{ClaudeAgentOptions, Message};
12use futures::{Stream, StreamExt};
13use serde::{Deserialize, Serialize};
14
/// Token usage information from a query response.
///
/// Built by `extract_token_usage` from the usage map carried by a result
/// message. Derives `Serialize`/`Deserialize` so it can round-trip through
/// serde data formats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TokenUsage {
    /// Number of input tokens.
    pub input_tokens: i32,
    /// Number of output tokens.
    pub output_tokens: i32,
    /// Total number of tokens.
    pub total_tokens: i32,
}
22
/// Result from a one-shot query to Claude.
///
/// Returned by [`query`], which condenses the collected message sequence into
/// the three fields below.
#[derive(Debug, Clone)]
pub struct QueryResult {
    /// The text content of the response.
    ///
    /// Text blocks of the assistant messages joined without a separator; when
    /// no assistant text was collected, the result message's text (if any) is
    /// used instead.
    pub content: String,
    /// Token usage statistics, if available.
    pub usage: Option<TokenUsage>,
    /// The reason the response finished (e.g., "end_turn", "max_tokens").
    ///
    /// Holds `"unknown"` when no stop reason was reported.
    pub finish_reason: String,
}
33
34/// Perform a one-shot query to Claude Code.
35///
36/// This is a convenience function for simple, stateless interactions where you do not
37/// need bidirectional communication or conversation management. It creates a temporary
38/// client, sends a single prompt, and returns the complete response.
39///
40/// # Arguments
41///
42/// * `prompt` - The prompt to send to Claude.
43/// * `options` - Optional configuration options. If None, default options are used.
44///
45/// # Returns
46///
47/// A `QueryResult` containing the response content, usage statistics, and finish reason.
48///
49/// # Example
50///
51/// ```rust
52/// use claude_code_sdk_rust::query;
53///
54/// async fn example() -> Result<(), Box<dyn std::error::Error>> {
55///     let result = query("What is Rust?", None).await?;
56///     println!("{}", result.content);
57///     Ok(())
58/// }
59/// ```
60pub async fn query(
61    prompt: impl Into<String>,
62    options: Option<ClaudeAgentOptions>,
63) -> Result<QueryResult> {
64    let messages = query_messages(prompt, options).await?;
65    Ok(summarize_messages(messages))
66}
67
68/// Perform a one-shot query and return the full message sequence.
69///
70/// This is the Rust equivalent of the Python SDK's `query()` async iterator,
71/// adapted to return the collected messages as a `Vec<Message>`.
72pub async fn query_messages(
73    prompt: impl Into<String>,
74    options: Option<ClaudeAgentOptions>,
75) -> Result<Vec<Message>> {
76    let prompt = prompt.into();
77    let mut options = options.unwrap_or_default();
78    if options.can_use_tool.is_some() {
79        return Err(ClaudeSDKError::Other(
80            "can_use_tool callback requires streaming mode. \
81             Please use ClaudeAgentClient instead of query_messages with a string prompt."
82                .to_string(),
83        ));
84    }
85    validate_session_store_options(&options)?;
86    let materialized = materialize_resume_session(&options).await?;
87    if let Some(materialized) = &materialized {
88        options = apply_materialized_options(&options, materialized);
89    }
90    let result = run_query_messages(prompt, options).await;
91    if let Some(materialized) = &materialized {
92        materialized.cleanup().await;
93    }
94    result
95}
96
97pub async fn query_messages_with_transport(
98    prompt: impl Into<String>,
99    options: Option<ClaudeAgentOptions>,
100    transport: Box<dyn Transport>,
101) -> Result<Vec<Message>> {
102    let prompt = prompt.into();
103    let options = options.unwrap_or_default();
104    if options.can_use_tool.is_some() {
105        return Err(ClaudeSDKError::Other(
106            "can_use_tool callback requires streaming mode. \
107             Please use ClaudeAgentClient instead of query_messages with a string prompt."
108                .to_string(),
109        ));
110    }
111    validate_session_store_options(&options)?;
112    run_query_messages_with_transport(prompt, options, transport).await
113}
114
115/// Perform a one-shot query from a stream of raw prompt messages.
116///
117/// Each streamed value is written to the Claude Code transport as one JSON line.
118/// If a message is an object and does not include `session_id`, `"default"` is
119/// inserted to match the interactive client's streaming prompt behavior.
120pub async fn query_stream_messages<S>(
121    stream: S,
122    options: Option<ClaudeAgentOptions>,
123) -> Result<Vec<Message>>
124where
125    S: Stream<Item = serde_json::Value> + Unpin,
126{
127    let mut options = options.unwrap_or_default();
128    validate_session_store_options(&options)?;
129    let materialized = materialize_resume_session(&options).await?;
130    if let Some(materialized) = &materialized {
131        options = apply_materialized_options(&options, materialized);
132    }
133    let result = run_query_stream_messages(stream, options).await;
134    if let Some(materialized) = &materialized {
135        materialized.cleanup().await;
136    }
137    result
138}
139
140pub async fn query_stream_messages_with_transport<S>(
141    stream: S,
142    options: Option<ClaudeAgentOptions>,
143    transport: Box<dyn Transport>,
144) -> Result<Vec<Message>>
145where
146    S: Stream<Item = serde_json::Value> + Unpin,
147{
148    let options = options.unwrap_or_default();
149    validate_session_store_options(&options)?;
150    run_query_stream_messages_with_transport(stream, options, transport).await
151}
152
153async fn run_query_messages(prompt: String, options: ClaudeAgentOptions) -> Result<Vec<Message>> {
154    // Create transport with options
155    let transport_options = TransportOptions::from(&options);
156    let transport = SubprocessCLITransport::new(transport_options);
157    run_query_messages_with_transport(prompt, options, Box::new(transport)).await
158}
159
160async fn run_query_stream_messages<S>(
161    stream: S,
162    options: ClaudeAgentOptions,
163) -> Result<Vec<Message>>
164where
165    S: Stream<Item = serde_json::Value> + Unpin,
166{
167    let transport_options = TransportOptions::from(&options);
168    let transport = SubprocessCLITransport::new(transport_options);
169    run_query_stream_messages_with_transport(stream, options, Box::new(transport)).await
170}
171
172async fn run_query_messages_with_transport(
173    prompt: String,
174    options: ClaudeAgentOptions,
175    mut transport: Box<dyn Transport>,
176) -> Result<Vec<Message>> {
177    let control_callbacks = ControlCallbacks::from_options(&options);
178    let mut transcript_mirror = TranscriptMirrorBatcher::from_options(&options);
179
180    // Connect to the CLI
181    transport.connect().await?;
182    send_control_request_with_callbacks_and_timeout(
183        transport.as_mut(),
184        initialize_request(&control_callbacks),
185        &control_callbacks,
186        initialize_timeout_duration(),
187    )
188    .await?;
189
190    // Build and send the user message
191    let user_message = serde_json::json!({
192        "type": "user",
193        "session_id": "",
194        "message": {
195            "role": "user",
196            "content": prompt
197        },
198        "parent_tool_use_id": null
199    });
200
201    transport
202        .write(format!("{}\n", user_message).as_bytes())
203        .await?;
204
205    let mut messages = Vec::new();
206
207    // Read messages until we get the result
208    while let Some(data) = transport.read().await? {
209        let line = String::from_utf8_lossy(&data);
210        let value = serde_json::from_slice::<serde_json::Value>(&data)?;
211        if value.get("type").and_then(|v| v.as_str()) == Some("control_request") {
212            respond_to_control_request(transport.as_mut(), &value, &control_callbacks).await?;
213            continue;
214        }
215        if value.get("type").and_then(|v| v.as_str()) == Some("transcript_mirror") {
216            if let Some(batcher) = &mut transcript_mirror {
217                messages.extend(batcher.enqueue_value(&value).await?);
218            }
219            continue;
220        }
221        match parse_message_line(&line)? {
222            Some(message @ Message::ResultMsg { .. }) => {
223                flush_transcript_mirror(&mut transcript_mirror).await?;
224                messages.push(message);
225                break;
226            }
227            Some(message) => {
228                messages.push(message);
229            }
230            None => {}
231        }
232    }
233
234    // Close the transport
235    flush_transcript_mirror(&mut transcript_mirror).await?;
236    transport.close().await?;
237
238    Ok(messages)
239}
240
241async fn run_query_stream_messages_with_transport<S>(
242    mut stream: S,
243    options: ClaudeAgentOptions,
244    mut transport: Box<dyn Transport>,
245) -> Result<Vec<Message>>
246where
247    S: Stream<Item = serde_json::Value> + Unpin,
248{
249    let control_callbacks = ControlCallbacks::from_options(&options);
250    let mut transcript_mirror = TranscriptMirrorBatcher::from_options(&options);
251
252    transport.connect().await?;
253    send_control_request_with_callbacks_and_timeout(
254        transport.as_mut(),
255        initialize_request(&control_callbacks),
256        &control_callbacks,
257        initialize_timeout_duration(),
258    )
259    .await?;
260
261    while let Some(mut message) = stream.next().await {
262        if let Some(object) = message.as_object_mut() {
263            object
264                .entry("session_id")
265                .or_insert_with(|| serde_json::Value::String("default".to_string()));
266        }
267        let mut json_payload = serde_json::to_vec(&message)?;
268        json_payload.push(b'\n');
269        transport.write(&json_payload).await?;
270    }
271    transport.close_input().await?;
272
273    let mut messages = Vec::new();
274    while let Some(data) = transport.read().await? {
275        let line = String::from_utf8_lossy(&data);
276        let value = serde_json::from_slice::<serde_json::Value>(&data)?;
277        if value.get("type").and_then(|v| v.as_str()) == Some("control_request") {
278            respond_to_control_request(transport.as_mut(), &value, &control_callbacks).await?;
279            continue;
280        }
281        if value.get("type").and_then(|v| v.as_str()) == Some("transcript_mirror") {
282            if let Some(batcher) = &mut transcript_mirror {
283                messages.extend(batcher.enqueue_value(&value).await?);
284            }
285            continue;
286        }
287        match parse_message_line(&line)? {
288            Some(message @ Message::ResultMsg { .. }) => {
289                flush_transcript_mirror(&mut transcript_mirror).await?;
290                messages.push(message);
291                break;
292            }
293            Some(message) => {
294                messages.push(message);
295            }
296            None => {}
297        }
298    }
299
300    flush_transcript_mirror(&mut transcript_mirror).await?;
301    transport.close().await?;
302
303    Ok(messages)
304}
305
306fn summarize_messages(messages: Vec<Message>) -> QueryResult {
307    let mut content_parts: Vec<String> = Vec::new();
308    let mut usage: Option<TokenUsage> = None;
309    let mut finish_reason = String::from("unknown");
310
311    for message in messages {
312        match message {
313            Message::AssistantMsg { content, .. } => {
314                for block in &content.content {
315                    if let crate::types::ContentBlock::Text { text } = block {
316                        content_parts.push(text.clone());
317                    }
318                }
319            }
320            Message::ResultMsg {
321                usage: msg_usage,
322                stop_reason,
323                result,
324                ..
325            } => {
326                if let Some(result_text) = result {
327                    if content_parts.is_empty() {
328                        content_parts.push(result_text);
329                    }
330                }
331                if let Some(u) = msg_usage {
332                    usage = extract_token_usage(&u);
333                }
334                if let Some(reason) = stop_reason {
335                    finish_reason = reason;
336                }
337            }
338            _ => {}
339        }
340    }
341
342    QueryResult {
343        content: content_parts.join(""),
344        usage,
345        finish_reason,
346    }
347}
348
349async fn flush_transcript_mirror(
350    transcript_mirror: &mut Option<TranscriptMirrorBatcher>,
351) -> Result<()> {
352    if let Some(batcher) = transcript_mirror {
353        let _ = batcher.flush().await?;
354    }
355    Ok(())
356}
357
358/// Extract TokenUsage from a JSON map.
359fn extract_token_usage(
360    usage_map: &serde_json::Map<String, serde_json::Value>,
361) -> Option<TokenUsage> {
362    let input_tokens = usage_map
363        .get("input_tokens")
364        .and_then(|v| v.as_i64())
365        .map(|v| v as i32)?;
366    let output_tokens = usage_map
367        .get("output_tokens")
368        .and_then(|v| v.as_i64())
369        .map(|v| v as i32)?;
370    let total_tokens = usage_map
371        .get("total_tokens")
372        .and_then(|v| v.as_i64())
373        .map(|v| v as i32)?;
374
375    Some(TokenUsage {
376        input_tokens,
377        output_tokens,
378        total_tokens,
379    })
380}