iflow_cli_sdk_rust/
query.rs

1use crate::client::IFlowClient;
2use crate::error::Result;
3use crate::types::{IFlowOptions, Message};
4use futures::stream::StreamExt;
5use std::time::Duration;
6use tokio::time::timeout;
7
8/// Simple synchronous query to iFlow
9///
10/// Sends a query to iFlow and waits for a complete response.
11/// This is a convenience function for simple request-response interactions.
12///
13/// # Arguments
14/// * `prompt` - The query prompt to send to iFlow
15///
16/// # Returns
17/// * `Ok(String)` containing the response from iFlow
18/// * `Err(IFlowError)` if there was an error
19///
20/// # Example
21/// ```no_run
22/// use iflow_cli_sdk_rust::query;
23///
24/// #[tokio::main]
25/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
26///     let response = query("What is 2 + 2?").await?;
27///     println!("{}", response);
28///     Ok(())
29/// }
30/// ```
31pub async fn query(prompt: &str) -> Result<String> {
32    let default_timeout = IFlowOptions::default().timeout;
33    query_with_timeout(prompt, default_timeout).await
34}
35
36/// Simple synchronous query to iFlow with custom options
37///
38/// Sends a query to iFlow and waits for a complete response.
39/// This is a convenience function for simple request-response interactions.
40///
41/// # Arguments
42/// * `prompt` - The query prompt to send to iFlow
43/// * `options` - Configuration options for the query
44///
45/// # Returns
46/// * `Ok(String)` containing the response from iFlow
47/// * `Err(IFlowError)` if there was an error
48///
49/// # Example
50/// ```no_run
51/// use iflow_cli_sdk_rust::{query_with_config, IFlowOptions};
52///
53/// #[tokio::main]
54/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
55///     let options = IFlowOptions::new().with_timeout(120.0);
56///     let response = query_with_config("What is 2 + 2?", options).await?;
57///     println!("{}", response);
58///     Ok(())
59/// }
60/// ```
61pub async fn query_with_config(prompt: &str, options: IFlowOptions) -> Result<String> {
62    // Apply timeout to the entire operation
63    let timeout_secs = options.timeout;
64    // Use a fraction of the total timeout for individual message reception
65    // This ensures we don't block indefinitely on any single message
66    let message_timeout_secs = (timeout_secs / 10.0).min(1.0).max(0.1);
67
68    match timeout(Duration::from_secs_f64(timeout_secs), async {
69        let local = tokio::task::LocalSet::new();
70        local
71            .run_until(async {
72                tracing::debug!("Creating IFlowClient with custom options");
73                let mut client = IFlowClient::new(Some(options));
74                tracing::debug!("Connecting to iFlow...");
75                client.connect().await?;
76                tracing::debug!("Connected to iFlow");
77
78                tracing::debug!("Sending message: {}", prompt);
79                client.send_message(prompt, None).await?;
80                tracing::debug!("Message sent");
81
82                let mut response = String::new();
83                let mut message_stream = client.messages();
84
85                // First wait for the send_message to complete by receiving the TaskFinish message
86                // The send_message function sends a TaskFinish message when the prompt is complete
87                let mut prompt_finished = false;
88                while !prompt_finished {
89                    match timeout(
90                        Duration::from_secs_f64(message_timeout_secs),
91                        message_stream.next(),
92                    )
93                    .await
94                    {
95                        Ok(Some(message)) => {
96                            tracing::debug!("Received message: {:?}", message);
97                            match message {
98                                Message::Assistant { content } => {
99                                    response.push_str(&content);
100                                }
101                                Message::TaskFinish { .. } => {
102                                    prompt_finished = true;
103                                }
104                                _ => {}
105                            }
106                        }
107                        Ok(None) => {
108                            // Stream ended
109                            tracing::debug!("Message stream ended");
110                            prompt_finished = true;
111                        }
112                        Err(_) => {
113                            // Timeout on individual message - this is expected during normal operation
114                            // Continue the loop to check if we should still wait
115                            // The outer timeout will catch if we've exceeded the total time
116                        }
117                    }
118                }
119                tracing::debug!("Query completed, response length: {}", response.len());
120
121                client.disconnect().await?;
122                Ok(response.trim().to_string())
123            })
124            .await
125    })
126    .await
127    {
128        Ok(result) => result,
129        Err(_) => Err(crate::error::IFlowError::Timeout(
130            "Operation timed out".to_string(),
131        )),
132    }
133}
134
135/// Simple synchronous query to iFlow with custom timeout
136///
137/// Sends a query to iFlow and waits for a complete response.
138/// This is a convenience function for simple request-response interactions.
139///
140/// # Arguments
141/// * `prompt` - The query prompt to send to iFlow
142/// * `timeout_secs` - Timeout in seconds
143///
144/// # Returns
145/// * `Ok(String)` containing the response from iFlow
146/// * `Err(IFlowError)` if there was an error
147///
148/// # Example
149/// ```no_run
150/// use iflow_cli_sdk_rust::query_with_timeout;
151///
152/// #[tokio::main]
153/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
154///     let response = query_with_timeout("What is 2 + 2?", 120.0).await?;
155///     println!("{}", response);
156///     Ok(())
157/// }
158/// ```
159pub async fn query_with_timeout(prompt: &str, timeout_secs: f64) -> Result<String> {
160    // Apply timeout to the entire operation
161    // Use a fraction of the total timeout for individual message reception
162    // This ensures we don't block indefinitely on any single message
163    let message_timeout_secs = (timeout_secs / 10.0).min(1.0).max(0.1);
164
165    match timeout(Duration::from_secs_f64(timeout_secs), async {
166        let local = tokio::task::LocalSet::new();
167        local
168            .run_until(async {
169                // Create client with the specified timeout and auto-start configuration for stdio mode
170                let options = IFlowOptions::new()
171                    .with_timeout(timeout_secs)
172                    .with_process_config(
173                        crate::types::ProcessConfig::new()
174                            .enable_auto_start()
175                            .stdio_mode(),
176                    );
177                tracing::debug!(
178                    "Creating IFlowClient with options: auto_start={}, start_port={:?}",
179                    options.process.auto_start,
180                    options.process.start_port
181                );
182                let mut client = IFlowClient::new(Some(options));
183                tracing::debug!("Connecting to iFlow...");
184                client.connect().await?;
185                tracing::debug!("Connected to iFlow");
186
187                tracing::debug!("Sending message: {}", prompt);
188                client.send_message(prompt, None).await?;
189                tracing::debug!("Message sent");
190
191                let mut response = String::new();
192                let mut message_stream = client.messages();
193
194                // First wait for the send_message to complete by receiving the TaskFinish message
195                // The send_message function sends a TaskFinish message when the prompt is complete
196                let mut prompt_finished = false;
197                while !prompt_finished {
198                    match timeout(
199                        Duration::from_secs_f64(message_timeout_secs),
200                        message_stream.next(),
201                    )
202                    .await
203                    {
204                        Ok(Some(message)) => {
205                            tracing::debug!("Received message: {:?}", message);
206                            match message {
207                                Message::Assistant { content } => {
208                                    response.push_str(&content);
209                                }
210                                Message::TaskFinish { .. } => {
211                                    prompt_finished = true;
212                                }
213                                _ => {}
214                            }
215                        }
216                        Ok(None) => {
217                            // Stream ended
218                            tracing::debug!("Message stream ended");
219                            prompt_finished = true;
220                        }
221                        Err(_) => {
222                            // Timeout on individual message - this is expected during normal operation
223                            // Continue the loop to check if we should still wait
224                            // The outer timeout will catch if we've exceeded the total time
225                        }
226                    }
227                }
228                tracing::debug!("Query completed, response length: {}", response.len());
229
230                client.disconnect().await?;
231                Ok(response.trim().to_string())
232            })
233            .await
234    })
235    .await
236    {
237        Ok(result) => result,
238        Err(_) => Err(crate::error::IFlowError::Timeout(
239            "Operation timed out".to_string(),
240        )),
241    }
242}
243
244/// Stream responses from iFlow
245///
246/// Sends a query to iFlow and returns a stream of response chunks.
247/// This is useful for real-time output as the response is generated.
248///
249/// # Arguments
250/// * `prompt` - The query prompt to send to iFlow
251///
252/// # Returns
253/// * `Ok(impl Stream<Item = String>)` containing the response stream
254/// * `Err(IFlowError)` if there was an error
255///
256/// # Example
257/// ```no_run
258/// use iflow_cli_sdk_rust::query_stream;
259/// use futures::stream::StreamExt;
260///
261/// #[tokio::main]
262/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
263///     let mut stream = query_stream("Tell me a story").await?;
264///     
265///     while let Some(chunk) = stream.next().await {
266///         print!("{}", chunk);
267///         // Flush stdout for real-time output
268///         use std::io::{self, Write};
269///         io::stdout().flush()?;
270///     }
271///     
272///     Ok(())
273/// }
274/// ```
275pub async fn query_stream(prompt: &str) -> Result<impl futures::Stream<Item = String>> {
276    query_stream_with_timeout(prompt, 120.0).await
277}
278
279/// Stream responses from iFlow with custom options
280///
281/// Sends a query to iFlow and returns a stream of response chunks.
282/// This is useful for real-time output as the response is generated.
283///
284/// # Arguments
285/// * `prompt` - The query prompt to send to iFlow
286/// * `options` - Configuration options for the query
287///
288/// # Returns
289/// * `Ok(impl Stream<Item = String>)` containing the response stream
290/// * `Err(IFlowError)` if there was an error
291///
292/// # Example
293/// ```no_run
294/// use iflow_cli_sdk_rust::{query_stream_with_config, IFlowOptions};
295/// use futures::stream::StreamExt;
296///
297/// #[tokio::main]
298/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
299///     let options = IFlowOptions::new().with_timeout(60.0);
300///     let mut stream = query_stream_with_config("Tell me a story", options).await?;
301///     
302///     while let Some(chunk) = stream.next().await {
303///         print!("{}", chunk);
304///         // Flush stdout for real-time output
305///         use std::io::{self, Write};
306///         io::stdout().flush()?;
307///     }
308///     
309///     Ok(())
310/// }
311/// ```
312pub async fn query_stream_with_config(
313    prompt: &str,
314    options: IFlowOptions,
315) -> Result<impl futures::Stream<Item = String>> {
316    let local = tokio::task::LocalSet::new();
317    // We need to run this in a LocalSet context but return a stream
318    // Let's create the client and connection in the LocalSet context
319    local
320        .run_until(async {
321            // Create client with the specified options
322            let mut client = IFlowClient::new(Some(options));
323            client.connect().await?;
324
325            client.send_message(prompt, None).await?;
326
327            let (tx, rx) = futures::channel::mpsc::unbounded();
328            let message_stream = client.messages();
329
330            tokio::task::spawn_local(async move {
331                futures::pin_mut!(message_stream);
332
333                while let Some(message) = message_stream.next().await {
334                    match message {
335                        Message::Assistant { content } => {
336                            if tx.unbounded_send(content).is_err() {
337                                break;
338                            }
339                        }
340                        Message::TaskFinish { .. } => {
341                            break;
342                        }
343                        _ => {}
344                    }
345                }
346
347                let _ = client.disconnect().await;
348            });
349
350            Ok(rx)
351        })
352        .await
353}
354
355/// Stream responses from iFlow with custom timeout
356///
357/// Sends a query to iFlow and returns a stream of response chunks.
358/// This is useful for real-time output as the response is generated.
359///
360/// # Arguments
361/// * `prompt` - The query prompt to send to iFlow
362/// * `timeout_secs` - Timeout in seconds
363///
364/// # Returns
365/// * `Ok(impl Stream<Item = String>)` containing the response stream
366/// * `Err(IFlowError)` if there was an error
367///
368/// # Example
369/// ```no_run
370/// use iflow_cli_sdk_rust::query_stream_with_timeout;
371/// use futures::stream::StreamExt;
372///
373/// #[tokio::main]
374/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
375///     let mut stream = query_stream_with_timeout("Tell me a story", 120.0).await?;
376///     
377///     while let Some(chunk) = stream.next().await {
378///         print!("{}", chunk);
379///         // Flush stdout for real-time output
380///         use std::io::{self, Write};
381///         io::stdout().flush()?;
382///     }
383///     
384///     Ok(())
385/// }
386/// ```
387pub async fn query_stream_with_timeout(
388    prompt: &str,
389    timeout_secs: f64,
390) -> Result<impl futures::Stream<Item = String>> {
391    let local = tokio::task::LocalSet::new();
392    // We need to run this in a LocalSet context but return a stream
393    // Let's create the client and connection in the LocalSet context
394    local
395        .run_until(async {
396            // Create client with the specified timeout
397            let options = IFlowOptions::new().with_timeout(timeout_secs);
398            let mut client = IFlowClient::new(Some(options));
399            client.connect().await?;
400
401            client.send_message(prompt, None).await?;
402
403            let (tx, rx) = futures::channel::mpsc::unbounded();
404            let message_stream = client.messages();
405
406            tokio::task::spawn_local(async move {
407                futures::pin_mut!(message_stream);
408
409                while let Some(message) = message_stream.next().await {
410                    match message {
411                        Message::Assistant { content } => {
412                            if tx.unbounded_send(content).is_err() {
413                                break;
414                            }
415                        }
416                        Message::TaskFinish { .. } => {
417                            break;
418                        }
419                        _ => {}
420                    }
421                }
422
423                let _ = client.disconnect().await;
424            });
425
426            Ok(rx)
427        })
428        .await
429}