tonic_server_dispatch/
sync.rs

/// Similar to `dispatch_service_async`, but in sync mode.
///
/// See [the module-level documentation](super) for more information
/// about the 2 modes.
///
/// The only API difference is that the methods in `DispatchBackendShard`
/// and `DispatchBackendItem` are plain `fn` instead of `async fn`.
///
/// # Arguments
///
/// - `$service`: the service type; the generated server type is named
///   `<$service>DispatchServer`.
/// - `$hash_by: $hash_type`: the name and type of the request field that
///   is passed to `get_item` / `get_item_mut` to look up the item.
/// - four bracketed method lists, in this order: shard mutable methods,
///   shard readonly methods, item mutable methods, item readonly methods.
///   Each entry has the form `method (Request) -> Reply,`.
///
/// There is also a sync mode [DictService] example.
///
/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_sync.rs
#[macro_export]
macro_rules! dispatch_service_sync {
    (
        $service:ty,
        $hash_by:ident : $hash_type:ty,

        [ $(
            $shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
        )* ],

        [ $(
            $shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
        )* ],

        [ $(
            $item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
        )* ],

        [ $(
            $item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
        )* ]
    ) => {

        // define tonic Server: [<$service DispatchServer>]
        //
        // this part is same for sync and async modes.
        //
        // `$crate` is used so that the helper macro is still found when the
        // dependency is renamed in the user's Cargo.toml.
        $crate::_define_dispatch_server!(
            $service,
            $hash_by: $hash_type,

            std::sync::mpsc::SyncSender,

            [ $(
                $shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply,
            )* ],

            [ $(
                $shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply,
            )* ],

            [ $(
                $item_mutable_method ($item_mutable_request) -> $item_mutable_reply,
            )* ],

            [ $(
                $item_readonly_method ($item_readonly_request) -> $item_readonly_reply,
            )* ]
        );

        paste::paste! {

        // 2 traits for backend business context: Shard and Item.
        //
        // DispatchBackendShard is for each backend shard. It has 2 parts:
        // 1. associated type Item, and get_item/get_item_mut methods;
        // 2. gRPC methods that work at shard level (but not item), e.g. create/delete/list.
        //
        // DispatchBackendItem is for each backend item. It only has
        // gRPC methods that work at item level.
        //
        // The formats of all methods are similar to the original tonic ones,
        // except for these changes:
        //   - `async fn` to `fn`
        //   - self: from `&self` to `&mut self` (mutable methods)
        //   - parameter: from `Request<R>` to `R`
        //   - return value: from `Response<R>` to `R`
        trait DispatchBackendShard {
            // part-1
            //
            // `tonic::Status` is spelled out in full so that the expansion
            // does not depend on a `use tonic::Status` at the call site.
            type Item: DispatchBackendItem;
            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, tonic::Status>;
            fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, tonic::Status>;

            // part-2
            $(
                fn $shard_mutable_method(&mut self, request: $shard_mutable_request)
                -> Result<$shard_mutable_reply, tonic::Status>;
            )*
            $(
                fn $shard_readonly_method(&self, request: $shard_readonly_request)
                -> Result<$shard_readonly_reply, tonic::Status>;
            )*
        }
        trait DispatchBackendItem {
            $(
                fn $item_mutable_method(&mut self, request: $item_mutable_request)
                -> Result<$item_mutable_reply, tonic::Status>;
            )*
            $(
                fn $item_readonly_method(&self, request: $item_readonly_request)
                -> Result<$item_readonly_reply, tonic::Status>;
            )*
        }

        // Dispatched request.
        //
        // One variant per gRPC method; each carries the request body and the
        // oneshot sender used to deliver the reply.
        //
        // This is an internal type. You would not need to know this.
        enum DispatchRequest {
            $(
                [<$shard_mutable_method:camel>] ($shard_mutable_request, tokio::sync::oneshot::Sender<Result<$shard_mutable_reply, tonic::Status>>),
            )*
            $(
                [<$shard_readonly_method:camel>] ($shard_readonly_request, tokio::sync::oneshot::Sender<Result<$shard_readonly_reply, tonic::Status>>),
            )*
            $(
                [<$item_mutable_method:camel>] ($item_mutable_request, tokio::sync::oneshot::Sender<Result<$item_mutable_reply, tonic::Status>>),
            )*
            $(
                [<$item_readonly_method:camel>] ($item_readonly_request, tokio::sync::oneshot::Sender<Result<$item_readonly_reply, tonic::Status>>),
            )*
        }

        impl DispatchRequest {
            // Run the request against the backend shard `ctx` and send the
            // result back through the request's oneshot sender.
            //
            // A failed `send` only means that the receiving half has already
            // been dropped, i.e. nobody is waiting for this reply any more
            // (NOTE(review): presumably the gRPC call was cancelled; the
            // receiving side is not visible here). This must not panic,
            // otherwise one abandoned request would kill the worker thread
            // of the whole shard, so the error is deliberately ignored.
            fn handle_and_reply<B>(self, ctx: &mut B)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                match self {
                    $(
                        DispatchRequest::[<$shard_mutable_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_mutable_method(req);
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$shard_readonly_method:camel>](req, resp_tx) => {
                            let reply = ctx.$shard_readonly_method(req);
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$item_mutable_method:camel>](req, resp_tx) => {
                            // Look up the item first; a lookup error becomes the reply.
                            let reply = match ctx.get_item_mut(&req.$hash_by) {
                                Ok(i) => i.$item_mutable_method(req),
                                Err(err) => Err(err),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                    $(
                        DispatchRequest::[<$item_readonly_method:camel>](req, resp_tx) => {
                            // Look up the item first; a lookup error becomes the reply.
                            let reply = match ctx.get_item(&req.$hash_by) {
                                Ok(i) => i.$item_readonly_method(req),
                                Err(err) => Err(err),
                            };
                            let _ = resp_tx.send(reply);
                        }
                    )*
                }
            }
        }

        // Start a simple backend service.
        //
        // Spawns `task_num` OS threads named `biz-worker-{i}`. Each thread owns
        // a clone of `backend` and the receiving end of its own bounded
        // channel (capacity `channel_capacity`), and handles requests until
        // all senders of that channel are dropped.
        //
        // Panics if a worker thread can not be spawned.
        //
        // You need to write your own code if you need any more features, for
        // example if the backend task needs to listen on another channel.
        #[allow(dead_code)]
        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
            -> [<$service DispatchServer>]
            where B: Clone + DispatchBackendShard + Send + Sync + 'static
        {
            // Worker loop: `recv` returns `Err` once every sender is gone,
            // which ends the loop and the thread.
            fn backend_task<B>(mut backend: B, req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
                where B: DispatchBackendShard + Send + Sync + 'static
            {
                while let Ok(request) = req_rx.recv() {
                    request.handle_and_reply(&mut backend);
                }
            }

            let mut req_txs = Vec::with_capacity(task_num);
            for i in 0..task_num {
                let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);

                let backend = backend.clone();
                std::thread::Builder::new()
                    .name(format!("biz-worker-{i}"))
                    .spawn(move || backend_task::<B>(backend, req_rx))
                    .expect("failed to spawn dispatch backend worker thread");

                req_txs.push(req_tx);
            }

            [<$service DispatchServer>]::with_txs(req_txs)
        }

        } // end of paste!
    }
}