tonic_server_dispatch/async.rs
/// Define the service and build the mapping relationship between tonic
/// network tasks and your asynchronous business tasks.
///
/// Use `dispatch_service_sync!` instead for synchronous mode.
/// See [the module-level documentation](super) for more information
/// about the 2 modes.
///
/// Parameters:
///
/// - `$service` Original service name. A new service name is generated
///   from this name, so do not give the module prefix.
///
/// - `$hash_by: $hash_type` The field in request types which is used
///   to calculate which business task to dispatch to. All request
///   types should contain this field.
///
/// - `$shard_method ($shard_request) -> $shard_reply` gRPC methods that work
///   on shard (but not on item). E.g. create or remove items on the shard.
///
/// - `$mutable_method ($mutable_request) -> $mutable_reply` gRPC mutable
///   methods that work on item. E.g. update item itself.
///
/// - `$readonly_method ($readonly_request) -> $readonly_reply` gRPC
///   readonly methods that work on item. E.g. query item itself.
///
///
/// This macro defines 4 items:
///
/// - `trait DispatchBackendShard` is for each backend shard. You
///   need to implement this trait for your shard context. It has 2 parts:
///   1. associated type `Item`, and `get_item`/`get_item_mut` methods;
///   2. gRPC methods that work at shard (but not item), e.g. create/delete.
///
/// - `trait DispatchBackendItem` is for each backend item. It has
///   mutable and readonly gRPC methods that work at item. You
///   need to implement this trait for your item.
///
///   The formats of all methods are similar to the original tonic ones,
///   except for these changes:
///   - self: from `&self` to `&mut self` (readonly item methods keep `&self`)
///   - parameter: from `Request<R>` to `R`
///   - return value: from `Response<R>` to `R`
///
///   However the meaning of `self` changes. For the original tonic methods,
///   the `self` points to a global service context. While here, for shard
///   methods the `self` points to a context for each shard, and for
///   item mutable/readonly methods the `self` points to the item.
///
/// - `fn start_simple_dispatch_backend` This starts a simple kind of
///   backend tasks, which just listen on the request channel.
///   If you want more complex backend task (e.g. listen on another
///   channel too), you have to create tasks and channels yourself.
///   It calls `tokio::spawn`, so it must be called from within a Tokio
///   runtime, and `channel_capacity` is passed to
///   `tokio::sync::mpsc::channel`, so it must be greater than 0.
///
/// - `struct [<$service DispatchServer>]` This defines the real tonic
///   service, and this macro implements it automatically. If you use
///   the `start_simple_dispatch_backend` which handles this struct
///   already, then you do not need to touch this. But if you need to
///   build backend tasks yourself, then you need to create channels
///   and this struct with their `Sender` ends by its `with_txs()`
///   method. See `start_simple_dispatch_backend()`'s code for example.
///
/// An internal `enum DispatchRequest` is generated as well; it carries each
/// request together with the oneshot sender used to return the reply.
///
/// If the requester is no longer waiting for a reply (its oneshot receiver
/// has been dropped), the reply is discarded and the backend task keeps
/// running.
///
/// Read the [DictService] example's source code for a better understanding.
///
/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_async.rs
///
#[macro_export]
macro_rules! dispatch_service_async {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        [ $(
            $shard_method:ident ($shard_request:ty) -> $shard_reply:ty,
        )* ],

        [ $(
            $mutable_method:ident ($mutable_request:ty) -> $mutable_reply:ty,
        )* ],

        [ $(
            $readonly_method:ident ($readonly_request:ty) -> $readonly_reply:ty,
        )* ]
    ) => {

        // define tonic Server: [<$service DispatchServer>]
        //
        // this part is same for sync and async modes.
        tonic_server_dispatch::_define_dispatch_server!(
            $service,
            $hash_by: $hash_type,

            tokio::sync::mpsc::Sender,

            [ $(
                $shard_method ($shard_request) -> $shard_reply,
            )* ],

            [ $(
                $mutable_method ($mutable_request) -> $mutable_reply,
            )* ],

            [ $(
                $readonly_method ($readonly_request) -> $readonly_reply,
            )* ]
        );

        paste::paste! {

        // 2 traits for backend business context: Shard and Item.
        //
        // DispatchBackendShard is for each backend shard. It has 2 parts:
        //   1. associated type Item, and get_item/get_item_mut methods;
        //   2. gRPC methods that work at shard (but not item), e.g. create/delete.
        //
        // DispatchBackendItem is for each backend item. It only has
        // gRPC methods that work at item.
        //
        // The formats of all methods are similar to the original tonic ones,
        // except for these changes:
        //   - self: from `&self` to `&mut self` (readonly item methods keep `&self`)
        //   - parameter: from `Request<R>` to `R`
        //   - return value: from `Response<R>` to `R`
        trait DispatchBackendShard {
            // part-1: item lookup by the hash key.
            //
            // `tonic::Status` is spelled out in full (as in every other
            // generated signature) so the expansion does not depend on the
            // caller having `Status` imported.
            type Item: DispatchBackendItem + Send + Sync;
            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, tonic::Status>;
            fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, tonic::Status>;

            // part-2: gRPC methods that work at shard level.
            $(
                fn $shard_method(&mut self, request: $shard_request)
                    -> impl std::future::Future<Output = Result<$shard_reply, tonic::Status>> + Send;
            )*
        }
        trait DispatchBackendItem {
            // mutable gRPC methods that work at item level.
            $(
                fn $mutable_method(&mut self, request: $mutable_request)
                    -> impl std::future::Future<Output = Result<$mutable_reply, tonic::Status>> + Send;
            )*
            // readonly gRPC methods that work at item level.
            $(
                fn $readonly_method(&self, request: $readonly_request)
                    -> impl std::future::Future<Output = Result<$readonly_reply, tonic::Status>> + Send;
            )*
        }

        // Dispatched request.
        //
        // This is an internal type. You would not need to know this.
        //
        // One variant per gRPC method; each carries the request and the
        // oneshot sender through which the reply is returned.
        enum DispatchRequest {
            $(
                [<$shard_method:camel>] ($shard_request, tokio::sync::oneshot::Sender<Result<$shard_reply, tonic::Status>>),
            )*
            $(
                [<$mutable_method:camel>] ($mutable_request, tokio::sync::oneshot::Sender<Result<$mutable_reply, tonic::Status>>),
            )*
            $(
                [<$readonly_method:camel>] ($readonly_request, tokio::sync::oneshot::Sender<Result<$readonly_reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            // Run the request against the shard context `ctx` and send the
            // result back through the request's oneshot sender.
            //
            // Sending fails only when the receiving half has already been
            // dropped, i.e. nobody is waiting for this reply any more. That
            // must not panic: a panic here would terminate the backend task
            // and with it the whole shard. So the send result is ignored.
            async fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$shard_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_method(req).await;
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$mutable_method:camel>](req, resp_tx) => {
                            // A failed item lookup is reported to the caller as-is.
                            let reply = match ctx.get_item_mut(&req.$hash_by) {
                                Ok(item) => item.$mutable_method(req).await,
                                Err(err) => Err(err),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$readonly_method:camel>](req, resp_tx) => {
                            // A failed item lookup is reported to the caller as-is.
                            let reply = match ctx.get_item(&req.$hash_by) {
                                Ok(item) => item.$readonly_method(req).await,
                                Err(err) => Err(err),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Start a simple backend service.
        //
        // Spawns `task_num` backend tasks, each with its own clone of
        // `backend` and its own request channel of `channel_capacity`
        // slots, and returns the tonic server built from the `Sender` ends.
        //
        // Uses `tokio::spawn`, so it must be called from within a Tokio
        // runtime; `channel_capacity` must be greater than 0 because it is
        // passed to `tokio::sync::mpsc::channel`.
        //
        // You need to write your own code if you need any more features, for
        // example if the backend task needs to listen on another channel.
        //
        // Not every user of the macro calls this helper, hence the allow.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackendShard + Send + Sync + 'static
        {
            // Serve requests one at a time until every `Sender` is dropped.
            async fn backend_task<B>(mut backend: B, mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                while let Some(request) = req_rx.recv().await {
                    request.handle_and_reply(&mut backend).await;
                }
            }

            let mut req_txs = Vec::with_capacity(task_num);
            for _ in 0..task_num {
                let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);

                tokio::spawn(backend_task(backend.clone(), req_rx));

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}
226}