tonic_server_dispatch/
sync.rs1#[macro_export]
3macro_rules! dispatch_service_sync {
4 (
5 $service:ty,
6 $hash_by:ident,
7 $(
8 $method:ident ($request:ty) -> $reply:ty,
9 )*
10 ) => {
11
12 paste::paste! {
13
14 trait DispatchBackend {
37 $(
38 fn $method(&mut self, request: $request)
39 -> Result<$reply, tonic::Status>;
40 )*
41 }
42
43 enum DispatchRequest {
47 $(
48 [<$method:camel>] ($request, tokio::sync::oneshot::Sender<Result<$reply, tonic::Status>>),
49 )*
50 }
51
52 impl DispatchRequest {
53 fn handle_and_reply<B>(self, ctx: &mut B)
54 where B: DispatchBackend + Send + Sync + 'static
55 {
56 match self {
57 $(
58 DispatchRequest::[<$method:camel>](req, resp_tx) => {
59 let reply = ctx.$method(req);
60 resp_tx.send(reply).unwrap();
61 }
62 )*
63 }
64 }
65 }
66
67 pub struct [<$service DispatchServer>] {
69 txs: Vec<std::sync::mpsc::SyncSender<DispatchRequest>>,
70 }
71
72 impl [<$service DispatchServer>] {
73 fn with_txs(txs: Vec<std::sync::mpsc::SyncSender<DispatchRequest>>) -> Self {
75 Self { txs }
76 }
77
78 fn calc_hash(item: &impl std::hash::Hash) -> u64 {
80 use std::hash::Hasher;
81 let mut hasher = std::collections::hash_map::DefaultHasher::new();
82 item.hash(&mut hasher);
83 hasher.finish()
84 }
85 }
86
87 #[tonic::async_trait]
91 impl $service for [<$service DispatchServer>] {
92 $(
93 async fn $method(
94 &self,
95 request: tonic::Request<$request>,
96 ) -> Result<tonic::Response<$reply>, tonic::Status> {
97 let request = request.into_inner();
98
99 let shard = Self::calc_hash(&request.$hash_by) as usize % self.txs.len();
100
101 let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
102
103 let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);
104
105 match self.txs[shard].try_send(biz_req) {
106 Ok(()) => resp_rx.await.unwrap().map(tonic::Response::new),
107 Err(_) => Err(tonic::Status::unavailable(String::new())),
108 }
109 }
110 )*
111 }
112
113 #[allow(dead_code)]
118 fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
119 -> [<$service DispatchServer>]
120 where B: Clone + DispatchBackend + Send + Sync + 'static
121 {
122 fn backend_task<B>(mut backend: B, mut req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
123 where B: DispatchBackend + Send + Sync + 'static
124 {
125 while let Ok(request) = req_rx.recv() {
126 request.handle_and_reply(&mut backend);
127 }
128 }
129
130 let mut req_txs = Vec::new();
131 for i in 0..task_num {
132 let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);
133
134 let backend = backend.clone();
135 std::thread::Builder::new()
136 .name(format!("biz-worker-{i}"))
137 .spawn(|| backend_task::<B>(backend, req_rx))
138 .unwrap();
139
140 req_txs.push(req_tx);
141 }
142
143 [<$service DispatchServer>]::with_txs(req_txs)
144 }
145
146 } }
148}