use crate::prelude::*;
use log::{error, info};
use secc::*;
use std::collections::HashMap;
use std::io::prelude::*;
use std::io::{BufReader, BufWriter};
use std::net::{SocketAddr, TcpListener, TcpStream};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::sync::{Condvar, Mutex};
use std::thread;
use std::thread::JoinHandle;
use std::time::Duration;
use uuid::Uuid;
struct ConnectionData {
pub system_uuid: Uuid,
pub address: SocketAddr,
pub sender: SeccSender<WireMessage>,
pub receiver: SeccReceiver<WireMessage>,
pub tx_handle: JoinHandle<()>,
pub rx_handle: JoinHandle<()>,
}
struct TcpClusterMgrData {
listen_address: SocketAddr,
system: ActorSystem,
listener: RwLock<Option<JoinHandle<()>>>,
connections: RwLock<HashMap<Uuid, ConnectionData>>,
running: AtomicBool,
}
#[derive(Clone)]
pub struct TcpClusterMgr {
data: Arc<TcpClusterMgrData>,
}
impl TcpClusterMgr {
pub fn create(system: &ActorSystem, address: SocketAddr) -> TcpClusterMgr {
let result = TcpClusterMgr {
data: Arc::new(TcpClusterMgrData {
listen_address: address,
system: system.clone(),
listener: RwLock::new(None),
connections: RwLock::new(HashMap::new()),
running: AtomicBool::new(true),
}),
};
{
let pair = Arc::new((Mutex::new(false), Condvar::new()));
let (mutex, condvar) = &*pair;
let mut started = mutex.lock().unwrap();
let join_handle = result.start_tcp_listener(pair.clone());
while !*started {
started = condvar.wait(started).unwrap();
}
let mut handle = result.data.listener.write().unwrap();
*handle = Some(join_handle);
}
result
}
fn start_tcp_listener(&self, pair: Arc<(Mutex<bool>, Condvar)>) -> JoinHandle<()> {
let system = self.data.system.clone();
let address = self.data.listen_address.clone();
let manager = self.clone();
thread::spawn(move || {
system.init_current();
let sys_uuid = system.uuid();
let listener = TcpListener::bind(address).unwrap();
info!("{}: Listening for connections on {}.", sys_uuid, address);
let (mutex, condvar) = &*pair;
let mut started = mutex.lock().unwrap();
*started = true;
condvar.notify_all();
drop(started);
while manager.data.running.load(Ordering::Relaxed) {
match listener.accept() {
Ok((stream, socket_address)) => {
info!(
"{}: Accepting connection from: {}.",
sys_uuid, socket_address
);
manager.start_tcp_threads(stream, socket_address);
}
Err(e) => {
error!("couldn't get client: {:?}", e);
}
}
}
})
}
pub fn connect(&self, address: SocketAddr, timeout: Duration) -> std::io::Result<()> {
let stream = TcpStream::connect_timeout(&address, timeout)?;
Ok(self.start_tcp_threads(stream, address))
}
fn start_tcp_threads(&self, stream: TcpStream, address: SocketAddr) {
let arc_stream = Arc::new(stream);
let (sender, receiver) = secc::create::<WireMessage>(32, Duration::from_millis(10));
let system_uuid = self.data.system.connect(&sender, &receiver);
let tx_handle = self.start_tx_thread(arc_stream.clone(), receiver.clone());
let rx_handle = self.start_rx_thread(arc_stream.clone(), sender.clone());
let data = ConnectionData {
system_uuid,
address,
receiver,
sender,
tx_handle,
rx_handle,
};
info!(
"{:?}: Connected to {:?}@{:?}",
self.data.system.uuid(),
system_uuid,
address
);
let mut connections = self.data.connections.write().unwrap();
connections.insert(data.system_uuid, data);
}
fn start_tx_thread(
&self,
stream: Arc<TcpStream>,
receiver: SeccReceiver<WireMessage>,
) -> JoinHandle<()> {
let system = self.data.system.clone();
let manager = self.clone();
thread::spawn(move || {
system.init_current();
let mut writer = BufWriter::new(&*stream);
while manager.data.running.load(Ordering::Relaxed) {
if let Ok(message) = receiver.receive_await_timeout(Duration::from_millis(10)) {
bincode::serialize_into(&mut writer, &message).unwrap();
writer.flush().unwrap();
}
}
})
}
fn start_rx_thread(
&self,
stream: Arc<TcpStream>,
sender: SeccSender<WireMessage>,
) -> JoinHandle<()> {
let system = self.data.system.clone();
let manager = self.clone();
thread::spawn(move || {
system.init_current();
let mut reader = BufReader::new(&*stream);
while manager.data.running.load(Ordering::Relaxed) {
let msg: WireMessage = bincode::deserialize_from(&mut reader).unwrap();
sender.send(msg).unwrap();
}
})
}
}
#[cfg(test)]
mod tests {
use crate::tests::*;
use super::*;
#[test]
fn test_tcp_remote_connect() {
init_test_log();
let socket_addr1 = SocketAddr::from(([127, 0, 0, 1], 7717));
let system1 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let cluster_mgr1 = TcpClusterMgr::create(&system1, socket_addr1);
let socket_addr2 = SocketAddr::from(([127, 0, 0, 1], 7727));
let system2 = ActorSystem::create(ActorSystemConfig::default().thread_pool_size(2));
let _cluster_mgr2 = TcpClusterMgr::create(&system2, socket_addr2);
cluster_mgr1
.connect(socket_addr2, Duration::from_millis(2000))
.unwrap();
}
}