use std::fmt::Debug;
use std::future::Future;
use std::io;
use std::io::IoSlice;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use ::tokio::io::AsyncRead;
use ::tokio::io::AsyncWrite;
use ::tokio::io::ReadBuf;
use crate::private;
pub(crate) mod hyper_wrapper;
#[cfg(feature = "_runtime-tokio")]
pub(crate) mod tokio;
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
pub type BoxedTaskHandle = Box<dyn TaskHandle>;
pub type BoxEndpoint = Box<dyn GrpcEndpoint>;
pub trait Runtime: Send + Sync + Debug {
fn spawn(&self, task: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) -> BoxedTaskHandle;
fn get_dns_resolver(&self, opts: ResolverOptions) -> Result<Box<dyn DnsResolver>, String>;
fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>>;
fn tcp_stream(
&self,
target: SocketAddr,
opts: TcpOptions,
) -> BoxFuture<Result<Box<dyn GrpcEndpoint>, String>>;
fn unix_stream(
&self,
path: PathBuf,
opts: UnixSocketOptions,
) -> BoxFuture<Result<Box<dyn GrpcEndpoint>, String>> {
Box::pin(async move {
Err("Unix sockets are not supported by this runtime on this platform".to_string())
})
}
}
pub trait Sleep: Send + Sync + Future<Output = ()> {}
pub trait TaskHandle: Send + Sync {
fn abort(&self);
}
#[tonic::async_trait]
pub trait DnsResolver: Send + Sync {
async fn lookup_host_name(&self, name: &str) -> Result<Vec<std::net::IpAddr>, String>;
async fn lookup_txt(&self, name: &str) -> Result<Vec<String>, String>;
}
#[derive(Default)]
pub struct ResolverOptions {
pub(super) server_addr: Option<std::net::SocketAddr>,
}
#[derive(Default)]
pub struct TcpOptions {
pub(crate) enable_nodelay: bool,
pub(crate) keepalive: Option<Duration>,
}
#[derive(Default)]
pub struct UnixSocketOptions {
_priv: (),
}
pub trait GrpcEndpoint: Send + Unpin + 'static {
fn get_local_address(&self) -> &str;
fn get_peer_address(&self) -> &str;
fn get_network_type(&self) -> &'static str;
#[doc(hidden)]
fn poll_read_private(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
token: private::Internal,
) -> Poll<io::Result<()>>;
#[doc(hidden)]
fn poll_write_private(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
token: private::Internal,
) -> Poll<io::Result<usize>>;
#[doc(hidden)]
fn poll_flush_private(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
token: private::Internal,
) -> Poll<io::Result<()>>;
#[doc(hidden)]
fn poll_shutdown_private(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
token: private::Internal,
) -> Poll<io::Result<()>>;
#[doc(hidden)]
fn poll_write_vectored_private(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
token: private::Internal,
) -> Poll<io::Result<usize>> {
let buf = bufs
.iter()
.find(|b| !b.is_empty())
.map_or(&[][..], |b| &**b);
self.poll_write_private(cx, buf, token)
}
#[doc(hidden)]
fn is_write_vectored_private(&self, _: private::Internal) -> bool {
false
}
}
pub(crate) struct AsyncIoAdapter<T> {
inner: T,
}
impl<T: GrpcEndpoint> AsyncIoAdapter<T> {
pub(crate) fn new(inner: T) -> Self {
Self { inner }
}
pub(crate) fn get_ref(&self) -> &T {
&self.inner
}
}
impl<T: GrpcEndpoint> AsyncRead for AsyncIoAdapter<T> {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read_private(cx, buf, private::Internal)
}
}
impl<T: GrpcEndpoint> AsyncWrite for AsyncIoAdapter<T> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write_private(cx, buf, private::Internal)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_flush_private(cx, private::Internal)
}
fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_shutdown_private(cx, private::Internal)
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<io::Result<usize>> {
Pin::new(&mut self.inner).poll_write_vectored_private(cx, bufs, private::Internal)
}
fn is_write_vectored(&self) -> bool {
self.inner.is_write_vectored_private(private::Internal)
}
}
impl GrpcEndpoint for Box<dyn GrpcEndpoint> {
fn get_local_address(&self) -> &str {
(**self).get_local_address()
}
fn get_peer_address(&self) -> &str {
(**self).get_peer_address()
}
fn get_network_type(&self) -> &'static str {
(**self).get_network_type()
}
fn poll_read_private(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
token: private::Internal,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self).poll_read_private(cx, buf, token)
}
fn poll_write_private(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
token: private::Internal,
) -> Poll<io::Result<usize>> {
Pin::new(&mut **self).poll_write_private(cx, buf, token)
}
fn poll_flush_private(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
token: private::Internal,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self).poll_flush_private(cx, token)
}
fn poll_shutdown_private(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
token: private::Internal,
) -> Poll<io::Result<()>> {
Pin::new(&mut **self).poll_shutdown_private(cx, token)
}
}
#[derive(Default, Debug)]
pub(crate) struct NoOpRuntime {}
impl Runtime for NoOpRuntime {
fn spawn(&self, task: Pin<Box<dyn Future<Output = ()> + Send + 'static>>) -> BoxedTaskHandle {
unimplemented!()
}
fn get_dns_resolver(&self, opts: ResolverOptions) -> Result<Box<dyn DnsResolver>, String> {
unimplemented!()
}
fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>> {
unimplemented!()
}
fn tcp_stream(
&self,
_target: SocketAddr,
_opts: TcpOptions,
) -> Pin<Box<dyn Future<Output = Result<Box<dyn GrpcEndpoint>, String>> + Send>> {
unimplemented!()
}
}
pub(crate) fn default_runtime() -> GrpcRuntime {
#[cfg(feature = "_runtime-tokio")]
{
return GrpcRuntime::new(tokio::TokioRuntime::default());
}
#[allow(unreachable_code)]
GrpcRuntime::new(NoOpRuntime::default())
}
#[derive(Clone, Debug)]
pub struct GrpcRuntime {
inner: Arc<dyn Runtime>,
}
impl GrpcRuntime {
pub fn new<T: Runtime + 'static>(runtime: T) -> Self {
GrpcRuntime {
inner: Arc::new(runtime),
}
}
pub fn spawn(
&self,
task: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
) -> BoxedTaskHandle {
self.inner.spawn(task)
}
pub fn get_dns_resolver(&self, opts: ResolverOptions) -> Result<Box<dyn DnsResolver>, String> {
self.inner.get_dns_resolver(opts)
}
pub fn sleep(&self, duration: std::time::Duration) -> Pin<Box<dyn Sleep>> {
self.inner.sleep(duration)
}
pub fn tcp_stream(
&self,
target: SocketAddr,
opts: TcpOptions,
) -> BoxFuture<Result<Box<dyn GrpcEndpoint>, String>> {
self.inner.tcp_stream(target, opts)
}
pub fn unix_stream(
&self,
path: PathBuf,
opts: UnixSocketOptions,
) -> BoxFuture<Result<Box<dyn GrpcEndpoint>, String>> {
self.inner.unix_stream(path, opts)
}
}