1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::{future::Future, mem, pin::Pin, task::Context, task::Poll};
pub use futures_core::Stream;
pub use futures_sink::Sink;
mod either;
mod join;
mod lazy;
mod ready;
mod select;
pub use self::either::Either;
pub use self::join::{join, join_all};
pub use self::lazy::{lazy, Lazy};
pub use self::ready::Ready;
pub use self::select::select;
pub fn poll_fn<T, F>(f: F) -> impl Future<Output = T>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
PollFn { f }
}
#[must_use = "futures do nothing unless you `.await` or poll them"]
struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<T, F> Future for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<T>,
{
type Output = T;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
(self.f)(cx)
}
}
#[doc(hidden)]
#[deprecated(since = "0.1.4", note = "Use stream_recv() fn instead")]
pub async fn next<S>(stream: &mut S) -> Option<S::Item>
where
S: Stream + Unpin,
{
stream_recv(stream).await
}
pub async fn stream_recv<S>(stream: &mut S) -> Option<S::Item>
where
S: Stream + Unpin,
{
poll_fn(|cx| Pin::new(&mut *stream).poll_next(cx)).await
}
#[doc(hidden)]
pub async fn send<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
S: Sink<I> + Unpin,
{
sink_write(sink, item).await
}
pub async fn sink_write<S, I>(sink: &mut S, item: I) -> Result<(), S::Error>
where
S: Sink<I> + Unpin,
{
poll_fn(|cx| Pin::new(&mut *sink).poll_ready(cx)).await?;
Pin::new(&mut *sink).start_send(item)?;
poll_fn(|cx| Pin::new(&mut *sink).poll_flush(cx)).await
}
enum MaybeDone<F>
where
F: Future,
{
Pending(F),
Done(F::Output),
Gone,
}
impl<F: Future> MaybeDone<F> {
fn take_output(self: Pin<&mut Self>) -> Option<F::Output> {
match &*self {
Self::Done(_) => {}
Self::Pending(_) | Self::Gone => return None,
}
unsafe {
match mem::replace(self.get_unchecked_mut(), Self::Gone) {
MaybeDone::Done(output) => Some(output),
_ => unreachable!(),
}
}
}
}
impl<F: Future> Future for MaybeDone<F> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
unsafe {
match self.as_mut().get_unchecked_mut() {
MaybeDone::Pending(f) => {
let res = futures_core::ready!(Pin::new_unchecked(f).poll(cx));
self.set(Self::Done(res));
}
MaybeDone::Done(_) => {}
MaybeDone::Gone => panic!("MaybeDone polled after value taken"),
}
}
Poll::Ready(())
}
}