tonic_server_dispatch/async.rs
/// Define the service and build the mapping relationship between tonic
/// network tasks and your business tasks.
///
/// Parameters:
///
/// - `$service` Original service name. New item names are generated
///   from this name, so do not give the module prefix.
///
/// - `$hash_by` The field in request types which is used to calculate
///   which business task to dispatch to. All request types should
///   contain this field.
///
/// - `$method` The gRPC method name. You need to list all methods.
///
/// - `$request` The gRPC request type.
///
/// - `$reply` The gRPC response type.
///
///
/// This macro defines 3 items:
///
/// - `trait DispatchBackend` This defines all your service's gRPC
///   methods, and you need to implement this trait for your service
///   context.
///
/// - `fn start_simple_dispatch_backend` This starts a simple kind of
///   backend tasks, which just listen on the request channel.
///   If you want more complex backend tasks (e.g. listening on another
///   channel too), you have to create the tasks and channels yourself.
///
/// - `struct [<$service DispatchServer>]` This defines the real tonic
///   service, and this macro implements it automatically. If you use
///   `start_simple_dispatch_backend`, which handles this struct
///   already, then you do not need to touch this. But if you need to
///   build backend tasks yourself, then you need to create channels
///   and this struct with their `Sender` ends by its `with_txs()`
///   method. See `start_simple_dispatch_backend()`'s code for example.
///
/// # Errors returned by the generated server
///
/// - `Status::unavailable` if the request could not be queued on the
///   selected backend channel (`try_send` failed).
/// - `Status::internal` if the backend dropped the request without
///   sending a reply.
///
/// # Panics
///
/// `with_txs()` (and therefore `start_simple_dispatch_backend()` when
/// called with `task_num == 0`) panics if it is given no senders,
/// because there would be no backend task to dispatch to.
///
/// Read the [DictService] example's source code for a better understanding.
///
/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_async.rs
#[macro_export]
macro_rules! dispatch_service_async {
    (
        $service:ty,
        $hash_by:ident,
        $(
            $method:ident ($request:ty) -> $reply:ty,
        )*
    ) => {

        paste::paste! {

            // Trait for backend business context.
            //
            // This defines all gRPC methods for this server.
            //
            // The formats of methods are similar to the original tonic ones,
            // except for these changes:
            //   - self: from `&self` to `&mut self`
            //   - parameter: from `Request<R>` to `R`
            //   - return value: from `Response<R>` to `R`
            //
            // For example:
            //
            // ```
            // impl DispatchBackend for MyGreeter {
            //     async fn say_hello(&mut self, req: SayHelloRequest) -> Result<SayHelloReply, Status> {
            //         Ok(SayHelloReply{
            //             say: "hello".into(),
            //         })
            //     }
            // }
            // ```
            trait DispatchBackend {
                $(
                    fn $method(&mut self, request: $request)
                        -> impl std::future::Future<Output = Result<$reply, tonic::Status>> + Send;
                )*
            }

            // Dispatched request.
            //
            // One variant per gRPC method, carrying the request payload and
            // the oneshot sender used to hand the reply back to the tonic
            // handler. This is an internal type; you should not need it
            // unless you build the backend tasks yourself.
            enum DispatchRequest {
                $(
                    [<$method:camel>] ($request, tokio::sync::oneshot::Sender<Result<$reply, tonic::Status>>),
                )*
            }

            impl DispatchRequest {
                // Run the matching `DispatchBackend` method on `ctx` and send
                // its result back through the request's reply channel.
                async fn handle_and_reply<B>(self, ctx: &mut B)
                    where B: DispatchBackend + Send + Sync + 'static
                {
                    match self {
                        $(
                            DispatchRequest::[<$method:camel>](req, resp_tx) => {
                                let reply = ctx.$method(req).await;
                                // `send` only fails when the receiving end has
                                // been dropped, i.e. nobody is waiting for this
                                // reply any more. That must not panic the
                                // backend task, which keeps serving the other
                                // requests of its shard, so the reply is
                                // simply discarded.
                                let _ = resp_tx.send(reply);
                            }
                        )*
                    }
                }
            }

            // Context for the tonic server, used to dispatch requests.
            //
            // Holds one request sender per backend task; a request is routed
            // to `txs[hash($hash_by) % txs.len()]`.
            pub struct [<$service DispatchServer>] {
                txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>,
            }

            impl [<$service DispatchServer>] {
                // Create the server from the `Sender` ends of the backend
                // request channels.
                //
                // Panics if `txs` is empty: shard selection takes the hash
                // modulo `txs.len()`, so an empty list would otherwise panic
                // with a division by zero on every single request.
                fn with_txs(txs: Vec<tokio::sync::mpsc::Sender<DispatchRequest>>) -> Self {
                    assert!(
                        !txs.is_empty(),
                        "dispatch server needs at least one backend request channel",
                    );
                    Self { txs }
                }

                // Internal method: hash the dispatch key.
                //
                // `DefaultHasher::new()` always starts from the same state, so
                // equal keys produce equal hashes and hence the same shard.
                fn calc_hash(item: &impl std::hash::Hash) -> u64 {
                    use std::hash::Hasher;
                    let mut hasher = std::collections::hash_map::DefaultHasher::new();
                    item.hash(&mut hasher);
                    hasher.finish()
                }
            }

            // The tonic server implementation.
            //
            // Dispatch the request to the backend, and wait for the reply.
            #[tonic::async_trait]
            impl $service for [<$service DispatchServer>] {
                $(
                    async fn $method(
                        &self,
                        request: tonic::Request<$request>,
                    ) -> Result<tonic::Response<$reply>, tonic::Status> {
                        let request = request.into_inner();

                        // Requests with the same `$hash_by` value always go to
                        // the same backend task. `txs` is non-empty when the
                        // server was built through `with_txs()`.
                        let shard = Self::calc_hash(&request.$hash_by) as usize % self.txs.len();

                        let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

                        let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);

                        // Non-blocking enqueue: a full or closed channel is
                        // reported to the client instead of waiting.
                        if self.txs[shard].try_send(biz_req).is_err() {
                            return Err(tonic::Status::unavailable(String::new()));
                        }

                        match resp_rx.await {
                            Ok(reply) => reply.map(tonic::Response::new),
                            // The backend dropped the reply sender without
                            // answering (e.g. its task died). Report an error
                            // to the client rather than panicking in the
                            // handler.
                            Err(_) => Err(tonic::Status::internal(
                                "dispatch backend dropped the request without replying",
                            )),
                        }
                    }
                )*
            }

            // Start a simple backend service.
            //
            // Spawns `task_num` tokio tasks, each owning a clone of `backend`
            // and the receiving end of its own request channel with room for
            // `channel_capacity` pending requests, and returns the tonic
            // server holding the matching senders.
            //
            // Must be called from within a tokio runtime, since it calls
            // `tokio::spawn`. Panics if `task_num` is 0 (see `with_txs()`);
            // `tokio::sync::mpsc::channel` is documented to panic on a zero
            // capacity as well.
            //
            // You need to write your own code if you need any more features,
            // for example if the backend task needs to listen on another
            // channel.
            //
            // `dead_code` is allowed because users who build their backend
            // tasks themselves never call this function.
            #[allow(dead_code)]
            fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
                -> [<$service DispatchServer>]
                where B: Clone + DispatchBackend + Send + Sync + 'static
            {
                // Serve requests until every sender of the channel is dropped.
                async fn backend_task<B>(mut backend: B, mut req_rx: tokio::sync::mpsc::Receiver<DispatchRequest>)
                    where B: DispatchBackend + Send + Sync + 'static
                {
                    while let Some(request) = req_rx.recv().await {
                        request.handle_and_reply(&mut backend).await;
                    }
                }

                let req_txs = (0..task_num)
                    .map(|_| {
                        let (req_tx, req_rx) = tokio::sync::mpsc::channel(channel_capacity);
                        tokio::spawn(backend_task(backend.clone(), req_rx));
                        req_tx
                    })
                    .collect();

                [<$service DispatchServer>]::with_txs(req_txs)
            }

        } // end of paste!
    }
}