#[macro_export]
macro_rules! dispatch_service_sync {
(
$service:ty,
$hash_by:ident : $hash_type:ty,
[ $(
$shard_mutable_method:ident ($shard_mutable_request:ty) -> $shard_mutable_reply:ty,
)* ],
[ $(
$shard_readonly_method:ident ($shard_readonly_request:ty) -> $shard_readonly_reply:ty,
)* ],
[ $(
$item_mutable_method:ident ($item_mutable_request:ty) -> $item_mutable_reply:ty,
)* ],
[ $(
$item_readonly_method:ident ($item_readonly_request:ty) -> $item_readonly_reply:ty,
)* ]
) => {
tonic_server_dispatch::_define_dispatch_server!(
$service,
$hash_by: $hash_type,
std::sync::mpsc::SyncSender,
[ $(
$shard_mutable_method ($shard_mutable_request) -> $shard_mutable_reply,
)* ],
[ $(
$shard_readonly_method ($shard_readonly_request) -> $shard_readonly_reply,
)* ],
[ $(
$item_mutable_method ($item_mutable_request) -> $item_mutable_reply,
)* ],
[ $(
$item_readonly_method ($item_readonly_request) -> $item_readonly_reply,
)* ]
);
paste::paste! {
trait DispatchBackendShard {
type Item: DispatchBackendItem;
fn get_item(&self, key: &$hash_type) -> Result<&Self::Item, Status>;
fn get_item_mut(&mut self, key: &$hash_type) -> Result<&mut Self::Item, Status>;
$(
fn $shard_mutable_method(&mut self, request: $shard_mutable_request)
-> Result<$shard_mutable_reply, tonic::Status>;
)*
$(
fn $shard_readonly_method(&self, request: $shard_readonly_request)
-> Result<$shard_readonly_reply, tonic::Status>;
)*
}
trait DispatchBackendItem {
$(
fn $item_mutable_method(&mut self, request: $item_mutable_request)
-> Result<$item_mutable_reply, tonic::Status>;
)*
$(
fn $item_readonly_method(&self, request: $item_readonly_request)
-> Result<$item_readonly_reply, tonic::Status>;
)*
}
enum DispatchRequest {
$(
[<$shard_mutable_method:camel>] ($shard_mutable_request, tokio::sync::oneshot::Sender<Result<$shard_mutable_reply, tonic::Status>>),
)*
$(
[<$shard_readonly_method:camel>] ($shard_readonly_request, tokio::sync::oneshot::Sender<Result<$shard_readonly_reply, tonic::Status>>),
)*
$(
[<$item_mutable_method:camel>] ($item_mutable_request, tokio::sync::oneshot::Sender<Result<$item_mutable_reply, tonic::Status>>),
)*
$(
[<$item_readonly_method:camel>] ($item_readonly_request, tokio::sync::oneshot::Sender<Result<$item_readonly_reply, tonic::Status>>),
)*
}
impl DispatchRequest {
fn handle_and_reply<B>(self, ctx: &mut B)
where B: DispatchBackendShard + Send + Sync + 'static
{
match self {
$(
DispatchRequest::[<$shard_mutable_method:camel>](req, resp_tx) => {
let reply = ctx.$shard_mutable_method(req);
resp_tx.send(reply).unwrap();
}
)*
$(
DispatchRequest::[<$shard_readonly_method:camel>](req, resp_tx) => {
let reply = ctx.$shard_readonly_method(req);
resp_tx.send(reply).unwrap();
}
)*
$(
DispatchRequest::[<$item_mutable_method:camel>](req, resp_tx) => {
let reply = match ctx.get_item_mut(&req.$hash_by) {
Ok(i) => i.$item_mutable_method(req),
Err(err) => Err(err),
};
resp_tx.send(reply).unwrap();
}
)*
$(
DispatchRequest::[<$item_readonly_method:camel>](req, resp_tx) => {
let reply = match ctx.get_item(&req.$hash_by) {
Ok(i) => i.$item_readonly_method(req),
Err(err) => Err(err),
};
resp_tx.send(reply).unwrap();
}
)*
}
}
}
#[allow(dead_code)]
fn start_simple_dispatch_backend<B>(backend: B, task_num: usize, channel_capacity: usize)
-> [<$service DispatchServer>]
where B: Clone + DispatchBackendShard + Send + Sync + 'static
{
fn backend_task<B>(mut backend: B, mut req_rx: std::sync::mpsc::Receiver<DispatchRequest>)
where B: DispatchBackendShard + Send + Sync + 'static
{
while let Ok(request) = req_rx.recv() {
request.handle_and_reply(&mut backend);
}
}
let mut req_txs = Vec::new();
for i in 0..task_num {
let (req_tx, req_rx) = std::sync::mpsc::sync_channel(channel_capacity);
let backend = backend.clone();
std::thread::Builder::new()
.name(format!("biz-worker-{i}"))
.spawn(|| backend_task::<B>(backend, req_rx))
.unwrap();
req_txs.push(req_tx);
}
[<$service DispatchServer>]::with_txs(req_txs)
}
} }
}