race : Staggered Race Executor
Table of Contents
- Introduction
- Features
- Installation
- Usage
- Design
- API Reference
- Performance
- Tech Stack
- Project Structure
- History
Introduction
race is a high-performance Rust library implementing a staggered race executor. Tasks are started at fixed intervals and race to complete - whichever finishes first is returned first, regardless of start order.
Key difference from Promise.race(): Instead of starting all tasks simultaneously, tasks are launched one by one at configurable intervals, enabling:
- Rate-limited API calls - Respect API quotas and avoid overwhelming servers
- Graceful degradation - Try primary servers first, fallbacks later
- Hedged requests - Reduce tail latency with redundant requests
- Infinite task streams - Handle unlimited iterators efficiently
Features
- Staggered execution - Tasks start at configurable intervals
- Race semantics - Fastest task completes first regardless of start order
- Stream-based API - Implements `futures::Stream` for async iteration
- Infinite iterator support - Tasks started on-demand, not all at once
- High performance - Zero `dyn` dispatch, `coarsetime` optimization
- Memory efficient - Pre-allocated vectors, immediate cleanup
- No `'static` requirement - Lifetime tied to the Race instance
Installation
Add to your Cargo.toml:

```toml
[dependencies]
race = "0.1.3"
```
Usage
Basic Usage
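A minimal sketch of basic usage, assuming the `Race::new(run, step, args_li)` signature documented in the API Reference, with `tokio` as the runtime. Each task closure receives one argument and resolves to `Result<(arg, value), E>`:

```rust
use std::time::Duration;

use futures::StreamExt;
use race::Race;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // One task per argument, started 100ms apart; the fastest returns first.
    let mut race = Race::new(
        |ms: u64| async move {
            sleep(Duration::from_millis(ms)).await; // simulate work taking `ms` milliseconds
            Ok::<_, ()>((ms, ms * 2))
        },
        coarsetime::Duration::from_millis(100),
        [300u64, 50, 200],
    );
    while let Some(Ok((arg, value))) = race.next().await {
        println!("task {arg} finished with {value}");
    }
}
```

With a 100ms step, the 50ms task (started at t=100ms) typically finishes first at around t=150ms, ahead of the 300ms task started at t=0.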
DNS Resolution with Staggered Requests
Query multiple hosts with 500ms staggered delay, return first successful result:
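A sketch of this scenario, assuming `tokio::net::lookup_host` for resolution and the `Race::new` signature from the API Reference:

```rust
use std::net::IpAddr;

use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
    let hosts = ["google.com", "cloudflare.com", "github.com"];
    // Start one lookup every 500ms; the first successful answer wins.
    let mut race = Race::new(
        |host: &str| async move {
            let mut addrs = lookup_host((host, 443)).await.map_err(|e| e.to_string())?;
            let ip: IpAddr = addrs
                .next()
                .map(|addr| addr.ip())
                .ok_or_else(|| format!("{host}: no address"))?;
            Ok((host, ip))
        },
        coarsetime::Duration::from_millis(500),
        hosts,
    );
    if let Some(Ok((host, ip))) = race.next().await {
        println!("{host} resolved first: {ip}");
    }
    // Dropping `race` here cancels any lookups still in flight.
}
```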
Timeline:
- 0ms: Start resolving google.com
- 500ms: Start resolving cloudflare.com (if no response yet)
- 1000ms: Start resolving github.com (if still no response)
The first completed response wins; the remaining tasks continue in the background until the Race is dropped.
Infinite Task Streams
Handle unlimited iterators efficiently - tasks are started on-demand:
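A sketch with an unbounded argument iterator (`0u64..`). Because tasks are created lazily inside `poll_next`, the infinite range is never collected and memory stays bounded:

```rust
use std::time::Duration;

use futures::StreamExt;
use race::Race;
use tokio::time::sleep;

#[tokio::main]
async fn main() {
    // `0u64..` never ends; Race pulls the next argument only when
    // its scheduled start time arrives.
    let mut race = Race::new(
        |n: u64| async move {
            sleep(Duration::from_millis(10)).await;
            Ok::<_, ()>((n, n * n))
        },
        coarsetime::Duration::from_millis(5),
        0u64..,
    );
    // Consume a few results, then drop the Race to stop launching tasks.
    for _ in 0..3 {
        if let Some(Ok((n, square))) = race.next().await {
            println!("task {n} -> {square}");
        }
    }
}
```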
Design
```mermaid
graph TD
  A[Race::new] --> B[poll_next called]
  B --> C{Time to start new task?}
  C -->|Yes| D[Start task from iterator]
  D --> E[Schedule next start time]
  E --> F{More args?}
  F -->|Yes| C
  F -->|No| G[Mark args exhausted]
  C -->|No| H[Poll all running tasks]
  G --> H
  H --> I{Any task completed?}
  I -->|Yes| J[Return result]
  I -->|No| K{All done?}
  K -->|Yes| L[Return None]
  K -->|No| M[Return Pending + Set Timer]
```
Execution flow:
- `Race::new` stores the task generator, step interval, and argument iterator
- On each `poll_next`, check whether it's time to start new tasks
- Tasks are started at fixed intervals, using `coarsetime` for performance
- All running tasks are polled concurrently
- The first completed task's result is returned immediately
- A timer ensures proper wakeup for the next task start
- The stream ends when all tasks complete and the iterator is exhausted
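The start-time bookkeeping in this flow reduces to simple arithmetic. A self-contained sketch (the helper name `tasks_started` is illustrative, not part of the crate's API):

```rust
/// How many tasks should have been started `elapsed_ms` after the race
/// began, with one start every `step_ms`, capped by the argument count.
fn tasks_started(elapsed_ms: u64, step_ms: u64, total_args: u64) -> u64 {
    // Task 0 starts at t=0, task 1 at t=step, task 2 at t=2*step, ...
    let by_time = elapsed_ms / step_ms + 1;
    by_time.min(total_args)
}

fn main() {
    // With a 500ms step and 3 hosts, as in the DNS example:
    assert_eq!(tasks_started(0, 500, 3), 1); // only the first lookup
    assert_eq!(tasks_started(499, 500, 3), 1); // still waiting
    assert_eq!(tasks_started(500, 500, 3), 2); // second lookup begins
    assert_eq!(tasks_started(10_000, 500, 3), 3); // capped at 3 args
    println!("ok");
}
```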
API Reference
Race<'a, A, T, E, G, Fut, I>
Staggered race executor implementing futures::Stream.
Type parameters:
- `A` - Argument type
- `T` - Success result type
- `E` - Error type
- `G` - Task generator function `Fn(A) -> Fut`
- `Fut` - Future type returning `Result<(A, T), E>`
- `I` - Iterator type yielding `A`
new(run: G, step: coarsetime::Duration, args_li: impl IntoIterator) -> Self
Create executor with task generator, step interval, and arguments.
- `run` - Function that creates a task from an argument
- `step` - Interval between task starts (using `coarsetime::Duration`)
- `args_li` - Iterator of arguments (can be infinite)
Returns a Race instance that implements futures::Stream.
Stream::poll_next
Returns Poll<Option<Result<(A, T), E>>>:
- `Poll::Ready(Some(Ok((arg, result))))` - Task completed successfully
- `Poll::Ready(Some(Err(e)))` - Task failed
- `Poll::Ready(None)` - All tasks completed
- `Poll::Pending` - No task ready yet
Performance
Optimizations implemented:
- Zero `dyn` dispatch - Fully generic, no virtual calls
- `coarsetime` - Fast time operations, reduced syscalls
- Pre-allocated vectors - Capacity 8 to avoid small reallocations
- Efficient polling - Reverse iteration for safe removal
- Immediate cleanup - Completed tasks dropped immediately
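The reverse-iteration trick can be shown in isolation: scanning from the back means removing element `i` never shifts the indices of elements not yet visited. The helper and types here are illustrative, not the crate's internals:

```rust
/// Remove finished entries from `tasks`, returning their ids.
/// `(id, finished)` stands in for a (future, completion-state) pair.
fn drain_completed(tasks: &mut Vec<(u32, bool)>) -> Vec<u32> {
    let mut done = Vec::new();
    // Reverse order: removing index `i` only moves elements after `i`,
    // all of which have already been visited.
    for i in (0..tasks.len()).rev() {
        if tasks[i].1 {
            done.push(tasks.remove(i).0);
        }
    }
    done
}

fn main() {
    let mut tasks = vec![(1, false), (2, true), (3, true), (4, false)];
    let done = drain_completed(&mut tasks);
    assert_eq!(done, vec![3, 2]); // collected back-to-front
    assert_eq!(tasks, vec![(1, false), (4, false)]);
    println!("ok");
}
```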
Benchmarks show significant performance improvements over channel-based approaches.
Tech Stack
- tokio - Async runtime and sleep timer
- futures - Stream trait implementation
- coarsetime - High-performance time operations with reduced syscalls
Project Structure
race/
├── src/
│ └── lib.rs # Core implementation
├── tests/
│ └── main.rs # Integration tests with logging
├── readme/
│ ├── en.md # English documentation
│ └── zh.md # Chinese documentation
└── Cargo.toml
History
The "race" pattern in async programming traces back to JavaScript's Promise.race(), introduced in ES6 (2015). Unlike Promise.race(), which starts all tasks simultaneously, this library implements a staggered variant inspired by the "hedged requests" technique described in Dean and Barroso's 2013 paper "The Tail at Scale" and later adopted by systems such as gRPC.
Hedged requests help reduce tail latency by sending redundant requests after a delay, using whichever response arrives first. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.
This Rust implementation adds modern optimizations like zero-cost abstractions, efficient time handling, and support for infinite streams.
About
This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.
We are redefining the development paradigm of the Internet through componentization. Welcome to follow us.