Skip to main content

qwencode_rs/query/
handler.rs

1use anyhow::Result;
2use tokio::io::{AsyncBufReadExt, BufReader};
3use tokio::time::{timeout, Duration};
4use tracing::{info, warn};
5
6use crate::query::builder::QueryBuilder;
7use crate::query::session::QueryHandle;
8use crate::transport::stream::{create_message_stream, MessageHandler, MessageStream};
9use crate::types::config::QueryOptions;
10use crate::types::message::{
11    MessageContent, MessageRole, SDKAssistantMessage, SDKMessage, SDKResultMessage,
12};
13
14/// Main query function - executes a query against QwenCode CLI
15///
16/// # Example
17/// ```ignore
18/// use qwencode_rs::query;
19///
20/// let result = query("What files are in the current directory?", QueryOptions::default()).await?;
21/// while let Some(msg) = result.next_message().await {
22///     match msg {
23///         Ok(SDKMessage::Assistant(a)) => println!("Assistant: {}", a.message.content),
24///         Ok(SDKMessage::Result(r)) => println!("Result: {:?}", r.result),
25///         Err(e) => eprintln!("Error: {}", e),
26///     }
27/// }
28/// ```
29pub async fn query(prompt: &str, options: QueryOptions) -> Result<QueryResult> {
30    info!("Executing query: {}", prompt);
31
32    // Create a session handle
33    let handle = QueryHandle::new(options.session_id.clone());
34    let session_id = handle.session_id().to_string();
35
36    // Create message stream
37    let (handler, stream) = create_message_stream();
38
39    // Spawn task to handle CLI communication
40    let handler_clone = handler.clone();
41    let prompt_owned = prompt.to_string();
42    tokio::spawn(async move {
43        if let Err(e) = run_cli_session(&session_id, &prompt_owned, options, &handler_clone).await {
44            warn!("CLI session error: {}", e);
45            let _ = handler_clone
46                .send_error(anyhow::anyhow!("CLI session error: {}", e))
47                .await;
48        }
49        handler_clone.close();
50    });
51
52    tracing::debug!("Query initialized with session: {}", handle.session_id());
53
54    Ok(QueryResult {
55        handle,
56        stream,
57        _handler: handler,
58    })
59}
60
61/// Run a CLI session: spawn process, send prompt, read responses
62async fn run_cli_session(
63    session_id: &str,
64    prompt: &str,
65    options: QueryOptions,
66    handler: &MessageHandler,
67) -> Result<()> {
68    // Try to find qwen executable
69    let executable_path = find_qwen_executable();
70
71    match executable_path {
72        Some(qwen_path) => {
73            info!("Found Qwen CLI at: {}", qwen_path);
74            run_real_cli_session(session_id, prompt, &options, qwen_path, handler).await
75        }
76        None => {
77            // If qwen is not available, simulate a response
78            info!("Qwen CLI not found in PATH, using simulated response");
79            simulate_response(session_id, prompt, handler).await
80        }
81    }
82}
83
84/// Run a real CLI session with the qwen executable
85async fn run_real_cli_session(
86    session_id: &str,
87    prompt: &str,
88    options: &QueryOptions,
89    executable_path: &str,
90    handler: &MessageHandler,
91) -> Result<()> {
92    // Spawn the QwenCode CLI process in one-shot mode (non-interactive)
93    let mut cmd = tokio::process::Command::new(executable_path);
94    cmd.kill_on_drop(true)
95        .stdin(std::process::Stdio::null()) // No stdin needed for one-shot mode
96        .stdout(std::process::Stdio::piped())
97        .stderr(std::process::Stdio::piped());
98
99    // Add the prompt as a positional argument (one-shot mode)
100    cmd.arg(prompt);
101
102    if let Some(cwd) = &options.cwd {
103        cmd.current_dir(cwd);
104    }
105
106    if let Some(model) = &options.model {
107        cmd.arg("--model").arg(model);
108    }
109
110    if options.debug {
111        cmd.arg("--debug");
112    }
113
114    // Set channel to SDK for better identification
115    cmd.arg("--channel").arg("SDK");
116
117    let mut child = match cmd.spawn() {
118        Ok(c) => c,
119        Err(e) => {
120            warn!(
121                "Failed to spawn Qwen CLI: {}, falling back to simulation",
122                e
123            );
124            return simulate_response(session_id, prompt, handler).await;
125        }
126    };
127
128    info!("Qwen CLI spawned in one-shot mode, PID: {:?}", child.id());
129
130    let stdout = child.stdout.take().expect("Failed to open stdout");
131    let stderr = child.stderr.take().expect("Failed to open stderr");
132
133    // Read stdout line by line with timeout
134    let mut stdout_reader = BufReader::new(stdout);
135    let mut line = String::new();
136    let idle_timeout = Duration::from_secs(60); // 60s timeout for one-shot mode
137
138    loop {
139        line.clear();
140        match timeout(idle_timeout, stdout_reader.read_line(&mut line)).await {
141            Ok(Ok(0)) => break, // EOF
142            Ok(Ok(_)) => {
143                let line = line.trim().to_string();
144                if !line.is_empty() {
145                    info!("CLI output: {}", line);
146                    let assistant_msg = SDKMessage::Assistant(SDKAssistantMessage {
147                        session_id: session_id.to_string(),
148                        message: MessageContent {
149                            role: MessageRole::Assistant,
150                            content: line.clone(),
151                        },
152                    });
153                    if let Err(e) = handler.send_message(assistant_msg).await {
154                        warn!("Failed to send message: {}", e);
155                        break;
156                    }
157                }
158            }
159            Ok(Err(e)) => {
160                warn!("Error reading stdout: {}", e);
161                break;
162            }
163            Err(_) => {
164                info!(
165                    "Idle timeout reached ({}s), ending session",
166                    idle_timeout.as_secs()
167                );
168                break;
169            }
170        }
171    }
172
173    // Kill the child process if still running
174    let _ = child.start_kill();
175
176    // Send result message
177    let result_msg = SDKMessage::Result(SDKResultMessage {
178        session_id: session_id.to_string(),
179        result: serde_json::json!({
180            "exit_code": 0,
181            "success": true,
182            "note": "Process completed (killed after idle timeout)"
183        }),
184        exit_code: 0,
185    });
186    handler.send_message(result_msg).await?;
187
188    // Log stderr if any
189    let mut stderr_reader = BufReader::new(stderr);
190    let mut stderr_line = String::new();
191    loop {
192        stderr_line.clear();
193        match stderr_reader.read_line(&mut stderr_line).await {
194            Ok(0) => break,
195            Ok(_) => {
196                if !stderr_line.trim().is_empty() {
197                    warn!("[stderr] {}", stderr_line.trim());
198                }
199            }
200            Err(_) => break,
201        }
202    }
203
204    Ok(())
205}
206
207/// Simulate a response when qwen CLI is not available
208async fn simulate_response(session_id: &str, prompt: &str, handler: &MessageHandler) -> Result<()> {
209    info!("Qwen CLI not found, simulating response");
210
211    // Simulate assistant thinking
212    let thinking_msg = SDKMessage::Assistant(SDKAssistantMessage {
213        session_id: session_id.to_string(),
214        message: MessageContent {
215            role: MessageRole::Assistant,
216            content: format!(
217                "[Simulated] Processing query: \"{}\"\n\nSince the QwenCode CLI is not installed, this is a simulated response. To get real responses, install the QwenCode CLI and ensure it's in your PATH.",
218                prompt
219            ),
220        },
221    });
222    handler.send_message(thinking_msg).await?;
223
224    // Send result
225    let result_msg = SDKMessage::Result(SDKResultMessage {
226        session_id: session_id.to_string(),
227        result: serde_json::json!({
228            "status": "simulated",
229            "note": "Install QwenCode CLI for real responses"
230        }),
231        exit_code: 0,
232    });
233    handler.send_message(result_msg).await?;
234
235    Ok(())
236}
237
238/// Find the qwen executable in PATH
239fn find_qwen_executable() -> Option<&'static str> {
240    // Try common executable names
241    ["qwen", "qwen-code"]
242        .iter()
243        .find(|&name| which(name).is_some())
244        .copied()
245}
246
247/// Check if an executable exists in PATH
248fn which(executable: &str) -> Option<std::path::PathBuf> {
249    std::env::var_os("PATH").and_then(|paths| {
250        std::env::split_paths(&paths)
251            .filter_map(|dir| {
252                let full_path = dir.join(executable);
253                if full_path.is_file() {
254                    Some(full_path)
255                } else {
256                    None
257                }
258            })
259            .next()
260    })
261}
262
263/// Query result containing session handle and message stream
264pub struct QueryResult {
265    handle: QueryHandle,
266    stream: MessageStream,
267    _handler: MessageHandler,
268}
269
270impl QueryResult {
271    /// Get the session handle
272    pub fn handle(&self) -> &QueryHandle {
273        &self.handle
274    }
275
276    /// Get the message stream
277    pub fn stream(&self) -> &MessageStream {
278        &self.stream
279    }
280
281    /// Get the next message from the stream
282    pub async fn next_message(&self) -> Option<Result<SDKMessage>> {
283        self.stream.next_message().await
284    }
285
286    /// Close the query
287    pub async fn close(mut self) -> Result<()> {
288        self.handle.close().await
289    }
290}
291
292/// Query builder function for fluent API
293///
294/// # Example
295/// ```ignore
296/// use qwencode_rs::query_builder;
297///
298/// let result = query_builder()
299///     .prompt("Hello")
300///     .model("qwen-max")
301///     .debug(true)
302///     .execute()
303///     .await?;
304/// ```
305pub fn query_builder() -> QueryBuilder {
306    QueryBuilder::new()
307}
308
309#[cfg(test)]
310mod tests {
311    use super::*;
312
313    #[tokio::test]
314    async fn test_query_initial_state() {
315        let result = query("Test prompt", QueryOptions::default()).await.unwrap();
316
317        assert!(!result.handle().is_closed());
318        assert!(!result.stream().is_closed());
319    }
320
321    #[tokio::test]
322    async fn test_query_session_id() {
323        let options = QueryOptions {
324            session_id: Some("custom-session".to_string()),
325            ..Default::default()
326        };
327
328        let result = query("Test", options).await.unwrap();
329        assert_eq!(result.handle().session_id(), "custom-session");
330    }
331
332    #[tokio::test]
333    async fn test_query_close() {
334        let result = query("Test", QueryOptions::default()).await.unwrap();
335
336        let _session_id = result.handle().session_id().to_string();
337        result.close().await.unwrap();
338
339        // Session should be closed
340        // Note: We can't check handle directly after move, but close should succeed
341    }
342
343    #[test]
344    fn test_query_builder_function() {
345        let builder = query_builder();
346        assert!(builder.prompt.is_none());
347    }
348
349    #[tokio::test]
350    async fn test_query_with_custom_options() {
351        let options = QueryOptions {
352            model: Some("qwen-plus".to_string()),
353            debug: true,
354            max_session_turns: 10,
355            ..Default::default()
356        };
357
358        let result = query("Test with options", options).await.unwrap();
359        assert!(!result.handle().is_closed());
360    }
361}