1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll};
pub use futures_core::Stream;
pub use futures_sink::Sink;
mod either;
mod join;
mod lazy;
mod ready;
mod select;
pub use self::either::Either;
pub use self::join::{join, join_all};
pub use self::lazy::{lazy, Lazy};
pub use self::ready::Ready;
pub use self::select::select;
/// A pinned, heap-allocated, dynamically typed future resolving to `T`.
///
/// The boxed future may borrow data for the lifetime `'a`. The alias adds no
/// `Send` bound to the trait object.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
/// Creates a future whose `poll` is implemented by the closure `f`.
///
/// Every poll of the returned future calls `f` with the current task
/// [`Context`] and returns whatever [`Poll`] value `f` produces, so the future
/// completes as soon as `f` returns `Poll::Ready`.
pub fn poll_fn<T, F>(f: F) -> impl Future<Output = T>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
// The concrete wrapper type stays private; callers only see `impl Future`.
PollFn { f }
}
/// Future adapter that forwards every poll to the stored closure.
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
    /// Closure invoked on each poll.
    f: F,
}

// The closure is only ever reached through a plain `&mut`, never through a
// pinned reference, so the wrapper is `Unpin` regardless of `F`.
impl<F> Unpin for PollFn<F> {}

impl<T, F> Future for PollFn<F>
where
    F: FnMut(&mut Context<'_>) -> Poll<T>,
{
    type Output = T;

    /// Calls the stored closure with `cx` and returns its result unchanged.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // `get_mut` is available because `PollFn` is unconditionally `Unpin`.
        let this = self.get_mut();
        (this.f)(cx)
    }
}
/// Deprecated alias for [`stream_recv`]: awaits the next item of `stream`.
///
/// Hidden from the rendered documentation; the body only forwards to
/// `stream_recv` and returns its result.
#[doc(hidden)]
#[deprecated(since = "0.1.4", note = "Use stream_recv() fn instead")]
pub async fn next<S>(stream: &mut S) -> Option<S::Item>
where
S: Stream + Unpin,
{
// Pure forwarding call; no behaviour is added on top of `stream_recv`.
stream_recv(stream).await
}
/// Awaits the next value produced by `stream`.
///
/// The stream is polled through `poll_next` until it reports `Poll::Ready`;
/// the `Option<S::Item>` carried by that ready result is returned as is.
pub async fn stream_recv<S>(stream: &mut S) -> Option<S::Item>
where
    S: Stream + Unpin,
{
    // `S: Unpin` lets us pin the borrowed stream once and re-borrow per poll.
    let mut pinned = Pin::new(stream);
    poll_fn(|cx| pinned.as_mut().poll_next(cx)).await
}
/// Hidden alias for [`sink_write`]: sends `item` into `sink`.
///
/// The body only forwards to `sink_write` and returns its result.
// NOTE(review): unlike the hidden `next` alias, this function carries no
// `#[deprecated]` attribute — confirm whether that is intentional.
#[doc(hidden)]
pub async fn send<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
S: Sink<I> + Unpin,
{
// Pure forwarding call; no behaviour is added on top of `sink_write`.
sink_write(sink, item).await
}
/// Writes `item` into `sink` and waits until the sink has been flushed.
///
/// Steps, in order:
/// 1. wait for `poll_ready` to report readiness,
/// 2. hand the item over with `start_send`,
/// 3. wait for `poll_flush` to complete.
///
/// # Errors
///
/// Returns the sink's error as soon as any of the three steps fails; later
/// steps are not attempted.
pub async fn sink_write<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
    S: Sink<I> + Unpin,
{
    // `S: Unpin` lets us pin the borrowed sink once and re-borrow per step.
    let mut pinned = Pin::new(sink);

    poll_fn(|cx| pinned.as_mut().poll_ready(cx)).await?;
    pinned.as_mut().start_send(item)?;
    poll_fn(|cx| pinned.as_mut().poll_flush(cx)).await
}
/// State of a future that is either still running, finished with its output
/// stored, or finished with the output already taken.
enum MaybeDone<F>
where
    F: Future,
{
    /// The future has not produced its output yet.
    Pending(F),
    /// The future completed; its output is stored here until it is taken.
    Done(F::Output),
    /// The output was removed by [`MaybeDone::take_output`].
    Gone,
}

impl<F: Future> MaybeDone<F> {
    /// Removes and returns the stored output, if there is one.
    ///
    /// Returns `Some(output)` only in the `Done` state, which then becomes
    /// `Gone`. In the `Pending` and `Gone` states the value is left untouched
    /// and `None` is returned.
    fn take_output(self: Pin<&mut Self>) -> Option<F::Output> {
        // Check the state through a shared reference first so that the pinned
        // future of the `Pending` variant is never touched below.
        match &*self {
            Self::Done(_) => {}
            Self::Pending(_) | Self::Gone => return None,
        }

        // SAFETY: the check above guarantees the current variant is `Done`,
        // which holds only the already-produced output and not the pinned
        // future. Replacing the value therefore moves no pinned data.
        let slot = unsafe { self.get_unchecked_mut() };

        match mem::replace(slot, Self::Gone) {
            Self::Done(output) => Some(output),
            // Ruled out by the state check at the top of the function.
            Self::Pending(_) | Self::Gone => unreachable!(),
        }
    }
}
/// Drives the wrapped future and stores its output inside the enum.
///
/// The future itself resolves to `()`; the produced value is kept in the
/// `Done` variant, from which `take_output` can remove it.
///
/// # Panics
///
/// Panics when polled in the `Gone` state, i.e. after the output was taken.
impl<F: Future> Future for MaybeDone<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: the mutable reference obtained from `get_unchecked_mut` is used
// only to match on the variant and to re-pin the inner future in place with
// `Pin::new_unchecked`; the future is never moved out of `self`. The state
// change goes through `Pin::set`, which drops the old value in place.
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Pending(f) => {
// `ready!` returns `Poll::Pending` from `poll` while the inner
// future is not finished, leaving the state unchanged.
let res = futures_core::ready!(Pin::new_unchecked(f).poll(cx));
self.set(Self::Done(res));
}
// Already finished: nothing to do, report readiness again below.
MaybeDone::Done(_) => {}
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
}
Poll::Ready(())
}
}