use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use crate::channel::{Receiver, RecvTimeoutError};
use crate::network::Network;
const TIMEOUT_SEC: u64 = 2;
#[derive(Clone, Debug, PartialEq)]
pub struct SendRequest {
recipient: String,
payload: Vec<u8>,
}
impl SendRequest {
pub fn new(recipient: String, payload: Vec<u8>) -> Self {
SendRequest { recipient, payload }
}
pub fn recipient(&self) -> &str {
&self.recipient
}
pub fn payload(&self) -> &[u8] {
&self.payload
}
}
pub struct NetworkMessageSender {
rc: Box<dyn Receiver<SendRequest>>,
network: Network,
running: Arc<AtomicBool>,
}
impl NetworkMessageSender {
pub fn new(
rc: Box<dyn Receiver<SendRequest>>,
network: Network,
running: Arc<AtomicBool>,
) -> Self {
NetworkMessageSender {
rc,
network,
running,
}
}
pub fn run(&self) -> Result<(), NetworkMessageSenderError> {
let timeout = Duration::from_secs(TIMEOUT_SEC);
while self.running.load(Ordering::SeqCst) {
let send_request = match self.rc.recv_timeout(timeout) {
Ok(send_request) => send_request,
Err(RecvTimeoutError::Timeout) => continue,
Err(RecvTimeoutError::Disconnected) => {
error!("Received Disconnected Error from receiver");
return Err(NetworkMessageSenderError::RecvTimeoutError(String::from(
"Received Disconnected Error from receiver",
)));
}
};
match self
.network
.send(send_request.recipient(), send_request.payload())
{
Ok(_) => (),
Err(err) => warn!("Unable to send message: {:?}", err),
};
}
while let Ok(send_request) = self.rc.try_recv() {
match self
.network
.send(send_request.recipient(), send_request.payload())
{
Ok(_) => (),
Err(err) => warn!("Unable to send message: {:?}", err),
};
}
Ok(())
}
}
#[derive(Debug)]
pub enum NetworkMessageSenderError {
RecvTimeoutError(String),
}
#[cfg(test)]
mod tests {
use crate::channel::{Receiver, Sender};
use crossbeam_channel;
use std::sync::mpsc;
use std::thread;
use super::*;
use crate::mesh::Mesh;
use crate::network::Network;
use crate::transport::socket::TcpTransport;
use crate::transport::Transport;
fn test_network_message_sender(
sender: Box<dyn Sender<SendRequest>>,
receiver: Box<dyn Receiver<SendRequest>>,
) {
let mut transport = TcpTransport::default();
let mut listener = transport.listen("127.0.0.1:0").unwrap();
let endpoint = listener.endpoint();
let mesh1 = Mesh::new(1, 1);
let network1 = Network::new(mesh1.clone(), 0).unwrap();
let running = Arc::new(AtomicBool::new(true));
let network_message_sender = NetworkMessageSender::new(receiver, network1.clone(), running);
thread::spawn(move || {
let mesh2 = Mesh::new(1, 1);
let network2 = Network::new(mesh2.clone(), 0).unwrap();
let connection = listener.accept().unwrap();
network2.add_peer("ABC".to_string(), connection).unwrap();
let network_message = network2.recv().unwrap();
assert_eq!(network_message.peer_id(), "ABC".to_string());
assert_eq!(
network_message.payload().to_vec(),
b"FromNetworkMessageSender".to_vec()
);
});
let connection = transport.connect(&endpoint).unwrap();
network1.add_peer("123".to_string(), connection).unwrap();
thread::spawn(move || network_message_sender.run());
let send_request =
SendRequest::new("123".to_string(), b"FromNetworkMessageSender".to_vec());
sender.send(send_request).unwrap();
}
fn test_network_message_sender_rapid_fire(
sender: Box<dyn Sender<SendRequest>>,
receiver: Box<dyn Receiver<SendRequest>>,
) {
let mut transport = TcpTransport::default();
let mut listener = transport.listen("127.0.0.1:0").unwrap();
let endpoint = listener.endpoint();
let mesh1 = Mesh::new(5, 5);
let network1 = Network::new(mesh1.clone(), 0).unwrap();
let running = Arc::new(AtomicBool::new(true));
let network_message_sender = NetworkMessageSender::new(receiver, network1.clone(), running);
thread::spawn(move || {
let mesh2 = Mesh::new(5, 5);
let network2 = Network::new(mesh2.clone(), 0).unwrap();
let connection = listener.accept().unwrap();
network2.add_peer("ABC".to_string(), connection).unwrap();
for _ in 0..100 {
let network_message = network2.recv().unwrap();
assert_eq!(network_message.peer_id(), "ABC".to_string());
assert_eq!(
network_message.payload().to_vec(),
b"FromNetworkMessageSender".to_vec()
);
}
});
let connection = transport.connect(&endpoint).unwrap();
network1.add_peer("123".to_string(), connection).unwrap();
thread::spawn(move || network_message_sender.run());
let send_request =
SendRequest::new("123".to_string(), b"FromNetworkMessageSender".to_vec());
for _ in 0..100 {
sender.send(send_request.clone()).unwrap();
}
}
#[test]
fn test_receiver_crossbeam() {
let (send, recv) = crossbeam_channel::bounded(5);
let send_box = Box::new(send);
let send_clone = send_box.clone();
test_network_message_sender(send_clone, Box::new(recv));
}
#[test]
fn test_receiver_mpsc() {
let (send, recv) = mpsc::channel();
test_network_message_sender(Box::new(send), Box::new(recv));
}
#[test]
fn test_receiver_crossbeam_rapid_fire() {
let (send, recv) = crossbeam_channel::bounded(5);
test_network_message_sender_rapid_fire(Box::new(send), Box::new(recv));
}
#[test]
fn test_receiver_mpsc_rapid_fire() {
let (send, recv) = mpsc::channel();
test_network_message_sender_rapid_fire(Box::new(send), Box::new(recv));
}
}