Skip to main content

futures_util/
lib.rs

1//! Combinators and utilities for working with `Future`s, `Stream`s, `Sink`s,
2//! and the `AsyncRead` and `AsyncWrite` traits.
3
4#![no_std]
5#![doc(test(
6    no_crate_inject,
7    attr(
8        deny(warnings, rust_2018_idioms, single_use_lifetimes),
9        allow(dead_code, unused_assignments, unused_variables)
10    )
11))]
12#![warn(missing_docs, unsafe_op_in_unsafe_fn)]
13#![cfg_attr(docsrs, feature(doc_cfg))]
14
15#[cfg(all(feature = "bilock", not(feature = "unstable")))]
16compile_error!("The `bilock` feature requires the `unstable` feature as an explicit opt-in to unstable features");
17
18#[cfg(feature = "alloc")]
19extern crate alloc;
20#[cfg(feature = "std")]
21extern crate std;
22
23// Macro re-exports
24pub use futures_core::ready;
25
26#[cfg(feature = "async-await")]
27#[macro_use]
28mod async_await;
29#[cfg(feature = "async-await")]
30#[doc(hidden)]
31pub use self::async_await::*;
32
// Not public API.
//
// Hidden module that re-exports, under one path, everything at the crate root
// plus a handful of `core` items: `Option` with its `None`/`Some` variants,
// `Pin`, and the `Err`/`Ok` variants of `Result`.
// NOTE(review): presumably this exists so that exported macros can name these
// items through a single crate-relative path regardless of what the caller
// has in scope — confirm against the macro definitions.
#[doc(hidden)]
pub mod __private {
    pub use crate::*;
    pub use core::{
        option::Option::{self, None, Some},
        pin::Pin,
        result::Result::{Err, Ok},
    };

    // Mirrors the crate-private `async_await` module; only present when the
    // `async-await` feature is enabled.
    #[cfg(feature = "async-await")]
    pub mod async_await {
        pub use crate::async_await::*;
    }
}
48
// Expands to the four sink methods (`poll_ready`, `start_send`, `poll_flush`,
// `poll_close`), each forwarding to the same-named method on `$field` of the
// pinned projection returned by `self.project()`.
//
// * `$field` - name of the projected field to forward to.
// * `$item`  - type of the value accepted by `start_send`.
//
// The surrounding `impl` must supply `Self::Error` and a `project()` method.
// Only compiled when the `sink` feature is enabled.
#[cfg(feature = "sink")]
macro_rules! delegate_sink {
    ($field:ident, $item:ty) => {
        fn poll_ready(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_ready(cx)
        }

        fn start_send(self: core::pin::Pin<&mut Self>, item: $item) -> Result<(), Self::Error> {
            self.project().$field.start_send(item)
        }

        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_flush(cx)
        }

        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Result<(), Self::Error>> {
            self.project().$field.poll_close(cx)
        }
    };
}
78
// Expands to a `poll` method that forwards to `$field` of the pinned
// projection returned by `self.project()`.
//
// The surrounding `impl` must supply `Self::Output` and a `project()` method,
// and the trait providing `poll` must be in scope at the expansion site.
macro_rules! delegate_future {
    ($field:ident) => {
        fn poll(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Self::Output> {
            self.project().$field.poll(cx)
        }
    };
}
89
// Expands to `poll_next` and `size_hint` methods that forward to `$field`.
//
// `poll_next` goes through the pinned projection returned by
// `self.project()`; `size_hint` only needs `&self`, so it reads the field
// directly. The surrounding `impl` must supply `Self::Item` and `project()`.
macro_rules! delegate_stream {
    ($field:ident) => {
        fn poll_next(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<Option<Self::Item>> {
            self.project().$field.poll_next(cx)
        }
        fn size_hint(&self) -> (usize, Option<usize>) {
            self.$field.size_hint()
        }
    };
}
103
// Expands to the async-write methods (`poll_write`, `poll_write_vectored`,
// `poll_flush`, `poll_close`), each forwarding to the same-named method on
// `$field` of the pinned projection returned by `self.project()`.
//
// The generated signatures use `std::io` types, so this is only compiled when
// both the `io` and `std` features are enabled.
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_write {
    ($field:ident) => {
        fn poll_write(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            buf: &[u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_write(cx, buf)
        }
        fn poll_write_vectored(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            bufs: &[std::io::IoSlice<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_write_vectored(cx, bufs)
        }
        fn poll_flush(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            self.project().$field.poll_flush(cx)
        }
        fn poll_close(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<()>> {
            self.project().$field.poll_close(cx)
        }
    };
}
136
// Expands to the async-read methods (`poll_read`, `poll_read_vectored`), each
// forwarding to the same-named method on `$field` of the pinned projection
// returned by `self.project()`.
//
// The generated signatures use `std::io` types, so this is only compiled when
// both the `io` and `std` features are enabled.
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_read {
    ($field:ident) => {
        fn poll_read(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            buf: &mut [u8],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_read(cx, buf)
        }

        fn poll_read_vectored(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
            bufs: &mut [std::io::IoSliceMut<'_>],
        ) -> core::task::Poll<std::io::Result<usize>> {
            self.project().$field.poll_read_vectored(cx, bufs)
        }
    };
}
158
// Expands to the buffered async-read methods (`poll_fill_buf`, `consume`),
// each forwarding to the same-named method on `$field` of the pinned
// projection returned by `self.project()`.
//
// The generated signatures use `std::io` types, so this is only compiled when
// both the `io` and `std` features are enabled.
#[cfg(feature = "io")]
#[cfg(feature = "std")]
macro_rules! delegate_async_buf_read {
    ($field:ident) => {
        fn poll_fill_buf(
            self: core::pin::Pin<&mut Self>,
            cx: &mut core::task::Context<'_>,
        ) -> core::task::Poll<std::io::Result<&[u8]>> {
            self.project().$field.poll_fill_buf(cx)
        }

        fn consume(self: core::pin::Pin<&mut Self>, amt: usize) {
            self.project().$field.consume(amt)
        }
    };
}
175
// Expands to the four public accessors `get_ref`, `get_mut`, `get_pin_mut`
// and `into_inner` for a wrapper that stores its payload in `$field`.
//
// * `$field` - field holding the wrapped value.
// * `$inner` - type every accessor ultimately exposes.
// * `($ind)*` - zero or more tokens (in practice `.`), one per extra layer of
//   wrapping. Each token is followed by a call to the accessor of the same
//   name on the next layer, so `()` exposes `$field` itself while `(.)`
//   reaches one level further in.
//
// `get_pin_mut` goes through `self.project()`, which the surrounding type
// must provide. The `///` lines below become the rustdoc of the generated
// methods.
macro_rules! delegate_access_inner {
    ($field:ident, $inner:ty, ($($ind:tt)*)) => {
        /// Acquires a reference to the underlying sink or stream that this combinator is
        /// pulling from.
        pub fn get_ref(&self) -> &$inner {
            (&self.$field) $($ind get_ref())*
        }

        /// Acquires a mutable reference to the underlying sink or stream that this
        /// combinator is pulling from.
        ///
        /// Note that care must be taken to avoid tampering with the state of the
        /// sink or stream which may otherwise confuse this combinator.
        pub fn get_mut(&mut self) -> &mut $inner {
            (&mut self.$field) $($ind get_mut())*
        }

        /// Acquires a pinned mutable reference to the underlying sink or stream that this
        /// combinator is pulling from.
        ///
        /// Note that care must be taken to avoid tampering with the state of the
        /// sink or stream which may otherwise confuse this combinator.
        pub fn get_pin_mut(self: core::pin::Pin<&mut Self>) -> core::pin::Pin<&mut $inner> {
            self.project().$field $($ind get_pin_mut())*
        }

        /// Consumes this combinator, returning the underlying sink or stream.
        ///
        /// Note that this may discard intermediate state of this combinator, so
        /// care should be taken to avoid losing resources when this is called.
        pub fn into_inner(self) -> $inner {
            self.$field $($ind into_inner())*
        }
    }
}
211
// Declares a pin-projected newtype around a single field named `inner` and
// implements a `+`-separated list of traits for it by delegation.
//
// Invocation shape (handled by the last two arms):
//
//     delegate_all!(
//         #[attrs] Name<A, B>(InnerType):
//             TraitA + TraitB[args] + ... { extra inherent items } where bounds
//     );
//
// The `@trait ...` arms are internal: each emits exactly one `impl` for the
// already-declared struct. Every generated trait impl is bounded on `$t`
// implementing that trait, followed by any caller-supplied `where` bounds.
macro_rules! delegate_all {
    // `Future`: `Output` and `poll` come from the inner type.
    (@trait Future $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::future::Future for $name<$($arg),*> where $t: futures_core::future::Future $(, $($bound)*)* {
            type Output = <$t as futures_core::future::Future>::Output;

            delegate_future!(inner);
        }
    };
    // `FusedFuture`: termination state is read from `inner`.
    (@trait FusedFuture $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::future::FusedFuture for $name<$($arg),*> where $t: futures_core::future::FusedFuture $(, $($bound)*)* {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };
    // `Stream`: `Item`, `poll_next` and `size_hint` come from the inner type.
    (@trait Stream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::stream::Stream for $name<$($arg),*> where $t: futures_core::stream::Stream $(, $($bound)*)* {
            type Item = <$t as futures_core::stream::Stream>::Item;

            delegate_stream!(inner);
        }
    };
    // `FusedStream`: termination state is read from `inner`.
    (@trait FusedStream $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> futures_core::stream::FusedStream for $name<$($arg),*> where $t: futures_core::stream::FusedStream $(, $($bound)*)* {
            fn is_terminated(&self) -> bool {
                self.inner.is_terminated()
            }
        }
    };
    // `Sink`: generic over the item type `_Item`; the impl itself is gated on
    // the `sink` feature because `delegate_sink!` only exists under it.
    (@trait Sink $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        #[cfg(feature = "sink")]
        impl<_Item, $($arg),*> futures_sink::Sink<_Item> for $name<$($arg),*> where $t: futures_sink::Sink<_Item> $(, $($bound)*)* {
            type Error = <$t as futures_sink::Sink<_Item>>::Error;

            delegate_sink!(inner, _Item);
        }
    };
    // `Debug`: formats as the inner value, without naming the wrapper.
    (@trait Debug $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> core::fmt::Debug for $name<$($arg),*> where $t: core::fmt::Debug $(, $($bound)*)* {
            fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
                core::fmt::Debug::fmt(&self.inner, f)
            }
        }
    };
    // `AccessInner[Inner, (indirections)]`: not a trait; emits the inherent
    // accessors produced by `delegate_access_inner!`.
    (@trait AccessInner[$inner:ty, ($($ind:tt)*)] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
            delegate_access_inner!(inner, $inner, ($($ind)*));
        }
    };
    // `New[|params| expr]`: not a trait; emits a crate-private `new` whose
    // body stores `expr` in `inner`.
    (@trait New[|$($param:ident: $paramt:ty),*| $cons:expr] $name:ident < $($arg:ident),* > ($t:ty) $(where $($bound:tt)*)*) => {
        impl<$($arg),*> $name<$($arg),*> $(where $($bound)*)* {
            pub(crate) fn new($($param: $paramt),*) -> Self {
                Self { inner: $cons }
            }
        }
    };
    // Base case, exactly one trait: declare the struct, emit an inherent impl
    // holding any caller-supplied `{ ... }` items, then implement the trait.
    ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($targs:tt)*])* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
        pin_project_lite::pin_project! {
            #[must_use = "futures/streams/sinks do nothing unless you `.await` or poll them"]
            $(#[$attr])*
            pub struct $name< $($arg),* > $(where $($bound)*)* { #[pin] inner: $t }
        }

        impl<$($arg),*> $name< $($arg),* > $(where $($bound)*)* {
            $($($item)*)*
        }

        delegate_all!(@trait $ftrait $([$($targs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
    };
    // Recursive case, two or more traits: recurse on everything after the
    // first trait (which eventually reaches the base case and declares the
    // struct), then add the impl for the first trait.
    ($(#[$attr:meta])* $name:ident<$($arg:ident),*>($t:ty) : $ftrait:ident $([$($ftargs:tt)*])* + $strait:ident $([$($stargs:tt)*])* $(+ $trait:ident $([$($targs:tt)*])*)* $({$($item:tt)*})* $(where $($bound:tt)*)*) => {
        delegate_all!($(#[$attr])* $name<$($arg),*>($t) : $strait $([$($stargs)*])* $(+ $trait $([$($targs)*])*)* $({$($item)*})* $(where $($bound)*)*);

        delegate_all!(@trait $ftrait $([$($ftargs)*])* $name<$($arg),*>($t) $(where $($bound)*)*);
    };
}
287
288pub mod future;
289#[doc(no_inline)]
290pub use crate::future::{Future, FutureExt, TryFuture, TryFutureExt};
291
292pub mod stream;
293#[doc(no_inline)]
294pub use crate::stream::{Stream, StreamExt, TryStream, TryStreamExt};
295
296#[cfg(feature = "sink")]
297#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
298pub mod sink;
299#[cfg(feature = "sink")]
300#[doc(no_inline)]
301pub use crate::sink::{Sink, SinkExt};
302
303pub mod task;
304
305pub mod never;
306
307#[cfg(feature = "compat")]
308#[cfg_attr(docsrs, doc(cfg(feature = "compat")))]
309pub mod compat;
310
311#[cfg(feature = "io")]
312#[cfg_attr(docsrs, doc(cfg(feature = "io")))]
313#[cfg(feature = "std")]
314pub mod io;
315#[cfg(feature = "io")]
316#[cfg(feature = "std")]
317#[doc(no_inline)]
318pub use crate::io::{
319    AsyncBufRead, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt, AsyncWrite,
320    AsyncWriteExt,
321};
322
323#[cfg(feature = "alloc")]
324pub mod lock;
325
326#[cfg_attr(target_os = "none", cfg(target_has_atomic = "ptr"))]
327#[cfg(feature = "alloc")]
328mod abortable;
329
330mod fns;
331mod macros;
332mod unfold_state;