race 0.1.13

Staggered async task executor with race semantics / 阶梯式异步任务执行器,具备竞赛语义
[English](#en) | [中文](#zh)

---

<a id="en"></a>

# race : Staggered Async Task Executor

## Table of Contents

- [Introduction](#introduction)
- [Features](#features)
- [Installation](#installation)
- [Usage](#usage)
  - [Basic Usage](#basic-usage)
  - [DNS Resolution with Staggered Requests](#dns-resolution-with-staggered-requests)
  - [Infinite Task Streams](#infinite-task-streams)
  - [Non-Copy Types](#non-copy-types)
- [Design](#design)
- [API Reference](#api-reference)
- [Performance](#performance)
- [Tech Stack](#tech-stack)
- [Project Structure](#project-structure)
- [History](#history)

## Introduction

`race` is a high-performance Rust library implementing staggered async task execution. Tasks start at fixed intervals and race to completion - fastest task wins regardless of start order.

> **Note**: "Staggered" means tasks are launched sequentially at fixed time intervals, not simultaneously. For example, with a 50ms interval: Task 1 starts at 0ms, Task 2 at 50ms, Task 3 at 100ms, etc. This creates a "staircase" or "ladder" pattern of task launches.

Key difference from `Promise.race()`: instead of all tasks starting simultaneously, tasks launch sequentially at a configurable interval, which enables:

- **Rate-limited API calls** - Respect API quotas and avoid overwhelming servers
- **Graceful degradation** - Try primary servers first, fallbacks later
- **Hedged requests** - Reduce tail latency with redundant requests
- **Infinite task streams** - Handle unlimited iterators efficiently

## Features

- **Staggered execution** - Tasks start at configurable intervals
- **Race semantics** - Fastest task completes first regardless of start order
- **Stream-based API** - Implements `futures::Stream` for async iteration
- **Infinite iterator support** - Tasks started on-demand, not all at once
- **Non-Copy type support** - Works with String, Vec, custom structs without Clone requirement
- **High performance** - Zero `dyn` dispatch, `coarsetime` optimization
- **Memory efficient** - Pre-allocated vectors, immediate cleanup
- **No `'static` requirement** - Lifetime tied to Race instance

## Installation

```sh
cargo add race
```

Or add to your `Cargo.toml`:

```toml
[dependencies]
race = "0.1.3"
```

## Usage

### Basic Usage

```rust
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  let mut race = Race::new(
    std::time::Duration::from_millis(50),  // Start new task every 50ms
    |url: &str| async move {
      // Simulate network request with different latencies
      let latency = match url {
        "server1" => 100,
        "server2" => 20,  // Fastest
        _ => 80,
      };
      tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
      Ok::<String, &'static str>(format!("Response from {url}"))
    },
    vec!["server1", "server2", "server3"],
  );

  // Get first completed result (server2 completes first despite starting second)
  if let Some((url, Ok(data))) = race.next().await {
    println!("First response from {url}: {data}");
  }
}
```
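
To see why `server2` wins, work out each completion time by hand: with a 50ms stagger, task *k* starts at `k * 50ms` and finishes at its start time plus its simulated latency. A quick sanity check using the latencies above:

```rust
fn main() {
  let step = 50u64; // stagger interval in ms, as in the example above
  let latencies = [("server1", 100u64), ("server2", 20), ("server3", 80)];

  // Completion time of task k = k * step + latency(k).
  let winner = latencies
    .iter()
    .enumerate()
    .map(|(k, (name, lat))| (k as u64 * step + lat, *name))
    .min()
    .unwrap();

  // server2 starts at 50ms and finishes at 70ms,
  // before server1 (100ms) and server3 (180ms).
  assert_eq!(winner, (70, "server2"));
}
```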

### DNS Resolution with Staggered Requests

Query multiple hosts with 500ms staggered delay, return first successful result:

```rust
use std::net::IpAddr;
use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
  let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];

  let mut race = Race::new(
    std::time::Duration::from_millis(500),
    |host: &str| async move {
      let addr = lookup_host(host).await?.next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
      })?;
      Ok::<IpAddr, std::io::Error>(addr.ip())
    },
    hosts,
  );

  // Return first successful response
  while let Some((host, result)) = race.next().await {
    if let Ok(ip) = result {
      println!("Resolved {host}: {ip}");
      break;
    }
  }
}
```

Timeline:
- 0ms: Start resolving google.com
- 500ms: Start resolving cloudflare.com (if no response yet)
- 1000ms: Start resolving github.com (if still no response)

The first completed response wins; the remaining tasks keep running in the background until the `Race` is dropped.
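
The same pattern also covers the classic hedged request against a single endpoint: repeat one argument a few times, so a second (or third) attempt only launches if the previous one has not answered within the stagger interval. A sketch reusing the API shown above; the host and the retry count of 3 are arbitrary choices:

```rust
use std::net::IpAddr;
use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
  // Hedge the same lookup: a new attempt starts every 500ms until one answers.
  let attempts = std::iter::repeat("github.com:80").take(3);

  let mut race = Race::new(
    std::time::Duration::from_millis(500),
    |host: &str| async move {
      let addr = lookup_host(host).await?.next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
      })?;
      Ok::<IpAddr, std::io::Error>(addr.ip())
    },
    attempts,
  );

  // Keep the first successful attempt; dropping `race` cancels the rest.
  while let Some((host, result)) = race.next().await {
    if let Ok(ip) = result {
      println!("Resolved {host}: {ip}");
      break;
    }
  }
}
```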

### Infinite Task Streams

Handle unlimited iterators efficiently - tasks are started on-demand:

```rust
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  // Infinite iterator - only starts tasks as needed
  let infinite_numbers = 0u64..;
  
  let mut race = Race::new(
    std::time::Duration::from_millis(50),
    |n: &u64| {
      let n = *n;
      async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok::<(u64, u64), &'static str>((n, n * n))
      }
    },
    infinite_numbers,
  );

  // Only consume what you need - no memory explosion
  for i in 0..5 {
    if let Some((n, Ok((n_val, square)))) = race.next().await {
      println!("Result {i}: {n_val}² = {square}");
    }
  }
  // Race is dropped here, remaining tasks are cancelled
}
```

### Non-Copy Types

Works seamlessly with non-Copy types like String, Vec, and custom structs:

```rust
use futures::StreamExt;
use race::Race;

#[derive(Debug, Clone)]
struct Task {
  id: u32,
  name: String,
  data: Vec<i32>,
}

#[tokio::main]
async fn main() {
  let tasks = vec![
    Task { id: 1, name: "process".to_string(), data: vec![1, 2, 3] },
    Task { id: 2, name: "analyze".to_string(), data: vec![4, 5, 6] },
  ];

  let mut race = Race::new(
    std::time::Duration::from_millis(100),
    |task: &Task| {
      let task = task.clone();
      async move {
        let sum: i32 = task.data.iter().sum();
        let result = format!("{}: sum={sum}", task.name);
        Ok::<String, &'static str>(result)
      }
    },
    tasks,
  );

  while let Some((original_task, Ok(result))) = race.next().await {
    println!("Task {}: {result}", original_task.id);
  }
}
```

## Design

```mermaid
graph TD
  A[Race::new] --> B[poll_next called]
  B --> C{Time to start new task?}
  C -->|Yes| D[Get next arg from iterator]
  D --> E[Call run function with &arg]
  E --> F[Store arg and Future in ing vector]
  F --> G[Update next_run time]
  G --> H{More args available?}
  H -->|Yes| C
  H -->|No| I[Mark is_end = true]
  C -->|No| J[Poll all running tasks]
  I --> J
  J --> K{Any task completed?}
  K -->|Yes| L[Remove from ing vector]
  L --> M[Return Some Ok arg, result]
  K -->|No| N{All tasks done?}
  N -->|Yes| O[Return None]
  N -->|No| P[Set timer for next start]
  P --> Q[Return Pending]
```

### Execution Flow

1. **Initialization**: `Race::new` stores task generator function, step interval, and argument iterator
2. **Task Scheduling**: On each `poll_next`, check if current time >= `next_run` to start new tasks
3. **Task Creation**: Call `run(&arg)` to create Future, store both arg and Future in `ing` vector
4. **Concurrent Polling**: All running tasks polled simultaneously using reverse iteration for safe removal
5. **Result Handling**: First completed task returns `(original_arg, result)` tuple immediately
6. **Timer Management**: `tokio::time::Sleep` ensures proper wakeup for next task start
7. **Stream Completion**: Stream ends when all tasks complete and iterator exhausted
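
The same flow can be sketched with standard `futures`/`tokio` primitives. The function below is a behavioral illustration only, not the crate's implementation (it has no `ing` vector and yields just the first completion): start a task, wait at most one `step` for something to finish, repeat. The name `staggered_first` and the 30ms stand-in latency are made up for the sketch.

```rust
use std::time::Duration;
use futures::stream::{FuturesUnordered, StreamExt};

// Behavioral sketch only: stagger task starts by `step` and return the first
// completion, mirroring steps 2-6 above with FuturesUnordered + select!.
async fn staggered_first(
  step: Duration,
  urls: Vec<&'static str>,
) -> Option<(&'static str, String)> {
  let mut pending = FuturesUnordered::new();
  let mut args = urls.into_iter();

  loop {
    // Time to start a new task: take the next argument, if any.
    if let Some(url) = args.next() {
      pending.push(async move {
        // Stand-in for the real work done by the task generator.
        tokio::time::sleep(Duration::from_millis(30)).await;
        (url, format!("response from {url}"))
      });
    } else if pending.is_empty() {
      // Iterator exhausted and nothing in flight: the stream would end here.
      return None;
    }

    // Poll everything in flight, but give up after `step` so the next task
    // can be launched on schedule.
    tokio::select! {
      Some(done) = pending.next() => return Some(done), // first completion wins
      _ = tokio::time::sleep(step) => {}                // stagger interval elapsed
    }
  }
}
```

Calling `staggered_first(Duration::from_millis(50), vec!["server1", "server2", "server3"]).await` behaves like the Basic Usage example, minus the `Stream` interface.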

### Key Design Decisions

- **Reference-based API**: Task generator receives `&A` to avoid unnecessary moves
- **Move Semantics**: Arguments moved from iterator to task storage, then returned with results
- **No Clone Requirement**: Works with any type that implements `Send + Unpin`, including non-cloneable types
- **Reverse Polling**: Tasks polled in reverse order to safely remove completed ones
- **Coarsetime Optimization**: Uses `coarsetime` for high-performance interval timing
- **Pre-allocated Storage**: `ing` vector pre-allocated to avoid frequent reallocations
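
The reverse-polling bullet is an instance of a general `Vec` removal pattern: when walking from the back, `swap_remove` only ever moves an element that has already been visited, so indices ahead of the cursor stay valid. A minimal sketch of that pattern (the names `drain_completed` and `is_done` are hypothetical; the crate polls futures rather than calling a predicate):

```rust
// Remove completed entries in place. Iterating in reverse keeps every index
// ahead of the cursor valid, because swap_remove(i) only moves the current
// last element (already visited) down into slot i.
fn drain_completed<T>(ing: &mut Vec<T>, is_done: impl Fn(&T) -> bool) {
  for i in (0..ing.len()).rev() {
    if is_done(&ing[i]) {
      ing.swap_remove(i); // O(1) removal, order not preserved
    }
  }
}

fn main() {
  let mut ing = vec![1, 2, 3, 4, 5];
  drain_completed(&mut ing, |n| n % 2 == 0); // treat even entries as "completed"
  assert_eq!(ing.len(), 3);
}
```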

## API Reference

### `Race<'a, A, T, E, G, Fut, I>`

Staggered race executor implementing `futures::Stream`.

Type parameters:
- `A` - Argument type
- `T` - Success result type  
- `E` - Error type
- `G` - Task generator function `Fn(&A) -> Fut`
- `Fut` - Future type returning `Result<T, E>`
- `I` - Iterator type yielding `A`

#### `new(step: std::time::Duration, run: G, args_li: impl IntoIterator) -> Self`

Create executor with step interval, task generator, and arguments.

**Parameters:**
- `step: std::time::Duration` - **Staggered delay interval for task starts**. Tasks are launched sequentially at this interval, not all at once. For example, `Duration::from_millis(50)` means each task starts 50ms after the previous one (converted to `coarsetime::Duration`)
- `run: G` - Function `Fn(&A) -> Fut` that creates Future from argument reference
- `args_li: impl IntoIterator<Item = A>` - Iterator of arguments (can be infinite)

**Returns:** `Race` instance implementing `futures::Stream<Item = (A, Result<T, E>)>`

**Constraints:**
- `A: Send + Unpin + 'a` - Argument type must be thread-safe and unpinnable
- `T: Send + 'a` - Result type must be thread-safe
- `E: Send + 'a` - Error type must be thread-safe
- `G: Fn(&A) -> Fut + Send + Unpin + 'a` - Generator function must be thread-safe
- `Fut: Future<Output = Result<T, E>> + Send + 'a` - Future must be thread-safe
- `I: Iterator<Item = A> + Send + Unpin + 'a` - Iterator must be thread-safe

#### `Stream::poll_next`

Returns `Poll<Option<(A, Result<T, E>)>>`:
- `Poll::Ready(Some((arg, Ok(result))))` - Task completed successfully with original argument
- `Poll::Ready(Some((arg, Err(e))))` - Task failed with error, still returns original argument
- `Poll::Ready(None)` - All tasks completed and iterator exhausted
- `Poll::Pending` - No task ready, waker registered for future notification
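
A small sketch of consuming all three cases, assuming the item shape documented above; the arguments and the forced error are arbitrary:

```rust
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  let mut race = Race::new(
    std::time::Duration::from_millis(50),
    |n: &u32| {
      let n = *n;
      async move {
        // Force one failure to show the Err variant.
        if n == 2 { Err("boom") } else { Ok(n * 10) }
      }
    },
    vec![1u32, 2, 3],
  );

  // Each item pairs the original argument with its task's Result;
  // the loop ends when the stream returns None.
  while let Some((n, result)) = race.next().await {
    match result {
      Ok(value) => println!("{n} -> {value}"),
      Err(err) => eprintln!("{n} failed: {err}"),
    }
  }
}
```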

## Performance

Optimizations implemented:

- **Zero `dyn` dispatch** - Fully generic, no virtual calls
- **`coarsetime`** - Fast time operations, reduced syscalls
- **Pre-allocated vectors** - Capacity 8 to avoid small reallocations
- **Efficient polling** - Reverse iteration for safe removal
- **Immediate cleanup** - Completed tasks dropped immediately

Benchmarks show significant performance improvements over channel-based approaches.

## Tech Stack

- [tokio](https://tokio.rs) - Async runtime and sleep timer
- [futures](https://crates.io/crates/futures) - Stream trait implementation
- [coarsetime](https://crates.io/crates/coarsetime) - High-performance time operations with reduced syscalls

## Project Structure

```
race/
├── src/
│   └── lib.rs        # Core implementation
├── tests/
│   └── main.rs       # Integration tests with logging
├── readme/
│   ├── en.md         # English documentation
│   └── zh.md         # Chinese documentation
└── Cargo.toml
```

## History

The "race" pattern in async programming traces back to JavaScript's `Promise.race()`, introduced in ES6 (2015). Unlike `Promise.race()` which starts all tasks simultaneously, this library implements a staggered variant inspired by gRPC's "hedged requests" pattern described in Google's 2015 paper "The Tail at Scale".

Hedged requests help reduce tail latency by sending redundant requests after a delay, using whichever response arrives first. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.

This Rust implementation adds modern optimizations like zero-cost abstractions, efficient time handling, and support for infinite streams.

---

## About

This project is an open-source component of [js0.site ⋅ Refactoring the Internet Plan](https://js0.site).

We are redefining the development paradigm of the Internet in a componentized way. Welcome to follow us:

* [Google Group](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)

---

<a id="zh"></a>

# race : 阶梯式异步任务执行器

## 目录

- [简介](#简介)
- [特性](#特性)
- [安装](#安装)
- [使用](#使用)
  - [基本用法](#基本用法)
  - [DNS 阶梯解析](#dns-阶梯解析)
  - [无限任务流](#无限任务流)
  - [非 Copy 类型](#非-copy-类型)
- [设计](#设计)
- [API 参考](#api-参考)
- [性能](#性能)
- [技术栈](#技术栈)
- [目录结构](#目录结构)
- [历史](#历史)

## 简介

`race` 是高性能 Rust 阶梯式异步任务执行器。任务按固定间隔启动并竞赛完成 - 最快完成者获胜,与启动顺序无关。

> **提示**:"阶梯式"(Staggered)指任务按固定时间间隔依次启动,而非同时启动。例如,50ms 间隔表示:任务1在 0ms 启动,任务2在 50ms 启动,任务3在 100ms 启动,以此类推。这形成了任务启动的"阶梯"或"梯子"模式。

与 `Promise.race()` 的关键区别:不是同时启动所有任务,而是按可配置间隔依次启动,适用于:

- **限速 API 调用** - 遵守 API 配额,避免服务器过载
- **优雅降级** - 优先尝试主服务器,后备服务器延后启动
- **对冲请求** - 通过冗余请求降低尾延迟
- **无限任务流** - 高效处理无限迭代器

## 特性

- **阶梯式执行** - 任务按可配置间隔启动
- **竞赛语义** - 最快完成的任务先返回,与启动顺序无关
- **基于 Stream 的 API** - 实现 `futures::Stream` 用于异步迭代
- **支持无限迭代器** - 任务按需启动,不会一次性启动所有
- **非 Copy 类型支持** - 无需 Clone 约束即可支持 String、Vec、自定义结构体
- **高性能** - 零 `dyn` 分发,`coarsetime` 优化
- **内存高效** - 预分配向量,立即清理
- **无 `'static` 要求** - 生命周期与 Race 实例绑定

## 安装

```sh
cargo add race
```

或添加到 `Cargo.toml`:

```toml
[dependencies]
race = "0.1.3"
```

## 使用

### 基本用法

```rust
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  let mut race = Race::new(
    std::time::Duration::from_millis(50),  // 每 50ms 启动新任务
    |url: &str| async move {
      // 模拟不同延迟的网络请求
      let latency = match url {
        "server1" => 100,
        "server2" => 20,  // 最快
        _ => 80,
      };
      tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
      Ok::<String, &'static str>(format!("Response from {url}"))
    },
    vec!["server1", "server2", "server3"],
  );

  // 获取首个完成的结果(server2 虽然第二个启动但最先完成)
  if let Some((url, Ok(data))) = race.next().await {
    println!("首个响应来自 {url}: {data}");
  }
}
```
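
为什么 server2 获胜?可以手动推算每个任务的完成时间:在 50ms 阶梯间隔下,第 k 个任务在 `k * 50ms` 启动,完成时间为启动时间加上模拟延迟。用上例的延迟做个简单验证:

```rust
fn main() {
  let step = 50u64; // 上例中的阶梯间隔(毫秒)
  let latencies = [("server1", 100u64), ("server2", 20), ("server3", 80)];

  // 第 k 个任务的完成时间 = k * step + 延迟(k)
  let winner = latencies
    .iter()
    .enumerate()
    .map(|(k, (name, lat))| (k as u64 * step + lat, *name))
    .min()
    .unwrap();

  // server2 在 50ms 启动、70ms 完成,
  // 早于 server1(100ms)和 server3(180ms)
  assert_eq!(winner, (70, "server2"));
}
```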

### DNS 阶梯解析

向多个主机发起解析,每 500ms 启动新请求,返回首个成功结果:

```rust
use std::net::IpAddr;
use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
  let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];

  let mut race = Race::new(
    std::time::Duration::from_millis(500),
    |host: &str| async move {
      let addr = lookup_host(host).await?.next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
      })?;
      Ok::<IpAddr, std::io::Error>(addr.ip())
    },
    hosts,
  );

  // 返回首个成功响应
  while let Some((host, result)) = race.next().await {
    if let Ok(ip) = result {
      println!("解析 {host}: {ip}");
      break;
    }
  }
}
```

时间线:
- 0ms: 开始解析 google.com
- 500ms: 开始解析 cloudflare.com(若无响应)
- 1000ms: 开始解析 github.com(若仍无响应)

首个完成的响应返回,剩余任务在后台继续运行直到 Race 被 drop。
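
同样的模式也适用于针对单一端点的经典对冲请求:把同一个参数重复若干次,只有当上一次尝试在阶梯间隔内仍未返回时,才会发起下一次尝试。以下沿用上文的 API 写法,主机名与重复 3 次均为任意选择:

```rust
use std::net::IpAddr;
use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
  // 对同一次解析做对冲:每 500ms 发起一次新尝试,直到有结果返回
  let attempts = std::iter::repeat("github.com:80").take(3);

  let mut race = Race::new(
    std::time::Duration::from_millis(500),
    |host: &str| async move {
      let addr = lookup_host(host).await?.next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
      })?;
      Ok::<IpAddr, std::io::Error>(addr.ip())
    },
    attempts,
  );

  // 取首个成功的尝试;drop `race` 即取消其余任务
  while let Some((host, result)) = race.next().await {
    if let Ok(ip) = result {
      println!("解析 {host}: {ip}");
      break;
    }
  }
}
```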

### 无限任务流

高效处理无限迭代器 - 任务按需启动:

```rust
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  // 无限迭代器 - 只按需启动任务
  let infinite_numbers = 0u64..;

  let mut race = Race::new(
    std::time::Duration::from_millis(50),
    |n: &u64| {
      let n = *n;
      async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        Ok::<(u64, u64), &'static str>((n, n * n))
      }
    },
    infinite_numbers,
  );

  // 只消费需要的部分 - 不会内存爆炸
  for i in 0..5 {
    if let Some((n, Ok((n_val, square)))) = race.next().await {
      println!("结果 {i}: {n_val}² = {square}");
    }
  }
  // Race 在此处被 drop,剩余任务被取消
}
```

### 非 Copy 类型

无缝支持非 Copy 类型,如 String、Vec 和自定义结构体:

```rust
use futures::StreamExt;
use race::Race;

#[derive(Debug, Clone)]
struct Task {
  id: u32,
  name: String,
  data: Vec<i32>,
}

#[tokio::main]
async fn main() {
  let tasks = vec![
    Task { id: 1, name: "process".to_string(), data: vec![1, 2, 3] },
    Task { id: 2, name: "analyze".to_string(), data: vec![4, 5, 6] },
  ];

  let mut race = Race::new(
    std::time::Duration::from_millis(100),
    |task: &Task| {
      let task = task.clone();
      async move {
        let sum: i32 = task.data.iter().sum();
        let result = format!("{}: sum={sum}", task.name);
        Ok::<String, &'static str>(result)
      }
    },
    tasks,
  );

  while let Some((original_task, Ok(result))) = race.next().await {
    println!("任务 {}: {result}", original_task.id);
  }
}
```

## 设计

```mermaid
graph TD
  A[Race::new] --> B[poll_next 被调用]
  B --> C{是否该启动新任务?}
  C -->|是| D[从迭代器获取下个参数]
  D --> E[用 &arg 调用 run 函数]
  E --> F[将 arg 和 Future 存入 ing 向量]
  F --> G[更新 next_run 时间]
  G --> H{还有参数可用?}
  H -->|是| C
  H -->|否| I[标记 is_end = true]
  C -->|否| J[轮询所有运行中的任务]
  I --> J
  J --> K{有任务完成?}
  K -->|是| L[从 ing 向量移除]
  L --> M[返回 Some arg, result]
  K -->|否| N{所有任务完成?}
  N -->|是| O[返回 None]
  N -->|否| P[设置下次启动定时器]
  P --> Q[返回 Pending]
```

### 执行流程

1. **初始化**: `Race::new` 存储任务生成器函数、步进间隔和参数迭代器
2. **任务调度**: 每次 `poll_next` 检查当前时间是否 >= `next_run` 来启动新任务
3. **任务创建**: 调用 `run(&arg)` 创建 Future,将参数和 Future 存储在 `ing` 向量中
4. **并发轮询**: 所有运行中的任务同时轮询,使用反向迭代安全移除
5. **结果处理**: 首个完成的任务立即返回 `(原始参数, 结果)` 元组
6. **定时器管理**: `tokio::time::Sleep` 确保下次任务启动的正确唤醒
7. **流完成**: 当所有任务完成且迭代器耗尽时流结束
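
上述流程也可以用标准的 `futures`/`tokio` 原语勾勒出来。下面的函数仅为行为示意,并非本库的实际实现(没有 `ing` 向量,且只返回首个完成的结果):启动一个任务,最多等待一个 `step` 看是否有任务完成,然后重复。函数名 `staggered_first` 与 30ms 的模拟延迟均为示意用的假设。

```rust
use std::time::Duration;
use futures::stream::{FuturesUnordered, StreamExt};

// 仅为行为示意:按 `step` 间隔依次启动任务并返回首个完成的结果,
// 用 FuturesUnordered + select! 对应上面的第 2-6 步。
async fn staggered_first(
  step: Duration,
  urls: Vec<&'static str>,
) -> Option<(&'static str, String)> {
  let mut pending = FuturesUnordered::new();
  let mut args = urls.into_iter();

  loop {
    // 到了启动新任务的时间:取出下一个参数(如果还有)
    if let Some(url) = args.next() {
      pending.push(async move {
        // 代替任务生成器真实工作的占位延迟
        tokio::time::sleep(Duration::from_millis(30)).await;
        (url, format!("response from {url}"))
      });
    } else if pending.is_empty() {
      // 迭代器耗尽且没有在途任务:流在此结束
      return None;
    }

    // 轮询所有在途任务,但最多等待一个 `step`,以便按时启动下一个任务
    tokio::select! {
      Some(done) = pending.next() => return Some(done), // 首个完成者获胜
      _ = tokio::time::sleep(step) => {}                // 阶梯间隔已到
    }
  }
}
```

调用 `staggered_first(Duration::from_millis(50), vec!["server1", "server2", "server3"]).await` 的行为与上文"基本用法"示例类似,只是没有 `Stream` 接口。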

### 关键设计决策

- **基于引用的 API**: 任务生成器接收 `&A` 避免不必要的移动
- **移动语义**: 参数从迭代器移动到任务存储,然后与结果一起返回
- **无 Clone 要求**: 适用于任何实现 `Send + Unpin` 的类型,包括不可克隆类型
- **反向轮询**: 任务按反向顺序轮询以安全移除已完成的任务
- **Coarsetime 优化**: 使用 `coarsetime` 进行高性能间隔计时
- **预分配存储**: `ing` 向量预分配以避免频繁重新分配
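
反向轮询是从 `Vec` 中边遍历边移除元素的通用技巧:从尾部向前遍历时,`swap_remove` 只会移动已经访问过的元素,游标之前的下标始终有效。以下是该模式的最小示意(`drain_completed` 与 `is_done` 为示意用的假设名称,本库实际轮询的是 Future 而非调用谓词):

```rust
// 原地移除已完成的条目。反向迭代保证游标之前的下标始终有效,
// 因为 swap_remove(i) 只会把当前最后一个(已访问过的)元素移到 i 处。
fn drain_completed<T>(ing: &mut Vec<T>, is_done: impl Fn(&T) -> bool) {
  for i in (0..ing.len()).rev() {
    if is_done(&ing[i]) {
      ing.swap_remove(i); // O(1) 移除,不保持顺序
    }
  }
}

fn main() {
  let mut ing = vec![1, 2, 3, 4, 5];
  drain_completed(&mut ing, |n| n % 2 == 0); // 把偶数条目视为"已完成"
  assert_eq!(ing.len(), 3);
}
```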

## API 参考

### `Race<'a, A, T, E, G, Fut, I>`

阶梯式竞赛执行器,实现 `futures::Stream`。

类型参数:
- `A` - 参数类型
- `T` - 成功结果类型
- `E` - 错误类型
- `G` - 任务生成函数 `Fn(&A) -> Fut`
- `Fut` - Future 类型,返回 `Result<T, E>`
- `I` - 迭代器类型,产生 `A`

#### `new(step: std::time::Duration, run: G, args_li: impl IntoIterator) -> Self`

创建执行器,传入步进间隔、任务生成器和参数。

**参数:**
- `step: std::time::Duration` - **阶梯式延时启动间隔**。每个任务按此间隔依次启动,而非同时启动所有任务。例如 `Duration::from_millis(50)` 表示每隔 50ms 启动下一个任务(转换为 `coarsetime::Duration`)
- `run: G` - 函数 `Fn(&A) -> Fut`,从参数引用创建 Future
- `args_li: impl IntoIterator<Item = A>` - 参数迭代器(可以是无限的)

**返回:** 实现 `futures::Stream<Item = (A, Result<T, E>)>` 的 `Race` 实例

**约束:**
- `A: Send + Unpin + 'a` - 参数类型必须线程安全且可 unpin
- `T: Send + 'a` - 结果类型必须线程安全
- `E: Send + 'a` - 错误类型必须线程安全
- `G: Fn(&A) -> Fut + Send + Unpin + 'a` - 生成器函数必须线程安全
- `Fut: Future<Output = Result<T, E>> + Send + 'a` - Future 必须线程安全
- `I: Iterator<Item = A> + Send + Unpin + 'a` - 迭代器必须线程安全

#### `Stream::poll_next`

返回 `Poll<Option<(A, Result<T, E>)>>`:
- `Poll::Ready(Some((arg, Ok(result))))` - 任务成功完成,返回原始参数
- `Poll::Ready(Some((arg, Err(e))))` - 任务失败,仍返回原始参数
- `Poll::Ready(None)` - 所有任务完成且迭代器耗尽
- `Poll::Pending` - 暂无任务就绪,已注册唤醒器等待未来通知
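
以下是按上述返回形态消费三种情况的小示例(参数与人为构造的错误均为演示假设):

```rust
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  let mut race = Race::new(
    std::time::Duration::from_millis(50),
    |n: &u32| {
      let n = *n;
      async move {
        // 人为制造一次失败,演示 Err 分支
        if n == 2 { Err("boom") } else { Ok(n * 10) }
      }
    },
    vec![1u32, 2, 3],
  );

  // 每个结果都与原始参数配对返回;流返回 None 时循环结束
  while let Some((n, result)) = race.next().await {
    match result {
      Ok(value) => println!("{n} -> {value}"),
      Err(err) => eprintln!("{n} 失败: {err}"),
    }
  }
}
```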

## 性能

实现的优化:

- **零 `dyn` 分发** - 完全泛型,无虚拟调用
- **`coarsetime`** - 快速时间操作,减少系统调用
- **预分配向量** - 容量 8 避免小规模重新分配
- **高效轮询** - 反向迭代安全移除
- **立即清理** - 完成的任务立即 drop

基准测试显示相比基于 channel 的方法有显著性能提升。

## 技术栈

- [tokio](https://tokio.rs) - 异步运行时和睡眠定时器
- [futures](https://crates.io/crates/futures) - Stream trait 实现
- [coarsetime](https://crates.io/crates/coarsetime) - 高性能时间操作,减少系统调用

## 目录结构

```
race/
├── src/
│   └── lib.rs        # 核心实现
├── tests/
│   └── main.rs       # 集成测试
├── readme/
│   ├── en.md         # 英文文档
│   └── zh.md         # 中文文档
└── Cargo.toml
```

## 历史

异步编程中的 "race" 模式源于 JavaScript 的 `Promise.race()`,在 ES6 (2015) 中引入。与 `Promise.race()` 同时启动所有任务不同,本库实现了阶梯式变体,灵感来自 Google 2013 年论文《The Tail at Scale》中描述的"对冲请求"技术,该技术后来也被 gRPC 等系统采用。

对冲请求通过延迟发送冗余请求来降低尾延迟,使用最先到达的响应。该技术广泛应用于 Google、Amazon 等大规模分布式系统。

---

## 关于

本项目为 [js0.site ⋅ 重构互联网计划](https://js0.site) 的开源组件。

我们正在以组件化的方式重新定义互联网的开发范式,欢迎关注:

* [谷歌邮件列表](https://groups.google.com/g/js0-site)
* [js0site.bsky.social](https://bsky.app/profile/js0site.bsky.social)