tonic_server_dispatch/
async.rs

/// Define the service and build the mapping relationship between tonic
/// network tasks and your asynchronous business tasks.
///
/// Use `dispatch_service_sync!` instead for synchronous mode.
/// See [the module-level documentation](super) for more information
/// about the 2 modes.
///
/// # Parameters
///
/// - `$service` Original service name. A new service name is generated
///   from this name, so do not give the module prefix.
///
/// - `$hash_by: $hash_type` The field in request types which is used
///   to calculate which business task to dispatch to. All request
///   types should contain this field.
///
/// - `$shard_method ($shard_request) -> $shard_reply` gRPC methods that work
///   on shard (but not on item). E.g. create or remove items on the shard.
///
/// - `$mutable_method ($mutable_request) -> $mutable_reply` gRPC mutable
///   methods that work on item. E.g. update item itself.
///
/// - `$readonly_method ($readonly_request) -> $readonly_reply` gRPC
///   readonly methods that work on item. E.g. query item itself.
///
/// # Generated items
///
/// This macro defines 4 items:
///
/// - `trait DispatchBackendShard` is for each backend shard. You
///   need to implement this trait for your shard context. It has 2 parts:
///    1. associated type `Item`, and `get_item`/`get_item_mut` methods;
///    2. gRPC methods that work at shard (but not item), e.g. create/delete.
///
/// - `trait DispatchBackendItem` is for each backend item. It has
///   mutable and readonly gRPC methods that work at item. You
///   need to implement this trait for your item.
///
///   The formats of all methods are similar to the original tonic ones,
///   except for these changes:
///     - self: from `&self` to `&mut self` for shard and mutable item
///       methods (readonly item methods keep `&self`)
///     - parameter: from `Request<R>` to `R`
///     - return value: from `Response<R>` to `R`
///     - the returned future must be `Send`
///
///   However the meaning of `self` changes. For the original tonic methods,
///   the `self` points to a global service context. While here, for shard
///   methods the `self` points to a context for each shard, and for
///   item mutable/readonly methods the `self` points to the item.
///
/// - `fn start_simple_dispatch_backend` This starts a simple kind of
///   backend tasks, which just listen on the request channel.
///   If you want more complex backend tasks (e.g. listening on another
///   channel too), you have to create tasks and channels yourself.
///
/// - `struct [<$service DispatchServer>]` This defines the real tonic
///   service, and this macro implements it automatically. If you use
///   the `start_simple_dispatch_backend` which handles this struct
///   already, then you do not need to touch this. But if you need to
///   build backend tasks yourself, then you need to create channels
///   and this struct with their `Sender` ends by its `with_txs()`
///   method. See `start_simple_dispatch_backend()`'s code for example.
///
/// # Cancelled requests
///
/// Every dispatched request carries a oneshot sender for its reply. If the
/// receiving end is already gone when the backend finishes (for example the
/// RPC was cancelled), the reply is silently discarded and the backend task
/// keeps serving subsequent requests.
///
/// Read the [DictService] example's source code for a better understanding.
///
/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_async.rs
///
#[macro_export]
macro_rules! dispatch_service_async {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        [ $(
            $shard_method:ident ($shard_request:ty) -> $shard_reply:ty,
        )* ],

        [ $(
            $mutable_method:ident ($mutable_request:ty) -> $mutable_reply:ty,
        )* ],

        [ $(
            $readonly_method:ident ($readonly_request:ty) -> $readonly_reply:ty,
        )* ]
    ) => {

        // Define the tonic server: [<$service DispatchServer>]
        //
        // This part is the same for sync and async modes; only the channel
        // `Sender` type differs (tokio's mpsc sender for async mode).
        tonic_server_dispatch::_define_dispatch_server!(
            $service,
            $hash_by: $hash_type,

            tokio::sync::mpsc::Sender,

            [ $(
                $shard_method ($shard_request) -> $shard_reply,
            )* ],

            [ $(
                $mutable_method ($mutable_request) -> $mutable_reply,
            )* ],

            [ $(
                $readonly_method ($readonly_request) -> $readonly_reply,
            )* ]
        );

        paste::paste! {

        // 2 traits for backend business context: Shard and Item.
        //
        // DispatchBackendShard is for each backend shard. It has 2 parts:
        // 1. associated type Item, and get_item/get_item_mut methods;
        // 2. gRPC methods that work at shard (but not item), e.g. create/delete.
        //
        // DispatchBackendItem is for each backend item. It only has
        // gRPC methods that work at item.
        //
        // The formats of all methods are similar to the original tonic ones,
        // except for these changes:
        //   - self: from `&self` to `&mut self` (readonly item methods keep `&self`)
        //   - parameter: from `Request<R>` to `R`
        //   - return value: from `Response<R>` to `R`
        trait DispatchBackendShard {
            // part-1: item lookup by the hash key.
            //
            // `tonic::Status` is spelled out in full so that the expansion does
            // not depend on the caller having `Status` imported.
            type Item: DispatchBackendItem + Send + Sync;
            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, tonic::Status>;
            fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, tonic::Status>;

            // part-2: gRPC methods that work on the shard itself.
            $(
                fn $shard_method(&mut self, request: $shard_request)
                -> impl std::future::Future<Output = Result<$shard_reply, tonic::Status>> + Send;
            )*
        }
        trait DispatchBackendItem {
            // Mutable gRPC methods: get exclusive access to the item.
            $(
                fn $mutable_method(&mut self, request: $mutable_request)
                -> impl std::future::Future<Output = Result<$mutable_reply, tonic::Status>> + Send;
            )*
            // Readonly gRPC methods: only get shared access to the item.
            $(
                fn $readonly_method(&self, request: $readonly_request)
                -> impl std::future::Future<Output = Result<$readonly_reply, tonic::Status>> + Send;
            )*
        }

        // Dispatched request.
        //
        // One variant per gRPC method (named after the method in CamelCase),
        // carrying the request message and the oneshot sender used to hand
        // the result back.
        //
        // This is an internal type. You would not need to know this.
        enum DispatchRequest {
            $(
                [<$shard_method:camel>] ($shard_request, tokio::sync::oneshot::Sender<Result<$shard_reply, tonic::Status>>),
            )*
            $(
                [<$mutable_method:camel>] ($mutable_request, tokio::sync::oneshot::Sender<Result<$mutable_reply, tonic::Status>>),
            )*
            $(
                [<$readonly_method:camel>] ($readonly_request, tokio::sync::oneshot::Sender<Result<$readonly_reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            // Run the request against the shard context `ctx` and send the
            // result through the request's oneshot channel.
            //
            // - shard methods are called on `ctx` directly;
            // - item methods first look the item up by the `$hash_by` field;
            //   a lookup error is sent back as the reply.
            //
            // A failed `send` only means the receiving end has been dropped
            // (e.g. the RPC was cancelled), so nobody is waiting for the reply
            // any more. It is ignored on purpose: panicking here would kill
            // the backend task and break all later requests of this shard.
            async fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$shard_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_method(req).await;
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$mutable_method:camel>](req, resp_tx) => {
                            let reply = match ctx.get_item_mut(&req.$hash_by) {
                                Ok(item) => item.$mutable_method(req).await,
                                Err(status) => Err(status),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$readonly_method:camel>](req, resp_tx) => {
                            let reply = match ctx.get_item(&req.$hash_by) {
                                Ok(item) => item.$readonly_method(req).await,
                                Err(status) => Err(status),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Start a simple backend service.
        //
        // Spawns `task_num` tokio tasks. Each task owns a clone of `backend`
        // and the receiving end of its own bounded channel (with capacity
        // `channel_capacity`), and handles requests one by one until all
        // senders of that channel are dropped. The sending ends are handed
        // to the returned dispatch server.
        //
        // Must be called from within a tokio runtime because it spawns tasks.
        //
        // You need to write your own code if you need any more features, for
        // example if the backend task needs to listen on another channel.
        //
        // Not every user of the macro calls this helper.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackendShard + Send + Sync + 'static
        {
            // Serve requests until the channel is closed.
            async fn backend_task<B>(mut backend: B, mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                while let Some(request) = req_rx.recv().await {
                    request.handle_and_reply(&mut backend).await;
                }
            }

            // One channel + one task per shard; keep the senders in shard order.
            let req_txs: Vec<_> = (0..task_num)
                .map(|_| {
                    let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);
                    tokio::spawn(backend_task(backend.clone(), req_rx));
                    req_tx
                })
                .collect();

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}