use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use rkyv::ser::serializers::AllocSerializer;
use rkyv::{AlignedVec, Archive, Serialize};
use crate::net::Status;
use crate::request::{Request, RequestContents};
use crate::{Body, SCRATCH_SPACE};
/// Key under which a handler is stored in a [`ServiceRegistry`].
///
/// `ServiceRegistry::add_handler` produces it by passing the handler's URI
/// path through `crate::hash`.
pub type HandlerKey = u64;
/// Collects the message handlers exposed by one service instance of type `Svc`.
///
/// The service is held behind an [`Arc`], and every handler added through
/// `add_handler` receives its own clone of that `Arc`.
pub struct ServiceRegistry<Svc> {
/// Type-erased handlers, keyed by the hash of their URI path.
handlers: BTreeMap<HandlerKey, Arc<dyn OpaqueMessageHandler>>,
/// The shared service instance that registered handlers dispatch to.
service: Arc<Svc>,
}
impl<Svc> ServiceRegistry<Svc>
where
    Svc: RpcService + Send + Sync + 'static,
{
    /// Builds an empty registry that takes ownership of `service` and stores
    /// it behind an [`Arc`] so it can be shared with every registered handler.
    pub(crate) fn new(service: Svc) -> Self {
        let service = Arc::new(service);
        let handlers = BTreeMap::new();
        Self { handlers, service }
    }

    /// Consumes the registry and returns the map of registered handlers.
    ///
    /// The registry's own reference to the service is dropped here; the
    /// handlers keep the service alive through their own `Arc` clones.
    pub(crate) fn into_handlers(
        self,
    ) -> BTreeMap<HandlerKey, Arc<dyn OpaqueMessageHandler>> {
        let Self { handlers, .. } = self;
        handlers
    }

    /// Registers the service's [`Handler`] implementation for `Msg`.
    ///
    /// The handler key is the `crate::hash` of the URI path built by
    /// `crate::to_uri_path` from `Svc::service_name()` and the handler's
    /// `path()`. Registering a second handler that maps to the same key
    /// replaces the earlier entry, because the underlying
    /// [`BTreeMap::insert`] overwrites existing keys.
    pub fn add_handler<Msg>(&mut self)
    where
        Msg: RequestContents + Sync + Send + 'static,
        Svc: Handler<Msg>,
    {
        // Bind the shared service to this concrete message type; the adapter
        // is what gets stored behind the type-erased trait object.
        let adapter = PhantomHandler {
            handler: Arc::clone(&self.service),
            _msg: PhantomData::<Msg>,
        };

        let service_name = Svc::service_name();
        let handler_path = <Svc as Handler<Msg>>::path();
        let uri = crate::to_uri_path(service_name, handler_path);
        let key = crate::hash(&uri);

        let erased: Arc<dyn OpaqueMessageHandler> = Arc::new(adapter);
        self.handlers.insert(key, erased);
    }
}
/// A service whose message handlers can be collected in a [`ServiceRegistry`].
pub trait RpcService: Sized {
/// Name of the service, used by `ServiceRegistry::add_handler` as the
/// service part of each handler's URI path.
///
/// Defaults to the implementing type's [`std::any::type_name`].
/// NOTE(review): the standard library does not guarantee that `type_name`
/// output is stable across compiler versions; since this string is hashed
/// into the handler key, implementors may want to override it with a fixed
/// name — confirm against how peers are built and deployed.
fn service_name() -> &'static str {
std::any::type_name::<Self>()
}
/// Hook in which the implementor registers its message handlers on
/// `registry` (for example through `ServiceRegistry::add_handler`).
fn register_handlers(registry: &mut ServiceRegistry<Self>);
}
/// Handling of one message type `Msg` by an [`RpcService`].
///
/// A service implements this trait once per message type it accepts.
#[async_trait]
pub trait Handler<Msg>: RpcService
where
Msg: RequestContents,
{
/// Value returned on success; it must be convertible into a response
/// body through [`TryIntoBody`].
type Reply: TryIntoBody;
/// Path of this handler, used by `ServiceRegistry::add_handler` as the
/// handler part of the URI path.
///
/// Defaults to the [`std::any::type_name`] of `Msg`.
fn path() -> &'static str {
std::any::type_name::<Msg>()
}
/// Processes a single request carrying a `Msg`.
///
/// Returns the reply on success, or a [`Status`] describing the failure.
async fn on_message(&self, msg: Request<Msg>) -> Result<Self::Reply, Status>;
}
/// Type-erased message handler, stored as a trait object in the
/// [`ServiceRegistry`] handler map.
#[async_trait]
pub(crate) trait OpaqueMessageHandler: Send + Sync {
/// Handles a raw request body received from `remote_addr`.
///
/// Returns the response body on success. On failure the error is an
/// [`AlignedVec`] payload; in the implementation visible in this file it
/// holds the rkyv-serialized [`Status`].
async fn try_handle(
&self,
remote_addr: SocketAddr,
data: Body,
) -> Result<Body, AlignedVec>;
}
/// Adapter that ties a shared service `H` to one concrete message type `Msg`.
///
/// The struct stores no `Msg` value; the type parameter only selects which
/// handler implementation of `H` is dispatched to.
///
/// The definition deliberately carries no trait bounds: the bounds required
/// for dispatch live on the `impl` block that needs them, so they are stated
/// in exactly one place.
struct PhantomHandler<H, Msg> {
    /// Shared service instance the request is forwarded to.
    handler: Arc<H>,
    /// Marker recording the message type without storing a value of it.
    _msg: PhantomData<Msg>,
}
#[async_trait]
impl<H, Msg> OpaqueMessageHandler for PhantomHandler<H, Msg>
where
    Msg: RequestContents + Send + Sync + 'static,
    H: Handler<Msg> + Send + Sync + 'static,
{
    /// Decodes `data` into the contents of a `Msg` request, forwards it to
    /// the wrapped service and converts the reply into a response body.
    ///
    /// Any failure (decoding the body, the handler itself, or converting the
    /// reply) is reported as the rkyv-serialized status. If serializing the
    /// status fails as well, an empty [`AlignedVec`] is returned as the error
    /// payload.
    async fn try_handle(
        &self,
        remote_addr: SocketAddr,
        data: Body,
    ) -> Result<Body, AlignedVec> {
        // Step 1: decode the incoming body, bailing out with the encoded
        // status when it cannot be turned into the message contents.
        let contents = Msg::from_body(data).await.map_err(|status| {
            rkyv::to_bytes::<_, SCRATCH_SPACE>(&status)
                .unwrap_or_else(|_| AlignedVec::new())
        })?;

        // Step 2: run the handler and turn a successful reply into a body.
        let request = Request::new(remote_addr, contents);
        let outcome = match self.handler.on_message(request).await {
            Ok(reply) => reply.try_into_body(),
            Err(status) => Err(status),
        };

        // Step 3: encode whichever status came out of step 2, if any.
        match outcome {
            Ok(body) => Ok(body),
            Err(status) => Err(rkyv::to_bytes::<_, SCRATCH_SPACE>(&status)
                .unwrap_or_else(|_| AlignedVec::new())),
        }
    }
}
/// Fallible conversion of an owned value into a response [`Body`].
///
/// This is the bound placed on `Handler::Reply`.
pub trait TryIntoBody {
/// Consumes `self` and produces the body, or a [`Status`] on failure.
fn try_into_body(self) -> Result<Body, Status>;
}
/// Fallible conversion of a borrowed value into a [`Body`].
pub trait TryAsBody {
/// Produces a body from `&self`, or a [`Status`] on failure.
fn try_as_body(&self) -> Result<Body, Status>;
}
/// Every type that rkyv can serialize with the crate's allocating serializer
/// can be turned into a [`Body`].
impl<T> TryAsBody for T
where
    T: Archive + Serialize<AllocSerializer<SCRATCH_SPACE>>,
{
    /// Serializes `self` with rkyv and wraps a copy of the resulting bytes in
    /// a [`Body`].
    ///
    /// A serialization failure is reported as an internal [`Status`] whose
    /// message is the error's string form.
    fn try_as_body(&self) -> Result<Body, Status> {
        match rkyv::to_bytes::<_, SCRATCH_SPACE>(self) {
            Ok(bytes) => Ok(Body::from(bytes.to_vec())),
            Err(err) => Err(Status::internal(err.to_string())),
        }
    }
}
impl<T> TryIntoBody for T
where
T: TryAsBody,
{
fn try_into_body(self) -> Result<Body, Status> {
<Self as TryAsBody>::try_as_body(&self)
}
}
/// A [`Body`] is already in its final form, so the conversion is the identity.
impl TryIntoBody for Body {
/// Returns `self` unchanged; this conversion never fails.
fn try_into_body(self) -> Result<Body, Status> {
Ok(self)
}
}