tonic_server_dispatch/
lib.rs

1//! A typical architecture of network service is that after receiving a
2//! request, the network tasks dispatch it to the business tasks according
3//! to some fields. In this way, requests for the same content can be
4//! dispatched in the same task to avoid shared state or locking.
5//! [This tokio tutorial] gives detailed description.
6//!
7//! The same is true in `tonic`'s gRPC server. The dispatch of requests
8//! from network tasks to the business task has a pattern. This crate is
9//! an abstraction of this pattern to simplify the repetitive work in
10//! the application.
11//!
12//!
13//! # Usage
14//!
15//! Let's take the [DictService] as example.
16//!
17//! We assume that you are familiar with how to implement the original
18//! tonic server. Here we just talk about the parts related to this
19//! crate.
20//!
21//! 0. Add this Crate
22//!
23//!    Add this crate and `paste` to your Cargo.toml.
24//!
25//!     ``` toml
26//!     tonic-server-dispatch = "*"
27//!     paste = "1.0"
28//!     ```
29//!
30//! 1. Define your Service
31//!
32//!    This macro builds the mapping relationship between tonic
33//!    network tasks and your business tasks.
34//!
35//!     ``` rust
36//!     dispatch_service! {
37//!         DictService, // original service name
38//!         key, // hash by this request field
39//!
40//!         // service methods
41//!         set(SetRequest) -> SetReply,
42//!         get(Key) -> Value,
43//!         delete(Key) -> Value,
44//!     }
45//!     ```
46//!
47//!    This macro is the main part of this crate. Go to its
48//!    [doc page] for more detail.
49//!
50//! 2. Implement your Service
51//!
52//!    Define your business context for each task, and implement
53//!    `DispatchBackend` for it. `DispatchBackend` defines all service
54//!    methods, similar to the original tonic ones.
55//! 
56//!     ``` rust
57//!     #[derive(Default)]
58//!     struct DictCtx (HashMap<String, f64>);
59//!
60//!     impl DispatchBackend for DictCtx {
61//!         async fn get(&mut self, req: Key) -> Result<Value, Status> {
62//!             match self.0.get(&req.key) {
63//!                 Some(value) => Ok(Value { value: *value }),
64//!                 None => Err(Status::not_found(String::new())),
65//!             }
66//!         }
67//!
68//!         // all other methods ...
69//!     }
70//!     ```
71//!
72//!    Compare to the original tonic prototype:
73//!
74//!    ```
75//!    async fn get(&self, req: tonic::Request<Key>)
76//!        -> Result<tonic::Response<Value>, tonic::Status>
77//!    ```
78//!
79//!    the difference:
80//!
81//!    - `&self` -> `&mut self`
82//!    - `tonic::Request<Key>` -> `Key`
83//!    - `tonic::Response<Value>` -> `Value`
84//!
85//! 3. Start your Service
86//!
87//!    This starts backend tasks and creates channels.
88//!    The requests are dispatched from network tasks to backend
89//!    tasks by the channels, and the response are sent back by
90//!    oneshot channels.
91//!
92//!     ```
93//!     let svc = start_simple_dispatch_backend::<DictCtx>(16, 10);
94//!     ```
95//!
96//!    As the function's name suggests, it just starts the simple
97//!    kind of backend task, which just listen on the request channel.
98//!    If you want more complex backend task (e.g. listen on another
99//!    channel too), you have to create tasks and channels youself.
100//!    However, the implementation of this function can also be used
101//!    as your reference.
102//!
103//! Now we have finished the dispatch level. It is very simple, isn't it?
104//! Go [DictService] for the full source code.
105//!
106//! [This tokio tutorial]: https://tokio.rs/tokio/tutorial/channels
107//! [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server.rs
108//! [doc page]: macro.dispatch_service.html
109
110
111/// Define the service and build the mapping relationship between tonic
112/// network tasks and your business tasks.
113///
114/// Parameters:
115///
116/// - `$service` Original service name. Because we need to generate new
117///   service name based on this name, so do not give the module prefix.
118///
119/// - `$hash_by` The field in request types which is used to calculate
120///   which business task to dispatched to. All request types should
121///   contain this field.
122///
123/// - `$method` The gRPC method name. You need list all methods.
124///
125/// - `$request` The gRPC request type.
126///
127/// - `$reply` The gRPC response type.
128///
129///
130/// This macro defines 3 items:
131///
132/// - `trait DispatchBackend` This defines all your service's gRPC
133///   methods, and you need to implement this trait for your service
134///   context.
135///
136/// - `fn start_simple_dispatch_backend` This starts a simple kind of
137///   backend tasks, which just listen on the request channel.
138///    If you want more complex backend task (e.g. listen on another
139///    channel too), you have to create tasks and channels youself.
140///
141/// - `struct [<$service DispatchServer>]` This defines the real tonic
142///   service, and this macro implement it automatically. If you use
143///   the `start_simple_dispatch_backend` which handles this struct
144///   already, then you do not need to touch this. But if you need to
145///   build backend tasks yourself, then you need to create channels
146///   and this struct with their `Sender` ends by its `with_txs()`
147///   method. See `start_simple_dispatch_backend()`'s code for example.
148///
149/// Read the [DictService] example's source code for a better understanding.
150///
151/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server.rs
152///
153#[macro_export]
154macro_rules! dispatch_service {
155    (
156        $service:ty,
157        $hash_by:ident,
158        $(
159            $method:ident ($request:ty) -> $reply:ty,
160        )*
161    ) => {
162
163        paste::paste! {
164
165        // Trait for backend business context.
166        //
167        // This defines all gRPC methods for this server.
168        //
169        // The formats of methods are similar to the original tonic ones,
170        // except that changes
171        //   - self: from `&self` to `mut &self`
172        //   - parameter: from `Request<R>` to `R`
173        //   - retuen value: from `Response<R>` to `R`
174        //
175        // For example:
176        //
177        // ```
178        // impl DispatchBackend for MyGreeter {
179        //     async fn say_hello(&mut self, req: SayHelloRequest) -> Result<SayHelloReply, Status> {
180        //         Ok(SayHelloReply{
181        //             say: "hello".into(),
182        //         })
183        //     }
184        // }
185        // ```
186        trait DispatchBackend {
187            $(
188                fn $method(&mut self, request: $request)
189                -> impl std::future::Future<Output = Result<$reply, tonic::Status>> + Send;
190            )*
191        }
192
193        // Dispatched request.
194        //
195        // This is an internal type. You would not need to know this.
196        enum DispatchRequest {
197            $(
198                [<$method:camel>] ($request, tokio::sync::oneshot::Sender<Result<$reply, tonic::Status>>),
199            )*
200        }
201
202        impl DispatchRequest {
203            async fn handle_and_reply<B>(self, ctx: &mut B)
204                where B: DispatchBackend + Send + Sync + 'static
205            {
206                match self {
207                    $(
208                        DispatchRequest::[<$method:camel>](req, resp_tx) => {
209                            let reply = ctx.$method(req).await;
210                            resp_tx.send(reply).unwrap();
211                        }
212                    )*
213                }
214            }
215        }
216
217        // Context for the tonic server, used to dispatch requests.
218        pub struct [<$service DispatchServer>] {
219            txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>,
220        }
221
222        impl [<$service DispatchServer>] {
223            // create with txs
224            fn with_txs(txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>) -> Self {
225                Self { txs }
226            }
227
228            // internal method
229            fn calc_hash(item: &impl std::hash::Hash) -> u64 {
230                use std::hash::Hasher;
231                let mut hasher = std::collections::hash_map::DefaultHasher::new();
232                item.hash(&mut hasher);
233                hasher.finish()
234            }
235        }
236
237        // The tonic server implementation.
238        //
239        // Dispatch the request to backend, and wait for the reply.
240        #[tonic::async_trait]
241        impl $service for [<$service DispatchServer>] {
242            $(
243                async fn $method(
244                    &self,
245                    request: tonic::Request<$request>,
246                ) -> Result<tonic::Response<$reply>, tonic::Status> {
247                    let request = request.into_inner();
248
249                    let shard = Self::calc_hash(&request.$hash_by) as usize % self.txs.len();
250
251                    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
252
253                    let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);
254
255                    match self.txs[shard].try_send(biz_req) {
256                        Ok(()) => resp_rx.await.unwrap().map(tonic::Response::new),
257                        Err(_) => Err(tonic::Status::unavailable(String::new())),
258                    }
259                }
260            )*
261        }
262
263        // Start a simple backend service.
264        //
265        // You need to write your own code if any more feature, for example
266        // the backend task need to listen on another channel.
267        #[allow(dead_code)]
268        fn start_simple_dispatch_backend<B>(task_num: usize, channel_capacity: usize)
269            -> [<$service DispatchServer>]
270            where B: Default + DispatchBackend + Send + Sync + 'static
271        {
272            async fn backend_task<B>(mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
273                where B: Default + DispatchBackend + Send + Sync + 'static
274            {
275                let mut ctx = B::default();
276                while let Some(request) = req_rx.recv().await {
277                    request.handle_and_reply(&mut ctx).await;
278                }
279            }
280
281            let mut req_txs = Vec::new();
282            for _ in 0..task_num {
283                let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);
284
285                tokio::spawn(backend_task::<B>(req_rx));
286
287                req_txs.push(req_tx);
288            }
289
290            [<$service DispatchServer>]::with_txs(req_txs)
291        }
292
293        } // end of paste!
294    }
295}