futures_buffered/
lib.rs

1//! # futures-buffered
2//!
3//! This project provides a single future structure: `FuturesUnorderedBounded`.
4//!
5//! Much like [`futures::stream::FuturesUnordered`](https://docs.rs/futures/0.3.25/futures/stream/struct.FuturesUnordered.html),
6//! this is a thread-safe, `Pin` friendly, lifetime friendly, concurrent processing stream.
7//!
8//! This differs from `FuturesUnordered` in that `FuturesUnorderedBounded` has a fixed capacity for the number of futures it processes.
9//! This means it's less flexible, but produces better memory efficiency.
10//!
11//! ## Benchmarks
12//!
13//! ### Speed
14//!
15//! Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
16//!
17//! ```text
18//! FuturesUnordered         time:   [420.47 ms 422.21 ms 423.99 ms]
19//! FuturesUnorderedBounded  time:   [366.02 ms 367.54 ms 369.05 ms]
20//! ```
21//!
22//! ### Memory usage
23//!
24//! Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
25//!
26//! - count: the number of times alloc/dealloc was called
27//! - alloc: the number of cumulative bytes allocated
28//! - dealloc: the number of cumulative bytes deallocated
29//!
30//! ```text
31//! FuturesUnordered
32//!     count:    1024002
33//!     alloc:    40960144 B
34//!     dealloc:  40960000 B
35//!
36//! FuturesUnorderedBounded
37//!     count:    2
38//!     alloc:    8264 B
39//!     dealloc:  0 B
40//! ```
41//!
42//! ### Conclusion
43//!
44//! As you can see, `FuturesUnorderedBounded` massively reduces your memory overhead while providing a significant performance gain.
45//! It is perfect when you want a fixed batch size.
46//!
47//! # Example
48//! ```
49//! use futures::future::Future;
50//! use futures::stream::StreamExt;
51//! use futures_buffered::FuturesUnorderedBounded;
52//! use hyper::client::conn::http1::{handshake, SendRequest};
53//! use hyper::body::Incoming;
54//! use hyper::{Request, Response};
55//! use hyper_util::rt::TokioIo;
56//! use tokio::net::TcpStream;
57//!
58//! # #[cfg(miri)] fn main() {}
59//! # #[cfg(not(miri))] #[tokio::main]
60//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
61//! // create a tcp connection
62//! let stream = TcpStream::connect("example.com:80").await?;
63//!
64//! // perform the http handshakes
65//! let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
66//! tokio::spawn(conn);
67//!
68//! /// make http request to example.com and read the response
69//! fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
70//!     let req = Request::builder()
71//!         .header("Host", "example.com")
72//!         .method("GET")
73//!         .body(String::new())
74//!         .unwrap();
75//!     rs.send_request(req)
76//! }
77//!
78//! // create a queue that can hold 128 concurrent requests
79//! let mut queue = FuturesUnorderedBounded::new(128);
80//!
81//! // start up 128 requests
82//! for _ in 0..128 {
83//!     queue.push(make_req(&mut rs));
84//! }
85//! // wait for a request to finish and start another to fill its place - up to 1024 total requests
86//! for _ in 128..1024 {
87//!     queue.next().await;
88//!     queue.push(make_req(&mut rs));
89//! }
90//! // wait for the tail end to finish
91//! for _ in 0..128 {
92//!     queue.next().await;
93//! }
94//! # Ok(()) }
95//! ```
96#![no_std]
97
98extern crate alloc;
99
100#[cfg(test)]
101#[macro_use(vec, dbg)]
102extern crate std;
103
104use core::future::Future;
105use futures_core::Stream;
106
107mod buffered;
108mod futures_ordered;
109mod futures_ordered_bounded;
110mod futures_unordered;
111mod futures_unordered_bounded;
112mod iter_ext;
113mod join_all;
114mod merge_bounded;
115mod merge_unbounded;
116mod slot_map;
117mod try_buffered;
118mod try_join_all;
119mod waker_list;
120
121pub use buffered::{BufferUnordered, BufferedOrdered, BufferedStreamExt};
122pub use futures_ordered::FuturesOrdered;
123pub use futures_ordered_bounded::FuturesOrderedBounded;
124pub use futures_unordered::FuturesUnordered;
125pub use futures_unordered_bounded::FuturesUnorderedBounded;
126pub use iter_ext::IterExt;
127pub use join_all::{join_all, JoinAll};
128#[allow(deprecated)]
129pub use merge_bounded::{Merge, MergeBounded};
130pub use merge_unbounded::MergeUnbounded;
131pub use try_buffered::{BufferedTryStreamExt, TryBufferUnordered, TryBufferedOrdered};
132pub use try_join_all::{try_join_all, TryJoinAll};
133
/// Private home of the marker trait that [`TryFuture`](crate::TryFuture) lists
/// as a super-trait.
mod private_try_future {
    /// Marker trait. It is `pub` so it may appear in a public trait's bounds,
    /// but the enclosing module is private.
    pub trait Sealed {}

    // Blanket impl: every future (sized or unsized) whose output is a `Result`
    // carries the marker.
    impl<Fut, T, E> Sealed for Fut
    where
        Fut: ?Sized + core::future::Future<Output = Result<T, E>>,
    {
    }
}
141
142/// A convenience for futures that return `Result` values that includes
143/// a variety of adapters tailored to such futures.
144///
145/// This is [`futures::TryFuture`](futures_core::future::TryFuture) except it's stricter on the future super-trait.
146pub trait TryFuture:
147    Future<Output = Result<Self::Ok, Self::Err>> + private_try_future::Sealed
148{
149    type Ok;
150    type Err;
151}
152
153impl<T, E, F: ?Sized + Future<Output = Result<T, E>>> TryFuture for F {
154    type Ok = T;
155    type Err = E;
156}
157
158mod private_try_stream {
159    use futures_core::Stream;
160
161    pub trait Sealed {}
162
163    impl<S, T, E> Sealed for S where S: ?Sized + Stream<Item = Result<T, E>> {}
164}
165
166/// A convenience for streams that return `Result` values that includes
167/// a variety of adapters tailored to such futures.
168///
169/// This is [`futures::TryStream`](futures_core::stream::TryStream) except it's stricter on the stream super-trait.
170pub trait TryStream:
171    Stream<Item = Result<Self::Ok, Self::Err>> + private_try_stream::Sealed
172{
173    type Ok;
174    type Err;
175}
176
177impl<T, E, S: ?Sized + Stream<Item = Result<T, E>>> TryStream for S {
178    type Ok = T;
179    type Err = E;
180}