tonic_server_dispatch/
sync.rs

/// Generates a synchronous, thread-based dispatch layer for a tonic service.
///
/// Similar to `dispatch_service_async`, but the backend methods are plain
/// `fn`s that run on dedicated OS threads instead of async tasks.
///
/// # Arguments
///
/// - `$service`: the tonic service trait to implement.
/// - `$hash_by`: name of a field that every request type has; its hash
///   selects the worker, so requests with equal values of that field are
///   always handled by the same worker.
/// - a list of `method(RequestType) -> ReplyType,` entries, one per gRPC
///   method of the service.
///
/// # Generated items
///
/// - `trait DispatchBackend`: the business logic to be implemented by the user.
/// - `struct <Service>DispatchServer`: the tonic service implementation that
///   forwards each request to a worker and waits for its reply.
/// - `fn start_simple_dispatch_backend`: spawns the worker threads and
///   returns the server.
#[macro_export]
macro_rules! dispatch_service_sync {
    (
        $service:ty,
        $hash_by:ident,
        $(
            $method:ident ($request:ty) -> $reply:ty,
        )*
    ) => {

        paste::paste! {

        // Trait for backend business context.
        //
        // This defines all gRPC methods for this server.
        //
        // The formats of methods are similar to the original tonic ones,
        // except that changes
        //   - `async fn` to `fn`
        //   - self: from `&self` to `&mut self`
        //   - parameter: from `Request<R>` to `R`
        //   - return value: from `Response<R>` to `R`
        //
        // For example:
        //
        // ```
        // impl DispatchBackend for MyGreeter {
        //     fn say_hello(&mut self, req: SayHelloRequest) -> Result<SayHelloReply, Status> {
        //         Ok(SayHelloReply{
        //             say: "hello".into(),
        //         })
        //     }
        // }
        // ```
        trait DispatchBackend {
            $(
                fn $method(&mut self, request: $request)
                -> Result<$reply, tonic::Status>;
            )*
        }

        // Dispatched request.
        //
        // One variant per gRPC method, carrying the request body and the
        // oneshot sender used to hand the reply back to the tonic handler.
        //
        // This is an internal type. You would not need to know this.
        enum DispatchRequest {
            $(
                [<$method:camel>] ($request, tokio::sync::oneshot::Sender<Result<$reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            // Runs the matching backend method and sends its result back to
            // the waiting tonic handler.
            fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackend + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$method:camel>](req, resp_tx) => {
                            let reply = ctx.$method(req);
                            // `send` fails only when the receiver is gone,
                            // i.e. the handler future was dropped (for
                            // example the client cancelled the call). Nobody
                            // is waiting for the reply then, so discard it
                            // instead of panicking and killing this worker.
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Context for the tonic server, used to dispatch requests.
        //
        // Holds one bounded channel sender per backend worker.
        pub struct [<$service DispatchServer>] {
            txs: Vec<std::sync::mpsc::SyncSender<DispatchRequest>>,
        }

        impl [<$service DispatchServer>] {
            // create with txs
            fn with_txs(txs: Vec<std::sync::mpsc::SyncSender<DispatchRequest>>) -> Self {
                Self { txs }
            }

            // internal method: hash `item` with the std default hasher.
            fn calc_hash(item: &impl std::hash::Hash) -> u64 {
                use std::hash::Hasher;
                let mut hasher = std::collections::hash_map::DefaultHasher::new();
                item.hash(&mut hasher);
                hasher.finish()
            }
        }

        // The tonic server implementation.
        //
        // Dispatch the request to backend, and wait for the reply.
        #[tonic::async_trait]
        impl $service for [<$service DispatchServer>] {
            $(
                async fn $method(
                    &self,
                    request: tonic::Request<$request>,
                ) -> Result<tonic::Response<$reply>, tonic::Status> {
                    let request = request.into_inner();

                    // Without any worker the modulo below would divide by
                    // zero; report the service as unavailable instead.
                    if self.txs.is_empty() {
                        return Err(tonic::Status::unavailable("no backend worker is running"));
                    }

                    let shard = Self::calc_hash(&request.$hash_by) as usize % self.txs.len();

                    let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();

                    let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);

                    // `try_send` never blocks the async runtime; it fails when
                    // the worker's queue is full or the worker has exited.
                    if self.txs[shard].try_send(biz_req).is_err() {
                        return Err(tonic::Status::unavailable(String::new()));
                    }

                    match resp_rx.await {
                        Ok(reply) => reply.map(tonic::Response::new),
                        // The worker dropped the sender without replying
                        // (e.g. the backend method panicked).
                        Err(_) => Err(tonic::Status::internal(
                            "backend worker dropped the request without replying",
                        )),
                    }
                }
            )*
        }

        // Start a simple backend service.
        //
        // Spawns `task_num` worker threads named `biz-worker-<index>`, each
        // owning a clone of `backend` and a bounded request queue of
        // `channel_capacity` entries, and returns the server that dispatches
        // to them.
        //
        // Panics if the OS fails to spawn a worker thread.
        //
        // You need to write your own code if any more feature, for example
        // the backend task need to listen on another channel.
        //
        // `dead_code` is allowed because users who start their own backend
        // never call this helper.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackend + Send + Sync + 'static
        {
            // Worker loop: handle requests until every sender is dropped.
            fn backend_task<B>(mut backend: B, req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackend + Send + Sync + 'static
            {
                while let Ok(request) = req_rx.recv() {
                    request.handle_and_reply(&mut backend);
                }
            }

            let mut req_txs = Vec::with_capacity(task_num);
            for i in 0..task_num {
                let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);

                let backend = backend.clone();
                std::thread::Builder::new()
                    .name(format!("biz-worker-{i}"))
                    .spawn(move || backend_task::<B>(backend, req_rx))
                    .expect("failed to spawn backend worker thread");

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}
148}