use crate::prelude::{
asserted_short_name, into_split_messenger, CallbackRecv, CallbackRecvSend, CallbackSend, ConId, ConnectionId, ConnectionStatus, MessageRecver, MessageSender, Messenger, PollAble, PollEventStatus, PollRead, Protocol, ReSendNonBlocking,
RecvNonBlocking, RecvStatus, RemoveConnectionBarrierOnDrop, SendNonBlocking, SendNonBlockingNonMut, SendStatus, TimerTaskStatus,
};
use log::{debug, info, log_enabled, warn};
use std::{
fmt::{Debug, Display},
io::Error,
net::TcpStream,
ops::DerefMut,
sync::Arc,
thread::sleep,
time::{Duration, Instant},
};
/// Receiving half of a client connection.
///
/// Wraps a [`MessageRecver`] and notifies both the protocol and the user callback
/// about every message that is successfully received.
#[derive(Debug)]
pub struct CltRecver<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> {
// Reads and deserializes messages from the underlying stream.
msg_recver: MessageRecver<P, MAX_MSG_SIZE>,
// User callback, notified via `on_recv` after the protocol hook has run.
callback: Arc<C>,
// Shared protocol instance; provides the `on_recv` hook and the connection status.
protocol: Arc<P>,
// Never read by this type; it is only held so that it is dropped together with the receiver.
// NOTE(review): presumably releases a barrier on the acceptor side when dropped (going by the type name) - confirm.
#[allow(dead_code)] acceptor_connection_gate: Option<RemoveConnectionBarrierOnDrop>,
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> CltRecver<P, C, MAX_MSG_SIZE> {
    /// Assembles a receiver from already constructed parts.
    ///
    /// * `recver` - message reader bound to the connection's stream
    /// * `callback` - user callback notified on every received message
    /// * `protocol` - shared protocol instance
    /// * `acceptor_connection_gate` - optional guard that is simply kept alive for as long as the receiver exists
    pub fn new(recver: MessageRecver<P, MAX_MSG_SIZE>, callback: Arc<C>, protocol: Arc<P>, acceptor_connection_gate: Option<RemoveConnectionBarrierOnDrop>) -> Self {
        let msg_recver = recver;
        Self { msg_recver, callback, protocol, acceptor_connection_gate }
    }
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> RecvNonBlocking<P::RecvT> for CltRecver<P, C, MAX_MSG_SIZE> {
    /// Performs a single non-blocking receive attempt.
    ///
    /// When a message is produced, the protocol's `on_recv` hook runs first and the
    /// user callback's `on_recv` second; the status is then handed back unchanged.
    /// Errors from the underlying reader are propagated without invoking any hook.
    #[inline(always)]
    fn recv(&mut self) -> Result<RecvStatus<P::RecvT>, Error> {
        let status = self.msg_recver.recv()?;
        // Only a fully received message triggers the hooks; `Completed(None)` and
        // `WouldBlock` pass straight through.
        if let RecvStatus::Completed(Some(ref msg)) = status {
            self.protocol.on_recv(self, msg);
            self.callback.on_recv(self.con_id(), msg);
        }
        Ok(status)
    }
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> ConnectionId for CltRecver<P, C, MAX_MSG_SIZE> {
    /// Returns the connection id stored in the underlying frame reader.
    #[inline(always)]
    fn con_id(&self) -> &ConId {
        let frame_reader = &self.msg_recver.frm_reader;
        &frame_reader.con_id
    }
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> ConnectionStatus for CltRecver<P, C, MAX_MSG_SIZE> {
    /// Reports the connection status as seen by the protocol.
    #[inline(always)]
    fn is_connected(&self) -> bool {
        let protocol = &self.protocol;
        protocol.is_connected()
    }
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> PollRead for CltRecver<P, C, MAX_MSG_SIZE> {
fn on_readable_event(&mut self) -> Result<PollEventStatus, Error> {
use RecvStatus::*;
match self.recv()? {
Completed(Some(_)) => Ok(PollEventStatus::Completed),
WouldBlock => Ok(PollEventStatus::WouldBlock),
Completed(None) => Ok(PollEventStatus::Terminate),
}
}
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> PollAble for CltRecver<P, C, MAX_MSG_SIZE> {
    /// Exposes the underlying stream reader as a `mio` event source.
    fn source(&mut self) -> Box<&mut dyn mio::event::Source> {
        let stream: &mut dyn mio::event::Source = &mut self.msg_recver.frm_reader.stream_reader;
        Box::new(stream)
    }
}
impl<P: Protocol, C: CallbackRecv<P>, const MAX_MSG_SIZE: usize> Display for CltRecver<P, C, MAX_MSG_SIZE> {
    /// Formats as `CltRecver<con_id, RecvT:.., SendT:.., MAX_MSG_SIZE>` using the
    /// last `::`-separated segment of each message type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keeps only the trailing path segment and strips any closing angle brackets.
        let shorten = |full_name: &str| full_name.split("::").last().unwrap_or("Unknown").replace('>', "");
        let recv_t = shorten(std::any::type_name::<P::RecvT>());
        let send_t = shorten(std::any::type_name::<P::SendT>());
        write!(f, "{}<{}, RecvT:{}, SendT:{}, {}>", asserted_short_name!("CltRecver", Self), self.con_id(), recv_t, send_t, MAX_MSG_SIZE)
    }
}
/// Sending half of a client connection.
///
/// Wraps a [`MessageSender`] and notifies the protocol and the user callback around
/// every send attempt. Dropping the sender runs the disconnect sequence once.
#[derive(Debug)]
pub struct CltSender<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> {
// Serializes and writes messages to the underlying stream.
msg_sender: MessageSender<P, MAX_MSG_SIZE>,
// User callback, notified via `on_sent` after a message was fully sent.
callback: Arc<C>,
// Shared protocol instance; provides the send hooks, `on_disconnect` and the connection status.
protocol: Arc<P>,
// Never read by this type; it is only held so that it is dropped together with the sender.
// NOTE(review): presumably releases a barrier on the acceptor side when dropped (going by the type name) - confirm.
#[allow(dead_code)] acceptor_connection_gate: Option<RemoveConnectionBarrierOnDrop>,
// Set on the first call to `on_disconnect` so that later calls return immediately.
is_on_disconnect: bool, }
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> CltSender<P, C, MAX_MSG_SIZE> {
    /// Assembles a sender from already constructed parts.
    ///
    /// * `sender` - message writer bound to the connection's stream
    /// * `callback` - user callback notified on every sent message
    /// * `protocol` - shared protocol instance
    /// * `acceptor_connection_gate` - optional guard that is simply kept alive for as long as the sender exists
    pub fn new(sender: MessageSender<P, MAX_MSG_SIZE>, callback: Arc<C>, protocol: Arc<P>, acceptor_connection_gate: Option<RemoveConnectionBarrierOnDrop>) -> Self {
        let msg_sender = sender;
        Self {
            msg_sender,
            callback,
            protocol,
            acceptor_connection_gate,
            is_on_disconnect: false,
        }
    }

    /// Runs the disconnect sequence at most once: invokes the protocol's
    /// `on_disconnect` hook, logs its outcome and shuts down both directions of the
    /// underlying writer. Subsequent calls are no-ops.
    fn on_disconnect(&mut self) {
        if self.is_on_disconnect {
            return;
        }
        // Flag first so that a re-entrant call (e.g. from `Drop`) returns immediately.
        self.is_on_disconnect = true;

        // A separate handle to the protocol lets `self` be handed to the hook.
        let protocol = Arc::clone(&self.protocol);
        let outcome = protocol.on_disconnect(self);
        if let Err(err) = outcome {
            warn!(
                "Dirty, {}::on_disconnect, did peer terminate connection or did you drop CltRecver before sender? con_id: {}, err:\n{}",
                asserted_short_name!("CltSender", Self),
                self.con_id(),
                err
            );
        } else if log_enabled!(log::Level::Info) {
            info!("Clean, {}::on_disconnect con_id: {}", asserted_short_name!("CltSender", Self), self.con_id());
        }

        // The stream is shut down regardless of how the protocol hook went.
        self.msg_sender.frm_writer.shutdown(std::net::Shutdown::Both, "CltSender::on_disconnect");
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> SendNonBlocking<P::SendT> for CltSender<P, C, MAX_MSG_SIZE> {
    /// Performs a single non-blocking send attempt.
    ///
    /// The protocol's `on_send` hook runs before the attempt. Afterwards exactly one of
    /// the following is invoked, depending on the outcome: `on_sent` (protocol, then
    /// callback), `on_wouldblock` or `on_error`. The outcome itself is returned unchanged.
    #[inline(always)]
    fn send(&mut self, msg: &mut <P as Messenger>::SendT) -> Result<SendStatus, Error> {
        self.protocol.on_send(self, msg);
        let outcome = self.msg_sender.send(msg);
        match &outcome {
            Ok(SendStatus::Completed) => {
                self.protocol.on_sent(self, msg);
                self.callback.on_sent(self.con_id(), msg);
            }
            Ok(SendStatus::WouldBlock) => {
                self.protocol.on_wouldblock(self, msg);
            }
            Err(e) => {
                self.protocol.on_error(self, msg, e);
            }
        }
        outcome
    }

    /// Retries the send in a busy loop until it completes, fails, or `timeout` has elapsed.
    ///
    /// `on_send` is invoked once, before the first attempt. `on_wouldblock` is invoked
    /// only when giving up after the timeout, in which case `WouldBlock` is returned.
    #[inline(always)]
    fn send_busywait_timeout(&mut self, msg: &mut <P as Messenger>::SendT, timeout: Duration) -> Result<SendStatus, Error> {
        let started = Instant::now();
        self.protocol.on_send(self, msg);
        loop {
            match self.msg_sender.send(msg) {
                // Still within the time budget: spin and try again.
                Ok(SendStatus::WouldBlock) if started.elapsed() <= timeout => continue,
                Ok(SendStatus::WouldBlock) => {
                    self.protocol.on_wouldblock(self, msg);
                    return Ok(SendStatus::WouldBlock);
                }
                Ok(SendStatus::Completed) => {
                    self.protocol.on_sent(self, msg);
                    self.callback.on_sent(self.con_id(), msg);
                    return Ok(SendStatus::Completed);
                }
                Err(e) => {
                    self.protocol.on_error(self, msg, &e);
                    return Err(e);
                }
            }
        }
    }

    /// Retries the send in a busy loop, without any time limit, until it completes or fails.
    ///
    /// `on_send` is invoked once, before the first attempt.
    #[inline(always)]
    fn send_busywait(&mut self, msg: &mut <P as Messenger>::SendT) -> Result<(), Error> {
        self.protocol.on_send(self, msg);
        loop {
            match self.msg_sender.send(msg) {
                Ok(SendStatus::WouldBlock) => {}
                Ok(SendStatus::Completed) => {
                    self.protocol.on_sent(self, msg);
                    self.callback.on_sent(self.con_id(), msg);
                    return Ok(());
                }
                Err(e) => {
                    self.protocol.on_error(self, msg, &e);
                    return Err(e);
                }
            }
        }
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> ReSendNonBlocking<P::SendT> for CltSender<P, C, MAX_MSG_SIZE> {
    /// Writes `msg` through the underlying message sender directly; unlike `send`,
    /// no protocol or callback hooks are invoked.
    fn re_send(&mut self, msg: &P::SendT) -> Result<SendStatus, Error> {
        let raw_sender = &mut self.msg_sender;
        raw_sender.send(msg)
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> ConnectionId for CltSender<P, C, MAX_MSG_SIZE> {
    /// Returns the connection id stored in the underlying frame writer.
    #[inline(always)]
    fn con_id(&self) -> &ConId {
        let frame_writer = &self.msg_sender.frm_writer;
        &frame_writer.con_id
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> ConnectionStatus for CltSender<P, C, MAX_MSG_SIZE> {
    /// Reports the connection status as seen by the protocol.
    #[inline(always)]
    fn is_connected(&self) -> bool {
        let protocol = &self.protocol;
        protocol.is_connected()
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> Display for CltSender<P, C, MAX_MSG_SIZE> {
    /// Formats as `CltSender<con_id, RecvT:.., SendT:.., MAX_MSG_SIZE>` using the
    /// last `::`-separated segment of each message type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keeps only the trailing path segment and strips any closing angle brackets.
        let shorten = |full_name: &str| full_name.split("::").last().unwrap_or("Unknown").replace('>', "");
        let recv_t = shorten(std::any::type_name::<P::RecvT>());
        let send_t = shorten(std::any::type_name::<P::SendT>());
        write!(f, "{}<{}, RecvT:{}, SendT:{}, {}>", asserted_short_name!("CltSender", Self), self.con_id(), recv_t, send_t, MAX_MSG_SIZE)
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> Drop for CltSender<P, C, MAX_MSG_SIZE> {
    /// Runs the disconnect sequence when the sender goes away; it is a no-op if the
    /// sequence has already run.
    fn drop(&mut self) {
        Self::on_disconnect(self);
    }
}
/// Cloneable handle to a [`CltRecver`] shared behind a spin lock.
///
/// In addition to receiving, it passes every received message to the protocol's
/// `send_reply` together with the paired sender handle.
#[derive(Debug)]
pub struct CltRecverRef<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> {
// `con_id` is an owned copy of the connection id, readable without taking the lock;
// `clt_recver` is the shared, lock-protected receiver.
con_id: ConId, clt_recver: Arc<spin::Mutex<CltRecver<P, C, MAX_MSG_SIZE>>>,
// Sender handle passed to `Protocol::send_reply` after each received message.
clt_sender: CltSenderRef<P, C, MAX_MSG_SIZE>,
// Shared protocol instance; provides `send_reply` and the connection status.
protocol: Arc<P>,
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> RecvNonBlocking<P::RecvT> for CltRecverRef<P, C, MAX_MSG_SIZE> {
#[inline(always)]
fn recv(&mut self) -> Result<RecvStatus<<P as Messenger>::RecvT>, Error> {
use RecvStatus::Completed;
let res = self.clt_recver.lock().recv();
if let Ok(Completed(Some(ref msg))) = res {
self.protocol.send_reply(msg, &mut self.clt_sender)?;
}
res
}
#[inline(always)]
fn recv_busywait_timeout(&mut self, timeout: Duration) -> Result<RecvStatus<<P as Messenger>::RecvT>, Error> {
use RecvStatus::{Completed, WouldBlock};
let start = Instant::now();
loop {
let status = self.clt_recver.lock().recv()?;
match status {
Completed(Some(msg)) => {
self.protocol.send_reply(&msg, &mut self.clt_sender)?;
return Ok(Completed(Some(msg)));
}
Completed(None) => return Ok(Completed(None)),
WouldBlock => {
if start.elapsed() > timeout {
return Ok(WouldBlock);
}
}
}
}
}
#[inline(always)]
fn recv_busywait(&mut self) -> Result<Option<<P as Messenger>::RecvT>, Error> {
use RecvStatus::{Completed, WouldBlock};
loop {
let status = self.clt_recver.lock().recv()?; match status {
Completed(Some(msg)) => {
self.protocol.send_reply(&msg, &mut self.clt_sender)?;
return Ok(Some(msg));
}
Completed(None) => return Ok(None),
WouldBlock => continue,
}
}
}
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> ConnectionId for CltRecverRef<P, C, MAX_MSG_SIZE> {
    /// Returns the locally stored copy of the connection id; no lock is taken.
    #[inline(always)]
    fn con_id(&self) -> &ConId {
        let Self { con_id, .. } = self;
        con_id
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> ConnectionStatus for CltRecverRef<P, C, MAX_MSG_SIZE> {
    /// Reports the connection status as seen by the protocol; no lock is taken.
    #[inline(always)]
    fn is_connected(&self) -> bool {
        let protocol = &self.protocol;
        protocol.is_connected()
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> PollRead for CltRecverRef<P, C, MAX_MSG_SIZE> {
fn on_readable_event(&mut self) -> Result<PollEventStatus, Error> {
use RecvStatus::*;
match self.recv()? {
Completed(Some(_)) => Ok(PollEventStatus::Completed),
WouldBlock => Ok(PollEventStatus::WouldBlock),
Completed(None) => Ok(PollEventStatus::Terminate),
}
}
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> PollAble for CltRecverRef<P, C, MAX_MSG_SIZE> {
    /// Registers the underlying stream reader with `registry`.
    ///
    /// The receiver lock is held for the duration of the registration call.
    fn register(&mut self, registry: &mio::Registry, token: mio::Token, interests: mio::Interest) -> Result<(), Error> {
        let mut guard = self.clt_recver.lock();
        registry.register(&mut guard.msg_recver.frm_reader.stream_reader, token, interests)
    }

    /// Deregisters the underlying stream reader from `registry`.
    ///
    /// The receiver lock is held for the duration of the deregistration call.
    fn deregister(&mut self, registry: &mio::Registry) -> Result<(), Error> {
        let mut guard = self.clt_recver.lock();
        registry.deregister(&mut guard.msg_recver.frm_reader.stream_reader)
    }

    /// Not supported for this type: the source lives behind a lock, so a reference to
    /// it cannot be handed out. `register` and `deregister` are overridden instead.
    ///
    /// # Panics
    ///
    /// Always panics.
    fn source(&mut self) -> Box<&mut dyn mio::event::Source> {
        // The message names `PollAble`, the trait these methods actually belong to.
        panic!("Invalid API usage. PollAble::register and PollAble::deregister are overridden and this call shall never be issued.")
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> Display for CltRecverRef<P, C, MAX_MSG_SIZE> {
    /// Formats as `CltRecverRef<con_id, RecvT:.., SendT:.., MAX_MSG_SIZE>` using the
    /// last `::`-separated segment of each message type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keeps only the trailing path segment and strips any closing angle brackets.
        let shorten = |full_name: &str| full_name.split("::").last().unwrap_or("Unknown").replace('>', "");
        let recv_t = shorten(std::any::type_name::<P::RecvT>());
        let send_t = shorten(std::any::type_name::<P::SendT>());
        write!(f, "{}<{}, RecvT:{}, SendT:{}, {}>", asserted_short_name!("CltRecverRef", Self), self.con_id(), recv_t, send_t, MAX_MSG_SIZE)
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> Drop for CltRecverRef<P, C, MAX_MSG_SIZE> {
    /// Shuts down both directions of the shared frame reader.
    ///
    /// Note that this runs whenever any handle is dropped, not only the last one.
    fn drop(&mut self) {
        let how = std::net::Shutdown::Both;
        self.clt_recver.lock().msg_recver.frm_reader.shutdown(how, "CltRecverRef::drop");
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> Clone for CltRecverRef<P, C, MAX_MSG_SIZE> {
fn clone(&self) -> Self {
Self {
con_id: self.con_id.clone(),
clt_recver: self.clt_recver.clone(),
clt_sender: self.clt_sender.clone(),
protocol: self.protocol.clone(),
}
}
}
/// Cloneable handle to a [`CltSender`] shared behind a spin lock.
#[derive(Debug)]
pub struct CltSenderRef<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> {
// `con_id` is an owned copy of the connection id, readable without taking the lock;
// `clt_sender` is the shared, lock-protected sender.
con_id: ConId, clt_sender: Arc<spin::Mutex<CltSender<P, C, MAX_MSG_SIZE>>>,
// Shared protocol instance; used for sending heart beats and reading the heart beat configuration.
pub(crate) protocol: Arc<P>, }
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> CltSenderRef<P, C, MAX_MSG_SIZE> {
    /// Asks the protocol to send a heart beat through the shared sender.
    ///
    /// The sender lock is held for the duration of the protocol call.
    pub(crate) fn send_heart_beat(&self) -> Result<SendStatus, Error> {
        let mut locked_sender = self.clt_sender.lock();
        let sender = locked_sender.deref_mut();
        self.protocol.send_heart_beat(sender)
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> SendNonBlocking<P::SendT> for CltSenderRef<P, C, MAX_MSG_SIZE> {
#[inline(always)]
fn send(&mut self, msg: &mut <P as Messenger>::SendT) -> Result<SendStatus, Error> {
self.clt_sender.lock().send(msg)
}
#[inline(always)]
fn send_busywait_timeout(&mut self, msg: &mut <P as Messenger>::SendT, timeout: Duration) -> Result<SendStatus, Error> {
use SendStatus::{Completed, WouldBlock};
let start = Instant::now();
loop {
let status = self.clt_sender.lock().send(msg)?; match status {
Completed => return Ok(Completed),
WouldBlock => {
if start.elapsed() > timeout {
return Ok(WouldBlock);
}
}
}
}
}
#[inline(always)]
fn send_busywait(&mut self, msg: &mut <P as Messenger>::SendT) -> Result<(), Error> {
use SendStatus::{Completed, WouldBlock};
loop {
let status = self.clt_sender.lock().send(msg)?; match status {
Completed => return Ok(()),
WouldBlock => continue,
}
}
}
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> ConnectionId for CltSenderRef<P, C, MAX_MSG_SIZE> {
    /// Returns the locally stored copy of the connection id; no lock is taken.
    #[inline(always)]
    fn con_id(&self) -> &ConId {
        let Self { con_id, .. } = self;
        con_id
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> ConnectionStatus for CltSenderRef<P, C, MAX_MSG_SIZE> {
    /// Reports the connection status as seen by the protocol.
    ///
    /// The status is read through this handle's own protocol reference instead of
    /// locking the shared sender. `CltSender::is_connected` only forwards to its
    /// protocol, and this handle is built with a clone of that same `Arc`, so the
    /// answer is identical while the spin lock is left untouched. This also matches
    /// how `CltRecverRef::is_connected` is implemented.
    // NOTE(review): relies on `protocol` never being re-pointed to a different instance
    // after construction; no such assignment exists in this file.
    #[inline(always)]
    fn is_connected(&self) -> bool {
        self.protocol.is_connected()
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> Display for CltSenderRef<P, C, MAX_MSG_SIZE> {
    /// Formats as `CltSenderRef<con_id, RecvT:.., SendT:.., MAX_MSG_SIZE>` using the
    /// last `::`-separated segment of each message type name.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keeps only the trailing path segment and strips any closing angle brackets.
        let shorten = |full_name: &str| full_name.split("::").last().unwrap_or("Unknown").replace('>', "");
        let recv_t = shorten(std::any::type_name::<P::RecvT>());
        let send_t = shorten(std::any::type_name::<P::SendT>());
        write!(f, "{}<{}, RecvT:{}, SendT:{}, {}>", asserted_short_name!("CltSenderRef", Self), self.con_id(), recv_t, send_t, MAX_MSG_SIZE)
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> Drop for CltSenderRef<P, C, MAX_MSG_SIZE> {
    /// Runs the shared sender's disconnect sequence under its lock.
    ///
    /// Note that this runs whenever any handle is dropped, not only the last one;
    /// the sequence itself executes at most once per sender.
    fn drop(&mut self) {
        let mut sender = self.clt_sender.lock();
        sender.on_disconnect();
    }
}
impl<P: Protocol, C: CallbackSend<P>, const MAX_MSG_SIZE: usize> Clone for CltSenderRef<P, C, MAX_MSG_SIZE> {
fn clone(&self) -> Self {
Self {
con_id: self.con_id.clone(),
clt_sender: self.clt_sender.clone(),
protocol: self.protocol.clone(),
}
}
}
/// A client connection that owns both its sending and its receiving half.
///
/// It can be used directly for sending and receiving, or split into its halves.
#[derive(Debug)]
pub struct Clt<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> {
// Fields are dropped in declaration order, so the sender (whose `Drop` runs the
// disconnect sequence) is dropped before the receiver.
clt_sender: CltSender<P, C, MAX_MSG_SIZE>, clt_recver: CltRecver<P, C, MAX_MSG_SIZE>,
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> Clt<P, C, MAX_MSG_SIZE> {
    /// Connects to `addr`, retrying failed attempts until `timeout` has elapsed.
    ///
    /// After each failed attempt the calling thread sleeps for `retry_after`. On
    /// success the connection is finalized via `from_stream`.
    ///
    /// # Errors
    ///
    /// Returns an error of kind `TimedOut` when no attempt succeeded in time, or the
    /// error produced while finalizing an established connection.
    ///
    /// # Panics
    ///
    /// Panics unless `timeout` is strictly greater than `retry_after`.
    pub fn connect(addr: &str, timeout: Duration, retry_after: Duration, callback: Arc<C>, protocol: P, name: Option<&str>) -> Result<Self, Error> {
        assert!(timeout > retry_after, "timeout: {:?}, retry_after: {:?}", timeout, retry_after);
        let started = Instant::now();
        let con_id = ConId::clt(name, None, addr);
        while started.elapsed() < timeout {
            let stream = match TcpStream::connect(addr) {
                Ok(stream) => stream,
                Err(e) => {
                    sleep(retry_after);
                    if log_enabled!(log::Level::Debug) {
                        debug!("{} connection failed. e: {:?}", con_id, e);
                    }
                    continue;
                }
            };
            return Self::from_stream(stream, con_id, callback, protocol, None);
        }
        Err(Error::new(std::io::ErrorKind::TimedOut, format!("{:?} connect timeout: {:?}", con_id, timeout)))
    }

    /// Builds a client on top of an established `stream` and then runs the protocol's
    /// `on_connect` hook on it.
    ///
    /// # Errors
    ///
    /// Propagates the error returned by `on_connect`; the partially built client is
    /// dropped in that case.
    pub(crate) fn from_stream(stream: TcpStream, con_id: ConId, callback: Arc<C>, protocol: P, acceptor_connection_gate: Option<RemoveConnectionBarrierOnDrop>) -> Result<Self, Error> {
        let (msg_recver, msg_sender) = into_split_messenger::<P, MAX_MSG_SIZE>(con_id, stream);
        let protocol = Arc::new(protocol);
        // Both halves share the same callback, protocol and gate.
        let clt_recver = CltRecver::new(msg_recver, Arc::clone(&callback), Arc::clone(&protocol), acceptor_connection_gate.clone());
        let clt_sender = CltSender::new(msg_sender, Arc::clone(&callback), Arc::clone(&protocol), acceptor_connection_gate);
        let mut con = Self { clt_sender, clt_recver };
        protocol.on_connect(&mut con)?;
        Ok(con)
    }

    /// Splits the client into its owned receiving and sending halves.
    pub fn into_split(self) -> (CltRecver<P, C, MAX_MSG_SIZE>, CltSender<P, C, MAX_MSG_SIZE>) {
        let Self { clt_sender, clt_recver } = self;
        (clt_recver, clt_sender)
    }

    /// Splits the client into cloneable, lock-protected handles.
    ///
    /// If the protocol configures a heart beat interval, a recurring heart beat task
    /// is scheduled on the default heart beat handler; it stops after the first send
    /// error. Without an interval nothing is scheduled (a warning is logged in debug builds).
    pub fn into_split_ref(self) -> (CltRecverRef<P, C, MAX_MSG_SIZE>, CltSenderRef<P, C, MAX_MSG_SIZE>) {
        let (recver, sender) = self.into_split();

        // Capture id and protocol before the halves are moved behind their locks.
        let sender_con_id = sender.con_id().to_owned();
        let sender_protocol = sender.protocol.clone();
        let sender = CltSenderRef {
            con_id: sender_con_id,
            protocol: sender_protocol,
            clt_sender: Arc::new(spin::Mutex::new(sender)),
        };

        let recver_con_id = recver.con_id().to_owned();
        let recver_protocol = recver.protocol.clone();
        let recver = CltRecverRef {
            con_id: recver_con_id,
            protocol: recver_protocol,
            clt_recver: Arc::new(spin::Mutex::new(recver)),
            clt_sender: sender.clone(),
        };

        if let Some(interval) = sender.protocol.conf_heart_beat_interval() {
            // The task owns its own sender handle.
            let hbeat_sender = sender.clone();
            let hbeat_task = move || match hbeat_sender.send_heart_beat() {
                Ok(SendStatus::Completed) => TimerTaskStatus::Completed,
                Ok(SendStatus::WouldBlock) => TimerTaskStatus::RetryAfter(Duration::from_secs(0)),
                Err(err) => {
                    warn!("{} Failed to send heart beat. Will no longer attempt to send. err:\n{}", hbeat_sender.con_id(), err);
                    TimerTaskStatus::Terminate
                }
            };
            crate::connect::DEFAULT_HBEAT_HANDLER.schedule(sender.con_id().to_string().as_str(), interval, hbeat_task);
        } else {
            #[cfg(debug_assertions)]
            warn!(
                "{}::conf_heart_beat_interval() is None, hence {}::send_heart_beat(..) will not be scheduled for this con_id: {}",
                crate::prelude::short_type_name::<P>(),
                crate::prelude::short_type_name::<P>(),
                sender.con_id(),
            );
        }

        (recver, sender)
    }

    /// Hands the receiving half to the default poll handler and returns the owned sender.
    pub fn into_sender_with_spawned_recver(self) -> CltSender<P, C, MAX_MSG_SIZE> {
        let (polled_recver, sender) = self.into_split();
        crate::connect::DEFAULT_POLL_HANDLER.add_recver(polled_recver.into());
        sender
    }

    /// Hands the receiving handle to the default poll handler and returns the sender handle.
    pub fn into_sender_with_spawned_recver_ref(self) -> CltSenderRef<P, C, MAX_MSG_SIZE> {
        let (polled_recver, sender) = self.into_split_ref();
        crate::connect::DEFAULT_POLL_HANDLER.add_recver(polled_recver.into());
        sender
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> SendNonBlocking<P::SendT> for Clt<P, C, MAX_MSG_SIZE> {
    /// Delegates a single send attempt to the sending half.
    #[inline(always)]
    fn send(&mut self, msg: &mut <P as Messenger>::SendT) -> Result<SendStatus, Error> {
        let sender = &mut self.clt_sender;
        sender.send(msg)
    }

    /// Delegates the time-limited busy-wait send to the sending half.
    #[inline(always)]
    fn send_busywait_timeout(&mut self, msg: &mut <P as Messenger>::SendT, timeout: Duration) -> Result<SendStatus, Error> {
        let sender = &mut self.clt_sender;
        sender.send_busywait_timeout(msg, timeout)
    }

    /// Delegates the unbounded busy-wait send to the sending half.
    #[inline(always)]
    fn send_busywait(&mut self, msg: &mut <P as Messenger>::SendT) -> Result<(), Error> {
        let sender = &mut self.clt_sender;
        sender.send_busywait(msg)
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> RecvNonBlocking<P::RecvT> for Clt<P, C, MAX_MSG_SIZE> {
    /// Delegates a single receive attempt to the receiving half.
    #[inline(always)]
    fn recv(&mut self) -> Result<RecvStatus<<P as Messenger>::RecvT>, Error> {
        let recver = &mut self.clt_recver;
        recver.recv()
    }

    /// Delegates the time-limited busy-wait receive to the receiving half.
    #[inline(always)]
    fn recv_busywait_timeout(&mut self, timeout: Duration) -> Result<RecvStatus<<P as Messenger>::RecvT>, Error> {
        let recver = &mut self.clt_recver;
        recver.recv_busywait_timeout(timeout)
    }

    /// Delegates the unbounded busy-wait receive to the receiving half.
    #[inline(always)]
    fn recv_busywait(&mut self) -> Result<Option<<P as Messenger>::RecvT>, Error> {
        let recver = &mut self.clt_recver;
        recver.recv_busywait()
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> ReSendNonBlocking<P::SendT> for Clt<P, C, MAX_MSG_SIZE> {
    /// Delegates the re-send to the sending half.
    fn re_send(&mut self, msg: &P::SendT) -> Result<SendStatus, Error> {
        let sender = &mut self.clt_sender;
        sender.re_send(msg)
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> ConnectionId for Clt<P, C, MAX_MSG_SIZE> {
    /// Returns the connection id held by the receiving half's frame reader.
    #[inline(always)]
    fn con_id(&self) -> &ConId {
        let frame_reader = &self.clt_recver.msg_recver.frm_reader;
        &frame_reader.con_id
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> ConnectionStatus for Clt<P, C, MAX_MSG_SIZE> {
    /// Reports the connection status by asking the receiving half.
    #[inline(always)]
    fn is_connected(&self) -> bool {
        let recver = &self.clt_recver;
        recver.is_connected()
    }
}
impl<P: Protocol, C: CallbackRecvSend<P>, const MAX_MSG_SIZE: usize> Display for Clt<P, C, MAX_MSG_SIZE> {
    /// Formats as `Clt<receiver, sender>` using the `Display` output of both halves.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self { clt_sender, clt_recver } = self;
        write!(f, "{}<{}, {}>", asserted_short_name!("Clt", Self), clt_recver, clt_sender)
    }
}
#[cfg(test)]
#[cfg(feature = "unittest")]
mod test {
    use super::Clt;
    use crate::unittest::setup::protocol::CltTestProtocolAuthAndHbeat;
    use links_core::callbacks::logger::LoggerCallback;
    use links_core::unittest::setup::{self, framer::TEST_MSG_FRAME_SIZE};

    /// `Clt::connect` must return an error for the address produced by the test
    /// setup helper (presumably one that nothing is listening on).
    #[test]
    fn test_clt_not_connected() {
        setup::log::configure();
        let addr = setup::net::rand_avail_addr_port();
        let callback = LoggerCallback::new_ref();
        let protocol = CltTestProtocolAuthAndHbeat::default();
        let timeout = setup::net::default_connect_timeout();
        let retry_after = setup::net::default_connect_retry_after();
        let outcome = Clt::<_, _, TEST_MSG_FRAME_SIZE>::connect(addr, timeout, retry_after, callback, protocol, Some("unittest"));
        assert!(outcome.is_err());
    }
}