Skip to main content

rast/
future.rs

1// Copyright 2020 Daniel Harrison. All Rights Reserved.
2
3use std::fmt;
4use std::future::Future;
5use std::pin::Pin;
6use std::sync::{Arc, Mutex, MutexGuard};
7use std::task::{Context, Poll, Waker};
8
9use super::error::{ClientError, NotLeaderError};
10use super::serde::{NodeID, ReadRes, WriteRes};
11
12struct RastFutureState<T> {
13  finished: bool,
14  result: Option<Result<T, ClientError>>,
15  waker: Option<Waker>,
16}
17
18impl<T: fmt::Debug> fmt::Debug for RastFutureState<T> {
19  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
20    write!(f, "{:?}", self.result)
21  }
22}
23
24#[derive(Debug, Clone)]
25struct RastFuture<T> {
26  state: Arc<Mutex<RastFutureState<T>>>,
27}
28
29impl<T> RastFuture<T> {
30  fn new() -> RastFuture<T> {
31    RastFuture {
32      state: Arc::new(Mutex::new(RastFutureState { finished: false, result: None, waker: None })),
33    }
34  }
35
36  fn fill(&mut self, result: Result<T, ClientError>) {
37    // TODO: what should we do if the lock is poisoned?
38    if let Ok(mut state) = self.state.lock() {
39      debug_assert_eq!(state.finished, false);
40      state.finished = true;
41      state.result = Some(result);
42      state.waker.iter_mut().for_each(|waker| waker.wake_by_ref());
43    }
44  }
45
46  fn poll(&self, cx: &mut Context) -> Poll<Result<T, ClientError>> {
47    let mut state: MutexGuard<RastFutureState<T>> = match self.state.lock() {
48      Ok(guard) => guard,
49      Err(_) => {
50        // TODO: this isn't the right error but close enough for now
51        return Poll::Ready(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(NodeID(0))))));
52      }
53    };
54    // TODO: this `take()` is technically correct since the Future api requires
55    // that poll never be called once it's returned Ready, but it makes me
56    // uncomfortable
57    if let Some(result) = state.result.take() {
58      Poll::Ready(result)
59    } else {
60      state.waker = Some(cx.waker().clone());
61      Poll::Pending
62    }
63  }
64}
65
66/// A [`Future`](std::future::Future) resolved with the result of a user write.
67#[derive(Debug, Clone)]
68pub struct WriteFuture {
69  f: RastFuture<WriteRes>,
70}
71
72impl WriteFuture {
73  /// Returns a new, empty `WriteFuture`.
74  pub fn new() -> WriteFuture {
75    WriteFuture { f: RastFuture::new() }
76  }
77  pub(crate) fn fill(&mut self, result: Result<WriteRes, ClientError>) {
78    self.f.fill(result)
79  }
80}
81
82impl Future for WriteFuture {
83  type Output = Result<WriteRes, ClientError>;
84  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
85    self.f.poll(cx)
86  }
87}
88
89/// A [`Future`](std::future::Future) resolved with the result of a user read.
90#[derive(Debug, Clone)]
91pub struct ReadFuture {
92  f: RastFuture<ReadRes>,
93}
94
95impl ReadFuture {
96  /// Returns a new, empty `ReadFuture`.
97  pub fn new() -> ReadFuture {
98    ReadFuture { f: RastFuture::new() }
99  }
100  pub(crate) fn fill(&mut self, result: Result<ReadRes, ClientError>) {
101    self.f.fill(result)
102  }
103}
104
105impl Future for ReadFuture {
106  type Output = Result<ReadRes, ClientError>;
107  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
108    self.f.poll(cx)
109  }
110}