#[macro_export]
macro_rules! _define_dispatch_server {
(
$service:ty,
$hash_by:ident : $hash_type:ty,
$mpsc_sender_type:ty,
[ $(
$shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
)* ],
[ $(
$shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
)* ],
[ $(
$item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
)* ],
[ $(
$item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
)* ]
) => {
paste::paste! {
pub struct [<$service DispatchServer>] {
txs: Vec<$mpsc_sender_type<DispatchRequest>>,
}
impl [<$service DispatchServer>] {
fn with_txs(txs: Vec<$mpsc_sender_type<DispatchRequest>>) -> Self {
Self { txs }
}
}
#[tonic::async_trait]
impl $service for [<$service DispatchServer>] {
$(
async fn $shard_mutable_method(
&self,
request: tonic::Request<$shard_mutable_request>,
) -> Result<tonic::Response<$shard_mutable_reply>, tonic::Status> {
tonic_server_dispatch::_service_method_body!($shard_mutable_method, self, request, $hash_by)
}
)*
$(
async fn $shard_readonly_method(
&self,
request: tonic::Request<$shard_readonly_request>,
) -> Result<tonic::Response<$shard_readonly_reply>, tonic::Status> {
tonic_server_dispatch::_service_method_body!($shard_readonly_method, self, request, $hash_by)
}
)*
$(
async fn $item_mutable_method(
&self,
request: tonic::Request<$item_mutable_request>,
) -> Result<tonic::Response<$item_mutable_reply>, tonic::Status> {
tonic_server_dispatch::_service_method_body!($item_mutable_method, self, request, $hash_by)
}
)*
$(
async fn $item_readonly_method(
&self,
request: tonic::Request<$item_readonly_request>,
) -> Result<tonic::Response<$item_readonly_reply>, tonic::Status> {
tonic_server_dispatch::_service_method_body!($item_readonly_method, self, request, $hash_by)
}
)*
}
}
}
}
#[macro_export]
macro_rules! _service_method_body {
($method:ident, $self:ident, $request:expr, $hash_by:ident) => {
paste::paste! {
{
fn calc_hash(item: &impl std::hash::Hash) -> u64 {
use std::hash::Hasher;
let mut hasher = std::collections::hash_map::DefaultHasher::new();
item.hash(&mut hasher);
hasher.finish()
}
let request = $request.into_inner();
let shard = calc_hash(&request.$hash_by) as usize % $self.txs.len();
let (resp_tx, resp_rx) = tokio::sync::oneshot::channel();
let biz_req = DispatchRequest::[<$method:camel>](request, resp_tx);
match $self.txs[shard].try_send(biz_req) {
Ok(()) => resp_rx.await.unwrap().map(tonic::Response::new),
Err(_) => Err(tonic::Status::unavailable(String::new())),
}
}
}
};
}