race 0.1.3

Staggered async task runner

# race : Staggered Async Task Runner

## Table of Contents

- [Introduction](#introduction)
- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
  - [DNS Resolution with Hedged Requests](#dns-resolution-with-hedged-requests)
- [Design](#design)
- [API Reference](#api-reference)
- [Tech Stack](#tech-stack)
- [Project Structure](#project-structure)
- [History](#history)

## Introduction

`race` is a Rust library for running async tasks with staggered delays. Instead of launching all tasks simultaneously, it starts them one by one at fixed intervals, collecting results through a channel.

This pattern is useful for:

- Rate-limited API calls
- Graceful degradation with fallback servers
- Load balancing across multiple endpoints

## Features

- Staggered task execution with configurable delay
- Early termination when receiver is dropped
- Zero-copy result streaming via channel
- Generic over task type and arguments

## Installation

```sh
cargo add race
```

## Usage

```rust
use std::time::Duration;
use race::Race;

#[tokio::main]
async fn main() {
  let race = Race::new(
    |url: &&str| {
      let url = *url;
      async move {
        // Simulate network request
        Ok::<String, &'static str>(format!("Response from {url}"))
      }
    },
    Duration::from_millis(100)
  );
  let rx = race.run(vec!["server1", "server2", "server3"]);

  // Get first successful result
  while let Ok((arg, result)) = rx.recv().await {
    if let Ok(data) = result {
      println!("From {arg}: {data}");
      break;
    }
  }
}
```

### DNS Resolution with Hedged Requests

Query multiple hosts with a 500 ms stagger between starts, returning the first successful result:

```rust
use std::net::IpAddr;
use std::time::Duration;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
  let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];

  let race = Race::new(
    |host: &&str| {
      let host = *host;
      async move {
        let addr = lookup_host(host).await?.next().ok_or_else(|| {
          std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
        })?;
        Ok::<IpAddr, std::io::Error>(addr.ip())
      }
    },
    Duration::from_millis(500)
  );
  let rx = race.run(hosts);

  // Return first successful response
  while let Ok((host, result)) = rx.recv().await {
    if let Ok(ip) = result {
      println!("Resolved {host}: {ip}");
      break;
    }
  }
}
```

Timeline:
- 0ms: Resolve google.com
- 500ms: Resolve cloudflare.com (if no response yet)
- 1000ms: Resolve github.com (if still no response)

The first successful response wins; once the receiver is dropped, the remaining queries are abandoned.
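The winner-takes-all behavior can be sketched with plain threads and a std channel. This is a simplified analogy, not the crate's implementation (the crate uses tokio tasks and never starts tasks that are no longer needed, whereas this sketch spawns every thread up front); `first_reply` and the latency numbers are illustrative.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Simulate hedged lookups: each "host" has a fake latency, requests start
// `step_ms` apart, and the first reply to arrive wins.
fn first_reply(hosts: Vec<(&'static str, u64)>, step_ms: u64) -> &'static str {
    let (tx, rx) = mpsc::channel();
    for (i, (host, latency_ms)) in hosts.into_iter().enumerate() {
        let tx = tx.clone();
        thread::spawn(move || {
            // staggered start plus simulated network latency
            thread::sleep(Duration::from_millis(step_ms * i as u64 + latency_ms));
            let _ = tx.send(host); // send fails harmlessly if a winner was taken
        });
    }
    drop(tx);
    rx.recv().unwrap() // first response wins; later replies are discarded
}

fn main() {
    // "slow" starts first but takes 300 ms; "fast" starts 50 ms later and
    // replies after 10 ms, so it arrives first (~60 ms in).
    println!("winner: {}", first_reply(vec![("slow", 300), ("fast", 10)], 50));
}
```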

## Design

```mermaid
graph TD
  A[Race::new] --> B[Race::run]
  B --> C[Spawn tokio task]
  C --> D{For each arg}
  D --> E[Check tx.is_disconnected]
  E -->|Yes| F[Break loop]
  E -->|No| G[Sleep delay]
  G --> H[Execute gen_task]
  H --> I[Send result via channel]
  I --> D
  D -->|Done| J[Task ends]
```

The execution flow:

1. `Race::new` stores the task generator and the step duration
2. `Race::run` spawns a tokio task that iterates over the arguments
3. Task *i* starts after a delay of `step * i` (one `step` between consecutive starts, so the first task runs immediately)
4. Results are sent through a bounded SPSC channel
5. The loop terminates early if the receiver is dropped
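The steps above can be sketched with blocking threads and std's `mpsc` channel. This is a simplified stand-in for the real tokio + crossfire implementation; `staggered_run` is an illustrative name, and early termination is detected via a failed `send` rather than an explicit disconnect check.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

// Run `task` over each argument, starting one argument per `step`, and
// stream (arg, result) pairs through a channel. Stops early once the
// receiver has been dropped.
fn staggered_run<A, T>(
    args: Vec<A>,
    step: Duration,
    task: impl Fn(&A) -> T + Send + 'static,
) -> mpsc::Receiver<(A, T)>
where
    A: Send + 'static,
    T: Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for (i, arg) in args.into_iter().enumerate() {
            if i > 0 {
                thread::sleep(step); // stagger each start by `step`
            }
            let result = task(&arg);
            // `send` fails once the receiver is dropped: break out early.
            if tx.send((arg, result)).is_err() {
                break;
            }
        }
    });
    rx
}

fn main() {
    let rx = staggered_run(
        vec!["server1", "server2"],
        Duration::from_millis(10),
        |s: &&str| format!("done: {s}"),
    );
    for (arg, out) in rx {
        println!("{arg} -> {out}");
    }
}
```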

## API Reference

### `Race<G>`

Staggered task runner.

Fields:
- `step: Duration` - Delay between task starts
- `gen_task: G` - Function that generates async tasks

Methods:

#### `new(gen_task: G, step: Duration) -> Self`

Create runner with task generator and step interval.

#### `run<I, A, T, E, Fut>(self, args_li: I) -> AsyncRx<(A, Result<T, E>)>`

Starts execution and returns an async receiver that yields argument/result pairs.

Type parameters:
- `I` - Iterator of arguments
- `A` - Argument type (Send + Unpin + 'static)
- `T` - Success type (Send + Unpin + 'static)
- `E` - Error type (Send + Unpin + 'static)
- `Fut` - Future type returned by gen_task

The receiver yields `(A, Result<T, E>)` tuples where `A` is the original argument and `Result<T, E>` is the task result.
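Consumers that want the first success out of the yielded pairs can factor the loop from the usage examples into a small helper. `first_success` is illustrative, not a crate API; it operates on a plain slice so the shape of the `(A, Result<T, E>)` pairs is easy to see.

```rust
// Pick the first successful result from (arg, Result<T, E>) pairs, the
// shape yielded by the receiver. Illustrative helper, not part of `race`.
fn first_success<A: Copy, T: Copy, E>(pairs: &[(A, Result<T, E>)]) -> Option<(A, T)> {
    pairs
        .iter()
        .find_map(|(arg, r)| r.as_ref().ok().map(|v| (*arg, *v)))
}

fn main() {
    let pairs: Vec<(&str, Result<u32, &str>)> =
        vec![("a", Err("timeout")), ("b", Ok(7)), ("c", Ok(9))];
    // The first Ok wins, mirroring the `break` in the usage examples.
    assert_eq!(first_success(&pairs), Some(("b", 7)));
}
```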

## Tech Stack

- [tokio](https://tokio.rs) - Async runtime and timers
- [crossfire](https://crates.io/crates/crossfire) - Lock-free SPSC channel

## Project Structure

```
race/
├── src/
│   └── lib.rs        # Core implementation
├── tests/
│   └── main.rs       # Integration tests
├── readme/
│   ├── en.md         # English documentation
│   └── zh.md         # Chinese documentation
└── Cargo.toml
```

## History

The "race" pattern in async programming traces back to JavaScript's `Promise.race()`, introduced in ES6 (2015). Unlike `Promise.race()`, which starts all tasks simultaneously, this library implements a staggered variant inspired by the "hedged requests" technique described in Dean and Barroso's 2013 paper "The Tail at Scale" and later adopted by gRPC.

Hedged requests help reduce tail latency by sending redundant requests after a delay, canceling pending ones when the first response arrives. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.

---

## About

This project is an open-source component of [js0.site ⋅ Refactoring the Internet Plan](https://js0.site).

We are redefining the Internet's development paradigm through componentization. You are welcome to follow us:

* [Google Group](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)
