1use std::{
2 future::Future,
3 sync::{Arc, Mutex},
4 task::{Poll, Waker},
5};
6
7use crate::{Client, RPCResponse, RPCResult, SeqHandler, SeqRead, SerializedCommand};
8
9impl Client {
10 pub(crate) fn request<'a, R: RPCResponse>(
12 &'a self,
13 name: &'static str,
14 body: Vec<u8>,
15 ) -> RPCRequest<'a, R> {
16 RPCRequest {
17 client: self,
18 state: Arc::new(Mutex::new(RequestState::Unsent(SerializedCommand {
19 name,
20 body,
21 }))),
22 }
23 }
24}
25
26pub struct RPCRequest<'a, R: RPCResponse> {
27 client: &'a Client,
28 state: Arc<Mutex<RequestState<R>>>,
29}
30
31enum RequestState<R: RPCResponse> {
32 Unsent(SerializedCommand),
33 Pending(Waker),
34 Ready(RPCResult<R>),
35 Invalid,
36}
37
38impl<'a, T: RPCResponse> RPCRequest<'a, T> {
39 pub fn send_ignored(self) {
41 match std::mem::replace(&mut *self.state.lock().unwrap(), RequestState::Invalid) {
42 RequestState::Unsent(cmd) => {
43 self.client.send_command(cmd, None);
44 }
45 _ => {
46 panic!()
47 }
48 }
49 }
50}
51
52impl<'a, T: RPCResponse> Future for RPCRequest<'a, T> {
53 type Output = RPCResult<T>;
54
55 fn poll(
56 self: std::pin::Pin<&mut Self>,
57 cx: &mut std::task::Context<'_>,
58 ) -> std::task::Poll<Self::Output> {
59 let mut state = self.state.lock().unwrap();
60
61 match std::mem::replace(&mut *state, RequestState::Invalid) {
62 RequestState::Unsent(cmd) => {
63 *state = RequestState::Pending(cx.waker().clone());
64 self.client.send_command(cmd, Some(self.state.clone()));
65 return Poll::Pending;
66 }
67 RequestState::Pending(_) => {
68 *state = RequestState::Pending(cx.waker().clone());
69 return Poll::Pending;
70 }
71 RequestState::Ready(response) => {
72 return Poll::Ready(response);
73 }
74 RequestState::Invalid => {
75 panic!()
76 }
77 }
78 }
79}
80
81impl<T> SeqHandler for Mutex<RequestState<T>>
82where
83 T: RPCResponse,
84{
85 fn handle(&self, res: RPCResult<SeqRead>) {
86 let res = res.and_then(T::read_from);
87 let ready = RequestState::Ready(res);
88
89 match std::mem::replace(&mut *self.lock().unwrap(), ready) {
90 RequestState::Pending(waker) => waker.wake(),
91 _ => panic!(),
92 }
93 }
94}