tonic_server_dispatch/
common.rs

/// Internal macro: defines the tonic server type `<Service>DispatchServer`.
///
/// Used by both the sync and the async modes.
//
// Inputs:
//   - `$service`: the tonic service trait to implement; it is also the
//     prefix of the generated type name (`[<$service DispatchServer>]`).
//   - `$hash_by : $hash_type`: the request field used to pick a shard.
//     Only `$hash_by` is forwarded (to `_service_method_body!`);
//     `$hash_type` is accepted but not used by this macro.
//   - `$mpsc_sender_type`: the sender type of the per-shard request
//     channel; it is instantiated as `$mpsc_sender_type<DispatchRequest>`.
//   - four method lists (shard-mutable, shard-readonly, item-mutable,
//     item-readonly), each entry written as `name(Request) -> Reply,`.
//
// Generated items:
//   - `struct [<$service DispatchServer>]` holding one sender per shard,
//   - a private `with_txs` constructor,
//   - `impl $service for [<$service DispatchServer>]`, where every listed
//     method gets the same body: `_service_method_body!`.
//
// `DispatchRequest` is referenced unqualified, so it must be in scope at
// the expansion site.
#[macro_export]
macro_rules! _define_dispatch_server {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        $mpsc_sender_type:ty,

        [ $(
            $shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
        )* ],

        [ $(
            $shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
        )* ],

        [ $(
            $item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
        )* ],

        [ $(
            $item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
        )* ]
    ) => {

        paste::paste! {

        // Context for the tonic server, used to dispatch requests.
        // `txs[i]` is the request sender of shard `i`.
        pub struct [<$service DispatchServer>] {
            txs: Vec<$mpsc_sender_type<DispatchRequest>>,
        }

        impl [<$service DispatchServer>] {
            // Create the server from the per-shard request senders.
            fn with_txs(txs: Vec<$mpsc_sender_type<DispatchRequest>>) -> Self {
                Self { txs }
            }
        }

        // The tonic server implementation.
        //
        // Every method dispatches the request to a backend and waits for
        // the reply. The sibling macro is addressed through `$crate` so the
        // expansion keeps working when the dependency is renamed downstream.
        #[tonic::async_trait]
        impl $service for [<$service DispatchServer>] {
            // shard-level mutable methods
            $(
                async fn $shard_mutable_method(
                    &self,
                    request: tonic::Request<$shard_mutable_request>,
                ) -> Result<tonic::Response<$shard_mutable_reply>, tonic::Status> {
                    $crate::_service_method_body!($shard_mutable_method, self, request, $hash_by)
                }
            )*
            // shard-level readonly methods
            $(
                async fn $shard_readonly_method(
                    &self,
                    request: tonic::Request<$shard_readonly_request>,
                ) -> Result<tonic::Response<$shard_readonly_reply>, tonic::Status> {
                    $crate::_service_method_body!($shard_readonly_method, self, request, $hash_by)
                }
            )*
            // item-level mutable methods
            $(
                async fn $item_mutable_method(
                    &self,
                    request: tonic::Request<$item_mutable_request>,
                ) -> Result<tonic::Response<$item_mutable_reply>, tonic::Status> {
                    $crate::_service_method_body!($item_mutable_method, self, request, $hash_by)
                }
            )*
            // item-level readonly methods
            $(
                async fn $item_readonly_method(
                    &self,
                    request: tonic::Request<$item_readonly_request>,
                ) -> Result<tonic::Response<$item_readonly_reply>, tonic::Status> {
                    $crate::_service_method_body!($item_readonly_method, self, request, $hash_by)
                }
            )*
        }

        }
    }
}
87
/// Internal macro: the body shared by every generated service method.
///
/// Dispatches the request to a backend shard and waits for the response.
//
// Arguments:
//   - `$method`: the method name; its CamelCase form selects the
//     `DispatchRequest` variant (`DispatchRequest::[<$method:camel>]`).
//   - `$self`: the server value; its `txs` field holds the shard senders.
//   - `$request`: the incoming `tonic::Request<_>`.
//   - `$hash_by`: the request field whose hash selects the shard.
//
// Result (the value of the expanded block):
//   - the backend's reply, with `Ok` wrapped in `tonic::Response`;
//   - `tonic::Status::unavailable` if `try_send` on the shard's channel fails;
//   - `tonic::Status::internal` if the backend drops the reply sender
//     without answering (previously this case panicked in the handler).
//
// Panics if `$self.txs` is empty (remainder by zero).
#[macro_export]
macro_rules! _service_method_body {
    ($method:ident, $self:ident, $request:expr, $hash_by:ident) => {
        paste::paste! {
            {
                // Hash any `Hash` value with the std default hasher.
                fn calc_hash(item: &impl std::hash::Hash) -> u64 {
                    use std::hash::Hasher;
                    let mut hasher = std::collections::hash_map::DefaultHasher::new();
                    item.hash(&mut hasher);
                    hasher.finish()
                }

                let request = $request.into_inner();

                // Requests with an equal `$hash_by` value map to the same shard.
                let shard = calc_hash(&request.$hash_by) as usize % $self.txs.len();

                // One-shot channel on which the backend sends the reply.
                let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

                let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);

                // `try_send` does not wait: a failed send is reported to the
                // client as `unavailable` right away.
                match $self.txs[shard].try_send(biz_req) {
                    Ok(()) => match resp_rx.await {
                        Ok(reply) => reply.map(tonic::Response::new),
                        // The reply sender was dropped without a reply being
                        // sent; report an error status instead of panicking.
                        Err(_) => Err(tonic::Status::internal(
                            "backend dropped the request without replying",
                        )),
                    },
                    Err(_) => Err(tonic::Status::unavailable(String::new())),
                }
            }
        }
    };
}