futures_buffered/lib.rs
1//! # futures-buffered
2//!
3//! This project provides a single future structure: `FuturesUnorderedBounded`.
4//!
5//! Much like [`futures::stream::FuturesUnordered`](https://docs.rs/futures/0.3.25/futures/stream/struct.FuturesUnordered.html),
6//! this is a thread-safe, `Pin` friendly, lifetime friendly, concurrent processing stream.
7//!
//! This differs from `FuturesUnordered` in that `FuturesUnorderedBounded` has a fixed capacity for the number of futures it can process at once.
9//! This means it's less flexible, but produces better memory efficiency.
10//!
11//! ## Benchmarks
12//!
13//! ### Speed
14//!
15//! Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
16//!
17//! ```text
18//! FuturesUnordered time: [420.47 ms 422.21 ms 423.99 ms]
19//! FuturesUnorderedBounded time: [366.02 ms 367.54 ms 369.05 ms]
20//! ```
21//!
22//! ### Memory usage
23//!
24//! Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
25//!
26//! - count: the number of times alloc/dealloc was called
27//! - alloc: the number of cumulative bytes allocated
28//! - dealloc: the number of cumulative bytes deallocated
29//!
30//! ```text
31//! FuturesUnordered
32//! count: 1024002
33//! alloc: 40960144 B
34//! dealloc: 40960000 B
35//!
36//! FuturesUnorderedBounded
37//! count: 2
38//! alloc: 8264 B
39//! dealloc: 0 B
40//! ```
41//!
42//! ### Conclusion
43//!
//! As you can see, `FuturesUnorderedBounded` massively reduces your memory overhead while providing a significant performance gain.
//! It is perfect if you want a fixed batch size.
46//!
47//! # Example
48//! ```
49//! use futures::future::Future;
50//! use futures::stream::StreamExt;
51//! use futures_buffered::FuturesUnorderedBounded;
52//! use hyper::client::conn::http1::{handshake, SendRequest};
53//! use hyper::body::Incoming;
54//! use hyper::{Request, Response};
55//! use hyper_util::rt::TokioIo;
56//! use tokio::net::TcpStream;
57//!
58//! # #[cfg(miri)] fn main() {}
59//! # #[cfg(not(miri))] #[tokio::main]
60//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
61//! // create a tcp connection
62//! let stream = TcpStream::connect("example.com:80").await?;
63//!
64//! // perform the http handshakes
65//! let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
66//! tokio::spawn(conn);
67//!
68//! /// make http request to example.com and read the response
69//! fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
70//! let req = Request::builder()
71//! .header("Host", "example.com")
72//! .method("GET")
73//! .body(String::new())
74//! .unwrap();
75//! rs.send_request(req)
76//! }
77//!
78//! // create a queue that can hold 128 concurrent requests
79//! let mut queue = FuturesUnorderedBounded::new(128);
80//!
81//! // start up 128 requests
82//! for _ in 0..128 {
83//! queue.push(make_req(&mut rs));
84//! }
85//! // wait for a request to finish and start another to fill its place - up to 1024 total requests
86//! for _ in 128..1024 {
87//! queue.next().await;
88//! queue.push(make_req(&mut rs));
89//! }
90//! // wait for the tail end to finish
91//! for _ in 0..128 {
92//! queue.next().await;
93//! }
94//! # Ok(()) }
95//! ```
96//!
97//! # Cooperative Scheduling
98//!
//! The types provided by this crate are technically their own async schedulers. If you are using them within another
//! async scheduler, you might not cooperate effectively and might starve other tasks in your scheduler.
101//!
102//! This crate makes sure it doesn't get stuck forever by forcing a yield periodically,
103//! but if you're using Tokio you will get better scheduling behaviour if you enable the `tokio-coop` feature.
104#![no_std]
105
106extern crate alloc;
107
108#[cfg(test)]
109#[macro_use(vec, dbg)]
110extern crate std;
111
112use core::future::Future;
113use futures_core::Stream;
114
115mod buffered;
116mod futures_ordered;
117mod futures_ordered_bounded;
118mod futures_unordered;
119mod futures_unordered_bounded;
120mod iter_ext;
121mod join_all;
122mod merge_bounded;
123mod merge_unbounded;
124mod slot_map;
125mod try_buffered;
126mod try_join_all;
127mod waker_list;
128
129pub use buffered::{BufferUnordered, BufferedOrdered, BufferedStreamExt};
130pub use futures_ordered::FuturesOrdered;
131pub use futures_ordered_bounded::FuturesOrderedBounded;
132pub use futures_unordered::FuturesUnordered;
133pub use futures_unordered_bounded::FuturesUnorderedBounded;
134pub use iter_ext::IterExt;
135pub use join_all::{join_all, JoinAll};
136#[allow(deprecated)]
137pub use merge_bounded::{Merge, MergeBounded};
138pub use merge_unbounded::MergeUnbounded;
139pub use try_buffered::{BufferedTryStreamExt, TryBufferUnordered, TryBufferedOrdered};
140pub use try_join_all::{try_join_all, TryJoinAll};
141
/// Holds the marker trait that `TryFuture` lists as a supertrait.
///
/// The module is not `pub`, so `Sealed` cannot be named from outside this crate.
mod private_try_future {
    /// Marker trait with no items; it only exists to be used as a supertrait bound.
    pub trait Sealed {}

    // Blanket impl: every (possibly unsized) future whose output is a `Result` is sealed.
    impl<T, E, F> Sealed for F where F: ?Sized + core::future::Future<Output = Result<T, E>> {}
}
149
150/// A convenience for futures that return `Result` values that includes
151/// a variety of adapters tailored to such futures.
152///
153/// This is [`futures::TryFuture`](futures_core::future::TryFuture) except it's stricter on the future super-trait.
154pub trait TryFuture:
155 Future<Output = Result<Self::Ok, Self::Err>> + private_try_future::Sealed
156{
157 type Ok;
158 type Err;
159}
160
161impl<T, E, F: ?Sized + Future<Output = Result<T, E>>> TryFuture for F {
162 type Ok = T;
163 type Err = E;
164}
165
166mod private_try_stream {
167 use futures_core::Stream;
168
169 pub trait Sealed {}
170
171 impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
172}
173
174/// A convenience for streams that return `Result` values that includes
175/// a variety of adapters tailored to such futures.
176///
177/// This is [`futures::TryStream`](futures_core::stream::TryStream) except it's stricter on the stream super-trait.
178pub trait TryStream:
179 Stream<Item = Result<Self::Ok, Self::Err>> + private_try_stream::Sealed
180{
181 type Ok;
182 type Err;
183}
184
185impl<T, E, S: ?Sized + Stream<Item = Result<T, E>>> TryStream for S {
186 type Ok = T;
187 type Err = E;
188}