race : Staggered Async Task Executor
Table of Contents
- Introduction
- Features
- Installation
- Usage
- Design
- API Reference
- Performance
- Tech Stack
- Project Structure
- History
Introduction
race is a high-performance Rust library implementing staggered async task execution. Tasks start at fixed intervals and race to completion - fastest task wins regardless of start order.
Note: "Staggered" means tasks are launched sequentially at fixed time intervals, not simultaneously. For example, with a 50ms interval: Task 1 starts at 0ms, Task 2 at 50ms, Task 3 at 100ms, etc. This creates a "staircase" or "ladder" pattern of task launches.
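The staircase can be written down as simple arithmetic. The sketch below is a minimal illustration using only the standard library; `launch_offsets` is a hypothetical helper and not part of the race API.

```rust
use std::time::Duration;

/// Offsets (measured from the first launch) at which `count` staggered
/// tasks start. Hypothetical helper for illustration only.
fn launch_offsets(step: Duration, count: u32) -> Vec<Duration> {
    // Task i (zero-based) starts i steps after the first task.
    (0..count).map(|i| step * i).collect()
}

fn main() {
    let offsets = launch_offsets(Duration::from_millis(50), 3);
    for (i, offset) in offsets.iter().enumerate() {
        println!("task {} starts at {}ms", i + 1, offset.as_millis());
    }
}
```

With a 50ms step and three tasks this reproduces the 0ms, 50ms, 100ms ladder described in the note.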
Key difference from Promise.race(): Instead of starting all tasks simultaneously, tasks launch sequentially at configurable intervals, enabling:
- Rate-limited API calls - Respect API quotas and avoid overwhelming servers
- Graceful degradation - Try primary servers first, fallbacks later
- Hedged requests - Reduce tail latency with redundant requests
- Infinite task streams - Handle unlimited iterators efficiently
Features
- Staggered execution - Tasks start at configurable intervals
- Race semantics - Fastest task completes first regardless of start order
- Stream-based API - Implements futures::Stream for async iteration
- Infinite iterator support - Tasks started on-demand, not all at once
- Non-Copy type support - Works with String, Vec, custom structs without Clone requirement
- High performance - Zero dyn dispatch, coarsetime optimization
- Memory efficient - Pre-allocated vectors, immediate cleanup
- No 'static requirement - Lifetime tied to Race instance
Installation
Or add to your Cargo.toml:
[]
= "0.1.3"
Usage
Basic Usage
use StreamExt;
use Race;
async
DNS Resolution with Staggered Requests
Query multiple hosts with 500ms staggered delay, return first successful result:
use IpAddr;
use StreamExt;
use Race;
use lookup_host;
async
Timeline:
- 0ms: Start resolving google.com
- 500ms: Start resolving cloudflare.com (if no response yet)
- 1000ms: Start resolving github.com (if still no response)
The first completed response wins; the remaining tasks keep running until the Race is dropped.
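To see why a later-started task can still win, the timeline can be modelled with plain arithmetic. This is a sketch under stated assumptions: `first_finisher` is a hypothetical helper, the latencies are made up, and every task is assumed to be launched (in the real flow a later task only starts if the stream is still being polled).

```rust
/// Index and completion time (ms) of the task that finishes first when task
/// `i` starts at `i * step_ms` and then runs for `durations_ms[i]`.
/// Hypothetical helper; ties go to the earliest-started task.
fn first_finisher(step_ms: u64, durations_ms: &[u64]) -> Option<(usize, u64)> {
    durations_ms
        .iter()
        .enumerate()
        .map(|(i, d)| (i, i as u64 * step_ms + *d))
        .min_by_key(|&(_, done_at)| done_at)
}

fn main() {
    // Made-up resolution times for the three hosts in the timeline above.
    let hosts = ["google.com", "cloudflare.com", "github.com"];
    let latencies_ms: [u64; 3] = [1800, 200, 50];
    if let Some((i, done_at)) = first_finisher(500, &latencies_ms) {
        println!("{} answers first, at {}ms", hosts[i], done_at);
    }
}
```

With these assumed latencies the second host finishes at 700ms, ahead of the first (1800ms) and the third (1050ms), even though it was not the first to start.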
Infinite Task Streams
Handle unlimited iterators efficiently - tasks are started on-demand:
use StreamExt;
use Race;
async
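The on-demand behaviour can be illustrated without the library. The sketch below uses only the standard library and a hypothetical helper, `start_next`, to show that pulling a few arguments from an infinite iterator never touches the rest of it.

```rust
use std::cell::Cell;

/// Pull at most `n` arguments from `args`, leaving the rest untouched.
/// Hypothetical helper standing in for "start the tasks that are due".
fn start_next<I: Iterator>(args: &mut I, n: usize) -> Vec<I::Item> {
    args.by_ref().take(n).collect()
}

fn main() {
    // Counts how many arguments the infinite source has actually produced.
    let produced = Cell::new(0u32);
    let mut args = (1u32..).inspect(|_| produced.set(produced.get() + 1));

    let first = start_next(&mut args, 2);
    println!("started {:?}, produced so far: {}", first, produced.get());

    let second = start_next(&mut args, 1);
    println!("started {:?}, produced so far: {}", second, produced.get());
}
```

Because iterators are lazy, the counter only advances when a task is actually due, which is what makes unbounded argument sources practical.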
Non-Copy Types
Works seamlessly with non-Copy types like String, Vec, and custom structs:
use StreamExt;
use Race;
async
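The reason no Clone bound is needed can be shown with a synchronous stand-in. This is a sketch, not the library's code: `Job` and `run_and_return` are hypothetical, and a plain closure replaces the async task. The argument is borrowed while the work runs and then moved out next to the result.

```rust
/// A deliberately non-Clone, non-Copy argument type.
#[derive(Debug, PartialEq)]
struct Job {
    name: String,
}

/// Run `run` on a reference to each argument, then hand the argument back
/// together with its result. Values are only borrowed and moved, never cloned.
fn run_and_return<A, T>(
    args: impl IntoIterator<Item = A>,
    run: impl Fn(&A) -> T,
) -> Vec<(A, T)> {
    args.into_iter()
        .map(|arg| {
            let out = run(&arg); // the borrow ends when `run` returns
            (arg, out) // the original argument is moved out with the result
        })
        .collect()
}

fn main() {
    let jobs = vec![Job { name: "alpha".to_string() }];
    for (job, len) in run_and_return(jobs, |j| j.name.len()) {
        println!("{} -> {}", job.name, len);
    }
}
```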
Design
graph TD
A[Race::new] --> B[poll_next called]
B --> C{Time to start new task?}
C -->|Yes| D[Get next arg from iterator]
D --> E[Call run function with &arg]
E --> F[Store arg and Future in ing vector]
F --> G[Update next_run time]
G --> H{More args available?}
H -->|Yes| C
H -->|No| I[Mark is_end = true]
C -->|No| J[Poll all running tasks]
I --> J
J --> K{Any task completed?}
K -->|Yes| L[Remove from ing vector]
L --> M[Return Some Ok arg, result]
K -->|No| N{All tasks done?}
N -->|Yes| O[Return None]
N -->|No| P[Set timer for next start]
P --> Q[Return Pending]
Execution Flow
- Initialization: Race::new stores the task generator function, step interval, and argument iterator
- Task Scheduling: On each poll_next, check if current time >= next_run to start new tasks
- Task Creation: Call run(&arg) to create a Future, store both arg and Future in the ing vector
- Concurrent Polling: All running tasks are polled on each poll_next call, using reverse iteration for safe removal
- Result Handling: The first completed task returns its (original_arg, result) tuple immediately
- Timer Management: tokio::time::Sleep ensures proper wakeup for the next task start
- Stream Completion: The stream ends when all tasks have completed and the iterator is exhausted
Key Design Decisions
- Reference-based API: Task generator receives &A to avoid unnecessary moves
- Move Semantics: Arguments moved from iterator to task storage, then returned with results
- No Clone Requirement: Works with any type that implements Send + Unpin, including non-cloneable types
- Reverse Polling: Tasks polled in reverse order to safely remove completed ones
- Coarsetime Optimization: Uses coarsetime for high-performance interval timing
- Pre-allocated Storage: ing vector pre-allocated to avoid frequent reallocations
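Why reverse order makes removal safe is easiest to see in isolation. The sketch below assumes removal is done with Vec::swap_remove, which the description above does not confirm; `take_finished` is a hypothetical helper and a bool stands in for "this task's future is ready".

```rust
/// Remove every finished entry from `running` and return their arguments.
/// Walking indices from the back means `swap_remove(i)` only ever moves an
/// already-visited element into slot `i`, so no entry is skipped.
fn take_finished<A>(running: &mut Vec<(A, bool)>) -> Vec<A> {
    let mut finished = Vec::new();
    for i in (0..running.len()).rev() {
        if running[i].1 {
            // O(1) removal: the last element takes the place of slot `i`.
            finished.push(running.swap_remove(i).0);
        }
    }
    finished
}

fn main() {
    let mut running = vec![
        (String::from("a"), true),
        (String::from("b"), false),
        (String::from("c"), true),
        (String::from("d"), false),
    ];
    let finished = take_finished(&mut running);
    println!("finished: {:?}, still running: {:?}", finished, running);
}
```

swap_remove does not preserve the order of the remaining entries, which is acceptable here because completion order, not storage order, decides what the stream yields.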
API Reference
Race<'a, A, T, E, G, Fut, I>
Staggered race executor implementing futures::Stream.
Type parameters:
- A - Argument type
- T - Success result type
- E - Error type
- G - Task generator function Fn(&A) -> Fut
- Fut - Future type returning Result<T, E>
- I - Iterator type yielding A
new(step: std::time::Duration, run: G, args_li: impl IntoIterator<Item = A>) -> Self
Create executor with step interval, task generator, and arguments.
Parameters:
- step: std::time::Duration - Staggered delay interval for task starts. Tasks are launched sequentially at this interval, not all at once. For example, Duration::from_millis(50) means each task starts 50ms after the previous one (converted to coarsetime::Duration)
- run: G - Function Fn(&A) -> Fut that creates a Future from an argument reference
- args_li: impl IntoIterator<Item = A> - Iterator of arguments (can be infinite)
Returns: Race instance implementing futures::Stream<Item = (A, Result<T, E>)>
Constraints:
- A: Send + Unpin + 'a - Argument type must be thread-safe and Unpin
- T: Send + 'a - Result type must be thread-safe
- E: Send + 'a - Error type must be thread-safe
- G: Fn(&A) -> Fut + Send + Unpin + 'a - Generator function must be thread-safe and Unpin
- Fut: Future<Output = Result<T, E>> + Send + 'a - Future must be thread-safe
- I: Iterator<Item = A> + Send + Unpin + 'a - Iterator must be thread-safe and Unpin
Stream::poll_next
Returns Poll<Option<(A, Result<T, E>)>>:
- Poll::Ready(Some((arg, Ok(result)))) - Task completed successfully with original argument
- Poll::Ready(Some((arg, Err(e)))) - Task failed with error, still returns original argument
- Poll::Ready(None) - All tasks completed and iterator exhausted
- Poll::Pending - No task ready, waker registered for future notification
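Since each item pairs the original argument with a Result, a consumer that wants "the first success" has to skip over failures itself. The sketch below shows one way to do that over already-collected items; `first_success` is a hypothetical helper working on a plain iterator, not on the stream, and the host names and values are made up.

```rust
/// Scan completed items in arrival order and return the first success,
/// together with the arguments and errors of the tasks that failed before it.
fn first_success<A, T, E>(
    items: impl IntoIterator<Item = (A, Result<T, E>)>,
) -> (Option<(A, T)>, Vec<(A, E)>) {
    let mut failures = Vec::new();
    for (arg, outcome) in items {
        match outcome {
            Ok(value) => return (Some((arg, value)), failures),
            Err(err) => failures.push((arg, err)),
        }
    }
    (None, failures)
}

fn main() {
    // Made-up completion order: one failure, then two successes.
    let items: Vec<(&str, Result<u32, &str>)> = vec![
        ("a.example", Err("timeout")),
        ("b.example", Ok(7)),
        ("c.example", Ok(9)),
    ];
    let (winner, failures) = first_success(items);
    println!("winner: {:?}, failed before it: {:?}", winner, failures);
}
```

Because a failed item still carries its argument, the caller can log or retry exactly the input that failed.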
Performance
Optimizations implemented:
- Zero dyn dispatch - Fully generic, no virtual calls
- coarsetime - Fast time operations, reduced syscalls
- Pre-allocated vectors - Capacity 8 to avoid small reallocations
- Efficient polling - Reverse iteration for safe removal
- Immediate cleanup - Completed tasks dropped immediately
Benchmarks show significant performance improvements over channel-based approaches.
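The effect of pre-allocating with capacity 8 can be checked with the standard library alone. The sketch below is illustrative and says nothing about the library's measured performance; `reallocated_after` is a hypothetical helper that detects whether the vector's buffer moved.

```rust
/// Push `n` items into a vector created with the given capacity and report
/// whether the backing buffer moved, i.e. whether a reallocation relocated it.
fn reallocated_after(capacity: usize, n: usize) -> bool {
    let mut v: Vec<u64> = Vec::with_capacity(capacity);
    let before = v.as_ptr();
    for i in 0..n {
        v.push(i as u64);
    }
    before != v.as_ptr()
}

fn main() {
    // Up to 8 running tasks fit in the initial buffer.
    println!("capacity 8, 8 pushes, moved: {}", reallocated_after(8, 8));
    // An empty vector has to allocate on the first push.
    println!("capacity 0, 1 push, moved: {}", reallocated_after(0, 1));
}
```

Vec::with_capacity guarantees room for at least the requested number of elements, so the first eight pushes never trigger a reallocation.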
Tech Stack
- tokio - Async runtime and sleep timer
- futures - Stream trait implementation
- coarsetime - High-performance time operations with reduced syscalls
Project Structure
race/
├── src/
│ └── lib.rs # Core implementation
├── tests/
│ └── main.rs # Integration tests with logging
├── readme/
│ ├── en.md # English documentation
│ └── zh.md # Chinese documentation
└── Cargo.toml
History
The "race" pattern in async programming traces back to JavaScript's Promise.race(), introduced in ES6 (2015). Unlike Promise.race(), which starts all tasks simultaneously, this library implements a staggered variant inspired by the "hedged requests" pattern described in Google's 2013 paper "The Tail at Scale" and later adopted by gRPC.
Hedged requests help reduce tail latency by sending redundant requests after a delay, using whichever response arrives first. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.
This Rust implementation adds modern optimizations like zero-cost abstractions, efficient time handling, and support for infinite streams.
About
This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.
We are redefining the development paradigm of the Internet in a componentized way.