race 0.1.1

Staggered async task runner / 阶梯式异步任务执行器
Documentation

English | 中文


race : Staggered Async Task Runner


Introduction

race is a Rust library for running async tasks with staggered delays. Instead of launching all tasks simultaneously, it starts them one by one at fixed intervals, collecting results through a channel.

This pattern is useful for:

  • Rate-limited API calls
  • Graceful degradation with fallback servers
  • Load balancing across multiple endpoints

Features

  • Staggered task execution with configurable delay
  • Early termination when receiver is dropped
  • Zero-copy result streaming via channel
  • Generic over task type and arguments

Installation

cargo add race

Usage

use std::time::Duration;
use race::Race;

async fn fetch(url: &'static str) -> Result<String, &'static str> {
  // Simulate network request
  Ok(format!("Response from {url}"))
}

#[tokio::main]
async fn main() {
  let race = Race::new(fetch, Duration::from_millis(100));
  let rx = race.run(vec!["server1", "server2", "server3"]);

  // Get first successful result
  while let Ok(result) = rx.recv().await {
    if let Ok(data) = result {
      println!("{data}");
      break;
    }
  }
}

DNS Resolution with Hedged Requests

Query multiple hosts with a 500 ms staggered delay and return the first successful result:

use std::net::IpAddr;
use std::time::Duration;
use race::Race;
use tokio::net::lookup_host;

async fn resolve(host: &'static str) -> Result<IpAddr, std::io::Error> {
  let addr = lookup_host(host).await?.next().ok_or_else(|| {
    std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
  })?;
  Ok(addr.ip())
}

#[tokio::main]
async fn main() {
  let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];

  let race = Race::new(resolve, Duration::from_millis(500));
  let rx = race.run(hosts);

  // Return first successful response
  while let Ok(result) = rx.recv().await {
    if let Ok(ip) = result {
      println!("Resolved: {ip}");
      break;
    }
  }
}

Timeline:

  • 0ms: Resolve google.com
  • 500ms: Resolve cloudflare.com (if no response yet)
  • 1000ms: Resolve github.com (if still no response)

The first successful response wins; the remaining queries are abandoned once the receiver is dropped.

Design

graph TD
  A[Race::new] --> B[Race::run]
  B --> C[Spawn tokio task]
  C --> D{For each arg}
  D --> E[Check tx.is_disconnected]
  E -->|Yes| F[Break loop]
  E -->|No| G[Sleep delay]
  G --> H[Execute gen_task]
  H --> I[Send result via channel]
  I --> D
  D -->|Done| J[Task ends]

The execution flow:

  1. Race::new stores the task generator and step duration
  2. Race::run spawns a tokio task that iterates over arguments
  3. Each iteration waits for step * index before execution
  4. Results are sent through a bounded SPSC channel
  5. Loop terminates early if receiver is dropped
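
For reference, here is a minimal, self-contained sketch of that loop. It is not the crate's source: run_staggered is a hypothetical free function, tokio::sync::mpsc stands in for the crate's own channel type, and the sleep comes after the spawn so the first task starts at 0 ms, as in the timeline above.

use std::future::Future;
use std::time::Duration;
use tokio::sync::mpsc;

// Hypothetical stand-in for the spawn loop behind Race::run (not the
// crate's actual source). Task starts are staggered by `step`; each
// generated future runs on its own tokio task and forwards its result
// over a channel. Must be called from within a tokio runtime.
fn run_staggered<F, Fut, A, T, E>(
  gen_task: F,
  step: Duration,
  args: Vec<A>,
) -> mpsc::Receiver<Result<T, E>>
where
  F: Fn(A) -> Fut + Send + 'static,
  Fut: Future<Output = Result<T, E>> + Send + 'static,
  A: Send + 'static,
  T: Send + 'static,
  E: Send + 'static,
{
  let (tx, rx) = mpsc::channel(args.len().max(1));
  tokio::spawn(async move {
    for arg in args {
      // Stop launching new tasks once the receiver has been dropped.
      if tx.is_closed() {
        break;
      }
      // Spawn the generated future so earlier tasks keep running while
      // later ones are still being staggered in.
      let tx = tx.clone();
      let fut = gen_task(arg);
      tokio::spawn(async move {
        let _ = tx.send(fut.await).await;
      });
      // Wait one step before starting the next task.
      tokio::time::sleep(step).await;
    }
  });
  rx
}

Dropping the receiver makes is_closed return true on the next iteration, which is the early-termination behavior listed under Features.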

API Reference

Race<G>

Staggered task runner.

Fields:

  • step: Duration - Delay between task starts
  • gen_task: G - Function that generates async tasks

Methods:

new(gen_task: G, step: Duration) -> Self

Create a runner from a task generator and a step interval.

run<I, A, T, E, Fut>(self, args_li: I) -> AsyncRx<Result<T, E>>

Start execution and return an async receiver.

Type parameters:

  • I - Iterator of arguments
  • A - Argument type (Send + 'static)
  • T - Success type (Send + Unpin + 'static)
  • E - Error type (Send + Unpin + 'static)
  • Fut - Future type returned by gen_task
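
As a concrete reading of these parameters, the annotation below shows how they resolve for the fetch example in the Usage section; the mapping is derived from that snippet rather than from generated docs.

// let race = Race::new(fetch, Duration::from_millis(100));
// let rx = race.run(vec!["server1", "server2", "server3"]);
//
// G   = the type of `fetch`
// I   = Vec<&'static str>    (the argument list passed to `run`)
// A   = &'static str         (each URL handed to `fetch`)
// T   = String               (the Ok type of `fetch`)
// E   = &'static str         (the Err type of `fetch`)
// Fut = the future returned by `fetch(url)`
//
// so `rx` yields items of type Result<String, &'static str>.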

Tech Stack

Project Structure

race/
├── src/
│   └── lib.rs        # Core implementation
├── tests/
│   └── main.rs       # Integration tests
├── readme/
│   ├── en.md         # English documentation
│   └── zh.md         # Chinese documentation
└── Cargo.toml

History

The "race" pattern in async programming traces back to JavaScript's Promise.race(), introduced in ES6 (2015). Unlike Promise.race() which starts all tasks simultaneously, this library implements a staggered variant inspired by gRPC's "hedged requests" pattern described in Google's 2015 paper "The Tail at Scale".

Hedged requests help reduce tail latency by sending redundant requests after a delay, canceling pending ones when the first response arrives. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.


About

This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.

We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:


race : Staggered Async Task Runner


Introduction

race is a Rust library for staggered execution of async tasks. Instead of launching all tasks simultaneously, it starts them one by one at fixed intervals, collecting results through a channel.

Typical use cases:

  • Rate-limited API calls
  • Graceful degradation with fallback servers
  • Load balancing across multiple endpoints

Features

  • Staggered execution with a configurable delay
  • Early termination when the receiver is dropped
  • Zero-copy result streaming via channel
  • Generic over arbitrary task types and arguments

Installation

cargo add race

Usage

use std::time::Duration;
use race::Race;

async fn fetch(url: &'static str) -> Result<String, &'static str> {
  // Simulate a network request
  Ok(format!("Response from {url}"))
}

#[tokio::main]
async fn main() {
  let race = Race::new(fetch, Duration::from_millis(100));
  let rx = race.run(vec!["server1", "server2", "server3"]);

  // Get the first successful result
  while let Ok(result) = rx.recv().await {
    if let Ok(data) = result {
      println!("{data}");
      break;
    }
  }
}

Staggered DNS Resolution

Resolve against multiple hosts, starting a new request every 500 ms, and return the first successful result:

use std::net::IpAddr;
use std::time::Duration;
use race::Race;
use tokio::net::lookup_host;

async fn resolve(host: &'static str) -> Result<IpAddr, std::io::Error> {
  let addr = lookup_host(host).await?.next().ok_or_else(|| {
    std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
  })?;
  Ok(addr.ip())
}

#[tokio::main]
async fn main() {
  let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];

  let race = Race::new(resolve, Duration::from_millis(500));
  let rx = race.run(hosts);

  // Return the first successful response
  while let Ok(result) = rx.recv().await {
    if let Ok(ip) = result {
      println!("Resolved: {ip}");
      break;
    }
  }
}

Timeline:

  • 0ms: Resolve google.com
  • 500ms: Resolve cloudflare.com (if no response yet)
  • 1000ms: Resolve github.com (if still no response)

Once the first successful response is returned, the receiver is dropped and the remaining queries terminate automatically.

Design

graph TD
  A[Race::new] --> B[Race::run]
  B --> C[Spawn tokio task]
  C --> D{For each arg}
  D --> E[Check tx.is_disconnected]
  E -->|Yes| F[Break loop]
  E -->|No| G[Sleep delay]
  G --> H[Execute gen_task]
  H --> I[Send result via channel]
  I --> D
  D -->|Done| J[Task ends]

The execution flow:

  1. Race::new stores the task generator and the step interval
  2. Race::run spawns a tokio task that iterates over the arguments
  3. Each iteration waits step * index before execution
  4. Results are sent through a bounded SPSC channel
  5. The loop terminates early when the receiver is dropped

API Reference

Race<G>

Staggered task runner.

Fields:

  • step: Duration - Delay between task starts
  • gen_task: G - Function that generates async tasks

Methods:

new(gen_task: G, step: Duration) -> Self

Create a runner from a task generator and a step interval.

run<I, A, T, E, Fut>(self, args_li: I) -> AsyncRx<Result<T, E>>

Start execution and return an async receiver.

Type parameters:

  • I - Iterator of arguments
  • A - Argument type (Send + 'static)
  • T - Success type (Send + Unpin + 'static)
  • E - Error type (Send + Unpin + 'static)
  • Fut - Future type returned by gen_task

Tech Stack

Project Structure

race/
├── src/
│   └── lib.rs        # Core implementation
├── tests/
│   └── main.rs       # Integration tests
├── readme/
│   ├── en.md         # English documentation
│   └── zh.md         # Chinese documentation
└── Cargo.toml

History

The "race" pattern in async programming traces back to JavaScript's Promise.race(), introduced in ES6 (2015). Unlike Promise.race(), which starts all tasks simultaneously, this library implements a staggered variant inspired by the "hedged requests" technique described in Google's 2013 paper "The Tail at Scale" and later adopted by gRPC.

Hedged requests reduce tail latency by sending redundant requests after a delay and canceling the remaining ones once the first response arrives. The technique is widely used in large-scale distributed systems at Google, Amazon, and other services.


About

This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.

We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us: