Crate fast_ordered_buffer

Source
Expand description

§Fast Ordered Buffer

CI Crates.io Docs.rs

This Rust library is a faster alternative to buffered from the futures crate. It is used for buffering a stream of items into a fixed-size buffer, while preserving the order of the items.

The futures library is a great library, but it is desgined to use zero or minimal cost abstractions - even if this results in worse performance. In the case of .buffered it will wait for a slow future to complete to retain ordering - even if there are more futures waiting to be spawned concurrently.

This library is designed to allocate those waiting futures into a priority queue, and retain ordering while allowing futures to complete as fast as possible. This comes at the downside of a small amount of memory overhead (perfect case - 0 overhead, but we do not bound the number of futures that may be saved).

§Recommendations for usage

  1. This crate does not provide any form of backpressure, and so might be unsuitable for use in a high-throughput environment. If you need backpressure, consider using the futures crate, or submit an issue here and it can be added as a feature.
  2. Ensure the futures contained within this crate have a timeout.

§Getting Started

[dependencies]
fast_ordered_buffer = "0.1.0"

§Usage

use fast_ordered_buffer::FobStreamExt;
use futures::stream::{StreamExt, FuturesUnordered};
use futures::{future::FutureExt, stream};

async fn wait_then_return(i: u32, time: std::time::Duration) -> u32 {
    tokio::time::sleep(time).await;
    i
}

#[tokio::main]
async fn main() {
    let futures = vec![
        wait_then_return(5, std::time::Duration::from_millis(500)),
        wait_then_return(1, std::time::Duration::from_millis(100)),
        wait_then_return(2, std::time::Duration::from_millis(200)),
        wait_then_return(3, std::time::Duration::from_millis(300)),
        wait_then_return(4, std::time::Duration::from_millis(400)),
    ];

    // Create a stream of futures.
    let now = std::time::Instant::now();
    let result = stream::iter(futures)
        // Buffer the futures into a fixed-size buffer.
        // Try changing this to `buffered` to see the difference in performance.
        .fast_ordered_buffer(2)
        // Collect the results of the futures into a vector.
        .collect::<Vec<_>>()
        .await;

    println!("Result: {:?}, Time taken: {:?}", result, now.elapsed());
}

§Performance

§Methodology

All benchmarks taken on a Macbook Pro M2, standard YMMV disclaimer applies.

Tests

  • 10k Random Futures (Concurrency = 50)
  • 1k Random Futures (Concurrency = 10)

We use criterion to benchmark the performance of this library against the futures library. The benchmark is run with the following command:

cargo bench

We take the Mean time taken for each buffer type, fast_ordered_buffer, unordered_buffer, and buffer, rounded to the nearest millisecond.

§Results

Buffer Type10k Random Futures (Concurrency = 50)1k Random Futures (Concurrency = 10)
fast_ordered_buffer755ms399ms
unordered_buffer734ms366ms
buffer1169ms514ms

§Contribution

Please feel free to submit a pull request or open an issue if you have any suggestions or improvements.

§License

This project is licensed under either of

at your option.

Structs§

FastBufferOrdered
IdentifiableFuture

Traits§

FobStreamExt