use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
use crate::grpc::error::{GrpcCallError, GrpcClientError};
use crate::grpc::transport::GrpcCallId;
use crate::host::grpc::GrpcHost;
use crate::reactor::root::RootReactor;
use crate::types::Cid;
use super::codec::Decoder;
use super::{GrpcResponse, GrpcStatus, GrpcStatusCode};
/// Handle for a single gRPC call; implements [`Future`] when `D` is a decoder.
///
/// A value is built either from a successfully started call (`call_id` is set)
/// or from a start failure (`error` is set). Dropping the value removes the
/// call's reactor bookkeeping and, if no outcome was produced yet, asks the
/// host to cancel the call.
pub struct GrpcCall<D> {
/// Reactor holding the per-call response and waker registrations.
reactor: Rc<RootReactor>,
/// Host used to cancel the call when the handle is dropped before completing.
host: Rc<dyn GrpcHost>,
/// Context id captured on the first pending poll, paired with the waker most
/// recently registered with the reactor. `None` until the first pending poll.
cid_and_waker: Option<(Cid, Waker)>,
/// Identifier of the started call; `None` when the call failed to start.
call_id: Option<GrpcCallId>,
/// Start failure, surfaced by `into_result` or by the first `poll`.
error: Option<GrpcCallError>,
/// Set once `poll` has produced an outcome; suppresses cancellation on drop.
ready: bool,
/// Decoder applied to the response content of a successful call.
decoder: D,
}
impl<D> GrpcCall<D> {
    /// Wraps the outcome of starting a gRPC call.
    ///
    /// An `Ok` result stores the call id so the call can later be polled and
    /// cleaned up; an `Err` result stores the error, which is reported by
    /// [`GrpcCall::into_result`] or by the first poll of the future.
    pub(super) fn new(
        reactor: Rc<RootReactor>,
        host: Rc<dyn GrpcHost>,
        decoder: D,
        result: Result<GrpcCallId, GrpcCallError>,
    ) -> Self {
        // Exactly one of the two slots ends up populated.
        let (call_id, error) =
            result.map_or_else(|failure| (None, Some(failure)), |id| (Some(id), None));

        Self {
            reactor,
            host,
            decoder,
            cid_and_waker: None,
            call_id,
            error,
            ready: false,
        }
    }

    /// Surfaces a start failure eagerly instead of waiting for the first poll.
    ///
    /// Returns `Err` with the stored error when the call could not be started;
    /// otherwise hands the call back unchanged so it can be awaited.
    pub fn into_result(mut self) -> Result<Self, GrpcCallError> {
        match self.error.take() {
            Some(failure) => Err(failure),
            None => Ok(self),
        }
    }
}
impl<R> Drop for GrpcCall<R> {
    /// Releases everything associated with the call.
    ///
    /// Nothing happens when the call never started (no call id). Otherwise the
    /// host is asked to cancel the call unless an outcome was already produced,
    /// and any stored response and waker registration are removed from the
    /// reactor.
    fn drop(&mut self) {
        let Some(id) = self.call_id else {
            return;
        };

        if !self.ready {
            // Cancellation is best effort: its result is deliberately ignored.
            let _ = self.host.cancel_grpc_call(id.token());
        }

        self.reactor.remove_grpc_response(id);
        self.reactor.remove_grpc_client(id);
    }
}
impl<D> Future for GrpcCall<D>
where
    D: Decoder + Unpin,
{
    /// The decoded response, or the reason the call did not produce one.
    type Output = Result<GrpcResponse<D::Output>, GrpcClientError>;

    /// Drives the call to completion.
    ///
    /// * A call that failed to start resolves immediately with
    ///   `GrpcClientError::Call`.
    /// * When the reactor holds a response for the call, the future resolves
    ///   with `GrpcClientError::Status` for a non-`Ok` status code, or with the
    ///   decoded content (`GrpcClientError::Decode` if decoding fails).
    /// * Otherwise the current waker is registered with the reactor and the
    ///   future stays pending.
    ///
    /// # Panics
    ///
    /// Panics if polled again after a start failure has already been
    /// reported, or if the reactor no longer holds the previously registered
    /// waker when it needs to be replaced.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = &mut *self;

        let Some(call_id) = this.call_id else {
            // The call never started: report the stored start error.
            this.ready = true;
            let error = this.error.take().unwrap();
            return Poll::Ready(Err(GrpcClientError::Call(error)));
        };

        let Some(parts) = this.reactor.remove_grpc_response(call_id) else {
            // No response yet: make sure the reactor holds the current waker.
            let waker = cx.waker();
            match &this.cid_and_waker {
                // The registered waker already wakes this task.
                Some((_, registered)) if registered.will_wake(waker) => {}
                // Polled with a different waker: swap the registration and
                // keep the context id captured on the first pending poll.
                Some((cid, _)) => {
                    let cid = *cid;
                    let _ = this
                        .reactor
                        .remove_grpc_client(call_id)
                        .expect("stored client");
                    this.reactor.insert_grpc_client(call_id, waker.clone());
                    this.cid_and_waker = Some((cid, waker.clone()));
                }
                // First pending poll: register the waker and mark the active
                // context as paused.
                None => {
                    let cid = this.reactor.active_cid();
                    this.reactor.insert_grpc_client(call_id, waker.clone());
                    this.reactor.set_paused(cid, true);
                    this.cid_and_waker = Some((cid, waker.clone()));
                }
            }
            return Poll::Pending;
        };

        // An outcome is being produced, so dropping must not cancel the call.
        this.ready = true;

        let status = GrpcStatusCode::from_u32(parts.event.status_code);
        if status != GrpcStatusCode::Ok {
            let failure = GrpcStatus::new(status, parts.status);
            return Poll::Ready(Err(GrpcClientError::Status(failure)));
        }

        // Missing content is decoded as the content type's default value.
        let content = parts.content.unwrap_or_default();
        let outcome = this
            .decoder
            .decode(content)
            .map(GrpcResponse::new)
            .map_err(|e| GrpcClientError::Decode(e.into()));
        Poll::Ready(outcome)
    }
}