tonic_server_dispatch/lib.rs
//! A typical architecture for a network service is that, after receiving a
//! request, the network tasks dispatch it to business tasks according to
//! some of its fields. In this way, requests for the same content are
//! always dispatched to the same task, which avoids shared state and
//! locking. [This tokio tutorial] gives a detailed description.
//!
//! The same is true of `tonic`'s gRPC server. Dispatching requests from
//! network tasks to business tasks follows a common pattern. This crate
//! abstracts that pattern to reduce repetitive work in the application.
//!
//!
//! # Usage
//!
//! Let's take the [DictService] as an example.
//!
//! We assume that you are familiar with implementing an ordinary tonic
//! server, so here we only cover the parts related to this crate.
//!
//! 0. Add this Crate
//!
//! Add this crate and `paste` to your Cargo.toml.
//!
//! ``` toml
//! tonic-server-dispatch = "*"
//! paste = "1.0"
//! ```
//!
//! 1. Define your Service
//!
//! This macro builds the mapping relationship between tonic
//! network tasks and your business tasks.
//!
//! ``` rust
//! dispatch_service! {
//!     DictService, // original service name
//!     key, // hash by this request field
//!
//!     // service methods
//!     set(SetRequest) -> SetReply,
//!     get(Key) -> Value,
//!     delete(Key) -> Value,
//! }
//! ```
//!
//! This macro is the main part of this crate. Go to its
//! [doc page] for more detail.
//!
//! 2. Implement your Service
//!
//! Define your business context for each task, and implement
//! `DispatchBackend` for it. `DispatchBackend` defines all service
//! methods, similar to the original tonic ones.
//!
//! ``` rust
//! use std::collections::HashMap;
//! use tonic::Status;
//!
//! #[derive(Default)]
//! struct DictCtx(HashMap<String, f64>);
//!
//! impl DispatchBackend for DictCtx {
//!     async fn get(&mut self, req: Key) -> Result<Value, Status> {
//!         match self.0.get(&req.key) {
//!             Some(value) => Ok(Value { value: *value }),
//!             None => Err(Status::not_found(String::new())),
//!         }
//!     }
//!
//!     // all other methods ...
//! }
//! ```
//!
//! Compare this to the original tonic prototype:
//!
//! ```
//! async fn get(&self, req: tonic::Request<Key>)
//!     -> Result<tonic::Response<Value>, tonic::Status>
//! ```
//!
//! The differences:
//!
//! - `&self` -> `&mut self`
//! - `tonic::Request<Key>` -> `Key`
//! - `tonic::Response<Value>` -> `Value`
//!
//! 3. Start your Service
//!
//! This starts the backend tasks and creates the channels.
//! Requests are dispatched from network tasks to backend
//! tasks through these channels, and the responses are sent back
//! through oneshot channels.
//!
//! ```
//! let svc = start_simple_dispatch_backend::<DictCtx>(16, 10);
//! ```
//!
//! Here `16` is the number of backend tasks and `10` is the capacity
//! of each request channel.
//!
//! As the function's name suggests, it starts only the simple
//! kind of backend task, which just listens on the request channel.
//! If you want a more complex backend task (e.g. one that listens on
//! another channel too), you have to create the tasks and channels
//! yourself. The implementation of this function can serve as a
//! reference.
//!
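//! The channel flow described above can be sketched with the standard
//! library alone. This is only an illustration under stated assumptions,
//! not this crate's implementation: OS threads stand in for tokio tasks,
//! `std::sync::mpsc` channels stand in for the tokio `mpsc` and `oneshot`
//! channels, and `Request`, `start_backends`, `shard_for`, `dispatch`,
//! `set` and `get` are hypothetical names.
//!
//! ```rust
//! use std::collections::hash_map::DefaultHasher;
//! use std::collections::HashMap;
//! use std::hash::{Hash, Hasher};
//! use std::sync::mpsc::{self, Sender, SyncSender};
//! use std::thread;
//!
//! // Hypothetical request type. Each variant carries its own reply
//! // channel, playing the role of the oneshot channel.
//! enum Request {
//!     Set(String, f64, Sender<Option<f64>>),
//!     Get(String, Sender<Option<f64>>),
//! }
//!
//! // Start `task_num` backend threads, each owning a private map.
//! fn start_backends(task_num: usize, capacity: usize) -> Vec<SyncSender<Request>> {
//!     (0..task_num)
//!         .map(|_| {
//!             let (tx, rx) = mpsc::sync_channel::<Request>(capacity);
//!             thread::spawn(move || {
//!                 // No lock is needed: only this thread touches `dict`.
//!                 let mut dict: HashMap<String, f64> = HashMap::new();
//!                 // The loop ends once every sender has been dropped.
//!                 for req in rx {
//!                     match req {
//!                         Request::Set(key, value, reply) => {
//!                             // Ignore the error if the caller has gone away.
//!                             let _ = reply.send(dict.insert(key, value));
//!                         }
//!                         Request::Get(key, reply) => {
//!                             let _ = reply.send(dict.get(&key).copied());
//!                         }
//!                     }
//!                 }
//!             });
//!             tx
//!         })
//!         .collect()
//! }
//!
//! // Pick the backend for `key`. The same key always maps to the same
//! // backend. `task_num` must be greater than zero.
//! fn shard_for(key: &str, task_num: usize) -> usize {
//!     let mut hasher = DefaultHasher::new();
//!     key.hash(&mut hasher);
//!     hasher.finish() as usize % task_num
//! }
//!
//! // Send a request to the backend chosen by `key` and wait for the reply.
//! fn dispatch(
//!     txs: &[SyncSender<Request>],
//!     key: &str,
//!     make_req: impl FnOnce(Sender<Option<f64>>) -> Request,
//! ) -> Result<Option<f64>, &'static str> {
//!     let (reply_tx, reply_rx) = mpsc::channel();
//!     // Fail fast when the request queue is full or the backend is gone.
//!     txs[shard_for(key, txs.len())]
//!         .try_send(make_req(reply_tx))
//!         .map_err(|_| "backend unavailable")?;
//!     reply_rx.recv().map_err(|_| "backend stopped")
//! }
//!
//! // Store `value` under `key` and return the previous value, if any.
//! fn set(txs: &[SyncSender<Request>], key: &str, value: f64) -> Result<Option<f64>, &'static str> {
//!     dispatch(txs, key, |reply| Request::Set(key.to_string(), value, reply))
//! }
//!
//! // Look up the value stored under `key`.
//! fn get(txs: &[SyncSender<Request>], key: &str) -> Result<Option<f64>, &'static str> {
//!     dispatch(txs, key, |reply| Request::Get(key.to_string(), reply))
//! }
//! ```
//!
//! After `let txs = start_backends(4, 10);`, a call to
//! `set(&txs, "pi", 1.5)` followed by `get(&txs, "pi")` reaches the same
//! backend thread, which is why the map can be owned without a lock.
//!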
//! Now we have finished the dispatch level. It is very simple, isn't it?
//! Go to [DictService] for the full source code.
//!
//! [This tokio tutorial]: https://tokio.rs/tokio/tutorial/channels
//! [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server.rs
//! [doc page]: macro.dispatch_service.html

/// Define the service and build the mapping between tonic
/// network tasks and your business tasks.
///
/// Parameters:
///
/// - `$service` The original service name. A new service name is
///   generated from this name, so do not include a module prefix.
///
/// - `$hash_by` The field in the request types that is used to decide
///   which business task to dispatch to. All request types must
///   contain this field, and its type must implement `Hash`.
///
/// - `$method` The gRPC method name. You need to list all methods.
///
/// - `$request` The gRPC request type.
///
/// - `$reply` The gRPC response type.
///
///
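/// As a rough sketch of how a `$hash_by` value selects a business task:
/// `calc_hash` below mirrors the helper of the same name that this macro
/// generates, while `shard_for` is a hypothetical name used only for
/// illustration.
///
/// ```rust
/// use std::collections::hash_map::DefaultHasher;
/// use std::hash::{Hash, Hasher};
///
/// // Hash any `Hash` value to a `u64`.
/// fn calc_hash(item: &impl Hash) -> u64 {
///     let mut hasher = DefaultHasher::new();
///     item.hash(&mut hasher);
///     hasher.finish()
/// }
///
/// // Hypothetical helper: the index of the business task for `key`.
/// // `task_num` must be greater than zero.
/// fn shard_for(key: &impl Hash, task_num: usize) -> usize {
///     calc_hash(key) as usize % task_num
/// }
/// ```
///
/// Equal keys always produce the same index, so all requests for one key
/// are handled by one task.
///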
/// This macro defines 3 items:
///
/// - `trait DispatchBackend` This defines all your service's gRPC
///   methods. You need to implement this trait for your service
///   context.
///
/// - `fn start_simple_dispatch_backend` This starts the simple kind of
///   backend task, which just listens on the request channel.
///   If you want a more complex backend task (e.g. one that listens on
///   another channel too), you have to create the tasks and channels
///   yourself.
///
/// - `struct [<$service DispatchServer>]` This defines the real tonic
///   service, which this macro implements automatically. If you use
///   `start_simple_dispatch_backend`, which already handles this
///   struct, you do not need to touch it. But if you build the backend
///   tasks yourself, you need to create the channels and then
///   construct this struct from their `Sender` ends with its
///   `with_txs()` method. See the code of
///   `start_simple_dispatch_backend()` for an example.
///
/// Read the [DictService] example's source code for a better understanding.
///
/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server.rs
///
#[macro_export]
macro_rules! dispatch_service {
    (
        $service:ty,
        $hash_by:ident,
        $(
            $method:ident ($request:ty) -> $reply:ty,
        )*
    ) => {

        paste::paste! {

        // Trait for backend business context.
        //
        // This defines all gRPC methods for this server.
        //
        // The method signatures are similar to the original tonic ones,
        // with these changes:
        // - self: from `&self` to `&mut self`
        // - parameter: from `Request<R>` to `R`
        // - return value: from `Response<R>` to `R`
        //
        // For example:
        //
        // ```
        // impl DispatchBackend for MyGreeter {
        //     async fn say_hello(&mut self, req: SayHelloRequest) -> Result<SayHelloReply, Status> {
        //         Ok(SayHelloReply{
        //             say: "hello".into(),
        //         })
        //     }
        // }
        // ```
        trait DispatchBackend {
            $(
                fn $method(&mut self, request: $request)
                    -> impl std::future::Future<Output = Result<$reply, tonic::Status>> + Send;
            )*
        }

        // Dispatched request.
        //
        // This is an internal type. You do not need to know about it.
        enum DispatchRequest {
            $(
                [<$method:camel>] ($request, tokio::sync::oneshot::Sender<Result<$reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            async fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackend + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$method:camel>](req, resp_tx) => {
                            let reply = ctx.$method(req).await;
                            // The receiver is dropped if the caller has
                            // gone away (e.g. the request was cancelled).
                            // Ignore the error instead of panicking, so
                            // that the backend task keeps running.
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Context for the tonic server, used to dispatch requests.
        pub struct [<$service DispatchServer>] {
            txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>,
        }

        impl [<$service DispatchServer>] {
            // Create the server from the `Sender` ends of the request
            // channels. `txs` must not be empty.
            fn with_txs(txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>) -> Self {
                Self { txs }
            }

            // internal method
            fn calc_hash(item: &impl std::hash::Hash) -> u64 {
                use std::hash::Hasher;
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                item.hash(&mut hasher);
                hasher.finish()
            }
        }

        // The tonic server implementation.
        //
        // Dispatch the request to the backend, and wait for the reply.
        #[tonic::async_trait]
        impl $service for [<$service DispatchServer>] {
            $(
                async fn $method(
                    &self,
                    request: tonic::Request<$request>,
                ) -> Result<tonic::Response<$reply>, tonic::Status> {
                    let request = request.into_inner();

                    // Requests with the same `$hash_by` value always go
                    // to the same backend task.
                    let shard = Self::calc_hash(&request.$hash_by) as usize % self.txs.len();

                    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

                    let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);

                    match self.txs[shard].try_send(biz_req) {
                        // The `unwrap()` panics if the backend task has
                        // dropped `resp_tx` without replying.
                        Ok(()) => resp_rx.await.unwrap().map(tonic::Response::new),
                        // The request channel is full or closed.
                        Err(_) => Err(tonic::Status::unavailable(String::new())),
                    }
                }
            )*
        }

        // Start a simple backend service.
        //
        // You need to write your own code if you need more features, for
        // example if the backend task needs to listen on another channel.
        //
        // `task_num` must be greater than 0, because requests are
        // dispatched by `hash % task_num`.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Default + DispatchBackend + Send + Sync + 'static
        {
            async fn backend_task<B>(mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
                where B: Default + DispatchBackend + Send + Sync + 'static
            {
                let mut ctx = B::default();
                while let Some(request) = req_rx.recv().await {
                    request.handle_and_reply(&mut ctx).await;
                }
            }

            let mut req_txs = Vec::new();
            for _ in 0..task_num {
                let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);

                tokio::spawn(backend_task::<B>(req_rx));

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}
295}