Skip to main content

serf_rpc/
request.rs

1use std::{
2    future::Future,
3    sync::{Arc, Mutex},
4    task::{Poll, Waker},
5};
6
7use crate::{Client, RPCResponse, RPCResult, SeqHandler, SeqRead, SerializedCommand};
8
9impl Client {
10    /// Asyncrounously sends a request and waits for a response.
11    pub(crate) fn request<'a, R: RPCResponse>(
12        &'a self,
13        name: &'static str,
14        body: Vec<u8>,
15    ) -> RPCRequest<'a, R> {
16        RPCRequest {
17            client: self,
18            state: Arc::new(Mutex::new(RequestState::Unsent(SerializedCommand {
19                name,
20                body,
21            }))),
22        }
23    }
24}
25
26pub struct RPCRequest<'a, R: RPCResponse> {
27    client: &'a Client,
28    state: Arc<Mutex<RequestState<R>>>,
29}
30
31enum RequestState<R: RPCResponse> {
32    Unsent(SerializedCommand),
33    Pending(Waker),
34    Ready(RPCResult<R>),
35    Invalid,
36}
37
38impl<'a, T: RPCResponse> RPCRequest<'a, T> {
39    /// Send this request, but ignore the response
40    pub fn send_ignored(self) {
41        match std::mem::replace(&mut *self.state.lock().unwrap(), RequestState::Invalid) {
42            RequestState::Unsent(cmd) => {
43                self.client.send_command(cmd, None);
44            }
45            _ => {
46                panic!()
47            }
48        }
49    }
50}
51
52impl<'a, T: RPCResponse> Future for RPCRequest<'a, T> {
53    type Output = RPCResult<T>;
54
55    fn poll(
56        self: std::pin::Pin<&mut Self>,
57        cx: &mut std::task::Context<'_>,
58    ) -> std::task::Poll<Self::Output> {
59        let mut state = self.state.lock().unwrap();
60
61        match std::mem::replace(&mut *state, RequestState::Invalid) {
62            RequestState::Unsent(cmd) => {
63                *state = RequestState::Pending(cx.waker().clone());
64                self.client.send_command(cmd, Some(self.state.clone()));
65                return Poll::Pending;
66            }
67            RequestState::Pending(_) => {
68                *state = RequestState::Pending(cx.waker().clone());
69                return Poll::Pending;
70            }
71            RequestState::Ready(response) => {
72                return Poll::Ready(response);
73            }
74            RequestState::Invalid => {
75                panic!()
76            }
77        }
78    }
79}
80
81impl<T> SeqHandler for Mutex<RequestState<T>>
82where
83    T: RPCResponse,
84{
85    fn handle(&self, res: RPCResult<SeqRead>) {
86        let res = res.and_then(T::read_from);
87        let ready = RequestState::Ready(res);
88
89        match std::mem::replace(&mut *self.lock().unwrap(), ready) {
90            RequestState::Pending(waker) => waker.wake(),
91            _ => panic!(),
92        }
93    }
94}