// tonic_server_dispatch/async.rs

/// Define the service and build the mapping relationship between tonic
/// network tasks and your asynchronous business tasks.
///
/// Use `dispatch_service_sync!` instead for synchronous mode.
/// See [the module-level documentation](super) for more information
/// about the 2 modes.
///
/// Parameters:
///
/// - `$service` Original service name. Because we need to generate a new
///   service name based on this name, do not give the module prefix.
///
/// - `$hash_by: $hash_type` The field in request types which is used
///   to calculate which business task to dispatch to. All request
///   types should contain this field.
///
/// - `$shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply`
///   gRPC methods that work on mutable shard (but not on item). E.g. create
///   or remove items on the shard.
///
/// - `$shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply`
///   gRPC methods that work on readonly shard (but not on item). E.g. list
///   items on the shard.
///
/// - `$item_mutable_method ($item_mutable_request) -> $item_mutable_reply`
///   gRPC mutable methods that work on item. E.g. update item itself.
///
/// - `$item_readonly_method ($item_readonly_request) -> $item_readonly_reply`
///   gRPC readonly methods that work on item. E.g. query item itself.
///
///
/// This macro defines 4 items:
///
/// - `trait DispatchBackendShard` is for each backend shard. You
///   need to implement this trait for your shard context. It has 2 parts:
///    1. associated type Item, and get_item/get_item_mut methods;
///    2. gRPC methods that work at shard (but not item), e.g. create/delete.
///
/// - `trait DispatchBackendItem` is for each backend item. It has
///   mutable and readonly gRPC methods that work at item. You
///   need to implement this trait for your item.
///
///   The formats of all methods are similar to the original tonic ones,
///   except for these changes:
///     - self: from `&self` to `&mut self` (for mutable methods)
///     - parameter: from `Request<R>` to `R`
///     - return value: from `Response<R>` to `R`
///
///   However the meaning of `self` changes. For the original tonic methods,
///   the `self` points to a global service context. While here, for shard
///   methods the `self` points to a context for each shard, and for
///   item mutable/readonly methods the `self` points to the item.
///
/// - `fn start_simple_dispatch_backend` This starts a simple kind of
///   backend tasks, which just listen on the request channel.
///   If you want more complex backend tasks (e.g. listen on another
///   channel too), you have to create tasks and channels yourself.
///
/// - `struct [<$service DispatchServer>]` This defines the real tonic
///   service, and this macro implements it automatically. If you use
///   the `start_simple_dispatch_backend` which handles this struct
///   already, then you do not need to touch this. But if you need to
///   build backend tasks yourself, then you need to create channels
///   and this struct with their `Sender` ends by its `with_txs()`
///   method. See `start_simple_dispatch_backend()`'s code for example.
///
/// Behavior notes:
///
/// - Each simple backend task handles one request at a time: a request
///   is awaited to completion before the next one is taken from the
///   channel.
///
/// - If the receiving end of a reply channel has already been dropped
///   (presumably because the tonic-side handler was cancelled), the
///   reply is discarded and the backend task keeps serving requests.
///
/// - `start_simple_dispatch_backend` spawns its tasks with `tokio::spawn`
///   and creates bounded `tokio::sync::mpsc` channels, so it must be
///   called from within a Tokio runtime and `channel_capacity` must be
///   greater than 0 (Tokio panics otherwise).
///
/// Read the [DictService] example's source code for a better understanding.
///
/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_async.rs
///
#[macro_export]
macro_rules! dispatch_service_async {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        [ $(
            $shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
        )* ],

        [ $(
            $shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
        )* ],

        [ $(
            $item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
        )* ],

        [ $(
            $item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
        )* ]
    ) => {

        // define tonic Server: [<$service DispatchServer>]
        //
        // this part is same for sync and async modes.
        //
        // `$crate` keeps this working even if the dependency is renamed
        // in the caller's Cargo.toml.
        $crate::_define_dispatch_server!(
            $service,
            $hash_by: $hash_type,

            tokio::sync::mpsc::Sender,

            [ $(
                $shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply,
            )* ],

            [ $(
                $shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply,
            )* ],

            [ $(
                $item_mutable_method ($item_mutable_request) -> $item_mutable_reply,
            )* ],

            [ $(
                $item_readonly_method ($item_readonly_request) -> $item_readonly_reply,
            )* ]
        );

        paste::paste! {

        // 2 traits for backend business context: Shard and Item.
        //
        // DispatchBackendShard is for each backend shard. It has 2 parts:
        // 1. associated type Item, and get_item/get_item_mut methods;
        // 2. gRPC methods that work at shard (but not item), e.g. create/delete/list.
        //
        // DispatchBackendItem is for each backend item. It only has
        // gRPC methods that work at item.
        //
        // The formats of all methods are similar to the original tonic ones,
        // except for these changes:
        //   - self: from `&self` to `&mut self` (mutable methods)
        //   - parameter: from `Request<R>` to `R`
        //   - return value: from `Response<R>` to `R`
        trait DispatchBackendShard {
            // part-1: item lookup by the hash key.
            //
            // `tonic::Status` is spelled out in full so the expansion does
            // not depend on the caller having `Status` imported.
            type Item: DispatchBackendItem + Send + Sync;
            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, tonic::Status>;
            fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, tonic::Status>;

            // part-2: shard-level gRPC methods.
            $(
                fn $shard_mutable_method(&mut self, request: $shard_mutable_request)
                -> impl std::future::Future<Output = Result<$shard_mutable_reply, tonic::Status>> + Send;
            )*
            $(
                fn $shard_readonly_method(&self, request: $shard_readonly_request)
                -> impl std::future::Future<Output = Result<$shard_readonly_reply, tonic::Status>> + Send;
            )*
        }
        trait DispatchBackendItem {
            // item-level gRPC methods; `self` is the item itself.
            $(
                fn $item_mutable_method(&mut self, request: $item_mutable_request)
                -> impl std::future::Future<Output = Result<$item_mutable_reply, tonic::Status>> + Send;
            )*
            $(
                fn $item_readonly_method(&self, request: $item_readonly_request)
                -> impl std::future::Future<Output = Result<$item_readonly_reply, tonic::Status>> + Send;
            )*
        }

        // Dispatched request.
        //
        // One variant per gRPC method, carrying the request message and the
        // oneshot sender used to hand the result back.
        //
        // This is an internal type. You would not need to know this.
        enum DispatchRequest {
            $(
                [<$shard_mutable_method:camel>] ($shard_mutable_request, tokio::sync::oneshot::Sender<Result<$shard_mutable_reply, tonic::Status>>),
            )*
            $(
                [<$shard_readonly_method:camel>] ($shard_readonly_request, tokio::sync::oneshot::Sender<Result<$shard_readonly_reply, tonic::Status>>),
            )*
            $(
                [<$item_mutable_method:camel>] ($item_mutable_request, tokio::sync::oneshot::Sender<Result<$item_mutable_reply, tonic::Status>>),
            )*
            $(
                [<$item_readonly_method:camel>] ($item_readonly_request, tokio::sync::oneshot::Sender<Result<$item_readonly_reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            // Run the request against the shard context `ctx` (or against the
            // item looked up by the request's `$hash_by` field) and send the
            // result back through the oneshot channel.
            //
            // `send` only fails when the receiving end has been dropped, i.e.
            // nobody is waiting for the reply any more. That must not panic
            // the backend task, which would take the whole shard down, so
            // the undeliverable reply is dropped instead.
            async fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$shard_mutable_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_mutable_method(req).await;
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$shard_readonly_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_readonly_method(req).await;
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$item_mutable_method:camel>](req, resp_tx) => {
                            // A failed lookup is reported to the caller as-is.
                            let reply = match ctx.get_item_mut(&req.$hash_by) {
                                Ok(i) => i.$item_mutable_method(req).await,
                                Err(err) => Err(err),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$item_readonly_method:camel>](req, resp_tx) => {
                            // A failed lookup is reported to the caller as-is.
                            let reply = match ctx.get_item(&req.$hash_by) {
                                Ok(i) => i.$item_readonly_method(req).await,
                                Err(err) => Err(err),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Start a simple backend service.
        //
        // Spawns `task_num` Tokio tasks, each owning its own clone of
        // `backend` and its own bounded request channel of capacity
        // `channel_capacity`, and returns the tonic server built from the
        // channels' `Sender` ends.
        //
        // Must be called from within a Tokio runtime (`tokio::spawn`), and
        // `channel_capacity` must be greater than 0 (`mpsc::channel` panics
        // on a zero capacity).
        //
        // You need to write your own code if you need any more features, for
        // example if the backend task needs to listen on another channel.
        //
        // dead_code is allowed because callers that build their own backend
        // tasks never call this function.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackendShard + Send + Sync + 'static
        {
            // Serve requests one by one until every `Sender` has been dropped.
            async fn backend_task<B>(mut backend: B, mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                while let Some(request) = req_rx.recv().await {
                    request.handle_and_reply(&mut backend).await;
                }
            }

            let mut req_txs = Vec::with_capacity(task_num);
            for _ in 0..task_num {
                let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);

                tokio::spawn(backend_task(backend.clone(), req_rx));

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}