Skip to main content

qwencode_rs/query/
executor.rs

1use anyhow::{Context, Result};
2use tokio_util::sync::CancellationToken;
3use tracing::{debug, error, info, warn};
4
5use crate::transport::communication::{protocol_to_sdk_message, spawn_cli_process, CLIRequest};
6use crate::transport::stream::{create_message_stream, MessageStream};
7use crate::types::config::QueryOptions;
8use crate::types::message::SDKMessage;
9
10/// Execute a query against the QwenCode CLI with full process communication
11///
12/// This spawns the CLI process, sends the query via stdin, and reads
13/// responses from stdout into a message stream.
14pub async fn execute_query(prompt: &str, options: QueryOptions) -> Result<QueryResultWithCLI> {
15    info!("Executing query with CLI communication: {}", prompt);
16
17    // Spawn CLI process
18    let process = spawn_cli_process(options.path_to_qwen_executable.as_deref())
19        .await
20        .context("Failed to spawn CLI process")?;
21
22    // Create message stream
23    let (handler, stream) = create_message_stream();
24    let cancel_token = CancellationToken::new();
25
26    // Initialize connection (best effort)
27    let mut process = process;
28    let cancel_token_init = cancel_token.clone();
29    match process.initialize(&cancel_token_init).await {
30        Ok(response) => {
31            debug!(
32                "CLI initialized: protocol_version={}, streaming={}",
33                response.protocol_version, response.capabilities.streaming
34            );
35        }
36        Err(e) => {
37            warn!(
38                "CLI initialization failed (expected if CLI doesn't support init): {}",
39                e
40            );
41        }
42    }
43
44    // Create request
45    let request = CLIRequest {
46        request_type: "query".to_string(),
47        prompt: prompt.to_string(),
48        session_id: options.session_id.clone(),
49        options: options.clone(),
50    };
51
52    // Send query
53    process
54        .send_query(&request)
55        .await
56        .context("Failed to send query")?;
57
58    // Spawn message reader task
59    let handler_for_task = handler;
60    let cancel_token_task = cancel_token.clone();
61
62    tokio::spawn(async move {
63        let mut process = process;
64        let handler = handler_for_task;
65
66        loop {
67            tokio::select! {
68                _ = cancel_token_task.cancelled() => {
69                    debug!("Query cancelled");
70                    break;
71                }
72                result = process.read_message() => {
73                    match result {
74                        Ok(Some(msg)) => {
75                            match protocol_to_sdk_message(&msg) {
76                                Ok(Some(sdk_msg)) => {
77                                    if let Err(e) = handler.send_message(sdk_msg).await {
78                                        error!("Failed to send message to stream: {}", e);
79                                    }
80                                }
81                                Ok(None) => {
82                                    debug!("Message filtered out");
83                                }
84                                Err(e) => {
85                                    error!("Error converting message: {}", e);
86                                    if let Err(e) = handler.send_error(e).await {
87                                        error!("Failed to send error to stream: {}", e);
88                                    }
89                                }
90                            }
91                        }
92                        Ok(None) => {
93                            debug!("CLI process exited");
94                            break;
95                        }
96                        Err(e) => {
97                            error!("Error reading from CLI: {}", e);
98                            if let Err(e) = handler.send_error(e).await {
99                                error!("Failed to send error to stream: {}", e);
100                            }
101                            break;
102                        }
103                    }
104                }
105            }
106        }
107
108        // Cleanup
109        if let Err(e) = process.shutdown().await {
110            warn!("Error during CLI shutdown: {}", e);
111        }
112    });
113
114    // Create query handle
115    let handle = crate::query::session::QueryHandle::new(options.session_id.clone());
116
117    Ok(QueryResultWithCLI {
118        handle,
119        stream,
120        cancel_token,
121    })
122}
123
124/// Query result containing session handle and message stream (with CLI process)
125pub struct QueryResultWithCLI {
126    pub handle: crate::query::session::QueryHandle,
127    pub stream: MessageStream,
128    pub cancel_token: CancellationToken,
129}
130
131impl QueryResultWithCLI {
132    /// Get the session handle
133    pub fn handle(&self) -> &crate::query::session::QueryHandle {
134        &self.handle
135    }
136
137    /// Get the message stream
138    pub fn stream(&self) -> &MessageStream {
139        &self.stream
140    }
141
142    /// Get the next message from the stream
143    pub async fn next_message(&self) -> Option<Result<SDKMessage>> {
144        self.stream.next_message().await
145    }
146
147    /// Cancel the query
148    pub fn cancel(&self) {
149        self.cancel_token.cancel();
150    }
151
152    /// Close the query
153    pub async fn close(self) -> Result<()> {
154        self.cancel_token.cancel();
155        let mut this = self;
156        this.handle.close().await
157    }
158}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// A `QueryResultWithCLI` can be assembled by hand (no CLI process) and
    /// its handle starts out open.
    #[test]
    fn test_query_result_with_cli_creation() {
        let (handler, stream) = create_message_stream();

        let result = QueryResultWithCLI {
            handle: crate::query::session::QueryHandle::new(None),
            stream,
            cancel_token: CancellationToken::new(),
        };

        assert!(!result.handle().is_closed());
        // The handler is held until here so the stream stays open during the check.
        drop(handler);
    }

    /// `cancel` trips the token shared with the caller.
    #[tokio::test]
    async fn test_query_result_cancel() {
        let (_, stream) = create_message_stream();
        let token = CancellationToken::new();

        let result = QueryResultWithCLI {
            handle: crate::query::session::QueryHandle::new(None),
            stream,
            cancel_token: token.clone(),
        };

        result.cancel();
        assert!(token.is_cancelled());
    }

    /// `close` succeeds and leaves the shared token cancelled.
    #[tokio::test]
    async fn test_query_result_close() {
        let (_, stream) = create_message_stream();
        let token = CancellationToken::new();
        let session = Some("test-session".to_string());

        let result = QueryResultWithCLI {
            handle: crate::query::session::QueryHandle::new(session),
            stream,
            cancel_token: token.clone(),
        };

        result.close().await.unwrap();
        assert!(token.is_cancelled());
    }
}