Skip to main content

Crate futures_buffered

Crate futures_buffered 

Source
Expand description

§futures-buffered

This project provides a single future structure: FuturesUnorderedBounded.

Much like futures::stream::FuturesUnordered, this is a thread-safe, Pin friendly, lifetime friendly, concurrent processing stream.

The is different to FuturesUnordered in that FuturesUnorderedBounded has a fixed capacity for processing count. This means it’s less flexible, but produces better memory efficiency.

§Benchmarks

§Speed

Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:

FuturesUnordered         time:   [420.47 ms 422.21 ms 423.99 ms]
FuturesUnorderedBounded  time:   [366.02 ms 367.54 ms 369.05 ms]

§Memory usage

Running 512000 Ready<i32> futures with 256 concurrent jobs.

  • count: the number of times alloc/dealloc was called
  • alloc: the number of cumulative bytes allocated
  • dealloc: the number of cumulative bytes deallocated
FuturesUnordered
    count:    1024002
    alloc:    40960144 B
    dealloc:  40960000 B

FuturesUnorderedBounded
    count:    2
    alloc:    8264 B
    dealloc:  0 B

§Conclusion

As you can see, FuturesUnorderedBounded massively reduces you memory overhead while providing a significant performance gain. Perfect for if you want a fixed batch size

§Example

use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnorderedBounded;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;

// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;

// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);

/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
    let req = Request::builder()
        .header("Host", "example.com")
        .method("GET")
        .body(String::new())
        .unwrap();
    rs.send_request(req)
}

// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnorderedBounded::new(128);

// start up 128 requests
for _ in 0..128 {
    queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
    queue.next().await;
    queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
    queue.next().await;
}

§Cooperative Scheduling

The functionality provided by this crate are technically their own async schedulers. If you are using this functionality within another async scheduler, you might not cooperate effectively and might starve other tasks in your scheduler.

This crate makes sure it doesn’t get stuck forever by forcing a yield periodically, but if you’re using Tokio you will get better scheduling behaviour if you enable the tokio-coop feature.

Structs§

BufferUnordered
Stream for the buffered_unordered method.
BufferedOrdered
Stream for the buffered_ordered method.
FuturesOrdered
An unbounded queue of futures.
FuturesOrderedBounded
An unbounded queue of futures.
FuturesUnordered
A set of futures which may complete in any order.
FuturesUnorderedBounded
A set of futures which may complete in any order.
JoinAll
Future for the join_all function.
MergeBounded
A combined stream that releases values in any order that they come
MergeUnbounded
A combined stream that releases values in any order that they come.
TryBufferUnordered
Stream for the try_buffered_unordered method.
TryBufferedOrdered
Stream for the try_buffered_ordered method.
TryJoinAll
Future for the try_join_all function.

Traits§

BufferedStreamExt
An extension trait for Streams that provides a variety of convenient combinator functions.
BufferedTryStreamExt
An extension trait for Streams that provides a variety of convenient combinator functions.
IterExt
Concurrency extensions for iterators of streams and futures.
TryFuture
A convenience for futures that return Result values that includes a variety of adapters tailored to such futures.
TryStream
A convenience for streams that return Result values that includes a variety of adapters tailored to such futures.

Functions§

join_all
Creates a future which represents a collection of the outputs of the futures given.
try_join_all
Creates a future which represents a collection of the outputs of the futures given.

Type Aliases§

MergeDeprecated