tonic_server_dispatch/
async.rs

1/// Define the service and build the mapping relationship between tonic
2/// network tasks and your business tasks.
3///
4/// Parameters:
5///
/// - `$service` Original service name. New type names are generated
///   from this name, so do not give it a module prefix.
8///
/// - `$hash_by` The field in request types which is used to calculate
///   which business task to dispatch to. All request types should
///   contain this field.
12///
/// - `$method` The gRPC method name. You need to list all methods.
14///
15/// - `$request` The gRPC request type.
16///
17/// - `$reply` The gRPC response type.
18///
19///
20/// This macro defines 3 items:
21///
22/// - `trait DispatchBackend` This defines all your service's gRPC
23///   methods, and you need to implement this trait for your service
24///   context.
25///
/// - `fn start_simple_dispatch_backend` This starts a simple kind of
///   backend tasks, which just listen on the request channel.
///   If you want a more complex backend task (e.g. one that listens on
///   another channel too), you have to create tasks and channels yourself.
30///
/// - `struct [<$service DispatchServer>]` This defines the real tonic
///   service, and this macro implements it automatically. If you use
///   `start_simple_dispatch_backend`, which handles this struct
///   already, then you do not need to touch this. But if you need to
///   build backend tasks yourself, then you need to create channels
///   and build this struct from their `Sender` ends with its `with_txs()`
///   method. See `start_simple_dispatch_backend()`'s code for example.
38///
39/// Read the [DictService] example's source code for a better understanding.
40///
41/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_async.rs
42///
#[macro_export]
macro_rules! dispatch_service_async {
    (
        $service:ty,
        $hash_by:ident,
        $(
            $method:ident ($request:ty) -> $reply:ty,
        )*
    ) => {

        paste::paste! {

        // NOTE: `Result` is always spelled `::std::result::Result` in this
        // expansion, so that it keeps working when the calling module has
        // its own `type Result<T> = ...` alias in scope.

        // Trait for backend business context.
        //
        // This defines all gRPC methods for this server.
        //
        // The formats of methods are similar to the original tonic ones,
        // except for these changes:
        //   - self: from `&self` to `&mut self`
        //   - parameter: from `Request<R>` to `R`
        //   - return value: from `Response<R>` to `R`
        //
        // For example:
        //
        // ```
        // impl DispatchBackend for MyGreeter {
        //     async fn say_hello(&mut self, req: SayHelloRequest) -> Result<SayHelloReply, Status> {
        //         Ok(SayHelloReply{
        //             say: "hello".into(),
        //         })
        //     }
        // }
        // ```
        trait DispatchBackend {
            $(
                fn $method(&mut self, request: $request)
                -> impl ::std::future::Future<Output = ::std::result::Result<$reply, tonic::Status>> + Send;
            )*
        }

        // Dispatched request.
        //
        // One variant per gRPC method. Each variant carries the request
        // message and the oneshot sender used to return the result to the
        // gRPC handler that is waiting for it.
        //
        // This is an internal type. You would not need to know this.
        enum DispatchRequest {
            $(
                [<$method:camel>] ($request, tokio::sync::oneshot::Sender<::std::result::Result<$reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            // Call the backend method matching this request and send its
            // result back to the waiting gRPC handler.
            async fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackend + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$method:camel>](req, resp_tx) => {
                            let reply = ctx.$method(req).await;
                            // `send` fails only when the receiving handler
                            // has already been dropped (e.g. the client
                            // cancelled the call). Nobody is waiting for
                            // the reply then, so discard it instead of
                            // panicking and killing this backend task.
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Context for the tonic server, used to dispatch requests.
        //
        // Holds one request sender per backend task (shard).
        pub struct [<$service DispatchServer>] {
            txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>,
        }

        impl [<$service DispatchServer>] {
            // Create the server from the `Sender` ends of the backend
            // tasks' request channels. The position of a sender in `txs`
            // is its shard index.
            fn with_txs(txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>) -> Self {
                Self { txs }
            }

            // Internal method: hash the dispatch key. A fresh
            // `DefaultHasher::new()` is used for every call, so equal keys
            // give equal hashes within a running process.
            fn calc_hash(item: &impl ::std::hash::Hash) -> u64 {
                use ::std::hash::Hasher;
                let mut hasher = ::std::collections::hash_map::DefaultHasher::new();
                item.hash(&mut hasher);
                hasher.finish()
            }
        }

        // The tonic server implementation.
        //
        // Dispatch the request to backend, and wait for the reply.
        //
        // Besides the statuses returned by the backend methods themselves,
        // each method returns:
        //   - `unavailable` if there is no backend task, or the selected
        //     backend task's queue is full or closed;
        //   - `internal` if the backend task dropped the request without
        //     sending a reply.
        #[tonic::async_trait]
        impl $service for [<$service DispatchServer>] {
            $(
                async fn $method(
                    &self,
                    request: tonic::Request<$request>,
                ) -> ::std::result::Result<tonic::Response<$reply>, tonic::Status> {
                    let request = request.into_inner();

                    // Guard the `%` below: with no senders it would be a
                    // remainder-by-zero panic.
                    if self.txs.is_empty() {
                        return Err(tonic::Status::unavailable("no backend task is configured"));
                    }
                    let shard = Self::calc_hash(&request.$hash_by) as usize % self.txs.len();

                    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

                    let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);

                    // `try_send` never waits: a full queue is reported to
                    // the client right away instead of queueing up here.
                    if let Err(err) = self.txs[shard].try_send(biz_req) {
                        let reason = match err {
                            tokio::sync::mpsc::error::TrySendError::Full(_) => "backend task queue is full",
                            tokio::sync::mpsc::error::TrySendError::Closed(_) => "backend task has stopped",
                        };
                        return Err(tonic::Status::unavailable(reason));
                    }

                    match resp_rx.await {
                        Ok(reply) => reply.map(tonic::Response::new),
                        // The sender was dropped without a reply being
                        // sent. Report it to the client rather than
                        // panicking in the handler.
                        Err(_) => Err(tonic::Status::internal(
                            "backend task dropped the request without replying",
                        )),
                    }
                }
            )*
        }

        // Start a simple backend service.
        //
        // Spawns `task_num` tokio tasks. Each one owns a clone of `backend`
        // and the receiving end of its own request channel with capacity
        // `channel_capacity`, and handles the received requests one by one.
        // Returns the tonic service holding the sending ends.
        //
        // This calls `tokio::spawn`, so it must be called from within a
        // tokio runtime. `channel_capacity` must be greater than 0, as
        // required by `tokio::sync::mpsc::channel`.
        //
        // You need to write your own code if any more feature, for example
        // the backend task need to listen on another channel.
        //
        // `dead_code` is allowed because users who build the backend tasks
        // themselves never call this function.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackend + Send + Sync + 'static
        {
            // Body of one backend task: serve requests until every sender
            // of its channel has been dropped.
            async fn backend_task<B>(mut backend: B, mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackend + Send + Sync + 'static
            {
                while let Some(request) = req_rx.recv().await {
                    request.handle_and_reply(&mut backend).await;
                }
            }

            let req_txs = (0..task_num)
                .map(|_| {
                    let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);
                    tokio::spawn(backend_task(backend.clone(), req_rx));
                    req_tx
                })
                .collect::<Vec<_>>();

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}
184}