iflow_cli_sdk_rust/query.rs
1use crate::client::IFlowClient;
2use crate::error::Result;
3use crate::types::{IFlowOptions, Message};
4use futures::stream::StreamExt;
5use std::time::Duration;
6use tokio::time::timeout;
7
8/// Simple synchronous query to iFlow
9///
10/// Sends a query to iFlow and waits for a complete response.
11/// This is a convenience function for simple request-response interactions.
12///
13/// # Arguments
14/// * `prompt` - The query prompt to send to iFlow
15///
16/// # Returns
17/// * `Ok(String)` containing the response from iFlow
18/// * `Err(IFlowError)` if there was an error
19///
20/// # Example
21/// ```no_run
22/// use iflow_cli_sdk_rust::query;
23///
24/// #[tokio::main]
25/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
26/// let response = query("What is 2 + 2?").await?;
27/// println!("{}", response);
28/// Ok(())
29/// }
30/// ```
31pub async fn query(prompt: &str) -> Result<String> {
32 let default_timeout = IFlowOptions::default().timeout;
33 query_with_timeout(prompt, default_timeout).await
34}
35
36/// Simple synchronous query to iFlow with custom options
37///
38/// Sends a query to iFlow and waits for a complete response.
39/// This is a convenience function for simple request-response interactions.
40///
41/// # Arguments
42/// * `prompt` - The query prompt to send to iFlow
43/// * `options` - Configuration options for the query
44///
45/// # Returns
46/// * `Ok(String)` containing the response from iFlow
47/// * `Err(IFlowError)` if there was an error
48///
49/// # Example
50/// ```no_run
51/// use iflow_cli_sdk_rust::{query_with_config, IFlowOptions};
52///
53/// #[tokio::main]
54/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
55/// let options = IFlowOptions::new().with_timeout(120.0);
56/// let response = query_with_config("What is 2 + 2?", options).await?;
57/// println!("{}", response);
58/// Ok(())
59/// }
60/// ```
61pub async fn query_with_config(prompt: &str, options: IFlowOptions) -> Result<String> {
62 // Apply timeout to the entire operation
63 let timeout_secs = options.timeout;
64 // Use a fraction of the total timeout for individual message reception
65 // This ensures we don't block indefinitely on any single message
66 let message_timeout_secs = (timeout_secs / 10.0).min(1.0).max(0.1);
67
68 match timeout(Duration::from_secs_f64(timeout_secs), async {
69 let local = tokio::task::LocalSet::new();
70 local
71 .run_until(async {
72 tracing::debug!("Creating IFlowClient with custom options");
73 let mut client = IFlowClient::new(Some(options));
74 tracing::debug!("Connecting to iFlow...");
75 client.connect().await?;
76 tracing::debug!("Connected to iFlow");
77
78 tracing::debug!("Sending message: {}", prompt);
79 client.send_message(prompt, None).await?;
80 tracing::debug!("Message sent");
81
82 let mut response = String::new();
83 let mut message_stream = client.messages();
84
85 // First wait for the send_message to complete by receiving the TaskFinish message
86 // The send_message function sends a TaskFinish message when the prompt is complete
87 let mut prompt_finished = false;
88 while !prompt_finished {
89 match timeout(
90 Duration::from_secs_f64(message_timeout_secs),
91 message_stream.next(),
92 )
93 .await
94 {
95 Ok(Some(message)) => {
96 tracing::debug!("Received message: {:?}", message);
97 match message {
98 Message::Assistant { content } => {
99 response.push_str(&content);
100 }
101 Message::TaskFinish { .. } => {
102 prompt_finished = true;
103 }
104 _ => {}
105 }
106 }
107 Ok(None) => {
108 // Stream ended
109 tracing::debug!("Message stream ended");
110 prompt_finished = true;
111 }
112 Err(_) => {
113 // Timeout on individual message - this is expected during normal operation
114 // Continue the loop to check if we should still wait
115 // The outer timeout will catch if we've exceeded the total time
116 }
117 }
118 }
119 tracing::debug!("Query completed, response length: {}", response.len());
120
121 client.disconnect().await?;
122 Ok(response.trim().to_string())
123 })
124 .await
125 })
126 .await
127 {
128 Ok(result) => result,
129 Err(_) => Err(crate::error::IFlowError::Timeout(
130 "Operation timed out".to_string(),
131 )),
132 }
133}
134
135/// Simple synchronous query to iFlow with custom timeout
136///
137/// Sends a query to iFlow and waits for a complete response.
138/// This is a convenience function for simple request-response interactions.
139///
140/// # Arguments
141/// * `prompt` - The query prompt to send to iFlow
142/// * `timeout_secs` - Timeout in seconds
143///
144/// # Returns
145/// * `Ok(String)` containing the response from iFlow
146/// * `Err(IFlowError)` if there was an error
147///
148/// # Example
149/// ```no_run
150/// use iflow_cli_sdk_rust::query_with_timeout;
151///
152/// #[tokio::main]
153/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
154/// let response = query_with_timeout("What is 2 + 2?", 120.0).await?;
155/// println!("{}", response);
156/// Ok(())
157/// }
158/// ```
159pub async fn query_with_timeout(prompt: &str, timeout_secs: f64) -> Result<String> {
160 // Apply timeout to the entire operation
161 // Use a fraction of the total timeout for individual message reception
162 // This ensures we don't block indefinitely on any single message
163 let message_timeout_secs = (timeout_secs / 10.0).min(1.0).max(0.1);
164
165 match timeout(Duration::from_secs_f64(timeout_secs), async {
166 let local = tokio::task::LocalSet::new();
167 local
168 .run_until(async {
169 // Create client with the specified timeout and auto-start configuration for stdio mode
170 let options = IFlowOptions::new()
171 .with_timeout(timeout_secs)
172 .with_process_config(
173 crate::types::ProcessConfig::new()
174 .enable_auto_start()
175 .stdio_mode(),
176 );
177 tracing::debug!(
178 "Creating IFlowClient with options: auto_start={}, start_port={:?}",
179 options.process.auto_start,
180 options.process.start_port
181 );
182 let mut client = IFlowClient::new(Some(options));
183 tracing::debug!("Connecting to iFlow...");
184 client.connect().await?;
185 tracing::debug!("Connected to iFlow");
186
187 tracing::debug!("Sending message: {}", prompt);
188 client.send_message(prompt, None).await?;
189 tracing::debug!("Message sent");
190
191 let mut response = String::new();
192 let mut message_stream = client.messages();
193
194 // First wait for the send_message to complete by receiving the TaskFinish message
195 // The send_message function sends a TaskFinish message when the prompt is complete
196 let mut prompt_finished = false;
197 while !prompt_finished {
198 match timeout(
199 Duration::from_secs_f64(message_timeout_secs),
200 message_stream.next(),
201 )
202 .await
203 {
204 Ok(Some(message)) => {
205 tracing::debug!("Received message: {:?}", message);
206 match message {
207 Message::Assistant { content } => {
208 response.push_str(&content);
209 }
210 Message::TaskFinish { .. } => {
211 prompt_finished = true;
212 }
213 _ => {}
214 }
215 }
216 Ok(None) => {
217 // Stream ended
218 tracing::debug!("Message stream ended");
219 prompt_finished = true;
220 }
221 Err(_) => {
222 // Timeout on individual message - this is expected during normal operation
223 // Continue the loop to check if we should still wait
224 // The outer timeout will catch if we've exceeded the total time
225 }
226 }
227 }
228 tracing::debug!("Query completed, response length: {}", response.len());
229
230 client.disconnect().await?;
231 Ok(response.trim().to_string())
232 })
233 .await
234 })
235 .await
236 {
237 Ok(result) => result,
238 Err(_) => Err(crate::error::IFlowError::Timeout(
239 "Operation timed out".to_string(),
240 )),
241 }
242}
243
244/// Stream responses from iFlow
245///
246/// Sends a query to iFlow and returns a stream of response chunks.
247/// This is useful for real-time output as the response is generated.
248///
249/// # Arguments
250/// * `prompt` - The query prompt to send to iFlow
251///
252/// # Returns
253/// * `Ok(impl Stream<Item = String>)` containing the response stream
254/// * `Err(IFlowError)` if there was an error
255///
256/// # Example
257/// ```no_run
258/// use iflow_cli_sdk_rust::query_stream;
259/// use futures::stream::StreamExt;
260///
261/// #[tokio::main]
262/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
263/// let mut stream = query_stream("Tell me a story").await?;
264///
265/// while let Some(chunk) = stream.next().await {
266/// print!("{}", chunk);
267/// // Flush stdout for real-time output
268/// use std::io::{self, Write};
269/// io::stdout().flush()?;
270/// }
271///
272/// Ok(())
273/// }
274/// ```
275pub async fn query_stream(prompt: &str) -> Result<impl futures::Stream<Item = String>> {
276 query_stream_with_timeout(prompt, 120.0).await
277}
278
279/// Stream responses from iFlow with custom options
280///
281/// Sends a query to iFlow and returns a stream of response chunks.
282/// This is useful for real-time output as the response is generated.
283///
284/// # Arguments
285/// * `prompt` - The query prompt to send to iFlow
286/// * `options` - Configuration options for the query
287///
288/// # Returns
289/// * `Ok(impl Stream<Item = String>)` containing the response stream
290/// * `Err(IFlowError)` if there was an error
291///
292/// # Example
293/// ```no_run
294/// use iflow_cli_sdk_rust::{query_stream_with_config, IFlowOptions};
295/// use futures::stream::StreamExt;
296///
297/// #[tokio::main]
298/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
299/// let options = IFlowOptions::new().with_timeout(60.0);
300/// let mut stream = query_stream_with_config("Tell me a story", options).await?;
301///
302/// while let Some(chunk) = stream.next().await {
303/// print!("{}", chunk);
304/// // Flush stdout for real-time output
305/// use std::io::{self, Write};
306/// io::stdout().flush()?;
307/// }
308///
309/// Ok(())
310/// }
311/// ```
312pub async fn query_stream_with_config(
313 prompt: &str,
314 options: IFlowOptions,
315) -> Result<impl futures::Stream<Item = String>> {
316 let local = tokio::task::LocalSet::new();
317 // We need to run this in a LocalSet context but return a stream
318 // Let's create the client and connection in the LocalSet context
319 local
320 .run_until(async {
321 // Create client with the specified options
322 let mut client = IFlowClient::new(Some(options));
323 client.connect().await?;
324
325 client.send_message(prompt, None).await?;
326
327 let (tx, rx) = futures::channel::mpsc::unbounded();
328 let message_stream = client.messages();
329
330 tokio::task::spawn_local(async move {
331 futures::pin_mut!(message_stream);
332
333 while let Some(message) = message_stream.next().await {
334 match message {
335 Message::Assistant { content } => {
336 if tx.unbounded_send(content).is_err() {
337 break;
338 }
339 }
340 Message::TaskFinish { .. } => {
341 break;
342 }
343 _ => {}
344 }
345 }
346
347 let _ = client.disconnect().await;
348 });
349
350 Ok(rx)
351 })
352 .await
353}
354
355/// Stream responses from iFlow with custom timeout
356///
357/// Sends a query to iFlow and returns a stream of response chunks.
358/// This is useful for real-time output as the response is generated.
359///
360/// # Arguments
361/// * `prompt` - The query prompt to send to iFlow
362/// * `timeout_secs` - Timeout in seconds
363///
364/// # Returns
365/// * `Ok(impl Stream<Item = String>)` containing the response stream
366/// * `Err(IFlowError)` if there was an error
367///
368/// # Example
369/// ```no_run
370/// use iflow_cli_sdk_rust::query_stream_with_timeout;
371/// use futures::stream::StreamExt;
372///
373/// #[tokio::main]
374/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
375/// let mut stream = query_stream_with_timeout("Tell me a story", 120.0).await?;
376///
377/// while let Some(chunk) = stream.next().await {
378/// print!("{}", chunk);
379/// // Flush stdout for real-time output
380/// use std::io::{self, Write};
381/// io::stdout().flush()?;
382/// }
383///
384/// Ok(())
385/// }
386/// ```
387pub async fn query_stream_with_timeout(
388 prompt: &str,
389 timeout_secs: f64,
390) -> Result<impl futures::Stream<Item = String>> {
391 let local = tokio::task::LocalSet::new();
392 // We need to run this in a LocalSet context but return a stream
393 // Let's create the client and connection in the LocalSet context
394 local
395 .run_until(async {
396 // Create client with the specified timeout
397 let options = IFlowOptions::new().with_timeout(timeout_secs);
398 let mut client = IFlowClient::new(Some(options));
399 client.connect().await?;
400
401 client.send_message(prompt, None).await?;
402
403 let (tx, rx) = futures::channel::mpsc::unbounded();
404 let message_stream = client.messages();
405
406 tokio::task::spawn_local(async move {
407 futures::pin_mut!(message_stream);
408
409 while let Some(message) = message_stream.next().await {
410 match message {
411 Message::Assistant { content } => {
412 if tx.unbounded_send(content).is_err() {
413 break;
414 }
415 }
416 Message::TaskFinish { .. } => {
417 break;
418 }
419 _ => {}
420 }
421 }
422
423 let _ = client.disconnect().await;
424 });
425
426 Ok(rx)
427 })
428 .await
429}