1use crate::error::{ClaudeSDKError, Result};
2use crate::internal::control::{
3 initialize_request, initialize_timeout_duration, respond_to_control_request,
4 send_control_request_with_callbacks_and_timeout, ControlCallbacks,
5};
6use crate::internal::parser::parse_message_line;
7use crate::internal::session_resume::{apply_materialized_options, materialize_resume_session};
8use crate::internal::session_store_validation::validate_session_store_options;
9use crate::internal::transcript_mirror::TranscriptMirrorBatcher;
10use crate::internal::transport::{SubprocessCLITransport, Transport, TransportOptions};
11use crate::types::{ClaudeAgentOptions, Message};
12use futures::{Stream, StreamExt};
13use serde::{Deserialize, Serialize};
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct TokenUsage {
18 pub input_tokens: i32,
19 pub output_tokens: i32,
20 pub total_tokens: i32,
21}
22
23#[derive(Debug, Clone)]
25pub struct QueryResult {
26 pub content: String,
28 pub usage: Option<TokenUsage>,
30 pub finish_reason: String,
32}
33
34pub async fn query(
61 prompt: impl Into<String>,
62 options: Option<ClaudeAgentOptions>,
63) -> Result<QueryResult> {
64 let messages = query_messages(prompt, options).await?;
65 Ok(summarize_messages(messages))
66}
67
68pub async fn query_messages(
73 prompt: impl Into<String>,
74 options: Option<ClaudeAgentOptions>,
75) -> Result<Vec<Message>> {
76 let prompt = prompt.into();
77 let mut options = options.unwrap_or_default();
78 if options.can_use_tool.is_some() {
79 return Err(ClaudeSDKError::Other(
80 "can_use_tool callback requires streaming mode. \
81 Please use ClaudeAgentClient instead of query_messages with a string prompt."
82 .to_string(),
83 ));
84 }
85 validate_session_store_options(&options)?;
86 let materialized = materialize_resume_session(&options).await?;
87 if let Some(materialized) = &materialized {
88 options = apply_materialized_options(&options, materialized);
89 }
90 let result = run_query_messages(prompt, options).await;
91 if let Some(materialized) = &materialized {
92 materialized.cleanup().await;
93 }
94 result
95}
96
97pub async fn query_messages_with_transport(
98 prompt: impl Into<String>,
99 options: Option<ClaudeAgentOptions>,
100 transport: Box<dyn Transport>,
101) -> Result<Vec<Message>> {
102 let prompt = prompt.into();
103 let options = options.unwrap_or_default();
104 if options.can_use_tool.is_some() {
105 return Err(ClaudeSDKError::Other(
106 "can_use_tool callback requires streaming mode. \
107 Please use ClaudeAgentClient instead of query_messages with a string prompt."
108 .to_string(),
109 ));
110 }
111 validate_session_store_options(&options)?;
112 run_query_messages_with_transport(prompt, options, transport).await
113}
114
115pub async fn query_stream_messages<S>(
121 stream: S,
122 options: Option<ClaudeAgentOptions>,
123) -> Result<Vec<Message>>
124where
125 S: Stream<Item = serde_json::Value> + Unpin,
126{
127 let mut options = options.unwrap_or_default();
128 validate_session_store_options(&options)?;
129 let materialized = materialize_resume_session(&options).await?;
130 if let Some(materialized) = &materialized {
131 options = apply_materialized_options(&options, materialized);
132 }
133 let result = run_query_stream_messages(stream, options).await;
134 if let Some(materialized) = &materialized {
135 materialized.cleanup().await;
136 }
137 result
138}
139
140pub async fn query_stream_messages_with_transport<S>(
141 stream: S,
142 options: Option<ClaudeAgentOptions>,
143 transport: Box<dyn Transport>,
144) -> Result<Vec<Message>>
145where
146 S: Stream<Item = serde_json::Value> + Unpin,
147{
148 let options = options.unwrap_or_default();
149 validate_session_store_options(&options)?;
150 run_query_stream_messages_with_transport(stream, options, transport).await
151}
152
153async fn run_query_messages(prompt: String, options: ClaudeAgentOptions) -> Result<Vec<Message>> {
154 let transport_options = TransportOptions::from(&options);
156 let transport = SubprocessCLITransport::new(transport_options);
157 run_query_messages_with_transport(prompt, options, Box::new(transport)).await
158}
159
160async fn run_query_stream_messages<S>(
161 stream: S,
162 options: ClaudeAgentOptions,
163) -> Result<Vec<Message>>
164where
165 S: Stream<Item = serde_json::Value> + Unpin,
166{
167 let transport_options = TransportOptions::from(&options);
168 let transport = SubprocessCLITransport::new(transport_options);
169 run_query_stream_messages_with_transport(stream, options, Box::new(transport)).await
170}
171
172async fn run_query_messages_with_transport(
173 prompt: String,
174 options: ClaudeAgentOptions,
175 mut transport: Box<dyn Transport>,
176) -> Result<Vec<Message>> {
177 let control_callbacks = ControlCallbacks::from_options(&options);
178 let mut transcript_mirror = TranscriptMirrorBatcher::from_options(&options);
179
180 transport.connect().await?;
182 send_control_request_with_callbacks_and_timeout(
183 transport.as_mut(),
184 initialize_request(&control_callbacks),
185 &control_callbacks,
186 initialize_timeout_duration(),
187 )
188 .await?;
189
190 let user_message = serde_json::json!({
192 "type": "user",
193 "session_id": "",
194 "message": {
195 "role": "user",
196 "content": prompt
197 },
198 "parent_tool_use_id": null
199 });
200
201 transport
202 .write(format!("{}\n", user_message).as_bytes())
203 .await?;
204
205 let mut messages = Vec::new();
206
207 while let Some(data) = transport.read().await? {
209 let line = String::from_utf8_lossy(&data);
210 let value = serde_json::from_slice::<serde_json::Value>(&data)?;
211 if value.get("type").and_then(|v| v.as_str()) == Some("control_request") {
212 respond_to_control_request(transport.as_mut(), &value, &control_callbacks).await?;
213 continue;
214 }
215 if value.get("type").and_then(|v| v.as_str()) == Some("transcript_mirror") {
216 if let Some(batcher) = &mut transcript_mirror {
217 messages.extend(batcher.enqueue_value(&value).await?);
218 }
219 continue;
220 }
221 match parse_message_line(&line)? {
222 Some(message @ Message::ResultMsg { .. }) => {
223 flush_transcript_mirror(&mut transcript_mirror).await?;
224 messages.push(message);
225 break;
226 }
227 Some(message) => {
228 messages.push(message);
229 }
230 None => {}
231 }
232 }
233
234 flush_transcript_mirror(&mut transcript_mirror).await?;
236 transport.close().await?;
237
238 Ok(messages)
239}
240
241async fn run_query_stream_messages_with_transport<S>(
242 mut stream: S,
243 options: ClaudeAgentOptions,
244 mut transport: Box<dyn Transport>,
245) -> Result<Vec<Message>>
246where
247 S: Stream<Item = serde_json::Value> + Unpin,
248{
249 let control_callbacks = ControlCallbacks::from_options(&options);
250 let mut transcript_mirror = TranscriptMirrorBatcher::from_options(&options);
251
252 transport.connect().await?;
253 send_control_request_with_callbacks_and_timeout(
254 transport.as_mut(),
255 initialize_request(&control_callbacks),
256 &control_callbacks,
257 initialize_timeout_duration(),
258 )
259 .await?;
260
261 while let Some(mut message) = stream.next().await {
262 if let Some(object) = message.as_object_mut() {
263 object
264 .entry("session_id")
265 .or_insert_with(|| serde_json::Value::String("default".to_string()));
266 }
267 let mut json_payload = serde_json::to_vec(&message)?;
268 json_payload.push(b'\n');
269 transport.write(&json_payload).await?;
270 }
271 transport.close_input().await?;
272
273 let mut messages = Vec::new();
274 while let Some(data) = transport.read().await? {
275 let line = String::from_utf8_lossy(&data);
276 let value = serde_json::from_slice::<serde_json::Value>(&data)?;
277 if value.get("type").and_then(|v| v.as_str()) == Some("control_request") {
278 respond_to_control_request(transport.as_mut(), &value, &control_callbacks).await?;
279 continue;
280 }
281 if value.get("type").and_then(|v| v.as_str()) == Some("transcript_mirror") {
282 if let Some(batcher) = &mut transcript_mirror {
283 messages.extend(batcher.enqueue_value(&value).await?);
284 }
285 continue;
286 }
287 match parse_message_line(&line)? {
288 Some(message @ Message::ResultMsg { .. }) => {
289 flush_transcript_mirror(&mut transcript_mirror).await?;
290 messages.push(message);
291 break;
292 }
293 Some(message) => {
294 messages.push(message);
295 }
296 None => {}
297 }
298 }
299
300 flush_transcript_mirror(&mut transcript_mirror).await?;
301 transport.close().await?;
302
303 Ok(messages)
304}
305
306fn summarize_messages(messages: Vec<Message>) -> QueryResult {
307 let mut content_parts: Vec<String> = Vec::new();
308 let mut usage: Option<TokenUsage> = None;
309 let mut finish_reason = String::from("unknown");
310
311 for message in messages {
312 match message {
313 Message::AssistantMsg { content, .. } => {
314 for block in &content.content {
315 if let crate::types::ContentBlock::Text { text } = block {
316 content_parts.push(text.clone());
317 }
318 }
319 }
320 Message::ResultMsg {
321 usage: msg_usage,
322 stop_reason,
323 result,
324 ..
325 } => {
326 if let Some(result_text) = result {
327 if content_parts.is_empty() {
328 content_parts.push(result_text);
329 }
330 }
331 if let Some(u) = msg_usage {
332 usage = extract_token_usage(&u);
333 }
334 if let Some(reason) = stop_reason {
335 finish_reason = reason;
336 }
337 }
338 _ => {}
339 }
340 }
341
342 QueryResult {
343 content: content_parts.join(""),
344 usage,
345 finish_reason,
346 }
347}
348
349async fn flush_transcript_mirror(
350 transcript_mirror: &mut Option<TranscriptMirrorBatcher>,
351) -> Result<()> {
352 if let Some(batcher) = transcript_mirror {
353 let _ = batcher.flush().await?;
354 }
355 Ok(())
356}
357
358fn extract_token_usage(
360 usage_map: &serde_json::Map<String, serde_json::Value>,
361) -> Option<TokenUsage> {
362 let input_tokens = usage_map
363 .get("input_tokens")
364 .and_then(|v| v.as_i64())
365 .map(|v| v as i32)?;
366 let output_tokens = usage_map
367 .get("output_tokens")
368 .and_then(|v| v.as_i64())
369 .map(|v| v as i32)?;
370 let total_tokens = usage_map
371 .get("total_tokens")
372 .and_then(|v| v.as_i64())
373 .map(|v| v as i32)?;
374
375 Some(TokenUsage {
376 input_tokens,
377 output_tokens,
378 total_tokens,
379 })
380}