Skip to main content

vtcode_commons/
async_utils.rs

1//! Async utility functions
2
3use anyhow::{Context, Result};
4use std::future::Future;
5use std::time::Duration;
6use tokio::io::AsyncReadExt;
7use tokio::time::timeout;
8
/// Timeout used by the default-timeout helper: 30 seconds.
pub const DEFAULT_ASYNC_TIMEOUT: Duration = Duration::from_secs(30);
/// Timeout used by the short-timeout helper: 5 seconds.
pub const SHORT_ASYNC_TIMEOUT: Duration = Duration::from_secs(5);
/// Timeout used by the long-timeout helper: 5 minutes.
pub const LONG_ASYNC_TIMEOUT: Duration = Duration::from_secs(5 * 60);
12
13/// Execute a future with a timeout and context
14pub async fn with_timeout<F, T>(fut: F, duration: Duration, context: &str) -> Result<T>
15where
16    F: Future<Output = T>,
17{
18    match timeout(duration, fut).await {
19        Ok(result) => Ok(result),
20        Err(_) => anyhow::bail!("Operation timed out after {:?}: {}", duration, context),
21    }
22}
23
24/// Execute a future with the default timeout
25pub async fn with_default_timeout<F, T>(fut: F, context: &str) -> Result<T>
26where
27    F: Future<Output = T>,
28{
29    with_timeout(fut, DEFAULT_ASYNC_TIMEOUT, context).await
30}
31
32/// Execute a future with a short timeout
33pub async fn with_short_timeout<F, T>(fut: F, context: &str) -> Result<T>
34where
35    F: Future<Output = T>,
36{
37    with_timeout(fut, SHORT_ASYNC_TIMEOUT, context).await
38}
39
40/// Execute a future with a long timeout
41pub async fn with_long_timeout<F, T>(fut: F, context: &str) -> Result<T>
42where
43    F: Future<Output = T>,
44{
45    with_timeout(fut, LONG_ASYNC_TIMEOUT, context).await
46}
47
48/// Retry an operation with exponential backoff
49pub async fn retry_with_backoff<F, Fut, T>(
50    mut op: F,
51    max_retries: usize,
52    initial_delay: Duration,
53    context: &str,
54) -> Result<T>
55where
56    F: FnMut() -> Fut,
57    Fut: Future<Output = Result<T>>,
58{
59    let mut delay = initial_delay;
60    let mut last_error = None;
61
62    for i in 0..=max_retries {
63        match op().await {
64            Ok(result) => return Ok(result),
65            Err(e) => {
66                last_error = Some(e);
67                if i < max_retries {
68                    tokio::time::sleep(delay).await;
69                    delay *= 2;
70                }
71            }
72        }
73    }
74
75    let err = last_error.unwrap_or_else(|| anyhow::anyhow!("Retry failed without error"));
76    Err(err).with_context(|| {
77        format!(
78            "Operation failed after {} retries: {}",
79            max_retries, context
80        )
81    })
82}
83
84/// Sleep with context
85pub async fn sleep_with_context(duration: Duration, _context: &str) {
86    tokio::time::sleep(duration).await;
87}
88
89/// Run multiple futures and wait for all with a timeout
90pub async fn join_all_with_timeout<F, T>(
91    futs: Vec<F>,
92    duration: Duration,
93    context: &str,
94) -> Result<Vec<T>>
95where
96    F: Future<Output = T>,
97{
98    with_timeout(futures::future::join_all(futs), duration, context).await
99}
100
101/// Read exactly `len` bytes from an async reader without zero-initializing
102/// the buffer first.
103///
104/// This avoids the double-write overhead of `vec![0u8; len]` followed by
105/// `read_exact` — the zeroing is wasted work since every byte is
106/// immediately overwritten by the read. For large payloads this can yield
107/// measurable performance gains.
108///
109/// The returned `Vec` has exactly `len` initialized bytes.
110///
111/// # Errors
112///
113/// Returns `io::Error` if the underlying reader's `read_exact` fails
114/// (e.g., unexpected EOF before `len` bytes are read).
115#[allow(unsafe_code)]
116pub async fn read_exact_uninit<R>(reader: &mut R, len: usize) -> std::io::Result<Vec<u8>>
117where
118    R: tokio::io::AsyncRead + Unpin,
119{
120    let mut buf = Vec::with_capacity(len);
121    // SAFETY:
122    // - The Vec's allocation is valid and properly aligned for `len` bytes.
123    // - `read_exact` writes `len` bytes into `dst` before any read from `buf`,
124    //   so no uninitialized memory is ever observed.
125    // - The reference `dst` is never read — only written — by `read_exact`.
126    let dst = unsafe { std::slice::from_raw_parts_mut(buf.as_mut_ptr(), len) };
127    reader.read_exact(dst).await?;
128    // SAFETY: `read_exact` initializes exactly `len` bytes on success.
129    unsafe { buf.set_len(len) };
130    Ok(buf)
131}