1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
use std::{
    error::Error as StdError,
    fmt::{self, Display, Formatter},
    task::Poll,
};

use futures_util::{Future, FutureExt};
use tokio::sync::oneshot::{self, Receiver as OneshotReceiver};
use tower::Service;

use crate::{common::transport::mock::MockSender, request::BoxRequest, response::BoxResponse};

use super::TransportError;

/// A client that sends requests over a channel to a (possibly) mock server.
///
/// The client only holds the sending half of the channel ([`MockSender`]).
/// Each request is sent together with a oneshot sender on which the
/// receiving side is expected to deliver the response.
#[derive(Clone)]
pub struct Mock {
    /// Sending half of the request channel.
    tx: MockSender,
}

impl Mock {
    /// Create a new mock client.
    pub fn new(tx: MockSender) -> Self {
        Self { tx }
    }
}

impl Service<BoxRequest> for Mock {
    type Response = BoxResponse;
    type Error = TransportError<MockError>;
    type Future = MockCallFuture;

    /// Always reports readiness; this service never signals backpressure.
    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Sends `req` over the channel together with a fresh oneshot sender and
    /// returns a future that resolves to the response delivered on it.
    ///
    /// A failed send is not reported here: the error is stored in the
    /// returned future and surfaces as [`MockError::Send`] when it is polled.
    fn call(&mut self, req: BoxRequest) -> Self::Future {
        let (resp_tx, resp_rx) = oneshot::channel();

        let inner = match self.tx.inner.send((req, resp_tx)) {
            Ok(_) => MockCallFutureInner::Recv(resp_rx),
            // The send error carries the unsent `(request, responder)` pair;
            // only the request is kept for the caller.
            Err(send_err) => MockCallFutureInner::Err(Some(MockError::Send(send_err.0 .0))),
        };

        MockCallFuture { inner }
    }
}

/// Internal state of a [`MockCallFuture`].
enum MockCallFutureInner {
    /// The request was sent; the response is awaited on this oneshot receiver.
    Recv(OneshotReceiver<BoxResponse>),
    /// Sending the request failed. The error sits in an `Option` so that it
    /// can be moved out when the future is polled.
    Err(Option<MockError>),
}

/// Future returned by [`Mock`]'s `Service::call` implementation.
///
/// Resolves to the response received on the oneshot channel, or to a
/// [`MockError`] wrapped in `TransportError::Transport` on failure.
pub struct MockCallFuture {
    /// Either the pending response receiver or an error stored for the poll.
    inner: MockCallFutureInner,
}

impl Future for MockCallFuture {
    type Output = Result<BoxResponse, TransportError<MockError>>;

    /// Drives the call to completion.
    ///
    /// # Panics
    ///
    /// Panics if polled again after a stored send error has already been
    /// returned.
    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        match &mut this.inner {
            MockCallFutureInner::Recv(rx) => match rx.poll_unpin(cx) {
                Poll::Ready(Ok(response)) => Poll::Ready(Ok(response)),
                // Any receive failure is reported as `MockError::Receive`.
                Poll::Ready(Err(_)) => {
                    Poll::Ready(Err(TransportError::Transport(MockError::Receive)))
                }
                Poll::Pending => Poll::Pending,
            },
            MockCallFutureInner::Err(slot) => {
                // The error is moved out exactly once; a second poll finds `None`.
                let send_err = slot.take().expect("future polled after completion");
                Poll::Ready(Err(TransportError::Transport(send_err)))
            }
        }
    }
}

/// Errors the [`Mock`] client can return.
#[derive(Debug)]
pub enum MockError {
    /// Receiving a response failed. This only happens if the sender end of
    /// the response channel is dropped.
    Receive,
    /// Sending a request failed. This only happens if the receiver end of
    /// the request channel is dropped. Carries the request recovered from the
    /// channel's send error.
    Send(BoxRequest),
}

impl Display for MockError {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        match self {
            MockError::Receive => f.write_str("failed to receive response"),
            MockError::Send(_) => f.write_str("failed to send request"),
        }
    }
}

// Uses the trait's default methods: neither variant wraps an underlying
// error, so there is no `source` to expose.
impl StdError for MockError {}