use std::sync::Arc;
use tokio::sync::oneshot;
use tonic::async_trait;
use crate::client::CallOptions;
use crate::core::RecvMessage;
use crate::core::RequestHeaders;
use crate::core::ResponseHeaders;
use crate::core::SendMessage;
use crate::core::Trailers;
// Crate-internal `interceptor` submodule; its contents live outside this file.
pub(crate) mod interceptor;
/// Dispatches calls obtained from a [`Listener`] to a single registered handler.
///
/// Create one with [`Server::new`], install a handler with
/// [`Server::set_handler`], then drive it with [`Server::serve`].
pub struct Server {
// Type-erased handler shared behind an `Arc`; `None` until `set_handler` is called.
handler: Option<Arc<dyn DynHandle>>,
}
/// One incoming call, as yielded by [`Listener::accept`].
///
/// `SS` is the stream type used to send the response and `RS` the stream type
/// used to receive request messages.
pub struct Call<SS, RS> {
/// Request headers; passed to the handler by `Server::serve`.
pub headers: RequestHeaders,
/// Sending half of the call; the handler writes response items to it.
pub send: SS,
/// Receiving half of the call; the handler reads request messages from it.
pub recv: RS,
/// Channel on which `Server::serve` delivers the trailers returned by the handler.
pub trailers_tx: oneshot::Sender<Trailers>,
}
/// A source of incoming calls for [`Server::serve`].
///
/// Per the `trait-variant` crate documentation, `make(Send)` requires the
/// futures returned by this trait's async methods to be `Send`.
#[trait_variant::make(Send)]
pub trait Listener {
/// Stream type used to send the response of an accepted call.
type SendStream: SendStream + 'static;
/// Stream type used to receive the request messages of an accepted call.
type RecvStream: RecvStream + 'static;
/// Waits for the next call. `Server::serve` stops looping once this returns `None`.
async fn accept(&self) -> Option<Call<Self::SendStream, Self::RecvStream>>;
}
impl Server {
pub fn new() -> Self {
Self { handler: None }
}
pub fn set_handler<H>(&mut self, h: H)
where
H: Handle + Send + Sync + 'static,
{
self.handler = Some(Arc::new(h))
}
pub async fn serve(&self, l: &impl Listener) {
while let Some(call) = l.accept().await {
let mut send: Box<dyn DynSendStream> = Box::new(call.send);
let recv = BoxedRecvStream(Box::new(call.recv));
let options = CallOptions::default();
let trailers_tx = call.trailers_tx;
let trailers = self
.handler
.as_ref()
.unwrap()
.dyn_handle(call.headers, options, &mut *send, recv)
.await;
let _ = trailers_tx.send(trailers);
}
}
}
impl Default for Server {
fn default() -> Self {
Self::new()
}
}
/// Handler for a single call; the type users implement and register through
/// `Server::set_handler`.
///
/// Per the `trait-variant` crate documentation, `make(Send)` requires the
/// future returned by `handle` to be `Send`.
#[trait_variant::make(Send)]
pub trait Handle: Send + Sync {
/// Processes one call.
///
/// * `headers` - request headers of the call.
/// * `options` - call options (`Server::serve` passes `CallOptions::default()`).
/// * `tx` - stream on which response items are sent.
/// * `rx` - stream from which request messages are received.
///
/// Returns the trailers for the call; `Server::serve` forwards them on the
/// call's `trailers_tx` channel.
async fn handle(
&self,
headers: RequestHeaders,
options: CallOptions,
tx: &mut impl SendStream,
rx: impl RecvStream + 'static,
) -> Trailers;
}
/// Private, trait-object-friendly counterpart of [`Handle`].
///
/// `Server` stores its handler as `Arc<dyn DynHandle>`. The generic stream
/// parameters of `Handle::handle` are replaced here by `&mut dyn DynSendStream`
/// and `BoxedRecvStream`.
#[async_trait]
trait DynHandle: Send + Sync {
/// Same contract as `Handle::handle`, with type-erased streams.
async fn dyn_handle(
&self,
headers: RequestHeaders,
options: CallOptions,
tx: &mut dyn DynSendStream,
rx: BoxedRecvStream,
) -> Trailers;
}
/// Blanket bridge: every [`Handle`] is automatically usable as a `DynHandle`.
#[async_trait]
impl<T: Handle> DynHandle for T {
    async fn dyn_handle(
        &self,
        headers: RequestHeaders,
        options: CallOptions,
        mut tx: &mut dyn DynSendStream,
        rx: BoxedRecvStream,
    ) -> Trailers {
        // `&mut dyn DynSendStream` implements `SendStream`, so a mutable borrow
        // of `tx` satisfies the `&mut impl SendStream` parameter of `handle`.
        let pending = self.handle(headers, options, &mut tx, rx);
        pending.await
    }
}
/// Owned, type-erased receive stream; this is what `Server::serve` hands to
/// the handler as its `rx` argument.
struct BoxedRecvStream(Box<dyn DynRecvStream + 'static>);

impl RecvStream for BoxedRecvStream {
    /// Forwards to the wrapped stream.
    async fn next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
        let BoxedRecvStream(inner) = self;
        inner.dyn_next(msg).await
    }
}
/// An item that can be written to a [`SendStream`].
pub enum ResponseStreamItem<'a> {
/// Response headers.
Headers(ResponseHeaders),
/// A response message, borrowed for the duration of the send.
Message(&'a dyn SendMessage),
}
/// Sending half of a call: accepts response headers and messages.
///
/// Per the `trait-variant` crate documentation, `make(Send)` requires the
/// future returned by `send` to be `Send`.
#[trait_variant::make(Send)]
pub trait SendStream {
/// Sends `item` using the given per-send `options`.
///
/// Returns `Err(())` on failure; the error carries no further detail.
async fn send<'a>(
&mut self,
item: ResponseStreamItem<'a>,
options: SendOptions,
) -> Result<(), ()>;
}
/// Private, trait-object-friendly counterpart of [`SendStream`], used to
/// type-erase the listener's concrete send stream.
#[async_trait]
trait DynSendStream: Send {
/// Same contract as `SendStream::send`.
async fn dyn_send<'a>(
&mut self,
item: ResponseStreamItem<'a>,
options: SendOptions,
) -> Result<(), ()>;
}
/// Blanket bridge: every [`SendStream`] is automatically usable as a
/// `DynSendStream`.
#[async_trait]
impl<T: SendStream> DynSendStream for T {
    async fn dyn_send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()> {
        // Delegate straight to the statically-typed implementation.
        let pending = self.send(item, options);
        pending.await
    }
}
/// Lets a borrowed `&mut dyn DynSendStream` be used wherever a [`SendStream`]
/// is expected, e.g. as the `tx` argument of `Handle::handle`.
impl<'b> SendStream for &mut (dyn DynSendStream + 'b) {
    async fn send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()> {
        // Keep the explicit double deref: the call must target the trait
        // object itself. NOTE(review): a plain `self.dyn_send(..)` would likely
        // resolve to the blanket `DynSendStream` impl for this reference type,
        // which calls back into `send` — confirm before simplifying.
        let pending = (**self).dyn_send(item, options);
        pending.await
    }
}
/// Lets an owned `Box<dyn DynSendStream>` be used wherever a [`SendStream`]
/// is expected.
impl<'b> SendStream for Box<dyn DynSendStream + 'b> {
    async fn send<'a>(
        &mut self,
        item: ResponseStreamItem<'a>,
        options: SendOptions,
    ) -> Result<(), ()> {
        // Deref through `&mut Box<_>` down to the trait object before
        // dispatching, so the boxed stream's own `dyn_send` is invoked.
        let pending = (**self).dyn_send(item, options);
        pending.await
    }
}
/// Per-send options passed to [`SendStream::send`].
///
/// The struct is `#[non_exhaustive]`, so code outside this crate cannot build
/// it with a struct literal; start from `SendOptions::default()` (both flags
/// `false`) and set the public fields as needed.
#[derive(Default)]
#[non_exhaustive]
pub struct SendOptions {
// NOTE(review): presumably marks the item as the last message of the stream — confirm against the stream implementations.
pub final_msg: bool,
// NOTE(review): presumably asks the stream not to compress this item — confirm against the stream implementations.
pub disable_compression: bool,
}
/// Receiving half of a call.
///
/// Per the `trait-variant` crate documentation, `make(Send)` requires the
/// future returned by `next` to be `Send`.
#[trait_variant::make(Send)]
pub trait RecvStream {
/// Receives the next message, using the caller-provided `msg` as the destination.
///
/// NOTE(review): presumably `None` signals the end of the stream and
/// `Some(Err(()))` a receive failure — confirm against the implementations.
async fn next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>>;
}
/// Private, trait-object-friendly counterpart of [`RecvStream`], used to
/// type-erase the listener's concrete receive stream.
#[async_trait]
trait DynRecvStream: Send {
/// Same contract as `RecvStream::next`.
async fn dyn_next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>>;
}
/// Blanket bridge: every [`RecvStream`] is automatically usable as a
/// `DynRecvStream`.
#[async_trait]
impl<T: RecvStream> DynRecvStream for T {
    async fn dyn_next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
        // Delegate straight to the statically-typed implementation.
        let pending = self.next(msg);
        pending.await
    }
}
/// Lets an owned `Box<dyn DynRecvStream>` be used wherever a [`RecvStream`]
/// is expected.
impl<'a> RecvStream for Box<dyn DynRecvStream + 'a> {
    async fn next(&mut self, msg: &mut dyn RecvMessage) -> Option<Result<(), ()>> {
        // Deref through `&mut Box<_>` down to the trait object before
        // dispatching, so the boxed stream's own `dyn_next` is invoked.
        let pending = (**self).dyn_next(msg);
        pending.await
    }
}