use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use quanta::Instant;
use crate::msgbuf::MsgBuf;
use crate::rpc::Rpc;
use crate::type_alias::*;
use crate::util::likely::*;
pub struct Request<'a> {
rpc: &'a Rpc,
resp_buf: &'a mut MsgBuf,
sess_id: SessId,
sslot_idx: usize,
req_idx: ReqIdx,
start_time: Instant,
}
impl<'a> Request<'a> {
#[inline(always)]
pub(crate) fn new(
rpc: &'a Rpc,
resp_buf: &'a mut MsgBuf,
sess_id: SessId,
sslot_idx: usize,
req_idx: ReqIdx,
) -> Self {
Self {
rpc,
resp_buf,
sess_id,
sslot_idx,
req_idx,
start_time: Instant::recent(),
}
}
}
impl Request<'_> {
pub const RETX_TIMEOUT: Duration = Duration::from_millis(20);
#[inline]
pub fn abort(self) {
self.rpc
.abort_request((self.sess_id, self.sslot_idx, self.req_idx));
}
}
impl<'a> Future for Request<'a> {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let need_re_tx = Instant::recent() - self.start_time > Self::RETX_TIMEOUT;
if self.rpc.check_request_completion(
(self.sess_id, self.sslot_idx, self.req_idx),
self.resp_buf,
need_re_tx,
) {
return Poll::Ready(());
}
if unlikely(need_re_tx) {
self.start_time = Instant::recent();
}
self.rpc.progress();
cx.waker().wake_by_ref();
Poll::Pending
}
}