tonic_server_dispatch/
common.rs1#[macro_export]
/// Generates the `<Service>DispatchServer` struct and its implementation of
/// the `$service` trait.
///
/// The generated struct holds one sender per shard (`txs`). Every generated
/// trait method delegates to [`_service_method_body!`], which wraps the
/// request in a `DispatchRequest` variant and sends it to one of the shards.
///
/// # Arguments
///
/// * `$service` - the service trait to implement; also the prefix of the
///   generated struct name (`DispatchServer` is appended to it).
/// * `$hash_by : $hash_type` - name of the request field forwarded to
///   [`_service_method_body!`] as the shard key. `$hash_type` is accepted for
///   the caller's benefit but is not used by this expansion.
/// * `$mpsc_sender_type` - sender type; it is instantiated as
///   `$mpsc_sender_type<DispatchRequest>`.
/// * three bracketed lists of `method(Request) -> Reply,` entries (shard,
///   mutable and read-only methods). All three lists expand to identical
///   method bodies here.
///
/// `DispatchRequest` is referenced unqualified, so it must resolve at the
/// expansion site.
macro_rules! _define_dispatch_server {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        $mpsc_sender_type:ty,

        [ $(
            $shard_method:ident ($shard_request:ty) -> $shard_reply:ty,
        )* ],

        [ $(
            $mutable_method:ident ($mutable_request:ty) -> $mutable_reply:ty,
        )* ],

        [ $(
            $readonly_method:ident ($readonly_request:ty) -> $readonly_reply:ty,
        )* ]
    ) => {

        paste::paste! {

            pub struct [<$service DispatchServer>] {
                // One sender per shard; indexed by the shard number computed
                // in `_service_method_body!`.
                txs: Vec<$mpsc_sender_type<DispatchRequest>>,
            }

            impl [<$service DispatchServer>] {
                // Builds the server from already-created shard senders.
                fn with_txs(txs: Vec<$mpsc_sender_type<DispatchRequest>>) -> Self {
                    Self { txs }
                }
            }

            #[tonic::async_trait]
            impl $service for [<$service DispatchServer>] {
                // `$crate` is used instead of a hard-coded crate name so the
                // expansion keeps working when the dependency is renamed or
                // when the macro is used inside the defining crate itself.
                $(
                    async fn $shard_method(
                        &self,
                        request: tonic::Request<$shard_request>,
                    ) -> Result<tonic::Response<$shard_reply>, tonic::Status> {
                        $crate::_service_method_body!($shard_method, self, request, $hash_by)
                    }
                )*
                $(
                    async fn $mutable_method(
                        &self,
                        request: tonic::Request<$mutable_request>,
                    ) -> Result<tonic::Response<$mutable_reply>, tonic::Status> {
                        $crate::_service_method_body!($mutable_method, self, request, $hash_by)
                    }
                )*
                $(
                    async fn $readonly_method(
                        &self,
                        request: tonic::Request<$readonly_request>,
                    ) -> Result<tonic::Response<$readonly_reply>, tonic::Status> {
                        $crate::_service_method_body!($readonly_method, self, request, $hash_by)
                    }
                )*
            }

        }
    }
}
75
76#[macro_export]
/// Expands to the body of one dispatching service method.
///
/// The expansion is a block expression of type
/// `Result<tonic::Response<_>, tonic::Status>` that must be evaluated in an
/// `async` context (it awaits the reply channel).
///
/// # Arguments
///
/// * `$method` - method name; its CamelCase form selects the
///   `DispatchRequest` variant to build.
/// * `$self` - the server value; its `txs` field is the list of shard senders.
/// * `$request` - the incoming request; `.into_inner()` is called on it.
/// * `$hash_by` - field of the inner request that is hashed to pick a shard.
///
/// # Errors
///
/// * `unavailable` (empty message) when `try_send` to the chosen shard fails.
/// * `unavailable` when `txs` is empty, instead of panicking on `% 0`.
/// * `internal` when the reply sender is dropped before a reply is sent,
///   instead of panicking on `unwrap()`.
/// * Otherwise whatever `Result` the worker sends back, with the `Ok` value
///   wrapped in `tonic::Response`.
///
/// `DispatchRequest` is referenced unqualified, so it must resolve at the
/// expansion site.
macro_rules! _service_method_body {
    ($method:ident, $self:ident, $request:expr, $hash_by:ident) => {
        paste::paste! {
            {
                fn calc_hash(item: &impl std::hash::Hash) -> u64 {
                    use std::hash::Hasher;
                    let mut hasher = std::collections::hash_map::DefaultHasher::new();
                    item.hash(&mut hasher);
                    hasher.finish()
                }

                let request = $request.into_inner();

                let shard_count = $self.txs.len();

                if shard_count == 0 {
                    // Nothing to dispatch to; `hash % 0` would panic.
                    Err(tonic::Status::unavailable("no dispatch shards available"))
                } else {
                    // Truncating the 64-bit hash on narrower targets is fine:
                    // only a stable, well-spread index is needed.
                    let shard = calc_hash(&request.$hash_by) as usize % shard_count;

                    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

                    let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);

                    match $self.txs[shard].try_send(biz_req) {
                        Ok(()) => match resp_rx.await {
                            Ok(reply) => reply.map(tonic::Response::new),
                            // The receiving side dropped `resp_tx` without
                            // answering; report it rather than panic in the
                            // RPC handler.
                            Err(_) => Err(tonic::Status::internal(
                                "dispatch worker dropped the request without replying",
                            )),
                        },
                        Err(_) => Err(tonic::Status::unavailable(String::new())),
                    }
                }
            }
        }
    }
}