Expand description
§futures-buffered
This project provides a single future structure: FuturesUnorderedBounded.
Much like futures::stream::FuturesUnordered,
this is a thread-safe, Pin friendly, lifetime friendly, concurrent processing stream.
The is different to FuturesUnordered in that FuturesUnorderedBounded has a fixed capacity for processing count.
This means it’s less flexible, but produces better memory efficiency.
§Benchmarks
§Speed
Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
FuturesUnordered time: [420.47 ms 422.21 ms 423.99 ms]
FuturesUnorderedBounded time: [366.02 ms 367.54 ms 369.05 ms]§Memory usage
Running 512000 Ready<i32> futures with 256 concurrent jobs.
- count: the number of times alloc/dealloc was called
- alloc: the number of cumulative bytes allocated
- dealloc: the number of cumulative bytes deallocated
FuturesUnordered
count: 1024002
alloc: 40960144 B
dealloc: 40960000 B
FuturesUnorderedBounded
count: 2
alloc: 8264 B
dealloc: 0 B§Conclusion
As you can see, FuturesUnorderedBounded massively reduces you memory overhead while providing a significant performance gain.
Perfect for if you want a fixed batch size
§Example
use futures::future::Future;
use futures::stream::StreamExt;
use futures_buffered::FuturesUnorderedBounded;
use hyper::client::conn::http1::{handshake, SendRequest};
use hyper::body::Incoming;
use hyper::{Request, Response};
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
// create a tcp connection
let stream = TcpStream::connect("example.com:80").await?;
// perform the http handshakes
let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
tokio::spawn(conn);
/// make http request to example.com and read the response
fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
let req = Request::builder()
.header("Host", "example.com")
.method("GET")
.body(String::new())
.unwrap();
rs.send_request(req)
}
// create a queue that can hold 128 concurrent requests
let mut queue = FuturesUnorderedBounded::new(128);
// start up 128 requests
for _ in 0..128 {
queue.push(make_req(&mut rs));
}
// wait for a request to finish and start another to fill its place - up to 1024 total requests
for _ in 128..1024 {
queue.next().await;
queue.push(make_req(&mut rs));
}
// wait for the tail end to finish
for _ in 0..128 {
queue.next().await;
}§Cooperative Scheduling
The functionality provided by this crate are technically their own async schedulers. If you are using this functionality within another async scheduler, you might not cooperate effectively and might starve other tasks in your scheduler.
This crate makes sure it doesn’t get stuck forever by forcing a yield periodically,
but if you’re using Tokio you will get better scheduling behaviour if you enable the tokio-coop feature.
Structs§
- Buffer
Unordered - Stream for the
buffered_unorderedmethod. - Buffered
Ordered - Stream for the
buffered_orderedmethod. - Futures
Ordered - An unbounded queue of futures.
- Futures
Ordered Bounded - An unbounded queue of futures.
- Futures
Unordered - A set of futures which may complete in any order.
- Futures
Unordered Bounded - A set of futures which may complete in any order.
- JoinAll
- Future for the
join_allfunction. - Merge
Bounded - A combined stream that releases values in any order that they come
- Merge
Unbounded - A combined stream that releases values in any order that they come.
- TryBuffer
Unordered - Stream for the
try_buffered_unorderedmethod. - TryBuffered
Ordered - Stream for the
try_buffered_orderedmethod. - TryJoin
All - Future for the
try_join_allfunction.
Traits§
- Buffered
Stream Ext - An extension trait for
Streams that provides a variety of convenient combinator functions. - Buffered
TryStream Ext - An extension trait for
Streams that provides a variety of convenient combinator functions. - IterExt
- Concurrency extensions for iterators of streams and futures.
- TryFuture
- A convenience for futures that return
Resultvalues that includes a variety of adapters tailored to such futures. - TryStream
- A convenience for streams that return
Resultvalues that includes a variety of adapters tailored to such futures.
Functions§
- join_
all - Creates a future which represents a collection of the outputs of the futures given.
- try_
join_ all - Creates a future which represents a collection of the outputs of the futures given.
Type Aliases§
- Merge
Deprecated