1use anyhow::Result;
2use tokio::io::{AsyncBufReadExt, BufReader};
3use tokio::time::{timeout, Duration};
4use tracing::{info, warn};
5
6use crate::query::builder::QueryBuilder;
7use crate::query::session::QueryHandle;
8use crate::transport::stream::{create_message_stream, MessageHandler, MessageStream};
9use crate::types::config::QueryOptions;
10use crate::types::message::{
11 MessageContent, MessageRole, SDKAssistantMessage, SDKMessage, SDKResultMessage,
12};
13
14pub async fn query(prompt: &str, options: QueryOptions) -> Result<QueryResult> {
30 info!("Executing query: {}", prompt);
31
32 let handle = QueryHandle::new(options.session_id.clone());
34 let session_id = handle.session_id().to_string();
35
36 let (handler, stream) = create_message_stream();
38
39 let handler_clone = handler.clone();
41 let prompt_owned = prompt.to_string();
42 tokio::spawn(async move {
43 if let Err(e) = run_cli_session(&session_id, &prompt_owned, options, &handler_clone).await {
44 warn!("CLI session error: {}", e);
45 let _ = handler_clone
46 .send_error(anyhow::anyhow!("CLI session error: {}", e))
47 .await;
48 }
49 handler_clone.close();
50 });
51
52 tracing::debug!("Query initialized with session: {}", handle.session_id());
53
54 Ok(QueryResult {
55 handle,
56 stream,
57 _handler: handler,
58 })
59}
60
61async fn run_cli_session(
63 session_id: &str,
64 prompt: &str,
65 options: QueryOptions,
66 handler: &MessageHandler,
67) -> Result<()> {
68 let executable_path = find_qwen_executable();
70
71 match executable_path {
72 Some(qwen_path) => {
73 info!("Found Qwen CLI at: {}", qwen_path);
74 run_real_cli_session(session_id, prompt, &options, qwen_path, handler).await
75 }
76 None => {
77 info!("Qwen CLI not found in PATH, using simulated response");
79 simulate_response(session_id, prompt, handler).await
80 }
81 }
82}
83
84async fn run_real_cli_session(
86 session_id: &str,
87 prompt: &str,
88 options: &QueryOptions,
89 executable_path: &str,
90 handler: &MessageHandler,
91) -> Result<()> {
92 let mut cmd = tokio::process::Command::new(executable_path);
94 cmd.kill_on_drop(true)
95 .stdin(std::process::Stdio::null()) .stdout(std::process::Stdio::piped())
97 .stderr(std::process::Stdio::piped());
98
99 cmd.arg(prompt);
101
102 if let Some(cwd) = &options.cwd {
103 cmd.current_dir(cwd);
104 }
105
106 if let Some(model) = &options.model {
107 cmd.arg("--model").arg(model);
108 }
109
110 if options.debug {
111 cmd.arg("--debug");
112 }
113
114 cmd.arg("--channel").arg("SDK");
116
117 let mut child = match cmd.spawn() {
118 Ok(c) => c,
119 Err(e) => {
120 warn!(
121 "Failed to spawn Qwen CLI: {}, falling back to simulation",
122 e
123 );
124 return simulate_response(session_id, prompt, handler).await;
125 }
126 };
127
128 info!("Qwen CLI spawned in one-shot mode, PID: {:?}", child.id());
129
130 let stdout = child.stdout.take().expect("Failed to open stdout");
131 let stderr = child.stderr.take().expect("Failed to open stderr");
132
133 let mut stdout_reader = BufReader::new(stdout);
135 let mut line = String::new();
136 let idle_timeout = Duration::from_secs(60); loop {
139 line.clear();
140 match timeout(idle_timeout, stdout_reader.read_line(&mut line)).await {
141 Ok(Ok(0)) => break, Ok(Ok(_)) => {
143 let line = line.trim().to_string();
144 if !line.is_empty() {
145 info!("CLI output: {}", line);
146 let assistant_msg = SDKMessage::Assistant(SDKAssistantMessage {
147 session_id: session_id.to_string(),
148 message: MessageContent {
149 role: MessageRole::Assistant,
150 content: line.clone(),
151 },
152 });
153 if let Err(e) = handler.send_message(assistant_msg).await {
154 warn!("Failed to send message: {}", e);
155 break;
156 }
157 }
158 }
159 Ok(Err(e)) => {
160 warn!("Error reading stdout: {}", e);
161 break;
162 }
163 Err(_) => {
164 info!(
165 "Idle timeout reached ({}s), ending session",
166 idle_timeout.as_secs()
167 );
168 break;
169 }
170 }
171 }
172
173 let _ = child.start_kill();
175
176 let result_msg = SDKMessage::Result(SDKResultMessage {
178 session_id: session_id.to_string(),
179 result: serde_json::json!({
180 "exit_code": 0,
181 "success": true,
182 "note": "Process completed (killed after idle timeout)"
183 }),
184 exit_code: 0,
185 });
186 handler.send_message(result_msg).await?;
187
188 let mut stderr_reader = BufReader::new(stderr);
190 let mut stderr_line = String::new();
191 loop {
192 stderr_line.clear();
193 match stderr_reader.read_line(&mut stderr_line).await {
194 Ok(0) => break,
195 Ok(_) => {
196 if !stderr_line.trim().is_empty() {
197 warn!("[stderr] {}", stderr_line.trim());
198 }
199 }
200 Err(_) => break,
201 }
202 }
203
204 Ok(())
205}
206
207async fn simulate_response(session_id: &str, prompt: &str, handler: &MessageHandler) -> Result<()> {
209 info!("Qwen CLI not found, simulating response");
210
211 let thinking_msg = SDKMessage::Assistant(SDKAssistantMessage {
213 session_id: session_id.to_string(),
214 message: MessageContent {
215 role: MessageRole::Assistant,
216 content: format!(
217 "[Simulated] Processing query: \"{}\"\n\nSince the QwenCode CLI is not installed, this is a simulated response. To get real responses, install the QwenCode CLI and ensure it's in your PATH.",
218 prompt
219 ),
220 },
221 });
222 handler.send_message(thinking_msg).await?;
223
224 let result_msg = SDKMessage::Result(SDKResultMessage {
226 session_id: session_id.to_string(),
227 result: serde_json::json!({
228 "status": "simulated",
229 "note": "Install QwenCode CLI for real responses"
230 }),
231 exit_code: 0,
232 });
233 handler.send_message(result_msg).await?;
234
235 Ok(())
236}
237
238fn find_qwen_executable() -> Option<&'static str> {
240 ["qwen", "qwen-code"]
242 .iter()
243 .find(|&name| which(name).is_some())
244 .copied()
245}
246
/// Looks up `executable` in the directories listed in the `PATH` environment
/// variable and returns the first match.
///
/// A match must be a regular file (symlinks are followed). On Unix it must
/// also have at least one execute permission bit set, so a non-runnable file
/// of the same name does not shadow a real executable later on `PATH`.
///
/// Returns `None` when `PATH` is unset or no directory contains a match.
fn which(executable: &str) -> Option<std::path::PathBuf> {
    /// Unix: true when any of the owner/group/other execute bits is set.
    #[cfg(unix)]
    fn has_exec_permission(meta: &std::fs::Metadata) -> bool {
        std::os::unix::fs::PermissionsExt::mode(&meta.permissions()) & 0o111 != 0
    }

    /// Other platforms expose no execute bit; any regular file qualifies.
    #[cfg(not(unix))]
    fn has_exec_permission(_meta: &std::fs::Metadata) -> bool {
        true
    }

    let paths = std::env::var_os("PATH")?;
    std::env::split_paths(&paths)
        .map(|dir| dir.join(executable))
        .find(|candidate| {
            // Unreadable or missing entries are simply not a match.
            std::fs::metadata(candidate)
                .map(|meta| meta.is_file() && has_exec_permission(&meta))
                .unwrap_or(false)
        })
}
262
263pub struct QueryResult {
265 handle: QueryHandle,
266 stream: MessageStream,
267 _handler: MessageHandler,
268}
269
270impl QueryResult {
271 pub fn handle(&self) -> &QueryHandle {
273 &self.handle
274 }
275
276 pub fn stream(&self) -> &MessageStream {
278 &self.stream
279 }
280
281 pub async fn next_message(&self) -> Option<Result<SDKMessage>> {
283 self.stream.next_message().await
284 }
285
286 pub async fn close(mut self) -> Result<()> {
288 self.handle.close().await
289 }
290}
291
292pub fn query_builder() -> QueryBuilder {
306 QueryBuilder::new()
307}
308
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly started query has an open handle and an open stream.
    #[tokio::test]
    async fn test_query_initial_state() {
        let started = query("Test prompt", QueryOptions::default())
            .await
            .unwrap();

        assert!(!started.handle().is_closed());
        assert!(!started.stream().is_closed());
    }

    /// A caller-supplied session id is the one reported by the handle.
    #[tokio::test]
    async fn test_query_session_id() {
        let with_session = QueryOptions {
            session_id: Some(String::from("custom-session")),
            ..Default::default()
        };

        let started = query("Test", with_session).await.unwrap();
        assert_eq!(started.handle().session_id(), "custom-session");
    }

    /// Closing a query result succeeds.
    #[tokio::test]
    async fn test_query_close() {
        let started = query("Test", QueryOptions::default()).await.unwrap();

        // Read the id before `close` consumes the result.
        let _session_id = started.handle().session_id().to_string();
        started.close().await.unwrap();
    }

    /// The free function returns a builder with no prompt set.
    #[test]
    fn test_query_builder_function() {
        let fresh_builder = query_builder();
        assert!(fresh_builder.prompt.is_none());
    }

    /// Non-default options still produce an open handle.
    #[tokio::test]
    async fn test_query_with_custom_options() {
        let custom = QueryOptions {
            model: Some(String::from("qwen-plus")),
            debug: true,
            max_session_turns: 10,
            ..Default::default()
        };

        let started = query("Test with options", custom).await.unwrap();
        assert!(!started.handle().is_closed());
    }
}