1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
// Copyright 2020 Daniel Harrison. All Rights Reserved.

use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, MutexGuard};
use std::task::{Context, Poll, Waker};

use super::error::{ClientError, NotLeaderError};
use super::serde::{NodeID, ReadRes, WriteRes};

struct RastFutureState<T> {
  finished: bool,
  result: Option<Result<T, ClientError>>,
  waker: Option<Waker>,
}

impl<T: fmt::Debug> fmt::Debug for RastFutureState<T> {
  fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
    write!(f, "{:?}", self.result)
  }
}

#[derive(Debug, Clone)]
struct RastFuture<T> {
  state: Arc<Mutex<RastFutureState<T>>>,
}

impl<T> RastFuture<T> {
  fn new() -> RastFuture<T> {
    RastFuture {
      state: Arc::new(Mutex::new(RastFutureState { finished: false, result: None, waker: None })),
    }
  }

  fn fill(&mut self, result: Result<T, ClientError>) {
    // TODO: what should we do if the lock is poisoned?
    if let Ok(mut state) = self.state.lock() {
      debug_assert_eq!(state.finished, false);
      state.finished = true;
      state.result = Some(result);
      state.waker.iter_mut().for_each(|waker| waker.wake_by_ref());
    }
  }

  fn poll(&self, cx: &mut Context) -> Poll<Result<T, ClientError>> {
    let mut state: MutexGuard<RastFutureState<T>> = match self.state.lock() {
      Ok(guard) => guard,
      Err(_) => {
        // TODO: this isn't the right error but close enough for now
        return Poll::Ready(Err(ClientError::NotLeaderError(NotLeaderError::new(Some(NodeID(0))))));
      }
    };
    // TODO: this `take()` is technically correct since the Future api requires
    // that poll never be called once it's returned Ready, but it makes me
    // uncomfortable
    if let Some(result) = state.result.take() {
      Poll::Ready(result)
    } else {
      state.waker = Some(cx.waker().clone());
      Poll::Pending
    }
  }
}

/// A [`Future`](std::future::Future) resolved with the result of a user write.
#[derive(Debug, Clone)]
pub struct WriteFuture {
  f: RastFuture<WriteRes>,
}

impl WriteFuture {
  /// Returns a new, empty `WriteFuture`.
  pub fn new() -> WriteFuture {
    WriteFuture { f: RastFuture::new() }
  }
  pub(crate) fn fill(&mut self, result: Result<WriteRes, ClientError>) {
    self.f.fill(result)
  }
}

impl Future for WriteFuture {
  type Output = Result<WriteRes, ClientError>;
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
    self.f.poll(cx)
  }
}

/// A [`Future`](std::future::Future) resolved with the result of a user read.
#[derive(Debug, Clone)]
pub struct ReadFuture {
  f: RastFuture<ReadRes>,
}

impl ReadFuture {
  /// Returns a new, empty `ReadFuture`.
  pub fn new() -> ReadFuture {
    ReadFuture { f: RastFuture::new() }
  }
  pub(crate) fn fill(&mut self, result: Result<ReadRes, ClientError>) {
    self.f.fill(result)
  }
}

impl Future for ReadFuture {
  type Output = Result<ReadRes, ClientError>;
  fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
    self.f.poll(cx)
  }
}