qwencode_rs/query/
executor.rs1use anyhow::{Context, Result};
2use tokio_util::sync::CancellationToken;
3use tracing::{debug, error, info, warn};
4
5use crate::transport::communication::{protocol_to_sdk_message, spawn_cli_process, CLIRequest};
6use crate::transport::stream::{create_message_stream, MessageStream};
7use crate::types::config::QueryOptions;
8use crate::types::message::SDKMessage;
9
10pub async fn execute_query(prompt: &str, options: QueryOptions) -> Result<QueryResultWithCLI> {
15 info!("Executing query with CLI communication: {}", prompt);
16
17 let process = spawn_cli_process(options.path_to_qwen_executable.as_deref())
19 .await
20 .context("Failed to spawn CLI process")?;
21
22 let (handler, stream) = create_message_stream();
24 let cancel_token = CancellationToken::new();
25
26 let mut process = process;
28 let cancel_token_init = cancel_token.clone();
29 match process.initialize(&cancel_token_init).await {
30 Ok(response) => {
31 debug!(
32 "CLI initialized: protocol_version={}, streaming={}",
33 response.protocol_version, response.capabilities.streaming
34 );
35 }
36 Err(e) => {
37 warn!(
38 "CLI initialization failed (expected if CLI doesn't support init): {}",
39 e
40 );
41 }
42 }
43
44 let request = CLIRequest {
46 request_type: "query".to_string(),
47 prompt: prompt.to_string(),
48 session_id: options.session_id.clone(),
49 options: options.clone(),
50 };
51
52 process
54 .send_query(&request)
55 .await
56 .context("Failed to send query")?;
57
58 let handler_for_task = handler;
60 let cancel_token_task = cancel_token.clone();
61
62 tokio::spawn(async move {
63 let mut process = process;
64 let handler = handler_for_task;
65
66 loop {
67 tokio::select! {
68 _ = cancel_token_task.cancelled() => {
69 debug!("Query cancelled");
70 break;
71 }
72 result = process.read_message() => {
73 match result {
74 Ok(Some(msg)) => {
75 match protocol_to_sdk_message(&msg) {
76 Ok(Some(sdk_msg)) => {
77 if let Err(e) = handler.send_message(sdk_msg).await {
78 error!("Failed to send message to stream: {}", e);
79 }
80 }
81 Ok(None) => {
82 debug!("Message filtered out");
83 }
84 Err(e) => {
85 error!("Error converting message: {}", e);
86 if let Err(e) = handler.send_error(e).await {
87 error!("Failed to send error to stream: {}", e);
88 }
89 }
90 }
91 }
92 Ok(None) => {
93 debug!("CLI process exited");
94 break;
95 }
96 Err(e) => {
97 error!("Error reading from CLI: {}", e);
98 if let Err(e) = handler.send_error(e).await {
99 error!("Failed to send error to stream: {}", e);
100 }
101 break;
102 }
103 }
104 }
105 }
106 }
107
108 if let Err(e) = process.shutdown().await {
110 warn!("Error during CLI shutdown: {}", e);
111 }
112 });
113
114 let handle = crate::query::session::QueryHandle::new(options.session_id.clone());
116
117 Ok(QueryResultWithCLI {
118 handle,
119 stream,
120 cancel_token,
121 })
122}
123
124pub struct QueryResultWithCLI {
126 pub handle: crate::query::session::QueryHandle,
127 pub stream: MessageStream,
128 pub cancel_token: CancellationToken,
129}
130
131impl QueryResultWithCLI {
132 pub fn handle(&self) -> &crate::query::session::QueryHandle {
134 &self.handle
135 }
136
137 pub fn stream(&self) -> &MessageStream {
139 &self.stream
140 }
141
142 pub async fn next_message(&self) -> Option<Result<SDKMessage>> {
144 self.stream.next_message().await
145 }
146
147 pub fn cancel(&self) {
149 self.cancel_token.cancel();
150 }
151
152 pub async fn close(self) -> Result<()> {
154 self.cancel_token.cancel();
155 let mut this = self;
156 this.handle.close().await
157 }
158}
159
#[cfg(test)]
mod tests {
    use super::*;

    /// Wraps `stream` in a `QueryResultWithCLI` and also returns a clone of its
    /// cancellation token so tests can observe cancellation from outside.
    fn result_with(
        stream: MessageStream,
        session_id: Option<String>,
    ) -> (QueryResultWithCLI, CancellationToken) {
        let token = CancellationToken::new();
        let result = QueryResultWithCLI {
            handle: crate::query::session::QueryHandle::new(session_id),
            stream,
            cancel_token: token.clone(),
        };
        (result, token)
    }

    #[test]
    fn test_query_result_with_cli_creation() {
        let (handler, stream) = create_message_stream();
        let (result, _token) = result_with(stream, None);

        // A freshly built result must expose an open handle.
        assert!(!result.handle().is_closed());
        drop(handler);
    }

    #[tokio::test]
    async fn test_query_result_cancel() {
        let (_, stream) = create_message_stream();
        let (result, token) = result_with(stream, None);

        result.cancel();

        // The outside clone shares state with the token stored in the result.
        assert!(token.is_cancelled());
    }

    #[tokio::test]
    async fn test_query_result_close() {
        let (_, stream) = create_message_stream();
        let (result, token) = result_with(stream, Some("test-session".to_string()));

        result.close().await.unwrap();

        // Closing must also cancel the query.
        assert!(token.is_cancelled());
    }
}