1use std::fmt;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::{Arc, Mutex, MutexGuard};
7use std::task::{Context, Poll, Waker};
8
9use super::error::{ClientError, NotLeaderError};
10use super::serde::{NodeID, ReadRes, WriteRes};
11
12struct RastFutureState<T> {
13 finished: bool,
14 result: Option<Result<T, ClientError>>,
15 waker: Option<Waker>,
16}
17
18impl<T: fmt::Debug> fmt::Debug for RastFutureState<T> {
19 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20 write!(f, "{:?}", self.result)
21 }
22}
23
24#[derive(Debug, Clone)]
25struct RastFuture<T> {
26 state: Arc<Mutex<RastFutureState<T>>>,
27}
28
29impl<T> RastFuture<T> {
30 fn new() -> RastFuture<T> {
31 RastFuture {
32 state: Arc::new(Mutex::new(RastFutureState { finished: false, result: None, waker: None })),
33 }
34 }
35
36 fn fill(&mut self, result: Result<T, ClientError>) {
37 if let Ok(mut state) = self.state.lock() {
39 debug_assert_eq!(state.finished, false);
40 state.finished = true;
41 state.result = Some(result);
42 state.waker.iter_mut().for_each(|waker| waker.wake_by_ref());
43 }
44 }
45
46 fn poll(&self, cx: &mut Context) -> Poll<Result<T, ClientError>> {
47 let mut state: MutexGuard<RastFutureState<T>> = match self.state.lock() {
48 Ok(guard) => guard,
49 Err(_) => {
50 return Poll::Ready(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(NodeID(0))))));
52 }
53 };
54 if let Some(result) = state.result.take() {
58 Poll::Ready(result)
59 } else {
60 state.waker = Some(cx.waker().clone());
61 Poll::Pending
62 }
63 }
64}
65
66#[derive(Debug, Clone)]
68pub struct WriteFuture {
69 f: RastFuture<WriteRes>,
70}
71
72impl WriteFuture {
73 pub fn new() -> WriteFuture {
75 WriteFuture { f: RastFuture::new() }
76 }
77 pub(crate) fn fill(&mut self, result: Result<WriteRes, ClientError>) {
78 self.f.fill(result)
79 }
80}
81
82impl Future for WriteFuture {
83 type Output = Result<WriteRes, ClientError>;
84 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
85 self.f.poll(cx)
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct ReadFuture {
92 f: RastFuture<ReadRes>,
93}
94
95impl ReadFuture {
96 pub fn new() -> ReadFuture {
98 ReadFuture { f: RastFuture::new() }
99 }
100 pub(crate) fn fill(&mut self, result: Result<ReadRes, ClientError>) {
101 self.f.fill(result)
102 }
103}
104
105impl Future for ReadFuture {
106 type Output = Result<ReadRes, ClientError>;
107 fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
108 self.f.poll(cx)
109 }
110}