use std::marker::PhantomData;
use std::pin::Pin;
use grpc::Status;
use grpc::StatusError;
use grpc::client::CallOptions;
use grpc::client::InvokeOnce;
use grpc::client::RecvStream as _;
use grpc::client::ResponseStreamItem;
use grpc::client::SendOptions;
use grpc::client::SendStream as _;
use grpc::client::stream_util::RecvStreamValidator;
use grpc::core::RequestHeaders;
use protobuf::AsMut;
use protobuf::AsView;
use protobuf::ClearAndParse;
use protobuf::Message;
use protobuf::MessageView;
use protobuf::Proxied;
use crate::CallBuilder;
use crate::ProtoRecvMessage;
use crate::ProtoSendMessage;
use crate::client::Internal;
pub struct UnaryCallBuilder<'a, C, ReqMsgView, Res> {
channel: C,
method: String,
req: ReqMsgView,
args: CallOptions,
_phantom: PhantomData<&'a Res>,
}
impl<'a, C, ReqMsgView, Res> UnaryCallBuilder<'a, C, ReqMsgView, Res>
where
C: InvokeOnce,
{
pub fn new(channel: C, method: impl Into<String>, req: ReqMsgView) -> Self {
Self {
channel,
req,
method: method.into(),
args: Default::default(),
_phantom: PhantomData,
}
}
pub async fn with_response_message(self, res: &mut impl AsMut<MutProxied = Res>) -> Status
where
ReqMsgView: AsView + Send + Sync + 'a,
<ReqMsgView as AsView>::Proxied: Message,
for<'b> <<ReqMsgView as AsView>::Proxied as Proxied>::View<'b>: MessageView<'b>,
Res: Message,
for<'b> Res::Mut<'b>: ClearAndParse + Send + Sync,
{
let headers = RequestHeaders::new().with_method_name(self.method);
let (mut tx, rx) = self.channel.invoke_once(headers, self.args).await;
let mut rx = RecvStreamValidator::new(rx, true);
let req = &ProtoSendMessage::from_view(&self.req);
let _ = tx.send(req, SendOptions::new().with_final_msg(true)).await;
let mut res = ProtoRecvMessage::from_mut(res);
loop {
let i = rx.recv(&mut res).await;
if let ResponseStreamItem::Trailers(t) = i {
return t.status().clone();
}
}
}
}
impl<'a, C, ReqMsgView, Res> IntoFuture for UnaryCallBuilder<'a, C, ReqMsgView, Res>
where
C: InvokeOnce + 'a,
ReqMsgView: AsView + Send + Sync + 'a,
<ReqMsgView as AsView>::Proxied: Message,
for<'b> <<ReqMsgView as AsView>::Proxied as Proxied>::View<'b>: MessageView<'b>,
Res: Message,
for<'b> Res::Mut<'b>: ClearAndParse + Send + Sync,
{
type Output = Result<Res, StatusError>;
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;
fn into_future(self) -> Self::IntoFuture {
Box::pin(async move {
let mut res = Res::default();
self.with_response_message(&mut res).await?;
Ok(res)
})
}
}
impl<'a, C: InvokeOnce, ReqMsgView, Res> CallBuilder<C>
for UnaryCallBuilder<'a, C, ReqMsgView, Res>
{
type Builder<NewC: InvokeOnce> = UnaryCallBuilder<'a, NewC, ReqMsgView, Res>;
fn rebuild<NewC>(
self,
f: impl FnOnce(C) -> NewC,
_: Internal,
) -> UnaryCallBuilder<'a, NewC, ReqMsgView, Res> {
UnaryCallBuilder {
channel: f(self.channel),
method: self.method,
req: self.req,
args: self.args,
_phantom: PhantomData,
}
}
fn args_mut(&mut self, _: Internal) -> &mut CallOptions {
&mut self.args
}
}