1use crate::{
7 errors::Result,
8 transport::InputMessage,
9 types::{ClaudeCodeOptions, Message, PermissionMode},
10};
11use futures::stream::Stream;
12use std::pin::Pin;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tracing::{debug, info, warn};
16
17pub enum QueryInput {
19 Text(String),
21 Stream(Pin<Box<dyn Stream<Item = InputMessage> + Send>>),
23}
24
25impl From<String> for QueryInput {
26 fn from(s: String) -> Self {
27 QueryInput::Text(s)
28 }
29}
30
31impl From<&str> for QueryInput {
32 fn from(s: &str) -> Self {
33 QueryInput::Text(s.to_string())
34 }
35}
36
37pub async fn query(
116 prompt: impl Into<QueryInput>,
117 options: Option<ClaudeCodeOptions>,
118) -> Result<impl Stream<Item = Result<Message>>> {
119 let options = options.unwrap_or_default();
120 let prompt = prompt.into();
121
122 match prompt {
123 QueryInput::Text(text) => {
124 query_print_mode(text, options).await
126 }
127 QueryInput::Stream(_stream) => {
128 Err(crate::SdkError::NotSupported {
131 feature: "Streaming input mode not yet implemented".into(),
132 })
133 }
134 }
135}
136
137#[allow(deprecated)]
139async fn query_print_mode(
140 prompt: String,
141 options: ClaudeCodeOptions,
142) -> Result<impl Stream<Item = Result<Message>>> {
143 use std::sync::Arc;
144 use tokio::io::{AsyncBufReadExt, BufReader};
145 use tokio::process::Command;
146 use tokio::sync::Mutex;
147
148 let cli_path = crate::transport::subprocess::find_claude_cli()?;
149 let mut cmd = Command::new(&cli_path);
150
151 cmd.arg("--output-format").arg("stream-json");
153 cmd.arg("--verbose");
154
155 if let Some(ref prompt_v2) = options.system_prompt_v2 {
159 match prompt_v2 {
160 crate::types::SystemPrompt::String(s) => {
161 cmd.arg("--system-prompt").arg(s);
162 }
163 crate::types::SystemPrompt::Preset { append, .. } => {
164 if let Some(append_text) = append {
165 cmd.arg("--append-system-prompt").arg(append_text);
166 }
167 }
168 }
169 } else {
170 #[allow(deprecated)]
171 match options.system_prompt.as_deref() {
172 Some(prompt) => {
173 cmd.arg("--system-prompt").arg(prompt);
174 }
175 None => {
176 cmd.arg("--system-prompt").arg("");
177 }
178 }
179
180 #[allow(deprecated)]
181 if let Some(ref append_prompt) = options.append_system_prompt {
182 cmd.arg("--append-system-prompt").arg(append_prompt);
183 }
184 }
185
186 if !options.allowed_tools.is_empty() {
187 cmd.arg("--allowedTools")
188 .arg(options.allowed_tools.join(","));
189 }
190
191 if let Some(max_turns) = options.max_turns {
192 cmd.arg("--max-turns").arg(max_turns.to_string());
193 }
194
195 if let Some(ref thinking) = options.thinking {
197 match thinking {
198 crate::types::ThinkingConfig::Enabled { budget_tokens } => {
199 cmd.arg("--max-thinking-tokens")
200 .arg(budget_tokens.to_string());
201 }
202 crate::types::ThinkingConfig::Disabled => {
203 }
205 crate::types::ThinkingConfig::Adaptive => {
206 }
208 }
209 } else if let Some(max_thinking_tokens) = options.max_thinking_tokens {
210 if max_thinking_tokens > 0 {
211 cmd.arg("--max-thinking-tokens")
212 .arg(max_thinking_tokens.to_string());
213 }
214 }
215
216 if !options.disallowed_tools.is_empty() {
217 cmd.arg("--disallowedTools")
218 .arg(options.disallowed_tools.join(","));
219 }
220
221 if let Some(ref model) = options.model {
222 cmd.arg("--model").arg(model);
223 }
224
225 if let Some(ref tool_name) = options.permission_prompt_tool_name {
226 cmd.arg("--permission-prompt-tool").arg(tool_name);
227 }
228
229 match options.permission_mode {
230 PermissionMode::Default => {
231 cmd.arg("--permission-mode").arg("default");
232 }
233 PermissionMode::AcceptEdits => {
234 cmd.arg("--permission-mode").arg("acceptEdits");
235 }
236 PermissionMode::Plan => {
237 cmd.arg("--permission-mode").arg("plan");
238 }
239 PermissionMode::BypassPermissions => {
240 cmd.arg("--permission-mode").arg("bypassPermissions");
241 }
242 PermissionMode::DontAsk => {
243 cmd.arg("--permission-mode").arg("dontAsk");
244 }
245 }
246
247 if options.continue_conversation {
248 cmd.arg("--continue");
249 }
250
251 if let Some(ref resume_id) = options.resume {
252 cmd.arg("--resume").arg(resume_id);
253 }
254
255 if !options.mcp_servers.is_empty() {
256 let mcp_config = serde_json::json!({
257 "mcpServers": options.mcp_servers
258 });
259 cmd.arg("--mcp-config").arg(mcp_config.to_string());
260 }
261
262 if let Some(ref format) = options.output_format {
264 if format.get("type").and_then(|v| v.as_str()) == Some("json_schema") {
265 if let Some(schema) = format.get("schema") {
266 if let Ok(schema_json) = serde_json::to_string(schema) {
267 cmd.arg("--json-schema").arg(schema_json);
268 }
269 }
270 }
271 }
272
273 for (key, value) in &options.extra_args {
275 let flag = if key.starts_with("--") || key.starts_with("-") {
276 key.clone()
277 } else {
278 format!("--{key}")
279 };
280 cmd.arg(&flag);
281 if let Some(val) = value {
282 cmd.arg(val);
283 }
284 }
285
286 if let Some(ref effort) = options.effort {
288 cmd.arg("--effort").arg(effort.to_string());
289 }
290
291 cmd.arg("--print").arg("--").arg(&prompt);
293
294 cmd.stdout(std::process::Stdio::piped())
296 .stderr(std::process::Stdio::piped());
297
298 if let Some(max_tokens) = options.max_output_tokens {
301 let capped = max_tokens.clamp(1, 32000);
303 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
304 debug!("Setting max_output_tokens from option: {}", capped);
305 } else {
306 if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
308 if let Ok(tokens) = current_value.parse::<u32>() {
309 if tokens > 32000 {
310 warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
311 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
312 }
313 } else {
314 warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
315 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
316 }
317 }
318 }
319
320 info!("Starting Claude CLI with --print mode");
321 debug!("Command: {:?}", cmd);
322
323 if let Some(user) = options.user.as_deref() {
324 crate::transport::subprocess::apply_process_user(&mut cmd, user)?;
325 }
326
327 let mut child = cmd.spawn().map_err(crate::SdkError::ProcessError)?;
328
329 let stdout = child
330 .stdout
331 .take()
332 .ok_or_else(|| crate::SdkError::ConnectionError("Failed to get stdout".into()))?;
333 let stderr = child
334 .stderr
335 .take()
336 .ok_or_else(|| crate::SdkError::ConnectionError("Failed to get stderr".into()))?;
337
338 let child = Arc::new(Mutex::new(child));
340 let child_clone = Arc::clone(&child);
341
342 let (tx, rx) = mpsc::channel(100);
344
345 tokio::spawn(async move {
347 let reader = BufReader::new(stderr);
348 let mut lines = reader.lines();
349 while let Ok(Some(line)) = lines.next_line().await {
350 if !line.trim().is_empty() {
351 debug!("Claude stderr: {}", line);
352 }
353 }
354 });
355
356 let tx_cleanup = tx.clone();
358
359 tokio::spawn(async move {
361 let reader = BufReader::new(stdout);
362 let mut lines = reader.lines();
363
364 while let Ok(Some(line)) = lines.next_line().await {
365 if line.trim().is_empty() {
366 continue;
367 }
368
369 debug!("Claude output: {}", line);
370
371 match serde_json::from_str::<serde_json::Value>(&line) {
373 Ok(json) => {
374 match crate::message_parser::parse_message(json) {
375 Ok(Some(message)) => {
376 if tx.send(Ok(message)).await.is_err() {
377 break;
378 }
379 }
380 Ok(None) => {
381 }
383 Err(e) => {
384 if tx.send(Err(e)).await.is_err() {
385 break;
386 }
387 }
388 }
389 }
390 Err(e) => {
391 debug!("Failed to parse JSON: {} - Line: {}", e, line);
392 }
393 }
394 }
395
396 let mut child = child_clone.lock().await;
398 match child.wait().await {
399 Ok(status) => {
400 if !status.success() {
401 let _ = tx
402 .send(Err(crate::SdkError::ProcessExited {
403 code: status.code(),
404 }))
405 .await;
406 }
407 }
408 Err(e) => {
409 let _ = tx.send(Err(crate::SdkError::ProcessError(e))).await;
410 }
411 }
412 });
413
414 tokio::spawn(async move {
416 tx_cleanup.closed().await;
418
419 let mut child = child.lock().await;
421 match child.try_wait() {
422 Ok(Some(_)) => {
423 debug!("Claude CLI process already exited");
425 }
426 Ok(None) => {
427 info!("Killing Claude CLI process on stream drop");
429 if let Err(e) = child.kill().await {
430 warn!("Failed to kill Claude CLI process: {}", e);
431 } else {
432 let _ = child.wait().await;
434 debug!("Claude CLI process killed and cleaned up");
435 }
436 }
437 Err(e) => {
438 warn!("Failed to check process status: {}", e);
439 }
440 }
441 });
442
443 Ok(ReceiverStream::new(rx))
445}
446
447#[cfg(test)]
448mod tests {
449 use super::*;
450
451 #[test]
452 fn test_query_input_from_string() {
453 let input: QueryInput = "Hello".into();
454 match input {
455 QueryInput::Text(s) => assert_eq!(s, "Hello"),
456 _ => panic!("Expected Text variant"),
457 }
458 }
459
460 #[test]
461 fn test_query_input_from_str() {
462 let input: QueryInput = "World".into();
463 match input {
464 QueryInput::Text(s) => assert_eq!(s, "World"),
465 _ => panic!("Expected Text variant"),
466 }
467 }
468
469 #[test]
470 fn test_extra_args_formatting() {
471 use std::collections::HashMap;
472
473 let mut extra_args = HashMap::new();
475 extra_args.insert("custom-flag".to_string(), Some("value".to_string()));
476 extra_args.insert("--already-dashed".to_string(), None);
477 extra_args.insert("-s".to_string(), Some("short".to_string()));
478
479 let options = ClaudeCodeOptions {
480 extra_args,
481 ..Default::default()
482 };
483
484 assert_eq!(options.extra_args.len(), 3);
486 assert!(options.extra_args.contains_key("custom-flag"));
487 assert!(options.extra_args.contains_key("--already-dashed"));
488 assert!(options.extra_args.contains_key("-s"));
489 }
490}