futures_buffered/futures_unordered_bounded.rs
1use core::{
2 fmt,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use crate::{slot_map::PinSlotMap, waker_list::WakerList};
9use futures_core::{FusedStream, Stream};
10
11/// A set of futures which may complete in any order.
12///
13/// Much like [`futures::stream::FuturesUnordered`](https://docs.rs/futures/0.3.25/futures/stream/struct.FuturesUnordered.html),
14/// this is a thread-safe, `Pin` friendly, lifetime friendly, concurrent processing stream.
15///
/// This differs from `FuturesUnordered` in that `FuturesUnorderedBounded` has a fixed capacity for the number of futures it processes concurrently.
17/// This means it's less flexible, but produces better memory efficiency.
18///
19/// ## Benchmarks
20///
21/// ### Speed
22///
23/// Running 65536 100us timers with 256 concurrent jobs in a single threaded tokio runtime:
24///
25/// ```text
26/// FuturesUnordered time: [420.47 ms 422.21 ms 423.99 ms]
27/// FuturesUnorderedBounded time: [366.02 ms 367.54 ms 369.05 ms]
28/// ```
29///
30/// ### Memory usage
31///
32/// Running 512000 `Ready<i32>` futures with 256 concurrent jobs.
33///
34/// - count: the number of times alloc/dealloc was called
35/// - alloc: the number of cumulative bytes allocated
36/// - dealloc: the number of cumulative bytes deallocated
37///
38/// ```text
39/// FuturesUnordered
40/// count: 1024002
41/// alloc: 40960144 B
42/// dealloc: 40960000 B
43///
44/// FuturesUnorderedBounded
45/// count: 2
46/// alloc: 8264 B
47/// dealloc: 0 B
48/// ```
49///
50/// ### Conclusion
51///
/// As you can see, `FuturesUnorderedBounded` massively reduces your memory overhead while providing a significant performance gain.
/// It is a perfect fit when you want a fixed batch size.
54///
55/// # Example
56///
57/// Making 1024 total HTTP requests, with a max concurrency of 128
58///
59/// ```
60/// use futures::future::Future;
61/// use futures::stream::StreamExt;
62/// use futures_buffered::FuturesUnorderedBounded;
63/// use hyper::client::conn::http1::{handshake, SendRequest};
64/// use hyper::body::Incoming;
65/// use hyper::{Request, Response};
66/// use hyper_util::rt::TokioIo;
67/// use tokio::net::TcpStream;
68///
69/// # #[cfg(miri)] fn main() {}
70/// # #[cfg(not(miri))] #[tokio::main]
71/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
72/// // create a tcp connection
73/// let stream = TcpStream::connect("example.com:80").await?;
74///
75/// // perform the http handshakes
76/// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
77/// tokio::spawn(conn);
78///
79/// /// make http request to example.com and read the response
80/// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
81/// let req = Request::builder()
82/// .header("Host", "example.com")
83/// .method("GET")
84/// .body(String::new())
85/// .unwrap();
86/// rs.send_request(req)
87/// }
88///
89/// // create a queue that can hold 128 concurrent requests
90/// let mut queue = FuturesUnorderedBounded::new(128);
91///
92/// // start up 128 requests
93/// for _ in 0..128 {
94/// queue.push(make_req(&mut rs));
95/// }
96/// // wait for a request to finish and start another to fill its place - up to 1024 total requests
97/// for _ in 128..1024 {
98/// queue.next().await;
99/// queue.push(make_req(&mut rs));
100/// }
101/// // wait for the tail end to finish
102/// for _ in 0..128 {
103/// queue.next().await;
104/// }
105/// # Ok(()) }
106/// ```
pub struct FuturesUnorderedBounded<F> {
    /// Slot storage for the in-flight futures. Polling looks a future up by slot index
    /// and receives it as a `Pin<&mut F>`.
    pub(crate) tasks: PinSlotMap<F>,
    /// Queue of slot indices that are waiting to be polled, together with the waker that is
    /// handed to each future when it is polled.
    /// NOTE(review): presumably waking one of those wakers re-queues its slot — confirm in `waker_list`.
    pub(crate) shared: WakerList,
}
111
// `Stream::poll_next` reaches `&mut Self` through `Pin<&mut Self>`, which requires `Self: Unpin`.
// NOTE(review): the unconditional impl presumably relies on `PinSlotMap` keeping the stored
// futures at a stable address, so moving this handle never moves a pinned future — confirm in `slot_map`.
impl<F> Unpin for FuturesUnorderedBounded<F> {}
113
114impl<F> FuturesUnorderedBounded<F> {
115 /// Constructs a new, empty [`FuturesUnorderedBounded`] with the given fixed capacity.
116 ///
117 /// The returned [`FuturesUnorderedBounded`] does not contain any futures.
118 /// In this state, [`FuturesUnorderedBounded::poll_next`](Stream::poll_next) will
119 /// return [`Poll::Ready(None)`](Poll::Ready).
120 pub fn new(cap: usize) -> Self {
121 Self {
122 tasks: PinSlotMap::new(cap),
123 shared: WakerList::new(cap),
124 }
125 }
126
127 /// Push a future into the set.
128 ///
129 /// This method adds the given future to the set. This method will not
130 /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
131 /// ensure that [`FuturesUnorderedBounded::poll_next`](Stream::poll_next) is called
132 /// in order to receive wake-up notifications for the given future.
133 ///
134 /// # Panics
135 /// This method will panic if the buffer is currently full. See [`FuturesUnorderedBounded::try_push`] to get a result instead
136 #[track_caller]
137 pub fn push(&mut self, fut: F) {
138 if self.try_push(fut).is_err() {
139 panic!("attempted to push into a full `FuturesUnorderedBounded`");
140 }
141 }
142
143 /// Push a future into the set.
144 ///
145 /// This method adds the given future to the set. This method will not
146 /// call [`poll`](core::future::Future::poll) on the submitted future. The caller must
147 /// ensure that [`FuturesUnorderedBounded::poll_next`](Stream::poll_next) is called
148 /// in order to receive wake-up notifications for the given future.
149 ///
150 /// # Errors
151 /// This method will error if the buffer is currently full, returning the future back
152 pub fn try_push(&mut self, fut: F) -> Result<(), F> {
153 self.try_push_with(fut, core::convert::identity)
154 }
155
156 #[inline]
157 pub(crate) fn try_push_with<T>(&mut self, t: T, f: impl FnMut(T) -> F) -> Result<(), T> {
158 let i = self.tasks.insert_with(t, f)?;
159 // safety: i is always within capacity
160 unsafe {
161 self.shared.push(i);
162 }
163 Ok(())
164 }
165
166 /// Returns `true` if the set contains no futures.
167 pub fn is_empty(&self) -> bool {
168 self.tasks.is_empty()
169 }
170
171 /// Returns the number of futures contained in the set.
172 ///
173 /// This represents the total number of in-flight futures.
174 pub fn len(&self) -> usize {
175 self.tasks.len()
176 }
177
178 /// Returns the number of futures that can be contained in the set.
179 pub fn capacity(&self) -> usize {
180 self.tasks.capacity()
181 }
182}
183
/// Signature of the function used to poll one stored future: it receives the pinned future and
/// a task context and reports a `Poll<O>`. `poll_inner` passes `F::poll` for this.
type PollFn<F, O> = fn(Pin<&mut F>, cx: &mut Context<'_>) -> Poll<O>;
185
186impl<F> FuturesUnorderedBounded<F> {
187 pub(crate) fn poll_inner_no_remove<O>(
188 &mut self,
189 cx: &mut Context<'_>,
190 poll_fn: PollFn<F, O>,
191 ) -> Poll<Option<(usize, O)>> {
192 const MAX: usize = 61;
193
194 if self.is_empty() {
195 return Poll::Ready(None);
196 }
197
198 self.shared.register(cx.waker());
199
200 let mut count = 0;
201 loop {
202 count += 1;
203 // if we are in a pending only loop - let's break out.
204 // we do this even with tokio-coop in case of unconstrained tasks, or non-tokio runtimes.
205 if count > MAX {
206 cx.waker().wake_by_ref();
207 return Poll::Pending;
208 }
209
210 #[cfg(feature = "tokio-coop")]
211 let coop = core::task::ready!(tokio::task::coop::poll_proceed(cx));
212
213 match unsafe { self.shared.pop() } {
214 crate::waker_list::ReadySlot::None => return Poll::Pending,
215 crate::waker_list::ReadySlot::Inconsistent => {
216 cx.waker().wake_by_ref();
217 return Poll::Pending;
218 }
219 crate::waker_list::ReadySlot::Ready((i, waker)) => {
220 #[cfg(feature = "tokio-coop")]
221 coop.made_progress();
222
223 if let Some(task) = self.tasks.get(i) {
224 let mut cx = Context::from_waker(&waker);
225
226 let res = poll_fn(task, &mut cx);
227
228 if let Poll::Ready(x) = res {
229 return Poll::Ready(Some((i, x)));
230 }
231 }
232 }
233 }
234 }
235 }
236}
237
238impl<F: Future> FuturesUnorderedBounded<F> {
239 pub(crate) fn poll_inner(&mut self, cx: &mut Context<'_>) -> Poll<Option<(usize, F::Output)>> {
240 match self.poll_inner_no_remove(cx, F::poll) {
241 Poll::Ready(Some((i, x))) => {
242 self.tasks.remove(i);
243 Poll::Ready(Some((i, x)))
244 }
245 p => p,
246 }
247 }
248}
249
250impl<F: Future> Stream for FuturesUnorderedBounded<F> {
251 type Item = F::Output;
252
253 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
254 match self.poll_inner(cx) {
255 Poll::Ready(Some((_, x))) => Poll::Ready(Some(x)),
256 Poll::Ready(None) => Poll::Ready(None),
257 Poll::Pending => Poll::Pending,
258 }
259 }
260
261 fn size_hint(&self) -> (usize, Option<usize>) {
262 let len = self.len();
263 (len, Some(len))
264 }
265}
266
267impl<F> FromIterator<F> for FuturesUnorderedBounded<F> {
268 /// Constructs a new, empty [`FuturesUnorderedBounded`] with a fixed capacity that is the length of the iterator.
269 ///
270 /// # Example
271 ///
272 /// Making 1024 total HTTP requests, with a max concurrency of 128
273 ///
274 /// ```
275 /// use futures::future::Future;
276 /// use futures::stream::StreamExt;
277 /// use futures_buffered::FuturesUnorderedBounded;
278 /// use hyper::client::conn::http1::{handshake, SendRequest};
279 /// use hyper::body::Incoming;
280 /// use hyper::{Request, Response};
281 /// use hyper_util::rt::TokioIo;
282 /// use tokio::net::TcpStream;
283 ///
284 /// # #[cfg(miri)] fn main() {}
285 /// # #[cfg(not(miri))] #[tokio::main]
286 /// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
287 /// // create a tcp connection
288 /// let stream = TcpStream::connect("example.com:80").await?;
289 ///
290 /// // perform the http handshakes
291 /// let (mut rs, conn) = handshake(TokioIo::new(stream)).await?;
292 /// tokio::spawn(conn);
293 ///
294 /// /// make http request to example.com and read the response
295 /// fn make_req(rs: &mut SendRequest<String>) -> impl Future<Output = hyper::Result<Response<Incoming>>> {
296 /// let req = Request::builder()
297 /// .header("Host", "example.com")
298 /// .method("GET")
299 /// .body(String::new())
300 /// .unwrap();
301 /// rs.send_request(req)
302 /// }
303 ///
304 /// // create a queue with an initial 128 concurrent requests
305 /// let mut queue: FuturesUnorderedBounded<_> = (0..128).map(|_| make_req(&mut rs)).collect();
306 ///
307 /// // wait for a request to finish and start another to fill its place - up to 1024 total requests
308 /// for _ in 128..1024 {
309 /// queue.next().await;
310 /// queue.push(make_req(&mut rs));
311 /// }
312 /// // wait for the tail end to finish
313 /// for _ in 0..128 {
314 /// queue.next().await;
315 /// }
316 /// # Ok(()) }
317 /// ```
318 fn from_iter<T: IntoIterator<Item = F>>(iter: T) -> Self {
319 // store the futures in our task list
320 let tasks = PinSlotMap::from_iter(iter);
321
322 // determine the actual capacity and create the shared state
323 let cap = tasks.len();
324 let shared = WakerList::new(cap);
325
326 for i in 0..cap {
327 // safety: i is always within capacity
328 unsafe {
329 shared.push(i);
330 }
331 }
332
333 // create the queue
334 Self { tasks, shared }
335 }
336}
337
338impl<Fut: Future> FusedStream for FuturesUnorderedBounded<Fut> {
339 fn is_terminated(&self) -> bool {
340 self.is_empty()
341 }
342}
343
344impl<Fut> fmt::Debug for FuturesUnorderedBounded<Fut> {
345 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
346 f.debug_struct("FuturesUnorderedBounded")
347 .field("len", &self.tasks.len())
348 .finish_non_exhaustive()
349 }
350}
351
// Unit tests for `FuturesUnorderedBounded`: capacity handling, length bookkeeping,
// exact poll counts and completion order.
#[cfg(test)]
mod tests {
    use super::*;
    use core::{
        cell::Cell,
        future::{poll_fn, ready},
        time::Duration,
    };
    use futures::{channel::oneshot, StreamExt};
    use futures_test::task::noop_context;
    use pin_project_lite::pin_project;
    use std::time::Instant;

    // Future wrapper that increments a shared counter every time it is polled,
    // so tests can assert on the exact number of polls performed.
    pin_project!(
        struct PollCounter<'c, F> {
            count: &'c Cell<usize>,
            #[pin]
            inner: F,
        }
    );

    impl<F: Future> Future for PollCounter<'_, F> {
        type Output = F::Output;
        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // record the poll before forwarding it to the wrapped future
            self.count.set(self.count.get() + 1);
            self.project().inner.poll(cx)
        }
    }

    // Future that returns `Pending` exactly once (waking itself immediately)
    // and completes on its second poll.
    struct Yield {
        done: bool,
    }
    impl Unpin for Yield {}
    impl Future for Yield {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            if self.as_mut().done {
                Poll::Ready(())
            } else {
                // wake straight away so that the second poll happens without outside help
                cx.waker().wake_by_ref();
                self.as_mut().done = true;
                Poll::Pending
            }
        }
    }

    // Builds a `Yield` future whose polls are counted in `count`; it needs two polls to finish.
    fn yield_now(count: &Cell<usize>) -> PollCounter<'_, Yield> {
        PollCounter {
            count,
            inner: Yield { done: false },
        }
    }

    // A single yielding future is polled exactly twice: once pending, once ready.
    #[test]
    fn single() {
        let c = Cell::new(0);

        let mut buffer = FuturesUnorderedBounded::new(10);
        buffer.push(yield_now(&c));
        futures::executor::block_on(buffer.next());

        drop(buffer);
        assert_eq!(c.into_inner(), 2);
    }

    // Pushing beyond the fixed capacity panics with the documented message.
    #[test]
    #[should_panic(expected = "attempted to push into a full `FuturesUnorderedBounded`")]
    fn full() {
        let mut buffer = FuturesUnorderedBounded::new(1);
        buffer.push(ready(()));
        buffer.push(ready(()));
    }

    // `len`, `is_empty`, `capacity`, `size_hint` and `is_terminated` follow the number of
    // stored futures: before a push, after a push, and after the future has completed.
    #[test]
    fn len() {
        let mut buffer = FuturesUnorderedBounded::new(1);

        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (0, Some(0)));
        assert!(buffer.is_terminated());

        buffer.push(ready(()));

        assert_eq!(buffer.len(), 1);
        assert!(!buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (1, Some(1)));
        assert!(!buffer.is_terminated());

        futures::executor::block_on(buffer.next());

        assert_eq!(buffer.len(), 0);
        assert!(buffer.is_empty());
        assert_eq!(buffer.capacity(), 1);
        assert_eq!(buffer.size_hint(), (0, Some(0)));
        assert!(buffer.is_terminated());
    }

    // Collecting from an iterator yields a full set whose capacity equals the iterator length.
    #[test]
    fn from_iter() {
        let buffer = FuturesUnorderedBounded::from_iter((0..10).map(|_| ready(())));

        assert_eq!(buffer.len(), 10);
        assert_eq!(buffer.capacity(), 10);
        assert_eq!(buffer.size_hint(), (10, Some(10)));
    }

    // The buffer is dropped while a clone of a waker it handed out is still alive;
    // that waker is only dropped afterwards.
    // NOTE(review): presumably guards against a use-after-free of the shared waker state — confirm (e.g. under miri).
    #[test]
    fn drop_while_waiting() {
        let mut buffer = FuturesUnorderedBounded::new(10);
        let waker = Cell::new(None);
        buffer.push(poll_fn(|cx| {
            // keep the waker so that it outlives the buffer
            waker.set(Some(cx.waker().clone()));
            Poll::<()>::Pending
        }));

        assert_eq!(buffer.poll_next_unpin(&mut noop_context()), Poll::Pending);
        drop(buffer);

        let cx = waker.take().unwrap();
        drop(cx);
    }

    // 110 yielding futures pass through a buffer of 10; each needs exactly two polls,
    // hence the expected total of 220.
    #[test]
    fn multi() {
        fn wait(count: &Cell<usize>) -> PollCounter<'_, Yield> {
            yield_now(count)
        }

        let c = Cell::new(0);

        let mut buffer = FuturesUnorderedBounded::new(10);
        // build up
        for _ in 0..10 {
            buffer.push(wait(&c));
        }
        // poll and insert
        for _ in 0..100 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
            buffer.push(wait(&c));
        }
        // drain down
        for _ in 0..10 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
        }

        let count = c.into_inner();
        assert_eq!(count, 220);
    }

    // Same shape as `multi`, with an additional upper bound on the elapsed wall-clock time.
    #[test]
    fn very_slow_task() {
        let c = Cell::new(0);

        let now = Instant::now();

        let mut buffer = FuturesUnorderedBounded::new(10);
        // build up
        for _ in 0..9 {
            buffer.push(yield_now(&c));
        }
        // spawn a slow future among a bunch of fast ones.
        // the test is to make sure this doesn't block the rest getting completed
        // NOTE(review): the future pushed here is an ordinary `yield_now`, not a slow one — confirm the intent.
        buffer.push(yield_now(&c));
        // poll and insert
        for _ in 0..100 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
            buffer.push(yield_now(&c));
        }
        // drain down
        for _ in 0..10 {
            assert!(futures::executor::block_on(buffer.next()).is_some());
        }

        let dur = now.elapsed();
        assert!(dur < Duration::from_millis(2050));

        // 110 futures, two polls each
        let count = c.into_inner();
        assert_eq!(count, 220);
    }

    // For every size from 0 to 255, collect that many short sleeps and drain them all.
    #[cfg(not(miri))]
    #[tokio::test]
    async fn unordered_large() {
        for i in 0..256 {
            let mut queue: FuturesUnorderedBounded<_> = ((0..i).map(|_| async move {
                tokio::time::sleep(Duration::from_nanos(1)).await;
            }))
            .collect();
            for _ in 0..i {
                queue.next().await.unwrap();
            }
        }
    }

    // Receivers that are completed in insertion order must also be yielded in that order.
    #[test]
    fn correct_fairer_order() {
        const LEN: usize = 256;

        let mut buffer = FuturesUnorderedBounded::new(LEN);
        let mut txs = vec![];
        for _ in 0..LEN {
            let (tx, rx) = oneshot::channel();
            buffer.push(rx);
            txs.push(tx);
        }

        // one call handles at most 61 queue entries (`MAX` in `poll_inner_no_remove`),
        // so this many calls give every receiver its first poll
        for _ in 0..=(LEN / 61) {
            assert!(buffer.poll_next_unpin(&mut noop_context()).is_pending());
        }

        // complete the receivers in insertion order, each with its own index
        for (i, tx) in txs.into_iter().enumerate() {
            let _ = tx.send(i);
        }

        // the outputs must come back in that same order
        for i in 0..LEN {
            let poll = buffer.poll_next_unpin(&mut noop_context());
            assert_eq!(poll, Poll::Ready(Some(Ok(i))));
        }
    }
}
575}