tonic_server_dispatch/
common.rs1#[macro_export]
7macro_rules! _define_dispatch_server {
8 (
9 $service:ty,
10 $hash_by:ident : $hash_type:ty,
11
12 $mpsc_sender_type:ty,
13
14 [ $(
15 $shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
16 )* ],
17
18 [ $(
19 $shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
20 )* ],
21
22 [ $(
23 $item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
24 )* ],
25
26 [ $(
27 $item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
28 )* ]
29 ) => {
30
31 paste::paste! {
32
33 pub struct [<$service DispatchServer>] {
35 txs: Vec<$mpsc_sender_type<DispatchRequest>>,
36 }
37
38 impl [<$service DispatchServer>] {
39 fn with_txs(txs: Vec<$mpsc_sender_type<DispatchRequest>>) -> Self {
41 Self { txs }
42 }
43 }
44
45 #[tonic::async_trait]
49 impl $service for [<$service DispatchServer>] {
50 $(
51 async fn $shard_mutable_method(
52 &self,
53 request: tonic::Request<$shard_mutable_request>,
54 ) -> Result<tonic::Response<$shard_mutable_reply>, tonic::Status> {
55 tonic_server_dispatch::_service_method_body!($shard_mutable_method, self, request, $hash_by)
56 }
57 )*
58 $(
59 async fn $shard_readonly_method(
60 &self,
61 request: tonic::Request<$shard_readonly_request>,
62 ) -> Result<tonic::Response<$shard_readonly_reply>, tonic::Status> {
63 tonic_server_dispatch::_service_method_body!($shard_readonly_method, self, request, $hash_by)
64 }
65 )*
66 $(
67 async fn $item_mutable_method(
68 &self,
69 request: tonic::Request<$item_mutable_request>,
70 ) -> Result<tonic::Response<$item_mutable_reply>, tonic::Status> {
71 tonic_server_dispatch::_service_method_body!($item_mutable_method, self, request, $hash_by)
72 }
73 )*
74 $(
75 async fn $item_readonly_method(
76 &self,
77 request: tonic::Request<$item_readonly_request>,
78 ) -> Result<tonic::Response<$item_readonly_reply>, tonic::Status> {
79 tonic_server_dispatch::_service_method_body!($item_readonly_method, self, request, $hash_by)
80 }
81 )*
82 }
83
84 }
85 }
86}
87
88#[macro_export]
94macro_rules! _service_method_body {
95 ($method:ident, $self:ident, $request:expr, $hash_by:ident) => {
96 paste::paste! {
97 {
98 fn calc_hash(item: &impl std::hash::Hash) -> u64 {
99 use std::hash::Hasher;
100 let mut hasher = std::collections::hash_map::DefaultHasher::new();
101 item.hash(&mut hasher);
102 hasher.finish()
103 }
104
105 let request = $request.into_inner();
106
107 let shard = calc_hash(&request.$hash_by) as usize % $self.txs.len();
108
109 let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
110
111 let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);
112
113 match $self.txs[shard].try_send(biz_req) {
114 Ok(()) => resp_rx.await.unwrap().map(tonic::Response::new),
115 Err(_) => Err(tonic::Status::unavailable(String::new())),
116 }
117 }
118 }
119 };
120}