use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use quanta::Instant;
use rmp_serde as rmps;
use crate::msgbuf::MsgBuf;
use crate::nexus::{SmEvent, SmEventDetails};
use crate::request::Request;
use crate::rpc::Rpc;
use crate::type_alias::*;
use crate::util::likely::*;
#[derive(Clone, Copy)]
pub struct SessionHandle<'r> {
rpc: &'r Rpc,
sess_id: SessId,
remote_uri: SocketAddr,
remote_rpc_id: RpcId,
}
impl<'r> SessionHandle<'r> {
#[inline(always)]
pub(crate) fn new(
rpc: &'r Rpc,
sess_id: SessId,
remote_uri: SocketAddr,
remote_rpc_id: RpcId,
) -> Self {
Self {
rpc,
sess_id,
remote_uri,
remote_rpc_id,
}
}
}
impl<'r> SessionHandle<'r> {
#[inline(always)]
pub fn id(&self) -> SessId {
self.sess_id
}
#[inline(always)]
pub fn rpc(&self) -> &'r Rpc {
self.rpc
}
#[inline]
pub fn is_connected(&self) -> bool {
self.rpc
.session_connection_state(self.sess_id)
.unwrap_or(false)
}
pub fn connect<'a>(&'a self) -> impl Future<Output = bool> + 'a
where
'r: 'a,
{
let msg = if likely(!self.is_connected()) {
let (cli_ud_ep, cli_sess_rc_ep) = self.rpc.mark_session_connecting(self.sess_id);
let event = SmEvent {
src_rpc_id: self.rpc.id(),
dst_rpc_id: self.remote_rpc_id,
details: SmEventDetails::ConnectRequest {
cli_uri: self.rpc.nexus().uri(),
cli_ud_ep,
cli_sess_id: self.sess_id,
cli_sess_rc_ep,
},
};
rmps::to_vec(&event).unwrap()
} else {
Vec::new()
};
SessionConnect {
rpc: self.rpc,
sess_id: self.sess_id,
remote_uri: self.remote_uri,
msg,
start_time: Instant::now() - SessionConnect::TIMEOUT,
}
}
pub fn request<'a>(
&'a self,
req_type: ReqType,
req_msgbuf: &'a MsgBuf,
resp_msgbuf: &'a mut MsgBuf,
) -> Request<'a>
where
'r: 'a,
{
self.rpc
.enqueue_request(self.sess_id, req_type, req_msgbuf, resp_msgbuf)
}
}
struct SessionConnect<'a> {
rpc: &'a Rpc,
sess_id: SessId,
remote_uri: SocketAddr,
msg: Vec<u8>,
start_time: Instant,
}
impl SessionConnect<'_> {
pub const TIMEOUT: Duration = Duration::from_millis(100);
}
impl Future for SessionConnect<'_> {
type Output = bool;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
if let Some(result) = self.rpc.session_connection_state(self.sess_id) {
Poll::Ready(result)
} else {
if self.start_time.elapsed() >= Self::TIMEOUT {
self.rpc
.sm_tx
.send_to(&self.msg, self.remote_uri)
.expect("failed to send ConnectRequest");
self.start_time = Instant::now();
}
self.rpc.progress();
cx.waker().wake_by_ref();
Poll::Pending
}
}
}