tonic_server_dispatch/
sync.rs1#[macro_export]
/// Generates the thread-based ("sync") dispatch plumbing for a tonic service.
///
/// Arguments, in order:
///
/// 1. the service type;
/// 2. `field: Type` — the request field (and its type) used as the lookup key
///    for item-level methods; it is also forwarded to
///    `_define_dispatch_server!`;
/// 3. four bracketed lists of `method(Request) -> Reply,` entries:
///    shard-level mutable, shard-level read-only, item-level mutable and
///    item-level read-only methods.
///
/// The expansion contains:
///
/// - the dispatch server itself, produced by
///   `tonic_server_dispatch::_define_dispatch_server!` with
///   `std::sync::mpsc::SyncSender` as the request sender type;
/// - the `DispatchBackendShard` and `DispatchBackendItem` traits the backend
///   has to implement;
/// - the `DispatchRequest` enum that carries a request to a worker thread;
/// - `start_simple_dispatch_backend`, which spawns the worker threads.
///
/// The expansion site needs the `tonic`, `tokio`, `paste` and
/// `tonic_server_dispatch` crates in scope.
macro_rules! dispatch_service_sync {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        // Shard-level methods that take `&mut self`.
        [ $(
            $shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
        )* ],

        // Shard-level methods that take `&self`.
        [ $(
            $shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
        )* ],

        // Item-level methods that take `&mut self`.
        [ $(
            $item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
        )* ],

        // Item-level methods that take `&self`.
        [ $(
            $item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
        )* ]
    ) => {

        // The server side is shared with other flavours of this macro; only
        // the sender type passed here is specific to the thread-based variant.
        tonic_server_dispatch::_define_dispatch_server!(
            $service,
            $hash_by: $hash_type,

            std::sync::mpsc::SyncSender,

            [ $(
                $shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply,
            )* ],

            [ $(
                $shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply,
            )* ],

            [ $(
                $item_mutable_method ($item_mutable_request) -> $item_mutable_reply,
            )* ],

            [ $(
                $item_readonly_method ($item_readonly_request) -> $item_readonly_reply,
            )* ]
        );

        paste::paste! {

        /// Backend contract for one shard.
        ///
        /// Every worker thread started by `start_simple_dispatch_backend` owns
        /// one value implementing this trait and runs all requests it receives
        /// against it.
        // NOTE(review): how requests are routed to a particular worker is
        // decided inside `_define_dispatch_server!` (presumably by hashing the
        // key field) — confirm there.
        trait DispatchBackendShard {
            /// Type of the items stored in this shard.
            type Item: DispatchBackendItem;

            /// Looks up the item for `key` for a read-only item-level method.
            ///
            /// An `Err` is sent back to the caller as the reply.
            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, tonic::Status>;

            /// Looks up the item for `key` for a mutable item-level method.
            ///
            /// An `Err` is sent back to the caller as the reply.
            fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, tonic::Status>;

            $(
                /// Shard-level method with mutable access to the shard.
                fn $shard_mutable_method(&mut self, request: $shard_mutable_request)
                    -> Result<$shard_mutable_reply, tonic::Status>;
            )*
            $(
                /// Shard-level method with shared access to the shard.
                fn $shard_readonly_method(&self, request: $shard_readonly_request)
                    -> Result<$shard_readonly_reply, tonic::Status>;
            )*
        }

        /// Backend contract for a single item inside a shard.
        trait DispatchBackendItem {
            $(
                /// Item-level method with mutable access to the item.
                fn $item_mutable_method(&mut self, request: $item_mutable_request)
                    -> Result<$item_mutable_reply, tonic::Status>;
            )*
            $(
                /// Item-level method with shared access to the item.
                fn $item_readonly_method(&self, request: $item_readonly_request)
                    -> Result<$item_readonly_reply, tonic::Status>;
            )*
        }

        /// A request on its way to a worker thread.
        ///
        /// There is one variant per declared method, named after the method in
        /// camel case. Each variant carries the request message and the
        /// oneshot sender through which the reply is returned.
        enum DispatchRequest {
            $(
                [<$shard_mutable_method:camel>] ($shard_mutable_request, tokio::sync::oneshot::Sender<Result<$shard_mutable_reply, tonic::Status>>),
            )*
            $(
                [<$shard_readonly_method:camel>] ($shard_readonly_request, tokio::sync::oneshot::Sender<Result<$shard_readonly_reply, tonic::Status>>),
            )*
            $(
                [<$item_mutable_method:camel>] ($item_mutable_request, tokio::sync::oneshot::Sender<Result<$item_mutable_reply, tonic::Status>>),
            )*
            $(
                [<$item_readonly_method:camel>] ($item_readonly_request, tokio::sync::oneshot::Sender<Result<$item_readonly_reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            /// Runs this request against `ctx` and sends the outcome back
            /// through the oneshot sender carried by the variant.
            ///
            /// Shard-level variants call the matching method on `ctx`
            /// directly. Item-level variants first look the item up using the
            /// request's key field; a failed lookup becomes the reply.
            ///
            /// A failed send is deliberately ignored: the oneshot send only
            /// fails when the receiving half has been dropped, i.e. nobody is
            /// waiting for the reply any more, and that must not take the
            /// worker thread down.
            fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$shard_mutable_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_mutable_method(req);
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$shard_readonly_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_readonly_method(req);
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$item_mutable_method:camel>](req, resp_tx) => {
                            // The key borrow ends once the lookup returns, so
                            // `req` can be moved into the item method.
                            let reply = ctx
                                .get_item_mut(&req.$hash_by)
                                .and_then(|item| item.$item_mutable_method(req));
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$item_readonly_method:camel>](req, resp_tx) => {
                            let reply = ctx
                                .get_item(&req.$hash_by)
                                .and_then(|item| item.$item_readonly_method(req));
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        /// Spawns `task_num` worker threads and returns the dispatch server
        /// connected to them.
        ///
        /// Each worker is named `biz-worker-{i}`, owns its own clone of
        /// `backend`, and receives requests over its own bounded channel
        /// created with `channel_capacity`. A worker exits when its channel
        /// is closed, i.e. when every sender for it has been dropped.
        ///
        /// # Panics
        ///
        /// Panics if a worker thread cannot be spawned.
        // Not every expansion site necessarily calls this helper.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackendShard + Send + Sync + 'static
        {
            // Body of one worker thread: handle requests until the channel closes.
            fn backend_task<B>(mut backend: B, req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                for request in req_rx {
                    request.handle_and_reply(&mut backend);
                }
            }

            let mut req_txs = Vec::with_capacity(task_num);
            for i in 0..task_num {
                let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);

                let backend = backend.clone();
                std::thread::Builder::new()
                    .name(format!("biz-worker-{i}"))
                    .spawn(move || backend_task::<B>(backend, req_rx))
                    .expect("failed to spawn dispatch backend worker thread");

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } }
}