1use crate::{
7 errors::Result,
8 transport::InputMessage,
9 types::{ClaudeCodeOptions, Message, PermissionMode},
10};
11use futures::stream::Stream;
12use std::pin::Pin;
13use tokio::sync::mpsc;
14use tokio_stream::wrappers::ReceiverStream;
15use tracing::{debug, info, warn};
16
17pub enum QueryInput {
19 Text(String),
21 Stream(Pin<Box<dyn Stream<Item = InputMessage> + Send>>),
23}
24
25impl From<String> for QueryInput {
26 fn from(s: String) -> Self {
27 QueryInput::Text(s)
28 }
29}
30
31impl From<&str> for QueryInput {
32 fn from(s: &str) -> Self {
33 QueryInput::Text(s.to_string())
34 }
35}
36
37pub async fn query(
116 prompt: impl Into<QueryInput>,
117 options: Option<ClaudeCodeOptions>,
118) -> Result<impl Stream<Item = Result<Message>>> {
119 let options = options.unwrap_or_default();
120 let prompt = prompt.into();
121
122 match prompt {
123 QueryInput::Text(text) => {
124 query_print_mode(text, options).await
126 }
127 QueryInput::Stream(_stream) => {
128 Err(crate::SdkError::NotSupported {
131 feature: "Streaming input mode not yet implemented".into(),
132 })
133 }
134 }
135}
136
137#[allow(deprecated)]
139async fn query_print_mode(
140 prompt: String,
141 options: ClaudeCodeOptions,
142) -> Result<impl Stream<Item = Result<Message>>> {
143 use std::sync::Arc;
144 use tokio::io::{AsyncBufReadExt, BufReader};
145 use tokio::process::Command;
146 use tokio::sync::Mutex;
147
148 let cli_path = crate::transport::subprocess::find_claude_cli()?;
149 let mut cmd = Command::new(&cli_path);
150
151 cmd.arg("--output-format").arg("stream-json");
153 cmd.arg("--verbose");
154
155 if let Some(ref prompt_v2) = options.system_prompt_v2 {
159 match prompt_v2 {
160 crate::types::SystemPrompt::String(s) => {
161 cmd.arg("--system-prompt").arg(s);
162 }
163 crate::types::SystemPrompt::Preset { append, .. } => {
164 if let Some(append_text) = append {
165 cmd.arg("--append-system-prompt").arg(append_text);
166 }
167 }
168 }
169 } else {
170 #[allow(deprecated)]
171 match options.system_prompt.as_deref() {
172 Some(prompt) => {
173 cmd.arg("--system-prompt").arg(prompt);
174 }
175 None => {
176 cmd.arg("--system-prompt").arg("");
177 }
178 }
179
180 #[allow(deprecated)]
181 if let Some(ref append_prompt) = options.append_system_prompt {
182 cmd.arg("--append-system-prompt").arg(append_prompt);
183 }
184 }
185
186 if !options.allowed_tools.is_empty() {
187 cmd.arg("--allowedTools")
188 .arg(options.allowed_tools.join(","));
189 }
190
191 if let Some(max_turns) = options.max_turns {
192 cmd.arg("--max-turns").arg(max_turns.to_string());
193 }
194
195 if let Some(ref thinking) = options.thinking {
197 match thinking {
198 crate::types::ThinkingConfig::Enabled { budget_tokens } => {
199 cmd.arg("--max-thinking-tokens")
200 .arg(budget_tokens.to_string());
201 }
202 crate::types::ThinkingConfig::Disabled => {
203 }
205 crate::types::ThinkingConfig::Adaptive => {
206 }
208 }
209 } else if let Some(max_thinking_tokens) = options.max_thinking_tokens {
210 if max_thinking_tokens > 0 {
211 cmd.arg("--max-thinking-tokens")
212 .arg(max_thinking_tokens.to_string());
213 }
214 }
215
216 if let Some(ref tools) = options.tools {
218 match tools {
219 crate::types::ToolsConfig::List(list) => {
220 if list.is_empty() {
221 cmd.arg("--tools").arg("");
222 } else {
223 cmd.arg("--tools").arg(list.join(","));
224 }
225 }
226 crate::types::ToolsConfig::Preset(_) => {
227 cmd.arg("--tools").arg("default");
228 }
229 }
230 }
231
232 if !options.disallowed_tools.is_empty() {
233 cmd.arg("--disallowedTools")
234 .arg(options.disallowed_tools.join(","));
235 }
236
237 if let Some(ref model) = options.model {
238 cmd.arg("--model").arg(model);
239 }
240
241 if let Some(ref tool_name) = options.permission_prompt_tool_name {
242 cmd.arg("--permission-prompt-tool").arg(tool_name);
243 }
244
245 match options.permission_mode {
246 PermissionMode::Default => {
247 cmd.arg("--permission-mode").arg("default");
248 }
249 PermissionMode::AcceptEdits => {
250 cmd.arg("--permission-mode").arg("acceptEdits");
251 }
252 PermissionMode::Plan => {
253 cmd.arg("--permission-mode").arg("plan");
254 }
255 PermissionMode::BypassPermissions => {
256 cmd.arg("--permission-mode").arg("bypassPermissions");
257 }
258 PermissionMode::DontAsk => {
259 cmd.arg("--permission-mode").arg("dontAsk");
260 }
261 }
262
263 if options.continue_conversation {
264 cmd.arg("--continue");
265 }
266
267 if let Some(ref resume_id) = options.resume {
268 cmd.arg("--resume").arg(resume_id);
269 }
270
271 if !options.mcp_servers.is_empty() {
272 let mcp_config = serde_json::json!({
273 "mcpServers": options.mcp_servers
274 });
275 cmd.arg("--mcp-config").arg(mcp_config.to_string());
276 }
277
278 if let Some(ref format) = options.output_format {
280 if format.get("type").and_then(|v| v.as_str()) == Some("json_schema") {
281 if let Some(schema) = format.get("schema") {
282 if let Ok(schema_json) = serde_json::to_string(schema) {
283 cmd.arg("--json-schema").arg(schema_json);
284 }
285 }
286 }
287 }
288
289 for (key, value) in &options.extra_args {
291 let flag = if key.starts_with("--") || key.starts_with("-") {
292 key.clone()
293 } else {
294 format!("--{key}")
295 };
296 cmd.arg(&flag);
297 if let Some(val) = value {
298 cmd.arg(val);
299 }
300 }
301
302 if let Some(ref effort) = options.effort {
304 cmd.arg("--effort").arg(effort.to_string());
305 }
306
307 if let Some(ref sources) = options.setting_sources {
309 let source_strs: Vec<&str> = sources
310 .iter()
311 .map(|s| match s {
312 crate::types::SettingSource::User => "user",
313 crate::types::SettingSource::Project => "project",
314 crate::types::SettingSource::Local => "local",
315 })
316 .collect();
317 cmd.arg("--setting-sources").arg(source_strs.join(","));
318 }
319
320 cmd.arg("--print").arg("--").arg(&prompt);
322
323 cmd.stdin(std::process::Stdio::null())
326 .stdout(std::process::Stdio::piped())
327 .stderr(std::process::Stdio::piped());
328
329 if let Some(max_tokens) = options.max_output_tokens {
332 let capped = max_tokens.clamp(1, 32000);
334 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", capped.to_string());
335 debug!("Setting max_output_tokens from option: {}", capped);
336 } else {
337 if let Ok(current_value) = std::env::var("CLAUDE_CODE_MAX_OUTPUT_TOKENS") {
339 if let Ok(tokens) = current_value.parse::<u32>() {
340 if tokens > 32000 {
341 warn!("CLAUDE_CODE_MAX_OUTPUT_TOKENS={} exceeds maximum safe value of 32000, overriding to 32000", tokens);
342 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "32000");
343 }
344 } else {
345 warn!("Invalid CLAUDE_CODE_MAX_OUTPUT_TOKENS value: {}, setting to 8192", current_value);
346 cmd.env("CLAUDE_CODE_MAX_OUTPUT_TOKENS", "8192");
347 }
348 }
349 }
350
351 for (key, value) in &options.env {
353 if value.is_empty() {
354 cmd.env_remove(key);
355 } else {
356 cmd.env(key, value);
357 }
358 }
359
360 info!("Starting Claude CLI with --print mode");
361 debug!("Command: {:?}", cmd);
362
363 if let Some(user) = options.user.as_deref() {
364 crate::transport::subprocess::apply_process_user(&mut cmd, user)?;
365 }
366
367 let mut child = cmd.spawn().map_err(crate::SdkError::ProcessError)?;
368 debug!("Claude CLI spawned with PID: {:?}", child.id());
369
370 let stdout = child
371 .stdout
372 .take()
373 .ok_or_else(|| crate::SdkError::ConnectionError("Failed to get stdout".into()))?;
374 let stderr = child
375 .stderr
376 .take()
377 .ok_or_else(|| crate::SdkError::ConnectionError("Failed to get stderr".into()))?;
378
379 let child = Arc::new(Mutex::new(child));
381 let child_clone = Arc::clone(&child);
382
383 let (tx, rx) = mpsc::channel(100);
385
386 tokio::spawn(async move {
388 let reader = BufReader::new(stderr);
389 let mut lines = reader.lines();
390 while let Ok(Some(line)) = lines.next_line().await {
391 if !line.trim().is_empty() {
392 debug!("Claude stderr: {}", line);
393 }
394 }
395 });
396
397 let tx_cleanup = tx.clone();
399
400 tokio::spawn(async move {
402 let reader = BufReader::new(stdout);
403 let mut lines = reader.lines();
404
405 while let Ok(Some(line)) = lines.next_line().await {
406 if line.trim().is_empty() {
407 continue;
408 }
409
410 debug!("Claude output: {}", line);
411
412 match serde_json::from_str::<serde_json::Value>(&line) {
414 Ok(json) => {
415 match crate::message_parser::parse_message(json) {
416 Ok(Some(message)) => {
417 if tx.send(Ok(message)).await.is_err() {
418 break;
419 }
420 }
421 Ok(None) => {
422 }
424 Err(e) => {
425 if tx.send(Err(e)).await.is_err() {
426 break;
427 }
428 }
429 }
430 }
431 Err(e) => {
432 debug!("Failed to parse JSON: {} - Line: {}", e, line);
433 }
434 }
435 }
436
437 let mut child = child_clone.lock().await;
439 match child.wait().await {
440 Ok(status) => {
441 if !status.success() {
442 let _ = tx
443 .send(Err(crate::SdkError::ProcessExited {
444 code: status.code(),
445 }))
446 .await;
447 }
448 }
449 Err(e) => {
450 let _ = tx.send(Err(crate::SdkError::ProcessError(e))).await;
451 }
452 }
453 });
454
455 tokio::spawn(async move {
457 tx_cleanup.closed().await;
459
460 let mut child = child.lock().await;
462 match child.try_wait() {
463 Ok(Some(_)) => {
464 debug!("Claude CLI process already exited");
466 }
467 Ok(None) => {
468 info!("Killing Claude CLI process on stream drop");
470 if let Err(e) = child.kill().await {
471 warn!("Failed to kill Claude CLI process: {}", e);
472 } else {
473 let _ = child.wait().await;
475 debug!("Claude CLI process killed and cleaned up");
476 }
477 }
478 Err(e) => {
479 warn!("Failed to check process status: {}", e);
480 }
481 }
482 });
483
484 Ok(ReceiverStream::new(rx))
486}
487
488#[cfg(test)]
489mod tests {
490 use super::*;
491
492 #[test]
493 fn test_query_input_from_string() {
494 let input: QueryInput = "Hello".into();
495 match input {
496 QueryInput::Text(s) => assert_eq!(s, "Hello"),
497 _ => panic!("Expected Text variant"),
498 }
499 }
500
501 #[test]
502 fn test_query_input_from_str() {
503 let input: QueryInput = "World".into();
504 match input {
505 QueryInput::Text(s) => assert_eq!(s, "World"),
506 _ => panic!("Expected Text variant"),
507 }
508 }
509
510 #[test]
511 fn test_extra_args_formatting() {
512 use std::collections::HashMap;
513
514 let mut extra_args = HashMap::new();
516 extra_args.insert("custom-flag".to_string(), Some("value".to_string()));
517 extra_args.insert("--already-dashed".to_string(), None);
518 extra_args.insert("-s".to_string(), Some("short".to_string()));
519
520 let options = ClaudeCodeOptions {
521 extra_args,
522 ..Default::default()
523 };
524
525 assert_eq!(options.extra_args.len(), 3);
527 assert!(options.extra_args.contains_key("custom-flag"));
528 assert!(options.extra_args.contains_key("--already-dashed"));
529 assert!(options.extra_args.contains_key("-s"));
530 }
531}