tonic_server_dispatch/async.rs
1/// Define the service and build the mapping relationship between tonic
2/// network tasks and your asynchronous business tasks.
3///
4/// Use `dispatch_service_sync!` instead for synchronous mode.
5/// See [the module-level documentation](super) for more information
6/// about the 2 modes.
7///
8/// Parameters:
9///
10/// - `$service` Original service name. Because we need to generate new
11/// service name based on this name, so do not give the module prefix.
12///
13/// - `$hash_by: $hash_type` The field in request types which is used
14/// to calculate which business task to dispatched to. All request
15/// types should contain this field.
16///
17/// - `$shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply`
18/// gRPC methods that work on mutable shard (but not on item). E.g. create
19/// or remove items on the shard.
20///
21/// - `$shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply`
22/// gRPC methods that work on readonly shard (but not on item). E.g. list
23/// items on the shard.
24///
25/// - `$item_mutable_method ($item_mutable_request) -> $item_mutable_reply`
26/// gRPC mutable methods that work on item. E.g. update item itself.
27///
28/// - `$item_readonly_method ($item_readonly_request) -> $item_readonly_reply`
29/// gRPC readonly methods that work on item. E.g. query item itself.
30///
31///
32/// This macro defines 4 items:
33///
34/// - `trait DispatchBackendShard` is for each backend shard. You
35/// need to implement this trait for your shard context. It has 2 parts:
36/// 1. associated type Item, and get_item/get_item_mut methods;
37/// 2. gRPC methods that works at shard (but not item), e.g. create/delete.
38///
39/// - `trait DispatchBackendItem` is for each backend item. It has
40/// mutable and readonly gRPC methods that works at item. You
41/// need to implement this trait for your item.
42///
43/// The formats of all methods are similar to the original tonic ones,
44/// except that changes
45/// - self: from `&self` to `&mut self` (for mutable methods)
46/// - parameter: from `Request<R>` to `R`
47/// - retuen value: from `Response<R>` to `R`
48///
49/// However the meaning of `self` changes. For the original tonic methods,
50/// the `self` points to a global service context. While here, for shard
51/// methods the `self` points to a context for each shard, and for
52/// item mutable/readonly methods the `self` points to the item.
53///
54/// - `fn start_simple_dispatch_backend` This starts a simple kind of
55/// backend tasks, which just listen on the request channel.
56/// If you want more complex backend task (e.g. listen on another
57/// channel too), you have to create tasks and channels youself.
58///
59/// - `struct [<$service DispatchServer>]` This defines the real tonic
60/// service, and this macro implement it automatically. If you use
61/// the `start_simple_dispatch_backend` which handles this struct
62/// already, then you do not need to touch this. But if you need to
63/// build backend tasks yourself, then you need to create channels
64/// and this struct with their `Sender` ends by its `with_txs()`
65/// method. See `start_simple_dispatch_backend()`'s code for example.
66///
67/// Read the [DictService] example's source code for a better understanding.
68///
69/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_async.rs
70///
71#[macro_export]
72macro_rules! dispatch_service_async {
73 (
74 $service:ty,
75 $hash_by:ident : $hash_type:ty,
76
77 [ $(
78 $shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
79 )* ],
80
81 [ $(
82 $shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
83 )* ],
84
85 [ $(
86 $item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
87 )* ],
88
89 [ $(
90 $item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
91 )* ]
92 ) => {
93
94 // define tonic Server: [<$service DispatchServer>]
95 //
96 // this part is same for sync and async modes.
97 tonic_server_dispatch::_define_dispatch_server!(
98 $service,
99 $hash_by: $hash_type,
100
101 tokio::sync::mpsc::Sender,
102
103 [ $(
104 $shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply,
105 )* ],
106
107 [ $(
108 $shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply,
109 )* ],
110
111 [ $(
112 $item_mutable_method ($item_mutable_request) -> $item_mutable_reply,
113 )* ],
114
115 [ $(
116 $item_readonly_method ($item_readonly_request) -> $item_readonly_reply,
117 )* ]
118 );
119
120 paste::paste! {
121
122 // 2 traits for backend business context: Shard and Item.
123 //
124 // DispatchBackendShard is for each backend shard. It has 2 parts:
125 // 1. associated type Item, and get_item/get_item_mut methods;
126 // 2. gRPC methods that works at shard (but not item), e.g. create/delete/list.
127 //
128 // DispatchBackendItem is for each backend item. It only has
129 // gRPC methods that works at item.
130 //
131 // The formats of all methods are similar to the original tonic ones,
132 // except that changes
133 // - self: from `&self` to `mut &self` (mutable methods)
134 // - parameter: from `Request<R>` to `R`
135 // - retuen value: from `Response<R>` to `R`
136 // ```
137 trait DispatchBackendShard {
138 // part-1
139 type Item: DispatchBackendItem + Send + Sync;
140 fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, Status>;
141 fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, Status>;
142
143 // part-2
144 $(
145 fn $shard_mutable_method(&mut self, request: $shard_mutable_request)
146 -> impl std::future::Future<Output = Result<$shard_mutable_reply, tonic::Status>> + Send;
147 )*
148 $(
149 fn $shard_readonly_method(&self, request: $shard_readonly_request)
150 -> impl std::future::Future<Output = Result<$shard_readonly_reply, tonic::Status>> + Send;
151 )*
152 }
153 trait DispatchBackendItem {
154 $(
155 fn $item_mutable_method(&mut self, request: $item_mutable_request)
156 -> impl std::future::Future<Output = Result<$item_mutable_reply, tonic::Status>> + Send;
157 )*
158 $(
159 fn $item_readonly_method(&self, request: $item_readonly_request)
160 -> impl std::future::Future<Output = Result<$item_readonly_reply, tonic::Status>> + Send;
161 )*
162 }
163
164 // Dispatched request.
165 //
166 // This is an internal type. You would not need to know this.
167 enum DispatchRequest {
168 $(
169 [<$shard_mutable_method:camel>] ($shard_mutable_request, tokio::sync::oneshot::Sender<Result<$shard_mutable_reply, tonic::Status>>),
170 )*
171 $(
172 [<$shard_readonly_method:camel>] ($shard_readonly_request, tokio::sync::oneshot::Sender<Result<$shard_readonly_reply, tonic::Status>>),
173 )*
174 $(
175 [<$item_mutable_method:camel>] ($item_mutable_request, tokio::sync::oneshot::Sender<Result<$item_mutable_reply, tonic::Status>>),
176 )*
177 $(
178 [<$item_readonly_method:camel>] ($item_readonly_request, tokio::sync::oneshot::Sender<Result<$item_readonly_reply, tonic::Status>>),
179 )*
180 }
181
182 impl DispatchRequest {
183 async fn handle_and_reply<B>(self, ctx: &mut B)
184 where B: DispatchBackendShard + Send + Sync + 'static
185 {
186 match self {
187 $(
188 DispatchRequest::[<$shard_mutable_method:camel>](req, resp_tx) => {
189 let reply = ctx.$shard_mutable_method(req).await;
190 resp_tx.send(reply).unwrap();
191 }
192 )*
193 $(
194 DispatchRequest::[<$shard_readonly_method:camel>](req, resp_tx) => {
195 let reply = ctx.$shard_readonly_method(req).await;
196 resp_tx.send(reply).unwrap();
197 }
198 )*
199 $(
200 DispatchRequest::[<$item_mutable_method:camel>](req, resp_tx) => {
201 let reply = match ctx.get_item_mut(&req.$hash_by) {
202 Ok(i) => i.$item_mutable_method(req).await,
203 Err(err) => Err(err),
204 };
205 resp_tx.send(reply).unwrap();
206 }
207 )*
208 $(
209 DispatchRequest::[<$item_readonly_method:camel>](req, resp_tx) => {
210 let reply = match ctx.get_item(&req.$hash_by) {
211 Ok(i) => i.$item_readonly_method(req).await,
212 Err(err) => Err(err),
213 };
214 resp_tx.send(reply).unwrap();
215 }
216 )*
217 }
218 }
219 }
220
221 // Start a simple backend service.
222 //
223 // You need to write your own code if any more feature, for example
224 // the backend task need to listen on another channel.
225 #[allow(dead_code)]
226 fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
227 -> [<$service DispatchServer>]
228 where B: Clone + DispatchBackendShard + Send + Sync + 'static
229 {
230 async fn backend_task<B>(mut backend: B, mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
231 where B: DispatchBackendShard + Send + Sync + 'static
232 {
233 while let Some(request) = req_rx.recv().await {
234 request.handle_and_reply(&mut backend).await;
235 }
236 }
237
238 let mut req_txs = Vec::new();
239 for _ in 0..task_num {
240 let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);
241
242 tokio::spawn(backend_task(backend.clone(), req_rx));
243
244 req_txs.push(req_tx);
245 }
246
247 [<$service DispatchServer>]::with_txs(req_txs)
248 }
249
250 } // end of paste!
251 }
252}