race 0.1.6

Staggered async task runner


race : Staggered Race Executor

Table of Contents

Introduction

race is a high-performance Rust library implementing a staggered race executor. Tasks are started at fixed intervals and race to completion: whichever finishes first is returned first, regardless of start order.

Key difference from Promise.race(): Instead of starting all tasks simultaneously, tasks are launched one by one at configurable intervals, enabling:

  • Rate-limited API calls - Respect API quotas and avoid overwhelming servers
  • Graceful degradation - Try primary servers first, fallbacks later
  • Hedged requests - Reduce tail latency with redundant requests
  • Infinite task streams - Handle unlimited iterators efficiently

Features

  • Staggered execution - Tasks start at configurable intervals
  • Race semantics - Fastest task completes first regardless of start order
  • Stream-based API - Implements futures::Stream for async iteration
  • Infinite iterator support - Tasks started on-demand, not all at once
  • High performance - Zero dyn dispatch, coarsetime optimization
  • Memory efficient - Pre-allocated vectors, immediate cleanup
  • No 'static requirement - Lifetime tied to Race instance

Installation

cargo add race

Or add to your Cargo.toml:

[dependencies]
race = "0.1.3"

Usage

Basic Usage

use coarsetime::Duration;
use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  let mut race = Race::new(
    |url: &str| async move {
      // Simulate network request with different latencies
      let latency = match url {
        "server1" => 100,
        "server2" => 20,  // Fastest
        _ => 80,
      };
      tokio::time::sleep(std::time::Duration::from_millis(latency)).await;
      Ok::<(&str, String), &'static str>((url, format!("Response from {url}")))
    },
    Duration::from_millis(50),  // Start new task every 50ms
    vec!["server1", "server2", "server3"],
  );

  // Get first completed result (server2 completes first despite starting second)
  if let Some(Ok((url, data))) = race.next().await {
    println!("First response from {url}: {data}");
  }
}

DNS Resolution with Staggered Requests

Query multiple hosts with 500ms staggered delay, return first successful result:

use std::net::IpAddr;
use futures::StreamExt;
use race::Race;
use tokio::net::lookup_host;

#[tokio::main]
async fn main() {
  let hosts = vec!["google.com:80", "cloudflare.com:80", "github.com:80"];

  let mut race = Race::new(
    |host: &str| async move {
      let addr = lookup_host(host).await?.next().ok_or_else(|| {
        std::io::Error::new(std::io::ErrorKind::NotFound, "no address")
      })?;
      Ok::<(&str, IpAddr), std::io::Error>((host, addr.ip()))
    },
    coarsetime::Duration::from_millis(500),
    hosts,
  );

  // Return first successful response
  while let Some(result) = race.next().await {
    if let Ok((host, ip)) = result {
      println!("Resolved {host}: {ip}");
      break;
    }
  }
}

Timeline:

  • 0ms: Start resolving google.com
  • 500ms: Start resolving cloudflare.com (if no response yet)
  • 1000ms: Start resolving github.com (if still no response)

The first completed response wins; the remaining tasks keep running in the background until the Race is dropped.
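The timeline above follows directly from the staggered schedule: the i-th task begins at i × step, so the winning response arrives at the minimum of (start offset + latency) over all tasks. A std-only sketch of that arithmetic, with hypothetical latencies (real DNS timings will differ):

```rust
/// Start offset of the i-th task under a staggered schedule.
fn start_offset_ms(i: u64, step_ms: u64) -> u64 {
  i * step_ms
}

/// Completion time of the winning task: the minimum over all tasks of
/// (start offset + that task's own latency).
fn first_completion_ms(step_ms: u64, latencies_ms: &[u64]) -> u64 {
  latencies_ms
    .iter()
    .enumerate()
    .map(|(i, lat)| start_offset_ms(i as u64, step_ms) + lat)
    .min()
    .expect("at least one task")
}

fn main() {
  // Hypothetical latencies for google.com, cloudflare.com, github.com.
  let latencies = [700, 30, 400];
  // With a 500ms step, cloudflare.com starts at 500ms and finishes at 530ms,
  // beating google.com (0 + 700) and github.com (1000 + 400).
  assert_eq!(first_completion_ms(500, &latencies), 530);
  println!("first result at {}ms", first_completion_ms(500, &latencies));
}
```

This is why a slower step trades latency for load: a larger step gives earlier tasks more time to win before later ones are even started.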

Infinite Task Streams

Handle unlimited iterators efficiently - tasks are started on-demand:

use futures::StreamExt;
use race::Race;

#[tokio::main]
async fn main() {
  // Infinite iterator - only starts tasks as needed
  let infinite_numbers = 0u64..;
  
  let mut race = Race::new(
    |n: u64| async move {
      tokio::time::sleep(std::time::Duration::from_millis(100)).await;
      Ok::<(u64, u64), &'static str>((n, n * n))
    },
    coarsetime::Duration::from_millis(50),
    infinite_numbers,
  );

  // Only consume what you need - no memory explosion
  for i in 0..5 {
    if let Some(Ok((n, square))) = race.next().await {
      println!("Result {i}: {n}² = {square}");
    }
  }
  // Race is dropped here, remaining tasks are cancelled
}
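The "no memory explosion" property rests on Rust iterator laziness: Race pulls one argument per task start, so an infinite range is never materialized. A std-only sketch of that on-demand pulling, where the counter is instrumentation added for the example:

```rust
use std::cell::Cell;

/// Pull exactly `n` arguments from an infinite iterator, counting how many
/// items the iterator actually produced along the way.
fn take_n_counting(n: usize) -> (Vec<u64>, u64) {
  let pulled = Cell::new(0u64);
  let infinite = (0u64..).inspect(|_| pulled.set(pulled.get() + 1));
  // Like Race, only ask for arguments as tasks are started.
  let started: Vec<u64> = infinite.take(n).collect();
  (started, pulled.get())
}

fn main() {
  let (started, pulled) = take_n_counting(5);
  assert_eq!(started, vec![0, 1, 2, 3, 4]);
  assert_eq!(pulled, 5); // only 5 of infinitely many items were produced
  println!("pulled {pulled} items");
}
```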

Design

graph TD
  A[Race::new] --> B[poll_next called]
  B --> C{Time to start new task?}
  C -->|Yes| D[Start task from iterator]
  D --> E[Schedule next start time]
  E --> F{More args?}
  F -->|Yes| C
  F -->|No| G[Mark args exhausted]
  C -->|No| H[Poll all running tasks]
  G --> H
  H --> I{Any task completed?}
  I -->|Yes| J[Return result]
  I -->|No| K{All done?}
  K -->|Yes| L[Return None]
  K -->|No| M[Return Pending + Set Timer]

Execution flow:

  1. Race::new stores task generator, step interval, and argument iterator
  2. On each poll_next, check if it's time to start new tasks
  3. Tasks are started at fixed intervals using coarsetime for performance
  4. All running tasks are polled concurrently
  5. First completed task's result is returned immediately
  6. Timer ensures proper wakeup for next task start
  7. Stream ends when all tasks complete and iterator is exhausted
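The flow above can be imitated with plain std primitives to make the race semantics concrete. This is not the library's implementation (which is a single-future poll loop, not threads); it is a thread-per-task sketch with made-up latencies, where whichever task sends first wins:

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Start one thread per task at `step_ms` intervals; return the first name sent.
fn race_winner(step_ms: u64, tasks: &[(&'static str, u64)]) -> &'static str {
  let (tx, rx) = mpsc::channel();
  for (i, &(name, latency_ms)) in tasks.iter().enumerate() {
    let tx = tx.clone();
    thread::spawn(move || {
      // Staggered start: the i-th task waits i * step before "running".
      thread::sleep(Duration::from_millis(i as u64 * step_ms + latency_ms));
      let _ = tx.send(name); // losers may hit a dropped receiver; ignore
    });
  }
  drop(tx);
  // The first message on the channel is the race winner.
  rx.recv().expect("at least one task completes")
}

fn main() {
  // (name, simulated latency in ms): the second task is fastest overall.
  let tasks = [("server1", 200), ("server2", 10), ("server3", 150)];
  let winner = race_winner(20, &tasks);
  assert_eq!(winner, "server2"); // starts at 20ms, done at 30ms
  println!("winner: {winner}");
}
```

The real executor gets the same winner without threads or channels by polling all in-flight futures from one poll_next call.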

API Reference

Race<'a, A, T, E, G, Fut, I>

Staggered race executor implementing futures::Stream.

Type parameters:

  • A - Argument type
  • T - Success result type
  • E - Error type
  • G - Task generator function Fn(A) -> Fut
  • Fut - Future type returning Result<(A, T), E>
  • I - Iterator type yielding A

new(run: G, step: coarsetime::Duration, args_li: impl IntoIterator<Item = A>) -> Self

Create executor with task generator, step interval, and arguments.

  • run - Function that creates a task from an argument
  • step - Interval between task starts (using coarsetime::Duration)
  • args_li - Iterator of arguments (can be infinite)

Returns a Race instance that implements futures::Stream.

Stream::poll_next

Returns Poll<Option<Result<(A, T), E>>>:

  • Poll::Ready(Some(Ok((arg, result)))) - Task completed successfully
  • Poll::Ready(Some(Err(e))) - Task failed
  • Poll::Ready(None) - All tasks completed
  • Poll::Pending - No task ready yet
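Poll and the nested Option/Result all live in std, so the four cases can be handled with one exhaustive match. A std-only sketch of a consumer-side classifier (the Item alias is illustrative, not part of the library's API):

```rust
use std::task::Poll;

type Item<A, T, E> = Result<(A, T), E>;

/// Classify one poll_next-shaped value into the four documented cases.
fn describe<A, T, E>(p: Poll<Option<Item<A, T, E>>>) -> &'static str {
  match p {
    Poll::Ready(Some(Ok(_))) => "task completed successfully",
    Poll::Ready(Some(Err(_))) => "task failed",
    Poll::Ready(None) => "all tasks completed",
    Poll::Pending => "no task ready yet",
  }
}

fn main() {
  let ok: Poll<Option<Item<&str, u32, &str>>> = Poll::Ready(Some(Ok(("a", 1))));
  assert_eq!(describe(ok), "task completed successfully");
  assert_eq!(describe::<&str, u32, &str>(Poll::Pending), "no task ready yet");
  assert_eq!(describe::<&str, u32, &str>(Poll::Ready(None)), "all tasks completed");
}
```

In practice you rarely match on Poll directly; StreamExt::next (as in the examples above) awaits through Pending for you.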

Performance

Optimizations implemented:

  • Zero dyn dispatch - Fully generic, no virtual calls
  • coarsetime - Fast time operations, reduced syscalls
  • Pre-allocated vectors - Capacity 8 to avoid small reallocations
  • Efficient polling - Reverse iteration for safe removal
  • Immediate cleanup - Completed tasks dropped immediately
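The "reverse iteration for safe removal" bullet refers to a standard Vec pattern: walking indices from the end lets swap_remove evict finished entries without invalidating the indices still to be visited. A std-only sketch of the pattern, where the `done` predicate stands in for a completed poll:

```rust
/// Remove every element matching `done`, iterating in reverse so that
/// `swap_remove` (which moves the last element into the hole) never
/// disturbs an index we have yet to visit.
fn drain_done<T>(tasks: &mut Vec<T>, done: impl Fn(&T) -> bool) {
  for i in (0..tasks.len()).rev() {
    if done(&tasks[i]) {
      tasks.swap_remove(i); // O(1): completed task dropped immediately
    }
  }
}

fn main() {
  let mut tasks = vec![1, 2, 3, 4, 5, 6];
  drain_done(&mut tasks, |t| t % 2 == 0); // "completed" = even
  tasks.sort();
  assert_eq!(tasks, vec![1, 3, 5]);
  println!("remaining: {tasks:?}");
}
```

swap_remove is O(1) but does not preserve order, which is fine here: running tasks have no meaningful order among themselves.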

Benchmarks show significant performance improvements over channel-based approaches.

Tech Stack

  • tokio - Async runtime and sleep timer
  • futures - Stream trait implementation
  • coarsetime - High-performance time operations with reduced syscalls

Project Structure

race/
├── src/
│   └── lib.rs        # Core implementation
├── tests/
│   └── main.rs       # Integration tests with logging
├── readme/
│   ├── en.md         # English documentation
│   └── zh.md         # Chinese documentation
└── Cargo.toml

History

The "race" pattern in async programming traces back to JavaScript's Promise.race(), introduced in ES6 (2015). Unlike Promise.race() which starts all tasks simultaneously, this library implements a staggered variant inspired by gRPC's "hedged requests" pattern described in Google's 2015 paper "The Tail at Scale".

Hedged requests help reduce tail latency by sending redundant requests after a delay, using whichever response arrives first. This technique is widely used in distributed systems at Google, Amazon, and other large-scale services.

This Rust implementation adds modern optimizations like zero-cost abstractions, efficient time handling, and support for infinite streams.


About

This project is an open-source component of js0.site ⋅ Refactoring the Internet Plan.

We are redefining how the Internet is built, one component at a time. Follow us:

