hrpc/server/transport/
mock.rs

1use std::convert::Infallible;
2
3use futures_util::{future::BoxFuture, SinkExt, StreamExt};
4use tower::Service;
5
6use crate::{
7    box_error,
8    client::transport::SocketChannels,
9    common::{socket::SocketMessage, transport::mock::MockReceiver},
10    response::BoxResponse,
11    server::{socket::SocketHandler, MakeRoutes},
12};
13
14use super::Transport;
15
16/// Mock transport for the server.
17pub struct Mock {
18    rx: MockReceiver,
19}
20
21impl Mock {
22    /// Create a new mock transport.
23    pub fn new(rx: MockReceiver) -> Self {
24        Self { rx }
25    }
26}
27
28impl Transport for Mock {
29    type Error = Infallible;
30
31    fn serve<S>(mut self, mk_routes: S) -> BoxFuture<'static, Result<(), Self::Error>>
32    where
33        S: MakeRoutes,
34    {
35        let mut svc = mk_routes
36            .into_make_service()
37            .call(())
38            .into_inner()
39            .unwrap()
40            .unwrap();
41
42        Box::pin(async move {
43            while let Some((req, sender)) = self.rx.inner.recv().await {
44                let fut = Service::call(&mut svc, req);
45
46                tokio::spawn(async move {
47                    let mut resp = fut.await.expect("cant fail");
48
49                    if let Some(socket_handler) = resp.extensions_mut().remove::<SocketHandler>() {
50                        let (client_tx, server_rx) =
51                            futures_channel::mpsc::unbounded::<SocketMessage>();
52                        let (server_tx, client_rx) = futures_channel::mpsc::unbounded();
53
54                        {
55                            let client_socket_chans = SocketChannels::new(
56                                Box::pin(client_tx.sink_map_err(box_error)),
57                                Box::pin(client_rx.map(Ok)),
58                            );
59
60                            let mut resp = BoxResponse::empty();
61                            resp.extensions_mut().insert(client_socket_chans);
62
63                            sender.send(resp).expect("sender dropped");
64                        }
65
66                        let fut = (socket_handler.inner)(
67                            Box::pin(server_rx.map(Ok)),
68                            Box::pin(server_tx.sink_map_err(box_error)),
69                        );
70
71                        tokio::spawn(fut);
72                    } else {
73                        sender.send(resp).expect("sender dropped");
74                    }
75                });
76            }
77
78            Ok(())
79        })
80    }
81}