1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
use std::task::{Context, Poll, Waker};
use std::future::Future;
use std::marker::PhantomPinned;
use std::mem::ManuallyDrop;
use std::pin::Pin;
use std::sync::Arc;
use crossbeam_utils::atomic::AtomicCell;
use crate::event_loop::EventLoop;
use crate::messages::{ProxyRegisterBody, ProxyRequest, ProxyResponse};
/// Future that resolves to an [`EventLoop`] (see its `Future` impl, whose
/// `Output` is `EventLoop`).
pub struct FutEventLoop {
// Atomically accessed registration state behind an `Arc`. `poll` matches it
// against `ProxyRegisterBody::{Init, Polled { waker }, Ready { info }}`.
// NOTE(review): presumably the other `Arc` handle belongs to whatever code
// stores `Ready` — confirm against the producer side.
pub(crate) body: Arc<AtomicCell<ProxyRegisterBody>>
}
/// Future for a single request sent through an [`EventLoop`]; resolves to `T`,
/// produced by applying a conversion function to the raw `ProxyResponse`.
///
/// The request is not sent at construction time; it is sent on the first poll.
#[must_use = "the response won't actually send until you await or poll"]
#[repr(C)]
pub struct FutResponse<'a, T>(
// The state is wrapped in `ManuallyDrop`, so dropping the wrapper does not
// automatically drop it; `finalize` releases it when the future completes.
ManuallyDrop<_FutResponse<'a, T>>
);
/// Inner state of [`FutResponse`].
#[must_use = "the response won't actually send until you await or poll"]
#[repr(C)]
struct _FutResponse<'a, T> {
// Slot for the response. On the first poll a raw pointer to this field is
// passed to `EventLoop::actually_send`, so the field's address must stay stable.
response: Option<ProxyResponse>,
// Boxed future returned by `EventLoop::actually_send`; `None` until the first poll.
held_future: Option<Box<dyn Future<Output=()> + 'a>>,
// Request still to be sent; taken (left as `None`) on the first poll.
message: Option<ProxyRequest>,
// Event loop the request is sent through.
proxy: &'a EventLoop,
// Converts the raw response into the future's output type.
convert: fn(ProxyResponse) -> T,
// Makes the type `!Unpin`, matching the fact that a pointer into `response`
// is handed out while the request is in flight.
_p: PhantomPinned
}
/// A request awaiting its response: where to write the response and whom to
/// wake once it has been written (see `PendingRequest::resolve`).
pub(crate) struct PendingRequest {
// Waker invoked by `resolve`; always `Some` when built through `new`.
waker: Option<Waker>,
// Destination that `resolve` writes `Some(response)` into.
// NOTE(review): presumably points at the `response` field of a `_FutResponse`
// (that is what `FutResponse::poll` passes to `actually_send`) — confirm.
response_ptr: *mut Option<ProxyResponse>
}
impl Future for FutEventLoop {
    type Output = EventLoop;

    /// Polls the shared registration cell.
    ///
    /// Each call atomically swaps a `Polled` state carrying the *current*
    /// waker into the cell and inspects what was there before:
    ///
    /// * `Init` or `Polled` — registration has not completed yet; the freshly
    ///   stored waker replaces any earlier one and `Poll::Pending` is returned.
    ///   Polling repeatedly while pending is therefore allowed, as the
    ///   `Future` contract requires, instead of panicking.
    /// * `Ready { info }` — returns `Poll::Ready(EventLoop::from(info))`.
    ///
    /// A single `swap` is used rather than `take` followed by `store`: with
    /// two separate operations, a `Ready` value written between them would be
    /// overwritten by the `Polled` store and lost.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let registration = ProxyRegisterBody::Polled { waker: cx.waker().clone() };
        match self.body.swap(registration) {
            // Still waiting; a previously registered waker (if any) is dropped here.
            ProxyRegisterBody::Init | ProxyRegisterBody::Polled { .. } => Poll::Pending,
            ProxyRegisterBody::Ready { info } => Poll::Ready(EventLoop::from(info)),
        }
    }
}
impl<'a, T> FutResponse<'a, T> {
    /// Creates a response future for `message`.
    ///
    /// Nothing is sent here: the message is only stored and is handed to
    /// `proxy` on the first poll. `convert` maps the raw response to the
    /// future's output once it arrives.
    pub(crate) fn new(
        proxy: &'a EventLoop,
        message: ProxyRequest,
        convert: fn(ProxyResponse) -> T,
    ) -> Self {
        FutResponse(ManuallyDrop::new(_FutResponse {
            response: None,
            held_future: None,
            message: Some(message),
            proxy,
            convert,
            _p: PhantomPinned,
        }))
    }

    /// Releases everything the inner state owns and returns the converted
    /// response as `Poll::Ready`.
    ///
    /// The owning fields are cleared in place (in declaration order) rather
    /// than running `ManuallyDrop::drop` on the whole struct. The resources
    /// are freed either way, but the state stays valid afterwards, so a stray
    /// poll after completion hits the `None` checks in `poll` and panics
    /// instead of touching already-dropped memory. The remaining fields
    /// (`proxy`, `convert`, `_p`) own nothing and need no cleanup.
    fn finalize(&mut self, response: ProxyResponse) -> Poll<T> {
        let state = &mut *self.0;
        state.response = None;
        // Drops the boxed future in place; it is never moved out of its box.
        state.held_future = None;
        state.message = None;
        let convert = state.convert;
        Poll::Ready(convert(response))
    }
}
/// Drives a request: sends it on the first poll, then waits for the response
/// slot to be filled and converts it into `T`.
impl<'a, T> Future for FutResponse<'a, T> {
type Output = T;
/// Polls the request.
///
/// * If `response` already holds a value, it is taken and finalized.
/// * Otherwise, on the first poll `message` is taken and passed to
///   `EventLoop::actually_send` together with a clone of the current waker and
///   a raw pointer to the `response` field; the returned future is boxed into
///   `held_future`.
/// * `held_future` is then polled. When it is ready, `response` must hold a
///   value (panics otherwise), which is taken and finalized.
///
/// This future must not be polled again after it has returned `Poll::Ready`.
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// SAFETY: the mutable reference is only used to access fields in place;
// this function never moves the struct itself.
let this_wrapper = unsafe { self.get_unchecked_mut() };
let this = &mut *this_wrapper.0;
if let Some(response) = this.response.take() {
// A response can only be present after the message was sent.
debug_assert!(this.message.is_none());
this_wrapper.finalize(response)
} else {
if let Some(message) = this.message.take() {
// First poll: nothing has been sent yet.
debug_assert!(this.held_future.is_none());
// NOTE(review): only this first poll's waker is passed to `actually_send`;
// later polls pass their context to `held_future` only.
// NOTE(review): the raw pointer to `this.response` presumably gets written
// through later (cf. `PendingRequest::resolve`) — that requires this struct
// to stay alive and in place while the request is outstanding; TODO confirm
// what guards against the future being dropped early.
this.held_future = Some(Box::new(this.proxy.actually_send(message, cx.waker().clone(), &mut this.response as *mut _)));
} else {
debug_assert!(this.held_future.is_some());
}
// SAFETY: the future lives inside the `Box` stored in `held_future`, and this
// function never moves it out of that box.
let held_future = unsafe { Pin::new_unchecked(this.held_future.as_mut().unwrap().as_mut()) };
match held_future.poll(cx) {
Poll::Ready(()) => {
let response = this.response.take().expect("FutResponse poll instantly succeeded but there is no response");
this_wrapper.finalize(response)
}
Poll::Pending => Poll::Pending
}
}
}
}
impl PendingRequest {
pub(crate) fn new(waker: Waker, response_ptr: *mut Option<ProxyResponse>) -> Self {
PendingRequest {
waker: Some(waker),
response_ptr
}
}
pub(crate) fn resolve(mut self, response: ProxyResponse) {
unsafe { *self.response_ptr = Some(response); }
let waker = self.waker.take().expect("redundantly resolved");
waker.wake();
}
}