tonic_server_dispatch/
sync.rs

1/// Similar to the `dispatch_service_async` but in sync mode.
2///
3/// See [the module-level documentation](super) for more information
4/// about the 2 modes.
5///
6/// The only API difference is that the methods in `DispatchBackendShard`
7/// and `DispatchBackendItem` are sync but not `async fn`.
8///
9/// And there is also a sync mode [DictService] example.
10///
11/// [DictService]: https://github.com/WuBingzheng/tonic-server-dispatch/blob/master/examples/src/server_sync.rs
12#[macro_export]
13macro_rules! dispatch_service_sync {
14    (
15        $service:ty,
16        $hash_by:ident : $hash_type:ty,
17
18        [ $(
19            $shard_method:ident ($shard_request:ty) -> $shard_reply:ty,
20        )* ],
21
22        [ $(
23            $mutable_method:ident ($mutable_request:ty) -> $mutable_reply:ty,
24        )* ],
25
26        [ $(
27            $readonly_method:ident ($readonly_request:ty) -> $readonly_reply:ty,
28        )* ]
29    ) => {
30
31        // define tonic Server: [<$service DispatchServer>]
32        //
33        // this part is same for sync and async modes.
34        tonic_server_dispatch::_define_dispatch_server!(
35            $service,
36            $hash_by: $hash_type,
37
38            std::sync::mpsc::SyncSender,
39
40            [ $(
41                $shard_method ($shard_request) -> $shard_reply,
42            )* ],
43
44            [ $(
45                $mutable_method ($mutable_request) -> $mutable_reply,
46            )* ],
47
48            [ $(
49                $readonly_method ($readonly_request) -> $readonly_reply,
50            )* ]
51        );
52
53        paste::paste! {
54
55        // 2 traits for backend business context: Shard and Item.
56        //
57        // DispatchBackendShard is for each backend shard. It has 2 parts:
58        // 1. associated type Item, and get_item/get_item_mut methods;
59        // 2. gRPC methods that works at shard (but not item), e.g. create/delete.
60        //
61        // DispatchBackendItem is for each backend item. It only has
62        // gRPC methods that works at item.
63        //
64        // The formats of all methods are similar to the original tonic ones,
65        // except that changes
66        //   - `async fn` to `fn`
67        //   - self: from `&self` to `mut &self`
68        //   - parameter: from `Request<R>` to `R`
69        //   - retuen value: from `Response<R>` to `R`
70        // ```
71        trait DispatchBackendShard {
72            // part-1
73            type Item: DispatchBackendItem;
74            fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, Status>;
75            fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, Status>;
76
77            // part-2
78            $(
79                fn $shard_method(&mut self, request: $shard_request)
80                -> Result<$shard_reply, tonic::Status>;
81            )*
82        }
83        trait DispatchBackendItem {
84            $(
85                fn $mutable_method(&mut self, request: $mutable_request)
86                -> Result<$mutable_reply, tonic::Status>;
87            )*
88            $(
89                fn $readonly_method(&self, request: $readonly_request)
90                -> Result<$readonly_reply, tonic::Status>;
91            )*
92        }
93
94        // Dispatched request.
95        //
96        // This is an internal type. You would not need to know this.
97        enum DispatchRequest {
98            $(
99                [<$shard_method:camel>] ($shard_request, tokio::sync::oneshot::Sender<Result<$shard_reply, tonic::Status>>),
100            )*
101            $(
102                [<$mutable_method:camel>] ($mutable_request, tokio::sync::oneshot::Sender<Result<$mutable_reply, tonic::Status>>),
103            )*
104            $(
105                [<$readonly_method:camel>] ($readonly_request, tokio::sync::oneshot::Sender<Result<$readonly_reply, tonic::Status>>),
106            )*
107        }
108
109        impl DispatchRequest {
110            fn handle_and_reply<B>(self, ctx: &mut B)
111                where B: DispatchBackendShard + Send + Sync + 'static
112            {
113                match self {
114                    $(
115                        DispatchRequest::[<$shard_method:camel>](req, resp_tx) => {
116                            let reply = ctx.$shard_method(req);
117                            resp_tx.send(reply).unwrap();
118                        }
119                    )*
120                    $(
121                        DispatchRequest::[<$mutable_method:camel>](req, resp_tx) => {
122                            let reply = match ctx.get_item_mut(&req.$hash_by) {
123                                Ok(i) => i.$mutable_method(req),
124                                Err(err) => Err(err),
125                            };
126                            resp_tx.send(reply).unwrap();
127                        }
128                    )*
129                    $(
130                        DispatchRequest::[<$readonly_method:camel>](req, resp_tx) => {
131                            let reply = match ctx.get_item(&req.$hash_by) {
132                                Ok(i) => i.$readonly_method(req),
133                                Err(err) => Err(err),
134                            };
135                            resp_tx.send(reply).unwrap();
136                        }
137                    )*
138                }
139            }
140        }
141
142        // Start a simple backend service.
143        //
144        // You need to write your own code if any more feature, for example
145        // the backend task need to listen on another channel.
146        #[allow(dead_code)]
147        fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
148            -> [<$service DispatchServer>]
149            where B: Clone + DispatchBackendShard + Send + Sync + 'static
150        {
151            fn backend_task<B>(mut backend: B, mut req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
152                where B: DispatchBackendShard + Send + Sync + 'static
153            {
154                while let Ok(request) = req_rx.recv() {
155                    request.handle_and_reply(&mut backend);
156                }
157            }
158
159            let mut req_txs = Vec::new();
160            for i in 0..task_num {
161                let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);
162
163                let backend = backend.clone();
164                std::thread::Builder::new()
165                    .name(format!("biz-worker-{i}"))
166                    .spawn(|| backend_task::<B>(backend, req_rx))
167                    .unwrap();
168
169                req_txs.push(req_tx);
170            }
171
172            [<$service DispatchServer>]::with_txs(req_txs)
173        }
174
175        } // end of paste!
176    }
177}