use super::*;
use crate::rand::random;
#[doc(no_inline)]
pub use bytes::Bytes;
use futures_util::FutureExt;
#[doc(no_inline)]
pub use serde::{de::DeserializeOwned, Deserialize, Serialize};
use std::future::Future;
/// A message type that can be sent as an RPC request.
///
/// Implementors must be serializable with serde and usable as `Any`
/// across threads (`Send + Sync`).
pub trait Request: Serialize + DeserializeOwned + Any + Send + Sync {
/// The type of the reply produced for this request.
type Response: Serialize + DeserializeOwned + Any + Send + Sync;
/// Identifier of this request type.
///
/// The RPC methods on `Endpoint` use this value as the message tag when
/// sending a request and when listening for incoming requests.
// NOTE(review): presumably expected to be unique per request type so that
// handlers do not receive each other's messages — confirm with implementors.
const ID: u64;
}
/// Hashes the UTF-8 bytes of `s` into a `u64`, usable in const contexts.
///
/// Starting from zero, each byte updates the accumulator as
/// `acc * 33 + byte` with wrapping arithmetic. The empty string hashes to 0.
#[doc(hidden)]
pub const fn hash_str(s: &str) -> u64 {
    let mut remaining = s.as_bytes();
    let mut acc: u64 = 0;
    // Iterators are not available in `const fn`, so peel bytes off the front
    // of the slice with a pattern instead.
    while let [byte, tail @ ..] = remaining {
        acc = acc.wrapping_mul(33).wrapping_add(*byte as u64);
        remaining = tail;
    }
    acc
}
impl Endpoint {
    /// Sends `request` to `dst` via [`call`](Self::call), giving up after `timeout`.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`io::ErrorKind::TimedOut`] when the timeout
    /// elapses first; otherwise returns whatever `call` returns.
    pub async fn call_timeout<R: Request>(
        &self,
        dst: SocketAddr,
        request: R,
        timeout: Duration,
    ) -> io::Result<R::Response> {
        match crate::time::timeout(timeout, self.call(dst, request)).await {
            Ok(outcome) => outcome,
            Err(_) => Err(io::Error::new(io::ErrorKind::TimedOut, "RPC timeout")),
        }
    }

    /// Sends `request` to `dst` with no extra data and returns the response.
    ///
    /// This is [`call_with_data`](Self::call_with_data) with an empty data
    /// slice; any data that comes back with the response is discarded.
    pub async fn call<R: Request>(&self, dst: SocketAddr, request: R) -> io::Result<R::Response> {
        self.call_with_data(dst, request, &[])
            .await
            .map(|(response, _data)| response)
    }

    /// Sends `request` together with a copy of `data` to `dst`, then waits for
    /// the reply and returns it along with the data attached to it.
    ///
    /// The request is sent under the tag `R::ID` and carries a freshly drawn
    /// random tag on which the reply is awaited.
    ///
    /// # Errors
    ///
    /// Propagates errors from sending the request or receiving the reply.
    ///
    /// # Panics
    ///
    /// Panics if the reply arrives from an address other than `dst`, or if its
    /// payload is not an `(R::Response, Bytes)` pair.
    pub async fn call_with_data<R: Request>(
        &self,
        dst: SocketAddr,
        request: R,
        data: &[u8],
    ) -> io::Result<(R::Response, Bytes)> {
        let reply_tag = random::<u64>();
        let attachment = Bytes::copy_from_slice(data);
        let outgoing = Box::new((reply_tag, request, attachment));
        self.send_to_raw(dst, R::ID, outgoing).await?;

        let (incoming, sender) = self.recv_from_raw(reply_tag).await?;
        assert_eq!(sender, dst);
        let reply = incoming
            .downcast::<(R::Response, Bytes)>()
            .expect("message type mismatch");
        Ok(*reply)
    }

    /// Registers `f` as the handler for requests of type `R`.
    ///
    /// Delegates to [`add_rpc_handler_with_data`](Self::add_rpc_handler_with_data):
    /// data attached to a request is ignored and every response is sent back
    /// with empty data.
    pub fn add_rpc_handler<R: Request, AsyncFn, Fut>(&self, mut f: AsyncFn)
    where
        AsyncFn: FnMut(R) -> Fut + Send + 'static,
        Fut: Future<Output = R::Response> + Send + 'static,
    {
        self.add_rpc_handler_with_data(move |request: R, _attachment: Bytes| {
            f(request).map(|response| (response, Vec::new()))
        })
    }

    /// Registers `f` as the handler for requests of type `R`, passing it the
    /// data attached to each request and sending back the data it returns.
    ///
    /// Spawns a task that loops forever receiving messages tagged `R::ID`.
    /// For each one it calls `f`, then spawns a further task that awaits the
    /// returned future and sends the result to the requester under the reply
    /// tag carried by the request.
    ///
    /// # Panics
    ///
    /// The spawned tasks panic if receiving a request or sending a response
    /// fails, or if a received payload is not a `(u64, R, Bytes)` triple.
    pub fn add_rpc_handler_with_data<R: Request, AsyncFn, Fut>(&self, mut f: AsyncFn)
    where
        AsyncFn: FnMut(R, Bytes) -> Fut + Send + 'static,
        Fut: Future<Output = (R::Response, Vec<u8>)> + Send + 'static,
    {
        let listen_tag = R::ID;
        let endpoint = self.clone();
        crate::task::spawn(async move {
            loop {
                let (incoming, requester) = endpoint.recv_from_raw(listen_tag).await.unwrap();
                let unpacked = incoming
                    .downcast::<(u64, R, Bytes)>()
                    .expect("message type mismatch");
                let (reply_tag, request, attachment) = *unpacked;

                // `f` is invoked here, in the receive loop; only awaiting its
                // future and sending the reply happen in the per-request task.
                let pending = f(request, attachment);
                let replier = endpoint.clone();
                crate::task::spawn(async move {
                    let (response, extra) = pending.await;
                    let outgoing = Box::new((response, Bytes::from(extra)));
                    replier
                        .send_to_raw(requester, reply_tag, outgoing)
                        .await
                        .unwrap();
                });
            }
        });
    }
}