hrpc/server/transport/
mock.rs1use std::convert::Infallible;
2
3use futures_util::{future::BoxFuture, SinkExt, StreamExt};
4use tower::Service;
5
6use crate::{
7 box_error,
8 client::transport::SocketChannels,
9 common::{socket::SocketMessage, transport::mock::MockReceiver},
10 response::BoxResponse,
11 server::{socket::SocketHandler, MakeRoutes},
12};
13
14use super::Transport;
15
16pub struct Mock {
18 rx: MockReceiver,
19}
20
21impl Mock {
22 pub fn new(rx: MockReceiver) -> Self {
24 Self { rx }
25 }
26}
27
28impl Transport for Mock {
29 type Error = Infallible;
30
31 fn serve<S>(mut self, mk_routes: S) -> BoxFuture<'static, Result<(), Self::Error>>
32 where
33 S: MakeRoutes,
34 {
35 let mut svc = mk_routes
36 .into_make_service()
37 .call(())
38 .into_inner()
39 .unwrap()
40 .unwrap();
41
42 Box::pin(async move {
43 while let Some((req, sender)) = self.rx.inner.recv().await {
44 let fut = Service::call(&mut svc, req);
45
46 tokio::spawn(async move {
47 let mut resp = fut.await.expect("cant fail");
48
49 if let Some(socket_handler) = resp.extensions_mut().remove::<SocketHandler>() {
50 let (client_tx, server_rx) =
51 futures_channel::mpsc::unbounded::<SocketMessage>();
52 let (server_tx, client_rx) = futures_channel::mpsc::unbounded();
53
54 {
55 let client_socket_chans = SocketChannels::new(
56 Box::pin(client_tx.sink_map_err(box_error)),
57 Box::pin(client_rx.map(Ok)),
58 );
59
60 let mut resp = BoxResponse::empty();
61 resp.extensions_mut().insert(client_socket_chans);
62
63 sender.send(resp).expect("sender dropped");
64 }
65
66 let fut = (socket_handler.inner)(
67 Box::pin(server_rx.map(Ok)),
68 Box::pin(server_tx.sink_map_err(box_error)),
69 );
70
71 tokio::spawn(fut);
72 } else {
73 sender.send(resp).expect("sender dropped");
74 }
75 });
76 }
77
78 Ok(())
79 })
80 }
81}