use std::marker::PhantomData;
use std::time::Duration;
use std::time::Instant;
use bytes::Buf;
use grpc::Status;
use grpc::client::CallOptions;
use grpc::client::InvokeOnce;
use grpc::client::RecvStream as ClientRecvStream;
use grpc::client::ResponseStreamItem;
use grpc::client::SendOptions;
use grpc::client::SendStream;
use grpc::client::interceptor::Intercept;
use grpc::client::interceptor::InterceptOnce;
use grpc::client::interceptor::Intercepted;
use grpc::client::interceptor::IntoOnce;
use grpc::client::interceptor::InvokeOnceExt as _;
use grpc::client::stream_util::RecvStreamValidator;
use grpc::core::RecvMessage;
use protobuf::AsMut;
use protobuf::Message;
use protobuf::MessageMut;
use protobuf::MessageView;
use crate::ProtoRecvMessage;
use crate::ProtoSendMessage;
use crate::private::Internal;
// Crate-private submodules. Their contents are not visible from this file;
// NOTE(review): presumably one module per gRPC call shape (bidirectional
// streaming, client streaming, server streaming, unary) — confirm.
pub(crate) mod bidi;
pub(crate) mod client_streaming;
pub(crate) mod server_streaming;
pub(crate) mod unary;
/// Typed sending half of a streaming call.
///
/// Wraps a send stream `Tx` and pins the message type `M` that may be sent
/// through it.
pub struct GrpcStreamingRequest<M, Tx> {
/// The underlying stream that messages are written to.
tx: Tx,
/// Zero-sized marker tying this handle to the message type `M`; no `M` is
/// stored.
_phantom: PhantomData<M>,
}
impl<M, Tx> GrpcStreamingRequest<M, Tx>
where
    Tx: SendStream,
    M: Message,
    for<'b> M::View<'b>: MessageView<'b>,
{
    /// Wraps `tx` in a typed request handle for messages of type `M`.
    fn new(tx: Tx) -> Self {
        let _phantom = PhantomData;
        Self { tx, _phantom }
    }

    /// Sends one `message` on the underlying stream using default
    /// [`SendOptions`].
    ///
    /// The message is handed to the stream through a [`ProtoSendMessage`]
    /// built from a borrow of `message`.
    ///
    /// # Errors
    ///
    /// Returns whatever `Err(())` the underlying stream's `send` reports; no
    /// further detail is available at this layer.
    pub async fn send(&mut self, message: M) -> Result<(), ()> {
        let outgoing = ProtoSendMessage::from_view(&message);
        let options = SendOptions::default();
        self.tx.send(&outgoing, options).await
    }

    /// Consumes the handle, dropping the underlying send stream.
    ///
    /// No explicit close call is made here; any end-of-stream signalling
    /// happens (if at all) in the stream's own drop handling.
    pub fn close(self) {
        drop(self);
    }
}
/// Typed receiving half of a streaming call.
///
/// Wraps a receive stream `Rx` (behind a [`RecvStreamValidator`]) and pins the
/// message type `M` that is decoded from it.
pub struct GrpcStreamingResponse<M, Rx> {
/// The underlying receive stream, wrapped in a validator.
rx: RecvStreamValidator<Rx>,
/// Status taken from the trailers, once `recv_into` has observed them;
/// `None` until then.
status: Option<Status>,
/// Zero-sized marker tying this handle to the message type `M`; no `M` is
/// stored.
_phantom: PhantomData<M>,
}
impl<M, Rx> GrpcStreamingResponse<M, Rx>
where
    Rx: ClientRecvStream,
    M: Message,
    for<'b> M::Mut<'b>: MessageMut<'b>,
{
    /// Wraps `rx` in a typed response handle with no status recorded yet.
    ///
    /// The stream is placed behind `RecvStreamValidator::new(rx, false)`.
    // NOTE(review): the meaning of the `false` flag is defined by
    // `RecvStreamValidator` and is not visible from this file — confirm.
    fn new(rx: Rx) -> Self {
        let rx = RecvStreamValidator::new(rx, false);
        Self {
            rx,
            status: None,
            _phantom: PhantomData,
        }
    }

    /// Receives the next message from the stream into `res`.
    ///
    /// If the first item received is a headers item it is skipped and one
    /// more item is received in its place.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` when the item is trailers (their status is stored
    /// for a later call to [`status`](Self::status)) or when the stream
    /// reports that it is closed.
    ///
    /// # Panics
    ///
    /// Panics if the item received right after a headers item is itself a
    /// headers item.
    pub async fn recv_into(&mut self, res: &mut impl AsMut<MutProxied = M>) -> Result<(), ()> {
        let mut target = ProtoRecvMessage::from_mut(res);
        let first = self.rx.recv(&mut target).await;
        // Headers carry no message: skip them and take the item that follows.
        let item = if let ResponseStreamItem::Headers(_) = first {
            self.rx.recv(&mut target).await
        } else {
            first
        };
        // Release the mutable view of `res` before inspecting the outcome.
        drop(target);

        match item {
            ResponseStreamItem::Message => Ok(()),
            ResponseStreamItem::Trailers(trailers) => {
                // Remember the final status so `status()` can return it
                // without touching the stream again.
                self.status = Some(trailers.into_status());
                Err(())
            }
            ResponseStreamItem::StreamClosed => Err(()),
            ResponseStreamItem::Headers(_) => unreachable!(),
        }
    }

    /// Receives the next message into a freshly defaulted `M`.
    ///
    /// Returns `Some(message)` when [`recv_into`](Self::recv_into) succeeds
    /// and `None` when it fails (trailers received or stream closed).
    pub async fn recv(&mut self) -> Option<M> {
        let mut message = M::default();
        let outcome = self.recv_into(&mut message).await;
        outcome.ok().map(|()| message)
    }

    /// Consumes the response and returns the call's final [`Status`].
    ///
    /// If trailers were already seen by [`recv_into`](Self::recv_into), the
    /// stored status is returned immediately. Otherwise the remaining items
    /// are received with a decoder that ignores their payload, until trailers
    /// arrive.
    // NOTE(review): the loop only exits on `Trailers`. It relies on the
    // stream always yielding trailers before it stops producing items —
    // presumably guaranteed by `RecvStreamValidator`; confirm.
    pub async fn status(mut self) -> Status {
        if let Some(cached) = self.status.take() {
            return cached;
        }

        let mut sink = NopRecvMessage;
        loop {
            match self.rx.recv(&mut sink).await {
                ResponseStreamItem::Trailers(trailers) => return trailers.into_status(),
                _ => continue,
            }
        }
    }
}
/// A [`RecvMessage`] sink that accepts any payload and keeps nothing.
///
/// Used by `GrpcStreamingResponse::status` to receive remaining items
/// without decoding them.
struct NopRecvMessage;

impl RecvMessage for NopRecvMessage {
    /// Reports success without reading from the buffer.
    fn decode(&mut self, _buf: &mut dyn Buf) -> Result<(), String> {
        Ok(())
    }
}
/// Builder-style configuration shared by call builders that wrap an invoker
/// `C`.
///
/// The provided methods are written in terms of the hidden `rebuild` and
/// `args_mut` hooks, which implementors supply.
pub trait CallBuilder<C: InvokeOnce>: Sized {
    /// The builder type produced when the wrapped invoker is replaced by a
    /// `NewC`.
    #[doc(hidden)]
    type Builder<NewC: InvokeOnce>: CallBuilder<NewC>;

    /// Rebuilds this builder around the invoker returned by `f`.
    ///
    /// The [`Internal`] argument is passed by every provided method of this
    /// trait.
    #[doc(hidden)]
    fn rebuild<NewC: InvokeOnce>(
        self,
        f: impl FnOnce(C) -> NewC,
        _: Internal,
    ) -> Self::Builder<NewC>;

    /// Gives mutable access to the call options held by this builder.
    #[doc(hidden)]
    fn args_mut(&mut self, _: Internal) -> &mut CallOptions;

    /// Sets the call deadline to `t` from now and returns the builder.
    ///
    /// The deadline is computed from [`Instant::now`] at the time this method
    /// is called, not when the call is started.
    fn with_timeout(mut self, t: Duration) -> Self {
        let options = self.args_mut(Internal);
        options.set_deadline(Instant::now() + t);
        self
    }

    /// Wraps the invoker with `interceptor`, returning a builder over the
    /// intercepted invoker.
    fn with_interceptor<I>(self, interceptor: I) -> Self::Builder<Intercepted<C, IntoOnce<I>>>
    where
        I: Intercept<C>,
    {
        self.rebuild(|invoker| invoker.with_interceptor(interceptor), Internal)
    }

    /// Wraps the invoker with a one-shot `interceptor`, returning a builder
    /// over the intercepted invoker.
    fn with_once_interceptor<I>(self, interceptor: I) -> Self::Builder<Intercepted<C, I>>
    where
        I: InterceptOnce<C>,
    {
        self.rebuild(
            |invoker| invoker.with_once_interceptor(interceptor),
            Internal,
        )
    }
}