---
<a id="en"></a>
# race : Staggered Async Task Runner
## Table of Contents
- [Introduction](#introduction)
- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
- [DNS Resolution with Hedged Requests](#dns-resolution-with-hedged-requests)
- [Design](#design)
- [API Reference](#api-reference)
- [Tech Stack](#tech-stack)
- [Project Structure](#project-structure)
- [History](#history)
## Introduction
`race` is a Rust library for running async tasks with staggered delays. Instead of launching all tasks simultaneously, it starts them one by one at fixed intervals, collecting results through a channel.
This pattern is useful for:
- Rate-limited API calls
- Graceful degradation with fallback servers
- Load balancing across multiple endpoints
## Features
- Staggered task execution with configurable delay
- Early termination when receiver is dropped
- Zero-copy result streaming via channel
- Generic over task type and arguments
## Installation
```sh
cargo add race
```
## Usage
```rust
use std::time::Duration;
use race::Race;
#[tokio::main]
async fn main() {
let race = Race::new(
|url: &&str| {
let url = *url;
async move {
// Simulate network request
Ok::<String, &'static str>(format!("Response from {url}"))
}
},
Duration::from_millis(100)
);
let rx = race.run(vec!["server1", "server2", "server3"]);
// Get first successful result
while let Ok((arg, result)) = rx.recv().await {
if let Ok(data) = result {
println!("From {arg}: {data}");
break;
}
}
}
```
### DNS Resolution with Hedged Requests
Query multiple hosts with 500ms staggered delay, return first successful result:
```rust
use std::net::IpAddr;
use std::time::Duration;
use race::Race;
use tokio::net::lookup_host;
#[tokio::main]
async fn main() {
let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];
let race = Race::new(
|host: &&str| {
let host = *host;
async move {
let addr = lookup_host(host).await?.next().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
})?;
Ok::<IpAddr, std::io::Error>(addr.ip())
}
},
Duration::from_millis(500)
);
let rx = race.run(hosts);
// Return first successful response
while let Ok((host, result)) = rx.recv().await {
if let Ok(ip) = result {
println!("Resolved {host}: {ip}");
break;
}
}
}
```
Timeline:
- 0ms: Resolve google.com
- 500ms: Resolve cloudflare.com (if no response yet)
- 1000ms: Resolve github.com (if still no response)
First successful response wins, remaining queries are abandoned when receiver drops.
## Design
```mermaid
graph TD
A[Race::new] --> B[Race::run]
B --> C[Spawn tokio task]
C --> D{For each arg}
D --> E[Check tx.is_disconnected]
G --> H[Execute gen_task]
H --> I[Send result via channel]
I --> D
D -->|Done| J[Task ends]
```
The execution flow:
1. `Race::new` stores the task generator and step duration
2. `Race::run` spawns a tokio task that iterates over arguments
3. Each iteration waits for `step * index` before execution
4. Results are sent through a bounded SPSC channel
5. Loop terminates early if receiver is dropped
## API Reference
### `Race<G>`
Staggered task runner.
Fields:
- `step: Duration` - Delay between task starts
- `gen_task: G` - Function that generates async tasks
Methods:
#### `new(gen_task: G, step: Duration) -> Self`
Create runner with task generator and step interval.
#### `run<I, A, T, E, Fut>(self, args_li: I) -> AsyncRx<(A, Result<T, E>)>`
Start execution, returns async receiver with argument and result pairs.
Type parameters:
- `I` - Iterator of arguments
- `A` - Argument type (Send + Unpin + 'static)
- `T` - Success type (Send + Unpin + 'static)
- `E` - Error type (Send + Unpin + 'static)
- `Fut` - Future type returned by gen_task
The receiver yields `(A, Result<T, E>)` tuples where `A` is the original argument and `Result<T, E>` is the task result.
## Tech Stack
- [tokio](https://tokio.rs) - Async runtime, timer
- [crossfire](https://crates.io/crates/crossfire) - Lock-free SPSC channel
## Project Structure
```
race/
├── src/
│ └── lib.rs # Core implementation
├── tests/
│ └── main.rs # Integration tests
├── readme/
│ ├── en.md # English documentation
│ └── zh.md # Chinese documentation
└── Cargo.toml
```
## History
The "race" pattern in async programming traces back to JavaScript's `Promise.race()`, introduced in ES6 (2015). Unlike `Promise.race()` which starts all tasks simultaneously, this library implements a staggered variant inspired by gRPC's "hedged requests" pattern described in Google's 2015 paper "The Tail at Scale".
Hedged requests help reduce tail latency by sending redundant requests after a delay, canceling pending ones when the first response arrives. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.
---
## About
This project is an open-source component of [js0.site ⋅ Refactoring the Internet Plan](https://js0.site).
We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:
* [Google Group](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)
---
<a id="zh"></a>
# race : 阶梯式异步任务执行器
## 目录
- [简介](#简介)
- [特性](#特性)
- [安装](#安装)
- [使用](#使用)
- [DNS 阶梯解析](#dns-阶梯解析)
- [设计](#设计)
- [API 参考](#api-参考)
- [技术栈](#技术栈)
- [目录结构](#目录结构)
- [历史](#历史)
## 简介
`race` 是 Rust 异步任务阶梯执行库。不同于同时启动所有任务,它按固定间隔依次启动,通过通道收集结果。
适用场景:
- 限速 API 调用
- 多服务器优雅降级
- 多端点负载均衡
## 特性
- 可配置延迟的阶梯执行
- 接收端关闭时提前终止
- 通道零拷贝结果流
- 泛型支持任意任务类型和参数
## 安装
```sh
cargo add race
```
## 使用
```rust
use std::time::Duration;
use race::Race;
#[tokio::main]
async fn main() {
let race = Race::new(
|url: &&str| {
let url = *url;
async move {
// 模拟网络请求
Ok::<String, &'static str>(format!("Response from {url}"))
}
},
Duration::from_millis(100)
);
let rx = race.run(vec!["server1", "server2", "server3"]);
// 获取首个成功结果
while let Ok((arg, result)) = rx.recv().await {
if let Ok(data) = result {
println!("From {arg}: {data}");
break;
}
}
}
```
### DNS 阶梯解析
向多个主机发起解析,每 500ms 启动新请求,返回首个成功结果:
```rust
use std::net::IpAddr;
use std::time::Duration;
use race::Race;
use tokio::net::lookup_host;
#[tokio::main]
async fn main() {
let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];
let race = Race::new(
|host: &&str| {
let host = *host;
async move {
let addr = lookup_host(host).await?.next().ok_or_else(|| {
std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
})?;
Ok::<IpAddr, std::io::Error>(addr.ip())
}
},
Duration::from_millis(500)
);
let rx = race.run(hosts);
// 返回首个成功响应
while let Ok((host, result)) = rx.recv().await {
if let Ok(ip) = result {
println!("Resolved {host}: {ip}");
break;
}
}
}
```
时间线:
- 0ms: 解析 google.com
- 500ms: 解析 cloudflare.com(若无响应)
- 1000ms: 解析 github.com(若仍无响应)
首个成功响应返回后,接收端 drop,剩余查询自动终止。
## 设计
```mermaid
graph TD
A[Race::new] --> B[Race::run]
B --> C[Spawn tokio task]
C --> D{For each arg}
D --> E[Check tx.is_disconnected]
G --> H[Execute gen_task]
H --> I[Send result via channel]
I --> D
D -->|Done| J[Task ends]
```
执行流程:
1. `Race::new` 存储任务生成器和步进间隔
2. `Race::run` 启动 tokio 任务遍历参数
3. 每次迭代等待 `step * index` 后执行
4. 结果通过有界 SPSC 通道发送
5. 接收端关闭时循环提前终止
## API 参考
### `Race<G>`
阶梯式任务执行器。
字段:
- `step: Duration` - 任务启动间隔
- `gen_task: G` - 异步任务生成函数
方法:
#### `new(gen_task: G, step: Duration) -> Self`
创建执行器,传入任务生成器和步进间隔。
#### `run<I, A, T, E, Fut>(self, args_li: I) -> AsyncRx<(A, Result<T, E>)>`
启动执行,返回包含参数和结果对的异步接收器。
类型参数:
- `I` - 参数迭代器
- `A` - 参数类型 (Send + Unpin + 'static)
- `T` - 成功类型 (Send + Unpin + 'static)
- `E` - 错误类型 (Send + Unpin + 'static)
- `Fut` - gen_task 返回的 Future 类型
接收器产生 `(A, Result<T, E>)` 元组,其中 `A` 是原始参数,`Result<T, E>` 是任务结果。
## 技术栈
- [tokio](https://tokio.rs) - 异步运行时、定时器
- [crossfire](https://crates.io/crates/crossfire) - 无锁 SPSC 通道
## 目录结构
```
race/
├── src/
│ └── lib.rs # 核心实现
├── tests/
│ └── main.rs # 集成测试
├── readme/
│ ├── en.md # 英文文档
│ └── zh.md # 中文文档
└── Cargo.toml
```
## 历史
异步编程中的 "race" 模式源于 JavaScript 的 `Promise.race()`,在 ES6 (2015) 中引入。与 `Promise.race()` 同时启动所有任务不同,本库实现了阶梯变体,灵感来自 Google 2015 年论文《The Tail at Scale》中描述的 gRPC 对冲请求模式。
对冲请求通过延迟发送冗余请求来降低尾延迟,首个响应到达时取消其他请求。该技术广泛应用于 Google、Amazon 等大规模分布式系统。
---
## 关于
本项目为 [js0.site ⋅ 重构互联网计划](https://js0.site) 的开源组件。
我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注:
* [谷歌邮件列表](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)