1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use alloc::boxed::Box;
use core::{
future::Future,
mem::{self, ManuallyDrop},
pin::Pin,
task::{ready, Context, Poll},
};
use async_lock::futures::Lock;
use crate::{
markers::ParallelSend,
runtime::{schedular::SchedularPoll, InnerRuntime},
AsyncContext, Ctx,
};
pub struct WithFuture<'a, F, R> {
context: &'a AsyncContext,
lock_state: LockState<'a>,
state: WithFutureState<'a, F, R>,
}
enum LockState<'a> {
Initial,
Pending(ManuallyDrop<Lock<'a, InnerRuntime>>),
}
impl<'a> Drop for LockState<'a> {
fn drop(&mut self) {
if let LockState::Pending(ref mut x) = self {
unsafe { ManuallyDrop::drop(x) }
}
}
}
enum WithFutureState<'a, F, R> {
Initial {
closure: F,
},
FutureCreated {
future: Pin<Box<dyn Future<Output = R> + 'a + Send>>,
},
Done,
}
impl<'a, F, R> WithFuture<'a, F, R>
where
F: for<'js> FnOnce(Ctx<'js>) -> Pin<Box<dyn Future<Output = R> + 'js + Send>> + ParallelSend,
R: ParallelSend,
{
pub fn new(context: &'a AsyncContext, f: F) -> Self {
Self {
context,
lock_state: LockState::Initial,
state: WithFutureState::Initial { closure: f },
}
}
}
impl<'a, F, R> Future for WithFuture<'a, F, R>
where
F: for<'js> FnOnce(Ctx<'js>) -> Pin<Box<dyn Future<Output = R> + 'js + Send>> + ParallelSend,
R: ParallelSend + 'static,
{
type Output = R;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Implementation ensures we don't break pin guarantees.
let this = unsafe { self.get_unchecked_mut() };
let mut lock = loop {
// We don't move the lock_state as long as it is pending
if let LockState::Pending(ref mut fut) = &mut this.lock_state {
// SAFETY: Sound as we don't move future while it is pending.
let pin = unsafe { Pin::new_unchecked(&mut **fut) };
let lock = ready!(pin.poll(cx));
// at this point we have acquired a lock, so we will now drop the future allowing
// us to reused the memory space.
unsafe { ManuallyDrop::drop(fut) };
// The pinned memory is dropped so now we can freely move into it.
this.lock_state = LockState::Initial;
break lock;
} else {
// we assign a state with manually drop so we can drop the value when we need to
// replace it.
// Assign
this.lock_state =
LockState::Pending(ManuallyDrop::new(this.context.0.rt().inner.lock()));
}
};
lock.runtime.update_stack_top();
// At this point we have locked the runtime so we start running the actual future
// we can move this memory since the future is boxed and thus movable.
let mut future = match mem::replace(&mut this.state, WithFutureState::Done) {
WithFutureState::Initial { closure } => {
// SAFETY: we have a lock, so creating this ctx is save.
let ctx = unsafe { Ctx::new_async(this.context) };
Box::pin(closure(ctx))
}
WithFutureState::FutureCreated { future } => future,
// The future was called an additional time,
// We don't have anything valid to do here so just panic.
WithFutureState::Done => panic!("With future called after it returned"),
};
let res = loop {
let mut made_progress = false;
if let Poll::Ready(x) = future.as_mut().poll(cx) {
break Poll::Ready(x);
};
let opaque = lock.runtime.get_opaque();
match opaque.poll(cx) {
SchedularPoll::Empty => {
// if the schedular is empty that means the future is waiting on an external or
// on a promise.
}
SchedularPoll::ShouldYield => {
this.state = WithFutureState::FutureCreated { future };
return Poll::Pending;
}
SchedularPoll::Pending => {
// we couldn't drive any futures so we should run some jobs to see we can get
// some progress.
}
SchedularPoll::PendingProgress => {
// We did make some progress so the root future might not be blocked, but it is
// probably still a good idea to run some jobs as most futures first require a
// single job to run before unblocking.
made_progress = true;
}
};
loop {
match lock.runtime.execute_pending_job() {
Ok(false) => break,
Ok(true) => made_progress = true,
Err(_ctx) => {
// TODO figure out what to do with a job error.
made_progress = true;
}
}
}
// If no work could be done we should yield back.
if !made_progress {
this.state = WithFutureState::FutureCreated { future };
break Poll::Pending;
}
};
// Manually drop the lock so it isn't accidentally moved into somewhere.
mem::drop(lock);
res
}
}
#[cfg(feature = "parallel")]
unsafe impl<F, R> Send for WithFuture<'_, F, R> {}