tonic_server_dispatch/
common.rs

1/// internal macro
2///
3// define tonic Server: [<$service DispatchServer>]
4//
5// used by sync and async modes both
6#[macro_export]
7macro_rules! _define_dispatch_server {
8    (
9        $service:ty,
10        $hash_by:ident : $hash_type:ty,
11
12        $mpsc_sender_type:ty,
13
14        [ $(
15            $shard_method:ident ($shard_request:ty) -> $shard_reply:ty,
16        )* ],
17
18        [ $(
19            $mutable_method:ident ($mutable_request:ty) -> $mutable_reply:ty,
20        )* ],
21
22        [ $(
23            $readonly_method:ident ($readonly_request:ty) -> $readonly_reply:ty,
24        )* ]
25    ) => {
26
27        paste::paste! {
28
29        // Context for the tonic server, used to dispatch requests.
30        pub struct [<$service DispatchServer>] {
31            txs: Vec<$mpsc_sender_type<DispatchRequest>>,
32        }
33
34        impl [<$service DispatchServer>] {
35            // create with txs
36            fn with_txs(txs: Vec<$mpsc_sender_type<DispatchRequest>>) -> Self {
37                Self { txs }
38            }
39        }
40
41        // The tonic server implementation.
42        //
43        // Dispatch the request to backend, and wait for the reply.
44        #[tonic::async_trait]
45        impl $service for [<$service DispatchServer>] {
46             $(
47                async fn $shard_method(
48                    &self,
49                    request: tonic::Request<$shard_request>,
50                ) -> Result<tonic::Response<$shard_reply>, tonic::Status> {
51                    tonic_server_dispatch::_service_method_body!($shard_method, self, request, $hash_by)
52                }
53             )*
54             $(
55                async fn $mutable_method(
56                    &self,
57                    request: tonic::Request<$mutable_request>,
58                ) -> Result<tonic::Response<$mutable_reply>, tonic::Status> {
59                    tonic_server_dispatch::_service_method_body!($mutable_method, self, request, $hash_by)
60                }
61            )*
62            $(
63                async fn $readonly_method(
64                    &self,
65                    request: tonic::Request<$readonly_request>,
66                ) -> Result<tonic::Response<$readonly_reply>, tonic::Status> {
67                    tonic_server_dispatch::_service_method_body!($readonly_method, self, request, $hash_by)
68                }
69            )*
70        }
71
72        }
73    }
74}
75
76/// internal macro
77///
78// service method body, for shard_method, mutable_method and readonly_method all.
79//
80// dispatch requests to backends and wait for repsponse.
81#[macro_export]
82macro_rules! _service_method_body {
83    ($method:ident, $self:ident, $request:expr, $hash_by:ident) => {
84        paste::paste! {
85            {
86                fn calc_hash(item: &impl std::hash::Hash) -> u64 {
87                    use std::hash::Hasher;
88                    let mut hasher = std::collections::hash_map::DefaultHasher::new();
89                    item.hash(&mut hasher);
90                    hasher.finish()
91                }
92
93                let request = $request.into_inner();
94
95                let shard = calc_hash(&request.$hash_by) as usize % $self.txs.len();
96
97                let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
98
99                let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);
100
101                match $self.txs[shard].try_send(biz_req) {
102                    Ok(()) => resp_rx.await.unwrap().map(tonic::Response::new),
103                    Err(_) => Err(tonic::Status::unavailable(String::new())),
104                }
105            }
106        }
107    }
108}