use crate::{
Codec, Metadata, Status,
client::conn::{CancelHandle, GrpcClientConn},
timeout::parse_grpc_timeout,
};
use futures_lite::{Stream, StreamExt};
use std::{
future::{Future, IntoFuture},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use trillium::Headers;
use trillium_client::Client;
type BoxStream<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;
fn default_timeout(client: &Client) -> Option<Duration> {
client
.default_headers()
.get_str("grpc-timeout")
.and_then(parse_grpc_timeout)
}
pub struct UnaryConn<Req, Resp> {
engine: GrpcClientConn<Req, Resp>,
requests: Option<BoxStream<Req>>,
outcome: Option<(Option<Resp>, Result<(), Status>)>,
}
impl<Req, Resp> UnaryConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
pub fn unary<C>(client: &Client, path: &str, request: Req) -> Self
where
C: Codec<Req> + Codec<Resp>,
{
let mut engine = GrpcClientConn::new::<C>(
client,
path,
Metadata::new(),
default_timeout(client),
false,
);
engine.buffer_request(request);
Self {
engine,
requests: None,
outcome: None,
}
}
pub fn client_streaming<C>(
client: &Client,
path: &str,
requests: impl Stream<Item = Req> + Send + 'static,
) -> Self
where
C: Codec<Req> + Codec<Resp>,
{
let engine = GrpcClientConn::new::<C>(
client,
path,
Metadata::new(),
default_timeout(client),
false,
);
Self {
engine,
requests: Some(Box::pin(requests)),
outcome: None,
}
}
pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
self.engine.add_ascii_metadata(key, value);
self
}
pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
self.engine.add_binary_metadata(key, value.into());
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.engine.set_deadline_from_now(timeout);
self
}
pub fn cancel_handle(&self) -> CancelHandle {
self.engine.cancel_handle()
}
pub fn metadata(&self) -> Option<&Headers> {
self.engine.headers()
}
pub fn trailers(&self) -> Option<&Headers> {
self.engine.trailers()
}
pub fn message(&self) -> Option<&Resp> {
self.outcome.as_ref().and_then(|(m, _)| m.as_ref())
}
pub fn status(&self) -> Result<(), Status> {
self.outcome
.as_ref()
.map(|(_, s)| s.clone())
.unwrap_or(Ok(()))
}
pub fn into_message(self) -> Result<Resp, Status> {
match self.outcome {
Some((message, status)) => {
status?;
message.ok_or_else(|| Status::internal("unary response had no message"))
}
None => Err(Status::internal("call has not been awaited")),
}
}
}
impl<Req, Resp> IntoFuture for UnaryConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
type Output = Result<UnaryConn<Req, Resp>, Status>;
type IntoFuture = BoxFuture<Self::Output>;
fn into_future(mut self) -> Self::IntoFuture {
Box::pin(async move {
let mut engine = self.engine;
if let Some(mut requests) = self.requests.take() {
while let Some(message) = requests.next().await {
engine.send(message).await?;
}
}
engine.close_send().await?;
let outcome = read_unary(&mut engine).await;
Ok(UnaryConn {
engine,
requests: None,
outcome: Some(outcome),
})
})
}
}
async fn read_unary<Req, Resp>(
engine: &mut GrpcClientConn<Req, Resp>,
) -> (Option<Resp>, Result<(), Status>)
where
Req: Send + 'static,
Resp: Send + 'static,
{
match engine.recv().await {
Ok(Some(message)) => match engine.recv().await {
Ok(None) => (Some(message), Ok(())),
Ok(Some(_)) => (
None,
Err(Status::internal("unary response had multiple messages")),
),
Err(status) => (Some(message), Err(status)),
},
Ok(None) => (None, Err(Status::internal("unary response had no message"))),
Err(status) => (None, Err(status)),
}
}
pub struct StreamingConn<Req, Resp> {
engine: Option<Box<GrpcClientConn<Req, Resp>>>,
#[allow(clippy::type_complexity)]
in_flight: Option<BoxFuture<(Box<GrpcClientConn<Req, Resp>>, Result<Option<Resp>, Status>)>>,
}
impl<Req, Resp> StreamingConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
pub fn server_streaming<C>(client: &Client, path: &str, request: Req) -> Self
where
C: Codec<Req> + Codec<Resp>,
{
let mut engine = GrpcClientConn::new::<C>(
client,
path,
Metadata::new(),
default_timeout(client),
false,
);
engine.buffer_request(request);
Self {
engine: Some(Box::new(engine)),
in_flight: None,
}
}
fn engine(&self) -> &GrpcClientConn<Req, Resp> {
self.engine
.as_deref()
.expect("engine present between polls")
}
fn engine_mut(&mut self) -> &mut GrpcClientConn<Req, Resp> {
self.engine
.as_deref_mut()
.expect("engine present between polls")
}
pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
self.engine_mut().add_ascii_metadata(key, value);
self
}
pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
self.engine_mut().add_binary_metadata(key, value.into());
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.engine_mut().set_deadline_from_now(timeout);
self
}
pub fn cancel_handle(&self) -> CancelHandle {
self.engine().cancel_handle()
}
pub fn metadata(&self) -> Option<&Headers> {
self.engine().headers()
}
pub fn trailers(&self) -> Option<&Headers> {
self.engine().trailers()
}
pub async fn recv(&mut self) -> Result<Option<Resp>, Status> {
self.engine_mut().recv().await
}
}
impl<Req, Resp> Stream for StreamingConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
type Item = Result<Resp, Status>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.in_flight.is_none() {
let mut engine = this.engine.take().expect("engine present between polls");
this.in_flight = Some(Box::pin(async move {
let result = engine.recv().await;
(engine, result)
}));
}
let fut = this.in_flight.as_mut().unwrap();
match fut.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready((engine, result)) => {
this.engine = Some(engine);
this.in_flight = None;
match result {
Ok(None) => Poll::Ready(None),
Ok(Some(msg)) => Poll::Ready(Some(Ok(msg))),
Err(status) => Poll::Ready(Some(Err(status))),
}
}
}
}
}
impl<Req, Resp> IntoFuture for StreamingConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
type Output = Result<StreamingConn<Req, Resp>, Status>;
type IntoFuture = BoxFuture<Self::Output>;
fn into_future(mut self) -> Self::IntoFuture {
Box::pin(async move {
self.engine_mut().open_head().await?;
Ok(self)
})
}
}
pub struct BidiConn<Req, Resp> {
engine: Option<Box<GrpcClientConn<Req, Resp>>>,
#[allow(clippy::type_complexity)]
in_flight: Option<BoxFuture<(Box<GrpcClientConn<Req, Resp>>, Result<Option<Resp>, Status>)>>,
}
impl<Req, Resp> BidiConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
pub fn bidi<C>(client: &Client, path: &str) -> Self
where
C: Codec<Req> + Codec<Resp>,
{
let engine =
GrpcClientConn::new::<C>(client, path, Metadata::new(), default_timeout(client), true);
Self {
engine: Some(Box::new(engine)),
in_flight: None,
}
}
fn engine(&self) -> &GrpcClientConn<Req, Resp> {
self.engine
.as_deref()
.expect("engine present between polls")
}
fn engine_mut(&mut self) -> &mut GrpcClientConn<Req, Resp> {
self.engine
.as_deref_mut()
.expect("engine present between polls")
}
pub fn with_ascii_metadata(mut self, key: &str, value: &str) -> Self {
self.engine_mut().add_ascii_metadata(key, value);
self
}
pub fn with_binary_metadata(mut self, key: &str, value: impl Into<Vec<u8>>) -> Self {
self.engine_mut().add_binary_metadata(key, value.into());
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.engine_mut().set_deadline_from_now(timeout);
self
}
pub fn cancel_handle(&self) -> CancelHandle {
self.engine().cancel_handle()
}
pub fn metadata(&self) -> Option<&Headers> {
self.engine().headers()
}
pub fn trailers(&self) -> Option<&Headers> {
self.engine().trailers()
}
pub async fn send(&mut self, message: Req) -> Result<(), Status> {
self.engine_mut().send(message).await
}
pub async fn close_send(&mut self) -> Result<(), Status> {
self.engine_mut().close_send().await
}
pub async fn recv(&mut self) -> Result<Option<Resp>, Status> {
self.engine_mut().recv().await
}
}
impl<Req, Resp> Stream for BidiConn<Req, Resp>
where
Req: Send + 'static,
Resp: Send + 'static,
{
type Item = Result<Resp, Status>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.in_flight.is_none() {
let mut engine = this.engine.take().expect("engine present between polls");
this.in_flight = Some(Box::pin(async move {
let result = engine.recv().await;
(engine, result)
}));
}
let fut = this.in_flight.as_mut().unwrap();
match fut.as_mut().poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready((engine, result)) => {
this.engine = Some(engine);
this.in_flight = None;
match result {
Ok(None) => Poll::Ready(None),
Ok(Some(msg)) => Poll::Ready(Some(Ok(msg))),
Err(status) => Poll::Ready(Some(Err(status))),
}
}
}
}
}