monoio_rust2go/
future.rs

1use std::future::Future;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4
5use crate::SlotReader;
6
7impl<Req, Resp, Exec> ResponseFuture<Req, Resp, Exec> {
8    pub fn new(exec: Exec, req: Req, callback: *const ()) -> Self {
9        Self::Init(exec, req, callback)
10    }
11
12    pub fn new_without_req(
13        exec: Exec,
14        req: Req,
15        callback: *const (),
16    ) -> ResponseFutureWithoutReq<Req, Resp, Exec> {
17        ResponseFutureWithoutReq(Self::Init(exec, req, callback))
18    }
19}
20
/// Future for a single call made through a Go FFI function.
///
/// The first poll moves the value from `Init` to `Executed`; once the response
/// has been handed out the value becomes `Fused`, and polling it again panics.
pub enum ResponseFuture<Req, Resp, Exec> {
    /// Not started yet: the Go FFI function, the request, and the callback
    /// function pointer.
    Init(Exec, Req, *const ()),
    /// Call issued: the reader half of the slot. The slot's attachment carries
    /// the request together with a `Vec<u8>` buffer.
    Executed(SlotReader<Resp, (Req, Vec<u8>)>),
    /// The response has already been returned from `poll`.
    Fused,
}
28
// The raw `*const ()` held by `Init` suppresses the automatic `Send`/`Sync`
// implementations, so they are asserted by hand here, conditional on `Req` and
// `Resp` only.
// NOTE(review): `Exec` carries no `Send`/`Sync` bound and the thread-safety of
// `SlotReader` is not visible from this file — confirm both before relying on
// these impls with a non-trivial `Exec`.
unsafe impl<Req: Send, Resp: Send, Exec> Send for ResponseFuture<Req, Resp, Exec> {}
unsafe impl<Req: Sync, Resp: Sync, Exec> Sync for ResponseFuture<Req, Resp, Exec> {}
31
32impl<Req, Resp, Exec> Future for ResponseFuture<Req, Resp, Exec>
33where
34    // Exec: FnOnce(Req, *SlotWriter<Resp>, Callback)
35    // Note: Req is usually a tuple.
36    Exec: FnOnce(Req::Ref, *const (), *const ()) + Unpin,
37    Req: Unpin + crate::ToRef,
38{
39    type Output = (Resp, Req);
40
41    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42        let this = self.get_mut();
43        match this {
44            Self::Executed(reader) => {
45                reader.set_waker(cx.waker());
46                if let Some((resp, attachment)) = unsafe { reader.read_with_attachment() } {
47                    *this = Self::Fused;
48                    let (req, _) = attachment.unwrap();
49                    return Poll::Ready((resp, req));
50                }
51            }
52            Self::Init(..) => {
53                // replace to take ownership
54                let (reader, mut writer) = crate::slot::new_atomic_slot::<Resp, (Req, Vec<u8>)>();
55
56                let (exec, req, cb) = match std::mem::replace(this, Self::Executed(reader)) {
57                    Self::Init(exec, req, cb) => (exec, req, cb),
58                    Self::Executed(_) => unsafe { std::hint::unreachable_unchecked() },
59                    Self::Fused => unsafe { std::hint::unreachable_unchecked() },
60                };
61
62                let (buf, req_ref) = req.calc_ref();
63                writer.attach((req, buf));
64                writer.set_waker(cx.waker().clone());
65
66                // execute the ffi function
67                let w_ptr = writer.into_ptr();
68                (exec)(req_ref, w_ptr, cb);
69            }
70            Self::Fused => {
71                panic!("Future polled after ready");
72            }
73        }
74        Poll::Pending
75    }
76}
77
/// Wrapper around [`ResponseFuture`] whose `Future` output is only the
/// response; the request returned by the inner future is dropped.
pub struct ResponseFutureWithoutReq<Req, Resp, Exec>(pub ResponseFuture<Req, Resp, Exec>);
79
80impl<Req, Resp, Exec> Future for ResponseFutureWithoutReq<Req, Resp, Exec>
81where
82    ResponseFuture<Req, Resp, Exec>: Future<Output = (Resp, Req)>,
83{
84    type Output = Resp;
85
86    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
87        unsafe { Pin::new_unchecked(&mut self.get_unchecked_mut().0) }
88            .poll(cx)
89            .map(|r| r.0)
90    }
91}