pub use anyhow::{Error, Result};
pub use flume::bounded as flume_bounded;
use futures_lite::future::Boxed;
pub use futures_lite::stream::{Stream, StreamExt};
use futures_util::SinkExt;
#[cfg(feature = "hyper")]
pub use quic_rpc::transport::hyper::{HyperConnection, HyperServerEndpoint};
#[cfg(feature = "quinn")]
pub use quic_rpc::transport::quinn::{QuinnConnection, QuinnServerEndpoint};
pub use quic_rpc::{
client::{BoxStreamSync, UpdateSink},
message::{
BidiStreaming, BidiStreamingMsg, ClientStreaming, ClientStreamingMsg, Msg, RpcMsg,
ServerStreaming, ServerStreamingMsg,
},
pattern::{
bidi_streaming::{Error as BidiStreamingError, ItemError as BidiStreamingItemError},
client_streaming::{Error as ClientStreamingError, ItemError as ClientStreamingItemError},
rpc::Error as RpcError,
server_streaming::{Error as ServerStreamingError, ItemError as ServerStreamingItemError},
},
server::RpcChannel,
transport::{
flume::{connection as flume_connection, FlumeConnection, FlumeServerEndpoint},
ConnectionErrors,
},
RpcClient, RpcMessage, RpcServer, Service, ServiceConnection, ServiceEndpoint,
};
use std::{
cell::RefCell,
future::Future,
pin::Pin,
sync::{Arc, OnceLock},
};
use tokio::runtime::Builder;
pub use tokio::{pin, runtime::Runtime};
use tracing::{debug, error, warn};
pub trait GetServiceHandler<T: Service> {
fn get_handler(self: Arc<Self>) -> Arc<T>;
}
pub trait ServiceHandler<T: Service> {
fn handle_rpc_request<S, E>(
self: Arc<Self>,
req: T::Req,
chan: RpcChannel<S, E, T>,
rt: &'static Runtime,
) -> impl Future<Output = Result<()>> + Send
where
E: ServiceEndpoint<S>,
S: Service;
}
pub async fn run_server<E, S>(server: RpcServer<S, E>)
where
S: Service + Default + ServiceHandler<S>,
E: ServiceEndpoint<S>,
{
let service = Arc::new(S::default());
debug!("{:?}", service);
static RT: OnceLock<Runtime> = OnceLock::new();
let rt = RT.get_or_init(|| Builder::new_multi_thread().enable_all().build().unwrap());
loop {
let Ok(accepting) = server.accept().await else {
continue;
};
match accepting.read_first().await {
Err(err) => warn!(?err, "server accept failed"),
Ok((req, chan)) => {
let handler = service.clone();
rt.spawn(async move {
if let Err(err) = S::handle_rpc_request(handler, req, chan, rt).await {
warn!(?err, "internal rpc error");
}
});
}
}
}
}
pub struct ClientStreamingResponse<T, S, C, R, I = S>(
OnceLock<(RefCell<UpdateSink<S, C, T, I>>, Boxed<Result<R>>)>,
)
where
S: Service,
C: ServiceConnection<S>,
I: Service,
T: Into<I::Req>;
impl<T, S, C, R, I> ClientStreamingResponse<T, S, C, R, I>
where
S: Service,
I: Service,
C: ServiceConnection<S>,
T: Into<I::Req>,
{
pub fn new(
sink: UpdateSink<S, C, T, I>,
result: impl Future<Output = Result<R>> + Send + 'static,
) -> Self {
let lock = OnceLock::new();
let _ = lock.set((RefCell::new(sink), Box::pin(result) as _));
Self(lock)
}
pub async fn put(&mut self, item: T) -> &mut Self {
let Some((sink, _)) = self.0.get() else {
return self;
};
if let Err(e) = sink.borrow_mut().send(item).await {
error!("Send data error. ({})", e);
}
self
}
pub async fn result(&mut self) -> Result<R> {
let Some((sink, fut)) = self.0.take() else {
panic!("Only once call.");
};
sink.borrow_mut().close().await.unwrap();
drop(sink.into_inner());
fut.await
}
}
pub struct ServerStreamingResponse<R>(Pin<Box<dyn Stream<Item = Result<R, Error>>>>);
impl<R> ServerStreamingResponse<R> {
pub fn new(stream: impl Stream<Item = Result<R, Error>> + 'static) -> Self {
Self(Box::pin(stream) as _)
}
pub async fn next(&mut self) -> Option<Result<R>> {
self.0.next().await
}
}