/// Drives every given future concurrently and waits for all of them.
///
/// Each argument is wrapped in `Box::pin`, gathered into a single `Vec`, and
/// passed to `futures::future::join_all`. The macro evaluates to the value
/// produced by awaiting that call. A trailing comma is accepted.
///
/// The boxed futures are stored in one `Vec` without any `dyn` coercion, so
/// every argument must have the same concrete future type. The expansion
/// contains `.await` and therefore only works inside an async context.
#[macro_export]
macro_rules! concurrent_execute {
    ($($future:expr),* $(,)?) => {{
        use futures::future::join_all;
        // Build the list in its own statement so temporaries created by the
        // argument expressions are released before the await point.
        let pending = vec![$(Box::pin($future)),*];
        join_all(pending).await
    }};
}
/// Runs an async operation with a per-attempt timeout and a bounded number of
/// attempts.
///
/// # Arguments
/// * `$operation` - expression that builds the future to run. It is expanded
///   inside the retry loop, so a fresh future is created for every attempt.
///   The future must resolve to `Result<T, anyhow::Error>`.
/// * `timeout: $timeout` - the `Duration` handed to `tokio::time::timeout` for
///   each attempt. Evaluated once.
/// * `retries: $retries` - maximum number of attempts. Evaluated once.
///
/// # Result
/// The macro *evaluates to* `Result<T, anyhow::Error>`; it does not `return`
/// from the enclosing function, so it can be bound with `let` or followed by
/// `?`:
/// * `Ok(value)` as soon as one attempt succeeds;
/// * otherwise the error of the last attempt (a timed-out attempt is reported
///   as "Operation timed out"), or "Operation failed after N attempts" when
///   `$retries` permits no attempt at all.
///
/// After a failed attempt that still has retries left, the macro sleeps
/// `100 * 2^n` milliseconds, where `n` is the number of attempts made so far.
/// A timed-out attempt is retried immediately.
#[macro_export]
macro_rules! resilient_async {
    ($operation:expr, timeout: $timeout:expr, retries: $retries:expr) => {{
        // Retained from the previous expansion in case a caller's `$operation`
        // expression relies on `.context(..)` being in scope here.
        #[allow(unused_imports)]
        use anyhow::Context;
        // `timeout` is deliberately NOT imported: a block-level `use` would
        // shadow a caller variable named `timeout` passed as `$timeout`.
        use tokio::time::Duration;

        // Evaluate the configuration expressions exactly once.
        let max_attempts = $retries;
        let attempt_timeout = $timeout;

        let mut attempts = 0;
        let mut last_error = None;
        loop {
            if attempts >= max_attempts {
                break Err(last_error.unwrap_or_else(|| {
                    anyhow::anyhow!("Operation failed after {} attempts", max_attempts)
                }));
            }
            match tokio::time::timeout(attempt_timeout, $operation).await {
                // Yield the value from the macro instead of returning from the
                // caller's function.
                Ok(Ok(result)) => break Ok(result),
                Ok(Err(e)) => {
                    last_error = Some(e);
                    attempts += 1;
                    if attempts < max_attempts {
                        tracing::warn!(
                            "Operation failed (attempt {}/{}), retrying...",
                            attempts,
                            max_attempts
                        );
                        // Exponential backoff between failed attempts.
                        tokio::time::sleep(Duration::from_millis(100 * (1 << attempts))).await;
                    }
                }
                Err(_) => {
                    last_error = Some(anyhow::anyhow!("Operation timed out"));
                    attempts += 1;
                    if attempts < max_attempts {
                        tracing::warn!(
                            "Operation timed out (attempt {}/{}), retrying...",
                            attempts,
                            max_attempts
                        );
                    }
                }
            }
        }
    }};
}
/// Races the given futures with `tokio::select!` and short-circuits on success.
///
/// Every argument is fused and becomes one `select!` branch. When the first
/// future to complete yields `Ok`, that result is `return`ed from the
/// enclosing function, closure, or async block. If it yields anything else,
/// the `select!` finishes and the macro evaluates to
/// `Err(anyhow::anyhow!("All operations failed"))`.
///
/// NOTE(review): only the first future to complete is inspected. Once one
/// branch has run, `select!` is done, so a fast failure hides a slower
/// success despite the macro's name. Confirm this is the intended behaviour.
#[macro_export]
macro_rules! select_first_ok {
    ($($future:expr),* $(,)?) => {{
        use futures::future::FutureExt;
        use tokio::select;
        select! {
            $(
                outcome = $future.fuse() => {
                    if matches!(outcome, Ok(_)) {
                        return outcome;
                    }
                }
            )*
        }
        Err(anyhow::anyhow!("All operations failed"))
    }};
}
/// Runs `$operation` unless `$token` is cancelled first.
///
/// Expands to a `tokio::select!` over `$token.cancelled()` and `$operation`:
/// * if cancellation completes first, the macro evaluates to
///   `Err(anyhow::anyhow!("Operation cancelled"))`;
/// * otherwise it evaluates to whatever `$operation` resolved to, which must
///   therefore be a `Result` whose error type is `anyhow::Error`.
///
/// The `select!` is not `biased`, so when both futures are ready at the same
/// time either branch may be taken.
#[macro_export]
macro_rules! cancellable_async {
    ($token:expr, $operation:expr) => {{
        tokio::select! {
            _ = $token.cancelled() => Err(anyhow::anyhow!("Operation cancelled")),
            outcome = $operation => outcome,
        }
    }};
}
/// Splits a stream into batches and processes several batches concurrently.
///
/// # Forms
/// * `stream_batch_process!(stream, batch_size, processor)` runs up to 4
///   batches at a time (the original behaviour).
/// * `stream_batch_process!(stream, batch_size, processor, concurrency: n)`
///   runs up to `n` batches at a time.
///
/// # Arguments
/// * `$stream` - the stream to consume; it is chunked with
///   `StreamExt::chunks($batch_size)`.
/// * `$batch_size` - maximum number of items per batch.
/// * `$processor` - callable invoked as `$processor(batch).await` for every
///   batch. It is expanded inside an `async move` block, so anything it
///   captures is moved into that block.
/// * `$concurrency` - limit passed to `StreamExt::buffer_unordered`.
///
/// # Result
/// The batch outputs are gathered with `TryStreamExt::try_collect`, so each
/// processor call must resolve to a `Result` and the macro evaluates to a
/// `Result` of the collected values. Because batches run through
/// `buffer_unordered`, outputs are collected in completion order, not input
/// order.
#[macro_export]
macro_rules! stream_batch_process {
    // Backward-compatible form: keep the historical concurrency limit of 4.
    ($stream:expr, $batch_size:expr, $processor:expr) => {
        $crate::stream_batch_process!($stream, $batch_size, $processor, concurrency: 4)
    };
    ($stream:expr, $batch_size:expr, $processor:expr, concurrency: $concurrency:expr) => {{
        use futures::stream::{StreamExt, TryStreamExt};
        $stream
            .chunks($batch_size)
            .map(|batch| async move { $processor(batch).await })
            .buffer_unordered($concurrency)
            .try_collect()
            .await
    }};
}
/// Tries to acquire an async lock, giving up after `$timeout`.
///
/// Calls `$mutex.lock()` and awaits it under `tokio::time::timeout($timeout, ..)`.
///
/// # Result
/// * `Ok(guard)` with whatever the `lock()` future resolved to, when it
///   completed in time;
/// * `Err(anyhow::anyhow!("Failed to acquire lock within timeout"))` when the
///   timeout elapsed first.
///
/// `tokio::time::timeout` is called by its full path rather than imported:
/// a `use` inside the expansion would shadow a caller variable named
/// `timeout` passed as the `$timeout` argument.
#[macro_export]
macro_rules! async_lock_timeout {
    ($mutex:expr, $timeout:expr) => {{
        tokio::time::timeout($timeout, $mutex.lock())
            .await
            .map_err(|_| anyhow::anyhow!("Failed to acquire lock within timeout"))
    }};
}
/// Spawns a fallible future on the tokio runtime and logs how it finished.
///
/// # Arguments
/// * `$name` - task name, recorded as the `name` field of a `spawned_task`
///   info span and interpolated into the log messages. The expression is
///   expanded in several places, so it should be cheap and side-effect free.
/// * `$future` - expression producing a future that resolves to a `Result`.
///   The error is formatted with `{:#}`.
///
/// # Result
/// Evaluates to the `JoinHandle` returned by `tokio::spawn`. The task's output
/// is the future's own `Result`, passed through unchanged after a `debug`
/// (success) or `error` (failure) event has been emitted.
///
/// The span is attached with `tracing::Instrument` rather than an entered
/// guard. An `EnteredSpan` is `!Send`, so holding one across `.await` makes
/// the future unusable with `tokio::spawn`, and tracing documents that doing
/// so produces incorrect traces in async code.
#[macro_export]
macro_rules! spawn_logged {
    ($name:expr, $future:expr) => {{
        tokio::spawn(async move {
            // Created inside the task, as before, so the span's parent is
            // whatever is current when the task first runs.
            let span = tracing::info_span!("spawned_task", name = $name);
            tracing::Instrument::instrument(
                async move {
                    match $future.await {
                        Ok(result) => {
                            tracing::debug!("Task '{}' completed successfully", $name);
                            Ok(result)
                        }
                        Err(e) => {
                            tracing::error!("Task '{}' failed: {:#}", $name, e);
                            Err(e)
                        }
                    }
                },
                span,
            )
            .await
        })
    }};
}
/// Times the evaluation of `$operation` and reports the elapsed time.
///
/// The macro records `Instant::now()`, evaluates `$operation`, then awaits
/// `$progress.update_duration(elapsed)` and finally yields the value of
/// `$operation`.
///
/// The macro itself does not await `$operation`; only the progress update is
/// awaited. NOTE(review): if a call site passes an un-awaited future, the
/// measured time covers constructing the future only. Confirm call sites
/// pass `fut.await` (or a synchronous expression).
#[macro_export]
macro_rules! async_with_progress {
    ($progress:expr, $operation:expr) => {{
        let started_at = std::time::Instant::now();
        let outcome = $operation;
        // Capture the elapsed time before touching `$progress` so evaluating
        // the reporter is not included in the measurement.
        let elapsed = started_at.elapsed();
        $progress.update_duration(elapsed).await;
        outcome
    }};
}
/// Maps every element of a collection to a future and awaits them all together.
///
/// `$collection.into_iter().map($mapper)` is collected into a `Vec` of
/// futures, which is then passed to `futures::future::join_all`. The macro
/// evaluates to the value produced by awaiting that call.
///
/// `$mapper` is called for every element while the `Vec` is being built,
/// before anything is awaited. The expansion contains `.await` and must be
/// used inside an async context.
#[macro_export]
macro_rules! parallel_map {
    ($collection:expr, $mapper:expr) => {{
        use futures::future::join_all;
        // Materialise the futures in a separate statement so the iterator and
        // the mapper are gone before the await point.
        let pending = $collection.into_iter().map($mapper).collect::<Vec<_>>();
        join_all(pending).await
    }};
}
/// Waits on a rate limiter, then runs an async operation.
///
/// Awaits `$rate_limiter.acquire()` first, then awaits `$operation` and
/// evaluates to its output. Note the argument order: the operation comes
/// first, the limiter second.
///
/// NOTE(review): the value produced by `acquire().await` is discarded at the
/// end of that statement. If the limiter hands out a guard/permit that is
/// meant to be held while the operation runs, it is released before
/// `$operation` starts — confirm against the limiter type used by callers.
#[macro_export]
macro_rules! rate_limited_async {
($operation:expr, $rate_limiter:expr) => {{
// Block until the limiter lets this call through.
$rate_limiter.acquire().await;
// The operation's output becomes the value of the macro.
$operation.await
}};
}