use std::marker::PhantomData;
use std::pin::Pin;
use grpc::Status;
use grpc::StatusOr;
use grpc::client::CallOptions;
use grpc::client::InvokeOnce;
use grpc::client::RecvStream;
use grpc::client::ResponseStreamItem;
use grpc::client::SendOptions;
use grpc::client::SendStream;
use grpc::client::stream_util::RecvStreamValidator;
use grpc::core::RequestHeaders;
use protobuf::AsMut;
use protobuf::AsView;
use protobuf::ClearAndParse;
use protobuf::Message;
use protobuf::MessageMut;
use protobuf::MessageView;
use crate::CallBuilder;
use crate::ProtoRecvMessage;
use crate::ProtoSendMessage;
use crate::client::Internal;
/// Builder for a client-streaming call.
///
/// Stores the channel the call will be issued on, the method name, and the
/// call options. Awaiting the builder (through its `IntoFuture` impl) starts
/// the call and yields a [`ClientStreamingCall`].
///
/// `Req` and `Res` are the request and response message types; they are only
/// tracked at the type level through `_phantom`.
pub struct ClientStreamingCallBuilder<'a, C, Req, Res> {
    /// Channel used to start the call.
    channel: C,
    /// Method name placed into the request headers.
    method: String,
    /// Options forwarded when the call is invoked.
    args: CallOptions,
    /// Ties `'a`, `Req` and `Res` to the builder without storing any data.
    _phantom: PhantomData<&'a (Req, Res)>,
}
impl<'a, C, Req, Res> ClientStreamingCallBuilder<'a, C, Req, Res>
where
C: InvokeOnce,
{
pub fn new(channel: C, method: impl Into<String>) -> Self {
Self {
channel,
method: method.into(),
args: Default::default(),
_phantom: PhantomData,
}
}
}
/// Awaiting the builder starts the call and resolves to a
/// [`ClientStreamingCall`] that can send requests and collect the response.
impl<'a, C, Req, Res> IntoFuture for ClientStreamingCallBuilder<'a, C, Req, Res>
where
    C: InvokeOnce + 'a,
    Req: Message,
    for<'b> Req::View<'b>: MessageView<'b>,
    Res: Message + ClearAndParse,
    for<'b> Res::Mut<'b>: MessageMut<'b>,
{
    type Output = ClientStreamingCall<'a, C, Req, Res>;
    type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + Send + 'a>>;

    /// Builds the request headers from the stored method name, invokes the
    /// channel once with the stored options, and wraps the resulting streams.
    fn into_future(self) -> Self::IntoFuture {
        Box::pin(async move {
            let request_headers = RequestHeaders::new().with_method_name(self.method);
            let (send_stream, recv_stream) = self
                .channel
                .invoke_once(request_headers, self.args)
                .await;
            // NOTE(review): the `true` flag presumably tells the validator to
            // expect a single-response stream; confirm against
            // `RecvStreamValidator::new`.
            let validated = RecvStreamValidator::new(recv_stream, true);
            ClientStreamingCall {
                tx: send_stream,
                rx: validated,
                _phantom: PhantomData,
            }
        })
    }
}
/// Lets generic `CallBuilder` machinery swap the channel and reach the call
/// options of a client-streaming builder.
impl<'a, C, Req, Res> CallBuilder<C> for ClientStreamingCallBuilder<'a, C, Req, Res>
where
    C: InvokeOnce,
{
    type Builder<NewC: InvokeOnce> = ClientStreamingCallBuilder<'a, NewC, Req, Res>;

    /// Produces a builder whose channel is `f(old_channel)`, carrying over the
    /// method name and call options unchanged.
    fn rebuild<NewC>(
        self,
        f: impl FnOnce(C) -> NewC,
        _: Internal,
    ) -> ClientStreamingCallBuilder<'a, NewC, Req, Res> {
        let Self {
            channel,
            method,
            args,
            ..
        } = self;
        let channel = f(channel);
        ClientStreamingCallBuilder {
            channel,
            method,
            args,
            _phantom: PhantomData,
        }
    }

    /// Gives mutable access to the stored call options.
    fn args_mut(&mut self, _: Internal) -> &mut CallOptions {
        let Self { args, .. } = self;
        args
    }
}
/// An in-progress client-streaming call.
///
/// Produced by awaiting a [`ClientStreamingCallBuilder`]. Requests are pushed
/// through `send`; the call is finished with `with_response_message` or
/// `close_and_recv`, both of which consume the call.
pub struct ClientStreamingCall<'a, C, Req, Res>
where
    C: InvokeOnce,
{
    /// Sending half of the call, used for request messages.
    tx: C::SendStream,
    /// Receiving half of the call, wrapped in a `RecvStreamValidator`.
    rx: RecvStreamValidator<C::RecvStream>,
    /// Ties `'a`, `Req` and `Res` to the call without storing any data.
    _phantom: PhantomData<&'a (Req, Res)>,
}
impl<'a, C, Req, Res> ClientStreamingCall<'a, C, Req, Res>
where
    C: InvokeOnce + 'a,
    Req: Message,
    for<'b> Req::View<'b>: MessageView<'b>,
    Res: Message,
    for<'b> Res::Mut<'b>: MessageMut<'b>,
{
    /// Sends one request message on the call using default `SendOptions`.
    ///
    /// Returns the result of the underlying send stream unchanged. The error
    /// variant carries no detail.
    // NOTE(review): presumably the final status is still obtained by finishing
    // the call after a failed send; confirm against `SendStream::send`.
    pub async fn send(&mut self, message: &impl AsView<Proxied = Req>) -> Result<(), ()> {
        let outgoing = ProtoSendMessage::from_view(message);
        let options = SendOptions::default();
        self.tx.send(&outgoing, options).await
    }

    /// Finishes the call, receiving into the caller-provided `res`.
    ///
    /// The send stream is dropped first, then the receive stream is polled
    /// until trailers arrive; the status derived from those trailers is
    /// returned. Items other than trailers are not inspected here.
    pub async fn with_response_message(self, res: &mut impl AsMut<MutProxied = Res>) -> Status {
        let Self {
            tx: send_half,
            rx: mut recv_half,
            ..
        } = self;
        // NOTE(review): dropping the send half presumably signals that no more
        // requests will be sent; confirm against the `SendStream` contract.
        drop(send_half);

        let mut sink = ProtoRecvMessage::from_mut(res);
        loop {
            match recv_half.recv(&mut sink).await {
                ResponseStreamItem::Trailers(trailers) => break trailers.into_status(),
                _ => continue,
            }
        }
    }

    /// Finishes the call and returns the response as an owned message.
    ///
    /// Starts from `Res::default()`, delegates to
    /// [`with_response_message`](Self::with_response_message), and propagates
    /// its status with `?` before returning the message.
    pub async fn close_and_recv(self) -> StatusOr<Res> {
        let mut response = Res::default();
        let status = self.with_response_message(&mut response).await;
        status?;
        Ok(response)
    }
}