use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::net::{IpAddr, SocketAddr, TcpListener, TcpStream};
use std::time::{Instant, Duration};
use log::{error, info, debug};
use serde::{Serialize, Deserialize};
use threadpool::ThreadPool;
use crate::nc_error::NCError;
use crate::nc_node::NCNodeMessage;
use crate::nc_config::NCConfiguration;
use crate::nc_node_info::{NodeID, NCNodeList};
use crate::nc_util::{nc_receive_data, nc_send_data, nc_send_data2};
#[derive(Debug, Serialize, Deserialize)]
pub(crate) enum NCServerMessage {
InitialData(NodeID, Option<Vec<u8>>),
JobStatus(NCJobStatus),
}
#[derive(Debug, Serialize, Deserialize, PartialEq)]
pub enum NCJobStatus {
Unfinished(Vec<u8>),
Waiting,
Finished,
}
pub trait NCServer {
fn initial_data(&mut self) -> Result<Option<Vec<u8>>, NCError> {
Ok(None)
}
fn prepare_data_for_node(&mut self, node_id: NodeID) -> Result<NCJobStatus, NCError>;
fn process_data_from_node(&mut self, node_id: NodeID, data: &[u8]) -> Result<(), NCError>;
fn heartbeat_timeout(&mut self, nodes: Vec<NodeID>);
fn finish_job(&mut self);
}
pub struct NCServerStarter {
config: NCConfiguration,
}
impl NCServerStarter {
pub fn new(config: NCConfiguration) -> Self {
debug!("NCServerStarter::new()");
NCServerStarter{ config }
}
pub fn start<T: NCServer + Send + 'static>(&mut self, nc_server: T) -> Result<(), NCError> {
debug!("NCServerStarter::new()");
let time_start = Instant::now();
let server_process = ServerProcess::new(&self.config, nc_server);
let server_heartbeat = ServerHeartbeat::new(&self.config);
let thread_pool = ThreadPool::new((self.config.pool_size + 1) as usize);
self.start_heartbeat_thread(&thread_pool, server_heartbeat);
self.start_main_loop(&thread_pool, server_process);
let time_taken = (Instant::now() - time_start).as_secs_f64();
info!("Time taken: {} s, {} min, {} h", time_taken, time_taken / 60.0, time_taken / (60.0 * 60.0));
thread_pool.join();
Ok(())
}
fn start_heartbeat_thread(&self, thread_pool: &ThreadPool, server_heartbeat: ServerHeartbeat) {
debug!("NCServerStarter::start_heartbeat_thread()");
thread_pool.execute(move || {
loop {
server_heartbeat.sleep();
if let Err(e) = server_heartbeat.send_check_heartbeat_message() {
error!("Error in start_heartbeat_thread(), couldn't send CheckHeartbeat message: {}", e);
break
}
}
debug!("Exit start_heartbeat_thread() main loop");
});
}
fn start_main_loop<T: NCServer + Send + 'static>(&self, thread_pool: &ThreadPool, server_process: ServerProcess<T>) {
debug!("NCServerStarter::start_main_loop()");
let ip_addr: IpAddr = "0.0.0.0".parse().unwrap(); let socket_addr = SocketAddr::new(ip_addr, server_process.port);
let listener = TcpListener::bind(socket_addr).unwrap();
let job_done = server_process.clone_job_done();
let server_process = Arc::new(server_process);
loop {
match listener.accept() {
Ok((stream, addr)) => {
debug!("Connection from node: {}", addr);
self.start_node_thread(thread_pool, stream, server_process.clone());
}
Err(e) => {
error!("IO error while accepting node connections: {}", e);
}
}
if job_done.load(Ordering::Relaxed) {
break
}
}
info!("Job is done, will call NCServer::finish_job()");
server_process.nc_server.lock().unwrap().finish_job();
}
fn start_node_thread<T: NCServer + Send + 'static>(&self, thread_pool: &ThreadPool, stream: TcpStream, server_process: Arc<ServerProcess<T>>) {
debug!("NCServerStarter::start_node_thread()");
thread_pool.execute(move || {
if let Err(e) = server_process.handle_node(stream) {
error!("Error in handle_node(): {}", e);
}
});
}
}
struct ServerHeartbeat {
server_socket: SocketAddr,
duration: Duration,
}
impl ServerHeartbeat {
fn new(config: &NCConfiguration) -> Self {
debug!("ServerHeartbeat::new()");
let ip_addr: IpAddr = "127.0.0.1".parse().unwrap();
let server_socket = SocketAddr::new(ip_addr, config.port);
let duration = Duration::from_secs(2 * config.heartbeat);
ServerHeartbeat{
server_socket,
duration,
}
}
fn sleep(&self) {
debug!("ServerHeartbeat::sleep()");
thread::sleep(self.duration);
}
fn send_check_heartbeat_message(&self) -> Result<(), NCError> {
debug!("ServerHeartbeat::send_check_heartbeat_message()");
nc_send_data(&NCNodeMessage::CheckHeartbeat, &self.server_socket)
}
}
struct ServerProcess<T> {
port: u16,
heartbeat: u64,
nc_server: Mutex<T>,
node_list: Mutex<NCNodeList>,
job_done: Arc<AtomicBool>,
}
impl<T: NCServer> ServerProcess<T> {
fn new(config: &NCConfiguration, nc_server: T) -> Self {
debug!("ServerProcess::new()");
ServerProcess{
port: config.port,
heartbeat: config.heartbeat,
nc_server: Mutex::new(nc_server),
node_list: Mutex::new(NCNodeList::new()),
job_done: Arc::new(AtomicBool::new(false)),
}
}
fn clone_job_done(&self) -> Arc<AtomicBool> {
debug!("ServerProcess::clone_job_done()");
self.job_done.clone()
}
fn handle_node(&self, mut stream: TcpStream) -> Result<(), NCError> {
debug!("ServerProcess::handle_node()");
let request: NCNodeMessage = nc_receive_data(&mut stream)?;
match request {
NCNodeMessage::Register => {
let node_id = self.node_list.lock()?.register_new_node();
let initial_data = self.nc_server.lock()?.initial_data()?;
info!("Registering new node: {}, {}", node_id, stream.peer_addr()?);
self.send_initial_data_message(node_id, initial_data, stream)?;
}
NCNodeMessage::NeedsData(node_id) => {
debug!("Node {} needs data to process", node_id);
let data_for_node = self.nc_server.lock()?.prepare_data_for_node(node_id)?;
match data_for_node {
NCJobStatus::Unfinished(data) => {
debug!("Send data to node");
self.send_job_status_unfinished_message(data, stream)?;
}
NCJobStatus::Waiting => {
debug!("Waiting for other nodes to finish");
self.send_job_status_waiting(stream)?;
}
NCJobStatus::Finished => {
debug!("Job is done, will exit handle_node()");
self.job_done.store(true, Ordering::Relaxed);
}
}
}
NCNodeMessage::HeartBeat(node_id) => {
debug!("Got heartbeat from node: {}", node_id);
self.node_list.lock()?.update_heartbeat(node_id);
}
NCNodeMessage::HasData(node_id, data) => {
debug!("Node {} has processed some data and we received the results", node_id);
self.nc_server.lock()?.process_data_from_node(node_id, &data)?;
}
NCNodeMessage::CheckHeartbeat => {
debug!("Message CheckHeartbeat received!");
let nodes = self.node_list.lock()?.check_heartbeat(self.heartbeat).collect::<Vec<NodeID>>();
self.nc_server.lock()?.heartbeat_timeout(nodes);
}
}
Ok(())
}
fn send_initial_data_message(&self, node_id: NodeID, initial_data: Option<Vec<u8>>, mut stream: TcpStream) -> Result<(), NCError> {
debug!("ServerProcess::send_initial_data_message()");
nc_send_data2(&NCServerMessage::InitialData(node_id, initial_data), &mut stream)
}
fn send_job_status_unfinished_message(&self, data: Vec<u8>, mut stream: TcpStream) -> Result<(), NCError> {
debug!("ServerProcess::send_job_status_unfinished_message()");
nc_send_data2(&NCServerMessage::JobStatus(NCJobStatus::Unfinished(data)), &mut stream)
}
fn send_job_status_waiting(&self, mut stream: TcpStream) -> Result<(), NCError> {
debug!("ServerProcess::send_job_status_waiting()");
nc_send_data2(&NCServerMessage::JobStatus(NCJobStatus::Waiting), &mut stream)
}
}