// tonic_server_dispatch/sync.rs
/// Generates a thread-based ("sync") dispatch backend for a tonic service.
///
/// # Arguments
///
/// * `$service` - the service type. It is forwarded to
///   `_define_dispatch_server!` and is also used to form the name
///   `<$service>DispatchServer`, the type returned by the generated
///   `start_simple_dispatch_backend` function.
/// * `$hash_by: $hash_type` - name and type of the request field that is used
///   as the key for item lookup (`req.$hash_by`). Every request type of the
///   second and third list must have this field.
/// * first list - *shard* methods: called directly on the shard backend with
///   `&mut self`.
/// * second list - *mutable* methods: the item is looked up through
///   `get_item_mut` and the method is called on it with `&mut self`.
/// * third list - *readonly* methods: the item is looked up through
///   `get_item` and the method is called on it with `&self`.
///
/// Each list entry has the form `method(RequestType) -> ReplyType,`; the
/// trailing comma is mandatory.
///
/// # Generated items
///
/// * whatever `_define_dispatch_server!` emits (not defined in this file; it
///   is invoked with `std::sync::mpsc::SyncSender` as the channel sender type),
/// * `trait DispatchBackendShard` and `trait DispatchBackendItem`, to be
///   implemented by the caller,
/// * `enum DispatchRequest`, one variant per method (the method name converted
///   to camel case), carrying the request and a
///   `tokio::sync::oneshot::Sender` for the reply,
/// * `fn start_simple_dispatch_backend`, which spawns the worker threads.
///
/// The expansion names `paste::paste!`, `tonic::Status` and
/// `tokio::sync::oneshot` by path, so those crates must be resolvable from the
/// invoking crate.
///
/// # Example
///
/// ```ignore
/// dispatch_service_sync! {
///     MyService,
///     key: String,
///     [ list_keys(ListRequest) -> ListReply, ],
///     [ set(SetRequest) -> SetReply, ],
///     [ get(GetRequest) -> GetReply, ]
/// }
/// ```
#[macro_export]
macro_rules! dispatch_service_sync {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        [ $(
            $shard_method:ident ($shard_request:ty) -> $shard_reply:ty,
        )* ],

        [ $(
            $mutable_method:ident ($mutable_request:ty) -> $mutable_reply:ty,
        )* ],

        [ $(
            $readonly_method:ident ($readonly_request:ty) -> $readonly_reply:ty,
        )* ]
    ) => {

        // `$crate` keeps this path valid even when the dependency is renamed
        // in the invoking crate's Cargo.toml.
        $crate::_define_dispatch_server!(
            $service,
            $hash_by: $hash_type,

            std::sync::mpsc::SyncSender,

            [ $(
                $shard_method ($shard_request) -> $shard_reply,
            )* ],

            [ $(
                $mutable_method ($mutable_request) -> $mutable_reply,
            )* ],

            [ $(
                $readonly_method ($readonly_request) -> $readonly_reply,
            )* ]
        );

        paste::paste! {

        /// Backend of a single worker: resolves items by key and serves the
        /// shard-level methods itself.
        trait DispatchBackendShard {
            /// Type that serves the mutable and readonly methods.
            type Item: DispatchBackendItem;

            /// Returns a shared reference to the item stored under `key`, or
            /// the status to send back as the reply when it cannot be resolved.
            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, tonic::Status>;

            /// Returns an exclusive reference to the item stored under `key`,
            /// or the status to send back as the reply when it cannot be
            /// resolved.
            fn get_item_mut(&mut self, key: &$hash_type)
                -> Result<&mut Self::Item, tonic::Status>;

            $(
                /// Shard-level method; called without any item lookup.
                fn $shard_method(&mut self, request: $shard_request)
                    -> Result<$shard_reply, tonic::Status>;
            )*
        }

        /// A single item, as returned by `DispatchBackendShard::get_item` and
        /// `DispatchBackendShard::get_item_mut`.
        trait DispatchBackendItem {
            $(
                /// Item-level method that may modify the item.
                fn $mutable_method(&mut self, request: $mutable_request)
                    -> Result<$mutable_reply, tonic::Status>;
            )*
            $(
                /// Item-level method that only reads the item.
                fn $readonly_method(&self, request: $readonly_request)
                    -> Result<$readonly_reply, tonic::Status>;
            )*
        }

        /// Message sent to a worker thread: the request plus the one-shot
        /// channel on which the reply is sent back.
        enum DispatchRequest {
            $(
                [<$shard_method:camel>] (
                    $shard_request,
                    tokio::sync::oneshot::Sender<Result<$shard_reply, tonic::Status>>,
                ),
            )*
            $(
                [<$mutable_method:camel>] (
                    $mutable_request,
                    tokio::sync::oneshot::Sender<Result<$mutable_reply, tonic::Status>>,
                ),
            )*
            $(
                [<$readonly_method:camel>] (
                    $readonly_request,
                    tokio::sync::oneshot::Sender<Result<$readonly_reply, tonic::Status>>,
                ),
            )*
        }

        impl DispatchRequest {
            /// Runs the request against `ctx` and sends the outcome through
            /// the reply channel carried by the request.
            ///
            /// A failed send is ignored on purpose: it only means the
            /// receiving half was dropped (presumably because the RPC was
            /// cancelled before the reply was ready). Panicking in that case
            /// would unwind out of the worker loop and end the worker thread.
            fn handle_and_reply<B>(self, ctx: &mut B)
            where
                B: DispatchBackendShard + Send + Sync + 'static,
            {
                match self {
                    $(
                        DispatchRequest::[<$shard_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_method(req);
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$mutable_method:camel>](req, resp_tx) => {
                            // A failed lookup is reported to the caller as the reply.
                            let reply = ctx
                                .get_item_mut(&req.$hash_by)
                                .and_then(|item| item.$mutable_method(req));
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$readonly_method:camel>](req, resp_tx) => {
                            // A failed lookup is reported to the caller as the reply.
                            let reply = ctx
                                .get_item(&req.$hash_by)
                                .and_then(|item| item.$readonly_method(req));
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        /// Spawns `task_num` worker threads and returns the dispatch server
        /// built from their request senders.
        ///
        /// Each worker owns its own clone of `backend` and the receiving end
        /// of a bounded `std::sync::mpsc::sync_channel` of capacity
        /// `channel_capacity`. Threads are named `biz-worker-<index>` and run
        /// until every sender of their channel has been dropped.
        ///
        /// # Panics
        ///
        /// Panics if the operating system fails to spawn a worker thread.
        // The invoking module is not required to call this helper, so the
        // expansion must not produce an unused-function warning there.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
        where
            B: Clone + DispatchBackendShard + Send + Sync + 'static,
        {
            /// Worker loop: serves requests until the channel is disconnected.
            fn backend_task<B>(mut backend: B, req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
            where
                B: DispatchBackendShard + Send + Sync + 'static,
            {
                // Iterating a `Receiver` blocks for the next message and stops
                // once all senders are gone.
                for request in req_rx {
                    request.handle_and_reply(&mut backend);
                }
            }

            let mut req_txs = Vec::with_capacity(task_num);
            for i in 0..task_num {
                let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);

                let backend = backend.clone();
                std::thread::Builder::new()
                    .name(format!("biz-worker-{i}"))
                    .spawn(move || backend_task::<B>(backend, req_rx))
                    .expect("failed to spawn dispatch backend worker thread");

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // paste::paste!
    }
}