1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
use std::{
    error::Error as StdError,
    fmt::{self, Display, Formatter},
    task::Poll,
};

use futures_util::{Future, FutureExt};
use tokio::sync::oneshot::{self, Receiver as OneshotReceiver};
use tower::Service;

use crate::{common::transport::mock::MockSender, request::BoxRequest, response::BoxResponse};

use super::TransportError;

/// A client that uses a channel to send requests to a (possibly)
/// mock server.
#[derive(Clone)]
pub struct Mock {
    tx: MockSender,
}

impl Mock {
    /// Create a new mock client.
    pub fn new(tx: MockSender) -> Self {
        Self { tx }
    }
}

impl Service<BoxRequest> for Mock {
    type Response = BoxResponse;

    type Error = TransportError<MockError>;

    type Future = MockCallFuture;

    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Ok(()).into()
    }

    fn call(&mut self, req: BoxRequest) -> Self::Future {
        let (resp_tx, resp_rx) = oneshot::channel();
        let send_res = self
            .tx
            .inner
            .send((req, resp_tx))
            .map_err(|err| MockError::Send(err.0 .0));

        match send_res {
            Ok(_) => MockCallFuture {
                inner: MockCallFutureInner::Recv(resp_rx),
            },
            Err(err) => MockCallFuture {
                inner: MockCallFutureInner::Err(Some(err)),
            },
        }
    }
}

enum MockCallFutureInner {
    Recv(OneshotReceiver<BoxResponse>),
    Err(Option<MockError>),
}

/// Future used by [`Mock`].
pub struct MockCallFuture {
    inner: MockCallFutureInner,
}

impl Future for MockCallFuture {
    type Output = Result<BoxResponse, TransportError<MockError>>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        match &mut self.get_mut().inner {
            MockCallFutureInner::Err(err) => Poll::Ready(Err(TransportError::Transport(
                err.take().expect("future polled after completion"),
            ))),
            MockCallFutureInner::Recv(rx) => rx
                .poll_unpin(cx)
                .map_err(|_| MockError::Receive)
                .map_err(TransportError::Transport),
        }
    }
}

/// Errors this client can return.
#[derive(Debug)]
pub enum MockError {
    /// Occurs if receiving a response fails. Only happens if sender end of the
    /// channel is dropped.
    Receive,
    /// Occurs if sending a request fails. Only happens if receiver end of the
    /// channel is dropped.
    Send(BoxRequest),
}

impl Display for MockError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            MockError::Receive => f.write_str("failed to receive response"),
            MockError::Send(_) => f.write_str("failed to send request"),
        }
    }
}

impl StdError for MockError {}