#[cfg(test)]
mod tests {
use core_affinity::CoreId;
use local_sync::mpsc::{unbounded::channel, unbounded::Tx as LocalSender};
use std::cell::RefCell;
use std::collections::VecDeque;
use std::net::SocketAddr;
use std::rc::Rc;
use std::str::FromStr;
use std::time::Instant;
use retty::bootstrap::{
BootstrapTcpClient, BootstrapTcpServer, BootstrapUdpClient, BootstrapUdpServer,
};
use retty::channel::{Context, Handler, Pipeline};
use retty::codec::{
byte_to_message_decoder::{
LineBasedFrameDecoder, TaggedByteToMessageCodec, TerminatorType,
},
string_codec::TaggedStringCodec,
};
use retty::executor::{spawn_local, yield_local, LocalExecutorBuilder};
use retty::transport::{
EcnCodepoint, Protocol, TaggedBytesMut, TaggedString, TransportContext,
};
struct EchoHandler {
is_server: bool,
tx: Rc<RefCell<Option<LocalSender<()>>>>,
count: Rc<RefCell<usize>>,
check_ecn: bool,
transmits: VecDeque<TaggedString>,
}
impl EchoHandler {
fn new(
is_server: bool,
tx: Rc<RefCell<Option<LocalSender<()>>>>,
count: Rc<RefCell<usize>>,
check_ecn: bool,
) -> Self {
EchoHandler {
is_server,
tx,
count,
check_ecn,
transmits: VecDeque::new(),
}
}
}
impl Handler for EchoHandler {
type Rin = TaggedString;
type Rout = Self::Rin;
type Win = TaggedString;
type Wout = Self::Win;
fn name(&self) -> &str {
"EchoHandler"
}
fn handle_read(
&mut self,
_ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
msg: Self::Rin,
) {
{
let mut count = self.count.borrow_mut();
println!(
"is_server = {}, count = {} msg = {} with ECN = {:?}",
self.is_server, *count, msg.message, msg.transport.ecn
);
*count += 1;
if self.check_ecn {
assert_eq!(Some(EcnCodepoint::Ect1), msg.transport.ecn);
}
}
if self.is_server {
self.transmits.push_back(TaggedString {
now: Instant::now(),
transport: msg.transport,
message: format!("{}\r\n", msg.message),
});
}
if msg.message == "bye" {
let mut tx = self.tx.borrow_mut();
if let Some(tx) = tx.take() {
let _ = tx.send(());
}
}
}
fn poll_timeout(
&mut self,
_ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
_eto: &mut Instant,
) {
}
fn poll_write(
&mut self,
ctx: &Context<Self::Rin, Self::Rout, Self::Win, Self::Wout>,
) -> Option<Self::Wout> {
if let Some(msg) = ctx.fire_poll_write() {
self.transmits.push_back(msg);
}
self.transmits.pop_front()
}
}
#[test]
fn test_echo_udp() {
let handler = LocalExecutorBuilder::new()
.name("ech_udp_thread")
.core_id(CoreId { id: 0 })
.spawn(|| async move {
const ITER: usize = 100;
let (tx, mut rx) = channel();
let server_count = Rc::new(RefCell::new(0));
let server_count_clone = server_count.clone();
let (server_done_tx, mut server_done_rx) = channel();
let server_done_tx = Rc::new(RefCell::new(Some(server_done_tx)));
let mut server = BootstrapUdpServer::new();
server.pipeline(Box::new(move || {
let pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();
let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(Box::new(
LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH),
));
let string_codec_handler = TaggedStringCodec::new();
let echo_handler = EchoHandler::new(
true,
Rc::clone(&server_done_tx),
Rc::clone(&server_count_clone),
#[cfg(not(target_os = "windows"))]
true,
#[cfg(target_os = "windows")]
false,
);
pipeline.add_back(line_based_frame_decoder_handler);
pipeline.add_back(string_codec_handler);
pipeline.add_back(echo_handler);
pipeline.finalize()
}));
let server_addr = server.bind("127.0.0.1:0").await.unwrap();
spawn_local(async move {
let client_count = Rc::new(RefCell::new(0));
let client_count_clone = client_count.clone();
let (client_done_tx, mut client_done_rx) = channel();
let client_done_tx = Rc::new(RefCell::new(Some(client_done_tx)));
let mut client = BootstrapUdpClient::new();
client.pipeline(Box::new(move || {
let pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();
let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(
Box::new(LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH)),
);
let string_codec_handler = TaggedStringCodec::new();
let echo_handler = EchoHandler::new(
false,
Rc::clone(&client_done_tx),
Rc::clone(&client_count_clone),
#[cfg(not(target_os = "windows"))]
true,
#[cfg(target_os = "windows")]
false,
);
pipeline.add_back(line_based_frame_decoder_handler);
pipeline.add_back(string_codec_handler);
pipeline.add_back(echo_handler);
pipeline.finalize()
}));
let client_addr = client.bind("127.0.0.1:0").await.unwrap();
let pipeline = client.connect(server_addr).await.unwrap();
for i in 0..ITER {
pipeline.write(TaggedString {
now: Instant::now(),
transport: TransportContext {
local_addr: client_addr,
peer_addr: server_addr,
ecn: EcnCodepoint::from_bits(1),
protocol: Protocol::UDP,
},
message: format!("{}\r\n", i),
});
yield_local();
}
pipeline.write(TaggedString {
now: Instant::now(),
transport: TransportContext {
local_addr: client_addr,
peer_addr: server_addr,
ecn: EcnCodepoint::from_bits(1),
protocol: Protocol::UDP,
},
message: format!("bye\r\n"),
});
yield_local();
assert!(client_done_rx.recv().await.is_some());
assert!(tx.send(client_count).is_ok());
client.graceful_stop().await;
})
.detach();
let client_count = rx.recv().await.unwrap();
assert!(server_done_rx.recv().await.is_some());
let (client_count, server_count) = (client_count.borrow(), server_count.borrow());
assert_eq!(*client_count, *server_count);
assert_eq!(ITER + 1, *client_count);
server.graceful_stop().await;
})
.unwrap();
handler.join().unwrap();
}
#[cfg(not(target_os = "windows"))]
#[test]
fn test_echo_tcp() {
LocalExecutorBuilder::default().run(async {
const ITER: usize = 100;
let (tx, mut rx) = channel();
let server_count = Rc::new(RefCell::new(0));
let server_count_clone = server_count.clone();
let (server_done_tx, mut server_done_rx) = channel();
let server_done_tx = Rc::new(RefCell::new(Some(server_done_tx)));
let mut server = BootstrapTcpServer::new();
server.pipeline(Box::new(move || {
let pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();
let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(Box::new(
LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH),
));
let string_codec_handler = TaggedStringCodec::new();
let echo_handler = EchoHandler::new(
true,
Rc::clone(&server_done_tx),
Rc::clone(&server_count_clone),
false,
);
pipeline.add_back(line_based_frame_decoder_handler);
pipeline.add_back(string_codec_handler);
pipeline.add_back(echo_handler);
pipeline.finalize()
}));
let server_addr = server.bind("127.0.0.1:0").await.unwrap();
spawn_local(async move {
let client_count = Rc::new(RefCell::new(0));
let client_count_clone = client_count.clone();
let (client_done_tx, mut client_done_rx) = channel();
let client_done_tx = Rc::new(RefCell::new(Some(client_done_tx)));
let mut client = BootstrapTcpClient::new();
client.pipeline(Box::new(move || {
let pipeline: Pipeline<TaggedBytesMut, TaggedString> = Pipeline::new();
let line_based_frame_decoder_handler = TaggedByteToMessageCodec::new(Box::new(
LineBasedFrameDecoder::new(8192, true, TerminatorType::BOTH),
));
let string_codec_handler = TaggedStringCodec::new();
let echo_handler = EchoHandler::new(
false,
Rc::clone(&client_done_tx),
Rc::clone(&client_count_clone),
false,
);
pipeline.add_back(line_based_frame_decoder_handler);
pipeline.add_back(string_codec_handler);
pipeline.add_back(echo_handler);
pipeline.finalize()
}));
let client_addr = SocketAddr::from_str("127.0.0.1:0").unwrap();
let pipeline = client.connect(server_addr).await.unwrap();
for i in 0..ITER {
pipeline.write(TaggedString {
now: Instant::now(),
transport: TransportContext {
local_addr: client_addr,
peer_addr: server_addr,
ecn: None,
protocol: Protocol::TCP,
},
message: format!("{}\r\n", i),
});
yield_local();
}
pipeline.write(TaggedString {
now: Instant::now(),
transport: TransportContext {
local_addr: client_addr,
peer_addr: server_addr,
ecn: None,
protocol: Protocol::TCP,
},
message: format!("bye\r\n"),
});
yield_local();
assert!(client_done_rx.recv().await.is_some());
assert!(tx.send(client_count).is_ok());
client.graceful_stop().await;
})
.detach();
let client_count = rx.recv().await.unwrap();
assert!(server_done_rx.recv().await.is_some());
let (client_count, server_count) = (client_count.borrow(), server_count.borrow());
assert_eq!(*client_count, *server_count);
assert_eq!(ITER + 1, *client_count);
server.graceful_stop().await;
});
}
}