use std::{any::type_name, fmt::Display, io::Error, net::TcpStream};
use links_core::prelude::{ConId, Messenger};
use crate::prelude::{FrameReader, FrameWriter, RecvMsg, SendMsgNonMut};
#[derive(Debug)]
pub struct MessageRecver<M: Messenger, const MAX_MSG_SIZE: usize> {
pub(crate) frm_reader: FrameReader<M, MAX_MSG_SIZE>,
phantom: std::marker::PhantomData<M>,
}
impl<M: Messenger, const MAX_MSG_SIZE: usize> MessageRecver<M, MAX_MSG_SIZE> {
pub fn new(con_id: ConId, stream: TcpStream) -> Self {
Self {
frm_reader: FrameReader::<M, MAX_MSG_SIZE>::new(con_id, stream),
phantom: std::marker::PhantomData,
}
}
}
impl<M: Messenger, const MAX_MSG_SIZE: usize> RecvMsg<M> for MessageRecver<M, MAX_MSG_SIZE> {
#[inline(always)]
fn recv(&mut self) -> Result<Option<M::RecvT>, Error> {
let opt_bytes = self.frm_reader.read_frame()?;
match opt_bytes {
Some(frame) => {
let msg = M::deserialize(&frame)?;
Ok(Some(msg))
}
None => Ok(None),
}
}
}
impl<M: Messenger, const MAX_MSG_SIZE: usize> Display for MessageRecver<M, MAX_MSG_SIZE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let name = type_name::<M>().split("::").last().unwrap_or("Unknown");
write!(f, "{:?} MessageRecver<{}, {}>", self.frm_reader.con_id, name, MAX_MSG_SIZE)
}
}
#[derive(Debug)]
pub struct MessageSender<M: Messenger, const MAX_MSG_SIZE: usize> {
pub(crate) frm_writer: FrameWriter,
phantom: std::marker::PhantomData<M>,
}
impl<M: Messenger, const MAX_MSG_SIZE: usize> MessageSender<M, MAX_MSG_SIZE> {
pub fn new(con_id: ConId, stream: TcpStream) -> Self {
Self {
frm_writer: FrameWriter::new(con_id, stream),
phantom: std::marker::PhantomData,
}
}
}
impl<M: Messenger, const MAX_MSG_SIZE: usize> SendMsgNonMut<M> for MessageSender<M, MAX_MSG_SIZE> {
#[inline(always)]
fn send(&mut self, msg: &<M as Messenger>::SendT) -> Result<(), Error> {
let (bytes, size) = M::serialize::<MAX_MSG_SIZE>(msg)?;
self.frm_writer.write_frame(&bytes[..size])?;
Ok(())
}
}
impl<M: Messenger, const MAX_MSG_SIZE: usize> Display for MessageSender<M, MAX_MSG_SIZE> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let messenger_name = type_name::<M>().split("::").last().unwrap_or("Unknown");
write!(f, "{} MessageSender<{}, {}>", self.frm_writer.con_id, messenger_name, MAX_MSG_SIZE)
}
}
pub type MessageProcessor<M, const MAX_MSG_SIZE: usize> = (MessageRecver<M, MAX_MSG_SIZE>, MessageSender<M, MAX_MSG_SIZE>);
pub fn into_split_messenger<M: Messenger, const MAX_MSG_SIZE: usize>(mut con_id: ConId, stream: TcpStream) -> MessageProcessor<M, MAX_MSG_SIZE> {
con_id.set_local(stream.local_addr().unwrap());
con_id.set_peer(stream.peer_addr().unwrap());
let (reader, writer) = (stream.try_clone().expect("Failed to try_clone TcpStream for MessageRecver"), stream);
(MessageRecver::<M, MAX_MSG_SIZE>::new(con_id.clone(), reader), MessageSender::<M, MAX_MSG_SIZE>::new(con_id, writer))
}
#[cfg(test)]
#[cfg(feature = "unittest")]
mod test {
use crate::prelude::*;
use links_core::{
assert_error_kind_on_target_family, fmt_num,
prelude::ConId,
unittest::setup::{
self,
framer::{CltTestMessenger, SvcTestMessenger},
model::{CltTestMsg, CltTestMsgDebug, SvcTestMsg, SvcTestMsgDebug, TEST_MSG_FRAME_SIZE},
},
};
use log::info;
use rand::Rng;
use std::{
net::{TcpListener, TcpStream},
thread::{sleep, Builder},
time::{Duration, Instant},
};
#[test]
fn test_messenger() {
setup::log::configure_level(log::LevelFilter::Info);
let addr = setup::net::rand_avail_addr_port();
const WRITE_N_TIMES: usize = 100_000;
let svc = Builder::new()
.name("Thread-Svc".to_owned())
.spawn(move || {
let inp_svc_msg = SvcTestMsg::Dbg(SvcTestMsgDebug::new(b"Hello Frm Server Msg"));
let (mut svc_msg_sent_count, mut svc_msg_recv_count) = (0_usize, 0_usize);
let listener = TcpListener::bind(addr).unwrap();
let (stream, _) = listener.accept().unwrap();
let (mut svc_recver, mut svc_sender) = into_split_messenger::<SvcTestMessenger, TEST_MSG_FRAME_SIZE>(ConId::svc(Some("unittest"), addr, None), stream);
info!("{} connected", svc_sender);
while let Some(_) = svc_recver.recv().unwrap() {
svc_msg_recv_count += 1;
svc_sender.send(&inp_svc_msg).unwrap();
svc_msg_sent_count += 1;
}
info!("{} Connection Closed by Client", svc_recver);
(svc_msg_sent_count, svc_msg_recv_count)
})
.unwrap();
sleep(Duration::from_millis(100));
let inp_clt_msg = CltTestMsg::Dbg(CltTestMsgDebug::new(b"Hello Frm Client Msg"));
let (mut clt_msg_sent_count, mut clt_msg_recv_count) = (0, 0);
let stream = TcpStream::connect(addr).unwrap();
let (mut clt_recver, mut clt_sender) = into_split_messenger::<CltTestMessenger, TEST_MSG_FRAME_SIZE>(ConId::clt(Some("unittest"), None, addr), stream);
info!("{} connected", clt_sender);
let start = Instant::now();
for _ in 0..WRITE_N_TIMES {
clt_sender.send(&inp_clt_msg).unwrap();
clt_msg_sent_count += 1;
let _x = clt_recver.recv().unwrap().unwrap();
clt_msg_recv_count += 1;
}
let elapsed = start.elapsed();
if rand::thread_rng().gen_range(1..=2) % 2 == 0 {
info!("dropping clt_sender");
drop(clt_sender);
let opt = clt_recver.recv().unwrap();
info!("clt_recver.recv(): {:?}", opt);
assert_eq!(opt, None);
} else {
info!("dropping clt_recver");
drop(clt_recver);
let err = clt_sender.send(&inp_clt_msg).unwrap_err();
info!("clt_sender.send(): {}", err);
assert_error_kind_on_target_family!(err, std::io::ErrorKind::BrokenPipe);
}
let (svc_msg_sent_count, svc_msg_recv_count) = svc.join().unwrap();
info!("clt_msg_sent_count: {}, clt_msg_recv_count: {}", fmt_num!(clt_msg_sent_count), fmt_num!(clt_msg_recv_count));
info!("svc_msg_sent_count: {}, svc_msg_recv_count: {}", fmt_num!(svc_msg_sent_count), fmt_num!(svc_msg_recv_count));
info!("per round trip elapsed: {:?}, total elapsed: {:?} ", elapsed / WRITE_N_TIMES as u32, elapsed);
assert_eq!(clt_msg_sent_count, svc_msg_sent_count);
assert_eq!(clt_msg_recv_count, svc_msg_recv_count);
assert_eq!(clt_msg_sent_count, WRITE_N_TIMES);
}
}